🏡 index : ~doyle/titanirc.git

author Jordan Doyle <jordan@doyle.la> 2023-01-08 1:23:02.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-01-08 1:28:30.0 +00:00:00
commit
fc6e6eaef3e1fa32c97da7b14f0a68d080ee356f [patch]
tree
d64d76591be0749b0b60560f3d9156cebce2d3fd
parent
bdd2421072bf3334f1bc05c03f1c5ec1ceac46f6
download
fc6e6eaef3e1fa32c97da7b14f0a68d080ee356f.tar.gz

Allow configuring seperate thread pools for each component



Diff

 Cargo.lock     | 48 +++++++++++++++++++++++++++++++++++++-
 Cargo.toml     |  1 +-
 config.toml    |  3 ++-
 src/channel.rs |  2 +-
 src/config.rs  | 20 ++++++++++++++++-
 src/main.rs    | 76 ++++++++++++++++++++++++++++++++++++++---------------------
 src/server.rs  | 20 ++++++++++++----
 7 files changed, 138 insertions(+), 32 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b9d2640..9059b75 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -421,6 +421,17 @@ dependencies = [
]

[[package]]
name = "getrandom"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
 "cfg-if",
 "libc",
 "wasi 0.11.0+wasi-snapshot-preview1",
]

[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -683,6 +694,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"

[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -725,6 +742,36 @@ dependencies = [
]

[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
 "libc",
 "rand_chacha",
 "rand_core",
]

[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
 "ppv-lite86",
 "rand_core",
]

[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
 "getrandom",
]

[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -941,6 +988,7 @@ dependencies = [
 "futures",
 "irc-proto",
 "itertools",
 "rand",
 "serde",
 "tokio",
 "tokio-stream",
diff --git a/Cargo.toml b/Cargo.toml
index 1329604..b4b33bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ anyhow = "1.0"
chrono = "0.4"
clap = { version = "4.0", features = ["cargo", "derive", "std", "suggestions", "color"] }
futures = "0.3"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
diff --git a/config.toml b/config.toml
index b5aac20..3c33392 100644
--- a/config.toml
+++ b/config.toml
@@ -1,5 +1,8 @@
listen-address = "[::]:6667"

client-threads = 1
channel-threads = 1

motd = """
Welcome to our IRC server

diff --git a/src/channel.rs b/src/channel.rs
index f08cb94..371de39 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -43,7 +43,7 @@ impl Handler<Broadcast> for Channel {
    #[instrument(parent = &msg.span, skip_all)]
    fn handle(&mut self, msg: Broadcast, _ctx: &mut Self::Context) -> Self::Result {
        for client in self.clients.keys() {
            client.try_send(msg.clone()).unwrap();
            client.do_send(msg.clone());
        }
    }
}
diff --git a/src/config.rs b/src/config.rs
index 90cf2f0..35f6dd0 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -18,6 +18,26 @@ pub struct Args {
pub struct Config {
    pub listen_address: SocketAddr,
    pub motd: Option<String>,
    /// Amount of threads to spawn for processing client commands, set to 0 to spawn clients on the
    /// main server thread.
    #[serde(default = "Config::default_client_threads")]
    pub client_threads: usize,
    /// Amount of threads to spawn for processing channel commands, set to 0 to spawn channels on
    /// the main server thread.
    #[serde(default = "Config::default_channel_threads")]
    pub channel_threads: usize,
}

impl Config {
    #[must_use]
    const fn default_client_threads() -> usize {
        1
    }

    #[must_use]
    const fn default_channel_threads() -> usize {
        1
    }
}

impl FromStr for Config {
diff --git a/src/main.rs b/src/main.rs
index 3723f92..0aced60 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,12 +1,13 @@
#![deny(clippy::nursery, clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use actix::{io::FramedWrite, Actor, Addr, AsyncContext};
use actix_rt::System;
use actix_rt::{Arbiter, System};
use clap::Parser;
use irc_proto::IrcCodec;
use rand::seq::SliceRandom;
use tokio::{net::TcpListener, time::Instant};
use tokio_util::codec::FramedRead;
use tracing::{error, info, info_span, Instrument};
@@ -45,16 +46,17 @@ async fn main() -> anyhow::Result<()> {
    subscriber.init();

    let listen_address = opts.config.listen_address;
    let client_threads = opts.config.client_threads;

    let server = Server {
    let server = Server::start_in_arbiter(&Arbiter::new().handle(), |_ctx| Server {
        channels: HashMap::default(),
        clients: HashMap::default(),
        channel_arbiters: build_arbiters(opts.config.channel_threads),
        config: opts.config,
    }
    .start();
    });
    let listener = TcpListener::bind(listen_address).await?;

    actix_rt::spawn(start_tcp_acceptor_loop(listener, server));
    actix_rt::spawn(start_tcp_acceptor_loop(listener, server, client_threads));

    info!("Server listening on {}", listen_address);

@@ -66,7 +68,13 @@ async fn main() -> anyhow::Result<()> {

/// Start listening for new connections from clients, and create a new client handle for
/// them.
async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr<Server>) {
async fn start_tcp_acceptor_loop(
    listener: TcpListener,
    server: Addr<Server>,
    client_threads: usize,
) {
    let client_arbiters = Arc::new(build_arbiters(client_threads));

    while let Ok((stream, addr)) = listener.accept().await {
        let span = info_span!("connection", %addr);
        let _entered = span.clone().entered();
@@ -74,6 +82,7 @@ async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr<Server>) {
        info!("Accepted connection");

        let server = server.clone();
        let client_arbiters = client_arbiters.clone();

        actix_rt::spawn(async move {
            // split the stream into its read and write halves and setup codecs
@@ -88,28 +97,43 @@ async fn start_tcp_acceptor_loop(listener: TcpListener, server: Addr<Server>) {
            };

            // spawn the client's actor
            let handle = Client::create(|ctx| {
                // setup the writer codec for the user
                let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx);

                // add the user's incoming tcp stream to the actor, messages over the tcp stream
                // will be sent to the actor over the `StreamHandler`
                ctx.add_stream(read);

                Client {
                    writer,
                    connection: connection.clone(),
                    server: server.clone(),
                    channels: HashMap::new(),
                    last_active: Instant::now(),
                    graceful_shutdown: false,
                    server_leave_reason: None,
                    span: span.clone(),
                }
            });
            let handle = {
                let server = server.clone();
                let arbiter = client_arbiters.choose(&mut rand::thread_rng()).map_or_else(Arbiter::current, Arbiter::handle);
                let span = span.clone();
                let connection = connection.clone();

                Client::start_in_arbiter(&arbiter, move |ctx| {
                    // setup the writer codec for the user
                    let writer = FramedWrite::new(writer, IrcCodec::new("utf8").unwrap(), ctx);

                    // add the user's incoming tcp stream to the actor, messages over the tcp stream
                    // will be sent to the actor over the `StreamHandler`
                    ctx.add_stream(read);

                    Client {
                        writer,
                        connection,
                        server,
                        channels: HashMap::new(),
                        last_active: Instant::now(),
                        graceful_shutdown: false,
                        server_leave_reason: None,
                        span,
                    }
                })
            };

            // inform the server of the new connection
            server.do_send(UserConnected { handle, connection, span });
        }.instrument(info_span!("negotiation")));
    }
}

#[must_use]
pub fn build_arbiters(count: usize) -> Vec<Arbiter> {
    std::iter::repeat(())
        .take(count)
        .map(|()| Arbiter::new())
        .collect()
}
diff --git a/src/server.rs b/src/server.rs
index 5fa8adc..fb9972b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -3,8 +3,10 @@ pub mod response;
use std::collections::HashMap;

use actix::{Actor, Addr, AsyncContext, Context, Handler, MessageResult, ResponseFuture};
use actix_rt::Arbiter;
use futures::{stream::FuturesOrdered, TryFutureExt};
use irc_proto::{Command, Message, Prefix, Response};
use rand::seq::SliceRandom;
use tokio_stream::StreamExt;
use tracing::{debug, instrument, warn, Span};

@@ -24,6 +26,7 @@ use crate::{

/// The root actor for arbitration between clients and channels.
pub struct Server {
    pub channel_arbiters: Vec<Arbiter>,
    pub channels: HashMap<String, Addr<Channel>>,
    pub clients: HashMap<Addr<Client>, InitiatedConnection>,
    pub config: Config,
@@ -137,13 +140,20 @@ impl Handler<ChannelJoin> for Server {
            .channels
            .entry(msg.channel_name.clone())
            .or_insert_with(|| {
                Channel {
                    name: msg.channel_name.clone(),
                let arbiter = self
                    .channel_arbiters
                    .choose(&mut rand::thread_rng())
                    .map_or_else(Arbiter::current, Arbiter::handle);

                let channel_name = msg.channel_name.clone();
                let server = ctx.address();

                Channel::start_in_arbiter(&arbiter, move |_ctx| Channel {
                    name: channel_name,
                    clients: HashMap::new(),
                    topic: None,
                    server: ctx.address(),
                }
                .start()
                    server,
                })
            })
            .clone();