From 624dc3d1a1395a727a8b974defeb1ae71fb6abf1 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 8 Jan 2023 20:51:48 +0000 Subject: [PATCH] Avoid lookups by storing the channel's id --- src/channel.rs | 36 ++++++++++++++++++++++++++---------- src/persistence.rs | 48 +++++++++++++++++++++++++----------------------- src/persistence/events.rs | 10 +++++----- src/server.rs | 3 ++- 4 files changed, 58 insertions(+), 39 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index c5a6c81..9498509 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -3,8 +3,8 @@ pub mod response; use std::collections::HashMap; use actix::{ - Actor, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, ResponseActFuture, - Supervised, WrapFuture, + Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, + ResponseActFuture, Supervised, WrapFuture, }; use chrono::{DateTime, Utc}; use futures::future::Either; @@ -24,6 +24,9 @@ use crate::{ server::Server, }; +#[derive(Copy, Clone)] +pub struct ChannelId(pub i64); + /// A channel is an IRC channel (ie. #abc) that multiple users can connect to in order /// to chat together. pub struct Channel { @@ -32,16 +35,29 @@ pub struct Channel { pub clients: HashMap, InitiatedConnection>, pub topic: Option, pub persistence: Addr, + pub channel_id: ChannelId, } impl Actor for Channel { type Context = Context; - fn started(&mut self, _ctx: &mut Self::Context) { - self.persistence - .do_send(crate::persistence::events::ChannelCreated { - name: self.name.to_string(), - }); + fn started(&mut self, ctx: &mut Self::Context) { + ctx.wait( + self.persistence + .send(crate::persistence::events::ChannelCreated { + name: self.name.to_string(), + }) + .into_actor(self) + .map(|res, this, ctx| match res { + Ok(channel_id) => { + this.channel_id.0 = channel_id; + } + Err(error) => { + error!(%error, "Failed to create channel in database"); + ctx.terminate(); + } + }), + ); } } @@ -87,7 +103,7 @@ impl Handler for Channel { self.persistence .do_send(crate::persistence::events::ChannelMessage { - channel_name: self.name.to_string(), + channel_id: self.channel_id, sender: nick.to_string(), message: msg.message.to_string(), receivers: self.clients.values().map(|v| v.user_id).collect(), @@ -143,7 +159,7 @@ impl Handler for Channel { // persist the user's join to the database self.persistence .do_send(crate::persistence::events::ChannelJoined { - channel_name: self.name.to_string(), + channel_id: self.channel_id, user_id: msg.connection.user_id, span: msg.span.clone(), }); @@ -281,7 +297,7 @@ impl Handler for Channel { // update the client's state in the database self.persistence .do_send(crate::persistence::events::ChannelParted { - channel_name: self.name.to_string(), + channel_id: self.channel_id, user_id: client_info.user_id, span: msg.span.clone(), }); diff --git a/src/persistence.rs b/src/persistence.rs index 20d2fb8..361a2e6 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -22,17 +22,24 @@ impl actix::Actor for Persistence { /// Create a new channel in the database, if one doesn't already exist. impl Handler for Persistence { - type Result = ResponseFuture<()>; + type Result = ResponseFuture; fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result { let conn = self.database.clone(); Box::pin(async move { - sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)") - .bind(msg.name) - .execute(&conn) - .await - .unwrap(); + sqlx::query_as( + "INSERT OR IGNORE INTO channels + (name) VALUES (?) + ON CONFLICT(name) + DO UPDATE SET name = name + RETURNING id", + ) + .bind(msg.name) + .fetch_one(&conn) + .await + .map(|(v,)| v) + .unwrap() }) } } @@ -48,10 +55,10 @@ impl Handler for Persistence { Box::pin(async move { sqlx::query( "INSERT INTO channel_users (channel, user, permissions, in_channel) - VALUES ((SELECT id FROM channels WHERE name = ?), ?, ?, ?) + VALUES (?, ?, ?, ?) ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel", ) - .bind(msg.channel_name) + .bind(msg.channel_id.0) .bind(msg.user_id.0) .bind(0i32) .bind(true) @@ -74,10 +81,10 @@ impl Handler for Persistence { sqlx::query( "UPDATE channel_users SET in_channel = false - WHERE channel = (SELECT id FROM channels WHERE name = ?) + WHERE channel = ? AND user = ?", ) - .bind(msg.channel_name) + .bind(msg.channel_id.0) .bind(msg.user_id.0) .execute(&conn) .await @@ -98,7 +105,7 @@ impl Handler for Persistence { FROM channel_users INNER JOIN channels ON channels.id = channel_users.channel - WHERE user = (SELECT id FROM users WHERE username = ?) + WHERE user = ? AND in_channel = true", ) .bind(msg.user_id.0) @@ -119,18 +126,13 @@ impl Handler for Persistence { let conn = self.database.clone(); Box::pin(async move { - let (channel, idx): (i64, i64) = sqlx::query_as( - "WITH channel AS (SELECT id FROM channels WHERE name = ?) - INSERT INTO channel_messages (channel, idx, sender, message) - SELECT - channel.id, - COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0), - ?, - ? - FROM channel - RETURNING channel, idx", + let (idx,): (i64,) = sqlx::query_as( + "INSERT INTO channel_messages (channel, idx, sender, message) + VALUES (?, COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = ?), 0), ?, ?) + RETURNING idx", ) - .bind(msg.channel_name) + .bind(msg.channel_id.0) + .bind(msg.channel_id.0) .bind(msg.sender) .bind(msg.message) .fetch_one(&conn) @@ -146,7 +148,7 @@ impl Handler for Persistence { msg.receivers.iter().map(|_| "?").join(",") ); - let mut query = sqlx::query(&query).bind(idx).bind(channel); + let mut query = sqlx::query(&query).bind(idx).bind(msg.channel_id.0); for receiver in msg.receivers { query = query.bind(receiver.0); } diff --git a/src/persistence/events.rs b/src/persistence/events.rs index e064511..2117825 100644 --- a/src/persistence/events.rs +++ b/src/persistence/events.rs @@ -1,10 +1,10 @@ use actix::Message; use tracing::Span; -use crate::connection::UserId; +use crate::{channel::ChannelId, connection::UserId}; #[derive(Message)] -#[rtype(result = "()")] +#[rtype(result = "i64")] pub struct ChannelCreated { pub name: String, } @@ -12,7 +12,7 @@ pub struct ChannelCreated { #[derive(Message)] #[rtype(result = "()")] pub struct ChannelJoined { - pub channel_name: String, + pub channel_id: ChannelId, pub user_id: UserId, pub span: Span, } @@ -20,7 +20,7 @@ pub struct ChannelJoined { #[derive(Message)] #[rtype(result = "()")] pub struct ChannelParted { - pub channel_name: String, + pub channel_id: ChannelId, pub user_id: UserId, pub span: Span, } @@ -35,7 +35,7 @@ pub struct FetchUserChannels { #[derive(Message)] #[rtype(result = "()")] pub struct ChannelMessage { - pub channel_name: String, + pub channel_id: ChannelId, pub sender: String, pub message: String, pub receivers: Vec, diff --git a/src/server.rs b/src/server.rs index 24c7214..ac57ed1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,7 +15,7 @@ use tokio_stream::StreamExt; use tracing::{debug, instrument, warn, Span}; use crate::{ - channel::Channel, + channel::{Channel, ChannelId}, client::Client, config::Config, connection::InitiatedConnection, @@ -167,6 +167,7 @@ impl Handler for Server { topic: None, server, persistence, + channel_id: ChannelId(0), }) }) .clone(); -- libgit2 1.7.2