🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-01-08 16:33:29.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-01-08 16:34:18.0 +00:00:00
commit
b98bf5092071f83a4fabbef03e38242c34117047 [patch]
tree
3f9a7c5252427a8a03a266500ebafb99f2b537d8
parent
8b285671492300a6535cc0997727dbf118d829b0
download
b98bf5092071f83a4fabbef03e38242c34117047.tar.gz

Persist users, channels and channel members to the database



Diff

 migrations/2023010814480_initial-schema.sql | 19 +++++++-
 src/channel.rs                              | 25 +++++++++-
 src/main.rs                                 | 24 ++++++---
 src/persistence.rs                          | 83 ++++++++++++++++++++++++++++++-
 src/persistence/events.rs                   | 24 +++++++++-
 src/server.rs                               |  5 +-
 6 files changed, 172 insertions(+), 8 deletions(-)

diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql
index 9f0ab38..9d8fdd6 100644
--- a/migrations/2023010814480_initial-schema.sql
+++ b/migrations/2023010814480_initial-schema.sql
@@ -1,6 +1,25 @@
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    username VARCHAR(255) NOT NULL,
    password VARCHAR(255) NOT NULL
);

CREATE UNIQUE INDEX users_username ON users(username);

CREATE TABLE channels (
    id INTEGER PRIMARY KEY,
    name VARCHAR(255) NOT NULL
);

CREATE UNIQUE INDEX channel_name ON channels(name);

CREATE TABLE channel_users (
    channel INT NOT NULL,
    user INT NOT NULL,
    permissions INT NOT NULL DEFAULT 0,
    in_channel BOOLEAN DEFAULT false,
    FOREIGN KEY(user) REFERENCES users(id),
    FOREIGN KEY(channel) REFERENCES channels(id)
);

CREATE UNIQUE INDEX channel_user ON channel_users(channel, user);
diff --git a/src/channel.rs b/src/channel.rs
index 50b607e..1eb4c5f 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -20,6 +20,7 @@ use crate::{
        ChannelMemberList, ChannelMessage, ChannelPart, ChannelUpdateTopic, FetchClientByNick,
        ServerDisconnect, UserKickedFromChannel, UserNickChange,
    },
    persistence::Persistence,
    server::Server,
};

@@ -30,10 +31,18 @@ pub struct Channel {
    pub server: Addr<Server>,
    pub clients: HashMap<Addr<Client>, InitiatedConnection>,
    pub topic: Option<CurrentChannelTopic>,
    pub persistence: Addr<Persistence>,
}

impl Actor for Channel {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        self.persistence
            .do_send(crate::persistence::events::ChannelCreated {
                name: self.name.to_string(),
            });
    }
}

