diff options
-rw-r--r-- | i3blocks/.config/i3blocks/config | 2 | ||||
-rwxr-xr-x | i3blocks/.local/lib/i3blocks/i3blocks-mpris | 326 |
2 files changed, 154 insertions, 174 deletions
diff --git a/i3blocks/.config/i3blocks/config b/i3blocks/.config/i3blocks/config index 51fed44..3765819 100644 --- a/i3blocks/.config/i3blocks/config +++ b/i3blocks/.config/i3blocks/config @@ -32,7 +32,7 @@ command=$SCRIPT_DIR/i3blocks-gpu [net] command=$SCRIPT_DIR/i3blocks-net -[volume] +[mpris] command=$SCRIPT_DIR/i3blocks-mpris interval=persist format=json diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris index 6d8328b..ed9279a 100755 --- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris +++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris @@ -1,11 +1,11 @@ #!/usr/bin/env python3 import asyncio -from asyncio import Event, CancelledError, TaskGroup, run import json import os import sys -from collections import deque, OrderedDict +from asyncio import CancelledError, TaskGroup, run +from collections import defaultdict, deque from itertools import islice from dbus_next import Variant @@ -30,144 +30,129 @@ def parse_dbus_values(item: Variant | dict | list) -> dict: return v -class PlayerStatus: - ICON = "\U000f075a" # +class Printer: BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" YELLOW = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}" - - def __init__(self, player={}, width: int = 10): - self.width = width + ICONS = { + "icon": "\U000f075a", # + "prev": "\U000f04ae ", # + "play": "\U000f040a ", # + "pause": "\U000f03e4 ", # + "next": "\U000f04ad ", # + "high": "\U000f057e", # + "med": "\U000f0580", # + "low": "\U000f057f", # + } + + FMT = " {icon} {title} {prev} {play_pause} {next} {vol} {volume:.0%} " + + def __init__(self, player, width: int = 10): self.player = {} - self.update_player(player) - - def update_player(self, player): - old_title = self.title - self.player |= player - title = player.get("Metadata", {}).get("xesam:title", "") - if old_title != title: - if len(title) > self.width: - self.title_deque = deque(list(title + " ")) - else: - self.title_deque = list(title) - - @property - def status(self): - return self.player.get("PlaybackStatus", "Stopped") - - @property - def playing(self): - return self.status == "Playing" - - @property - def paused(self): - return self.status == "Paused" - - @property - def stopped(self): - return self.status == "Stopped" - - @property - def can_prev(self): - return self.player.get("CanGoPrevious", False) - - @property - def can_play(self) -> bool: - return self.player.get("CanPlay", False) and self.paused - - @property - def can_pause(self) -> bool: - return self.player.get("CanPause", False) and self.playing - - @property - def can_next(self): - return self.player.get("CanGoNext", False) - - @property - def metadata(self): - return self.player.get("Metadata", {}) - - @property - def title(self): - return self.metadata.get("xesam:title", "") - - @property - def volume(self): - if self.stopped: - return None - if v := self.player.get("Volume"): - return round(float(v) * 100) - - @property - def color(self) -> str | None: - match self.status: - case "Playing" | "Paused": - return PlayerStatus.BLACK - case "Stopped": - return None - - @property - def background(self) -> str | None: - match self.status: + self.values = { + "icon": Printer.ICONS["icon"], + "prev": "XDELETEX", + "play_pause": "XDELETEX", + "next": "XDELETEX", + "vol": "XDELETEX", + "volume": 1337, + } + self.width = width + self.iter = [""] + self.update_props(player) + + def update_props(self, props): + old_title = self.player.get("Metadata", {}).get("xesam:title") + self.player |= props + + match self.player.get("Metadata", {}).get("xesam:title"): + case None: + pass + case "": + self.iter = [""] + case str(s) if s != old_title: + self.iter = deque(list(s + " ")) + + if self.player.get("CanGoPrevious"): + self.values["prev"] = Printer.ICONS["prev"] + + match self.player.get("PlaybackStatus"): + case "Playing" if self.player.get("CanPause"): + self.values["play_pause"] = Printer.ICONS["pause"] + case _ if self.player.get("CanPlay"): + self.values["play_pause"] = Printer.ICONS["play"] + + if self.player.get("CanGoNext"): + self.values["next"] = Printer.ICONS["next"] + + if self.player.get("Volume"): + self.values["volume"] = self.player.get("Volume") + match self.player.get("Volume"): + case float(vol) if vol > 0.66: + self.values["vol"] = Printer.ICONS["high"] + case float(vol) if vol > 0.33: + self.values["vol"] = Printer.ICONS["med"] + case float(vol): + self.values["vol"] = Printer.ICONS["low"] + + match self.player.get("PlaybackStatus"): case "Playing": - return PlayerStatus.GREEN + self.colors = { + "color": Printer.BLACK, + "background": Printer.GREEN, + } case "Paused": - return PlayerStatus.YELLOW + self.colors = { + "color": Printer.BLACK, + "background": Printer.YELLOW, + } case "Stopped": - return None + self.colors = {} def __iter__(self): - while self.title_deque: - title = "".join(islice(self.title_deque, 0, self.width)) - - components = [PlayerStatus.ICON] - if len(title) > 0: - components.append(title) - if self.can_prev: - components.append("\U000f04ae ") # - if self.can_pause: - components.append("\U000f03e4 ") # - if self.can_play: - components.append("\U000f040a ") # - if self.can_next: - components.append("\U000f04ad ") # - if v := self.volume: - match v: - case v if v > 66: - components.append("\U000f057e") # - case v if v > 33: - components.append("\U000f0580") # - case v: - components.append("\U000f057f") # - if v: - components.append(f"{v}%") - - output = {"full_text": f" {" ".join(components)} "} - if self.color: - output["color"] = self.color - if self.background: - output["background"] = self.background - + while self.iter: + full_text = ( + Printer.FMT.format( + title="".join(islice(self.iter, 0, self.width)), + **self.values, + **self.player, + ) + .strip() + .replace("XDELETEX ", "") + .replace(" 133700%", "") + ) + + output = {"full_text": full_text.center(len(full_text) + 2)} + output.update(self.colors) yield output - if isinstance(self.title_deque, deque): - self.title_deque.rotate(-1) + if len(self.iter) > self.width: + self.iter.rotate(-1) else: break + async def print(self): + for status in iter(self): + print(json.dumps(status, ensure_ascii=False), flush=True) + await asyncio.sleep(0.5) + class MPRIS: - def __init__(self): + PROPS = ["xesam:title"] + + def __init__(self, task_group: TaskGroup): + self.task_group = task_group + self.printer_task = None + self.active_players = deque([]) self.bus = None self.dbus_iface = None - self.players = {} + self.players = defaultdict(lambda: {}) + self.player_iter = self.players.keys() self.ifaces = {} - self.statuses = OrderedDict({}) - self.status_update = Event() @classmethod - async def connect(cls): - self = cls() + async def connect(cls, task_group: TaskGroup): + self = cls(task_group) self.bus = await MessageBus().connect() self.dbus_iface = self.bus.get_proxy_object( @@ -186,47 +171,43 @@ class MPRIS: def valid_player(bus_name: str) -> bool: return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE - async def promote_player(self, bus_name): - if self.statuses.get(bus_name): - self.statuses.move_to_end(bus_name) - else: - self.statuses[bus_name] = PlayerStatus(self.players[bus_name]) + @property + def active_player(self): + if len(self.active_players) > 0: + return self.active_players[-1] - self.status_update.set() + def cycle_player(self): + self.active_players.rotate(-1) + self.update_printer() - def remove_player(self, bus_name): - self.statuses.pop(bus_name, None) - self.status_update.set() + def update_printer(self): + if self.printer_task is not None: + self.printer_task.cancel() - def cycle_player(self): - if k := next(reversed(self.statuses), None): - self.promote_player(k) - self.status_update.set() - - async def status_printer(self, interval=1): - while await self.status_update.wait(): - self.status_update.clear() - status = next(reversed(self.statuses.values()), PlayerStatus()) - for output in iter(status): - print(json.dumps(output, ensure_ascii=False), flush=True) - if self.status_update.is_set(): - break - else: - await asyncio.sleep(0.5) + if len(self.active_players) > 0: + self.printer_task = self.task_group.create_task( + self.players[self.active_player].print() + ) + else: + print(flush=True) def on_properties_changed(self, bus_name: str, property, invalidated): props = parse_dbus_values(property) - self.statuses[bus_name].update_player(props) + self.players[bus_name].update_props(props) match props: case {"PlaybackStatus": "Playing"}: - self.promote_player(bus_name) + if bus_name in self.active_players: + self.active_players.remove(bus_name) + self.active_players.append(bus_name) case {"PlaybackStatus": "Stopped"}: - self.remove_player(bus_name) + self.active_players.remove(bus_name) - self.status_update.set() + self.update_printer() async def add_player(self, bus_name): - self.players[bus_name] = {} + properties = None + self.players[bus_name] = None + self.ifaces[bus_name] = None obj = self.bus.get_proxy_object( bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH) @@ -234,8 +215,8 @@ class MPRIS: properties = obj.get_interface(PROPERTIES_INTERFACE) - self.players[bus_name] = parse_dbus_values( - await properties.call_get_all(PLAYER_INTERFACE) + self.players[bus_name] = Printer( + parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)) ) self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE) @@ -243,18 +224,23 @@ class MPRIS: lambda _, p, i: self.on_properties_changed(bus_name, p, i) ) - if self.players[bus_name].get("PlaybackStatus", "Stopped") != "Stopped": - self.statuses[bus_name] = PlayerStatus(self.players[bus_name]) - - if self.players[bus_name].get("PlaybackStatus", "Stopped") == "Paused": - self.statuses.move_to_end(bus_name, last=False) + if bus_name in self.active_players: + self.active_players.remove(bus_name) - self.status_update.set() + match self.players[bus_name].player.get("PlaybackStatus"): + case "Playing": + self.active_players.append(bus_name) + self.update_printer() + case "Paused": + self.active_players.appendleft(bus_name) + self.update_printer() - async def remove_player_iface(self, bus_name): - self.remove_player(bus_name) + def remove_player(self, bus_name): self.players.pop(bus_name, None) self.ifaces.pop(bus_name, None) + if bus_name in self.active_players: + self.active_players.remove(bus_name) + self.update_printer() async def on_name_owner_changed(self, bus_name, old, new): if self.valid_player(bus_name): @@ -281,30 +267,24 @@ class MPRIS: continue match json.loads(line): - case {"button": 1}: - pass + case {"button": 1} if bus_name := self.active_player: + await self.ifaces[bus_name].call_play_pause() case {"button": 2}: pass case {"button": 3}: self.cycle_player() - - case {"button": 4}: - if bus_name := next(reversed(self.statuses)): - volume = await self.ifaces[bus_name].get_volume() - await self.ifaces[bus_name].set_volume(volume + 0.05) - case {"button": 5}: - if bus_name := next(reversed(self.statuses)): - volume = await self.ifaces[bus_name].get_volume() - await self.ifaces[bus_name].set_volume(volume - 0.05) - pass + case {"button": 4} if bus_name := self.active_player: + volume = await self.ifaces[bus_name].get_volume() + await self.ifaces[bus_name].set_volume(volume + 0.05) + case {"button": 5} if bus_name := self.active_player: + volume = await self.ifaces[bus_name].get_volume() + await self.ifaces[bus_name].set_volume(volume - 0.05) async def main(): - mpris = await MPRIS.connect() - try: async with TaskGroup() as task_group: - task_group.create_task(mpris.status_printer()) + mpris = await MPRIS.connect(task_group) task_group.create_task(mpris.listener()) task_group.create_task(mpris.button_handler()) except CancelledError: |