From a96eedd20338cd64b14e558b8c679f6a9c44827b Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Tue, 2 Feb 2021 04:02:11 +0000 Subject: [PATCH] User must have handle before sending messages to channel --- titanirc-server/src/entities/channel/events.rs | 32 +++++--------------------------- titanirc-server/src/entities/channel/mod.rs | 76 ++++++++++++++++++++++++++++++++++++++++++---------------------------------- titanirc-server/src/entities/mod.rs | 13 +++++++++++-- titanirc-server/src/entities/user/commands.rs | 80 ++++++++++++++++++++++++++++++++++++++++---------------------------------------- titanirc-server/src/entities/user/mod.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------------- titanirc-server/src/server.rs | 73 ++++++++++++++++++------------------------------------------------------- titanirc-types/src/protocol/replies.rs | 3 +-- 7 files changed, 171 insertions(+), 173 deletions(-) diff --git a/titanirc-server/src/entities/channel/events.rs b/titanirc-server/src/entities/channel/events.rs index 4662145..74c2e3e 100644 --- a/titanirc-server/src/entities/channel/events.rs +++ b/titanirc-server/src/entities/channel/events.rs @@ -2,39 +2,17 @@ use crate::entities::user::{User, UserUuid}; use actix::prelude::*; use titanirc_types::RegisteredNick; -pub type JoinResult = Result; + /// Send from `User` to `Channel` via `Server`, the `Channel` then replies back /// with a direct handle for the `User` to interact with the `Channel`. +/// +/// This message is also broadcast to all users in the `Channel`. #[derive(Message)] -#[rtype(result = "JoinResult")] +#[rtype(result = "")] pub struct Join { - pub channel_name: String, + pub channel_name: bytes::Bytes, pub user_uuid: UserUuid, pub nick: RegisteredNick, pub user: Addr, } - -#[derive(Debug, thiserror::Error, displaydoc::Display)] -pub enum JoinError { - /// Failed to send join request to channel: {0} - Mailbox(#[from] actix::MailboxError), -} - -/// Sent directly to every `User` when another `User` joins the channel. -#[derive(Message)] -#[rtype(result = "")] -pub struct JoinBroadcast { - pub channel_name: String, - pub nick: RegisteredNick, -} - -impl From for JoinBroadcast { - fn from( - Join { - channel_name, nick, .. - }: Join, - ) -> Self { - Self { channel_name, nick } - } -} diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs index 17dffec..842325f 100644 --- a/titanirc-server/src/entities/channel/mod.rs +++ b/titanirc-server/src/entities/channel/mod.rs @@ -1,47 +1,61 @@ pub mod events; use actix::prelude::*; +use bytes::Bytes; +use derive_more::Deref; use std::{collections::HashMap, sync::Arc}; use crate::entities::user::{User, UserUuid}; -use self::events::JoinBroadcast; +#[derive(Clone, Debug, Deref, Eq, PartialEq, Hash)] +#[allow(clippy::module_name_repetitions)] +pub struct ChannelName(Bytes); -/// A handle to this `Channel` that `User` actors can use to communicate with the -/// rest of the channel. +impl ChannelName { + pub fn new(name: Bytes) -> Self { + Self(name) + } +} + +impl std::borrow::Borrow<[u8]> for ChannelName { + fn borrow(&self) -> &[u8] { + &self.0[..] + } +} + +/// A handle to this `Channel` that `User` actors can use to communicate with +/// the rest of the channel. This is sent back to the `User` when it sends a +/// `events::Join`. +#[derive(Message)] +#[rtype(result = "")] pub struct Handle { + pub channel_name: ChannelName, //pub name_change: actix::Recipient, - pub message: actix::Recipient, + pub message: actix::Recipient, } /// An IRC channel. pub struct Channel { + pub channel_name: ChannelName, pub members: HashMap>, } impl Channel { - pub fn new() -> Self { + pub fn new(channel_name: ChannelName) -> Self { Self { + channel_name, members: HashMap::new(), } } // TODO: add a flag not to broadcast messages to the source so PRIVMSGs dont get duplicated - fn broadcast_message( - &self, - skip_sender: Option, - msg: M, - ) -> impl Future + fn broadcast_message(&self, skip_sender: Option, msg: Arc) where M: Message + Send + Sync, M::Result: Send, Arc: 'static, User: Handler>, { - let mut futures = Vec::with_capacity(self.members.len()); - - let msg = Arc::new(msg); - for (uuid, member) in &self.members { if let Some(skip_sender) = &skip_sender { if skip_sender == uuid { @@ -49,11 +63,8 @@ impl Channel { } } - futures.push(member.send(msg.clone())); - } - - async { - futures_util::future::try_join_all(futures).await.unwrap(); + // TODO: should this just be `send`? + member.do_send(msg.clone()); } } } @@ -63,34 +74,31 @@ impl Actor for Channel { } impl actix::Handler for Channel { - type Result = events::JoinResult; + type Result = (); + + fn handle(&mut self, event: events::Join, ctx: &mut Self::Context) -> Self::Result { + self.members.insert(event.user_uuid, event.user.clone()); - fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result { - self.members.insert(msg.user_uuid, msg.user.clone()); + let event = Arc::new(event); - ctx.spawn( - self.broadcast_message(None, JoinBroadcast::from(msg)) - .into_actor(self), - ); + self.broadcast_message(None, Arc::clone(&event)); - Ok(Handle { + event.user.do_send(Handle { + channel_name: self.channel_name.clone(), message: ctx.address().recipient(), }) } } -impl actix::Handler for Channel { +impl actix::Handler for Channel { type Result = (); fn handle( &mut self, - msg: super::common_events::Message, - ctx: &mut Self::Context, + msg: super::common_events::ChannelMessage, + _ctx: &mut Self::Context, ) -> Self::Result { // TODO: don't allow messages from unconnected clients - ctx.spawn( - self.broadcast_message(Some(msg.user_uuid), msg) - .into_actor(self), - ); + self.broadcast_message(Some(msg.0.user_uuid), Arc::new(msg)); } } diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs index 90cbc3f..602f4c2 100644 --- a/titanirc-server/src/entities/mod.rs +++ b/titanirc-server/src/entities/mod.rs @@ -2,14 +2,23 @@ pub mod channel; pub mod user; pub mod common_events { + use std::fmt::Debug; + use actix::prelude::*; #[derive(Debug, Message)] #[rtype(result = "")] - pub struct Message { + pub struct ChannelMessage(pub Message>); + + #[derive(Debug, Message)] + #[rtype(result = "")] + pub struct UserMessage(pub Message>); + + #[derive(Debug)] + pub struct Message { pub from: titanirc_types::RegisteredNick, pub user_uuid: crate::entities::user::UserUuid, - pub to: titanirc_types::protocol::primitives::Receiver<'static>, + pub to: T, pub message: String, } } diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs index 6d6eadb..3685514 100644 --- a/titanirc-server/src/entities/user/commands.rs +++ b/titanirc-server/src/entities/user/commands.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Instant}; -use actix::{Actor, AsyncContext, StreamHandler, WrapFuture}; +use actix::{Actor, AsyncContext, StreamHandler}; use titanirc_types::protocol::{ commands::{ Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand, @@ -62,30 +62,13 @@ impl CommandHandler> for super::User { ctx: &mut Self::Context, ) { // TODO: ensure the user has a nick set before they join a channel!!! - - let server_addr = self.server.clone(); - let ctx_addr = ctx.address(); - let nick = self.nick.clone(); - let user_uuid = self.session_id; - // TODO: needs to send MODE & NAMES (353, 366) - ctx.spawn( - async move { - server_addr - .send(crate::entities::channel::events::Join { - channel_name: std::str::from_utf8(&channel.0[..]).unwrap().to_string(), - user_uuid, - user: ctx_addr, - nick, - }) - .await - .unwrap() - .unwrap(); - - println!("joined chan!"); - } - .into_actor(self), - ); + self.server.do_send(crate::entities::channel::events::Join { + channel_name: channel.to_bytes(), + user_uuid: self.session_id, + user: ctx.address(), + nick: self.nick.clone(), + }); } } @@ -141,24 +124,41 @@ impl CommandHandler> for super::User { free_text, .. }: PrivmsgCommand<'static>, - ctx: &mut Self::Context, + _ctx: &mut Self::Context, ) { // TODO: ensure the user has a nick before sending messages!! - - let msg = crate::entities::common_events::Message { - from: self.nick.clone(), // TODO: this need to be a full user string i think - user_uuid: self.session_id, - to: receiver, - message: free_text.to_string(), - }; - - let server_addr = self.server.clone(); - - ctx.spawn( - async move { - server_addr.send(msg).await.unwrap(); + match receiver { + primitives::Receiver::User(nick) => { + let msg = crate::entities::common_events::UserMessage( + crate::entities::common_events::Message { + from: self.nick.clone(), + user_uuid: self.session_id, + to: nick, + message: free_text.to_string(), + }, + ); + + self.server.do_send(msg); + } + primitives::Receiver::Channel(channel_name) => { + if let Some(handle) = self.channels.get(channel_name.as_ref()) { + // todo: specific channel event? + let msg = crate::entities::common_events::ChannelMessage( + crate::entities::common_events::Message { + from: self.nick.clone(), + user_uuid: self.session_id, + to: channel_name, + message: free_text.to_string(), + }, + ); + + // todo: should this be do_send? + handle.message.do_send(msg).unwrap(); + } else { + // todo: error back to user if not in channel + panic!("user not in channel") + } } - .into_actor(self), - ); + } } } diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs index 8a55af8..388c2ac 100644 --- a/titanirc-server/src/entities/user/mod.rs +++ b/titanirc-server/src/entities/user/mod.rs @@ -1,7 +1,7 @@ mod commands; pub mod events; -use crate::{entities::channel::events::JoinBroadcast, server::Server}; +use crate::server::Server; use std::{collections::HashMap, hash::Hash, sync::Arc}; @@ -9,7 +9,7 @@ use actix::{ io::{FramedWrite, WriteHandler}, prelude::*, }; -use bytes::Bytes; + use derive_more::Deref; use std::time::{Duration, Instant}; use titanirc_types::{ @@ -22,6 +22,8 @@ use titanirc_types::{ use tokio::{io::WriteHalf, net::TcpStream}; use uuid::Uuid; +use super::channel::ChannelName; + #[derive(Debug, Deref, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] #[allow(clippy::module_name_repetitions)] pub struct UserUuid(Uuid); @@ -36,7 +38,8 @@ pub struct User { >, pub last_active: Instant, pub nick: RegisteredNick, - pub channels: HashMap, crate::entities::channel::Handle>, + + pub channels: HashMap, } // TODO: broadcast a leave to all the user's channels on actor shutdown @@ -84,37 +87,75 @@ impl Actor for User { /// Handles errors from our socket Writer. impl WriteHandler for User {} -/// Handles `JoinBroadcast`s sent by a channel the user is in, and forwards a -/// `JOIN` onto them. -impl actix::Handler> for User { +impl actix::Handler for User { + type Result = (); + + fn handle( + &mut self, + msg: crate::entities::channel::Handle, + _ctx: &mut Self::Context, + ) -> Self::Result { + self.channels.insert(msg.channel_name.clone(), msg); + } +} + +/// Handles `Join`s sent by a channel the user is in, and forwards a +/// `JOIN` command to the user. +impl actix::Handler> for User { type Result = (); - fn handle(&mut self, msg: Arc, _ctx: &mut Self::Context) -> Self::Result { + fn handle( + &mut self, + msg: Arc, + _ctx: &mut Self::Context, + ) -> Self::Result { self.writer.write(ServerMessage::Command( Source::User(Nick((*msg.nick.load().unwrap()).clone().into())), JoinCommand { _phantom: std::marker::PhantomData, - channel: Channel(msg.channel_name.as_bytes().into()), + channel: Channel((&msg.channel_name[..]).into()), + } + .into(), + )); + } +} + +/// Handles messages that have been forwarded from a channel to this user. +impl actix::Handler> for User { + type Result = (); + + fn handle( + &mut self, + msg: Arc, + _ctx: &mut Self::Context, + ) -> Self::Result { + self.writer.write(ServerMessage::Command( + Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())), + PrivmsgCommand { + _phantom: std::marker::PhantomData, + free_text: FreeText(msg.0.message.as_bytes().into()), + receiver: Receiver::Channel(msg.0.to.clone()), } .into(), )); } } -impl actix::Handler> for User { +/// Handles messages that have been sent directly to this user. +impl actix::Handler> for User { type Result = (); fn handle( &mut self, - msg: Arc, + msg: Arc, _ctx: &mut Self::Context, ) -> Self::Result { self.writer.write(ServerMessage::Command( - Source::User(Nick((*msg.from.load().unwrap()).clone().into())), + Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())), PrivmsgCommand { _phantom: std::marker::PhantomData, - free_text: FreeText(msg.message.as_bytes().into()), - receiver: msg.to.clone(), + free_text: FreeText(msg.0.message.as_bytes().into()), + receiver: Receiver::User(msg.0.to.clone()), } .into(), )); diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs index f9de0e0..756db93 100644 --- a/titanirc-server/src/server.rs +++ b/titanirc-server/src/server.rs @@ -1,9 +1,12 @@ -use crate::entities::{channel::Channel, user::User}; +use crate::entities::{ + channel::{Channel, ChannelName}, + user::User, +}; use std::{collections::HashMap, net::SocketAddr}; use actix::{io::FramedWrite, prelude::*}; -use titanirc_types::{protocol::primitives::Receiver, RegisteredNick, UserIdent}; +use titanirc_types::RegisteredNick; use tokio::net::TcpStream; use tokio_util::codec::FramedRead; @@ -15,7 +18,7 @@ use tokio_util::codec::FramedRead; /// Essentially acts as the middleman for each entity communicating with each other. pub struct Server { /// A list of known channels and the addresses to them. - pub channels: HashMap>, + pub channels: HashMap>, // A list of known connected users. // pub users: Vec<(UserIdent, Addr)>, // todo: add this when we know how auth is gonna work } @@ -69,7 +72,7 @@ impl Handler for Server { /// Sent by `User` actors to arbitrate access to the requested channel. impl Handler for Server { - type Result = ResponseActFuture; + type Result = (); // TODO: validate channel name fn handle( @@ -79,70 +82,30 @@ impl Handler for Server { ) -> Self::Result { // get the channel or create it if it doesn't already exist #[allow(clippy::option_if_let_else)] - let channel = if let Some(channel) = self.channels.get(&msg.channel_name) { + let channel = if let Some(channel) = self.channels.get(&msg.channel_name[..]) { channel } else { - let channel = Channel::create(|_ctx| Channel::new()); + let channel_name = ChannelName::new(msg.channel_name.clone()); + + let channel = Channel::create(|_ctx| Channel::new(channel_name.clone())); - self.channels - .entry(msg.channel_name.clone()) - .or_insert(channel) + self.channels.entry(channel_name).or_insert(channel) }; // forward the user's join event onto the channel - Box::pin( - channel - .send(msg) - .into_actor(self) - .map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)), - ) + channel.do_send(msg); } } -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle( &mut self, - msg: crate::entities::common_events::Message, - ctx: &mut Self::Context, + _msg: crate::entities::common_events::UserMessage, + _ctx: &mut Self::Context, ) -> Self::Result { - let dest = MessageDestination::get_destination_from_receiver(&self, &msg.to).unwrap(); - dest.send(ctx, msg); - } -} - -pub enum MessageDestination<'a> { - User(&'a Server, Addr), - Channel(&'a Server, Addr), -} - -impl<'a> MessageDestination<'a> { - pub fn get_destination_from_receiver<'b>( - server: &'a Server, - receiver: &Receiver<'b>, - ) -> Option { - match receiver { - Receiver::Channel(c) => server - .channels - .get(&c.to_string()) - .cloned() - .map(move |c| Self::Channel(server, c)), - Receiver::User(_u) => todo!(), - } - } - - pub fn send(self, ctx: &mut Context, msg: crate::entities::common_events::Message) { - match self { - Self::Channel(actor, channel) => { - ctx.spawn( - async move { - channel.send(msg).await.unwrap(); - } - .into_actor(actor), - ); - } - Self::User(_actor, _u) => todo!(), - } + // TODO: implement this when we have a `users` hashmap + todo!() } } diff --git a/titanirc-types/src/protocol/replies.rs b/titanirc-types/src/protocol/replies.rs index eb5796c..fd0300c 100644 --- a/titanirc-types/src/protocol/replies.rs +++ b/titanirc-types/src/protocol/replies.rs @@ -1,7 +1,6 @@ #![allow(clippy::wildcard_imports)] -use super::{commands::Command, primitives::*}; -use std::fmt::Write; +use super::primitives::*; /// The origin of a message that's about to be returned to the client. #[derive(Debug)] -- libgit2 1.7.2