🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2021-01-31 20:29:35.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2021-01-31 20:29:35.0 +00:00:00
commit
afca2b1e4a1872dac57d2c7b3dde451e2f65d3c0 [patch]
tree
99b5058ae09e0e3983295a737d0a226820bc590d
parent
47b42fcb8b8d1cf4ac574b5b2183531738d0b60d
download
afca2b1e4a1872dac57d2c7b3dde451e2f65d3c0.tar.gz

Implement channel messaging



Diff

 titanirc-server/src/entities/channel/mod.rs   | 27 +++++++++++++++-------
 titanirc-server/src/entities/mod.rs           |  7 +++---
 titanirc-server/src/entities/user/commands.rs | 35 ++++++++++++++++++++++++++--
 titanirc-server/src/entities/user/mod.rs      | 27 ++++++++++++++++++++--
 titanirc-server/src/server.rs                 | 23 +++++++++++++++++-
 5 files changed, 103 insertions(+), 16 deletions(-)

diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs
index 3bc306d..6626c03 100644
--- a/titanirc-server/src/entities/channel/mod.rs
+++ b/titanirc-server/src/entities/channel/mod.rs
@@ -5,6 +5,8 @@ use std::sync::Arc;

use crate::entities::user::User;

use self::events::JoinBroadcast;

/// A handle to this `Channel` that `User` actors can use to communicate with the
/// rest of the channel.
pub struct Handle {
@@ -24,14 +26,19 @@ impl Channel {
        }
    }

    /// Announce a user's join event to the rest of the channel.
    fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
    fn broadcast_message<M>(&self, msg: M) -> impl Future<Output = ()>
    where
        M: Message + Send + Sync,
        M::Result: Send,
        Arc<M>: 'static,
        User: Handler<Arc<M>>,
    {
        let mut futures = Vec::with_capacity(self.members.len());

        let broadcast = Arc::new(events::JoinBroadcast::from(join));
        let msg = Arc::new(msg);

        for member in &self.members {
            futures.push(member.send(broadcast.clone()));
            futures.push(member.send(msg.clone()));
        }

        async {
@@ -50,7 +57,10 @@ impl actix::Handler<events::Join> for Channel {
    fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.push(msg.user.clone());

        ctx.spawn(self.announce_join(msg).into_actor(self));
        ctx.spawn(
            self.broadcast_message(JoinBroadcast::from(msg))
                .into_actor(self),
        );

        Ok(Handle {
            message: ctx.address().recipient(),
@@ -63,8 +73,9 @@ impl actix::Handler<super::common_events::Message> for Channel {

    fn handle(
        &mut self,
        _msg: super::common_events::Message,
        _ctx: &mut Self::Context,
        msg: super::common_events::Message,
        ctx: &mut Self::Context,
    ) -> Self::Result {
        ctx.spawn(self.broadcast_message(msg).into_actor(self));
    }
}
diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs
index 56754c4..4e9e097 100644
--- a/titanirc-server/src/entities/mod.rs
+++ b/titanirc-server/src/entities/mod.rs
@@ -5,9 +5,10 @@ pub mod common_events {
    use actix::prelude::*;

    #[derive(Debug, Message)]
    #[rtype(result = "()")]
    #[rtype(result = "")]
    pub struct Message {
        from: String,
        message: String,
        pub from: String,
        pub to: String,
        pub message: String,
    }
}
diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs
index af5b679..3bab702 100644
--- a/titanirc-server/src/entities/user/commands.rs
+++ b/titanirc-server/src/entities/user/commands.rs
@@ -1,7 +1,9 @@
use std::time::Instant;
use std::{ops::Deref, time::Instant};

use actix::{Actor, AsyncContext, StreamHandler, WrapFuture};
use titanirc_types::{Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, VersionCommand};
use titanirc_types::{
    Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand,
};

pub trait CommandHandler<T>: Actor {
    fn handle_cmd(&mut self, command: T, ctx: &mut Self::Context);
@@ -16,6 +18,7 @@ impl StreamHandler<Result<Command, std::io::Error>> for super::User {
            Ok(Command::Join(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Mode(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Motd(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Privmsg(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Version(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Pong(_)) => {}
            Ok(cmd) => println!("cmd: {:?}", cmd),
@@ -111,3 +114,31 @@ impl CommandHandler<VersionCommand> for super::User {
        )
    }
}

impl CommandHandler<PrivmsgCommand> for super::User {
    fn handle_cmd(
        &mut self,
        PrivmsgCommand {
            receiver,
            free_text,
        }: PrivmsgCommand,
        ctx: &mut Self::Context,
    ) {
        if let Some(nick) = &self.nick {
            let msg = crate::entities::common_events::Message {
                from: nick.clone(), // TODO: this need to be a full user string i think
                to: receiver.to_string(),
                message: free_text.to_string(),
            };

            let server_addr = self.server.clone();

            ctx.spawn(
                async move {
                    server_addr.send(msg).await.unwrap();
                }
                .into_actor(self),
            );
        }
    }
}
diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs
index c0eff2a..0912d23 100644
--- a/titanirc-server/src/entities/user/mod.rs
+++ b/titanirc-server/src/entities/user/mod.rs
@@ -3,14 +3,16 @@ pub mod events;

use crate::{entities::channel::events::JoinBroadcast, server::Server};

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::{Channel, JoinCommand, ServerMessage, Source};
use titanirc_types::{
    Channel, FreeText, JoinCommand, PrivmsgCommand, Receiver, ServerMessage, Source,
};
use tokio::{io::WriteHalf, net::TcpStream};

pub struct User {
@@ -76,3 +78,24 @@ impl actix::Handler<Arc<JoinBroadcast>> for User {
        ));
    }
}

impl actix::Handler<Arc<crate::entities::common_events::Message>> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: Arc<crate::entities::common_events::Message>,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(ServerMessage::Command(
            Source::User(bytes::Bytes::from(msg.from.as_bytes().to_owned()).into()),
            PrivmsgCommand {
                free_text: FreeText(bytes::Bytes::from(msg.message.as_bytes().to_owned())),
                receiver: {
                    Receiver::Channel(bytes::Bytes::from(msg.to.as_bytes().to_owned()).into())
                },
            }
            .into(),
        ));
    }
}
diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs
index 35b72cd..6a2cf04 100644
--- a/titanirc-server/src/server.rs
+++ b/titanirc-server/src/server.rs
@@ -55,7 +55,7 @@ impl Handler<Connection> for Server {
    }
}

/// Send by `User` actors to arbitrate access to the requested channel.
/// Sent by `User` actors to arbitrate access to the requested channel.
impl Handler<crate::entities::channel::events::Join> for Server {
    type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;

@@ -85,3 +85,24 @@ impl Handler<crate::entities::channel::events::Join> for Server {
        )
    }
}

impl Handler<crate::entities::common_events::Message> for Server {
    type Result = ();

    fn handle(
        &mut self,
        msg: crate::entities::common_events::Message,
        ctx: &mut Self::Context,
    ) -> Self::Result {
        eprintln!("to: {}", msg.to);

        let channel = self.channels.get(&msg.to).unwrap().clone();

        ctx.spawn(
            async move {
                channel.send(msg).await.unwrap();
            }
            .into_actor(self),
        );
    }
}