From e842d34c9ae3a48a78bbcaa5b9fc58216e1d49a6 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Mon, 7 Oct 2024 13:30:57 -0500 Subject: refactor(i3blocks): move i3blocks to bin dir --- i3blocks/.local/bin/i3blocks-volume | 128 ++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100755 i3blocks/.local/bin/i3blocks-volume (limited to 'i3blocks/.local/bin/i3blocks-volume') diff --git a/i3blocks/.local/bin/i3blocks-volume b/i3blocks/.local/bin/i3blocks-volume new file mode 100755 index 0000000..40854e7 --- /dev/null +++ b/i3blocks/.local/bin/i3blocks-volume @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 + +import asyncio +import json +import os +import subprocess +import sys + + +ICONS = { + "mute": "\U000f075f", # 󰝟 + "low": "\U000f057f", # 󰕿 + "medium": "\U000f0580", # 󰖀 + "high": "\U000f057e", # 󰕾 +} + + +def is_muted(): + return ( + subprocess.run( + ["pactl", "get-sink-mute", "@DEFAULT_SINK@"], + capture_output=True, + encoding="UTF-8", + ) + .stdout.removeprefix("Mute: ") + .strip() + == "yes" + ) + + +def get_volume(): + stdout = subprocess.run( + ["pactl", "get-sink-volume", "@DEFAULT_SINK@"], + capture_output=True, + encoding="UTF-8", + ).stdout.strip() + + for s in stdout.removeprefix("Volume: ").split(): + if s.endswith("%"): + return int(s.rstrip("%")) + + +def print_status(): + match get_volume(): + case None: + output = {} + case v if is_muted(): + output = { + "full_text": f" {ICONS["mute"]} {v}% ", + "color": f"#{os.environ.get("BASE16_COLOR_00_HEX")}", + "background": f"#{os.environ.get("BASE16_COLOR_0A_HEX")}", + } + case v if v > 66: + output = {"full_text": f" {ICONS["high"]} {v}% "} + case v if v > 33: + output = {"full_text": f" {ICONS["medium"]} {v}% "} + case v: + output = {"full_text": f" {ICONS["low"]} {v}% "} + + print(json.dumps(output, ensure_ascii=False), flush=True) + + +async def listener(): + process = await asyncio.create_subprocess_exec( + "pactl", + "--format=json", + "subscribe", + stdout=asyncio.subprocess.PIPE, + ) + + while True: + line = await process.stdout.readline() + + if not line: + await asyncio.sleep(1) + continue + + match json.loads(line.decode("UTF-8")): + case {"on": "sink"} | {"on": "source-output"}: + print_status() + + +async def button_handler(): + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + proc: subprocess.Popen = None + + while True: + line = await reader.readline() + + if not line: + await asyncio.sleep(1) + continue + + match json.loads(line): + case {"button": 1} if proc: + proc.terminate() + proc = None + case {"button": 1, "gui": cmd}: + proc = subprocess.Popen(cmd, shell=True) + case {"button": 2}: + pass + case {"button": 3}: + subprocess.run(["wpctl", "set-mute", "@DEFAULT_AUDIO_SINK@", "toggle"]) + case {"button": 4}: + subprocess.run( + ["wpctl", "set-volume", "-l", "1.5", "@DEFAULT_AUDIO_SINK@", "5%+"] + ) + case {"button": 5}: + subprocess.run( + ["wpctl", "set-volume", "-l", "1.5", "@DEFAULT_AUDIO_SINK@", "5%-"] + ) + + +async def main(): + print_status() + try: + async with asyncio.TaskGroup() as task_group: + task_group.create_task(listener()) + task_group.create_task(button_handler()) + except asyncio.CancelledError: + return + + +if __name__ == "__main__": + asyncio.run(main()) -- cgit v1.2.3-70-g09d2