
labels.store

Label span I/O, validation, export, and synthetic label generation.

Key functions

| Function | Description |
| --- | --- |
| `write_label_spans` | Serialize spans to parquet |
| `read_label_spans` | Deserialize spans from parquet (converts NaN values to None for nullable fields) |
| `import_labels_from_csv` | Read spans from CSV (supports optional `user_id` and `confidence` columns) |
| `export_labels_to_csv` | Export spans from a labels parquet file to CSV |
| `merge_label_spans` | Merge imported spans into existing ones, deduplicating by `(start_ts, end_ts, user_id)` and rejecting same-user overlaps |
| `append_label_span` | Append a single span to an existing parquet file with overlap validation. If the previous same-user label has `extend_forward=True`, its `end_ts` is stretched to the new span's `start_ts` for contiguous coverage. Before overlap checks, same-user boundaries within 1 ms are snapped together so JavaScript millisecond precision does not create false microsecond overlaps. Pass `allow_overlap=True` to skip the overlap check and permit multiple labels on the same time range |
| `overwrite_label_span` | Append a span, resolving overlaps by truncating, splitting, or removing conflicting same-user spans. For `extend_forward` labels, overlap resolution uses the label's effective coverage through the next same-user label (or open-endedly when none exists), so retrospective inserts can split a running label into before/after fragments and preserve the resumed active fragment |
| `update_label_span` | Change the label, timestamps, and optionally `extend_forward` on an existing span identified by its `start_ts` and `end_ts`; validates the resulting span against `LABEL_SET_V1` |
| `delete_label_span` | Remove a label span identified by its `start_ts` and `end_ts` |
| `generate_label_summary` | Summarise features in a time range (top apps, input rates, session count) |
| `generate_dummy_labels` | Create synthetic spans for testing |
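Throughout this module, spans are half-open intervals `[start_ts, end_ts)`: two spans conflict only when each starts before the other ends, so back-to-back spans that share a boundary never count as overlapping. A minimal standalone sketch of that predicate, using plain datetimes rather than the real `LabelSpan` model:

```python
from datetime import datetime, timedelta

def overlaps(a_start, a_end, b_start, b_end):
    """Half-open interval overlap: [a_start, a_end) vs [b_start, b_end)."""
    return a_start < b_end and b_start < a_end

t0 = datetime(2024, 1, 1, 9, 0)
a = (t0, t0 + timedelta(minutes=10))
b = (a[1], a[1] + timedelta(minutes=5))             # starts exactly where a ends
c = (t0 + timedelta(minutes=5), t0 + timedelta(minutes=15))

print(overlaps(*a, *b))  # → False: touching boundaries do not overlap
print(overlaps(*a, *c))  # → True: the ranges genuinely intersect
```

This is the same condition (`a.start_ts < b.end_ts and b.start_ts < a.end_ts`) that `merge_label_spans`, `append_label_span`, and `overwrite_label_span` apply below.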

taskclf.labels.store

Label span I/O, validation, export, and synthetic label generation.

write_label_spans(spans, path)

Serialize spans to a parquet file at path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spans` | `Sequence[LabelSpan]` | Label span instances to persist. | required |
| `path` | `Path` | Destination parquet file path. | required |

Returns:

| Type | Description |
| --- | --- |
| `Path` | The path that was written. |

Source code in src/taskclf/labels/store.py
def write_label_spans(spans: Sequence[LabelSpan], path: Path) -> Path:
    """Serialize *spans* to a parquet file at *path*.

    Args:
        spans: Label span instances to persist.
        path: Destination parquet file path.

    Returns:
        The *path* that was written.
    """
    import pandas as pd

    from taskclf.core.store import write_parquet

    df = pd.DataFrame([s.model_dump() for s in spans])
    return write_parquet(df, path)

read_label_spans(path)

Deserialize label spans from a parquet file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | Path to an existing parquet file written by `write_label_spans`. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[LabelSpan]` | List of validated `LabelSpan` instances. |

Source code in src/taskclf/labels/store.py
def read_label_spans(path: Path) -> list[LabelSpan]:
    """Deserialize label spans from a parquet file.

    Args:
        path: Path to an existing parquet file written by
            :func:`write_label_spans`.

    Returns:
        List of validated ``LabelSpan`` instances.
    """
    import pandas as pd

    from taskclf.core.store import read_parquet

    df = read_parquet(path)
    records = df.to_dict(orient="records")
    for row in records:
        for k, v in row.items():
            if isinstance(v, float) and pd.isna(v):
                row[k] = None
    return [LabelSpan.model_validate(row) for row in records]
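`read_label_spans` converts pandas' float NaN back to `None` so that optional fields such as `user_id` and `confidence` round-trip cleanly through parquet. The same normalisation can be sketched without pandas using `math.isnan` (illustrative record dicts, not the real schema):

```python
import math

def normalize_nans(records):
    """Replace float NaN values with None in each record dict, in place."""
    for row in records:
        for k, v in row.items():
            if isinstance(v, float) and math.isnan(v):
                row[k] = None
    return records

rows = [{"label": "coding", "confidence": float("nan"), "user_id": None}]
print(normalize_nans(rows))
# → [{'label': 'coding', 'confidence': None, 'user_id': None}]
```

The `isinstance(v, float)` guard matters: `pd.isna` (like `math.isnan`) applied to strings or timestamps would either error or misclassify, so only float columns are candidates.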

import_labels_from_csv(path)

Read label spans from a CSV file and validate each row.

Required columns: start_ts, end_ts, label, provenance. Optional columns: user_id, confidence.

Timestamps are parsed via pd.to_datetime so ISO-8601 and common date-time formats are accepted.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | Path to an existing CSV file. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[LabelSpan]` | List of validated `LabelSpan` instances. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If required columns are missing or any row fails `LabelSpan` validation. |

Source code in src/taskclf/labels/store.py
def import_labels_from_csv(path: Path) -> list[LabelSpan]:
    """Read label spans from a CSV file and validate each row.

    Required columns: ``start_ts``, ``end_ts``, ``label``, ``provenance``.
    Optional columns: ``user_id``, ``confidence``.

    Timestamps are parsed via ``pd.to_datetime`` so ISO-8601 and common
    date-time formats are accepted.

    Args:
        path: Path to an existing CSV file.

    Returns:
        List of validated ``LabelSpan`` instances.

    Raises:
        ValueError: If required columns are missing or any row fails
            ``LabelSpan`` validation.
    """
    import pandas as pd

    df = pd.read_csv(path)

    required = {"start_ts", "end_ts", "label", "provenance"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV missing required columns: {sorted(missing)}")

    df["start_ts"] = pd.to_datetime(df["start_ts"])
    df["end_ts"] = pd.to_datetime(df["end_ts"])

    has_user_id = "user_id" in df.columns
    has_confidence = "confidence" in df.columns

    spans: list[LabelSpan] = []
    for _i, row in df.iterrows():
        kwargs: dict = {
            "start_ts": row["start_ts"].to_pydatetime(),
            "end_ts": row["end_ts"].to_pydatetime(),
            "label": row["label"],
            "provenance": row["provenance"],
        }
        if has_user_id and pd.notna(row["user_id"]):
            kwargs["user_id"] = str(row["user_id"])
        if has_confidence and pd.notna(row["confidence"]):
            kwargs["confidence"] = float(row["confidence"])
        spans.append(LabelSpan(**kwargs))
    return spans
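The required-column check is a plain set difference against the CSV header; a standalone sketch with the stdlib `csv` module and hypothetical file contents (the real importer uses `pd.read_csv`):

```python
import csv, io

REQUIRED = {"start_ts", "end_ts", "label", "provenance"}

def check_columns(csv_text: str) -> None:
    """Raise ValueError when the CSV header lacks any required column."""
    header = next(csv.reader(io.StringIO(csv_text)))
    missing = REQUIRED - set(header)
    if missing:
        raise ValueError(f"CSV missing required columns: {sorted(missing)}")

check_columns("start_ts,end_ts,label,provenance,confidence\n")  # extra columns are fine
try:
    check_columns("start_ts,end_ts,label\n")
except ValueError as e:
    print(e)  # → CSV missing required columns: ['provenance']
```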

export_labels_to_csv(parquet_path, csv_path)

Export label spans from a Parquet file to CSV.

Columns written: start_ts, end_ts, label, provenance, user_id, confidence, extend_forward.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parquet_path` | `Path` | Path to an existing labels Parquet file. | required |
| `csv_path` | `Path` | Destination CSV file path. | required |

Returns:

| Type | Description |
| --- | --- |
| `Path` | The `csv_path` that was written. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the Parquet file does not exist or contains no spans. |

Source code in src/taskclf/labels/store.py
def export_labels_to_csv(parquet_path: Path, csv_path: Path) -> Path:
    """Export label spans from a Parquet file to CSV.

    Columns written: ``start_ts``, ``end_ts``, ``label``, ``provenance``,
    ``user_id``, ``confidence``, ``extend_forward``.

    Args:
        parquet_path: Path to an existing labels Parquet file.
        csv_path: Destination CSV file path.

    Returns:
        The *csv_path* that was written.

    Raises:
        ValueError: If the Parquet file does not exist or contains no spans.
    """
    if not parquet_path.exists():
        raise ValueError(f"Labels file not found: {parquet_path}")
    spans = read_label_spans(parquet_path)
    if not spans:
        raise ValueError("No labels to export")
    rows = [s.model_dump() for s in spans]
    fieldnames = list(rows[0].keys())
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return csv_path

merge_label_spans(existing, imported)

Merge imported spans into existing, deduplicating and checking overlaps.

Deduplication key is (start_ts, end_ts, user_id); when a collision occurs the imported span wins (newer provenance).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `existing` | `Sequence[LabelSpan]` | Currently stored label spans. | required |
| `imported` | `Sequence[LabelSpan]` | Newly imported label spans. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[LabelSpan]` | Merged list of `LabelSpan` instances, sorted by `start_ts`. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the merged set contains overlapping spans for the same user. |

Source code in src/taskclf/labels/store.py
def merge_label_spans(
    existing: Sequence[LabelSpan],
    imported: Sequence[LabelSpan],
) -> list[LabelSpan]:
    """Merge *imported* spans into *existing*, deduplicating and checking overlaps.

    Deduplication key is ``(start_ts, end_ts, user_id)``; when a
    collision occurs the imported span wins (newer provenance).

    Args:
        existing: Currently stored label spans.
        imported: Newly imported label spans.

    Returns:
        Merged list of ``LabelSpan`` instances, sorted by ``start_ts``.

    Raises:
        ValueError: If the merged set contains overlapping spans for
            the same user.
    """
    by_key: dict[tuple, LabelSpan] = {}
    for s in existing:
        by_key[(s.start_ts, s.end_ts, s.user_id)] = s
    for s in imported:
        by_key[(s.start_ts, s.end_ts, s.user_id)] = s

    merged = sorted(by_key.values(), key=lambda s: s.start_ts)

    for i, a in enumerate(merged):
        for j, b in enumerate(merged):
            if i >= j:
                continue
            if not _same_user(a, b):
                continue
            if a.start_ts < b.end_ts and b.start_ts < a.end_ts:
                raise ValueError(
                    f"Span [{a.start_ts}, {a.end_ts}) overlaps "
                    f"[{b.start_ts}, {b.end_ts}) for user {a.user_id!r}"
                )

    return merged
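The "imported wins" rule falls out of plain dict insertion order: writing the imported spans into `by_key` after the existing ones overwrites any entry with the same `(start_ts, end_ts, user_id)` key. A minimal sketch with tuples `(start, end, user, label)` standing in for spans:

```python
def merge_dedup(existing, imported):
    """Dedup by (start, end, user); imported entries overwrite existing ones."""
    by_key = {}
    for span in list(existing) + list(imported):
        start, end, user, _label = span
        by_key[(start, end, user)] = span    # later writes win
    return sorted(by_key.values())

old = [(0, 10, "u1", "coding"), (10, 20, "u1", "email")]
new = [(0, 10, "u1", "meeting")]             # same key as the first existing span
print(merge_dedup(old, new))
# → [(0, 10, 'u1', 'meeting'), (10, 20, 'u1', 'email')]
```

The real function then runs the pairwise half-open overlap check over the merged, sorted list and raises on any same-user conflict.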

append_label_span(span, path, *, allow_overlap=False)

Append a single label span to an existing (or new) parquet file.

If the most recent same-user label has extend_forward=True, its end_ts is stretched (or truncated) to span.start_ts so labels form contiguous coverage. Otherwise a plain overlap check is performed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `LabelSpan` | The label span to append. | required |
| `path` | `Path` | Parquet file to read-append-write. | required |
| `allow_overlap` | `bool` | When True, skip the overlap check and allow multiple labels to coexist on overlapping time ranges. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Path` | The path that was written. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the span overlaps an existing span for the same user (after any extension) and `allow_overlap` is False. |

Source code in src/taskclf/labels/store.py
def append_label_span(
    span: LabelSpan,
    path: Path,
    *,
    allow_overlap: bool = False,
) -> Path:
    """Append a single label span to an existing (or new) parquet file.

    If the most recent same-user label has ``extend_forward=True``, its
    ``end_ts`` is stretched (or truncated) to ``span.start_ts`` so labels
    form contiguous coverage.  Otherwise a plain overlap check is performed.

    Args:
        span: The label span to append.
        path: Parquet file to read-append-write.
        allow_overlap: When True, skip the overlap check and allow
            multiple labels to coexist on overlapping time ranges.

    Returns:
        The *path* that was written.

    Raises:
        ValueError: If the span overlaps an existing span for the
            same user (after any extension) and *allow_overlap* is False.
    """
    existing: list[LabelSpan] = []
    if path.exists():
        existing = read_label_spans(path)

    span = _snap_same_user_adjacent_boundaries(existing, span)
    existing = _handoff_active_span_for_now_label(existing, span)

    prev: LabelSpan | None = None
    prev_idx: int | None = None
    for i, ex in enumerate(existing):
        if not _same_user(ex, span):
            continue
        if ex.start_ts >= span.start_ts:
            continue
        if prev is None or ex.start_ts > prev.start_ts:
            prev = ex
            prev_idx = i

    if prev is not None and prev_idx is not None and prev.extend_forward:
        updated = prev.model_copy(update={"end_ts": span.start_ts})
        existing[prev_idx] = updated

    if not allow_overlap:
        for ex in existing:
            if not _same_user(ex, span):
                continue
            if ex.start_ts < span.end_ts and span.start_ts < ex.end_ts:
                raise ValueError(
                    f"Span [{span.start_ts}, {span.end_ts}) overlaps "
                    f"[{ex.start_ts}, {ex.end_ts}) for user {span.user_id!r}"
                )

    existing.append(span)

    return write_label_spans(existing, path)
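The extend-forward handoff above can be illustrated on its own: find the most recent same-user span starting before the new one and, if it is flagged, stretch its end to the new start. A simplified sketch over dicts rather than the real `LabelSpan` model (one user, so the `_same_user` filter is omitted):

```python
def stretch_extend_forward(existing, new_start):
    """Stretch the latest prior extend_forward span's end up to new_start."""
    prev = None
    for span in existing:
        if span["start"] < new_start and (prev is None or span["start"] > prev["start"]):
            prev = span                      # latest span starting before new_start
    if prev is not None and prev["extend_forward"]:
        prev["end"] = new_start              # contiguous coverage up to the new span
    return existing

spans = [{"start": 0, "end": 5, "extend_forward": True}]
print(stretch_extend_forward(spans, 30))
# → [{'start': 0, 'end': 30, 'extend_forward': True}]
```

Note the stretch runs before the overlap check, so an open-ended `extend_forward` label never collides with the span that closes it; it is also why the docstring says "stretched (or truncated)": `end_ts` moves to `span.start_ts` in either direction.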

overwrite_label_span(span, path)

Append a label span, resolving overlaps by truncating/splitting/removing existing same-user spans that conflict with the new one.

Extend-forward handling is identical to `append_label_span`.

For each same-user span that overlaps `span`:

- **Fully contained** (existing inside new): removed.
- **Partial overlap at start** (existing starts before new, ends inside new): existing `end_ts` truncated to `span.start_ts`.
- **Partial overlap at end** (existing starts inside new, ends after new): existing `start_ts` moved to `span.end_ts`.
- **Fully contains** (existing wraps around new): split into a *before* and an *after* fragment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `LabelSpan` | The label span to insert. | required |
| `path` | `Path` | Parquet file to read-modify-write. | required |

Returns:

| Type | Description |
| --- | --- |
| `Path` | The path that was written. |

Source code in src/taskclf/labels/store.py
def overwrite_label_span(span: LabelSpan, path: Path) -> Path:
    """Append a label span, resolving overlaps by truncating/splitting/removing
    existing same-user spans that conflict with the new one.

    Extend-forward handling is identical to :func:`append_label_span`.

    For each same-user span that overlaps *span*:

    * **Fully contained** (existing inside new) -- removed.
    * **Partial overlap at start** (existing starts before new, ends inside
      new) -- existing ``end_ts`` truncated to ``span.start_ts``.
    * **Partial overlap at end** (existing starts inside new, ends after
      new) -- existing ``start_ts`` moved to ``span.end_ts``.
    * **Fully contains** (existing wraps around new) -- split into a
      *before* and *after* fragment.

    Args:
        span: The label span to insert.
        path: Parquet file to read-modify-write.

    Returns:
        The *path* that was written.
    """
    existing: list[LabelSpan] = []
    if path.exists():
        existing = read_label_spans(path)

    span = _snap_same_user_adjacent_boundaries(existing, span)

    # Extend-forward: stretch the most recent same-user span up to span.start_ts
    prev: LabelSpan | None = None
    prev_idx: int | None = None
    for i, ex in enumerate(existing):
        if not _same_user(ex, span):
            continue
        if ex.start_ts >= span.start_ts:
            continue
        if prev is None or ex.start_ts > prev.start_ts:
            prev = ex
            prev_idx = i

    if prev is not None and prev_idx is not None and prev.extend_forward:
        updated = prev.model_copy(update={"end_ts": span.start_ts})
        existing[prev_idx] = updated

    resolved: list[LabelSpan] = []
    for i, ex in enumerate(existing):
        if not _same_user(ex, span):
            resolved.append(ex)
            continue
        effective_end = _effective_extend_forward_end(existing, i)
        if not (
            ex.start_ts < span.end_ts
            and (effective_end is None or span.start_ts < effective_end)
        ):
            resolved.append(ex)
            continue

        if ex.start_ts < span.start_ts:
            resolved.append(ex.model_copy(update={"end_ts": span.start_ts}))

        if effective_end is None:
            resolved.append(
                ex.model_copy(update={"start_ts": span.end_ts, "end_ts": span.end_ts})
            )
            continue

        if span.end_ts < effective_end:
            resolved.append(
                ex.model_copy(update={"start_ts": span.end_ts, "end_ts": effective_end})
            )

    resolved.append(span)
    resolved.sort(key=lambda s: (s.start_ts, s.end_ts, s.label, s.user_id or ""))
    return write_label_spans(resolved, path)
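The four resolution cases reduce to keeping at most a *before* fragment and an *after* fragment of each conflicting span. A simplified sketch over numeric half-open intervals (the real code additionally substitutes the `extend_forward` effective end for `ex_end`):

```python
def resolve_overlap(ex_start, ex_end, new_start, new_end):
    """Surviving fragments of [ex_start, ex_end) after inserting [new_start, new_end)."""
    if not (ex_start < new_end and new_start < ex_end):
        return [(ex_start, ex_end)]              # no overlap: untouched
    fragments = []
    if ex_start < new_start:
        fragments.append((ex_start, new_start))  # before-fragment (truncate)
    if new_end < ex_end:
        fragments.append((new_end, ex_end))      # after-fragment (shifted start)
    return fragments                             # []  =>  fully contained, removed

print(resolve_overlap(0, 10, 3, 7))   # → [(0, 3), (7, 10)]  fully contains: split
print(resolve_overlap(4, 6, 3, 7))    # → []                 fully contained: removed
print(resolve_overlap(0, 5, 3, 7))    # → [(0, 3)]           partial overlap at start
print(resolve_overlap(5, 9, 3, 7))    # → [(7, 9)]           partial overlap at end
```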

update_label_span(start_ts, end_ts, new_label, path, *, new_start_ts=None, new_end_ts=None, new_extend_forward=None)

Change the label and/or timestamps on an existing span.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start_ts` | `datetime` | Original start timestamp of the span (lookup key). | required |
| `end_ts` | `datetime` | Original end timestamp of the span (lookup key). | required |
| `new_label` | `str` | Replacement label (must be in `LABEL_SET_V1`). | required |
| `path` | `Path` | Parquet file containing label spans. | required |
| `new_start_ts` | `datetime \| None` | If provided, replaces the span's start timestamp. | `None` |
| `new_end_ts` | `datetime \| None` | If provided, replaces the span's end timestamp. | `None` |
| `new_extend_forward` | `bool \| None` | If provided, replaces the span's `extend_forward` flag. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LabelSpan` | The updated `LabelSpan`. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no matching span is found or the new label is invalid. |

Source code in src/taskclf/labels/store.py
def update_label_span(
    start_ts: dt.datetime,
    end_ts: dt.datetime,
    new_label: str,
    path: Path,
    *,
    new_start_ts: dt.datetime | None = None,
    new_end_ts: dt.datetime | None = None,
    new_extend_forward: bool | None = None,
) -> LabelSpan:
    """Change the label and/or timestamps on an existing span.

    Args:
        start_ts: Original start timestamp of the span (lookup key).
        end_ts: Original end timestamp of the span (lookup key).
        new_label: Replacement label (must be in ``LABEL_SET_V1``).
        path: Parquet file containing label spans.
        new_start_ts: If provided, replaces the span's start timestamp.
        new_end_ts: If provided, replaces the span's end timestamp.
        new_extend_forward: If provided, replaces the span's
            ``extend_forward`` flag.

    Returns:
        The updated ``LabelSpan``.

    Raises:
        ValueError: If no matching span is found or the new label is
            invalid.
    """
    if not path.exists():
        raise ValueError("No labels file found")

    start_ts = ts_utc_aware_get(start_ts)
    end_ts = ts_utc_aware_get(end_ts)

    spans = read_label_spans(path)
    for i, s in enumerate(spans):
        if s.start_ts == start_ts and s.end_ts == end_ts:
            updated = LabelSpan(
                start_ts=new_start_ts if new_start_ts is not None else s.start_ts,
                end_ts=new_end_ts if new_end_ts is not None else s.end_ts,
                label=new_label,
                provenance=s.provenance,
                user_id=s.user_id,
                confidence=s.confidence,
                extend_forward=(
                    new_extend_forward
                    if new_extend_forward is not None
                    else s.extend_forward
                ),
            )
            spans[i] = updated
            write_label_spans(spans, path)
            return updated
    raise ValueError(f"No label found for [{start_ts}, {end_ts})")

delete_label_span(start_ts, end_ts, path)

Remove a label span identified by its timestamps.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start_ts` | `datetime` | Start timestamp of the span to remove. | required |
| `end_ts` | `datetime` | End timestamp of the span to remove. | required |
| `path` | `Path` | Parquet file containing label spans. | required |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no matching span is found. |

Source code in src/taskclf/labels/store.py
def delete_label_span(
    start_ts: dt.datetime,
    end_ts: dt.datetime,
    path: Path,
) -> None:
    """Remove a label span identified by its timestamps.

    Args:
        start_ts: Start timestamp of the span to remove.
        end_ts: End timestamp of the span to remove.
        path: Parquet file containing label spans.

    Raises:
        ValueError: If no matching span is found.
    """
    if not path.exists():
        raise ValueError("No labels file found")

    start_ts = ts_utc_aware_get(start_ts)
    end_ts = ts_utc_aware_get(end_ts)

    spans = read_label_spans(path)
    original_len = len(spans)
    spans = [s for s in spans if not (s.start_ts == start_ts and s.end_ts == end_ts)]
    if len(spans) == original_len:
        raise ValueError(f"No label found for [{start_ts}, {end_ts})")
    write_label_spans(spans, path)

generate_label_summary(features_df, start_ts, end_ts)

Summarise feature rows within a time range for display in CLI / UI.

Returns a dict with top apps, aggregated interaction stats, and session count. Respects privacy: no raw titles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features_df` | `DataFrame` | Feature DataFrame with `bucket_start_ts` column. | required |
| `start_ts` | `datetime` | Start of summary window (inclusive). | required |
| `end_ts` | `datetime` | End of summary window (exclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| `dict` | Dict with keys `top_apps`, `mean_keys_per_min`, `mean_clicks_per_min`, `mean_scroll_per_min`, `total_buckets`, `session_count`. |

Source code in src/taskclf/labels/store.py
def generate_label_summary(
    features_df: pd.DataFrame,
    start_ts: dt.datetime,
    end_ts: dt.datetime,
) -> dict:
    """Summarise feature rows within a time range for display in CLI / UI.

    Returns a dict with top apps, aggregated interaction stats, and
    session count.  Respects privacy: no raw titles.

    Args:
        features_df: Feature DataFrame with ``bucket_start_ts`` column.
        start_ts: Start of summary window (inclusive).
        end_ts: End of summary window (exclusive).

    Returns:
        Dict with keys ``top_apps``, ``mean_keys_per_min``,
        ``mean_clicks_per_min``, ``mean_scroll_per_min``,
        ``total_buckets``, ``session_count``.
    """
    import pandas as pd

    if features_df.empty or "bucket_start_ts" not in features_df.columns:
        return {
            "top_apps": [],
            "mean_keys_per_min": None,
            "mean_clicks_per_min": None,
            "mean_scroll_per_min": None,
            "total_buckets": 0,
            "session_count": 0,
        }

    col = features_df["bucket_start_ts"]
    col_is_utc = hasattr(col.dtype, "tz") and col.dtype.tz is not None
    if col_is_utc:
        _start = (
            pd.Timestamp(start_ts, tz="UTC")
            if start_ts.tzinfo is None
            else pd.Timestamp(start_ts).tz_convert("UTC")
        )
        _end = (
            pd.Timestamp(end_ts, tz="UTC")
            if end_ts.tzinfo is None
            else pd.Timestamp(end_ts).tz_convert("UTC")
        )
    else:
        _start = pd.Timestamp(start_ts)
        _end = pd.Timestamp(end_ts)
    mask = (col >= _start) & (col < _end)
    window = features_df.loc[mask]

    if window.empty:
        return {
            "top_apps": [],
            "mean_keys_per_min": None,
            "mean_clicks_per_min": None,
            "mean_scroll_per_min": None,
            "total_buckets": 0,
            "session_count": 0,
        }

    top_apps: list[dict] = []
    if "app_id" in window.columns:
        counts = window["app_id"].value_counts().head(5)
        top_apps = [{"app_id": app, "buckets": int(cnt)} for app, cnt in counts.items()]

    def _safe_mean(col: str) -> float | None:
        if col in window.columns:
            vals = window[col].dropna()
            if not vals.empty:
                return round(float(vals.mean()), 2)
        return None

    session_count = 0
    if "session_id" in window.columns:
        session_count = int(window["session_id"].nunique())

    return {
        "top_apps": top_apps,
        "mean_keys_per_min": _safe_mean("keys_per_min"),
        "mean_clicks_per_min": _safe_mean("clicks_per_min"),
        "mean_scroll_per_min": _safe_mean("scroll_events_per_min"),
        "total_buckets": len(window),
        "session_count": session_count,
    }

generate_dummy_labels(date, n_rows=DEFAULT_DUMMY_ROWS)

Create synthetic label spans aligned to the dummy feature timestamps.

Each span covers exactly one minute bucket, mirroring the timestamps generated by features.build.generate_dummy_features so that every feature row has a covering label.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `date` | `date` | Calendar date to generate spans for. | required |
| `n_rows` | `int` | Number of one-minute spans to create. | `DEFAULT_DUMMY_ROWS` |

Returns:

| Type | Description |
| --- | --- |
| `list[LabelSpan]` | List of `LabelSpan` instances with provenance `"synthetic"`. |

Source code in src/taskclf/labels/store.py
def generate_dummy_labels(
    date: dt.date, n_rows: int = DEFAULT_DUMMY_ROWS
) -> list[LabelSpan]:
    """Create synthetic label spans aligned to the dummy feature timestamps.

    Each span covers exactly one minute bucket, mirroring the timestamps
    generated by ``features.build.generate_dummy_features`` so that every
    feature row has a covering label.

    Args:
        date: Calendar date to generate spans for.
        n_rows: Number of one-minute spans to create.

    Returns:
        List of ``LabelSpan`` instances with provenance ``"synthetic"``.
    """
    spans: list[LabelSpan] = []
    for i in range(n_rows):
        hour = 9 + (i * 8 // max(n_rows, 1))
        minute = (i * 7) % 60
        start = dt.datetime(date.year, date.month, date.day, hour, minute)
        end = start + dt.timedelta(seconds=DEFAULT_BUCKET_SECONDS)

        app_id = _DUMMY_APPS_ORDER[i % len(_DUMMY_APPS_ORDER)]
        label = _APP_LABEL_MAP[app_id]

        spans.append(
            LabelSpan(
                start_ts=start,
                end_ts=end,
                label=label,
                provenance="synthetic",
            )
        )
    return spans
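The synthetic timestamps spread `n_rows` spans across a 9:00–17:00 workday: `hour = 9 + i * 8 // max(n_rows, 1)` walks the hour forward proportionally, while `minute = (i * 7) % 60` scatters the minutes. A quick standalone check of that arithmetic:

```python
def dummy_slot(i: int, n_rows: int) -> tuple[int, int]:
    """(hour, minute) of the i-th synthetic one-minute span."""
    hour = 9 + (i * 8 // max(n_rows, 1))   # 9:00 start, 8-hour workday
    minute = (i * 7) % 60                  # stride of 7 avoids minute collisions
    return hour, minute

n = 12
slots = [dummy_slot(i, n) for i in range(n)]
print(slots[0], slots[-1])   # first and last of 12 spans
# → (9, 0) (16, 17)
```

Since `i < n_rows`, `i * 8 // n_rows` is at most 7, so every span starts before 17:00 and pairs one-to-one with the buckets emitted by `features.build.generate_dummy_features`.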