impl Supervised for Channel {}
@@ -123,6 +132,14 @@ impl Handler<ChannelJoin> for Channel {
    fn handle(&mut self, msg: ChannelJoin, ctx: &mut Self::Context) -> Self::Result {
        info!(self.name, msg.connection.nick, "User is joining channel");

        // persist the user's join to the database
        self.persistence
            .do_send(crate::persistence::events::ChannelJoined {
                channel_name: self.name.to_string(),
                username: msg.connection.user.to_string(),
                span: msg.span.clone(),
            });

        self.clients
            .insert(msg.client.clone(), msg.connection.clone());

@@ -253,6 +270,14 @@ impl Handler<ChannelPart> for Channel {
            return;
        };

        // update the client's state in the database
        self.persistence
            .do_send(crate::persistence::events::ChannelParted {
                channel_name: self.name.to_string(),
                username: client_info.user.to_string(),
                span: msg.span.clone(),
            });

        let message = Broadcast {
            message: irc_proto::Message {
                tags: None,
diff --git a/src/main.rs b/src/main.rs
index dd746c2..f551b49 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,7 +23,9 @@ use tokio_util::codec::FramedRead;
use tracing::{error, info, info_span, Instrument};
use tracing_subscriber::EnvFilter;

use crate::{client::Client, config::Args, messages::UserConnected, server::Server};
use crate::{
    client::Client, config::Args, messages::UserConnected, persistence::Persistence, server::Server,
};

pub mod channel;
pub mod client;
@@ -31,6 +33,7 @@ pub mod config;
pub mod connection;
pub mod database;
pub mod messages;
pub mod persistence;
pub mod server;

pub const SERVER_NAME: &str = "my.cool.server";
@@ -74,17 +77,24 @@ async fn main() -> anyhow::Result<()> {
    let listen_address = opts.config.listen_address;
    let client_threads = opts.config.client_threads;

    let server = {
    let server_arbiter = Arbiter::new();

    let persistence = {
        let database = database.clone();

        Supervisor::start_in_arbiter(&Arbiter::new().handle(), move |_ctx| Server {
            channels: HashMap::default(),
            clients: HashMap::default(),
            channel_arbiters: build_arbiters(opts.config.channel_threads),
            config: opts.config,
        Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Persistence {
            database,
        })
    };

    let server = Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Server {
        channels: HashMap::default(),
        clients: HashMap::default(),
        channel_arbiters: build_arbiters(opts.config.channel_threads),
        config: opts.config,
        persistence,
    });

    let listener = TcpListener::bind(listen_address).await?;

    actix_rt::spawn(start_tcp_acceptor_loop(
diff --git a/src/persistence.rs b/src/persistence.rs
new file mode 100644
index 0000000..8bcaa3d
--- /dev/null
+++ b/src/persistence.rs
@@ -0,0 +1,83 @@
pub mod events;

use actix::{Context, Handler, ResponseFuture};
use tracing::instrument;

use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted};

/// Takes events destined for other actors and persists them to the database.
pub struct Persistence {
    pub database: sqlx::Pool<sqlx::Any>,
}

impl actix::Supervised for Persistence {}

impl actix::Actor for Persistence {
    type Context = Context<Self>;
}

/// Create a new channel in the database, if one doesn't already exist.
impl Handler<ChannelCreated> for Persistence {
    type Result = ResponseFuture<()>;

    fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)")
                .bind(msg.name)
                .execute(&conn)
                .await
                .unwrap();
        })
    }
}

/// Insert a new channel member into the database.
impl Handler<ChannelJoined> for Persistence {
    type Result = ResponseFuture<()>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: ChannelJoined, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query(
                "INSERT INTO channel_users (channel, user, permissions, in_channel)
                 VALUES ((SELECT id FROM channels WHERE name = ?), (SELECT id FROM users WHERE username = ?), ?, ?)
                 ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel"
            )
            .bind(msg.channel_name)
            .bind(msg.username)
            .bind(0i32)
            .bind(true)
            .execute(&conn)
            .await
            .unwrap();
        })
    }
}

/// Update a user to not being in a channel anymore.
impl Handler<ChannelParted> for Persistence {
    type Result = ResponseFuture<()>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: ChannelParted, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query(
                "UPDATE channel_users
                 SET in_channel = false
                 WHERE channel = (SELECT id FROM channels WHERE name = ?)
                   AND user = (SELECT id FROM users WHERE username = ?)",
            )
            .bind(msg.channel_name)
            .bind(msg.username)
            .execute(&conn)
            .await
            .unwrap();
        })
    }
}
diff --git a/src/persistence/events.rs b/src/persistence/events.rs
new file mode 100644
index 0000000..beaeb53
--- /dev/null
+++ b/src/persistence/events.rs
@@ -0,0 +1,24 @@
use actix::Message;
use tracing::Span;

#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelCreated {
    pub name: String,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelJoined {
    pub channel_name: String,
    pub username: String,
    pub span: Span,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelParted {
    pub channel_name: String,
    pub username: String,
    pub span: Span,
}
diff --git a/src/server.rs b/src/server.rs
index b78cec7..24c7214 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -24,6 +24,7 @@ use crate::{
        FetchClientByNick, ServerDisconnect, ServerFetchMotd, UserConnected, UserNickChange,
        UserNickChangeInternal,
    },
    persistence::Persistence,
    server::response::Motd,
    SERVER_NAME,
};
@@ -34,7 +35,7 @@ pub struct Server {
    pub channels: HashMap<String, Addr<Channel>>,
    pub clients: HashMap<Addr<Client>, InitiatedConnection>,
    pub config: Config,
    pub database: sqlx::Pool<sqlx::Any>,
    pub persistence: Addr<Persistence>,
}

impl Supervised for Server {}
@@ -158,12 +159,14 @@ impl Handler<ChannelJoin> for Server {

                let channel_name = msg.channel_name.clone();
                let server = ctx.address();
                let persistence = self.persistence.clone();

                Supervisor::start_in_arbiter(&arbiter, move |_ctx| Channel {
                    name: channel_name,
                    clients: HashMap::new(),
                    topic: None,
                    server,
                    persistence,
                })
            })
            .clone();