aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rwxr-xr-xi3blocks/.local/lib/i3blocks/i3blocks-mpris301
1 files changed, 190 insertions, 111 deletions
diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
index 82f48f2..6d8328b 100755
--- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris
+++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
@@ -1,10 +1,11 @@
#!/usr/bin/env python3
import asyncio
+from asyncio import Event, CancelledError, TaskGroup, run
import json
import os
import sys
-from collections import deque
+from collections import deque, OrderedDict
from itertools import islice
from dbus_next import Variant
@@ -17,7 +18,7 @@ PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
-def parse_dbus_values(item: Variant | dict | list) -> any:
+def parse_dbus_values(item: Variant | dict | list) -> dict:
match item:
case Variant():
return parse_dbus_values(item.value)
@@ -29,82 +30,130 @@ def parse_dbus_values(item: Variant | dict | list) -> any:
return v
-class Status:
+class PlayerStatus:
ICON = "\U000f075a" # 󰝚
+ BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
+ GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}"
+ YELLOW = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}"
- def __init__(self, width: int = 10):
- self.title = ""
+ def __init__(self, player={}, width: int = 10):
self.width = width
- self.can_prev = False
- self.can_play_pause = False
- self.can_next = False
- self.play_pause = None
- self.color = None
- self.background = None
-
- def update_metadata(self, metadata):
- title = metadata.get("xesam:title", "")
- if self.title == title:
- return
-
- self.title = title
- if len(self.title) < self.width:
- self.title_gen = self.static_title(title)
- else:
- self.title_gen = self.scrolling_title(title)
+ self.player = {}
+ self.update_player(player)
- def update_status(self, status):
- match status:
+ def update_player(self, player):
+ old_title = self.title
+ self.player |= player
+ title = player.get("Metadata", {}).get("xesam:title", "")
+ if old_title != title:
+ if len(title) > self.width:
+ self.title_deque = deque(list(title + " "))
+ else:
+ self.title_deque = list(title)
+
+ @property
+ def status(self):
+ return self.player.get("PlaybackStatus", "Stopped")
+
+ @property
+ def playing(self):
+ return self.status == "Playing"
+
+ @property
+ def paused(self):
+ return self.status == "Paused"
+
+ @property
+ def stopped(self):
+ return self.status == "Stopped"
+
+ @property
+ def can_prev(self):
+ return self.player.get("CanGoPrevious", False)
+
+ @property
+ def can_play(self) -> bool:
+ return self.player.get("CanPlay", False) and self.paused
+
+ @property
+ def can_pause(self) -> bool:
+ return self.player.get("CanPause", False) and self.playing
+
+ @property
+ def can_next(self):
+ return self.player.get("CanGoNext", False)
+
+ @property
+ def metadata(self):
+ return self.player.get("Metadata", {})
+
+ @property
+ def title(self):
+ return self.metadata.get("xesam:title", "")
+
+ @property
+ def volume(self):
+ if self.stopped:
+ return None
+ if v := self.player.get("Volume"):
+ return round(float(v) * 100)
+
+ @property
+ def color(self) -> str | None:
+ match self.status:
+ case "Playing" | "Paused":
+ return PlayerStatus.BLACK
+ case "Stopped":
+ return None
+
+ @property
+ def background(self) -> str | None:
+ match self.status:
case "Playing":
- self.play_pause = "\U000f03e4 " # 󰏤
- self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
- self.background = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}"
+ return PlayerStatus.GREEN
case "Paused":
- self.play_pause = "\U000f040a " # 󰐊
- self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
- self.background = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}"
+ return PlayerStatus.YELLOW
case "Stopped":
- self.play_pause = None
- self.color = None
- self.background = None
-
- def update_player(self, player):
- self.update_metadata(player.get("Metadata"))
- self.update_status(player.get("PlaybackStatus"))
- self.can_next = player.get("CanGoNext")
- self.can_play_pause = player.get("CanPause")
- self.can_prev = player.get("CanGoPrevious")
-
- def static_title(self, title):
- while True:
- yield title
-
- def scrolling_title(self, title):
- iter = deque(list(title))
-
- while True:
- yield "".join(islice(iter, 0, self.width))
- iter.rotate(-1)
-
- def __str__(self):
- output = {"full_text": f" {Status.ICON} "}
-
- if len(self.title) > 0:
- output["full_text"] += f"{next(self.title_gen)} "
-
- if self.can_prev:
- output["full_text"] += "\U000f04ae " # 󰒮
- if self.can_play_pause:
- output["full_text"] += self.play_pause
- if self.can_next:
- output["full_text"] += "\U000f04ad " # 󰒭
-
- if self.color:
- output["color"] = self.color
- if self.background:
- output["background"] = self.background
-
- return json.dumps(output, ensure_ascii=False)
+ return None
+
+ def __iter__(self):
+ while self.title_deque:
+ title = "".join(islice(self.title_deque, 0, self.width))
+
+ components = [PlayerStatus.ICON]
+ if len(title) > 0:
+ components.append(title)
+ if self.can_prev:
+ components.append("\U000f04ae ") # 󰒮
+ if self.can_pause:
+ components.append("\U000f03e4 ") # 󰏤
+ if self.can_play:
+ components.append("\U000f040a ") # 󰐊
+ if self.can_next:
+ components.append("\U000f04ad ") # 󰒭
+ if v := self.volume:
+ match v:
+ case v if v > 66:
+ components.append("\U000f057e") # 󰕾
+ case v if v > 33:
+ components.append("\U000f0580") # 󰖀
+ case v:
+ components.append("\U000f057f") # 󰕿
+ if v:
+ components.append(f"{v}%")
+
+ output = {"full_text": f" {" ".join(components)} "}
+ if self.color:
+ output["color"] = self.color
+ if self.background:
+ output["background"] = self.background
+
+ yield output
+
+ if isinstance(self.title_deque, deque):
+ self.title_deque.rotate(-1)
+ else:
+ break
class MPRIS:
@@ -112,8 +161,9 @@ class MPRIS:
self.bus = None
self.dbus_iface = None
self.players = {}
- self.active_players = []
- self.status = Status()
+ self.ifaces = {}
+ self.statuses = OrderedDict({})
+ self.status_update = Event()
@classmethod
async def connect(cls):
@@ -126,8 +176,6 @@ class MPRIS:
await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"),
).get_interface("org.freedesktop.DBus")
- self.players = {}
-
for bus_name in await self.dbus_iface.call_list_names():
if self.valid_player(bus_name):
await self.add_player(bus_name)
@@ -138,49 +186,80 @@ class MPRIS:
def valid_player(bus_name: str) -> bool:
return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE
+ async def promote_player(self, bus_name):
+ if self.statuses.get(bus_name):
+ self.statuses.move_to_end(bus_name)
+ else:
+ self.statuses[bus_name] = PlayerStatus(self.players[bus_name])
+
+ self.status_update.set()
+
+ def remove_player(self, bus_name):
+ self.statuses.pop(bus_name, None)
+ self.status_update.set()
+
+ def cycle_player(self):
+ if k := next(reversed(self.statuses), None):
+ self.promote_player(k)
+ self.status_update.set()
+
async def status_printer(self, interval=1):
- while True:
- print(str(self.status), flush=True)
- await asyncio.sleep(interval)
+ while await self.status_update.wait():
+ self.status_update.clear()
+ status = next(reversed(self.statuses.values()), PlayerStatus())
+ for output in iter(status):
+ print(json.dumps(output, ensure_ascii=False), flush=True)
+ if self.status_update.is_set():
+ break
+ else:
+ await asyncio.sleep(0.5)
def on_properties_changed(self, bus_name: str, property, invalidated):
- self.players[bus_name].update(parse_dbus_values(property))
- for name, value in self.players[bus_name].items():
- match name, value:
- case "Metadata", _:
- continue
- case "PlaybackStatus", "Playing":
- if bus_name in self.active_players:
- self.active_players.remove(bus_name)
- self.active_players.append(bus_name)
- self.status.update_player(self.players[bus_name])
- case "PlaybackStatus", "Stopped" if bus_name in self.active_players:
- self.active_players.remove(bus_name)
- if len(self.active_players) > 0:
- self.status.update_player(self.players[self.active_players[-1]])
+ props = parse_dbus_values(property)
+ self.statuses[bus_name].update_player(props)
+ match props:
+ case {"PlaybackStatus": "Playing"}:
+ self.promote_player(bus_name)
+ case {"PlaybackStatus": "Stopped"}:
+ self.remove_player(bus_name)
+
+ self.status_update.set()
async def add_player(self, bus_name):
self.players[bus_name] = {}
- iface = self.bus.get_proxy_object(
+ obj = self.bus.get_proxy_object(
bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH)
- ).get_interface(PROPERTIES_INTERFACE)
+ )
+
+ properties = obj.get_interface(PROPERTIES_INTERFACE)
self.players[bus_name] = parse_dbus_values(
- await iface.call_get_all(PLAYER_INTERFACE)
+ await properties.call_get_all(PLAYER_INTERFACE)
)
+ self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE)
- iface.on_properties_changed(
+ properties.on_properties_changed(
lambda _, p, i: self.on_properties_changed(bus_name, p, i)
)
- async def remove_player(self, bus_name):
- self.players.pop(bus_name)
+ if self.players[bus_name].get("PlaybackStatus", "Stopped") != "Stopped":
+ self.statuses[bus_name] = PlayerStatus(self.players[bus_name])
+
+ if self.players[bus_name].get("PlaybackStatus", "Stopped") == "Paused":
+ self.statuses.move_to_end(bus_name, last=False)
+
+ self.status_update.set()
+
+ async def remove_player_iface(self, bus_name):
+ self.remove_player(bus_name)
+ self.players.pop(bus_name, None)
+ self.ifaces.pop(bus_name, None)
async def on_name_owner_changed(self, bus_name, old, new):
if self.valid_player(bus_name):
if old:
- await self.remove_player(bus_name)
+ self.remove_player(bus_name)
if new:
await self.add_player(bus_name)
@@ -207,10 +286,16 @@ class MPRIS:
case {"button": 2}:
pass
case {"button": 3}:
- pass
+ self.cycle_player()
+
case {"button": 4}:
- pass
+ if bus_name := next(reversed(self.statuses)):
+ volume = await self.ifaces[bus_name].get_volume()
+ await self.ifaces[bus_name].set_volume(volume + 0.05)
case {"button": 5}:
+ if bus_name := next(reversed(self.statuses)):
+ volume = await self.ifaces[bus_name].get_volume()
+ await self.ifaces[bus_name].set_volume(volume - 0.05)
pass
@@ -218,19 +303,13 @@ async def main():
mpris = await MPRIS.connect()
try:
- async with asyncio.TaskGroup() as task_group:
+ async with TaskGroup() as task_group:
task_group.create_task(mpris.status_printer())
task_group.create_task(mpris.listener())
- # task_group.create_task(mpris.button_handler())
- except asyncio.CancelledError:
+ task_group.create_task(mpris.button_handler())
+ except CancelledError:
return
if __name__ == "__main__":
- asyncio.run(main())
- asyncio.run(main())
- asyncio.run(main())
- asyncio.run(main())
- asyncio.run(main())
- asyncio.run(main())
- asyncio.run(main())
+ run(main())