🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2021-01-30 19:50:50.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2021-01-31 0:06:54.0 +00:00:00
commit
47b42fcb8b8d1cf4ac574b5b2183531738d0b60d [patch]
tree
61dcb6807910113456f8aeb6e02b932c616262a1
parent
292cd71297abde6adbf1a8a01b8b72f0c046b622
download
47b42fcb8b8d1cf4ac574b5b2183531738d0b60d.tar.gz

Light refactoring



Diff

 Cargo.lock                                     |  41 ++++---
 titanirc-codec/src/lib.rs                      |   2 +-
 titanirc-codec/src/wire.rs                     |  12 +-
 titanirc-server/Cargo.toml                     |   3 +-
 titanirc-server/src/entities/channel.rs        |  97 +-----------------
 titanirc-server/src/entities/channel/events.rs |  38 +++++++-
 titanirc-server/src/entities/channel/mod.rs    |  70 ++++++++++++-
 titanirc-server/src/entities/user.rs           | 148 +--------------------------
 titanirc-server/src/entities/user/commands.rs  | 113 ++++++++++++++++++++-
 titanirc-server/src/entities/user/events.rs    |   8 +-
 titanirc-server/src/entities/user/mod.rs       |  78 ++++++++++++++-
 titanirc-server/src/main.rs                    |   8 +-
 titanirc-server/src/server.rs                  |  43 +++++---
 titanirc-types/Cargo.toml                      |   3 +-
 titanirc-types/src/lib.rs                      |  45 ++++----
 titanirc-types/src/primitives.rs               |  78 ++++++++------
 titanirc-types/src/replies.rs                  |   4 +-
 17 files changed, 456 insertions(+), 335 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1a756ae..a06c6d5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -212,9 +212,9 @@ dependencies = [

[[package]]
name = "data-encoding"
version = "2.3.1"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "993a608597367c6377b258c25d7120740f00ed23a2252b729b1932dd7866f908"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"

[[package]]
name = "derive_more"
@@ -445,9 +445,9 @@ dependencies = [

[[package]]
name = "libc"
version = "0.2.82"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929"
checksum = "1cca32fa0182e8c0989459524dc356b8f2b5c10f1b9eb521b7d182c03cf8c5ff"

[[package]]
name = "linked-hash-map"
@@ -466,11 +466,11 @@ dependencies = [

[[package]]
name = "log"
version = "0.4.13"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
 "cfg-if 0.1.10",
 "cfg-if 1.0.0",
]

[[package]]
@@ -536,6 +536,15 @@ dependencies = [
]

[[package]]
name = "nom-bytes"
version = "0.1.0"
source = "git+https://github.com/w4/nom-bytes#7e44afde2e53f447fc9c77297eb513186b8fb0da"
dependencies = [
 "bytes",
 "nom",
]

[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -689,9 +698,9 @@ checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"

[[package]]
name = "rand"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18519b42a40024d661e1714153e9ad0c3de27cd495760ceb09710920f1098b1e"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
 "libc",
 "rand_chacha",
@@ -856,9 +865,9 @@ dependencies = [

[[package]]
name = "tinyvec"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf8dbc19eb42fba10e8feaaec282fb50e2c14b2726d6301dbfeed0f73306a6f"
checksum = "317cca572a0e89c3ce0ca1f1bdc9369547fe318a683418e42ac8f59d14701023"
dependencies = [
 "tinyvec_macros",
]
@@ -886,6 +895,7 @@ dependencies = [
 "actix",
 "actix-rt",
 "async-stream",
 "bytes",
 "clap",
 "displaydoc",
 "futures-util",
@@ -903,14 +913,15 @@ dependencies = [
 "bytes",
 "derive_more",
 "nom",
 "nom-bytes",
 "paste",
]

[[package]]
name = "tokio"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195"
checksum = "6714d663090b6b0acb0fa85841c6d66233d150cdb2602c8f9b8abb03370beb3f"
dependencies = [
 "autocfg",
 "bytes",
@@ -1058,9 +1069,9 @@ checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"

[[package]]
name = "wasi"
version = "0.10.1+wasi-snapshot-preview1"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93c6c3420963c5c64bca373b25e77acb562081b9bb4dd5bb864187742186cea9"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"

[[package]]
name = "widestring"
diff --git a/titanirc-codec/src/lib.rs b/titanirc-codec/src/lib.rs
index 206da83..f44a408 100644
--- a/titanirc-codec/src/lib.rs
+++ b/titanirc-codec/src/lib.rs
@@ -1,3 +1,5 @@
//! Used to encode/decode messages to and from the client.

#![deny(clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]

diff --git a/titanirc-codec/src/wire.rs b/titanirc-codec/src/wire.rs
index 9b9cee5..01e6da7 100644
--- a/titanirc-codec/src/wire.rs
+++ b/titanirc-codec/src/wire.rs
@@ -1,4 +1,4 @@
use bytes::{Buf, BytesMut};
use bytes::{BytesMut};
use titanirc_types::Command;
use tokio_util::codec::Decoder as FrameDecoder;

@@ -25,11 +25,15 @@ impl FrameDecoder for Decoder {
            return Ok(None);
        };

        let bytes = src.copy_to_bytes(length + 1);
        let bytes = {
            let mut b = src.split_to(length + 1);
            b.truncate(b.len() - 2); // remove the crlf at the end of the buffer
            b.freeze()
        };

        eprintln!("{:?}", std::str::from_utf8(&bytes[..bytes.len() - 2]));
        eprintln!("{:?}", std::str::from_utf8(&bytes[..]));

        match Command::parse(&bytes[..bytes.len() - 2]) {
        match Command::parse(bytes) {
            Ok(Some(msg)) => Ok(Some(msg)),
            Ok(None) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
diff --git a/titanirc-server/Cargo.toml b/titanirc-server/Cargo.toml
index 1b4032c..5dcc9ad 100644
--- a/titanirc-server/Cargo.toml
+++ b/titanirc-server/Cargo.toml
@@ -18,4 +18,5 @@ async-stream = "0.3"
thiserror = "1"
displaydoc = "0.1"
clap = "3.0.0-beta.2"
futures-util = "0.3"
\ No newline at end of file
futures-util = "0.3"
bytes = "1.0"
\ No newline at end of file
diff --git a/titanirc-server/src/entities/channel.rs b/titanirc-server/src/entities/channel.rs
deleted file mode 100644
index b399cc2..0000000
--- a/titanirc-server/src/entities/channel.rs
+++ /dev/null
@@ -1,97 +0,0 @@
use std::sync::Arc;

use actix::prelude::*;

use crate::{entities::user::User, server::Server};

pub mod events {
    use crate::entities::user::User;
    use actix::prelude::*;

    pub type JoinResult = Result<super::Handle, JoinError>;

    #[derive(Message)]
    #[rtype(result = "JoinResult")]
    pub struct Join {
        pub channel_name: String,
        pub nick: String,
        pub user: Addr<User>,
    }

    #[derive(Debug, thiserror::Error, displaydoc::Display)]
    pub enum JoinError {
        /// Failed to send join request to channel: {0}
        Mailbox(#[from] actix::MailboxError),
    }

    #[derive(Message)]
    #[rtype(result = "")]
    pub struct JoinBroadcast {
        pub channel_name: String,
        pub nick: String,
    }

    impl From<Join> for JoinBroadcast {
        fn from(
            Join {
                channel_name, nick, ..
            }: Join,
        ) -> Self {
            Self { channel_name, nick }
        }
    }
}

pub struct Handle {
    //pub name_change: actix::Recipient<super::user::events::NameChange>,
    pub message: actix::Recipient<super::common_events::Message>,
}

pub struct Channel {
    pub members: Vec<Addr<User>>,
}

impl Channel {
    fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
        let mut futures = Vec::new();

        let broadcast = Arc::new(events::JoinBroadcast::from(join));

        for member in &self.members {
            futures.push(member.send(broadcast.clone()));
        }

        async {
            futures_util::future::try_join_all(futures).await.unwrap();
        }
    }
}

impl Actor for Channel {
    type Context = Context<Self>;
}

impl actix::Handler<events::Join> for Channel {
    type Result = events::JoinResult;

    fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.push(msg.user.clone());

        ctx.spawn(self.announce_join(msg).into_actor(self));

        Ok(Handle {
            message: ctx.address().recipient(),
        })
    }
}

impl actix::Handler<super::common_events::Message> for Channel {
    type Result = ();

    fn handle(
        &mut self,
        msg: super::common_events::Message,
        ctx: &mut Self::Context,
    ) -> Self::Result {
    }
}
diff --git a/titanirc-server/src/entities/channel/events.rs b/titanirc-server/src/entities/channel/events.rs
new file mode 100644
index 0000000..f40b862
--- /dev/null
+++ b/titanirc-server/src/entities/channel/events.rs
@@ -0,0 +1,38 @@
use crate::entities::user::User;
use actix::prelude::*;

pub type JoinResult = Result<super::Handle, JoinError>;

/// Send from `User` to `Channel` via `Server`, the `Channel` then replies back
/// with a direct handle for the `User` to interact with the `Channel`.
#[derive(Message)]
#[rtype(result = "JoinResult")]
pub struct Join {
    pub channel_name: String,
    pub nick: String,
    pub user: Addr<User>,
}

#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum JoinError {
    /// Failed to send join request to channel: {0}
    Mailbox(#[from] actix::MailboxError),
}

/// Sent directly to every `User` when another `User` joins the channel.
#[derive(Message)]
#[rtype(result = "")]
pub struct JoinBroadcast {
    pub channel_name: String,
    pub nick: String,
}

impl From<Join> for JoinBroadcast {
    fn from(
        Join {
            channel_name, nick, ..
        }: Join,
    ) -> Self {
        Self { channel_name, nick }
    }
}
diff --git a/titanirc-server/src/entities/channel/mod.rs b/titanirc-server/src/entities/channel/mod.rs
new file mode 100644
index 0000000..3bc306d
--- /dev/null
+++ b/titanirc-server/src/entities/channel/mod.rs
@@ -0,0 +1,70 @@
pub mod events;

use actix::prelude::*;
use std::sync::Arc;

use crate::entities::user::User;

/// A handle to this `Channel` that `User` actors can use to communicate with the
/// rest of the channel.
pub struct Handle {
    //pub name_change: actix::Recipient<super::user::events::NameChange>,
    pub message: actix::Recipient<super::common_events::Message>,
}

/// An IRC channel.
pub struct Channel {
    pub members: Vec<Addr<User>>,
}

impl Channel {
    pub fn new() -> Self {
        Self {
            members: Vec::new(),
        }
    }

    /// Announce a user's join event to the rest of the channel.
    fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
        let mut futures = Vec::new();

        let broadcast = Arc::new(events::JoinBroadcast::from(join));

        for member in &self.members {
            futures.push(member.send(broadcast.clone()));
        }

        async {
            futures_util::future::try_join_all(futures).await.unwrap();
        }
    }
}

impl Actor for Channel {
    type Context = Context<Self>;
}

impl actix::Handler<events::Join> for Channel {
    type Result = events::JoinResult;

    fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.push(msg.user.clone());

        ctx.spawn(self.announce_join(msg).into_actor(self));

        Ok(Handle {
            message: ctx.address().recipient(),
        })
    }
}

impl actix::Handler<super::common_events::Message> for Channel {
    type Result = ();

    fn handle(
        &mut self,
        _msg: super::common_events::Message,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
    }
}
diff --git a/titanirc-server/src/entities/user.rs b/titanirc-server/src/entities/user.rs
deleted file mode 100644
index 738f283..0000000
--- a/titanirc-server/src/entities/user.rs
+++ /dev/null
@@ -1,148 +0,0 @@
use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::Command;
use tokio::{io::WriteHalf, net::TcpStream};

pub mod events {
    use actix::prelude::*;

    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct NameChange {
        pub old: String,
        pub new: String,
    }
}

pub struct User {
    pub server: Addr<crate::server::Server>,
    pub writer:
        FramedWrite<titanirc_types::ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
    pub last_active: Instant,
    pub nick: Option<String>, // should probably be an arc so we dont have to keep cloning the string
}

fn schedule_ping(ctx: &mut <User as Actor>::Context) {
    ctx.run_later(Duration::from_secs(30), |act, ctx| {
        if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
            // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)`
            eprintln!("ping timeout");
            ctx.stop();
        }

        act.writer.write(titanirc_types::ServerMessage::Ping);
        schedule_ping(ctx);
    });
}

impl Actor for User {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        schedule_ping(ctx);
    }
}

impl WriteHandler<std::io::Error> for User {}

impl StreamHandler<Result<Command, std::io::Error>> for User {
    /// This is main event loop for client requests
    fn handle(&mut self, cmd: Result<Command, std::io::Error>, ctx: &mut Self::Context) {
        self.last_active = Instant::now();

        match cmd {
            Ok(Command::Nick(titanirc_types::NickCommand { nick })) => {
                self.writer.write(titanirc_types::Reply::RplWelcome.into());
                self.writer.write(titanirc_types::Reply::RplYourHost.into());
                self.writer.write(titanirc_types::Reply::RplCreated.into());
                self.writer.write(titanirc_types::Reply::RplMyInfo.into());
                self.writer.write(titanirc_types::Reply::RplISupport.into());
                self.nick = Some(nick.0);
                // LUSERS
                // RPL_UMODEIS
                // MOTD
            }
            Ok(Command::Join(titanirc_types::JoinCommand { channel })) => {
                if let Some(ref nick) = self.nick {
                    let server_addr = self.server.clone();
                    let ctx_addr = ctx.address();
                    let nick = nick.clone();

                    ctx.spawn(
                        async move {
                            server_addr
                                .send(crate::entities::channel::events::Join {
                                    channel_name: channel.0,
                                    user: ctx_addr,
                                    nick,
                                })
                                .await
                                .unwrap()
                                .unwrap();

                            println!("joined chan!");
                        }
                        .into_actor(self),
                    );
                }
            }
            Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self
                .writer
                .write(titanirc_types::Reply::RplUmodeIs(mode).into()),
            Ok(Command::Motd(_)) => {
                self.writer.write(
                    titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(
                        "my.test.server".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "Hello, welcome to this server!".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "it's very cool!".to_string(),
                    ))
                    .into(),
                );
                self.writer
                    .write(titanirc_types::Reply::RplEndOfMotd.into());
            }
            Ok(Command::Version(_)) => self.writer.write(
                titanirc_types::Reply::RplVersion(
                    clap::crate_version!().to_string(),
                    "release".to_string(),
                    titanirc_types::ServerName("my.test.server".to_string()),
                    titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()),
                )
                .into(),
            ),
            Ok(Command::Pong(_)) => {}
            Ok(cmd) => println!("cmd: {:?}", cmd),
            Err(e) => eprintln!("error decoding: {}", e),
        }
    }
}

impl actix::Handler<std::sync::Arc<crate::entities::channel::events::JoinBroadcast>> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: std::sync::Arc<crate::entities::channel::events::JoinBroadcast>,
        ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(titanirc_types::ServerMessage::Command(
            titanirc_types::Source::User(titanirc_types::Nick(msg.nick.clone())),
            titanirc_types::Command::Join(titanirc_types::JoinCommand {
                channel: titanirc_types::Channel::from(msg.channel_name.clone()),
            }),
        ));
    }
}
diff --git a/titanirc-server/src/entities/user/commands.rs b/titanirc-server/src/entities/user/commands.rs
new file mode 100644
index 0000000..af5b679
--- /dev/null
+++ b/titanirc-server/src/entities/user/commands.rs
@@ -0,0 +1,113 @@
use std::time::Instant;

