Update to librespot 0.2.0 and Tokio v1

This also brings some much need async/.await rewrites
This commit is contained in:
Henrik Friedrichsen
2021-05-07 22:42:08 +02:00
parent 71d42d489e
commit f6a895e160
6 changed files with 736 additions and 861 deletions

1277
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,24 +24,24 @@ fern = "0.6"
futures = { version = "0.3", features = ["compat"] } futures = { version = "0.3", features = ["compat"] }
futures_01 = { version = "0.1", package = "futures" } futures_01 = { version = "0.1", package = "futures" }
lazy_static = "1.3.0" lazy_static = "1.3.0"
librespot-core = "0.1.6" librespot-core = "0.2.0"
librespot-playback = "0.1.6" librespot-playback = "0.2.0"
librespot-protocol = "0.1.6" librespot-protocol = "0.2.0"
log = "0.4.13" log = "0.4.13"
notify-rust = { version = "4", optional = true } notify-rust = { version = "4", optional = true }
rspotify = { version = "0.10.0", features = ["blocking"] } rspotify = { version = "0.10.0", features = ["blocking"] }
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] }
tokio-stream = "0.1.5"
toml = "0.5" toml = "0.5"
tokio-core = "0.1"
tokio-timer = "0.2"
unicode-width = "0.1.8" unicode-width = "0.1.8"
dbus = { version = "0.9.2", optional = true } dbus = { version = "0.9.2", optional = true }
dbus-tree = { version = "0.9.0", optional = true } dbus-tree = { version = "0.9.0", optional = true }
rand = "0.8" rand = "0.8"
webbrowser = "0.5" webbrowser = "0.5"
clipboard = { version = "0.5", optional = true } clipboard = { version = "0.5", optional = true }
url = "1.7" url = "2.2"
strum = "0.20.0" strum = "0.20.0"
strum_macros = "0.20.1" strum_macros = "0.20.1"
regex = "1" regex = "1"

View File

