🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2021-02-02 4:02:11.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2021-02-02 4:05:38.0 +00:00:00
commit
a96eedd20338cd64b14e558b8c679f6a9c44827b [patch]
tree
1ff16945e47c3b688fa84a482ebbcecdf46088d3
parent
e6cae0179d49c667ec9e7357338ff2cd18cf760c
download
a96eedd20338cd64b14e558b8c679f6a9c44827b.tar.gz

User must have handle before sending messages to channel



Diff

 titanirc-server/src/entities/channel/events.rs | 32 +----------
 titanirc-server/src/entities/channel/mod.rs    | 76 ++++++++++++++------------
 titanirc-server/src/entities/mod.rs            | 13 +++-
 titanirc-server/src/entities/user/commands.rs  | 80 +++++++++++++--------------
 titanirc-server/src/entities/user/mod.rs       | 67 ++++++++++++++++++-----
 titanirc-server/src/server.rs                  | 73 ++++++-------------------
 titanirc-types/src/protocol/replies.rs         |  3 +-
 7 files changed, 171 insertions(+), 173 deletions(-)

diff --git a/titanirc-server/src/entities/channel/events.rs b/titanirc-server/src/entities/channel/events.rs
index 4662145..74c2e3e 100644
--- a/titanirc-server/src/entities/channel/events.rs
+++ b/titanirc-server/src/entities/channel/events.rs
@@ -2,39 +2,17 @@ use crate::entities::user::{User, UserUuid};
use actix::prelude::*;
use titanirc_types::RegisteredNick;

pub type JoinResult = Result<super::Handle, JoinError>;


/// Send from `User` to `Channel` via `Server`, the `Channel` then replies back
/// with a direct handle for the `User` to interact with the `Channel`.
///
/// This message is also broadcast to all users in the `Channel`.
#[derive(Message)]
#[rtype(result = "JoinResult")]
#[rtype(result = "")]
pub struct Join {
    pub channel_name: String,
    pub channel_name: bytes::Bytes,
    pub user_uuid: UserUuid,
    pub nick: RegisteredNick,
    pub user: Addr<User>,
}

#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum JoinError {
    /// Failed to send join request to channel: {0}
    Mailbox(#[from] actix::MailboxError),
}

/// Sent directly to every `User` when another `User` joins the channel.
#[derive(Message)]
#[rtype(result = "")]
pub struct JoinBroadcast {
    pub channel_name: String,
    pub nick: RegisteredNick,
}

impl From<Join> for JoinBroadcast {
    fn from(
        Join {
            channel_name, nick, ..
        }: Join,
    ) -> Self {
        Self { channel_name, nick }
    }
}
diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs
index 17dffec..842325f 100644
--- a/titanirc-server/src/entities/channel/mod.rs
+++ b/titanirc-server/src/entities/channel/mod.rs
@@ -1,47 +1,61 @@
pub mod events;

use actix::prelude::*;
use bytes::Bytes;
use derive_more::Deref;
use std::{collections::HashMap, sync::Arc};

use crate::entities::user::{User, UserUuid};

use self::events::JoinBroadcast;
#[derive(Clone, Debug, Deref, Eq, PartialEq, Hash)]
#[allow(clippy::module_name_repetitions)]
pub struct ChannelName(Bytes);

/// A handle to this `Channel` that `User` actors can use to communicate with the
/// rest of the channel.
impl ChannelName {
    pub fn new(name: Bytes) -> Self {
        Self(name)
    }
}

impl std::borrow::Borrow<[u8]> for ChannelName {
    fn borrow(&self) -> &[u8] {
        &self.0[..]
    }
}

/// A handle to this `Channel` that `User` actors can use to communicate with
/// the rest of the channel. This is sent back to the `User` when it sends a
/// `events::Join`.
#[derive(Message)]
#[rtype(result = "")]
pub struct Handle {
    pub channel_name: ChannelName,
    //pub name_change: actix::Recipient<super::user::events::NameChange>,
    pub message: actix::Recipient<super::common_events::Message>,
    pub message: actix::Recipient<super::common_events::ChannelMessage>,
}

/// An IRC channel.
pub struct Channel {
    pub channel_name: ChannelName,
    pub members: HashMap<UserUuid, Addr<User>>,
}

