Send all (max 500) unseen messages to the user when they connect to the server
Diff
migrations/2023010814480_initial-schema.sql | 11 +++++++++++
src/channel.rs | 8 ++++++++
src/client.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
src/persistence.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/persistence/events.rs | 17 +++++++++++++++++
5 files changed, 161 insertions(+), 14 deletions(-)
@@ -13,13 +13,24 @@
CREATE UNIQUE INDEX channel_name ON channels(name);
CREATE TABLE channel_messages (
channel INT NOT NULL,
idx INT NOT NULL,
sender VARCHAR(255),
message VARCHAR(255),
FOREIGN KEY(channel) REFERENCES channels(id),
PRIMARY KEY(channel, idx)
);
CREATE TABLE channel_users (
channel INT NOT NULL,
user INT NOT NULL,
permissions INT NOT NULL DEFAULT 0,
in_channel BOOLEAN DEFAULT false,
last_seen_message_idx INT,
FOREIGN KEY(user) REFERENCES users(id),
FOREIGN KEY(channel) REFERENCES channels(id)
);
CREATE UNIQUE INDEX channel_user ON channel_users(channel, user);
@@ -85,6 +85,14 @@
let nick = sender.to_nick();
self.persistence
.do_send(crate::persistence::events::ChannelMessage {
channel_name: self.name.to_string(),
sender: nick.to_string(),
message: msg.message.to_string(),
receivers: self.clients.values().map(|v| v.user.to_string()).collect(),
});
for client in self.clients.keys() {
if client == &msg.client {
@@ -19,7 +19,10 @@
ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange,
UserNickChangeInternal,
},
persistence::{events::FetchUserChannels, Persistence},
persistence::{
events::{FetchUnseenMessages, FetchUserChannels},
Persistence,
},
server::Server,
SERVER_NAME,
};
@@ -168,21 +171,30 @@
for channel_name in msg.channels {
if !channel_name.is_channel_name() {
if !channel_name.is_channel_name() || self.channels.contains_key(&channel_name) {
continue;
}
let channel_handle_fut = self.server.clone().send(ChannelJoin {
channel_name: channel_name.to_string(),
client: ctx.address(),
connection: self.connection.clone(),
span: Span::current(),
});
let channel_messages_fut = self.persistence.send(FetchUnseenMessages {
channel_name: channel_name.to_string(),
username: self.connection.user.to_string(),
span: Span::current(),
});
futures.push(
self.server
.clone()
.send(ChannelJoin {
channel_name: channel_name.to_string(),
client: ctx.address(),
connection: self.connection.clone(),
span: Span::current(),
})
.map(move |v| (channel_name, v.unwrap().unwrap())),
futures::future::join(channel_handle_fut, channel_messages_fut).map(
move |(handle, messages)| {
(channel_name, handle.unwrap().unwrap(), messages.unwrap())
},
),
);
}
@@ -191,9 +203,20 @@
let fut = wrap_future::<_, Self>(
futures::future::join_all(futures.into_iter()).instrument(Span::current()),
)
.map(|result, this, _ctx| {
for (channel_name, handle) in result {
.map(|result, this, ctx| {
for (channel_name, handle, messages) in result {
this.channels.insert(channel_name.clone(), handle);
for (source, message) in messages {
ctx.notify(Broadcast {
message: Message {
tags: None,
prefix: Some(Prefix::new_from_str(&source)),
command: Command::PRIVMSG(channel_name.clone(), message),
},
span: this.span.clone(),
});
}
}
});
@@ -1,9 +1,13 @@
pub mod events;
use actix::{Context, Handler, ResponseFuture};
use itertools::Itertools;
use tracing::instrument;
use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted, FetchUserChannels};
use crate::persistence::events::{
ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted, FetchUnseenMessages,
FetchUserChannels,
};
pub struct Persistence {
@@ -104,6 +108,90 @@
.into_iter()
.map(|(v,)| v)
.collect()
})
}
}
impl Handler<ChannelMessage> for Persistence {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ChannelMessage, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
let (channel, idx): (i64, i64) = sqlx::query_as(
"WITH channel AS (SELECT id FROM channels WHERE name = ?)
INSERT INTO channel_messages (channel, idx, sender, message)
SELECT
channel.id,
COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0),
?,
?
FROM channel
RETURNING channel, idx",
)
.bind(msg.channel_name)
.bind(msg.sender)
.bind(msg.message)
.fetch_one(&conn)
.await
.unwrap();
let query = format!(
"UPDATE channel_users
SET last_seen_message_idx = ?
WHERE channel = ?
AND user IN (SELECT id FROM users WHERE username IN ({}))",
msg.receivers.iter().map(|_| "?").join(",")
);
let mut query = sqlx::query(&query).bind(idx).bind(channel);
for receiver in msg.receivers {
query = query.bind(receiver);
}
query.execute(&conn).await.unwrap();
})
}
}
impl Handler<FetchUnseenMessages> for Persistence {
type Result = ResponseFuture<Vec<(String, String)>>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
let res = sqlx::query_as(
"WITH channel AS (SELECT id FROM channels WHERE name = ?)
SELECT sender, message
FROM channel_messages
WHERE channel = (SELECT id FROM channel)
AND idx > MAX(
(
SELECT MAX(0, MAX(idx) - 500)
FROM channel_messages
WHERE channel = (SELECT id FROM channel)
),
(
SELECT last_seen_message_idx
FROM channel_users
WHERE channel = (SELECT id FROM channel)
AND user = (SELECT id FROM users WHERE username = ?)
)
)
ORDER BY idx DESC",
)
.bind(msg.channel_name.to_string())
.bind(msg.username.to_string())
.fetch_all(&conn)
.await
.unwrap();
res
})
}
}
@@ -29,3 +29,20 @@
pub username: String,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
pub channel_name: String,
pub sender: String,
pub message: String,
pub receivers: Vec<String>,
}
#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenMessages {
pub channel_name: String,
pub username: String,
pub span: Span,
}