🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-01-08 18:32:54.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-01-08 18:33:46.0 +00:00:00
commit
adfadc3e367e7c4dd80fb067e582e103e83852aa [patch]
tree
42f5cfc2c12291f1341f488e4dde8035bd151740
parent
46ab04be24499a412f14ae21d822b6275fb27e73
download
adfadc3e367e7c4dd80fb067e582e103e83852aa.tar.gz

Send all (max 500) unseen messages to the user when they connect to the server



Diff

 migrations/2023010814480_initial-schema.sql | 11 +++++++++++
 src/channel.rs                              |  8 ++++++++
 src/client.rs                               | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
 src/persistence.rs                          | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/persistence/events.rs                   | 17 +++++++++++++++++
 5 files changed, 161 insertions(+), 14 deletions(-)

diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql
index 9d8fdd6..f103235 100644
--- a/migrations/2023010814480_initial-schema.sql
+++ a/migrations/2023010814480_initial-schema.sql
@@ -13,13 +13,24 @@

CREATE UNIQUE INDEX channel_name ON channels(name);

CREATE TABLE channel_messages (
      channel INT NOT NULL,
      idx INT NOT NULL,
      sender VARCHAR(255),
      message VARCHAR(255),
      FOREIGN KEY(channel) REFERENCES channels(id),
      PRIMARY KEY(channel, idx)
);

CREATE TABLE channel_users (
    channel INT NOT NULL,
    user INT NOT NULL,
    permissions INT NOT NULL DEFAULT 0,
    in_channel BOOLEAN DEFAULT false,
    last_seen_message_idx INT,
    FOREIGN KEY(user) REFERENCES users(id),
    FOREIGN KEY(channel) REFERENCES channels(id)
    -- FOREIGN KEY(channel, last_seen_message_idx) REFERENCES channels(channel, idx)
);

CREATE UNIQUE INDEX channel_user ON channel_users(channel, user);
diff --git a/src/channel.rs b/src/channel.rs
index 1eb4c5f..26afbee 100644
--- a/src/channel.rs
+++ a/src/channel.rs
@@ -85,6 +85,14 @@
        // build the nick prefix for the message we're about to broadcast
        let nick = sender.to_nick();

        self.persistence
            .do_send(crate::persistence::events::ChannelMessage {
                channel_name: self.name.to_string(),
                sender: nick.to_string(),
                message: msg.message.to_string(),
                receivers: self.clients.values().map(|v| v.user.to_string()).collect(),
            });

        for client in self.clients.keys() {
            if client == &msg.client {
                // don't echo the message back to the sender
diff --git a/src/client.rs b/src/client.rs
index fcac020..073ba9d 100644
--- a/src/client.rs
+++ a/src/client.rs
@@ -19,7 +19,10 @@
        ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange,
        UserNickChangeInternal,
    },
    persistence::{events::FetchUserChannels, Persistence},
    persistence::{
        events::{FetchUnseenMessages, FetchUserChannels},
        Persistence,
    },
    server::Server,
    SERVER_NAME,
};
@@ -168,21 +171,30 @@
        // loop over all the channels and send a channel join notification to the root
        // server actor to get a handle back
        for channel_name in msg.channels {
            if !channel_name.is_channel_name() {
            if !channel_name.is_channel_name() || self.channels.contains_key(&channel_name) {
                // todo: send message to client informing them of the invalid channel name
                continue;
            }

            let channel_handle_fut = self.server.clone().send(ChannelJoin {
                channel_name: channel_name.to_string(),
                client: ctx.address(),
                connection: self.connection.clone(),
                span: Span::current(),
            });

            let channel_messages_fut = self.persistence.send(FetchUnseenMessages {
                channel_name: channel_name.to_string(),
                username: self.connection.user.to_string(),
                span: Span::current(),
            });

            futures.push(
                self.server
                    .clone()
                    .send(ChannelJoin {
                        channel_name: channel_name.to_string(),
                        client: ctx.address(),
                        connection: self.connection.clone(),
                        span: Span::current(),
                    })
                    .map(move |v| (channel_name, v.unwrap().unwrap())),
                futures::future::join(channel_handle_fut, channel_messages_fut).map(
                    move |(handle, messages)| {
                        (channel_name, handle.unwrap().unwrap(), messages.unwrap())
                    },
                ),
            );
        }

@@ -191,9 +203,20 @@
        let fut = wrap_future::<_, Self>(
            futures::future::join_all(futures.into_iter()).instrument(Span::current()),
        )
        .map(|result, this, _ctx| {
            for (channel_name, handle) in result {
        .map(|result, this, ctx| {
            for (channel_name, handle, messages) in result {
                this.channels.insert(channel_name.clone(), handle);

                for (source, message) in messages {
                    ctx.notify(Broadcast {
                        message: Message {
                            tags: None,
                            prefix: Some(Prefix::new_from_str(&source)),
                            command: Command::PRIVMSG(channel_name.clone(), message),
                        },
                        span: this.span.clone(),
                    });
                }
            }
        });

diff --git a/src/persistence.rs b/src/persistence.rs
index d0bb723..90d7041 100644
--- a/src/persistence.rs
+++ a/src/persistence.rs
@@ -1,9 +1,13 @@
pub mod events;

use actix::{Context, Handler, ResponseFuture};
use itertools::Itertools;
use tracing::instrument;

use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted, FetchUserChannels};
use crate::persistence::events::{
    ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted, FetchUnseenMessages,
    FetchUserChannels,
};

/// Takes events destined for other actors and persists them to the database.

pub struct Persistence {
@@ -104,6 +108,90 @@
            .into_iter()
            .map(|(v,)| v)
            .collect()
        })
    }
}

