Introduce concept of channels
Diff
Cargo.lock | 28 ++++++-
titanirc-server/Cargo.toml | 3 +-
titanirc-server/src/entities/channel.rs | 97 ++++++++++++++++++++++-
titanirc-server/src/entities/mod.rs | 13 +++-
titanirc-server/src/entities/user.rs | 148 +++++++++++++++++++++++++++++++++-
titanirc-server/src/main.rs | 8 +-
titanirc-server/src/server.rs | 48 +++++++++--
titanirc-server/src/session.rs | 90 +--------------------
titanirc-types/src/lib.rs | 2 +-
9 files changed, 336 insertions(+), 101 deletions(-)
@@ -288,6 +288,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
[[package]]
name = "futures-macro"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -309,9 +321,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@@ -631,6 +646,18 @@ dependencies = [
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -861,6 +888,7 @@ dependencies = [
"async-stream",
"clap",
"displaydoc",
"futures-util",
"thiserror",
"titanirc-codec",
"titanirc-types",
@@ -17,4 +17,5 @@ tokio-util = "0.6"
async-stream = "0.3"
thiserror = "1"
displaydoc = "0.1"
clap = "3.0.0-beta.2"
\ No newline at end of file
clap = "3.0.0-beta.2"
futures-util = "0.3"
\ No newline at end of file
@@ -0,0 +1,97 @@
use std::sync::Arc;
use actix::prelude::*;
use crate::{entities::user::User, server::Server};
pub mod events {
use crate::entities::user::User;
use actix::prelude::*;
pub type JoinResult = Result<super::Handle, JoinError>;
#[derive(Message)]
#[rtype(result = "JoinResult")]
pub struct Join {
pub channel_name: String,
pub nick: String,
pub user: Addr<User>,
}
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum JoinError {
Mailbox(#[from] actix::MailboxError),
}
#[derive(Message)]
#[rtype(result = "")]
pub struct JoinBroadcast {
pub channel_name: String,
pub nick: String,
}
impl From<Join> for JoinBroadcast {
fn from(
Join {
channel_name, nick, ..
}: Join,
) -> Self {
Self { channel_name, nick }
}
}
}
pub struct Handle {
pub message: actix::Recipient<super::common_events::Message>,
}
pub struct Channel {
pub members: Vec<Addr<User>>,
}
impl Channel {
fn announce_join(&self, join: events::Join) -> impl Future<Output = ()> {
let mut futures = Vec::new();
let broadcast = Arc::new(events::JoinBroadcast::from(join));
for member in &self.members {
futures.push(member.send(broadcast.clone()));
}
async {
futures_util::future::try_join_all(futures).await.unwrap();
}
}
}
impl Actor for Channel {
type Context = Context<Self>;
}
impl actix::Handler<events::Join> for Channel {
type Result = events::JoinResult;
fn handle(&mut self, msg: events::Join, ctx: &mut Self::Context) -> Self::Result {
self.members.push(msg.user.clone());
ctx.spawn(self.announce_join(msg).into_actor(self));
Ok(Handle {
message: ctx.address().recipient(),
})
}
}
impl actix::Handler<super::common_events::Message> for Channel {
type Result = ();
fn handle(
&mut self,
msg: super::common_events::Message,
ctx: &mut Self::Context,
) -> Self::Result {
}
}
@@ -0,0 +1,13 @@
pub mod channel;
pub mod user;
pub mod common_events {
use actix::prelude::*;
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct Message {
from: String,
message: String,
}
}
@@ -0,0 +1,148 @@
use actix::{
io::{FramedWrite, WriteHandler},
prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::Command;
use tokio::{io::WriteHalf, net::TcpStream};
pub mod events {
use actix::prelude::*;
#[derive(Message)]
#[rtype(result = "()")]
pub struct NameChange {
pub old: String,
pub new: String,
}
}
pub struct User {
pub server: Addr<crate::server::Server>,
pub writer:
FramedWrite<titanirc_types::ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
pub last_active: Instant,
pub nick: Option<String>, }
fn schedule_ping(ctx: &mut <User as Actor>::Context) {
ctx.run_later(Duration::from_secs(30), |act, ctx| {
if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
eprintln!("ping timeout");
ctx.stop();
}
act.writer.write(titanirc_types::ServerMessage::Ping);
schedule_ping(ctx);
});
}
impl Actor for User {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
schedule_ping(ctx);
}
}
impl WriteHandler<std::io::Error> for User {}
impl StreamHandler<Result<Command, std::io::Error>> for User {
fn handle(&mut self, cmd: Result<Command, std::io::Error>, ctx: &mut Self::Context) {
self.last_active = Instant::now();
match cmd {
Ok(Command::Nick(titanirc_types::NickCommand { nick })) => {
self.writer.write(titanirc_types::Reply::RplWelcome.into());
self.writer.write(titanirc_types::Reply::RplYourHost.into());
self.writer.write(titanirc_types::Reply::RplCreated.into());
self.writer.write(titanirc_types::Reply::RplMyInfo.into());
self.writer.write(titanirc_types::Reply::RplISupport.into());
self.nick = Some(nick.0);
}
Ok(Command::Join(titanirc_types::JoinCommand { channel })) => {
if let Some(ref nick) = self.nick {
let server_addr = self.server.clone();
let ctx_addr = ctx.address();
let nick = nick.clone();
ctx.spawn(
async move {
server_addr
.send(crate::entities::channel::events::Join {
channel_name: channel.0,
user: ctx_addr,
nick,
})
.await
.unwrap()
.unwrap();
println!("joined chan!");
}
.into_actor(self),
);
}
}
Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self
.writer
.write(titanirc_types::Reply::RplUmodeIs(mode).into()),
Ok(Command::Motd(_)) => {
self.writer.write(
titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(
"my.test.server".to_string(),
))
.into(),
);
self.writer.write(
titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
"Hello, welcome to this server!".to_string(),
))
.into(),
);
self.writer.write(
titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
"it's very cool!".to_string(),
))
.into(),
);
self.writer
.write(titanirc_types::Reply::RplEndOfMotd.into());
}
Ok(Command::Version(_)) => self.writer.write(
titanirc_types::Reply::RplVersion(
clap::crate_version!().to_string(),
"release".to_string(),
titanirc_types::ServerName("my.test.server".to_string()),
titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()),
)
.into(),
),
Ok(Command::Pong(_)) => {}
Ok(cmd) => println!("cmd: {:?}", cmd),
Err(e) => eprintln!("error decoding: {}", e),
}
}
}
impl actix::Handler<std::sync::Arc<crate::entities::channel::events::JoinBroadcast>> for User {
type Result = ();
fn handle(
&mut self,
msg: std::sync::Arc<crate::entities::channel::events::JoinBroadcast>,
ctx: &mut Self::Context,
) -> Self::Result {
self.writer.write(titanirc_types::ServerMessage::Command(
titanirc_types::Source::User(titanirc_types::Nick(msg.nick.clone())),
titanirc_types::Command::Join(titanirc_types::JoinCommand {
channel: titanirc_types::Channel::from(msg.channel_name.clone()),
}),
));
}
}
@@ -1,9 +1,11 @@
#![deny(clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
mod entities;
mod error;
mod server;
mod session;
use std::collections::HashMap;
use crate::{
error::Result,
@@ -38,7 +40,9 @@ async fn main() -> Result<()> {
Server::create(move |ctx| {
ctx.add_message_stream(stream);
Server {}
Server {
channels: HashMap::new(),
}
});
println!("Running IRC server on 0.0.0.0:6667");
@@ -1,12 +1,15 @@
use crate::session::Session;
use crate::entities::{channel::Channel, user::User};
use std::net::SocketAddr;
use std::{collections::HashMap, net::SocketAddr};
use actix::{io::FramedWrite, prelude::*};
use futures_util::future::TryFutureExt;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
pub struct Server {}
pub struct Server {
pub channels: HashMap<String, Addr<Channel>>,
}
impl Actor for Server {
type Context = Context<Self>;
@@ -19,16 +22,47 @@ pub struct Connection(pub TcpStream, pub SocketAddr);
impl Handler<Connection> for Server {
type Result = ();
fn handle(&mut self, Connection(stream, remote): Connection, _ctx: &mut Self::Context) {
fn handle(&mut self, Connection(stream, remote): Connection, server_ctx: &mut Self::Context) {
println!("Accepted connection from {}", remote);
Session::create(move |ctx| {
User::create(move |ctx| {
let (read, write) = tokio::io::split(stream);
Session::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx);
Session {
User::add_stream(FramedRead::new(read, titanirc_codec::Decoder), ctx);
User {
server: server_ctx.address(),
writer: FramedWrite::new(write, titanirc_codec::Encoder, ctx),
last_active: std::time::Instant::now(),
nick: None,
}
});
}
}
impl Handler<crate::entities::channel::events::Join> for Server {
type Result = ResponseActFuture<Self, crate::entities::channel::events::JoinResult>;
fn handle(
&mut self,
msg: crate::entities::channel::events::Join,
ctx: &mut Self::Context,
) -> Self::Result {
let channel = if let Some(channel) = self.channels.get(&msg.channel_name) {
channel
} else {
let channel = Channel::create(|ctx| Channel {
members: Default::default(),
});
self.channels
.entry(msg.channel_name.clone())
.or_insert(channel)
};
Box::pin(
channel
.send(msg)
.into_actor(self)
.map(|v, _, _| v.map_err(|e| e.into()).and_then(|v| v)),
)
}
}
@@ -1,90 +0,0 @@
use actix::{
io::{FramedWrite, WriteHandler},
prelude::*,
};
use std::time::{Duration, Instant};
use titanirc_types::Command;
use tokio::{io::WriteHalf, net::TcpStream};
pub struct Session {
pub writer:
FramedWrite<titanirc_types::ServerMessage, WriteHalf<TcpStream>, titanirc_codec::Encoder>,
pub last_active: Instant,
}
fn schedule_ping(ctx: &mut <Session as Actor>::Context) {
ctx.run_later(Duration::from_secs(30), |act, ctx| {
if Instant::now().duration_since(act.last_active) > Duration::from_secs(240) {
eprintln!("ping timeout");
ctx.stop();
}
act.writer.write(titanirc_types::ServerMessage::Ping);
schedule_ping(ctx);
});
}
impl Actor for Session {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
schedule_ping(ctx);
}
}
impl WriteHandler<std::io::Error> for Session {}
impl StreamHandler<Result<Command, std::io::Error>> for Session {
fn handle(&mut self, cmd: Result<Command, std::io::Error>, _ctx: &mut Self::Context) {
self.last_active = Instant::now();
match cmd {
Ok(Command::Nick(_v)) => {
self.writer.write(titanirc_types::Reply::RplWelcome.into());
self.writer.write(titanirc_types::Reply::RplYourHost.into());
self.writer.write(titanirc_types::Reply::RplCreated.into());
self.writer.write(titanirc_types::Reply::RplMyInfo.into());
self.writer.write(titanirc_types::Reply::RplISupport.into());
}
Ok(Command::Mode(titanirc_types::ModeCommand { mode, .. })) => self
.writer
.write(titanirc_types::Reply::RplUmodeIs(mode).into()),
Ok(Command::Motd(_)) => {
self.writer.write(
titanirc_types::Reply::RplMotdStart(titanirc_types::ServerName(
"my.test.server".to_string(),
))
.into(),
);
self.writer.write(
titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
"Hello, welcome to this server!".to_string(),
))
.into(),
);
self.writer.write(
titanirc_types::Reply::RplMotd(titanirc_types::FreeText(
"it's very cool!".to_string(),
))
.into(),
);
self.writer
.write(titanirc_types::Reply::RplEndOfMotd.into());
}
Ok(Command::Version(_)) => self.writer.write(
titanirc_types::Reply::RplVersion(
clap::crate_version!().to_string(),
"release".to_string(),
titanirc_types::ServerName("my.test.server".to_string()),
titanirc_types::FreeText("https://github.com/MITBorg/titanirc".to_string()),
)
.into(),
),
Ok(Command::Pong(_)) => {}
Ok(cmd) => println!("cmd: {:?}", cmd),
Err(e) => eprintln!("error decoding: {}", e),
}
}
}
@@ -5,7 +5,7 @@ mod primitives;
mod replies;
pub use crate::primitives::*;
pub use crate::replies::{Reply, ServerMessage};
pub use crate::replies::{Reply, ServerMessage, Source};
use nom::{
bytes::complete::{tag, take_till},