🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-02-01 22:54:55.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-02-01 22:54:55.0 +00:00:00
commit
44a1108518cc324a9441a63064cea6860a7e4cf6 [patch]
tree
f69144e3dfa2808b717c725ef4060e1c700db407
parent
c471119d217ab39539ffabced0caeed3ff9640ea
download
44a1108518cc324a9441a63064cea6860a7e4cf6.tar.gz

Implement offline private messaging



Diff

 migrations/2023010814480_initial-schema.sql | 10 ++++++++++
 src/client.rs                               | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/messages.rs                             | 10 +++++++---
 src/persistence.rs                          | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/server.rs                               | 53 ++++++++++++++++++++++++++++++++++++++++-------------
 src/database/mod.rs                         |  2 +-
 src/persistence/events.rs                   | 23 +++++++++++++++++++++++
 7 files changed, 216 insertions(+), 33 deletions(-)

diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql
index 8d76446..72079f0 100644
--- a/migrations/2023010814480_initial-schema.sql
+++ a/migrations/2023010814480_initial-schema.sql
@@ -39,3 +39,13 @@
    -- FOREIGN KEY(channel, last_seen_message_timestamp) REFERENCES channel_messages(channel, timestamp)
    PRIMARY KEY(channel, user)
);

CREATE TABLE private_messages (
    timestamp INT NOT NULL PRIMARY KEY,
    sender VARCHAR(255) NOT NULL,
    receiver INT NOT NULL,
    message VARCHAR(255) NOT NULL,
    FOREIGN KEY(receiver) REFERENCES users(id)
);

CREATE INDEX private_messages_receiver ON private_messages(receiver);
diff --git a/src/client.rs b/src/client.rs
index b9e67a2..656e4ce 100644
--- a/src/client.rs
+++ a/src/client.rs
@@ -16,11 +16,14 @@
    messages::{
        Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
        ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
        FetchClientDetails, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd,
        FetchClientDetails, PrivateMessage, ServerDisconnect, ServerFetchMotd,
        UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
    },
    persistence::{
        events::{FetchUnseenMessages, FetchUserChannels, ReserveNick},
        events::{
            FetchUnseenChannelMessages, FetchUnseenPrivateMessages, FetchUserChannels,
            FetchUserIdByNick, ReserveNick,
        },
        Persistence,
    },
    server::Server,
@@ -94,6 +97,27 @@
                    });
                }),
        );

        ctx.spawn(
            self.persistence
                .send(FetchUnseenPrivateMessages {
                    user_id: self.connection.user_id,
                    span: Span::current(),
                })
                .into_actor(self)
                .map(move |res, this, ctx| {
                    for (sender, message) in res.unwrap() {
                        ctx.notify(Broadcast {
                            message: Message {
                                tags: None,
                                prefix: Some(Prefix::new_from_str(&sender)),
                                command: Command::PRIVMSG(this.connection.nick.clone(), message),
                            },
                            span: this.span.clone(),
                        });
                    }
                }),
        );
    }

    /// Called when the actor is shutting down, either gracefully by the client or forcefully

