
infer.online

Real-time prediction loop: poll ActivityWatch, predict, smooth, and report.

See Inference Contract for the canonical pipeline order and how this module fits the online runtime path. OnlinePredictor is implemented as a dataclass that preserves its original constructor shape: model and metadata as positional arguments, followed by keyword-only options.

Model auto-resolution

--model-dir is optional. When omitted, the model is resolved automatically using resolve_model_dir() from taskclf.infer.resolve:

  1. If models/active.json exists and points to a valid, compatible bundle, use it.
  2. Otherwise, scan --models-dir (default models/) and select the best bundle by policy.
  3. If no eligible model is found, exit with a descriptive error.

--model-dir always overrides auto-resolution when provided.

Model hot-reload

When --models-dir is provided (always the case with the default CLI), the online loop watches models/active.json (see model bundle layout) for changes using mtime polling (default interval: 60 seconds). When a change is detected:

  1. The new active bundle is resolved and loaded.
  2. The OnlinePredictor is rebuilt with the new model.
  3. If loading fails, the current model is kept and a warning is logged.

This allows retraining to promote a new model while the online loop is running, without requiring a restart.
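The change-detection half of this mechanism is plain mtime polling. A minimal sketch, assuming only the standard library (the real InferencePolicyReloader additionally rate-limits checks to the configured interval and performs the bundle load before swapping):

```python
import os
from pathlib import Path


class MtimeWatcher:
    """Minimal sketch of the mtime-polling reload check used by the
    online loop. Reports True once per observed file modification."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._last_mtime: float | None = None

    def changed(self) -> bool:
        try:
            mtime = os.stat(self.path).st_mtime
        except FileNotFoundError:
            return False  # no active.json yet: nothing to reload
        if self._last_mtime is None:
            self._last_mtime = mtime  # first sighting: just record it
            return False
        if mtime != self._last_mtime:
            self._last_mtime = mtime
            return True
        return False
```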

Label queue integration

When label_queue_path is provided, the online loop auto-enqueues predictions whose confidence falls below label_confidence_threshold (default 0.55) into the ActiveLabelingQueue. Enqueued items surface in taskclf labels show-queue and the web UI for manual review.

Enable via CLI:

taskclf infer online \
  --label-queue \
  --label-confidence 0.50

At shutdown, the loop prints how many buckets were enqueued during the session.

Persistent feature state

The online loop creates an OnlineFeatureState (see infer.feature_state) that buffers recent FeatureRow values across poll cycles. After each row is built from the current poll window, it is pushed into the state and the corrected rolling aggregates (15-minute app switch counts, rolling means, deltas, session length) are overlaid onto the row before prediction. This ensures features match the full history windows the model was trained on, rather than being truncated to the narrow poll slice.

The feature state is preserved across model hot-reloads since it tracks feature history, not model state.
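The buffer-then-overlay pattern can be illustrated with a toy state object. The field names below are invented for illustration and are not the real FeatureRow schema; the real loop merges the context dict onto the row with row.model_copy(update=context).

```python
from collections import deque


class FeatureStateSketch:
    """Toy version of the OnlineFeatureState idea: buffer recent
    per-bucket values and recompute history-based aggregates so each
    new row sees the full window, not just the current poll slice."""

    def __init__(self, window_buckets: int = 15) -> None:
        self._switches: deque[int] = deque(maxlen=window_buckets)

    def push(self, app_switch_count: int) -> None:
        self._switches.append(app_switch_count)

    def get_context(self) -> dict:
        # Overlay dict merged onto the freshly built row before prediction
        # (illustrative keys, not the real feature names).
        return {
            "app_switch_count_15m": sum(self._switches),
            "app_switch_mean_15m": sum(self._switches) / len(self._switches),
        }
```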

Missing-value handling

OnlinePredictor._encode_value() fills missing numeric values with 0.0, matching the training and batch inference paths which use fillna(0).

Per-user reject thresholds

OnlinePredictor accepts an optional per_user_reject_thresholds dict mapping user IDs to individual reject thresholds. When present and the current row's user_id is found in the dict, the per-user threshold overrides the global reject_threshold for the rejection decision. Users not in the dict fall back to the global threshold.

Per-user thresholds are loaded from InferencePolicy.per_user_reject_thresholds by resolve_inference_config() and threaded through ResolvedInferenceConfig.

Unknown-category handling

When a categorical value is not found in the fitted encoder's vocabulary, _encode_value() checks whether the encoder contains an "__unknown__" class (present when the model was trained with encode_categoricals using min_category_freq / unknown_mask_rate). If so, the __unknown__ code is returned; otherwise -1.0 is used as a legacy fallback. This ensures that models trained with explicit unknown-category exposure produce calibrated confidence on novel inputs rather than defaulting to an out-of-vocabulary sentinel the model never learned.
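The categorical branch of _encode_value() can be demonstrated with a plain class list standing in for a fitted sklearn LabelEncoder (whose integer codes are indices into its sorted classes_ array):

```python
def encode_categorical(value: str, classes: list[str]) -> float:
    """Sketch of the _encode_value() categorical branch, using a plain
    sorted class list in place of a fitted LabelEncoder."""
    if value in classes:
        return float(classes.index(value))
    if "__unknown__" in classes:
        # Models trained with unknown-category exposure learned this code.
        return float(classes.index("__unknown__"))
    return -1.0  # legacy fallback: out-of-vocabulary sentinel
```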

taskclf.infer.online

Real-time prediction loop: poll ActivityWatch, predict, smooth, and report.

The online predictor never retrains -- it loads a frozen model bundle and applies it to live data from a running ActivityWatch server instance.

OnlinePredictor dataclass

Stateful single-bucket predictor with rolling smoothing.

Maintains a buffer of recent raw predictions and a running list of segments so that callers only need to feed one :class:FeatureRow at a time.

Source code in src/taskclf/infer/online.py
@dataclass(eq=False)
class OnlinePredictor:
    """Stateful single-bucket predictor with rolling smoothing.

    Maintains a buffer of recent raw predictions and a running list of
    segments so that callers only need to feed one :class:`FeatureRow`
    at a time.
    """

    model: lgb.Booster
    metadata: ModelMetadata
    _: KW_ONLY
    cat_encoders: dict[str, LabelEncoder] = field(default_factory=dict)
    smooth_window: int = DEFAULT_SMOOTH_WINDOW
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS
    reject_threshold: float | None = DEFAULT_REJECT_THRESHOLD
    taxonomy: TaxonomyConfig | None = None
    calibrator: Calibrator = field(default_factory=IdentityCalibrator)
    calibrator_store: CalibratorStore | None = None
    per_user_reject_thresholds: dict[str, float] | None = None
    _le: LabelEncoder = field(init=False)
    _resolver: TaxonomyResolver | None = field(init=False, default=None)
    _raw_buffer: deque[str] = field(init=False)
    _bucket_ts_buffer: deque[datetime] = field(init=False)
    _all_bucket_ts: list[datetime] = field(init=False, default_factory=list)
    _all_raw: list[str] = field(init=False, default_factory=list)
    _all_smoothed: list[str] = field(init=False, default_factory=list)
    _feature_columns: list[str] = field(init=False)
    _categorical_columns: set[str] = field(init=False)

    def __post_init__(self) -> None:
        self._le = LabelEncoder()
        self._le.fit(_SORTED_LABELS)
        self._feature_columns = get_feature_columns(self.metadata.schema_version)
        self._categorical_columns = set(
            get_categorical_columns(self.metadata.schema_version)
        )

        if self.taxonomy is not None:
            self._resolver = TaxonomyResolver(self.taxonomy)

        maxlen = max(self.smooth_window, 1)
        self._raw_buffer = deque(maxlen=maxlen)
        self._bucket_ts_buffer = deque(maxlen=maxlen)

    def _encode_value(self, col: str, value: Any) -> float:
        """Encode a single feature value, handling categoricals and nulls."""
        if col in self._categorical_columns:
            le = self.cat_encoders.get(col)
            if le is not None:
                str_val = str(value)
                if str_val in set(le.classes_):
                    return float(le.transform([str_val])[0])
                if "__unknown__" in set(le.classes_):
                    return float(le.transform(["__unknown__"])[0])
            return -1.0
        return float(value) if value is not None else 0.0

    def predict_bucket(self, row: FeatureRowBase) -> WindowPrediction:
        """Predict a single bucket and return a full :class:`WindowPrediction`.

        Pipeline: raw model proba -> calibrate -> reject check ->
        rolling majority smoothing (on core labels) -> taxonomy resolve
        -> assemble ``WindowPrediction``.

        Args:
            row: A validated :class:`FeatureRow` for one time bucket.

        Returns:
            A :class:`WindowPrediction` containing core and mapped
            predictions, confidence, and rejection status.
        """
        x = np.array(
            [[self._encode_value(c, getattr(row, c)) for c in self._feature_columns]],
            dtype=np.float64,
        )
        raw_proba: np.ndarray = np.asarray(self.model.predict(x))

        if self.calibrator_store is not None:
            cal = self.calibrator_store.get_calibrator(row.user_id)
        else:
            cal = self.calibrator
        calibrated = cal.calibrate(raw_proba)
        proba_vec: np.ndarray = calibrated[0]

        confidence = float(proba_vec.max())
        pred_idx = int(proba_vec.argmax())
        core_label_name: str = self._le.inverse_transform([pred_idx])[0]

        effective_threshold = self.reject_threshold
        if (
            self.per_user_reject_thresholds is not None
            and row.user_id in self.per_user_reject_thresholds
        ):
            effective_threshold = self.per_user_reject_thresholds[row.user_id]

        is_rejected = (
            effective_threshold is not None and confidence < effective_threshold
        )

        smoothing_label = MIXED_UNKNOWN if is_rejected else core_label_name
        self._raw_buffer.append(smoothing_label)
        self._bucket_ts_buffer.append(row.bucket_start_ts)

        smoothed = rolling_majority(list(self._raw_buffer), window=self.smooth_window)
        smoothed_label = smoothed[-1]

        self._all_bucket_ts.append(row.bucket_start_ts)
        self._all_raw.append(smoothing_label)
        self._all_smoothed.append(smoothed_label)

        if self._resolver is not None:
            tax_result = self._resolver.resolve(
                pred_idx,
                proba_vec,
                is_rejected=is_rejected,
            )
            mapped_label_name = tax_result.mapped_label
            mapped_probs = tax_result.mapped_probs
        else:
            mapped_label_name = smoothed_label
            mapped_probs = {
                lbl: float(proba_vec[i]) for i, lbl in enumerate(_SORTED_LABELS)
            }

        return WindowPrediction(
            user_id=row.user_id,
            bucket_start_ts=row.bucket_start_ts,
            core_label_id=pred_idx,
            core_label_name=core_label_name,
            core_probs=[round(float(p), 6) for p in proba_vec],
            confidence=round(confidence, 6),
            is_rejected=is_rejected,
            mapped_label_name=mapped_label_name,
            mapped_probs=mapped_probs,
            model_version=self.metadata.schema_hash,
            schema_version=f"features_{self.metadata.schema_version}",
            label_version="labels_v1",
        )

    def get_segments(self) -> list[Segment]:
        """Return the running segment list built from all predictions so far.

        Applies hysteresis merging so segments shorter than
        ``MIN_BLOCK_DURATION_SECONDS`` are absorbed by their neighbours.
        """
        if not self._all_bucket_ts:
            return []
        segments = segmentize(
            self._all_bucket_ts,
            self._all_smoothed,
            bucket_seconds=self.bucket_seconds,
        )
        return merge_short_segments(segments, bucket_seconds=self.bucket_seconds)

predict_bucket(row)

Predict a single bucket and return a full :class:WindowPrediction.

Pipeline: raw model proba -> calibrate -> reject check -> rolling majority smoothing (on core labels) -> taxonomy resolve -> assemble WindowPrediction.

Parameters:

  • row (FeatureRowBase, required): A validated FeatureRow for one time bucket.

Returns:

  • WindowPrediction: A WindowPrediction containing core and mapped predictions, confidence, and rejection status.
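The rolling-majority smoothing step in the pipeline above can be sketched as follows. This is an illustrative implementation, not the real rolling_majority, which may differ in tie-breaking details:

```python
from collections import Counter


def rolling_majority_sketch(labels: list[str], window: int = 5) -> list[str]:
    """Each position takes the most common label in its trailing window
    (ties broken by first occurrence via Counter.most_common)."""
    out: list[str] = []
    for i in range(len(labels)):
        chunk = labels[max(0, i - window + 1) : i + 1]
        out.append(Counter(chunk).most_common(1)[0][0])
    return out
```

OnlinePredictor only consumes the last element of the smoothed sequence each call, which is why it keeps a bounded deque of recent raw labels rather than re-smoothing full history.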

Source code in src/taskclf/infer/online.py
def predict_bucket(self, row: FeatureRowBase) -> WindowPrediction:
    """Predict a single bucket and return a full :class:`WindowPrediction`.

    Pipeline: raw model proba -> calibrate -> reject check ->
    rolling majority smoothing (on core labels) -> taxonomy resolve
    -> assemble ``WindowPrediction``.

    Args:
        row: A validated :class:`FeatureRow` for one time bucket.

    Returns:
        A :class:`WindowPrediction` containing core and mapped
        predictions, confidence, and rejection status.
    """
    x = np.array(
        [[self._encode_value(c, getattr(row, c)) for c in self._feature_columns]],
        dtype=np.float64,
    )
    raw_proba: np.ndarray = np.asarray(self.model.predict(x))

    if self.calibrator_store is not None:
        cal = self.calibrator_store.get_calibrator(row.user_id)
    else:
        cal = self.calibrator
    calibrated = cal.calibrate(raw_proba)
    proba_vec: np.ndarray = calibrated[0]

    confidence = float(proba_vec.max())
    pred_idx = int(proba_vec.argmax())
    core_label_name: str = self._le.inverse_transform([pred_idx])[0]

    effective_threshold = self.reject_threshold
    if (
        self.per_user_reject_thresholds is not None
        and row.user_id in self.per_user_reject_thresholds
    ):
        effective_threshold = self.per_user_reject_thresholds[row.user_id]

    is_rejected = (
        effective_threshold is not None and confidence < effective_threshold
    )

    smoothing_label = MIXED_UNKNOWN if is_rejected else core_label_name
    self._raw_buffer.append(smoothing_label)
    self._bucket_ts_buffer.append(row.bucket_start_ts)

    smoothed = rolling_majority(list(self._raw_buffer), window=self.smooth_window)
    smoothed_label = smoothed[-1]

    self._all_bucket_ts.append(row.bucket_start_ts)
    self._all_raw.append(smoothing_label)
    self._all_smoothed.append(smoothed_label)

    if self._resolver is not None:
        tax_result = self._resolver.resolve(
            pred_idx,
            proba_vec,
            is_rejected=is_rejected,
        )
        mapped_label_name = tax_result.mapped_label
        mapped_probs = tax_result.mapped_probs
    else:
        mapped_label_name = smoothed_label
        mapped_probs = {
            lbl: float(proba_vec[i]) for i, lbl in enumerate(_SORTED_LABELS)
        }

    return WindowPrediction(
        user_id=row.user_id,
        bucket_start_ts=row.bucket_start_ts,
        core_label_id=pred_idx,
        core_label_name=core_label_name,
        core_probs=[round(float(p), 6) for p in proba_vec],
        confidence=round(confidence, 6),
        is_rejected=is_rejected,
        mapped_label_name=mapped_label_name,
        mapped_probs=mapped_probs,
        model_version=self.metadata.schema_hash,
        schema_version=f"features_{self.metadata.schema_version}",
        label_version="labels_v1",
    )

get_segments()

Return the running segment list built from all predictions so far.

Applies hysteresis merging so segments shorter than MIN_BLOCK_DURATION_SECONDS are absorbed by their neighbours.
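A toy version of hysteresis merging over (label, duration_seconds) pairs, to illustrate the idea; the real merge_short_segments operates on Segment objects and its absorption rules (e.g. which neighbour wins) may differ:

```python
def merge_short_segments_sketch(
    segments: list[tuple[str, int]], min_duration: int = 120
) -> list[tuple[str, int]]:
    """Absorb segments shorter than min_duration into the preceding
    neighbour, and coalesce adjacent same-label segments."""
    merged: list[tuple[str, int]] = []
    for label, dur in segments:
        if merged and (dur < min_duration or merged[-1][0] == label):
            prev_label, prev_dur = merged[-1]
            merged[-1] = (prev_label, prev_dur + dur)
        else:
            merged.append((label, dur))
    return merged
```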

Source code in src/taskclf/infer/online.py
def get_segments(self) -> list[Segment]:
    """Return the running segment list built from all predictions so far.

    Applies hysteresis merging so segments shorter than
    ``MIN_BLOCK_DURATION_SECONDS`` are absorbed by their neighbours.
    """
    if not self._all_bucket_ts:
        return []
    segments = segmentize(
        self._all_bucket_ts,
        self._all_smoothed,
        bucket_seconds=self.bucket_seconds,
    )
    return merge_short_segments(segments, bucket_seconds=self.bucket_seconds)

run_online_loop(*, model_dir, models_dir=None, aw_host=DEFAULT_AW_HOST, poll_seconds=DEFAULT_POLL_SECONDS, smooth_window=DEFAULT_SMOOTH_WINDOW, title_salt=DEFAULT_TITLE_SALT, out_dir=Path(DEFAULT_OUT_DIR), bucket_seconds=DEFAULT_BUCKET_SECONDS, idle_gap_seconds=DEFAULT_IDLE_GAP_SECONDS, reject_threshold=DEFAULT_REJECT_THRESHOLD, taxonomy_path=None, calibrator_path=None, calibrator_store_path=None, label_queue_path=None, label_confidence_threshold=DEFAULT_LABEL_CONFIDENCE_THRESHOLD)

Poll ActivityWatch, predict, smooth, and write results continuously.

Runs until interrupted with KeyboardInterrupt (Ctrl+C). On shutdown, writes final segments and prints a summary.

Session state is tracked across poll cycles. A new session starts when the gap since the last observed event exceeds idle_gap_seconds.

When models_dir is provided, the loop watches models/active.json for changes and hot-reloads the model bundle without restarting. The swap only occurs after the new bundle loads successfully; on failure the current model is kept.

Parameters:

  • model_dir (Path, required): Path to a trained model bundle directory.
  • models_dir (Path | None, default None): Base directory for model bundles. When provided, enables automatic model reload on active.json changes.
  • aw_host (str, default DEFAULT_AW_HOST): Base URL of the ActivityWatch server.
  • poll_seconds (int, default DEFAULT_POLL_SECONDS): Seconds between polling iterations.
  • smooth_window (int, default DEFAULT_SMOOTH_WINDOW): Rolling majority window size.
  • title_salt (str, default DEFAULT_TITLE_SALT): Salt for hashing window titles.
  • out_dir (Path, default Path(DEFAULT_OUT_DIR)): Directory for predictions.csv and segments.json.
  • bucket_seconds (int, default DEFAULT_BUCKET_SECONDS): Width of each time bucket in seconds.
  • idle_gap_seconds (float, default DEFAULT_IDLE_GAP_SECONDS): Minimum gap (seconds) that starts a new session.
  • reject_threshold (float | None, default DEFAULT_REJECT_THRESHOLD): If given, predictions with max(proba) < reject_threshold become Mixed/Unknown.
  • taxonomy_path (Path | None, default None): Optional path to a taxonomy YAML file. When provided, output labels are mapped to user-defined buckets.
  • calibrator_path (Path | None, default None): Optional path to a calibrator JSON file. When provided, raw model probabilities are calibrated before the reject decision.
  • calibrator_store_path (Path | None, default None): Optional path to a calibrator store directory. When provided, per-user calibration is applied. Takes precedence over calibrator_path.
  • label_queue_path (Path | None, default None): Optional path to the labeling queue JSON. When provided, low-confidence predictions are auto-enqueued for manual labeling.
  • label_confidence_threshold (float, default DEFAULT_LABEL_CONFIDENCE_THRESHOLD): Predictions with confidence below this value are enqueued when label_queue_path is set.
Source code in src/taskclf/infer/online.py
def run_online_loop(
    *,
    model_dir: Path,
    models_dir: Path | None = None,
    aw_host: str = DEFAULT_AW_HOST,
    poll_seconds: int = DEFAULT_POLL_SECONDS,
    smooth_window: int = DEFAULT_SMOOTH_WINDOW,
    title_salt: str = DEFAULT_TITLE_SALT,
    out_dir: Path = Path(DEFAULT_OUT_DIR),
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS,
    idle_gap_seconds: float = DEFAULT_IDLE_GAP_SECONDS,
    reject_threshold: float | None = DEFAULT_REJECT_THRESHOLD,
    taxonomy_path: Path | None = None,
    calibrator_path: Path | None = None,
    calibrator_store_path: Path | None = None,
    label_queue_path: Path | None = None,
    label_confidence_threshold: float = DEFAULT_LABEL_CONFIDENCE_THRESHOLD,
) -> None:
    """Poll ActivityWatch, predict, smooth, and write results continuously.

    Runs until interrupted with ``KeyboardInterrupt`` (Ctrl+C).  On
    shutdown, writes final segments and prints a summary.

    Session state is tracked across poll cycles.  A new session starts
    when the gap since the last observed event exceeds
    *idle_gap_seconds*.

    When *models_dir* is provided, the loop watches
    ``models/active.json`` for changes and hot-reloads the model bundle
    without restarting.  The swap only occurs after the new bundle loads
    successfully; on failure the current model is kept.

    Args:
        model_dir: Path to a trained model bundle directory.
        models_dir: Base directory for model bundles.  When provided,
            enables automatic model reload on ``active.json`` changes.
        aw_host: Base URL of the ActivityWatch server.
        poll_seconds: Seconds between polling iterations.
        smooth_window: Rolling majority window size.
        title_salt: Salt for hashing window titles.
        out_dir: Directory for ``predictions.csv`` and ``segments.json``.
        bucket_seconds: Width of each time bucket in seconds.
        idle_gap_seconds: Minimum gap (seconds) that starts a new session.
        reject_threshold: If given, predictions with
            ``max(proba) < reject_threshold`` become ``Mixed/Unknown``.
        taxonomy_path: Optional path to a taxonomy YAML file.  When
            provided, output labels are mapped to user-defined buckets.
        calibrator_path: Optional path to a calibrator JSON file.  When
            provided, raw model probabilities are calibrated before the
            reject decision.
        calibrator_store_path: Optional path to a calibrator store
            directory.  When provided, per-user calibration is applied.
            Takes precedence over *calibrator_path*.
        label_queue_path: Optional path to the labeling queue JSON.
            When provided, low-confidence predictions are auto-enqueued
            for manual labeling.
        label_confidence_threshold: Predictions with confidence below
            this value are enqueued when *label_queue_path* is set.
    """
    from taskclf.adapters.activitywatch.client import (
        AWConnectionError,
        AWTimeoutError,
        fetch_aw_events,
        fetch_aw_input_events,
        find_input_bucket_id,
        find_window_bucket_id,
    )
    from taskclf.features.build import build_features_from_aw_events
    from taskclf.infer.calibration import load_calibrator, load_calibrator_store
    from taskclf.infer.feature_state import OnlineFeatureState
    from taskclf.infer.resolve import (
        InferencePolicyReloader,
        resolve_inference_config,
    )
    from taskclf.infer.taxonomy import load_taxonomy

    taxonomy: TaxonomyConfig | None = None
    if taxonomy_path is not None:
        taxonomy = load_taxonomy(taxonomy_path)
        logger.info(
            "Loaded taxonomy from %s (user=%s)", taxonomy_path, taxonomy.user_id
        )

    effective_threshold: float | None
    per_user_thresholds: dict[str, float] | None = None
    if models_dir is not None:
        resolved = resolve_inference_config(
            models_dir,
            model_dir_override=str(model_dir),
            reject_threshold_override=reject_threshold,
            calibrator_store_override=calibrator_store_path,
            calibrator_path_override=calibrator_path,
        )
        model = resolved.model
        metadata = resolved.metadata
        cat_encoders = resolved.cat_encoders
        effective_threshold = resolved.reject_threshold
        calibrator = resolved.calibrator
        cal_store = resolved.calibrator_store
        per_user_thresholds = resolved.per_user_reject_thresholds
    else:
        calibrator_: Calibrator = IdentityCalibrator()
        if calibrator_path is not None:
            calibrator_ = load_calibrator(calibrator_path)
            logger.info("Loaded calibrator from %s", calibrator_path)

        cal_store_: CalibratorStore | None = None
        if calibrator_store_path is not None:
            cal_store_ = load_calibrator_store(calibrator_store_path)
            logger.info(
                "Loaded calibrator store from %s (%d per-user calibrators)",
                calibrator_store_path,
                len(cal_store_.user_calibrators),
            )

        model, metadata, cat_encoders = load_model_bundle(Path(model_dir))
        logger.info("Loaded model from %s (schema=%s)", model_dir, metadata.schema_hash)
        effective_threshold = reject_threshold
        calibrator = calibrator_
        cal_store = cal_store_

    bucket_id = find_window_bucket_id(aw_host)
    logger.info("Using AW window bucket: %s", bucket_id)

    input_bucket_id = find_input_bucket_id(aw_host)
    if input_bucket_id:
        logger.info("Using AW input bucket: %s", input_bucket_id)
    else:
        logger.info(
            "No aw-watcher-input bucket found; keyboard/mouse features will be None"
        )

    predictor = OnlinePredictor(
        model,
        metadata,
        cat_encoders=cat_encoders,
        smooth_window=smooth_window,
        bucket_seconds=bucket_seconds,
        reject_threshold=effective_threshold,
        taxonomy=taxonomy,
        calibrator=calibrator,
        calibrator_store=cal_store,
        per_user_reject_thresholds=per_user_thresholds,
    )

    feature_state = OnlineFeatureState(
        bucket_seconds=bucket_seconds,
        idle_gap_seconds=idle_gap_seconds,
    )

    policy_reloader: InferencePolicyReloader | None = None
    if models_dir is not None:
        policy_reloader = InferencePolicyReloader(models_dir)

    label_queue = None
    if label_queue_path is not None:
        from taskclf.labels.queue import ActiveLabelingQueue

        label_queue = ActiveLabelingQueue(label_queue_path)
        logger.info(
            "Label queue active: %s (threshold=%.2f)",
            label_queue_path,
            label_confidence_threshold,
        )

    pred_path = out_dir / "predictions.csv"
    seg_path = out_dir / "segments.json"

    session_start: datetime | None = None
    last_event_ts: datetime | None = None
    idle_gap = timedelta(seconds=idle_gap_seconds)
    total_enqueued = 0

    print(
        f"Online inference started (polling every {poll_seconds}s, bucket={bucket_id})"
    )
    if input_bucket_id:
        print(f"Input watcher active: {input_bucket_id}")
    if policy_reloader is not None and models_dir is not None:
        print(f"Policy reload enabled (watching {models_dir})")
    print("Press Ctrl+C to stop.\n")

    try:
        while True:
            if policy_reloader is not None:
                new_config = policy_reloader.check_reload()
                if new_config is not None:
                    predictor = OnlinePredictor(
                        new_config.model,
                        new_config.metadata,
                        cat_encoders=new_config.cat_encoders,
                        smooth_window=smooth_window,
                        bucket_seconds=bucket_seconds,
                        reject_threshold=new_config.reject_threshold,
                        taxonomy=taxonomy,
                        calibrator=new_config.calibrator,
                        calibrator_store=new_config.calibrator_store,
                        per_user_reject_thresholds=new_config.per_user_reject_thresholds,
                    )
                    print(
                        f"Config reloaded (schema={new_config.metadata.schema_hash}, "
                        f"threshold={new_config.reject_threshold:.4f})"
                    )

            now = datetime.now(timezone.utc)
            window_start = now - timedelta(seconds=poll_seconds)

            try:
                events = fetch_aw_events(
                    aw_host,
                    bucket_id,
                    window_start,
                    now,
                    title_salt=title_salt,
                )
            except AWConnectionError:
                logger.warning(
                    "ActivityWatch unreachable, will retry in %ds", poll_seconds
                )
                time.sleep(poll_seconds)
                continue
            except AWTimeoutError:
                logger.warning(
                    "ActivityWatch request timed out, will retry in %ds", poll_seconds
                )
                time.sleep(poll_seconds)
                continue
            except Exception:
                logger.warning("Failed to fetch AW events, will retry", exc_info=True)
                time.sleep(poll_seconds)
                continue

            if not events:
                logger.debug("No events in window [%s, %s)", window_start, now)
                time.sleep(poll_seconds)
                continue

            input_events = None
            if input_bucket_id:
                try:
                    input_events = (
                        fetch_aw_input_events(
                            aw_host,
                            input_bucket_id,
                            window_start,
                            now,
                        )
                        or None
                    )
                except Exception:
                    logger.warning("Failed to fetch input events", exc_info=True)

            earliest_new = min(ev.timestamp for ev in events)
            if last_event_ts is not None and earliest_new - last_event_ts >= idle_gap:
                session_start = earliest_new
                logger.info("New session started at %s", session_start)

            if session_start is None:
                session_start = earliest_new

            rows = build_features_from_aw_events(
                events,
                input_events=input_events,
                bucket_seconds=bucket_seconds,
                session_start=session_start,
                schema_version=predictor.metadata.schema_version,
            )
            if not rows:
                time.sleep(poll_seconds)
                continue

            last_event_ts = max(ev.timestamp for ev in events)

            for row in rows:
                feature_state.push(row)
                context = feature_state.get_context()
                row = row.model_copy(update=context)
                prediction = predictor.predict_bucket(row)
                ts_str = row.bucket_start_ts.strftime("%H:%M")
                conf_str = f"{prediction.confidence:.2f}"
                print(
                    f"[{ts_str}] {prediction.mapped_label_name} (confidence: {conf_str})"
                )
                _append_prediction_csv(pred_path, prediction)

                if (
                    label_queue is not None
                    and prediction.confidence < label_confidence_threshold
                ):
                    import pandas as pd

                    enqueue_df = pd.DataFrame(
                        [
                            {
                                "user_id": prediction.user_id or "default-user",
                                "bucket_start_ts": prediction.bucket_start_ts,
                                "bucket_end_ts": prediction.bucket_start_ts
                                + timedelta(seconds=bucket_seconds),
                                "confidence": prediction.confidence,
                                "predicted_label": prediction.core_label_name,
                            }
                        ]
                    )
                    n = label_queue.enqueue_low_confidence(
                        enqueue_df,
                        threshold=label_confidence_threshold,
                    )
                    if n > 0:
                        total_enqueued += n
                        print(f"  → enqueued for labeling (conf={conf_str})")

            segments = predictor.get_segments()
            write_segments_json(segments, seg_path)

            time.sleep(poll_seconds)

    except KeyboardInterrupt:
        print("\nShutting down...")

    segments = predictor.get_segments()
    if segments:
        write_segments_json(segments, seg_path)
        print(f"Final segments written to {seg_path} ({len(segments)} segments)")

        try:
            from taskclf.report.daily import build_daily_report
            from taskclf.report.export import export_report_json

            report = build_daily_report(
                segments,
                bucket_seconds=bucket_seconds,
                raw_labels=predictor._all_raw or None,
                smoothed_labels=predictor._all_smoothed or None,
            )
            report_path = export_report_json(
                report, out_dir / f"report_{report.date}.json"
            )
            print(f"Daily report written to {report_path}")
            if report.flap_rate_raw is not None:
                print(
                    f"Flap rate: raw={report.flap_rate_raw:.4f}  "
                    f"smoothed={report.flap_rate_smoothed:.4f}"
                )
        except Exception:
            logger.warning("Could not generate daily report", exc_info=True)

        try:
            from taskclf.core.telemetry import TelemetryStore, compute_telemetry

            import pandas as pd

            ts_records = [{"bucket_start_ts": ts} for ts in predictor._all_bucket_ts]
            ts_df = pd.DataFrame(ts_records) if ts_records else pd.DataFrame()

            if not ts_df.empty:
                snapshot = compute_telemetry(
                    ts_df,
                    labels=predictor._all_smoothed or None,
                )
                store = TelemetryStore(out_dir / "telemetry")
                store_path = store.append(snapshot)
                print(f"Telemetry snapshot written to {store_path}")
        except Exception:
            logger.warning("Could not write telemetry snapshot", exc_info=True)
    else:
        print("No predictions were made.")

    if total_enqueued > 0:
        print(f"Enqueued {total_enqueued} low-confidence bucket(s) for labeling.")

taskclf.infer.resolve

Model resolution for inference: resolve --model-dir and hot-reload.

Bridges CLI arguments to the model registry, providing:

  • resolve_model_dir() — resolve an optional --model-dir to a concrete pathlib.Path using the active pointer / best-model selection fallback. Deprecated — prefer resolve_inference_config().
  • resolve_inference_config() — resolve the full inference configuration (model + calibrator + threshold) from a taskclf.core.inference_policy.InferencePolicy.
  • ActiveModelReloader — lightweight mtime-based watcher that detects active.json changes and reloads the model bundle for long-running online inference loops. Deprecated — prefer InferencePolicyReloader.
  • InferencePolicyReloader — watches inference_policy.json (falling back to active.json) and reloads the full inference config on change.

ModelResolutionError dataclass

Bases: Exception

Raised when no model can be resolved for inference.

Source code in src/taskclf/infer/resolve.py
@dataclass(eq=False)
class ModelResolutionError(Exception):
    """Raised when no model can be resolved for inference."""

    message: str
    report: SelectionReport | None = None

    def __post_init__(self) -> None:
        super().__init__(self.message)

ActiveModelReloader dataclass

Watch active.json and reload the model bundle on change.

Deprecated: use InferencePolicyReloader instead. This class only reloads the model bundle; it does not update the calibrator store or reject threshold when the policy changes.

Designed for the online inference loop: polls the file's mtime at a configurable interval and, when a change is detected, loads the new bundle. The caller only swaps to the new model after a successful load — on failure the current model is kept.

Parameters:

  • models_dir (Path, required) — Directory containing active.json.
  • check_interval_s (float, default 60.0) — Minimum seconds between mtime checks.
Source code in src/taskclf/infer/resolve.py
@dataclass(eq=False)
class ActiveModelReloader:
    """Watch ``active.json`` and reload the model bundle on change.

    .. deprecated::
        Use :class:`InferencePolicyReloader` instead.  This class only
        reloads the model bundle; it does not update the calibrator
        store or reject threshold when the policy changes.

    Designed for the online inference loop: polls the file's mtime at a
    configurable interval and, when a change is detected, loads the new
    bundle.  The caller only swaps to the new model after a successful
    load — on failure the current model is kept.

    Args:
        models_dir: Directory containing ``active.json``.
        check_interval_s: Minimum seconds between mtime checks.
    """

    models_dir: Path
    check_interval_s: float = 60.0
    _active_path: Path = field(init=False)
    _last_mtime: float | None = field(init=False)
    _last_check: float = field(init=False)

    def __post_init__(self) -> None:
        self._active_path = self.models_dir / _ACTIVE_FILE
        self._last_mtime = self._current_mtime()
        self._last_check = time.monotonic()

    def _current_mtime(self) -> float | None:
        try:
            return self._active_path.stat().st_mtime
        except OSError:
            logger.debug("Could not stat %s", self._active_path, exc_info=True)
            return None

    def check_reload(
        self,
    ) -> tuple[lgb.Booster, ModelMetadata, dict[str, Any]] | None:
        """Check whether ``active.json`` changed and reload if so.

        Returns the new ``(model, metadata, cat_encoders)`` tuple when a
        reload succeeds, or ``None`` when no reload is needed or the
        reload fails (a warning is logged on failure).
        """
        now = time.monotonic()
        if now - self._last_check < self.check_interval_s:
            return None
        self._last_check = now

        mtime = self._current_mtime()
        if mtime == self._last_mtime:
            return None

        logger.info(
            "active.json changed (mtime %s -> %s), reloading", self._last_mtime, mtime
        )

        try:
            resolved = resolve_model_dir(None, self.models_dir)
            model, metadata, cat_encoders = load_model_bundle(resolved)
        except Exception:
            logger.warning(
                "Failed to reload model after active.json change; keeping current model",
                exc_info=True,
            )
            return None

        self._last_mtime = mtime
        logger.info(
            "Reloaded model from %s (schema=%s)", resolved, metadata.schema_hash
        )
        return model, metadata, cat_encoders

check_reload()

Check whether active.json changed and reload if so.

Returns the new (model, metadata, cat_encoders) tuple when a reload succeeds, or None when no reload is needed or the reload fails (a warning is logged on failure).

Source code in src/taskclf/infer/resolve.py
def check_reload(
    self,
) -> tuple[lgb.Booster, ModelMetadata, dict[str, Any]] | None:
    """Check whether ``active.json`` changed and reload if so.

    Returns the new ``(model, metadata, cat_encoders)`` tuple when a
    reload succeeds, or ``None`` when no reload is needed or the
    reload fails (a warning is logged on failure).
    """
    now = time.monotonic()
    if now - self._last_check < self.check_interval_s:
        return None
    self._last_check = now

    mtime = self._current_mtime()
    if mtime == self._last_mtime:
        return None

    logger.info(
        "active.json changed (mtime %s -> %s), reloading", self._last_mtime, mtime
    )

    try:
        resolved = resolve_model_dir(None, self.models_dir)
        model, metadata, cat_encoders = load_model_bundle(resolved)
    except Exception:
        logger.warning(
            "Failed to reload model after active.json change; keeping current model",
            exc_info=True,
        )
        return None

    self._last_mtime = mtime
    logger.info(
        "Reloaded model from %s (schema=%s)", resolved, metadata.schema_hash
    )
    return model, metadata, cat_encoders

ResolvedInferenceConfig dataclass

Fully resolved inference configuration ready for use.

Produced by resolve_inference_config(). Contains all loaded artifacts so callers do not need to perform additional I/O.

Source code in src/taskclf/infer/resolve.py
@dataclass(frozen=True)
class ResolvedInferenceConfig:
    """Fully resolved inference configuration ready for use.

    Produced by :func:`resolve_inference_config`.  Contains all
    loaded artifacts so callers do not need to perform additional I/O.
    """

    model: lgb.Booster
    metadata: ModelMetadata
    cat_encoders: dict[str, LabelEncoder]
    reject_threshold: float
    calibrator: Calibrator
    calibrator_store: CalibratorStore | None
    policy: InferencePolicy | None
    per_user_reject_thresholds: dict[str, float] | None = None

InferencePolicyReloader dataclass

Watch inference_policy.json and reload the full config on change.

Falls back to watching active.json when no policy file exists. Designed for the online inference loop: polls file mtimes at a configurable interval and returns a ResolvedInferenceConfig when a reload is needed.

Parameters:

  • models_dir (Path, required) — Directory containing policy / active pointer files.
  • check_interval_s (float, default 60.0) — Minimum seconds between mtime checks.
Source code in src/taskclf/infer/resolve.py
@dataclass(eq=False)
class InferencePolicyReloader:
    """Watch ``inference_policy.json`` and reload the full config on change.

    Falls back to watching ``active.json`` when no policy file exists.
    Designed for the online inference loop: polls file mtimes at a
    configurable interval and returns a :class:`ResolvedInferenceConfig`
    when a reload is needed.

    Args:
        models_dir: Directory containing policy / active pointer files.
        check_interval_s: Minimum seconds between mtime checks.
    """

    models_dir: Path
    check_interval_s: float = 60.0
    _policy_path: Path = field(init=False)
    _active_path: Path = field(init=False)
    _last_mtime: float | None = field(init=False)
    _last_check: float = field(init=False)

    def __post_init__(self) -> None:
        self._policy_path = self.models_dir / DEFAULT_INFERENCE_POLICY_FILE
        self._active_path = self.models_dir / _ACTIVE_FILE
        self._last_mtime = self._current_mtime()
        self._last_check = time.monotonic()

    def _watched_path(self) -> Path:
        return self._policy_path if self._policy_path.is_file() else self._active_path

    def _current_mtime(self) -> float | None:
        for path in (self._policy_path, self._active_path):
            try:
                return path.stat().st_mtime
            except OSError:
                continue
        return None

    def check_reload(self) -> ResolvedInferenceConfig | None:
        """Check whether the policy/active file changed and reload if so.

        Returns a :class:`ResolvedInferenceConfig` when a reload
        succeeds, or ``None`` when no reload is needed or the reload
        fails.
        """
        now = time.monotonic()
        if now - self._last_check < self.check_interval_s:
            return None
        self._last_check = now

        mtime = self._current_mtime()
        if mtime == self._last_mtime:
            return None

        watched = self._watched_path()
        logger.info(
            "%s changed (mtime %s -> %s), reloading",
            watched.name,
            self._last_mtime,
            mtime,
        )

        try:
            config = resolve_inference_config(self.models_dir)
        except Exception:
            logger.warning(
                "Failed to reload after %s change; keeping current config",
                watched.name,
                exc_info=True,
            )
            return None

        self._last_mtime = mtime
        return config

check_reload()

Check whether the policy/active file changed and reload if so.

Returns a ResolvedInferenceConfig when a reload succeeds, or None when no reload is needed or the reload fails.

Source code in src/taskclf/infer/resolve.py
def check_reload(self) -> ResolvedInferenceConfig | None:
    """Check whether the policy/active file changed and reload if so.

    Returns a :class:`ResolvedInferenceConfig` when a reload
    succeeds, or ``None`` when no reload is needed or the reload
    fails.
    """
    now = time.monotonic()
    if now - self._last_check < self.check_interval_s:
        return None
    self._last_check = now

    mtime = self._current_mtime()
    if mtime == self._last_mtime:
        return None

    watched = self._watched_path()
    logger.info(
        "%s changed (mtime %s -> %s), reloading",
        watched.name,
        self._last_mtime,
        mtime,
    )

    try:
        config = resolve_inference_config(self.models_dir)
    except Exception:
        logger.warning(
            "Failed to reload after %s change; keeping current config",
            watched.name,
            exc_info=True,
        )
        return None

    self._last_mtime = mtime
    return config

resolve_model_dir(model_dir, models_dir, policy=None)

Resolve the model directory for inference.

Deprecated: use resolve_inference_config() instead. This function only resolves the model bundle path; it does not load the calibrator store or reject threshold from the inference policy.

Resolution precedence:

  1. If model_dir is provided, validate that it exists and return it.
  2. Otherwise, delegate to taskclf.model_registry.resolve_active_model, which reads active.json or falls back to best-model selection.
  3. If no eligible model is found, raise ModelResolutionError with a descriptive message including exclusion reasons.

Parameters:

  • model_dir (str | None, required) — Explicit --model-dir value from CLI, or None.
  • models_dir (Path, required) — Base directory containing promoted model bundles.
  • policy (SelectionPolicy | None, default None) — Selection policy override (defaults to policy v1).

Returns:

  Path — Path to the resolved model bundle directory.

Raises:

  ModelResolutionError — When no model can be resolved.
Source code in src/taskclf/infer/resolve.py
def resolve_model_dir(
    model_dir: str | None,
    models_dir: Path,
    policy: SelectionPolicy | None = None,
) -> Path:
    """Resolve the model directory for inference.

    .. deprecated::
        Use :func:`resolve_inference_config` instead.  This function
        only resolves the model bundle path; it does not load the
        calibrator store or reject threshold from the inference policy.

    Resolution precedence:

    1. If *model_dir* is provided, validate that it exists and return it.
    2. Otherwise, delegate to :func:`~taskclf.model_registry.resolve_active_model`
       which reads ``active.json`` or falls back to best-model selection.
    3. If no eligible model is found, raise :class:`ModelResolutionError`
       with a descriptive message including exclusion reasons.

    Args:
        model_dir: Explicit ``--model-dir`` value from CLI, or ``None``.
        models_dir: Base directory containing promoted model bundles.
        policy: Selection policy override (defaults to policy v1).

    Returns:
        Path to the resolved model bundle directory.

    Raises:
        ModelResolutionError: When no model can be resolved.
    """
    if model_dir is not None:
        path = Path(model_dir)
        if not path.is_dir():
            raise ModelResolutionError(
                f"Explicit --model-dir does not exist: {model_dir}"
            )
        return path

    if not models_dir.is_dir():
        raise ModelResolutionError(
            f"Models directory does not exist: {models_dir}. "
            "Provide --model-dir explicitly or train a model first."
        )

    bundle, report = resolve_active_model(models_dir, policy)

    if bundle is not None:
        logger.info("Resolved model: %s", bundle.path)
        return bundle.path

    lines = [
        f"No eligible model found in {models_dir}.",
        "Provide --model-dir explicitly or train a compatible model.",
    ]
    if report is not None and report.excluded:
        lines.append("Excluded bundles:")
        for rec in report.excluded:
            lines.append(f"  - {rec.model_id}: {rec.reason}")
    raise ModelResolutionError("\n".join(lines), report=report)

resolve_inference_config(models_dir, *, model_dir_override=None, reject_threshold_override=None, calibrator_store_override=None, calibrator_path_override=None)

Resolve the full inference configuration from policy or fallback.

Resolution precedence:

  1. Explicit model_dir_override — bypasses policy; uses override flags for threshold and calibrator.
  2. models/inference_policy.json — loads model, calibrator store, and threshold from the policy. Explicit overrides still take precedence for individual fields.
  3. models/active.json + code defaults — deprecated legacy fallback.
  4. Best-model selection + code defaults — no-config fallback.

Parameters:

  • models_dir (Path, required) — The models/ directory.
  • model_dir_override (str | None, default None) — Explicit --model-dir value (takes highest precedence).
  • reject_threshold_override (float | None, default None) — Explicit threshold that overrides the policy value.
  • calibrator_store_override (Path | None, default None) — Explicit calibrator store path that overrides the policy value.
  • calibrator_path_override (Path | None, default None) — Explicit single-calibrator JSON path (lowest calibrator precedence).

Returns:

  ResolvedInferenceConfig — A fully resolved inference configuration.

Raises:

  ModelResolutionError — When no model can be resolved.

Source code in src/taskclf/infer/resolve.py
def resolve_inference_config(
    models_dir: Path,
    *,
    model_dir_override: str | None = None,
    reject_threshold_override: float | None = None,
    calibrator_store_override: Path | None = None,
    calibrator_path_override: Path | None = None,
) -> ResolvedInferenceConfig:
    """Resolve the full inference configuration from policy or fallback.

    Resolution precedence:

    1. Explicit *model_dir_override* — bypasses policy; uses override
       flags for threshold and calibrator.
    2. ``models/inference_policy.json`` — loads model, calibrator store,
       and threshold from the policy.  Explicit overrides still take
       precedence for individual fields.
    3. ``models/active.json`` + code defaults — deprecated legacy
       fallback.
    4. Best-model selection + code defaults — no-config fallback.

    Args:
        models_dir: The ``models/`` directory.
        model_dir_override: Explicit ``--model-dir`` value (takes
            highest precedence).
        reject_threshold_override: Explicit threshold that overrides
            the policy value.
        calibrator_store_override: Explicit calibrator store path that
            overrides the policy value.
        calibrator_path_override: Explicit single-calibrator JSON path
            (lowest calibrator precedence).

    Returns:
        A fully resolved :class:`ResolvedInferenceConfig`.

    Raises:
        ModelResolutionError: When no model can be resolved.
    """
    policy: InferencePolicy | None = None
    model_path: Path | None = None
    reject_threshold: float = DEFAULT_REJECT_THRESHOLD
    calibrator: Calibrator = IdentityCalibrator()
    cal_store: CalibratorStore | None = None

    if model_dir_override is not None:
        model_path = Path(model_dir_override)
        if not model_path.is_dir():
            raise ModelResolutionError(
                f"Explicit --model-dir does not exist: {model_dir_override}"
            )
    else:
        policy = load_inference_policy(models_dir)
        if policy is not None:
            base = models_dir.parent
            model_path = base / policy.model_dir
            if not model_path.is_dir():
                logger.warning(
                    "Policy model_dir %s does not exist; falling back",
                    policy.model_dir,
                )
                policy = None
                model_path = None

        if model_path is None:
            if policy is not None:
                logger.warning("Policy references missing model; falling back")
                policy = None
            logger.warning(
                "No inference policy found; falling back to active.json "
                "resolution.  Create a policy with 'taskclf policy create' "
                "or 'taskclf train tune-reject --write-policy'."
            )
            model_path = resolve_model_dir(None, models_dir)

    per_user_thresholds: dict[str, float] | None = None
    if policy is not None:
        reject_threshold = policy.reject_threshold
        per_user_thresholds = policy.per_user_reject_thresholds

    # Load calibrator store
    if calibrator_store_override is not None:
        cal_store = load_calibrator_store(calibrator_store_override)
    elif policy is not None and policy.calibrator_store_dir is not None:
        store_path = models_dir.parent / policy.calibrator_store_dir
        if store_path.is_dir():
            cal_store = load_calibrator_store(store_path)
        else:
            logger.warning(
                "Policy calibrator_store_dir %s does not exist; using identity",
                policy.calibrator_store_dir,
            )

    if calibrator_path_override is not None and cal_store is None:
        calibrator = load_calibrator(calibrator_path_override)

    if reject_threshold_override is not None:
        reject_threshold = reject_threshold_override

    model, metadata, cat_encoders = load_model_bundle(model_path)
    logger.info(
        "Resolved inference config: model=%s schema=%s threshold=%.4f "
        "calibrator_store=%s policy=%s",
        model_path.name,
        metadata.schema_hash,
        reject_threshold,
        "yes" if cal_store is not None else "no",
        "yes" if policy is not None else "legacy",
    )

    return ResolvedInferenceConfig(
        model=model,
        metadata=metadata,
        cat_encoders=cat_encoders,
        reject_threshold=reject_threshold,
        calibrator=calibrator,
        calibrator_store=cal_store,
        policy=policy,
        per_user_reject_thresholds=per_user_thresholds,
    )