🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-02-02 1:58:35.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-02-02 1:58:35.0 +00:00:00
commit
dff4f3145ac94ac570fc88dc49286a9d78735331 [patch]
tree
95c3aadfe613b51d5276a430ad87142b90db27d7
parent
4d57bff5266dfc0ea5db9d76e647c88bd00b3c4f
download
dff4f3145ac94ac570fc88dc49286a9d78735331.tar.gz

Add support for server-time capability for proper timestamping of offline messages



Diff

 src/client.rs             | 61 +++++++++++++++++++++++++++++++++++++++++++-----
 src/connection.rs         |  5 +++-
 src/persistence.rs        | 22 ++++++++++-------
 src/persistence/events.rs |  5 ++--
 4 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 0f76b8a..1e83f47 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -4,11 +4,14 @@ use actix::{
    fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext,
    Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture,
};
use chrono::{DateTime, SecondsFormat, Utc};
use clap::{crate_name, crate_version};
use futures::FutureExt;
use irc_proto::{error::ProtocolError, ChannelExt, Command, Message, Prefix, Response};
use irc_proto::{
    error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response,
};
use tokio::time::Instant;
use tracing::{debug, error, info_span, instrument, warn, Instrument, Span};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span};

use crate::{
    channel::Channel,
@@ -57,6 +60,24 @@ pub struct Client {
    pub span: Span,
}

impl Client {
    #[must_use]
    pub fn maybe_build_time_tag(&self, time: DateTime<Utc>) -> Option<Tag> {
        if !self
            .connection
            .capabilities
            .contains(Capability::SERVER_TIME)
        {
            return None;
        }

        Some(Tag(
            "time".to_string(),
            Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)),
        ))
    }
}

