🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-01-08 20:51:48.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-01-08 20:55:10.0 +00:00:00
commit
624dc3d1a1395a727a8b974defeb1ae71fb6abf1 [patch]
tree
377a0df08a7739d5b2bec6c4cfb93d0ddba21fb4
parent
636411f8e0cbab37ba3676f045e8f1434a29189f
download
624dc3d1a1395a727a8b974defeb1ae71fb6abf1.tar.gz

Avoid lookups by storing the channel's id



Diff

 src/channel.rs            | 36 ++++++++++++++++++++++++++----------
 src/persistence.rs        | 48 +++++++++++++++++++++++++-----------------------
 src/persistence/events.rs | 10 +++++-----
 src/server.rs             |  3 ++-
 4 files changed, 58 insertions(+), 39 deletions(-)

diff --git a/src/channel.rs b/src/channel.rs
index c5a6c81..9498509 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -3,8 +3,8 @@ pub mod response;
use std::collections::HashMap;

use actix::{
    Actor, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, ResponseActFuture,
    Supervised, WrapFuture,
    Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult,
    ResponseActFuture, Supervised, WrapFuture,
};
use chrono::{DateTime, Utc};
use futures::future::Either;
@@ -24,6 +24,9 @@ use crate::{
    server::Server,
};

#[derive(Copy, Clone)]
pub struct ChannelId(pub i64);

/// A channel is an IRC channel (ie. #abc) that multiple users can connect to in order
/// to chat together.
pub struct Channel {
@@ -32,16 +35,29 @@ pub struct Channel {
    pub clients: HashMap<Addr<Client>, InitiatedConnection>,
    pub topic: Option<CurrentChannelTopic>,
    pub persistence: Addr<Persistence>,
    pub channel_id: ChannelId,
}

impl Actor for Channel {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        self.persistence
            .do_send(crate::persistence::events::ChannelCreated {
                name: self.name.to_string(),
            });
    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.wait(
            self.persistence
                .send(crate::persistence::events::ChannelCreated {
                    name: self.name.to_string(),
                })
                .into_actor(self)
                .map(|res, this, ctx| match res {
                    Ok(channel_id) => {
                        this.channel_id.0 = channel_id;
                    }
                    Err(error) => {
                        error!(%error, "Failed to create channel in database");
                        ctx.terminate();
                    }
                }),
        );
    }
}

