Skip to content

ui.runtime

Shared non-GUI runtime helpers used by both taskclf ui and taskclf tray.

  • ActivityMonitor owns activity-source polling, transition detection, cached provider diagnostics, and status event publishing.
  • _LabelSuggester wraps online inference for interval label suggestions.
  • Keeping these classes outside taskclf.ui.tray lets browser-only UI startup avoid importing tray icon dependencies such as Pillow and pystray.

Activity source status

ActivityMonitor now publishes a provider-neutral activity_provider object in every WebSocket status event. The object is initialized in the checking state immediately at startup and is then updated to one of the following:

  • ready after a successful initial probe
  • setup_required after the first failed startup probe, so the UI can show setup guidance without blocking manual labeling

Legacy aw_* fields remain in the status payload temporarily for compatibility, but new UI code should prefer status.activity_provider.

taskclf.ui.runtime

Shared non-GUI runtime pieces for the UI and tray flows.

ActivityMonitor dataclass

Poll the configured activity source and emit app transitions.

Source code in src/taskclf/ui/runtime.py
@dataclass(kw_only=True, eq=False)
class ActivityMonitor:
    """Watch the configured activity source and report app transitions.

    The monitor repeatedly asks the provider for recent events, works out
    which app dominated the polling window, debounces changes against the
    configured thresholds, invokes the optional callbacks, and publishes a
    status event on the optional event bus after every poll.
    """

    # --- constructor keywords -------------------------------------------
    aw_host: str = DEFAULT_AW_HOST
    title_salt: str = DEFAULT_TITLE_SALT
    poll_seconds: int = DEFAULT_POLL_SECONDS
    aw_timeout_seconds: int = DEFAULT_AW_TIMEOUT_SECONDS
    transition_minutes: int = DEFAULT_TRANSITION_MINUTES
    idle_transition_minutes: int = DEFAULT_IDLE_TRANSITION_MINUTES
    on_transition: Callable[[str, str, dt.datetime, dt.datetime], Any] | None = None
    on_poll: Callable[[str], Any] | None = None
    on_initial_app: Callable[[str, dt.datetime], Any] | None = None
    event_bus: EventBus | None = None

    # --- private mirrors of the configuration (set in __post_init__) -----
    _aw_host: str = field(init=False)
    _title_salt: str = field(init=False)
    _poll_seconds: int = field(init=False)
    _aw_timeout_seconds: int = field(init=False)
    _transition_threshold: int = field(init=False)
    _idle_transition_threshold: int = field(init=False)
    _on_transition: Callable[[str, str, dt.datetime, dt.datetime], Any] | None = field(
        init=False, default=None
    )
    _on_poll: Callable[[str], Any] | None = field(init=False, default=None)
    _on_initial_app: Callable[[str, dt.datetime], Any] | None = field(
        init=False, default=None
    )
    _event_bus: EventBus | None = field(init=False, default=None)

    # --- transition tracking ---------------------------------------------
    _current_app: str | None = field(init=False, default=None)
    _current_app_since: dt.datetime | None = field(init=False, default=None)
    _candidate_app: str | None = field(init=False, default=None)
    _candidate_duration: int = field(init=False, default=0)
    _candidate_first_seen: dt.datetime | None = field(init=False, default=None)
    _last_check_time: dt.datetime | None = field(init=False, default=None)

    # --- source connection, loop control and diagnostics -------------------
    _bucket_id: str | None = field(init=False, default=None)
    _aw_warned: bool = field(init=False, default=False)
    _stop: threading.Event = field(init=False, default_factory=threading.Event)
    _paused: threading.Event = field(init=False, default_factory=threading.Event)
    _poll_count: int = field(init=False, default=0)
    _last_event_count: int = field(init=False, default=0)
    _last_app_counts: dict[str, int] = field(init=False, default_factory=dict)
    _last_poll_ts: dt.datetime | None = field(init=False, default=None)
    _started_at: dt.datetime | None = field(init=False, default=None)
    _consecutive_failures: int = field(init=False, default=0)
    _backoff_seconds: int = field(init=False, default=0)
    _provider: ActivityWatchProvider = field(init=False)
    _provider_status: ActivityProviderStatus = field(init=False)

    def __post_init__(self) -> None:
        """Copy the public settings into private state and build the provider."""
        # Callbacks and the bus are taken over unchanged.
        self._event_bus = self.event_bus
        self._on_initial_app = self.on_initial_app
        self._on_poll = self.on_poll
        self._on_transition = self.on_transition

        # Thresholds are configured in minutes but compared in seconds.
        self._idle_transition_threshold = self.idle_transition_minutes * 60
        self._transition_threshold = self.transition_minutes * 60

        self._poll_seconds = self.poll_seconds
        self._aw_host = self.aw_host
        self._title_salt = self.title_salt
        self._aw_timeout_seconds = self.aw_timeout_seconds

        provider = ActivityWatchProvider(
            endpoint=self._aw_host,
            title_salt=self._title_salt,
            timeout_seconds=self._aw_timeout_seconds,
        )
        self._provider = provider
        self._provider_status = provider.initial_status()

    def _discover_bucket(self, *, timeout_seconds: float | int | None = None) -> str:
        """Ask the provider for the source id, forwarding *timeout_seconds*."""
        return self._provider.discover_source_id(timeout_seconds=timeout_seconds)

    def _clear_last_sample(self) -> None:
        """Forget the event count and per-app breakdown of the last sample."""
        self._last_event_count = 0
        self._last_app_counts = {}

    def _reset_candidate(self) -> None:
        """Drop whatever transition candidate is currently being tracked."""
        self._candidate_app = None
        self._candidate_duration = 0
        self._candidate_first_seen = None

    def _after_fetch_failure(self, exc: ActivityProviderUnavailableError) -> None:
        """Record one more failed request and refresh backoff and status."""
        self._consecutive_failures += 1
        failures = self._consecutive_failures

        # Only retryable errors back off; the delay doubles per failure up to
        # the module-level cap.
        backoff = 0
        if exc.retryable:
            backoff = min(self._poll_seconds * (2**failures), _MAX_BACKOFF_SECONDS)
        self._backoff_seconds = backoff

        self._provider_status = self._provider.setup_required_status(
            source_id=self._bucket_id,
            last_sample_count=self._last_event_count,
            last_sample_breakdown=self._last_app_counts,
        )

        # Warn exactly once when the threshold is reached, then drop to debug.
        if failures < _WARN_AFTER_FAILURES:
            return
        if failures == _WARN_AFTER_FAILURES:
            logger.warning(
                "Activity source unreachable (%d consecutive failures): %s",
                failures,
                exc,
            )
        else:
            logger.debug(
                "Activity source still unreachable (%d failures)",
                failures,
            )

    def _after_fetch_success(self) -> None:
        """Clear failure/backoff counters and mark the provider as ready."""
        recovered_from = self._consecutive_failures
        if recovered_from >= _WARN_AFTER_FAILURES:
            logger.info(
                "Activity source connection restored after %d failures",
                recovered_from,
            )
        self._backoff_seconds = 0
        self._consecutive_failures = 0
        self._provider_status = self._provider.ready_status(
            source_id=self._bucket_id,
            last_sample_count=self._last_event_count,
            last_sample_breakdown=self._last_app_counts,
        )

    def _ensure_bucket(self) -> bool:
        """Try to discover the source bucket; return ``False`` on failure.

        Before the first counted poll the probe timeout is capped at two
        seconds; later attempts use the full configured timeout.
        """
        try:
            if self._poll_count == 0:
                probe_timeout = min(2, self._aw_timeout_seconds)
            else:
                probe_timeout = self._aw_timeout_seconds
            self._bucket_id = self._discover_bucket(timeout_seconds=probe_timeout)
            if self._aw_warned:
                print(f"Connected to ActivityWatch at {self._aw_host}")
                self._aw_warned = False
            self._after_fetch_success()
        except ActivityProviderUnavailableError as exc:
            self._after_fetch_failure(exc)
            # Tell the user once; stay quiet on subsequent retries.
            if not self._aw_warned:
                print(
                    f"Waiting for ActivityWatch at {self._aw_host} "
                    f"(retrying every {self._poll_seconds}s)..."
                )
                self._aw_warned = True
            self._clear_last_sample()
            return False
        return True

    def _poll_dominant_app(self) -> str | None:
        """Return the most frequent app id in the last polling window.

        ``None`` is returned when the source cannot be discovered, when the
        fetch fails, or when the window contains no events.
        """
        if self._bucket_id is None and not self._ensure_bucket():
            return None

        window_end = dt.datetime.now(dt.timezone.utc)
        window_start = window_end - dt.timedelta(seconds=self._poll_seconds)
        try:
            events = self._provider.fetch_events(
                self._bucket_id,
                window_start,
                window_end,
                timeout_seconds=self._aw_timeout_seconds,
            )
        except ActivityProviderUnavailableError as exc:
            if exc.source_lost:
                # Forget the bucket so the next poll runs discovery again.
                logger.warning("Activity source unavailable, will rediscover: %s", exc)
                self._bucket_id = None
            self._after_fetch_failure(exc)
            self._clear_last_sample()
            return None
        except Exception:
            self._consecutive_failures += 1
            logger.debug("Failed to fetch activity-source events", exc_info=True)
            self._clear_last_sample()
            return None

        has_events = bool(events)
        if has_events:
            self._last_event_count = len(events)
            self._last_app_counts = summarize_events_by_app(events)
        else:
            self._clear_last_sample()
        self._after_fetch_success()

        if not has_events:
            return None
        counts = self._last_app_counts
        return max(counts, key=counts.__getitem__)

    def _switch_app(
        self,
        new_app: str,
        *,
        block_end: dt.datetime,
        now: dt.datetime,
        log_format: str,
    ) -> None:
        """Make *new_app* current and notify ``on_transition`` about the old block."""
        prev = self._current_app
        block_start = self._current_app_since or now

        logger.debug(
            log_format,
            prev,
            new_app,
            block_start.isoformat(),
            block_end.isoformat(),
        )

        # State is updated before the callback runs.
        self._current_app = new_app
        self._current_app_since = block_end
        self._reset_candidate()

        if self._on_transition is not None:
            self._on_transition(prev, new_app, block_start, block_end)

    def check_transition(
        self,
        dominant_app: str,
        *,
        _now: dt.datetime | None = None,
    ) -> None:
        """Feed one dominant-app observation into the transition tracker.

        Args:
            dominant_app: App id that dominated the latest polling window.
            _now: Timestamp override; the current UTC time is used when omitted.
        """
        now = _now or dt.datetime.now(dt.timezone.utc)
        previous_check = self._last_check_time
        if previous_check is None:
            # No earlier check to measure from: assume one polling interval.
            elapsed = self._poll_seconds
        else:
            elapsed = int((now - previous_check).total_seconds())
        self._last_check_time = now

        current = self._current_app

        # First observation: adopt it without firing a transition.
        if current is None:
            self._current_app = dominant_app
            self._current_app_since = now
            logger.debug("DEBUG poll: initial app=%r", dominant_app)
            if self._on_initial_app is not None:
                self._on_initial_app(dominant_app, now)
            return

        # Still (or again) on the current app: any pending candidate is void.
        if dominant_app == current:
            if self._candidate_app is not None:
                logger.debug(
                    "DEBUG poll: candidate %r reset, back to current %r",
                    self._candidate_app,
                    self._current_app,
                )
            self._reset_candidate()
            return

        # Leaving a lock-screen app for a regular one switches immediately.
        if current in _LOCKSCREEN_APP_IDS and dominant_app not in _LOCKSCREEN_APP_IDS:
            self._switch_app(
                dominant_app,
                block_end=now,
                now=now,
                log_format=(
                    "DEBUG poll: IMMEDIATE idle->active transition %r -> %r (block %s -> %s)"
                ),
            )
            return

        # A different app than the pending candidate restarts the timer.
        if self._candidate_app != dominant_app:
            logger.debug(
                "DEBUG poll: new candidate %r (was %r, current %r)",
                dominant_app,
                self._candidate_app,
                self._current_app,
            )
            self._candidate_app = dominant_app
            self._candidate_duration = elapsed
            self._candidate_first_seen = now
            return

        # Same candidate as before: accumulate and compare with its threshold.
        self._candidate_duration += elapsed
        is_idle_candidate = dominant_app in _LOCKSCREEN_APP_IDS
        if is_idle_candidate:
            effective_threshold = self._idle_transition_threshold
        else:
            effective_threshold = self._transition_threshold
        logger.debug(
            "DEBUG poll: candidate %r held %ds / %ds threshold%s",
            dominant_app,
            self._candidate_duration,
            effective_threshold,
            " (idle)" if is_idle_candidate else "",
        )
        if self._candidate_duration >= effective_threshold:
            # The previous block ends when the candidate was first seen.
            self._switch_app(
                dominant_app,
                block_end=self._candidate_first_seen or now,
                now=now,
                log_format="DEBUG poll: TRANSITION FIRED %r -> %r (block %s -> %s)",
            )

    def _publish_status(
        self,
        dominant_app: str,
        *,
        state: str = "collecting",
        count_as_poll: bool = True,
    ) -> None:
        """Stamp the poll time and push a status event to the bus, if any.

        Args:
            dominant_app: Value reported as ``current_app`` in the event.
            state: Value reported as ``state`` in the event.
            count_as_poll: Whether this call increments the poll counter.
        """
        now = dt.datetime.now(dt.timezone.utc)
        self._last_poll_ts = now
        if count_as_poll:
            self._poll_count += 1

        bus = self._event_bus
        if bus is None:
            return

        started = self._started_at
        uptime_s = int((now - started).total_seconds()) if started else 0
        since = self._current_app_since
        payload = {
            "type": "status",
            "state": state,
            "current_app": dominant_app,
            "current_app_since": since.isoformat() if since else None,
            "candidate_app": self._candidate_app,
            "candidate_duration_s": self._candidate_duration,
            "transition_threshold_s": self._transition_threshold,
            "poll_seconds": self._poll_seconds,
            "poll_count": self._poll_count,
            "last_poll_ts": now.isoformat(),
            "uptime_s": uptime_s,
            "activity_provider": self._provider_status.to_payload(),
            "aw_connected": self._bucket_id is not None,
            "aw_bucket_id": self._bucket_id,
            "aw_host": self._aw_host,
            "last_event_count": self._last_event_count,
            "last_app_counts": self._last_app_counts,
        }
        bus.publish_threadsafe(payload)

    def _poll_once(self) -> None:
        """Run one active poll: sample, notify, publish, then check transitions."""
        dominant = self._poll_dominant_app()
        app = dominant or self._current_app or "unknown"
        if self._on_poll is not None:
            self._on_poll(app)
        self._publish_status(app)
        if dominant is not None:
            self.check_transition(dominant)

    def run(self) -> None:
        """Poll until :meth:`stop` is called. Blocks; run it on a daemon thread."""
        bus = self._event_bus
        if bus is not None and not bus.wait_ready(timeout=30):
            logger.warning("EventBus loop not bound after 30s, starting anyway")

        self._started_at = dt.datetime.now(dt.timezone.utc)
        # Publish one status event before polling starts; it is not counted
        # as a poll.
        self._publish_status(
            self._current_app or "unknown",
            state="idle",
            count_as_poll=False,
        )

        while not self._stop.is_set():
            if self._paused.is_set():
                self._publish_status(self._current_app or "unknown", state="paused")
            else:
                self._poll_once()
            # Waiting on the stop event lets stop() interrupt the sleep.
            self._stop.wait(timeout=self._poll_seconds + self._backoff_seconds)

    def stop(self) -> None:
        """Ask the poll loop to exit."""
        self._stop.set()

    def pause(self) -> None:
        """Suspend polling while keeping the tracked session state."""
        self._paused.set()

    def resume(self) -> None:
        """Continue polling after :meth:`pause`."""
        # Clear the last check time first so the next check assumes a single
        # polling interval instead of measuring across the pause.
        self._last_check_time = None
        self._paused.clear()

    @property
    def is_paused(self) -> bool:
        """Whether the monitor is currently paused."""
        return self._paused.is_set()

    @property
    def current_app(self) -> str | None:
        """The app currently considered active, or ``None`` before the first one."""
        return self._current_app

    @property
    def activity_provider_status(self) -> dict[str, object]:
        """Payload form of the most recently cached activity-source status."""
        return self._provider_status.to_payload()

