use bus_queue::flavors::arc_swap::{async_bounded, AsyncPublisher, AsyncSubscriber}; use bytes::Bytes; use mime::Mime; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Mutex; use ustr::ustr; use crate::metadata::MetadataContainer; pub struct Stream { publisher: Mutex>, subscriber: AsyncSubscriber, metadata: MetadataContainer, #[allow(dead_code)] // unimplemented bitrate: usize, #[allow(dead_code)] // unimplemented max_conns: usize, #[allow(dead_code)] // unimplemented password: String, pub content_type: Mime, pub listen_uri: &'static str, } impl From for Stream { fn from(config: crate::config::StreamConfig) -> Self { let (publisher, subscriber) = async_bounded::(config.buffer_size); let content_type: Mime = config.content_type.parse().unwrap(); let listen_uri = match (content_type.type_(), content_type.subtype()) { (mime::AUDIO, mime::MPEG) => ustr("/listen.mp3").as_str(), (mime::AUDIO, mime::OGG) => ustr("/listen.ogg").as_str(), _ => panic!("unknown stream content type"), }; Self { publisher: Mutex::new(publisher), subscriber, metadata: MetadataContainer::default(), bitrate: config.bitrate, max_conns: config.max_conns, password: config.password, content_type, listen_uri, } } } impl Stream { pub fn listen(&self) -> Listener<'_> { Listener::new(self) } pub fn try_broadcast(&self) -> Result, tokio::sync::TryLockError> { Ok(BroadcastHandle { publisher: self.publisher.try_lock()?, }) } pub fn metadata(&self) -> &MetadataContainer { &self.metadata } } pub struct BroadcastHandle<'a> { pub publisher: tokio::sync::MutexGuard<'a, AsyncPublisher>, } pub struct Listener<'a> { pub subscriber: AsyncSubscriber, listener_count: &'a AtomicU64, } impl Drop for Listener<'_> { fn drop(&mut self) { self.listener_count.fetch_sub(1, Ordering::Relaxed); } } impl<'a> Listener<'a> { pub fn new(stream: &'a Stream) -> Listener<'a> { stream.metadata.listeners.fetch_add(1, Ordering::Relaxed); Self { subscriber: stream.subscriber.clone(), listener_count: &stream.metadata.listeners, } } }