use actix::{Actor, AsyncContext, StreamHandler, WrapFuture};
use titanirc_types::{Command, JoinCommand, ModeCommand, MotdCommand, NickCommand, VersionCommand};

pub trait CommandHandler<T>: Actor {
    fn handle_cmd(&mut self, command: T, ctx: &mut Self::Context);
}

impl StreamHandler<Result<Command, std::io::Error>> for super::User {
    fn handle(&mut self, cmd: Result<Command, std::io::Error>, ctx: &mut Self::Context) {
        self.last_active = Instant::now();

        match cmd {
            Ok(Command::Nick(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Join(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Mode(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Motd(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Version(v)) => self.handle_cmd(v, ctx),
            Ok(Command::Pong(_)) => {}
            Ok(cmd) => println!("cmd: {:?}", cmd),
            Err(e) => eprintln!("error decoding: {}", e),
        }
    }
}

// TODO: all the 'raw' writes using byte strings below probably need to
//  be wrapped in something a bit more friendly.

impl CommandHandler<NickCommand> for super::User {
    fn handle_cmd(&mut self, NickCommand { nick }: NickCommand, _ctx: &mut Self::Context) {
        self.writer.write(titanirc_types::Reply::RplWelcome.into());
        self.writer.write(titanirc_types::Reply::RplYourHost.into());
        self.writer.write(titanirc_types::Reply::RplCreated.into());
        self.writer.write(titanirc_types::Reply::RplMyInfo.into());
        self.writer.write(titanirc_types::Reply::RplISupport.into());
        self.nick = Some(std::str::from_utf8(&nick.0[..]).unwrap().to_string());
        // LUSERS
        // RPL_UMODEIS
        // MOTD
    }
}

impl CommandHandler<JoinCommand> for super::User {
    fn handle_cmd(&mut self, JoinCommand { channel }: JoinCommand, ctx: &mut Self::Context) {
        if let Some(ref nick) = self.nick {
            let server_addr = self.server.clone();
            let ctx_addr = ctx.address();
            let nick = nick.clone();

            ctx.spawn(
                async move {
                    server_addr
                        .send(crate::entities::channel::events::Join {
                            channel_name: std::str::from_utf8(&channel.0[..]).unwrap().to_string(),
                            user: ctx_addr,
                            nick,
                        })
                        .await
                        .unwrap()
                        .unwrap();

                    println!("joined chan!");
                }
                .into_actor(self),
            );
        }
    }
}

impl CommandHandler<ModeCommand> for super::User {
    fn handle_cmd(&mut self, ModeCommand { mode, .. }: ModeCommand, _ctx: &mut Self::Context) {
        self.writer
            .write(titanirc_types::Reply::RplUmodeIs(mode).into())
    }
}

impl CommandHandler<MotdCommand> for super::User {
    fn handle_cmd(&mut self, _command: MotdCommand, _ctx: &mut Self::Context) {
        static SERVER_NAME: bytes::Bytes = bytes::Bytes::from_static(b"my.test.server");
        static MOTD1: bytes::Bytes = bytes::Bytes::from_static(b"Hello, welcome to this server!");
        static MOTD2: bytes::Bytes = bytes::Bytes::from_static(b"it's very cool!");

        self.writer.write(
            titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(SERVER_NAME.clone()))
                .into(),
        );
        self.writer
            .write(titanirc_types::Reply::RplMotd(titanirc_types::FreeText(MOTD1.clone())).into());
        self.writer
            .write(titanirc_types::Reply::RplMotd(titanirc_types::FreeText(MOTD2.clone())).into());
        self.writer
            .write(titanirc_types::Reply::RplEndOfMotd.into());
    }
}

impl CommandHandler<VersionCommand> for super::User {
    fn handle_cmd(&mut self, _command: VersionCommand, _ctx: &mut Self::Context) {
        static SERVER_NAME: bytes::Bytes = bytes::Bytes::from_static(b"my.test.server");
        static INFO: bytes::Bytes =
            bytes::Bytes::from_static(b"https://github.com/MITBorg/titanirc");

        self.writer.write(
            titanirc_types::Reply::RplVersion(
                clap::crate_version!().to_string(),
                "release".to_string(),
                titanirc_types::ServerName(SERVER_NAME.clone()),
                titanirc_types::FreeText(INFO.clone()),
            )
            .into(),
        )
    }
}
diff --git a/titanirc-server/src/entities/user/events.rs b/titanirc-server/src/entities/user/events.rs
new file mode 100644
index 0000000..0f5feb4
--- /dev/null
+++ b/titanirc-server/src/entities/user/events.rs
@@ -0,0 +1,8 @@
use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "()")]
pub struct NameChange {
    pub old: String,
    pub new: String,
}
diff --git a/titanirc-server/src/entities/user/mod.rs b/titanirc-server/src/entities/user/mod.rs
new file mode 100644
index 0000000..c0eff2a
--- /dev/null
+++ b/titanirc-server/src/entities/user/mod.rs
@@ -0,0 +1,78 @@
mod commands;
pub mod events;

use crate::{entities::channel::events::JoinBroadcast, server::Server};

use std::sync::Arc;

use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::{Channel, JoinCommand, ServerMessage, Source};
use tokio::{io::WriteHalf, net::TcpStream};

pub struct User {
    pub server: Addr<Server>,
    pub writer: FramedWrite<ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
    pub last_active: Instant,
    pub nick: Option<String>,
}

// TODO: broadcast a leave to all the user's channels on actor shutdown

impl User {
    pub fn new(
        server: Addr<Server>,
        writer: FramedWrite<ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
    ) -> Self {
        Self {
            server,
            writer,
            last_active: Instant::now(),
            nick: None,
        }
    }
}

/// Sends a ping to the user every 30 seconds, if we haven't received a protocol
/// message from the user in 240 seconds then disconnect.
fn schedule_ping(ctx: &mut <User as Actor>::Context) {
    ctx.run_later(Duration::from_secs(30), |act, ctx| {
        if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
            // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)`
            ctx.stop();
        }

        act.writer.write(titanirc_types::ServerMessage::Ping);
        schedule_ping(ctx);
    });
}

