From 12a3e5ab31acaef514427fb5df355c1667ccd033 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 25 Jul 2020 13:59:02 +0000 Subject: [PATCH] Split up handlers and server, introduce config --- scrs-serve/Cargo.lock | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- scrs-serve/Cargo.toml | 9 +++++++-- scrs-serve/config-example.toml | 10 ++++++++++ scrs-serve/src/config.rs | 24 ++++++++++++++++++++++++ scrs-serve/src/handler.rs | 213 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scrs-serve/src/main.rs | 302 ++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------- scrs-serve/src/metadata.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scrs-serve/src/stream.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 729 insertions(+), 268 deletions(-) diff --git a/scrs-serve/Cargo.lock b/scrs-serve/Cargo.lock index e94b896..06a5c69 100644 --- a/scrs-serve/Cargo.lock +++ a/scrs-serve/Cargo.lock @@ -1,21 +1,21 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. [[package]] -name = "aho-corasick" -version = "0.7.13" +name = "ahash" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" dependencies = [ - "memchr", + "const-random", ] [[package]] -name = "ansi_term" -version = "0.11.0" +name = "aho-corasick" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" dependencies = [ - "winapi 0.3.9", + "memchr", ] [[package]] @@ -36,6 +36,12 @@ ] [[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + +[[package]] name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -51,6 +57,12 @@ "futures-core", "futures-sink", ] + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" [[package]] name = "bytes" @@ -66,20 +78,66 @@ [[package]] name = "clap" -version = "2.33.1" +version = "3.0.0-beta.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" +checksum = "860643c53f980f0d38a5e25dfab6c3c93b2cb3aa1fe192643d17a293c6c41936" dependencies = [ - "ansi_term", "atty", "bitflags", + "clap_derive", + "indexmap", + "lazy_static", + "os_str_bytes", "strsim", + "termcolor", "textwrap", "unicode-width", "vec_map", +] + +[[package]] +name = "clap_derive" +version = "3.0.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51c9e75b94452505acd21d929323f5a5c6c4735a852adbd39ef5fb1b014f30" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + +[[package]] +name = "const-random" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" +dependencies = [ + "const-random-macro", + "proc-macro-hack", ] [[package]] +name = "const-random-macro" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" +dependencies = [ + "getrandom", + "proc-macro-hack", +] + +[[package]] name = "derive_more" version = "0.99.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -218,6 +276,35 @@ "proc-macro-hack", "proc-macro-nested", "slab", +] + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb" +dependencies = [ + "autocfg", +] + +[[package]] +name = "heck" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" +dependencies = [ + "unicode-segmentation", ] [[package]] @@ -264,6 +351,16 @@ "matches", "unicode-bidi", "unicode-normalization", +] + +[[package]] +name = "indexmap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b88cd59ee5f71fea89a62248fc8f387d44400cefe05ef548466d61ced9029a7" +dependencies = [ + "autocfg", + "hashbrown", ] [[package]] @@ -302,6 +399,15 @@ version = "0.2.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9" + +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] [[package]] name = "log" @@ -325,6 +431,12 @@ checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" [[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] name = "mio" version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -414,8 +526,38 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "os_str_bytes" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510" [[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if", + "cloudabi", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + +[[package]] name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -452,6 +594,32 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro-error" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "syn-mid", + "version_check", +] [[package]] name = "proc-macro-hack" @@ -520,6 +688,12 @@ checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" [[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] name = "scrs" version = "0.1.0" dependencies = [ @@ -532,10 +706,15 @@ "futures", "http", "httparse", + "log", + "mime", "serde", "serde_json", + "thiserror", "tokio", + "toml", "url", + "ustr", ] [[package]] @@ -586,6 +765,12 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] +name = "smallvec" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" + +[[package]] name = "socket2" version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -599,9 +784,9 @@ [[package]] name = "strsim" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" @@ -612,6 +797,17 @@ "proc-macro2", "quote", "unicode-xid", +] + +[[package]] +name = "syn-mid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -630,6 +826,26 @@ checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" dependencies = [ "unicode-width", +] + +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -680,6 +896,15 @@ "proc-macro2", "quote", "syn", +] + +[[package]] +name = "toml" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" +dependencies = [ + "serde", ] [[package]] @@ -699,6 +924,12 @@ dependencies = [ "tinyvec", ] + +[[package]] +name = "unicode-segmentation" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" [[package]] name = "unicode-width" @@ -721,6 +952,18 @@ "idna", "matches", "percent-encoding", +] + +[[package]] +name = "ustr" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0548099019a3ce5393ab9f2754ee9e4cb49e41cc0ebe3319171c43a59a147c53" +dependencies = [ + "ahash", + "byteorder", + "lazy_static", + "parking_lot", ] [[package]] @@ -728,6 +971,18 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "winapi" diff --git a/scrs-serve/Cargo.toml b/scrs-serve/Cargo.toml index 64efb3f..0b87003 100644 --- a/scrs-serve/Cargo.toml +++ a/scrs-serve/Cargo.toml @@ -12,11 +12,16 @@ tokio = { version = "0.2", features = ["full"] } http = "0.2" env_logger = "0.7" +log = "0.4" +thiserror = "1.0" bytes = "0.5" httparse = "1.3" -clap = "2.33" derive_more = "0.99" arc-swap = "0.4" url = "2.1" serde = { version = "1.0", features = ["derive"] } -serde_json = "1"+serde_json = "1" +mime = "0.3" +ustr = "0.7" +clap = "3.0.0-beta.1" +toml = "0.5" diff --git a/scrs-serve/config-example.toml b/scrs-serve/config-example.toml new file mode 100644 index 0000000..5a5837d 100644 --- /dev/null +++ a/scrs-serve/config-example.toml @@ -1,0 +1,10 @@ +[server] +listen_address = "0.0.0.0:3000" + +[stream] +mount_point = "/" +content_type = "audio/mpeg" +bitrate = 128 +max_conns = 1000 +password = "abcdef" +buffer_size = 128diff --git a/scrs-serve/src/config.rs b/scrs-serve/src/config.rs new file mode 100644 index 0000000..0041103 100644 --- /dev/null +++ a/scrs-serve/src/config.rs @@ -1,0 +1,24 @@ +use serde::Deserialize; + +use std::net::SocketAddr; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub server: ServerConfig, + pub stream: StreamConfig, +} + +#[derive(Debug, Deserialize)] +pub struct ServerConfig { + pub listen_address: SocketAddr, +} + +#[derive(Debug, Deserialize)] +pub struct StreamConfig { + // pub mount_point: String, + pub content_type: String, + pub bitrate: usize, + pub max_conns: usize, + pub password: String, + pub buffer_size: usize, +} diff --git a/scrs-serve/src/handler.rs b/scrs-serve/src/handler.rs new file mode 100644 index 0000000..e2a20ec 100644 --- /dev/null +++ a/scrs-serve/src/handler.rs @@ -1,0 +1,213 @@ +use crate::metadata::{StreamMetadata, TrackMetadata}; + +use derive_more::Deref; +use futures::{SinkExt, StreamExt}; + +use thiserror::Error; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +use std::convert::TryFrom; +use std::sync::Arc; + +#[derive(Error, Debug)] +pub enum HandlerError { + #[error("failed to parse request from client: `{0}`")] + RequestError(#[from] httparse::Error), + #[error("failed to create response for client: `{0}`")] + ResponseError(#[from] http::Error), + #[error("other side didn't send valid a valid http request")] + InvalidHttp, + #[error("failed to create JSON blob response for client: `{0}`")] + JsonWriteError(serde_json::Error), + #[error("i/o failed for client: `{0}`")] + IoError(#[from] std::io::Error), + #[error("source attempted to stream data, but the stream no longer exists: `{0}`")] + StreamDisconnected(#[from] std::sync::mpsc::SendError), +} + +#[derive(Debug, Deref)] +struct Request(http::Request<()>); + +impl TryFrom> for Request { + type Error = HandlerError; + + fn try_from(parsed: httparse::Request<'_, '_>) -> Result { + let mut req = http::Request::builder() + .version(http::Version::HTTP_11) + .method(parsed.method.ok_or(HandlerError::InvalidHttp)?) + .uri(parsed.path.ok_or(HandlerError::InvalidHttp)?); + + for header in parsed.headers { + req.headers_mut().unwrap().insert( + http::header::HeaderName::from_bytes(header.name.as_bytes()) + .map_err(|_| httparse::Error::HeaderName)?, + http::HeaderValue::try_from(header.value) + .map_err(|_| httparse::Error::HeaderValue)?, + ); + } + + Ok(Self(req.body(())?)) + } +} + +macro_rules! write_response { + ($writer:ident, $resp:expr) => {{ + let resp = $resp.body(())?; + + $writer + .write_all( + format!( + "{:?} {} {}\r\n", + resp.version(), + resp.status().as_str(), + resp.status().canonical_reason().unwrap_or("unknown error") + ) + .as_bytes(), + ) + .await?; + + for (name, value) in resp.headers() { + $writer.write_all(name.as_str().as_bytes()).await?; + $writer.write_all(b": ").await?; + $writer.write_all(value.as_bytes()).await?; + $writer.write_all(b"\r\n").await?; + } + + $writer.write_all(b"\r\n").await?; + $writer.flush().await?; + }}; +} + +macro_rules! write_json { + ($conn:ident, $x:expr) => { + $conn + .write_all( + serde_json::to_string($x) + .map_err(HandlerError::JsonWriteError)? + .as_bytes(), + ) + .await?; + }; +} + +fn header_bytes_or_empty<'a>(req: &'a Request, header: &str) -> &'a [u8] { + req.headers() + .get(header) + .map(http::HeaderValue::as_bytes) + .unwrap_or_default() +} + +pub async fn process( + mut conn: TcpStream, + stream: Arc, +) -> Result<(), HandlerError> { + let mut buffer = bytes::BytesMut::with_capacity(1024); + + if conn.read_buf(&mut buffer).await? == 0 { + return Ok(()); + } + + let req = { + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut parsed = httparse::Request::new(&mut headers); + + let header_buffer = buffer.split().freeze(); + parsed.parse(&header_buffer[..])?; + + Request::try_from(parsed)? + }; + + let resp = http::Response::builder() + .version(http::Version::HTTP_11) + .header( + "Server", + concat!(clap::crate_name!(), "/", clap::crate_version!()), + ) + .header("Accept-Encoding", "identity") + .header("Connection", "keep-alive") + .header("Access-Control-Allow-Origin", "*"); + + match req.uri().path() { + v if v == stream.listen_uri => { + // TODO: allow the streamer to set content-type + let resp = resp.header("Content-Type", stream.content_type.as_ref()); + write_response!(conn, resp); + + let mut handle = stream.listen(); + + loop { + if let Some(v) = handle.subscriber.next().await { + conn.write_all(v.as_ref()).await?; + } + } + } + "/stream" => { + println!("stream req: {:?}", req.headers()); + if let Ok(ref mut handle) = stream.try_broadcast() { + let mut resp = resp + .header("Connection", "Close") + .header("Allow", "PUT, SOURCE"); + + let content_type = header_bytes_or_empty(&req, "content-type"); + if content_type != stream.content_type.as_ref().as_bytes() { + // todo: custom message - unsupported content type + write_response!(conn, resp.status(403)); + return Ok(()); + } + + let expect_header = header_bytes_or_empty(&req, "expect"); + if expect_header == b"100-continue" { + resp = resp.status(100); + } + + write_response!(conn, resp); + + // todo: only stream handles should have write access to the metadata + stream + .metadata() + .stream + .store(Arc::new(Some(StreamMetadata::from(req.headers())))); + + loop { + if conn.read_buf(&mut buffer).await? == 0 { + break; + } + + handle.publisher.send(buffer.split().freeze()).await?; + } + + stream.metadata().stream.store(Arc::new(None)); + stream.metadata().track.store(Arc::new(None)); + } else { + panic!("someone's already streaming!"); + } + } + "/metadata" => { + let resp = resp + .header("Connection", "Close") + .header("Content-Type", "application/json"); + write_response!(conn, resp); + write_json!(conn, stream.metadata()); + } + "/admin/metadata" => { + if let Some(query) = req + .uri() + .query() + .map(|v| v.as_bytes()) + .map(url::form_urlencoded::parse) + { + // todo: auth + stream + .metadata() + .track + .store(Arc::new(Some(TrackMetadata::from(query)))); + + write_response!(conn, resp); + } + } + _ => panic!("Invalid request: {:?}", req), + } + + Ok(()) +} diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs index 0b97f9a..11ccd4e 100644 --- a/scrs-serve/src/main.rs +++ a/scrs-serve/src/main.rs @@ -1,278 +1,74 @@ #![deny(clippy::pedantic)] #![allow(clippy::used_underscore_binding)] +#![allow(clippy::module_name_repetitions)] -use arc_swap::ArcSwap; -use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber}; -use bytes::Bytes; -use derive_more::Deref; -use futures::{SinkExt, StreamExt}; -use serde::Serialize; +mod config; +mod handler; +mod metadata; +mod stream; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; -use tokio::sync::Mutex; +use clap::Clap; +use log::{debug, error, info}; +use std::sync::Arc; +use tokio::net::TcpListener; -use std::convert::TryFrom; -use std::net::SocketAddr; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - -#[derive(Debug, Deref)] -struct Request(http::Request<()>); - -impl From> for Request { - fn from(parsed: httparse::Request<'_, '_>) -> Self { - let mut req = http::Request::builder() - .version(http::Version::HTTP_11) - .method(parsed.method.unwrap()) - .uri(parsed.path.unwrap()); - - for header in parsed.headers { - req.headers_mut().unwrap().insert( - http::header::HeaderName::from_bytes(header.name.as_bytes()).unwrap(), - http::HeaderValue::try_from(header.value).unwrap(), - ); - } - - Self(req.body(()).unwrap()) - } -} - -#[derive(Debug, Default)] -struct MetadataContainer { - stream: ArcSwap>, - track: ArcSwap>, - listeners: AtomicU64, +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error("Couldn't read config file: `{0}`")] + ReadError(#[from] std::io::Error), + #[error("Couldn't parse config file: `{0}`")] + DeserializationError(#[from] toml::de::Error), } -impl serde::ser::Serialize for MetadataContainer { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut struc = serializer.serialize_struct("", 2)?; - struc.serialize_field("stream", &**self.stream.load())?; - struc.serialize_field("track", &**self.track.load())?; - struc.serialize_field("listeners", &self.listeners)?; - struc.end() - } +#[derive(Clap)] +#[clap(version = "1.0", author = "Jordan D. ")] +struct Opts { + #[clap(short, long)] + config: std::path::PathBuf, } -#[derive(Debug, Default, Serialize)] -struct StreamMetadata { - user_agent: String, - content_type: String, - name: String, -} +#[tokio::main] +async fn main() { + env_logger::init(); -impl From<&http::HeaderMap> for StreamMetadata { - fn from(map: &http::HeaderMap) -> Self { - let string_from_header = |v: Option<&http::HeaderValue>| { - v.map(http::HeaderValue::as_bytes) - .map(String::from_utf8_lossy) - .unwrap_or_default() - .into_owned() - }; + let opts: Opts = Opts::parse(); - Self { - user_agent: string_from_header(map.get("user-agent")), - content_type: string_from_header(map.get("content-type")), - name: string_from_header(map.get("ice-name")), + let cfg = match std::fs::read_to_string(opts.config) + .map_err(InitError::ReadError) + .and_then(|s| toml::from_str(&s).map_err(Into::into)) + { + Ok(v) => v, + Err(e) => { + error!("{}", e); + return; } - } -} + }; -#[derive(Debug, Default, Serialize)] -struct TrackMetadata { - artist: Option, - title: Option, - album: Option, + listen_forward(cfg).await; } - -impl From> for TrackMetadata { - fn from(query: url::form_urlencoded::Parse<'_>) -> Self { - let mut meta = Self::default(); - - for (key, value) in query { - match key.as_ref() { - "artist" => meta.artist = Some(value.into_owned()), - "title" => meta.title = Some(value.into_owned()), - "album" => meta.album = Some(value.into_owned()), - _ => {} - } - } - meta - } -} +async fn listen_forward(config: config::Config) { + let stream = Arc::new(stream::Stream::from(config.stream)); -async fn write_response(mut writer: W, resp: http::Response<()>) { - writer - .write_all( - format!( - "{:?} {} {}\r\n", - resp.version(), - resp.status().as_str(), - resp.status().canonical_reason().unwrap() - ) - .as_bytes(), - ) + let mut listener = TcpListener::bind(config.server.listen_address) .await .unwrap(); - - for (name, value) in resp.headers() { - writer.write_all(name.as_str().as_bytes()).await.unwrap(); - writer.write_all(b": ").await.unwrap(); - writer.write_all(value.as_bytes()).await.unwrap(); - writer.write_all(b"\r\n").await.unwrap(); - } - - writer.write_all(b"\r\n").await.unwrap(); - writer.flush().await.unwrap(); -} - -async fn process( - mut stream: TcpStream, - publisher: Arc>>, - mut subscriber: Subscriber, - metadata: Arc, -) { - println!("accepted"); - - let mut buffer = bytes::BytesMut::with_capacity(1024); - - if stream.read_buf(&mut buffer).await.unwrap() == 0 { - return; - } - - let req = { - let mut headers = [httparse::EMPTY_HEADER; 16]; - let mut parsed = httparse::Request::new(&mut headers); - - let header_buffer = buffer.split().freeze(); - parsed.parse(&header_buffer[..]).unwrap(); - - Request::from(parsed) - }; - - let resp = http::Response::builder() - .version(http::Version::HTTP_11) - .header( - "Server", - concat!(clap::crate_name!(), "/", clap::crate_version!()), - ) - .header("Accept-Encoding", "identity") - .header("Connection", "keep-alive") - .header("Access-Control-Allow-Origin", "*"); - - match req.uri().path() { - "/listen.mp3" => { - // TODO: allow the streamer to set content-type - let resp = resp.header("Content-Type", "audio/mpeg").body(()).unwrap(); - write_response(&mut stream, resp).await; - - metadata.listeners.fetch_add(1, Ordering::Relaxed); - - loop { - if let Some(v) = subscriber.next().await { - if stream.write_all(v.as_ref()).await.is_err() { - break; - } - } - } - - metadata.listeners.fetch_sub(1, Ordering::Relaxed); - } - "/stream" => { - println!("stream req: {:?}", req.headers()); - if let Ok(ref mut publisher) = publisher.try_lock() { - metadata - .stream - .store(Arc::new(Some(StreamMetadata::from(req.headers())))); - - let mut resp = resp - .header("Connection", "Close") - .header("Allow", "PUT, SOURCE"); - - let expect_header = req - .headers() - .get("expect") - .map(http::HeaderValue::as_bytes) - .unwrap_or_default(); - if expect_header == b"100-continue" { - resp = resp.status(100); - } - - write_response(&mut stream, resp.body(()).unwrap()).await; - - loop { - if stream.read_buf(&mut buffer).await.unwrap() == 0 { - break; - } - - publisher.send(buffer.split().freeze()).await.unwrap(); - } - - metadata.stream.store(Arc::new(None)); - metadata.track.store(Arc::new(None)); - } else { - panic!("someone's already streaming!"); - } - } - "/metadata" => { - let resp = resp - .header("Connection", "Close") - .header("Content-Type", "application/json") - .body(()) - .unwrap(); - write_response(&mut stream, resp).await; - - stream - .write_all(serde_json::to_string(&*metadata).unwrap().as_bytes()) - .await - .unwrap(); - } - "/admin/metadata" => { - let query = url::form_urlencoded::parse(req.uri().query().unwrap().as_bytes()); - - metadata - .track - .store(Arc::new(Some(TrackMetadata::from(query)))); - - let resp = resp.body(()).unwrap(); - write_response(&mut stream, resp).await; - } - _ => panic!("Invalid request: {:?}", req), - } -} -#[tokio::main] -async fn main() { - env_logger::init(); - listen_forward(3000).await; -} + info!( + "Listening for new connections on {}", + config.server.listen_address + ); -async fn listen_forward(port: u16) { - let (publisher, subscriber) = bounded::(128); - let publisher = Arc::new(Mutex::new(publisher)); - let metadata = Arc::new(MetadataContainer::default()); - - let addr = SocketAddr::from(([127, 0, 0, 1], port)); - - // let the consumer pass this in - let mut listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - loop { - println!("listening for new conns..."); - let (stream, _) = listener.accept().await.unwrap(); + let (conn, remote) = listener.accept().await.unwrap(); + debug!("Accepted connection from {}", remote); - let publisher = publisher.clone(); - let subscriber = subscriber.clone(); - let metadata = metadata.clone(); + let stream = stream.clone(); - tokio::spawn(process(stream, publisher, subscriber, metadata)); + tokio::spawn(async move { + if let Err(e) = handler::process(conn, stream).await { + error!("Error handling request from {}: {}", remote, e); + } + }); } } diff --git a/scrs-serve/src/metadata.rs b/scrs-serve/src/metadata.rs new file mode 100644 index 0000000..f1b0581 100644 --- /dev/null +++ a/scrs-serve/src/metadata.rs @@ -1,0 +1,72 @@ +use arc_swap::ArcSwap; +use serde::Serialize; +use std::sync::atomic::AtomicU64; + +#[derive(Debug, Default)] +pub struct MetadataContainer { + pub stream: ArcSwap>, + pub track: ArcSwap>, + pub listeners: AtomicU64, +} + +impl serde::ser::Serialize for MetadataContainer { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut struc = serializer.serialize_struct("", 3)?; + struc.serialize_field("stream", &**self.stream.load())?; + struc.serialize_field("track", &**self.track.load())?; + struc.serialize_field("listeners", &self.listeners)?; + struc.end() + } +} + +#[derive(Debug, Default, Serialize)] +pub struct StreamMetadata { + user_agent: String, + content_type: String, + name: String, +} + +impl From<&http::HeaderMap> for StreamMetadata { + fn from(map: &http::HeaderMap) -> Self { + let string_from_header = |v: Option<&http::HeaderValue>| { + v.map(http::HeaderValue::as_bytes) + .map(String::from_utf8_lossy) + .unwrap_or_default() + .into_owned() + }; + + Self { + user_agent: string_from_header(map.get("user-agent")), + content_type: string_from_header(map.get("content-type")), + name: string_from_header(map.get("ice-name")), + } + } +} + +#[derive(Debug, Default, Serialize)] +pub struct TrackMetadata { + artist: Option, + title: Option, + album: Option, +} + +impl From> for TrackMetadata { + fn from(query: url::form_urlencoded::Parse<'_>) -> Self { + let mut meta = Self::default(); + + for (key, value) in query { + match key.as_ref() { + "artist" => meta.artist = Some(value.into_owned()), + "title" => meta.title = Some(value.into_owned()), + "album" => meta.album = Some(value.into_owned()), + _ => {} + } + } + + meta + } +} diff --git a/scrs-serve/src/stream.rs b/scrs-serve/src/stream.rs new file mode 100644 index 0000000..5143cb9 100644 --- /dev/null +++ a/scrs-serve/src/stream.rs @@ -1,0 +1,86 @@ +use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber}; +use bytes::Bytes; +use mime::Mime; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::sync::Mutex; +use ustr::ustr; + +use crate::metadata::MetadataContainer; + +pub struct Stream { + publisher: Mutex>, + subscriber: Subscriber, + metadata: MetadataContainer, + bitrate: usize, + max_conns: usize, + password: String, + pub content_type: Mime, + pub listen_uri: &'static str, +} + +impl From for Stream { + fn from(config: crate::config::StreamConfig) -> Self { + let (publisher, subscriber) = bounded::(config.buffer_size); + + let content_type: Mime = config.content_type.parse().unwrap(); + + let listen_uri = match (content_type.type_(), content_type.subtype()) { + (mime::AUDIO, mime::MPEG) => ustr("/listen.mp3").as_str(), + (mime::AUDIO, mime::OGG) => ustr("/listen.ogg").as_str(), + _ => panic!("unknown stream content type"), + }; + + Self { + publisher: Mutex::new(publisher), + subscriber, + metadata: MetadataContainer::default(), + bitrate: config.bitrate, + max_conns: config.max_conns, + password: config.password, + content_type, + listen_uri, + } + } +} + +impl Stream { + pub fn listen(&self) -> Listener<'_> { + Listener::new(self) + } + + pub fn try_broadcast(&self) -> Result, tokio::sync::TryLockError> { + Ok(BroadcastHandle { + publisher: self.publisher.try_lock()?, + }) + } + + pub fn metadata(&self) -> &MetadataContainer { + &self.metadata + } +} + +pub struct BroadcastHandle<'a> { + pub publisher: tokio::sync::MutexGuard<'a, Publisher>, +} + +pub struct Listener<'a> { + pub subscriber: Subscriber, + listener_count: &'a AtomicU64, +} + +impl Drop for Listener<'_> { + fn drop(&mut self) { + self.listener_count.fetch_sub(1, Ordering::Relaxed); + } +} + +impl<'a> Listener<'a> { + pub fn new(stream: &'a Stream) -> Listener<'a> { + stream.metadata.listeners.fetch_add(1, Ordering::Relaxed); + + Self { + subscriber: stream.subscriber.clone(), + listener_count: &stream.metadata.listeners, + } + } +} -- rgit 0.1.3