use std::sync::Arc; use futures_util::StreamExt; use tokio::{sync::Mutex, task::JoinSet}; use zbus::Connection; pub use crate::{ component::{Button, Component, Update}, error::{Error, Result}, }; use crate::{ dbus::{player::PlayerProxy, playerctld::PlayerctldProxy}, i3bar::{Block, Click}, }; pub mod component; pub mod dbus; pub mod error; pub mod i3bar; const IGNORED: [&str; 2] = ["playerctld", "kdeconnect"]; pub async fn run(mut block: Block) -> Result<(), Error> where C: Component, { let conn = Connection::session().await?; block.name = Some(format!("mpris-{}", C::NAME)); tokio::spawn(listeners::(conn.clone(), block)); for click in std::io::stdin() .lines() .map_while(Result::ok) .flat_map(|s| serde_json::from_str::(&s)) { <::Handler as Button>::handle(conn.clone(), click).await?; } Ok(()) } async fn listeners(conn: Connection, block: Block) -> Result<(), Error> where C: Component, { let mut join_set = JoinSet::new(); let (tx_player, mut rx_player) = tokio::sync::mpsc::channel(128); let (tx_status, mut rx_status) = tokio::sync::mpsc::channel(128); let (tx_value, mut rx_value) = tokio::sync::mpsc::channel(128); let proxy = PlayerctldProxy::builder(&conn).build().await?; tokio::spawn(async move { let mut last = proxy .player_names() .await? .into_iter() .find(|s| s.split('.').nth(3).is_some_and(|s| !IGNORED.contains(&s))) .unwrap_or_default(); tx_player.send(last.clone()).await?; let mut stream = proxy.receive_active_player_change_end().await?; while let Some(signal) = stream.next().await { let name = signal.args()?.name.to_owned(); if name != last && name .split('.') .nth(3) .is_some_and(|s| !IGNORED.contains(&s)) { last.clone_from(&name); tx_player.send(name).await?; } } Result::<_, Error>::Ok(()) }); let block = Arc::new(Mutex::new(block)); loop { let updated = tokio::select! { Some(name) = rx_player.recv() => { join_set.shutdown().await; if name.is_empty() { let mut block = block.lock().await; block.full_text = String::new(); block.instance = None; true } else { let mut block = block.lock().await; block.instance.clone_from(&Some(name.clone())); let proxy = PlayerProxy::builder(&conn) .destination(name)? .build() .await?; join_set.spawn(<::Colorer as Update>::listen(tx_status.clone(), proxy.clone())); join_set.spawn(<::Updater as Update>::listen(tx_value.clone(), proxy)); false } } Some(color) = rx_status.recv() => <::Colorer as Update>::update(color, block.clone()).await?, Some(value) = rx_value.recv() => <::Updater as Update>::update(value, block.clone()).await? }; if updated { let s = block.lock().await; s.write_stdout()?; } } }