Implement LUSERS command
Diff
src/channel.rs | 11 ++-
src/client.rs | 156 +++++++++++++++++++++++-------------------
src/connection.rs | 5 +-
src/connection/authenticate.rs | 8 +-
src/main.rs | 1 +-
src/messages.rs | 7 ++-
src/server.rs | 19 ++++-
src/server/response.rs | 71 +++++++++++++++++++-
8 files changed, 204 insertions(+), 74 deletions(-)
@@ -245,7 +245,11 @@ impl Handler<SetUserMode> for Channel {
let permissions = self.get_user_permissions(msg.requester.user_id);
let Some((_, affected_user)) = self.clients.iter().find(|(_, connection)| connection.nick == msg.affected_nick) else {
let Some((_, affected_user)) = self
.clients
.iter()
.find(|(_, connection)| connection.nick == msg.affected_nick)
else {
error!("Unknown user to set perms on");
return;
};
@@ -290,7 +294,10 @@ impl Handler<SetUserMode> for Channel {
.values()
.filter(|connection| connection.user_id == affected_user.user_id);
for connection in all_connected_for_user_id {
let Some(mode) = msg.user_mode.into_mode(msg.add, connection.nick.to_string()) else {
let Some(mode) = msg
.user_mode
.into_mode(msg.add, connection.nick.to_string())
else {
continue;
};
@@ -1,8 +1,9 @@
use std::{collections::HashMap, time::Duration};
use actix::{
fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext,
Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture,
fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFuture, ActorFutureExt, Addr,
AsyncContext, Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler,
WrapFuture,
};
use chrono::{DateTime, SecondsFormat, Utc};
use clap::{crate_name, crate_version};
@@ -11,7 +12,7 @@ use irc_proto::{
error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response,
};
use tokio::time::Instant;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span};
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use crate::{
channel::Channel,
@@ -23,7 +24,7 @@ use crate::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
FetchClientDetails, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
ServerListUsers, UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
},
persistence::{
events::{
@@ -79,6 +80,74 @@ impl Client {
Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)),
))
}
#[instrument(parent = &self.span, skip_all)]
fn handle_ping_interval(&mut self, ctx: &mut Context<Self>) {
if Instant::now().duration_since(self.last_active) >= Duration::from_secs(120) {
self.server_leave_reason = Some("Ping timeout: 120 seconds".to_string());
ctx.stop();
}
self.writer.write(Message {
tags: None,
prefix: None,
command: Command::PING(SERVER_NAME.to_string(), None),
});
}
fn rejoin_channels(&self) -> impl ActorFuture<Self, Output = ()> + 'static {
self.persistence
.send(FetchUserChannels {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
ctx.notify(JoinChannelRequest {
channels: res.unwrap(),
span: this.span.clone(),
});
})
}
fn build_unseen_message(
&self,
sent: DateTime<Utc>,
sender: &str,
message: String,
kind: MessageKind,
) -> Message {
Message {
tags: TagBuilder::default()
.insert(self.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(sender)),
command: match kind {
MessageKind::Normal => Command::PRIVMSG(self.connection.nick.clone(), message),
MessageKind::Notice => Command::NOTICE(self.connection.nick.clone(), message),
},
}
}
fn send_unseen_private_messages(&self) -> impl ActorFuture<Self, Output = ()> + 'static {
self.persistence
.send(FetchUnseenPrivateMessages {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sent, sender, message, kind) in res.unwrap() {
ctx.notify(Broadcast {
message: this.build_unseen_message(sent, &sender, message, kind),
span: this.span.clone(),
});
}
})
}
}
impl Actor for Client {
@@ -91,68 +160,9 @@ impl Actor for Client {
fn started(&mut self, ctx: &mut Self::Context) {
info!(?self.connection, "Client has successfully joined to server");
ctx.run_interval(Duration::from_secs(30), |this, ctx| {
let _span = info_span!(parent: &this.span, "ping").entered();
if Instant::now().duration_since(this.last_active) >= Duration::from_secs(120) {
this.server_leave_reason = Some("Ping timeout: 120 seconds".to_string());
ctx.stop();
}
this.writer.write(Message {
tags: None,
prefix: None,
command: Command::PING(SERVER_NAME.to_string(), None),
});
});
ctx.spawn(
self.persistence
.send(FetchUserChannels {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
ctx.notify(JoinChannelRequest {
channels: res.unwrap(),
span: this.span.clone(),
});
}),
);
ctx.spawn(
self.persistence
.send(FetchUnseenPrivateMessages {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sent, sender, message, kind) in res.unwrap() {
ctx.notify(Broadcast {
message: Message {
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&sender)),
command: match kind {
MessageKind::Normal => {
Command::PRIVMSG(this.connection.nick.clone(), message)
}
MessageKind::Notice => {
Command::NOTICE(this.connection.nick.clone(), message)
}
},
},
span: this.span.clone(),
});
}
}),
);
ctx.run_interval(Duration::from_secs(30), Self::handle_ping_interval);
ctx.spawn(self.rejoin_channels());
ctx.spawn(self.send_unseen_private_messages());
}
@@ -652,7 +662,19 @@ impl StreamHandler<Result<irc_proto::Message, ProtocolError>> for Client {
ctx.spawn(fut);
}
Command::LUSERS(_, _) => {}
Command::LUSERS(_, _) => {
let span = Span::current();
let fut = self
.server
.send(ServerListUsers { span })
.into_actor(self)
.map(|result, this, _ctx| {
for message in result.unwrap().into_messages(&this.connection.nick) {
this.writer.write(message);
}
});
ctx.spawn(fut);
}
Command::VERSION(_) => {
self.writer.write(Message {
tags: None,
@@ -1,3 +1,5 @@
#![allow(clippy::iter_without_into_iter)]
mod authenticate;
pub mod sasl;
@@ -81,7 +83,8 @@ impl TryFrom<ConnectionRequest> for InitiatedConnection {
real_name: Some(real_name),
user_id: Some(user_id),
capabilities,
} = value else {
} = value
else {
return Err(value);
};
@@ -54,7 +54,9 @@ impl Handler<AuthenticateMessage> for Authenticate {
Err(_) => SaslStrategyUnsupported::into_message(),
};
return Box::pin(futures::future::ok(AuthenticateResult::Reply(Box::new(message))));
return Box::pin(futures::future::ok(AuthenticateResult::Reply(Box::new(
message,
))));
};
@@ -96,7 +98,9 @@ pub async fn handle_plain_authentication(
let mut message = arguments.splitn(3, |f| *f == b'\0');
let (Some(authorization_identity), Some(authentication_identity), Some(password)) = (message.next(), message.next(), message.next()) else {
let (Some(authorization_identity), Some(authentication_identity), Some(password)) =
(message.next(), message.next(), message.next())
else {
return Err(Error::new(ErrorKind::InvalidData, "bad plain message"));
};
@@ -98,6 +98,7 @@ async fn main() -> anyhow::Result<()> {
channel_arbiters: build_arbiters(opts.config.channel_threads),
config: opts.config,
persistence,
max_clients: 0,
});
let listener = TcpListener::bind(listen_address).await?;
@@ -115,6 +115,13 @@ pub struct ServerFetchMotd {
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "super::server::response::ListUsers")]
pub struct ServerListUsers {
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "()")]
@@ -22,10 +22,10 @@ use crate::{
messages::{
Broadcast, ChannelFetchTopic, ChannelJoin, ChannelList, ChannelMemberList,
FetchClientByNick, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserConnected, UserNickChange, UserNickChangeInternal,
ServerListUsers, UserConnected, UserNickChange, UserNickChangeInternal,
},
persistence::Persistence,
server::response::Motd,
server::response::{ListUsers, Motd},
SERVER_NAME,
};
@@ -34,6 +34,7 @@ pub struct Server {
pub channel_arbiters: Vec<Arbiter>,
pub channels: HashMap<String, Addr<Channel>>,
pub clients: HashMap<Addr<Client>, InitiatedConnection>,
pub max_clients: usize,
pub config: Config,
pub persistence: Addr<Persistence>,
}
@@ -125,6 +126,7 @@ impl Handler<UserConnected> for Server {
}
self.clients.insert(msg.handle, msg.connection);
self.max_clients = self.clients.len().max(self.max_clients);
}
}
@@ -261,6 +263,19 @@ impl Handler<ChannelList> for Server {
}
}
impl Handler<ServerListUsers> for Server {
type Result = MessageResult<ServerListUsers>;
fn handle(&mut self, _msg: ServerListUsers, _ctx: &mut Self::Context) -> Self::Result {
MessageResult(ListUsers {
current_clients: self.clients.len(),
max_clients: self.max_clients,
operators_online: 0,
channels_formed: self.channels.len(),
})
}
}
impl Handler<PrivateMessage> for Server {
type Result = ();
@@ -2,6 +2,77 @@ use irc_proto::{Command, Message, Prefix, Response};
use crate::{server::Server, SERVER_NAME};
pub struct ListUsers {
pub current_clients: usize,
pub max_clients: usize,
pub operators_online: usize,
pub channels_formed: usize,
}
impl ListUsers {
#[must_use]
pub fn into_messages(self, for_user: &str) -> Vec<Message> {
macro_rules! msg {
($response:ident, $($payload:expr),*) => {
Message {
tags: None,
prefix: Some(Prefix::ServerName(SERVER_NAME.to_string())),
command: Command::Response(
Response::$response,
vec![for_user.to_string(), $($payload),*],
),
}
};
}
vec![
msg!(
RPL_LUSERCLIENT,
format!(
"There are {} users and 0 invisible on 1 servers",
self.current_clients
)
),
msg!(
RPL_LUSEROP,
"0".to_string(),
"operator(s) online".to_string()
),
msg!(
RPL_LUSERCHANNELS,
self.channels_formed.to_string(),
"channels formed".to_string()
),
msg!(
RPL_LUSERME,
format!(
"I have {} clients and 1 servers",
self.current_clients.to_string()
)
),
msg!(
RPL_LOCALUSERS,
self.current_clients.to_string(),
self.max_clients.to_string(),
format!(
"Current local users {}, max {}",
self.current_clients, self.max_clients
)
),
msg!(
RPL_GLOBALUSERS,
self.current_clients.to_string(),
self.max_clients.to_string(),
format!(
"Current global users {}, max {}",
self.current_clients, self.max_clients
)
),
]
}
}
#[derive(Default)]
pub struct Motd {
pub motd: Option<String>,