
infer.batch

Batch inference: predict, smooth, and segmentize over a feature DataFrame.

See Inference Contract for the canonical pipeline order and how this module fits the batch runtime path.

Overview

run_batch_inference executes the full post-training prediction pipeline in a single call:

predict_proba → calibrate → reject → smooth → segmentize → hysteresis merge

Each step is also available as a standalone function for granular use.

BatchInferenceResult

Frozen dataclass returned by run_batch_inference. Core prediction fields are always populated; taxonomy-mapped fields are None when no taxonomy config was provided.

| Field | Type | Description |
| --- | --- | --- |
| raw_labels | list[str] | Pre-smoothing labels (rejected buckets become Mixed/Unknown) |
| smoothed_labels | list[str] | Post-smoothing labels |
| segments | list[Segment] | Hysteresis-merged contiguous segments |
| confidences | np.ndarray | max(proba) per row |
| is_rejected | np.ndarray | Boolean rejection flags |
| core_probs | np.ndarray | (N, 8) probability matrix |
| mapped_labels | list[str] \| None | Taxonomy-mapped labels (if taxonomy provided) |
| mapped_probs | list[dict] \| None | Per-bucket mapped probabilities (if taxonomy provided) |

Functions

predict_proba

Return the raw probability matrix (n_rows, n_classes) for a feature DataFrame. Applies categorical encoding via encode_categoricals before prediction.

predict_labels

Run the model and return predicted label strings. When reject_threshold is set, predictions with max(proba) below the threshold are replaced with Mixed/Unknown.

run_batch_inference

End-to-end pipeline that predicts, calibrates, rejects, smooths, segmentizes, and applies hysteresis merging.

Optional parameters:

  • calibrator -- a single Calibrator instance applied to all rows.
  • calibrator_store -- a CalibratorStore for per-user calibration; takes precedence over calibrator when user_id is present in the DataFrame.
  • taxonomy -- a TaxonomyConfig to map core labels to user-defined buckets, populating mapped_labels and mapped_probs on the result.
  • reject_threshold -- confidence floor; predictions below this become Mixed/Unknown before smoothing.
  • smooth_window -- window size for rolling_majority (default from core.defaults).
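The smoothing step replaces each raw label with the majority vote over a sliding window. The real implementation is taskclf's rolling_majority; the sketch below is a hypothetical standalone reimplementation, and its tie-breaking and edge-window behaviour are assumptions rather than taskclf's exact semantics:

```python
from collections import Counter


def rolling_majority_sketch(labels: list[str], window: int = 5) -> list[str]:
    """Illustrative majority-vote smoother (not taskclf's rolling_majority)."""
    half = window // 2
    out = []
    for i, lbl in enumerate(labels):
        # Centered window, truncated at the sequence edges.
        chunk = labels[max(0, i - half) : i + half + 1]
        counts = Counter(chunk)
        top_label, top_n = counts.most_common(1)[0]
        tied = [l for l, n in counts.items() if n == top_n]
        # Keep the original label on ties so a 50/50 window never flips it.
        out.append(lbl if lbl in tied else top_label)
    return out


# A lone "B" between runs of "A" is smoothed away.
print(rolling_majority_sketch(["A", "A", "B", "A", "A"], window=3))
# → ['A', 'A', 'A', 'A', 'A']
```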

write_predictions_csv

Write per-bucket predictions to CSV with columns for timestamp, predicted label, confidence, rejection flag, mapped label, and core probabilities.

write_segments_json / read_segments_json

Serialize and deserialize Segment lists as JSON. Timestamps are stored in ISO 8601 format.

Usage

from pathlib import Path
from taskclf.infer.batch import run_batch_inference, write_segments_json

result = run_batch_inference(
    model, features_df,
    cat_encoders=cat_encoders,
    reject_threshold=0.40,
)

print(f"{len(result.segments)} segments, "
      f"{result.is_rejected.sum()} rejected buckets")

write_segments_json(result.segments, Path("artifacts/segments.json"))

taskclf.infer.batch

Batch inference: predict, smooth, and segmentize over a feature DataFrame.

BatchInferenceResult dataclass

Container for batch inference outputs.

Always contains core prediction fields. Taxonomy-mapped fields (mapped_labels, mapped_probs) are None when no taxonomy config was provided.

Source code in src/taskclf/infer/batch.py
@dataclass(frozen=True, slots=True)
class BatchInferenceResult:
    """Container for batch inference outputs.

    Always contains core prediction fields.  Taxonomy-mapped fields
    (``mapped_labels``, ``mapped_probs``) are ``None`` when no taxonomy
    config was provided.
    """

    raw_labels: list[str]
    smoothed_labels: list[str]
    segments: list[Segment]
    confidences: np.ndarray
    is_rejected: np.ndarray
    core_probs: np.ndarray
    mapped_labels: list[str] | None = field(default=None)
    mapped_probs: list[dict[str, float]] | None = field(default=None)

predict_proba(model, features_df, cat_encoders=None, *, schema_version=None)

Return raw probability matrix for features_df.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | Booster | Trained LightGBM booster. | required |
| features_df | DataFrame | Feature DataFrame with the model feature columns for the selected schema. | required |
| cat_encoders | dict[str, LabelEncoder] \| None | Pre-fitted categorical encoders for string columns. | None |
| schema_version | str \| None | "v1", "v2", or "v3". When omitted, inferred from features_df.schema_version if present. | None |

Returns:

| Type | Description |
| --- | --- |
| ndarray | Probability matrix of shape (n_rows, n_classes). |

Source code in src/taskclf/infer/batch.py
def predict_proba(
    model: lgb.Booster,
    features_df: pd.DataFrame,
    cat_encoders: dict[str, LabelEncoder] | None = None,
    *,
    schema_version: str | None = None,
) -> np.ndarray:
    """Return raw probability matrix for *features_df*.

    Args:
        model: Trained LightGBM booster.
        features_df: Feature DataFrame with the model feature columns for
            the selected schema.
        cat_encoders: Pre-fitted categorical encoders for string columns.
        schema_version: ``"v1"``, ``"v2"``, or ``"v3"``. When omitted,
            inferred from ``features_df.schema_version`` if present.

    Returns:
        Probability matrix of shape ``(n_rows, n_classes)``.
    """
    resolved_schema_version = _resolve_schema_version(features_df, schema_version)
    feature_columns = get_feature_columns(resolved_schema_version)
    feat_df = features_df[feature_columns].copy()
    feat_df, _ = encode_categoricals(
        feat_df,
        cat_encoders,
        schema_version=resolved_schema_version,
    )
    x = feat_df.fillna(0).to_numpy(dtype=np.float64)
    return np.asarray(model.predict(x))
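The returned matrix can be consumed directly with NumPy: argmax over axis 1 gives the predicted class index per row, and max gives the confidence that the reject step compares against the threshold. A self-contained sketch with a made-up 3-row, 4-class matrix:

```python
import numpy as np

# Hypothetical output of predict_proba for 3 rows and 4 classes.
proba = np.array([
    [0.70, 0.10, 0.10, 0.10],
    [0.30, 0.30, 0.25, 0.15],
    [0.05, 0.05, 0.85, 0.05],
])

pred_indices = proba.argmax(axis=1)  # class index per row (ties -> first max)
confidences = proba.max(axis=1)      # max(proba), the reject-step confidence

print(pred_indices.tolist())  # [0, 0, 2]
print(confidences.tolist())   # [0.7, 0.3, 0.85]
```

Note that row 2 ties between classes 0 and 1; argmax resolves ties to the lowest index.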

predict_labels(model, features_df, label_encoder, cat_encoders=None, *, reject_threshold=None, schema_version=None)

Run the model on features_df and return predicted label strings.

When reject_threshold is set, any prediction whose maximum probability falls below the threshold is replaced with Mixed/Unknown.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | Booster | Trained LightGBM booster. | required |
| features_df | DataFrame | Feature DataFrame with the model feature columns for the selected schema. | required |
| label_encoder | LabelEncoder | Encoder fitted on the canonical label vocabulary. | required |
| cat_encoders | dict[str, LabelEncoder] \| None | Pre-fitted categorical encoders for string columns. | None |
| reject_threshold | float \| None | If given, predictions with max(proba) < reject_threshold become Mixed/Unknown. | None |
| schema_version | str \| None | "v1", "v2", or "v3". When omitted, inferred from features_df.schema_version if present. | None |

Returns:

| Type | Description |
| --- | --- |
| list[str] | Predicted label per row. |

Source code in src/taskclf/infer/batch.py
def predict_labels(
    model: lgb.Booster,
    features_df: pd.DataFrame,
    label_encoder: LabelEncoder,
    cat_encoders: dict[str, LabelEncoder] | None = None,
    *,
    reject_threshold: float | None = None,
    schema_version: str | None = None,
) -> list[str]:
    """Run the model on *features_df* and return predicted label strings.

    When *reject_threshold* is set, any prediction whose maximum
    probability falls below the threshold is replaced with
    ``Mixed/Unknown``.

    Args:
        model: Trained LightGBM booster.
        features_df: Feature DataFrame with the model feature columns for
            the selected schema.
        label_encoder: Encoder fitted on the canonical label vocabulary.
        cat_encoders: Pre-fitted categorical encoders for string columns.
        reject_threshold: If given, predictions with
            ``max(proba) < reject_threshold`` become ``Mixed/Unknown``.

    Returns:
        Predicted label per row.
    """
    proba = predict_proba(
        model,
        features_df,
        cat_encoders,
        schema_version=schema_version,
    )
    pred_indices = proba.argmax(axis=1)
    labels = list(label_encoder.inverse_transform(pred_indices))

    if reject_threshold is not None:
        confidences = proba.max(axis=1)
        labels = [
            MIXED_UNKNOWN if conf < reject_threshold else lbl
            for lbl, conf in zip(labels, confidences)
        ]

    return labels
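The reject step is plain NumPy plus a list comprehension, so it can be illustrated without a model. The label names below are invented for the example; only Mixed/Unknown matches taskclf's vocabulary:

```python
import numpy as np

MIXED_UNKNOWN = "Mixed/Unknown"  # taskclf's reject label

# Invented example labels and confidences for three rows.
labels = ["Coding", "Email", "Meetings"]
confidences = np.array([0.82, 0.35, 0.55])
reject_threshold = 0.40

# Mirrors the reject step in predict_labels above.
rejected = [
    MIXED_UNKNOWN if conf < reject_threshold else lbl
    for lbl, conf in zip(labels, confidences)
]
print(rejected)  # ['Coding', 'Mixed/Unknown', 'Meetings']
```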

run_batch_inference(model, features_df, *, cat_encoders=None, smooth_window=DEFAULT_SMOOTH_WINDOW, bucket_seconds=DEFAULT_BUCKET_SECONDS, reject_threshold=None, taxonomy=None, calibrator=None, calibrator_store=None, schema_version=None)

Predict, smooth, segmentize, and apply hysteresis merging.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | Booster | Trained LightGBM booster. | required |
| features_df | DataFrame | Feature DataFrame with the selected schema's feature columns and bucket_start_ts. | required |
| cat_encoders | dict[str, LabelEncoder] \| None | Pre-fitted categorical encoders for string columns. | None |
| smooth_window | int | Window size for rolling-majority smoothing. | DEFAULT_SMOOTH_WINDOW |
| bucket_seconds | int | Width of each time bucket in seconds. | DEFAULT_BUCKET_SECONDS |
| reject_threshold | float \| None | If given, predictions with max(proba) < reject_threshold become Mixed/Unknown before smoothing. | None |
| taxonomy | TaxonomyConfig \| None | Optional taxonomy config. When provided, core labels are mapped to user-defined buckets and mapped_labels / mapped_probs are populated on the result. | None |
| calibrator | Calibrator \| None | Optional probability calibrator. When provided, raw model probabilities are calibrated before the reject decision. | None |
| calibrator_store | CalibratorStore \| None | Optional per-user calibrator store. When provided, per-user calibration is applied using the user_id column in features_df. Takes precedence over calibrator. | None |
| schema_version | str \| None | "v1", "v2", or "v3". When omitted, inferred from features_df.schema_version if present. | None |

Returns:

| Type | Description |
| --- | --- |
| BatchInferenceResult | A BatchInferenceResult with core predictions, segments (hysteresis-merged), and optional taxonomy-mapped outputs. |

Source code in src/taskclf/infer/batch.py
def run_batch_inference(
    model: lgb.Booster,
    features_df: pd.DataFrame,
    *,
    cat_encoders: dict[str, LabelEncoder] | None = None,
    smooth_window: int = DEFAULT_SMOOTH_WINDOW,
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS,
    reject_threshold: float | None = None,
    taxonomy: TaxonomyConfig | None = None,
    calibrator: Calibrator | None = None,
    calibrator_store: CalibratorStore | None = None,
    schema_version: str | None = None,
) -> BatchInferenceResult:
    """Predict, smooth, segmentize, and apply hysteresis merging.

    Args:
        model: Trained LightGBM booster.
        features_df: Feature DataFrame with the selected schema's feature
            columns and ``bucket_start_ts``.
        cat_encoders: Pre-fitted categorical encoders for string columns.
        smooth_window: Window size for rolling-majority smoothing.
        bucket_seconds: Width of each time bucket in seconds.
        reject_threshold: If given, predictions with
            ``max(proba) < reject_threshold`` become ``Mixed/Unknown``
            before smoothing.
        taxonomy: Optional taxonomy config.  When provided, core labels
            are mapped to user-defined buckets and ``mapped_labels`` /
            ``mapped_probs`` are populated on the result.
        calibrator: Optional probability calibrator.  When provided,
            raw model probabilities are calibrated before the reject
            decision.
        calibrator_store: Optional per-user calibrator store.  When
            provided, per-user calibration is applied using the
            ``user_id`` column in *features_df*.  Takes precedence
            over *calibrator*.

    Returns:
        A :class:`BatchInferenceResult` with core predictions, segments
        (hysteresis-merged), and optional taxonomy-mapped outputs.
    """
    le = LabelEncoder()
    le.fit(sorted(LABEL_SET_V1))

    resolved_schema_version = _resolve_schema_version(features_df, schema_version)
    proba = predict_proba(
        model,
        features_df,
        cat_encoders,
        schema_version=resolved_schema_version,
    )

    if calibrator_store is not None and "user_id" in features_df.columns:
        proba = calibrator_store.calibrate_batch(
            proba,
            list(features_df["user_id"].values),
        )
    else:
        effective_calibrator = (
            calibrator if calibrator is not None else IdentityCalibrator()
        )
        proba = effective_calibrator.calibrate(proba)
    confidences = proba.max(axis=1)
    is_rejected = (
        confidences < reject_threshold
        if reject_threshold is not None
        else np.zeros(len(confidences), dtype=bool)
    )

    pred_indices = proba.argmax(axis=1)
    raw_labels: list[str] = list(le.inverse_transform(pred_indices))
    raw_labels = [
        MIXED_UNKNOWN if rej else lbl for lbl, rej in zip(raw_labels, is_rejected)
    ]

    smoothed = rolling_majority(raw_labels, window=smooth_window)

    bucket_starts: list[datetime] = [
        ts_utc_aware_get(pd.Timestamp(ts).to_pydatetime())
        for ts in features_df["bucket_start_ts"].values
    ]
    segments = segmentize(bucket_starts, smoothed, bucket_seconds=bucket_seconds)
    segments = merge_short_segments(segments, bucket_seconds=bucket_seconds)

    mapped_labels: list[str] | None = None
    mapped_probs_list: list[dict[str, float]] | None = None
    if taxonomy is not None:
        resolver = TaxonomyResolver(taxonomy)
        results = resolver.resolve_batch(pred_indices, proba, is_rejected=is_rejected)
        mapped_labels = [r.mapped_label for r in results]
        mapped_probs_list = [r.mapped_probs for r in results]

    return BatchInferenceResult(
        raw_labels=raw_labels,
        smoothed_labels=smoothed,
        segments=segments,
        confidences=confidences,
        is_rejected=is_rejected,
        core_probs=proba,
        mapped_labels=mapped_labels,
        mapped_probs=mapped_probs_list,
    )
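Conceptually, segmentize collapses consecutive buckets that share a label into one segment. The real Segment type and merge_short_segments hysteresis live elsewhere in taskclf; the following is an illustrative stand-in built on itertools.groupby that assumes fixed-width, gap-free buckets:

```python
from datetime import datetime, timedelta, timezone
from itertools import groupby


def segmentize_sketch(bucket_starts, labels, bucket_seconds=60):
    """Group runs of equal labels into (start, end, label, bucket_count) tuples.

    Illustrative stand-in for taskclf's segmentize; hysteresis merging of
    short segments is omitted, and buckets are assumed contiguous.
    """
    segments = []
    i = 0
    for label, run in groupby(labels):
        n = len(list(run))
        start = bucket_starts[i]
        # A segment ends one bucket width after its last bucket starts.
        end = bucket_starts[i + n - 1] + timedelta(seconds=bucket_seconds)
        segments.append((start, end, label, n))
        i += n
    return segments


t0 = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
starts = [t0 + timedelta(seconds=60 * k) for k in range(4)]
segs = segmentize_sketch(starts, ["A", "A", "B", "B"], bucket_seconds=60)
print([(label, n) for _, _, label, n in segs])  # [('A', 2), ('B', 2)]
```

With contiguous buckets, each segment's end timestamp equals the next segment's start.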

write_predictions_csv(features_df, labels, path, *, confidences=None, is_rejected=None, mapped_labels=None, core_probs=None)

Write per-bucket predictions to a CSV file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| features_df | DataFrame | Source feature DataFrame. | required |
| labels | Sequence[str] | Predicted (smoothed) label per row. | required |
| path | Path | Destination CSV path. | required |
| confidences | ndarray \| None | Optional max-probability per row. | None |
| is_rejected | ndarray \| None | Optional boolean rejection flag per row. | None |
| mapped_labels | Sequence[str] \| None | Optional taxonomy-mapped label per row. | None |
| core_probs | ndarray \| None | Optional probability matrix (n_rows, n_classes). | None |

Returns:

| Type | Description |
| --- | --- |
| Path | The path that was written. |

Source code in src/taskclf/infer/batch.py
def write_predictions_csv(
    features_df: pd.DataFrame,
    labels: Sequence[str],
    path: Path,
    *,
    confidences: np.ndarray | None = None,
    is_rejected: np.ndarray | None = None,
    mapped_labels: Sequence[str] | None = None,
    core_probs: np.ndarray | None = None,
) -> Path:
    """Write per-bucket predictions to a CSV file.

    Args:
        features_df: Source feature DataFrame.
        labels: Predicted (smoothed) label per row.
        path: Destination CSV path.
        confidences: Optional max-probability per row.
        is_rejected: Optional boolean rejection flag per row.
        mapped_labels: Optional taxonomy-mapped label per row.
        core_probs: Optional probability matrix ``(n_rows, n_classes)``.

    Returns:
        The *path* that was written.
    """
    data: dict[str, object] = {
        "bucket_start_ts": features_df["bucket_start_ts"].values,
        "predicted_label": labels,
    }
    if confidences is not None:
        data["confidence"] = np.round(confidences, 4)
    if is_rejected is not None:
        data["is_rejected"] = is_rejected
    if mapped_labels is not None:
        data["mapped_label"] = list(mapped_labels)
    if core_probs is not None:
        import json as _json

        data["core_probs"] = [
            _json.dumps([round(float(p), 4) for p in row]) for row in core_probs
        ]

    out = pd.DataFrame(data)
    path.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(path, index=False)
    return path
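The resulting CSV layout can be previewed with a standalone round trip: the column names mirror those built by write_predictions_csv above, and each core_probs cell is a JSON-encoded list. All values below are made up:

```python
import json
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd

# Made-up two-row prediction output mirroring write_predictions_csv's columns.
data = {
    "bucket_start_ts": ["2024-01-01T09:00:00Z", "2024-01-01T09:01:00Z"],
    "predicted_label": ["A", "Mixed/Unknown"],
    "confidence": np.round(np.array([0.8123, 0.3456]), 4),
    "is_rejected": [False, True],
    # Probability vectors serialized as JSON strings, as in the source above.
    "core_probs": [json.dumps([0.8123, 0.1877]), json.dumps([0.3456, 0.6544])],
}

path = Path(tempfile.mkdtemp()) / "predictions.csv"
pd.DataFrame(data).to_csv(path, index=False)

back = pd.read_csv(path)
print(list(back.columns))
print(json.loads(back.loc[0, "core_probs"]))  # [0.8123, 0.1877]
```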

write_segments_json(segments, path)

Write segments to a JSON file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| segments | Sequence[Segment] | Segment instances from run_batch_inference. | required |
| path | Path | Destination JSON path. | required |

Returns:

| Type | Description |
| --- | --- |
| Path | The path that was written. |

Source code in src/taskclf/infer/batch.py
def write_segments_json(segments: Sequence[Segment], path: Path) -> Path:
    """Write segments to a JSON file.

    Args:
        segments: Segment instances from :func:`run_batch_inference`.
        path: Destination JSON path.

    Returns:
        The *path* that was written.
    """
    records = []
    for seg in segments:
        d = asdict(seg)
        d["start_ts"] = seg.start_ts.isoformat()
        d["end_ts"] = seg.end_ts.isoformat()
        records.append(d)

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(records, indent=2))
    return path

read_segments_json(path)

Read segments from a JSON file written by write_segments_json.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path | Path to an existing segments JSON file. | required |

Returns:

| Type | Description |
| --- | --- |
| list[Segment] | List of Segment instances. |

Source code in src/taskclf/infer/batch.py
def read_segments_json(path: Path) -> list[Segment]:
    """Read segments from a JSON file written by :func:`write_segments_json`.

    Args:
        path: Path to an existing segments JSON file.

    Returns:
        List of ``Segment`` instances.
    """
    records = json.loads(path.read_text())
    return [
        Segment(
            start_ts=ts_utc_aware_get(datetime.fromisoformat(r["start_ts"])),
            end_ts=ts_utc_aware_get(datetime.fromisoformat(r["end_ts"])),
            label=r["label"],
            bucket_count=r["bucket_count"],
        )
        for r in records
    ]
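The JSON round trip can be demonstrated without taskclf using a stand-in dataclass whose fields mirror Segment (the field names start_ts, end_ts, label, and bucket_count are taken from read_segments_json above; the class itself is hypothetical):

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass(frozen=True)
class SegmentStub:
    """Hypothetical stand-in for taskclf's Segment (same field names)."""

    start_ts: datetime
    end_ts: datetime
    label: str
    bucket_count: int


seg = SegmentStub(
    start_ts=datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc),
    end_ts=datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc),
    label="A",
    bucket_count=5,
)

# Serialize as in write_segments_json: a dict with ISO 8601 timestamps.
record = asdict(seg)
record["start_ts"] = seg.start_ts.isoformat()
record["end_ts"] = seg.end_ts.isoformat()
path = Path(tempfile.mkdtemp()) / "segments.json"
path.write_text(json.dumps([record], indent=2))

# Deserialize as in read_segments_json.
loaded = json.loads(path.read_text())[0]
restored = SegmentStub(
    start_ts=datetime.fromisoformat(loaded["start_ts"]),
    end_ts=datetime.fromisoformat(loaded["end_ts"]),
    label=loaded["label"],
    bucket_count=loaded["bucket_count"],
)
print(restored == seg)  # True
```

Because isoformat preserves the UTC offset and datetime.fromisoformat parses it back, the restored timestamps stay timezone-aware.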