diff options
-rwxr-xr-x | i3blocks/.local/lib/i3blocks/i3blocks-mpris | 301 |
1 files changed, 190 insertions, 111 deletions
diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris index 82f48f2..6d8328b 100755 --- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris +++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris @@ -1,10 +1,11 @@ #!/usr/bin/env python3 import asyncio +from asyncio import Event, CancelledError, TaskGroup, run import json import os import sys -from collections import deque +from collections import deque, OrderedDict from itertools import islice from dbus_next import Variant @@ -17,7 +18,7 @@ PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" -def parse_dbus_values(item: Variant | dict | list) -> any: +def parse_dbus_values(item: Variant | dict | list) -> dict: match item: case Variant(): return parse_dbus_values(item.value) @@ -29,82 +30,130 @@ def parse_dbus_values(item: Variant | dict | list) -> any: return v -class Status: +class PlayerStatus: ICON = "\U000f075a" # + BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" + GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" + YELLOW = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}" - def __init__(self, width: int = 10): - self.title = "" + def __init__(self, player={}, width: int = 10): self.width = width - self.can_prev = False - self.can_play_pause = False - self.can_next = False - self.play_pause = None - self.color = None - self.background = None - - def update_metadata(self, metadata): - title = metadata.get("xesam:title", "") - if self.title == title: - return - - self.title = title - if len(self.title) < self.width: - self.title_gen = self.static_title(title) - else: - self.title_gen = self.scrolling_title(title) + self.player = {} + self.update_player(player) - def update_status(self, status): - match status: + def update_player(self, player): + old_title = self.title + self.player |= player + title = player.get("Metadata", {}).get("xesam:title", "") + if old_title != title: + if len(title) > self.width: + self.title_deque = deque(list(title + " ")) + else: + self.title_deque = list(title) + + @property + def status(self): + return self.player.get("PlaybackStatus", "Stopped") + + @property + def playing(self): + return self.status == "Playing" + + @property + def paused(self): + return self.status == "Paused" + + @property + def stopped(self): + return self.status == "Stopped" + + @property + def can_prev(self): + return self.player.get("CanGoPrevious", False) + + @property + def can_play(self) -> bool: + return self.player.get("CanPlay", False) and self.paused + + @property + def can_pause(self) -> bool: + return self.player.get("CanPause", False) and self.playing + + @property + def can_next(self): + return self.player.get("CanGoNext", False) + + @property + def metadata(self): + return self.player.get("Metadata", {}) + + @property + def title(self): + return self.metadata.get("xesam:title", "") + + @property + def volume(self): + if self.stopped: + return None + if v := self.player.get("Volume"): + return round(float(v) * 100) + + @property + def color(self) -> str | None: + match self.status: + case "Playing" | "Paused": + return PlayerStatus.BLACK + case "Stopped": + return None + + @property + def background(self) -> str | None: + match self.status: case "Playing": - self.play_pause = "\U000f03e4 " # - self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" - self.background = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" + return PlayerStatus.GREEN case "Paused": - self.play_pause = "\U000f040a " # - self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" - self.background = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}" + return PlayerStatus.YELLOW case "Stopped": - self.play_pause = None - self.color = None - self.background = None - - def update_player(self, player): - self.update_metadata(player.get("Metadata")) - self.update_status(player.get("PlaybackStatus")) - self.can_next = player.get("CanGoNext") - self.can_play_pause = player.get("CanPause") - self.can_prev = player.get("CanGoPrevious") - - def static_title(self, title): - while True: - yield title - - def scrolling_title(self, title): - iter = deque(list(title)) - - while True: - yield "".join(islice(iter, 0, self.width)) - iter.rotate(-1) - - def __str__(self): - output = {"full_text": f" {Status.ICON} "} - - if len(self.title) > 0: - output["full_text"] += f"{next(self.title_gen)} " - - if self.can_prev: - output["full_text"] += "\U000f04ae " # - if self.can_play_pause: - output["full_text"] += self.play_pause - if self.can_next: - output["full_text"] += "\U000f04ad " # - - if self.color: - output["color"] = self.color - if self.background: - output["background"] = self.background - - return json.dumps(output, ensure_ascii=False) + return None + + def __iter__(self): + while self.title_deque: + title = "".join(islice(self.title_deque, 0, self.width)) + + components = [PlayerStatus.ICON] + if len(title) > 0: + components.append(title) + if self.can_prev: + components.append("\U000f04ae ") # + if self.can_pause: + components.append("\U000f03e4 ") # + if self.can_play: + components.append("\U000f040a ") # + if self.can_next: + components.append("\U000f04ad ") # + if v := self.volume: + match v: + case v if v > 66: + components.append("\U000f057e") # + case v if v > 33: + components.append("\U000f0580") # + case v: + components.append("\U000f057f") # + if v: + components.append(f"{v}%") + + output = {"full_text": f" {" ".join(components)} "} + if self.color: + output["color"] = self.color + if self.background: + output["background"] = self.background + + yield output + + if isinstance(self.title_deque, deque): + self.title_deque.rotate(-1) + else: + break class MPRIS: @@ -112,8 +161,9 @@ class MPRIS: self.bus = None self.dbus_iface = None self.players = {} - self.active_players = [] - self.status = Status() + self.ifaces = {} + self.statuses = OrderedDict({}) + self.status_update = Event() @classmethod async def connect(cls): @@ -126,8 +176,6 @@ class MPRIS: await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"), ).get_interface("org.freedesktop.DBus") - self.players = {} - for bus_name in await self.dbus_iface.call_list_names(): if self.valid_player(bus_name): await self.add_player(bus_name) @@ -138,49 +186,80 @@ class MPRIS: def valid_player(bus_name: str) -> bool: return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE + async def promote_player(self, bus_name): + if self.statuses.get(bus_name): + self.statuses.move_to_end(bus_name) + else: + self.statuses[bus_name] = PlayerStatus(self.players[bus_name]) + + self.status_update.set() + + def remove_player(self, bus_name): + self.statuses.pop(bus_name, None) + self.status_update.set() + + def cycle_player(self): + if k := next(reversed(self.statuses), None): + self.promote_player(k) + self.status_update.set() + async def status_printer(self, interval=1): - while True: - print(str(self.status), flush=True) - await asyncio.sleep(interval) + while await self.status_update.wait(): + self.status_update.clear() + status = next(reversed(self.statuses.values()), PlayerStatus()) + for output in iter(status): + print(json.dumps(output, ensure_ascii=False), flush=True) + if self.status_update.is_set(): + break + else: + await asyncio.sleep(0.5) def on_properties_changed(self, bus_name: str, property, invalidated): - self.players[bus_name].update(parse_dbus_values(property)) - for name, value in self.players[bus_name].items(): - match name, value: - case "Metadata", _: - continue - case "PlaybackStatus", "Playing": - if bus_name in self.active_players: - self.active_players.remove(bus_name) - self.active_players.append(bus_name) - self.status.update_player(self.players[bus_name]) - case "PlaybackStatus", "Stopped" if bus_name in self.active_players: - self.active_players.remove(bus_name) - if len(self.active_players) > 0: - self.status.update_player(self.players[self.active_players[-1]]) + props = parse_dbus_values(property) + self.statuses[bus_name].update_player(props) + match props: + case {"PlaybackStatus": "Playing"}: + self.promote_player(bus_name) + case {"PlaybackStatus": "Stopped"}: + self.remove_player(bus_name) + + self.status_update.set() async def add_player(self, bus_name): self.players[bus_name] = {} - iface = self.bus.get_proxy_object( + obj = self.bus.get_proxy_object( bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH) - ).get_interface(PROPERTIES_INTERFACE) + ) + + properties = obj.get_interface(PROPERTIES_INTERFACE) self.players[bus_name] = parse_dbus_values( - await iface.call_get_all(PLAYER_INTERFACE) + await properties.call_get_all(PLAYER_INTERFACE) ) + self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE) - iface.on_properties_changed( + properties.on_properties_changed( lambda _, p, i: self.on_properties_changed(bus_name, p, i) ) - async def remove_player(self, bus_name): - self.players.pop(bus_name) + if self.players[bus_name].get("PlaybackStatus", "Stopped") != "Stopped": + self.statuses[bus_name] = PlayerStatus(self.players[bus_name]) + + if self.players[bus_name].get("PlaybackStatus", "Stopped") == "Paused": + self.statuses.move_to_end(bus_name, last=False) + + self.status_update.set() + + async def remove_player_iface(self, bus_name): + self.remove_player(bus_name) + self.players.pop(bus_name, None) + self.ifaces.pop(bus_name, None) async def on_name_owner_changed(self, bus_name, old, new): if self.valid_player(bus_name): if old: - await self.remove_player(bus_name) + self.remove_player(bus_name) if new: await self.add_player(bus_name) @@ -207,10 +286,16 @@ class MPRIS: case {"button": 2}: pass case {"button": 3}: - pass + self.cycle_player() + case {"button": 4}: - pass + if bus_name := next(reversed(self.statuses)): + volume = await self.ifaces[bus_name].get_volume() + await self.ifaces[bus_name].set_volume(volume + 0.05) case {"button": 5}: + if bus_name := next(reversed(self.statuses)): + volume = await self.ifaces[bus_name].get_volume() + await self.ifaces[bus_name].set_volume(volume - 0.05) pass @@ -218,19 +303,13 @@ async def main(): mpris = await MPRIS.connect() try: - async with asyncio.TaskGroup() as task_group: + async with TaskGroup() as task_group: task_group.create_task(mpris.status_printer()) task_group.create_task(mpris.listener()) - # task_group.create_task(mpris.button_handler()) - except asyncio.CancelledError: + task_group.create_task(mpris.button_handler()) + except CancelledError: return if __name__ == "__main__": - asyncio.run(main()) - asyncio.run(main()) - asyncio.run(main()) - asyncio.run(main()) - asyncio.run(main()) - asyncio.run(main()) - asyncio.run(main()) + run(main()) |