Split up handlers and server, introduce config
Diff
scrs-serve/Cargo.lock | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
scrs-serve/Cargo.toml | 9 +++++++--
scrs-serve/config-example.toml | 10 ++++++++++
scrs-serve/src/config.rs | 24 ++++++++++++++++++++++++
scrs-serve/src/handler.rs | 213 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrs-serve/src/main.rs | 302 ++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
scrs-serve/src/metadata.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrs-serve/src/stream.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 729 insertions(+), 268 deletions(-)
@@ -1,21 +1,21 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "aho-corasick"
version = "0.7.13"
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"memchr",
"const-random",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
name = "aho-corasick"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
dependencies = [
"winapi 0.3.9",
"memchr",
]
[[package]]
@@ -36,6 +36,12 @@
]
[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -51,6 +57,12 @@
"futures-core",
"futures-sink",
]
[[package]]
name = "byteorder"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]]
name = "bytes"
@@ -66,20 +78,66 @@
[[package]]
name = "clap"
version = "2.33.1"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
checksum = "860643c53f980f0d38a5e25dfab6c3c93b2cb3aa1fe192643d17a293c6c41936"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"clap_derive",
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim",
"termcolor",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap_derive"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb51c9e75b94452505acd21d929323f5a5c6c4735a852adbd39ef5fb1b014f30"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "derive_more"
version = "0.99.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -218,6 +276,35 @@
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hashbrown"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb"
dependencies = [
"autocfg",
]
[[package]]
name = "heck"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
dependencies = [
"unicode-segmentation",
]
[[package]]
@@ -264,6 +351,16 @@
"matches",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b88cd59ee5f71fea89a62248fc8f387d44400cefe05ef548466d61ced9029a7"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
@@ -302,6 +399,15 @@
version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9"
[[package]]
name = "lock_api"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
@@ -325,6 +431,12 @@
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mio"
version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -414,8 +526,38 @@
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
name = "os_str_bytes"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510"
[[package]]
name = "parking_lot"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [
"cfg-if",
"cloudabi",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -452,6 +594,32 @@
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-error"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn-mid",
"version_check",
]
[[package]]
name = "proc-macro-hack"
@@ -520,6 +688,12 @@
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scrs"
version = "0.1.0"
dependencies = [
@@ -532,10 +706,15 @@
"futures",
"http",
"httparse",
"log",
"mime",
"serde",
"serde_json",
"thiserror",
"tokio",
"toml",
"url",
"ustr",
]
[[package]]
@@ -586,6 +765,12 @@
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"
[[package]]
name = "socket2"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -599,9 +784,9 @@
[[package]]
name = "strsim"
version = "0.8.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
@@ -612,6 +797,17 @@
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "syn-mid"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
@@ -630,6 +826,26 @@
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
@@ -680,6 +896,15 @@
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "toml"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a"
dependencies = [
"serde",
]
[[package]]
@@ -699,6 +924,12 @@
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0"
[[package]]
name = "unicode-width"
@@ -721,6 +952,18 @@
"idna",
"matches",
"percent-encoding",
]
[[package]]
name = "ustr"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0548099019a3ce5393ab9f2754ee9e4cb49e41cc0ebe3319171c43a59a147c53"
dependencies = [
"ahash",
"byteorder",
"lazy_static",
"parking_lot",
]
[[package]]
@@ -728,6 +971,18 @@
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "winapi"
@@ -12,11 +12,16 @@
tokio = { version = "0.2", features = ["full"] }
http = "0.2"
env_logger = "0.7"
log = "0.4"
thiserror = "1.0"
bytes = "0.5"
httparse = "1.3"
clap = "2.33"
derive_more = "0.99"
arc-swap = "0.4"
url = "2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_json = "1"
mime = "0.3"
ustr = "0.7"
clap = "3.0.0-beta.1"
toml = "0.5"
@@ -1,0 +1,10 @@
[server]
listen_address = "0.0.0.0:3000"
[stream]
mount_point = "/"
content_type = "audio/mpeg"
bitrate = 128
max_conns = 1000
password = "abcdef"
buffer_size = 128
@@ -1,0 +1,24 @@
use serde::Deserialize;
use std::net::SocketAddr;
#[derive(Debug, Deserialize)]
pub struct Config {
pub server: ServerConfig,
pub stream: StreamConfig,
}
#[derive(Debug, Deserialize)]
pub struct ServerConfig {
pub listen_address: SocketAddr,
}
#[derive(Debug, Deserialize)]
pub struct StreamConfig {
pub content_type: String,
pub bitrate: usize,
pub max_conns: usize,
pub password: String,
pub buffer_size: usize,
}
@@ -1,0 +1,213 @@
use crate::metadata::{StreamMetadata, TrackMetadata};
use derive_more::Deref;
use futures::{SinkExt, StreamExt};
use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use std::convert::TryFrom;
use std::sync::Arc;
#[derive(Error, Debug)]
pub enum HandlerError {
#[error("failed to parse request from client: `{0}`")]
RequestError(#[from] httparse::Error),
#[error("failed to create response for client: `{0}`")]
ResponseError(#[from] http::Error),
#[error("other side didn't send valid a valid http request")]
InvalidHttp,
#[error("failed to create JSON blob response for client: `{0}`")]
JsonWriteError(serde_json::Error),
#[error("i/o failed for client: `{0}`")]
IoError(#[from] std::io::Error),
#[error("source attempted to stream data, but the stream no longer exists: `{0}`")]
StreamDisconnected(#[from] std::sync::mpsc::SendError<bytes::Bytes>),
}
#[derive(Debug, Deref)]
struct Request(http::Request<()>);
impl TryFrom<httparse::Request<'_, '_>> for Request {
type Error = HandlerError;
fn try_from(parsed: httparse::Request<'_, '_>) -> Result<Self, Self::Error> {
let mut req = http::Request::builder()
.version(http::Version::HTTP_11)
.method(parsed.method.ok_or(HandlerError::InvalidHttp)?)
.uri(parsed.path.ok_or(HandlerError::InvalidHttp)?);
for header in parsed.headers {
req.headers_mut().unwrap().insert(
http::header::HeaderName::from_bytes(header.name.as_bytes())
.map_err(|_| httparse::Error::HeaderName)?,
http::HeaderValue::try_from(header.value)
.map_err(|_| httparse::Error::HeaderValue)?,
);
}
Ok(Self(req.body(())?))
}
}
macro_rules! write_response {
($writer:ident, $resp:expr) => {{
let resp = $resp.body(())?;
$writer
.write_all(
format!(
"{:?} {} {}\r\n",
resp.version(),
resp.status().as_str(),
resp.status().canonical_reason().unwrap_or("unknown error")
)
.as_bytes(),
)
.await?;
for (name, value) in resp.headers() {
$writer.write_all(name.as_str().as_bytes()).await?;
$writer.write_all(b": ").await?;
$writer.write_all(value.as_bytes()).await?;
$writer.write_all(b"\r\n").await?;
}
$writer.write_all(b"\r\n").await?;
$writer.flush().await?;
}};
}
macro_rules! write_json {
($conn:ident, $x:expr) => {
$conn
.write_all(
serde_json::to_string($x)
.map_err(HandlerError::JsonWriteError)?
.as_bytes(),
)
.await?;
};
}
fn header_bytes_or_empty<'a>(req: &'a Request, header: &str) -> &'a [u8] {
req.headers()
.get(header)
.map(http::HeaderValue::as_bytes)
.unwrap_or_default()
}
pub async fn process(
mut conn: TcpStream,
stream: Arc<crate::stream::Stream>,
) -> Result<(), HandlerError> {
let mut buffer = bytes::BytesMut::with_capacity(1024);
if conn.read_buf(&mut buffer).await? == 0 {
return Ok(());
}
let req = {
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut parsed = httparse::Request::new(&mut headers);
let header_buffer = buffer.split().freeze();
parsed.parse(&header_buffer[..])?;
Request::try_from(parsed)?
};
let resp = http::Response::builder()
.version(http::Version::HTTP_11)
.header(
"Server",
concat!(clap::crate_name!(), "/", clap::crate_version!()),
)
.header("Accept-Encoding", "identity")
.header("Connection", "keep-alive")
.header("Access-Control-Allow-Origin", "*");
match req.uri().path() {
v if v == stream.listen_uri => {
let resp = resp.header("Content-Type", stream.content_type.as_ref());
write_response!(conn, resp);
let mut handle = stream.listen();
loop {
if let Some(v) = handle.subscriber.next().await {
conn.write_all(v.as_ref()).await?;
}
}
}
"/stream" => {
println!("stream req: {:?}", req.headers());
if let Ok(ref mut handle) = stream.try_broadcast() {
let mut resp = resp
.header("Connection", "Close")
.header("Allow", "PUT, SOURCE");
let content_type = header_bytes_or_empty(&req, "content-type");
if content_type != stream.content_type.as_ref().as_bytes() {
write_response!(conn, resp.status(403));
return Ok(());
}
let expect_header = header_bytes_or_empty(&req, "expect");
if expect_header == b"100-continue" {
resp = resp.status(100);
}
write_response!(conn, resp);
stream
.metadata()
.stream
.store(Arc::new(Some(StreamMetadata::from(req.headers()))));
loop {
if conn.read_buf(&mut buffer).await? == 0 {
break;
}
handle.publisher.send(buffer.split().freeze()).await?;
}
stream.metadata().stream.store(Arc::new(None));
stream.metadata().track.store(Arc::new(None));
} else {
panic!("someone's already streaming!");
}
}
"/metadata" => {
let resp = resp
.header("Connection", "Close")
.header("Content-Type", "application/json");
write_response!(conn, resp);
write_json!(conn, stream.metadata());
}
"/admin/metadata" => {
if let Some(query) = req
.uri()
.query()
.map(|v| v.as_bytes())
.map(url::form_urlencoded::parse)
{
stream
.metadata()
.track
.store(Arc::new(Some(TrackMetadata::from(query))));
write_response!(conn, resp);
}
}
_ => panic!("Invalid request: {:?}", req),
}
Ok(())
}
@@ -1,278 +1,74 @@
#![deny(clippy::pedantic)]
#![allow(clippy::used_underscore_binding)]
#![allow(clippy::module_name_repetitions)]
use arc_swap::ArcSwap;
use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber};
use bytes::Bytes;
use derive_more::Deref;
use futures::{SinkExt, StreamExt};
use serde::Serialize;
mod config;
mod handler;
mod metadata;
mod stream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use clap::Clap;
use log::{debug, error, info};
use std::sync::Arc;
use tokio::net::TcpListener;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
#[derive(Debug, Deref)]
struct Request(http::Request<()>);
impl From<httparse::Request<'_, '_>> for Request {
fn from(parsed: httparse::Request<'_, '_>) -> Self {
let mut req = http::Request::builder()
.version(http::Version::HTTP_11)
.method(parsed.method.unwrap())
.uri(parsed.path.unwrap());
for header in parsed.headers {
req.headers_mut().unwrap().insert(
http::header::HeaderName::from_bytes(header.name.as_bytes()).unwrap(),
http::HeaderValue::try_from(header.value).unwrap(),
);
}
Self(req.body(()).unwrap())
}
}
#[derive(Debug, Default)]
struct MetadataContainer {
stream: ArcSwap<Option<StreamMetadata>>,
track: ArcSwap<Option<TrackMetadata>>,
listeners: AtomicU64,
#[derive(thiserror::Error, Debug)]
pub enum InitError {
#[error("Couldn't read config file: `{0}`")]
ReadError(#[from] std::io::Error),
#[error("Couldn't parse config file: `{0}`")]
DeserializationError(#[from] toml::de::Error),
}
impl serde::ser::Serialize for MetadataContainer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut struc = serializer.serialize_struct("", 2)?;
struc.serialize_field("stream", &**self.stream.load())?;
struc.serialize_field("track", &**self.track.load())?;
struc.serialize_field("listeners", &self.listeners)?;
struc.end()
}
#[derive(Clap)]
#[clap(version = "1.0", author = "Jordan D. <jordan@doyle.la>")]
struct Opts {
#[clap(short, long)]
config: std::path::PathBuf,
}
#[derive(Debug, Default, Serialize)]
struct StreamMetadata {
user_agent: String,
content_type: String,
name: String,
}
#[tokio::main]
async fn main() {
env_logger::init();
impl From<&http::HeaderMap> for StreamMetadata {
fn from(map: &http::HeaderMap) -> Self {
let string_from_header = |v: Option<&http::HeaderValue>| {
v.map(http::HeaderValue::as_bytes)
.map(String::from_utf8_lossy)
.unwrap_or_default()
.into_owned()
};
let opts: Opts = Opts::parse();
Self {
user_agent: string_from_header(map.get("user-agent")),
content_type: string_from_header(map.get("content-type")),
name: string_from_header(map.get("ice-name")),
let cfg = match std::fs::read_to_string(opts.config)
.map_err(InitError::ReadError)
.and_then(|s| toml::from_str(&s).map_err(Into::into))
{
Ok(v) => v,
Err(e) => {
error!("{}", e);
return;
}
}
}
};
#[derive(Debug, Default, Serialize)]
struct TrackMetadata {
artist: Option<String>,
title: Option<String>,
album: Option<String>,
listen_forward(cfg).await;
}
impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
let mut meta = Self::default();
for (key, value) in query {
match key.as_ref() {
"artist" => meta.artist = Some(value.into_owned()),
"title" => meta.title = Some(value.into_owned()),
"album" => meta.album = Some(value.into_owned()),
_ => {}
}
}
meta
}
}
async fn listen_forward(config: config::Config) {
let stream = Arc::new(stream::Stream::from(config.stream));
async fn write_response<W: tokio::io::AsyncWrite + Unpin>(mut writer: W, resp: http::Response<()>) {
writer
.write_all(
format!(
"{:?} {} {}\r\n",
resp.version(),
resp.status().as_str(),
resp.status().canonical_reason().unwrap()
)
.as_bytes(),
)
let mut listener = TcpListener::bind(config.server.listen_address)
.await
.unwrap();
for (name, value) in resp.headers() {
writer.write_all(name.as_str().as_bytes()).await.unwrap();
writer.write_all(b": ").await.unwrap();
writer.write_all(value.as_bytes()).await.unwrap();
writer.write_all(b"\r\n").await.unwrap();
}
writer.write_all(b"\r\n").await.unwrap();
writer.flush().await.unwrap();
}
async fn process(
mut stream: TcpStream,
publisher: Arc<Mutex<Publisher<Bytes>>>,
mut subscriber: Subscriber<Bytes>,
metadata: Arc<MetadataContainer>,
) {
println!("accepted");
let mut buffer = bytes::BytesMut::with_capacity(1024);
if stream.read_buf(&mut buffer).await.unwrap() == 0 {
return;
}
let req = {
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut parsed = httparse::Request::new(&mut headers);
let header_buffer = buffer.split().freeze();
parsed.parse(&header_buffer[..]).unwrap();
Request::from(parsed)
};
let resp = http::Response::builder()
.version(http::Version::HTTP_11)
.header(
"Server",
concat!(clap::crate_name!(), "/", clap::crate_version!()),
)
.header("Accept-Encoding", "identity")
.header("Connection", "keep-alive")
.header("Access-Control-Allow-Origin", "*");
match req.uri().path() {
"/listen.mp3" => {
let resp = resp.header("Content-Type", "audio/mpeg").body(()).unwrap();
write_response(&mut stream, resp).await;
metadata.listeners.fetch_add(1, Ordering::Relaxed);
loop {
if let Some(v) = subscriber.next().await {
if stream.write_all(v.as_ref()).await.is_err() {
break;
}
}
}
metadata.listeners.fetch_sub(1, Ordering::Relaxed);
}
"/stream" => {
println!("stream req: {:?}", req.headers());
if let Ok(ref mut publisher) = publisher.try_lock() {
metadata
.stream
.store(Arc::new(Some(StreamMetadata::from(req.headers()))));
let mut resp = resp
.header("Connection", "Close")
.header("Allow", "PUT, SOURCE");
let expect_header = req
.headers()
.get("expect")
.map(http::HeaderValue::as_bytes)
.unwrap_or_default();
if expect_header == b"100-continue" {
resp = resp.status(100);
}
write_response(&mut stream, resp.body(()).unwrap()).await;
loop {
if stream.read_buf(&mut buffer).await.unwrap() == 0 {
break;
}
publisher.send(buffer.split().freeze()).await.unwrap();
}
metadata.stream.store(Arc::new(None));
metadata.track.store(Arc::new(None));
} else {
panic!("someone's already streaming!");
}
}
"/metadata" => {
let resp = resp
.header("Connection", "Close")
.header("Content-Type", "application/json")
.body(())
.unwrap();
write_response(&mut stream, resp).await;
stream
.write_all(serde_json::to_string(&*metadata).unwrap().as_bytes())
.await
.unwrap();
}
"/admin/metadata" => {
let query = url::form_urlencoded::parse(req.uri().query().unwrap().as_bytes());
metadata
.track
.store(Arc::new(Some(TrackMetadata::from(query))));
let resp = resp.body(()).unwrap();
write_response(&mut stream, resp).await;
}
_ => panic!("Invalid request: {:?}", req),
}
}
#[tokio::main]
async fn main() {
env_logger::init();
listen_forward(3000).await;
}
info!(
"Listening for new connections on {}",
config.server.listen_address
);
async fn listen_forward(port: u16) {
let (publisher, subscriber) = bounded::<bytes::Bytes>(128);
let publisher = Arc::new(Mutex::new(publisher));
let metadata = Arc::new(MetadataContainer::default());
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let mut listener = tokio::net::TcpListener::bind(addr).await.unwrap();
loop {
println!("listening for new conns...");
let (stream, _) = listener.accept().await.unwrap();
let (conn, remote) = listener.accept().await.unwrap();
debug!("Accepted connection from {}", remote);
let publisher = publisher.clone();
let subscriber = subscriber.clone();
let metadata = metadata.clone();
let stream = stream.clone();
tokio::spawn(process(stream, publisher, subscriber, metadata));
tokio::spawn(async move {
if let Err(e) = handler::process(conn, stream).await {
error!("Error handling request from {}: {}", remote, e);
}
});
}
}
@@ -1,0 +1,72 @@
use arc_swap::ArcSwap;
use serde::Serialize;
use std::sync::atomic::AtomicU64;
#[derive(Debug, Default)]
pub struct MetadataContainer {
pub stream: ArcSwap<Option<StreamMetadata>>,
pub track: ArcSwap<Option<TrackMetadata>>,
pub listeners: AtomicU64,
}
impl serde::ser::Serialize for MetadataContainer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut struc = serializer.serialize_struct("", 3)?;
struc.serialize_field("stream", &**self.stream.load())?;
struc.serialize_field("track", &**self.track.load())?;
struc.serialize_field("listeners", &self.listeners)?;
struc.end()
}
}
#[derive(Debug, Default, Serialize)]
pub struct StreamMetadata {
user_agent: String,
content_type: String,
name: String,
}
impl From<&http::HeaderMap> for StreamMetadata {
fn from(map: &http::HeaderMap) -> Self {
let string_from_header = |v: Option<&http::HeaderValue>| {
v.map(http::HeaderValue::as_bytes)
.map(String::from_utf8_lossy)
.unwrap_or_default()
.into_owned()
};
Self {
user_agent: string_from_header(map.get("user-agent")),
content_type: string_from_header(map.get("content-type")),
name: string_from_header(map.get("ice-name")),
}
}
}
#[derive(Debug, Default, Serialize)]
pub struct TrackMetadata {
artist: Option<String>,
title: Option<String>,
album: Option<String>,
}
impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
let mut meta = Self::default();
for (key, value) in query {
match key.as_ref() {
"artist" => meta.artist = Some(value.into_owned()),
"title" => meta.title = Some(value.into_owned()),
"album" => meta.album = Some(value.into_owned()),
_ => {}
}
}
meta
}
}
@@ -1,0 +1,86 @@
use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber};
use bytes::Bytes;
use mime::Mime;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use ustr::ustr;
use crate::metadata::MetadataContainer;
pub struct Stream {
publisher: Mutex<Publisher<Bytes>>,
subscriber: Subscriber<Bytes>,
metadata: MetadataContainer,
bitrate: usize,
max_conns: usize,
password: String,
pub content_type: Mime,
pub listen_uri: &'static str,
}
impl From<crate::config::StreamConfig> for Stream {
fn from(config: crate::config::StreamConfig) -> Self {
let (publisher, subscriber) = bounded::<Bytes>(config.buffer_size);
let content_type: Mime = config.content_type.parse().unwrap();
let listen_uri = match (content_type.type_(), content_type.subtype()) {
(mime::AUDIO, mime::MPEG) => ustr("/listen.mp3").as_str(),
(mime::AUDIO, mime::OGG) => ustr("/listen.ogg").as_str(),
_ => panic!("unknown stream content type"),
};
Self {
publisher: Mutex::new(publisher),
subscriber,
metadata: MetadataContainer::default(),
bitrate: config.bitrate,
max_conns: config.max_conns,
password: config.password,
content_type,
listen_uri,
}
}
}
impl Stream {
pub fn listen(&self) -> Listener<'_> {
Listener::new(self)
}
pub fn try_broadcast(&self) -> Result<BroadcastHandle<'_>, tokio::sync::TryLockError> {
Ok(BroadcastHandle {
publisher: self.publisher.try_lock()?,
})
}
pub fn metadata(&self) -> &MetadataContainer {
&self.metadata
}
}
pub struct BroadcastHandle<'a> {
pub publisher: tokio::sync::MutexGuard<'a, Publisher<Bytes>>,
}
pub struct Listener<'a> {
pub subscriber: Subscriber<Bytes>,
listener_count: &'a AtomicU64,
}
impl Drop for Listener<'_> {
fn drop(&mut self) {
self.listener_count.fetch_sub(1, Ordering::Relaxed);
}
}
impl<'a> Listener<'a> {
pub fn new(stream: &'a Stream) -> Listener<'a> {
stream.metadata.listeners.fetch_add(1, Ordering::Relaxed);
Self {
subscriber: stream.subscriber.clone(),
listener_count: &stream.metadata.listeners,
}
}
}