aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/i3blocks/.local
diff options
context:
space:
mode:
Diffstat (limited to 'i3blocks/.local')
-rwxr-xr-xi3blocks/.local/lib/i3blocks/i3blocks-mpris465
1 files changed, 199 insertions, 266 deletions
diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
index 3ccbd9a..c553f81 100755
--- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris
+++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
@@ -5,20 +5,20 @@ import json
import os
import sys
from asyncio import CancelledError, TaskGroup, run
-from collections import defaultdict, deque
-from dataclasses import dataclass, field
-from functools import cached_property
-from itertools import islice
-
+from enum import Enum, auto
from dbus_next import Variant
from dbus_next.aio import MessageBus
from dbus_next.errors import DBusError
+
BUS_NAME_PREFIX = "org.mpris.MediaPlayer2."
BUS_NAME_IGNORE = ["playerctld", "kdeconnect"]
OBJECT_PATH = "/org/mpris/MediaPlayer2"
+MPRIS_INTERFACE = "org.mpris.MediaPlayer2"
PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
+PLAYERCTL_INTERFACE = "com.github.altdesktop.playerctld"
+PLAYERCTL_BUS_NAME = "org.mpris.MediaPlayer2.playerctld"
def parse_dbus_values(item: Variant | dict | list) -> dict:
@@ -33,31 +33,15 @@ def parse_dbus_values(item: Variant | dict | list) -> dict:
return v
-@dataclass
-class ScrollingText:
- """Class to produce a rotating string iterator"""
-
- text: str = ""
- width: int = 10
- deque: deque = field(init=False)
- rotate: bool = field(init=False)
-
- def __post_init__(self):
- self.rotate = len(self.text) > self.width
- self.deque = deque(list(self.text + " "))
-
- def update(self, value: str = "") -> None:
- if value == self.text:
- return
- self.text = "" if value is None else value
- self.__post_init__()
-
- def __iter__(self):
- while self.rotate:
- yield "".join(islice(self.deque, 0, self.width))
- self.deque.rotate(-1)
-
- yield self.text
+class Section(Enum):
+ BLOCK = auto()
+ ICON = auto()
+ TITLE = auto()
+ PREV = auto()
+ PLAY = auto()
+ PAUSE = auto()
+ NEXT = auto()
+ VOLUME = auto()
class Printer:
@@ -75,244 +59,212 @@ class Printer:
"low": "\U000f057f", # 󰕿
}
- def __init__(self, name, player, width: int = 10):
- self.name = name
- self.text = ScrollingText()
- self.player = player
- self.width = width
-
- @property
- def player(self):
- return self._player
-
- @player.setter
- def player(self, value):
- self._player = value
- # clear possibly outdated properties
- # See: https://stackoverflow.com/a/69367025/11477336
- for attr in [
- "title",
- "controls",
- "volume",
- "color",
- "background",
- "short_text",
- ]:
- self.__dict__.pop(attr, None)
-
- self.text.update(self.title)
-
- @cached_property
- def title(self) -> str:
- return self.player.get("Metadata", {}).get("xesam:title", "")
-
- @cached_property
- def controls(self) -> str:
- controls = []
-
- if self.player.get("CanGoPrevious"):
- controls.append(Printer.ICONS["prev"])
-
- match self.player.get("PlaybackStatus"):
- case "Playing" if self.player.get("CanPause"):
- controls.append(Printer.ICONS["pause"])
- case _ if self.player.get("CanPlay"):
- controls.append(Printer.ICONS["play"])
-
- if self.player.get("CanGoNext"):
- controls.append(Printer.ICONS["next"])
-
- if len(controls) > 0:
- return " ".join(controls) + " "
-
- @cached_property
- def volume(self) -> str:
- match self.player.get("Volume"):
- case float(vol) if vol > 0.66:
- return f"{Printer.ICONS["high"]} {vol:.0%}"
- case float(vol) if vol > 0.33:
- return f"{Printer.ICONS["med"]} {vol:.0%}"
- case float(vol):
- return f"{Printer.ICONS["low"]} {vol:.0%}"
-
- @cached_property
- def background(self):
- match self.player.get("PlaybackStatus"):
+ def __init__(self, bus, bus_name):
+ self.bus = bus
+ self.bus_name = bus_name
+ self.updated = asyncio.Event()
+ self.title = None
+ self.mpris = None
+ self.player = None
+ self.playerctl = None
+ self.mpris_props = {}
+ self.properties = {}
+
+ @classmethod
+ async def create(cls, bus, bus_name):
+ self = cls(bus, bus_name)
+
+ player_obj = self.bus.get_proxy_object(
+ bus_name,
+ OBJECT_PATH,
+ await self.bus.introspect(self.bus_name, OBJECT_PATH),
+ )
+ prop_iface = player_obj.get_interface(PROPERTIES_INTERFACE)
+ self.mpris = player_obj.get_interface(MPRIS_INTERFACE)
+ self.player = player_obj.get_interface(PLAYER_INTERFACE)
+ self.playerctl = self.bus.get_proxy_object(
+ PLAYERCTL_BUS_NAME,
+ OBJECT_PATH,
+ await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH),
+ ).get_interface(PLAYERCTL_INTERFACE)
+
+ self.mpris_props = await prop_iface.call_get_all(MPRIS_INTERFACE)
+ self.build(None, await prop_iface.call_get_all(PLAYER_INTERFACE), None)
+
+ prop_iface.on_properties_changed(self.build)
+ return self
+
+ def build(self, _bus_name, property, _invalidated):
+ self.properties |= parse_dbus_values(property)
+
+ self.sections = dict()
+ self.width = 0
+
+ match self.properties.get("PlaybackStatus"):
case "Playing":
- return Printer.GREEN
+ self.color = Printer.BLACK
+ self.background = Printer.GREEN
case "Paused":
- return Printer.YELLOW
+ self.color = Printer.BLACK
+ self.background = Printer.YELLOW
case _:
- return None
-
- @cached_property
- def color(self):
- match self.player.get("PlaybackStatus"):
- case "Playing" | "Paused":
- return Printer.BLACK
+ self.color = None
+ self.background = None
+
+ self.text = f" {Printer.ICONS["icon"]} "
+ self.width += len(f" {Printer.ICONS["icon"]} ")
+ self.sections[self.width] = Section.ICON
+
+ title = self.properties.get("Metadata", {}).get("xesam:title")
+ if title != self.title:
+ self.title = title
+ self.title_iter = self.title + " "
+
+ self.text += "{title} "
+ self.width += min(len(self.title), 10) + 1
+ self.sections[self.width] = Section.TITLE
+
+ if self.properties.get("CanGoPrevious"):
+ self.text += f"{Printer.ICONS["prev"]} "
+ self.width += len(f"{Printer.ICONS["prev"]} ")
+ self.sections[self.width] = Section.PREV
+
+ match self.properties.get("PlaybackStatus"):
+ case "Playing" if self.properties.get("CanPause"):
+ self.text += f"{Printer.ICONS["pause"]} "
+ self.width += len(f"{Printer.ICONS["pause"]} ")
+ self.sections[self.width] = Section.PAUSE
+ case _ if self.properties.get("CanPlay"):
+ self.text += f"{Printer.ICONS["play"]} "
+ self.width += len(f"{Printer.ICONS["play"]} ")
+ self.sections[self.width] = Section.PLAY
+
+ if self.properties.get("CanGoNext"):
+ self.text += f"{Printer.ICONS["next"]} "
+ self.width += len(f"{Printer.ICONS["next"]} ")
+ self.sections[self.width] = Section.NEXT
+
+ if vol := self.properties.get("Volume"):
+ match vol:
+ case float(vol) if vol > 0.66:
+ icon = Printer.ICONS["high"]
+ case float(vol) if vol > 0.33:
+ icon = Printer.ICONS["med"]
+ case _:
+ icon = Printer.ICONS["low"]
+
+ self.text += f"{icon} {vol:.0%} "
+ self.width += len(f"{icon} {vol:.0%} ")
+ self.sections[self.width] = Section.VOLUME
+
+ self.updated.set()
+
+ async def handle_button(self, data) -> bool:
+ with open("/tmp/mpris.json", "w") as f:
+ f.write(json.dumps(data))
+
+ match data:
+ case {"button": b, "relative_x": x, "width": w}:
+ button = b
+ pos = x / w
case _:
- return None
-
- @cached_property
- def short_text(self) -> str:
- s = f" {Printer.ICONS["icon"]} "
- if c := self.controls:
- s += f" {c} "
- return s
-
- def status(self) -> dict:
- full_text = ["", Printer.ICONS["icon"], ""]
-
- if c := self.controls:
- full_text.append(c)
-
- if v := self.volume:
- full_text.append(v)
-
- full_text.append("")
- for title in iter(self.text):
- full_text[2] = title
- yield {
- "full_text": " ".join(full_text),
- "short_text": self.short_text,
- "color": self.color,
- "background": self.background,
- }
-
- async def print(self):
- for status in iter(self.status()):
- print(json.dumps(status, ensure_ascii=False), flush=True)
- await asyncio.sleep(0.5)
-
- def toggle_name(self):
- if self.text.text == self.name:
- self.text.update(self.title)
- else:
- self.text.update(self.name)
+ return False
+
+ section = Section.BLOCK
+ for n, section in self.sections.items():
+ if (n / self.width) > pos:
+ break
+
+ try:
+ match (section, button):
+ case (Section.ICON, 1) if self.mpris_props.get("CanRaise"):
+ await self.mpris.call_raise()
+ case (Section.TITLE, 1):
+ pass
+ case (Section.PREV, 1):
+ await self.player.call_previous()
+ case (Section.PLAY, 1) | (Section.PAUSE, 1):
+ await self.player.call_play_pause()
+ case (Section.NEXT, 1):
+ await self.player.call_next()
+ case (Section.VOLUME, 4):
+ volume = await self.player.get_volume()
+ await self.player.set_volume(volume + 0.05)
+ case (Section.VOLUME, 5):
+ volume = await self.player.get_volume()
+ await self.player.set_volume(volume - 0.05)
+ case (_, 3):
+ await self.playerctl.call_shift()
+ case (_, _):
+ return
+
+ self.print()
+ except DBusError:
+ pass
+
+ def print(self):
+ status = {
+ "full_text": self.text.format(title=self.title_iter[:10]),
+ "short_text": self.text.format(title=""),
+ "color": self.color,
+ "background": self.background,
+ }
+ print(json.dumps(status, ensure_ascii=False), flush=True)
+
+ async def print_task(self):
+ while True:
+ self.updated.clear()
+ self.print()
+ if len(self.title) > 10:
+ self.title_iter = self.title_iter[1:] + self.title_iter[:1]
+ await asyncio.sleep(0.5)
+ else:
+ await self.updated.wait()
class MPRIS:
def __init__(self, task_group: TaskGroup):
self.task_group = task_group
self.printer_task = None
- self.active_buses: deque(str) = deque([])
+ self.active = None
self.bus = None
- self.dbus_iface = None
- self.players = defaultdict(lambda: {})
- self.player_iter = self.players.keys()
- self.ifaces = {}
+ self.playerctl = None
+ self.players = {}
@classmethod
async def connect(cls, task_group: TaskGroup):
self = cls(task_group)
self.bus = await MessageBus().connect()
-
- self.dbus_iface = self.bus.get_proxy_object(
- "org.freedesktop.DBus",
- "/org/freedesktop/DBus",
- await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"),
- ).get_interface("org.freedesktop.DBus")
-
- for bus_name in await self.dbus_iface.call_list_names():
- if self.valid_player(bus_name):
- await self.add_player(bus_name)
-
return self
- @staticmethod
- def valid_player(bus_name: str) -> bool:
- return bus_name.startswith(BUS_NAME_PREFIX) and not any(
- bus_name.removeprefix(BUS_NAME_PREFIX).startswith(n)
- for n in BUS_NAME_IGNORE
- )
-
- @property
- def active_bus(self) -> str | None:
- if len(self.active_buses) > 0:
- return self.active_buses[-1]
-
- @property
- def active_player(self) -> Printer | None:
- return self.players.get(self.active_bus)
-
- @property
- def active_iface(self) -> Printer | None:
- return self.ifaces.get(self.active_bus)
-
- def cycle_player(self):
- self.active_buses.rotate(-1)
- self.update_printer()
-
- def update_printer(self):
+ async def activate(self, bus_name):
if self.printer_task is not None:
self.printer_task.cancel()
- if player := self.active_player:
- self.printer_task = self.task_group.create_task(player.print())
- else:
- print(flush=True)
-
- def on_properties_changed(self, bus_name: str, property, invalidated):
- props = parse_dbus_values(property)
- self.players[bus_name].player |= props
- match props:
- case {"PlaybackStatus": "Playing"}:
- if bus_name in self.active_buses:
- self.active_buses.remove(bus_name)
- self.active_buses.append(bus_name)
- case {"PlaybackStatus": "Stopped"}:
- self.active_buses.remove(bus_name)
-
- self.update_printer()
-
- async def add_player(self, bus_name):
- properties = None
- self.players[bus_name] = None
- self.ifaces[bus_name] = None
-
- obj = self.bus.get_proxy_object(
- bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH)
- )
+ if bus_name == "":
+ bus_name = None
- properties = obj.get_interface(PROPERTIES_INTERFACE)
+ if bus_name and bus_name not in self.players:
+ self.players[bus_name] = await Printer.create(self.bus, bus_name)
- self.players[bus_name] = Printer(
- bus_name.removeprefix(BUS_NAME_PREFIX),
- parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)),
- )
- self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE)
+ if player := self.players.get(bus_name):
+ self.active = bus_name
+ self.printer_task = self.task_group.create_task(player.print_task())
+ else:
+ print(flush=True)
+ return
- properties.on_properties_changed(
- lambda _, p, i: self.on_properties_changed(bus_name, p, i)
- )
+ async def main(self):
+ playerctl = self.bus.get_proxy_object(
+ PLAYERCTL_BUS_NAME,
+ OBJECT_PATH,
+ await self.bus.introspect(PLAYERCTL_BUS_NAME, OBJECT_PATH),
+ ).get_interface(PLAYERCTL_INTERFACE)
- if bus_name in self.active_buses:
- self.active_buses.remove(bus_name)
+ bus_name = next(iter(await playerctl.get_player_names()), None)
+ await self.activate(bus_name)
- match self.players[bus_name].player.get("PlaybackStatus"):
- case "Playing":
- self.active_buses.append(bus_name)
- self.update_printer()
- case "Paused":
- self.active_buses.appendleft(bus_name)
- self.update_printer()
-
- def remove_player(self, bus_name):
- self.players.pop(bus_name, None)
- self.ifaces.pop(bus_name, None)
- if bus_name in self.active_buses:
- self.active_buses.remove(bus_name)
- self.update_printer()
-
- async def on_name_owner_changed(self, bus_name, old, new):
- if self.valid_player(bus_name):
- if old:
- self.remove_player(bus_name)
- if new:
- await self.add_player(bus_name)
-
- async def listener(self):
- self.dbus_iface.on_name_owner_changed(self.on_name_owner_changed)
+ playerctl.on_active_player_change_end(self.activate)
await self.bus.wait_for_disconnect()
async def button_handler(self):
@@ -322,37 +274,18 @@ class MPRIS:
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
while True:
- line = await reader.readline()
-
- if not line:
+ if line := await reader.readline():
+ if player := self.players.get(self.active):
+ await player.handle_button(json.loads(line))
+ else:
await asyncio.sleep(1)
- continue
-
- try:
- match json.loads(line):
- case {"button": 1} if iface := self.active_iface:
- await iface.call_play_pause()
- case {"button": 2} if player := self.active_player:
- player.toggle_name()
- self.update_printer()
- case {"button": 3}:
- self.active_buses.rotate(-1)
- self.update_printer()
- case {"button": 4} if iface := self.active_iface:
- volume = await iface.get_volume()
- await iface.set_volume(volume + 0.05)
- case {"button": 5} if iface := self.active_iface:
- volume = await iface.get_volume()
- await iface.set_volume(volume - 0.05)
- except DBusError:
- pass
async def main():
try:
async with TaskGroup() as task_group:
mpris = await MPRIS.connect(task_group)
- task_group.create_task(mpris.listener())
+ task_group.create_task(mpris.main())
task_group.create_task(mpris.button_handler())
except CancelledError:
return