From 763e91a5a5366f730c27adadced5e9a7c2046560 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 25 Jun 2024 12:12:58 -0500 Subject: feat(i3blocks): rewrite and impl button handler for mpris --- i3blocks/.local/lib/i3blocks/i3blocks-mpris | 465 ++++++++++++---------------- 1 file changed, 199 insertions(+), 266 deletions(-) (limited to 'i3blocks/.local') diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris index 3ccbd9a..c553f81 100755 --- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris +++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris @@ -5,20 +5,20 @@ import json import os import sys from asyncio import CancelledError, TaskGroup, run -from collections import defaultdict, deque -from dataclasses import dataclass, field -from functools import cached_property -from itertools import islice - +from enum import Enum, auto from dbus_next import Variant from dbus_next.aio import MessageBus from dbus_next.errors import DBusError + BUS_NAME_PREFIX = "org.mpris.MediaPlayer2." BUS_NAME_IGNORE = ["playerctld", "kdeconnect"] OBJECT_PATH = "/org/mpris/MediaPlayer2" +MPRIS_INTERFACE = "org.mpris.MediaPlayer2" PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" +PLAYERCTL_INTERFACE = "com.github.altdesktop.playerctld" +PLAYERCTL_BUS_NAME = "org.mpris.MediaPlayer2.playerctld" def parse_dbus_values(item: Variant | dict | list) -> dict: @@ -33,31 +33,15 @@ def parse_dbus_values(item: Variant | dict | list) -> dict: return v -@dataclass -class ScrollingText: - """Class to produce a rotating string iterator""" - - text: str = "" - width: int = 10 - deque: deque = field(init=False) - rotate: bool = field(init=False) - - def __post_init__(self): - self.rotate = len(self.text) > self.width - self.deque = deque(list(self.text + " ")) - - def update(self, value: str = "") -> None: - if value == self.text: - return - self.text = "" if value is None else value - self.__post_init__() - - def __iter__(self): - while self.rotate: - yield "".join(islice(self.deque, 0, self.width)) - self.deque.rotate(-1) - - yield self.text +class Section(Enum): + BLOCK = auto() + ICON = auto() + TITLE = auto() + PREV = auto() + PLAY = auto() + PAUSE = auto() + NEXT = auto() + VOLUME = auto() class Printer: @@ -75,244 +59,212 @@ class Printer: "low": "\U000f057f", # 󰕿 } - def __init__(self, name, player, width: int = 10): - self.name = name - self.text = ScrollingText() - self.player = player - self.width = width - - @property - def player(self): - return self._player - - @player.setter - def player(self, value): - self._player = value - # clear possibly outdated properties - # See: https://stackoverflow.com/a/69367025/11477336 - for attr in [ - "title", - "controls", - "volume", - "color", - "background", - "short_text", - ]: - self.__dict__.pop(attr, None) - - self.text.update(self.title) - - @cached_property - def title(self) -> str: - return self.player.get("Metadata", {}).get("xesam:title", "") - - @cached_property - def controls(self) -> str: - controls = [] - - if self.player.get("CanGoPrevious"): - controls.append(Printer.ICONS["prev"]) - - match self.player.get("PlaybackStatus"): - case "Playing" if self.player.get("CanPause"): - controls.append(Printer.ICONS["pause"]) - case _ if self.player.get("CanPlay"): - controls.append(Printer.ICONS["play"]) - - if self.player.get("CanGoNext"): - controls.append(Printer.ICONS["next"]) - - if len(controls) > 0: - return " ".join(controls) + " " - - @cached_property - def volume(self) -> str: - match self.player.get("Volume"): - case float(vol) if vol > 0.66: - return f"{Printer.ICONS["high"]} {vol:.0%}" - case float(vol) if vol > 0.33: - return f"{Printer.ICONS["med"]} {vol:.0%}" - case float(vol): - return f"{Printer.ICONS["low"]} {vol:.0%}" - - @cached_property - def background(self): - match self.player.get("PlaybackStatus"): + def __init__(self, bus, bus_name): + self.bus = bus + self.bus_name = bus_name + self.updated = asyncio.Event() + self.title = None + self.mpris = None + self.player = None + self.playerctl = None + self.mpris_props = {} + self.properties = {} + + @classmethod + async def create(cls, bus, bus_name): + self = cls(bus, bus_name) + + player_obj = self.bus.get_proxy_object( + bus_name, + OBJECT_PATH, + await self.bus.introspect(self.bus_name, OBJECT_PATH), + ) + prop_iface = player_obj.get_interface(PROPERTIES_INTERFACE) + self.mpris = player_obj.get_interface(MPRIS_INTERFACE) + self.player = player_obj.get_interface(PLAYER_INTERFACE) + self.playerctl = self.bus.get_proxy_object( + PLAYERCTL_BUS_NAME, + OBJECT_PATH, + await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH), + ).get_interface(PLAYERCTL_INTERFACE) + + self.mpris_props = await prop_iface.call_get_all(MPRIS_INTERFACE) + self.build(None, await prop_iface.call_get_all(PLAYER_INTERFACE), None) + + prop_iface.on_properties_changed(self.build) + return self + + def build(self, _bus_name, property, _invalidated): + self.properties |= parse_dbus_values(property) + + self.sections = dict() + self.width = 0 + + match self.properties.get("PlaybackStatus"): case "Playing": - return Printer.GREEN + self.color = Printer.BLACK + self.background = Printer.GREEN case "Paused": - return Printer.YELLOW + self.color = Printer.BLACK + self.background = Printer.YELLOW case _: - return None - - @cached_property - def color(self): - match self.player.get("PlaybackStatus"): - case "Playing" | "Paused": - return Printer.BLACK + self.color = None + self.background = None + + self.text = f" {Printer.ICONS["icon"]} " + self.width += len(f" {Printer.ICONS["icon"]} ") + self.sections[self.width] = Section.ICON + + title = self.properties.get("Metadata", {}).get("xesam:title") + if title != self.title: + self.title = title + self.title_iter = self.title + " " + + self.text += "{title} " + self.width += min(len(self.title), 10) + 1 + self.sections[self.width] = Section.TITLE + + if self.properties.get("CanGoPrevious"): + self.text += f"{Printer.ICONS["prev"]} " + self.width += len(f"{Printer.ICONS["prev"]} ") + self.sections[self.width] = Section.PREV + + match self.properties.get("PlaybackStatus"): + case "Playing" if self.properties.get("CanPause"): + self.text += f"{Printer.ICONS["pause"]} " + self.width += len(f"{Printer.ICONS["pause"]} ") + self.sections[self.width] = Section.PAUSE + case _ if self.properties.get("CanPlay"): + self.text += f"{Printer.ICONS["play"]} " + self.width += len(f"{Printer.ICONS["play"]} ") + self.sections[self.width] = Section.PLAY + + if self.properties.get("CanGoNext"): + self.text += f"{Printer.ICONS["next"]} " + self.width += len(f"{Printer.ICONS["next"]} ") + self.sections[self.width] = Section.NEXT + + if vol := self.properties.get("Volume"): + match vol: + case float(vol) if vol > 0.66: + icon = Printer.ICONS["high"] + case float(vol) if vol > 0.33: + icon = Printer.ICONS["med"] + case _: + icon = Printer.ICONS["low"] + + self.text += f"{icon} {vol:.0%} " + self.width += len(f"{icon} {vol:.0%} ") + self.sections[self.width] = Section.VOLUME + + self.updated.set() + + async def handle_button(self, data) -> bool: + with open("/tmp/mpris.json", "w") as f: + f.write(json.dumps(data)) + + match data: + case {"button": b, "relative_x": x, "width": w}: + button = b + pos = x / w case _: - return None - - @cached_property - def short_text(self) -> str: - s = f" {Printer.ICONS["icon"]} " - if c := self.controls: - s += f" {c} " - return s - - def status(self) -> dict: - full_text = ["", Printer.ICONS["icon"], ""] - - if c := self.controls: - full_text.append(c) - - if v := self.volume: - full_text.append(v) - - full_text.append("") - for title in iter(self.text): - full_text[2] = title - yield { - "full_text": " ".join(full_text), - "short_text": self.short_text, - "color": self.color, - "background": self.background, - } - - async def print(self): - for status in iter(self.status()): - print(json.dumps(status, ensure_ascii=False), flush=True) - await asyncio.sleep(0.5) - - def toggle_name(self): - if self.text.text == self.name: - self.text.update(self.title) - else: - self.text.update(self.name) + return False + + section = Section.BLOCK + for n, section in self.sections.items(): + if (n / self.width) > pos: + break + + try: + match (section, button): + case (Section.ICON, 1) if self.mpris_props.get("CanRaise"): + await self.mpris.call_raise() + case (Section.TITLE, 1): + pass + case (Section.PREV, 1): + await self.player.call_previous() + case (Section.PLAY, 1) | (Section.PAUSE, 1): + await self.player.call_play_pause() + case (Section.NEXT, 1): + await self.player.call_next() + case (Section.VOLUME, 4): + volume = await self.player.get_volume() + await self.player.set_volume(volume + 0.05) + case (Section.VOLUME, 5): + volume = await self.player.get_volume() + await self.player.set_volume(volume - 0.05) + case (_, 3): + await self.playerctl.call_shift() + case (_, _): + return + + self.print() + except DBusError: + pass + + def print(self): + status = { + "full_text": self.text.format(title=self.title_iter[:10]), + "short_text": self.text.format(title=""), + "color": self.color, + "background": self.background, + } + print(json.dumps(status, ensure_ascii=False), flush=True) + + async def print_task(self): + while True: + self.updated.clear() + self.print() + if len(self.title) > 10: + self.title_iter = self.title_iter[1:] + self.title_iter[:1] + await asyncio.sleep(0.5) + else: + await self.updated.wait() class MPRIS: def __init__(self, task_group: TaskGroup): self.task_group = task_group self.printer_task = None - self.active_buses: deque(str) = deque([]) + self.active = None self.bus = None - self.dbus_iface = None - self.players = defaultdict(lambda: {}) - self.player_iter = self.players.keys() - self.ifaces = {} + self.playerctl = None + self.players = {} @classmethod async def connect(cls, task_group: TaskGroup): self = cls(task_group) self.bus = await MessageBus().connect() - - self.dbus_iface = self.bus.get_proxy_object( - "org.freedesktop.DBus", - "/org/freedesktop/DBus", - await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"), - ).get_interface("org.freedesktop.DBus") - - for bus_name in await self.dbus_iface.call_list_names(): - if self.valid_player(bus_name): - await self.add_player(bus_name) - return self - @staticmethod - def valid_player(bus_name: str) -> bool: - return bus_name.startswith(BUS_NAME_PREFIX) and not any( - bus_name.removeprefix(BUS_NAME_PREFIX).startswith(n) - for n in BUS_NAME_IGNORE - ) - - @property - def active_bus(self) -> str | None: - if len(self.active_buses) > 0: - return self.active_buses[-1] - - @property - def active_player(self) -> Printer | None: - return self.players.get(self.active_bus) - - @property - def active_iface(self) -> Printer | None: - return self.ifaces.get(self.active_bus) - - def cycle_player(self): - self.active_buses.rotate(-1) - self.update_printer() - - def update_printer(self): + async def activate(self, bus_name): if self.printer_task is not None: self.printer_task.cancel() - if player := self.active_player: - self.printer_task = self.task_group.create_task(player.print()) - else: - print(flush=True) - - def on_properties_changed(self, bus_name: str, property, invalidated): - props = parse_dbus_values(property) - self.players[bus_name].player |= props - match props: - case {"PlaybackStatus": "Playing"}: - if bus_name in self.active_buses: - self.active_buses.remove(bus_name) - self.active_buses.append(bus_name) - case {"PlaybackStatus": "Stopped"}: - self.active_buses.remove(bus_name) - - self.update_printer() - - async def add_player(self, bus_name): - properties = None - self.players[bus_name] = None - self.ifaces[bus_name] = None - - obj = self.bus.get_proxy_object( - bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH) - ) + if bus_name == "": + bus_name = None - properties = obj.get_interface(PROPERTIES_INTERFACE) + if bus_name and bus_name not in self.players: + self.players[bus_name] = await Printer.create(self.bus, bus_name) - self.players[bus_name] = Printer( - bus_name.removeprefix(BUS_NAME_PREFIX), - parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)), - ) - self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE) + if player := self.players.get(bus_name): + self.active = bus_name + self.printer_task = self.task_group.create_task(player.print_task()) + else: + print(flush=True) + return - properties.on_properties_changed( - lambda _, p, i: self.on_properties_changed(bus_name, p, i) - ) + async def main(self): + playerctl = self.bus.get_proxy_object( + PLAYERCTL_BUS_NAME, + OBJECT_PATH, + await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH), + ).get_interface(PLAYERCTL_INTERFACE) - if bus_name in self.active_buses: - self.active_buses.remove(bus_name) + bus_name = next(iter(await playerctl.get_player_names()), None) + await self.activate(bus_name) - match self.players[bus_name].player.get("PlaybackStatus"): - case "Playing": - self.active_buses.append(bus_name) - self.update_printer() - case "Paused": - self.active_buses.appendleft(bus_name) - self.update_printer() - - def remove_player(self, bus_name): - self.players.pop(bus_name, None) - self.ifaces.pop(bus_name, None) - if bus_name in self.active_buses: - self.active_buses.remove(bus_name) - self.update_printer() - - async def on_name_owner_changed(self, bus_name, old, new): - if self.valid_player(bus_name): - if old: - self.remove_player(bus_name) - if new: - await self.add_player(bus_name) - - async def listener(self): - self.dbus_iface.on_name_owner_changed(self.on_name_owner_changed) + playerctl.on_active_player_change_end(self.activate) await self.bus.wait_for_disconnect() async def button_handler(self): @@ -322,37 +274,18 @@ class MPRIS: await loop.connect_read_pipe(lambda: protocol, sys.stdin) while True: - line = await reader.readline() - - if not line: + if line := await reader.readline(): + if player := self.players.get(self.active): + await player.handle_button(json.loads(line)) + else: await asyncio.sleep(1) - continue - - try: - match json.loads(line): - case {"button": 1} if iface := self.active_iface: - await iface.call_play_pause() - case {"button": 2} if player := self.active_player: - player.toggle_name() - self.update_printer() - case {"button": 3}: - self.active_buses.rotate(-1) - self.update_printer() - case {"button": 4} if iface := self.active_iface: - volume = await iface.get_volume() - await iface.set_volume(volume + 0.05) - case {"button": 5} if iface := self.active_iface: - volume = await iface.get_volume() - await iface.set_volume(volume - 0.05) - except DBusError: - pass async def main(): try: async with TaskGroup() as task_group: mpris = await MPRIS.connect(task_group) - task_group.create_task(mpris.listener()) + task_group.create_task(mpris.main()) task_group.create_task(mpris.button_handler()) except CancelledError: return -- cgit v1.2.3-70-g09d2