aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--i3blocks/.config/i3blocks/config17
-rwxr-xr-xi3blocks/.local/lib/i3blocks/i3blocks-mpris236
2 files changed, 247 insertions, 6 deletions
diff --git a/i3blocks/.config/i3blocks/config b/i3blocks/.config/i3blocks/config
index 9807c90..899627a 100644
--- a/i3blocks/.config/i3blocks/config
+++ b/i3blocks/.config/i3blocks/config
@@ -10,6 +10,13 @@ interval=persist
command=curl -s wttr.in/?m\&format="%c+%t\n" | sed 's/\s\++\(.*\)C/ \1/'
interval=600
+[miniflux]
+command=$SCRIPT_DIR/i3blocks-miniflux
+interval=30
+
+[notmuch]
+command=$SCRIPT_DIR/i3blocks-notmuch
+
[disk]
command=$SCRIPT_DIR/i3blocks-disk
interval=20
@@ -26,12 +33,10 @@ command=$SCRIPT_DIR/i3blocks-gpu
[net]
command=$SCRIPT_DIR/i3blocks-net
-[miniflux]
-command=$SCRIPT_DIR/i3blocks-miniflux
-interval=30
-
-[notmuch]
-command=$SCRIPT_DIR/i3blocks-notmuch
+[volume]
+command=$SCRIPT_DIR/i3blocks-mpris
+interval=persist
+format=json
[volume]
command=$SCRIPT_DIR/i3blocks-volume
diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-mpris b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
new file mode 100755
index 0000000..82f48f2
--- /dev/null
+++ b/i3blocks/.local/lib/i3blocks/i3blocks-mpris
@@ -0,0 +1,236 @@
+#!/usr/bin/env python3
+
+import asyncio
+import json
+import os
+import sys
+from collections import deque
+from itertools import islice
+
+from dbus_next import Variant
+from dbus_next.aio import MessageBus
+
+BUS_NAME_PREFIX = "org.mpris.MediaPlayer2."
+BUS_NAME_IGNORE = ["org.mpris.MediaPlayer2.playerctld"]
+OBJECT_PATH = "/org/mpris/MediaPlayer2"
+PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
+PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
+
+
+def parse_dbus_values(item: Variant | dict | list) -> any:
+ match item:
+ case Variant():
+ return parse_dbus_values(item.value)
+ case dict(item):
+ return {k: parse_dbus_values(v) for k, v in item.items()}
+ case list(item):
+ return [parse_dbus_values(v) for v in item]
+ case v:
+ return v
+
+
+class Status:
+ ICON = "\U000f075a" # 󰝚
+
+ def __init__(self, width: int = 10):
+ self.title = ""
+ self.width = width
+ self.can_prev = False
+ self.can_play_pause = False
+ self.can_next = False
+ self.play_pause = None
+ self.color = None
+ self.background = None
+
+ def update_metadata(self, metadata):
+ title = metadata.get("xesam:title", "")
+ if self.title == title:
+ return
+
+ self.title = title
+ if len(self.title) < self.width:
+ self.title_gen = self.static_title(title)
+ else:
+ self.title_gen = self.scrolling_title(title)
+
+ def update_status(self, status):
+ match status:
+ case "Playing":
+ self.play_pause = "\U000f03e4 " # 󰏤
+ self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
+ self.background = f"#{os.environ.get("BASE16_COLOR_0C_HEX")}"
+ case "Paused":
+ self.play_pause = "\U000f040a " # 󰐊
+ self.color = f"#{os.environ.get("BASE16_COLOR_00_HEX")}"
+ self.background = f"#{os.environ.get("BASE16_COLOR_0A_HEX")}"
+ case "Stopped":
+ self.play_pause = None
+ self.color = None
+ self.background = None
+
+ def update_player(self, player):
+ self.update_metadata(player.get("Metadata"))
+ self.update_status(player.get("PlaybackStatus"))
+ self.can_next = player.get("CanGoNext")
+ self.can_play_pause = player.get("CanPause")
+ self.can_prev = player.get("CanGoPrevious")
+
+ def static_title(self, title):
+ while True:
+ yield title
+
+ def scrolling_title(self, title):
+ iter = deque(list(title))
+
+ while True:
+ yield "".join(islice(iter, 0, self.width))
+ iter.rotate(-1)
+
+ def __str__(self):
+ output = {"full_text": f" {Status.ICON} "}
+
+ if len(self.title) > 0:
+ output["full_text"] += f"{next(self.title_gen)} "
+
+ if self.can_prev:
+ output["full_text"] += "\U000f04ae " # 󰒮
+ if self.can_play_pause:
+ output["full_text"] += self.play_pause
+ if self.can_next:
+ output["full_text"] += "\U000f04ad " # 󰒭
+
+ if self.color:
+ output["color"] = self.color
+ if self.background:
+ output["background"] = self.background
+
+ return json.dumps(output, ensure_ascii=False)
+
+
+class MPRIS:
+ def __init__(self):
+ self.bus = None
+ self.dbus_iface = None
+ self.players = {}
+ self.active_players = []
+ self.status = Status()
+
+ @classmethod
+ async def connect(cls):
+ self = cls()
+ self.bus = await MessageBus().connect()
+
+ self.dbus_iface = self.bus.get_proxy_object(
+ "org.freedesktop.DBus",
+ "/org/freedesktop/DBus",
+ await self.bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus"),
+ ).get_interface("org.freedesktop.DBus")
+
+ self.players = {}
+
+ for bus_name in await self.dbus_iface.call_list_names():
+ if self.valid_player(bus_name):
+ await self.add_player(bus_name)
+
+ return self
+
+ @staticmethod
+ def valid_player(bus_name: str) -> bool:
+ return bus_name.startswith(BUS_NAME_PREFIX) and bus_name not in BUS_NAME_IGNORE
+
+ async def status_printer(self, interval=1):
+ while True:
+ print(str(self.status), flush=True)
+ await asyncio.sleep(interval)
+
+ def on_properties_changed(self, bus_name: str, property, invalidated):
+ self.players[bus_name].update(parse_dbus_values(property))
+ for name, value in self.players[bus_name].items():
+ match name, value:
+ case "Metadata", _:
+ continue
+ case "PlaybackStatus", "Playing":
+ if bus_name in self.active_players:
+ self.active_players.remove(bus_name)
+ self.active_players.append(bus_name)
+ self.status.update_player(self.players[bus_name])
+ case "PlaybackStatus", "Stopped" if bus_name in self.active_players:
+ self.active_players.remove(bus_name)
+ if len(self.active_players) > 0:
+ self.status.update_player(self.players[self.active_players[-1]])
+
+ async def add_player(self, bus_name):
+ self.players[bus_name] = {}
+
+ iface = self.bus.get_proxy_object(
+ bus_name, OBJECT_PATH, await self.bus.introspect(bus_name, OBJECT_PATH)
+ ).get_interface(PROPERTIES_INTERFACE)
+
+ self.players[bus_name] = parse_dbus_values(
+ await iface.call_get_all(PLAYER_INTERFACE)
+ )
+
+ iface.on_properties_changed(
+ lambda _, p, i: self.on_properties_changed(bus_name, p, i)
+ )
+
+ async def remove_player(self, bus_name):
+ self.players.pop(bus_name)
+
+ async def on_name_owner_changed(self, bus_name, old, new):
+ if self.valid_player(bus_name):
+ if old:
+ await self.remove_player(bus_name)
+ if new:
+ await self.add_player(bus_name)
+
+ async def listener(self):
+ self.dbus_iface.on_name_owner_changed(self.on_name_owner_changed)
+ await self.bus.wait_for_disconnect()
+
+ async def button_handler(self):
+ loop = asyncio.get_event_loop()
+ reader = asyncio.StreamReader()
+ protocol = asyncio.StreamReaderProtocol(reader)
+ await loop.connect_read_pipe(lambda: protocol, sys.stdin)
+
+ while True:
+ line = await reader.readline()
+
+ if not line:
+ await asyncio.sleep(1)
+ continue
+
+ match json.loads(line):
+ case {"button": 1}:
+ pass
+ case {"button": 2}:
+ pass
+ case {"button": 3}:
+ pass
+ case {"button": 4}:
+ pass
+ case {"button": 5}:
+ pass
+
+
+async def main():
+ mpris = await MPRIS.connect()
+
+ try:
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(mpris.status_printer())
+ task_group.create_task(mpris.listener())
+ # task_group.create_task(mpris.button_handler())
+ except asyncio.CancelledError:
+ return
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
+ asyncio.run(main())
+ asyncio.run(main())
+ asyncio.run(main())
+ asyncio.run(main())
+ asyncio.run(main())
+ asyncio.run(main())