From 44a1108518cc324a9441a63064cea6860a7e4cf6 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Wed, 1 Feb 2023 22:54:55 +0000 Subject: [PATCH] Implement offline private messaging --- migrations/2023010814480_initial-schema.sql | 10 ++++++++++ src/client.rs | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- src/database/mod.rs | 2 +- src/messages.rs | 10 +++++++--- src/persistence.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- src/persistence/events.rs | 23 ++++++++++++++++++++++- src/server.rs | 53 +++++++++++++++++++++++++++++++++-------------------- 7 files changed, 216 insertions(+), 33 deletions(-) diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql index 8d76446..72079f0 100644 --- a/migrations/2023010814480_initial-schema.sql +++ b/migrations/2023010814480_initial-schema.sql @@ -39,3 +39,13 @@ CREATE TABLE channel_users ( -- FOREIGN KEY(channel, last_seen_message_timestamp) REFERENCES channel_messages(channel, timestamp) PRIMARY KEY(channel, user) ); + +CREATE TABLE private_messages ( + timestamp INT NOT NULL PRIMARY KEY, + sender VARCHAR(255) NOT NULL, + receiver INT NOT NULL, + message VARCHAR(255) NOT NULL, + FOREIGN KEY(receiver) REFERENCES users(id) +); + +CREATE INDEX private_messages_receiver ON private_messages(receiver); diff --git a/src/client.rs b/src/client.rs index b9e67a2..656e4ce 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,11 +16,14 @@ use crate::{ messages::{ Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList, ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic, - FetchClientDetails, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd, + FetchClientDetails, PrivateMessage, ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange, UserNickChangeInternal, }, persistence::{ - events::{FetchUnseenMessages, FetchUserChannels, ReserveNick}, + events::{ + FetchUnseenChannelMessages, FetchUnseenPrivateMessages, FetchUserChannels, + FetchUserIdByNick, ReserveNick, + }, Persistence, }, server::Server, @@ -94,6 +97,27 @@ impl Actor for Client { }); }), ); + + ctx.spawn( + self.persistence + .send(FetchUnseenPrivateMessages { + user_id: self.connection.user_id, + span: Span::current(), + }) + .into_actor(self) + .map(move |res, this, ctx| { + for (sender, message) in res.unwrap() { + ctx.notify(Broadcast { + message: Message { + tags: None, + prefix: Some(Prefix::new_from_str(&sender)), + command: Command::PRIVMSG(this.connection.nick.clone(), message), + }, + span: this.span.clone(), + }); + } + }), + ); } /// Called when the actor is shutting down, either gracefully by the client or forcefully @@ -185,7 +209,7 @@ impl Handler for Client { span: Span::current(), }); - let channel_messages_fut = self.persistence.send(FetchUnseenMessages { + let channel_messages_fut = self.persistence.send(FetchUnseenChannelMessages { channel_name: channel_name.to_string(), user_id: self.connection.user_id, span: Span::current(), @@ -337,6 +361,35 @@ impl Handler for Client { } } +/// Self-message to send a peer-to-peer message via the server. +impl Handler for Client { + type Result = ResponseActFuture; + + #[instrument(parent = &msg.span, skip_all)] + fn handle(&mut self, msg: SendPrivateMessage, _ctx: &mut Self::Context) -> Self::Result { + self.persistence + .send(FetchUserIdByNick { + nick: msg.destination, + }) + .into_actor(self) + .map(|res, this, ctx| { + let Some(destination) = res.unwrap() else { + // TODO + eprintln!("User attempted to send a message to non-existent user"); + return; + }; + + this.server.do_send(PrivateMessage { + destination, + message: msg.message, + from: ctx.address(), + span: msg.span, + }); + }) + .boxed_local() + } +} + /// Receives messages from the user's incoming TCP stream and processes them, passing them onto /// other actors or self-notifying and calling a [`Handler`]. impl StreamHandler> for Client { @@ -511,10 +564,9 @@ impl StreamHandler> for Client { Command::PRIVMSG(target, message) => { if !target.is_channel_name() { // private message to another user - self.server.do_send(PeerToPeerMessage { + ctx.notify(SendPrivateMessage { destination: target, message, - from: ctx.address(), span: Span::current(), }); } else if let Some(channel) = self.channels.get(&target) { @@ -645,6 +697,15 @@ impl WriteHandler for Client { } } +/// A [`Client`] internal self-notification to send a peer-to-peer message to another user +#[derive(actix::Message, Debug)] +#[rtype(result = "()")] +struct SendPrivateMessage { + destination: String, + message: String, + span: Span, +} + /// A [`Client`] internal self-notification to grab a list of users in each channel #[derive(actix::Message, Debug)] #[rtype(result = "()")] diff --git a/src/database/mod.rs b/src/database/mod.rs index 16aac39..b830103 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -48,6 +48,6 @@ pub async fn reserve_nick( } /// Compares a password to a hash stored in the database. -pub fn verify_password(password: &[u8], hash: &PasswordHash) -> argon2::password_hash::Result<()> { +pub fn verify_password(password: &[u8], hash: &PasswordHash<'_>) -> argon2::password_hash::Result<()> { Argon2::default().verify_password(password, hash) } diff --git a/src/messages.rs b/src/messages.rs index 68dfbdf..e82b5b1 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -3,7 +3,11 @@ use anyhow::Result; use irc_proto::{ChannelMode, Mode}; use tracing::Span; -use crate::{channel::Channel, client::Client, connection::InitiatedConnection}; +use crate::{ + channel::Channel, + client::Client, + connection::{InitiatedConnection, UserId}, +}; /// Sent when a user is connecting to the server. #[derive(Message, Clone)] @@ -172,8 +176,8 @@ pub struct FetchClientByNick { /// Sends a private message between two users. #[derive(Message)] #[rtype(result = "()")] -pub struct PeerToPeerMessage { - pub destination: String, +pub struct PrivateMessage { + pub destination: UserId, pub message: String, pub from: Addr, pub span: Span, diff --git a/src/persistence.rs b/src/persistence.rs index 7f806ce..75b50e7 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -12,7 +12,8 @@ use crate::{ connection::UserId, persistence::events::{ ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted, - FetchAllUserChannelPermissions, FetchUnseenMessages, FetchUserChannels, ReserveNick, + FetchAllUserChannelPermissions, FetchUnseenChannelMessages, FetchUnseenPrivateMessages, + FetchUserChannels, FetchUserIdByNick, PrivateMessage, ReserveNick, SetUserChannelPermissions, }, }; @@ -203,6 +204,27 @@ impl Handler for Persistence { } } +impl Handler for Persistence { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: FetchUserIdByNick, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + sqlx::query_as( + "SELECT user + FROM user_nicks + WHERE nick = ?", + ) + .bind(msg.nick) + .fetch_optional(&conn) + .await + .unwrap() + .map(|(v,)| v) + }) + } +} + impl Handler for Persistence { type Result = ResponseFuture<()>; @@ -242,11 +264,63 @@ impl Handler for Persistence { } } -impl Handler for Persistence { +impl Handler for Persistence { + type Result = ResponseFuture<()>; + + fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result { + let conn = self.database.clone(); + let timestamp = self.monotonically_increasing_id(); + + Box::pin(async move { + sqlx::query( + "INSERT INTO private_messages + (timestamp, sender, receiver, message) + VALUES (?, ?, ?, ?)", + ) + .bind(timestamp) + .bind(msg.sender) + .bind(msg.receiver) + .bind(msg.message) + .execute(&conn) + .await + .unwrap(); + }) + } +} + +impl Handler for Persistence { + type Result = ResponseFuture>; + + fn handle( + &mut self, + msg: FetchUnseenPrivateMessages, + _ctx: &mut Self::Context, + ) -> Self::Result { + let conn = self.database.clone(); + + Box::pin(async move { + sqlx::query_as( + "DELETE FROM private_messages + WHERE receiver = ? + RETURNING sender, message", + ) + .bind(msg.user_id) + .fetch_all(&conn) + .await + .unwrap() + }) + } +} + +impl Handler for Persistence { type Result = ResponseFuture>; #[instrument(parent = &msg.span, skip_all)] - fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result { + fn handle( + &mut self, + msg: FetchUnseenChannelMessages, + _ctx: &mut Self::Context, + ) -> Self::Result { let conn = self.database.clone(); let max_message_reply_since = Utc::now() - chrono::Duration::from_std(self.max_message_replay_since).unwrap(); diff --git a/src/persistence/events.rs b/src/persistence/events.rs index 1591817..390a5fc 100644 --- a/src/persistence/events.rs +++ b/src/persistence/events.rs @@ -52,6 +52,12 @@ pub struct SetUserChannelPermissions { } #[derive(Message)] +#[rtype(result = "Option")] +pub struct FetchUserIdByNick { + pub nick: String, +} + +#[derive(Message)] #[rtype(result = "()")] pub struct ChannelMessage { pub channel_id: ChannelId, @@ -61,8 +67,23 @@ pub struct ChannelMessage { } #[derive(Message)] +#[rtype(result = "()")] +pub struct PrivateMessage { + pub sender: String, + pub receiver: UserId, + pub message: String, +} + +#[derive(Message)] +#[rtype(result = "Vec<(String, String)>")] +pub struct FetchUnseenPrivateMessages { + pub user_id: UserId, + pub span: Span, +} + +#[derive(Message)] #[rtype(result = "Vec<(String, String)>")] -pub struct FetchUnseenMessages { +pub struct FetchUnseenChannelMessages { pub channel_name: String, pub user_id: UserId, pub span: Span, diff --git a/src/server.rs b/src/server.rs index ea70404..218799e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -21,7 +21,7 @@ use crate::{ connection::InitiatedConnection, messages::{ Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList, - FetchClientByNick, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd, UserConnected, + FetchClientByNick, PrivateMessage, ServerDisconnect, ServerFetchMotd, UserConnected, UserNickChange, UserNickChangeInternal, }, persistence::Persistence, @@ -73,7 +73,11 @@ impl Handler for Server { ), ( Response::RPL_YOURHOST, - vec!["Your host is a sick kid".into()], + vec![format!( + "Your host is {SERVER_NAME}, running version {}", + crate_version!() + ) + .into()], ), ( Response::RPL_CREATED, @@ -257,35 +261,44 @@ impl Handler for Server { } } -// TODO: implement offline messaging and replay -impl Handler for Server { +impl Handler for Server { type Result = (); #[instrument(parent = &msg.span, skip_all)] - fn handle(&mut self, msg: PeerToPeerMessage, _ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result { let Some(source) = self.clients.get(&msg.from) else { // user is not yet registered with the server return; }; - // TODO: O(1) lookup of users by nick - let target = self + let mut seen_by_user = false; + + // TODO: O(1) lookup of users by id + for (target, target_conn) in self .clients .iter() - .find(|(_handle, connection)| connection.nick == msg.destination); - let Some((target, _)) = target else { - // return error to caller that user does not exist - return; - }; + .filter(|(_handle, connection)| connection.user_id == msg.destination) + { + target.do_send(Broadcast { + message: Message { + tags: None, + prefix: Some(source.to_nick()), + command: Command::PRIVMSG(target_conn.nick.clone(), msg.message.clone()), + }, + span: msg.span.clone(), + }); + + seen_by_user = true; + } - target.do_send(Broadcast { - message: Message { - tags: None, - prefix: Some(source.to_nick()), - command: Command::PRIVMSG(msg.destination, msg.message), - }, - span: msg.span, - }); + if !seen_by_user { + self.persistence + .do_send(crate::persistence::events::PrivateMessage { + sender: source.to_nick().to_string(), + receiver: msg.destination, + message: msg.message, + }); + } } } -- libgit2 1.7.2