From 292cd71297abde6adbf1a8a01b8b72f0c046b622 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 30 Jan 2021 18:38:55 +0000 Subject: [PATCH] Introduce concept of channels --- Cargo.lock | 28 ++++++++++++++++++++++++++++ titanirc-server/Cargo.toml | 3 ++- titanirc-server/src/entities/channel.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ titanirc-server/src/entities/mod.rs | 13 +++++++++++++ titanirc-server/src/entities/user.rs | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ titanirc-server/src/main.rs | 8 ++++++-- titanirc-server/src/server.rs | 48 +++++++++++++++++++++++++++++++++++++++++------- titanirc-server/src/session.rs | 90 ------------------------------------------------------------------------------------------ titanirc-types/src/lib.rs | 2 +- 9 files changed, 336 insertions(+), 101 deletions(-) create mode 100644 titanirc-server/src/entities/channel.rs create mode 100644 titanirc-server/src/entities/mod.rs create mode 100644 titanirc-server/src/entities/user.rs delete mode 100644 titanirc-server/src/session.rs diff --git a/Cargo.lock b/Cargo.lock index 5ea1955..1a756ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" [[package]] +name = "futures-macro" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "futures-sink" version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -309,9 +321,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -631,6 +646,18 @@ dependencies = [ ] [[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + +[[package]] name = "proc-macro2" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -861,6 +888,7 @@ dependencies = [ "async-stream", "clap", "displaydoc", + "futures-util", "thiserror", "titanirc-codec", "titanirc-types", diff --git a/titanirc-server/Cargo.toml b/titanirc-server/Cargo.toml index 8cb9be8..1b4032c 100644 --- a/titanirc-server/Cargo.toml +++ b/titanirc-server/Cargo.toml @@ -17,4 +17,5 @@ tokio-util = "0.6" async-stream = "0.3" thiserror = "1" displaydoc = "0.1" -clap = "3.0.0-beta.2" \ No newline at end of file +clap = "3.0.0-beta.2" +futures-util = "0.3" \ No newline at end of file diff --git a/titanirc-server/src/entities/channel.rs b/titanirc-server/src/entities/channel.rs new file mode 100644 index 0000000..b399cc2 --- /dev/null +++ b/titanirc-server/src/entities/channel.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use actix::prelude::*; + +use crate::{entities::user::User, server::Server}; + +pub mod events { + use crate::entities::user::User; + use actix::prelude::*; + + pub type JoinResult = Result; + + #[derive(Message)] + #[rtype(result = "JoinResult")] + pub struct Join { + pub channel_name: String, + pub nick: String, + pub user: Addr, + } + + #[derive(Debug, thiserror::Error, displaydoc::Display)] + pub enum JoinError { + /// Failed to send join request to channel: {0} + Mailbox(#[from] actix::MailboxError), + } + + #[derive(Message)] + #[rtype(result = "")] + pub struct JoinBroadcast { + pub channel_name: String, + pub nick: String, + } + + impl From for JoinBroadcast { + fn from( + Join { + channel_name, nick, .. + }: Join, + ) -> Self { + Self { channel_name, nick } + } + } +} + +pub struct Handle { + //pub name_change: actix::Recipient, + pub message: actix::Recipient, +} + +pub struct Channel { + pub members: Vec>, +} + +impl Channel { + fn announce_join(&self, join: events::Join) -> impl Future { + let mut futures = Vec::new(); + + let broadcast = Arc::new(events::JoinBroadcast::from(join)); + + for member in &self.members { + futures.push(member.send(broadcast.clone())); + } + + async { + futures_util::future::try_join_all(futures).await.unwrap(); + } + } +} + +impl Actor for Channel { + type Context = Context; +} + +impl actix::Handler for Channel { + type Result = events::JoinResult; + + fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result { + self.members.push(msg.user.clone()); + + ctx.spawn(self.announce_join(msg).into_actor(self)); + + Ok(Handle { + message: ctx.address().recipient(), + }) + } +} + +impl actix::Handler for Channel { + type Result = (); + + fn handle( + &mut self, + msg: super::common_events::Message, + ctx: &mut Self::Context, + ) -> Self::Result { + } +} diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs new file mode 100644 index 0000000..56754c4 --- /dev/null +++ b/titanirc-server/src/entities/mod.rs @@ -0,0 +1,13 @@ +pub mod channel; +pub mod user; + +pub mod common_events { + use actix::prelude::*; + + #[derive(Debug, Message)] + #[rtype(result = "()")] + pub struct Message { + from: String, + message: String, + } +} diff --git a/titanirc-server/src/entities/user.rs b/titanirc-server/src/entities/user.rs new file mode 100644 index 0000000..738f283 --- /dev/null +++ b/titanirc-server/src/entities/user.rs @@ -0,0 +1,148 @@ +use actix::{ + io::{FramedWrite, WriteHandler}, + prelude::*, +}; +use std::time::{Duration, Instant}; +use titanirc_types::Command; +use tokio::{io::WriteHalf, net::TcpStream}; + +pub mod events { + use actix::prelude::*; + + #[derive(Message)] + #[rtype(result = "()")] + pub struct NameChange { + pub old: String, + pub new: String, + } +} + +pub struct User { + pub server: Addr, + pub writer: + FramedWrite, titanirc_codec::Encoder>, + pub last_active: Instant, + pub nick: Option, // should probably be an arc so we dont have to keep cloning the string +} + +fn schedule_ping(ctx: &mut ::Context) { + ctx.run_later(Duration::from_secs(30), |act, ctx| { + if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) { + // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)` + eprintln!("ping timeout"); + ctx.stop(); + } + + act.writer.write(titanirc_types::ServerMessage::Ping); + schedule_ping(ctx); + }); +} + +impl Actor for User { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + schedule_ping(ctx); + } +} + +impl WriteHandler for User {} + +impl StreamHandler> for User { + /// This is main event loop for client requests + fn handle(&mut self, cmd: Result, ctx: &mut Self::Context) { + self.last_active = Instant::now(); + + match cmd { + Ok(Command::Nick(titanirc_types::NickCommand { nick })) => { + self.writer.write(titanirc_types::Reply::RplWelcome.into()); + self.writer.write(titanirc_types::Reply::RplYourHost.into()); + self.writer.write(titanirc_types::Reply::RplCreated.into()); + self.writer.write(titanirc_types::Reply::RplMyInfo.into()); + self.writer.write(titanirc_types::Reply::RplISupport.into()); + self.nick = Some(nick.0); + // LUSERS + // RPL_UMODEIS + // MOTD + } + Ok(Command::Join(titanirc_types::JoinCommand { channel })) => { + if let Some(ref nick) = self.nick { + let server_addr = self.server.clone(); + let ctx_addr = ctx.address(); + let nick = nick.clone(); + + ctx.spawn( + async move { + server_addr + .send(crate::entities::channel::events::Join { + channel_name: channel.0, + user: ctx_addr, + nick, + }) + .await + .unwrap() + .unwrap(); + + println!("joined chan!"); + } + .into_actor(self), + ); + } + } + Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self + .writer + .write(titanirc_types::Reply::RplUmodeIs(mode).into()), + Ok(Command::Motd(_)) => { + self.writer.write( + titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName( + "my.test.server".to_string(), + )) + .into(), + ); + self.writer.write( + titanirc_types::Reply::RplMotd(titanirc_types::FreeText( + "Hello, welcome to this server!".to_string(), + )) + .into(), + ); + self.writer.write( + titanirc_types::Reply::RplMotd(titanirc_types::FreeText( + "it's very cool!".to_string(), + )) + .into(), + ); + self.writer + .write(titanirc_types::Reply::RplEndOfMotd.into()); + } + Ok(Command::Version(_)) => self.writer.write( + titanirc_types::Reply::RplVersion( + clap::crate_version!().to_string(), + "release".to_string(), + titanirc_types::ServerName("my.test.server".to_string()), + titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()), + ) + .into(), + ), + Ok(Command::Pong(_)) => {} + Ok(cmd) => println!("cmd: {:?}", cmd), + Err(e) => eprintln!("error decoding: {}", e), + } + } +} + +impl actix::Handler> for User { + type Result = (); + + fn handle( + &mut self, + msg: std::sync::Arc, + ctx: &mut Self::Context, + ) -> Self::Result { + self.writer.write(titanirc_types::ServerMessage::Command( + titanirc_types::Source::User(titanirc_types::Nick(msg.nick.clone())), + titanirc_types::Command::Join(titanirc_types::JoinCommand { + channel: titanirc_types::Channel::from(msg.channel_name.clone()), + }), + )); + } +} diff --git a/titanirc-server/src/main.rs b/titanirc-server/src/main.rs index 95f8781..6771939 100644 --- a/titanirc-server/src/main.rs +++ b/titanirc-server/src/main.rs @@ -1,9 +1,11 @@ #![deny(clippy::pedantic)] #![allow(clippy::missing_errors_doc)] +mod entities; mod error; mod server; -mod session; + +use std::collections::HashMap; use crate::{ error::Result, @@ -38,7 +40,9 @@ async fn main() -> Result<()> { Server::create(move |ctx| { ctx.add_message_stream(stream); - Server {} + Server { + channels: HashMap::new(), + } }); println!("Running IRC server on 0.0.0.0:6667"); diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs index e0af39d..f243971 100644 --- a/titanirc-server/src/server.rs +++ b/titanirc-server/src/server.rs @@ -1,12 +1,15 @@ -use crate::session::Session; +use crate::entities::{channel::Channel, user::User}; -use std::net::SocketAddr; +use std::{collections::HashMap, net::SocketAddr}; use actix::{io::FramedWrite, prelude::*}; +use futures_util::future::TryFutureExt; use tokio::net::TcpStream; use tokio_util::codec::FramedRead; -pub struct Server {} +pub struct Server { + pub channels: HashMap>, +} impl Actor for Server { type Context = Context; @@ -19,16 +22,47 @@ pub struct Connection(pub TcpStream, pub SocketAddr); impl Handler for Server { type Result = (); - fn handle(&mut self, Connection(stream, remote): Connection, _ctx: &mut Self::Context) { + fn handle(&mut self, Connection(stream, remote): Connection, server_ctx: &mut Self::Context) { println!("Accepted connection from {}", remote); - Session::create(move |ctx| { + User::create(move |ctx| { let (read, write) = tokio::io::split(stream); - Session::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx); - Session { + User::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx); + User { + server: server_ctx.address(), writer: FramedWrite::new(write, titanirc_codec::Encoder, ctx), last_active: std::time::Instant::now(), + nick: None, } }); } } + +impl Handler for Server { + type Result = ResponseActFuture; + + fn handle( + &mut self, + msg: crate::entities::channel::events::Join, + ctx: &mut Self::Context, + ) -> Self::Result { + let channel = if let Some(channel) = self.channels.get(&msg.channel_name) { + channel + } else { + let channel = Channel::create(|ctx| Channel { + members: Default::default(), + }); + + self.channels + .entry(msg.channel_name.clone()) + .or_insert(channel) + }; + + Box::pin( + channel + .send(msg) + .into_actor(self) + .map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)), + ) + } +} diff --git a/titanirc-server/src/session.rs b/titanirc-server/src/session.rs deleted file mode 100644 index f87cd9f..0000000 --- a/titanirc-server/src/session.rs +++ /dev/null @@ -1,90 +0,0 @@ -use actix::{ - io::{FramedWrite, WriteHandler}, - prelude::*, -}; -use std::time::{Duration, Instant}; -use titanirc_types::Command; -use tokio::{io::WriteHalf, net::TcpStream}; - -pub struct Session { - pub writer: - FramedWrite, titanirc_codec::Encoder>, - pub last_active: Instant, -} - -fn schedule_ping(ctx: &mut ::Context) { - ctx.run_later(Duration::from_secs(30), |act, ctx| { - if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) { - // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)` - eprintln!("ping timeout"); - ctx.stop(); - } - - act.writer.write(titanirc_types::ServerMessage::Ping); - schedule_ping(ctx); - }); -} - -impl Actor for Session { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - schedule_ping(ctx); - } -} - -impl WriteHandler for Session {} - -impl StreamHandler> for Session { - /// This is main event loop for client requests - fn handle(&mut self, cmd: Result, _ctx: &mut Self::Context) { - self.last_active = Instant::now(); - - match cmd { - Ok(Command::Nick(_v)) => { - self.writer.write(titanirc_types::Reply::RplWelcome.into()); - self.writer.write(titanirc_types::Reply::RplYourHost.into()); - self.writer.write(titanirc_types::Reply::RplCreated.into()); - self.writer.write(titanirc_types::Reply::RplMyInfo.into()); - self.writer.write(titanirc_types::Reply::RplISupport.into()); - } - Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self - .writer - .write(titanirc_types::Reply::RplUmodeIs(mode).into()), - Ok(Command::Motd(_)) => { - self.writer.write( - titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName( - "my.test.server".to_string(), - )) - .into(), - ); - self.writer.write( - titanirc_types::Reply::RplMotd(titanirc_types::FreeText( - "Hello, welcome to this server!".to_string(), - )) - .into(), - ); - self.writer.write( - titanirc_types::Reply::RplMotd(titanirc_types::FreeText( - "it's very cool!".to_string(), - )) - .into(), - ); - self.writer - .write(titanirc_types::Reply::RplEndOfMotd.into()); - } - Ok(Command::Version(_)) => self.writer.write( - titanirc_types::Reply::RplVersion( - clap::crate_version!().to_string(), - "release".to_string(), - titanirc_types::ServerName("my.test.server".to_string()), - titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()), - ) - .into(), - ), - Ok(Command::Pong(_)) => {} - Ok(cmd) => println!("cmd: {:?}", cmd), - Err(e) => eprintln!("error decoding: {}", e), - } - } -} diff --git a/titanirc-types/src/lib.rs b/titanirc-types/src/lib.rs index 81e6b3c..be5555f 100644 --- a/titanirc-types/src/lib.rs +++ b/titanirc-types/src/lib.rs @@ -5,7 +5,7 @@ mod primitives; mod replies; pub use crate::primitives::*; -pub use crate::replies::{Reply, ServerMessage}; +pub use crate::replies::{Reply, ServerMessage, Source}; use nom::{ bytes::complete::{tag, take_till}, -- libgit2 1.7.2