🏡 index : ~doyle/scrs.git

author Jordan Doyle <jordan@doyle.la> 2020-07-20 0:22:23.0 +01:00:00
committer Jordan Doyle <jordan@doyle.la> 2020-07-20 0:28:02.0 +01:00:00
commit
89e860b65e7db7961ce3c077e29243aec85c79da [patch]
tree
3757b70dc6f010819acce077420e78a17b3f3e6c
parent
dbd577be7ea51a1d37fd70a54c14fcc97d8f7229
download
89e860b65e7db7961ce3c077e29243aec85c79da.tar.gz

Store stream metadata along with track info



Diff

 scrs-serve/src/main.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 76 insertions(+), 22 deletions(-)

diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs
index eaac621..13fe317 100644
--- a/scrs-serve/src/main.rs
+++ a/scrs-serve/src/main.rs
@@ -19,8 +19,8 @@
#[derive(Debug, Deref)]
struct Request(http::Request<()>);

impl<'a, 'b: 'a> From<httparse::Request<'a, 'b>> for Request {
    fn from(parsed: httparse::Request<'a, 'b>) -> Self {
impl From<httparse::Request<'_, '_>> for Request {
    fn from(parsed: httparse::Request<'_, '_>) -> Self {
        let mut req = http::Request::builder()
            .version(http::Version::HTTP_11)
            .method(parsed.method.unwrap())
@@ -37,11 +37,71 @@
    }
}

#[derive(Debug, Default)]
struct MetadataContainer {
    stream: ArcSwap<Option<StreamMetadata>>,
    track: ArcSwap<Option<TrackMetadata>>,
}

impl serde::ser::Serialize for MetadataContainer {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;
        let mut struc = serializer.serialize_struct("", 2)?;
        struc.serialize_field("stream", &**self.stream.load())?;
        struc.serialize_field("track", &**self.track.load())?;
        struc.end()
    }
}

#[derive(Debug, Default, Serialize)]
struct StreamMetadata {
    user_agent: String,
    content_type: String,
    name: String,
}

impl From<&http::HeaderMap> for StreamMetadata {
    fn from(map: &http::HeaderMap) -> Self {
        let string_from_header = |v: Option<&http::HeaderValue>| {
            v.map(http::HeaderValue::as_bytes)
                .map(String::from_utf8_lossy)
                .unwrap_or_default()
                .into_owned()
        };

        Self {
            user_agent: string_from_header(map.get("user-agent")),
            content_type: string_from_header(map.get("content-type")),
            name: string_from_header(map.get("ice-name")),
        }
    }
}

#[derive(Debug, Default, Serialize)]
struct TrackMetadata {
    artist: Option<String>,
    title: Option<String>,
    album: Option<String>,
}

impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
    fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
        let mut meta = Self::default();

        for (key, value) in query {
            match key.as_ref() {
                "artist" => meta.artist = Some(value.into_owned()),
                "title" => meta.title = Some(value.into_owned()),
                "album" => meta.album = Some(value.into_owned()),
                _ => {}
            }
        }

        meta
    }
}

async fn write_response<W: tokio::io::AsyncWrite + Unpin>(mut writer: W, resp: http::Response<()>) {
@@ -73,7 +133,7 @@
    mut stream: TcpStream,
    publisher: Arc<Mutex<Publisher<Bytes>>>,
    mut subscriber: Subscriber<Bytes>,
    metadata: Arc<ArcSwap<StreamMetadata>>,
    metadata: Arc<MetadataContainer>,
) {
    println!("accepted");

@@ -120,7 +180,11 @@
        "/stream" => {
            println!("stream req: {:?}", req.headers());
            if let Ok(ref mut publisher) = publisher.try_lock() {
                let resp = resp
                metadata
                    .stream
                    .store(Arc::new(Some(StreamMetadata::from(req.headers()))));

                let mut resp = resp
                    .header("Connection", "Close")
                    .header("Allow", "PUT, SOURCE")
                    .body(())
@@ -134,6 +198,9 @@

                    publisher.send(buffer.split().freeze()).await.unwrap();
                }

                metadata.stream.store(Arc::new(None));
                metadata.track.store(Arc::new(None));
            } else {
                panic!("someone's already streaming!");
            }
@@ -147,29 +214,16 @@
            write_response(&mut stream, resp).await;

            stream
                .write_all(
                    serde_json::to_string(metadata.load().as_ref())
                        .unwrap()
                        .as_bytes(),
                )
                .write_all(serde_json::to_string(&*metadata).unwrap().as_bytes())
                .await
                .unwrap();
        }
        "/admin/metadata" => {
            let query = url::form_urlencoded::parse(req.uri().query().unwrap().as_bytes());

            let mut meta = StreamMetadata::default();

            for (key, value) in query {
                match key.as_ref() {
                    "artist" => meta.artist = Some(value.into_owned()),
                    "title" => meta.title = Some(value.into_owned()),
                    "album" => meta.album = Some(value.into_owned()),
                    _ => {}
                }
            }

            metadata.store(Arc::new(meta));
            metadata
                .track
                .store(Arc::new(Some(TrackMetadata::from(query))));

            let resp = resp.body(()).unwrap();
            write_response(&mut stream, resp).await;
@@ -187,7 +241,7 @@
async fn listen_forward(port: u16) {
    let (publisher, subscriber) = bounded::<bytes::Bytes>(128);
    let publisher = Arc::new(Mutex::new(publisher));
    let metadata = Arc::new(ArcSwap::new(Arc::from(StreamMetadata::default())));
    let metadata = Arc::new(MetadataContainer::default());

    let addr = SocketAddr::from(([127, 0, 0, 1], port));