impl Actor for Client {
    type Context = Context<Self>;

@@ -65,6 +86,8 @@ impl Actor for Client {
    /// We currently just use this to schedule pings towards the client.
    #[instrument(parent = &self.span, skip_all)]
    fn started(&mut self, ctx: &mut Self::Context) {
        info!(?self.connection, "Client has successfully joined to server");

        // schedule pings to the client
        ctx.run_interval(Duration::from_secs(30), |this, ctx| {
            let _span = info_span!(parent: &this.span, "ping").entered();
@@ -106,10 +129,12 @@ impl Actor for Client {
                })
                .into_actor(self)
                .map(move |res, this, ctx| {
                    for (sender, message) in res.unwrap() {
                    for (sent, sender, message) in res.unwrap() {
                        ctx.notify(Broadcast {
                            message: Message {
                                tags: None,
                                tags: TagBuilder::default()
                                    .insert(this.maybe_build_time_tag(sent))
                                    .into(),
                                prefix: Some(Prefix::new_from_str(&sender)),
                                command: Command::PRIVMSG(this.connection.nick.clone(), message),
                            },
@@ -238,9 +263,11 @@ impl Handler<JoinChannelRequest> for Client {

                this.channels.insert(channel_name.clone(), handle);

                for (source, message) in messages {
                for (sent, source, message) in messages {
                    this.writer.write(Message {
                        tags: None,
                        tags: TagBuilder::default()
                            .insert(this.maybe_build_time_tag(sent))
                            .into(),
                        prefix: Some(Prefix::new_from_str(&source)),
                        command: Command::PRIVMSG(channel_name.clone(), message),
                    });
@@ -698,6 +725,28 @@ impl StreamHandler<Result<irc_proto::Message, ProtocolError>> for Client {
    }
}

#[derive(Default)]
pub struct TagBuilder {
    inner: Vec<Tag>,
}

impl TagBuilder {
    #[must_use]
    pub fn insert(mut self, tag: impl Into<Option<Tag>>) -> Self {
        if let Some(tag) = tag.into() {
            self.inner.push(tag);
        }

        self
    }
}

impl From<TagBuilder> for Option<Vec<Tag>> {
    fn from(value: TagBuilder) -> Self {
        Some(value.inner).filter(|v| !v.is_empty())
    }
}

#[must_use]
pub fn parse_channel_name_list(s: &str) -> Vec<String> {
    s.split(',')
diff --git a/src/connection.rs b/src/connection.rs
index ef5abca..ad1bf46 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -47,7 +47,7 @@ pub struct ConnectionRequest {
    capabilities: Capability,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct InitiatedConnection {
    pub host: SocketAddr,
    pub nick: String,
@@ -269,12 +269,14 @@ bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
    pub struct Capability: u32 {
        const USERHOST_IN_NAMES = 0b0000_0000_0000_0000_0000_0000_0000_0001;
        const SERVER_TIME       = 0b0000_0000_0000_0000_0000_0000_0000_0010;
    }
}

impl Capability {
    pub const SUPPORTED: &'static [&'static str] = &[
        "userhost-in-names",
        "server-time",
        concatcp!("sasl=", AuthStrategy::SUPPORTED),
    ];
}
@@ -285,6 +287,7 @@ impl FromStr for Capability {
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "userhost-in-names" => Ok(Self::USERHOST_IN_NAMES),
            "server-time" => Ok(Self::SERVER_TIME),
            _ => Err(()),
        }
    }
diff --git a/src/persistence.rs b/src/persistence.rs
index 75b50e7..82ac1e0 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -3,7 +3,7 @@ pub mod events;
use std::{collections::HashMap, time::Duration};

use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture};
use chrono::Utc;
use chrono::{DateTime, TimeZone, Utc};
use itertools::Itertools;
use tracing::instrument;

@@ -289,7 +289,7 @@ impl Handler<PrivateMessage> for Persistence {
}

impl Handler<FetchUnseenPrivateMessages> for Persistence {
    type Result = ResponseFuture<Vec<(String, String)>>;
    type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;

    fn handle(
        &mut self,
@@ -302,18 +302,21 @@ impl Handler<FetchUnseenPrivateMessages> for Persistence {
            sqlx::query_as(
                "DELETE FROM private_messages
                 WHERE receiver = ?
                 RETURNING sender, message",
                 RETURNING timestamp, sender, message",
            )
            .bind(msg.user_id)
            .fetch_all(&conn)
            .await
            .unwrap()
            .into_iter()
            .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
            .collect()
        })
    }
}

impl Handler<FetchUnseenChannelMessages> for Persistence {
    type Result = ResponseFuture<Vec<(String, String)>>;
    type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;

    #[instrument(parent = &msg.span, skip_all)]
    fn handle(
@@ -328,9 +331,9 @@ impl Handler<FetchUnseenChannelMessages> for Persistence {
        Box::pin(async move {
            // select the last 500 messages, or the last message the user saw - whichever dataset
            // is smaller.
            let res = sqlx::query_as(
            sqlx::query_as(
                "WITH channel AS (SELECT id FROM channels WHERE name = ?)
                 SELECT sender, message
                 SELECT timestamp, sender, message
                 FROM channel_messages
                 WHERE channel = (SELECT id FROM channel)
                    AND timestamp > MAX(
@@ -349,9 +352,10 @@ impl Handler<FetchUnseenChannelMessages> for Persistence {
            .bind(msg.user_id.0)
            .fetch_all(&conn)
            .await
            .unwrap();

            res
            .unwrap()
            .into_iter()
            .map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
            .collect()
        })
    }
}
diff --git a/src/persistence/events.rs b/src/persistence/events.rs
index 390a5fc..d2fca48 100644
--- a/src/persistence/events.rs
+++ b/src/persistence/events.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use actix::Message;
use chrono::{DateTime, Utc};
use tracing::Span;

use crate::{
@@ -75,14 +76,14 @@ pub struct PrivateMessage {
}

#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
pub struct FetchUnseenPrivateMessages {
    pub user_id: UserId,
    pub span: Span,
}

#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
pub struct FetchUnseenChannelMessages {
    pub channel_name: String,
    pub user_id: UserId,