Fix: update token in blocking task
* attempt to fix https://github.com/hrkfdn/ncspot/issues/1358 mpris runs inside `tokio::runtime.spawn`. At some point it hits `metadata_changed->WebApi::track->api_with_retry`, which gets 401 response and tries to update the token. This goes into `update_token`, which at a certain point calls *blocking* `token_rx.recv()`. Tokio runtime is not happy about that, as it uses the same thread for the `run_loop` in the worker. That's why `tokio::select!` doesn't work, we're essentially deadlocking, so the worker never pick up `RequestToken` command and `update_token` block forever. Here I'm changing `update_token` to wrap `recv` in `spawn_blocking`, which makes `update_token` return a `JoinHandle`, on which the caller has to await. This doesn't work nicely in case of mpris though, as it is not an async function, and I don't know how to make it async as it goes through the dbus `metadata_changed` interface. My best effort here is to do `and_then`, which seems to work, but I'm not confident that's the right approach. * fmt * move token_rx.recv inside spawn_blocking
This commit is contained in:
committed by
GitHub
parent
7dec5a2767
commit
5c71e2f4bf
@@ -83,7 +83,11 @@ impl Spotify {
|
|||||||
spotify.set_volume(volume);
|
spotify.set_volume(volume);
|
||||||
|
|
||||||
spotify.api.set_worker_channel(spotify.channel.clone());
|
spotify.api.set_worker_channel(spotify.channel.clone());
|
||||||
spotify.api.update_token();
|
ASYNC_RUNTIME
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.block_on(spotify.api.update_token().unwrap())
|
||||||
|
.ok();
|
||||||
|
|
||||||
spotify.api.set_user(user);
|
spotify.api.set_user(user);
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock};
|
|||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crate::application::ASYNC_RUNTIME;
|
||||||
use chrono::{DateTime, Duration as ChronoDuration, Utc};
|
use chrono::{DateTime, Duration as ChronoDuration, Utc};
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use rspotify::http::HttpError;
|
use rspotify::http::HttpError;
|
||||||
@@ -15,6 +16,7 @@ use rspotify::model::{
|
|||||||
};
|
};
|
||||||
use rspotify::{prelude::*, AuthCodeSpotify, ClientError, ClientResult, Config, Token};
|
use rspotify::{prelude::*, AuthCodeSpotify, ClientError, ClientResult, Config, Token};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
use crate::model::album::Album;
|
use crate::model::album::Album;
|
||||||
use crate::model::artist::Artist;
|
use crate::model::artist::Artist;
|
||||||
@@ -79,7 +81,7 @@ impl WebApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Update the authentication token when it expires.
|
/// Update the authentication token when it expires.
|
||||||
pub fn update_token(&self) {
|
pub fn update_token(&self) -> Option<JoinHandle<()>> {
|
||||||
{
|
{
|
||||||
let token_expiration = self.token_expiration.read().unwrap();
|
let token_expiration = self.token_expiration.read().unwrap();
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
@@ -87,7 +89,7 @@ impl WebApi {
|
|||||||
|
|
||||||
// token is valid for 5 more minutes, renewal is not necessary yet
|
// token is valid for 5 more minutes, renewal is not necessary yet
|
||||||
if delta.num_seconds() > 60 * 5 {
|
if delta.num_seconds() > 60 * 5 {
|
||||||
return;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Token will expire in {}, renewing", delta);
|
info!("Token will expire in {}, renewing", delta);
|
||||||
@@ -97,22 +99,26 @@ impl WebApi {
|
|||||||
let cmd = WorkerCommand::RequestToken(token_tx);
|
let cmd = WorkerCommand::RequestToken(token_tx);
|
||||||
if let Some(channel) = self.worker_channel.read().unwrap().as_ref() {
|
if let Some(channel) = self.worker_channel.read().unwrap().as_ref() {
|
||||||
channel.send(cmd).unwrap();
|
channel.send(cmd).unwrap();
|
||||||
let token_option = token_rx.recv().unwrap();
|
let api_token = self.api.token.clone();
|
||||||
if let Some(token) = token_option {
|
let api_token_expiration = self.token_expiration.clone();
|
||||||
*self.api.token.lock().unwrap() = Some(Token {
|
Some(ASYNC_RUNTIME.get().unwrap().spawn_blocking(move || {
|
||||||
access_token: token.access_token,
|
if let Some(token) = token_rx.recv().unwrap() {
|
||||||
expires_in: chrono::Duration::seconds(token.expires_in.into()),
|
*api_token.lock().unwrap() = Some(Token {
|
||||||
scopes: HashSet::from_iter(token.scope),
|
access_token: token.access_token,
|
||||||
expires_at: None,
|
expires_in: chrono::Duration::seconds(token.expires_in.into()),
|
||||||
refresh_token: None,
|
scopes: HashSet::from_iter(token.scope),
|
||||||
});
|
expires_at: None,
|
||||||
*self.token_expiration.write().unwrap() =
|
refresh_token: None,
|
||||||
Utc::now() + ChronoDuration::seconds(token.expires_in.into());
|
});
|
||||||
} else {
|
*api_token_expiration.write().unwrap() =
|
||||||
error!("Failed to update token");
|
Utc::now() + ChronoDuration::seconds(token.expires_in.into());
|
||||||
}
|
} else {
|
||||||
|
error!("Failed to update token");
|
||||||
|
}
|
||||||
|
}))
|
||||||
} else {
|
} else {
|
||||||
error!("worker channel is not set");
|
error!("worker channel is not set");
|
||||||
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,8 +144,8 @@ impl WebApi {
|
|||||||
}
|
}
|
||||||
401 => {
|
401 => {
|
||||||
debug!("token unauthorized. trying refresh..");
|
debug!("token unauthorized. trying refresh..");
|
||||||
self.update_token();
|
self.update_token()
|
||||||
api_call(&self.api).ok()
|
.and_then(move |_| api_call(&self.api).ok())
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
error!("unhandled api error: {:?}", response);
|
error!("unhandled api error: {:?}", response);
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::application::ASYNC_RUNTIME;
|
||||||
use crate::command::Command;
|
use crate::command::Command;
|
||||||
use crate::commands::CommandResult;
|
use crate::commands::CommandResult;
|
||||||
use crate::events::EventManager;
|
use crate::events::EventManager;
|
||||||
@@ -391,7 +392,11 @@ impl SearchResultsView {
|
|||||||
// check if API token refresh is necessary before commencing multiple
|
// check if API token refresh is necessary before commencing multiple
|
||||||
// requests to avoid deadlock, as the parallel requests might
|
// requests to avoid deadlock, as the parallel requests might
|
||||||
// simultaneously try to refresh the token
|
// simultaneously try to refresh the token
|
||||||
self.spotify.api.update_token();
|
ASYNC_RUNTIME
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.block_on(self.spotify.api.update_token().unwrap())
|
||||||
|
.ok();
|
||||||
|
|
||||||
// is the query a Spotify URI?
|
// is the query a Spotify URI?
|
||||||
if let Ok(uritype) = query.parse() {
|
if let Ok(uritype) = query.parse() {
|
||||||
|
|||||||
Reference in New Issue
Block a user