Allow configuring seperate thread pools for each component
Diff
Cargo.lock | 48 ++++++++++++++++++++++++++++++++++++++++++++++++
Cargo.toml | 1 +
config.toml | 3 +++
src/channel.rs | 2 +-
src/config.rs | 20 ++++++++++++++++++++
src/main.rs | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
src/server.rs | 20 ++++++++++++++++----
7 files changed, 138 insertions(+), 32 deletions(-)
@@ -421,6 +421,17 @@
]
[[package]]
name = "getrandom"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -681,6 +692,12 @@
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-error"
@@ -722,6 +739,36 @@
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
@@ -941,6 +988,7 @@
"futures",
"irc-proto",
"itertools",
"rand",
"serde",
"tokio",
"tokio-stream",
@@ -12,6 +12,7 @@
chrono = "0.4"
clap = { version = "4.0", features = ["cargo", "derive", "std", "suggestions", "color"] }
futures = "0.3"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
@@ -1,5 +1,8 @@
listen-address = "[::]:6667"
client-threads = 1
channel-threads = 1
motd = """
Welcome to our IRC server
@@ -43,7 +43,7 @@
#[instrument(parent = &msg.span, skip_all)]
fn handle(&mut self, msg: Broadcast, _ctx: &mut Self::Context) -> Self::Result {
for client in self.clients.keys() {
client.try_send(msg.clone()).unwrap();
client.do_send(msg.clone());
}
}
}
@@ -18,6 +18,26 @@
pub struct Config {
pub listen_address: SocketAddr,
pub motd: Option<String>,
#[serde(default = "Config::default_client_threads")]
pub client_threads: usize,
#[serde(default = "Config::default_channel_threads")]
pub channel_threads: usize,
}
impl Config {
#[must_use]
const fn default_client_threads() -> usize {
1
}
#[must_use]
const fn default_channel_threads() -> usize {
1
}
}
impl FromStr for Config {
@@ -1,12 +1,13 @@
#![deny(clippy::nursery, clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use actix::{io::FramedWrite, Actor, Addr, AsyncContext};
use actix_rt::System;
use actix_rt::{Arbiter, System};
use clap::Parser;
use irc_proto::IrcCodec;
use rand::seq::SliceRandom;
use tokio::{net::TcpListener, time::Instant};
use tokio_util::codec::FramedRead;
use tracing::{error, info, info_span, Instrument};
@@ -45,16 +46,17 @@
subscriber.init();
let listen_address = opts.config.listen_address;
let client_threads = opts.config.client_threads;
let server = Server {
let server = Server::start_in_arbiter(&Arbiter::new().handle(), |_ctx| Server {
channels: HashMap::default(),
clients: HashMap::default(),
channel_arbiters: build_arbiters(opts.config.channel_threads),
config: opts.config,
}
.start();
});
let listener = TcpListener::bind(listen_address).await?;
actix_rt::spawn(start_tcp_acceptor_loop(listener, server));
actix_rt::spawn(start_tcp_acceptor_loop(listener, server, client_threads));
info!("Server listening on {}", listen_address);
@@ -66,7 +68,13 @@
async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr<Server>) {
async fn start_tcp_acceptor_loop(
listener: TcpListener,
server: Addr<Server>,
client_threads: usize,
) {
let client_arbiters = Arc::new(build_arbiters(client_threads));
while let Ok((stream, addr)) = listener.accept().await {
let span = info_span!("connection", %addr);
let _entered = span.clone().entered();
@@ -74,6 +82,7 @@
info!("Accepted connection");
let server = server.clone();
let client_arbiters = client_arbiters.clone();
actix_rt::spawn(async move {
@@ -88,28 +97,43 @@
};
let handle = Client::create(|ctx| {
let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx);
ctx.add_stream(read);
Client {
writer,
connection: connection.clone(),
server: server.clone(),
channels: HashMap::new(),
last_active: Instant::now(),
graceful_shutdown: false,
server_leave_reason: None,
span: span.clone(),
}
});
let handle = {
let server = server.clone();
let arbiter = client_arbiters.choose(&mut rand::thread_rng()).map_or_else(Arbiter::current, Arbiter::handle);
let span = span.clone();
let connection = connection.clone();
Client::start_in_arbiter(&arbiter, move |ctx| {
let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx);
ctx.add_stream(read);
Client {
writer,
connection,
server,
channels: HashMap::new(),
last_active: Instant::now(),
graceful_shutdown: false,
server_leave_reason: None,
span,
}
})
};
server.do_send(UserConnected { handle, connection, span });
}.instrument(info_span!("negotiation")));
}
}
#[must_use]
pub fn build_arbiters(count: usize) -> Vec<Arbiter> {
std::iter::repeat(())
.take(count)
.map(|()| Arbiter::new())
.collect()
}
@@ -1,10 +1,12 @@
pub mod response;
use std::collections::HashMap;
use actix::{Actor, Addr, AsyncContext, Context, Handler, MessageResult, ResponseFuture};
use actix_rt::Arbiter;
use futures::{stream::FuturesOrdered, TryFutureExt};
use irc_proto::{Command, Message, Prefix, Response};
use rand::seq::SliceRandom;
use tokio_stream::StreamExt;
use tracing::{debug, instrument, warn, Span};
@@ -24,6 +26,7 @@
pub struct Server {
pub channel_arbiters: Vec<Arbiter>,
pub channels: HashMap<String, Addr<Channel>>,
pub clients: HashMap<Addr<Client>, InitiatedConnection>,
pub config: Config,
@@ -137,13 +140,20 @@
.channels
.entry(msg.channel_name.clone())
.or_insert_with(|| {
Channel {
name: msg.channel_name.clone(),
let arbiter = self
.channel_arbiters
.choose(&mut rand::thread_rng())
.map_or_else(Arbiter::current, Arbiter::handle);
let channel_name = msg.channel_name.clone();
let server = ctx.address();
Channel::start_in_arbiter(&arbiter, move |_ctx| Channel {
name: channel_name,
clients: HashMap::new(),
topic: None,
server: ctx.address(),
}
.start()
server,
})
})
.clone();