From 47b42fcb8b8d1cf4ac574b5b2183531738d0b60d Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 30 Jan 2021 19:50:50 +0000 Subject: [PATCH] Light refactoring --- Cargo.lock | 41 ++++++++++++++++++++++++++--------------- titanirc-codec/src/lib.rs | 2 ++ titanirc-codec/src/wire.rs | 12 ++++++++---- titanirc-server/Cargo.toml | 3 ++- titanirc-server/src/entities/channel.rs | 97 ------------------------------------------------------------------------------------------------- titanirc-server/src/entities/channel/events.rs | 38 ++++++++++++++++++++++++++++++++++++++ titanirc-server/src/entities/channel/mod.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ titanirc-server/src/entities/user.rs | 148 ---------------------------------------------------------------------------------------------------------------------------------------------------- titanirc-server/src/entities/user/commands.rs | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ titanirc-server/src/entities/user/events.rs | 8 ++++++++ titanirc-server/src/entities/user/mod.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ titanirc-server/src/main.rs | 8 +++----- titanirc-server/src/server.rs | 43 +++++++++++++++++++++++++++++++------------ titanirc-types/Cargo.toml | 3 ++- titanirc-types/src/lib.rs | 45 +++++++++++++++++++++++++-------------------- titanirc-types/src/primitives.rs | 78 +++++++++++++++++++++++++++++++++++++++++++++++------------------------------- titanirc-types/src/replies.rs | 4 +++- 17 files changed, 456 insertions(+), 335 deletions(-) delete mode 100644 titanirc-server/src/entities/channel.rs create mode 100644 titanirc-server/src/entities/channel/events.rs create mode 100644 titanirc-server/src/entities/channel/mod.rs delete mode 100644 titanirc-server/src/entities/user.rs create mode 100644 titanirc-server/src/entities/user/commands.rs create mode 100644 titanirc-server/src/entities/user/events.rs create mode 100644 titanirc-server/src/entities/user/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1a756ae..a06c6d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,9 +212,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993a608597367c6377b258c25d7120740f00ed23a2252b729b1932dd7866f908" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" [[package]] name = "derive_more" @@ -445,9 +445,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.82" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" +checksum = "1cca32fa0182e8c0989459524dc356b8f2b5c10f1b9eb521b7d182c03cf8c5ff" [[package]] name = "linked-hash-map" @@ -466,11 +466,11 @@ dependencies = [ [[package]] name = "log" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", ] [[package]] @@ -536,6 +536,15 @@ dependencies = [ ] [[package]] +name = "nom-bytes" +version = "0.1.0" +source = "git+https://github.com/w4/nom-bytes#7e44afde2e53f447fc9c77297eb513186b8fb0da" +dependencies = [ + "bytes", + "nom", +] + +[[package]] name = "ntapi" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -689,9 +698,9 @@ checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" [[package]] name = "rand" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18519b42a40024d661e1714153e9ad0c3de27cd495760ceb09710920f1098b1e" +checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", "rand_chacha", @@ -856,9 +865,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccf8dbc19eb42fba10e8feaaec282fb50e2c14b2726d6301dbfeed0f73306a6f" +checksum = "317cca572a0e89c3ce0ca1f1bdc9369547fe318a683418e42ac8f59d14701023" dependencies = [ "tinyvec_macros", ] @@ -886,6 +895,7 @@ dependencies = [ "actix", "actix-rt", "async-stream", + "bytes", "clap", "displaydoc", "futures-util", @@ -903,14 +913,15 @@ dependencies = [ "bytes", "derive_more", "nom", + "nom-bytes", "paste", ] [[package]] name = "tokio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195" +checksum = "6714d663090b6b0acb0fa85841c6d66233d150cdb2602c8f9b8abb03370beb3f" dependencies = [ "autocfg", "bytes", @@ -1058,9 +1069,9 @@ checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" [[package]] name = "wasi" -version = "0.10.1+wasi-snapshot-preview1" +version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93c6c3420963c5c64bca373b25e77acb562081b9bb4dd5bb864187742186cea9" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "widestring" diff --git a/titanirc-codec/src/lib.rs b/titanirc-codec/src/lib.rs index 206da83..f44a408 100644 --- a/titanirc-codec/src/lib.rs +++ b/titanirc-codec/src/lib.rs @@ -1,3 +1,5 @@ +//! Used to encode/decode messages to and from the client. + #![deny(clippy::pedantic)] #![allow(clippy::missing_errors_doc)] diff --git a/titanirc-codec/src/wire.rs b/titanirc-codec/src/wire.rs index 9b9cee5..01e6da7 100644 --- a/titanirc-codec/src/wire.rs +++ b/titanirc-codec/src/wire.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, BytesMut}; +use bytes::{BytesMut}; use titanirc_types::Command; use tokio_util::codec::Decoder as FrameDecoder; @@ -25,11 +25,15 @@ impl FrameDecoder for Decoder { return Ok(None); }; - let bytes = src.copy_to_bytes(length + 1); + let bytes = { + let mut b = src.split_to(length + 1); + b.truncate(b.len() - 2); // remove the crlf at the end of the buffer + b.freeze() + }; - eprintln!("{:?}", std::str::from_utf8(&bytes[..bytes.len() - 2])); + eprintln!("{:?}", std::str::from_utf8(&bytes[..])); - match Command::parse(&bytes[..bytes.len() - 2]) { + match Command::parse(bytes) { Ok(Some(msg)) => Ok(Some(msg)), Ok(None) => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, diff --git a/titanirc-server/Cargo.toml b/titanirc-server/Cargo.toml index 1b4032c..5dcc9ad 100644 --- a/titanirc-server/Cargo.toml +++ b/titanirc-server/Cargo.toml @@ -18,4 +18,5 @@ async-stream = "0.3" thiserror = "1" displaydoc = "0.1" clap = "3.0.0-beta.2" -futures-util = "0.3" \ No newline at end of file +futures-util = "0.3" +bytes = "1.0" \ No newline at end of file diff --git a/titanirc-server/src/entities/channel.rs b/titanirc-server/src/entities/channel.rs deleted file mode 100644 index b399cc2..0000000 --- a/titanirc-server/src/entities/channel.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::sync::Arc; - -use actix::prelude::*; - -use crate::{entities::user::User, server::Server}; - -pub mod events { - use crate::entities::user::User; - use actix::prelude::*; - - pub type JoinResult = Result; - - #[derive(Message)] - #[rtype(result = "JoinResult")] - pub struct Join { - pub channel_name: String, - pub nick: String, - pub user: Addr, - } - - #[derive(Debug, thiserror::Error, displaydoc::Display)] - pub enum JoinError { - /// Failed to send join request to channel: {0} - Mailbox(#[from] actix::MailboxError), - } - - #[derive(Message)] - #[rtype(result = "")] - pub struct JoinBroadcast { - pub channel_name: String, - pub nick: String, - } - - impl From for JoinBroadcast { - fn from( - Join { - channel_name, nick, .. - }: Join, - ) -> Self { - Self { channel_name, nick } - } - } -} - -pub struct Handle { - //pub name_change: actix::Recipient, - pub message: actix::Recipient, -} - -pub struct Channel { - pub members: Vec>, -} - -impl Channel { - fn announce_join(&self, join: events::Join) -> impl Future { - let mut futures = Vec::new(); - - let broadcast = Arc::new(events::JoinBroadcast::from(join)); - - for member in &self.members { - futures.push(member.send(broadcast.clone())); - } - - async { - futures_util::future::try_join_all(futures).await.unwrap(); - } - } -} - -impl Actor for Channel { - type Context = Context; -} - -impl actix::Handler for Channel { - type Result = events::JoinResult; - - fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result { - self.members.push(msg.user.clone()); - - ctx.spawn(self.announce_join(msg).into_actor(self)); - - Ok(Handle { - message: ctx.address().recipient(), - }) - } -} - -impl actix::Handler for Channel { - type Result = (); - - fn handle( - &mut self, - msg: super::common_events::Message, - ctx: &mut Self::Context, - ) -> Self::Result { - } -} diff --git a/titanirc-server/src/entities/channel/events.rs b/titanirc-server/src/entities/channel/events.rs new file mode 100644 index 0000000..f40b862 --- /dev/null +++ b/titanirc-server/src/entities/channel/events.rs @@ -0,0 +1,38 @@ +use crate::entities::user::User; +use actix::prelude::*; + +pub type JoinResult = Result; + +/// Send from `User` to `Channel` via `Server`, the `Channel` then replies back +/// with a direct handle for the `User` to interact with the `Channel`. +#[derive(Message)] +#[rtype(result = "JoinResult")] +pub struct Join { + pub channel_name: String, + pub nick: String, + pub user: Addr, +} + +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub enum JoinError { + /// Failed to send join request to channel: {0} + Mailbox(#[from] actix::MailboxError), +} + +/// Sent directly to every `User` when another `User` joins the channel. +#[derive(Message)] +#[rtype(result = "")] +pub struct JoinBroadcast { + pub channel_name: String, + pub nick: String, +} + +impl From for JoinBroadcast { + fn from( + Join { + channel_name, nick, .. + }: Join, + ) -> Self { + Self { channel_name, nick } + } +} diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs new file mode 100644 index 0000000..3bc306d --- /dev/null +++ b/titanirc-server/src/entities/channel/mod.rs @@ -0,0 +1,70 @@ +pub mod events; + +use actix::prelude::*; +use std::sync::Arc; + +use crate::entities::user::User; + +/// A handle to this `Channel` that `User` actors can use to communicate with the +/// rest of the channel. +pub struct Handle { + //pub name_change: actix::Recipient, + pub message: actix::Recipient, +} + +/// An IRC channel. +pub struct Channel { + pub members: Vec>, +} + +impl Channel { + pub fn new() -> Self { + Self { + members: Vec::new(), + } + } + + /// Announce a user's join event to the rest of the channel. + fn announce_join(&self, join: events::Join) -> impl Future { + let mut futures = Vec::new(); + + let broadcast = Arc::new(events::JoinBroadcast::from(join)); + + for member in &self.members { + futures.push(member.send(broadcast.clone())); + } + + async { + futures_util::future::try_join_all(futures).await.unwrap(); + } + } +} + +impl Actor for Channel { + type Context = Context; +} + +impl actix::Handler for Channel { + type Result = events::JoinResult; + + fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result { + self.members.push(msg.user.clone()); + + ctx.spawn(self.announce_join(msg).into_actor(self)); + + Ok(Handle { + message: ctx.address().recipient(), + }) + } +} + +impl actix::Handler for Channel { + type Result = (); + + fn handle( + &mut self, + _msg: super::common_events::Message, + _ctx: &mut Self::Context, + ) -> Self::Result { + } +} diff --git a/titanirc-server/src/entities/user.rs b/titanirc-server/src/entities/user.rs deleted file mode 100644 index 738f283..0000000 --- a/titanirc-server/src/entities/user.rs +++ /dev/null @@ -1,148 +0,0 @@ -use actix::{ - io::{FramedWrite, WriteHandler}, - prelude::*, -}; -use std::time::{Duration, Instant}; -use titanirc_types::Command; -use tokio::{io::WriteHalf, net::TcpStream}; - -pub mod events { - use actix::prelude::*; - - #[derive(Message)] - #[rtype(result = "()")] - pub struct NameChange { - pub old: String, - pub new: String, - } -} - -pub struct User { - pub server: Addr, - pub writer: - FramedWrite, titanirc_codec::Encoder>, - pub last_active: Instant, - pub nick: Option, // should probably be an arc so we dont have to keep cloning the string -} - -fn schedule_ping(ctx: &mut ::Context) { - ctx.run_later(Duration::from_secs(30), |act, ctx| { - if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) { - // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)` - eprintln!("ping timeout"); - ctx.stop(); - } - - act.writer.write(titanirc_types::ServerMessage::Ping); - schedule_ping(ctx); - }); -} - -impl Actor for User { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - schedule_ping(ctx); - } -} - -impl WriteHandler for User {} - -impl StreamHandler> for User { - /// This is main event loop for client requests - fn handle(&mut self, cmd: Result, ctx: &mut Self::Context) { - self.last_active = Instant::now(); - - match cmd { - Ok(Command::Nick(titanirc_types::NickCommand { nick })) => { - self.writer.write(titanirc_types::Reply::RplWelcome.into()); - self.writer.write(titanirc_types::Reply::RplYourHost.into()); - self.writer.write(titanirc_types::Reply::RplCreated.into()); - self.writer.write(titanirc_types::Reply::RplMyInfo.into()); - self.writer.write(titanirc_types::Reply::RplISupport.into()); - self.nick = Some(nick.0); - // LUSERS - // RPL_UMODEIS - // MOTD - } - Ok(Command::Join(titanirc_types::JoinCommand { channel })) => { - if let Some(ref nick) = self.nick { - let server_addr = self.server.clone(); - let ctx_addr = ctx.address(); - let nick = nick.clone(); - - ctx.spawn( - async move { - server_addr - .send(crate::entities::channel::events::Join { - channel_name: channel.0, - user: ctx_addr, - nick, - }) - .await - .unwrap() - .unwrap(); - - println!("joined chan!"); - } - .into_actor(self), - ); - } - } - Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self - .writer - .write(titanirc_types::Reply::RplUmodeIs(mode).into()), - Ok(Command::Motd(_)) => { - self.writer.write( - titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName( - "my.test.server".to_string(), - )) - .into(), - ); - self.writer.write( - titanirc_types::Reply::RplMotd(titanirc_types::FreeText( - "Hello, welcome to this server!".to_string(), - )) - .into(), - ); - self.writer.write( - titanirc_types::Reply::RplMotd(titanirc_types::FreeText( - "it's very cool!".to_string(), - )) - .into(), - ); - self.writer - .write(titanirc_types::Reply::RplEndOfMotd.into()); - } - Ok(Command::Version(_)) => self.writer.write( - titanirc_types::Reply::RplVersion( - clap::crate_version!().to_string(), - "release".to_string(), - titanirc_types::ServerName("my.test.server".to_string()), - titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()), - ) - .into(), - ), - Ok(Command::Pong(_)) => {} - Ok(cmd) => println!("cmd: {:?}", cmd), - Err(e) => eprintln!("error decoding: {}", e), - } - } -} - -impl actix::Handler> for User { - type Result = (); - - fn handle( - &mut self, - msg: std::sync::Arc, - ctx: &mut Self::Context, - ) -> Self::Result { - self.writer.write(titanirc_types::ServerMessage::Command( - titanirc_types::Source::User(titanirc_types::Nick(msg.nick.clone())), - titanirc_types::Command::Join(titanirc_types::JoinCommand { - channel: titanirc_types::Channel::from(msg.channel_name.clone()), - }), - )); - } -} diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs new file mode 100644 index 0000000..af5b679 --- /dev/null +++ b/titanirc-server/src/entities/user/commands.rs @@ -0,0 +1,113 @@ +use std::time::Instant; + +use actix::{Actor, AsyncContext, StreamHandler, WrapFuture}; +use titanirc_types::{Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, VersionCommand}; + +pub trait CommandHandler: Actor { + fn handle_cmd(&mut self, command: T, ctx: &mut Self::Context); +} + +impl StreamHandler> for super::User { + fn handle(&mut self, cmd: Result, ctx: &mut Self::Context) { + self.last_active = Instant::now(); + + match cmd { + Ok(Command::Nick(v)) => self.handle_cmd(v, ctx), + Ok(Command::Join(v)) => self.handle_cmd(v, ctx), + Ok(Command::Mode(v)) => self.handle_cmd(v, ctx), + Ok(Command::Motd(v)) => self.handle_cmd(v, ctx), + Ok(Command::Version(v)) => self.handle_cmd(v, ctx), + Ok(Command::Pong(_)) => {} + Ok(cmd) => println!("cmd: {:?}", cmd), + Err(e) => eprintln!("error decoding: {}", e), + } + } +} + +// TODO: all the 'raw' writes using byte strings below probably need to +// be wrapped in something a bit more friendly. + +impl CommandHandler for super::User { + fn handle_cmd(&mut self, NickCommand { nick }: NickCommand, _ctx: &mut Self::Context) { + self.writer.write(titanirc_types::Reply::RplWelcome.into()); + self.writer.write(titanirc_types::Reply::RplYourHost.into()); + self.writer.write(titanirc_types::Reply::RplCreated.into()); + self.writer.write(titanirc_types::Reply::RplMyInfo.into()); + self.writer.write(titanirc_types::Reply::RplISupport.into()); + self.nick = Some(std::str::from_utf8(&nick.0[..]).unwrap().to_string()); + // LUSERS + // RPL_UMODEIS + // MOTD + } +} + +impl CommandHandler for super::User { + fn handle_cmd(&mut self, JoinCommand { channel }: JoinCommand, ctx: &mut Self::Context) { + if let Some(ref nick) = self.nick { + let server_addr = self.server.clone(); + let ctx_addr = ctx.address(); + let nick = nick.clone(); + + ctx.spawn( + async move { + server_addr + .send(crate::entities::channel::events::Join { + channel_name: std::str::from_utf8(&channel.0[..]).unwrap().to_string(), + user: ctx_addr, + nick, + }) + .await + .unwrap() + .unwrap(); + + println!("joined chan!"); + } + .into_actor(self), + ); + } + } +} + +impl CommandHandler for super::User { + fn handle_cmd(&mut self, ModeCommand { mode, .. }: ModeCommand, _ctx: &mut Self::Context) { + self.writer + .write(titanirc_types::Reply::RplUmodeIs(mode).into()) + } +} + +impl CommandHandler for super::User { + fn handle_cmd(&mut self, _command: MotdCommand, _ctx: &mut Self::Context) { + static SERVER_NAME: bytes::Bytes = bytes::Bytes::from_static(b"my.test.server"); + static MOTD1: bytes::Bytes = bytes::Bytes::from_static(b"Hello, welcome to this server!"); + static MOTD2: bytes::Bytes = bytes::Bytes::from_static(b"it's very cool!"); + + self.writer.write( + titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(SERVER_NAME.clone())) + .into(), + ); + self.writer + .write(titanirc_types::Reply::RplMotd(titanirc_types::FreeText(MOTD1.clone())).into()); + self.writer + .write(titanirc_types::Reply::RplMotd(titanirc_types::FreeText(MOTD2.clone())).into()); + self.writer + .write(titanirc_types::Reply::RplEndOfMotd.into()); + } +} + +impl CommandHandler for super::User { + fn handle_cmd(&mut self, _command: VersionCommand, _ctx: &mut Self::Context) { + static SERVER_NAME: bytes::Bytes = bytes::Bytes::from_static(b"my.test.server"); + static INFO: bytes::Bytes = + bytes::Bytes::from_static(b"https://github.com/MITBorg/titanirc"); + + self.writer.write( + titanirc_types::Reply::RplVersion( + clap::crate_version!().to_string(), + "release".to_string(), + titanirc_types::ServerName(SERVER_NAME.clone()), + titanirc_types::FreeText(INFO.clone()), + ) + .into(), + ) + } +} diff --git a/titanirc-server/src/entities/user/events.rs b/titanirc-server/src/entities/user/events.rs new file mode 100644 index 0000000..0f5feb4 --- /dev/null +++ b/titanirc-server/src/entities/user/events.rs @@ -0,0 +1,8 @@ +use actix::prelude::*; + +#[derive(Message)] +#[rtype(result = "()")] +pub struct NameChange { + pub old: String, + pub new: String, +} diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs new file mode 100644 index 0000000..c0eff2a --- /dev/null +++ b/titanirc-server/src/entities/user/mod.rs @@ -0,0 +1,78 @@ +mod commands; +pub mod events; + +use crate::{entities::channel::events::JoinBroadcast, server::Server}; + +use std::sync::Arc; + +use actix::{ + io::{FramedWrite, WriteHandler}, + prelude::*, +}; +use std::time::{Duration, Instant}; +use titanirc_types::{Channel, JoinCommand, ServerMessage, Source}; +use tokio::{io::WriteHalf, net::TcpStream}; + +pub struct User { + pub server: Addr, + pub writer: FramedWrite, titanirc_codec::Encoder>, + pub last_active: Instant, + pub nick: Option, +} + +// TODO: broadcast a leave to all the user's channels on actor shutdown + +impl User { + pub fn new( + server: Addr, + writer: FramedWrite, titanirc_codec::Encoder>, + ) -> Self { + Self { + server, + writer, + last_active: Instant::now(), + nick: None, + } + } +} + +/// Sends a ping to the user every 30 seconds, if we haven't received a protocol +/// message from the user in 240 seconds then disconnect. +fn schedule_ping(ctx: &mut ::Context) { + ctx.run_later(Duration::from_secs(30), |act, ctx| { + if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) { + // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)` + ctx.stop(); + } + + act.writer.write(titanirc_types::ServerMessage::Ping); + schedule_ping(ctx); + }); +} + +impl Actor for User { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + schedule_ping(ctx); + } +} + +/// Handles errors from our socket Writer. +impl WriteHandler for User {} + +/// Handles `JoinBroadcast`s sent by a channel the user is in, and forwards a +/// `JOIN` onto them. +impl actix::Handler> for User { + type Result = (); + + fn handle(&mut self, msg: Arc, _ctx: &mut Self::Context) -> Self::Result { + self.writer.write(ServerMessage::Command( + Source::User(bytes::Bytes::from(msg.nick.as_bytes().to_owned()).into()), + JoinCommand { + channel: Channel::from(bytes::Bytes::from(msg.channel_name.as_bytes().to_owned())), + } + .into(), + )); + } +} diff --git a/titanirc-server/src/main.rs b/titanirc-server/src/main.rs index 6771939..8ae5e23 100644 --- a/titanirc-server/src/main.rs +++ b/titanirc-server/src/main.rs @@ -5,8 +5,6 @@ mod entities; mod error; mod server; -use std::collections::HashMap; - use crate::{ error::Result, server::{Connection, Server}, @@ -29,6 +27,7 @@ async fn main() -> Result<()> { .await .map_err(InitError::TcpBind)?; + // connection acceptor loop let stream = async_stream::stream! { loop { match listener.accept().await { @@ -38,11 +37,10 @@ async fn main() -> Result<()> { } }; + // Spawn the server and pass connections from `stream` to `Handler`. Server::create(move |ctx| { ctx.add_message_stream(stream); - Server { - channels: HashMap::new(), - } + Server::new() }); println!("Running IRC server on 0.0.0.0:6667"); diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs index f243971..35b72cd 100644 --- a/titanirc-server/src/server.rs +++ b/titanirc-server/src/server.rs @@ -3,14 +3,28 @@ use crate::entities::{channel::Channel, user::User}; use std::{collections::HashMap, net::SocketAddr}; use actix::{io::FramedWrite, prelude::*}; -use futures_util::future::TryFutureExt; use tokio::net::TcpStream; use tokio_util::codec::FramedRead; +/// The core of our server: +/// +/// - Handles incoming connections and spawns `User` actors for each. +/// - Handles channel creation, access control, operator config, etc. +/// +/// Essentially acts as the middleman for each entity communicating with each other. pub struct Server { + /// A list of known channels and the addresses to them. pub channels: HashMap>, } +impl Server { + pub fn new() -> Self { + Self { + channels: HashMap::new(), + } + } +} + impl Actor for Server { type Context = Context; } @@ -19,6 +33,9 @@ impl Actor for Server { #[rtype(result = "()")] pub struct Connection(pub TcpStream, pub SocketAddr); +/// Handles incoming connections from our connection acceptor loop and spawns +/// a `User` actor for each which handles reading from the socket and acting +/// accordingly. impl Handler for Server { type Result = (); @@ -27,37 +44,39 @@ impl Handler for Server { User::create(move |ctx| { let (read, write) = tokio::io::split(stream); - User::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx); - User { - server: server_ctx.address(), - writer: FramedWrite::new(write, titanirc_codec::Encoder, ctx), - last_active: std::time::Instant::now(), - nick: None, - } + let read = FramedRead::new(read, titanirc_codec::Decoder); + let write = FramedWrite::new(write, titanirc_codec::Encoder, ctx); + + // Make our new `User` handle all events from this socket in `StreamHandler>`. + ctx.add_stream(read); + + User::new(server_ctx.address(), write) }); } } +/// Send by `User` actors to arbitrate access to the requested channel. impl Handler for Server { type Result = ResponseActFuture; fn handle( &mut self, msg: crate::entities::channel::events::Join, - ctx: &mut Self::Context, + _ctx: &mut Self::Context, ) -> Self::Result { + // get the channel or create it if it doesn't already exist + #[allow(clippy::option_if_let_else)] let channel = if let Some(channel) = self.channels.get(&msg.channel_name) { channel } else { - let channel = Channel::create(|ctx| Channel { - members: Default::default(), - }); + let channel = Channel::create(|_ctx| Channel::new()); self.channels .entry(msg.channel_name.clone()) .or_insert(channel) }; + // forward the user's join event onto the channel Box::pin( channel .send(msg) diff --git a/titanirc-types/Cargo.toml b/titanirc-types/Cargo.toml index 562d290..b71e64f 100644 --- a/titanirc-types/Cargo.toml +++ b/titanirc-types/Cargo.toml @@ -10,4 +10,5 @@ edition = "2018" paste = "1.0" nom = "6.1" derive_more = "0.99" -bytes = "1.0" \ No newline at end of file +bytes = "1.0" +nom-bytes = { git = "https://github.com/w4/nom-bytes" } \ No newline at end of file diff --git a/titanirc-types/src/lib.rs b/titanirc-types/src/lib.rs index be5555f..582f5dc 100644 --- a/titanirc-types/src/lib.rs +++ b/titanirc-types/src/lib.rs @@ -7,15 +7,17 @@ mod replies; pub use crate::primitives::*; pub use crate::replies::{Reply, ServerMessage, Source}; +use bytes::Bytes; use nom::{ bytes::complete::{tag, take_till}, error::Error as NomError, }; +use nom_bytes::BytesWrapper; -fn parse_optional_source(input: &[u8]) -> nom::IResult<&[u8], &[u8]> { - let (rest, _) = tag(":")(input)?; +fn parse_optional_source(input: BytesWrapper) -> nom::IResult { + let (rest, _) = tag(":".as_bytes())(input)?; let (rest, _) = take_till(|c| c == b' ')(rest)?; - tag(" ")(rest) + tag(" ".as_bytes())(rest) } macro_rules! define_commands { @@ -35,19 +37,20 @@ macro_rules! define_commands { $(const [<$name _BYTES>]: &[u8] = stringify!($name).as_bytes();)* impl Command { - pub fn parse(input: &[u8]) -> Result, nom::Err>> { + pub fn parse(input: Bytes) -> Result, nom::Err>> { + let input = BytesWrapper::from(input); + // skip the optional source at the start of the message - let rest = if let Ok((rest, _)) = parse_optional_source(input) { - rest + let input = if let Ok((input, _)) = parse_optional_source(input.clone()) { + input } else { input }; - let (rest, kind) = take_till(|c| c == b' ')(rest)?; + let (params, command) = take_till(|c| c == b' ')(input)?; - // fix this shit - match std::str::from_utf8(kind).unwrap().to_uppercase().as_bytes() { - $([<$name _BYTES>] => Ok(Some(Self::[<$name:camel>]([<$name:camel Command>]::parse(rest)?)))),*, + match command.to_ascii_uppercase().as_ref() { + $([<$name _BYTES>] => Ok(Some(Self::[<$name:camel>]([<$name:camel Command>]::parse(params)?)))),*, _ => Ok(None) } } @@ -69,10 +72,10 @@ macro_rules! define_commands { impl [<$name:camel Command>] { #[allow(unused_variables)] - pub fn parse(rest: &[u8]) -> Result>> { + pub fn parse(rest: BytesWrapper) -> Result>> { $( $( - let (rest, _) = tag(" ")(rest)?; + let (rest, _) = tag(" ".as_bytes())(rest)?; let (rest, [<$param:snake>]) = $param::parse(rest)?; )* )* @@ -97,6 +100,12 @@ macro_rules! define_commands { Ok(()) } } + + impl Into for [<$name:camel Command>] { + fn into(self) -> Command { + Command::[<$name:camel>](self) + } + } )* } }; @@ -126,16 +135,17 @@ define_commands! { #[cfg(test)] mod tests { use super::Command; + use bytes::Bytes; #[test] fn parse_empty() { - assert!(matches!(Command::parse(b""), Ok(None))); + assert!(matches!(Command::parse(Bytes::from_static(b"")), Ok(None))); } #[test] fn parse_privmsg() { assert!(matches!( - Command::parse(b"PRIVMSG foo :baz"), + Command::parse(Bytes::from_static(b"PRIVMSG foo :baz")), Ok(Some(Command::Privmsg(super::PrivmsgCommand { receiver: super::Receiver::User(super::Nick(nick)), free_text: super::primitives::FreeText(msg), @@ -145,13 +155,8 @@ mod tests { #[test] fn parse_privmsg_opt_source() { - eprintln!( - "{:?}", - Command::parse(b":some-fake-source!dude@nice PRIVMSG foo :baz") - ); - assert!(matches!( - Command::parse(b":some-fake-source!dude@nice PRIVMSG foo :baz"), + Command::parse(Bytes::from_static(b":some-fake-source!dude@nice PRIVMSG foo :baz")), Ok(Some(Command::Privmsg(super::PrivmsgCommand { receiver: super::Receiver::User(super::Nick(nick)), free_text: super::primitives::FreeText(msg), diff --git a/titanirc-types/src/primitives.rs b/titanirc-types/src/primitives.rs index 7803339..c5ea3fc 100644 --- a/titanirc-types/src/primitives.rs +++ b/titanirc-types/src/primitives.rs @@ -1,17 +1,19 @@ +use bytes::Bytes; use derive_more::{Deref, From}; use nom::{ bytes::complete::{tag, take_till}, - combinator::{iterator, map_res}, + combinator::{iterator}, sequence::terminated, IResult, }; +use nom_bytes::BytesWrapper; pub trait ValidatingParser { fn validate(bytes: &[u8]) -> bool; } pub trait PrimitiveParser { - fn parse(bytes: &[u8]) -> IResult<&[u8], Self> + fn parse(bytes: BytesWrapper) -> IResult where Self: Sized; } @@ -29,15 +31,22 @@ macro_rules! noop_validator { macro_rules! free_text_primitive { ($name:ty) => { impl PrimitiveParser for $name { - fn parse(bytes: &[u8]) -> IResult<&[u8], Self> { - let (rest, _) = tag(b":")(bytes)?; - Ok((&[], Self(std::str::from_utf8(rest).unwrap().to_string()))) + fn parse(bytes: BytesWrapper) -> IResult { + let (rest, _) = tag(":".as_bytes())(bytes)?; + Ok((Bytes::new().into(), Self(rest.into()))) } } impl std::fmt::Display for $name { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) + match std::str::from_utf8(&self.0[..]) { + Ok(v) => f.write_str(v), + Err(_e) => { + // todo: report this better + eprintln!("Invalid utf-8 in {}", stringify!($name)); + Err(std::fmt::Error) + } + } } } }; @@ -46,24 +55,30 @@ macro_rules! free_text_primitive { macro_rules! space_terminated_primitive { ($name:ty) => { impl PrimitiveParser for $name { - fn parse(bytes: &[u8]) -> IResult<&[u8], Self> { - let (rest, val) = map_res(take_till(|c| c == b' '), std::str::from_utf8)(bytes)?; + fn parse(bytes: BytesWrapper) -> IResult { + let (rest, val) = take_till(|c| c == b' ')(bytes.clone())?; - if !::validate(val.as_bytes()) { + if !::validate(&val[..]) { return Err(nom::Err::Failure(nom::error::Error::new( bytes, nom::error::ErrorKind::Verify, ))); } - // TODO: don't clone - Ok((rest, Self(val.to_string()))) + Ok((rest, Self(val.into()))) } } impl std::fmt::Display for $name { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) + match std::str::from_utf8(&self.0[..]) { + Ok(v) => f.write_str(v), + Err(_e) => { + // todo: report this better + eprintln!("Invalid utf-8 in {}", stringify!($name)); + Err(std::fmt::Error) + } + } } } }; @@ -122,32 +137,32 @@ impl ValidatingParser for Special { } #[derive(Debug, Deref, From)] -pub struct Username(pub String); +pub struct Username(pub Bytes); space_terminated_primitive!(Username); noop_validator!(Username); #[derive(Debug, Deref, From)] -pub struct Mode(pub String); +pub struct Mode(pub Bytes); space_terminated_primitive!(Mode); noop_validator!(Mode); #[derive(Debug, Deref, From)] -pub struct HostName(pub String); +pub struct HostName(pub Bytes); space_terminated_primitive!(HostName); noop_validator!(HostName); #[derive(Debug, Deref, From)] -pub struct ServerName(pub String); +pub struct ServerName(pub Bytes); space_terminated_primitive!(ServerName); noop_validator!(ServerName); #[derive(Debug, Deref, From)] -pub struct RealName(pub String); +pub struct RealName(pub Bytes); space_terminated_primitive!(RealName); noop_validator!(RealName); #[derive(Debug, Deref, From)] -pub struct Nick(pub String); +pub struct Nick(pub Bytes); space_terminated_primitive!(Nick); // TODO: i feel like this would be better suited as a nom chomper to stop @@ -169,12 +184,12 @@ impl ValidatingParser for Nick { } #[derive(Debug, Deref, From)] -pub struct Channel(pub String); +pub struct Channel(pub Bytes); space_terminated_primitive!(Channel); noop_validator!(Channel); #[derive(Debug, Deref, From)] -pub struct FreeText(pub String); +pub struct FreeText(pub Bytes); free_text_primitive!(FreeText); noop_validator!(FreeText); @@ -183,12 +198,13 @@ pub struct Nicks(pub Vec); space_delimited_display!(Nicks); impl PrimitiveParser for Nicks { - fn parse(bytes: &[u8]) -> IResult<&[u8], Self> { - let mut it = iterator(bytes, terminated(take_till(|c| c == b' '), tag(b" "))); + fn parse(bytes: BytesWrapper) -> IResult { + let mut it = iterator( + bytes, + terminated(take_till(|c| c == b' '), tag(" ".as_bytes())), + ); - let parsed = it - .map(|v| Nick(std::str::from_utf8(v).unwrap().to_string())) - .collect(); + let parsed = it.map(|v| Nick(v.into())).collect(); it.finish() .map(move |(remaining, _)| (remaining, Self(parsed))) @@ -245,20 +261,20 @@ pub enum Receiver { } impl std::ops::Deref for Receiver { - type Target = String; + type Target = str; fn deref(&self) -> &Self::Target { - match self { + std::str::from_utf8(match self { Self::User(nick) => &*nick, Self::Channel(channel) => &*channel, - } + }) + .unwrap() } } impl PrimitiveParser for Receiver { - fn parse(bytes: &[u8]) -> IResult<&[u8], Self> { - if let Ok((_, _)) = nom::bytes::complete::tag::<_, _, nom::error::Error<&[u8]>>("#")(bytes) - { + fn parse(bytes: BytesWrapper) -> IResult { + if bytes.get(0) == Some(&b'#') { let (rest, channel) = Channel::parse(bytes)?; Ok((rest, Self::Channel(channel))) } else { diff --git a/titanirc-types/src/replies.rs b/titanirc-types/src/replies.rs index e89296f..0e8809b 100644 --- a/titanirc-types/src/replies.rs +++ b/titanirc-types/src/replies.rs @@ -1,3 +1,5 @@ +#![allow(clippy::wildcard_imports)] + use crate::{primitives::*, Command}; use std::fmt::Write; @@ -30,7 +32,7 @@ impl ServerMessage { Self::Pong => write!(dst, "PONG :{}", server_name), Self::Command(source, command) => { let source = match &source { - Source::User(nick) => nick.as_str(), + Source::User(nick) => std::str::from_utf8(nick).unwrap(), Source::Server => server_name, }; write!(dst, ":{} {}", source, command) -- libgit2 1.7.2