Implement channel messaging
Diff
titanirc-server/src/entities/channel/mod.rs | 27 +++++++++++++++-------
titanirc-server/src/entities/mod.rs | 7 +++---
titanirc-server/src/entities/user/commands.rs | 35 ++++++++++++++++++++++++++--
titanirc-server/src/entities/user/mod.rs | 27 ++++++++++++++++++++--
titanirc-server/src/server.rs | 23 +++++++++++++++++-
5 files changed, 103 insertions(+), 16 deletions(-)
@@ -5,6 +5,8 @@ use std::sync::Arc;
use crate::entities::user::User;
use self::events::JoinBroadcast;
pub struct Handle {
@@ -24,14 +26,19 @@ impl Channel {
}
}
fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
let mut futures = Vec::new();
fn broadcast_message<M>(&self, msg: M) -> impl Future<Output = ()>
where
M: Message + Send + Sync,
M::Result: Send,
Arc<M>: 'static,
User: Handler<Arc<M>>,
{
let mut futures = Vec::with_capacity(self.members.len());
let broadcast = Arc::new(events::JoinBroadcast::from(join));
let msg = Arc::new(msg);
for member in &self.members {
futures.push(member.send(broadcast.clone()));
futures.push(member.send(msg.clone()));
}
async {
@@ -50,7 +57,10 @@ impl actix::Handler<events::Join> for Channel {
fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
self.members.push(msg.user.clone());
ctx.spawn(self.announce_join(msg).into_actor(self));
ctx.spawn(
self.broadcast_message(JoinBroadcast::from(msg))
.into_actor(self),
);
Ok(Handle {
message: ctx.address().recipient(),
@@ -63,8 +73,9 @@ impl actix::Handler<super::common_events::Message> for Channel {
fn handle(
&mut self,
_msg: super::common_events::Message,
_ctx: &mut Self::Context,
msg: super::common_events::Message,
ctx: &mut Self::Context,
) -> Self::Result {
ctx.spawn(self.broadcast_message(msg).into_actor(self));
}
}
@@ -5,9 +5,10 @@ pub mod common_events {
use actix::prelude::*;
#[derive(Debug, Message)]
#[rtype(result = "()")]
#[rtype(result = "")]
pub struct Message {
from: String,
message: String,
pub from: String,
pub to: String,
pub message: String,
}
}
@@ -1,7 +1,9 @@
use std::time::Instant;
use std::{ops::Deref, time::Instant};
use actix::{Actor, AsyncContext, StreamHandler, WrapFuture};
use titanirc_types::{Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, VersionCommand};
use titanirc_types::{
Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand,
};
pub trait CommandHandler<T>: Actor {
fn handle_cmd(&mut self, command: T, ctx: &mut Self::Context);
@@ -16,6 +18,7 @@ impl StreamHandler<Result<Command, std::io::Error>> for super::User {
Ok(Command::Join(v)) => self.handle_cmd(v, ctx),
Ok(Command::Mode(v)) => self.handle_cmd(v, ctx),
Ok(Command::Motd(v)) => self.handle_cmd(v, ctx),
Ok(Command::Privmsg(v)) => self.handle_cmd(v, ctx),
Ok(Command::Version(v)) => self.handle_cmd(v, ctx),
Ok(Command::Pong(_)) => {}
Ok(cmd) => println!("cmd: {:?}", cmd),
@@ -111,3 +114,31 @@ impl CommandHandler<VersionCommand> for super::User {
)
}
}
impl CommandHandler<PrivmsgCommand> for super::User {
fn handle_cmd(
&mut self,
PrivmsgCommand {
receiver,
free_text,
}: PrivmsgCommand,
ctx: &mut Self::Context,
) {
if let Some(nick) = &self.nick {
let msg = crate::entities::common_events::Message {
from: nick.clone(), to: receiver.to_string(),
message: free_text.to_string(),
};
let server_addr = self.server.clone();
ctx.spawn(
async move {
server_addr.send(msg).await.unwrap();
}
.into_actor(self),
);
}
}
}
@@ -3,14 +3,16 @@ pub mod events;
use crate::{entities::channel::events::JoinBroadcast, server::Server};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use actix::{
io::{FramedWrite, WriteHandler},
prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::{Channel, JoinCommand, ServerMessage, Source};
use titanirc_types::{
Channel, FreeText, JoinCommand, PrivmsgCommand, Receiver, ServerMessage, Source,
};
use tokio::{io::WriteHalf, net::TcpStream};
pub struct User {
@@ -76,3 +78,24 @@ impl actix::Handler<Arc<JoinBroadcast>> for User {
));
}
}
impl actix::Handler<Arc<crate::entities::common_events::Message>> for User {
type Result = ();
fn handle(
&mut self,
msg: Arc<crate::entities::common_events::Message>,
_ctx: &mut Self::Context,
) -> Self::Result {
self.writer.write(ServerMessage::Command(
Source::User(bytes::Bytes::from(msg.from.as_bytes().to_owned()).into()),
PrivmsgCommand {
free_text: FreeText(bytes::Bytes::from(msg.message.as_bytes().to_owned())),
receiver: {
Receiver::Channel(bytes::Bytes::from(msg.to.as_bytes().to_owned()).into())
},
}
.into(),
));
}
}
@@ -55,7 +55,7 @@ impl Handler<Connection> for Server {
}
}
impl Handler<crate::entities::channel::events::Join> for Server {
type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;
@@ -85,3 +85,24 @@ impl Handler<crate::entities::channel::events::Join> for Server {
)
}
}
impl Handler<crate::entities::common_events::Message> for Server {
type Result = ();
fn handle(
&mut self,
msg: crate::entities::common_events::Message,
ctx: &mut Self::Context,
) -> Self::Result {
eprintln!("to: {}", msg.to);
let channel = self.channels.get(&msg.to).unwrap().clone();
ctx.spawn(
async move {
channel.send(msg).await.unwrap();
}
.into_actor(self),
);
}
}