Merge branch 'feature/handle_expired_tokens' into develop

This commit is contained in:
Henrik Friedrichsen
2019-03-22 16:09:09 +01:00
2 changed files with 105 additions and 46 deletions

View File

@@ -85,7 +85,6 @@ fn main() {
event_manager.clone(), event_manager.clone(),
cfg.username, cfg.username,
cfg.password, cfg.password,
config::CLIENT_ID.to_string(),
)); ));
let queue = Arc::new(queue::Queue::new(spotify.clone())); let queue = Arc::new(queue::Queue::new(spotify.clone()));

View File

@@ -1,7 +1,7 @@
use librespot::core::authentication::Credentials; use librespot::core::authentication::Credentials;
use librespot::core::config::SessionConfig; use librespot::core::config::SessionConfig;
use librespot::core::keymaster::get_token;
use librespot::core::keymaster::Token; use librespot::core::keymaster::Token;
use librespot::core::mercury::MercuryError;
use librespot::core::session::Session; use librespot::core::session::Session;
use librespot::core::spotify_id::SpotifyId; use librespot::core::spotify_id::SpotifyId;
use librespot::playback::config::PlayerConfig; use librespot::playback::config::PlayerConfig;
@@ -31,6 +31,7 @@ use std::sync::RwLock;
use std::thread; use std::thread;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use config;
use events::{Event, EventManager}; use events::{Event, EventManager};
use track::Track; use track::Track;
@@ -39,6 +40,7 @@ enum WorkerCommand {
Play, Play,
Pause, Pause,
Stop, Stop,
RequestToken(oneshot::Sender<Token>),
} }
#[derive(Clone, PartialEq)] #[derive(Clone, PartialEq)]
@@ -51,7 +53,7 @@ pub enum PlayerEvent {
pub struct Spotify { pub struct Spotify {
status: RwLock<PlayerEvent>, status: RwLock<PlayerEvent>,
pub api: SpotifyAPI, api: RwLock<SpotifyAPI>,
elapsed: RwLock<Option<Duration>>, elapsed: RwLock<Option<Duration>>,
since: RwLock<Option<SystemTime>>, since: RwLock<Option<SystemTime>>,
channel: mpsc::UnboundedSender<WorkerCommand>, channel: mpsc::UnboundedSender<WorkerCommand>,
@@ -61,9 +63,11 @@ pub struct Spotify {
struct Worker { struct Worker {
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: mpsc::UnboundedReceiver<WorkerCommand>,
session: Session,
player: Player, player: Player,
play_task: Box<futures::Future<Item = (), Error = oneshot::Canceled>>, play_task: Box<futures::Future<Item = (), Error = oneshot::Canceled>>,
refresh_task: Box<futures::Stream<Item = (), Error = tokio_timer::Error>>, refresh_task: Box<futures::Stream<Item = (), Error = tokio_timer::Error>>,
token_task: Box<futures::Future<Item = (), Error = MercuryError>>,
active: bool, active: bool,
} }
@@ -71,14 +75,17 @@ impl Worker {
fn new( fn new(
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: mpsc::UnboundedReceiver<WorkerCommand>,
session: Session,
player: Player, player: Player,
) -> Worker { ) -> Worker {
Worker { Worker {
events: events, events: events,
commands: commands, commands: commands,
player: player, player: player,
session: session,
play_task: Box::new(futures::empty()), play_task: Box::new(futures::empty()),
refresh_task: Box::new(futures::stream::empty()), refresh_task: Box::new(futures::stream::empty()),
token_task: Box::new(futures::empty()),
active: false, active: false,
} }
} }
@@ -128,6 +135,10 @@ impl futures::Future for Worker {
self.events.send(Event::Player(PlayerEvent::Stopped)); self.events.send(Event::Player(PlayerEvent::Stopped));
self.active = false; self.active = false;
} }
WorkerCommand::RequestToken(sender) => {
self.token_task = Spotify::get_token(&self.session, sender);
progress = true;
}
} }
} }
match self.play_task.poll() { match self.play_task.poll() {
@@ -154,6 +165,16 @@ impl futures::Future for Worker {
} }
_ => (), _ => (),
} }
match self.token_task.poll() {
Ok(Async::Ready(_)) => {
info!("token updated!");
self.token_task = Box::new(futures::empty())
}
Ok(Async::NotReady) => debug!("waiting for token"),
Err(e) => {
error!("could not generate token: {:?}", e);
}
}
if !progress { if !progress {
return Ok(Async::NotReady); return Ok(Async::NotReady);
@@ -163,8 +184,7 @@ impl futures::Future for Worker {
} }
impl Spotify { impl Spotify {
pub fn new(events: EventManager, user: String, password: String, client_id: String) -> Spotify { pub fn new(events: EventManager, user: String, password: String) -> Spotify {
let session_config = SessionConfig::default();
let player_config = PlayerConfig { let player_config = PlayerConfig {
bitrate: Bitrate::Bitrate320, bitrate: Bitrate::Bitrate320,
normalisation: false, normalisation: false,
@@ -173,62 +193,75 @@ impl Spotify {
let credentials = Credentials::with_password(user.clone(), password.clone()); let credentials = Credentials::with_password(user.clone(), password.clone());
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let (p, c) = oneshot::channel();
{ {
let events = events.clone(); let events = events.clone();
thread::spawn(move || { thread::spawn(move || Self::worker(events, rx, player_config, credentials));
Self::worker(
events,
rx,
p,
session_config,
player_config,
credentials,
client_id,
)
});
} }
let token = c.wait().unwrap(); let spotify = Spotify {
debug!("token received: {:?}", token);
let api = SpotifyAPI::default().access_token(&token.access_token);
Spotify {
status: RwLock::new(PlayerEvent::Stopped), status: RwLock::new(PlayerEvent::Stopped),
api: api, api: RwLock::new(SpotifyAPI::default()),
elapsed: RwLock::new(None), elapsed: RwLock::new(None),
since: RwLock::new(None), since: RwLock::new(None),
channel: tx, channel: tx,
user: user, user: user,
} };
// acquire token for web api usage
spotify.refresh_token();
spotify
}
fn create_session(core: &mut Core, credentials: Credentials) -> Session {
let session_config = SessionConfig::default();
let handle = core.handle();
core.run(Session::connect(session_config, credentials, None, handle))
.ok()
.unwrap()
}
fn get_token(
session: &Session,
sender: oneshot::Sender<Token>,
) -> Box<Future<Item = (), Error = MercuryError>> {
let client_id = config::CLIENT_ID;
let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes
);
Box::new(
session
.mercury()
.get(url)
.map(move |response| {
let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap();
info!("new token received: {:?}", token);
token
})
.map(|token| sender.send(token).unwrap()),
)
} }
fn worker( fn worker(
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: mpsc::UnboundedReceiver<WorkerCommand>,
token_channel: oneshot::Sender<Token>,
session_config: SessionConfig,
player_config: PlayerConfig, player_config: PlayerConfig,
credentials: Credentials, credentials: Credentials,
client_id: String,
) { ) {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let handle = core.handle();
let session = core let session = Self::create_session(&mut core, credentials);
.run(Session::connect(session_config, credentials, None, handle))
.ok()
.unwrap();
let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let token = core.run(get_token(&session, &client_id, &scopes)).unwrap();
token_channel.send(token).unwrap();
let backend = audio_backend::find(None).unwrap(); let backend = audio_backend::find(None).unwrap();
let (player, _eventchannel) = let (player, _eventchannel) =
Player::new(player_config, session, None, move || (backend)(None)); Player::new(player_config, session.clone(), None, move || {
(backend)(None)
});
let worker = Worker::new(events, commands, player); let worker = Worker::new(events, commands, session, player);
debug!("worker thread ready."); debug!("worker thread ready.");
core.run(worker).unwrap(); core.run(worker).unwrap();
debug!("worker thread finished."); debug!("worker thread finished.");
@@ -282,22 +315,48 @@ impl Spotify {
(*since).clone() (*since).clone()
} }
fn refresh_token(&self) {
let (token_tx, token_rx) = oneshot::channel();
self.channel
.unbounded_send(WorkerCommand::RequestToken(token_tx))
.unwrap();
let token = token_rx.wait().unwrap();
// update token used by web api calls
self.api.write().expect("can't writelock api").access_token = Some(token.access_token);
}
/// retries once when rate limits are hit /// retries once when rate limits are hit
fn api_with_retry<F, R>(&self, cb: F) -> Option<R> fn api_with_retry<F, R>(&self, cb: F) -> Option<R>
where where
F: Fn(&SpotifyAPI) -> Result<R, Error>, F: Fn(&SpotifyAPI) -> Result<R, Error>,
{ {
match cb(&self.api) { let result = {
let api = self.api.read().expect("can't read api");
cb(&api)
};
match result {
Ok(v) => Some(v), Ok(v) => Some(v),
Err(e) => { Err(e) => {
debug!("api error: {:?}", e); debug!("api error: {:?}", e);
if let Ok(apierror) = e.downcast::<ApiError>() { if let Ok(apierror) = e.downcast::<ApiError>() {
if let ApiError::RateLimited(d) = apierror { match apierror {
debug!("rate limit hit. waiting {:?} seconds", d); ApiError::RateLimited(d) => {
thread::sleep(Duration::from_secs(d.unwrap_or(0) as u64)); debug!("rate limit hit. waiting {:?} seconds", d);
cb(&self.api).ok() thread::sleep(Duration::from_secs(d.unwrap_or(0) as u64));
} else { let api = self.api.read().expect("can't read api");
None cb(&api).ok()
}
ApiError::Unauthorized => {
debug!("token unauthorized. trying refresh..");
self.refresh_token();
let api = self.api.read().expect("can't read api");
cb(&api).ok()
}
e => {
error!("unhandled api error: {}", e);
None
}
} }
} else { } else {
None None
@@ -324,8 +383,9 @@ impl Spotify {
limit: u32, limit: u32,
offset: u32, offset: u32,
) -> Option<Page<PlaylistTrack>> { ) -> Option<Page<PlaylistTrack>> {
let user = self.user.clone();
self.api_with_retry(|api| { self.api_with_retry(|api| {
api.user_playlist_tracks(&self.user, playlist_id, None, limit, offset, None) api.user_playlist_tracks(&user, playlist_id, None, limit, offset, None)
}) })
} }