🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2021-01-30 18:38:55.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2021-01-30 18:38:55.0 +00:00:00
commit
292cd71297abde6adbf1a8a01b8b72f0c046b622 [patch]
tree
0fbbf7ea5ad4574c48cf14820be9c964b567952c
parent
5ca8ef9e99ff57e347f54c53b495c86b591b3694
download
292cd71297abde6adbf1a8a01b8b72f0c046b622.tar.gz

Introduce concept of channels



Diff

 Cargo.lock                              |  28 ++++++-
 titanirc-server/Cargo.toml              |   3 +-
 titanirc-server/src/entities/channel.rs |  97 ++++++++++++++++++++++-
 titanirc-server/src/entities/mod.rs     |  13 +++-
 titanirc-server/src/entities/user.rs    | 148 +++++++++++++++++++++++++++++++++-
 titanirc-server/src/main.rs             |   8 +-
 titanirc-server/src/server.rs           |  48 +++++++++--
 titanirc-server/src/session.rs          |  90 +--------------------
 titanirc-types/src/lib.rs               |   2 +-
 9 files changed, 336 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5ea1955..1a756ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -288,6 +288,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"

[[package]]
name = "futures-macro"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
dependencies = [
 "proc-macro-hack",
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "futures-sink"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -309,9 +321,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
dependencies = [
 "futures-core",
 "futures-macro",
 "futures-task",
 "pin-project-lite",
 "pin-utils",
 "proc-macro-hack",
 "proc-macro-nested",
 "slab",
]

@@ -631,6 +646,18 @@ dependencies = [
]

[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"

[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"

[[package]]
name = "proc-macro2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -861,6 +888,7 @@ dependencies = [
 "async-stream",
 "clap",
 "displaydoc",
 "futures-util",
 "thiserror",
 "titanirc-codec",
 "titanirc-types",
diff --git a/titanirc-server/Cargo.toml b/titanirc-server/Cargo.toml
index 8cb9be8..1b4032c 100644
--- a/titanirc-server/Cargo.toml
+++ b/titanirc-server/Cargo.toml
@@ -17,4 +17,5 @@ tokio-util = "0.6"
async-stream = "0.3"
thiserror = "1"
displaydoc = "0.1"
clap = "3.0.0-beta.2"
\ No newline at end of file
clap = "3.0.0-beta.2"
futures-util = "0.3"
\ No newline at end of file
diff --git a/titanirc-server/src/entities/channel.rs b/titanirc-server/src/entities/channel.rs
new file mode 100644
index 0000000..b399cc2
--- /dev/null
+++ b/titanirc-server/src/entities/channel.rs
@@ -0,0 +1,97 @@
use std::sync::Arc;

use actix::prelude::*;

use crate::{entities::user::User, server::Server};

pub mod events {
    use crate::entities::user::User;
    use actix::prelude::*;

    pub type JoinResult = Result<super::Handle, JoinError>;

    #[derive(Message)]
    #[rtype(result = "JoinResult")]
    pub struct Join {
        pub channel_name: String,
        pub nick: String,
        pub user: Addr<User>,
    }

    #[derive(Debug, thiserror::Error, displaydoc::Display)]
    pub enum JoinError {
        /// Failed to send join request to channel: {0}
        Mailbox(#[from] actix::MailboxError),
    }

    #[derive(Message)]
    #[rtype(result = "")]
    pub struct JoinBroadcast {
        pub channel_name: String,
        pub nick: String,
    }

    impl From<Join> for JoinBroadcast {
        fn from(
            Join {
                channel_name, nick, ..
            }: Join,
        ) -> Self {
            Self { channel_name, nick }
        }
    }
}

pub struct Handle {
    //pub name_change: actix::Recipient<super::user::events::NameChange>,
    pub message: actix::Recipient<super::common_events::Message>,
}

pub struct Channel {
    pub members: Vec<Addr<User>>,
}

impl Channel {
    fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
        let mut futures = Vec::new();

        let broadcast = Arc::new(events::JoinBroadcast::from(join));

        for member in &self.members {
            futures.push(member.send(broadcast.clone()));
        }

        async {
            futures_util::future::try_join_all(futures).await.unwrap();
        }
    }
}

impl Actor for Channel {
    type Context = Context<Self>;
}

impl actix::Handler<events::Join> for Channel {
    type Result = events::JoinResult;

    fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
        self.members.push(msg.user.clone());

        ctx.spawn(self.announce_join(msg).into_actor(self));

        Ok(Handle {
            message: ctx.address().recipient(),
        })
    }
}

impl actix::Handler<super::common_events::Message> for Channel {
    type Result = ();

    fn handle(
        &mut self,
        msg: super::common_events::Message,
        ctx: &mut Self::Context,
    ) -> Self::Result {
    }
}
diff --git a/titanirc-server/src/entities/mod.rs b/titanirc-server/src/entities/mod.rs
new file mode 100644
index 0000000..56754c4
--- /dev/null
+++ b/titanirc-server/src/entities/mod.rs
@@ -0,0 +1,13 @@
pub mod channel;
pub mod user;

pub mod common_events {
    use actix::prelude::*;

    #[derive(Debug, Message)]
    #[rtype(result = "()")]
    pub struct Message {
        from: String,
        message: String,
    }
}
diff --git a/titanirc-server/src/entities/user.rs b/titanirc-server/src/entities/user.rs
new file mode 100644
index 0000000..738f283
--- /dev/null
+++ b/titanirc-server/src/entities/user.rs
@@ -0,0 +1,148 @@
use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::Command;
use tokio::{io::WriteHalf, net::TcpStream};

pub mod events {
    use actix::prelude::*;

    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct NameChange {
        pub old: String,
        pub new: String,
    }
}

pub struct User {
    pub server: Addr<crate::server::Server>,
    pub writer:
        FramedWrite<titanirc_types::ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
    pub last_active: Instant,
    pub nick: Option<String>, // should probably be an arc so we dont have to keep cloning the string
}

fn schedule_ping(ctx: &mut <User as Actor>::Context) {
    ctx.run_later(Duration::from_secs(30), |act, ctx| {
        if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
            // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)`
            eprintln!("ping timeout");
            ctx.stop();
        }

        act.writer.write(titanirc_types::ServerMessage::Ping);
        schedule_ping(ctx);
    });
}

impl Actor for User {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        schedule_ping(ctx);
    }
}

