
infer.feature_state

Persistent rolling feature state for the online inference loop.

Overview

The online loop polls ActivityWatch in short windows, so build_features_from_aw_events() only sees a narrow slice of recent events. Rolling features (15-minute switch counts, rolling keyboard/mouse means, deltas, session length) are therefore truncated to the poll window rather than reflecting the full history the model was trained on.

OnlineFeatureState solves this by maintaining a circular buffer of recent FeatureRow values across poll cycles. After each row is built, it is pushed into the state, and get_context() returns corrected rolling aggregates that are overlaid onto the row before prediction.

Pipeline position

poll AW events → build_features_from_aw_events()
    → feature_state.push(row)
    → context = feature_state.get_context()
    → row.model_copy(update=context)
    → predictor.predict_bucket(row)
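The pipeline can be sketched as a poll-cycle function that receives a long-lived state object. This is a minimal sketch under stated assumptions. `Row`, `SeenAppsState`, and `run_poll_cycle` are hypothetical stand-ins for FeatureRow, OnlineFeatureState, and the loop body. The `predict` callable stands in for predictor.predict_bucket(), and `dataclasses.replace()` stands in for pydantic's `model_copy(update=...)`. `SeenAppsState` deliberately reduces the real class to a set of app IDs with no time window.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Callable, Iterable


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for FeatureRow (only the fields this sketch needs)."""

    app_id: str
    app_switch_count_last_15m: int = 0


class SeenAppsState:
    """Hypothetical stand-in for OnlineFeatureState: remembers apps across polls."""

    def __init__(self) -> None:
        self._apps: set[str] = set()

    def push(self, row: Row) -> None:
        self._apps.add(row.app_id)

    def get_context(self) -> dict[str, int]:
        return {"app_switch_count_last_15m": max(0, len(self._apps) - 1)}


def run_poll_cycle(
    rows: Iterable[Row], state: SeenAppsState, predict: Callable[[Row], int]
) -> list[int]:
    """One poll: push each freshly built row, overlay the context, then predict."""
    results = []
    for row in rows:
        state.push(row)
        context = state.get_context()
        # Stand-in for row.model_copy(update=context).
        corrected = replace(row, **context)
        results.append(predict(corrected))
    return results


def count_switches(row: Row) -> int:
    """Toy 'predictor' that just reports the overlaid feature."""
    return row.app_switch_count_last_15m


state = SeenAppsState()  # created once, before the first poll
print(run_poll_cycle([Row("editor")], state, count_switches))   # [0]
print(run_poll_cycle([Row("browser")], state, count_switches))  # [1]
```

Because the same `state` is passed to both polls, the editor row from the first poll still counts in the second. Creating the state inside each poll would report 0 again, which is the truncation described in the overview.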

OnlineFeatureState

Circular buffer of recent FeatureRow values with rolling aggregate computation. Constructor parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| buffer_minutes | int | 15 | Minutes of history to retain |
| bucket_seconds | int | 60 | Width of each time bucket in seconds |
| idle_gap_seconds | float | 300.0 | Gap in seconds between consecutive rows at or above which a new session starts |
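The buffer size follows from the first two parameters. Mirroring the arithmetic in `__post_init__` in the source listing, capacity is `buffer_minutes * (60 / bucket_seconds)` rows, truncated to an int and floored at 1. A `deque` with that `maxlen` drops the oldest row on every append once it is full. `buffer_capacity()` is a hypothetical helper written for illustration.

```python
from collections import deque


def buffer_capacity(buffer_minutes: int, bucket_seconds: int) -> int:
    """Rows retained, using the same formula as OnlineFeatureState.__post_init__."""
    buckets_per_minute = 60 / bucket_seconds
    return max(int(buffer_minutes * buckets_per_minute), 1)


print(buffer_capacity(15, 60))  # 15: one bucket per minute for 15 minutes
print(buffer_capacity(15, 30))  # 30: two 30-second buckets per minute
print(buffer_capacity(1, 120))  # 1: int(0.5) == 0, floored to 1

# A deque with maxlen acts as the circular buffer: appending to a full
# deque evicts the oldest element from the opposite end.
buf = deque(maxlen=buffer_capacity(3, 60))
for minute in range(5):
    buf.append(minute)
print(list(buf))  # [2, 3, 4]
```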

Methods

push(row: FeatureRowBase) -> None

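Record a newly built feature row. If the gap since the previously pushed row is at least idle_gap_seconds, a new session starts at this row. The row's keys_per_min, clicks_per_min, and mouse_distance are then fed into an internal DynamicsTracker, and the row is appended to the buffer, evicting the oldest row once the buffer is full.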
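push() is the only place the rolling keyboard and mouse statistics receive their inputs. DynamicsTracker itself is not documented on this page, so `ToyDynamicsTracker` below is a hypothetical stand-in built on assumptions. It assumes rolling means average whatever values are available, up to the window size and ignoring None. It also assumes deltas compare against the previous update and are None on the first call. The real class may treat partial windows and missing values differently.

```python
from __future__ import annotations

from collections import deque
from statistics import fmean

METRICS = ("keys_per_min", "clicks_per_min", "mouse_distance")


class ToyDynamicsTracker:
    """Hypothetical stand-in for DynamicsTracker; its semantics are assumed."""

    def __init__(self, rolling_5: int = 5, rolling_15: int = 15) -> None:
        self._windows = {"5": rolling_5, "15": rolling_15}
        # One bounded history per metric, long enough for the widest window.
        maxlen = max(rolling_5, rolling_15)
        self._history = {m: deque(maxlen=maxlen) for m in METRICS}

    def update(
        self,
        keys_per_min: float | None,
        clicks_per_min: float | None,
        mouse_distance: float | None,
    ) -> dict[str, float | None]:
        out: dict[str, float | None] = {}
        new_values = zip(METRICS, (keys_per_min, clicks_per_min, mouse_distance))
        for metric, value in new_values:
            history = self._history[metric]
            prev = history[-1] if history else None
            # Delta against the previous update; None on the first update
            # or when either value is missing.
            out[f"{metric}_delta"] = (
                None if prev is None or value is None else value - prev
            )
            history.append(value)
        # Rolling means only for keys_per_min and mouse_distance, matching
        # the keys listed in the get_context() table.
        for metric in ("keys_per_min", "mouse_distance"):
            for label, size in self._windows.items():
                recent = [v for v in list(self._history[metric])[-size:] if v is not None]
                out[f"{metric}_rolling_{label}"] = fmean(recent) if recent else None
        return out


tracker = ToyDynamicsTracker()
tracker.update(40.0, 10.0, 100.0)  # first call: every delta is None
ctx = tracker.update(60.0, 12.0, 300.0)
print(ctx["keys_per_min_delta"])      # 20.0
print(ctx["keys_per_min_rolling_5"])  # 50.0
```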

get_context() -> dict[str, int | float | None]

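Return corrected aggregates for the most recently pushed row, or an empty dict if nothing has been pushed yet. The app switch count and session length are computed from the buffer and the tracked session start; the rolling means and deltas are the DynamicsTracker output from the last push(). The returned dict maps FeatureRow field names to corrected values: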

| Key | Type | Description |
|---|---|---|
| app_switch_count_last_15m | int | Number of distinct app_id values among buffered rows from the last 15 minutes, minus one (0 if only one app) |
| keys_per_min_rolling_5 | float \| None | 5-bucket rolling mean of keys_per_min |
| keys_per_min_rolling_15 | float \| None | 15-bucket rolling mean of keys_per_min |
| mouse_distance_rolling_5 | float \| None | 5-bucket rolling mean of mouse_distance |
| mouse_distance_rolling_15 | float \| None | 15-bucket rolling mean of mouse_distance |
| keys_per_min_delta | float \| None | Change in keys_per_min from the previous bucket |
| clicks_per_min_delta | float \| None | Change in clicks_per_min from the previous bucket |
| mouse_distance_delta | float \| None | Change in mouse_distance from the previous bucket |
| session_length_so_far | float | Minutes from the start of the current session to the latest row's bucket_start_ts, rounded to 2 decimal places |
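Because the count is distinct apps minus one, it measures app variety rather than the number of transitions: editor, browser, editor counts as 1, not 2. The sketch reproduces the computation from get_context() over plain tuples. `app_switch_count()` and the `(bucket_start_ts, app_id)` tuple representation are hypothetical simplifications of the buffered FeatureRow objects. In the source, the window is fixed at DEFAULT_APP_SWITCH_WINDOW_15M independently of buffer_minutes, so a smaller buffer effectively shortens it.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def app_switch_count(rows: list[tuple[datetime, str]], window_minutes: int = 15) -> int:
    """Distinct apps in the trailing window, minus one (never negative).

    rows: (bucket_start_ts, app_id) pairs, oldest first. Returns 0 for an
    empty list (the real get_context() returns {} instead).
    """
    if not rows:
        return 0
    latest_ts = rows[-1][0]
    cutoff = latest_ts - timedelta(minutes=window_minutes)
    # Inclusive cutoff, as in get_context(): rows at exactly cutoff count.
    apps = {app for ts, app in rows if ts >= cutoff}
    return max(0, len(apps) - 1)


t0 = datetime(2024, 1, 1, 9, 0)
rows = [
    (t0, "editor"),
    (t0 + timedelta(minutes=1), "browser"),
    (t0 + timedelta(minutes=2), "editor"),
]
print(app_switch_count(rows))  # 1: two apps, even though there were two transitions
```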

Session tracking

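A new session starts when the gap between the bucket_start_ts values of consecutive pushed rows is greater than or equal to idle_gap_seconds; the first pushed row also starts a session. session_length_so_far is therefore 0.0 for the first row of each session.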
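The boundary rule from push() and the minute arithmetic from get_context() can be reproduced on bare timestamps. `session_lengths()` is a hypothetical helper for illustration. It shows that a gap exactly equal to idle_gap_seconds already starts a new session, while a shorter gap does not.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def session_lengths(timestamps: list[datetime], idle_gap_seconds: float = 300.0) -> list[float]:
    """session_length_so_far (minutes) after each pushed bucket_start_ts."""
    lengths: list[float] = []
    session_start: datetime | None = None
    prev: datetime | None = None
    for ts in timestamps:
        if prev is not None and (ts - prev).total_seconds() >= idle_gap_seconds:
            session_start = ts  # idle gap reached: new session starts at this row
        if session_start is None:
            session_start = ts  # the very first row opens the first session
        lengths.append(round((ts - session_start).total_seconds() / 60.0, 2))
        prev = ts
    return lengths


t0 = datetime(2024, 1, 1, 9, 0)
ts = [t0 + timedelta(minutes=m) for m in (0, 1, 2, 7, 8)]
print(session_lengths(ts))  # [0.0, 1.0, 2.0, 0.0, 1.0]: the 300 s gap resets
```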

Model hot-reload

When the online loop hot-reloads a new model, the OnlineFeatureState instance is preserved (not reset), since it tracks feature history rather than model state.

taskclf.infer.feature_state


The online loop polls ActivityWatch in short windows, so build_features_from_aw_events() (in taskclf.features.build) only sees a narrow slice of recent events. Rolling features (15-minute switch counts, rolling keyboard/mouse means, deltas, session length) are therefore truncated to the poll window.

OnlineFeatureState solves this by maintaining a circular buffer of recent FeatureRow values (taskclf.core.types.FeatureRow) across poll cycles. After each row is built, it is pushed into the state, and get_context() returns corrected rolling aggregates that can be overlaid onto the row before prediction.

OnlineFeatureState (dataclass)

Circular buffer of recent feature rows with rolling aggregate computation.

Preserves enough history for all derived features that span beyond a single poll window: 5/15-minute rolling means, deltas, app switch counts, and session length.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| buffer_minutes | int | How many minutes of history to retain. | DEFAULT_APP_SWITCH_WINDOW_15M |
| bucket_seconds | int | Width of each time bucket in seconds. | DEFAULT_BUCKET_SECONDS |
| idle_gap_seconds | float | Gap (seconds) between consecutive rows that triggers a session reset. | DEFAULT_IDLE_GAP_SECONDS |

Source code in src/taskclf/infer/feature_state.py
@dataclass(eq=False)
class OnlineFeatureState:
    """Circular buffer of recent feature rows with rolling aggregate computation.

    Preserves enough history for all derived features that span beyond
    a single poll window: 5/15-minute rolling means, deltas, app switch
    counts, and session length.

    Args:
        buffer_minutes: How many minutes of history to retain.
        bucket_seconds: Width of each time bucket in seconds.
        idle_gap_seconds: Gap (seconds) between consecutive rows that
            triggers a session reset.
    """

    buffer_minutes: int = DEFAULT_APP_SWITCH_WINDOW_15M
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS
    idle_gap_seconds: float = DEFAULT_IDLE_GAP_SECONDS

    _capacity: int = field(init=False)
    _buffer: deque[FeatureRowBase] = field(init=False)
    _dynamics: DynamicsTracker = field(init=False)
    _session_start_ts: datetime | None = field(init=False, default=None)
    _last_dynamics: dict[str, float | None] = field(init=False, default_factory=dict)

    def __post_init__(self) -> None:
        buckets_per_minute = 60 / self.bucket_seconds
        self._capacity = max(int(self.buffer_minutes * buckets_per_minute), 1)
        self._buffer = deque(maxlen=self._capacity)
        self._dynamics = DynamicsTracker(
            rolling_5=DEFAULT_ROLLING_WINDOW_5,
            rolling_15=DEFAULT_ROLLING_WINDOW_15,
        )

    def push(self, row: FeatureRowBase) -> None:
        """Record a newly built feature row.

        Feeds the row's input metrics into the internal
        :class:`~taskclf.features.dynamics.DynamicsTracker` and detects
        idle-gap session boundaries.
        """
        if self._buffer:
            prev = self._buffer[-1]
            gap = (row.bucket_start_ts - prev.bucket_start_ts).total_seconds()
            if gap >= self.idle_gap_seconds:
                self._session_start_ts = row.bucket_start_ts

        if self._session_start_ts is None:
            self._session_start_ts = row.bucket_start_ts

        self._last_dynamics = self._dynamics.update(
            row.keys_per_min,
            row.clicks_per_min,
            row.mouse_distance,
        )
        self._buffer.append(row)

    def get_context(self) -> dict[str, int | float | None]:
        """Return rolling aggregates derived from the full buffer.

        The returned dict contains keys that directly correspond to
        :class:`~taskclf.core.types.FeatureRow` field names and can be
        used with ``row.model_copy(update=context)`` to overlay the
        corrected values.
        """
        if not self._buffer:
            return {}

        current = self._buffer[-1]

        switch_window = timedelta(minutes=DEFAULT_APP_SWITCH_WINDOW_15M)
        cutoff = current.bucket_start_ts - switch_window
        apps_in_window: set[str] = set()
        for row in self._buffer:
            if row.bucket_start_ts >= cutoff:
                apps_in_window.add(row.app_id)
        app_switch_count_15m = max(0, len(apps_in_window) - 1)

        session_length = 0.0
        if self._session_start_ts is not None:
            session_length = round(
                (current.bucket_start_ts - self._session_start_ts).total_seconds()
                / 60.0,
                2,
            )

        return {
            "app_switch_count_last_15m": app_switch_count_15m,
            "keys_per_min_rolling_5": self._last_dynamics.get("keys_per_min_rolling_5"),
            "keys_per_min_rolling_15": self._last_dynamics.get(
                "keys_per_min_rolling_15"
            ),
            "mouse_distance_rolling_5": self._last_dynamics.get(
                "mouse_distance_rolling_5"
            ),
            "mouse_distance_rolling_15": self._last_dynamics.get(
                "mouse_distance_rolling_15"
            ),
            "keys_per_min_delta": self._last_dynamics.get("keys_per_min_delta"),
            "clicks_per_min_delta": self._last_dynamics.get("clicks_per_min_delta"),
            "mouse_distance_delta": self._last_dynamics.get("mouse_distance_delta"),
            "session_length_so_far": session_length,
        }

push(row)

Record a newly built feature row.

Feeds the row's input metrics into the internal DynamicsTracker (taskclf.features.dynamics) and detects idle-gap session boundaries.


get_context()

Return rolling aggregates derived from the full buffer.

The returned dict contains keys that directly correspond to FeatureRow (taskclf.core.types.FeatureRow) field names and can be used with row.model_copy(update=context) to overlay the corrected values.
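Two consequences of the overlay are worth spelling out. First, get_context() returns {} before the first push, so the overlay is a no-op. Second, a None value in the context replaces whatever the row had rather than being skipped; if that is not wanted, drop None values before overlaying. The sketch uses `dataclasses.replace()` as a stand-in for pydantic's `model_copy(update=...)`. `Row`, `overlay()`, and `drop_missing()` are hypothetical names for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for FeatureRow with two of its fields."""

    keys_per_min_rolling_5: float | None
    session_length_so_far: float


def overlay(row: Row, context: dict[str, int | float | None]) -> Row:
    """Stand-in for row.model_copy(update=context): every key in context wins."""
    return replace(row, **context)


def drop_missing(context: dict[str, int | float | None]) -> dict[str, int | float]:
    """Optional filter: keep the row's own value wherever the context has None."""
    return {k: v for k, v in context.items() if v is not None}


row = Row(keys_per_min_rolling_5=42.0, session_length_so_far=0.0)
ctx = {"keys_per_min_rolling_5": None, "session_length_so_far": 12.5}
print(overlay(row, {}) == row)                        # True: empty context changes nothing
print(overlay(row, ctx).keys_per_min_rolling_5)       # None: overwritten
print(overlay(row, drop_missing(ctx)).keys_per_min_rolling_5)  # 42.0: kept
```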
