Implement offline private messaging
Diff
migrations/2023010814480_initial-schema.sql | 10 ++++++++++
src/client.rs | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/messages.rs | 10 +++++++---
src/persistence.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/server.rs | 53 ++++++++++++++++++++++++++++++++++++++++-------------
src/database/mod.rs | 2 +-
src/persistence/events.rs | 23 +++++++++++++++++++++++
7 files changed, 216 insertions(+), 33 deletions(-)
@@ -39,3 +39,13 @@
PRIMARY KEY(channel, user)
);
CREATE TABLE private_messages (
timestamp INT NOT NULL PRIMARY KEY,
sender VARCHAR(255) NOT NULL,
receiver INT NOT NULL,
message VARCHAR(255) NOT NULL,
FOREIGN KEY(receiver) REFERENCES users(id)
);
CREATE INDEX private_messages_receiver ON private_messages(receiver);
@@ -16,11 +16,14 @@
messages::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
FetchClientDetails, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd,
FetchClientDetails, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
},
persistence::{
events::{FetchUnseenMessages, FetchUserChannels, ReserveNick},
events::{
FetchUnseenChannelMessages, FetchUnseenPrivateMessages, FetchUserChannels,
FetchUserIdByNick, ReserveNick,
},
Persistence,
},
server::Server,
@@ -94,6 +97,27 @@
});
}),
);
ctx.spawn(
self.persistence
.send(FetchUnseenPrivateMessages {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sender, message) in res.unwrap() {
ctx.notify(Broadcast {
message: Message {
tags: None,
prefix: Some(Prefix::new_from_str(&sender)),
command: Command::PRIVMSG(this.connection.nick.clone(), message),
},
span: this.span.clone(),
});
}
}),
);
}
@@ -185,7 +209,7 @@
span: Span::current(),
});
let channel_messages_fut = self.persistence.send(FetchUnseenMessages {
let channel_messages_fut = self.persistence.send(FetchUnseenChannelMessages {
channel_name: channel_name.to_string(),
user_id: self.connection.user_id,
span: Span::current(),
@@ -334,6 +358,35 @@
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserKickedFromChannel, _ctx: &mut Self::Context) -> Self::Result {
self.channels.remove(&msg.channel);
}
}
impl Handler<SendPrivateMessage> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: SendPrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
self.persistence
.send(FetchUserIdByNick {
nick: msg.destination,
})
.into_actor(self)
.map(|res, this, ctx| {
let Some(destination) = res.unwrap() else {
eprintln!("User attempted to send a message to non-existent user");
return;
};
this.server.do_send(PrivateMessage {
destination,
message: msg.message,
from: ctx.address(),
span: msg.span,
});
})
.boxed_local()
}
}
@@ -511,10 +564,9 @@
Command::PRIVMSG(target, message) => {
if !target.is_channel_name() {
self.server.do_send(PeerToPeerMessage {
ctx.notify(SendPrivateMessage {
destination: target,
message,
from: ctx.address(),
span: Span::current(),
});
} else if let Some(channel) = self.channels.get(&target) {
@@ -643,6 +695,15 @@
error!(%error, "Failed to write message to client");
Running::Continue
}
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct SendPrivateMessage {
destination: String,
message: String,
span: Span,
}
@@ -1,9 +1,13 @@
use actix::{Addr, Message};
use anyhow::Result;
use irc_proto::{ChannelMode, Mode};
use tracing::Span;
use crate::{channel::Channel, client::Client, connection::InitiatedConnection};
use crate::{
channel::Channel,
client::Client,
connection::{InitiatedConnection, UserId},
};
#[derive(Message, Clone)]
@@ -172,8 +176,8 @@
#[derive(Message)]
#[rtype(result = "()")]
pub struct PeerToPeerMessage {
pub destination: String,
pub struct PrivateMessage {
pub destination: UserId,
pub message: String,
pub from: Addr<Client>,
pub span: Span,
@@ -12,7 +12,8 @@
connection::UserId,
persistence::events::{
ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted,
FetchAllUserChannelPermissions, FetchUnseenMessages, FetchUserChannels, ReserveNick,
FetchAllUserChannelPermissions, FetchUnseenChannelMessages, FetchUnseenPrivateMessages,
FetchUserChannels, FetchUserIdByNick, PrivateMessage, ReserveNick,
SetUserChannelPermissions,
},
};
@@ -203,6 +204,27 @@
}
}
impl Handler<FetchUserIdByNick> for Persistence {
type Result = ResponseFuture<Option<UserId>>;
fn handle(&mut self, msg: FetchUserIdByNick, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query_as(
"SELECT user
FROM user_nicks
WHERE nick = ?",
)
.bind(msg.nick)
.fetch_optional(&conn)
.await
.unwrap()
.map(|(v,)| v)
})
}
}
impl Handler<ChannelMessage> for Persistence {
type Result = ResponseFuture<()>;
@@ -238,15 +260,67 @@
query.execute(&conn).await.unwrap();
}
})
}
}
impl Handler<PrivateMessage> for Persistence {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
let timestamp = self.monotonically_increasing_id();
Box::pin(async move {
sqlx::query(
"INSERT INTO private_messages
(timestamp, sender, receiver, message)
VALUES (?, ?, ?, ?)",
)
.bind(timestamp)
.bind(msg.sender)
.bind(msg.receiver)
.bind(msg.message)
.execute(&conn)
.await
.unwrap();
})
}
}
impl Handler<FetchUnseenPrivateMessages> for Persistence {
type Result = ResponseFuture<Vec<(String, String)>>;
fn handle(
&mut self,
msg: FetchUnseenPrivateMessages,
_ctx: &mut Self::Context,
) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query_as(
"DELETE FROM private_messages
WHERE receiver = ?
RETURNING sender, message",
)
.bind(msg.user_id)
.fetch_all(&conn)
.await
.unwrap()
})
}
}
impl Handler<FetchUnseenMessages> for Persistence {
impl Handler<FetchUnseenChannelMessages> for Persistence {
type Result = ResponseFuture<Vec<(String, String)>>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: FetchUnseenChannelMessages,
_ctx: &mut Self::Context,
) -> Self::Result {
let conn = self.database.clone();
let max_message_reply_since =
Utc::now() - chrono::Duration::from_std(self.max_message_replay_since).unwrap();
@@ -21,7 +21,7 @@
connection::InitiatedConnection,
messages::{
Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList,
FetchClientByNick, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd, UserConnected,
FetchClientByNick, PrivateMessage, ServerDisconnect, ServerFetchMotd, UserConnected,
UserNickChange, UserNickChangeInternal,
},
persistence::Persistence,
@@ -73,7 +73,11 @@
),
(
Response::RPL_YOURHOST,
vec!["Your host is a sick kid".into()],
vec![format!(
"Your host is {SERVER_NAME}, running version {}",
crate_version!()
)
.into()],
),
(
Response::RPL_CREATED,
@@ -257,35 +261,44 @@
}
}
impl Handler<PeerToPeerMessage> for Server {
impl Handler<PrivateMessage> for Server {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: PeerToPeerMessage, _ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
let Some(source) = self.clients.get(&msg.from) else {
return;
};
let target = self
let mut seen_by_user = false;
for (target, target_conn) in self
.clients
.iter()
.find(|(_handle, connection)| connection.nick == msg.destination);
let Some((target, _)) = target else {
return;
};
.filter(|(_handle, connection)| connection.user_id == msg.destination)
{
target.do_send(Broadcast {
message: Message {
tags: None,
prefix: Some(source.to_nick()),
command: Command::PRIVMSG(target_conn.nick.clone(), msg.message.clone()),
},
span: msg.span.clone(),
});
seen_by_user = true;
}
target.do_send(Broadcast {
message: Message {
tags: None,
prefix: Some(source.to_nick()),
command: Command::PRIVMSG(msg.destination, msg.message),
},
span: msg.span,
});
if !seen_by_user {
self.persistence
.do_send(crate::persistence::events::PrivateMessage {
sender: source.to_nick().to_string(),
receiver: msg.destination,
message: msg.message,
});
}
}
}
@@ -48,6 +48,6 @@
}
pub fn verify_password(password: &[u8], hash: &PasswordHash) -> argon2::password_hash::Result<()> {
pub fn verify_password(password: &[u8], hash: &PasswordHash<'_>) -> argon2::password_hash::Result<()> {
Argon2::default().verify_password(password, hash)
}
@@ -52,17 +52,38 @@
}
#[derive(Message)]
#[rtype(result = "Option<UserId>")]
pub struct FetchUserIdByNick {
pub nick: String,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
pub channel_id: ChannelId,
pub sender: String,
pub message: String,
pub receivers: Vec<UserId>,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct PrivateMessage {
pub sender: String,
pub receiver: UserId,
pub message: String,
}
#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenPrivateMessages {
pub user_id: UserId,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenMessages {
pub struct FetchUnseenChannelMessages {
pub channel_name: String,
pub user_id: UserId,
pub span: Span,