Skip to content

core.telemetry

Point-in-time quality telemetry: snapshot computation and persistence.

Overview

Collects aggregate statistics on feature quality, prediction confidence, reject rates, and class distributions. Stores only numerical summaries; never raw content. TelemetryStore is implemented as a slotted dataclass and keeps the same constructor argument (store_dir).

TelemetrySnapshot

Field Type Description
timestamp datetime When the snapshot was computed
user_id str \| None Scoped user (None = global)
window_start datetime \| None Earliest bucket in the window
window_end datetime \| None Latest bucket in the window
total_windows int Number of prediction windows
feature_missingness dict[str, float] Fraction missing per feature
confidence_stats ConfidenceStats \| None mean, median, p5, p95, std
reject_rate float Fraction of rejected predictions
mean_entropy float Mean prediction entropy
class_distribution dict[str, float] Fraction per class
schema_version str Feature schema version
suggestions_per_day int Number of suggestions surfaced on the snapshot's day (default 0)

compute_telemetry

from taskclf.core.telemetry import compute_telemetry
snapshot = compute_telemetry(
    features_df,
    labels=predicted_labels,
    confidences=confidence_array,
    core_probs=probability_matrix,
    user_id="user-1",
)

TelemetryStore

Append-only JSONL store. One file per user (or global).

from taskclf.core.telemetry import TelemetryStore

store = TelemetryStore("artifacts/telemetry")
store.append(snapshot)

recent = store.read_recent(10, user_id="user-1")
in_range = store.read_range(start_dt, end_dt)

File layout:

artifacts/telemetry/
  telemetry_global.jsonl
  telemetry_user-1.jsonl
  telemetry_user-2.jsonl

SuggestionTracker

In-memory counter of suggestion events grouped by calendar date. Implements the decision-#4 guardrail: a loaded model must produce at least one suggestion per active day.

from datetime import datetime, timezone
from taskclf.core.telemetry import SuggestionTracker

tracker = SuggestionTracker()

# Record a suggestion event
tracker.record(datetime.now(tz=timezone.utc))

# Query the count
count = tracker.count_for_date("2026-03-28")

# End-of-day check (logs a warning if zero suggestions with a loaded model)
tracker.check_zero_suggestions("2026-03-28", model_loaded=True)
Method Description
record(ts) Increment the suggestion count for the date derived from ts
count_for_date(date_str) Return the count for a YYYY-MM-DD date string
check_zero_suggestions(date_str, *, model_loaded) Log a warning if model_loaded is True and count is 0

taskclf.core.telemetry

Point-in-time quality telemetry: collection and persistence.

Stores only aggregate statistics -- never raw content.

ConfidenceStats

Bases: BaseModel

Aggregate statistics on prediction confidence.

Source code in src/taskclf/core/telemetry.py
class ConfidenceStats(BaseModel):
    """Aggregate statistics on prediction confidence.

    Summarizes a batch of per-window confidence scores; only these five
    numerical aggregates are retained, never the raw scores.
    """

    mean: float  # arithmetic mean of the confidence scores
    median: float  # 50th percentile
    p5: float  # 5th percentile (low-confidence tail)
    p95: float  # 95th percentile (high-confidence tail)
    std: float  # standard deviation

TelemetrySnapshot

Bases: BaseModel

Point-in-time quality snapshot of a prediction window.

Source code in src/taskclf/core/telemetry.py
class TelemetrySnapshot(BaseModel):
    """Point-in-time quality snapshot of a prediction window.

    Holds only aggregate, numerical summaries — never raw content.
    """

    timestamp: datetime  # when the snapshot was computed
    user_id: str | None = None  # scoped user (None = global)
    window_start: datetime | None = None  # earliest bucket in the window
    window_end: datetime | None = None  # latest bucket in the window
    total_windows: int  # number of prediction windows summarized
    feature_missingness: dict[str, float] = Field(default_factory=dict)  # fraction missing, per feature
    confidence_stats: ConfidenceStats | None = None  # aggregate confidence summary
    reject_rate: float = 0.0  # fraction of rejected predictions
    mean_entropy: float = 0.0  # mean prediction entropy
    class_distribution: dict[str, float] = Field(default_factory=dict)  # fraction per class
    # NOTE: default resolved once at class-definition time from the
    # latest feature schema version; callers may override per snapshot.
    schema_version: str = f"features_{LATEST_FEATURE_SCHEMA_VERSION}"
    suggestions_per_day: int = 0  # suggestions surfaced on the snapshot's day

TelemetryStore dataclass

Append-only JSONL store for telemetry snapshots.

