diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 31 | ||||
-rw-r--r-- | src/dbus/player.rs | 5 | ||||
-rw-r--r-- | src/dbus_server.rs | 190 | ||||
-rw-r--r-- | src/error.rs | 3 | ||||
-rw-r--r-- | src/lib.rs | 7 | ||||
-rw-r--r-- | src/player.rs | 44 | ||||
-rw-r--r-- | src/printer.rs | 201 | ||||
-rw-r--r-- | src/server.rs | 173 |
8 files changed, 482 insertions, 172 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..1e38cce --- /dev/null +++ b/src/client.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; +use tokio::{io::AsyncWriteExt, net::UnixStream}; + +use crate::Error; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ClientKind { + #[serde(rename = "mpris-icon")] + Icon, + #[serde(rename = "mpris-title")] + Title, + #[serde(rename = "mpris-prev")] + Prev, + #[serde(rename = "mpris-playPause")] + PlayPause, + #[serde(rename = "mpris-next")] + Next, + #[serde(rename = "mpris-volume")] + Volume, +} + +async fn client(kind: ClientKind) -> Result<(), Error> { + let stream = UnixStream::connect("/tmp/i3blocks-mpris.sock").await?; + let (mut reader, mut writer) = stream.into_split(); + + let buf = bincode::serialize(&kind)?; + writer.write_all(&buf).await?; + + todo!() +} + diff --git a/src/dbus/player.rs b/src/dbus/player.rs index 63a13ca..424d277 100644 --- a/src/dbus/player.rs +++ b/src/dbus/player.rs @@ -19,12 +19,15 @@ //! //! [Writing a client proxy]: https://dbus2.github.io/zbus/client.html //! [D-Bus standard interfaces]: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces, +use serde::{Deserialize, Serialize}; use zbus::{ proxy, zvariant::{OwnedValue, Type}, }; -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Type)] +#[derive( + Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Type, Serialize, Deserialize, +)] pub enum PlaybackStatus { Playing, Paused, diff --git a/src/dbus_server.rs b/src/dbus_server.rs new file mode 100644 index 0000000..6f7fee4 --- /dev/null +++ b/src/dbus_server.rs @@ -0,0 +1,190 @@ +use futures_util::StreamExt; +use std::sync::Arc; + +use tokio::{ + io::{AsyncReadExt, BufReader}, + net::{UnixListener, UnixStream}, + sync::{mpsc::Sender, Mutex, Notify}, + task::JoinSet, +}; +use zbus::{ + fdo::{DBusProxy, NameOwnerChangedArgs}, + interface, + names::OwnedBusName, + zvariant::Optional, + Connection, +}; + +use crate::{ + client::ClientKind, + dbus::{ + media_player2::MediaPlayer2Proxy, + player::{PlaybackStatus, PlayerProxy}, + }, + i3bar::Click, + player::Event, + printer::Status, + Error, +}; + +pub struct MprisStatus { + player: Option<String>, + title: String, + can_go_previous: bool, + playback_status: PlaybackStatus, + can_go_next: bool, + volume: Option<String> + color: Option<String>, + background: Option<String>, +} + +#[interface( + name = "org.tobyvin.i3blocks.Mpris", + proxy( + default_path = "/org/i3blocks/Mpris", + default_service = "org.i3blocks.Mpris", + ) +)] +impl MprisStatus { + #[zbus(property)] + async fn title(&self) -> Optional<String> { + if self.player.is_none() { + return None.into(); + } + + Some(self.title.clone()).into() + } + + #[zbus(property)] + async fn can_go_previous(&self) -> Optional<String> { + if self.player.is_none() { + return None.into(); + } + self.can_go_previous.then_some(" ".into()).into() + } + + #[zbus(property)] + async fn playback_status(&self) -> Optional<String> { + if self.player.is_none() { + return None.into(); + } + + match self.playback_status { + PlaybackStatus::Playing => Some(" ".into()), + _ => Some(" ".into()), + } + .into() + } + + #[zbus(property)] + async fn can_go_next(&self) -> Optional<String> { + if self.player.is_none() { + return None.into(); + } + + self.can_go_next.then_some(" ".into()).into() + } +} + +impl MprisStatus { + async fn set_title(conn: Connection, value: String) -> Result<(), Error> { + let iface_ref = conn + .object_server() + .interface::<_, Self>("/org/i3blocks/Mpris") + .await?; + let mut iface = iface_ref.get_mut().await; + iface.title = value; + iface + .title_changed(iface_ref.signal_context()) + .await + .map_err(Into::into) + } + + async fn set_can_go_previous(conn: Connection, value: bool) -> Result<(), Error> { + let iface_ref = conn + .object_server() + .interface::<_, Self>("/org/i3blocks/Mpris") + .await?; + let mut iface = iface_ref.get_mut().await; + iface.can_go_previous = value; + iface + .can_go_previous_changed(iface_ref.signal_context()) + .await + .map_err(Into::into) + } + + async fn set_playback_status(conn: Connection, value: PlaybackStatus) -> Result<(), Error> { + let iface_ref = conn + .object_server() + .interface::<_, Self>("/org/i3blocks/Mpris") + .await?; + let mut iface = iface_ref.get_mut().await; + iface.playback_status = value; + iface + .playback_status_changed(iface_ref.signal_context()) + .await + .map_err(Into::into) + } + + async fn set_can_go_next(conn: Connection, value: bool) -> Result<(), Error> { + let iface_ref = conn + .object_server() + .interface::<_, Self>("/org/i3blocks/Mpris") + .await?; + let mut iface = iface_ref.get_mut().await; + iface.can_go_next = value; + iface + .can_go_next_changed(iface_ref.signal_context()) + .await + .map_err(Into::into) + } + + async fn set_volume(conn: Connection, value: f64) -> Result<(), Error> { + let iface_ref = conn + .object_server() + .interface::<_, Self>("/org/i3blocks/Mpris") + .await?; + let mut iface = iface_ref.get_mut().await; + + iface.volume = match (value * 100_f64) as u32 { + v @ 66.. => format!(" {v}% "), + v @ 33.. => format!(" {v}% "), + v @ 0.. => format!(" {v}% "), + }; + + iface + .volume_changed(iface_ref.signal_context()) + .await + .map_err(Into::into) + } +} + +//full_text.push_str(" "); +// +//if let Some(title) = self.title.as_ref() { +// full_text.push_str(title); +// full_text.push(' '); +//} +// +//if self.can_go_previous { +// full_text.push_str(" "); +//} +// +//if self.playback_status == PlaybackStatus::Playing { +// full_text.push_str(" "); +//} else { +// full_text.push_str(" "); +//} +// +//if self.can_go_next { +// full_text.push_str(" "); +//} +// +//if let Some(volume) = self.volume.map(|v| (v * 100_f64) as u32) { +// match volume { +// v @ 66.. => full_text.push_str(&format!(" {v}% ")), +// v @ 33.. => full_text.push_str(&format!(" {v}% ")), +// v @ 0.. => full_text.push_str(&format!(" {v}% ")), +// } +//} + diff --git a/src/error.rs b/src/error.rs index f0f204f..dd4a04a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("Join error: {0}")] Join(#[from] tokio::task::JoinError), + #[error("Bincode error: {0}")] + Bincode(#[from] bincode::Error), + #[error("ZBus error: {0}")] ZBus(#[from] zbus::Error), @@ -5,6 +5,9 @@ pub mod components; pub mod dbus; pub mod error; pub mod i3bar; -pub mod mpris; +//pub mod mpris; +pub mod client; +pub mod dbus_server; pub mod player; -//pub mod printer; +pub mod printer; +pub mod server; diff --git a/src/player.rs b/src/player.rs index de4226d..8dd3a51 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,7 +1,7 @@ -use std::{collections::VecDeque, io::Write}; +use std::collections::VecDeque; use tokio::{ - sync::mpsc::{Receiver, Sender}, + sync::{mpsc::Receiver, watch::Sender}, task::{AbortHandle, JoinHandle, JoinSet}, }; use zbus::{names::OwnedBusName, Connection}; @@ -9,7 +9,7 @@ use zbus::{names::OwnedBusName, Connection}; use crate::{ components::Component, dbus::player::{PlaybackStatus, PlayerProxy}, - i3bar::Block, + printer::Status, Result, }; @@ -36,10 +36,10 @@ pub enum Event { Shift, } -pub async fn listener<T: Component>( +pub async fn listener( conn: Connection, mut rx: Receiver<Event>, - component: &'static T, + wtx: Sender<Status>, ) -> Result<()> { let mut players: VecDeque<OwnedBusName> = VecDeque::new(); let mut active: Option<OwnedBusName> = Default::default(); @@ -88,39 +88,7 @@ pub async fn listener<T: Component>( .build() .await?; - let (tx, mut rx): (Sender<Block>, _) = tokio::sync::mpsc::channel(128); - let conn = conn.clone(); - listener = Some(join_set.spawn(component.listen(proxy, tx.clone()))); - printer = Some(join_set.spawn(async move { - while let Some(mut block) = rx.recv().await { - let proxy = PlayerProxy::builder(&conn) - .destination(name.clone())? - .build() - .await?; - - match proxy.playback_status().await? { - PlaybackStatus::Playing => { - block.color = Some(BLACK.into()); - block.background = Some(CYAN.into()) - } - PlaybackStatus::Paused => { - block.color = Some(BLACK.into()); - block.background = Some(YELLOW.into()) - } - _ => {} - }; - - block.instance = name.as_str().to_owned(); - block.instance = name.as_str().to_owned(); - - let mut w = std::io::stdout().lock(); - let mut v = serde_json::to_vec(&block)?; - v.push(b'\n'); - w.write_all(&v)?; - w.flush()?; - } - Ok(()) - })); + listener = Some(join_set.spawn(crate::printer::printer(proxy, wtx.clone()))); } } } diff --git a/src/printer.rs b/src/printer.rs index db5a554..5a6b0ae 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, io::Write, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use futures_util::stream::StreamExt; +use serde::{Deserialize, Serialize}; use tokio::{ - sync::{Mutex, Notify}, + sync::{watch::Sender, Mutex}, task::{JoinHandle, JoinSet}, }; use zbus::{proxy::PropertyStream, zvariant::OwnedValue}; @@ -21,226 +22,163 @@ const BLACK: &str = "#1d2021"; const YELLOW: &str = "#fabd2f"; const CYAN: &str = "#8ec07c"; -#[derive(Debug, Clone)] -pub enum Signal { - Print, -} - -pub async fn printer(proxy: PlayerProxy<'static>) -> Result<()> { +pub async fn printer( + proxy: PlayerProxy<'static>, + wtx: tokio::sync::watch::Sender<Status>, +) -> Result<()> { let mut join_set: JoinSet<Result<()>> = JoinSet::new(); let mut rotator = None; - let notify = Arc::new(Notify::new()); - - let status = Arc::new(Mutex::new(Status { - title: None, + let status = Status { can_go_previous: proxy.can_go_previous().await.unwrap_or_default(), playback_status: proxy.playback_status().await.unwrap_or_default(), can_go_next: proxy.can_go_next().await.unwrap_or_default(), volume: proxy.volume().await.ok(), - indexes: Vec::new(), - })); + ..Default::default() + }; + + wtx.send_modify(|s| *s = status); process_metadata( - notify.clone(), - status.clone(), + wtx.clone(), proxy.metadata().await?, &mut rotator, + &mut None, ) .await; - notify.notify_one(); - join_set.spawn(metadata( - notify.clone(), + wtx.clone(), proxy.receive_metadata_changed().await, - status.clone(), rotator, )); join_set.spawn(can_go_previous( - notify.clone(), + wtx.clone(), proxy.receive_can_go_previous_changed().await, - status.clone(), )); join_set.spawn(playback_state( - notify.clone(), + wtx.clone(), proxy.receive_playback_status_changed().await, - status.clone(), )); join_set.spawn(can_go_next( - notify.clone(), + wtx.clone(), proxy.receive_can_go_next_changed().await, - status.clone(), )); - join_set.spawn(volume( - notify.clone(), - proxy.receive_volume_changed().await, - status.clone(), - )); + join_set.spawn(volume(wtx.clone(), proxy.receive_volume_changed().await)); - loop { - notify.notified().await; - let mut status = status.lock().await; - - //let (color, background) = match proxy.playback_status().await.ok() { - // Some(PlaybackStatus::Playing) => (Some(BLACK.into()), Some(CYAN.into())), - // Some(PlaybackStatus::Paused) => (Some(BLACK.into()), Some(YELLOW.into())), - // Some(PlaybackStatus::Stopped) => (None, None), - //}; - - //let s = match instance { - // Instance::Icon => " ", - // Instance::Title => &status.title, - // Instance::Prev => " ", - // Instance::Play => " ", - // Instance::Pause => " ", - // Instance::Next => " ", - // Instance::Volume => { - // if let Some(volume) = self.volume.map(|v| (v * 100_f64) as u32) { - // match volume { - // v @ 66.. => &format!(" {v}% "), - // v @ 33.. => &format!(" {v}% "), - // v @ 0.. => &format!(" {v}% "), - // } - // } - // } - //}; - // - //let block = Block { - // full_text, - // ..Default::default() - //}; - - let mut w = std::io::stdout().lock(); - let mut v = serde_json::to_vec(&status.build()).unwrap(); - v.push(b'\n'); - w.write_all(&v)?; - w.flush()?; + while let Some(res) = join_set.join_next().await { + res??; } + + Ok(()) } async fn process_metadata( - notify: Arc<Notify>, - status_lock: StatusLock, + wtx: Sender<Status>, metadata: HashMap<String, OwnedValue>, rotator: &mut Option<JoinHandle<Result<()>>>, -) -> Option<()> { - let title: String = metadata - .get("xesam:title")? - .try_to_owned() - .ok()? - .try_into() - .ok()?; - - if (status_lock.lock().await) - .title - .as_ref() - .is_some_and(|s| *s == title) - { - return None; + old_title: &mut Option<String>, +) { + let title: Option<String> = metadata + .get("xesam:title") + .and_then(|o| o.try_to_owned().ok()) + .and_then(|o| o.try_into().ok()); + + if *old_title == title { + return; } - if let Some(h) = rotator.take() { - h.abort() + *old_title = title.clone(); + let Some(title) = title else { + wtx.send_modify(|s| { + s.title = None; + }); + return; }; - let status = status_lock.clone(); - if title.len() > 10 { + if let Some(h) = rotator.take() { + h.abort() + }; + + let wtx = wtx.clone(); *rotator = Some(tokio::spawn(async move { let mut interval = tokio::time::interval(TICK_RATE); let mut chars = title.chars().collect::<Vec<char>>(); chars.push(' '); loop { interval.tick().await; - let mut status = status.lock().await; - status.title = Some(chars[0..10].iter().collect()); - notify.notify_one(); + wtx.send_modify(|s| s.title = Some(chars[0..10].iter().collect())); chars.rotate_left(1); } })); } else { - let mut status = status.lock().await; - status.title = Some(title); - notify.notify_one(); + wtx.send_modify(|s| s.title = Some(title)); } - - Some(()) } async fn metadata( - notify: Arc<Notify>, + wtx: Sender<Status>, mut stream: PropertyStream<'_, HashMap<String, OwnedValue>>, - status_lock: StatusLock, mut rotator: Option<JoinHandle<Result<()>>>, ) -> Result<()> { + let mut old_title = None; while let Some(signal) = stream.next().await { if let Ok(metadata) = signal.get().await { - process_metadata(notify.clone(), status_lock.clone(), metadata, &mut rotator).await; + process_metadata(wtx.clone(), metadata, &mut rotator, &mut old_title).await }; } Ok(()) } -async fn can_go_previous( - notify: Arc<Notify>, - mut stream: PropertyStream<'_, bool>, - status_lock: StatusLock, -) -> Result<()> { +async fn can_go_previous(wtx: Sender<Status>, mut stream: PropertyStream<'_, bool>) -> Result<()> { while let Some(signal) = stream.next().await { if let Ok(val) = signal.get().await { - let mut status = status_lock.lock().await; - status.can_go_previous = val; - notify.notify_one(); + wtx.send_modify(|status| status.can_go_previous = val); }; } Ok(()) } async fn playback_state( - notify: Arc<Notify>, + wtx: Sender<Status>, mut stream: PropertyStream<'_, PlaybackStatus>, - status_lock: StatusLock, ) -> Result<()> { while let Some(signal) = stream.next().await { if let Ok(val) = signal.get().await { - let mut status = status_lock.lock().await; - status.playback_status = val; - notify.notify_one(); + let (color, background) = match val { + PlaybackStatus::Playing => (Some(BLACK.into()), Some(CYAN.into())), + PlaybackStatus::Paused => (Some(BLACK.into()), Some(YELLOW.into())), + PlaybackStatus::Stopped => (None, None), + }; + + wtx.send_modify(|status| { + status.playback_status = val; + status.color = color; + status.background = background; + }); } } Ok(()) } -async fn can_go_next( - notify: Arc<Notify>, - mut stream: PropertyStream<'_, bool>, - status_lock: StatusLock, -) -> Result<()> { +async fn can_go_next(wtx: Sender<Status>, mut stream: PropertyStream<'_, bool>) -> Result<()> { while let Some(signal) = stream.next().await { if let Ok(val) = signal.get().await { - let mut status = status_lock.lock().await; - status.can_go_next = val; - notify.notify_one(); + wtx.send_modify(|status| status.can_go_next = val); }; } Ok(()) } -async fn volume( - notify: Arc<Notify>, - mut stream: PropertyStream<'_, f64>, - status_lock: StatusLock, -) -> Result<()> { +async fn volume(wtx: Sender<Status>, mut stream: PropertyStream<'_, f64>) -> Result<()> { while let Some(signal) = stream.next().await { if let Ok(val) = signal.get().await { - let mut status = status_lock.lock().await; - status.volume = Some(val); - notify.notify_one(); + wtx.send_modify(|status| status.volume = Some(val)); }; } Ok(()) @@ -258,14 +196,15 @@ pub enum Component { Space, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Status { title: Option<String>, can_go_previous: bool, playback_status: PlaybackStatus, can_go_next: bool, volume: Option<f64>, - indexes: Vec<(usize, Component)>, + color: Option<String>, + background: Option<String>, } impl Status { diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..4346923 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,173 @@ +use futures_util::StreamExt; + +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader}, + net::{UnixListener, UnixStream}, + sync::mpsc::Sender, + task::JoinSet, +}; +use zbus::{ + fdo::{DBusProxy, NameOwnerChangedArgs}, + names::OwnedBusName, + Connection, +}; + +use crate::{ + dbus::{media_player2::MediaPlayer2Proxy, player::PlayerProxy}, + i3bar::Click, + player::Event, + printer::Status, + Error, +}; + +pub async fn server() -> Result<(), Error> { + let mut join_set = JoinSet::new(); + let conn = Connection::session().await?; + + let status = Status::default(); + + let (tx, rx) = tokio::sync::mpsc::channel(128); + let (wtx, wrx) = tokio::sync::watch::channel(status); + + join_set.spawn(handle_name_owner_change(tx.clone(), conn.clone())); + join_set.spawn(crate::player::listener(conn.clone(), rx, wtx)); + + let listener = UnixListener::bind("/tmp/i3blocks-mpris.sock").unwrap(); + + loop { + match listener.accept().await { + Ok((stream, _addr)) => { + tokio::spawn(handle_client(stream, conn.clone(), wrx.clone(), tx.clone())); + } + Err(err) => { + println!("Error: {}", err); + break; + } + } + } + + while let Some(res) = join_set.join_next().await { + if let Err(err) = res? { + eprintln!("{err}") + }; + } + + Ok(()) +} + +async fn handle_client( + stream: UnixStream, + conn: Connection, + mut wrx: tokio::sync::watch::Receiver<Status>, + tx: Sender<Event>, +) -> Result<(), Error> { + let (reader, mut writer) = stream.into_split(); + + let mut reader = BufReader::new(reader); + let mut buf = Default::default(); + + reader.read_to_end(&mut buf).await?; + + tokio::spawn(async move { + while reader.read_to_end(&mut buf).await.is_ok() { + handle_click(conn.clone(), tx.clone(), serde_json::from_slice(&buf)?).await?; + } + Result::<(), Error>::Ok(()) + }); + + loop { + if wrx.changed().await.is_ok() { + let status = wrx.borrow_and_update(); + let Ok(v) = bincode::serialize(&*status) else { + continue; + }; + writer.write_all(&v).await; + } + } +} + +pub async fn handle_click( + conn: Connection, + tx: Sender<Event>, + click: Click, +) -> Result<(), Error> { + let Some(name) = click.name else { + return Ok(()); + }; + + let Some(instance) = click.instance else { + return Ok(()); + }; + + let player_proxy = PlayerProxy::builder(&conn) + .destination(name)? + .build() + .await?; + + let mpris_proxy = MediaPlayer2Proxy::builder(player_proxy.inner().connection()) + .destination(player_proxy.inner().destination())? + .build() + .await?; + + match (instance.as_ref(), click.button) { + ("mpris-icon", 1) if mpris_proxy.can_raise().await? => mpris_proxy.raise().await?, + ("mpris-title", _) => {} + ("mpris-prev", 1) => player_proxy.previous().await?, + ("mpris-playPause", 1) => player_proxy.play_pause().await?, + ("mpris-next", 1) => player_proxy.next().await?, + ("mpris-volume", 4) => { + player_proxy + .set_volume(player_proxy.volume().await? - 0.05) + .await? + } + ("mpris-volume", 5) => { + player_proxy + .set_volume(player_proxy.volume().await? + 0.05) + .await? + } + (_, 3) => tx.send(Event::Shift).await?, + _ => {} + } + + Ok(()) +} + +const IGNORED: [&str; 1] = ["playerctld"]; + +fn valid_player(name: &str) -> bool { + name.strip_prefix("org.mpris.MediaPlayer2.") + .and_then(|s| s.split('.').next()) + .is_some_and(|s| !IGNORED.contains(&s)) +} + +async fn handle_name_owner_change(tx: Sender<Event>, conn: Connection) -> Result<(), Error> { + let dbus_proxy = DBusProxy::new(&conn).await?; + let names: Vec<OwnedBusName> = dbus_proxy.list_names().await?; + + let players = names.iter().filter(|s| valid_player(s)).collect::<Vec<_>>(); + + for player in players { + tx.send(Event::Add(player.clone())).await?; + } + + let mut stream = dbus_proxy.receive_name_owner_changed().await?; + + while let Some(signal) = stream.next().await { + match signal.args()? { + NameOwnerChangedArgs { + name, old_owner, .. + } if valid_player(&name) && old_owner.is_some() => { + tx.send(Event::Remove(name.into())).await? + } + NameOwnerChangedArgs { + name, new_owner, .. + } if valid_player(&name) && new_owner.is_some() => { + tx.send(Event::Add(name.into())).await? + } + _ => {} + } + } + + Ok(()) +} + |