impl Actor for User {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        schedule_ping(ctx);
    }
}

/// Handles errors from our socket Writer.
impl WriteHandler<std::io::Error> for User {}

/// Handles `JoinBroadcast`s sent by a channel the user is in, and forwards a
/// `JOIN` onto them.
impl actix::Handler<Arc<JoinBroadcast>> for User {
    type Result = ();

    fn handle(&mut self, msg: Arc<JoinBroadcast>, _ctx: &mut Self::Context) -> Self::Result {
        self.writer.write(ServerMessage::Command(
            Source::User(bytes::Bytes::from(msg.nick.as_bytes().to_owned()).into()),
            JoinCommand {
                channel: Channel::from(bytes::Bytes::from(msg.channel_name.as_bytes().to_owned())),
            }
            .into(),
        ));
    }
}
diff --git a/titanirc-server/src/main.rs b/titanirc-server/src/main.rs
index 6771939..8ae5e23 100644
--- a/titanirc-server/src/main.rs
+++ b/titanirc-server/src/main.rs
@@ -5,8 +5,6 @@ mod entities;
mod error;
mod server;

use std::collections::HashMap;

use crate::{
    error::Result,
    server::{Connection, Server},
@@ -29,6 +27,7 @@ async fn main() -> Result<()> {
        .await
        .map_err(InitError::TcpBind)?;

    // connection acceptor loop
    let stream = async_stream::stream! {
        loop {
            match listener.accept().await {
@@ -38,11 +37,10 @@ async fn main() -> Result<()> {
        }
    };

    // Spawn the server and pass connections from `stream` to `Handler<Connection>`.
    Server::create(move |ctx| {
        ctx.add_message_stream(stream);
        Server {
            channels: HashMap::new(),
        }
        Server::new()
    });

    println!("Running IRC server on 0.0.0.0:6667");
diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs
index f243971..35b72cd 100644
--- a/titanirc-server/src/server.rs
+++ b/titanirc-server/src/server.rs
@@ -3,14 +3,28 @@ use crate::entities::{channel::Channel, user::User};
use std::{collections::HashMap, net::SocketAddr};

use actix::{io::FramedWrite, prelude::*};
use futures_util::future::TryFutureExt;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

/// The core of our server:
///
/// - Handles incoming connections and spawns `User` actors for each.
/// - Handles channel creation, access control, operator config, etc.
///
/// Essentially acts as the middleman for each entity communicating with each other.
pub struct Server {
    /// A list of known channels and the addresses to them.
    pub channels: HashMap<String, Addr<Channel>>,
}

impl Server {
    pub fn new() -> Self {
        Self {
            channels: HashMap::new(),
        }
    }
}

impl Actor for Server {
    type Context = Context<Self>;
}
@@ -19,6 +33,9 @@ impl Actor for Server {
#[rtype(result = "()")]
pub struct Connection(pub TcpStream, pub SocketAddr);

/// Handles incoming connections from our connection acceptor loop and spawns
/// a `User` actor for each which handles reading from the socket and acting
/// accordingly.
impl Handler<Connection> for Server {
    type Result = ();

@@ -27,37 +44,39 @@ impl Handler<Connection> for Server {

        User::create(move |ctx| {
            let (read, write) = tokio::io::split(stream);
            User::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx);
            User {
                server: server_ctx.address(),
                writer: FramedWrite::new(write, titanirc_codec::Encoder, ctx),
                last_active: std::time::Instant::now(),
                nick: None,
            }
            let read = FramedRead::new(read, titanirc_codec::Decoder);
            let write = FramedWrite::new(write, titanirc_codec::Encoder, ctx);

            // Make our new `User` handle all events from this socket in `StreamHandler<Result<Command, _>>`.
            ctx.add_stream(read);

            User::new(server_ctx.address(), write)
        });
    }
}

/// Send by `User` actors to arbitrate access to the requested channel.
impl Handler<crate::entities::channel::events::Join> for Server {
    type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;

    fn handle(
        &mut self,
        msg: crate::entities::channel::events::Join,
        ctx: &mut Self::Context,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        // get the channel or create it if it doesn't already exist
        #[allow(clippy::option_if_let_else)]
        let channel = if let Some(channel) = self.channels.get(&msg.channel_name) {
            channel
        } else {
            let channel = Channel::create(|ctx| Channel {
                members: Default::default(),
            });
            let channel = Channel::create(|_ctx| Channel::new());

            self.channels
                .entry(msg.channel_name.clone())
                .or_insert(channel)
        };

        // forward the user's join event onto the channel
        Box::pin(
            channel
                .send(msg)
diff --git a/titanirc-types/Cargo.toml b/titanirc-types/Cargo.toml
index 562d290..b71e64f 100644
--- a/titanirc-types/Cargo.toml
+++ b/titanirc-types/Cargo.toml
@@ -10,4 +10,5 @@ edition = "2018"
paste = "1.0"
nom = "6.1"
derive_more = "0.99"
bytes = "1.0"
\ No newline at end of file
bytes = "1.0"
nom-bytes = { git = "https://github.com/w4/nom-bytes" }
\ No newline at end of file
diff --git a/titanirc-types/src/lib.rs b/titanirc-types/src/lib.rs
index be5555f..582f5dc 100644
--- a/titanirc-types/src/lib.rs
+++ b/titanirc-types/src/lib.rs
@@ -7,15 +7,17 @@ mod replies;
pub use crate::primitives::*;
pub use crate::replies::{Reply, ServerMessage, Source};

use bytes::Bytes;
use nom::{
    bytes::complete::{tag, take_till},
    error::Error as NomError,
};
use nom_bytes::BytesWrapper;

fn parse_optional_source(input: &[u8]) -> nom::IResult<&[u8], &[u8]> {
    let (rest, _) = tag(":")(input)?;
fn parse_optional_source(input: BytesWrapper) -> nom::IResult<BytesWrapper, BytesWrapper> {
    let (rest, _) = tag(":".as_bytes())(input)?;
    let (rest, _) = take_till(|c| c == b' ')(rest)?;
    tag(" ")(rest)
    tag(" ".as_bytes())(rest)
}

macro_rules! define_commands {
@@ -35,19 +37,20 @@ macro_rules! define_commands {
            $(const [<$name _BYTES>]: &[u8] = stringify!($name).as_bytes();)*

            impl Command {
                pub fn parse(input: &[u8]) -> Result<Option<Self>, nom::Err<NomError<&[u8]>>> {
                pub fn parse(input: Bytes) -> Result<Option<Self>, nom::Err<NomError<BytesWrapper>>> {
                    let input = BytesWrapper::from(input);

                    // skip the optional source at the start of the message
                    let rest = if let Ok((rest, _)) = parse_optional_source(input) {
                        rest
                    let input = if let Ok((input, _)) = parse_optional_source(input.clone()) {
                        input
                    } else {
                        input
                    };

                    let (rest, kind) = take_till(|c| c == b' ')(rest)?;
                    let (params, command) = take_till(|c| c == b' ')(input)?;

                    // fix this shit
                    match std::str::from_utf8(kind).unwrap().to_uppercase().as_bytes() {
                        $([<$name _BYTES>] => Ok(Some(Self::[<$name:camel>]([<$name:camel Command>]::parse(rest)?)))),*,
                    match command.to_ascii_uppercase().as_ref() {
                        $([<$name _BYTES>] => Ok(Some(Self::[<$name:camel>]([<$name:camel Command>]::parse(params)?)))),*,
                        _ => Ok(None)
                    }
                }
@@ -69,10 +72,10 @@ macro_rules! define_commands {

                impl [<$name:camel Command>] {
                    #[allow(unused_variables)]
                    pub fn parse(rest: &[u8]) -> Result<Self, nom::Err<nom::error::Error<&[u8]>>> {
                    pub fn parse(rest: BytesWrapper) -> Result<Self, nom::Err<nom::error::Error<BytesWrapper>>> {
                        $(
                            $(
                                let (rest, _) = tag(" ")(rest)?;
                                let (rest, _) = tag(" ".as_bytes())(rest)?;
                                let (rest, [<$param:snake>]) = $param::parse(rest)?;
                            )*
                        )*
@@ -97,6 +100,12 @@ macro_rules! define_commands {
                        Ok(())
                    }
                }

                impl Into<Command> for [<$name:camel Command>] {
                    fn into(self) -> Command {
                        Command::[<$name:camel>](self)
                    }
                }
            )*
        }
    };
@@ -126,16 +135,17 @@ define_commands! {
#[cfg(test)]
mod tests {
    use super::Command;
    use bytes::Bytes;

    #[test]
    fn parse_empty() {
        assert!(matches!(Command::parse(b""), Ok(None)));
        assert!(matches!(Command::parse(Bytes::from_static(b"")), Ok(None)));
    }

    #[test]
    fn parse_privmsg() {
        assert!(matches!(
            Command::parse(b"PRIVMSG foo :baz"),
            Command::parse(Bytes::from_static(b"PRIVMSG foo :baz")),
            Ok(Some(Command::Privmsg(super::PrivmsgCommand {
                receiver: super::Receiver::User(super::Nick(nick)),
                free_text: super::primitives::FreeText(msg),
@@ -145,13 +155,8 @@ mod tests {

    #[test]
    fn parse_privmsg_opt_source() {
        eprintln!(
            "{:?}",
            Command::parse(b":some-fake-source!dude@nice PRIVMSG foo :baz")
        );

        assert!(matches!(
            Command::parse(b":some-fake-source!dude@nice PRIVMSG foo :baz"),
            Command::parse(Bytes::from_static(b":some-fake-source!dude@nice PRIVMSG foo :baz")),
            Ok(Some(Command::Privmsg(super::PrivmsgCommand {
                receiver: super::Receiver::User(super::Nick(nick)),
                free_text: super::primitives::FreeText(msg),
diff --git a/titanirc-types/src/primitives.rs b/titanirc-types/src/primitives.rs
index 7803339..c5ea3fc 100644
--- a/titanirc-types/src/primitives.rs
+++ b/titanirc-types/src/primitives.rs
@@ -1,17 +1,19 @@
use bytes::Bytes;
use derive_more::{Deref, From};
use nom::{
    bytes::complete::{tag, take_till},
    combinator::{iterator, map_res},
    combinator::{iterator},
    sequence::terminated,
    IResult,
};
use nom_bytes::BytesWrapper;

pub trait ValidatingParser {
    fn validate(bytes: &[u8]) -> bool;
}

pub trait PrimitiveParser {
    fn parse(bytes: &[u8]) -> IResult<&[u8], Self>
    fn parse(bytes: BytesWrapper) -> IResult<BytesWrapper, Self>
    where
        Self: Sized;
}
@@ -29,15 +31,22 @@ macro_rules! noop_validator {
macro_rules! free_text_primitive {
    ($name:ty) => {
        impl PrimitiveParser for $name {
            fn parse(bytes: &[u8]) -> IResult<&[u8], Self> {
                let (rest, _) = tag(b":")(bytes)?;
                Ok((&[], Self(std::str::from_utf8(rest).unwrap().to_string())))
            fn parse(bytes: BytesWrapper) -> IResult<BytesWrapper, Self> {
                let (rest, _) = tag(":".as_bytes())(bytes)?;
                Ok((Bytes::new().into(), Self(rest.into())))
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str(&self.0)
                match std::str::from_utf8(&self.0[..]) {
                    Ok(v) => f.write_str(v),
                    Err(_e) => {
                        // todo: report this better
                        eprintln!("Invalid utf-8 in {}", stringify!($name));
                        Err(std::fmt::Error)
                    }
                }
            }
        }
    };
@@ -46,24 +55,30 @@ macro_rules! free_text_primitive {
macro_rules! space_terminated_primitive {
    ($name:ty) => {
        impl PrimitiveParser for $name {
            fn parse(bytes: &[u8]) -> IResult<&[u8], Self> {
                let (rest, val) = map_res(take_till(|c| c == b' '), std::str::from_utf8)(bytes)?;
            fn parse(bytes: BytesWrapper) -> IResult<BytesWrapper, Self> {
                let (rest, val) = take_till(|c| c == b' ')(bytes.clone())?;

                if !<Self as ValidatingParser>::validate(val.as_bytes()) {
                if !<Self as ValidatingParser>::validate(&val[..]) {
                    return Err(nom::Err::Failure(nom::error::Error::new(
                        bytes,
                        nom::error::ErrorKind::Verify,
                    )));
                }

                // TODO: don't clone
                Ok((rest, Self(val.to_string())))
                Ok((rest, Self(val.into())))
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str(&self.0)
                match std::str::from_utf8(&self.0[..]) {
                    Ok(v) => f.write_str(v),
                    Err(_e) => {
                        // todo: report this better
                        eprintln!("Invalid utf-8 in {}", stringify!($name));
                        Err(std::fmt::Error)
                    }
                }
            }
        }
    };
@@ -122,32 +137,32 @@ impl ValidatingParser for Special {
}

#[derive(Debug, Deref, From)]
pub struct Username(pub String);
pub struct Username(pub Bytes);
space_terminated_primitive!(Username);
noop_validator!(Username);

#[derive(Debug, Deref, From)]
pub struct Mode(pub String);
pub struct Mode(pub Bytes);
space_terminated_primitive!(Mode);
noop_validator!(Mode);

#[derive(Debug, Deref, From)]
pub struct HostName(pub String);
pub struct HostName(pub Bytes);
space_terminated_primitive!(HostName);
noop_validator!(HostName);

#[derive(Debug, Deref, From)]
pub struct ServerName(pub String);
pub struct ServerName(pub Bytes);
space_terminated_primitive!(ServerName);
noop_validator!(ServerName);

#[derive(Debug, Deref, From)]
pub struct RealName(pub String);
pub struct RealName(pub Bytes);
space_terminated_primitive!(RealName);
noop_validator!(RealName);

#[derive(Debug, Deref, From)]
pub struct Nick(pub String);
pub struct Nick(pub Bytes);
space_terminated_primitive!(Nick);

// TODO: i feel like this would be better suited as a nom chomper to stop
@@ -169,12 +184,12 @@ impl ValidatingParser for Nick {
}

#[derive(Debug, Deref, From)]
pub struct Channel(pub String);
pub struct Channel(pub Bytes);
space_terminated_primitive!(Channel);
noop_validator!(Channel);

#[derive(Debug, Deref, From)]
pub struct FreeText(pub String);
pub struct FreeText(pub Bytes);
free_text_primitive!(FreeText);
noop_validator!(FreeText);

@@ -183,12 +198,13 @@ pub struct Nicks(pub Vec<Nick>);
space_delimited_display!(Nicks);

impl PrimitiveParser for Nicks {
    fn parse(bytes: &[u8]) -> IResult<&[u8], Self> {
        let mut it = iterator(bytes, terminated(take_till(|c| c == b' '), tag(b" ")));
    fn parse(bytes: BytesWrapper) -> IResult<BytesWrapper, Self> {
        let mut it = iterator(
            bytes,
            terminated(take_till(|c| c == b' '), tag(" ".as_bytes())),
        );

        let parsed = it
            .map(|v| Nick(std::str::from_utf8(v).unwrap().to_string()))
            .collect();
        let parsed = it.map(|v| Nick(v.into())).collect();

        it.finish()
            .map(move |(remaining, _)| (remaining, Self(parsed)))
@@ -245,20 +261,20 @@ pub enum Receiver {
}

impl std::ops::Deref for Receiver {
    type Target = String;
    type Target = str;

    fn deref(&self) -> &Self::Target {
        match self {
        std::str::from_utf8(match self {
            Self::User(nick) => &*nick,
            Self::Channel(channel) => &*channel,
        }
        })
        .unwrap()
    }
}

impl PrimitiveParser for Receiver {
    fn parse(bytes: &[u8]) -> IResult<&[u8], Self> {
        if let Ok((_, _)) = nom::bytes::complete::tag::<_, _, nom::error::Error<&[u8]>>("#")(bytes)
        {
    fn parse(bytes: BytesWrapper) -> IResult<BytesWrapper, Self> {
        if bytes.get(0) == Some(&b'#') {
            let (rest, channel) = Channel::parse(bytes)?;
            Ok((rest, Self::Channel(channel)))
        } else {
diff --git a/titanirc-types/src/replies.rs b/titanirc-types/src/replies.rs
index e89296f..0e8809b 100644
--- a/titanirc-types/src/replies.rs
+++ b/titanirc-types/src/replies.rs
@@ -1,3 +1,5 @@
#![allow(clippy::wildcard_imports)]

use crate::{primitives::*, Command};
use std::fmt::Write;

@@ -30,7 +32,7 @@ impl ServerMessage {
            Self::Pong => write!(dst, "PONG :{}", server_name),
            Self::Command(source, command) => {
                let source = match &source {
                    Source::User(nick) => nick.as_str(),
                    Source::User(nick) => std::str::from_utf8(nick).unwrap(),
                    Source::Server => server_name,
                };
                write!(dst, ":{} {}", source, command)