Source code in src/taskclf/core/telemetry.py
@dataclass(eq=False)
class TelemetryStore:
    """Append-only JSONL store for telemetry snapshots.

    One ``telemetry_<user_id>.jsonl`` file per user, plus
    ``telemetry_global.jsonl`` for the unscoped stream, all under
    ``store_dir``.
    """

    # Directory holding the per-user JSONL files (created eagerly).
    store_dir: str | Path = DEFAULT_TELEMETRY_DIR
    _dir: Path = field(init=False)

    def __post_init__(self) -> None:
        self._dir = Path(self.store_dir)
        self._dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, user_id: str | None) -> Path:
        """Return the JSONL file path for *user_id* (``None``/empty = global)."""
        name = f"telemetry_{user_id}.jsonl" if user_id else "telemetry_global.jsonl"
        return self._dir / name

    def append(self, snapshot: TelemetrySnapshot) -> Path:
        """Append a snapshot to the appropriate JSONL file.

        The target file is rewritten through a temporary file followed by
        an atomic rename, so concurrent readers never observe a partially
        written line.

        Returns:
            Path of the file written to.
        """
        path = self._path_for(snapshot.user_id)
        line = snapshot.model_dump_json() + "\n"

        path.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_str = tempfile.mkstemp(dir=self._dir, suffix=".tmp")
        tmp = Path(tmp_str)
        fd_open = True
        try:
            # Explicit UTF-8 so the read round-trips with the UTF-8 bytes
            # written below regardless of the platform's locale encoding.
            existing = path.read_text(encoding="utf-8") if path.exists() else ""
            os.write(fd, (existing + line).encode("utf-8"))
            os.close(fd)
            fd_open = False
            tmp.replace(path)
        except BaseException:
            # Only close if still open — closing twice raises OSError and
            # would mask the original exception when replace() failed.
            if fd_open:
                os.close(fd)
            tmp.unlink(missing_ok=True)
            raise
        return path

    def read_recent(
        self,
        n: int = 10,
        *,
        user_id: str | None = None,
    ) -> list[TelemetrySnapshot]:
        """Read the last *n* snapshots.

        Args:
            n: Maximum number of snapshots to return.
            user_id: Scope to a specific user (``None`` = global).

        Returns:
            List of :class:`TelemetrySnapshot` (newest last).
        """
        path = self._path_for(user_id)
        if not path.exists():
            return []
        lines = [
            line
            for line in path.read_text(encoding="utf-8").splitlines()
            if line.strip()
        ]
        return [TelemetrySnapshot.model_validate_json(line) for line in lines[-n:]]

    def read_range(
        self,
        start: datetime,
        end: datetime,
        *,
        user_id: str | None = None,
    ) -> list[TelemetrySnapshot]:
        """Read snapshots whose timestamp falls within [*start*, *end*].

        Args:
            start: Inclusive lower bound.
            end: Inclusive upper bound.
            user_id: Scope to a specific user (``None`` = global).

        Returns:
            Matching snapshots ordered by timestamp.
        """
        # NOTE: scans at most the newest 10,000 snapshots; entries older
        # than that cap are silently excluded from the range.
        all_snaps = self.read_recent(n=10_000, user_id=user_id)
        return [s for s in all_snaps if start <= s.timestamp <= end]

append(snapshot)

Append a snapshot to the appropriate JSONL file.

Returns:

Type Description
Path

Path of the file written to.

Source code in src/taskclf/core/telemetry.py
def append(self, snapshot: TelemetrySnapshot) -> Path:
    """Append a snapshot to the appropriate JSONL file.

    The target file is rewritten through a temporary file followed by an
    atomic rename, so concurrent readers never observe a partial line.

    Returns:
        Path of the file written to.
    """
    path = self._path_for(snapshot.user_id)
    line = snapshot.model_dump_json() + "\n"

    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_str = tempfile.mkstemp(dir=self._dir, suffix=".tmp")
    tmp = Path(tmp_str)
    fd_open = True
    try:
        # Explicit UTF-8 so the read round-trips with the UTF-8 bytes
        # written below regardless of the platform's locale encoding.
        existing = path.read_text(encoding="utf-8") if path.exists() else ""
        os.write(fd, (existing + line).encode("utf-8"))
        os.close(fd)
        fd_open = False
        tmp.replace(path)
    except BaseException:
        # Only close if still open — closing twice raises OSError and
        # would mask the original exception when replace() failed.
        if fd_open:
            os.close(fd)
        tmp.unlink(missing_ok=True)
        raise
    return path

read_recent(n=10, *, user_id=None)

Read the last n snapshots.

Parameters:

Name Type Description Default
n int

Maximum number of snapshots to return.

10
user_id str | None

Scope to a specific user (None = global).

None

Returns:

Type Description
list[TelemetrySnapshot]

List of TelemetrySnapshot (newest last).