@@ -95,7 +95,8 @@ struct UserDataInner {
pub cmd: CommandManager, pub cmd: CommandManager,
} }
fn main() { #[tokio::main]
async fn main() {
let backends = { let backends = {
let backends: Vec<&str> = audio_backend::BACKENDS.iter().map(|b| b.0).collect(); let backends: Vec<&str> = audio_backend::BACKENDS.iter().map(|b| b.0).collect();
format!("Audio backends: {}", backends.join(", ")) format!("Audio backends: {}", backends.join(", "))
@@ -139,7 +140,12 @@ fn main() {
// otherwise the error message will not be seen by a user // otherwise the error message will not be seen by a user
let cfg: Arc<crate::config::Config> = Arc::new(Config::new()); let cfg: Arc<crate::config::Config> = Arc::new(Config::new());
let cache = Cache::new(config::cache_path("librespot"), true); let cache = Cache::new(
Some(config::cache_path("librespot")),
Some(config::cache_path("librespot").join("files")),
None,
)
.expect("Could not create librespot cache");
let mut credentials = { let mut credentials = {
let cached_credentials = cache.credentials(); let cached_credentials = cache.credentials();
match cached_credentials { match cached_credentials {
@@ -152,10 +158,7 @@ fn main() {
}; };
while let Err(error) = spotify::Spotify::test_credentials(credentials.clone()) { while let Err(error) = spotify::Spotify::test_credentials(credentials.clone()) {
let error_msg = match error.get_ref() { let error_msg = format!("{}", error);
Some(inner) => inner.to_string(),
None => error.to_string(),
};
credentials = credentials_prompt(Some(error_msg)); credentials = credentials_prompt(Some(error_msg));
} }

View File

@@ -103,10 +103,8 @@ impl Queue {
} }
pub fn get_current(&self) -> Option<Playable> { pub fn get_current(&self) -> Option<Playable> {
match self.get_current_index() { self.get_current_index()
Some(index) => Some(self.queue.read().unwrap()[index].clone()), .map(|index| self.queue.read().unwrap()[index].clone())
None => None,
}
} }
pub fn get_current_index(&self) -> Option<usize> { pub fn get_current_index(&self) -> Option<usize> {

View File

@@ -1,9 +1,8 @@
use librespot_core::authentication::Credentials; use librespot_core::authentication::Credentials;
use librespot_core::cache::Cache; use librespot_core::cache::Cache;
use librespot_core::config::SessionConfig; use librespot_core::config::SessionConfig;
use librespot_core::keymaster::Token;
use librespot_core::mercury::MercuryError;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::session::SessionError;
use librespot_playback::config::PlayerConfig; use librespot_playback::config::PlayerConfig;
use log::{debug, error, info}; use log::{debug, error, info};
@@ -26,22 +25,16 @@ use serde_json::{json, Map};
use failure::Error; use failure::Error;
use futures_01::future::Future as v01_Future;
use futures::channel::mpsc;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::compat::Future01CompatExt; use tokio::sync::mpsc;
use futures::Future;
use tokio_core::reactor::Core;
use url::Url; use url::Url;
use std::pin::Pin; use std::env;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use std::{env, io};
use crate::artist::Artist; use crate::artist::Artist;
use crate::config; use crate::config;
@@ -117,7 +110,7 @@ impl Spotify {
} }
pub fn start_worker(&self, user_tx: Option<oneshot::Sender<String>>) { pub fn start_worker(&self, user_tx: Option<oneshot::Sender<String>>) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
*self *self
.channel .channel
.write() .write()
@@ -127,15 +120,9 @@ impl Spotify {
let events = self.events.clone(); let events = self.events.clone();
let volume = self.volume(); let volume = self.volume();
let credentials = self.credentials.clone(); let credentials = self.credentials.clone();
thread::spawn(move || { let handle = tokio::runtime::Handle::current();
Self::worker( handle.spawn(async move {
events, Self::worker(events, rx, cfg.clone(), credentials, user_tx, volume).await
Box::pin(rx),
cfg.clone(),
credentials,
user_tx,
volume,
)
}); });
} }
@@ -155,72 +142,37 @@ impl Spotify {
session_config session_config
} }
pub fn test_credentials(credentials: Credentials) -> Result<Session, std::io::Error> { pub fn test_credentials(credentials: Credentials) -> Result<Session, SessionError> {
let jh = thread::spawn(move || {
let mut core = Core::new().unwrap();
let config = Self::session_config(); let config = Self::session_config();
let handle = core.handle(); // let rt = Runtime::new().unwrap();
let handle = tokio::runtime::Handle::current();
core.run(Session::connect(config, credentials, None, handle)) let jh = handle.spawn(async { Session::connect(config, credentials, None).await });
}); futures::executor::block_on(jh).unwrap()
match jh.join() {
Ok(session) => session,
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
e.downcast_ref::<String>()
.unwrap_or(&"N/A".to_string())
.to_string(),
)),
}
} }
fn create_session(core: &mut Core, cfg: &config::Config, credentials: Credentials) -> Session { async fn create_session(
cfg: &config::Config,
credentials: Credentials,
) -> Result<Session, SessionError> {
let session_config = Self::session_config(); let session_config = Self::session_config();
let audio_cache_path = match cfg.values().audio_cache.unwrap_or(true) {
true => Some(config::cache_path("librespot").join("files")),
false => None,
};
let cache = Cache::new( let cache = Cache::new(
config::cache_path("librespot"), Some(config::cache_path("librespot")),
cfg.values().audio_cache.unwrap_or(true), audio_cache_path,
); None,
let handle = core.handle(); )
.expect("Could not create cache");
debug!("opening spotify session"); debug!("opening spotify session");
println!("Connecting to Spotify.."); println!("Connecting to Spotify..");
core.run(Session::connect( Session::connect(session_config, credentials, Some(cache)).await
session_config,
credentials,
Some(cache),
handle,
))
.expect("could not open spotify session")
} }
pub(crate) fn get_token( async fn worker(
session: &Session,
sender: oneshot::Sender<Token>,
) -> Pin<Box<dyn Future<Output = Result<(), MercuryError>>>> {
let client_id = config::CLIENT_ID;
let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes
);
Box::pin(
session
.mercury()
.get(url)
.map(move |response| {
let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap();
info!("new token received: {:?}", token);
token
})
.map(|token| sender.send(token).unwrap())
.compat(),
)
}
fn worker(
events: EventManager, events: EventManager,
commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>, commands: mpsc::UnboundedReceiver<WorkerCommand>,
cfg: Arc<config::Config>, cfg: Arc<config::Config>,
credentials: Credentials, credentials: Credentials,
user_tx: Option<oneshot::Sender<String>>, user_tx: Option<oneshot::Sender<String>>,
@@ -237,11 +189,12 @@ impl Spotify {
bitrate: bitrate.unwrap_or(Bitrate::Bitrate320), bitrate: bitrate.unwrap_or(Bitrate::Bitrate320),
normalisation: cfg.values().volnorm.unwrap_or(false), normalisation: cfg.values().volnorm.unwrap_or(false),
normalisation_pregain: cfg.values().volnorm_pregain.unwrap_or(0.0), normalisation_pregain: cfg.values().volnorm_pregain.unwrap_or(0.0),
..Default::default()
}; };
let mut core = Core::new().unwrap(); let session = Self::create_session(&cfg, credentials)
.await
let session = Self::create_session(&mut core, &cfg, credentials); .expect("Could not create session");
user_tx.map(|tx| tx.send(session.username())); user_tx.map(|tx| tx.send(session.username()));
let create_mixer = librespot_playback::mixer::find(Some("softvol".to_owned())) let create_mixer = librespot_playback::mixer::find(Some("softvol".to_owned()))
@@ -250,14 +203,15 @@ impl Spotify {
mixer.set_volume(volume); mixer.set_volume(volume);
let backend = audio_backend::find(cfg.values().backend.clone()).unwrap(); let backend = audio_backend::find(cfg.values().backend.clone()).unwrap();
let audio_format: librespot_playback::config::AudioFormat = Default::default();
let (player, player_events) = Player::new( let (player, player_events) = Player::new(
player_config, player_config,
session.clone(), session.clone(),
mixer.get_audio_filter(), mixer.get_audio_filter(),
move || (backend)(cfg.values().backend_device.clone()), move || (backend)(cfg.values().backend_device.clone(), audio_format),
); );
let worker = Worker::new( let mut worker = Worker::new(
events.clone(), events.clone(),
player_events, player_events,
commands, commands,
@@ -266,11 +220,11 @@ impl Spotify {
mixer, mixer,
); );
debug!("worker thread ready."); debug!("worker thread ready.");
if core.run(futures::compat::Compat::new(worker)).is_err() { worker.run_loop().await;
error!("worker thread died, requesting restart"); error!("worker thread died, requesting restart");
events.send(Event::SessionDied) events.send(Event::SessionDied)
} }
}
pub fn get_current_status(&self) -> PlayerEvent { pub fn get_current_status(&self) -> PlayerEvent {
let status = self let status = self
@@ -789,9 +743,7 @@ impl Spotify {
fn send_worker(&self, cmd: WorkerCommand) { fn send_worker(&self, cmd: WorkerCommand) {
let channel = self.channel.read().expect("can't readlock worker channel"); let channel = self.channel.read().expect("can't readlock worker channel");
match channel.as_ref() { match channel.as_ref() {
Some(channel) => channel Some(channel) => channel.send(cmd).expect("can't send message to worker"),
.unbounded_send(cmd)
.expect("can't send message to worker"),
None => error!("no channel to worker available"), None => error!("no channel to worker available"),
} }
} }

View File

@@ -1,16 +1,11 @@
use crate::config;
use crate::events::{Event, EventManager}; use crate::events::{Event, EventManager};
use crate::playable::Playable; use crate::playable::Playable;
use crate::queue::QueueEvent; use crate::queue::QueueEvent;
use crate::spotify::{PlayerEvent, Spotify}; use crate::spotify::PlayerEvent;
use futures::channel::{mpsc, oneshot}; use futures::channel::oneshot;
use futures::compat::Stream01CompatExt; use futures::{Future, FutureExt};
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use futures_01::stream::Stream as v01_Stream;
use futures_01::sync::mpsc::UnboundedReceiver;
use futures_01::Async as v01_Async;
use librespot_core::keymaster::Token; use librespot_core::keymaster::Token;
use librespot_core::mercury::MercuryError;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId}; use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId};
use librespot_playback::mixer::Mixer; use librespot_playback::mixer::Mixer;
@@ -18,7 +13,12 @@ use librespot_playback::player::{Player, PlayerEvent as LibrespotPlayerEvent};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use std::time::Duration; use std::time::Duration;
use std::{pin::Pin, time::SystemTime}; use std::{pin::Pin, time::SystemTime};
use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;
#[derive(Debug)]
pub(crate) enum WorkerCommand { pub(crate) enum WorkerCommand {
Load(Playable, bool, u32), Load(Playable, bool, u32),
Play, Play,
@@ -33,12 +33,11 @@ pub(crate) enum WorkerCommand {
pub struct Worker { pub struct Worker {
events: EventManager, events: EventManager,
player_events: UnboundedReceiver<LibrespotPlayerEvent>, player_events: UnboundedReceiverStream<LibrespotPlayerEvent>,
commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>, commands: UnboundedReceiverStream<WorkerCommand>,
session: Session, session: Session,
player: Player, player: Player,
refresh_task: Pin<Box<dyn Stream<Item = Result<(), tokio_timer::Error>>>>, token_task: Pin<Box<dyn Future<Output = ()> + Send>>,
token_task: Pin<Box<dyn Future<Output = Result<(), MercuryError>>>>,
active: bool, active: bool,
mixer: Box<dyn Mixer>, mixer: Box<dyn Mixer>,
} }
@@ -46,19 +45,18 @@ pub struct Worker {
impl Worker { impl Worker {
pub(crate) fn new( pub(crate) fn new(
events: EventManager, events: EventManager,
player_events: UnboundedReceiver<LibrespotPlayerEvent>, player_events: mpsc::UnboundedReceiver<LibrespotPlayerEvent>,
commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>, commands: mpsc::UnboundedReceiver<WorkerCommand>,
session: Session, session: Session,
player: Player, player: Player,
mixer: Box<dyn Mixer>, mixer: Box<dyn Mixer>,
) -> Worker { ) -> Worker {
Worker { Worker {
events, events,
player_events, player_events: UnboundedReceiverStream::new(player_events),
commands, commands: UnboundedReceiverStream::new(commands),
player, player,
session, session,
refresh_task: Box::pin(futures::stream::empty()),
token_task: Box::pin(futures::future::pending()), token_task: Box::pin(futures::future::pending()),
active: false, active: false,
mixer, mixer,
@@ -67,33 +65,43 @@ impl Worker {
} }
impl Worker { impl Worker {
fn create_refresh(&self) -> Pin<Box<dyn Stream<Item = Result<(), tokio_timer::Error>>>> { fn get_token(
let ev = self.events.clone(); &self,
let future = sender: oneshot::Sender<Token>,
tokio_timer::Interval::new_interval(Duration::from_millis(400)).map(move |_| { ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
ev.trigger(); let client_id = config::CLIENT_ID;
}); let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
Box::pin(future.compat()) let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes
);
Box::pin(
self.session
.mercury()
.get(url)
.map(move |response| {
let payload = response
.as_ref()
.unwrap()
.payload
.first()
.expect("Empty payload");
let data = String::from_utf8(payload.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap();
info!("new token received: {:?}", token);
token
})
.map(|token| sender.send(token).unwrap()),
)
} }
}
impl futures::Future for Worker { pub async fn run_loop(&mut self) {
type Output = Result<(), ()>; let mut ui_refresh = time::interval(Duration::from_millis(400));
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> futures::task::Poll<Self::Output> {
loop { loop {
let mut progress = false; tokio::select! {
cmd = self.commands.next() => match cmd {
if self.session.is_invalid() { Some(WorkerCommand::Load(playable, start_playing, position_ms)) => {
self.events.send(Event::Player(PlayerEvent::Stopped));
return Poll::Ready(Result::Err(()));
}
if let Poll::Ready(Some(cmd)) = self.commands.as_mut().poll_next(cx) {
progress = true;
debug!("message received!");
match cmd {
WorkerCommand::Load(playable, start_playing, position_ms) => {
match SpotifyId::from_uri(&playable.uri()) { match SpotifyId::from_uri(&playable.uri()) {
Ok(id) => { Ok(id) => {
info!("player loading track: {:?}", id); info!("player loading track: {:?}", id);
@@ -110,46 +118,37 @@ impl futures::Future for Worker {
} }
} }
} }
WorkerCommand::Play => { Some(WorkerCommand::Play) => {
self.player.play(); self.player.play();
} }
WorkerCommand::Pause => { Some(WorkerCommand::Pause) => {
self.player.pause(); self.player.pause();
} }
WorkerCommand::Stop => { Some(WorkerCommand::Stop) => {
self.player.stop(); self.player.stop();
} }
WorkerCommand::Seek(pos) => { Some(WorkerCommand::Seek(pos)) => {
self.player.seek(pos); self.player.seek(pos);
} }
WorkerCommand::SetVolume(volume) => { Some(WorkerCommand::SetVolume(volume)) => {
self.mixer.set_volume(volume); self.mixer.set_volume(volume);
} }
WorkerCommand::RequestToken(sender) => { Some(WorkerCommand::RequestToken(sender)) => {
self.token_task = Spotify::get_token(&self.session, sender); self.token_task = self.get_token(sender);
progress = true;
} }
WorkerCommand::Preload(playable) => { Some(WorkerCommand::Preload(playable)) => {
if let Ok(id) = SpotifyId::from_uri(&playable.uri()) { if let Ok(id) = SpotifyId::from_uri(&playable.uri()) {
debug!("Preloading {:?}", id); debug!("Preloading {:?}", id);
self.player.preload(id); self.player.preload(id);
} }
} }
WorkerCommand::Shutdown => { Some(WorkerCommand::Shutdown) => {
self.player.stop(); self.player.stop();
self.session.shutdown(); self.session.shutdown();
} }
} None => info!("empty stream")
} },
event = self.player_events.next() => match event.unwrap() {
if let Ok(v01_Async::Ready(Some(event))) = self.player_events.poll() {
debug!("librespot player event: {:?}", event);
match event {
LibrespotPlayerEvent::Started { .. }
| LibrespotPlayerEvent::Loading { .. }
| LibrespotPlayerEvent::Changed { .. } => {
progress = true;
}
LibrespotPlayerEvent::Playing { LibrespotPlayerEvent::Playing {
play_request_id: _, play_request_id: _,
track_id: _, track_id: _,
@@ -160,7 +159,6 @@ impl futures::Future for Worker {
let playback_start = SystemTime::now() - position; let playback_start = SystemTime::now() - position;
self.events self.events
.send(Event::Player(PlayerEvent::Playing(playback_start))); .send(Event::Player(PlayerEvent::Playing(playback_start)));
self.refresh_task = self.create_refresh();
self.active = true; self.active = true;
} }
LibrespotPlayerEvent::Paused { LibrespotPlayerEvent::Paused {
@@ -180,38 +178,27 @@ impl futures::Future for Worker {
} }
LibrespotPlayerEvent::EndOfTrack { .. } => { LibrespotPlayerEvent::EndOfTrack { .. } => {
self.events.send(Event::Player(PlayerEvent::FinishedTrack)); self.events.send(Event::Player(PlayerEvent::FinishedTrack));
progress = true;
} }
LibrespotPlayerEvent::TimeToPreloadNextTrack { .. } => { LibrespotPlayerEvent::TimeToPreloadNextTrack { .. } => {
self.events self.events
.send(Event::Queue(QueueEvent::PreloadTrackRequest)); .send(Event::Queue(QueueEvent::PreloadTrackRequest));
} }
_ => {} _ => {}
},
_ = ui_refresh.tick() => {
if self.active {
self.events.trigger();
} }
} },
_ = self.token_task.as_mut() => {
if let Poll::Ready(Some(Ok(_))) = self.refresh_task.as_mut().poll_next(cx) {
self.refresh_task = if self.active {
progress = true;
self.create_refresh()
} else {
Box::pin(futures::stream::empty())
};
}
match self.token_task.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => {
info!("token updated!"); info!("token updated!");
self.token_task = Box::pin(futures::future::pending()) self.token_task = Box::pin(futures::future::pending());
} }
Poll::Ready(Err(e)) => {
error!("could not generate token: {:?}", e);
}
_ => (),
} }
if !progress { if self.session.is_invalid() {
return Poll::Pending; self.events.send(Event::Player(PlayerEvent::Stopped));
break;
} }
} }
} }