From 4ec1572889fc5cb525e23bc776d44922247ecd12 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Thu, 6 Jun 2024 17:28:44 -0500 Subject: fix(i3blocks): clean up scrolling text --- i3blocks/.local/lib/i3blocks/i3blocks-mpris | 173 ++++++++++++++++------------ 1 file changed, 101 insertions(+), 72 deletions(-) (limited to 'i3blocks/.local/lib') diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris index 8f7be61..3ccbd9a 100755 --- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris +++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris @@ -6,6 +6,7 @@ import os import sys from asyncio import CancelledError, TaskGroup, run from collections import defaultdict, deque +from dataclasses import dataclass, field from functools import cached_property from itertools import islice @@ -14,7 +15,7 @@ from dbus_next.aio import MessageBus from dbus_next.errors import DBusError BUS_NAME_PREFIX = "org.mpris.MediaPlayer2." -BUS_NAME_IGNORE = ["org.mpris.MediaPlayer2.playerctld"] +BUS_NAME_IGNORE = ["playerctld", "kdeconnect"] OBJECT_PATH = "/org/mpris/MediaPlayer2" PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" @@ -32,6 +33,33 @@ def parse_dbus_values(item: Variant | dict | list) -> dict: return v +@dataclass +class ScrollingText: + """Class to produce a rotating string iterator""" + + text: str = "" + width: int = 10 + deque: deque = field(init=False) + rotate: bool = field(init=False) + + def __post_init__(self): + self.rotate = len(self.text) > self.width + self.deque = deque(list(self.text + " ")) + + def update(self, value: str = "") -> None: + if value == self.text: + return + self.text = "" if value is None else value + self.__post_init__() + + def __iter__(self): + while self.rotate: + yield "".join(islice(self.deque, 0, self.width)) + self.deque.rotate(-1) + + yield self.text + + class Printer: BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" @@ -47,23 +75,11 @@ class Printer: "low": "\U000f057f", # 󰕿 } - def __init__(self, player, width: int = 10): - self.width = width + def __init__(self, name, player, width: int = 10): + self.name = name + self.text = ScrollingText() self.player = player - - def __iter__(self): - while self.iter: - yield { - "full_text": self.full_text, - "short_text": self.short_text, - "color": self.color, - "background": self.background, - } - - if len(self.iter) > self.width: - self.iter.rotate(-1) - else: - break + self.width = width @property def player(self): @@ -84,15 +100,11 @@ class Printer: ]: self.__dict__.pop(attr, None) + self.text.update(self.title) + @cached_property - def title(self): - title = self.player.get("Metadata", {"xesam:title": self.prev_title}).get( - "xesam:title" - ) - if title != self.prev_title: - self.__dict__.pop("iter", None) - self.prev_title = title - return self.prev_title + def title(self) -> str: + return self.player.get("Metadata", {}).get("xesam:title", "") @cached_property def controls(self) -> str: @@ -141,14 +153,6 @@ class Printer: case _: return None - @cached_property - def iter(self) -> list | deque: - match self.player.get("Metadata", {}).get("xesam:title", ""): - case str(""): - return [""] - case str(s): - return deque(list(s + " ")) - @cached_property def short_text(self) -> str: s = f" {Printer.ICONS["icon"]} " @@ -156,29 +160,42 @@ class Printer: s += f" {c} " return s - @property - def full_text(self) -> str: - s = f" {Printer.ICONS["icon"]} {"".join(islice(self.iter, 0, self.width))} " + def status(self) -> dict: + full_text = ["", Printer.ICONS["icon"], ""] if c := self.controls: - s += f"{c} " + full_text.append(c) if v := self.volume: - s += f"{v} " + full_text.append(v) - return s + full_text.append("") + for title in iter(self.text): + full_text[2] = title + yield { + "full_text": " ".join(full_text), + "short_text": self.short_text, + "color": self.color, + "background": self.background, + } async def print(self): - for status in iter(self): + for status in iter(self.status()): print(json.dumps(status, ensure_ascii=False), flush=True) await asyncio.sleep(0.5) + def toggle_name(self): + if self.text.text == self.name: + self.text.update(self.title) + else: + self.text.update(self.name) + class MPRIS: def __init__(self, task_group: TaskGroup): self.task_group = task_group self.printer_task = None - self.active_players = deque([]) + self.active_buses: deque(str) = deque([]) self.bus = None self.dbus_iface = None self.players = defaultdict(lambda: {}) @@ -204,25 +221,34 @@ class MPRIS: @staticmethod def valid_player(bus_name: str) -> bool: - return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE + return bus_name.startswith(BUS_NAME_PREFIX) and not any( + bus_name.removeprefix(BUS_NAME_PREFIX).startswith(n) + for n in BUS_NAME_IGNORE + ) + + @property + def active_bus(self) -> str | None: + if len(self.active_buses) > 0: + return self.active_buses[-1] + + @property + def active_player(self) -> Printer | None: + return self.players.get(self.active_bus) @property - def active_player(self): - if len(self.active_players) > 0: - return self.active_players[-1] + def active_iface(self) -> Printer | None: + return self.ifaces.get(self.active_bus) def cycle_player(self): - self.active_players.rotate(-1) + self.active_buses.rotate(-1) self.update_printer() def update_printer(self): if self.printer_task is not None: self.printer_task.cancel() - if len(self.active_players) > 0: - self.printer_task = self.task_group.create_task( - self.players[self.active_player].print() - ) + if player := self.active_player: + self.printer_task = self.task_group.create_task(player.print()) else: print(flush=True) @@ -231,11 +257,11 @@ class MPRIS: self.players[bus_name].player |= props match props: case {"PlaybackStatus": "Playing"}: - if bus_name in self.active_players: - self.active_players.remove(bus_name) - self.active_players.append(bus_name) + if bus_name in self.active_buses: + self.active_buses.remove(bus_name) + self.active_buses.append(bus_name) case {"PlaybackStatus": "Stopped"}: - self.active_players.remove(bus_name) + self.active_buses.remove(bus_name) self.update_printer() @@ -251,7 +277,8 @@ class MPRIS: properties = obj.get_interface(PROPERTIES_INTERFACE) self.players[bus_name] = Printer( - parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)) + bus_name.removeprefix(BUS_NAME_PREFIX), + parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)), ) self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE) @@ -259,22 +286,22 @@ class MPRIS: lambda _, p, i: self.on_properties_changed(bus_name, p, i) ) - if bus_name in self.active_players: - self.active_players.remove(bus_name) + if bus_name in self.active_buses: + self.active_buses.remove(bus_name) match self.players[bus_name].player.get("PlaybackStatus"): case "Playing": - self.active_players.append(bus_name) + self.active_buses.append(bus_name) self.update_printer() case "Paused": - self.active_players.appendleft(bus_name) + self.active_buses.appendleft(bus_name) self.update_printer() def remove_player(self, bus_name): self.players.pop(bus_name, None) self.ifaces.pop(bus_name, None) - if bus_name in self.active_players: - self.active_players.remove(bus_name) + if bus_name in self.active_buses: + self.active_buses.remove(bus_name) self.update_printer() async def on_name_owner_changed(self, bus_name, old, new): @@ -303,18 +330,20 @@ class MPRIS: try: match json.loads(line): - case {"button": 1} if bus_name := self.active_player: - await self.ifaces[bus_name].call_play_pause() - case {"button": 2}: - pass + case {"button": 1} if iface := self.active_iface: + await iface.call_play_pause() + case {"button": 2} if player := self.active_player: + player.toggle_name() + self.update_printer() case {"button": 3}: - self.cycle_player() - case {"button": 4} if bus_name := self.active_player: - volume = await self.ifaces[bus_name].get_volume() - await self.ifaces[bus_name].set_volume(volume + 0.05) - case {"button": 5} if bus_name := self.active_player: - volume = await self.ifaces[bus_name].get_volume() - await self.ifaces[bus_name].set_volume(volume - 0.05) + self.active_buses.rotate(-1) + self.update_printer() + case {"button": 4} if iface := self.active_iface: + volume = await iface.get_volume() + await iface.set_volume(volume + 0.05) + case {"button": 5} if iface := self.active_iface: + volume = await iface.get_volume() + await iface.set_volume(volume - 0.05) except DBusError: pass -- cgit v1.2.3-70-g09d2