Source code in src/taskclf/core/telemetry.py
def read_recent(
    self,
    n: int = 10,
    *,
    user_id: str | None = None,
) -> list[TelemetrySnapshot]:
    """Return up to the last *n* snapshots for the given scope.

    Args:
        n: Maximum number of snapshots to return.
        user_id: Scope to a specific user (``None`` = global).

    Returns:
        List of :class:`TelemetrySnapshot` (newest last).
    """
    source = self._path_for(user_id)
    if not source.exists():
        return []
    raw_lines = source.read_text().splitlines()
    records = [raw for raw in raw_lines if raw.strip()]
    tail = records[-n:]
    return [TelemetrySnapshot.model_validate_json(raw) for raw in tail]

read_range(start, end, *, user_id=None)

Read snapshots whose timestamp falls within [start, end].

Parameters:

Name Type Description Default
start datetime

Inclusive lower bound.

required
end datetime

Inclusive upper bound.

required
user_id str | None

Scope to a specific user (None = global).

None

Returns:

Type Description
list[TelemetrySnapshot]

Matching snapshots ordered by timestamp.

Source code in src/taskclf/core/telemetry.py
def read_range(
    self,
    start: datetime,
    end: datetime,
    *,
    user_id: str | None = None,
) -> list[TelemetrySnapshot]:
    """Return snapshots whose timestamp lies in the inclusive [*start*, *end*].

    Args:
        start: Inclusive lower bound.
        end: Inclusive upper bound.
        user_id: Scope to a specific user (``None`` = global).

    Returns:
        Matching snapshots ordered by timestamp.
    """
    candidates = self.read_recent(n=10_000, user_id=user_id)
    selected: list[TelemetrySnapshot] = []
    for snap in candidates:
        if start <= snap.timestamp <= end:
            selected.append(snap)
    return selected

SuggestionTracker dataclass

In-memory counter of suggestion events grouped by calendar date.

Used to enforce the decision-#4 guardrail: a loaded model must produce at least one suggestion per active day. The tray or online loop calls record for each non-rejected prediction surfaced to the user. At end-of-day (or on shutdown), check_zero_suggestions logs a warning if the count is zero.

Source code in src/taskclf/core/telemetry.py
@dataclass
class SuggestionTracker:
    """In-memory per-date counter of suggestion events.

    Enforces the decision-#4 guardrail: a loaded model must surface at
    least one suggestion per active day.  The tray or online loop calls
    :meth:`record` for every non-rejected prediction shown to the user;
    at end-of-day (or on shutdown) :meth:`check_zero_suggestions` emits
    a warning when the day's count is zero.
    """

    _counts: dict[str, int] = field(default_factory=lambda: defaultdict(int))

    def record(self, ts: datetime) -> None:
        """Record a suggestion event at *ts*."""
        self._counts[ts.strftime("%Y-%m-%d")] += 1

    def count_for_date(self, date_str: str) -> int:
        """Return the suggestion count for a given date string (``YYYY-MM-DD``)."""
        # .get() avoids materialising a zero entry in the defaultdict.
        return self._counts.get(date_str, 0)

    def check_zero_suggestions(self, date_str: str, *, model_loaded: bool) -> None:
        """Log a warning if *model_loaded* and zero suggestions were recorded.

        Args:
            date_str: Calendar date to check (``YYYY-MM-DD``).
            model_loaded: Whether a model was loaded during the day.
        """
        if not model_loaded:
            return
        if self.count_for_date(date_str) != 0:
            return
        logger.warning(
            "Zero suggestions on %s with a loaded model — "
            "reject threshold may be too high",
            date_str,
        )

record(ts)

Record a suggestion event at ts.

Source code in src/taskclf/core/telemetry.py
def record(self, ts: datetime) -> None:
    """Record one suggestion event at timestamp *ts*."""
    key = ts.strftime("%Y-%m-%d")
    self._counts[key] = self._counts[key] + 1

count_for_date(date_str)

Return the suggestion count for a given date string (YYYY-MM-DD).

Source code in src/taskclf/core/telemetry.py
def count_for_date(self, date_str: str) -> int:
    """Return how many suggestions were recorded for *date_str* (``YYYY-MM-DD``)."""
    # Membership test keeps a defaultdict from growing a zero entry.
    if date_str in self._counts:
        return self._counts[date_str]
    return 0

check_zero_suggestions(date_str, *, model_loaded)

Log a warning if model_loaded and zero suggestions were recorded.

Parameters:

Name Type Description Default
date_str str

Calendar date to check (YYYY-MM-DD).

required
model_loaded bool

Whether a model was loaded during the day.

required
Source code in src/taskclf/core/telemetry.py
def check_zero_suggestions(self, date_str: str, *, model_loaded: bool) -> None:
    """Log a warning if *model_loaded* and zero suggestions were recorded.

    Args:
        date_str: Calendar date to check (``YYYY-MM-DD``).
        model_loaded: Whether a model was loaded during the day.
    """
    if not model_loaded:
        return
    if self.count_for_date(date_str) != 0:
        return
    logger.warning(
        "Zero suggestions on %s with a loaded model — "
        "reject threshold may be too high",
        date_str,
    )