@@ -87,7 +103,7 @@ impl Handler<ChannelMessage> for Channel {

        self.persistence
            .do_send(crate::persistence::events::ChannelMessage {
                channel_name: self.name.to_string(),
                channel_id: self.channel_id,
                sender: nick.to_string(),
                message: msg.message.to_string(),
                receivers: self.clients.values().map(|v| v.user_id).collect(),
@@ -143,7 +159,7 @@ impl Handler<ChannelJoin> for Channel {
        // persist the user's join to the database
        self.persistence
            .do_send(crate::persistence::events::ChannelJoined {
                channel_name: self.name.to_string(),
                channel_id: self.channel_id,
                user_id: msg.connection.user_id,
                span: msg.span.clone(),
            });
@@ -281,7 +297,7 @@ impl Handler<ChannelPart> for Channel {
        // update the client's state in the database
        self.persistence
            .do_send(crate::persistence::events::ChannelParted {
                channel_name: self.name.to_string(),
                channel_id: self.channel_id,
                user_id: client_info.user_id,
                span: msg.span.clone(),
            });
diff --git a/src/persistence.rs b/src/persistence.rs
index 20d2fb8..361a2e6 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -22,17 +22,24 @@ impl actix::Actor for Persistence {

/// Create a new channel in the database, if one doesn't already exist.
impl Handler<ChannelCreated> for Persistence {
    type Result = ResponseFuture<()>;
    type Result = ResponseFuture<i64>;

    fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();

        Box::pin(async move {
            sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)")
                .bind(msg.name)
                .execute(&conn)
                .await
                .unwrap();
            sqlx::query_as(
                "INSERT OR IGNORE INTO channels
                 (name) VALUES (?)
                 ON CONFLICT(name)
                   DO UPDATE SET name = name
                 RETURNING id",
            )
            .bind(msg.name)
            .fetch_one(&conn)
            .await
            .map(|(v,)| v)
            .unwrap()
        })
    }
}
@@ -48,10 +55,10 @@ impl Handler<ChannelJoined> for Persistence {
        Box::pin(async move {
            sqlx::query(
                "INSERT INTO channel_users (channel, user, permissions, in_channel)
                 VALUES ((SELECT id FROM channels WHERE name = ?), ?, ?, ?)
                 VALUES (?, ?, ?, ?)
                 ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel",
            )
            .bind(msg.channel_name)
            .bind(msg.channel_id.0)
            .bind(msg.user_id.0)
            .bind(0i32)
            .bind(true)
@@ -74,10 +81,10 @@ impl Handler<ChannelParted> for Persistence {
            sqlx::query(
                "UPDATE channel_users
                 SET in_channel = false
                 WHERE channel = (SELECT id FROM channels WHERE name = ?)
                 WHERE channel = ?
                   AND user = ?",
            )
            .bind(msg.channel_name)
            .bind(msg.channel_id.0)
            .bind(msg.user_id.0)
            .execute(&conn)
            .await
@@ -98,7 +105,7 @@ impl Handler<FetchUserChannels> for Persistence {
                  FROM channel_users
                  INNER JOIN channels
                    ON channels.id = channel_users.channel
                  WHERE user = (SELECT id FROM users WHERE username = ?)
                  WHERE user = ?
                    AND in_channel = true",
            )
            .bind(msg.user_id.0)
@@ -119,18 +126,13 @@ impl Handler<ChannelMessage> for Persistence {
        let conn = self.database.clone();

        Box::pin(async move {
            let (channel, idx): (i64, i64) = sqlx::query_as(
                "WITH channel AS (SELECT id FROM channels WHERE name = ?)
                 INSERT INTO channel_messages (channel, idx, sender, message)
                     SELECT
                        channel.id,
                        COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0),
                        ?,
                        ?
                    FROM channel
                 RETURNING channel, idx",
            let (idx,): (i64,) = sqlx::query_as(
                "INSERT INTO channel_messages (channel, idx, sender, message)
                     VALUES (?, COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = ?), 0), ?, ?)
                     RETURNING idx",
            )
            .bind(msg.channel_name)
            .bind(msg.channel_id.0)
            .bind(msg.channel_id.0)
            .bind(msg.sender)
            .bind(msg.message)
            .fetch_one(&conn)
@@ -146,7 +148,7 @@ impl Handler<ChannelMessage> for Persistence {
                    msg.receivers.iter().map(|_| "?").join(",")
                );

                let mut query = sqlx::query(&query).bind(idx).bind(channel);
                let mut query = sqlx::query(&query).bind(idx).bind(msg.channel_id.0);
                for receiver in msg.receivers {
                    query = query.bind(receiver.0);
                }
diff --git a/src/persistence/events.rs b/src/persistence/events.rs
index e064511..2117825 100644
--- a/src/persistence/events.rs
+++ b/src/persistence/events.rs
@@ -1,10 +1,10 @@
use actix::Message;
use tracing::Span;

use crate::connection::UserId;
use crate::{channel::ChannelId, connection::UserId};

#[derive(Message)]
#[rtype(result = "()")]
#[rtype(result = "i64")]
pub struct ChannelCreated {
    pub name: String,
}
@@ -12,7 +12,7 @@ pub struct ChannelCreated {
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelJoined {
    pub channel_name: String,
    pub channel_id: ChannelId,
    pub user_id: UserId,
    pub span: Span,
}
@@ -20,7 +20,7 @@ pub struct ChannelJoined {
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelParted {
    pub channel_name: String,
    pub channel_id: ChannelId,
    pub user_id: UserId,
    pub span: Span,
}
@@ -35,7 +35,7 @@ pub struct FetchUserChannels {
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
    pub channel_name: String,
    pub channel_id: ChannelId,
    pub sender: String,
    pub message: String,
    pub receivers: Vec<UserId>,
diff --git a/src/server.rs b/src/server.rs
index 24c7214..ac57ed1 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -15,7 +15,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, instrument, warn, Span};

use crate::{
    channel::Channel,
    channel::{Channel, ChannelId},
    client::Client,
    config::Config,
    connection::InitiatedConnection,
@@ -167,6 +167,7 @@ impl Handler<ChannelJoin> for Server {
                    topic: None,
                    server,
                    persistence,
                    channel_id: ChannelId(0),
                })
            })
            .clone();