🏡 index : ~doyle/scrs.git

author Jordan Doyle <jordan@doyle.la> 2020-07-25 13:59:02.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2020-07-25 14:10:20.0 +00:00:00
commit
12a3e5ab31acaef514427fb5df355c1667ccd033 [patch]
tree
5b3676b38f42f617249d6468bcd6f485f133b473
parent
073311bc40feae2260dcd40e7aeeea5bcd19f722
download
12a3e5ab31acaef514427fb5df355c1667ccd033.tar.gz

Split up handlers and server, introduce config



Diff

 scrs-serve/Cargo.lock          | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 scrs-serve/Cargo.toml          |   9 +++++++--
 scrs-serve/config-example.toml |  10 ++++++++++
 scrs-serve/src/config.rs       |  24 ++++++++++++++++++++++++
 scrs-serve/src/handler.rs      | 213 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 scrs-serve/src/main.rs         | 302 ++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
 scrs-serve/src/metadata.rs     |  72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 scrs-serve/src/stream.rs       |  86 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 729 insertions(+), 268 deletions(-)

diff --git a/scrs-serve/Cargo.lock b/scrs-serve/Cargo.lock
index e94b896..06a5c69 100644
--- a/scrs-serve/Cargo.lock
+++ a/scrs-serve/Cargo.lock
@@ -1,21 +1,21 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "aho-corasick"
version = "0.7.13"
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
 "memchr",
 "const-random",
]

[[package]]
name = "ansi_term"
version = "0.11.0"
name = "aho-corasick"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
dependencies = [
 "winapi 0.3.9",
 "memchr",
]

[[package]]
@@ -36,6 +36,12 @@
]

[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"

[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -51,6 +57,12 @@
 "futures-core",
 "futures-sink",
]

[[package]]
name = "byteorder"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"

[[package]]
name = "bytes"
@@ -66,20 +78,66 @@

[[package]]
name = "clap"
version = "2.33.1"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
checksum = "860643c53f980f0d38a5e25dfab6c3c93b2cb3aa1fe192643d17a293c6c41936"
dependencies = [
 "ansi_term",
 "atty",
 "bitflags",
 "clap_derive",
 "indexmap",
 "lazy_static",
 "os_str_bytes",
 "strsim",
 "termcolor",
 "textwrap",
 "unicode-width",
 "vec_map",
]

[[package]]
name = "clap_derive"
version = "3.0.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb51c9e75b94452505acd21d929323f5a5c6c4735a852adbd39ef5fb1b014f30"
dependencies = [
 "heck",
 "proc-macro-error",
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
 "bitflags",
]

[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
 "const-random-macro",
 "proc-macro-hack",
]

[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
 "getrandom",
 "proc-macro-hack",
]

[[package]]
name = "derive_more"
version = "0.99.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -218,6 +276,35 @@
 "proc-macro-hack",
 "proc-macro-nested",
 "slab",
]

[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
 "cfg-if",
 "libc",
 "wasi",
]

[[package]]
name = "hashbrown"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb"
dependencies = [
 "autocfg",
]

[[package]]
name = "heck"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
dependencies = [
 "unicode-segmentation",
]

[[package]]
@@ -264,6 +351,16 @@
 "matches",
 "unicode-bidi",
 "unicode-normalization",
]

[[package]]
name = "indexmap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b88cd59ee5f71fea89a62248fc8f387d44400cefe05ef548466d61ced9029a7"
dependencies = [
 "autocfg",
 "hashbrown",
]

[[package]]
@@ -302,6 +399,15 @@
version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9"

[[package]]
name = "lock_api"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
dependencies = [
 "scopeguard",
]

[[package]]
name = "log"
@@ -325,6 +431,12 @@
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"

[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"

[[package]]
name = "mio"
version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -414,8 +526,38 @@
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"

[[package]]
name = "os_str_bytes"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510"

[[package]]
name = "parking_lot"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
 "lock_api",
 "parking_lot_core",
]

[[package]]
name = "parking_lot_core"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [
 "cfg-if",
 "cloudabi",
 "libc",
 "redox_syscall",
 "smallvec",
 "winapi 0.3.9",
]

