From dff4f3145ac94ac570fc88dc49286a9d78735331 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Thu, 2 Feb 2023 01:58:35 +0000 Subject: [PATCH] Add support for server-time capability for proper timestamping of offline messages --- src/client.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++------ src/connection.rs | 5 ++++- src/persistence.rs | 22 +++++++++++++--------- src/persistence/events.rs | 5 +++-- 4 files changed, 75 insertions(+), 18 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0f76b8a..1e83f47 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,11 +4,14 @@ use actix::{ fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture, }; +use chrono::{DateTime, SecondsFormat, Utc}; use clap::{crate_name, crate_version}; use futures::FutureExt; -use irc_proto::{error::ProtocolError, ChannelExt, Command, Message, Prefix, Response}; +use irc_proto::{ + error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response, +}; use tokio::time::Instant; -use tracing::{debug, error, info_span, instrument, warn, Instrument, Span}; +use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span}; use crate::{ channel::Channel, @@ -57,6 +60,24 @@ pub struct Client { pub span: Span, } +impl Client { + #[must_use] + pub fn maybe_build_time_tag(&self, time: DateTime) -> Option { + if !self + .connection + .capabilities + .contains(Capability::SERVER_TIME) + { + return None; + } + + Some(Tag( + "time".to_string(), + Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)), + )) + } +} + impl Actor for Client { type Context = Context; @@ -65,6 +86,8 @@ impl Actor for Client { /// We currently just use this to schedule pings towards the client. #[instrument(parent = &self.span, skip_all)] fn started(&mut self, ctx: &mut Self::Context) { + info!(?self.connection, "Client has successfully joined to server"); + // schedule pings to the client ctx.run_interval(Duration::from_secs(30), |this, ctx| { let _span = info_span!(parent: &this.span, "ping").entered(); @@ -106,10 +129,12 @@ impl Actor for Client { }) .into_actor(self) .map(move |res, this, ctx| { - for (sender, message) in res.unwrap() { + for (sent, sender, message) in res.unwrap() { ctx.notify(Broadcast { message: Message { - tags: None, + tags: TagBuilder::default() + .insert(this.maybe_build_time_tag(sent)) + .into(), prefix: Some(Prefix::new_from_str(&sender)), command: Command::PRIVMSG(this.connection.nick.clone(), message), }, @@ -238,9 +263,11 @@ impl Handler for Client { this.channels.insert(channel_name.clone(), handle); - for (source, message) in messages { + for (sent, source, message) in messages { this.writer.write(Message { - tags: None, + tags: TagBuilder::default() + .insert(this.maybe_build_time_tag(sent)) + .into(), prefix: Some(Prefix::new_from_str(&source)), command: Command::PRIVMSG(channel_name.clone(), message), }); @@ -698,6 +725,28 @@ impl StreamHandler> for Client { } } +#[derive(Default)] +pub struct TagBuilder { + inner: Vec, +} + +impl TagBuilder { + #[must_use] + pub fn insert(mut self, tag: impl Into>) -> Self { + if let Some(tag) = tag.into() { + self.inner.push(tag); + } + + self + } +} + +impl From for Option> { + fn from(value: TagBuilder) -> Self { + Some(value.inner).filter(|v| !v.is_empty()) + } +} + #[must_use] pub fn parse_channel_name_list(s: &str) -> Vec { s.split(',') diff --git a/src/connection.rs b/src/connection.rs index ef5abca..ad1bf46 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -47,7 +47,7 @@ pub struct ConnectionRequest { capabilities: Capability, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct InitiatedConnection { pub host: SocketAddr, pub nick: String, @@ -269,12 +269,14 @@ bitflags! { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] pub struct Capability: u32 { const USERHOST_IN_NAMES = 0b0000_0000_0000_0000_0000_0000_0000_0001; + const SERVER_TIME = 0b0000_0000_0000_0000_0000_0000_0000_0010; } } impl Capability { pub const SUPPORTED: &'static [&'static str] = &[ "userhost-in-names", + "server-time", concatcp!("sasl=", AuthStrategy::SUPPORTED), ]; } @@ -285,6 +287,7 @@ impl FromStr for Capability { fn from_str(s: &str) -> Result { match s { "userhost-in-names" => Ok(Self::USERHOST_IN_NAMES), + "server-time" => Ok(Self::SERVER_TIME), _ => Err(()), } } diff --git a/src/persistence.rs b/src/persistence.rs index 75b50e7..82ac1e0 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -3,7 +3,7 @@ pub mod events; use std::{collections::HashMap, time::Duration}; use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture}; -use chrono::Utc; +use chrono::{DateTime, TimeZone, Utc}; use itertools::Itertools; use tracing::instrument; @@ -289,7 +289,7 @@ impl Handler for Persistence { } impl Handler for Persistence { - type Result = ResponseFuture>; + type Result = ResponseFuture, String, String)>>; fn handle( &mut self, @@ -302,18 +302,21 @@ impl Handler for Persistence { sqlx::query_as( "DELETE FROM private_messages WHERE receiver = ? - RETURNING sender, message", + RETURNING timestamp, sender, message", ) .bind(msg.user_id) .fetch_all(&conn) .await .unwrap() + .into_iter() + .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message)) + .collect() }) } } impl Handler for Persistence { - type Result = ResponseFuture>; + type Result = ResponseFuture, String, String)>>; #[instrument(parent = &msg.span, skip_all)] fn handle( @@ -328,9 +331,9 @@ impl Handler for Persistence { Box::pin(async move { // select the last 500 messages, or the last message the user saw - whichever dataset // is smaller. - let res = sqlx::query_as( + sqlx::query_as( "WITH channel AS (SELECT id FROM channels WHERE name = ?) - SELECT sender, message + SELECT timestamp, sender, message FROM channel_messages WHERE channel = (SELECT id FROM channel) AND timestamp > MAX( @@ -349,9 +352,10 @@ impl Handler for Persistence { .bind(msg.user_id.0) .fetch_all(&conn) .await - .unwrap(); - - res + .unwrap() + .into_iter() + .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message)) + .collect() }) } } diff --git a/src/persistence/events.rs b/src/persistence/events.rs index 390a5fc..d2fca48 100644 --- a/src/persistence/events.rs +++ b/src/persistence/events.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use actix::Message; +use chrono::{DateTime, Utc}; use tracing::Span; use crate::{ @@ -75,14 +76,14 @@ pub struct PrivateMessage { } #[derive(Message)] -#[rtype(result = "Vec<(String, String)>")] +#[rtype(result = "Vec<(DateTime, String, String)>")] pub struct FetchUnseenPrivateMessages { pub user_id: UserId, pub span: Span, } #[derive(Message)] -#[rtype(result = "Vec<(String, String)>")] +#[rtype(result = "Vec<(DateTime, String, String)>")] pub struct FetchUnseenChannelMessages { pub channel_name: String, pub user_id: UserId, -- libgit2 1.7.2