From adfadc3e367e7c4dd80fb067e582e103e83852aa Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 8 Jan 2023 18:32:54 +0000 Subject: [PATCH] Send all (max 500) unseen messages to the user when they connect to the server --- migrations/2023010814480_initial-schema.sql | 11 +++++++++++ src/channel.rs | 8 ++++++++ src/client.rs | 49 ++++++++++++++++++++++++++++++++++++------------- src/persistence.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- src/persistence/events.rs | 17 +++++++++++++++++ 5 files changed, 161 insertions(+), 14 deletions(-) diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql index 9d8fdd6..f103235 100644 --- a/migrations/2023010814480_initial-schema.sql +++ b/migrations/2023010814480_initial-schema.sql @@ -13,13 +13,24 @@ CREATE TABLE channels ( CREATE UNIQUE INDEX channel_name ON channels(name); +CREATE TABLE channel_messages ( + channel INT NOT NULL, + idx INT NOT NULL, + sender VARCHAR(255), + message VARCHAR(255), + FOREIGN KEY(channel) REFERENCES channels(id), + PRIMARY KEY(channel, idx) +); + CREATE TABLE channel_users ( channel INT NOT NULL, user INT NOT NULL, permissions INT NOT NULL DEFAULT 0, in_channel BOOLEAN DEFAULT false, + last_seen_message_idx INT, FOREIGN KEY(user) REFERENCES users(id), FOREIGN KEY(channel) REFERENCES channels(id) + -- FOREIGN KEY(channel, last_seen_message_idx) REFERENCES channels(channel, idx) ); CREATE UNIQUE INDEX channel_user ON channel_users(channel, user); diff --git a/src/channel.rs b/src/channel.rs index 1eb4c5f..26afbee 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -85,6 +85,14 @@ impl Handler for Channel { // build the nick prefix for the message we're about to broadcast let nick = sender.to_nick(); + self.persistence + .do_send(crate::persistence::events::ChannelMessage { + channel_name: self.name.to_string(), + sender: nick.to_string(), + message: msg.message.to_string(), + receivers: self.clients.values().map(|v| v.user.to_string()).collect(), + }); + for client in self.clients.keys() { if client == &msg.client { // don't echo the message back to the sender diff --git a/src/client.rs b/src/client.rs index fcac020..073ba9d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,7 +19,10 @@ use crate::{ ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange, UserNickChangeInternal, }, - persistence::{events::FetchUserChannels, Persistence}, + persistence::{ + events::{FetchUnseenMessages, FetchUserChannels}, + Persistence, + }, server::Server, SERVER_NAME, }; @@ -168,21 +171,30 @@ impl Handler for Client { // loop over all the channels and send a channel join notification to the root // server actor to get a handle back for channel_name in msg.channels { - if !channel_name.is_channel_name() { + if !channel_name.is_channel_name() || self.channels.contains_key(&channel_name) { // todo: send message to client informing them of the invalid channel name continue; } + let channel_handle_fut = self.server.clone().send(ChannelJoin { + channel_name: channel_name.to_string(), + client: ctx.address(), + connection: self.connection.clone(), + span: Span::current(), + }); + + let channel_messages_fut = self.persistence.send(FetchUnseenMessages { + channel_name: channel_name.to_string(), + username: self.connection.user.to_string(), + span: Span::current(), + }); + futures.push( - self.server - .clone() - .send(ChannelJoin { - channel_name: channel_name.to_string(), - client: ctx.address(), - connection: self.connection.clone(), - span: Span::current(), - }) - .map(move |v| (channel_name, v.unwrap().unwrap())), + futures::future::join(channel_handle_fut, channel_messages_fut).map( + move |(handle, messages)| { + (channel_name, handle.unwrap().unwrap(), messages.unwrap()) + }, + ), ); } @@ -191,9 +203,20 @@ impl Handler for Client { let fut = wrap_future::<_, Self>( futures::future::join_all(futures.into_iter()).instrument(Span::current()), ) - .map(|result, this, _ctx| { - for (channel_name, handle) in result { + .map(|result, this, ctx| { + for (channel_name, handle, messages) in result { this.channels.insert(channel_name.clone(), handle); + + for (source, message) in messages { + ctx.notify(Broadcast { + message: Message { + tags: None, + prefix: Some(Prefix::new_from_str(&source)), + command: Command::PRIVMSG(channel_name.clone(), message), + }, + span: this.span.clone(), + }); + } } }); diff --git a/src/persistence.rs b/src/persistence.rs index d0bb723..90d7041 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,9 +1,13 @@ pub mod events; use actix::{Context, Handler, ResponseFuture}; +use itertools::Itertools; use tracing::instrument; -use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted, FetchUserChannels}; +use crate::persistence::events::{ + ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted, FetchUnseenMessages, + FetchUserChannels, +}; /// Takes events destined for other actors and persists them to the database. pub struct Persistence { @@ -107,3 +111,87 @@ impl Handler for Persistence { }) } } + +impl Handler for Persistence { + type Result = ResponseFuture<()>; + + fn handle(&mut self, msg: ChannelMessage, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + let (channel, idx): (i64, i64) = sqlx::query_as( + "WITH channel AS (SELECT id FROM channels WHERE name = ?) + INSERT INTO channel_messages (channel, idx, sender, message) + SELECT + channel.id, + COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0), + ?, + ? + FROM channel + RETURNING channel, idx", + ) + .bind(msg.channel_name) + .bind(msg.sender) + .bind(msg.message) + .fetch_one(&conn) + .await + .unwrap(); + + let query = format!( + "UPDATE channel_users + SET last_seen_message_idx = ? + WHERE channel = ? + AND user IN (SELECT id FROM users WHERE username IN ({}))", + msg.receivers.iter().map(|_| "?").join(",") + ); + + let mut query = sqlx::query(&query).bind(idx).bind(channel); + for receiver in msg.receivers { + query = query.bind(receiver); + } + + query.execute(&conn).await.unwrap(); + }) + } +} + +impl Handler for Persistence { + type Result = ResponseFuture>; + + #[instrument(parent = &msg.span, skip_all)] + fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + // select the last 500 messages, or the last message the user saw - whichever dataset + // is smaller. + let res = sqlx::query_as( + "WITH channel AS (SELECT id FROM channels WHERE name = ?) + SELECT sender, message + FROM channel_messages + WHERE channel = (SELECT id FROM channel) + AND idx > MAX( + ( + SELECT MAX(0, MAX(idx) - 500) + FROM channel_messages + WHERE channel = (SELECT id FROM channel) + ), + ( + SELECT last_seen_message_idx + FROM channel_users + WHERE channel = (SELECT id FROM channel) + AND user = (SELECT id FROM users WHERE username = ?) + ) + ) + ORDER BY idx DESC", + ) + .bind(msg.channel_name.to_string()) + .bind(msg.username.to_string()) + .fetch_all(&conn) + .await + .unwrap(); + + res + }) + } +} diff --git a/src/persistence/events.rs b/src/persistence/events.rs index 7f0f634..8e69bb2 100644 --- a/src/persistence/events.rs +++ b/src/persistence/events.rs @@ -29,3 +29,20 @@ pub struct FetchUserChannels { pub username: String, pub span: Span, } + +#[derive(Message)] +#[rtype(result = "()")] +pub struct ChannelMessage { + pub channel_name: String, + pub sender: String, + pub message: String, + pub receivers: Vec, +} + +#[derive(Message)] +#[rtype(result = "Vec<(String, String)>")] +pub struct FetchUnseenMessages { + pub channel_name: String, + pub username: String, + pub span: Span, +} -- libgit2 1.7.2