From c9bea1a6103a9dd96a1f7e729f5bb8d0dabe6854 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Mon, 29 Jan 2024 00:10:24 +0000 Subject: [PATCH] Implement LUSERS command --- src/channel.rs | 11 +++++++++-- src/client.rs | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------- src/connection.rs | 5 ++++- src/connection/authenticate.rs | 8 ++++++-- src/main.rs | 1 + src/messages.rs | 7 +++++++ src/server.rs | 19 +++++++++++++++++-- src/server/response.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 204 insertions(+), 74 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 15100d9..61b8f9f 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -245,7 +245,11 @@ impl Handler for Channel { let permissions = self.get_user_permissions(msg.requester.user_id); // TODO: this should allow setting perms not currently in the channel - let Some((_, affected_user)) = self.clients.iter().find(|(_, connection)| connection.nick == msg.affected_nick) else { + let Some((_, affected_user)) = self + .clients + .iter() + .find(|(_, connection)| connection.nick == msg.affected_nick) + else { error!("Unknown user to set perms on"); return; }; @@ -290,7 +294,10 @@ impl Handler for Channel { .values() .filter(|connection| connection.user_id == affected_user.user_id); for connection in all_connected_for_user_id { - let Some(mode) = msg.user_mode.into_mode(msg.add, connection.nick.to_string()) else { + let Some(mode) = msg + .user_mode + .into_mode(msg.add, connection.nick.to_string()) + else { continue; }; diff --git a/src/client.rs b/src/client.rs index 3d38d16..118fc5e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,9 @@ use std::{collections::HashMap, time::Duration}; use actix::{ - fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, - Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture, + fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFuture, ActorFutureExt, Addr, + AsyncContext, Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, + WrapFuture, }; use chrono::{DateTime, SecondsFormat, Utc}; use clap::{crate_name, crate_version}; @@ -11,7 +12,7 @@ use irc_proto::{ error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response, }; use tokio::time::Instant; -use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span}; +use tracing::{debug, error, info, instrument, warn, Instrument, Span}; use crate::{ channel::Channel, @@ -23,7 +24,7 @@ use crate::{ Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList, ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic, FetchClientDetails, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd, - UserKickedFromChannel, UserNickChange, UserNickChangeInternal, + ServerListUsers, UserKickedFromChannel, UserNickChange, UserNickChangeInternal, }, persistence::{ events::{ @@ -79,6 +80,74 @@ impl Client { Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)), )) } + + /// Send scheduled pings to the client + #[instrument(parent = &self.span, skip_all)] + fn handle_ping_interval(&mut self, ctx: &mut Context) { + if Instant::now().duration_since(self.last_active) >= Duration::from_secs(120) { + self.server_leave_reason = Some("Ping timeout: 120 seconds".to_string()); + ctx.stop(); + } + + self.writer.write(Message { + tags: None, + prefix: None, + command: Command::PING(SERVER_NAME.to_string(), None), + }); + } + + //// Join the user to all the channels they were previously in before disconnecting from + //// the server + fn rejoin_channels(&self) -> impl ActorFuture + 'static { + self.persistence + .send(FetchUserChannels { + user_id: self.connection.user_id, + span: Span::current(), + }) + .into_actor(self) + .map(move |res, this, ctx| { + ctx.notify(JoinChannelRequest { + channels: res.unwrap(), + span: this.span.clone(), + }); + }) + } + + fn build_unseen_message( + &self, + sent: DateTime, + sender: &str, + message: String, + kind: MessageKind, + ) -> Message { + Message { + tags: TagBuilder::default() + .insert(self.maybe_build_time_tag(sent)) + .into(), + prefix: Some(Prefix::new_from_str(sender)), + command: match kind { + MessageKind::Normal => Command::PRIVMSG(self.connection.nick.clone(), message), + MessageKind::Notice => Command::NOTICE(self.connection.nick.clone(), message), + }, + } + } + + fn send_unseen_private_messages(&self) -> impl ActorFuture + 'static { + self.persistence + .send(FetchUnseenPrivateMessages { + user_id: self.connection.user_id, + span: Span::current(), + }) + .into_actor(self) + .map(move |res, this, ctx| { + for (sent, sender, message, kind) in res.unwrap() { + ctx.notify(Broadcast { + message: this.build_unseen_message(sent, &sender, message, kind), + span: this.span.clone(), + }); + } + }) + } } impl Actor for Client { @@ -91,68 +160,9 @@ impl Actor for Client { fn started(&mut self, ctx: &mut Self::Context) { info!(?self.connection, "Client has successfully joined to server"); - // schedule pings to the client - ctx.run_interval(Duration::from_secs(30), |this, ctx| { - let _span = info_span!(parent: &this.span, "ping").entered(); - - if Instant::now().duration_since(this.last_active) >= Duration::from_secs(120) { - this.server_leave_reason = Some("Ping timeout: 120 seconds".to_string()); - ctx.stop(); - } - - this.writer.write(Message { - tags: None, - prefix: None, - command: Command::PING(SERVER_NAME.to_string(), None), - }); - }); - - // join the user to all the channels they were previously in before disconnecting from - // the server - ctx.spawn( - self.persistence - .send(FetchUserChannels { - user_id: self.connection.user_id, - span: Span::current(), - }) - .into_actor(self) - .map(move |res, this, ctx| { - ctx.notify(JoinChannelRequest { - channels: res.unwrap(), - span: this.span.clone(), - }); - }), - ); - - ctx.spawn( - self.persistence - .send(FetchUnseenPrivateMessages { - user_id: self.connection.user_id, - span: Span::current(), - }) - .into_actor(self) - .map(move |res, this, ctx| { - for (sent, sender, message, kind) in res.unwrap() { - ctx.notify(Broadcast { - message: Message { - tags: TagBuilder::default() - .insert(this.maybe_build_time_tag(sent)) - .into(), - prefix: Some(Prefix::new_from_str(&sender)), - command: match kind { - MessageKind::Normal => { - Command::PRIVMSG(this.connection.nick.clone(), message) - } - MessageKind::Notice => { - Command::NOTICE(this.connection.nick.clone(), message) - } - }, - }, - span: this.span.clone(), - }); - } - }), - ); + ctx.run_interval(Duration::from_secs(30), Self::handle_ping_interval); + ctx.spawn(self.rejoin_channels()); + ctx.spawn(self.send_unseen_private_messages()); } /// Called when the actor is shutting down, either gracefully by the client or forcefully @@ -652,7 +662,19 @@ impl StreamHandler> for Client { ctx.spawn(fut); } - Command::LUSERS(_, _) => {} + Command::LUSERS(_, _) => { + let span = Span::current(); + let fut = self + .server + .send(ServerListUsers { span }) + .into_actor(self) + .map(|result, this, _ctx| { + for message in result.unwrap().into_messages(&this.connection.nick) { + this.writer.write(message); + } + }); + ctx.spawn(fut); + } Command::VERSION(_) => { self.writer.write(Message { tags: None, diff --git a/src/connection.rs b/src/connection.rs index 84fd8ab..7d5d343 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,3 +1,5 @@ +#![allow(clippy::iter_without_into_iter)] + mod authenticate; pub mod sasl; @@ -81,7 +83,8 @@ impl TryFrom for InitiatedConnection { real_name: Some(real_name), user_id: Some(user_id), capabilities, - } = value else { + } = value + else { return Err(value); }; diff --git a/src/connection/authenticate.rs b/src/connection/authenticate.rs index 2ad106d..9a2f2bc 100644 --- a/src/connection/authenticate.rs +++ b/src/connection/authenticate.rs @@ -54,7 +54,9 @@ impl Handler for Authenticate { Err(_) => SaslStrategyUnsupported::into_message(), }; - return Box::pin(futures::future::ok(AuthenticateResult::Reply(Box::new(message)))); + return Box::pin(futures::future::ok(AuthenticateResult::Reply(Box::new( + message, + )))); }; // user has cancelled authentication @@ -96,7 +98,9 @@ pub async fn handle_plain_authentication( // split the PLAIN message into its respective parts let mut message = arguments.splitn(3, |f| *f == b'\0'); - let (Some(authorization_identity), Some(authentication_identity), Some(password)) = (message.next(), message.next(), message.next()) else { + let (Some(authorization_identity), Some(authentication_identity), Some(password)) = + (message.next(), message.next(), message.next()) + else { return Err(Error::new(ErrorKind::InvalidData, "bad plain message")); }; diff --git a/src/main.rs b/src/main.rs index 2ce8d41..8fb17b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -98,6 +98,7 @@ async fn main() -> anyhow::Result<()> { channel_arbiters: build_arbiters(opts.config.channel_threads), config: opts.config, persistence, + max_clients: 0, }); let listener = TcpListener::bind(listen_address).await?; diff --git a/src/messages.rs b/src/messages.rs index 3f014df..15a9d50 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -115,6 +115,13 @@ pub struct ServerFetchMotd { pub span: Span, } +/// Returns the result of `LUSERS`. +#[derive(Message)] +#[rtype(result = "super::server::response::ListUsers")] +pub struct ServerListUsers { + pub span: Span, +} + /// Sent from channels to users when a user is removed from the channel. #[derive(Message)] #[rtype(result = "()")] diff --git a/src/server.rs b/src/server.rs index 7a19466..f85993f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,10 +22,10 @@ use crate::{ messages::{ Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList, FetchClientByNick, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd, - UserConnected, UserNickChange, UserNickChangeInternal, + ServerListUsers, UserConnected, UserNickChange, UserNickChangeInternal, }, persistence::Persistence, - server::response::Motd, + server::response::{ListUsers, Motd}, SERVER_NAME, }; @@ -34,6 +34,7 @@ pub struct Server { pub channel_arbiters: Vec, pub channels: HashMap>, pub clients: HashMap, InitiatedConnection>, + pub max_clients: usize, pub config: Config, pub persistence: Addr, } @@ -125,6 +126,7 @@ impl Handler for Server { } self.clients.insert(msg.handle, msg.connection); + self.max_clients = self.clients.len().max(self.max_clients); } } @@ -261,6 +263,19 @@ impl Handler for Server { } } +impl Handler for Server { + type Result = MessageResult; + + fn handle(&mut self, _msg: ServerListUsers, _ctx: &mut Self::Context) -> Self::Result { + MessageResult(ListUsers { + current_clients: self.clients.len(), + max_clients: self.max_clients, + operators_online: 0, + channels_formed: self.channels.len(), + }) + } +} + impl Handler for Server { type Result = (); diff --git a/src/server/response.rs b/src/server/response.rs index ac743d9..07ec7ab 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -2,6 +2,77 @@ use irc_proto::{Command, Message, Prefix, Response}; use crate::{server::Server, SERVER_NAME}; +pub struct ListUsers { + pub current_clients: usize, + pub max_clients: usize, + pub operators_online: usize, + pub channels_formed: usize, +} + +impl ListUsers { + #[must_use] + pub fn into_messages(self, for_user: &str) -> Vec { + macro_rules! msg { + ($response:ident, $($payload:expr),*) => { + + Message { + tags: None, + prefix: Some(Prefix::ServerName(SERVER_NAME.to_string())), + command: Command::Response( + Response::$response, + vec![for_user.to_string(), $($payload),*], + ), + } + }; + } + + vec![ + msg!( + RPL_LUSERCLIENT, + format!( + "There are {} users and 0 invisible on 1 servers", + self.current_clients + ) + ), + msg!( + RPL_LUSEROP, + "0".to_string(), + "operator(s) online".to_string() + ), + msg!( + RPL_LUSERCHANNELS, + self.channels_formed.to_string(), + "channels formed".to_string() + ), + msg!( + RPL_LUSERME, + format!( + "I have {} clients and 1 servers", + self.current_clients.to_string() + ) + ), + msg!( + RPL_LOCALUSERS, + self.current_clients.to_string(), + self.max_clients.to_string(), + format!( + "Current local users {}, max {}", + self.current_clients, self.max_clients + ) + ), + msg!( + RPL_GLOBALUSERS, + self.current_clients.to_string(), + self.max_clients.to_string(), + format!( + "Current global users {}, max {}", + self.current_clients, self.max_clients + ) + ), + ] + } +} + #[derive(Default)] pub struct Motd { pub motd: Option, -- libgit2 1.7.2