Files
ncspot/src/ipc.rs
Thomas Frans 9c3c7f7c87 chore: Update to edition 2024
* chore(deps): `cargo update`

* chore: update to Rust edition 2024 and apply fixes

Update the Rust edition and apply changes required in the new edition.
Also update the Cargo manifests to reflect the edition change, and
ensure changes automatically apply to workspace members in the future.

* chore: format all code with `rustfmt`

The new Rust edition comes with some new formatting defaults, which need
to be applied since the edition was increased.

* style: change suggested matches back to `if let`

There has been a breaking change in Rust edition 2024 that changed the
behavior of `if let` statements slightly. The new behavior is more in
line with what users would expect, but could lead to problems in
existing code. The automatic edition update therefore changed such `if
let` statements to match statements instead. That led to deeply nested
code which was hard to reason about.

This changes most of them back to regular `if let` chains, as the new
behavior shouldn't cause problems for these cases.

---------

Co-authored-by: Henrik Friedrichsen <henrik@affekt.org>
2025-03-11 09:06:39 +00:00

139 lines
4.4 KiB
Rust

use std::{io, path::PathBuf};
use futures::SinkExt;
use log::{debug, error, info};
use tokio::net::{UnixListener, UnixStream};
use tokio::runtime::Handle;
use tokio::sync::watch::{Receiver, Sender};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};
use crate::events::{Event, EventManager};
use crate::model::playable::Playable;
use crate::spotify::PlayerEvent;
/// Handle to the IPC Unix domain socket through which player status updates are published.
///
/// Dropping the handle attempts to remove the socket file at `path`.
pub struct IpcSocket {
/// Sending half of the watch channel that carries the latest [`Status`].
tx: Sender<Status>,
/// Filesystem path of the socket file.
path: PathBuf,
}
/// Player status sent to connected IPC clients, serialized as one JSON document per line.
#[derive(Clone, Debug, Serialize)]
struct Status {
/// Player event passed to `IpcSocket::publish`.
mode: PlayerEvent,
/// Playable passed to `IpcSocket::publish`, if any.
playable: Option<Playable>,
}
/// Cleans up the socket file when the [`IpcSocket`] goes out of scope.
impl Drop for IpcSocket {
/// Attempt to remove the socket file belonging to this instance.
fn drop(&mut self) {
self.try_remove_socket();
}
}
impl IpcSocket {
pub fn new(handle: &Handle, path: PathBuf, ev: EventManager) -> io::Result<Self> {
let path = if path.exists() && Self::is_open_socket(&path) {
let mut new_path = path;
new_path.set_file_name(format!("ncspot.{}.sock", std::process::id()));
new_path
} else if path.exists() && !Self::is_open_socket(&path) {
std::fs::remove_file(&path)?;
path
} else {
path
};
info!("Creating IPC domain socket at {path:?}");
let status = Status {
mode: PlayerEvent::Stopped,
playable: None,
};
let (tx, rx) = tokio::sync::watch::channel(status);
let listener_path = path.clone();
handle.spawn(async move {
let listener =
UnixListener::bind(listener_path).expect("Could not create IPC domain socket");
Self::worker(listener, ev, rx.clone()).await;
});
Ok(Self { tx, path })
}
fn is_open_socket(path: &PathBuf) -> bool {
std::os::unix::net::UnixStream::connect(path).is_ok()
}
pub fn publish(&self, event: &PlayerEvent, playable: Option<Playable>) {
let status = Status {
mode: event.clone(),
playable,
};
self.tx.send(status).unwrap();
}
async fn worker(listener: UnixListener, ev: EventManager, tx: Receiver<Status>) {
loop {
match listener.accept().await {
Ok((stream, sockaddr)) => {
debug!("Connection from {:?}", sockaddr);
tokio::spawn(Self::stream_handler(
stream,
ev.clone(),
WatchStream::new(tx.clone()),
));
}
Err(e) => error!("Error accepting connection: {e}"),
}
}
}
async fn stream_handler(
mut stream: UnixStream,
ev: EventManager,
mut rx: WatchStream<Status>,
) -> Result<(), String> {
let (reader, writer) = stream.split();
let mut framed_reader = FramedRead::new(reader, LinesCodec::new());
let mut framed_writer = FramedWrite::new(writer, LinesCodec::new());
loop {
tokio::select! {
line = framed_reader.next() => {
match line {
Some(Ok(line)) => {
debug!("Received line: \"{line}\"");
ev.send(Event::IpcInput(line));
}
Some(Err(e)) => error!("Error reading line: {e}"),
None => {
debug!("Closing IPC connection");
return Ok(())
}
}
}
Some(status) = rx.next() => {
debug!("IPC Status update: {status:?}");
let status_str = serde_json::to_string(&status).map_err(|e| e.to_string())?;
framed_writer.send(status_str).await.map_err(|e| e.to_string())?;
}
else => {
error!("All streams are closed");
return Ok(())
}
}
}
}
/// Try to remove the IPC socket if there is one for this instance of `ncspot`. Don't do
/// anything if the socket has already been removed for some reason.
fn try_remove_socket(&mut self) {
if std::fs::remove_file(&self.path).is_ok() {
info!("removed socket at {:?}", self.path);
} else {
info!("socket already removed");
}
}
}