ui.events

Thread-safe asyncio event bus for broadcasting prediction and suggestion events to WebSocket clients.

Overview

EventBus bridges synchronous background threads (such as ActivityMonitor) and the async FastAPI WebSocket layer. It provides a pub/sub mechanism: publishers push event dicts, and each subscriber receives them through its own asyncio queue. EventBus is implemented as a dataclass (declared with eq=False, so instances compare and hash by identity).

ActivityMonitor (thread) → publish_threadsafe → EventBus
                                                   ↓
                                           subscribe (async)
                                                   ↓
                                        WebSocket /ws/predictions

EventBus

Methods

Method                      Sync/Async             Description
bind_loop(loop)             sync                   Bind to the running asyncio event loop (call once at startup)
wait_ready(timeout=30)      sync (thread-safe)     Block until bind_loop has been called; returns True if bound within the timeout
publish(event)              async                  Broadcast an event dict to all current subscribers
publish_threadsafe(event)   sync                   Schedule a publish from a non-async thread
subscribe()                 async context manager  Yields an asyncio.Queue receiving all published events
snapshot()                  sync (thread-safe)     Return the latest event per type, used to hydrate reconnecting clients
has_subscribers             property               True if at least one subscriber is registered

bind_loop

bind_loop(loop: asyncio.AbstractEventLoop) -> None

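Must be called once at startup to associate the bus with the running event loop. Without a bound loop, publish_threadsafe silently drops events. A background thread that must not lose its first events can call wait_ready to block until the loop is bound.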

publish

async publish(event: dict[str, Any]) -> None

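Broadcasts event to every subscriber queue. If the event has a truthy "type" key, it is also stored as the latest event of that type for snapshot. If a subscriber's queue is full, the oldest queued event is discarded to make room for the new one, so a slow subscriber misses stale events but is never removed from the subscriber set.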

publish_threadsafe

publish_threadsafe(event: dict[str, Any]) -> None

Schedules a publish coroutine on the bound event loop via asyncio.run_coroutine_threadsafe. Safe to call from any thread (e.g. ActivityMonitor). No-ops silently when no loop is bound or the loop is closed.

snapshot

snapshot() -> dict[str, dict[str, Any]]

Returns a deep copy of the most recent event for each known event type, so callers can modify the result without affecting the bus. Thread-safe (guarded by a threading.Lock). Used by GET /api/ws/snapshot so that reconnecting WebSocket clients can hydrate their store immediately instead of waiting for the next push.
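
The latest-event-per-type cache behind snapshot can be illustrated in isolation. The class below is a hypothetical stand-in (LatestByType and record are names invented for this sketch); it assumes the same rules as the source listing: events without a truthy "type" are not retained, and readers get a deep copy.

```python
import copy
import threading
from typing import Any


class LatestByType:
    """Hypothetical stand-in: keeps the newest event per "type" key."""

    def __init__(self) -> None:
        self._latest: dict[str, dict[str, Any]] = {}
        self._lock = threading.Lock()

    def record(self, event: dict[str, Any]) -> None:
        event_type = event.get("type")
        if event_type:  # events without a type are not retained
            with self._lock:
                self._latest[event_type] = event

    def snapshot(self) -> dict[str, dict[str, Any]]:
        with self._lock:
            # Deep copy so callers cannot mutate the retained events.
            return copy.deepcopy(self._latest)


cache = LatestByType()
cache.record({"type": "status", "state": "idle"})
cache.record({"type": "status", "state": "active"})  # replaces the first
cache.record({"state": "orphan"})  # no "type" key: ignored

snap = cache.snapshot()
snap["status"]["state"] = "tampered"  # only the copy changes
print(cache.snapshot())  # {'status': {'type': 'status', 'state': 'active'}}
```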

subscribe

@asynccontextmanager
async subscribe() -> AsyncIterator[asyncio.Queue[dict[str, Any]]]

Context manager that creates a queue (capacity 256), adds it to the subscriber set, yields it, and removes it on exit. Typical usage is inside a WebSocket handler that reads from the queue in a loop.
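
The register, yield, unregister pattern can be tried without a web server. The sketch below is an assumption-laden miniature rather than the real handler: the module-level subscribers set, the handler coroutine, and its limit parameter are hypothetical, and appending to a list stands in for sending over a WebSocket.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

subscribers: set[asyncio.Queue[dict[str, Any]]] = set()


@asynccontextmanager
async def subscribe(
    maxsize: int = 256,
) -> AsyncIterator[asyncio.Queue[dict[str, Any]]]:
    queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=maxsize)
    subscribers.add(queue)
    try:
        yield queue
    finally:
        subscribers.discard(queue)  # always unregister, even on error


async def handler(limit: int) -> list[dict[str, Any]]:
    """Stand-in for a WebSocket handler: forward `limit` events, then leave."""
    sent: list[dict[str, Any]] = []
    async with subscribe() as queue:
        for _ in range(limit):
            event = await queue.get()
            sent.append(event)  # a real handler would write to the socket
    return sent


async def main() -> list[dict[str, Any]]:
    task = asyncio.create_task(handler(limit=2))
    await asyncio.sleep(0)  # let the handler register its queue
    for state in ("idle", "active"):
        for queue in subscribers:
            queue.put_nowait({"type": "status", "state": state})
    return await task


result = asyncio.run(main())
print(len(result), len(subscribers))  # 2 0
```

Because cleanup sits in a finally clause, the queue is unregistered whether the handler returns normally or is interrupted by an exception such as a client disconnect.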

Queue behaviour

  • Each subscriber gets an independent asyncio.Queue with maxsize=256.
  • When a queue is full, publish discards the oldest queued event and enqueues the new one; the subscriber stays registered and only misses stale events.
  • On context-manager exit, the queue is removed from the subscriber set, including when the handler exits with an exception.

Usage

import asyncio
from taskclf.ui.events import EventBus

bus = EventBus()

async def main():
    bus.bind_loop(asyncio.get_running_loop())

    async with bus.subscribe() as queue:
        bus.publish_threadsafe({"type": "status", "state": "idle"})
        await asyncio.sleep(0.05)
        event = queue.get_nowait()
        print(event)  # {'type': 'status', 'state': 'idle'}

asyncio.run(main())

See ui.server for the WebSocket endpoint that consumes events, and ui.window for the native window integration.

taskclf.ui.events

Asyncio event bus for broadcasting prediction and suggestion events to WebSocket clients.

EventBus dataclass

Thread-safe asyncio pub/sub for server-push events.

The ActivityMonitor (running in a background thread) publishes events via publish_threadsafe; WebSocket handlers subscribe via subscribe and receive events as an async iterator.

The bus also retains the most recent event of each type so that newly-connected (or reconnecting) clients can hydrate their state immediately via snapshot.

Source code in src/taskclf/ui/events.py
@dataclass(eq=False)
class EventBus:
    """Thread-safe asyncio pub/sub for server-push events.

    The ``ActivityMonitor`` (running in a background thread) publishes
    events via :meth:`publish_threadsafe`; WebSocket handlers subscribe
    via :meth:`subscribe` and receive events as an async iterator.

    The bus also retains the most recent event of each ``type`` so that
    newly-connected (or reconnecting) clients can hydrate their state
    immediately via :meth:`snapshot`.
    """

    _subscribers: set[asyncio.Queue[dict[str, Any]]] = field(
        init=False, default_factory=set
    )
    _lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
    _loop: asyncio.AbstractEventLoop | None = field(init=False, default=None)
    _ready: threading.Event = field(init=False, default_factory=threading.Event)
    _latest: dict[str, dict[str, Any]] = field(init=False, default_factory=dict)
    _latest_lock: threading.Lock = field(init=False, default_factory=threading.Lock)

    def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Bind to the running event loop (call once at startup)."""
        self._loop = loop
        self._ready.set()

    def wait_ready(self, timeout: float = 30) -> bool:
        """Block until :meth:`bind_loop` has been called.

        Returns ``True`` if the loop was bound within *timeout* seconds.
        Safe to call from any thread.
        """
        return self._ready.wait(timeout=timeout)

    def snapshot(self) -> dict[str, dict[str, Any]]:
        """Return the latest event for each known type (thread-safe).

        Used by the REST hydration endpoint so reconnecting WebSocket
        clients can immediately recover current state.
        """
        with self._latest_lock:
            return copy.deepcopy(self._latest)

    async def publish(self, event: dict[str, Any]) -> None:
        """Broadcast *event* to all current subscribers.

        When a subscriber's queue is full, the oldest event is evicted
        so the subscriber keeps receiving new events (at the cost of
        missing stale ones).  The subscriber is never silently dropped.
        """
        event_type = event.get("type")
        if event_type:
            with self._latest_lock:
                self._latest[event_type] = event
        async with self._lock:
            for q in self._subscribers:
                try:
                    q.put_nowait(event)
                except asyncio.QueueFull:
                    try:
                        q.get_nowait()
                    except asyncio.QueueEmpty:
                        pass
                    try:
                        q.put_nowait(event)
                    except asyncio.QueueFull:
                        logger.warning("EventBus: failed to enqueue after eviction")

    @property
    def has_subscribers(self) -> bool:
        """Return ``True`` if at least one WebSocket client is subscribed."""
        return bool(self._subscribers)

    def publish_threadsafe(self, event: dict[str, Any]) -> None:
        """Schedule a publish from a non-async thread (e.g. ``ActivityMonitor``)."""
        loop = self._loop
        if loop is None or loop.is_closed():
            return
        asyncio.run_coroutine_threadsafe(self.publish(event), loop)

    @asynccontextmanager
    async def subscribe(self) -> AsyncIterator[asyncio.Queue[dict[str, Any]]]:
        """Context manager that yields a queue receiving all published events."""
        q: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=256)
        async with self._lock:
            self._subscribers.add(q)
        try:
            yield q
        finally:
            async with self._lock:
                self._subscribers.discard(q)

has_subscribers property

Return True if at least one WebSocket client is subscribed.

bind_loop(loop)

Bind to the running event loop (call once at startup).

Source code in src/taskclf/ui/events.py
def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
    """Bind to the running event loop (call once at startup)."""
    self._loop = loop
    self._ready.set()

wait_ready(timeout=30)

Block until bind_loop has been called.

Returns True if the loop was bound within timeout seconds. Safe to call from any thread.
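
The readiness gate is a plain threading.Event. As a minimal sketch (the names ready, monitor, and log are hypothetical, and the 5 second timeout is arbitrary), a background thread can wait for startup like this:

```python
import threading

ready = threading.Event()
log: list[str] = []


def monitor() -> None:
    # Block for up to 5 seconds until startup code signals readiness.
    if ready.wait(timeout=5):
        log.append("publishing")
    else:
        log.append("gave up")


thread = threading.Thread(target=monitor)
thread.start()
ready.set()  # bind_loop does this after storing the loop
thread.join()
print(log)  # ['publishing']

# An event that is never set makes wait() return False after the timeout.
timed_out = threading.Event().wait(timeout=0.01)
print(timed_out)  # False
```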

Source code in src/taskclf/ui/events.py
def wait_ready(self, timeout: float = 30) -> bool:
    """Block until :meth:`bind_loop` has been called.

    Returns ``True`` if the loop was bound within *timeout* seconds.
    Safe to call from any thread.
    """
    return self._ready.wait(timeout=timeout)

snapshot()

Return the latest event for each known type (thread-safe).

Used by the REST hydration endpoint so reconnecting WebSocket clients can immediately recover current state.

Source code in src/taskclf/ui/events.py
def snapshot(self) -> dict[str, dict[str, Any]]:
    """Return the latest event for each known type (thread-safe).

    Used by the REST hydration endpoint so reconnecting WebSocket
    clients can immediately recover current state.
    """
    with self._latest_lock:
        return copy.deepcopy(self._latest)

publish(event) async

Broadcast event to all current subscribers.

When a subscriber's queue is full, the oldest event is evicted so the subscriber keeps receiving new events (at the cost of missing stale ones). The subscriber is never silently dropped.
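
The drop-oldest policy can be shown on a single queue. The helper below is a sketch with a hypothetical name (put_drop_oldest); it follows the same try, evict, retry sequence as the source listing, applied to one queue instead of a subscriber set.

```python
import asyncio


def put_drop_oldest(queue: asyncio.Queue, item: object) -> bool:
    """Enqueue item, discarding the oldest entry if the queue is full.

    Returns True if the item was enqueued.
    """
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        pass
    try:
        queue.get_nowait()  # make room by dropping the oldest entry
    except asyncio.QueueEmpty:
        pass
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False  # defensive branch, mirrors the warning in the source


async def main() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    for n in (1, 2, 3):
        put_drop_oldest(queue, n)  # the third put evicts 1
    return [queue.get_nowait() for _ in range(queue.qsize())]


drained = asyncio.run(main())
print(drained)  # [2, 3]
```

The trade-off is that a slow consumer silently loses its oldest pending events but keeps its subscription, which suits state-like events where only recent values matter.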

Source code in src/taskclf/ui/events.py
async def publish(self, event: dict[str, Any]) -> None:
    """Broadcast *event* to all current subscribers.

    When a subscriber's queue is full, the oldest event is evicted
    so the subscriber keeps receiving new events (at the cost of
    missing stale ones).  The subscriber is never silently dropped.
    """
    event_type = event.get("type")
    if event_type:
        with self._latest_lock:
            self._latest[event_type] = event
    async with self._lock:
        for q in self._subscribers:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                try:
                    q.get_nowait()
                except asyncio.QueueEmpty:
                    pass
                try:
                    q.put_nowait(event)
                except asyncio.QueueFull:
                    logger.warning("EventBus: failed to enqueue after eviction")

publish_threadsafe(event)

Schedule a publish from a non-async thread (e.g. ActivityMonitor).

Source code in src/taskclf/ui/events.py
def publish_threadsafe(self, event: dict[str, Any]) -> None:
    """Schedule a publish from a non-async thread (e.g. ``ActivityMonitor``)."""
    loop = self._loop
    if loop is None or loop.is_closed():
        return
    asyncio.run_coroutine_threadsafe(self.publish(event), loop)

subscribe() async

Context manager that yields a queue receiving all published events.

Source code in src/taskclf/ui/events.py
@asynccontextmanager
async def subscribe(self) -> AsyncIterator[asyncio.Queue[dict[str, Any]]]:
    """Context manager that yields a queue receiving all published events."""
    q: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=256)
    async with self._lock:
        self._subscribers.add(q)
    try:
        yield q
    finally:
        async with self._lock:
            self._subscribers.discard(q)