use std::{collections::HashMap, time::Duration};
use actix::{
fut::wrap_future, io::WriteHandler, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext,
Context, Handler, MessageResult, ResponseActFuture, Running, StreamHandler, WrapFuture,
};
use chrono::{DateTime, SecondsFormat, Utc};
use clap::{crate_name, crate_version};
use futures::FutureExt;
use irc_proto::{
error::ProtocolError, message::Tag, ChannelExt, Command, Message, Prefix, Response,
};
use tokio::time::Instant;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument, Span};
use crate::{
channel::Channel,
connection::{sasl::SaslAlreadyAuthenticated, Capability, InitiatedConnection, MessageSink},
messages::{
Broadcast, ChannelFetchTopic, ChannelInvite, ChannelJoin, ChannelKickUser, ChannelList,
ChannelMemberList, ChannelMessage, ChannelPart, ChannelSetMode, ChannelUpdateTopic,
FetchClientDetails, MessageKind, PrivateMessage, ServerDisconnect, ServerFetchMotd,
UserKickedFromChannel, UserNickChange, UserNickChangeInternal,
},
persistence::{
events::{
FetchUnseenChannelMessages, FetchUnseenPrivateMessages, FetchUserChannels,
FetchUserIdByNick, ReserveNick,
},
Persistence,
},
server::Server,
SERVER_NAME,
};
pub struct Client {
pub writer: MessageSink,
pub connection: InitiatedConnection,
pub server: Addr<Server>,
pub channels: HashMap<String, Addr<Channel>>,
pub last_active: Instant,
pub graceful_shutdown: bool,
pub server_leave_reason: Option<String>,
pub persistence: Addr<Persistence>,
pub span: Span,
}
impl Client {
#[must_use]
pub fn maybe_build_time_tag(&self, time: DateTime<Utc>) -> Option<Tag> {
if !self
.connection
.capabilities
.contains(Capability::SERVER_TIME)
{
return None;
}
Some(Tag(
"time".to_string(),
Some(time.to_rfc3339_opts(SecondsFormat::Millis, true)),
))
}
}
impl Actor for Client {
type Context = Context<Self>;
#[instrument(parent = &self.span, skip_all)]
fn started(&mut self, ctx: &mut Self::Context) {
info!(?self.connection, "Client has successfully joined to server");
ctx.run_interval(Duration::from_secs(30), |this, ctx| {
let _span = info_span!(parent: &this.span, "ping").entered();
if Instant::now().duration_since(this.last_active) >= Duration::from_secs(120) {
this.server_leave_reason = Some("Ping timeout: 120 seconds".to_string());
ctx.stop();
}
this.writer.write(Message {
tags: None,
prefix: None,
command: Command::PING(SERVER_NAME.to_string(), None),
});
});
ctx.spawn(
self.persistence
.send(FetchUserChannels {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
ctx.notify(JoinChannelRequest {
channels: res.unwrap(),
span: this.span.clone(),
});
}),
);
ctx.spawn(
self.persistence
.send(FetchUnseenPrivateMessages {
user_id: self.connection.user_id,
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
for (sent, sender, message, kind) in res.unwrap() {
ctx.notify(Broadcast {
message: Message {
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&sender)),
command: match kind {
MessageKind::Normal => {
Command::PRIVMSG(this.connection.nick.clone(), message)
}
MessageKind::Notice => {
Command::NOTICE(this.connection.nick.clone(), message)
}
},
},
span: this.span.clone(),
});
}
}),
);
}
#[instrument(parent = &self.span, skip_all)]
fn stopped(&mut self, ctx: &mut Self::Context) {
let message = self.server_leave_reason.take();
self.server.do_send(ServerDisconnect {
client: ctx.address(),
message: if self.graceful_shutdown {
Some(format!("Quit: {}", message.as_deref().unwrap_or("")))
} else {
message.clone()
},
span: Span::current(),
});
for channel in self.channels.values() {
channel.do_send(ServerDisconnect {
client: ctx.address(),
message: message.clone(),
span: Span::current(),
});
}
self.writer.write(Message {
tags: None,
prefix: None,
command: Command::ERROR(if self.graceful_shutdown {
String::new()
} else {
message.unwrap_or_else(|| "Ungraceful shutdown".to_string())
}),
});
}
}
impl Handler<Broadcast> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: Broadcast, _ctx: &mut Self::Context) -> Self::Result {
self.writer.write(msg.message);
}
}
impl Handler<FetchClientDetails> for Client {
type Result = MessageResult<FetchClientDetails>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: FetchClientDetails, _ctx: &mut Self::Context) -> Self::Result {
MessageResult(self.connection.clone())
}
}
impl Handler<JoinChannelRequest> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: JoinChannelRequest, ctx: &mut Self::Context) -> Self::Result {
let mut futures = Vec::with_capacity(msg.channels.len());
for channel_name in msg.channels {
if !channel_name.is_channel_name() || self.channels.contains_key(&channel_name) {
continue;
}
let channel_handle_fut = self.server.clone().send(ChannelJoin {
channel_name: channel_name.to_string(),
client: ctx.address(),
connection: self.connection.clone(),
span: Span::current(),
});
let channel_messages_fut = self.persistence.send(FetchUnseenChannelMessages {
channel_name: channel_name.to_string(),
user_id: self.connection.user_id,
span: Span::current(),
});
futures.push(
futures::future::join(channel_handle_fut, channel_messages_fut).map(
move |(handle, messages)| {
(channel_name, handle.unwrap().unwrap(), messages.unwrap())
},
),
);
}
let fut = wrap_future::<_, Self>(
futures::future::join_all(futures.into_iter()).instrument(Span::current()),
)
.map(|result, this, _ctx| {
for (channel_name, handle, messages) in result {
let handle = match handle {
Ok(v) => v,
Err(error) => {
error!(?error, "Failed to join user to channel");
continue;
}
};
this.channels.insert(channel_name.clone(), handle);
for (sent, source, message, kind) in messages {
this.writer.write(Message {
tags: TagBuilder::default()
.insert(this.maybe_build_time_tag(sent))
.into(),
prefix: Some(Prefix::new_from_str(&source)),
command: match kind {
MessageKind::Normal => Command::PRIVMSG(channel_name.clone(), message),
MessageKind::Notice => Command::NOTICE(channel_name.clone(), message),
},
});
}
}
});
Box::pin(fut)
}
}
impl Handler<ListChannelMemberRequest> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: ListChannelMemberRequest, _ctx: &mut Self::Context) -> Self::Result {
let mut futures = Vec::with_capacity(msg.channels.len());
for (channel_name, handle) in &self.channels {
if !msg.channels.contains(channel_name) {
continue;
}
futures.push(handle.send(ChannelMemberList {
span: Span::current(),
}));
}
let fut = wrap_future::<_, Self>(
futures::future::join_all(futures.into_iter()).instrument(Span::current()),
)
.map(|result, this, _ctx| {
for list in result {
let list = list.unwrap();
for message in list.into_messages(
this.connection.nick.clone(),
this.connection
.capabilities
.contains(Capability::USERHOST_IN_NAMES),
) {
this.writer.write(message);
}
}
});
Box::pin(fut)
}
}
impl Handler<UserNickChangeInternal> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserNickChangeInternal, _ctx: &mut Self::Context) -> Self::Result {
self.persistence
.send(ReserveNick {
user_id: self.connection.user_id,
nick: msg.new_nick.clone(),
})
.into_actor(self)
.map(|res, this, ctx| {
if !res.unwrap() {
return;
}
this.server.do_send(UserNickChange {
client: ctx.address(),
connection: this.connection.clone(),
new_nick: msg.new_nick.clone(),
span: Span::current(),
});
for channel in this.channels.values() {
channel.do_send(UserNickChange {
client: ctx.address(),
connection: this.connection.clone(),
new_nick: msg.new_nick.clone(),
span: Span::current(),
});
}
this.connection.nick = msg.new_nick;
})
.boxed_local()
}
}
impl Handler<UserNickChange> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserNickChange, _ctx: &mut Self::Context) -> Self::Result {
self.writer.write(Message {
tags: None,
prefix: Some(msg.connection.to_nick()),
command: Command::NICK(msg.new_nick),
});
}
}
impl Handler<UserKickedFromChannel> for Client {
type Result = ();
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: UserKickedFromChannel, _ctx: &mut Self::Context) -> Self::Result {
self.channels.remove(&msg.channel);
}
}
impl Handler<SendPrivateMessage> for Client {
type Result = ResponseActFuture<Self, ()>;
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: SendPrivateMessage, _ctx: &mut Self::Context) -> Self::Result {
self.persistence
.send(FetchUserIdByNick {
nick: msg.destination,
})
.into_actor(self)
.map(move |res, this, ctx| {
let Some(destination) = res.unwrap() else {
eprintln!("User attempted to send a message to non-existent user");
return;
};
this.server.do_send(PrivateMessage {
destination,
message: msg.message,
kind: msg.kind,
from: ctx.address(),
span: msg.span,
});
})
.boxed_local()
}
}
impl StreamHandler<Result<irc_proto::Message, ProtocolError>> for Client {
#[instrument(parent = &self.span, skip_all)]
fn handle(&mut self, item: Result<irc_proto::Message, ProtocolError>, ctx: &mut Self::Context) {
let item = match item {
Ok(item) => {
debug!(?item, "Received message from client");
item
}
Err(error) => {
error!(%error, "Client sent a bad message");
return;
}
};
if item
.source_nickname()
.map_or(false, |v| v != self.connection.nick)
{
warn!("Rejecting message from client due to incorrect nick");
return;
}
#[allow(clippy::match_same_arms)]
match item.command {
Command::USER(_, _, _) | Command::PASS(_) | Command::CAP(_, _, _, _) => {
}
Command::NICK(new_nick) => {
ctx.notify(UserNickChangeInternal {
old_nick: self.connection.nick.to_string(),
new_nick,
span: Span::current(),
});
}
Command::OPER(_, _) => {}
Command::UserMODE(_, _) => {}
Command::SERVICE(_, _, _, _, _, _) => {}
Command::QUIT(message) => {
self.graceful_shutdown = true;
self.server_leave_reason = message;
ctx.stop();
}
Command::SQUIT(_, _) => {}
Command::JOIN(channel_names, _passwords, _real_name) => {
let channels = parse_channel_name_list(&channel_names);
ctx.notify(JoinChannelRequest {
channels,
span: Span::current(),
});
}
Command::PART(channel, message) => {
let Some(channel) = self.channels.remove(&channel) else {
return;
};
channel.do_send(ChannelPart {
client: ctx.address(),
message,
span: Span::current(),
});
}
Command::ChannelMODE(channel, modes) => {
let Some(channel) = self.channels.get(&channel) else {
return;
};
channel.do_send(ChannelSetMode {
span: Span::current(),
client: ctx.address(),
modes,
});
}
Command::TOPIC(channel, topic) => {
let Some(channel) = self.channels.get(&channel) else {
return;
};
#[allow(clippy::option_if_let_else)]
if let Some(topic) = topic {
channel.do_send(ChannelUpdateTopic {
topic,
client: ctx.address(),
span: Span::current(),
});
} else {
let span = Span::current();
let fut = channel
.send(ChannelFetchTopic { span })
.into_actor(self)
.map(|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string(), false)
{
this.writer.write(message);
}
});
ctx.spawn(fut);
}
}
Command::NAMES(channel_names, _) => {
let channels = parse_channel_name_list(channel_names.as_deref().unwrap_or(""));
if channels.is_empty() {
warn!("Client didn't request names for a particular channel");
return;
}
ctx.notify(ListChannelMemberRequest {
channels,
span: Span::current(),
});
}
Command::LIST(_, _) => {
let span = Span::current();
let fut = self.server.send(ChannelList { span }).into_actor(self).map(
|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string())
{
this.writer.write(message);
}
},
);
ctx.spawn(fut);
}
Command::INVITE(nick, channel) => {
let Some(channel) = self.channels.get(&channel) else {
error!(%channel, "User not connected to channel");
return;
};
channel.do_send(ChannelInvite {
nick,
client: ctx.address(),
span: Span::current(),
});
}
Command::KICK(channel, users, reason) => {
let Some(channel) = self.channels.get(&channel) else {
error!(%channel, "User not connected to channel");
return;
};
for user in parse_channel_name_list(&users) {
channel.do_send(ChannelKickUser {
span: Span::current(),
client: ctx.address(),
user,
reason: reason.clone(),
});
}
}
command @ (Command::NOTICE(_, _) | Command::PRIVMSG(_, _)) => {
let (target, message, kind) = match command {
Command::PRIVMSG(target, message) => (target, message, MessageKind::Normal),
Command::NOTICE(target, message) => (target, message, MessageKind::Notice),
_ => unreachable!(),
};
if !target.is_channel_name() {
ctx.notify(SendPrivateMessage {
destination: target,
message,
kind,
span: Span::current(),
});
} else if let Some(channel) = self.channels.get(&target) {
channel.do_send(ChannelMessage {
client: ctx.address(),
message,
kind,
span: Span::current(),
});
} else {
error!("User not connected to channel");
}
}
Command::MOTD(_) => {
let span = Span::current();
let fut = self
.server
.send(ServerFetchMotd { span })
.into_actor(self)
.map(|result, this, _ctx| {
for message in result
.unwrap()
.into_messages(this.connection.nick.to_string())
{
this.writer.write(message);
}
});
ctx.spawn(fut);
}
Command::LUSERS(_, _) => {}
Command::VERSION(_) => {
self.writer.write(Message {
tags: None,
prefix: Some(Prefix::ServerName(SERVER_NAME.to_string())),
command: Command::Response(
Response::RPL_VERSION,
vec![
self.connection.nick.to_string(),
format!("{}-{}", crate_name!(), crate_version!()),
SERVER_NAME.to_string(),
],
),
});
}
Command::STATS(_, _) => {}
Command::LINKS(_, _) => {}
Command::TIME(_) => {
let time = chrono::Utc::now();
self.writer.write(Message {
tags: None,
prefix: Some(Prefix::ServerName(SERVER_NAME.to_string())),
command: Command::Response(
Response::RPL_TIME,
vec![
self.connection.nick.to_string(),
SERVER_NAME.to_string(),
time.timestamp().to_string(),
time.format("%a %b %d %Y %T").to_string(),
],
),
});
}
Command::CONNECT(_, _, _) => {}
Command::TRACE(_) => {}
Command::ADMIN(_) => {}
Command::INFO(_) => {}
Command::SERVLIST(_, _) => {}
Command::SQUERY(_, _) => {}
Command::WHO(_, _) => {}
Command::WHOIS(_, _) => {}
Command::WHOWAS(_, _, _) => {}
Command::KILL(_, _) => {}
Command::PING(v, _) => {
self.writer.write(Message {
tags: None,
prefix: None,
command: Command::PONG(v, None),
});
}
Command::PONG(_, _) => {
self.last_active = Instant::now();
}
Command::ERROR(_) => {}
Command::AWAY(_) => {}
Command::REHASH => {}
Command::DIE => {}
Command::RESTART => {}
Command::SUMMON(_, _, _) => {}
Command::USERS(_) => {}
Command::WALLOPS(_) => {}
Command::USERHOST(_) => {}
Command::ISON(_) => {}
Command::SAJOIN(_, _) => {}
Command::SAMODE(_, _, _) => {}
Command::SANICK(old_nick, new_nick) => {
self.server.do_send(UserNickChangeInternal {
old_nick,
new_nick,
span: Span::current(),
});
}
Command::SAPART(_, _) => {}
Command::SAQUIT(_, _) => {}
Command::NICKSERV(_) => {}
Command::CHANSERV(_) => {}
Command::OPERSERV(_) => {}
Command::BOTSERV(_) => {}
Command::HOSTSERV(_) => {}
Command::MEMOSERV(_) => {}
Command::AUTHENTICATE(_) => {
self.writer.write(
SaslAlreadyAuthenticated(self.connection.nick.to_string()).into_message(),
);
}
Command::ACCOUNT(_) => {}
Command::METADATA(_, _, _) => {}
Command::MONITOR(_, _) => {}
Command::BATCH(_, _, _) => {}
Command::CHGHOST(_, _) => {}
Command::Response(_, _) => {}
Command::Raw(_, _) => {}
}
}
}
#[derive(Default)]
pub struct TagBuilder {
inner: Vec<Tag>,
}
impl TagBuilder {
#[must_use]
pub fn insert(mut self, tag: impl Into<Option<Tag>>) -> Self {
if let Some(tag) = tag.into() {
self.inner.push(tag);
}
self
}
}
impl From<TagBuilder> for Option<Vec<Tag>> {
fn from(value: TagBuilder) -> Self {
Some(value.inner).filter(|v| !v.is_empty())
}
}
#[must_use]
pub fn parse_channel_name_list(s: &str) -> Vec<String> {
s.split(',')
.filter(|v| !v.is_empty())
.map(ToString::to_string)
.collect()
}
impl WriteHandler<ProtocolError> for Client {
#[instrument(parent = &self.span, skip_all)]
fn error(&mut self, error: ProtocolError, _ctx: &mut Self::Context) -> Running {
error!(%error, "Failed to write message to client");
Running::Continue
}
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct SendPrivateMessage {
destination: String,
message: String,
kind: MessageKind,
span: Span,
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct ListChannelMemberRequest {
channels: Vec<String>,
span: Span,
}
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct JoinChannelRequest {
channels: Vec<String>,
span: Span,
}