From b98bf5092071f83a4fabbef03e38242c34117047 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 8 Jan 2023 16:33:29 +0000 Subject: [PATCH] Persist users, channels and channel members to the database --- migrations/2023010814480_initial-schema.sql | 19 +++++++++++++++++++ src/channel.rs | 25 +++++++++++++++++++++++++ src/main.rs | 24 +++++++++++++++++------- src/persistence.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/persistence/events.rs | 24 ++++++++++++++++++++++++ src/server.rs | 5 ++++- 6 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 src/persistence.rs create mode 100644 src/persistence/events.rs diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql index 9f0ab38..9d8fdd6 100644 --- a/migrations/2023010814480_initial-schema.sql +++ b/migrations/2023010814480_initial-schema.sql @@ -1,6 +1,25 @@ CREATE TABLE users ( + id INTEGER PRIMARY KEY, username VARCHAR(255) NOT NULL, password VARCHAR(255) NOT NULL ); CREATE UNIQUE INDEX users_username ON users(username); + +CREATE TABLE channels ( + id INTEGER PRIMARY KEY, + name VARCHAR(255) NOT NULL +); + +CREATE UNIQUE INDEX channel_name ON channels(name); + +CREATE TABLE channel_users ( + channel INT NOT NULL, + user INT NOT NULL, + permissions INT NOT NULL DEFAULT 0, + in_channel BOOLEAN DEFAULT false, + FOREIGN KEY(user) REFERENCES users(id), + FOREIGN KEY(channel) REFERENCES channels(id) +); + +CREATE UNIQUE INDEX channel_user ON channel_users(channel, user); diff --git a/src/channel.rs b/src/channel.rs index 50b607e..1eb4c5f 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -20,6 +20,7 @@ use crate::{ ChannelMemberList, ChannelMessage, ChannelPart, ChannelUpdateTopic, FetchClientByNick, ServerDisconnect, UserKickedFromChannel, UserNickChange, }, + persistence::Persistence, server::Server, }; @@ -30,10 +31,18 @@ pub struct Channel { pub server: Addr, pub clients: HashMap, InitiatedConnection>, pub topic: Option, + pub persistence: Addr, } impl Actor for Channel { type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + self.persistence + .do_send(crate::persistence::events::ChannelCreated { + name: self.name.to_string(), + }); + } } impl Supervised for Channel {} @@ -123,6 +132,14 @@ impl Handler for Channel { fn handle(&mut self, msg: ChannelJoin, ctx: &mut Self::Context) -> Self::Result { info!(self.name, msg.connection.nick, "User is joining channel"); + // persist the user's join to the database + self.persistence + .do_send(crate::persistence::events::ChannelJoined { + channel_name: self.name.to_string(), + username: msg.connection.user.to_string(), + span: msg.span.clone(), + }); + self.clients .insert(msg.client.clone(), msg.connection.clone()); @@ -253,6 +270,14 @@ impl Handler for Channel { return; }; + // update the client's state in the database + self.persistence + .do_send(crate::persistence::events::ChannelParted { + channel_name: self.name.to_string(), + username: client_info.user.to_string(), + span: msg.span.clone(), + }); + let message = Broadcast { message: irc_proto::Message { tags: None, diff --git a/src/main.rs b/src/main.rs index dd746c2..f551b49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,9 @@ use tokio_util::codec::FramedRead; use tracing::{error, info, info_span, Instrument}; use tracing_subscriber::EnvFilter; -use crate::{client::Client, config::Args, messages::UserConnected, server::Server}; +use crate::{ + client::Client, config::Args, messages::UserConnected, persistence::Persistence, server::Server, +}; pub mod channel; pub mod client; @@ -31,6 +33,7 @@ pub mod config; pub mod connection; pub mod database; pub mod messages; +pub mod persistence; pub mod server; pub const SERVER_NAME: &str = "my.cool.server"; @@ -74,17 +77,24 @@ async fn main() -> anyhow::Result<()> { let listen_address = opts.config.listen_address; let client_threads = opts.config.client_threads; - let server = { + let server_arbiter = Arbiter::new(); + + let persistence = { let database = database.clone(); - Supervisor::start_in_arbiter(&Arbiter::new().handle(), move |_ctx| Server { - channels: HashMap::default(), - clients: HashMap::default(), - channel_arbiters: build_arbiters(opts.config.channel_threads), - config: opts.config, + Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Persistence { database, }) }; + + let server = Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Server { + channels: HashMap::default(), + clients: HashMap::default(), + channel_arbiters: build_arbiters(opts.config.channel_threads), + config: opts.config, + persistence, + }); + let listener = TcpListener::bind(listen_address).await?; actix_rt::spawn(start_tcp_acceptor_loop( diff --git a/src/persistence.rs b/src/persistence.rs new file mode 100644 index 0000000..8bcaa3d --- /dev/null +++ b/src/persistence.rs @@ -0,0 +1,83 @@ +pub mod events; + +use actix::{Context, Handler, ResponseFuture}; +use tracing::instrument; + +use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted}; + +/// Takes events destined for other actors and persists them to the database. +pub struct Persistence { + pub database: sqlx::Pool, +} + +impl actix::Supervised for Persistence {} + +impl actix::Actor for Persistence { + type Context = Context; +} + +/// Create a new channel in the database, if one doesn't already exist. +impl Handler for Persistence { + type Result = ResponseFuture<()>; + + fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)") + .bind(msg.name) + .execute(&conn) + .await + .unwrap(); + }) + } +} + +/// Insert a new channel member into the database. +impl Handler for Persistence { + type Result = ResponseFuture<()>; + + #[instrument(parent = &msg.span, skip_all)] + fn handle(&mut self, msg: ChannelJoined, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + sqlx::query( + "INSERT INTO channel_users (channel, user, permissions, in_channel) + VALUES ((SELECT id FROM channels WHERE name = ?), (SELECT id FROM users WHERE username = ?), ?, ?) + ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel" + ) + .bind(msg.channel_name) + .bind(msg.username) + .bind(0i32) + .bind(true) + .execute(&conn) + .await + .unwrap(); + }) + } +} + +/// Update a user to not being in a channel anymore. +impl Handler for Persistence { + type Result = ResponseFuture<()>; + + #[instrument(parent = &msg.span, skip_all)] + fn handle(&mut self, msg: ChannelParted, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + sqlx::query( + "UPDATE channel_users + SET in_channel = false + WHERE channel = (SELECT id FROM channels WHERE name = ?) + AND user = (SELECT id FROM users WHERE username = ?)", + ) + .bind(msg.channel_name) + .bind(msg.username) + .execute(&conn) + .await + .unwrap(); + }) + } +} diff --git a/src/persistence/events.rs b/src/persistence/events.rs new file mode 100644 index 0000000..beaeb53 --- /dev/null +++ b/src/persistence/events.rs @@ -0,0 +1,24 @@ +use actix::Message; +use tracing::Span; + +#[derive(Message)] +#[rtype(result = "()")] +pub struct ChannelCreated { + pub name: String, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct ChannelJoined { + pub channel_name: String, + pub username: String, + pub span: Span, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct ChannelParted { + pub channel_name: String, + pub username: String, + pub span: Span, +} diff --git a/src/server.rs b/src/server.rs index b78cec7..24c7214 100644 --- a/src/server.rs +++ b/src/server.rs @@ -24,6 +24,7 @@ use crate::{ FetchClientByNick, ServerDisconnect, ServerFetchMotd, UserConnected, UserNickChange, UserNickChangeInternal, }, + persistence::Persistence, server::response::Motd, SERVER_NAME, }; @@ -34,7 +35,7 @@ pub struct Server { pub channels: HashMap>, pub clients: HashMap, InitiatedConnection>, pub config: Config, - pub database: sqlx::Pool, + pub persistence: Addr, } impl Supervised for Server {} @@ -158,12 +159,14 @@ impl Handler for Server { let channel_name = msg.channel_name.clone(); let server = ctx.address(); + let persistence = self.persistence.clone(); Supervisor::start_in_arbiter(&arbiter, move |_ctx| Channel { name: channel_name, clients: HashMap::new(), topic: None, server, + persistence, }) }) .clone(); -- libgit2 1.7.2