recreate librespot session when it's invalid

initial attempt to handle #192
This commit is contained in:
Henrik Friedrichsen
2020-06-13 20:23:28 +02:00
parent e55d0ac7ba
commit 8013609d0e
3 changed files with 103 additions and 36 deletions

View File

@@ -5,6 +5,7 @@ use crate::spotify::PlayerEvent;
pub enum Event { pub enum Event {
Player(PlayerEvent), Player(PlayerEvent),
SessionDied,
} }
pub type EventSender = Sender<Event>; pub type EventSender = Sender<Event>;

View File

@@ -297,6 +297,7 @@ fn main() {
queue.next(false); queue.next(false);
} }
} }
Event::SessionDied => spotify.start_worker(None),
} }
} }
} }

View File

@@ -76,13 +76,16 @@ pub enum PlayerEvent {
} }
pub struct Spotify { pub struct Spotify {
events: EventManager,
credentials: Credentials,
cfg: config::Config,
status: RwLock<PlayerEvent>, status: RwLock<PlayerEvent>,
api: RwLock<SpotifyAPI>, api: RwLock<SpotifyAPI>,
elapsed: RwLock<Option<Duration>>, elapsed: RwLock<Option<Duration>>,
since: RwLock<Option<SystemTime>>, since: RwLock<Option<SystemTime>>,
token_issued: RwLock<Option<SystemTime>>, token_issued: RwLock<Option<SystemTime>>,
channel: mpsc::UnboundedSender<WorkerCommand>, channel: RwLock<Option<mpsc::UnboundedSender<WorkerCommand>>>,
user: String, user: Option<String>,
pub volume: AtomicU16, pub volume: AtomicU16,
pub repeat: queue::RepeatSetting, pub repeat: queue::RepeatSetting,
pub shuffle: bool, pub shuffle: bool,
@@ -140,6 +143,10 @@ impl futures::Future for Worker {
loop { loop {
let mut progress = false; let mut progress = false;
if self.session.is_invalid() {
return Poll::Ready(Result::Err(()));
}
if let Poll::Ready(Some(cmd)) = self.commands.as_mut().poll_next(cx) { if let Poll::Ready(Some(cmd)) = self.commands.as_mut().poll_next(cx) {
progress = true; progress = true;
debug!("message received!"); debug!("message received!");
@@ -221,7 +228,6 @@ impl futures::Future for Worker {
impl Spotify { impl Spotify {
pub fn new(events: EventManager, credentials: Credentials, cfg: &config::Config) -> Spotify { pub fn new(events: EventManager, credentials: Credentials, cfg: &config::Config) -> Spotify {
let (user_tx, user_rx) = oneshot::channel();
let volume = match &cfg.saved_state { let volume = match &cfg.saved_state {
Some(state) => match state.volume { Some(state) => match state.volume {
Some(vol) => ((std::cmp::min(vol, 100) as f32) / 100.0 * 65535_f32).ceil() as u16, Some(vol) => ((std::cmp::min(vol, 100) as f32) / 100.0 * 65535_f32).ceil() as u16,
@@ -248,35 +254,50 @@ impl Spotify {
None => false, None => false,
}; };
let (tx, rx) = mpsc::unbounded(); let mut spotify = Spotify {
{ events,
let cfg = cfg.clone(); credentials,
thread::spawn(move || { cfg: cfg.clone(),
Self::worker(events, Box::pin(rx), cfg, credentials, user_tx, volume)
});
}
let user = futures::executor::block_on(user_rx);
let spotify = Spotify {
status: RwLock::new(PlayerEvent::Stopped), status: RwLock::new(PlayerEvent::Stopped),
api: RwLock::new(SpotifyAPI::default()), api: RwLock::new(SpotifyAPI::default()),
elapsed: RwLock::new(None), elapsed: RwLock::new(None),
since: RwLock::new(None), since: RwLock::new(None),
token_issued: RwLock::new(None), token_issued: RwLock::new(None),
channel: tx, channel: RwLock::new(None),
user: user.expect("error retrieving userid from worker"), user: None,
volume: AtomicU16::new(volume), volume: AtomicU16::new(volume),
repeat, repeat,
shuffle, shuffle,
}; };
// acquire token for web api usage let (user_tx, user_rx) = oneshot::channel();
spotify.refresh_token(); spotify.start_worker(Some(user_tx));
spotify.user = futures::executor::block_on(user_rx).ok();
spotify.set_volume(volume); spotify.set_volume(volume);
spotify spotify
} }
pub fn start_worker(&self, user_tx: Option<oneshot::Sender<String>>) {
let (tx, rx) = mpsc::unbounded();
*self
.channel
.write()
.expect("can't writelock worker channel") = Some(tx);
{
let cfg = self.cfg.clone();
let events = self.events.clone();
let volume = self.volume();
let credentials = self.credentials.clone();
thread::spawn(move || {
Self::worker(events, Box::pin(rx), cfg, credentials, user_tx, volume)
});
}
// acquire token for web api usage
self.refresh_token();
}
pub fn session_config() -> SessionConfig { pub fn session_config() -> SessionConfig {
let mut session_config = SessionConfig::default(); let mut session_config = SessionConfig::default();
match env::var("http_proxy") { match env::var("http_proxy") {
@@ -350,7 +371,7 @@ impl Spotify {
commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>, commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>,
cfg: config::Config, cfg: config::Config,
credentials: Credentials, credentials: Credentials,
user_tx: oneshot::Sender<String>, user_tx: Option<oneshot::Sender<String>>,
volume: u16, volume: u16,
) { ) {
let player_config = PlayerConfig { let player_config = PlayerConfig {
@@ -362,9 +383,7 @@ impl Spotify {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let session = Self::create_session(&mut core, &cfg, credentials); let session = Self::create_session(&mut core, &cfg, credentials);
user_tx user_tx.map(|tx| tx.send(session.username()));
.send(session.username())
.expect("could not pass username back to Spotify::new");
let create_mixer = librespot_playback::mixer::find(Some("softvol".to_owned())) let create_mixer = librespot_playback::mixer::find(Some("softvol".to_owned()))
.expect("could not create softvol mixer"); .expect("could not create softvol mixer");
@@ -379,10 +398,12 @@ impl Spotify {
move || (backend)(cfg.backend_device), move || (backend)(cfg.backend_device),
); );
let worker = Worker::new(events, commands, session, player, mixer); let worker = Worker::new(events.clone(), commands, session, player, mixer);
debug!("worker thread ready."); debug!("worker thread ready.");
core.run(futures::compat::Compat::new(worker)).unwrap(); if core.run(futures::compat::Compat::new(worker)).is_err() {
debug!("worker thread finished."); error!("worker thread died, requesting restart");
events.send(Event::SessionDied)
}
} }
pub fn get_current_status(&self) -> PlayerEvent { pub fn get_current_status(&self) -> PlayerEvent {
@@ -445,6 +466,10 @@ impl Spotify {
let (token_tx, token_rx) = oneshot::channel(); let (token_tx, token_rx) = oneshot::channel();
self.channel self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::RequestToken(token_tx)) .unbounded_send(WorkerCommand::RequestToken(token_tx))
.unwrap(); .unwrap();
let token = futures::executor::block_on(token_rx).unwrap(); let token = futures::executor::block_on(token_rx).unwrap();
@@ -503,7 +528,12 @@ impl Spotify {
position: Option<i32>, position: Option<i32>,
) -> bool { ) -> bool {
self.api_with_retry(|api| { self.api_with_retry(|api| {
api.user_playlist_add_tracks(&self.user, playlist_id, &tracks, position) api.user_playlist_add_tracks(
self.user.as_ref().unwrap(),
playlist_id,
&tracks,
position,
)
}) })
.is_some() .is_some()
} }
@@ -523,9 +553,9 @@ impl Spotify {
None None
}; };
if let Some(()) = if let Some(()) = self.api_with_retry(|api| {
self.api_with_retry(|api| api.user_playlist_replace_tracks(&self.user, id, &tracks)) api.user_playlist_replace_tracks(self.user.as_ref().unwrap(), id, &tracks)
{ }) {
debug!("saved {} tracks to playlist {}", tracks.len(), id); debug!("saved {} tracks to playlist {}", tracks.len(), id);
while let Some(ref mut tracks) = remainder.clone() { while let Some(ref mut tracks) = remainder.clone() {
// grab the next set of 100 tracks // grab the next set of 100 tracks
@@ -549,7 +579,7 @@ impl Spotify {
} }
pub fn delete_playlist(&self, id: &str) -> bool { pub fn delete_playlist(&self, id: &str) -> bool {
self.api_with_retry(|api| api.user_playlist_unfollow(&self.user, id)) self.api_with_retry(|api| api.user_playlist_unfollow(self.user.as_ref().unwrap(), id))
.is_some() .is_some()
} }
@@ -560,7 +590,12 @@ impl Spotify {
description: Option<String>, description: Option<String>,
) -> Option<String> { ) -> Option<String> {
let result = self.api_with_retry(|api| { let result = self.api_with_retry(|api| {
api.user_playlist_create(&self.user, name, public, description.clone()) api.user_playlist_create(
self.user.as_ref().unwrap(),
name,
public,
description.clone(),
)
}); });
result.map(|r| r.id) result.map(|r| r.id)
} }
@@ -606,9 +641,9 @@ impl Spotify {
limit: u32, limit: u32,
offset: u32, offset: u32,
) -> Option<Page<PlaylistTrack>> { ) -> Option<Page<PlaylistTrack>> {
let user = self.user.clone(); let user = self.user.as_ref().unwrap();
self.api_with_retry(|api| { self.api_with_retry(|api| {
api.user_playlist_tracks(&user, playlist_id, None, limit, offset, None) api.user_playlist_tracks(user, playlist_id, None, limit, offset, None)
}) })
} }
@@ -688,6 +723,10 @@ impl Spotify {
pub fn load(&self, track: &Track) { pub fn load(&self, track: &Track) {
info!("loading track: {:?}", track); info!("loading track: {:?}", track);
self.channel self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::Load(Box::new(track.clone()))) .unbounded_send(WorkerCommand::Load(Box::new(track.clone())))
.unwrap(); .unwrap();
} }
@@ -721,7 +760,13 @@ impl Spotify {
pub fn play(&self) { pub fn play(&self) {
info!("play()"); info!("play()");
self.channel.unbounded_send(WorkerCommand::Play).unwrap(); self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::Play)
.unwrap();
} }
pub fn toggleplayback(&self) { pub fn toggleplayback(&self) {
@@ -738,12 +783,24 @@ impl Spotify {
pub fn pause(&self) { pub fn pause(&self) {
info!("pause()"); info!("pause()");
self.channel.unbounded_send(WorkerCommand::Pause).unwrap(); self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::Pause)
.unwrap();
} }
pub fn stop(&self) { pub fn stop(&self) {
info!("stop()"); info!("stop()");
self.channel.unbounded_send(WorkerCommand::Stop).unwrap(); self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::Stop)
.unwrap();
} }
pub fn seek(&self, position_ms: u32) { pub fn seek(&self, position_ms: u32) {
@@ -755,6 +812,10 @@ impl Spotify {
}); });
self.channel self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::Seek(position_ms)) .unbounded_send(WorkerCommand::Seek(position_ms))
.unwrap(); .unwrap();
} }
@@ -790,6 +851,10 @@ impl Spotify {
info!("setting volume to {}", volume); info!("setting volume to {}", volume);
self.volume.store(volume, Ordering::Relaxed); self.volume.store(volume, Ordering::Relaxed);
self.channel self.channel
.read()
.expect("can't readlock worker channel")
.as_ref()
.expect("channel to worker is missing")
.unbounded_send(WorkerCommand::SetVolume(Self::log_scale(volume))) .unbounded_send(WorkerCommand::SetVolume(Self::log_scale(volume)))
.unwrap(); .unwrap();
} }