impl Handler<ChannelMessage> for Persistence {
    type Result = ResponseFuture<()>;

    fn handle(&mut self, msg: ChannelMessage, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            let (channel, idx): (i64, i64) = sqlx::query_as(
                "WITH channel AS (SELECT id FROM channels WHERE name = ?)
                 INSERT INTO channel_messages (channel, idx, sender, message)
                     SELECT
                        channel.id,
                        COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0),
                        ?,
                        ?
                    FROM channel
                 RETURNING channel, idx",
            )
            .bind(msg.channel_name)
            .bind(msg.sender)
            .bind(msg.message)
            .fetch_one(&conn)
            .await
            .unwrap();

            let query = format!(
                "UPDATE channel_users
                 SET last_seen_message_idx = ?
                 WHERE channel = ?
                   AND user IN (SELECT id FROM users WHERE username IN ({}))",
                msg.receivers.iter().map(|_| "?").join(",")
            );

            let mut query = sqlx::query(&query).bind(idx).bind(channel);
            for receiver in msg.receivers {
                query = query.bind(receiver);
            }

            query.execute(&conn).await.unwrap();
        })
    }
}

impl Handler<FetchUnseenMessages> for Persistence {
    type Result = ResponseFuture<Vec<(String, String)>>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            // select the last 500 messages, or the last message the user saw - whichever dataset
            // is smaller.
            let res = sqlx::query_as(
                "WITH channel AS (SELECT id FROM channels WHERE name = ?)
                 SELECT sender, message
                 FROM channel_messages
                 WHERE channel = (SELECT id FROM channel)
                    AND idx > MAX(
                        (
                            SELECT MAX(0, MAX(idx) - 500)
                            FROM channel_messages
                            WHERE channel = (SELECT id FROM channel)
                        ),
                        (
                            SELECT last_seen_message_idx
                            FROM channel_users
                            WHERE channel = (SELECT id FROM channel)
                              AND user = (SELECT id FROM users WHERE username = ?)
                        )
                    )
                 ORDER BY idx DESC",
            )
            .bind(msg.channel_name.to_string())
            .bind(msg.username.to_string())
            .fetch_all(&conn)
            .await
            .unwrap();

            res
        })
    }
}
diff --git a/src/persistence/events.rs b/src/persistence/events.rs
index 7f0f634..8e69bb2 100644
--- a/src/persistence/events.rs
+++ a/src/persistence/events.rs
@@ -29,3 +29,20 @@
    pub username: String,
    pub span: Span,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
    pub channel_name: String,
    pub sender: String,
    pub message: String,
    pub receivers: Vec<String>,
}

#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenMessages {
    pub channel_name: String,
    pub username: String,
    pub span: Span,
}