diff options
-rw-r--r-- | i3blocks/.config/i3blocks/config | 17 | ||||
-rwxr-xr-x | i3blocks/.local/lib/i3blocks/i3blocks-mpris | 236 |
2 files changed, 247 insertions, 6 deletions
diff --git a/i3blocks/.config/i3blocks/config b/i3blocks/.config/i3blocks/config index 9807c90..899627a 100644 --- a/i3blocks/.config/i3blocks/config +++ b/i3blocks/.config/i3blocks/config @@ -10,6 +10,13 @@ interval=persist command=curl -s wttr.in/?m\&format="%c+%t\n" | sed 's/\s\++\(.*\)C/ \1/' interval=600 +[miniflux] +command=$SCRIPT_DIR/i3blocks-miniflux +interval=30 + +[notmuch] +command=$SCRIPT_DIR/i3blocks-notmuch + [disk] command=$SCRIPT_DIR/i3blocks-disk interval=20 @@ -26,12 +33,10 @@ command=$SCRIPT_DIR/i3blocks-gpu [net] command=$SCRIPT_DIR/i3blocks-net -[miniflux] -command=$SCRIPT_DIR/i3blocks-miniflux -interval=30 - -[notmuch] -command=$SCRIPT_DIR/i3blocks-notmuch +[volume] +command=$SCRIPT_DIR/i3blocks-mpris +interval=persist +format=json [volume] command=$SCRIPT_DIR/i3blocks-volume diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris new file mode 100755 index 0000000..82f48f2 --- /dev/null +++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 + +import asyncio +import json +import os +import sys +from collections import deque +from itertools import islice + +from dbus_next import Variant +from dbus_next.aio import MessageBus + +BUS_NAME_PREFIX = "org.mpris.MediaPlayer2." +BUS_NAME_IGNORE = ["org.mpris.MediaPlayer2.playerctld"] +OBJECT_PATH = "/org/mpris/MediaPlayer2" +PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player" +PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" + + +def parse_dbus_values(item: Variant | dict | list) -> any: + match item: + case Variant(): + return parse_dbus_values(item.value) + case dict(item): + return {k: parse_dbus_values(v) for k, v in item.items()} + case list(item): + return [parse_dbus_values(v) for v in item] + case v: + return v + + +class Status: + ICON = "\U000f075a" # + + def __init__(self, width: int = 10): + self.title = "" + self.width = width + self.can_prev = False + self.can_play_pause = False + self.can_next = False + self.play_pause = None + self.color = None + self.background = None + + def update_metadata(self, metadata): + title = metadata.get("xesam:title", "") + if self.title == title: + return + + self.title = title + if len(self.title) < self.width: + self.title_gen = self.static_title(title) + else: + self.title_gen = self.scrolling_title(title) + + def update_status(self, status): + match status: + case "Playing": + self.play_pause = "\U000f03e4 " # + self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" + self.background = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}" + case "Paused": + self.play_pause = "\U000f040a " # + self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}" + self.background = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}" + case "Stopped": + self.play_pause = None + self.color = None + self.background = None + + def update_player(self, player): + self.update_metadata(player.get("Metadata")) + self.update_status(player.get("PlaybackStatus")) + self.can_next = player.get("CanGoNext") + self.can_play_pause = player.get("CanPause") + self.can_prev = player.get("CanGoPrevious") + + def static_title(self, title): + while True: + yield title + + def scrolling_title(self, title): + iter = deque(list(title)) + + while True: + yield "".join(islice(iter, 0, self.width)) + iter.rotate(-1) + + def __str__(self): + output = {"full_text": f" {Status.ICON} "} + + if len(self.title) > 0: + output["full_text"] += f"{next(self.title_gen)} " + + if self.can_prev: + output["full_text"] += "\U000f04ae " # + if self.can_play_pause: + output["full_text"] += self.play_pause + if self.can_next: + output["full_text"] += "\U000f04ad " # + + if self.color: + output["color"] = self.color + if self.background: + output["background"] = self.background + + return json.dumps(output, ensure_ascii=False) + + +class MPRIS: + def __init__(self): + self.bus = None + self.dbus_iface = None + self.players = {} + self.active_players = [] + self.status = Status() + + @classmethod + async def connect(cls): + self = cls() + self.bus = await MessageBus().connect() + + self.dbus_iface = self.bus.get_proxy_object( + "org.freedesktop.DBus", + "/org/freedesktop/DBus", + await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"), + ).get_interface("org.freedesktop.DBus") + + self.players = {} + + for bus_name in await self.dbus_iface.call_list_names(): + if self.valid_player(bus_name): + await self.add_player(bus_name) + + return self + + @staticmethod + def valid_player(bus_name: str) -> bool: + return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE + + async def status_printer(self, interval=1): + while True: + print(str(self.status), flush=True) + await asyncio.sleep(interval) + + def on_properties_changed(self, bus_name: str, property, invalidated): + self.players[bus_name].update(parse_dbus_values(property)) + for name, value in self.players[bus_name].items(): + match name, value: + case "Metadata", _: + continue + case "PlaybackStatus", "Playing": + if bus_name in self.active_players: + self.active_players.remove(bus_name) + self.active_players.append(bus_name) + self.status.update_player(self.players[bus_name]) + case "PlaybackStatus", "Stopped" if bus_name in self.active_players: + self.active_players.remove(bus_name) + if len(self.active_players) > 0: + self.status.update_player(self.players[self.active_players[-1]]) + + async def add_player(self, bus_name): + self.players[bus_name] = {} + + iface = self.bus.get_proxy_object( + bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH) + ).get_interface(PROPERTIES_INTERFACE) + + self.players[bus_name] = parse_dbus_values( + await iface.call_get_all(PLAYER_INTERFACE) + ) + + iface.on_properties_changed( + lambda _, p, i: self.on_properties_changed(bus_name, p, i) + ) + + async def remove_player(self, bus_name): + self.players.pop(bus_name) + + async def on_name_owner_changed(self, bus_name, old, new): + if self.valid_player(bus_name): + if old: + await self.remove_player(bus_name) + if new: + await self.add_player(bus_name) + + async def listener(self): + self.dbus_iface.on_name_owner_changed(self.on_name_owner_changed) + await self.bus.wait_for_disconnect() + + async def button_handler(self): + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + + while True: + line = await reader.readline() + + if not line: + await asyncio.sleep(1) + continue + + match json.loads(line): + case {"button": 1}: + pass + case {"button": 2}: + pass + case {"button": 3}: + pass + case {"button": 4}: + pass + case {"button": 5}: + pass + + +async def main(): + mpris = await MPRIS.connect() + + try: + async with asyncio.TaskGroup() as task_group: + task_group.create_task(mpris.status_printer()) + task_group.create_task(mpris.listener()) + # task_group.create_task(mpris.button_handler()) + except asyncio.CancelledError: + return + + +if __name__ == "__main__": + asyncio.run(main()) + asyncio.run(main()) + asyncio.run(main()) + asyncio.run(main()) + asyncio.run(main()) + asyncio.run(main()) + asyncio.run(main()) |