From a06fc43b26c9de9f37c5270a421af62c790e5df0 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Sun, 21 Jul 2024 17:06:40 -0500 Subject: fix(i3blocks): update config for i3blocks-mpris --- i3blocks/.local/lib/i3blocks/i3blocks-mpris | 292 ---------------------------- 1 file changed, 292 deletions(-) delete mode 100755 i3blocks/.local/lib/i3blocks/i3blocks-mpris (limited to 'i3blocks/.local') diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris deleted file mode 100755 index 18fafc1..0000000 --- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris +++ /dev/null @@ -1,292 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import os -import sys -from asyncio import CancelledError, TaskGroup, run -from enum import Enum, auto -from dbus_next import Variant -from dbus_next.aio import MessageBus -from dbus_next.errors import DBusError - - -BUS_NAME_PREFIX = "org.mpris.MediaPlayer2." -BUS_NAME_IGNORE = ["playerctld", "kdeconnect"] -OBJECT_PATH = "/org/mpris/MediaPlayer2" -MPRIS_INTERFACE = "org.mpris.MediaPlayer2" -PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" -PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" -PLAYERCTL_INTERFACE = "com.github.altdesktop.playerctld" -PLAYERCTL_BUS_NAME = "org.mpris.MediaPlayer2.playerctld" - - -def parse_dbus_values(item: Variant | dict | list) -> dict: - match item: - case Variant(): - return parse_dbus_values(item.value) - case dict(item): - return {k: parse_dbus_values(v) for k, v in item.items()} - case list(item): - return [parse_dbus_values(v) for v in item] - case v: - return v - - -class Section(Enum): - BLOCK = auto() - ICON = auto() - TITLE = auto() - PREV = auto() - PLAY = auto() - PAUSE = auto() - NEXT = auto() - VOLUME = auto() - - -class Printer: - BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" - GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" - YELLOW = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}" - ICONS = { - "icon": "\U000f075a", # 󰝚 - "prev": "\U000f04ae", # 󰒮 - "play": "\U000f040a", # 󰐊 - "pause": "\U000f03e4", # 󰏤 - "next": "\U000f04ad", # 󰒭 - "high": "\U000f057e", # 󰕾 - "med": "\U000f0580", # 󰖀 - "low": "\U000f057f", # 󰕿 - } - - def __init__(self, bus, bus_name): - self.bus = bus - self.bus_name = bus_name - self.updated = asyncio.Event() - self.title = None - self.title_iter = "" - self.mpris = None - self.player = None - self.playerctl = None - self.mpris_props = {} - self.properties = {} - - @classmethod - async def create(cls, bus, bus_name): - self = cls(bus, bus_name) - - player_obj = self.bus.get_proxy_object( - bus_name, - OBJECT_PATH, - await self.bus.introspect(self.bus_name, OBJECT_PATH), - ) - prop_iface = player_obj.get_interface(PROPERTIES_INTERFACE) - self.mpris = player_obj.get_interface(MPRIS_INTERFACE) - self.player = player_obj.get_interface(PLAYER_INTERFACE) - self.playerctl = self.bus.get_proxy_object( - PLAYERCTL_BUS_NAME, - OBJECT_PATH, - await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH), - ).get_interface(PLAYERCTL_INTERFACE) - - self.mpris_props = await prop_iface.call_get_all(MPRIS_INTERFACE) - self.build(None, await prop_iface.call_get_all(PLAYER_INTERFACE), None) - - prop_iface.on_properties_changed(self.build) - return self - - def build(self, _bus_name, property, _invalidated): - self.properties |= parse_dbus_values(property) - - self.sections = dict() - self.width = 0 - - match self.properties.get("PlaybackStatus"): - case "Playing": - self.color = Printer.BLACK - self.background = Printer.GREEN - case "Paused": - self.color = Printer.BLACK - self.background = Printer.YELLOW - case _: - self.color = None - self.background = None - - self.text = f" {Printer.ICONS["icon"]} " - self.width += len(f" {Printer.ICONS["icon"]} ") - self.sections[self.width] = Section.ICON - - title = self.properties.get("Metadata", {}).get("xesam:title", "") - if title != self.title: - self.title = title - self.title_iter = self.title + " " - - self.text += "{title} " - self.width += min(len(self.title), 10) + 1 - self.sections[self.width] = Section.TITLE - - if self.properties.get("CanGoPrevious"): - self.text += f"{Printer.ICONS["prev"]} " - self.width += len(f"{Printer.ICONS["prev"]} ") - self.sections[self.width] = Section.PREV - - match self.properties.get("PlaybackStatus"): - case "Playing" if self.properties.get("CanPause"): - self.text += f"{Printer.ICONS["pause"]} " - self.width += len(f"{Printer.ICONS["pause"]} ") - self.sections[self.width] = Section.PAUSE - case _ if self.properties.get("CanPlay"): - self.text += f"{Printer.ICONS["play"]} " - self.width += len(f"{Printer.ICONS["play"]} ") - self.sections[self.width] = Section.PLAY - - if self.properties.get("CanGoNext"): - self.text += f"{Printer.ICONS["next"]} " - self.width += len(f"{Printer.ICONS["next"]} ") - self.sections[self.width] = Section.NEXT - - if vol := self.properties.get("Volume"): - match vol: - case float(vol) if vol > 0.66: - icon = Printer.ICONS["high"] - case float(vol) if vol > 0.33: - icon = Printer.ICONS["med"] - case _: - icon = Printer.ICONS["low"] - - self.text += f"{icon} {vol:.0%} " - self.width += len(f"{icon} {vol:.0%} ") - self.sections[self.width] = Section.VOLUME - - self.updated.set() - self.print() - - async def handle_button(self, data) -> bool: - match data: - case {"button": b, "relative_x": x, "width": w}: - button = b - pos = x / w - case _: - return False - - section = Section.BLOCK - for n, section in self.sections.items(): - if (n / self.width) > pos: - break - - try: - match (section, button): - case (Section.ICON, 1) if self.mpris_props.get("CanRaise"): - await self.mpris.call_raise() - case (Section.TITLE, 1): - pass - case (Section.PREV, 1): - await self.player.call_previous() - case (Section.PLAY, 1) | (Section.PAUSE, 1): - await self.player.call_play_pause() - case (Section.NEXT, 1): - await self.player.call_next() - case (Section.VOLUME, 4): - volume = await self.player.get_volume() - await self.player.set_volume(volume + 0.05) - case (Section.VOLUME, 5): - volume = await self.player.get_volume() - await self.player.set_volume(volume - 0.05) - case (_, 3): - await self.playerctl.call_shift() - case (_, _): - return - except DBusError: - pass - - def print(self): - status = { - "full_text": self.text.format(title=self.title_iter[:10]), - "short_text": self.text.format(title=""), - "color": self.color, - "background": self.background, - } - print(json.dumps(status, ensure_ascii=False), flush=True) - - async def print_task(self): - while True: - self.updated.clear() - self.print() - if len(self.title) > 10: - self.title_iter = self.title_iter[1:] + self.title_iter[:1] - await asyncio.sleep(0.5) - else: - await self.updated.wait() - - -class MPRIS: - def __init__(self, task_group: TaskGroup): - self.task_group = task_group - self.printer_task = None - self.active = None - self.bus = None - self.playerctl = None - self.players = {} - - @classmethod - async def connect(cls, task_group: TaskGroup): - self = cls(task_group) - self.bus = await MessageBus().connect() - return self - - async def activate(self, bus_name): - if self.printer_task is not None: - self.printer_task.cancel() - - if bus_name == "": - bus_name = None - - if bus_name and bus_name not in self.players: - self.players[bus_name] = await Printer.create(self.bus, bus_name) - - if player := self.players.get(bus_name): - self.active = bus_name - self.printer_task = self.task_group.create_task(player.print_task()) - else: - print(flush=True) - return - - async def main(self): - playerctl = self.bus.get_proxy_object( - PLAYERCTL_BUS_NAME, - OBJECT_PATH, - await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH), - ).get_interface(PLAYERCTL_INTERFACE) - - bus_name = next(iter(await playerctl.get_player_names()), None) - await self.activate(bus_name) - - playerctl.on_active_player_change_end(self.activate) - await self.bus.wait_for_disconnect() - - async def button_handler(self): - loop = asyncio.get_event_loop() - reader = asyncio.StreamReader() - protocol = asyncio.StreamReaderProtocol(reader) - await loop.connect_read_pipe(lambda: protocol, sys.stdin) - - while True: - if line := await reader.readline(): - if player := self.players.get(self.active): - await player.handle_button(json.loads(line)) - else: - await asyncio.sleep(1) - - -async def main(): - try: - async with TaskGroup() as task_group: - mpris = await MPRIS.connect(task_group) - task_group.create_task(mpris.main()) - task_group.create_task(mpris.button_handler()) - except CancelledError: - return - - -if __name__ == "__main__": - run(main()) -- cgit v1.2.3-70-g09d2