Store stream metadata along with track info
Diff
scrs-serve/src/main.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
1 file changed, 76 insertions(+), 22 deletions(-)
@@ -19,8 +19,8 @@
#[derive(Debug, Deref)]
struct Request(http::Request<()>);
impl<'a, 'b: 'a> From<httparse::Request<'a, 'b>> for Request {
fn from(parsed: httparse::Request<'a, 'b>) -> Self {
impl From<httparse::Request<'_, '_>> for Request {
fn from(parsed: httparse::Request<'_, '_>) -> Self {
let mut req = http::Request::builder()
.version(http::Version::HTTP_11)
.method(parsed.method.unwrap())
@@ -37,11 +37,71 @@
}
}
#[derive(Debug, Default)]
struct MetadataContainer {
stream: ArcSwap<Option<StreamMetadata>>,
track: ArcSwap<Option<TrackMetadata>>,
}
impl serde::ser::Serialize for MetadataContainer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut struc = serializer.serialize_struct("", 2)?;
struc.serialize_field("stream", &**self.stream.load())?;
struc.serialize_field("track", &**self.track.load())?;
struc.end()
}
}
#[derive(Debug, Default, Serialize)]
struct StreamMetadata {
user_agent: String,
content_type: String,
name: String,
}
impl From<&http::HeaderMap> for StreamMetadata {
fn from(map: &http::HeaderMap) -> Self {
let string_from_header = |v: Option<&http::HeaderValue>| {
v.map(http::HeaderValue::as_bytes)
.map(String::from_utf8_lossy)
.unwrap_or_default()
.into_owned()
};
Self {
user_agent: string_from_header(map.get("user-agent")),
content_type: string_from_header(map.get("content-type")),
name: string_from_header(map.get("ice-name")),
}
}
}
#[derive(Debug, Default, Serialize)]
struct TrackMetadata {
artist: Option<String>,
title: Option<String>,
album: Option<String>,
}
impl From<url::form_urlencoded::Parse<'_>> for TrackMetadata {
fn from(query: url::form_urlencoded::Parse<'_>) -> Self {
let mut meta = Self::default();
for (key, value) in query {
match key.as_ref() {
"artist" => meta.artist = Some(value.into_owned()),
"title" => meta.title = Some(value.into_owned()),
"album" => meta.album = Some(value.into_owned()),
_ => {}
}
}
meta
}
}
async fn write_response<W: tokio::io::AsyncWrite + Unpin>(mut writer: W, resp: http::Response<()>) {
@@ -73,7 +133,7 @@
mut stream: TcpStream,
publisher: Arc<Mutex<Publisher<Bytes>>>,
mut subscriber: Subscriber<Bytes>,
metadata: Arc<ArcSwap<StreamMetadata>>,
metadata: Arc<MetadataContainer>,
) {
println!("accepted");
@@ -120,7 +180,11 @@
"/stream" => {
println!("stream req: {:?}", req.headers());
if let Ok(ref mut publisher) = publisher.try_lock() {
let resp = resp
metadata
.stream
.store(Arc::new(Some(StreamMetadata::from(req.headers()))));
let mut resp = resp
.header("Connection", "Close")
.header("Allow", "PUT, SOURCE")
.body(())
@@ -134,6 +198,9 @@
publisher.send(buffer.split().freeze()).await.unwrap();
}
metadata.stream.store(Arc::new(None));
metadata.track.store(Arc::new(None));
} else {
panic!("someone's already streaming!");
}
@@ -147,29 +214,16 @@
write_response(&mut stream, resp).await;
stream
.write_all(
serde_json::to_string(metadata.load().as_ref())
.unwrap()
.as_bytes(),
)
.write_all(serde_json::to_string(&*metadata).unwrap().as_bytes())
.await
.unwrap();
}
"/admin/metadata" => {
let query = url::form_urlencoded::parse(req.uri().query().unwrap().as_bytes());
let mut meta = StreamMetadata::default();
for (key, value) in query {
match key.as_ref() {
"artist" => meta.artist = Some(value.into_owned()),
"title" => meta.title = Some(value.into_owned()),
"album" => meta.album = Some(value.into_owned()),
_ => {}
}
}
metadata.store(Arc::new(meta));
metadata
.track
.store(Arc::new(Some(TrackMetadata::from(query))));
let resp = resp.body(()).unwrap();
write_response(&mut stream, resp).await;
@@ -187,7 +241,7 @@
async fn listen_forward(port: u16) {
let (publisher, subscriber) = bounded::<bytes::Bytes>(128);
let publisher = Arc::new(Mutex::new(publisher));
let metadata = Arc::new(ArcSwap::new(Arc::from(StreamMetadata::default())));
let metadata = Arc::new(MetadataContainer::default());
let addr = SocketAddr::from(([127, 0, 0, 1], port));