User must have handle before sending messages to channel
Diff
titanirc-server/src/entities/channel/events.rs | 32 +----------
titanirc-server/src/entities/channel/mod.rs | 76 ++++++++++++++------------
titanirc-server/src/entities/mod.rs | 13 +++-
titanirc-server/src/entities/user/commands.rs | 80 +++++++++++++--------------
titanirc-server/src/entities/user/mod.rs | 67 ++++++++++++++++++-----
titanirc-server/src/server.rs | 73 ++++++-------------------
titanirc-types/src/protocol/replies.rs | 3 +-
7 files changed, 171 insertions(+), 173 deletions(-)
@@ -2,39 +2,17 @@ use crate::entities::user::{User, UserUuid};
use actix::prelude::*;
use titanirc_types::RegisteredNick;
pub type JoinResult = Result<super::Handle, JoinError>;
#[derive(Message)]
#[rtype(result = "JoinResult")]
#[rtype(result = "")]
pub struct Join {
pub channel_name: String,
pub channel_name: bytes::Bytes,
pub user_uuid: UserUuid,
pub nick: RegisteredNick,
pub user: Addr<User>,
}
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum JoinError {
Mailbox(#[from] actix::MailboxError),
}
#[derive(Message)]
#[rtype(result = "")]
pub struct JoinBroadcast {
pub channel_name: String,
pub nick: RegisteredNick,
}
impl From<Join> for JoinBroadcast {
fn from(
Join {
channel_name, nick, ..
}: Join,
) -> Self {
Self { channel_name, nick }
}
}
@@ -1,47 +1,61 @@
pub mod events;
use actix::prelude::*;
use bytes::Bytes;
use derive_more::Deref;
use std::{collections::HashMap, sync::Arc};
use crate::entities::user::{User, UserUuid};
use self::events::JoinBroadcast;
#[derive(Clone, Debug, Deref, Eq, PartialEq, Hash)]
#[allow(clippy::module_name_repetitions)]
pub struct ChannelName(Bytes);
impl ChannelName {
pub fn new(name: Bytes) -> Self {
Self(name)
}
}
impl std::borrow::Borrow<[u8]> for ChannelName {
fn borrow(&self) -> &[u8] {
&self.0[..]
}
}
#[derive(Message)]
#[rtype(result = "")]
pub struct Handle {
pub channel_name: ChannelName,
pub message: actix::Recipient<super::common_events::Message>,
pub message: actix::Recipient<super::common_events::ChannelMessage>,
}
pub struct Channel {
pub channel_name: ChannelName,
pub members: HashMap<UserUuid, Addr<User>>,
}
impl Channel {
pub fn new() -> Self {
pub fn new(channel_name: ChannelName) -> Self {
Self {
channel_name,
members: HashMap::new(),
}
}
fn broadcast_message<M>(
&self,
skip_sender: Option<UserUuid>,
msg: M,
) -> impl Future<Output = ()>
fn broadcast_message<M>(&self, skip_sender: Option<UserUuid>, msg: Arc<M>)
where
M: Message + Send + Sync,
M::Result: Send,
Arc<M>: 'static,
User: Handler<Arc<M>>,
{
let mut futures = Vec::with_capacity(self.members.len());
let msg = Arc::new(msg);
for (uuid, member) in &self.members {
if let Some(skip_sender) = &skip_sender {
if skip_sender == uuid {
@@ -49,11 +63,8 @@ impl Channel {
}
}
futures.push(member.send(msg.clone()));
}
async {
futures_util::future::try_join_all(futures).await.unwrap();
member.do_send(msg.clone());
}
}
}
@@ -63,34 +74,31 @@ impl Actor for Channel {
}
impl actix::Handler<events::Join> for Channel {
type Result = events::JoinResult;
type Result = ();
fn handle(&mut self, event: events::Join, ctx: &mut Self::Context) -> Self::Result {
self.members.insert(event.user_uuid, event.user.clone());
fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
self.members.insert(msg.user_uuid, msg.user.clone());
let event = Arc::new(event);
ctx.spawn(
self.broadcast_message(None, JoinBroadcast::from(msg))
.into_actor(self),
);
self.broadcast_message(None, Arc::clone(&event));
Ok(Handle {
event.user.do_send(Handle {
channel_name: self.channel_name.clone(),
message: ctx.address().recipient(),
})
}
}
impl actix::Handler<super::common_events::Message> for Channel {
impl actix::Handler<super::common_events::ChannelMessage> for Channel {
type Result = ();
fn handle(
&mut self,
msg: super::common_events::Message,
ctx: &mut Self::Context,
msg: super::common_events::ChannelMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
ctx.spawn(
self.broadcast_message(Some(msg.user_uuid), msg)
.into_actor(self),
);
self.broadcast_message(Some(msg.0.user_uuid), Arc::new(msg));
}
}
@@ -2,14 +2,23 @@ pub mod channel;
pub mod user;
pub mod common_events {
use std::fmt::Debug;
use actix::prelude::*;
#[derive(Debug, Message)]
#[rtype(result = "")]
pub struct Message {
pub struct ChannelMessage(pub Message<titanirc_types::protocol::primitives::Channel<'static>>);
#[derive(Debug, Message)]
#[rtype(result = "")]
pub struct UserMessage(pub Message<titanirc_types::protocol::primitives::Nick<'static>>);
#[derive(Debug)]
pub struct Message<T: Debug> {
pub from: titanirc_types::RegisteredNick,
pub user_uuid: crate::entities::user::UserUuid,
pub to: titanirc_types::protocol::primitives::Receiver<'static>,
pub to: T,
pub message: String,
}
}
@@ -2,7 +2,7 @@
use std::{sync::Arc, time::Instant};
use actix::{Actor, AsyncContext, StreamHandler, WrapFuture};
use actix::{Actor, AsyncContext, StreamHandler};
use titanirc_types::protocol::{
commands::{
Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand,
@@ -62,30 +62,13 @@ impl CommandHandler<JoinCommand<'static>> for super::User {
ctx: &mut Self::Context,
) {
let server_addr = self.server.clone();
let ctx_addr = ctx.address();
let nick = self.nick.clone();
let user_uuid = self.session_id;
ctx.spawn(
async move {
server_addr
.send(crate::entities::channel::events::Join {
channel_name: std::str::from_utf8(&channel.0[..]).unwrap().to_string(),
user_uuid,
user: ctx_addr,
nick,
})
.await
.unwrap()
.unwrap();
println!("joined chan!");
}
.into_actor(self),
);
self.server.do_send(crate::entities::channel::events::Join {
channel_name: channel.to_bytes(),
user_uuid: self.session_id,
user: ctx.address(),
nick: self.nick.clone(),
});
}
}
@@ -141,24 +124,41 @@ impl CommandHandler<PrivmsgCommand<'static>> for super::User {
free_text,
..
}: PrivmsgCommand<'static>,
ctx: &mut Self::Context,
_ctx: &mut Self::Context,
) {
let msg = crate::entities::common_events::Message {
from: self.nick.clone(), user_uuid: self.session_id,
to: receiver,
message: free_text.to_string(),
};
let server_addr = self.server.clone();
ctx.spawn(
async move {
server_addr.send(msg).await.unwrap();
match receiver {
primitives::Receiver::User(nick) => {
let msg = crate::entities::common_events::UserMessage(
crate::entities::common_events::Message {
from: self.nick.clone(),
user_uuid: self.session_id,
to: nick,
message: free_text.to_string(),
},
);
self.server.do_send(msg);
}
primitives::Receiver::Channel(channel_name) => {
if let Some(handle) = self.channels.get(channel_name.as_ref()) {
let msg = crate::entities::common_events::ChannelMessage(
crate::entities::common_events::Message {
from: self.nick.clone(),
user_uuid: self.session_id,
to: channel_name,
message: free_text.to_string(),
},
);
handle.message.do_send(msg).unwrap();
} else {
panic!("user not in channel")
}
}
.into_actor(self),
);
}
}
}
@@ -1,7 +1,7 @@
mod commands;
pub mod events;
use crate::{entities::channel::events::JoinBroadcast, server::Server};
use crate::server::Server;
use std::{collections::HashMap, hash::Hash, sync::Arc};
@@ -9,7 +9,7 @@ use actix::{
io::{FramedWrite, WriteHandler},
prelude::*,
};
use bytes::Bytes;
use derive_more::Deref;
use std::time::{Duration, Instant};
use titanirc_types::{
@@ -22,6 +22,8 @@ use titanirc_types::{
use tokio::{io::WriteHalf, net::TcpStream};
use uuid::Uuid;
use super::channel::ChannelName;
#[derive(Debug, Deref, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[allow(clippy::module_name_repetitions)]
pub struct UserUuid(Uuid);
@@ -36,7 +38,8 @@ pub struct User {
>,
pub last_active: Instant,
pub nick: RegisteredNick,
pub channels: HashMap<Arc<String>, crate::entities::channel::Handle>,
pub channels: HashMap<ChannelName, crate::entities::channel::Handle>,
}
@@ -84,37 +87,75 @@ impl Actor for User {
impl WriteHandler<std::io::Error> for User {}
impl actix::Handler<Arc<JoinBroadcast>> for User {
impl actix::Handler<crate::entities::channel::Handle> for User {
type Result = ();
fn handle(
&mut self,
msg: crate::entities::channel::Handle,
_ctx: &mut Self::Context,
) -> Self::Result {
self.channels.insert(msg.channel_name.clone(), msg);
}
}
impl actix::Handler<Arc<super::channel::events::Join>> for User {
type Result = ();
fn handle(&mut self, msg: Arc<JoinBroadcast>, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: Arc<super::channel::events::Join>,
_ctx: &mut Self::Context,
) -> Self::Result {
self.writer.write(ServerMessage::Command(
Source::User(Nick((*msg.nick.load().unwrap()).clone().into())),
JoinCommand {
_phantom: std::marker::PhantomData,
channel: Channel(msg.channel_name.as_bytes().into()),
channel: Channel((&msg.channel_name[..]).into()),
}
.into(),
));
}
}
impl actix::Handler<Arc<super::common_events::ChannelMessage>> for User {
type Result = ();
fn handle(
&mut self,
msg: Arc<super::common_events::ChannelMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
self.writer.write(ServerMessage::Command(
Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())),
PrivmsgCommand {
_phantom: std::marker::PhantomData,
free_text: FreeText(msg.0.message.as_bytes().into()),
receiver: Receiver::Channel(msg.0.to.clone()),
}
.into(),
));
}
}
impl actix::Handler<Arc<crate::entities::common_events::Message>> for User {
impl actix::Handler<Arc<super::common_events::UserMessage>> for User {
type Result = ();
fn handle(
&mut self,
msg: Arc<crate::entities::common_events::Message>,
msg: Arc<super::common_events::UserMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
self.writer.write(ServerMessage::Command(
Source::User(Nick((*msg.from.load().unwrap()).clone().into())),
Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())),
PrivmsgCommand {
_phantom: std::marker::PhantomData,
free_text: FreeText(msg.message.as_bytes().into()),
receiver: msg.to.clone(),
free_text: FreeText(msg.0.message.as_bytes().into()),
receiver: Receiver::User(msg.0.to.clone()),
}
.into(),
));
@@ -1,9 +1,12 @@
use crate::entities::{channel::Channel, user::User};
use crate::entities::{
channel::{Channel, ChannelName},
user::User,
};
use std::{collections::HashMap, net::SocketAddr};
use actix::{io::FramedWrite, prelude::*};
use titanirc_types::{protocol::primitives::Receiver, RegisteredNick, UserIdent};
use titanirc_types::RegisteredNick;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
@@ -15,7 +18,7 @@ use tokio_util::codec::FramedRead;
pub struct Server {
pub channels: HashMap<String, Addr<Channel>>,
pub channels: HashMap<ChannelName, Addr<Channel>>,
}
@@ -69,7 +72,7 @@ impl Handler<Connection> for Server {
impl Handler<crate::entities::channel::events::Join> for Server {
type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;
type Result = ();
fn handle(
@@ -79,70 +82,30 @@ impl Handler<crate::entities::channel::events::Join> for Server {
) -> Self::Result {
#[allow(clippy::option_if_let_else)]
let channel = if let Some(channel) = self.channels.get(&msg.channel_name) {
let channel = if let Some(channel) = self.channels.get(&msg.channel_name[..]) {
channel
} else {
let channel = Channel::create(|_ctx| Channel::new());
let channel_name = ChannelName::new(msg.channel_name.clone());
let channel = Channel::create(|_ctx| Channel::new(channel_name.clone()));
self.channels
.entry(msg.channel_name.clone())
.or_insert(channel)
self.channels.entry(channel_name).or_insert(channel)
};
Box::pin(
channel
.send(msg)
.into_actor(self)
.map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)),
)
channel.do_send(msg);
}
}
impl Handler<crate::entities::common_events::Message> for Server {
impl Handler<crate::entities::common_events::UserMessage> for Server {
type Result = ();
fn handle(
&mut self,
msg: crate::entities::common_events::Message,
ctx: &mut Self::Context,
_msg: crate::entities::common_events::UserMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
let dest = MessageDestination::get_destination_from_receiver(&self, &msg.to).unwrap();
dest.send(ctx, msg);
}
}
pub enum MessageDestination<'a> {
User(&'a Server, Addr<User>),
Channel(&'a Server, Addr<Channel>),
}
impl<'a> MessageDestination<'a> {
pub fn get_destination_from_receiver<'b>(
server: &'a Server,
receiver: &Receiver<'b>,
) -> Option<Self> {
match receiver {
Receiver::Channel(c) => server
.channels
.get(&c.to_string())
.cloned()
.map(move |c| Self::Channel(server, c)),
Receiver::User(_u) => todo!(),
}
}
pub fn send(self, ctx: &mut Context<Server>, msg: crate::entities::common_events::Message) {
match self {
Self::Channel(actor, channel) => {
ctx.spawn(
async move {
channel.send(msg).await.unwrap();
}
.into_actor(actor),
);
}
Self::User(_actor, _u) => todo!(),
}
todo!()
}
}
@@ -1,7 +1,6 @@
#![allow(clippy::wildcard_imports)]
use super::{commands::Command, primitives::*};
use std::fmt::Write;
use super::primitives::*;
#[derive(Debug)]