From afca2b1e4a1872dac57d2c7b3dde451e2f65d3c0 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 31 Jan 2021 20:29:35 +0000 Subject: [PATCH] Implement channel messaging --- titanirc-server/src/entities/channel/mod.rs | 27 +++++++++++++++++++-------- titanirc-server/src/entities/mod.rs | 7 ++++--- titanirc-server/src/entities/user/commands.rs | 35 +++++++++++++++++++++++++++++++++-- titanirc-server/src/entities/user/mod.rs | 27 +++++++++++++++++++++++++-- titanirc-server/src/server.rs | 23 ++++++++++++++++++++++- 5 files changed, 103 insertions(+), 16 deletions(-) diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs index 3bc306d..6626c03 100644 --- a/titanirc-server/src/entities/channel/mod.rs +++ b/titanirc-server/src/entities/channel/mod.rs @@ -5,6 +5,8 @@ use std::sync::Arc; use crate::entities::user::User; +use self::events::JoinBroadcast; + /// A handle to this `Channel` that `User` actors can use to communicate with the /// rest of the channel. pub struct Handle { @@ -24,14 +26,19 @@ impl Channel { } } - /// Announce a user's join event to the rest of the channel. - fn announce_join(&self, join: events::Join) -> impl Future { - let mut futures = Vec::new(); + fn broadcast_message(&self, msg: M) -> impl Future + where + M: Message + Send + Sync, + M::Result: Send, + Arc: 'static, + User: Handler>, + { + let mut futures = Vec::with_capacity(self.members.len()); - let broadcast = Arc::new(events::JoinBroadcast::from(join)); + let msg = Arc::new(msg); for member in &self.members { - futures.push(member.send(broadcast.clone())); + futures.push(member.send(msg.clone())); } async { @@ -50,7 +57,10 @@ impl actix::Handler for Channel { fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result { self.members.push(msg.user.clone()); - ctx.spawn(self.announce_join(msg).into_actor(self)); + ctx.spawn( + self.broadcast_message(JoinBroadcast::from(msg)) + .into_actor(self), + ); Ok(Handle { message: ctx.address().recipient(), @@ -63,8 +73,9 @@ impl actix::Handler for Channel { fn handle( &mut self, - _msg: super::common_events::Message, - _ctx: &mut Self::Context, + msg: super::common_events::Message, + ctx: &mut Self::Context, ) -> Self::Result { + ctx.spawn(self.broadcast_message(msg).into_actor(self)); } } diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs index 56754c4..4e9e097 100644 --- a/titanirc-server/src/entities/mod.rs +++ b/titanirc-server/src/entities/mod.rs @@ -5,9 +5,10 @@ pub mod common_events { use actix::prelude::*; #[derive(Debug, Message)] - #[rtype(result = "()")] + #[rtype(result = "")] pub struct Message { - from: String, - message: String, + pub from: String, + pub to: String, + pub message: String, } } diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs index af5b679..3bab702 100644 --- a/titanirc-server/src/entities/user/commands.rs +++ b/titanirc-server/src/entities/user/commands.rs @@ -1,7 +1,9 @@ -use std::time::Instant; +use std::{ops::Deref, time::Instant}; use actix::{Actor, AsyncContext, StreamHandler, WrapFuture}; -use titanirc_types::{Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, VersionCommand}; +use titanirc_types::{ + Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand, +}; pub trait CommandHandler: Actor { fn handle_cmd(&mut self, command: T, ctx: &mut Self::Context); @@ -16,6 +18,7 @@ impl StreamHandler> for super::User { Ok(Command::Join(v)) => self.handle_cmd(v, ctx), Ok(Command::Mode(v)) => self.handle_cmd(v, ctx), Ok(Command::Motd(v)) => self.handle_cmd(v, ctx), + Ok(Command::Privmsg(v)) => self.handle_cmd(v, ctx), Ok(Command::Version(v)) => self.handle_cmd(v, ctx), Ok(Command::Pong(_)) => {} Ok(cmd) => println!("cmd: {:?}", cmd), @@ -111,3 +114,31 @@ impl CommandHandler for super::User { ) } } + +impl CommandHandler for super::User { + fn handle_cmd( + &mut self, + PrivmsgCommand { + receiver, + free_text, + }: PrivmsgCommand, + ctx: &mut Self::Context, + ) { + if let Some(nick) = &self.nick { + let msg = crate::entities::common_events::Message { + from: nick.clone(), // TODO: this need to be a full user string i think + to: receiver.to_string(), + message: free_text.to_string(), + }; + + let server_addr = self.server.clone(); + + ctx.spawn( + async move { + server_addr.send(msg).await.unwrap(); + } + .into_actor(self), + ); + } + } +} diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs index c0eff2a..0912d23 100644 --- a/titanirc-server/src/entities/user/mod.rs +++ b/titanirc-server/src/entities/user/mod.rs @@ -3,14 +3,16 @@ pub mod events; use crate::{entities::channel::events::JoinBroadcast, server::Server}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use actix::{ io::{FramedWrite, WriteHandler}, prelude::*, }; use std::time::{Duration, Instant}; -use titanirc_types::{Channel, JoinCommand, ServerMessage, Source}; +use titanirc_types::{ + Channel, FreeText, JoinCommand, PrivmsgCommand, Receiver, ServerMessage, Source, +}; use tokio::{io::WriteHalf, net::TcpStream}; pub struct User { @@ -76,3 +78,24 @@ impl actix::Handler> for User { )); } } + +impl actix::Handler> for User { + type Result = (); + + fn handle( + &mut self, + msg: Arc, + _ctx: &mut Self::Context, + ) -> Self::Result { + self.writer.write(ServerMessage::Command( + Source::User(bytes::Bytes::from(msg.from.as_bytes().to_owned()).into()), + PrivmsgCommand { + free_text: FreeText(bytes::Bytes::from(msg.message.as_bytes().to_owned())), + receiver: { + Receiver::Channel(bytes::Bytes::from(msg.to.as_bytes().to_owned()).into()) + }, + } + .into(), + )); + } +} diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs index 35b72cd..6a2cf04 100644 --- a/titanirc-server/src/server.rs +++ b/titanirc-server/src/server.rs @@ -55,7 +55,7 @@ impl Handler for Server { } } -/// Send by `User` actors to arbitrate access to the requested channel. +/// Sent by `User` actors to arbitrate access to the requested channel. impl Handler for Server { type Result = ResponseActFuture; @@ -85,3 +85,24 @@ impl Handler for Server { ) } } + +impl Handler for Server { + type Result = (); + + fn handle( + &mut self, + msg: crate::entities::common_events::Message, + ctx: &mut Self::Context, + ) -> Self::Result { + eprintln!("to: {}", msg.to); + + let channel = self.channels.get(&msg.to).unwrap().clone(); + + ctx.spawn( + async move { + channel.send(msg).await.unwrap(); + } + .into_actor(self), + ); + } +} -- libgit2 1.7.2