🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-01-09 20:15:58.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-01-09 20:18:11.0 +00:00:00
commit
a9a679f17febb8c9b3b3ae05a73e4888bb018916 [patch]
tree
bf26f051750404e164ee216bf8b0f38764c0540d
parent
c1a4bdabe22d83923adcf4c983ef8733769c27f9
download
a9a679f17febb8c9b3b3ae05a73e4888bb018916.tar.gz

Add a max retention for stored messages used for replay



Diff

 Cargo.lock                                  |  26 ++++++++-
 Cargo.toml                                  |   3 +-
 config.toml                                 |   2 +-
 migrations/2023010814480_initial-schema.sql |  13 +---
 src/config.rs                               |  18 ++++-
 src/main.rs                                 |   3 +-
 src/persistence.rs                          |  99 +++++++++++++++++++----------
 7 files changed, 119 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 738938e..98a5cac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -676,6 +676,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"

[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
 "quick-error",
]

[[package]]
name = "iana-time-zone"
version = "0.1.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1092,6 +1101,12 @@ dependencies = [
]

[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"

[[package]]
name = "quote"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1251,6 +1266,16 @@ dependencies = [
]

[[package]]
name = "serde-humantime"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c367b5dafa12cef19c554638db10acde90d5e9acea2b80e1ad98b00f88068f7d"
dependencies = [
 "humantime",
 "serde",
]

[[package]]
name = "serde_derive"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1553,6 +1578,7 @@ dependencies = [
 "itertools",
 "rand",
 "serde",
 "serde-humantime",
 "sqlx",
 "tokio",
 "tokio-stream",
diff --git a/Cargo.toml b/Cargo.toml
index 16acba7..5f9fb3f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,15 +9,16 @@ edition = "2021"
actix = "0.13"
actix-rt = "2.7"
anyhow = "1.0"
argon2 = "0.4"
base64 = "0.21.0-rc.1"
bytes = "1.3"
const_format = "0.2"
chrono = "0.4"
clap = { version = "4.0", features = ["cargo", "derive", "std", "suggestions", "color"] }
futures = "0.3"
argon2 = "0.4"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde-humantime = "0.1"
sqlx = { version = "0.6", features = ["runtime-actix-rustls", "sqlite", "any"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
diff --git a/config.toml b/config.toml
index 15850ef..2042b93 100644
--- a/config.toml
+++ b/config.toml
@@ -1,6 +1,8 @@
listen-address = "[::]:6667"
database-uri = "sqlite://titanircd.db"

max-message-replay-since = "1d"

client-threads = 1
channel-threads = 1

diff --git a/migrations/2023010814480_initial-schema.sql b/migrations/2023010814480_initial-schema.sql
index f103235..303ffe5 100644
--- a/migrations/2023010814480_initial-schema.sql
+++ b/migrations/2023010814480_initial-schema.sql
@@ -15,11 +15,11 @@ CREATE UNIQUE INDEX channel_name ON channels(name);

CREATE TABLE channel_messages (
      channel INT NOT NULL,
      idx INT NOT NULL,
      timestamp INT NOT NULL,
      sender VARCHAR(255),
      message VARCHAR(255),
      FOREIGN KEY(channel) REFERENCES channels(id),
      PRIMARY KEY(channel, idx)
      PRIMARY KEY(channel, timestamp)
);

CREATE TABLE channel_users (
@@ -27,10 +27,9 @@ CREATE TABLE channel_users (
    user INT NOT NULL,
    permissions INT NOT NULL DEFAULT 0,
    in_channel BOOLEAN DEFAULT false,
    last_seen_message_idx INT,
    last_seen_message_timestamp INT,
    FOREIGN KEY(user) REFERENCES users(id),
    FOREIGN KEY(channel) REFERENCES channels(id)
    -- FOREIGN KEY(channel, last_seen_message_idx) REFERENCES channels(channel, idx)
    FOREIGN KEY(channel) REFERENCES channels(id),
    -- FOREIGN KEY(channel, last_seen_message_timestamp) REFERENCES channel_messages(channel, timestamp)
    PRIMARY KEY(channel, user)
);

CREATE UNIQUE INDEX channel_user ON channel_users(channel, user);
diff --git a/src/config.rs b/src/config.rs
index daf3e7d..48d4472 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, str::FromStr};
use std::{net::SocketAddr, str::FromStr, time::Duration};

use clap::Parser;
use serde::Deserialize;
@@ -19,12 +19,19 @@ pub struct Config {
    pub listen_address: SocketAddr,
    pub database_uri: String,
    pub motd: Option<String>,
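    /// Maximum age of messages to retain and replay upon rejoin to a channel, given as a
    /// human-readable duration (e.g. "1d"). Defaults to 1 day.
    /// NOTE(review): this was documented as "if set to 0 an unlimited amount of messages will be
    /// retained", but the cutoff is computed as `now - duration` with no special case for 0 —
    /// confirm the intended behaviour before relying on it.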
    #[serde(
        default = "Config::default_max_message_replay_since",
        with = "serde_humantime"
    )]
    pub max_message_replay_since: Duration,
    /// Amount of threads to spawn for processing client commands, set to 0 to spawn clients on the
    /// main server thread.
    /// main server thread. Defaults to 1 thread.
    #[serde(default = "Config::default_client_threads")]
    pub client_threads: usize,
    /// Amount of threads to spawn for processing channel commands, set to 0 to spawn channels on
    /// the main server thread.
    /// the main server thread. Defaults to 1 thread.
    #[serde(default = "Config::default_channel_threads")]
    pub channel_threads: usize,
}
@@ -39,6 +46,11 @@ impl Config {
    const fn default_channel_threads() -> usize {
        1
    }

    /// Fallback used by serde for `max_message_replay_since` when the config omits it: one day.
    #[must_use]
    const fn default_max_message_replay_since() -> Duration {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;

        Duration::from_secs(SECONDS_PER_DAY)
    }
}

impl FromStr for Config {
diff --git a/src/main.rs b/src/main.rs
index 709406b..faaad24 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -81,9 +81,12 @@ async fn main() -> anyhow::Result<()> {

    let persistence_addr = {
        let database = database.clone();
        let config = opts.config.clone();

        Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Persistence {
            database,
            max_message_replay_since: config.max_message_replay_since,
            last_seen_clock: 0,
        })
    };

diff --git a/src/persistence.rs b/src/persistence.rs
index 52f65aa..ec5ee13 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -3,6 +3,7 @@ pub mod events;
use std::time::Duration;

use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture};
use chrono::Utc;
use itertools::Itertools;
use tracing::instrument;

@@ -14,6 +15,23 @@ use crate::persistence::events::{
/// Takes events destined for other actors and persists them to the database.
pub struct Persistence {
    /// Connection pool that every query issued by this actor runs against.
    pub database: sqlx::Pool<sqlx::Any>,
    /// How far back from "now" stored messages are kept and replayed; it is subtracted from
    /// `Utc::now()` to form the cutoff when fetching unseen messages and when truncating.
    pub max_message_replay_since: Duration,
    /// The most recent ID handed out by `monotonically_increasing_id` (a UTC timestamp in
    /// nanoseconds, or the previous ID plus one). `main.rs` starts it at 0.
    pub last_seen_clock: i64,
}

impl Persistence {
    /// Produces a strictly increasing ID derived from the current UTC time in nanoseconds.
    ///
    /// Should the wall clock stand still or jump backwards relative to the previously issued ID,
    /// the previous ID plus one is handed out instead, so IDs from this actor never repeat or
    /// go backwards.
    fn monotonically_increasing_id(&mut self) -> i64 {
        let wall_clock = Utc::now().timestamp_nanos();

        // whichever is larger: the current time, or one past the last ID we issued
        let next_id = wall_clock.max(self.last_seen_clock + 1);
        self.last_seen_clock = next_id;

        next_id
    }
}

// Empty impl: none of the trait's methods are overridden. `main.rs` starts this actor through
// `Supervisor::start_in_arbiter`, which presumably is why the marker impl is needed — confirm
// against the actix `Supervisor` bounds.
impl actix::Supervised for Persistence {}
@@ -25,8 +43,9 @@ impl actix::Actor for Persistence {
        // truncate the messages table every 5 minutes for messages all users have seen
        ctx.run_interval(Duration::from_secs(300), |this, ctx| {
            let database = this.database.clone();
            let max_message_replay_since = this.max_message_replay_since;

            ctx.spawn(truncate_seen_messages(database).into_actor(this));
            ctx.spawn(truncate_seen_messages(database, max_message_replay_since).into_actor(this));
        });
    }
}
@@ -135,31 +154,30 @@ impl Handler<ChannelMessage> for Persistence {

    fn handle(&mut self, msg: ChannelMessage, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();
        let timestamp = self.monotonically_increasing_id();

        Box::pin(async move {
            let (idx,): (i64,) = sqlx::query_as(
                "INSERT INTO channel_messages (channel, idx, sender, message)
                     VALUES (?, COALESCE((SELECT MAX(idx) + 1 FROM channel_messages WHERE channel = ?), 0), ?, ?)
                     RETURNING idx",
            sqlx::query(
                "INSERT INTO channel_messages (channel, timestamp, sender, message) VALUES (?, ?, ?, ?)",
            )
            .bind(msg.channel_id.0)
            .bind(msg.channel_id.0)
            .bind(timestamp)
            .bind(msg.sender)
            .bind(msg.message)
            .fetch_one(&conn)
            .execute(&conn)
            .await
            .unwrap();

            if !msg.receivers.is_empty() {
                let query = format!(
                    "UPDATE channel_users
                     SET last_seen_message_idx = ?
                     SET last_seen_message_timestamp = ?
                     WHERE channel = ?
                       AND user IN ({})",
                    msg.receivers.iter().map(|_| "?").join(",")
                );

                let mut query = sqlx::query(&query).bind(idx).bind(msg.channel_id.0);
                let mut query = sqlx::query(&query).bind(timestamp).bind(msg.channel_id.0);
                for receiver in msg.receivers {
                    query = query.bind(receiver.0);
                }
@@ -176,6 +194,8 @@ impl Handler<FetchUnseenMessages> for Persistence {
    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: FetchUnseenMessages, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.database.clone();
        let max_message_reply_since =
            Utc::now() - chrono::Duration::from_std(self.max_message_replay_since).unwrap();

        Box::pin(async move {
            // select the last 500 messages, or the last message the user saw - whichever dataset
@@ -185,22 +205,19 @@ impl Handler<FetchUnseenMessages> for Persistence {
                 SELECT sender, message
                 FROM channel_messages
                 WHERE channel = (SELECT id FROM channel)
                    AND idx > MAX(
                        (
                            SELECT MAX(0, MAX(idx) - 500)
                            FROM channel_messages
                            WHERE channel = (SELECT id FROM channel)
                        ),
                        (
                            SELECT last_seen_message_idx
                            FROM channel_users
                            WHERE channel = (SELECT id FROM channel)
                              AND user = ?
                        )
                    AND timestamp > MAX(
                      ?,
                      COALESCE((
                        SELECT last_seen_message_timestamp
                        FROM channel_users
                        WHERE channel = (SELECT id FROM channel)
                          AND user = ?
                      ), 0)
                    )
                 ORDER BY idx ASC",
                 ORDER BY timestamp ASC",
            )
            .bind(msg.channel_name.to_string())
            .bind(&msg.channel_name)
            .bind(max_message_reply_since.timestamp_nanos())
            .bind(msg.user_id.0)
            .fetch_all(&conn)
            .await
@@ -211,11 +228,13 @@ impl Handler<FetchUnseenMessages> for Persistence {
    }
}

/// Remove any messages from the messages table whenever they've been seen by all users.
pub async fn truncate_seen_messages(db: sqlx::Pool<sqlx::Any>) {
/// Remove any messages from the messages table whenever they've been seen by all users
/// or have passed their retention period.
pub async fn truncate_seen_messages(db: sqlx::Pool<sqlx::Any>, max_replay_since: Duration) {
    // fetch the minimum last seen message by channel
    let messages = sqlx::query_as::<_, (i64, i64)>(
        "SELECT channel, MIN(last_seen_message_idx)
        "SELECT channel, MIN(last_seen_message_timestamp)
         FROM channel_users
         GROUP BY channel",
    )
@@ -223,13 +242,25 @@ pub async fn truncate_seen_messages(db: sqlx::Pool<sqlx::Any>) {
    .await
    .unwrap();

    // delete all messages that have been by all users
    for (channel, min_seen_id) in messages {
        sqlx::query("DELETE FROM channel_messages WHERE channel = ? AND idx < ?")
            .bind(channel)
            .bind(min_seen_id)
            .execute(&db)
            .await
            .unwrap();
    let max_replay_since = Utc::now() - chrono::Duration::from_std(max_replay_since).unwrap();

    // delete all messages that have been seen by all users or have passed their retention period
    for (channel, min_seen_timestamp) in messages {
        let mut tx = db.begin().await.unwrap();

        let remove_before = std::cmp::max(min_seen_timestamp, max_replay_since.timestamp_nanos());

        sqlx::query(
            "DELETE FROM channel_messages
             WHERE channel = ?
               AND timestamp <= ?",
        )
        .bind(channel)
        .bind(remove_before)
        .execute(&mut tx)
        .await
        .unwrap();

        tx.commit().await.unwrap();
    }
}