From dff4f3145ac94ac570fc88dc49286a9d78735331 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Thu, 02 Feb 2023 01:58:35 +0000 Subject: [PATCH] Add support for server-time capability for proper timestamping of offline messages --- src/client.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- src/connection.rs | 5 ++++- src/persistence.rs | 22 ++++++++++++++++------ src/persistence/events.rs | 5 +++-- 4 files changed, 75 insertions(+), 18 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0f76b8a..1e83f47 100644 --- a/src/client.rs +++ a/src/client.rs @@ -1,14 +1,17 @@ use std::{collections::HashMap, time::Duration}; use actix::{ fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture, }; +use chrono::{DateTime, SecondsFormat, Utc}; use clap::{crate_name, crate_version}; use futures::FutureExt; -use irc_proto::{error::ProtocolError, ChannelExt, Command, Message, Prefix, Response}; +use irc_proto::{ + error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response, +}; use tokio::time::Instant; -use tracing::{debug, error, info_span, instrument, warn, Instrument, Span}; +use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span}; use crate::{ channel::Channel, @@ -55,6 +58,24 @@ pub persistence: Addr, /// The connection span to group all logs for the same connection pub span: Span, +} + +impl Client { + #[must_use] + pub fn maybe_build_time_tag(&self, time: DateTime) -> Option { + if !self + .connection + .capabilities + .contains(Capability::SERVER_TIME) + { + return None; + } + + Some(Tag( + "time".to_string(), + Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)), + )) + } } impl Actor for Client { @@ -65,6 +86,8 @@ /// We currently just use this to schedule pings towards the client. #[instrument(parent = &self.span, skip_all)] fn started(&mut self, ctx: &mut Self::Context) { + info!(?self.connection, "Client has successfully joined to server"); + // schedule pings to the client ctx.run_interval(Duration::from_secs(30), |this, ctx| { let _span = info_span!(parent: &this.span, "ping").entered(); @@ -106,10 +129,12 @@ }) .into_actor(self) .map(move |res, this, ctx| { - for (sender, message) in res.unwrap() { + for (sent, sender, message) in res.unwrap() { ctx.notify(Broadcast { message: Message { - tags: None, + tags: TagBuilder::default() + .insert(this.maybe_build_time_tag(sent)) + .into(), prefix: Some(Prefix::new_from_str(&sender)), command: Command::PRIVMSG(this.connection.nick.clone(), message), }, @@ -238,9 +263,11 @@ this.channels.insert(channel_name.clone(), handle); - for (source, message) in messages { + for (sent, source, message) in messages { this.writer.write(Message { - tags: None, + tags: TagBuilder::default() + .insert(this.maybe_build_time_tag(sent)) + .into(), prefix: Some(Prefix::new_from_str(&source)), command: Command::PRIVMSG(channel_name.clone(), message), }); @@ -694,7 +721,29 @@ Command::CHGHOST(_, _) => {} Command::Response(_, _) => {} Command::Raw(_, _) => {} + } + } +} + +#[derive(Default)] +pub struct TagBuilder { + inner: Vec, +} + +impl TagBuilder { + #[must_use] + pub fn insert(mut self, tag: impl Into>) -> Self { + if let Some(tag) = tag.into() { + self.inner.push(tag); } + + self + } +} + +impl From for Option> { + fn from(value: TagBuilder) -> Self { + Some(value.inner).filter(|v| !v.is_empty()) } } diff --git a/src/connection.rs b/src/connection.rs index ef5abca..ad1bf46 100644 --- a/src/connection.rs +++ a/src/connection.rs @@ -47,7 +47,7 @@ capabilities: Capability, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct InitiatedConnection { pub host: SocketAddr, pub nick: String, @@ -269,12 +269,14 @@ #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] pub struct Capability: u32 { const USERHOST_IN_NAMES = 0b0000_0000_0000_0000_0000_0000_0000_0001; + const SERVER_TIME = 0b0000_0000_0000_0000_0000_0000_0000_0010; } } impl Capability { pub const SUPPORTED: &'static [&'static str] = &[ "userhost-in-names", + "server-time", concatcp!("sasl=", AuthStrategy::SUPPORTED), ]; } @@ -285,6 +287,7 @@ fn from_str(s: &str) -> Result { match s { "userhost-in-names" => Ok(Self::USERHOST_IN_NAMES), + "server-time" => Ok(Self::SERVER_TIME), _ => Err(()), } } diff --git a/src/persistence.rs b/src/persistence.rs index 75b50e7..82ac1e0 100644 --- a/src/persistence.rs +++ a/src/persistence.rs @@ -1,9 +1,9 @@ pub mod events; use std::{collections::HashMap, time::Duration}; use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture}; -use chrono::Utc; +use chrono::{DateTime, TimeZone, Utc}; use itertools::Itertools; use tracing::instrument; @@ -289,7 +289,7 @@ } impl Handler for Persistence { - type Result = ResponseFuture>; + type Result = ResponseFuture, String, String)>>; fn handle( &mut self, @@ -302,18 +302,21 @@ sqlx::query_as( "DELETE FROM private_messages WHERE receiver = ? - RETURNING sender, message", + RETURNING timestamp, sender, message", ) .bind(msg.user_id) .fetch_all(&conn) .await .unwrap() + .into_iter() + .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message)) + .collect() }) } } impl Handler for Persistence { - type Result = ResponseFuture>; + type Result = ResponseFuture, String, String)>>; #[instrument(parent = &msg.span, skip_all)] fn handle( @@ -328,9 +331,9 @@ Box::pin(async move { // select the last 500 messages, or the last message the user saw - whichever dataset // is smaller. - let res = sqlx::query_as( + sqlx::query_as( "WITH channel AS (SELECT id FROM channels WHERE name = ?) - SELECT sender, message + SELECT timestamp, sender, message FROM channel_messages WHERE channel = (SELECT id FROM channel) AND timestamp > MAX( @@ -349,9 +352,10 @@ .bind(msg.user_id.0) .fetch_all(&conn) .await - .unwrap(); - - res + .unwrap() + .into_iter() + .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message)) + .collect() }) } } diff --git a/src/persistence/events.rs b/src/persistence/events.rs index 390a5fc..d2fca48 100644 --- a/src/persistence/events.rs +++ a/src/persistence/events.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use actix::Message; +use chrono::{DateTime, Utc}; use tracing::Span; use crate::{ @@ -75,14 +76,14 @@ } #[derive(Message)] -#[rtype(result = "Vec<(String, String)>")] +#[rtype(result = "Vec<(DateTime, String, String)>")] pub struct FetchUnseenPrivateMessages { pub user_id: UserId, pub span: Span, } #[derive(Message)] -#[rtype(result = "Vec<(String, String)>")] +#[rtype(result = "Vec<(DateTime, String, String)>")] pub struct FetchUnseenChannelMessages { pub channel_name: String, pub user_id: UserId, -- rgit 0.1.5