From e99caac73a44f9500169a9892d29b494abe6e056 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 22 Feb 2020 22:45:05 +0000 Subject: [PATCH] Move to async-based API --- Cargo.toml | 5 +++-- src/device.rs | 148 +++++++++++++++++++++++++++++++++++++++++++++++++------------------------------- src/discovery.rs | 43 +++++++++++++++++++++---------------------- 3 files changed, 91 insertions(+), 105 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1649359..8f5bf1e 100644 --- a/Cargo.toml +++ a/Cargo.toml @@ -10,9 +10,10 @@ edition = "2018" [dependencies] -reqwest = { version = "0.10", features = ["blocking"] } +reqwest = "0.10" log = "0.4" -ssdp = "0.7" +ssdp-client = "0.5" +futures = "0.3" xmltree = "0.10" failure = "0.1" regex = "1" diff --git a/src/device.rs b/src/device.rs index e355bcc..0bca45c 100644 --- a/src/device.rs +++ a/src/device.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; -use std::io::Read; use std::time::Duration; + use xmltree::{Element, XMLNode}; use reqwest::header::HeaderMap; use regex::Regex; @@ -53,14 +53,14 @@ impl Speaker { /// Create a new instance of this struct from an IP address - pub fn from_ip(ip: IpAddr) -> Result { - let resp = reqwest::blocking::get(&format!("http://{}:1400/xml/device_description.xml", ip))?; + pub async fn from_ip(ip: IpAddr) -> Result { + let resp = reqwest::get(&format!("http://{}:1400/xml/device_description.xml", ip)).await?; if !resp.status().is_success() { return Err(SonosError::BadResponse(resp.status().as_u16()).into()); } - let elements = Element::parse(resp)?; + let elements = Element::parse(resp.bytes().await?.as_ref())?; let device_description = elements .get_child("device") .ok_or_else(|| SonosError::ParseError("missing root element"))?; @@ -95,22 +95,15 @@ /// Get the coordinator for this speaker. #[deprecated(note = "Broken on Sonos 9.1")] - pub fn coordinator(&self) -> Result { - let mut resp = reqwest::blocking::get(&format!("http://{}:1400/status/topology", self.ip))?; + pub async fn coordinator(&self) -> Result { + let resp = reqwest::get(&format!("http://{}:1400/status/topology", self.ip)).await?; if !resp.status().is_success() { return Err(SonosError::BadResponse(resp.status().as_u16()).into()); } - let mut content = String::new(); - resp.read_to_string(&mut content)?; + let content = resp.text().await?; - // clean up xml so xmltree can read it - let content = content.replace( - "", - "", - ); - // parse the topology xml let elements = Element::parse(content.as_bytes())?; @@ -163,7 +156,7 @@ /// * `payload` - XML doc to pass inside the action call body /// * `coordinator` - Whether this SOAP call should be performed on the group coordinator or /// the speaker it was called on - pub fn soap( + pub async fn soap( &self, endpoint: &str, service: &str, @@ -175,9 +168,9 @@ headers.insert("Content-Type", "application/xml".parse()?); headers.insert("SOAPAction", format!("\"{}#{}\"", service, action).parse()?); - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); let coordinator = if coordinator { - self.coordinator()? + self.coordinator().await? } else { self.ip }; @@ -201,9 +194,10 @@ action = action, payload = payload )) - .send()?; + .send() + .await?; - let element = Element::parse(request)?; + let element = Element::parse(request.bytes().await?.as_ref())?; let body = element .get_child("Body") @@ -230,72 +224,72 @@ } /// Play the current track - pub fn play(&self) -> Result<(), Error> { + pub async fn play(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "Play", "01", true, - )?; + ).await?; Ok(()) } /// Pause the current track - pub fn pause(&self) -> Result<(), Error> { + pub async fn pause(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "Pause", "0", true, - )?; + ).await?; Ok(()) } /// Stop the current queue - pub fn stop(&self) -> Result<(), Error> { + pub async fn stop(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "Stop", "0", true, - )?; + ).await?; Ok(()) } /// Skip the current track - pub fn next(&self) -> Result<(), Error> { + pub async fn next(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "Next", "0", true, - )?; + ).await?; Ok(()) } /// Go to the previous track - pub fn previous(&self) -> Result<(), Error> { + pub async fn previous(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "Previous", "0", true, - )?; + ).await?; Ok(()) } /// Seek to a time on the current track - pub fn seek(&self, time: &Duration) -> Result<(), Error> { + pub async fn seek(&self, time: &Duration) -> Result<(), Error> { const SECS_PER_MINUTE: u64 = 60; const MINS_PER_HOUR: u64 = 60; const SECS_PER_HOUR: u64 = 3600; @@ -313,13 +307,13 @@ hours, minutes, seconds ), true, - )?; + ).await?; Ok(()) } /// Change the track, beginning at 1 - pub fn play_queue_item(&self, track: &u64) -> Result<(), Error> { + pub async fn play_queue_item(&self, track: &u64) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", @@ -329,13 +323,13 @@ track ), true, - )?; + ).await?; Ok(()) } /// Remove track at index from queue, beginning at 1 - pub fn remove_track(&self, track: &u64) -> Result<(), Error> { + pub async fn remove_track(&self, track: &u64) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", @@ -345,13 +339,13 @@ track ), true, - )?; + ).await?; Ok(()) } /// Add a new track to the end of the queue - pub fn queue_track(&self, uri: &str) -> Result<(), Error> { + pub async fn queue_track(&self, uri: &str) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", @@ -366,13 +360,13 @@ uri ), true, - )?; + ).await?; Ok(()) } /// Add a track to the queue to play next - pub fn queue_next(&self, uri: &str) -> Result<(), Error> { + pub async fn queue_next(&self, uri: &str) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", @@ -387,13 +381,13 @@ uri ), true, - )?; + ).await?; Ok(()) } /// Replace the current track with a new one - pub fn play_track(&self, uri: &str) -> Result<(), Error> { + pub async fn play_track(&self, uri: &str) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", @@ -406,42 +400,42 @@ uri ), true, - )?; + ).await?; Ok(()) } /// Remove every track from the queue - pub fn clear_queue(&self) -> Result<(), Error> { + pub async fn clear_queue(&self) -> Result<(), Error> { self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "RemoveAllTracksFromQueue", "0", true, - )?; + ).await?; Ok(()) } /// Get the current volume - pub fn volume(&self) -> Result { + pub async fn volume(&self) -> Result { let res = self.soap( "MediaRenderer/RenderingControl/Control", "urn:schemas-upnp-org:service:RenderingControl:1", "GetVolume", "0Master", false, - )?; + ).await?; - let volume = element_to_string(res.get_child("CurrentVolume").ok_or_else(|| SonosError::ParseError("failed to find CurrentVolume element"))?) - .parse::()?; + let volume_element = res.get_child("CurrentVolume").ok_or_else(|| SonosError::ParseError("failed to find CurrentVolume element"))?; + let volume = element_to_string(volume_element).parse::()?; Ok(volume) } /// Set a new volume from 0-100. - pub fn set_volume(&self, volume: u8) -> Result<(), Error> { + pub async fn set_volume(&self, volume: u8) -> Result<(), Error> { if volume > 100 { panic!("Volume must be between 0 and 100, got {}.", volume); } @@ -458,89 +452,87 @@ volume ), false, - )?; + ).await?; + Ok(()) } /// Check if this player is currently muted - pub fn muted(&self) -> Result { + pub async fn muted(&self) -> Result { let resp = self.soap( "MediaRenderer/RenderingControl/Control", "urn:schemas-upnp-org:service:RenderingControl:1", "GetMute", "0Master", false, - )?; - - Ok(match element_to_string(resp.get_child("CurrentMute") - .ok_or_else(|| SonosError::ParseError("failed to find CurrentMute element"))?) - .as_str() - { + ).await?; + + let mute_element = resp.get_child("CurrentMute").ok_or_else(|| SonosError::ParseError("failed to find CurrentMute element"))?; + + Ok(match element_to_string(mute_element).as_str() { "1" => true, "0" | _ => false, }) } /// Mute the current player - pub fn mute(&self) -> Result<(), Error> { + pub async fn mute(&self) -> Result<(), Error> { self.soap( "MediaRenderer/RenderingControl/Control", "urn:schemas-upnp-org:service:RenderingControl:1", "SetMute", "0Master1", false, - )?; + ).await?; Ok(()) } /// Unmute the current player - pub fn unmute(&self) -> Result<(), Error> { + pub async fn unmute(&self) -> Result<(), Error> { self.soap( "MediaRenderer/RenderingControl/Control", "urn:schemas-upnp-org:service:RenderingControl:1", "SetMute", "0Master0", false, - )?; + ).await?; Ok(()) } /// Get the transport state of the current player - pub fn transport_state(&self) -> Result { + pub async fn transport_state(&self) -> Result { let resp = self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "GetTransportInfo", "0", false, - )?; - - Ok( - match element_to_string(resp.get_child("CurrentTransportState") - .ok_or_else(|| SonosError::ParseError("failed to find CurrentTransportState element"))?) - .as_str() - { - "PLAYING" => TransportState::Playing, - "PAUSED_PLAYBACK" => TransportState::PausedPlayback, - "PAUSED_RECORDING" => TransportState::PausedRecording, - "RECORDING" => TransportState::Recording, - "TRANSITIONING" => TransportState::Transitioning, - "STOPPED" | _ => TransportState::Stopped, - }, - ) + ).await?; + + let transport_state_element = resp.get_child("CurrentTransportState") + .ok_or_else(|| SonosError::ParseError("failed to find CurrentTransportState element"))?; + + Ok(match element_to_string(transport_state_element).as_str() { + "PLAYING" => TransportState::Playing, + "PAUSED_PLAYBACK" => TransportState::PausedPlayback, + "PAUSED_RECORDING" => TransportState::PausedRecording, + "RECORDING" => TransportState::Recording, + "TRANSITIONING" => TransportState::Transitioning, + "STOPPED" | _ => TransportState::Stopped, + }) } /// Get information about the current track - pub fn track(&self) -> Result { + pub async fn track(&self) -> Result { let resp = self.soap( "MediaRenderer/AVTransport/Control", "urn:schemas-upnp-org:service:AVTransport:1", "GetPositionInfo", "0", true, - )?; + ).await?; let metadata = element_to_string(resp.get_child("TrackMetaData") .ok_or_else(|| SonosError::ParseError("failed to find TrackMetaData element"))?); diff --git a/src/discovery.rs b/src/discovery.rs index 9eaa530..7227e0c 100644 --- a/src/discovery.rs +++ a/src/discovery.rs @@ -1,42 +1,35 @@ -use ssdp::FieldMap; -use ssdp::header::{HeaderMut, HeaderRef, Man, MX, ST}; -use ssdp::message::{Multicast, SearchRequest, SearchResponse}; - -use failure::{Error, SyncFailure}; - use crate::device::Speaker; -use crate::error::*; -const SONOS_URN: &str = "schemas-upnp-org:device:ZonePlayer:1"; +use std::time::Duration; +use regex::Regex; -/// Convenience method to grab a header from an SSDP search as a string. -fn get_header(msg: &SearchResponse, header: &str) -> Result { - let bytes = msg.get_raw(header).ok_or_else(|| SonosError::ParseError("failed to find header"))?; +use ssdp_client::URN; +use failure::Error; - Ok(String::from_utf8(bytes[0].clone())?) +use futures::prelude::*; + +lazy_static! { + static ref LOCATION_REGEX: Regex = Regex::new(r"^https?://(.+?):1400/xml") + .expect("Failed to create regex"); } /// Discover all speakers on the current network. /// /// This method **will** block for 2 seconds while waiting for broadcast responses. -pub fn discover() -> Result, Error> { - let mut request = SearchRequest::new(); +pub async fn discover() -> Result, Error> { + let search_target = URN::device("schemas-upnp-org", "ZonePlayer", 1).into(); + let timeout = Duration::from_secs(2); + let responses = ssdp_client::search(&search_target, timeout, 1).await?; + futures::pin_mut!(responses); - request.set(Man); // required header for discovery - request.set(MX(2)); // set maximum wait to 2 seconds - request.set(ST::Target(FieldMap::URN(String::from(SONOS_URN)))); // we're only looking for sonos - let mut speakers = Vec::new(); - for (msg, src) in request.multicast().map_err(SyncFailure::new)? { - let usn = get_header(&msg, "USN")?; + while let Some(response) = responses.next().await { + let response = response?; - if !usn.contains(SONOS_URN) { - error!("Misbehaving client responded to our discovery ({})", usn); - continue; + if let Some(ip) = LOCATION_REGEX.captures(response.location()).and_then(|x| x.get(1)).map(|x| x.as_str()) { + speakers.push(Speaker::from_ip(ip.parse()?).await?); } - - speakers.push(Speaker::from_ip(src.ip())?); } Ok(speakers) -- rgit 0.1.3