Persist users, channels and channel members to the database
Diff
migrations/2023010814480_initial-schema.sql | 19 +++++++++++++++++++
src/channel.rs | 25 +++++++++++++++++++++++++
src/main.rs | 24 +++++++++++++++++++-----
src/persistence.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/server.rs | 5 ++++-
src/persistence/events.rs | 24 ++++++++++++++++++++++++
6 files changed, 172 insertions(+), 8 deletions(-)
@@ -1,6 +1,25 @@
CREATE TABLE users (
id INTEGER PRIMARY KEY,
username VARCHAR(255) NOT NULL,
password VARCHAR(255) NOT NULL
);
CREATE UNIQUE INDEX users_username ON users(username);
CREATE TABLE channels (
id INTEGER PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
CREATE UNIQUE INDEX channel_name ON channels(name);
CREATE TABLE channel_users (
channel INT NOT NULL,
user INT NOT NULL,
permissions INT NOT NULL DEFAULT 0,
in_channel BOOLEAN DEFAULT false,
FOREIGN KEY(user) REFERENCES users(id),
FOREIGN KEY(channel) REFERENCES channels(id)
);
CREATE UNIQUE INDEX channel_user ON channel_users(channel, user);
@@ -20,6 +20,7 @@
ChannelMemberList, ChannelMessage, ChannelPart, ChannelUpdateTopic, FetchClientByNick,
ServerDisconnect, UserKickedFromChannel, UserNickChange,
},
persistence::Persistence,
server::Server,
};
@@ -30,10 +31,18 @@
pub server: Addr<Server>,
pub clients: HashMap<Addr<Client>, InitiatedConnection>,
pub topic: Option<CurrentChannelTopic>,
pub persistence: Addr<Persistence>,
}
impl Actor for Channel {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
self.persistence
.do_send(crate::persistence::events::ChannelCreated {
name: self.name.to_string(),
});
}
}
impl Supervised for Channel {}
@@ -123,6 +132,14 @@
fn handle(&mut self, msg: ChannelJoin, ctx: &mut Self::Context) -> Self::Result {
info!(self.name, msg.connection.nick, "User is joining channel");
self.persistence
.do_send(crate::persistence::events::ChannelJoined {
channel_name: self.name.to_string(),
username: msg.connection.user.to_string(),
span: msg.span.clone(),
});
self.clients
.insert(msg.client.clone(), msg.connection.clone());
@@ -252,6 +269,14 @@
let Some(client_info) = self.clients.remove(&msg.client) else {
return;
};
self.persistence
.do_send(crate::persistence::events::ChannelParted {
channel_name: self.name.to_string(),
username: client_info.user.to_string(),
span: msg.span.clone(),
});
let message = Broadcast {
message: irc_proto::Message {
@@ -23,7 +23,9 @@
use tracing::{error, info, info_span, Instrument};
use tracing_subscriber::EnvFilter;
use crate::{client::Client, config::Args, messages::UserConnected, server::Server};
use crate::{
client::Client, config::Args, messages::UserConnected, persistence::Persistence, server::Server,
};
pub mod channel;
pub mod client;
@@ -31,6 +33,7 @@
pub mod connection;
pub mod database;
pub mod messages;
pub mod persistence;
pub mod server;
pub const SERVER_NAME: &str = "my.cool.server";
@@ -74,17 +77,24 @@
let listen_address = opts.config.listen_address;
let client_threads = opts.config.client_threads;
let server = {
let server_arbiter = Arbiter::new();
let persistence = {
let database = database.clone();
Supervisor::start_in_arbiter(&Arbiter::new().handle(), move |_ctx| Server {
channels: HashMap::default(),
clients: HashMap::default(),
channel_arbiters: build_arbiters(opts.config.channel_threads),
config: opts.config,
Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Persistence {
database,
})
};
let server = Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Server {
channels: HashMap::default(),
clients: HashMap::default(),
channel_arbiters: build_arbiters(opts.config.channel_threads),
config: opts.config,
persistence,
});
let listener = TcpListener::bind(listen_address).await?;
actix_rt::spawn(start_tcp_acceptor_loop(
@@ -1,0 +1,83 @@
pub mod events;
use actix::{Context, Handler, ResponseFuture};
use tracing::instrument;
use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted};
pub struct Persistence {
pub database: sqlx::Pool<sqlx::Any>,
}
impl actix::Supervised for Persistence {}
impl actix::Actor for Persistence {
type Context = Context<Self>;
}
impl Handler<ChannelCreated> for Persistence {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)")
.bind(msg.name)
.execute(&conn)
.await
.unwrap();
})
}
}
impl Handler<ChannelJoined> for Persistence {
type Result = ResponseFuture<()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: ChannelJoined, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query(
"INSERT INTO channel_users (channel, user, permissions, in_channel)
VALUES ((SELECT id FROM channels WHERE name = ?), (SELECT id FROM users WHERE username = ?), ?, ?)
ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel"
)
.bind(msg.channel_name)
.bind(msg.username)
.bind(0i32)
.bind(true)
.execute(&conn)
.await
.unwrap();
})
}
}
impl Handler<ChannelParted> for Persistence {
type Result = ResponseFuture<()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: ChannelParted, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query(
"UPDATE channel_users
SET in_channel = false
WHERE channel = (SELECT id FROM channels WHERE name = ?)
AND user = (SELECT id FROM users WHERE username = ?)",
)
.bind(msg.channel_name)
.bind(msg.username)
.execute(&conn)
.await
.unwrap();
})
}
}
@@ -24,6 +24,7 @@
FetchClientByNick, ServerDisconnect, ServerFetchMotd, UserConnected, UserNickChange,
UserNickChangeInternal,
},
persistence::Persistence,
server::response::Motd,
SERVER_NAME,
};
@@ -34,7 +35,7 @@
pub channels: HashMap<String, Addr<Channel>>,
pub clients: HashMap<Addr<Client>, InitiatedConnection>,
pub config: Config,
pub database: sqlx::Pool<sqlx::Any>,
pub persistence: Addr<Persistence>,
}
impl Supervised for Server {}
@@ -158,12 +159,14 @@
let channel_name = msg.channel_name.clone();
let server = ctx.address();
let persistence = self.persistence.clone();
Supervisor::start_in_arbiter(&arbiter, move |_ctx| Channel {
name: channel_name,
clients: HashMap::new(),
topic: None,
server,
persistence,
})
})
.clone();
@@ -1,0 +1,24 @@
use actix::Message;
use tracing::Span;
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelCreated {
pub name: String,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelJoined {
pub channel_name: String,
pub username: String,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelParted {
pub channel_name: String,
pub username: String,
pub span: Span,
}