diff options
author | Toby Vincent <tobyv@tobyvin.dev> | 2024-07-16 17:09:27 -0500 |
---|---|---|
committer | Toby Vincent <tobyv@tobyvin.dev> | 2024-07-16 17:09:27 -0500 |
commit | 9638edcbafea29b76f019ff73e2347cb7c758ba0 (patch) | |
tree | 738adda62ff27798ebf43635665a889ad1dcd1f1 /src | |
parent | 3e7721dfbaeb0b57f48801660a64c13989643198 (diff) |
more wip
Diffstat (limited to 'src')
-rw-r--r-- | src/components.rs | 234 | ||||
-rw-r--r-- | src/error.rs | 21 | ||||
-rw-r--r-- | src/lib.rs | 4 | ||||
-rw-r--r-- | src/main.rs | 82 | ||||
-rw-r--r-- | src/mpris.rs | 129 | ||||
-rw-r--r-- | src/player.rs | 90 | ||||
-rw-r--r-- | src/printer.rs | 107 |
7 files changed, 489 insertions, 178 deletions
diff --git a/src/components.rs b/src/components.rs new file mode 100644 index 0000000..1548365 --- /dev/null +++ b/src/components.rs @@ -0,0 +1,234 @@ +use std::{collections::HashMap, option::Option, time::Duration}; + +use futures_util::StreamExt; +use tokio::{sync::mpsc::Sender, task::JoinHandle}; +use zbus::zvariant::OwnedValue; + +use crate::{ + dbus::player::{PlaybackStatus, PlayerProxy}, + error::Result, + i3bar::Block, +}; + +const TICK_RATE: Duration = Duration::from_millis(500); + +pub trait Component: std::marker::Sized { + fn listen( + &self, + proxy: PlayerProxy<'static>, + tx: Sender<Block>, + ) -> impl std::future::Future<Output = Result<()>> + Send; +} + +#[derive(Debug, Clone, Default)] +pub struct Icon; + +impl Component for Icon { + async fn listen(&self, _: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + tx.send(Block { + full_text: " ".into(), + instance: "mpris-icon".into(), + ..Default::default() + }) + .await + .map_err(Into::into) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Title; + +impl Title { + async fn process_metadata( + tx: Sender<Block>, + metadata: HashMap<String, OwnedValue>, + rotator: &mut Option<JoinHandle<Result<()>>>, + last: &mut Option<String>, + ) -> Result<()> { + let Some(val) = metadata.get("xesam:title") else { + return Ok(()); + }; + + let title: String = val.try_to_owned()?.try_into()?; + + if last.as_ref().is_some_and(|s| *s == title) { + return Ok(()); + } + + let mut block = Block { + instance: "mpris-title".into(), + ..Default::default() + }; + + if let Some(h) = rotator.take() { + h.abort() + }; + + if title.len() > 10 { + let mut chars = title.chars().collect::<Vec<char>>(); + *last = Some(title); + *rotator = Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(TICK_RATE); + chars.push(' '); + + loop { + interval.tick().await; + block.full_text = chars[0..10].iter().collect(); + tx.send(block.clone()).await?; + chars.rotate_left(1); + } + })); + } else { + *last = Some(title.clone()); + block.full_text = title; + tx.send(block).await?; + } + + Ok(()) + } +} + +impl Component for Title { + async fn listen(&self, proxy: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + let mut stream = proxy.receive_metadata_changed().await; + let mut rotator = None; + let mut last = None; + + Self::process_metadata(tx.clone(), proxy.metadata().await?, &mut rotator, &mut last) + .await?; + + while let Some(signal) = stream.next().await { + if let Ok(metadata) = signal.get().await { + Self::process_metadata(tx.clone(), metadata, &mut rotator, &mut last).await?; + }; + } + Ok(()) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Prev; + +impl Component for Prev { + async fn listen(&self, proxy: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + let mut block = Block { + full_text: proxy + .can_go_previous() + .await + .unwrap_or_default() + .then_some(" ".into()) + .unwrap_or_default(), + instance: "mpris-prev".into(), + ..Default::default() + }; + + tx.send(block.clone()).await?; + + let mut stream = proxy.receive_can_go_previous_changed().await; + while let Some(signal) = stream.next().await { + if let Ok(val) = signal.get().await { + block.full_text = val.then_some(" ".into()).unwrap_or_default(); + + tx.send(block.clone()).await?; + }; + } + Ok(()) + } +} + +#[derive(Debug, Clone, Default)] +pub struct PlayPause; + +impl Component for PlayPause { + async fn listen(&self, proxy: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + let mut block = Block { + instance: "mpris-playpause".into(), + ..Default::default() + }; + + block.full_text = match proxy.playback_status().await? { + PlaybackStatus::Playing => " ", + PlaybackStatus::Paused | PlaybackStatus::Stopped => " ", + } + .to_string(); + + tx.send(block.clone()).await?; + + let mut stream = proxy.receive_playback_status_changed().await; + while let Some(signal) = stream.next().await { + if let Ok(val) = signal.get().await { + block.full_text = match val { + PlaybackStatus::Playing => " ", + PlaybackStatus::Paused | PlaybackStatus::Stopped => " ", + } + .to_string(); + + tx.send(block.clone()).await?; + }; + } + Ok(()) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Next; + +impl Component for Next { + async fn listen(&self, proxy: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + let mut block = Block { + full_text: proxy + .can_go_next() + .await + .unwrap_or_default() + .then_some(" ".into()) + .unwrap_or_default(), + instance: "mpris-next".into(), + ..Default::default() + }; + + tx.send(block.clone()).await?; + + let mut stream = proxy.receive_can_go_next_changed().await; + while let Some(signal) = stream.next().await { + if let Ok(val) = signal.get().await { + block.full_text = val.then_some(" ".into()).unwrap_or_default(); + tx.send(block.clone()).await?; + }; + } + Ok(()) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Volume; + +impl Component for Volume { + async fn listen(&self, proxy: PlayerProxy<'static>, tx: Sender<Block>) -> Result<()> { + let mut block = Block { + instance: "mpris-volume".into(), + ..Default::default() + }; + + block.full_text = match (proxy.volume().await? * 100_f64) as u32 { + v @ 66.. => format!(" {v}% "), + v @ 33.. => format!(" {v}% "), + v @ 0.. => format!(" {v}% "), + }; + + tx.send(block.clone()).await?; + + let mut stream = proxy.receive_volume_changed().await; + while let Some(signal) = stream.next().await { + if let Ok(val) = signal.get().await { + block.full_text = match (val * 100_f64) as u32 { + v @ 66.. => format!(" {v}% "), + v @ 33.. => format!(" {v}% "), + v @ 0.. => format!(" {v}% "), + }; + + tx.send(block.clone()).await?; + }; + } + Ok(()) + } +} diff --git a/src/error.rs b/src/error.rs index 96d864c..f0f204f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,12 +5,24 @@ pub enum Error { #[error("IO error: {0}")] IO(#[from] std::io::Error), + #[error("Poison error: {0}")] + Poison(String), + #[error("Join error: {0}")] Join(#[from] tokio::task::JoinError), #[error("ZBus error: {0}")] ZBus(#[from] zbus::Error), + #[error("ZBus fdo error: {0}")] + Fdo(#[from] zbus::fdo::Error), + + #[error("ZBus zvariant error: {0}")] + ZVariant(#[from] zbus::zvariant::Error), + + #[error("Json serde error: {0}")] + Json(#[from] serde_json::Error), + #[error("Invalid color format: {0}")] Color(String), @@ -22,6 +34,9 @@ pub enum Error { #[error("Send error: {0}")] Send(String), + + #[error("Invalid instance value")] + Instance, } impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { @@ -29,3 +44,9 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { Self::Send(err.to_string()) } } + +impl<T> From<std::sync::PoisonError<T>> for Error { + fn from(err: std::sync::PoisonError<T>) -> Self { + Self::Poison(err.to_string()) + } +} @@ -1,8 +1,10 @@ pub use error::{Error, Result}; pub mod color; +pub mod components; pub mod dbus; pub mod error; pub mod i3bar; +pub mod mpris; pub mod player; -pub mod printer; +//pub mod printer; diff --git a/src/main.rs b/src/main.rs index 276ec05..84e5e51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,81 +1,7 @@ -use futures_util::StreamExt; -use tokio::{ - sync::mpsc::{self, Sender}, - task::JoinSet, -}; -use zbus::fdo::NameOwnerChangedStream; -use zbus::{ - fdo::{DBusProxy, NameOwnerChangedArgs}, - names::OwnedBusName, - Connection, -}; - -use i3blocks::{i3bar::Click, player::Event, Error}; - -const IGNORED: [&str; 1] = ["playerctld"]; +use main_error::MainError; #[tokio::main] -async fn main() -> anyhow::Result<()> { - let mut join_set = JoinSet::new(); - let (tx, rx) = mpsc::channel(128); - - let conn = Connection::session().await?; - - let dbus_proxy = DBusProxy::new(&conn).await?; - let names: Vec<OwnedBusName> = dbus_proxy.list_names().await?; - - let players = names.iter().filter(|s| valid_player(s)).collect::<Vec<_>>(); - - for player in players { - tx.send(Event::Add(player.clone())).await?; - } - - let name_owner_changed = dbus_proxy.receive_name_owner_changed().await?; - - join_set.spawn(listen_name_owner_change(tx.clone(), name_owner_changed)); - join_set.spawn(i3blocks::player::listener(conn.clone(), rx)); - - for line in std::io::stdin().lines() { - let click: Click = dbg!(serde_json::from_str(&line?)?); - let pos = - click.full_text.chars().count() as f64 * (click.relative_x as f64 / click.width as f64); - tx.send(Event::Click((click.button, pos as usize))).await?; - } - - while let Some(res) = join_set.join_next().await { - if let Err(err) = res? { - eprintln!("{err}") - }; - } - - Ok(()) -} - -fn valid_player(name: &str) -> bool { - name.strip_prefix("org.mpris.MediaPlayer2.") - .and_then(|s| s.split('.').next()) - .is_some_and(|s| !IGNORED.contains(&s)) -} - -async fn listen_name_owner_change( - tx: Sender<Event>, - mut stream: NameOwnerChangedStream<'static>, -) -> Result<(), Error> { - while let Some(signal) = stream.next().await { - match signal.args()? { - NameOwnerChangedArgs { - name, old_owner, .. - } if valid_player(&name) && old_owner.is_some() => { - tx.send(Event::Remove(name.into())).await? - } - NameOwnerChangedArgs { - name, new_owner, .. - } if valid_player(&name) && new_owner.is_some() => { - tx.send(Event::Add(name.into())).await? - } - _ => {} - } - } - - Ok(()) +async fn main() -> Result<(), MainError> { + todo!() + //i3blocks::mpris::mpris().await.map_err(Into::into) } diff --git a/src/mpris.rs b/src/mpris.rs new file mode 100644 index 0000000..09aab74 --- /dev/null +++ b/src/mpris.rs @@ -0,0 +1,129 @@ +use std::marker::Sync; + +use futures_util::StreamExt; +use tokio::{sync::mpsc::Sender, task::JoinSet}; +use zbus::{ + fdo::{DBusProxy, NameOwnerChangedArgs, NameOwnerChangedStream}, + names::OwnedBusName, + Connection, +}; + +use crate::{ + components::Component, + dbus::{media_player2::MediaPlayer2Proxy, player::PlayerProxy}, + i3bar::Click, + player::{self, Event}, + Result, +}; + +const IGNORED: [&str; 1] = ["playerctld"]; + +#[derive(Debug, Clone)] +pub enum Instance { + Icon, + Title, + Prev, + PlayPause, + Next, + Volume, +} + +pub async fn mpris<T: Component + Sync>(component: &'static T) -> Result<()> { + let mut join_set = JoinSet::new(); + let (tx, rx) = tokio::sync::mpsc::channel(128); + + let conn = Connection::session().await?; + + let dbus_proxy = DBusProxy::new(&conn).await?; + let names: Vec<OwnedBusName> = dbus_proxy.list_names().await?; + + let players = names.iter().filter(|s| valid_player(s)).collect::<Vec<_>>(); + + for player in players { + tx.send(Event::Add(player.clone())).await?; + } + + let name_owner_changed = dbus_proxy.receive_name_owner_changed().await?; + + join_set.spawn(listen_name_owner_change(tx.clone(), name_owner_changed)); + join_set.spawn(player::listener(conn.clone(), rx, component)); + + for line in std::io::stdin().lines() { + let click: Click = dbg!(serde_json::from_str(&line?)?); + + let Some(name) = click.name else { + continue; + }; + + let Some(instance) = click.instance else { + continue; + }; + + let player_proxy = PlayerProxy::builder(&conn) + .destination(name)? + .build() + .await?; + + let mpris_proxy = MediaPlayer2Proxy::builder(player_proxy.inner().connection()) + .destination(player_proxy.inner().destination())? + .build() + .await?; + + match (instance.as_ref(), click.button) { + ("mpris-icon", 1) if mpris_proxy.can_raise().await? => mpris_proxy.raise().await?, + ("mpris-title", _) => {} + ("mpris-prev", 1) => player_proxy.previous().await?, + ("mpris-playPause", 1) => player_proxy.play_pause().await?, + ("mpris-next", 1) => player_proxy.next().await?, + ("mpris-volume", 4) => { + player_proxy + .set_volume(player_proxy.volume().await? - 0.05) + .await? + } + ("mpris-volume", 5) => { + player_proxy + .set_volume(player_proxy.volume().await? + 0.05) + .await? + } + (_, 3) => tx.send(Event::Shift).await?, + _ => {} + } + } + + while let Some(res) = join_set.join_next().await { + if let Err(err) = res? { + eprintln!("{err}") + }; + } + + Ok(()) +} + +fn valid_player(name: &str) -> bool { + name.strip_prefix("org.mpris.MediaPlayer2.") + .and_then(|s| s.split('.').next()) + .is_some_and(|s| !IGNORED.contains(&s)) +} + +async fn listen_name_owner_change( + tx: Sender<Event>, + mut stream: NameOwnerChangedStream<'static>, +) -> Result<()> { + while let Some(signal) = stream.next().await { + match signal.args()? { + NameOwnerChangedArgs { + name, old_owner, .. + } if valid_player(&name) && old_owner.is_some() => { + tx.send(Event::Remove(name.into())).await? + } + NameOwnerChangedArgs { + name, new_owner, .. + } if valid_player(&name) && new_owner.is_some() => { + tx.send(Event::Add(name.into())).await? + } + _ => {} + } + } + + Ok(()) +} diff --git a/src/player.rs b/src/player.rs index 2f69d20..de4226d 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,16 +1,22 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, io::Write}; use tokio::{ sync::mpsc::{Receiver, Sender}, - task::JoinHandle, + task::{AbortHandle, JoinHandle, JoinSet}, }; use zbus::{names::OwnedBusName, Connection}; use crate::{ + components::Component, dbus::player::{PlaybackStatus, PlayerProxy}, - printer, Result, + i3bar::Block, + Result, }; +const BLACK: &str = "#1d2021"; +const YELLOW: &str = "#fabd2f"; +const CYAN: &str = "#8ec07c"; + pub enum Command { Shift, Raise(String), @@ -27,12 +33,19 @@ pub struct Player { pub enum Event { Add(OwnedBusName), Remove(OwnedBusName), - Click((u8, usize)), + Shift, } -pub async fn listener(conn: Connection, mut rx: Receiver<Event>) -> Result<()> { +pub async fn listener<T: Component>( + conn: Connection, + mut rx: Receiver<Event>, + component: &'static T, +) -> Result<()> { let mut players: VecDeque<OwnedBusName> = VecDeque::new(); - let mut active: Option<Player> = None; + let mut active: Option<OwnedBusName> = Default::default(); + let mut listener: Option<AbortHandle> = None; + let mut printer: Option<AbortHandle> = None; + let mut join_set = JoinSet::new(); while let Some(value) = rx.recv().await { match value { @@ -53,37 +66,62 @@ pub async fn listener(conn: Connection, mut rx: Receiver<Event>) -> Result<()> { players.remove(index); } } - Event::Click((3, _)) if players.len() > 1 => players.rotate_left(1), - Event::Click(click) => { - if let Some(p) = active.as_ref() { - p.tx.send(click).await?; - } - continue; - } + Event::Shift if !players.is_empty() => players.rotate_left(1), + _ => {} }; - if let Some(name) = players.front() { - if !active.as_ref().is_some_and(|s| &s.name == name) { - let (tx, rx) = tokio::sync::mpsc::channel(128); + if players.front() != active.as_ref() { + if active.is_some() { + if let Some(h) = listener.take() { + h.abort(); + } - if let Some(player) = active.take() { - player.handle.abort(); + if let Some(h) = printer.take() { + h.abort(); } + active = None; + } + if let Some(name) = players.front().cloned() { let proxy = PlayerProxy::builder(&conn) .destination(name.clone())? .build() .await?; - let handle = tokio::spawn(printer::printer(proxy, rx)); - active = Some(Player { - name: name.clone(), - handle, - tx, - }); + let (tx, mut rx): (Sender<Block>, _) = tokio::sync::mpsc::channel(128); + let conn = conn.clone(); + listener = Some(join_set.spawn(component.listen(proxy, tx.clone()))); + printer = Some(join_set.spawn(async move { + while let Some(mut block) = rx.recv().await { + let proxy = PlayerProxy::builder(&conn) + .destination(name.clone())? + .build() + .await?; + + match proxy.playback_status().await? { + PlaybackStatus::Playing => { + block.color = Some(BLACK.into()); + block.background = Some(CYAN.into()) + } + PlaybackStatus::Paused => { + block.color = Some(BLACK.into()); + block.background = Some(YELLOW.into()) + } + _ => {} + }; + + block.instance = name.as_str().to_owned(); + block.instance = name.as_str().to_owned(); + + let mut w = std::io::stdout().lock(); + let mut v = serde_json::to_vec(&block)?; + v.push(b'\n'); + w.write_all(&v)?; + w.flush()?; + } + Ok(()) + })); } - } else if let Some(player) = active.take() { - player.handle.abort(); } } diff --git a/src/printer.rs b/src/printer.rs index a609616..db5a554 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -2,19 +2,15 @@ use std::{collections::HashMap, io::Write, sync::Arc, time::Duration}; use futures_util::stream::StreamExt; use tokio::{ - sync::{mpsc::Receiver, Mutex, Notify}, + sync::{Mutex, Notify}, task::{JoinHandle, JoinSet}, }; -use unicode_segmentation::UnicodeSegmentation; use zbus::{proxy::PropertyStream, zvariant::OwnedValue}; -use crate::Result; use crate::{ - dbus::{ - media_player2::MediaPlayer2Proxy, - player::{PlaybackStatus, PlayerProxy}, - }, + dbus::player::{PlaybackStatus, PlayerProxy}, i3bar::Block, + Result, }; type StatusLock = Arc<Mutex<Status>>; @@ -30,7 +26,7 @@ pub enum Signal { Print, } -pub async fn printer(proxy: PlayerProxy<'static>, rx_click: Receiver<(u8, usize)>) -> Result<()> { +pub async fn printer(proxy: PlayerProxy<'static>) -> Result<()> { let mut join_set: JoinSet<Result<()>> = JoinSet::new(); let mut rotator = None; @@ -86,12 +82,39 @@ pub async fn printer(proxy: PlayerProxy<'static>, rx_click: Receiver<(u8, usize) status.clone(), )); - join_set.spawn(click_listener(proxy, rx_click, status.clone())); - loop { notify.notified().await; let mut status = status.lock().await; + //let (color, background) = match proxy.playback_status().await.ok() { + // Some(PlaybackStatus::Playing) => (Some(BLACK.into()), Some(CYAN.into())), + // Some(PlaybackStatus::Paused) => (Some(BLACK.into()), Some(YELLOW.into())), + // Some(PlaybackStatus::Stopped) => (None, None), + //}; + + //let s = match instance { + // Instance::Icon => " ", + // Instance::Title => &status.title, + // Instance::Prev => " ", + // Instance::Play => " ", + // Instance::Pause => " ", + // Instance::Next => " ", + // Instance::Volume => { + // if let Some(volume) = self.volume.map(|v| (v * 100_f64) as u32) { + // match volume { + // v @ 66.. => &format!(" {v}% "), + // v @ 33.. => &format!(" {v}% "), + // v @ 0.. => &format!(" {v}% "), + // } + // } + // } + //}; + // + //let block = Block { + // full_text, + // ..Default::default() + //}; + let mut w = std::io::stdout().lock(); let mut v = serde_json::to_vec(&status.build()).unwrap(); v.push(b'\n'); @@ -100,48 +123,6 @@ pub async fn printer(proxy: PlayerProxy<'static>, rx_click: Receiver<(u8, usize) } } -async fn click_listener( - player_proxy: PlayerProxy<'static>, - mut rx: Receiver<(u8, usize)>, - status: StatusLock, -) -> Result<()> { - let mpris_proxy = MediaPlayer2Proxy::builder(player_proxy.inner().connection()) - .destination(player_proxy.inner().destination())? - .build() - .await?; - - while let Some((button, index)) = dbg!(rx.recv().await) { - let status = status.lock().await; - if let Some(c) = dbg!(&status.indexes) - .iter() - .rev() - .find_map(|(i, b)| (*i <= index).then_some(b)) - { - match dbg!((c, button)) { - (Component::Icon, 1) if mpris_proxy.can_raise().await? => { - mpris_proxy.raise().await? - } - (Component::Title, _) => {} - (Component::Prev, 1) => player_proxy.previous().await?, - (Component::Play, 1) | (Component::Pause, 1) => player_proxy.play_pause().await?, - (Component::Next, 1) => player_proxy.next().await?, - (Component::Volume, 4) => { - player_proxy - .set_volume(player_proxy.volume().await? - 0.05) - .await? - } - (Component::Volume, 5) => { - player_proxy - .set_volume(player_proxy.volume().await? + 0.05) - .await? - } - _ => {} - } - } - } - Ok(()) -} - async fn process_metadata( notify: Arc<Notify>, status_lock: StatusLock, @@ -230,7 +211,7 @@ async fn playback_state( let mut status = status_lock.lock().await; status.playback_status = val; notify.notify_one(); - }; + } } Ok(()) } @@ -290,45 +271,28 @@ pub struct Status { impl Status { fn build(&mut self) -> Block { let mut full_text = String::new(); - self.indexes = Vec::new(); - - self.indexes - .push((full_text.chars().count(), Component::Icon)); full_text.push_str(" "); if let Some(title) = self.title.as_ref() { - self.indexes - .push((full_text.chars().count(), Component::Title)); full_text.push_str(title); full_text.push(' '); } if self.can_go_previous { - self.indexes - .push((full_text.chars().count(), Component::Prev)); full_text.push_str(" "); } if self.playback_status == PlaybackStatus::Playing { - self.indexes - .push((full_text.chars().count(), Component::Pause)); full_text.push_str(" "); } else { - self.indexes - .push((full_text.chars().count(), Component::Play)); full_text.push_str(" "); } if self.can_go_next { - self.indexes - .push((full_text.chars().count(), Component::Next)); full_text.push_str(" "); } if let Some(volume) = self.volume.map(|v| (v * 100_f64) as u32) { - self.indexes - .push((full_text.chars().count(), Component::Volume)); - match volume { v @ 66.. => full_text.push_str(&format!(" {v}% ")), v @ 33.. => full_text.push_str(&format!(" {v}% ")), @@ -336,9 +300,6 @@ impl Status { } } - self.indexes - .push((full_text.chars().count(), Component::Space)); - let (color, background) = match self.playback_status { PlaybackStatus::Playing => (Some(BLACK.into()), Some(CYAN.into())), PlaybackStatus::Paused => (Some(BLACK.into()), Some(YELLOW.into())), |