impl Channel {
    pub fn new() -> Self {
    pub fn new(channel_name: ChannelName) -> Self {
        Self {
            channel_name,
            members: HashMap::new(),
        }
    }

    // TODO: add a flag not to broadcast messages to the source so PRIVMSGs dont get duplicated
    fn broadcast_message<M>(
        &self,
        skip_sender: Option<UserUuid>,
        msg: M,
    ) -> impl Future<Output = ()>
    fn broadcast_message<M>(&self, skip_sender: Option<UserUuid>, msg: Arc<M>)
    where
        M: Message + Send + Sync,
        M::Result: Send,
        Arc<M>: 'static,
        User: Handler<Arc<M>>,
    {
        let mut futures = Vec::with_capacity(self.members.len());

        let msg = Arc::new(msg);

        for (uuid, member) in &self.members {
            if let Some(skip_sender) = &skip_sender {
                if skip_sender == uuid {
@@ -49,11 +63,8 @@ impl Channel {
                }
            }

            futures.push(member.send(msg.clone()));
        }

        async {
            futures_util::future::try_join_all(futures).await.unwrap();
            // TODO: should this just be `send`?
            member.do_send(msg.clone());
        }
    }
}
@@ -63,34 +74,31 @@ impl Actor for Channel {
}

impl actix::Handler<events::Join> for Channel {
    type Result = events::JoinResult;
    type Result = ();

    fn handle(&mut self, event: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.insert(event.user_uuid, event.user.clone());

    fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.insert(msg.user_uuid, msg.user.clone());
        let event = Arc::new(event);

        ctx.spawn(
            self.broadcast_message(None, JoinBroadcast::from(msg))
                .into_actor(self),
        );
        self.broadcast_message(None, Arc::clone(&event));

        Ok(Handle {
        event.user.do_send(Handle {
            channel_name: self.channel_name.clone(),
            message: ctx.address().recipient(),
        })
    }
}

impl actix::Handler<super::common_events::Message> for Channel {
impl actix::Handler<super::common_events::ChannelMessage> for Channel {
    type Result = ();

    fn handle(
        &mut self,
        msg: super::common_events::Message,
        ctx: &mut Self::Context,
        msg: super::common_events::ChannelMessage,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        // TODO: don't allow messages from unconnected clients
        ctx.spawn(
            self.broadcast_message(Some(msg.user_uuid), msg)
                .into_actor(self),
        );
        self.broadcast_message(Some(msg.0.user_uuid), Arc::new(msg));
    }
}
diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs
index 90cbc3f..602f4c2 100644
--- a/titanirc-server/src/entities/mod.rs
+++ b/titanirc-server/src/entities/mod.rs
@@ -2,14 +2,23 @@ pub mod channel;
pub mod user;

pub mod common_events {
    use std::fmt::Debug;

    use actix::prelude::*;

    #[derive(Debug, Message)]
    #[rtype(result = "")]
    pub struct Message {
    pub struct ChannelMessage(pub Message<titanirc_types::protocol::primitives::Channel<'static>>);

    #[derive(Debug, Message)]
    #[rtype(result = "")]
    pub struct UserMessage(pub Message<titanirc_types::protocol::primitives::Nick<'static>>);

    #[derive(Debug)]
    pub struct Message<T: Debug> {
        pub from: titanirc_types::RegisteredNick,
        pub user_uuid: crate::entities::user::UserUuid,
        pub to: titanirc_types::protocol::primitives::Receiver<'static>,
        pub to: T,
        pub message: String,
    }
}
diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs
index 6d6eadb..3685514 100644
--- a/titanirc-server/src/entities/user/commands.rs
+++ b/titanirc-server/src/entities/user/commands.rs
@@ -2,7 +2,7 @@

use std::{sync::Arc, time::Instant};

use actix::{Actor, AsyncContext, StreamHandler, WrapFuture};
use actix::{Actor, AsyncContext, StreamHandler};
use titanirc_types::protocol::{
    commands::{
        Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, PrivmsgCommand, VersionCommand,
@@ -62,30 +62,13 @@ impl CommandHandler<JoinCommand<'static>> for super::User {
        ctx: &mut Self::Context,
    ) {
        // TODO: ensure the user has a nick set before they join a channel!!!

        let server_addr = self.server.clone();
        let ctx_addr = ctx.address();
        let nick = self.nick.clone();
        let user_uuid = self.session_id;

        // TODO: needs to send MODE & NAMES (353, 366)
        ctx.spawn(
            async move {
                server_addr
                    .send(crate::entities::channel::events::Join {
                        channel_name: std::str::from_utf8(&channel.0[..]).unwrap().to_string(),
                        user_uuid,
                        user: ctx_addr,
                        nick,
                    })
                    .await
                    .unwrap()
                    .unwrap();

                println!("joined chan!");
            }
            .into_actor(self),
        );
        self.server.do_send(crate::entities::channel::events::Join {
            channel_name: channel.to_bytes(),
            user_uuid: self.session_id,
            user: ctx.address(),
            nick: self.nick.clone(),
        });
    }
}

@@ -141,24 +124,41 @@ impl CommandHandler<PrivmsgCommand<'static>> for super::User {
            free_text,
            ..
        }: PrivmsgCommand<'static>,
        ctx: &mut Self::Context,
        _ctx: &mut Self::Context,
    ) {
        // TODO: ensure the user has a nick before sending messages!!

        let msg = crate::entities::common_events::Message {
            from: self.nick.clone(), // TODO: this need to be a full user string i think
            user_uuid: self.session_id,
            to: receiver,
            message: free_text.to_string(),
        };

        let server_addr = self.server.clone();

        ctx.spawn(
            async move {
                server_addr.send(msg).await.unwrap();
        match receiver {
            primitives::Receiver::User(nick) => {
                let msg = crate::entities::common_events::UserMessage(
                    crate::entities::common_events::Message {
                        from: self.nick.clone(),
                        user_uuid: self.session_id,
                        to: nick,
                        message: free_text.to_string(),
                    },
                );

                self.server.do_send(msg);
            }
            primitives::Receiver::Channel(channel_name) => {
                if let Some(handle) = self.channels.get(channel_name.as_ref()) {
                    // todo: specific channel event?
                    let msg = crate::entities::common_events::ChannelMessage(
                        crate::entities::common_events::Message {
                            from: self.nick.clone(),
                            user_uuid: self.session_id,
                            to: channel_name,
                            message: free_text.to_string(),
                        },
                    );

                    // todo: should this be do_send?
                    handle.message.do_send(msg).unwrap();
                } else {
                    // todo: error back to user if not in channel
                    panic!("user not in channel")
                }
            }
            .into_actor(self),
        );
        }
    }
}
diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs
index 8a55af8..388c2ac 100644
--- a/titanirc-server/src/entities/user/mod.rs
+++ b/titanirc-server/src/entities/user/mod.rs
@@ -1,7 +1,7 @@
mod commands;
pub mod events;

use crate::{entities::channel::events::JoinBroadcast, server::Server};
use crate::server::Server;

use std::{collections::HashMap, hash::Hash, sync::Arc};

@@ -9,7 +9,7 @@ use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use bytes::Bytes;

use derive_more::Deref;
use std::time::{Duration, Instant};
use titanirc_types::{
@@ -22,6 +22,8 @@ use titanirc_types::{
use tokio::{io::WriteHalf, net::TcpStream};
use uuid::Uuid;

use super::channel::ChannelName;

#[derive(Debug, Deref, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[allow(clippy::module_name_repetitions)]
pub struct UserUuid(Uuid);
@@ -36,7 +38,8 @@ pub struct User {
    >,
    pub last_active: Instant,
    pub nick: RegisteredNick,
    pub channels: HashMap<Arc<String>, crate::entities::channel::Handle>,

    pub channels: HashMap<ChannelName, crate::entities::channel::Handle>,
}

// TODO: broadcast a leave to all the user's channels on actor shutdown
@@ -84,37 +87,75 @@ impl Actor for User {
/// Handles errors from our socket Writer.
impl WriteHandler<std::io::Error> for User {}

/// Handles `JoinBroadcast`s sent by a channel the user is in, and forwards a
/// `JOIN` onto them.
impl actix::Handler<Arc<JoinBroadcast>> for User {
impl actix::Handler<crate::entities::channel::Handle> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: crate::entities::channel::Handle,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        self.channels.insert(msg.channel_name.clone(), msg);
    }
}

/// Handles `Join`s sent by a channel the user is in, and forwards a
/// `JOIN` command to the user.
impl actix::Handler<Arc<super::channel::events::Join>> for User {
    type Result = ();

    fn handle(&mut self, msg: Arc<JoinBroadcast>, _ctx: &mut Self::Context) -> Self::Result {
    fn handle(
        &mut self,
        msg: Arc<super::channel::events::Join>,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(ServerMessage::Command(
            Source::User(Nick((*msg.nick.load().unwrap()).clone().into())),
            JoinCommand {
                _phantom: std::marker::PhantomData,
                channel: Channel(msg.channel_name.as_bytes().into()),
                channel: Channel((&msg.channel_name[..]).into()),
            }
            .into(),
        ));
    }
}

/// Handles messages that have been forwarded from a channel to this user.
impl actix::Handler<Arc<super::common_events::ChannelMessage>> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: Arc<super::common_events::ChannelMessage>,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(ServerMessage::Command(
            Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())),
            PrivmsgCommand {
                _phantom: std::marker::PhantomData,
                free_text: FreeText(msg.0.message.as_bytes().into()),
                receiver: Receiver::Channel(msg.0.to.clone()),
            }
            .into(),
        ));
    }
}

impl actix::Handler<Arc<crate::entities::common_events::Message>> for User {
/// Handles messages that have been sent directly to this user.
impl actix::Handler<Arc<super::common_events::UserMessage>> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: Arc<crate::entities::common_events::Message>,
        msg: Arc<super::common_events::UserMessage>,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(ServerMessage::Command(
            Source::User(Nick((*msg.from.load().unwrap()).clone().into())),
            Source::User(Nick((*msg.0.from.load().unwrap()).clone().into())),
            PrivmsgCommand {
                _phantom: std::marker::PhantomData,
                free_text: FreeText(msg.message.as_bytes().into()),
                receiver: msg.to.clone(),
                free_text: FreeText(msg.0.message.as_bytes().into()),
                receiver: Receiver::User(msg.0.to.clone()),
            }
            .into(),
        ));
diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs
index f9de0e0..756db93 100644
--- a/titanirc-server/src/server.rs
+++ b/titanirc-server/src/server.rs
@@ -1,9 +1,12 @@
use crate::entities::{channel::Channel, user::User};
use crate::entities::{
    channel::{Channel, ChannelName},
    user::User,
};

use std::{collections::HashMap, net::SocketAddr};

use actix::{io::FramedWrite, prelude::*};
use titanirc_types::{protocol::primitives::Receiver, RegisteredNick, UserIdent};
use titanirc_types::RegisteredNick;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

@@ -15,7 +18,7 @@ use tokio_util::codec::FramedRead;
/// Essentially acts as the middleman for each entity communicating with each other.
pub struct Server {
    /// A list of known channels and the addresses to them.
    pub channels: HashMap<String, Addr<Channel>>,
    pub channels: HashMap<ChannelName, Addr<Channel>>,
    // A list of known connected users.
    // pub users: Vec<(UserIdent, Addr<User>)>,    // todo: add this when we know how auth is gonna work
}
@@ -69,7 +72,7 @@ impl Handler<Connection> for Server {

/// Sent by `User` actors to arbitrate access to the requested channel.
impl Handler<crate::entities::channel::events::Join> for Server {
    type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;
    type Result = ();

    // TODO: validate channel name
    fn handle(
@@ -79,70 +82,30 @@ impl Handler<crate::entities::channel::events::Join> for Server {
    ) -> Self::Result {
        // get the channel or create it if it doesn't already exist
        #[allow(clippy::option_if_let_else)]
        let channel = if let Some(channel) = self.channels.get(&msg.channel_name) {
        let channel = if let Some(channel) = self.channels.get(&msg.channel_name[..]) {
            channel
        } else {
            let channel = Channel::create(|_ctx| Channel::new());
            let channel_name = ChannelName::new(msg.channel_name.clone());

            let channel = Channel::create(|_ctx| Channel::new(channel_name.clone()));

            self.channels
                .entry(msg.channel_name.clone())
                .or_insert(channel)
            self.channels.entry(channel_name).or_insert(channel)
        };

        // forward the user's join event onto the channel
        Box::pin(
            channel
                .send(msg)
                .into_actor(self)
                .map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)),
        )
        channel.do_send(msg);
    }
}

impl Handler<crate::entities::common_events::Message> for Server {
impl Handler<crate::entities::common_events::UserMessage> for Server {
    type Result = ();

    fn handle(
        &mut self,
        msg: crate::entities::common_events::Message,
        ctx: &mut Self::Context,
        _msg: crate::entities::common_events::UserMessage,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let dest = MessageDestination::get_destination_from_receiver(&self, &msg.to).unwrap();
        dest.send(ctx, msg);
    }
}

pub enum MessageDestination<'a> {
    User(&'a Server, Addr<User>),
    Channel(&'a Server, Addr<Channel>),
}

impl<'a> MessageDestination<'a> {
    pub fn get_destination_from_receiver<'b>(
        server: &'a Server,
        receiver: &Receiver<'b>,
    ) -> Option<Self> {
        match receiver {
            Receiver::Channel(c) => server
                .channels
                .get(&c.to_string())
                .cloned()
                .map(move |c| Self::Channel(server, c)),
            Receiver::User(_u) => todo!(),
        }
    }

    pub fn send(self, ctx: &mut Context<Server>, msg: crate::entities::common_events::Message) {
        match self {
            Self::Channel(actor, channel) => {
                ctx.spawn(
                    async move {
                        channel.send(msg).await.unwrap();
                    }
                    .into_actor(actor),
                );
            }
            Self::User(_actor, _u) => todo!(),
        }
        // TODO: implement this when we have a `users` hashmap
        todo!()
    }
}
diff --git a/titanirc-types/src/protocol/replies.rs b/titanirc-types/src/protocol/replies.rs
index eb5796c..fd0300c 100644
--- a/titanirc-types/src/protocol/replies.rs
+++ b/titanirc-types/src/protocol/replies.rs
@@ -1,7 +1,6 @@
#![allow(clippy::wildcard_imports)]

use super::{commands::Command, primitives::*};
use std::fmt::Write;
use super::primitives::*;

/// The origin of a message that's about to be returned to the client.
#[derive(Debug)]