Move to slog
Diff
Cargo.lock | 221 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
scrs-serve/Cargo.toml | 5 +++--
scrs-serve/src/handler.rs | 60 ++++++++++++++++++++++++++++++++++++++++++------------------
scrs-serve/src/main.rs | 24 +++++++++++++++---------
scrs-serve/src/metadata.rs | 12 ++++++------
5 files changed, 236 insertions(+), 86 deletions(-)
@@ -10,19 +10,22 @@
]
[[package]]
name = "aho-corasick"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
dependencies = [
"memchr",
]
[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "atty"
@@ -40,12 +43,29 @@
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]]
name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "blake2b_simd"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
dependencies = [
"arrayref",
"arrayvec",
"constant_time_eq",
]
[[package]]
name = "bus_queue"
@@ -77,6 +97,17 @@
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "chrono"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942f72db697d8767c22d46a598e01f2d3b475501ea43d0db4f16d90259182d0b"
dependencies = [
"num-integer",
"num-traits",
"time",
]
[[package]]
name = "clap"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -135,6 +166,33 @@
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "crossbeam-channel"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
@@ -149,16 +207,24 @@
]
[[package]]
name = "env_logger"
version = "0.7.1"
name = "dirs"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
checksum = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
"cfg-if",
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a"
dependencies = [
"libc",
"redox_users",
"winapi 0.3.9",
]
[[package]]
@@ -338,15 +404,6 @@
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]]
name = "idna"
@@ -515,6 +572,25 @@
"cfg-if",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611"
dependencies = [
"autocfg",
]
[[package]]
@@ -647,12 +723,6 @@
dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
@@ -670,22 +740,27 @@
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "regex"
version = "1.3.9"
name = "redox_users"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"thread_local",
"getrandom",
"redox_syscall",
"rust-argon2",
]
[[package]]
name = "regex-syntax"
version = "0.6.18"
name = "rust-argon2"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
checksum = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017"
dependencies = [
"base64",
"blake2b_simd",
"constant_time_eq",
"crossbeam-utils",
]
[[package]]
name = "ryu"
@@ -708,14 +783,15 @@
"bytes",
"clap",
"derive_more",
"env_logger",
"futures",
"http",
"httparse",
"log",
"mime",
"serde",
"serde_json",
"slog",
"slog-async",
"slog-term",
"thiserror",
"tokio",
"toml",
@@ -773,8 +849,39 @@
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "slog"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cc9c640a4adbfbcc11ffb95efe5aa7af7309e002adab54b185507dbf2377b99"
[[package]]
name = "slog-async"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b3336ce47ce2f96673499fc07eb85e3472727b9a7a2959964b002c2ce8fbbb"
dependencies = [
"crossbeam-channel",
"slog",
"take_mut",
"thread_local",
]
[[package]]
name = "slog-term"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab1d807cf71129b05ce36914e1dbb6fbfbdecaf686301cb457f4fa967f9f5b6"
dependencies = [
"atty",
"chrono",
"slog",
"term",
"thread_local",
]
[[package]]
name = "smallvec"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -818,6 +925,22 @@
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "term"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0863a3345e70f61d613eab32ee046ccd1bcc5f9105fe402c61fcd0c13eeb8b5"
dependencies = [
"dirs",
"winapi 0.3.9",
]
[[package]]
@@ -865,6 +988,16 @@
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
dependencies = [
"lazy_static",
]
[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
@@ -11,8 +11,9 @@
futures = "0.3"
tokio = { version = "0.2", features = ["full"] }
http = "0.2"
env_logger = "0.7"
log = "0.4"
slog = ""
slog-async = ""
slog-term = ""
thiserror = "1.0"
bytes = "0.5"
httparse = "1.3"
@@ -1,8 +1,8 @@
use crate::metadata::{StreamMetadata, TrackMetadata};
use derive_more::Deref;
use futures::{SinkExt, StreamExt};
use slog::{debug, info};
use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -24,6 +24,8 @@
IoError(#[from] std::io::Error),
#[error("source attempted to stream data, but the stream no longer exists: `{0}`")]
StreamDisconnected(#[from] std::sync::mpsc::SendError<bytes::Bytes>),
#[error("endpoint not found: `{} {}`", .0.method(), .0.uri())]
EndpointNotFound(http::Request<()>),
}
#[derive(Debug, Deref)]
@@ -52,21 +54,19 @@
}
macro_rules! write_response {
($writer:ident, $resp:expr) => {{
($log:ident, $writer:ident, $resp:expr) => {{
let resp = $resp.body(())?;
$writer
.write_all(
format!(
"{:?} {} {}\r\n",
resp.version(),
resp.status().as_str(),
resp.status().canonical_reason().unwrap_or("unknown error")
)
.as_bytes(),
)
.await?;
let resp_head = format!(
"{:?} {} {}\r\n",
resp.version(),
resp.status().as_str(),
resp.status().canonical_reason().unwrap_or("unknown error")
);
debug!($log, "Response: {}", &resp_head[..resp_head.len() - 2]);
$writer.write_all(resp_head.as_bytes()).await?;
for (name, value) in resp.headers() {
$writer.write_all(name.as_str().as_bytes()).await?;
$writer.write_all(b": ").await?;
@@ -101,6 +101,7 @@
pub async fn process(
mut conn: TcpStream,
stream: Arc<crate::stream::Stream>,
log: slog::Logger,
) -> Result<(), HandlerError> {
let mut buffer = bytes::BytesMut::with_capacity(1024);
@@ -117,6 +118,8 @@
Request::try_from(parsed)?
};
debug!(log, "Request: {} {}", req.method(), req.uri(); "user-agent" => req.headers().get("user-agent").and_then(|v| v.to_str().ok()));
let resp = http::Response::builder()
.version(http::Version::HTTP_11)
@@ -132,9 +135,10 @@
v if v == stream.listen_uri => {
let resp = resp.header("Content-Type", stream.content_type.as_ref());
write_response!(conn, resp);
write_response!(log, conn, resp);
let mut handle = stream.listen();
debug!(log, "Streaming to new listener"; "mountpoint" => "/");
loop {
if let Some(v) = handle.subscriber.next().await {
@@ -143,7 +147,6 @@
}
}
"/stream" => {
println!("stream req: {:?}", req.headers());
if let Ok(ref mut handle) = stream.try_broadcast() {
let mut resp = resp
.header("Connection", "Close")
@@ -152,7 +155,7 @@
let content_type = header_bytes_or_empty(&req, "content-type");
if content_type != stream.content_type.as_ref().as_bytes() {
write_response!(conn, resp.status(403));
write_response!(log, conn, resp.status(403));
return Ok(());
}
@@ -161,13 +164,13 @@
resp = resp.status(100);
}
write_response!(conn, resp);
write_response!(log, conn, resp);
let metadata = StreamMetadata::from(req.headers());
info!(log, "Accepted stream request"; "mountpoint" => "/", "user-agent" => &metadata.user_agent, "content-type" => &metadata.content_type, "name" => &metadata.name);
stream
.metadata()
.stream
.store(Arc::new(Some(StreamMetadata::from(req.headers()))));
stream.metadata().stream.store(Arc::new(Some(metadata)));
loop {
if conn.read_buf(&mut buffer).await? == 0 {
@@ -187,7 +190,7 @@
let resp = resp
.header("Connection", "Close")
.header("Content-Type", "application/json");
write_response!(conn, resp);
write_response!(log, conn, resp);
write_json!(conn, stream.metadata());
}
"/admin/metadata" => {
@@ -197,16 +200,25 @@
.map(str::as_bytes)
.map(url::form_urlencoded::parse)
{
let metadata = TrackMetadata::from(query);
debug!(log, "Updated stream metadata"; "mountpoint" => "/", "track" => metadata.title, "artist" => metadata.artist, "album" => metadata.album);
stream
.metadata()
.track
.store(Arc::new(Some(TrackMetadata::from(query))));
write_response!(conn, resp);
write_response!(log, conn, resp);
}
}
_ => {
let resp = resp.header("Connection", "Close").status(404);
write_response!(log, conn, resp);
return Err(HandlerError::EndpointNotFound(req.0));
}
_ => panic!("Invalid request: {:?}", req),
}
Ok(())
@@ -8,7 +8,7 @@
mod stream;
use clap::Clap;
use log::{debug, error, info};
use slog::{crit, debug, error, info, o, Drain};
use std::sync::Arc;
use tokio::net::TcpListener;
@@ -29,7 +29,10 @@
#[tokio::main]
async fn main() {
env_logger::init();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
let opts: Opts = Opts::parse();
@@ -39,15 +42,15 @@
{
Ok(v) => v,
Err(e) => {
error!("{}", e);
crit!(log, "{}", e);
return;
}
};
listen_forward(cfg).await;
listen_forward(cfg, log).await;
}
async fn listen_forward(config: config::Config) {
async fn listen_forward(config: config::Config, log: slog::Logger) {
let stream = Arc::new(stream::Stream::from(config.stream));
let mut listener = TcpListener::bind(config.server.listen_address)
@@ -55,19 +58,20 @@
.unwrap();
info!(
"Listening for new connections on {}",
config.server.listen_address
log,
"Listening for new connections on {}", config.server.listen_address
);
loop {
let (conn, remote) = listener.accept().await.unwrap();
debug!("Accepted connection from {}", remote);
let log = log.new(o!("remote" => remote));
debug!(log, "Accepted connection from {}", remote);
let stream = stream.clone();
tokio::spawn(async move {
if let Err(e) = handler::process(conn, stream).await {
error!("Error handling request from {}: {}", remote, e);
if let Err(e) = handler::process(conn, stream, log.clone()).await {
error!(log, "Error handling request: {}", e);
}
});
}
@@ -25,9 +25,9 @@
#[derive(Debug, Default, Serialize)]
pub struct StreamMetadata {
user_agent: String,
content_type: String,
name: String,
pub user_agent: String,
pub content_type: String,
pub name: String,
}
impl From<&http::HeaderMap> for StreamMetadata {
@@ -49,9 +49,9 @@
#[derive(Debug, Default, Serialize)]
pub struct TrackMetadata {
artist: Option<String>,
title: Option<String>,
album: Option<String>,
pub artist: Option<String>,
pub title: Option<String>,
pub album: Option<String>,
}
impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {