🏡 index : ~doyle/scrs.git

author Jordan Doyle <jordan@doyle.la> 2020-08-16 15:24:33.0 +01:00:00
committer Jordan Doyle <jordan@doyle.la> 2020-08-16 15:24:33.0 +01:00:00
commit
00e9ab2e529acba5a9c66a463871068ba35b2978 [patch]
tree
ff45de9fa59a81e881a2855f8805a470f393d370
parent
85aca7e56612ab342d0d00d3d8694c7fb4571a45
download
00e9ab2e529acba5a9c66a463871068ba35b2978.tar.gz

Move to slog



Diff

 Cargo.lock                 | 221 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
 scrs-serve/Cargo.toml      |   5 +++--
 scrs-serve/src/handler.rs  |  60 ++++++++++++++++++++++++++++++++++++++++++------------------
 scrs-serve/src/main.rs     |  24 +++++++++++++++---------
 scrs-serve/src/metadata.rs |  12 ++++++------
 5 files changed, 236 insertions(+), 86 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index aa914b6..800e390 100644
--- a/Cargo.lock
+++ a/Cargo.lock
@@ -10,19 +10,22 @@
]

[[package]]
name = "aho-corasick"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
dependencies = [
 "memchr",
]

[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"

[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"

[[package]]
name = "arrayvec"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"

[[package]]
name = "atty"
@@ -40,12 +43,29 @@
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"

[[package]]
name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"

[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"

[[package]]
name = "blake2b_simd"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
dependencies = [
 "arrayref",
 "arrayvec",
 "constant_time_eq",
]

[[package]]
name = "bus_queue"
@@ -77,6 +97,17 @@
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"

[[package]]
name = "chrono"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942f72db697d8767c22d46a598e01f2d3b475501ea43d0db4f16d90259182d0b"
dependencies = [
 "num-integer",
 "num-traits",
 "time",
]

[[package]]
name = "clap"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -135,6 +166,33 @@
dependencies = [
 "getrandom",
 "proc-macro-hack",
]

[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"

[[package]]
name = "crossbeam-channel"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
dependencies = [
 "cfg-if",
 "crossbeam-utils",
]

[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
 "autocfg",
 "cfg-if",
 "lazy_static",
]

[[package]]
@@ -149,16 +207,24 @@
]

[[package]]
name = "env_logger"
version = "0.7.1"
name = "dirs"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
checksum = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3"
dependencies = [
 "atty",
 "humantime",
 "log",
 "regex",
 "termcolor",
 "cfg-if",
 "dirs-sys",
]

[[package]]
name = "dirs-sys"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a"
dependencies = [
 "libc",
 "redox_users",
 "winapi 0.3.9",
]

[[package]]
@@ -338,15 +404,6 @@
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"

[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
 "quick-error",
]

[[package]]
name = "idna"
@@ -515,6 +572,25 @@
 "cfg-if",
 "libc",
 "winapi 0.3.9",
]

[[package]]
name = "num-integer"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b"
dependencies = [
 "autocfg",
 "num-traits",
]

[[package]]
name = "num-traits"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611"
dependencies = [
 "autocfg",
]

[[package]]
@@ -647,12 +723,6 @@
dependencies = [
 "unicode-xid",
]

[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"

[[package]]
name = "quote"
@@ -670,22 +740,27 @@
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"

[[package]]
name = "regex"
version = "1.3.9"
name = "redox_users"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431"
dependencies = [
 "aho-corasick",
 "memchr",
 "regex-syntax",
 "thread_local",
 "getrandom",
 "redox_syscall",
 "rust-argon2",
]

[[package]]
name = "regex-syntax"
version = "0.6.18"
name = "rust-argon2"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
checksum = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017"
dependencies = [
 "base64",
 "blake2b_simd",
 "constant_time_eq",
 "crossbeam-utils",
]

[[package]]
name = "ryu"
@@ -708,14 +783,15 @@
 "bytes",
 "clap",
 "derive_more",
 "env_logger",
 "futures",
 "http",
 "httparse",
 "log",
 "mime",
 "serde",
 "serde_json",
 "slog",
 "slog-async",
 "slog-term",
 "thiserror",
 "tokio",
 "toml",
@@ -773,8 +849,39 @@
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"

[[package]]
name = "slog"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cc9c640a4adbfbcc11ffb95efe5aa7af7309e002adab54b185507dbf2377b99"

[[package]]
name = "slog-async"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b3336ce47ce2f96673499fc07eb85e3472727b9a7a2959964b002c2ce8fbbb"
dependencies = [
 "crossbeam-channel",
 "slog",
 "take_mut",
 "thread_local",
]

[[package]]
name = "slog-term"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab1d807cf71129b05ce36914e1dbb6fbfbdecaf686301cb457f4fa967f9f5b6"
dependencies = [
 "atty",
 "chrono",
 "slog",
 "term",
 "thread_local",
]

[[package]]
name = "smallvec"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -818,6 +925,22 @@
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"

[[package]]
name = "term"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0863a3345e70f61d613eab32ee046ccd1bcc5f9105fe402c61fcd0c13eeb8b5"
dependencies = [
 "dirs",
 "winapi 0.3.9",
]

[[package]]
@@ -865,6 +988,16 @@
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
dependencies = [
 "lazy_static",
]

[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
 "libc",
 "winapi 0.3.9",
]

[[package]]
diff --git a/scrs-serve/Cargo.toml b/scrs-serve/Cargo.toml
index 508ca3f..f599f0f 100644
--- a/scrs-serve/Cargo.toml
+++ a/scrs-serve/Cargo.toml
@@ -11,8 +11,9 @@
futures = "0.3"
tokio = { version = "0.2", features = ["full"] }
http = "0.2"
env_logger = "0.7"
log = "0.4"
slog = ""
slog-async = ""
slog-term = ""
thiserror = "1.0"
bytes = "0.5"
httparse = "1.3"
diff --git a/scrs-serve/src/handler.rs b/scrs-serve/src/handler.rs
index 1b9209b..fd1dbd8 100644
--- a/scrs-serve/src/handler.rs
+++ a/scrs-serve/src/handler.rs
@@ -1,8 +1,8 @@
use crate::metadata::{StreamMetadata, TrackMetadata};

use derive_more::Deref;
use futures::{SinkExt, StreamExt};

use slog::{debug, info};
use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -24,6 +24,8 @@
    IoError(#[from] std::io::Error),
    #[error("source attempted to stream data, but the stream no longer exists: `{0}`")]
    StreamDisconnected(#[from] std::sync::mpsc::SendError<bytes::Bytes>),
    #[error("endpoint not found: `{} {}`", .0.method(), .0.uri())]
    EndpointNotFound(http::Request<()>),
}

#[derive(Debug, Deref)]
@@ -52,21 +54,19 @@
}

macro_rules! write_response {
    ($writer:ident, $resp:expr) => {{
    ($log:ident, $writer:ident, $resp:expr) => {{
        let resp = $resp.body(())?;

        $writer
            .write_all(
                format!(
                    "{:?} {} {}\r\n",
                    resp.version(),
                    resp.status().as_str(),
                    resp.status().canonical_reason().unwrap_or("unknown error")
                )
                .as_bytes(),
            )
            .await?;
        let resp_head = format!(
            "{:?} {} {}\r\n",
            resp.version(),
            resp.status().as_str(),
            resp.status().canonical_reason().unwrap_or("unknown error")
        );
        debug!($log, "Response: {}", &resp_head[..resp_head.len() - 2]);

        $writer.write_all(resp_head.as_bytes()).await?;

        for (name, value) in resp.headers() {
            $writer.write_all(name.as_str().as_bytes()).await?;
            $writer.write_all(b": ").await?;
@@ -101,6 +101,7 @@
pub async fn process(
    mut conn: TcpStream,
    stream: Arc<crate::stream::Stream>,
    log: slog::Logger,
) -> Result<(), HandlerError> {
    let mut buffer = bytes::BytesMut::with_capacity(1024);

@@ -117,6 +118,8 @@

        Request::try_from(parsed)?
    };

    debug!(log, "Request: {} {}", req.method(), req.uri(); "user-agent" => req.headers().get("user-agent").and_then(|v| v.to_str().ok()));

    let resp = http::Response::builder()
        .version(http::Version::HTTP_11)
@@ -132,9 +135,10 @@
        v if v == stream.listen_uri => {
            // TODO: allow the streamer to set content-type
            let resp = resp.header("Content-Type", stream.content_type.as_ref());
            write_response!(conn, resp);
            write_response!(log, conn, resp);

            let mut handle = stream.listen();
            debug!(log, "Streaming to new listener"; "mountpoint" => "/");

            loop {
                if let Some(v) = handle.subscriber.next().await {
@@ -143,7 +147,6 @@
            }
        }
        "/stream" => {
            println!("stream req: {:?}", req.headers());
            if let Ok(ref mut handle) = stream.try_broadcast() {
                let mut resp = resp
                    .header("Connection", "Close")
@@ -152,7 +155,7 @@
                let content_type = header_bytes_or_empty(&req, "content-type");
                if content_type != stream.content_type.as_ref().as_bytes() {
                    // todo: custom message - unsupported content type
                    write_response!(conn, resp.status(403));
                    write_response!(log, conn, resp.status(403));
                    return Ok(());
                }

@@ -161,13 +164,13 @@
                    resp = resp.status(100);
                }

                write_response!(conn, resp);
                write_response!(log, conn, resp);

                let metadata = StreamMetadata::from(req.headers());
                info!(log, "Accepted stream request"; "mountpoint" => "/", "user-agent" => &metadata.user_agent, "content-type" => &metadata.content_type, "name" => &metadata.name);

                // todo: only stream handles should have write access to the metadata
                stream
                    .metadata()
                    .stream
                    .store(Arc::new(Some(StreamMetadata::from(req.headers()))));
                stream.metadata().stream.store(Arc::new(Some(metadata)));

                loop {
                    if conn.read_buf(&mut buffer).await? == 0 {
@@ -187,7 +190,7 @@
            let resp = resp
                .header("Connection", "Close")
                .header("Content-Type", "application/json");
            write_response!(conn, resp);
            write_response!(log, conn, resp);
            write_json!(conn, stream.metadata());
        }
        "/admin/metadata" => {
@@ -197,16 +200,25 @@
                .map(str::as_bytes)
                .map(url::form_urlencoded::parse)
            {
                let metadata = TrackMetadata::from(query);

                debug!(log, "Updated stream metadata"; "mountpoint" => "/", "track" => metadata.title, "artist" => metadata.artist, "album" => metadata.album);

                // todo: auth
                stream
                    .metadata()
                    .track
                    .store(Arc::new(Some(TrackMetadata::from(query))));

                write_response!(conn, resp);
                write_response!(log, conn, resp);
            }
        }
        _ => {
            let resp = resp.header("Connection", "Close").status(404);
            write_response!(log, conn, resp);

            return Err(HandlerError::EndpointNotFound(req.0));
        }
        _ => panic!("Invalid request: {:?}", req),
    }

    Ok(())
diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs
index 11ccd4e..f68c793 100644
--- a/scrs-serve/src/main.rs
+++ a/scrs-serve/src/main.rs
@@ -8,7 +8,7 @@
mod stream;

use clap::Clap;
use log::{debug, error, info};
use slog::{crit, debug, error, info, o, Drain};
use std::sync::Arc;
use tokio::net::TcpListener;

@@ -29,7 +29,10 @@

#[tokio::main]
async fn main() {
    env_logger::init();
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let opts: Opts = Opts::parse();

@@ -39,15 +42,15 @@
    {
        Ok(v) => v,
        Err(e) => {
            error!("{}", e);
            crit!(log, "{}", e);
            return;
        }
    };

    listen_forward(cfg).await;
    listen_forward(cfg, log).await;
}

async fn listen_forward(config: config::Config) {
async fn listen_forward(config: config::Config, log: slog::Logger) {
    let stream = Arc::new(stream::Stream::from(config.stream));

    let mut listener = TcpListener::bind(config.server.listen_address)
@@ -55,19 +58,20 @@
        .unwrap();

    info!(
        "Listening for new connections on {}",
        config.server.listen_address
        log,
        "Listening for new connections on {}", config.server.listen_address
    );

    loop {
        let (conn, remote) = listener.accept().await.unwrap();
        debug!("Accepted connection from {}", remote);
        let log = log.new(o!("remote" => remote));
        debug!(log, "Accepted connection from {}", remote);

        let stream = stream.clone();

        tokio::spawn(async move {
            if let Err(e) = handler::process(conn, stream).await {
                error!("Error handling request from {}: {}", remote, e);
            if let Err(e) = handler::process(conn, stream, log.clone()).await {
                error!(log, "Error handling request: {}", e);
            }
        });
    }
diff --git a/scrs-serve/src/metadata.rs b/scrs-serve/src/metadata.rs
index f1b0581..f995521 100644
--- a/scrs-serve/src/metadata.rs
+++ a/scrs-serve/src/metadata.rs
@@ -25,9 +25,9 @@

#[derive(Debug, Default, Serialize)]
pub struct StreamMetadata {
    user_agent: String,
    content_type: String,
    name: String,
    pub user_agent: String,
    pub content_type: String,
    pub name: String,
}

impl From<&http::HeaderMap> for StreamMetadata {
@@ -49,9 +49,9 @@

#[derive(Debug, Default, Serialize)]
pub struct TrackMetadata {
    artist: Option<String>,
    title: Option<String>,
    album: Option<String>,
    pub artist: Option<String>,
    pub title: Option<String>,
    pub album: Option<String>,
}

impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {