From 00e9ab2e529acba5a9c66a463871068ba35b2978 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sun, 16 Aug 2020 15:24:33 +0100 Subject: [PATCH] Move to slog --- Cargo.lock | 221 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- scrs-serve/Cargo.toml | 5 +++-- scrs-serve/src/handler.rs | 60 ++++++++++++++++++++++++++++++++++++++++++------------------ scrs-serve/src/main.rs | 24 +++++++++++++++--------- scrs-serve/src/metadata.rs | 12 ++++++------ 5 files changed, 236 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa914b6..800e390 100644 --- a/Cargo.lock +++ a/Cargo.lock @@ -10,19 +10,22 @@ ] [[package]] -name = "aho-corasick" -version = "0.7.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" -dependencies = [ - "memchr", -] - -[[package]] name = "arc-swap" version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" + +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + +[[package]] +name = "arrayvec" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" [[package]] name = "atty" @@ -40,12 +43,29 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" [[package]] name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "blake2b_simd" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a" +dependencies = [ + "arrayref", + "arrayvec", + "constant_time_eq", +] [[package]] name = "bus_queue" @@ -77,6 +97,17 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] +name = "chrono" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942f72db697d8767c22d46a598e01f2d3b475501ea43d0db4f16d90259182d0b" +dependencies = [ + "num-integer", + "num-traits", + "time", +] + +[[package]] name = "clap" version = "3.0.0-beta.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -135,6 +166,33 @@ dependencies = [ "getrandom", "proc-macro-hack", +] + +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", ] [[package]] @@ -149,16 +207,24 @@ ] [[package]] -name = "env_logger" -version = "0.7.1" +name = "dirs" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +checksum = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", + "cfg-if", + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a" +dependencies = [ + "libc", + "redox_users", + "winapi 0.3.9", ] [[package]] @@ -338,15 +404,6 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" - -[[package]] -name = "humantime" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" -dependencies = [ - "quick-error", -] [[package]] name = "idna" @@ -515,6 +572,25 @@ "cfg-if", "libc", "winapi 0.3.9", +] + +[[package]] +name = "num-integer" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" +dependencies = [ + "autocfg", ] [[package]] @@ -647,12 +723,6 @@ dependencies = [ "unicode-xid", ] - -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" @@ -670,22 +740,27 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] -name = "regex" -version = "1.3.9" +name = "redox_users" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431" dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", - "thread_local", + "getrandom", + "redox_syscall", + "rust-argon2", ] [[package]] -name = "regex-syntax" -version = "0.6.18" +name = "rust-argon2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" +checksum = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017" +dependencies = [ + "base64", + "blake2b_simd", + "constant_time_eq", + "crossbeam-utils", +] [[package]] name = "ryu" @@ -708,14 +783,15 @@ "bytes", "clap", "derive_more", - "env_logger", "futures", "http", "httparse", - "log", "mime", "serde", "serde_json", + "slog", + "slog-async", + "slog-term", "thiserror", "tokio", "toml", @@ -773,8 +849,39 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "slog" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cc9c640a4adbfbcc11ffb95efe5aa7af7309e002adab54b185507dbf2377b99" [[package]] +name = "slog-async" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b3336ce47ce2f96673499fc07eb85e3472727b9a7a2959964b002c2ce8fbbb" +dependencies = [ + "crossbeam-channel", + "slog", + "take_mut", + "thread_local", +] + +[[package]] +name = "slog-term" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab1d807cf71129b05ce36914e1dbb6fbfbdecaf686301cb457f4fa967f9f5b6" +dependencies = [ + "atty", + "chrono", + "slog", + "term", + "thread_local", +] + +[[package]] name = "smallvec" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -818,6 +925,22 @@ "proc-macro2", "quote", "syn", +] + +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + +[[package]] +name = "term" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0863a3345e70f61d613eab32ee046ccd1bcc5f9105fe402c61fcd0c13eeb8b5" +dependencies = [ + "dirs", + "winapi 0.3.9", ] [[package]] @@ -865,6 +988,16 @@ checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" dependencies = [ "lazy_static", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi 0.3.9", ] [[package]] diff --git a/scrs-serve/Cargo.toml b/scrs-serve/Cargo.toml index 508ca3f..f599f0f 100644 --- a/scrs-serve/Cargo.toml +++ a/scrs-serve/Cargo.toml @@ -11,8 +11,9 @@ futures = "0.3" tokio = { version = "0.2", features = ["full"] } http = "0.2" -env_logger = "0.7" -log = "0.4" +slog = "" +slog-async = "" +slog-term = "" thiserror = "1.0" bytes = "0.5" httparse = "1.3" diff --git a/scrs-serve/src/handler.rs b/scrs-serve/src/handler.rs index 1b9209b..fd1dbd8 100644 --- a/scrs-serve/src/handler.rs +++ a/scrs-serve/src/handler.rs @@ -1,8 +1,8 @@ use crate::metadata::{StreamMetadata, TrackMetadata}; use derive_more::Deref; use futures::{SinkExt, StreamExt}; - +use slog::{debug, info}; use thiserror::Error; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; @@ -24,6 +24,8 @@ IoError(#[from] std::io::Error), #[error("source attempted to stream data, but the stream no longer exists: `{0}`")] StreamDisconnected(#[from] std::sync::mpsc::SendError), + #[error("endpoint not found: `{} {}`", .0.method(), .0.uri())] + EndpointNotFound(http::Request<()>), } #[derive(Debug, Deref)] @@ -52,21 +54,19 @@ } macro_rules! write_response { - ($writer:ident, $resp:expr) => {{ + ($log:ident, $writer:ident, $resp:expr) => {{ let resp = $resp.body(())?; - $writer - .write_all( - format!( - "{:?} {} {}\r\n", - resp.version(), - resp.status().as_str(), - resp.status().canonical_reason().unwrap_or("unknown error") - ) - .as_bytes(), - ) - .await?; + let resp_head = format!( + "{:?} {} {}\r\n", + resp.version(), + resp.status().as_str(), + resp.status().canonical_reason().unwrap_or("unknown error") + ); + debug!($log, "Response: {}", &resp_head[..resp_head.len() - 2]); + $writer.write_all(resp_head.as_bytes()).await?; + for (name, value) in resp.headers() { $writer.write_all(name.as_str().as_bytes()).await?; $writer.write_all(b": ").await?; @@ -101,6 +101,7 @@ pub async fn process( mut conn: TcpStream, stream: Arc, + log: slog::Logger, ) -> Result<(), HandlerError> { let mut buffer = bytes::BytesMut::with_capacity(1024); @@ -117,6 +118,8 @@ Request::try_from(parsed)? }; + + debug!(log, "Request: {} {}", req.method(), req.uri(); "user-agent" => req.headers().get("user-agent").and_then(|v| v.to_str().ok())); let resp = http::Response::builder() .version(http::Version::HTTP_11) @@ -132,9 +135,10 @@ v if v == stream.listen_uri => { // TODO: allow the streamer to set content-type let resp = resp.header("Content-Type", stream.content_type.as_ref()); - write_response!(conn, resp); + write_response!(log, conn, resp); let mut handle = stream.listen(); + debug!(log, "Streaming to new listener"; "mountpoint" => "/"); loop { if let Some(v) = handle.subscriber.next().await { @@ -143,7 +147,6 @@ } } "/stream" => { - println!("stream req: {:?}", req.headers()); if let Ok(ref mut handle) = stream.try_broadcast() { let mut resp = resp .header("Connection", "Close") @@ -152,7 +155,7 @@ let content_type = header_bytes_or_empty(&req, "content-type"); if content_type != stream.content_type.as_ref().as_bytes() { // todo: custom message - unsupported content type - write_response!(conn, resp.status(403)); + write_response!(log, conn, resp.status(403)); return Ok(()); } @@ -161,13 +164,13 @@ resp = resp.status(100); } - write_response!(conn, resp); + write_response!(log, conn, resp); + let metadata = StreamMetadata::from(req.headers()); + info!(log, "Accepted stream request"; "mountpoint" => "/", "user-agent" => &metadata.user_agent, "content-type" => &metadata.content_type, "name" => &metadata.name); + // todo: only stream handles should have write access to the metadata - stream - .metadata() - .stream - .store(Arc::new(Some(StreamMetadata::from(req.headers())))); + stream.metadata().stream.store(Arc::new(Some(metadata))); loop { if conn.read_buf(&mut buffer).await? == 0 { @@ -187,7 +190,7 @@ let resp = resp .header("Connection", "Close") .header("Content-Type", "application/json"); - write_response!(conn, resp); + write_response!(log, conn, resp); write_json!(conn, stream.metadata()); } "/admin/metadata" => { @@ -197,16 +200,25 @@ .map(str::as_bytes) .map(url::form_urlencoded::parse) { + let metadata = TrackMetadata::from(query); + + debug!(log, "Updated stream metadata"; "mountpoint" => "/", "track" => metadata.title, "artist" => metadata.artist, "album" => metadata.album); + // todo: auth stream .metadata() .track .store(Arc::new(Some(TrackMetadata::from(query)))); - write_response!(conn, resp); + write_response!(log, conn, resp); } + } + _ => { + let resp = resp.header("Connection", "Close").status(404); + write_response!(log, conn, resp); + + return Err(HandlerError::EndpointNotFound(req.0)); } - _ => panic!("Invalid request: {:?}", req), } Ok(()) diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs index 11ccd4e..f68c793 100644 --- a/scrs-serve/src/main.rs +++ a/scrs-serve/src/main.rs @@ -8,7 +8,7 @@ mod stream; use clap::Clap; -use log::{debug, error, info}; +use slog::{crit, debug, error, info, o, Drain}; use std::sync::Arc; use tokio::net::TcpListener; @@ -29,7 +29,10 @@ #[tokio::main] async fn main() { - env_logger::init(); + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let log = slog::Logger::root(drain, o!()); let opts: Opts = Opts::parse(); @@ -39,15 +42,15 @@ { Ok(v) => v, Err(e) => { - error!("{}", e); + crit!(log, "{}", e); return; } }; - listen_forward(cfg).await; + listen_forward(cfg, log).await; } -async fn listen_forward(config: config::Config) { +async fn listen_forward(config: config::Config, log: slog::Logger) { let stream = Arc::new(stream::Stream::from(config.stream)); let mut listener = TcpListener::bind(config.server.listen_address) @@ -55,19 +58,20 @@ .unwrap(); info!( - "Listening for new connections on {}", - config.server.listen_address + log, + "Listening for new connections on {}", config.server.listen_address ); loop { let (conn, remote) = listener.accept().await.unwrap(); - debug!("Accepted connection from {}", remote); + let log = log.new(o!("remote" => remote)); + debug!(log, "Accepted connection from {}", remote); let stream = stream.clone(); tokio::spawn(async move { - if let Err(e) = handler::process(conn, stream).await { - error!("Error handling request from {}: {}", remote, e); + if let Err(e) = handler::process(conn, stream, log.clone()).await { + error!(log, "Error handling request: {}", e); } }); } diff --git a/scrs-serve/src/metadata.rs b/scrs-serve/src/metadata.rs index f1b0581..f995521 100644 --- a/scrs-serve/src/metadata.rs +++ a/scrs-serve/src/metadata.rs @@ -25,9 +25,9 @@ #[derive(Debug, Default, Serialize)] pub struct StreamMetadata { - user_agent: String, - content_type: String, - name: String, + pub user_agent: String, + pub content_type: String, + pub name: String, } impl From<&http::HeaderMap> for StreamMetadata { @@ -49,9 +49,9 @@ #[derive(Debug, Default, Serialize)] pub struct TrackMetadata { - artist: Option, - title: Option, - album: Option, + pub artist: Option, + pub title: Option, + pub album: Option, } impl From> for TrackMetadata { -- rgit 0.1.3