@@ -185,7 +209,7 @@
                span: Span::current(),
            });

            let channel_messages_fut = self.persistence.send(FetchUnseenMessages {
            let channel_messages_fut = self.persistence.send(FetchUnseenChannelMessages {
                channel_name: channel_name.to_string(),
                user_id: self.connection.user_id,
                span: Span::current(),
@@ -334,6 +358,35 @@
    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: UserKickedFromChannel, _ctx: &mut Self::Context) -> Self::Result {
        self.channels.remove(&msg.channel);
    }
}

/// Self-message to send a peer-to-peer message via the server.

impl Handler<SendPrivateMessage> for Client {
    type Result = ResponseActFuture<Self, ()>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: SendPrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
        self.persistence
            .send(FetchUserIdByNick {
                nick: msg.destination,
            })
            .into_actor(self)
            .map(|res, this, ctx| {
                let Some(destination) = res.unwrap() else {
                    // TODO
                    eprintln!("User attempted to send a message to non-existent user");
                    return;
                };

                this.server.do_send(PrivateMessage {
                    destination,
                    message: msg.message,
                    from: ctx.address(),
                    span: msg.span,
                });
            })
            .boxed_local()
    }
}

@@ -511,10 +564,9 @@
            Command::PRIVMSG(target, message) => {
                if !target.is_channel_name() {
                    // private message to another user
                    self.server.do_send(PeerToPeerMessage {
                    ctx.notify(SendPrivateMessage {
                        destination: target,
                        message,
                        from: ctx.address(),
                        span: Span::current(),
                    });
                } else if let Some(channel) = self.channels.get(&target) {
@@ -643,6 +695,15 @@
        error!(%error, "Failed to write message to client");
        Running::Continue
    }
}

/// A [`Client`] internal self-notification to send a peer-to-peer message to another user

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct SendPrivateMessage {
    destination: String,
    message: String,
    span: Span,
}

/// A [`Client`] internal self-notification to grab a list of users in each channel

diff --git a/src/messages.rs b/src/messages.rs
index 68dfbdf..e82b5b1 100644
--- a/src/messages.rs
+++ a/src/messages.rs
@@ -1,9 +1,13 @@
use actix::{Addr, Message};
use anyhow::Result;
use irc_proto::{ChannelMode, Mode};
use tracing::Span;

use crate::{channel::Channel, client::Client, connection::InitiatedConnection};
use crate::{
    channel::Channel,
    client::Client,
    connection::{InitiatedConnection, UserId},
};

/// Sent when a user is connecting to the server.

#[derive(Message, Clone)]
@@ -172,8 +176,8 @@
/// Sends a private message between two users.

#[derive(Message)]
#[rtype(result = "()")]
pub struct PeerToPeerMessage {
    pub destination: String,
pub struct PrivateMessage {
    pub destination: UserId,
    pub message: String,
    pub from: Addr<Client>,
    pub span: Span,
diff --git a/src/persistence.rs b/src/persistence.rs
index 7f806ce..75b50e7 100644
--- a/src/persistence.rs
+++ a/src/persistence.rs
@@ -12,7 +12,8 @@
    connection::UserId,
    persistence::events::{
        ChannelCreated, ChannelJoined, ChannelMessage, ChannelParted,
        FetchAllUserChannelPermissions, FetchUnseenMessages, FetchUserChannels, ReserveNick,
        FetchAllUserChannelPermissions, FetchUnseenChannelMessages, FetchUnseenPrivateMessages,
        FetchUserChannels, FetchUserIdByNick, PrivateMessage, ReserveNick,
        SetUserChannelPermissions,
    },
};
@@ -203,6 +204,27 @@
    }
}

impl Handler<FetchUserIdByNick> for Persistence {
    type Result = ResponseFuture<Option<UserId>>;

    fn handle(&mut self, msg: FetchUserIdByNick, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query_as(
                "SELECT user
                 FROM user_nicks
                 WHERE nick = ?",
            )
            .bind(msg.nick)
            .fetch_optional(&conn)
            .await
            .unwrap()
            .map(|(v,)| v)
        })
    }
}

impl Handler<ChannelMessage> for Persistence {
    type Result = ResponseFuture<()>;

@@ -238,15 +260,67 @@

                query.execute(&conn).await.unwrap();
            }
        })
    }
}

impl Handler<PrivateMessage> for Persistence {
    type Result = ResponseFuture<()>;

    fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();
        let timestamp = self.monotonically_increasing_id();

        Box::pin(async move {
            sqlx::query(
                "INSERT INTO private_messages
                 (timestamp, sender, receiver, message)
                 VALUES (?, ?, ?, ?)",
            )
            .bind(timestamp)
            .bind(msg.sender)
            .bind(msg.receiver)
            .bind(msg.message)
            .execute(&conn)
            .await
            .unwrap();
        })
    }
}

impl Handler<FetchUnseenPrivateMessages> for Persistence {
    type Result = ResponseFuture<Vec<(String, String)>>;

    fn handle(
        &mut self,
        msg: FetchUnseenPrivateMessages,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query_as(
                "DELETE FROM private_messages
                 WHERE receiver = ?
                 RETURNING sender, message",
            )
            .bind(msg.user_id)
            .fetch_all(&conn)
            .await
            .unwrap()
        })
    }
}