compute_telemetry(features_df, *, labels=None, confidences=None, core_probs=None, user_id=None, reject_label=MIXED_UNKNOWN)

Compute a telemetry snapshot from features and prediction outputs.

Parameters:

Name Type Description Default
features_df DataFrame

Feature DataFrame (one row per bucket).

required
labels Sequence[str] | None

Predicted labels (post-reject).

None
confidences ndarray | Sequence[float] | None

max(proba) per row.

None
core_probs ndarray | None

Full probability matrix (n, k) for entropy.

None
user_id str | None

Optional user to scope the snapshot.

None
reject_label str

Label used for rejected predictions.

MIXED_UNKNOWN

Returns:

Type Description
TelemetrySnapshot

A populated TelemetrySnapshot.

Source code in src/taskclf/core/telemetry.py
def compute_telemetry(
    features_df: pd.DataFrame,
    *,
    labels: Sequence[str] | None = None,
    confidences: np.ndarray | Sequence[float] | None = None,
    core_probs: np.ndarray | None = None,
    user_id: str | None = None,
    reject_label: str = MIXED_UNKNOWN,
) -> TelemetrySnapshot:
    """Compute a telemetry snapshot from features and prediction outputs.

    Args:
        features_df: Feature DataFrame (one row per bucket).
        labels: Predicted labels (post-reject).
        confidences: ``max(proba)`` per row.
        core_probs: Full probability matrix ``(n, k)`` for entropy.
        user_id: Optional user to scope the snapshot.
        reject_label: Label used for rejected predictions.

    Returns:
        A populated :class:`TelemetrySnapshot`.
    """
    from collections import Counter

    row_count = len(features_df)
    captured_at = datetime.now(tz=timezone.utc)

    # Per-feature fraction of missing values, restricted to known columns.
    missing_fractions: dict[str, float] = {
        col: round(float(features_df[col].isna().mean()), 4)
        for col in NUMERICAL_FEATURES
        if col in features_df.columns
    }

    # Time span covered by the window, when bucket timestamps are present.
    span_start: datetime | None = None
    span_end: datetime | None = None
    if row_count > 0 and "bucket_start_ts" in features_df.columns:
        bucket_ts = features_df["bucket_start_ts"]
        span_start = pd.Timestamp(bucket_ts.min()).to_pydatetime()
        span_end = pd.Timestamp(bucket_ts.max()).to_pydatetime()

    # Confidence aggregates (only when a non-empty array was provided).
    stats: ConfidenceStats | None = None
    if confidences is not None:
        conf_arr = np.asarray(confidences, dtype=np.float64)
        if len(conf_arr) > 0:
            stats = ConfidenceStats(
                mean=round(float(np.mean(conf_arr)), 4),
                median=round(float(np.median(conf_arr)), 4),
                p5=round(float(np.percentile(conf_arr, 5)), 4),
                p95=round(float(np.percentile(conf_arr, 95)), 4),
                std=round(float(np.std(conf_arr)), 4),
            )

    # Reject rate and class distribution from the post-reject labels.
    reject_fraction = 0.0
    distribution: dict[str, float] = {}
    if labels is not None and len(labels) > 0:
        tallies = Counter(labels)
        reject_fraction = tallies[reject_label] / len(labels)
        n_labels = sum(tallies.values())
        distribution = {
            label: round(cnt / n_labels, 4)
            for label, cnt in sorted(tallies.items())
        }

    # Mean Shannon entropy over the probability matrix; probabilities are
    # clipped away from zero so log() stays finite.
    entropy_mean = 0.0
    if core_probs is not None:
        probs = np.asarray(core_probs, dtype=np.float64)
        if len(probs) > 0:
            safe = np.clip(probs, _EPS, 1.0)
            entropy_mean = float(np.mean(-np.sum(safe * np.log(safe), axis=1)))

    # Schema version comes from the data itself when available, else the
    # latest known feature schema.
    if row_count > 0 and "schema_version" in features_df.columns:
        schema = f"features_{features_df['schema_version'].iloc[0]}"
    else:
        schema = f"features_{LATEST_FEATURE_SCHEMA_VERSION}"

    return TelemetrySnapshot(
        timestamp=captured_at,
        user_id=user_id,
        window_start=span_start,
        window_end=span_end,
        total_windows=row_count,
        feature_missingness=missing_fractions,
        confidence_stats=stats,
        reject_rate=round(reject_fraction, 4),
        mean_entropy=round(entropy_mean, 6),
        class_distribution=distribution,
        schema_version=schema,
    )