From fc6e6eaef3e1fa32c97da7b14f0a68d080ee356f Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 8 Jan 2023 01:23:02 +0000 Subject: [PATCH] Allow configuring seperate thread pools for each component --- Cargo.lock | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + config.toml | 3 +++ src/channel.rs | 2 +- src/config.rs | 20 ++++++++++++++++++++ src/main.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------- src/server.rs | 20 +++++++++++++++----- 7 files changed, 138 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9d2640..9059b75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,6 +421,17 @@ dependencies = [ ] [[package]] +name = "getrandom" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] name = "heck" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -683,6 +694,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -725,6 +742,36 @@ dependencies = [ ] [[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] name = "redox_syscall" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -941,6 +988,7 @@ dependencies = [ "futures", "irc-proto", "itertools", + "rand", "serde", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 1329604..b4b33bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0" chrono = "0.4" clap = { version = "4.0", features = ["cargo", "derive", "std", "suggestions", "color"] } futures = "0.3" +rand = "0.8" serde = { version = "1.0", features = ["derive"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } diff --git a/config.toml b/config.toml index b5aac20..3c33392 100644 --- a/config.toml +++ b/config.toml @@ -1,5 +1,8 @@ listen-address = "[::]:6667" +client-threads = 1 +channel-threads = 1 + motd = """ Welcome to our IRC server diff --git a/src/channel.rs b/src/channel.rs index f08cb94..371de39 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -43,7 +43,7 @@ impl Handler for Channel { #[instrument(parent = &msg.span, skip_all)] fn handle(&mut self, msg: Broadcast, _ctx: &mut Self::Context) -> Self::Result { for client in self.clients.keys() { - client.try_send(msg.clone()).unwrap(); + client.do_send(msg.clone()); } } } diff --git a/src/config.rs b/src/config.rs index 90cf2f0..35f6dd0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,6 +18,26 @@ pub struct Args { pub struct Config { pub listen_address: SocketAddr, pub motd: Option, + /// Amount of threads to spawn for processing client commands, set to 0 to spawn clients on the + /// main server thread. + #[serde(default = "Config::default_client_threads")] + pub client_threads: usize, + /// Amount of threads to spawn for processing channel commands, set to 0 to spawn channels on + /// the main server thread. + #[serde(default = "Config::default_channel_threads")] + pub channel_threads: usize, +} + +impl Config { + #[must_use] + const fn default_client_threads() -> usize { + 1 + } + + #[must_use] + const fn default_channel_threads() -> usize { + 1 + } } impl FromStr for Config { diff --git a/src/main.rs b/src/main.rs index 3723f92..0aced60 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,13 @@ #![deny(clippy::nursery, clippy::pedantic)] #![allow(clippy::module_name_repetitions)] -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use actix::{io::FramedWrite, Actor, Addr, AsyncContext}; -use actix_rt::System; +use actix_rt::{Arbiter, System}; use clap::Parser; use irc_proto::IrcCodec; +use rand::seq::SliceRandom; use tokio::{net::TcpListener, time::Instant}; use tokio_util::codec::FramedRead; use tracing::{error, info, info_span, Instrument}; @@ -45,16 +46,17 @@ async fn main() -> anyhow::Result<()> { subscriber.init(); let listen_address = opts.config.listen_address; + let client_threads = opts.config.client_threads; - let server = Server { + let server = Server::start_in_arbiter(&Arbiter::new().handle(), |_ctx| Server { channels: HashMap::default(), clients: HashMap::default(), + channel_arbiters: build_arbiters(opts.config.channel_threads), config: opts.config, - } - .start(); + }); let listener = TcpListener::bind(listen_address).await?; - actix_rt::spawn(start_tcp_acceptor_loop(listener, server)); + actix_rt::spawn(start_tcp_acceptor_loop(listener, server, client_threads)); info!("Server listening on {}", listen_address); @@ -66,7 +68,13 @@ async fn main() -> anyhow::Result<()> { /// Start listening for new connections from clients, and create a new client handle for /// them. -async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr) { +async fn start_tcp_acceptor_loop( + listener: TcpListener, + server: Addr, + client_threads: usize, +) { + let client_arbiters = Arc::new(build_arbiters(client_threads)); + while let Ok((stream, addr)) = listener.accept().await { let span = info_span!("connection", %addr); let _entered = span.clone().entered(); @@ -74,6 +82,7 @@ async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr) { info!("Accepted connection"); let server = server.clone(); + let client_arbiters = client_arbiters.clone(); actix_rt::spawn(async move { // split the stream into its read and write halves and setup codecs @@ -88,28 +97,43 @@ async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr) { }; // spawn the client's actor - let handle = Client::create(|ctx| { - // setup the writer codec for the user - let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx); - - // add the user's incoming tcp stream to the actor, messages over the tcp stream - // will be sent to the actor over the `StreamHandler` - ctx.add_stream(read); - - Client { - writer, - connection: connection.clone(), - server: server.clone(), - channels: HashMap::new(), - last_active: Instant::now(), - graceful_shutdown: false, - server_leave_reason: None, - span: span.clone(), - } - }); + let handle = { + let server = server.clone(); + let arbiter = client_arbiters.choose(&mut rand::thread_rng()).map_or_else(Arbiter::current, Arbiter::handle); + let span = span.clone(); + let connection = connection.clone(); + + Client::start_in_arbiter(&arbiter, move |ctx| { + // setup the writer codec for the user + let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx); + + // add the user's incoming tcp stream to the actor, messages over the tcp stream + // will be sent to the actor over the `StreamHandler` + ctx.add_stream(read); + + Client { + writer, + connection, + server, + channels: HashMap::new(), + last_active: Instant::now(), + graceful_shutdown: false, + server_leave_reason: None, + span, + } + }) + }; // inform the server of the new connection server.do_send(UserConnected { handle, connection, span }); }.instrument(info_span!("negotiation"))); } } + +#[must_use] +pub fn build_arbiters(count: usize) -> Vec { + std::iter::repeat(()) + .take(count) + .map(|()| Arbiter::new()) + .collect() +} diff --git a/src/server.rs b/src/server.rs index 5fa8adc..fb9972b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,8 +3,10 @@ pub mod response; use std::collections::HashMap; use actix::{Actor, Addr, AsyncContext, Context, Handler, MessageResult, ResponseFuture}; +use actix_rt::Arbiter; use futures::{stream::FuturesOrdered, TryFutureExt}; use irc_proto::{Command, Message, Prefix, Response}; +use rand::seq::SliceRandom; use tokio_stream::StreamExt; use tracing::{debug, instrument, warn, Span}; @@ -24,6 +26,7 @@ use crate::{ /// The root actor for arbitration between clients and channels. pub struct Server { + pub channel_arbiters: Vec, pub channels: HashMap>, pub clients: HashMap, InitiatedConnection>, pub config: Config, @@ -137,13 +140,20 @@ impl Handler for Server { .channels .entry(msg.channel_name.clone()) .or_insert_with(|| { - Channel { - name: msg.channel_name.clone(), + let arbiter = self + .channel_arbiters + .choose(&mut rand::thread_rng()) + .map_or_else(Arbiter::current, Arbiter::handle); + + let channel_name = msg.channel_name.clone(); + let server = ctx.address(); + + Channel::start_in_arbiter(&arbiter, move |_ctx| Channel { + name: channel_name, clients: HashMap::new(), topic: None, - server: ctx.address(), - } - .start() + server, + }) }) .clone(); -- libgit2 1.7.2