[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -452,6 +594,32 @@
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

[[package]]
name = "proc-macro-error"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7"
dependencies = [
 "proc-macro-error-attr",
 "proc-macro2",
 "quote",
 "syn",
 "version_check",
]

[[package]]
name = "proc-macro-error-attr"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn-mid",
 "version_check",
]

[[package]]
name = "proc-macro-hack"
@@ -520,6 +688,12 @@
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"

[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"

[[package]]
name = "scrs"
version = "0.1.0"
dependencies = [
@@ -532,10 +706,15 @@
 "futures",
 "http",
 "httparse",
 "log",
 "mime",
 "serde",
 "serde_json",
 "thiserror",
 "tokio",
 "toml",
 "url",
 "ustr",
]

[[package]]
@@ -586,6 +765,12 @@
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"

[[package]]
name = "smallvec"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"

[[package]]
name = "socket2"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -599,9 +784,9 @@

[[package]]
name = "strsim"
version = "0.8.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"

[[package]]
name = "syn"
@@ -612,6 +797,17 @@
 "proc-macro2",
 "quote",
 "unicode-xid",
]

[[package]]
name = "syn-mid"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
@@ -630,6 +826,26 @@
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
 "unicode-width",
]

[[package]]
name = "thiserror"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08"
dependencies = [
 "thiserror-impl",
]

[[package]]
name = "thiserror-impl"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
@@ -680,6 +896,15 @@
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "toml"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a"
dependencies = [
 "serde",
]

[[package]]
@@ -699,6 +924,12 @@
dependencies = [
 "tinyvec",
]

[[package]]
name = "unicode-segmentation"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0"

[[package]]
name = "unicode-width"
@@ -721,6 +952,18 @@
 "idna",
 "matches",
 "percent-encoding",
]

[[package]]
name = "ustr"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0548099019a3ce5393ab9f2754ee9e4cb49e41cc0ebe3319171c43a59a147c53"
dependencies = [
 "ahash",
 "byteorder",
 "lazy_static",
 "parking_lot",
]

[[package]]
@@ -728,6 +971,18 @@
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"

[[package]]
name = "version_check"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"