activity_provider_status property

Return the latest cached activity-source status snapshot.

check_transition(dominant_app, *, _now=None)

Update transition state and fire the callback when warranted.

Source code in src/taskclf/ui/runtime.py
def check_transition(
    self,
    dominant_app: str,
    *,
    _now: dt.datetime | None = None,
) -> None:
    """Update transition state and fire the callback when warranted."""
    now = _now or dt.datetime.now(dt.timezone.utc)
    elapsed = (
        int((now - self._last_check_time).total_seconds())
        if self._last_check_time is not None
        else self._poll_seconds
    )
    self._last_check_time = now

    if self._current_app is None:
        self._current_app = dominant_app
        self._current_app_since = now
        logger.debug("DEBUG poll: initial app=%r", dominant_app)
        if self._on_initial_app is not None:
            self._on_initial_app(dominant_app, now)
        return

    if dominant_app != self._current_app:
        leaving_lockscreen = (
            self._current_app in _LOCKSCREEN_APP_IDS
            and dominant_app not in _LOCKSCREEN_APP_IDS
        )
        if leaving_lockscreen:
            block_start = self._current_app_since or now
            block_end = now
            prev = self._current_app

            logger.debug(
                "DEBUG poll: IMMEDIATE idle->active transition %r -> %r (block %s -> %s)",
                prev,
                dominant_app,
                block_start.isoformat(),
                block_end.isoformat(),
            )

            self._current_app = dominant_app
            self._current_app_since = block_end
            self._candidate_app = None
            self._candidate_duration = 0
            self._candidate_first_seen = None

            if self._on_transition is not None:
                self._on_transition(prev, dominant_app, block_start, block_end)
        elif self._candidate_app == dominant_app:
            self._candidate_duration += elapsed
            is_idle_candidate = dominant_app in _LOCKSCREEN_APP_IDS
            effective_threshold = (
                self._idle_transition_threshold
                if is_idle_candidate
                else self._transition_threshold
            )
            logger.debug(
                "DEBUG poll: candidate %r held %ds / %ds threshold%s",
                dominant_app,
                self._candidate_duration,
                effective_threshold,
                " (idle)" if is_idle_candidate else "",
            )
            if self._candidate_duration >= effective_threshold:
                block_start = self._current_app_since or now
                block_end = self._candidate_first_seen or now
                prev = self._current_app

                logger.debug(
                    "DEBUG poll: TRANSITION FIRED %r -> %r (block %s -> %s)",
                    prev,
                    dominant_app,
                    block_start.isoformat(),
                    block_end.isoformat(),
                )

                self._current_app = dominant_app
                self._current_app_since = block_end
                self._candidate_app = None
                self._candidate_duration = 0
                self._candidate_first_seen = None

                if self._on_transition is not None:
                    self._on_transition(prev, dominant_app, block_start, block_end)
        else:
            logger.debug(
                "DEBUG poll: new candidate %r (was %r, current %r)",
                dominant_app,
                self._candidate_app,
                self._current_app,
            )
            self._candidate_app = dominant_app
            self._candidate_duration = elapsed
            self._candidate_first_seen = now
    else:
        if self._candidate_app is not None:
            logger.debug(
                "DEBUG poll: candidate %r reset, back to current %r",
                self._candidate_app,
                self._current_app,
            )
        self._candidate_app = None
        self._candidate_duration = 0
        self._candidate_first_seen = None

