diff --git a/src/events.rs b/src/events.rs index 0760b84..6e6f95c 100644 --- a/src/events.rs +++ b/src/events.rs @@ -5,6 +5,7 @@ use crate::spotify::PlayerEvent; pub enum Event { Player(PlayerEvent), + SessionDied, } pub type EventSender = Sender; diff --git a/src/main.rs b/src/main.rs index 0e5e491..c69e259 100644 --- a/src/main.rs +++ b/src/main.rs @@ -297,6 +297,7 @@ fn main() { queue.next(false); } } + Event::SessionDied => spotify.start_worker(None), } } } diff --git a/src/spotify.rs b/src/spotify.rs index 9fc67af..3bad3b8 100644 --- a/src/spotify.rs +++ b/src/spotify.rs @@ -76,13 +76,16 @@ pub enum PlayerEvent { } pub struct Spotify { + events: EventManager, + credentials: Credentials, + cfg: config::Config, status: RwLock, api: RwLock, elapsed: RwLock>, since: RwLock>, token_issued: RwLock>, - channel: mpsc::UnboundedSender, - user: String, + channel: RwLock>>, + user: Option, pub volume: AtomicU16, pub repeat: queue::RepeatSetting, pub shuffle: bool, @@ -140,6 +143,10 @@ impl futures::Future for Worker { loop { let mut progress = false; + if self.session.is_invalid() { + return Poll::Ready(Result::Err(())); + } + if let Poll::Ready(Some(cmd)) = self.commands.as_mut().poll_next(cx) { progress = true; debug!("message received!"); @@ -221,7 +228,6 @@ impl futures::Future for Worker { impl Spotify { pub fn new(events: EventManager, credentials: Credentials, cfg: &config::Config) -> Spotify { - let (user_tx, user_rx) = oneshot::channel(); let volume = match &cfg.saved_state { Some(state) => match state.volume { Some(vol) => ((std::cmp::min(vol, 100) as f32) / 100.0 * 65535_f32).ceil() as u16, @@ -248,35 +254,50 @@ impl Spotify { None => false, }; - let (tx, rx) = mpsc::unbounded(); - { - let cfg = cfg.clone(); - thread::spawn(move || { - Self::worker(events, Box::pin(rx), cfg, credentials, user_tx, volume) - }); - } - - let user = futures::executor::block_on(user_rx); - - let spotify = Spotify { + let mut spotify = Spotify { + events, + credentials, + cfg: cfg.clone(), status: RwLock::new(PlayerEvent::Stopped), api: RwLock::new(SpotifyAPI::default()), elapsed: RwLock::new(None), since: RwLock::new(None), token_issued: RwLock::new(None), - channel: tx, - user: user.expect("error retrieving userid from worker"), + channel: RwLock::new(None), + user: None, volume: AtomicU16::new(volume), repeat, shuffle, }; - // acquire token for web api usage - spotify.refresh_token(); + let (user_tx, user_rx) = oneshot::channel(); + spotify.start_worker(Some(user_tx)); + spotify.user = futures::executor::block_on(user_rx).ok(); spotify.set_volume(volume); + spotify } + pub fn start_worker(&self, user_tx: Option>) { + let (tx, rx) = mpsc::unbounded(); + *self + .channel + .write() + .expect("can't writelock worker channel") = Some(tx); + { + let cfg = self.cfg.clone(); + let events = self.events.clone(); + let volume = self.volume(); + let credentials = self.credentials.clone(); + thread::spawn(move || { + Self::worker(events, Box::pin(rx), cfg, credentials, user_tx, volume) + }); + } + + // acquire token for web api usage + self.refresh_token(); + } + pub fn session_config() -> SessionConfig { let mut session_config = SessionConfig::default(); match env::var("http_proxy") { @@ -350,7 +371,7 @@ impl Spotify { commands: Pin>>, cfg: config::Config, credentials: Credentials, - user_tx: oneshot::Sender, + user_tx: Option>, volume: u16, ) { let player_config = PlayerConfig { @@ -362,9 +383,7 @@ impl Spotify { let mut core = Core::new().unwrap(); let session = Self::create_session(&mut core, &cfg, credentials); - user_tx - .send(session.username()) - .expect("could not pass username back to Spotify::new"); + user_tx.map(|tx| tx.send(session.username())); let create_mixer = librespot_playback::mixer::find(Some("softvol".to_owned())) .expect("could not create softvol mixer"); @@ -379,10 +398,12 @@ impl Spotify { move || (backend)(cfg.backend_device), ); - let worker = Worker::new(events, commands, session, player, mixer); + let worker = Worker::new(events.clone(), commands, session, player, mixer); debug!("worker thread ready."); - core.run(futures::compat::Compat::new(worker)).unwrap(); - debug!("worker thread finished."); + if core.run(futures::compat::Compat::new(worker)).is_err() { + error!("worker thread died, requesting restart"); + events.send(Event::SessionDied) + } } pub fn get_current_status(&self) -> PlayerEvent { @@ -445,6 +466,10 @@ impl Spotify { let (token_tx, token_rx) = oneshot::channel(); self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") .unbounded_send(WorkerCommand::RequestToken(token_tx)) .unwrap(); let token = futures::executor::block_on(token_rx).unwrap(); @@ -503,7 +528,12 @@ impl Spotify { position: Option, ) -> bool { self.api_with_retry(|api| { - api.user_playlist_add_tracks(&self.user, playlist_id, &tracks, position) + api.user_playlist_add_tracks( + self.user.as_ref().unwrap(), + playlist_id, + &tracks, + position, + ) }) .is_some() } @@ -523,9 +553,9 @@ impl Spotify { None }; - if let Some(()) = - self.api_with_retry(|api| api.user_playlist_replace_tracks(&self.user, id, &tracks)) - { + if let Some(()) = self.api_with_retry(|api| { + api.user_playlist_replace_tracks(self.user.as_ref().unwrap(), id, &tracks) + }) { debug!("saved {} tracks to playlist {}", tracks.len(), id); while let Some(ref mut tracks) = remainder.clone() { // grab the next set of 100 tracks @@ -549,7 +579,7 @@ impl Spotify { } pub fn delete_playlist(&self, id: &str) -> bool { - self.api_with_retry(|api| api.user_playlist_unfollow(&self.user, id)) + self.api_with_retry(|api| api.user_playlist_unfollow(self.user.as_ref().unwrap(), id)) .is_some() } @@ -560,7 +590,12 @@ impl Spotify { description: Option, ) -> Option { let result = self.api_with_retry(|api| { - api.user_playlist_create(&self.user, name, public, description.clone()) + api.user_playlist_create( + self.user.as_ref().unwrap(), + name, + public, + description.clone(), + ) }); result.map(|r| r.id) } @@ -606,9 +641,9 @@ impl Spotify { limit: u32, offset: u32, ) -> Option> { - let user = self.user.clone(); + let user = self.user.as_ref().unwrap(); self.api_with_retry(|api| { - api.user_playlist_tracks(&user, playlist_id, None, limit, offset, None) + api.user_playlist_tracks(user, playlist_id, None, limit, offset, None) }) } @@ -688,6 +723,10 @@ impl Spotify { pub fn load(&self, track: &Track) { info!("loading track: {:?}", track); self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") .unbounded_send(WorkerCommand::Load(Box::new(track.clone()))) .unwrap(); } @@ -721,7 +760,13 @@ impl Spotify { pub fn play(&self) { info!("play()"); - self.channel.unbounded_send(WorkerCommand::Play).unwrap(); + self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") + .unbounded_send(WorkerCommand::Play) + .unwrap(); } pub fn toggleplayback(&self) { @@ -738,12 +783,24 @@ impl Spotify { pub fn pause(&self) { info!("pause()"); - self.channel.unbounded_send(WorkerCommand::Pause).unwrap(); + self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") + .unbounded_send(WorkerCommand::Pause) + .unwrap(); } pub fn stop(&self) { info!("stop()"); - self.channel.unbounded_send(WorkerCommand::Stop).unwrap(); + self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") + .unbounded_send(WorkerCommand::Stop) + .unwrap(); } pub fn seek(&self, position_ms: u32) { @@ -755,6 +812,10 @@ impl Spotify { }); self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") .unbounded_send(WorkerCommand::Seek(position_ms)) .unwrap(); } @@ -790,6 +851,10 @@ impl Spotify { info!("setting volume to {}", volume); self.volume.store(volume, Ordering::Relaxed); self.channel + .read() + .expect("can't readlock worker channel") + .as_ref() + .expect("channel to worker is missing") .unbounded_send(WorkerCommand::SetVolume(Self::log_scale(volume))) .unwrap(); }