impl WriteHandler<std::io::Error> for User {}

impl StreamHandler<Result<Command, std::io::Error>> for User {
    /// This is main event loop for client requests
    fn handle(&mut self, cmd: Result<Command, std::io::Error>, ctx: &mut Self::Context) {
        self.last_active = Instant::now();

        match cmd {
            Ok(Command::Nick(titanirc_types::NickCommand { nick })) => {
                self.writer.write(titanirc_types::Reply::RplWelcome.into());
                self.writer.write(titanirc_types::Reply::RplYourHost.into());
                self.writer.write(titanirc_types::Reply::RplCreated.into());
                self.writer.write(titanirc_types::Reply::RplMyInfo.into());
                self.writer.write(titanirc_types::Reply::RplISupport.into());
                self.nick = Some(nick.0);
                // LUSERS
                // RPL_UMODEIS
                // MOTD
            }
            Ok(Command::Join(titanirc_types::JoinCommand { channel })) => {
                if let Some(ref nick) = self.nick {
                    let server_addr = self.server.clone();
                    let ctx_addr = ctx.address();
                    let nick = nick.clone();

                    ctx.spawn(
                        async move {
                            server_addr
                                .send(crate::entities::channel::events::Join {
                                    channel_name: channel.0,
                                    user: ctx_addr,
                                    nick,
                                })
                                .await
                                .unwrap()
                                .unwrap();

                            println!("joined chan!");
                        }
                        .into_actor(self),
                    );
                }
            }
            Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self
                .writer
                .write(titanirc_types::Reply::RplUmodeIs(mode).into()),
            Ok(Command::Motd(_)) => {
                self.writer.write(
                    titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(
                        "my.test.server".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "Hello, welcome to this server!".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "it's very cool!".to_string(),
                    ))
                    .into(),
                );
                self.writer
                    .write(titanirc_types::Reply::RplEndOfMotd.into());
            }
            Ok(Command::Version(_)) => self.writer.write(
                titanirc_types::Reply::RplVersion(
                    clap::crate_version!().to_string(),
                    "release".to_string(),
                    titanirc_types::ServerName("my.test.server".to_string()),
                    titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()),
                )
                .into(),
            ),
            Ok(Command::Pong(_)) => {}
            Ok(cmd) => println!("cmd: {:?}", cmd),
            Err(e) => eprintln!("error decoding: {}", e),
        }
    }
}

