@dataclass(kw_only=True, eq=False)
class ActivityMonitor:
    """Poll the configured activity source and emit app transitions.

    Each poll asks the provider for the events of the last ``poll_seconds``
    and picks the app with the highest count (the "dominant" app).  A change
    of dominant app only becomes a transition once the new app has stayed
    dominant for the configured threshold; leaving an app listed in
    ``_LOCKSCREEN_APP_IDS`` is the exception and fires immediately.

    All constructor arguments are keyword-only.  ``__post_init__`` copies
    them into the underscore-prefixed attributes that the methods actually
    read, converting the minute-based thresholds to seconds.
    """

    # --- Public configuration (constructor keyword arguments) -------------
    # Endpoint handed to the ActivityWatch provider.
    aw_host: str = DEFAULT_AW_HOST
    # Salt handed to the provider as ``title_salt``.
    title_salt: str = DEFAULT_TITLE_SALT
    # Base delay between polls; also the length of the fetched event window.
    poll_seconds: int = DEFAULT_POLL_SECONDS
    # Timeout passed to provider requests.
    aw_timeout_seconds: int = DEFAULT_AW_TIMEOUT_SECONDS
    # Minutes a new dominant app must persist before a transition fires.
    transition_minutes: int = DEFAULT_TRANSITION_MINUTES
    # Same, but used when the candidate app is in ``_LOCKSCREEN_APP_IDS``.
    idle_transition_minutes: int = DEFAULT_IDLE_TRANSITION_MINUTES
    # Called as on_transition(prev_app, new_app, block_start, block_end).
    on_transition: Callable[[str, str, dt.datetime, dt.datetime], Any] | None = None
    # Called once per non-paused loop iteration with the app being reported.
    on_poll: Callable[[str], Any] | None = None
    # Called as on_initial_app(app, now) the first time an app is observed.
    on_initial_app: Callable[[str, dt.datetime], Any] | None = None
    # Optional bus that receives a "status" payload after every iteration.
    event_bus: EventBus | None = None

    # --- Private mirrors of the configuration (set in __post_init__) ------
    _aw_host: str = field(init=False)
    _title_salt: str = field(init=False)
    _poll_seconds: int = field(init=False)
    _aw_timeout_seconds: int = field(init=False)
    _transition_threshold: int = field(init=False)  # seconds
    _idle_transition_threshold: int = field(init=False)  # seconds
    _on_transition: Callable[[str, str, dt.datetime, dt.datetime], Any] | None = field(
        init=False, default=None
    )
    _on_poll: Callable[[str], Any] | None = field(init=False, default=None)
    _on_initial_app: Callable[[str, dt.datetime], Any] | None = field(
        init=False, default=None
    )
    _event_bus: EventBus | None = field(init=False, default=None)

    # --- Transition state --------------------------------------------------
    _current_app: str | None = field(init=False, default=None)
    _current_app_since: dt.datetime | None = field(init=False, default=None)
    _candidate_app: str | None = field(init=False, default=None)
    _candidate_duration: int = field(init=False, default=0)  # seconds
    _candidate_first_seen: dt.datetime | None = field(init=False, default=None)
    _last_check_time: dt.datetime | None = field(init=False, default=None)

    # --- Source connection / loop control ------------------------------------
    _bucket_id: str | None = field(init=False, default=None)
    # True while the "Waiting for ActivityWatch" notice has been printed and
    # the matching "Connected" notice has not.
    _aw_warned: bool = field(init=False, default=False)
    _stop: threading.Event = field(init=False, default_factory=threading.Event)
    _paused: threading.Event = field(init=False, default_factory=threading.Event)

    # --- Diagnostics published with each status payload ---------------------
    _poll_count: int = field(init=False, default=0)
    _last_event_count: int = field(init=False, default=0)
    _last_app_counts: dict[str, int] = field(init=False, default_factory=dict)
    _last_poll_ts: dt.datetime | None = field(init=False, default=None)
    _started_at: dt.datetime | None = field(init=False, default=None)

    # --- Retry state -----------------------------------------------------------
    _consecutive_failures: int = field(init=False, default=0)
    # Extra delay added on top of ``_poll_seconds`` between loop iterations.
    _backoff_seconds: int = field(init=False, default=0)

    _provider: ActivityWatchProvider = field(init=False)
    _provider_status: ActivityProviderStatus = field(init=False)

    def __post_init__(self) -> None:
        """Copy the public configuration into private state and build the provider."""
        self._aw_host = self.aw_host
        self._title_salt = self.title_salt
        self._poll_seconds = self.poll_seconds
        self._aw_timeout_seconds = self.aw_timeout_seconds
        # Thresholds are configured in minutes but compared in seconds.
        self._transition_threshold = self.transition_minutes * 60
        self._idle_transition_threshold = self.idle_transition_minutes * 60
        self._on_transition = self.on_transition
        self._on_poll = self.on_poll
        self._on_initial_app = self.on_initial_app
        self._event_bus = self.event_bus
        self._provider = ActivityWatchProvider(
            endpoint=self._aw_host,
            title_salt=self._title_salt,
            timeout_seconds=self._aw_timeout_seconds,
        )
        self._provider_status = self._provider.initial_status()

    def _discover_bucket(self, *, timeout_seconds: float | int | None = None) -> str:
        """Ask the provider for the source id to poll.

        Args:
            timeout_seconds: Timeout forwarded to the provider.

        Returns:
            The source id reported by the provider.
        """
        return self._provider.discover_source_id(timeout_seconds=timeout_seconds)

    def _reset_sample_stats(self) -> None:
        """Zero the per-poll sample statistics (this poll produced no sample)."""
        self._last_event_count = 0
        # Rebind instead of ``clear()``: status snapshots and payloads that were
        # already handed out may still reference the previous dict.
        self._last_app_counts = {}

    def _after_fetch_failure(self, exc: ActivityProviderUnavailableError) -> None:
        """Update retry state after a failed activity-source request.

        Retryable errors grow the backoff exponentially (capped at
        ``_MAX_BACKOFF_SECONDS``).  Non-retryable errors clear the backoff and
        switch the cached provider status to "setup required".

        Args:
            exc: The provider error that caused the failure.
        """
        self._consecutive_failures += 1
        if exc.retryable:
            self._backoff_seconds = min(
                self._poll_seconds * (2**self._consecutive_failures),
                _MAX_BACKOFF_SECONDS,
            )
            # NOTE: the cached provider status is left untouched on this path.
        else:
            self._backoff_seconds = 0
            self._provider_status = self._provider.setup_required_status(
                source_id=self._bucket_id,
                last_sample_count=self._last_event_count,
                last_sample_breakdown=self._last_app_counts,
            )
        # Warn exactly once when the threshold is reached, then drop to DEBUG
        # so a long outage does not flood the log.
        if self._consecutive_failures == _WARN_AFTER_FAILURES:
            logger.warning(
                "Activity source unreachable (%d consecutive failures): %s",
                self._consecutive_failures,
                exc,
            )
        elif self._consecutive_failures > _WARN_AFTER_FAILURES:
            logger.debug(
                "Activity source still unreachable (%d failures)",
                self._consecutive_failures,
            )

    def _after_fetch_success(self) -> None:
        """Reset retry state after a successful activity-source request."""
        # Only announce recovery if the outage was long enough to have warned.
        if self._consecutive_failures >= _WARN_AFTER_FAILURES:
            logger.info(
                "Activity source connection restored after %d failures",
                self._consecutive_failures,
            )
        self._consecutive_failures = 0
        self._backoff_seconds = 0
        self._provider_status = self._provider.ready_status(
            source_id=self._bucket_id,
            last_sample_count=self._last_event_count,
            last_sample_breakdown=self._last_app_counts,
        )

    def _poll_dominant_app(self) -> str | None:
        """Fetch recent activity-source events and return the most common app id.

        Discovers the source id first when none is cached.  On every path the
        per-poll sample statistics (``_last_event_count`` /
        ``_last_app_counts``) are brought up to date *before* the provider
        status is refreshed, so the status snapshot and the published
        counters always describe the same poll.

        Returns:
            The app id with the highest count in the last ``poll_seconds``
            window, or ``None`` when the source is unavailable, the fetch
            failed, or no app could be determined.
        """
        if self._bucket_id is None:
            try:
                # The very first probe uses a short timeout (at most 2s);
                # later rediscoveries use the full request timeout.
                startup_probe_timeout = (
                    min(2, self._aw_timeout_seconds)
                    if self._poll_count == 0
                    else self._aw_timeout_seconds
                )
                self._bucket_id = self._discover_bucket(
                    timeout_seconds=startup_probe_timeout
                )
                if self._aw_warned:
                    print(f"Connected to ActivityWatch at {self._aw_host}")
                    self._aw_warned = False
                self._after_fetch_success()
            except ActivityProviderUnavailableError as exc:
                # Clear stale counts first so a "setup required" status does
                # not report the previous poll's sample.
                self._reset_sample_stats()
                self._after_fetch_failure(exc)
                if not self._aw_warned:
                    print(
                        f"Waiting for ActivityWatch at {self._aw_host} "
                        f"(retrying every {self._poll_seconds}s)..."
                    )
                    self._aw_warned = True
                return None

        now = dt.datetime.now(dt.timezone.utc)
        start = now - dt.timedelta(seconds=self._poll_seconds)
        try:
            events = self._provider.fetch_events(
                self._bucket_id,
                start,
                now,
                timeout_seconds=self._aw_timeout_seconds,
            )
        except ActivityProviderUnavailableError as exc:
            if exc.source_lost:
                logger.warning("Activity source unavailable, will rediscover: %s", exc)
                # Dropping the cached id forces rediscovery on the next poll.
                self._bucket_id = None
            self._reset_sample_stats()
            self._after_fetch_failure(exc)
            return None
        except Exception:
            # Best-effort boundary: an unexpected error must not kill the poll
            # thread.  Keep the full traceback at DEBUG, but also surface a
            # persistent problem once at WARNING so it is not invisible at
            # normal log levels.
            self._consecutive_failures += 1
            self._reset_sample_stats()
            logger.debug("Failed to fetch activity-source events", exc_info=True)
            if self._consecutive_failures == _WARN_AFTER_FAILURES:
                logger.warning(
                    "Activity-source fetch keeps failing unexpectedly "
                    "(%d consecutive failures)",
                    self._consecutive_failures,
                    exc_info=True,
                )
            return None

        if not events:
            self._reset_sample_stats()
            self._after_fetch_success()
            return None

        self._last_event_count = len(events)
        self._last_app_counts = summarize_events_by_app(events)
        self._after_fetch_success()

        counts = self._last_app_counts
        if not counts:
            # Events were returned but none could be attributed to an app;
            # ``max()`` on an empty mapping would raise ValueError.
            return None
        return max(counts, key=counts.__getitem__)

    def _clear_candidate(self) -> None:
        """Forget the pending transition candidate."""
        self._candidate_app = None
        self._candidate_duration = 0
        self._candidate_first_seen = None

    def _commit_transition(
        self,
        prev_app: str,
        new_app: str,
        block_start: dt.datetime,
        block_end: dt.datetime,
    ) -> None:
        """Make *new_app* the current app and notify ``on_transition``.

        State is updated before the callback runs, so the callback observes
        the monitor already switched to *new_app*.
        """
        self._current_app = new_app
        self._current_app_since = block_end
        self._clear_candidate()
        if self._on_transition is not None:
            self._on_transition(prev_app, new_app, block_start, block_end)

    def check_transition(
        self,
        dominant_app: str,
        *,
        _now: dt.datetime | None = None,
    ) -> None:
        """Update transition state and fire the callback when warranted.

        Args:
            dominant_app: The app id that dominated the latest poll window.
            _now: Timestamp to use instead of the current UTC time (lets
                callers drive the state machine with explicit times).
        """
        now = _now if _now is not None else dt.datetime.now(dt.timezone.utc)
        if self._last_check_time is None:
            # First check (or first after resume()): assume one poll interval.
            elapsed = self._poll_seconds
        else:
            # Clamp at zero: a backwards wall-clock step must not *subtract*
            # time from a candidate's accumulated duration.
            elapsed = max(0, int((now - self._last_check_time).total_seconds()))
        self._last_check_time = now

        if self._current_app is None:
            self._current_app = dominant_app
            self._current_app_since = now
            logger.debug("DEBUG poll: initial app=%r", dominant_app)
            if self._on_initial_app is not None:
                self._on_initial_app(dominant_app, now)
            return

        if dominant_app == self._current_app:
            # Back on the current app: any pending candidate is abandoned.
            if self._candidate_app is not None:
                logger.debug(
                    "DEBUG poll: candidate %r reset, back to current %r",
                    self._candidate_app,
                    self._current_app,
                )
            self._clear_candidate()
            return

        prev = self._current_app
        leaving_lockscreen = (
            prev in _LOCKSCREEN_APP_IDS and dominant_app not in _LOCKSCREEN_APP_IDS
        )
        if leaving_lockscreen:
            # Idle -> active is committed immediately, without a hold period.
            block_start = self._current_app_since or now
            block_end = now
            logger.debug(
                "DEBUG poll: IMMEDIATE idle->active transition %r -> %r (block %s -> %s)",
                prev,
                dominant_app,
                block_start.isoformat(),
                block_end.isoformat(),
            )
            self._commit_transition(prev, dominant_app, block_start, block_end)
            return

        if self._candidate_app != dominant_app:
            # A different app took over: start timing it from this poll.
            logger.debug(
                "DEBUG poll: new candidate %r (was %r, current %r)",
                dominant_app,
                self._candidate_app,
                self._current_app,
            )
            self._candidate_app = dominant_app
            self._candidate_duration = elapsed
            self._candidate_first_seen = now
            return

        # Same candidate as last time: accumulate and compare to the threshold.
        self._candidate_duration += elapsed
        is_idle_candidate = dominant_app in _LOCKSCREEN_APP_IDS
        effective_threshold = (
            self._idle_transition_threshold
            if is_idle_candidate
            else self._transition_threshold
        )
        logger.debug(
            "DEBUG poll: candidate %r held %ds / %ds threshold%s",
            dominant_app,
            self._candidate_duration,
            effective_threshold,
            " (idle)" if is_idle_candidate else "",
        )
        if self._candidate_duration >= effective_threshold:
            block_start = self._current_app_since or now
            # The previous block ends when the candidate was first seen, not
            # when the threshold was finally crossed.
            block_end = self._candidate_first_seen or now
            logger.debug(
                "DEBUG poll: TRANSITION FIRED %r -> %r (block %s -> %s)",
                prev,
                dominant_app,
                block_start.isoformat(),
                block_end.isoformat(),
            )
            self._commit_transition(prev, dominant_app, block_start, block_end)

    def _publish_status(
        self,
        dominant_app: str,
        *,
        state: str = "collecting",
        count_as_poll: bool = True,
    ) -> None:
        """Record the poll timestamp and publish a status payload to the bus.

        Args:
            dominant_app: Value reported as ``current_app`` in the payload.
            state: Value reported as ``state`` in the payload.
            count_as_poll: Whether this call increments ``_poll_count``.
        """
        now = dt.datetime.now(dt.timezone.utc)
        self._last_poll_ts = now
        if count_as_poll:
            self._poll_count += 1
        if self._event_bus is None:
            return

        uptime_s = (
            int((now - self._started_at).total_seconds()) if self._started_at else 0
        )
        current_since = (
            self._current_app_since.isoformat() if self._current_app_since else None
        )
        self._event_bus.publish_threadsafe(
            {
                "type": "status",
                "state": state,
                "current_app": dominant_app,
                "current_app_since": current_since,
                "candidate_app": self._candidate_app,
                "candidate_duration_s": self._candidate_duration,
                "transition_threshold_s": self._transition_threshold,
                "poll_seconds": self._poll_seconds,
                "poll_count": self._poll_count,
                "last_poll_ts": now.isoformat(),
                "uptime_s": uptime_s,
                "activity_provider": self._provider_status.to_payload(),
                "aw_connected": self._bucket_id is not None,
                "aw_bucket_id": self._bucket_id,
                "aw_host": self._aw_host,
                "last_event_count": self._last_event_count,
                "last_app_counts": self._last_app_counts,
            }
        )

    def run(self) -> None:
        """Blocking poll loop. Call from a daemon thread.

        Publishes one initial "idle" status, then loops until ``stop()`` is
        called.  Each iteration either reports "paused" or polls the source,
        invokes ``on_poll``, publishes status and runs the transition check.
        The loop then waits ``_poll_seconds + _backoff_seconds`` (or until
        stopped).
        """
        if self._event_bus is not None and not self._event_bus.wait_ready(timeout=30):
            logger.warning("EventBus loop not bound after 30s, starting anyway")
        self._started_at = dt.datetime.now(dt.timezone.utc)
        self._publish_status(
            self._current_app or "unknown",
            state="idle",
            count_as_poll=False,
        )
        while not self._stop.is_set():
            if self._paused.is_set():
                app = self._current_app or "unknown"
                self._publish_status(app, state="paused")
            else:
                dominant = self._poll_dominant_app()
                # Fall back to the last known app when this poll had no result.
                app = dominant or self._current_app or "unknown"
                if self._on_poll is not None:
                    self._on_poll(app)
                self._publish_status(app)
                if dominant is not None:
                    self.check_transition(dominant)
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop.wait(timeout=self._poll_seconds + self._backoff_seconds)

    def stop(self) -> None:
        """Signal the poll loop to stop."""
        self._stop.set()

    def pause(self) -> None:
        """Pause monitoring without clearing session state."""
        self._paused.set()

    def resume(self) -> None:
        """Resume monitoring after a pause."""
        # Forget the last check time so the paused interval is not counted
        # toward a candidate's duration on the next check.
        self._last_check_time = None
        self._paused.clear()

    @property
    def is_paused(self) -> bool:
        """Whether the monitor is currently paused."""
        return self._paused.is_set()

    @property
    def current_app(self) -> str | None:
        """The app currently considered active, or ``None`` before the first poll."""
        return self._current_app

    @property
    def activity_provider_status(self) -> dict[str, object]:
        """Return the latest cached activity-source status snapshot."""
        return self._provider_status.to_payload()