From fe11889362af8ca98f02c80178cc31ac1d87364e Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 4 Jun 2024 12:07:23 -0500 Subject: feat(i3blocks): add scrolling text to title --- i3blocks/.local/lib/i3blocks/i3blocks-title | 119 +++++++++++++++++----------- 1 file changed, 73 insertions(+), 46 deletions(-) (limited to 'i3blocks') diff --git a/i3blocks/.local/lib/i3blocks/i3blocks-title b/i3blocks/.local/lib/i3blocks/i3blocks-title index 2f7e395..44c89de 100755 --- a/i3blocks/.local/lib/i3blocks/i3blocks-title +++ b/i3blocks/.local/lib/i3blocks/i3blocks-title @@ -1,53 +1,80 @@ #!/usr/bin/env python3 -import os -from typing import Union +import asyncio +from asyncio import TaskGroup, CancelledError +from collections import deque +from itertools import islice import i3ipc -from i3ipc import Event - - -class Connection(i3ipc.Connection): - def handle( - self, - *events: Union[Event, str], - ): - def wrapped(handler): - for event in events: - self.on(event, handler) - return handler - - return wrapped - - -max_length = int(os.environ.get("max_length", 30)) - - -sway = Connection() - - -@sway.handle( - Event.WINDOW_FOCUS, - Event.WINDOW_TITLE, - Event.WINDOW_NEW, - Event.WINDOW_CLOSE, - Event.WORKSPACE_FOCUS, -) -def print_status(sway: i3ipc.Connection, _=None): - focused = sway.get_tree() - while focused is not None and focused.ipc_data["type"] != "con": - focused = focused.find_focused() - - if focused: - print(focused.ipc_data["name"][:max_length], flush=True) - else: - print("", flush=True) - - -def main(): - print_status(sway) - sway.main() +from i3ipc.aio import Connection + + +class Status: + def __init__(self, title: str = "", width: int = 30): + self.width = width + self.title = title + if len(title) > self.width: + self.iter = deque(list(title + " ")) + else: + self.iter = list(title) + + def __iter__(self): + while self.iter: + title = "".join(islice(self.iter, 0, self.width)) + yield title + if isinstance(self.iter, deque): + self.iter.rotate(-1) + else: + break + + +class Blocklet: + EVENTS = [ + i3ipc.Event.WINDOW_FOCUS, + i3ipc.Event.WINDOW_TITLE, + i3ipc.Event.WINDOW_NEW, + i3ipc.Event.WINDOW_CLOSE, + ] + + def __init__(self, task_group: TaskGroup): + self.i3 = None + self.task_group = task_group + self.printer_task = None + + @classmethod + async def connect(cls, task_group: TaskGroup): + self = cls(task_group) + self.i3 = await Connection().connect() + + for event in Blocklet.EVENTS: + self.i3.on(event, lambda _, event: self.update_focus(event.container)) + return self + + async def main(self): + if focused := (await self.i3.get_tree()).find_focused(): + self.update_focus(focused) + await self.i3.main() + + async def printer(self, title: str): + for status in iter(Status(title)): + print(status, flush=True) + await asyncio.sleep(0.5) + + def update_focus(self, container: i3ipc.Con): + if self.printer_task is not None: + self.printer_task.cancel() + + self.printer_task = self.task_group.create_task(self.printer(container.name)) + + +async def main(): + try: + async with TaskGroup() as task_group: + blocklet = await Blocklet.connect(task_group) + task_group.create_task(blocklet.main()) + except CancelledError: + return if __name__ == "__main__": - main() + asyncio.run(main()) -- cgit v1.2.3-70-g09d2