Add support for server-time capability for proper timestamping of offline messages
Diff
src/client.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
src/connection.rs | 5 ++++-
src/persistence.rs | 22 ++++++++++++++++------
src/persistence/events.rs | 5 +++--
4 files changed, 75 insertions(+), 18 deletions(-)
@@ -1,14 +1,17 @@
use std::{collections::HashMap, time::Duration};
use actix::{
fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext,
Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture,
};
use chrono::{DateTime, SecondsFormat, Utc};
use clap::{crate_name, crate_version};
use futures::FutureExt;
use irc_proto::{error::ProtocolError, ChannelExt, Command, Message, Prefix, Response};
use irc_proto::{
error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response,
};
use tokio::time::Instant;
use tracing::{debug, error, info_span, instrument, warn, Instrument, Span};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span};
use crate::{
channel::Channel,
@@ -55,6 +58,24 @@
pub persistence: Addr<Persistence>,
pub span: Span,
}
impl Client {
#[must_use]
pub fn maybe_build_time_tag(&self, time: DateTime<Utc>) -> Option<Tag> {
if !self
.connection
.capabilities
.contains(Capability::SERVER_TIME)
{
return None;
}
Some(Tag(
"time".to_string(),
Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)),
))
}
}
impl Actor for Client {
@@ -65,6 +86,8 @@
#[instrument(parent = &self.span, skip_all)]
fn started(&mut self, ctx: &mut Self::Context) {
info!(?self.connection, "Client has successfully joined to server");
ctx.run_interval(Duration::from_secs(30), |this, ctx| {
let _span = info_span!(parent: &this.span, "ping").entered();
@@ -106,10 +129,12 @@
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sender, message) in res.unwrap() {
for (sent, sender, message) in res.unwrap() {
ctx.notify(Broadcast {
message: Message {
tags: None,
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&sender)),
command: Command::PRIVMSG(this.connection.nick.clone(), message),
},
@@ -238,9 +263,11 @@
this.channels.insert(channel_name.clone(), handle);
for (source, message) in messages {
for (sent, source, message) in messages {
this.writer.write(Message {
tags: None,
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&source)),
command: Command::PRIVMSG(channel_name.clone(), message),
});
@@ -694,7 +721,29 @@
Command::CHGHOST(_, _) => {}
Command::Response(_, _) => {}
Command::Raw(_, _) => {}
}
}
}
#[derive(Default)]
pub struct TagBuilder {
inner: Vec<Tag>,
}
impl TagBuilder {
#[must_use]
pub fn insert(mut self, tag: impl Into<Option<Tag>>) -> Self {
if let Some(tag) = tag.into() {
self.inner.push(tag);
}
self
}
}
impl From<TagBuilder> for Option<Vec<Tag>> {
fn from(value: TagBuilder) -> Self {
Some(value.inner).filter(|v| !v.is_empty())
}
}
@@ -47,7 +47,7 @@
capabilities: Capability,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct InitiatedConnection {
pub host: SocketAddr,
pub nick: String,
@@ -269,12 +269,14 @@
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub struct Capability: u32 {
const USERHOST_IN_NAMES = 0b0000_0000_0000_0000_0000_0000_0000_0001;
const SERVER_TIME = 0b0000_0000_0000_0000_0000_0000_0000_0010;
}
}
impl Capability {
pub const SUPPORTED: &'static [&'static str] = &[
"userhost-in-names",
"server-time",
concatcp!("sasl=", AuthStrategy::SUPPORTED),
];
}
@@ -285,6 +287,7 @@
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"userhost-in-names" => Ok(Self::USERHOST_IN_NAMES),
"server-time" => Ok(Self::SERVER_TIME),
_ => Err(()),
}
}
@@ -1,9 +1,9 @@
pub mod events;
use std::{collections::HashMap, time::Duration};
use actix::{AsyncContext, Context, Handler, ResponseFuture, WrapFuture};
use chrono::Utc;
use chrono::{DateTime, TimeZone, Utc};
use itertools::Itertools;
use tracing::instrument;
@@ -289,7 +289,7 @@
}
impl Handler<FetchUnseenPrivateMessages> for Persistence {
type Result = ResponseFuture<Vec<(String, String)>>;
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;
fn handle(
&mut self,
@@ -302,18 +302,21 @@
sqlx::query_as(
"DELETE FROM private_messages
WHERE receiver = ?
RETURNING sender, message",
RETURNING timestamp, sender, message",
)
.bind(msg.user_id)
.fetch_all(&conn)
.await
.unwrap()
.into_iter()
.map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
.collect()
})
}
}
impl Handler<FetchUnseenChannelMessages> for Persistence {
type Result = ResponseFuture<Vec<(String, String)>>;
type Result = ResponseFuture<Vec<(DateTime<Utc>, String, String)>>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(
@@ -328,9 +331,9 @@
Box::pin(async move {
let res = sqlx::query_as(
sqlx::query_as(
"WITH channel AS (SELECT id FROM channels WHERE name = ?)
SELECT sender, message
SELECT timestamp, sender, message
FROM channel_messages
WHERE channel = (SELECT id FROM channel)
AND timestamp > MAX(
@@ -349,9 +352,10 @@
.bind(msg.user_id.0)
.fetch_all(&conn)
.await
.unwrap();
res
.unwrap()
.into_iter()
.map(|(timestamp, sender, message)| (Utc.timestamp_nanos(timestamp), sender, message))
.collect()
})
}
}
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use actix::Message;
use chrono::{DateTime, Utc};
use tracing::Span;
use crate::{
@@ -75,14 +76,14 @@
}
#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
pub struct FetchUnseenPrivateMessages {
pub user_id: UserId,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "Vec<(String, String)>")]
#[rtype(result = "Vec<(DateTime<Utc>, String, String)>")]
pub struct FetchUnseenChannelMessages {
pub channel_name: String,
pub user_id: UserId,