aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/i3blocks/.local
diff options
context:
space:
mode:
Diffstat (limited to 'i3blocks/.local')
-rwxr-xr-xi3blocks/.local/lib/i3blocks/i3blocks-mpris173
1 files changed, 101 insertions, 72 deletions
diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
index 8f7be61..3ccbd9a 100755
--- a/i3blocks/.local/lib/i3blocks/i3blocks-mpris
+++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
@@ -6,6 +6,7 @@ import os
import sys
from asyncio import CancelledError, TaskGroup, run
from collections import defaultdict, deque
+from dataclasses import dataclass, field
from functools import cached_property
from itertools import islice
@@ -14,7 +15,7 @@ from dbus_next.aio import MessageBus
from dbus_next.errors import DBusError
BUS_NAME_PREFIX = "org.mpris.MediaPlayer2."
-BUS_NAME_IGNORE = ["org.mpris.MediaPlayer2.playerctld"]
+BUS_NAME_IGNORE = ["playerctld", "kdeconnect"]
OBJECT_PATH = "/org/mpris/MediaPlayer2"
PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
@@ -32,6 +33,33 @@ def parse_dbus_values(item: Variant | dict | list) -> dict:
return v
+@dataclass
+class ScrollingText:
+ """Class to produce a rotating string iterator"""
+
+ text: str = ""
+ width: int = 10
+ deque: deque = field(init=False)
+ rotate: bool = field(init=False)
+
+ def __post_init__(self):
+ self.rotate = len(self.text) > self.width
+ self.deque = deque(list(self.text + " "))
+
+ def update(self, value: str = "") -> None:
+ if value == self.text:
+ return
+ self.text = "" if value is None else value
+ self.__post_init__()
+
+ def __iter__(self):
+ while self.rotate:
+ yield "".join(islice(self.deque, 0, self.width))
+ self.deque.rotate(-1)
+
+ yield self.text
+
+
class Printer:
BLACK = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
GREEN = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}"
@@ -47,23 +75,11 @@ class Printer:
"low": "\U000f057f", # 󰕿
}
- def __init__(self, player, width: int = 10):
- self.width = width
+ def __init__(self, name, player, width: int = 10):
+ self.name = name
+ self.text = ScrollingText()
self.player = player
-
- def __iter__(self):
- while self.iter:
- yield {
- "full_text": self.full_text,
- "short_text": self.short_text,
- "color": self.color,
- "background": self.background,
- }
-
- if len(self.iter) > self.width:
- self.iter.rotate(-1)
- else:
- break
+ self.width = width
@property
def player(self):
@@ -84,15 +100,11 @@ class Printer:
]:
self.__dict__.pop(attr, None)
+ self.text.update(self.title)
+
@cached_property
- def title(self):
- title = self.player.get("Metadata", {"xesam:title": self.prev_title}).get(
- "xesam:title"
- )
- if title != self.prev_title:
- self.__dict__.pop("iter", None)
- self.prev_title = title
- return self.prev_title
+ def title(self) -> str:
+ return self.player.get("Metadata", {}).get("xesam:title", "")
@cached_property
def controls(self) -> str:
@@ -142,43 +154,48 @@ class Printer:
return None
@cached_property
- def iter(self) -> list | deque:
- match self.player.get("Metadata", {}).get("xesam:title", ""):
- case str(""):
- return [""]
- case str(s):
- return deque(list(s + " "))
-
- @cached_property
def short_text(self) -> str:
s = f" {Printer.ICONS["icon"]} "
if c := self.controls:
s += f" {c} "
return s
- @property
- def full_text(self) -> str:
- s = f" {Printer.ICONS["icon"]} {"".join(islice(self.iter, 0, self.width))} "
+ def status(self) -> dict:
+ full_text = ["", Printer.ICONS["icon"], ""]
if c := self.controls:
- s += f"{c} "
+ full_text.append(c)
if v := self.volume:
- s += f"{v} "
+ full_text.append(v)
- return s
+ full_text.append("")
+ for title in iter(self.text):
+ full_text[2] = title
+ yield {
+ "full_text": " ".join(full_text),
+ "short_text": self.short_text,
+ "color": self.color,
+ "background": self.background,
+ }
async def print(self):
- for status in iter(self):
+ for status in iter(self.status()):
print(json.dumps(status, ensure_ascii=False), flush=True)
await asyncio.sleep(0.5)
+ def toggle_name(self):
+ if self.text.text == self.name:
+ self.text.update(self.title)
+ else:
+ self.text.update(self.name)
+
class MPRIS:
def __init__(self, task_group: TaskGroup):
self.task_group = task_group
self.printer_task = None
- self.active_players = deque([])
+ self.active_buses: deque(str) = deque([])
self.bus = None
self.dbus_iface = None
self.players = defaultdict(lambda: {})
@@ -204,25 +221,34 @@ class MPRIS:
@staticmethod
def valid_player(bus_name: str) -> bool:
- return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE
+ return bus_name.startswith(BUS_NAME_PREFIX) and not any(
+ bus_name.removeprefix(BUS_NAME_PREFIX).startswith(n)
+ for n in BUS_NAME_IGNORE
+ )
+
+ @property
+ def active_bus(self) -> str | None:
+ if len(self.active_buses) > 0:
+ return self.active_buses[-1]
+
+ @property
+ def active_player(self) -> Printer | None:
+ return self.players.get(self.active_bus)
@property
- def active_player(self):
- if len(self.active_players) > 0:
- return self.active_players[-1]
+ def active_iface(self) -> Printer | None:
+ return self.ifaces.get(self.active_bus)
def cycle_player(self):
- self.active_players.rotate(-1)
+ self.active_buses.rotate(-1)
self.update_printer()
def update_printer(self):
if self.printer_task is not None:
self.printer_task.cancel()
- if len(self.active_players) > 0:
- self.printer_task = self.task_group.create_task(
- self.players[self.active_player].print()
- )
+ if player := self.active_player:
+ self.printer_task = self.task_group.create_task(player.print())
else:
print(flush=True)
@@ -231,11 +257,11 @@ class MPRIS:
self.players[bus_name].player |= props
match props:
case {"PlaybackStatus": "Playing"}:
- if bus_name in self.active_players:
- self.active_players.remove(bus_name)
- self.active_players.append(bus_name)
+ if bus_name in self.active_buses:
+ self.active_buses.remove(bus_name)
+ self.active_buses.append(bus_name)
case {"PlaybackStatus": "Stopped"}:
- self.active_players.remove(bus_name)
+ self.active_buses.remove(bus_name)
self.update_printer()
@@ -251,7 +277,8 @@ class MPRIS:
properties = obj.get_interface(PROPERTIES_INTERFACE)
self.players[bus_name] = Printer(
- parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE))
+ bus_name.removeprefix(BUS_NAME_PREFIX),
+ parse_dbus_values(await properties.call_get_all(PLAYER_INTERFACE)),
)
self.ifaces[bus_name] = obj.get_interface(PLAYER_INTERFACE)
@@ -259,22 +286,22 @@ class MPRIS:
lambda _, p, i: self.on_properties_changed(bus_name, p, i)
)
- if bus_name in self.active_players:
- self.active_players.remove(bus_name)
+ if bus_name in self.active_buses:
+ self.active_buses.remove(bus_name)
match self.players[bus_name].player.get("PlaybackStatus"):
case "Playing":
- self.active_players.append(bus_name)
+ self.active_buses.append(bus_name)
self.update_printer()
case "Paused":
- self.active_players.appendleft(bus_name)
+ self.active_buses.appendleft(bus_name)
self.update_printer()
def remove_player(self, bus_name):
self.players.pop(bus_name, None)
self.ifaces.pop(bus_name, None)
- if bus_name in self.active_players:
- self.active_players.remove(bus_name)
+ if bus_name in self.active_buses:
+ self.active_buses.remove(bus_name)
self.update_printer()
async def on_name_owner_changed(self, bus_name, old, new):
@@ -303,18 +330,20 @@ class MPRIS:
try:
match json.loads(line):
- case {"button": 1} if bus_name := self.active_player:
- await self.ifaces[bus_name].call_play_pause()
- case {"button": 2}:
- pass
+ case {"button": 1} if iface := self.active_iface:
+ await iface.call_play_pause()
+ case {"button": 2} if player := self.active_player:
+ player.toggle_name()
+ self.update_printer()
case {"button": 3}:
- self.cycle_player()
- case {"button": 4} if bus_name := self.active_player:
- volume = await self.ifaces[bus_name].get_volume()
- await self.ifaces[bus_name].set_volume(volume + 0.05)
- case {"button": 5} if bus_name := self.active_player:
- volume = await self.ifaces[bus_name].get_volume()
- await self.ifaces[bus_name].set_volume(volume - 0.05)
+ self.active_buses.rotate(-1)
+ self.update_printer()
+ case {"button": 4} if iface := self.active_iface:
+ volume = await iface.get_volume()
+ await iface.set_volume(volume + 0.05)
+ case {"button": 5} if iface := self.active_iface:
+ volume = await iface.get_volume()
+ await iface.set_volume(volume - 0.05)
except DBusError:
pass