From c1a4bdabe22d83923adcf4c983ef8733769c27f9 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Mon, 9 Jan 2023 17:35:54 +0000 Subject: [PATCH] Truncate channel_messages when they've been seen by all users --- src/persistence.rs | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/persistence.rs b/src/persistence.rs index 361a2e6..52f65aa 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,6 +1,8 @@ pub mod events; -use actix::{Context, Handler, ResponseFuture}; +use std::time::Duration; + +use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture}; use itertools::Itertools; use tracing::instrument; @@ -18,6 +20,15 @@ impl actix::Supervised for Persistence {} impl actix::Actor for Persistence { type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + // truncate the messages table every 5 minutes for messages all users have seen + ctx.run_interval(Duration::from_secs(300), |this, ctx| { + let database = this.database.clone(); + + ctx.spawn(truncate_seen_messages(database).into_actor(this)); + }); + } } /// Create a new channel in the database, if one doesn't already exist. @@ -199,3 +210,26 @@ impl Handler for Persistence { }) } } + +/// Remove any messages from the messages table whenever they've been seen by all users. +pub async fn truncate_seen_messages(db: sqlx::Pool) { + // fetch the minimum last seen message by channel + let messages = sqlx::query_as::<_, (i64, i64)>( + "SELECT channel, MIN(last_seen_message_idx) + FROM channel_users + GROUP BY channel", + ) + .fetch_all(&db) + .await + .unwrap(); + + // delete all messages that have been by all users + for (channel, min_seen_id) in messages { + sqlx::query("DELETE FROM channel_messages WHERE channel = ? AND idx < ?") + .bind(channel) + .bind(min_seen_id) + .execute(&db) + .await + .unwrap(); + } +} -- libgit2 1.7.2