run()

Blocking poll loop. Call from a daemon thread.

Source code in src/taskclf/ui/runtime.py
def run(self) -> None:
    """Poll until the stop event is set. Blocks; run it on a daemon thread."""
    bus = self._event_bus
    if bus is not None and not bus.wait_ready(timeout=30):
        logger.warning("EventBus loop not bound after 30s, starting anyway")

    self._started_at = dt.datetime.now(dt.timezone.utc)
    # Publish one status event before polling starts; it is not counted as
    # a poll.
    self._publish_status(
        self._current_app or "unknown",
        state="idle",
        count_as_poll=False,
    )

    while not self._stop.is_set():
        if self._paused.is_set():
            # Paused: keep reporting, but do not sample the source.
            self._publish_status(self._current_app or "unknown", state="paused")
        else:
            dominant = self._poll_dominant_app()
            # Fall back to the last known app when the sample had no winner.
            app = dominant or self._current_app or "unknown"
            poll_hook = self._on_poll
            if poll_hook is not None:
                poll_hook(app)
            self._publish_status(app)
            if dominant is not None:
                self.check_transition(dominant)
        # Waiting on the stop event lets a stop request interrupt the sleep.
        delay = self._poll_seconds + self._backoff_seconds
        self._stop.wait(timeout=delay)

stop()

Signal the poll loop to stop.

Source code in src/taskclf/ui/runtime.py
def stop(self) -> None:
    """Ask the poll loop to exit by setting the stop event."""
    stop_event = self._stop
    stop_event.set()

pause()

Pause monitoring without clearing session state.

Source code in src/taskclf/ui/runtime.py
def pause(self) -> None:
    """Suspend polling by setting the paused event; no other state is changed."""
    paused_event = self._paused
    paused_event.set()

resume()

Resume monitoring after a pause.

Source code in src/taskclf/ui/runtime.py
def resume(self) -> None:
    """Continue polling after a pause."""
    # Clear the last check time before unpausing, so it is already reset
    # when the paused flag drops.
    self._last_check_time = None
    paused_event = self._paused
    paused_event.clear()