Avoid lookups by storing the channel's id
Diff
src/channel.rs | 36 +++++++++++++++++++++++++++++++++---
src/persistence.rs | 48 +++++++++++++++++++++++++++++++++++++-----------
src/server.rs | 3 ++-
src/persistence/events.rs | 10 +++++-----
4 files changed, 58 insertions(+), 39 deletions(-)
@@ -1,10 +1,10 @@
pub mod response;
use std::collections::HashMap;
use actix::{
Actor, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, ResponseActFuture,
Supervised, WrapFuture,
Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult,
ResponseActFuture, Supervised, WrapFuture,
};
use chrono::{DateTime, Utc};
use futures::future::Either;
@@ -24,6 +24,9 @@
server::Server,
};
#[derive(Copy, Clone)]
pub struct ChannelId(pub i64);
pub struct Channel {
@@ -32,16 +35,29 @@
pub clients: HashMap<Addr<Client>, InitiatedConnection>,
pub topic: Option<CurrentChannelTopic>,
pub persistence: Addr<Persistence>,
pub channel_id: ChannelId,
}
impl Actor for Channel {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
self.persistence
.do_send(crate::persistence::events::ChannelCreated {
name: self.name.to_string(),
});
fn started(&mut self, ctx: &mut Self::Context) {
ctx.wait(
self.persistence
.send(crate::persistence::events::ChannelCreated {
name: self.name.to_string(),
})
.into_actor(self)
.map(|res, this, ctx| match res {
Ok(channel_id) => {
this.channel_id.0 = channel_id;
}
Err(error) => {
error!(%error, "Failed to create channel in database");
ctx.terminate();
}
}),
);
}
}
@@ -87,7 +103,7 @@
self.persistence
.do_send(crate::persistence::events::ChannelMessage {
channel_name: self.name.to_string(),
channel_id: self.channel_id,
sender: nick.to_string(),
message: msg.message.to_string(),
receivers: self.clients.values().map(|v| v.user_id).collect(),
@@ -143,7 +159,7 @@
self.persistence
.do_send(crate::persistence::events::ChannelJoined {
channel_name: self.name.to_string(),
channel_id: self.channel_id,
user_id: msg.connection.user_id,
span: msg.span.clone(),
});
@@ -281,7 +297,7 @@
self.persistence
.do_send(crate::persistence::events::ChannelParted {
channel_name: self.name.to_string(),
channel_id: self.channel_id,
user_id: client_info.user_id,
span: msg.span.clone(),
});
@@ -22,17 +22,24 @@
impl Handler<ChannelCreated> for Persistence {
type Result = ResponseFuture<()>;
type Result = ResponseFuture<i64>;
fn handle(&mut self, msg: ChannelCreated, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query("INSERT OR IGNORE INTO channels (name) VALUES (?)")
.bind(msg.name)
.execute(&conn)
.await
.unwrap();
sqlx::query_as(
"INSERT OR IGNORE INTO channels
(name) VALUES (?)
ON CONFLICT(name)
DO UPDATE SET name = name
RETURNING id",
)
.bind(msg.name)
.fetch_one(&conn)
.await
.map(|(v,)| v)
.unwrap()
})
}
}
@@ -48,10 +55,10 @@
Box::pin(async move {
sqlx::query(
"INSERT INTO channel_users (channel, user, permissions, in_channel)
VALUES ((SELECT id FROM channels WHERE name = ?), ?, ?, ?)
VALUES (?, ?, ?, ?)
ON CONFLICT(channel, user) DO UPDATE SET in_channel = excluded.in_channel",
)
.bind(msg.channel_name)
.bind(msg.channel_id.0)
.bind(msg.user_id.0)
.bind(0i32)
.bind(true)
@@ -74,10 +81,10 @@
sqlx::query(
"UPDATE channel_users
SET in_channel = false
WHERE channel = (SELECT id FROM channels WHERE name = ?)
WHERE channel = ?
AND user = ?",
)
.bind(msg.channel_name)
.bind(msg.channel_id.0)
.bind(msg.user_id.0)
.execute(&conn)
.await
@@ -98,7 +105,7 @@
FROM channel_users
INNER JOIN channels
ON channels.id = channel_users.channel
WHERE user = (SELECT id FROM users WHERE username = ?)
WHERE user = ?
AND in_channel = true",
)
.bind(msg.user_id.0)
@@ -119,18 +126,13 @@
let conn = self.database.clone();
Box::pin(async move {
let (channel, idx): (i64, i64) = sqlx::query_as(
"WITH channel AS (SELECT id FROM channels WHERE name = ?)
INSERT INTO channel_messages (channel, idx, sender, message)
SELECT
channel.id,
COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = channel.id), 0),
?,
?
FROM channel
RETURNING channel, idx",
let (idx,): (i64,) = sqlx::query_as(
"INSERT INTO channel_messages (channel, idx, sender, message)
VALUES (?, COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = ?), 0), ?, ?)
RETURNING idx",
)
.bind(msg.channel_name)
.bind(msg.channel_id.0)
.bind(msg.channel_id.0)
.bind(msg.sender)
.bind(msg.message)
.fetch_one(&conn)
@@ -146,7 +148,7 @@
msg.receivers.iter().map(|_| "?").join(",")
);
let mut query = sqlx::query(&query).bind(idx).bind(channel);
let mut query = sqlx::query(&query).bind(idx).bind(msg.channel_id.0);
for receiver in msg.receivers {
query = query.bind(receiver.0);
}
@@ -15,7 +15,7 @@
use tracing::{debug, instrument, warn, Span};
use crate::{
channel::Channel,
channel::{Channel, ChannelId},
client::Client,
config::Config,
connection::InitiatedConnection,
@@ -167,6 +167,7 @@
topic: None,
server,
persistence,
channel_id: ChannelId(0),
})
})
.clone();
@@ -1,10 +1,10 @@
use actix::Message;
use tracing::Span;
use crate::connection::UserId;
use crate::{channel::ChannelId, connection::UserId};
#[derive(Message)]
#[rtype(result = "()")]
#[rtype(result = "i64")]
pub struct ChannelCreated {
pub name: String,
}
@@ -12,7 +12,7 @@
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelJoined {
pub channel_name: String,
pub channel_id: ChannelId,
pub user_id: UserId,
pub span: Span,
}
@@ -20,7 +20,7 @@
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelParted {
pub channel_name: String,
pub channel_id: ChannelId,
pub user_id: UserId,
pub span: Span,
}
@@ -35,7 +35,7 @@
#[derive(Message)]
#[rtype(result = "()")]
pub struct ChannelMessage {
pub channel_name: String,
pub channel_id: ChannelId,
pub sender: String,
pub message: String,
pub receivers: Vec<UserId>,