diff --git a/src/main.rs b/src/main.rs index ec9cede..e68a8bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -85,7 +85,6 @@ fn main() { event_manager.clone(), cfg.username, cfg.password, - config::CLIENT_ID.to_string(), )); let queue = Arc::new(queue::Queue::new(spotify.clone())); diff --git a/src/spotify.rs b/src/spotify.rs index 49d9b1d..1dabf44 100644 --- a/src/spotify.rs +++ b/src/spotify.rs @@ -1,7 +1,7 @@ use librespot::core::authentication::Credentials; use librespot::core::config::SessionConfig; -use librespot::core::keymaster::get_token; use librespot::core::keymaster::Token; +use librespot::core::mercury::MercuryError; use librespot::core::session::Session; use librespot::core::spotify_id::SpotifyId; use librespot::playback::config::PlayerConfig; @@ -31,6 +31,7 @@ use std::sync::RwLock; use std::thread; use std::time::{Duration, SystemTime}; +use config; use events::{Event, EventManager}; use track::Track; @@ -39,6 +40,7 @@ enum WorkerCommand { Play, Pause, Stop, + RequestToken(oneshot::Sender), } #[derive(Clone, PartialEq)] @@ -51,7 +53,7 @@ pub enum PlayerEvent { pub struct Spotify { status: RwLock, - pub api: SpotifyAPI, + api: RwLock, elapsed: RwLock>, since: RwLock>, channel: mpsc::UnboundedSender, @@ -61,9 +63,11 @@ pub struct Spotify { struct Worker { events: EventManager, commands: mpsc::UnboundedReceiver, + session: Session, player: Player, play_task: Box>, refresh_task: Box>, + token_task: Box>, active: bool, } @@ -71,14 +75,17 @@ impl Worker { fn new( events: EventManager, commands: mpsc::UnboundedReceiver, + session: Session, player: Player, ) -> Worker { Worker { events: events, commands: commands, player: player, + session: session, play_task: Box::new(futures::empty()), refresh_task: Box::new(futures::stream::empty()), + token_task: Box::new(futures::empty()), active: false, } } @@ -128,6 +135,10 @@ impl futures::Future for Worker { self.events.send(Event::Player(PlayerEvent::Stopped)); self.active = false; } + WorkerCommand::RequestToken(sender) => { + self.token_task = Spotify::get_token(&self.session, sender); + progress = true; + } } } match self.play_task.poll() { @@ -154,6 +165,16 @@ impl futures::Future for Worker { } _ => (), } + match self.token_task.poll() { + Ok(Async::Ready(_)) => { + info!("token updated!"); + self.token_task = Box::new(futures::empty()) + } + Ok(Async::NotReady) => debug!("waiting for token"), + Err(e) => { + error!("could not generate token: {:?}", e); + } + } if !progress { return Ok(Async::NotReady); @@ -163,8 +184,7 @@ impl futures::Future for Worker { } impl Spotify { - pub fn new(events: EventManager, user: String, password: String, client_id: String) -> Spotify { - let session_config = SessionConfig::default(); + pub fn new(events: EventManager, user: String, password: String) -> Spotify { let player_config = PlayerConfig { bitrate: Bitrate::Bitrate320, normalisation: false, @@ -173,62 +193,75 @@ impl Spotify { let credentials = Credentials::with_password(user.clone(), password.clone()); let (tx, rx) = mpsc::unbounded(); - let (p, c) = oneshot::channel(); { let events = events.clone(); - thread::spawn(move || { - Self::worker( - events, - rx, - p, - session_config, - player_config, - credentials, - client_id, - ) - }); + thread::spawn(move || Self::worker(events, rx, player_config, credentials)); } - let token = c.wait().unwrap(); - debug!("token received: {:?}", token); - let api = SpotifyAPI::default().access_token(&token.access_token); - - Spotify { + let spotify = Spotify { status: RwLock::new(PlayerEvent::Stopped), - api: api, + api: RwLock::new(SpotifyAPI::default()), elapsed: RwLock::new(None), since: RwLock::new(None), channel: tx, user: user, - } + }; + + // acquire token for web api usage + spotify.refresh_token(); + spotify + } + + fn create_session(core: &mut Core, credentials: Credentials) -> Session { + let session_config = SessionConfig::default(); + let handle = core.handle(); + core.run(Session::connect(session_config, credentials, None, handle)) + .ok() + .unwrap() + } + + fn get_token( + session: &Session, + sender: oneshot::Sender, + ) -> Box> { + let client_id = config::CLIENT_ID; + let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played"; + let url = format!( + "hm://keymaster/token/authenticated?client_id={}&scope={}", + client_id, scopes + ); + Box::new( + session + .mercury() + .get(url) + .map(move |response| { + let data = response.payload.first().expect("Empty payload"); + let data = String::from_utf8(data.clone()).unwrap(); + let token: Token = serde_json::from_str(&data).unwrap(); + info!("new token received: {:?}", token); + token + }) + .map(|token| sender.send(token).unwrap()), + ) } fn worker( events: EventManager, commands: mpsc::UnboundedReceiver, - token_channel: oneshot::Sender, - session_config: SessionConfig, player_config: PlayerConfig, credentials: Credentials, - client_id: String, ) { let mut core = Core::new().unwrap(); - let handle = core.handle(); - let session = core - .run(Session::connect(session_config, credentials, None, handle)) - .ok() - .unwrap(); - - let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played"; - let token = core.run(get_token(&session, &client_id, &scopes)).unwrap(); - token_channel.send(token).unwrap(); + let session = Self::create_session(&mut core, credentials); let backend = audio_backend::find(None).unwrap(); let (player, _eventchannel) = - Player::new(player_config, session, None, move || (backend)(None)); + Player::new(player_config, session.clone(), None, move || { + (backend)(None) + }); - let worker = Worker::new(events, commands, player); + let worker = Worker::new(events, commands, session, player); debug!("worker thread ready."); core.run(worker).unwrap(); debug!("worker thread finished."); @@ -282,22 +315,48 @@ impl Spotify { (*since).clone() } + fn refresh_token(&self) { + let (token_tx, token_rx) = oneshot::channel(); + self.channel + .unbounded_send(WorkerCommand::RequestToken(token_tx)) + .unwrap(); + let token = token_rx.wait().unwrap(); + + // update token used by web api calls + self.api.write().expect("can't writelock api").access_token = Some(token.access_token); + } + /// retries once when rate limits are hit fn api_with_retry(&self, cb: F) -> Option where F: Fn(&SpotifyAPI) -> Result, { - match cb(&self.api) { + let result = { + let api = self.api.read().expect("can't read api"); + cb(&api) + }; + match result { Ok(v) => Some(v), Err(e) => { debug!("api error: {:?}", e); if let Ok(apierror) = e.downcast::() { - if let ApiError::RateLimited(d) = apierror { - debug!("rate limit hit. waiting {:?} seconds", d); - thread::sleep(Duration::from_secs(d.unwrap_or(0) as u64)); - cb(&self.api).ok() - } else { - None + match apierror { + ApiError::RateLimited(d) => { + debug!("rate limit hit. waiting {:?} seconds", d); + thread::sleep(Duration::from_secs(d.unwrap_or(0) as u64)); + let api = self.api.read().expect("can't read api"); + cb(&api).ok() + } + ApiError::Unauthorized => { + debug!("token unauthorized. trying refresh.."); + self.refresh_token(); + let api = self.api.read().expect("can't read api"); + cb(&api).ok() + } + e => { + error!("unhandled api error: {}", e); + None + } } } else { None @@ -324,8 +383,9 @@ impl Spotify { limit: u32, offset: u32, ) -> Option> { + let user = self.user.clone(); self.api_with_retry(|api| { - api.user_playlist_tracks(&self.user, playlist_id, None, limit, offset, None) + api.user_playlist_tracks(&user, playlist_id, None, limit, offset, None) }) }