impl Handler<FetchUnseenMessages> for Persistence {
impl Handler<FetchUnseenChannelMessages> for Persistence {
    type Result = ResponseFuture<Vec<(String, String)>>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result {
    fn handle(
        &mut self,
        msg: FetchUnseenChannelMessages,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let conn = self.database.clone();
        let max_message_reply_since =
            Utc::now() - chrono::Duration::from_std(self.max_message_replay_since).unwrap();
diff --git a/src/server.rs b/src/server.rs
index ea70404..218799e 100644
--- a/src/server.rs
+++ a/src/server.rs
@@ -21,7 +21,7 @@
    connection::InitiatedConnection,
    messages::{
        Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList,
        FetchClientByNick, PeerToPeerMessage, ServerDisconnect, ServerFetchMotd, UserConnected,
        FetchClientByNick, PrivateMessage, ServerDisconnect, ServerFetchMotd, UserConnected,
        UserNickChange, UserNickChangeInternal,
    },
    persistence::Persistence,
@@ -73,7 +73,11 @@
            ),
            (
                Response::RPL_YOURHOST,
                vec!["Your host is a sick kid".into()],
                vec![format!(
                    "Your host is {SERVER_NAME}, running version {}",
                    crate_version!()
                )
                .into()],
            ),
            (
                Response::RPL_CREATED,
@@ -257,35 +261,44 @@
    }
}

// TODO: implement offline messaging and replay
impl Handler<PeerToPeerMessage> for Server {
impl Handler<PrivateMessage> for Server {
    type Result = ();

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: PeerToPeerMessage, _ctx: &mut Self::Context) -> Self::Result {
    fn handle(&mut self, msg: PrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
        let Some(source) = self.clients.get(&msg.from) else {
            // user is not yet registered with the server
            return;
        };

        // TODO: O(1) lookup of users by nick
        let target = self
        let mut seen_by_user = false;

        // TODO: O(1) lookup of users by id
        for (target, target_conn) in self
            .clients
            .iter()
            .find(|(_handle, connection)| connection.nick == msg.destination);
        let Some((target, _)) = target else {
            // return error to caller that user does not exist
            return;
        };
            .filter(|(_handle, connection)| connection.user_id == msg.destination)
        {
            target.do_send(Broadcast {
                message: Message {
                    tags: None,
                    prefix: Some(source.to_nick()),
                    command: Command::PRIVMSG(target_conn.nick.clone(), msg.message.clone()),
                },
                span: msg.span.clone(),
            });

            seen_by_user = true;
        }

        target.do_send(Broadcast {
            message: Message {
                tags: None,
                prefix: Some(source.to_nick()),
                command: Command::PRIVMSG(msg.destination, msg.message),
            },
            span: msg.span,
        });
        if !seen_by_user {
            self.persistence
                .do_send(crate::persistence::events::PrivateMessage {
                    sender: source.to_nick().to_string(),
                    receiver: msg.destination,
                    message: msg.message,
                });
        }
    }
}

diff --git a/src/database/mod.rs b/src/database/mod.rs
index 16aac39..b830103 100644
--- a/src/database/mod.rs
+++ a/src/database/mod.rs
@@ -48,6 +48,6 @@
}

/// Compares a password to a hash stored in the database.

pub fn verify_password(password: &[u8], hash: &PasswordHash) -> argon2::password_hash::Result<()> {
pub fn verify_password(password: &[u8], hash: &PasswordHash<'_>) -> argon2::password_hash::Result<()> {
    Argon2::default().verify_password(password, hash)
}
diff --git a/src/persistence/events.rs b/src/persistence/events.rs
index 1591817..390a5fc 100644
--- a/src/persistence/events.rs
+++ a/src/persistence/events.rs
@@ -52,17 +52,38 @@
}

#[derive(Message)]
#[rtype(result = "Option<UserId>")]
pub struct FetchUserIdByNick {
    pub nick: String,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
    pub channel_id: ChannelId,
    pub sender: String,
    pub message: String,
    pub receivers: Vec<UserId>,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct PrivateMessage {
    pub sender: String,
    pub receiver: UserId,
    pub message: String,
}

#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenPrivateMessages {
    pub user_id: UserId,
    pub span: Span,
}

#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
pub struct FetchUnseenMessages {
pub struct FetchUnseenChannelMessages {
    pub channel_name: String,
    pub user_id: UserId,
    pub span: Span,