impl actix::Handler<std::sync::Arc<crate::entities::channel::events::JoinBroadcast>> for User {
    type Result = ();

    fn handle(
        &mut self,
        msg: std::sync::Arc<crate::entities::channel::events::JoinBroadcast>,
        ctx: &mut Self::Context,
    ) -> Self::Result {
        self.writer.write(titanirc_types::ServerMessage::Command(
            titanirc_types::Source::User(titanirc_types::Nick(msg.nick.clone())),
            titanirc_types::Command::Join(titanirc_types::JoinCommand {
                channel: titanirc_types::Channel::from(msg.channel_name.clone()),
            }),
        ));
    }
}
diff --git a/titanirc-server/src/main.rs b/titanirc-server/src/main.rs
index 95f8781..6771939 100644
--- a/titanirc-server/src/main.rs
+++ b/titanirc-server/src/main.rs
@@ -1,9 +1,11 @@
#![deny(clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]

mod entities;
mod error;
mod server;
mod session;

use std::collections::HashMap;

use crate::{
    error::Result,
@@ -38,7 +40,9 @@ async fn main() -> Result<()> {

    Server::create(move |ctx| {
        ctx.add_message_stream(stream);
        Server {}
        Server {
            channels: HashMap::new(),
        }
    });

    println!("Running IRC server on 0.0.0.0:6667");
diff --git a/titanirc-server/src/server.rs b/titanirc-server/src/server.rs
index e0af39d..f243971 100644
--- a/titanirc-server/src/server.rs
+++ b/titanirc-server/src/server.rs
@@ -1,12 +1,15 @@
use crate::session::Session;
use crate::entities::{channel::Channel, user::User};

use std::net::SocketAddr;
use std::{collections::HashMap, net::SocketAddr};

use actix::{io::FramedWrite, prelude::*};
use futures_util::future::TryFutureExt;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

pub struct Server {}
pub struct Server {
    pub channels: HashMap<String, Addr<Channel>>,
}

impl Actor for Server {
    type Context = Context<Self>;
@@ -19,16 +22,47 @@ pub struct Connection(pub TcpStream, pub SocketAddr);
impl Handler<Connection> for Server {
    type Result = ();

    fn handle(&mut self, Connection(stream, remote): Connection, _ctx: &mut Self::Context) {
    fn handle(&mut self, Connection(stream, remote): Connection, server_ctx: &mut Self::Context) {
        println!("Accepted connection from {}", remote);

        Session::create(move |ctx| {
        User::create(move |ctx| {
            let (read, write) = tokio::io::split(stream);
            Session::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx);
            Session {
            User::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx);
            User {
                server: server_ctx.address(),
                writer: FramedWrite::new(write, titanirc_codec::Encoder, ctx),
                last_active: std::time::Instant::now(),
                nick: None,
            }
        });
    }
}

impl Handler<crate::entities::channel::events::Join> for Server {
    type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;

    fn handle(
        &mut self,
        msg: crate::entities::channel::events::Join,
        ctx: &mut Self::Context,
    ) -> Self::Result {
        let channel = if let Some(channel) = self.channels.get(&msg.channel_name) {
            channel
        } else {
            let channel = Channel::create(|ctx| Channel {
                members: Default::default(),
            });

            self.channels
                .entry(msg.channel_name.clone())
                .or_insert(channel)
        };

        Box::pin(
            channel
                .send(msg)
                .into_actor(self)
                .map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)),
        )
    }
}
diff --git a/titanirc-server/src/session.rs b/titanirc-server/src/session.rs
deleted file mode 100644
index f87cd9f..0000000
--- a/titanirc-server/src/session.rs
+++ /dev/null
@@ -1,90 +0,0 @@
use actix::{
    io::{FramedWrite, WriteHandler},
    prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::Command;
use tokio::{io::WriteHalf, net::TcpStream};

pub struct Session {
    pub writer:
        FramedWrite<titanirc_types::ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
    pub last_active: Instant,
}

fn schedule_ping(ctx: &mut <Session as Actor>::Context) {
    ctx.run_later(Duration::from_secs(30), |act, ctx| {
        if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
            // send `QUIT :Ping timeout: 120 seconds` & `ERROR :Closing Link: {ip} (Ping timeout: 120 seconds)`
            eprintln!("ping timeout");
            ctx.stop();
        }

        act.writer.write(titanirc_types::ServerMessage::Ping);
        schedule_ping(ctx);
    });
}

impl Actor for Session {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        schedule_ping(ctx);
    }
}

impl WriteHandler<std::io::Error> for Session {}

impl StreamHandler<Result<Command, std::io::Error>> for Session {
    /// This is main event loop for client requests
    fn handle(&mut self, cmd: Result<Command, std::io::Error>, _ctx: &mut Self::Context) {
        self.last_active = Instant::now();

        match cmd {
            Ok(Command::Nick(_v)) => {
                self.writer.write(titanirc_types::Reply::RplWelcome.into());
                self.writer.write(titanirc_types::Reply::RplYourHost.into());
                self.writer.write(titanirc_types::Reply::RplCreated.into());
                self.writer.write(titanirc_types::Reply::RplMyInfo.into());
                self.writer.write(titanirc_types::Reply::RplISupport.into());
            }
            Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self
                .writer
                .write(titanirc_types::Reply::RplUmodeIs(mode).into()),
            Ok(Command::Motd(_)) => {
                self.writer.write(
                    titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(
                        "my.test.server".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "Hello, welcome to this server!".to_string(),
                    ))
                    .into(),
                );
                self.writer.write(
                    titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
                        "it's very cool!".to_string(),
                    ))
                    .into(),
                );
                self.writer
                    .write(titanirc_types::Reply::RplEndOfMotd.into());
            }
            Ok(Command::Version(_)) => self.writer.write(
                titanirc_types::Reply::RplVersion(
                    clap::crate_version!().to_string(),
                    "release".to_string(),
                    titanirc_types::ServerName("my.test.server".to_string()),
                    titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()),
                )
                .into(),
            ),
            Ok(Command::Pong(_)) => {}
            Ok(cmd) => println!("cmd: {:?}", cmd),
            Err(e) => eprintln!("error decoding: {}", e),
        }
    }
}
diff --git a/titanirc-types/src/lib.rs b/titanirc-types/src/lib.rs
index 81e6b3c..be5555f 100644
--- a/titanirc-types/src/lib.rs
+++ b/titanirc-types/src/lib.rs
@@ -5,7 +5,7 @@ mod primitives;
mod replies;

pub use crate::primitives::*;
pub use crate::replies::{Reply, ServerMessage};
pub use crate::replies::{Reply, ServerMessage, Source};

use nom::{
    bytes::complete::{tag, take_till},