migrate to futures 0.3

This commit is contained in:
Henrik Friedrichsen
2020-03-28 22:49:23 +01:00
parent c6b85f7d9b
commit 1b0ac4d5cc
4 changed files with 313 additions and 289 deletions

501
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,8 @@ crossbeam-channel = "0.4"
directories = "2.0" directories = "2.0"
failure = "0.1" failure = "0.1"
fern = "0.5" fern = "0.5"
futures = "0.1" futures = { version = "0.3", features = ["compat"] }
futures_01 = { version = "0.1", package = "futures" }
lazy_static = "1.3.0" lazy_static = "1.3.0"
librespot-core = "0.1.1" librespot-core = "0.1.1"
librespot-playback = "0.1.1" librespot-playback = "0.1.1"
@@ -31,7 +32,6 @@ rspotify = "0.8"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
toml = "0.5" toml = "0.5"
tokio = "0.1"
tokio-core = "0.1" tokio-core = "0.1"
tokio-timer = "0.2" tokio-timer = "0.2"
unicode-width = "0.1.5" unicode-width = "0.1.5"

View File

@@ -13,7 +13,6 @@ extern crate librespot_core;
extern crate librespot_playback; extern crate librespot_playback;
extern crate librespot_protocol; extern crate librespot_protocol;
extern crate rspotify; extern crate rspotify;
extern crate tokio;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_timer; extern crate tokio_timer;
extern crate unicode_width; extern crate unicode_width;

View File

