Implement NOTICE command
Diff
migrations/2023010814480_initial-schema.sql | 2 ++
src/channel.rs | 12 +++++++++++-
src/client.rs | 35 +++++++++++++++++++++++++++++++++--
src/messages.rs | 11 +++++++++++
src/persistence.rs | 25 +++++++++++++++++++------
src/server.rs | 22 ++++++++++++++++------
src/persistence/events.rs | 7 +++++--
7 files changed, 85 insertions(+), 29 deletions(-)
@@ -24,6 +24,7 @@
timestamp INT NOT NULL,
sender VARCHAR(255),
message VARCHAR(255),
kind SMALLINT NOT NULL,
FOREIGN KEY(channel) REFERENCES channels(id),
PRIMARY KEY(channel, timestamp)
);
@@ -45,6 +46,7 @@
sender VARCHAR(255) NOT NULL,
receiver INT NOT NULL,
message VARCHAR(255) NOT NULL,
kind SMALLINT NOT NULL,
FOREIGN KEY(receiver) REFERENCES users(id)
);
@@ -24,7 +24,7 @@
messages::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
FetchClientByNick, ServerDisconnect, UserKickedFromChannel, UserNickChange,
FetchClientByNick, MessageKind, ServerDisconnect, UserKickedFromChannel, UserNickChange,
},
persistence::{
events::{FetchAllUserChannelPermissions, SetUserChannelPermissions},
@@ -152,6 +152,7 @@
sender: nick.to_string(),
message: msg.message.to_string(),
receivers: self.clients.values().map(|v| v.user_id).collect(),
kind: msg.kind,
});
for client in self.clients.keys() {
@@ -166,7 +167,14 @@
message: Message {
tags: None,
prefix: Some(nick.clone()),
command: Command::PRIVMSG(self.name.to_string(), msg.message.clone()),
command: match msg.kind {
MessageKind::Normal => {
Command::PRIVMSG(self.name.to_string(), msg.message.clone())
}
MessageKind::Notice => {
Command::NOTICE(self.name.to_string(), msg.message.clone())
}
},
},
});
}
@@ -19,7 +19,7 @@
messages::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
FetchClientDetails, PrivateMessage, ServerDisconnect, ServerFetchMotd,
FetchClientDetails, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
},
persistence::{
@@ -129,14 +129,21 @@
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sent, sender, message) in res.unwrap() {
for (sent, sender, message, kind) in res.unwrap() {
ctx.notify(Broadcast {
message: Message {
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&sender)),
command: Command::PRIVMSG(this.connection.nick.clone(), message),
command: match kind {
MessageKind::Normal => {
Command::PRIVMSG(this.connection.nick.clone(), message)
}
MessageKind::Notice => {
Command::NOTICE(this.connection.nick.clone(), message)
}
},
},
span: this.span.clone(),
});
@@ -263,13 +270,16 @@
this.channels.insert(channel_name.clone(), handle);
for (sent, source, message) in messages {
for (sent, source, message, kind) in messages {
this.writer.write(Message {
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&source)),
command: Command::PRIVMSG(channel_name.clone(), message),
command: match kind {
MessageKind::Normal => Command::PRIVMSG(channel_name.clone(), message),
MessageKind::Notice => Command::NOTICE(channel_name.clone(), message),
},
});
}
}
@@ -401,7 +411,7 @@
nick: msg.destination,
})
.into_actor(self)
.map(|res, this, ctx| {
.map(move |res, this, ctx| {
let Some(destination) = res.unwrap() else {
eprintln!("User attempted to send a message to non-existent user");
@@ -411,6 +421,7 @@
this.server.do_send(PrivateMessage {
destination,
message: msg.message,
kind: msg.kind,
from: ctx.address(),
span: msg.span,
});
@@ -590,18 +601,26 @@
});
}
}
Command::PRIVMSG(target, message) => {
command @ (Command::NOTICE(_, _) | Command::PRIVMSG(_, _)) => {
let (target, message, kind) = match command {
Command::PRIVMSG(target, message) => (target, message, MessageKind::Normal),
Command::NOTICE(target, message) => (target, message, MessageKind::Notice),
_ => unreachable!(),
};
if !target.is_channel_name() {
ctx.notify(SendPrivateMessage {
destination: target,
message,
kind,
span: Span::current(),
});
} else if let Some(channel) = self.channels.get(&target) {
channel.do_send(ChannelMessage {
client: ctx.address(),
message,
kind,
span: Span::current(),
});
} else {
@@ -609,7 +628,6 @@
error!("User not connected to channel");
}
}
Command::NOTICE(_, _) => {}
Command::MOTD(_) => {
let span = Span::current();
let fut = self
@@ -770,6 +788,7 @@
struct SendPrivateMessage {
destination: String,
message: String,
kind: MessageKind,
span: Span,
}
@@ -148,11 +148,21 @@
pub span: Span,
}
#[derive(Copy, Clone, Debug, sqlx::Type)]
#[repr(i16)]
pub enum MessageKind {
Normal = 0,
Notice = 1,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
pub client: Addr<Client>,
pub kind: MessageKind,
pub message: String,
pub span: Span,
}
@@ -179,6 +189,7 @@
pub struct PrivateMessage {
pub destination: UserId,
pub message: String,
pub kind: MessageKind,
pub from: Addr<Client>,
pub span: Span,
}
@@ -10,6 +10,7 @@
use crate::{
channel::permissions::Permission,
connection::UserId,
messages::MessageKind,
persistence::events::{
ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted,
FetchAllUserChannelPermissions, FetchUnseenChannelMessages, FetchUnseenPrivateMessages,
@@ -234,12 +235,13 @@
Box::pin(async move {
sqlx::query(
"INSERT INTO channel_messages (channel, timestamp, sender, message) VALUES (?, ?, ?, ?)",
"INSERT INTO channel_messages (channel, timestamp, sender, message, kind) VALUES (?, ?, ?, ?, ?)",
)
.bind(msg.channel_id.0)
.bind(timestamp)
.bind(msg.sender)
.bind(msg.message)
.bind(msg.kind)
.execute(&conn)
.await
.unwrap();
@@ -274,13 +276,14 @@
Box::pin(async move {
sqlx::query(
"INSERT INTO private_messages
(timestamp, sender, receiver, message)
VALUES (?, ?, ?, ?)",
(timestamp, sender, receiver, message, kind)
VALUES (?, ?, ?, ?, ?)",
)
.bind(timestamp)
.bind(msg.sender)
.bind(msg.receiver)
.bind(msg.message)
.bind(msg.kind)
.execute(&conn)
.await
.unwrap();
@@ -289,7 +292,7 @@
}
impl Handler<FetchUnseenPrivateMessages> for Persistence {
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String, MessageKind)>>;
fn handle(
&mut self,
@@ -302,21 +305,23 @@
sqlx::query_as(
"DELETE FROM private_messages
WHERE receiver = ?
RETURNING timestamp, sender, message",
RETURNING timestamp, sender, message, kind",
)
.bind(msg.user_id)
.fetch_all(&conn)
.await
.unwrap()
.into_iter()
.map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
.map(|(timestamp, sender, message, kind)| {
(Utc.timestamp_nanos(timestamp), sender, message, kind)
})
.collect()
})
}
}
impl Handler<FetchUnseenChannelMessages> for Persistence {
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String, MessageKind)>>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(
@@ -333,7 +338,7 @@
sqlx::query_as(
"WITH channel AS (SELECT id FROM channels WHERE name = ?)
SELECT timestamp, sender, message
SELECT timestamp, sender, message, kind
FROM channel_messages
WHERE channel = (SELECT id FROM channel)
AND timestamp > MAX(
@@ -354,7 +359,9 @@
.await
.unwrap()
.into_iter()
.map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
.map(|(timestamp, sender, message, kind)| {
(Utc.timestamp_nanos(timestamp), sender, message, kind)
})
.collect()
})
}
@@ -21,8 +21,8 @@
connection::InitiatedConnection,
messages::{
Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList,
FetchClientByNick, PrivateMessage, ServerDisconnect, ServerFetchMotd, UserConnected,
UserNickChange, UserNickChangeInternal,
FetchClientByNick, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserConnected, UserNickChange, UserNickChangeInternal,
},
persistence::Persistence,
server::response::Motd,
@@ -274,16 +274,21 @@
let mut seen_by_user = false;
for (target, target_conn) in self
.clients
.iter()
.filter(|(_handle, connection)| connection.user_id == msg.destination)
{
for (target, target_conn) in self.clients.iter().filter(|(handle, connection)| {
connection.user_id == msg.destination && msg.from != **handle
}) {
target.do_send(Broadcast {
message: Message {
tags: None,
prefix: Some(source.to_nick()),
command: Command::PRIVMSG(target_conn.nick.clone(), msg.message.clone()),
command: match msg.kind {
MessageKind::Normal => {
Command::PRIVMSG(target_conn.nick.clone(), msg.message.clone())
}
MessageKind::Notice => {
Command::NOTICE(target_conn.nick.clone(), msg.message.clone())
}
},
},
span: msg.span.clone(),
});
@@ -297,6 +302,7 @@
sender: source.to_nick().to_string(),
receiver: msg.destination,
message: msg.message,
kind: msg.kind,
});
}
}
@@ -7,6 +7,7 @@
use crate::{
channel::{permissions::Permission, ChannelId},
connection::UserId,
messages::MessageKind,
};
#[derive(Message)]
@@ -65,6 +66,7 @@
pub sender: String,
pub message: String,
pub receivers: Vec<UserId>,
pub kind: MessageKind,
}
#[derive(Message)]
@@ -73,17 +75,18 @@
pub sender: String,
pub receiver: UserId,
pub message: String,
pub kind: MessageKind,
}
#[derive(Message)]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String, MessageKind)>")]
pub struct FetchUnseenPrivateMessages {
pub user_id: UserId,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String, MessageKind)>")]
pub struct FetchUnseenChannelMessages {
pub channel_name: String,
pub user_id: UserId,