[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"

[[package]]
name = "winapi"
diff --git a/scrs-serve/Cargo.toml b/scrs-serve/Cargo.toml
index 64efb3f..0b87003 100644
--- a/scrs-serve/Cargo.toml
+++ a/scrs-serve/Cargo.toml
@@ -12,11 +12,16 @@
tokio = { version = "0.2", features = ["full"] }
http = "0.2"
env_logger = "0.7"
log = "0.4"
thiserror = "1.0"
bytes = "0.5"
httparse = "1.3"
clap = "2.33"
derive_more = "0.99"
arc-swap = "0.4"
url = "2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_json = "1"
mime = "0.3"
ustr = "0.7"
clap = "3.0.0-beta.1"
toml = "0.5"
diff --git a/scrs-serve/config-example.toml b/scrs-serve/config-example.toml
new file mode 100644
index 0000000..5a5837d 100644
--- /dev/null
+++ a/scrs-serve/config-example.toml
@@ -1,0 +1,10 @@
[server]
listen_address = "0.0.0.0:3000"

[stream]
mount_point = "/"
content_type = "audio/mpeg"
bitrate = 128
max_conns = 1000
password = "abcdef"
buffer_size = 128
diff --git a/scrs-serve/src/config.rs b/scrs-serve/src/config.rs
new file mode 100644
index 0000000..0041103 100644
--- /dev/null
+++ a/scrs-serve/src/config.rs
@@ -1,0 +1,24 @@
use serde::Deserialize;

use std::net::SocketAddr;

#[derive(Debug, Deserialize)]
pub struct Config {
    pub server: ServerConfig,
    pub stream: StreamConfig,
}

#[derive(Debug, Deserialize)]
pub struct ServerConfig {
    pub listen_address: SocketAddr,
}

#[derive(Debug, Deserialize)]
pub struct StreamConfig {
    // pub mount_point: String,
    pub content_type: String,
    pub bitrate: usize,
    pub max_conns: usize,
    pub password: String,
    pub buffer_size: usize,
}
diff --git a/scrs-serve/src/handler.rs b/scrs-serve/src/handler.rs
new file mode 100644
index 0000000..e2a20ec 100644
--- /dev/null
+++ a/scrs-serve/src/handler.rs
@@ -1,0 +1,213 @@
use crate::metadata::{StreamMetadata, TrackMetadata};

use derive_more::Deref;
use futures::{SinkExt, StreamExt};

use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

use std::convert::TryFrom;
use std::sync::Arc;

#[derive(Error, Debug)]
pub enum HandlerError {
    #[error("failed to parse request from client: `{0}`")]
    RequestError(#[from] httparse::Error),
    #[error("failed to create response for client: `{0}`")]
    ResponseError(#[from] http::Error),
    #[error("other side didn't send valid a valid http request")]
    InvalidHttp,
    #[error("failed to create JSON blob response for client: `{0}`")]
    JsonWriteError(serde_json::Error),
    #[error("i/o failed for client: `{0}`")]
    IoError(#[from] std::io::Error),
    #[error("source attempted to stream data, but the stream no longer exists: `{0}`")]
    StreamDisconnected(#[from] std::sync::mpsc::SendError<bytes::Bytes>),
}

#[derive(Debug, Deref)]
struct Request(http::Request<()>);

impl TryFrom<httparse::Request<'_, '_>> for Request {
    type Error = HandlerError;

    fn try_from(parsed: httparse::Request<'_, '_>) -> Result<Self, Self::Error> {
        let mut req = http::Request::builder()
            .version(http::Version::HTTP_11)
            .method(parsed.method.ok_or(HandlerError::InvalidHttp)?)
            .uri(parsed.path.ok_or(HandlerError::InvalidHttp)?);

        for header in parsed.headers {
            req.headers_mut().unwrap().insert(
                http::header::HeaderName::from_bytes(header.name.as_bytes())
                    .map_err(|_| httparse::Error::HeaderName)?,
                http::HeaderValue::try_from(header.value)
                    .map_err(|_| httparse::Error::HeaderValue)?,
            );
        }

        Ok(Self(req.body(())?))
    }
}

macro_rules! write_response {
    ($writer:ident, $resp:expr) => {{
        let resp = $resp.body(())?;

        $writer
            .write_all(
                format!(
                    "{:?} {} {}\r\n",
                    resp.version(),
                    resp.status().as_str(),
                    resp.status().canonical_reason().unwrap_or("unknown error")
                )
                .as_bytes(),
            )
            .await?;

        for (name, value) in resp.headers() {
            $writer.write_all(name.as_str().as_bytes()).await?;
            $writer.write_all(b": ").await?;
            $writer.write_all(value.as_bytes()).await?;
            $writer.write_all(b"\r\n").await?;
        }

        $writer.write_all(b"\r\n").await?;
        $writer.flush().await?;
    }};
}

macro_rules! write_json {
    ($conn:ident, $x:expr) => {
        $conn
            .write_all(
                serde_json::to_string($x)
                    .map_err(HandlerError::JsonWriteError)?
                    .as_bytes(),
            )
            .await?;
    };
}

fn header_bytes_or_empty<'a>(req: &'a Request, header: &str) -> &'a [u8] {
    req.headers()
        .get(header)
        .map(http::HeaderValue::as_bytes)
        .unwrap_or_default()
}

pub async fn process(
    mut conn: TcpStream,
    stream: Arc<crate::stream::Stream>,
) -> Result<(), HandlerError> {
    let mut buffer = bytes::BytesMut::with_capacity(1024);

    if conn.read_buf(&mut buffer).await? == 0 {
        return Ok(());
    }

    let req = {
        let mut headers = [httparse::EMPTY_HEADER; 16];
        let mut parsed = httparse::Request::new(&mut headers);

        let header_buffer = buffer.split().freeze();
        parsed.parse(&header_buffer[..])?;

        Request::try_from(parsed)?
    };

    let resp = http::Response::builder()
        .version(http::Version::HTTP_11)
        .header(
            "Server",
            concat!(clap::crate_name!(), "/", clap::crate_version!()),
        )
        .header("Accept-Encoding", "identity")
        .header("Connection", "keep-alive")
        .header("Access-Control-Allow-Origin", "*");

    match req.uri().path() {
        v if v == stream.listen_uri => {
            // TODO: allow the streamer to set content-type
            let resp = resp.header("Content-Type", stream.content_type.as_ref());
            write_response!(conn, resp);

            let mut handle = stream.listen();

            loop {
                if let Some(v) = handle.subscriber.next().await {
                    conn.write_all(v.as_ref()).await?;
                }
            }
        }
        "/stream" => {
            println!("stream req: {:?}", req.headers());
            if let Ok(ref mut handle) = stream.try_broadcast() {
                let mut resp = resp
                    .header("Connection", "Close")
                    .header("Allow", "PUT, SOURCE");

                let content_type = header_bytes_or_empty(&req, "content-type");
                if content_type != stream.content_type.as_ref().as_bytes() {
                    // todo: custom message - unsupported content type
                    write_response!(conn, resp.status(403));
                    return Ok(());
                }

                let expect_header = header_bytes_or_empty(&req, "expect");
                if expect_header == b"100-continue" {
                    resp = resp.status(100);
                }

                write_response!(conn, resp);

                // todo: only stream handles should have write access to the metadata
                stream
                    .metadata()
                    .stream
                    .store(Arc::new(Some(StreamMetadata::from(req.headers()))));

                loop {
                    if conn.read_buf(&mut buffer).await? == 0 {
                        break;
                    }

                    handle.publisher.send(buffer.split().freeze()).await?;
                }

                stream.metadata().stream.store(Arc::new(None));
                stream.metadata().track.store(Arc::new(None));
            } else {
                panic!("someone's already streaming!");
            }
        }
        "/metadata" => {
            let resp = resp
                .header("Connection", "Close")
                .header("Content-Type", "application/json");
            write_response!(conn, resp);
            write_json!(conn, stream.metadata());
        }
        "/admin/metadata" => {
            if let Some(query) = req
                .uri()
                .query()
                .map(|v| v.as_bytes())
                .map(url::form_urlencoded::parse)
            {
                // todo: auth
                stream
                    .metadata()
                    .track
                    .store(Arc::new(Some(TrackMetadata::from(query))));

                write_response!(conn, resp);
            }
        }
        _ => panic!("Invalid request: {:?}", req),
    }

    Ok(())
}
diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs
index 0b97f9a..11ccd4e 100644
--- a/scrs-serve/src/main.rs
+++ a/scrs-serve/src/main.rs
@@ -1,278 +1,74 @@
#![deny(clippy::pedantic)]
#![allow(clippy::used_underscore_binding)]
#![allow(clippy::module_name_repetitions)]

use arc_swap::ArcSwap;
use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber};
use bytes::Bytes;
use derive_more::Deref;
use futures::{SinkExt, StreamExt};
use serde::Serialize;
mod config;
mod handler;
mod metadata;
mod stream;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use clap::Clap;
use log::{debug, error, info};
use std::sync::Arc;
use tokio::net::TcpListener;

use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

#[derive(Debug, Deref)]
struct Request(http::Request<()>);

impl From<httparse::Request<'_, '_>> for Request {
    fn from(parsed: httparse::Request<'_, '_>) -> Self {
        let mut req = http::Request::builder()
            .version(http::Version::HTTP_11)
            .method(parsed.method.unwrap())
            .uri(parsed.path.unwrap());

        for header in parsed.headers {
            req.headers_mut().unwrap().insert(
                http::header::HeaderName::from_bytes(header.name.as_bytes()).unwrap(),
                http::HeaderValue::try_from(header.value).unwrap(),
            );
        }

        Self(req.body(()).unwrap())
    }
}

#[derive(Debug, Default)]
struct MetadataContainer {
    stream: ArcSwap<Option<StreamMetadata>>,
    track: ArcSwap<Option<TrackMetadata>>,
    listeners: AtomicU64,
#[derive(thiserror::Error, Debug)]
pub enum InitError {
    #[error("Couldn't read config file: `{0}`")]
    ReadError(#[from] std::io::Error),
    #[error("Couldn't parse config file: `{0}`")]
    DeserializationError(#[from] toml::de::Error),
}

impl serde::ser::Serialize for MetadataContainer {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;
        let mut struc = serializer.serialize_struct("", 2)?;
        struc.serialize_field("stream", &**self.stream.load())?;
        struc.serialize_field("track", &**self.track.load())?;
        struc.serialize_field("listeners", &self.listeners)?;
        struc.end()
    }
#[derive(Clap)]
#[clap(version = "1.0", author = "Jordan D. <jordan@doyle.la>")]
struct Opts {
    #[clap(short, long)]
    config: std::path::PathBuf,
}

#[derive(Debug, Default, Serialize)]
struct StreamMetadata {
    user_agent: String,
    content_type: String,
    name: String,
}
#[tokio::main]
async fn main() {
    env_logger::init();

impl From<&http::HeaderMap> for StreamMetadata {
    fn from(map: &http::HeaderMap) -> Self {
        let string_from_header = |v: Option<&http::HeaderValue>| {
            v.map(http::HeaderValue::as_bytes)
                .map(String::from_utf8_lossy)
                .unwrap_or_default()
                .into_owned()
        };
    let opts: Opts = Opts::parse();

        Self {
            user_agent: string_from_header(map.get("user-agent")),
            content_type: string_from_header(map.get("content-type")),
            name: string_from_header(map.get("ice-name")),
    let cfg = match std::fs::read_to_string(opts.config)
        .map_err(InitError::ReadError)
        .and_then(|s| toml::from_str(&s).map_err(Into::into))
    {
        Ok(v) => v,
        Err(e) => {
            error!("{}", e);
            return;
        }
    }
}
    };

#[derive(Debug, Default, Serialize)]
struct TrackMetadata {
    artist: Option<String>,
    title: Option<String>,
    album: Option<String>,
    listen_forward(cfg).await;
}

impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
    fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
        let mut meta = Self::default();

        for (key, value) in query {
            match key.as_ref() {
                "artist" => meta.artist = Some(value.into_owned()),
                "title" => meta.title = Some(value.into_owned()),
                "album" => meta.album = Some(value.into_owned()),
                _ => {}
            }
        }

        meta
    }
}
async fn listen_forward(config: config::Config) {
    let stream = Arc::new(stream::Stream::from(config.stream));

async fn write_response<W: tokio::io::AsyncWrite + Unpin>(mut writer: W, resp: http::Response<()>) {
    writer
        .write_all(
            format!(
                "{:?} {} {}\r\n",
                resp.version(),
                resp.status().as_str(),
                resp.status().canonical_reason().unwrap()
            )
            .as_bytes(),
        )
    let mut listener = TcpListener::bind(config.server.listen_address)
        .await
        .unwrap();

    for (name, value) in resp.headers() {
        writer.write_all(name.as_str().as_bytes()).await.unwrap();
        writer.write_all(b": ").await.unwrap();
        writer.write_all(value.as_bytes()).await.unwrap();
        writer.write_all(b"\r\n").await.unwrap();
    }

    writer.write_all(b"\r\n").await.unwrap();
    writer.flush().await.unwrap();
}

async fn process(
    mut stream: TcpStream,
    publisher: Arc<Mutex<Publisher<Bytes>>>,
    mut subscriber: Subscriber<Bytes>,
    metadata: Arc<MetadataContainer>,
) {
    println!("accepted");

    let mut buffer = bytes::BytesMut::with_capacity(1024);

    if stream.read_buf(&mut buffer).await.unwrap() == 0 {
        return;
    }

    let req = {
        let mut headers = [httparse::EMPTY_HEADER; 16];
        let mut parsed = httparse::Request::new(&mut headers);

        let header_buffer = buffer.split().freeze();
        parsed.parse(&header_buffer[..]).unwrap();

        Request::from(parsed)
    };

    let resp = http::Response::builder()
        .version(http::Version::HTTP_11)
        .header(
            "Server",
            concat!(clap::crate_name!(), "/", clap::crate_version!()),
        )
        .header("Accept-Encoding", "identity")
        .header("Connection", "keep-alive")
        .header("Access-Control-Allow-Origin", "*");

    match req.uri().path() {
        "/listen.mp3" => {
            // TODO: allow the streamer to set content-type
            let resp = resp.header("Content-Type", "audio/mpeg").body(()).unwrap();
            write_response(&mut stream, resp).await;

            metadata.listeners.fetch_add(1, Ordering::Relaxed);

            loop {
                if let Some(v) = subscriber.next().await {
                    if stream.write_all(v.as_ref()).await.is_err() {
                        break;
                    }
                }
            }

            metadata.listeners.fetch_sub(1, Ordering::Relaxed);
        }
        "/stream" => {
            println!("stream req: {:?}", req.headers());
            if let Ok(ref mut publisher) = publisher.try_lock() {
                metadata
                    .stream
                    .store(Arc::new(Some(StreamMetadata::from(req.headers()))));

                let mut resp = resp
                    .header("Connection", "Close")
                    .header("Allow", "PUT, SOURCE");

                let expect_header = req
                    .headers()
                    .get("expect")
                    .map(http::HeaderValue::as_bytes)
                    .unwrap_or_default();
                if expect_header == b"100-continue" {
                    resp = resp.status(100);
                }

                write_response(&mut stream, resp.body(()).unwrap()).await;

                loop {
                    if stream.read_buf(&mut buffer).await.unwrap() == 0 {
                        break;
                    }

                    publisher.send(buffer.split().freeze()).await.unwrap();
                }

                metadata.stream.store(Arc::new(None));
                metadata.track.store(Arc::new(None));
            } else {
                panic!("someone's already streaming!");
            }
        }
        "/metadata" => {
            let resp = resp
                .header("Connection", "Close")
                .header("Content-Type", "application/json")
                .body(())
                .unwrap();
            write_response(&mut stream, resp).await;

            stream
                .write_all(serde_json::to_string(&*metadata).unwrap().as_bytes())
                .await
                .unwrap();
        }
        "/admin/metadata" => {
            let query = url::form_urlencoded::parse(req.uri().query().unwrap().as_bytes());

            metadata
                .track
                .store(Arc::new(Some(TrackMetadata::from(query))));

            let resp = resp.body(()).unwrap();
            write_response(&mut stream, resp).await;
        }
        _ => panic!("Invalid request: {:?}", req),
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();
    listen_forward(3000).await;
}
    info!(
        "Listening for new connections on {}",
        config.server.listen_address
    );

async fn listen_forward(port: u16) {
    let (publisher, subscriber) = bounded::<bytes::Bytes>(128);
    let publisher = Arc::new(Mutex::new(publisher));
    let metadata = Arc::new(MetadataContainer::default());

    let addr = SocketAddr::from(([127, 0, 0, 1], port));

    // let the consumer pass this in
    let mut listener = tokio::net::TcpListener::bind(addr).await.unwrap();

    loop {
        println!("listening for new conns...");
        let (stream, _) = listener.accept().await.unwrap();
        let (conn, remote) = listener.accept().await.unwrap();
        debug!("Accepted connection from {}", remote);

        let publisher = publisher.clone();
        let subscriber = subscriber.clone();
        let metadata = metadata.clone();
        let stream = stream.clone();

        tokio::spawn(process(stream, publisher, subscriber, metadata));
        tokio::spawn(async move {
            if let Err(e) = handler::process(conn, stream).await {
                error!("Error handling request from {}: {}", remote, e);
            }
        });
    }
}
diff --git a/scrs-serve/src/metadata.rs b/scrs-serve/src/metadata.rs
new file mode 100644
index 0000000..f1b0581 100644
--- /dev/null
+++ a/scrs-serve/src/metadata.rs
@@ -1,0 +1,72 @@
use arc_swap::ArcSwap;
use serde::Serialize;
use std::sync::atomic::AtomicU64;

#[derive(Debug, Default)]
pub struct MetadataContainer {
    pub stream: ArcSwap<Option<StreamMetadata>>,
    pub track: ArcSwap<Option<TrackMetadata>>,
    pub listeners: AtomicU64,
}

impl serde::ser::Serialize for MetadataContainer {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;
        let mut struc = serializer.serialize_struct("", 3)?;
        struc.serialize_field("stream", &**self.stream.load())?;
        struc.serialize_field("track", &**self.track.load())?;
        struc.serialize_field("listeners", &self.listeners)?;
        struc.end()
    }
}

#[derive(Debug, Default, Serialize)]
pub struct StreamMetadata {
    user_agent: String,
    content_type: String,
    name: String,
}

impl From<&http::HeaderMap> for StreamMetadata {
    fn from(map: &http::HeaderMap) -> Self {
        let string_from_header = |v: Option<&http::HeaderValue>| {
            v.map(http::HeaderValue::as_bytes)
                .map(String::from_utf8_lossy)
                .unwrap_or_default()
                .into_owned()
        };

        Self {
            user_agent: string_from_header(map.get("user-agent")),
            content_type: string_from_header(map.get("content-type")),
            name: string_from_header(map.get("ice-name")),
        }
    }
}

#[derive(Debug, Default, Serialize)]
pub struct TrackMetadata {
    artist: Option<String>,
    title: Option<String>,
    album: Option<String>,
}

impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
    fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
        let mut meta = Self::default();

        for (key, value) in query {
            match key.as_ref() {
                "artist" => meta.artist = Some(value.into_owned()),
                "title" => meta.title = Some(value.into_owned()),
                "album" => meta.album = Some(value.into_owned()),
                _ => {}
            }
        }

        meta
    }
}
diff --git a/scrs-serve/src/stream.rs b/scrs-serve/src/stream.rs
new file mode 100644
index 0000000..5143cb9 100644
--- /dev/null
+++ a/scrs-serve/src/stream.rs
@@ -1,0 +1,86 @@
use bus_queue::flavors::arc_swap::{bounded, Publisher, Subscriber};
use bytes::Bytes;
use mime::Mime;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use ustr::ustr;

use crate::metadata::MetadataContainer;

pub struct Stream {
    publisher: Mutex<Publisher<Bytes>>,
    subscriber: Subscriber<Bytes>,
    metadata: MetadataContainer,
    bitrate: usize,
    max_conns: usize,
    password: String,
    pub content_type: Mime,
    pub listen_uri: &'static str,
}

impl From<crate::config::StreamConfig> for Stream {
    fn from(config: crate::config::StreamConfig) -> Self {
        let (publisher, subscriber) = bounded::<Bytes>(config.buffer_size);

        let content_type: Mime = config.content_type.parse().unwrap();

        let listen_uri = match (content_type.type_(), content_type.subtype()) {
            (mime::AUDIO, mime::MPEG) => ustr("/listen.mp3").as_str(),
            (mime::AUDIO, mime::OGG) => ustr("/listen.ogg").as_str(),
            _ => panic!("unknown stream content type"),
        };

        Self {
            publisher: Mutex::new(publisher),
            subscriber,
            metadata: MetadataContainer::default(),
            bitrate: config.bitrate,
            max_conns: config.max_conns,
            password: config.password,
            content_type,
            listen_uri,
        }
    }
}

impl Stream {
    pub fn listen(&self) -> Listener<'_> {
        Listener::new(self)
    }

    pub fn try_broadcast(&self) -> Result<BroadcastHandle<'_>, tokio::sync::TryLockError> {
        Ok(BroadcastHandle {
            publisher: self.publisher.try_lock()?,
        })
    }

    pub fn metadata(&self) -> &MetadataContainer {
        &self.metadata
    }
}

pub struct BroadcastHandle<'a> {
    pub publisher: tokio::sync::MutexGuard<'a, Publisher<Bytes>>,
}

pub struct Listener<'a> {
    pub subscriber: Subscriber<Bytes>,
    listener_count: &'a AtomicU64,
}

impl Drop for Listener<'_> {
    fn drop(&mut self) {
        self.listener_count.fetch_sub(1, Ordering::Relaxed);
    }
}

impl<'a> Listener<'a> {
    pub fn new(stream: &'a Stream) -> Listener<'a> {
        stream.metadata.listeners.fetch_add(1, Ordering::Relaxed);

        Self {
            subscriber: stream.subscriber.clone(),
            listener_count: &stream.metadata.listeners,
        }
    }
}