use std::{collections::HashMap, time::Duration};
use actix::{
fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext,
Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture,
};
use clap::{crate_name, crate_version};
use futures::FutureExt;
use irc_proto::{error::ProtocolError, ChannelExt, Command, Message, Prefix, Response};
use tokio::time::Instant;
use tracing::{debug, error, info_span, instrument, warn, Instrument, Span};
use crate::{
channel::Channel,
connection::{InitiatedConnection, MessageSink},
messages::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelUpdateTopic, FetchClientDetails,
ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange,
UserNickChangeInternal,
},
persistence::{events::FetchUserChannels, Persistence},
server::Server,
SERVER_NAME,
};
pub struct Client {
pub writer: MessageSink,
pub connection: InitiatedConnection,
pub server: Addr<Server>,
pub channels: HashMap<String, Addr<Channel>>,
pub last_active: Instant,
pub graceful_shutdown: bool,
pub server_leave_reason: Option<String>,
pub persistence: Addr<Persistence>,
pub span: Span,
}
impl Actor for Client {
type Context = Context<Self>;
#[instrument(parent = &self.span, skip_all)]
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(30), |this, ctx| {
let _span = info_span!(parent: &this.span, "ping").entered();
if Instant::now().duration_since(this.last_active) > Duration::from_secs(120) {
this.server_leave_reason = Some("Ping timeout: 120 seconds".to_string());
ctx.stop();
}
this.writer.write(Message {
tags: None,
prefix: None,
command: Command::PING(SERVER_NAME.to_string(), None),
});
});
ctx.spawn(
self.persistence
.send(FetchUserChannels {
username: self.connection.user.to_string(),
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
ctx.notify(JoinChannelRequest {
channels: res.unwrap(),
span: this.span.clone(),
});
}),
);
}
#[instrument(parent = &self.span, skip_all)]
fn stopped(&mut self, ctx: &mut Self::Context) {
let message = self.server_leave_reason.take();
self.server.do_send(ServerDisconnect {
client: ctx.address(),
message: message.clone(),
span: Span::current(),
});
for channel in self.channels.values() {
channel.do_send(ServerDisconnect {
client: ctx.address(),
message: message.clone(),
span: Span::current(),
});
}
if self.graceful_shutdown {
self.writer.write(Message {
tags: None,
prefix: Some(self.connection.to_nick()),
command: Command::QUIT(message),
});
} else {
let message = message.unwrap_or_else(|| "Ungraceful shutdown".to_string());
self.writer.write(Message {
tags: None,
prefix: None,
command: Command::ERROR(message),
});
}
}
}
impl Handler<Broadcast> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: Broadcast, _ctx: &mut Self::Context) -> Self::Result {
self.writer.write(msg.message);
}
}
impl Handler<FetchClientDetails> for Client {
type Result = MessageResult<FetchClientDetails>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: FetchClientDetails, _ctx: &mut Self::Context) -> Self::Result {
MessageResult(self.connection.clone())
}
}
impl Handler<JoinChannelRequest> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: JoinChannelRequest, ctx: &mut Self::Context) -> Self::Result {
let mut futures = Vec::with_capacity(msg.channels.len());
for channel_name in msg.channels {
if !channel_name.is_channel_name() {
continue;
}
futures.push(
self.server
.clone()
.send(ChannelJoin {
channel_name: channel_name.to_string(),
client: ctx.address(),
connection: self.connection.clone(),
span: Span::current(),
})
.map(move |v| (channel_name, v.unwrap().unwrap())),
);
}
let fut = wrap_future::<_, Self>(
futures::future::join_all(futures.into_iter()).instrument(Span::current()),
)
.map(|result, this, _ctx| {
for (channel_name, handle) in result {
this.channels.insert(channel_name.clone(), handle);
}
});
Box::pin(fut)
}
}
impl Handler<ListChannelMemberRequest> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: ListChannelMemberRequest, _ctx: &mut Self::Context) -> Self::Result {
let mut futures = Vec::with_capacity(msg.channels.len());
for (channel_name, handle) in &self.channels {
if !msg.channels.contains(channel_name) {
continue;
}
futures.push(handle.send(ChannelMemberList {
span: Span::current(),
}));
}
let fut = wrap_future::<_, Self>(
futures::future::join_all(futures.into_iter()).instrument(Span::current()),
)
.map(|result, this, _ctx| {
for list in result {
let list = list.unwrap();
for message in list.into_messages(this.connection.nick.clone()) {
this.writer.write(message);
}
}
});
Box::pin(fut)
}
}
impl Handler<UserNickChangeInternal> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserNickChangeInternal, ctx: &mut Self::Context) -> Self::Result {
self.server.do_send(UserNickChange {
client: ctx.address(),
connection: self.connection.clone(),
new_nick: msg.new_nick.clone(),
span: Span::current(),
});
for channel in self.channels.values() {
channel.do_send(UserNickChange {
client: ctx.address(),
connection: self.connection.clone(),
new_nick: msg.new_nick.clone(),
span: Span::current(),
});
}
self.connection.nick = msg.new_nick;
}
}
impl Handler<UserNickChange> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserNickChange, _ctx: &mut Self::Context) -> Self::Result {
self.writer.write(Message {
tags: None,
prefix: Some(msg.connection.to_nick()),
command: Command::NICK(msg.new_nick),
});
}
}
impl Handler<UserKickedFromChannel> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserKickedFromChannel, _ctx: &mut Self::Context) -> Self::Result {
self.channels.remove(&msg.channel);
}
}
impl StreamHandler<Result<irc_proto::Message, ProtocolError>> for Client {
#[instrument(parent = &self.span, skip_all)]
fn handle(&mut self, item: Result<irc_proto::Message, ProtocolError>, ctx: &mut Self::Context) {
let item = match item {
Ok(item) => {
debug!(?item, "Received message from client");
item
}
Err(error) => {
error!(%error, "Client sent a bad message");
return;
}
};
if item
.source_nickname()
.map_or(false, |v| v != self.connection.nick)
{
warn!("Rejecting message from client due to incorrect nick");
return;
}
#[allow(clippy::match_same_arms)]
match item.command {
Command::USER(_, _, _) | Command::PASS(_) | Command::CAP(_, _, _, _) => {
}
Command::NICK(new_nick) => {
ctx.notify(UserNickChangeInternal {
old_nick: self.connection.nick.to_string(),
new_nick,
span: Span::current(),
});
}
Command::OPER(_, _) => {}
Command::UserMODE(_, _) => {}
Command::SERVICE(_, _, _, _, _, _) => {}
Command::QUIT(message) => {
self.graceful_shutdown = true;
self.server_leave_reason = message;
ctx.stop();
}
Command::SQUIT(_, _) => {}
Command::JOIN(channel_names, _passwords, _real_name) => {
let channels = parse_channel_name_list(&channel_names);
ctx.notify(JoinChannelRequest {
channels,
span: Span::current(),
});
}
Command::PART(channel, message) => {
let Some(channel) = self.channels.remove(&channel) else {
return;
};
channel.do_send(ChannelPart {
client: ctx.address(),
message,
span: Span::current(),
});
}
Command::ChannelMODE(_, _) => {}
Command::TOPIC(channel, topic) => {
let Some(channel) = self.channels.get(&channel) else {
return;
};
#[allow(clippy::option_if_let_else)]
if let Some(topic) = topic {
channel.do_send(ChannelUpdateTopic {
topic,
client: ctx.address(),
span: Span::current(),
});
} else {
let span = Span::current();
let fut = channel
.send(ChannelFetchTopic { span })
.into_actor(self)
.map(|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string(), false)
{
this.writer.write(message);
}
});
ctx.spawn(fut);
}
}
Command::NAMES(channel_names, _) => {
let channels = parse_channel_name_list(channel_names.as_deref().unwrap_or(""));
if channels.is_empty() {
warn!("Client didn't request names for a particular channel");
return;
}
ctx.notify(ListChannelMemberRequest {
channels,
span: Span::current(),
});
}
Command::LIST(_, _) => {
let span = Span::current();
let fut = self.server.send(ChannelList { span }).into_actor(self).map(
|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string())
{
this.writer.write(message);
}
},
);
ctx.spawn(fut);
}
Command::INVITE(nick, channel) => {
let Some(channel) = self.channels.get(&channel) else {
error!(%channel, "User not connected to channel");
return;
};
channel.do_send(ChannelInvite {
nick,
client: ctx.address(),
span: Span::current(),
});
}
Command::KICK(channel, users, reason) => {
let Some(channel) = self.channels.get(&channel) else {
error!(%channel, "User not connected to channel");
return;
};
for user in parse_channel_name_list(&users) {
channel.do_send(ChannelKickUser {
span: Span::current(),
client: ctx.address(),
user,
reason: reason.clone(),
});
}
}
Command::PRIVMSG(target, message) => {
if !target.is_channel_name() {
error!("Private messages not implemented");
} else if let Some(channel) = self.channels.get(&target) {
channel.do_send(ChannelMessage {
client: ctx.address(),
message,
span: Span::current(),
});
} else {
error!("User not connected to channel");
}
}
Command::NOTICE(_, _) => {}
Command::MOTD(_) => {
let span = Span::current();
let fut = self
.server
.send(ServerFetchMotd { span })
.into_actor(self)
.map(|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string())
{
this.writer.write(message);
}
});
ctx.spawn(fut);
}
Command::LUSERS(_, _) => {}
Command::VERSION(_) => {
self.writer.write(Message {
tags: None,
prefix: Some(Prefix::ServerName(SERVER_NAME.to_string())),
command: Command::Response(
Response::RPL_VERSION,
vec![
self.connection.nick.to_string(),
format!("{}-{}", crate_name!(), crate_version!()),
SERVER_NAME.to_string(),
],
),
});
}
Command::STATS(_, _) => {}
Command::LINKS(_, _) => {}
Command::TIME(_) => {}
Command::CONNECT(_, _, _) => {}
Command::TRACE(_) => {}
Command::ADMIN(_) => {}
Command::INFO(_) => {}
Command::SERVLIST(_, _) => {}
Command::SQUERY(_, _) => {}
Command::WHO(_, _) => {}
Command::WHOIS(_, _) => {}
Command::WHOWAS(_, _, _) => {}
Command::KILL(_, _) => {}
Command::PING(_, _) => {}
Command::PONG(_, _) => {
self.last_active = Instant::now();
}
Command::ERROR(_) => {}
Command::AWAY(_) => {}
Command::REHASH => {}
Command::DIE => {}
Command::RESTART => {}
Command::SUMMON(_, _, _) => {}
Command::USERS(_) => {}
Command::WALLOPS(_) => {}
Command::USERHOST(_) => {}
Command::ISON(_) => {}
Command::SAJOIN(_, _) => {}
Command::SAMODE(_, _, _) => {}
Command::SANICK(old_nick, new_nick) => {
self.server.do_send(UserNickChangeInternal {
old_nick,
new_nick,
span: Span::current(),
});
}
Command::SAPART(_, _) => {}
Command::SAQUIT(_, _) => {}
Command::NICKSERV(_) => {}
Command::CHANSERV(_) => {}
Command::OPERSERV(_) => {}
Command::BOTSERV(_) => {}
Command::HOSTSERV(_) => {}
Command::MEMOSERV(_) => {}
Command::AUTHENTICATE(_) => {}
Command::ACCOUNT(_) => {}
Command::METADATA(_, _, _) => {}
Command::MONITOR(_, _) => {}
Command::BATCH(_, _, _) => {}
Command::CHGHOST(_, _) => {}
Command::Response(_, _) => {}
Command::Raw(_, _) => {}
}
}
}
#[must_use]
pub fn parse_channel_name_list(s: &str) -> Vec<String> {
s.split(',')
.filter(|v| !v.is_empty())
.map(ToString::to_string)
.collect()
}
impl WriteHandler<ProtocolError> for Client {
#[instrument(parent = &self.span, skip_all)]
fn error(&mut self, error: ProtocolError, _ctx: &mut Self::Context) -> Running {
error!(%error, "Failed to write message to client");
Running::Continue
}
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct ListChannelMemberRequest {
channels: Vec<String>,
span: Span,
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct JoinChannelRequest {
channels: Vec<String>,
span: Span,
}