@@ -26,17 +26,26 @@ use rspotify::spotify::model::user::PrivateUser;
use failure::Error; use failure::Error;
use futures; use futures_01::future::Future as v01_Future;
use futures::sync::mpsc; use futures_01::stream::Stream as v01_Stream;
use futures::sync::oneshot; use futures_01::sync::oneshot::Canceled;
use futures::Async;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::compat::Future01CompatExt;
use futures::compat::Stream01CompatExt;
use futures::task::Context;
use futures::Future; use futures::Future;
use futures::Stream; use futures::Stream;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_timer; use tokio_timer;
use url::Url; use url::Url;
use core::task::Poll;
use std::env; use std::env;
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::RwLock; use std::sync::RwLock;
use std::thread; use std::thread;
@@ -83,12 +92,12 @@ pub struct Spotify {
struct Worker { struct Worker {
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>,
session: Session, session: Session,
player: Player, player: Player,
play_task: Box<dyn futures::Future<Item = (), Error = oneshot::Canceled>>, play_task: Pin<Box<dyn Future<Output = Result<(), Canceled>>>>,
refresh_task: Box<dyn futures::Stream<Item = (), Error = tokio_timer::Error>>, refresh_task: Pin<Box<dyn Stream<Item = Result<(), tokio_timer::Error>>>>,
token_task: Box<dyn futures::Future<Item = (), Error = MercuryError>>, token_task: Pin<Box<dyn Future<Output = Result<(), MercuryError>>>>,
active: bool, active: bool,
mixer: Box<dyn Mixer>, mixer: Box<dyn Mixer>,
} }
@@ -96,7 +105,7 @@ struct Worker {
impl Worker { impl Worker {
fn new( fn new(
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>,
session: Session, session: Session,
player: Player, player: Player,
mixer: Box<dyn Mixer>, mixer: Box<dyn Mixer>,
@@ -106,9 +115,9 @@ impl Worker {
commands, commands,
player, player,
session, session,
play_task: Box::new(futures::empty()), play_task: Box::pin(futures::future::pending()),
refresh_task: Box::new(futures::stream::empty()), refresh_task: Box::pin(futures::stream::empty()),
token_task: Box::new(futures::empty()), token_task: Box::pin(futures::future::pending()),
active: false, active: false,
mixer, mixer,
} }
@@ -116,32 +125,31 @@ impl Worker {
} }
impl Worker { impl Worker {
fn create_refresh(&self) -> Box<dyn futures::Stream<Item = (), Error = tokio_timer::Error>> { fn create_refresh(&self) -> Pin<Box<dyn Stream<Item = Result<(), tokio_timer::Error>>>> {
let ev = self.events.clone(); let ev = self.events.clone();
let future = let future =
tokio_timer::Interval::new_interval(Duration::from_millis(400)).map(move |_| { tokio_timer::Interval::new_interval(Duration::from_millis(400)).map(move |_| {
ev.trigger(); ev.trigger();
}); });
Box::new(future) Box::pin(future.compat())
} }
} }
impl futures::Future for Worker { impl futures::Future for Worker {
type Item = (); type Output = Result<(), ()>;
type Error = ();
fn poll(&mut self) -> futures::Poll<(), ()> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> futures::task::Poll<Self::Output> {
loop { loop {
let mut progress = false; let mut progress = false;
if let Async::Ready(Some(cmd)) = self.commands.poll().unwrap() { if let Poll::Ready(Some(cmd)) = self.commands.as_mut().poll_next(cx) {
progress = true; progress = true;
debug!("message received!"); debug!("message received!");
match cmd { match cmd {
WorkerCommand::Load(track) => { WorkerCommand::Load(track) => {
if let Some(track_id) = &track.id { if let Some(track_id) = &track.id {
let id = SpotifyId::from_base62(track_id).expect("could not parse id"); let id = SpotifyId::from_base62(track_id).expect("could not parse id");
self.play_task = Box::new(self.player.load(id, false, 0)); self.play_task = Box::pin(self.player.load(id, false, 0).compat());
info!("player loading track: {:?}", track); info!("player loading track: {:?}", track);
} else { } else {
self.events.send(Event::Player(PlayerEvent::FinishedTrack)); self.events.send(Event::Player(PlayerEvent::FinishedTrack));
@@ -175,39 +183,39 @@ impl futures::Future for Worker {
} }
} }
} }
match self.play_task.poll() { match self.play_task.as_mut().poll(cx) {
Ok(Async::Ready(())) => { Poll::Ready(Ok(())) => {
debug!("end of track!"); debug!("end of track!");
progress = true; progress = true;
self.events.send(Event::Player(PlayerEvent::FinishedTrack)); self.events.send(Event::Player(PlayerEvent::FinishedTrack));
} }
Ok(Async::NotReady) => (), Poll::Ready(Err(Canceled)) => {
Err(oneshot::Canceled) => {
debug!("player task is over!"); debug!("player task is over!");
self.play_task = Box::new(futures::empty()); self.play_task = Box::pin(futures::future::pending());
} }
Poll::Pending => (),
} }
if let Ok(Async::Ready(_)) = self.refresh_task.poll() { if let Poll::Ready(Some(Ok(_))) = self.refresh_task.as_mut().poll_next(cx) {
self.refresh_task = if self.active { self.refresh_task = if self.active {
progress = true; progress = true;
self.create_refresh() self.create_refresh()
} else { } else {
Box::new(futures::stream::empty()) Box::pin(futures::stream::empty())
}; };
} }
match self.token_task.poll() { match self.token_task.as_mut().poll(cx) {
Ok(Async::Ready(_)) => { Poll::Ready(Ok(_)) => {
info!("token updated!"); info!("token updated!");
self.token_task = Box::new(futures::empty()) self.token_task = Box::pin(futures::future::pending())
} }
Err(e) => { Poll::Ready(Err(e)) => {
error!("could not generate token: {:?}", e); error!("could not generate token: {:?}", e);
} }
_ => (), _ => (),
} }
if !progress { if !progress {
return Ok(Async::NotReady); return Poll::Pending;
} }
} }
} }
@@ -250,10 +258,19 @@ impl Spotify {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
{ {
thread::spawn(move || { thread::spawn(move || {
Self::worker(events, rx, player_config, credentials, user_tx, volume) Self::worker(
events,
Box::pin(rx),
player_config,
credentials,
user_tx,
volume,
)
}); });
} }
let user = futures::executor::block_on(user_rx);
let spotify = Spotify { let spotify = Spotify {
status: RwLock::new(PlayerEvent::Stopped), status: RwLock::new(PlayerEvent::Stopped),
api: RwLock::new(SpotifyAPI::default()), api: RwLock::new(SpotifyAPI::default()),
@@ -261,7 +278,7 @@ impl Spotify {
since: RwLock::new(None), since: RwLock::new(None),
token_issued: RwLock::new(None), token_issued: RwLock::new(None),
channel: tx, channel: tx,
user: user_rx.wait().expect("error retrieving userid from worker"), user: user.expect("error retrieving userid from worker"),
volume: AtomicU16::new(volume), volume: AtomicU16::new(volume),
repeat, repeat,
shuffle, shuffle,
@@ -315,14 +332,14 @@ impl Spotify {
fn get_token( fn get_token(
session: &Session, session: &Session,
sender: oneshot::Sender<Token>, sender: oneshot::Sender<Token>,
) -> Box<dyn Future<Item = (), Error = MercuryError>> { ) -> Pin<Box<dyn Future<Output = Result<(), MercuryError>>>> {
let client_id = config::CLIENT_ID; let client_id = config::CLIENT_ID;
let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played"; let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let url = format!( let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}", "hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes client_id, scopes
); );
Box::new( Box::pin(
session session
.mercury() .mercury()
.get(url) .get(url)
@@ -333,13 +350,14 @@ impl Spotify {
info!("new token received: {:?}", token); info!("new token received: {:?}", token);
token token
}) })
.map(|token| sender.send(token).unwrap()), .map(|token| sender.send(token).unwrap())
.compat(),
) )
} }
fn worker( fn worker(
events: EventManager, events: EventManager,
commands: mpsc::UnboundedReceiver<WorkerCommand>, commands: Pin<Box<mpsc::UnboundedReceiver<WorkerCommand>>>,
player_config: PlayerConfig, player_config: PlayerConfig,
credentials: Credentials, credentials: Credentials,
user_tx: oneshot::Sender<String>, user_tx: oneshot::Sender<String>,
@@ -367,7 +385,7 @@ impl Spotify {
let worker = Worker::new(events, commands, session, player, mixer); let worker = Worker::new(events, commands, session, player, mixer);
debug!("worker thread ready."); debug!("worker thread ready.");
core.run(worker).unwrap(); core.run(futures::compat::Compat::new(worker)).unwrap();
debug!("worker thread finished."); debug!("worker thread finished.");
} }
@@ -433,7 +451,7 @@ impl Spotify {
self.channel self.channel
.unbounded_send(WorkerCommand::RequestToken(token_tx)) .unbounded_send(WorkerCommand::RequestToken(token_tx))
.unwrap(); .unwrap();
let token = token_rx.wait().unwrap(); let token = futures::executor::block_on(token_rx).unwrap();
// update token used by web api calls // update token used by web api calls
self.api.write().expect("can't writelock api").access_token = Some(token.access_token); self.api.write().expect("can't writelock api").access_token = Some(token.access_token);