Skip to content

labels.queue

Active labeling queue for prioritising windows that need human labels.

Overview

The queue tracks buckets flagged for labeling due to low model confidence or detected drift. A daily ask limit prevents user fatigue.

LabelRequest

Pydantic model representing a single labeling request:

| Field | Type | Description |
|---|---|---|
| `request_id` | `str` | UUID |
| `user_id` | `str` | User whose bucket needs labeling |
| `bucket_start_ts` | `datetime` | Bucket start (UTC) |
| `bucket_end_ts` | `datetime` | Bucket end (UTC, exclusive) |
| `reason` | `"low_confidence" \| "drift"` | Why the bucket was enqueued |
| `confidence` | `float \| None` | Model confidence at enqueue time |
| `predicted_label` | `str \| None` | Model prediction at enqueue time |
| `created_at` | `datetime` | Creation timestamp (UTC) |
| `status` | `"pending" \| "labeled" \| "skipped"` | Lifecycle state |

ActiveLabelingQueue

ActiveLabelingQueue is implemented as a dataclass declared with `eq=False`, so instances compare by identity. Its constructor parameters are `queue_path` and `max_asks_per_day`.

from pathlib import Path

from taskclf.labels.queue import ActiveLabelingQueue

queue = ActiveLabelingQueue(Path("data/processed/labels_v1/queue.json"))
# predictions_df needs the columns user_id, bucket_start_ts, bucket_end_ts,
# confidence and predicted_label.
queue.enqueue_low_confidence(predictions_df, threshold=0.55)
pending = queue.get_pending(user_id="u1", limit=10)
# get_pending may return an empty list (nothing queued, or daily cap reached).
if pending:
    queue.mark_done(pending[0].request_id, status="labeled")

taskclf.labels.queue

Active labeling queue: prioritise windows/blocks that need human labels.

The queue tracks buckets flagged for labeling (low model confidence or detected drift) and enforces a daily ask limit so users are not overwhelmed.

LabelRequest

Bases: BaseModel

A single item in the active labeling queue.

Source code in src/taskclf/labels/queue.py
class LabelRequest(BaseModel, frozen=True):
    """A single item in the active labeling queue.

    Instances are immutable (``frozen=True``). Every timestamp field is
    normalised to timezone-aware UTC by ``_ensure_aware_utc``, whether
    it was supplied as a ``datetime`` or as a string that pydantic parses
    (e.g. when the queue is reloaded from JSON).
    """

    request_id: str = Field(description="Unique request identifier (UUID).")
    user_id: str = Field(description="User whose bucket needs labeling.")
    bucket_start_ts: datetime = Field(description="Start of the bucket (aware UTC).")
    bucket_end_ts: datetime = Field(
        description="End of the bucket (aware UTC, exclusive)."
    )
    reason: Literal["low_confidence", "drift"] = Field(
        description="Why this bucket was enqueued."
    )
    confidence: float | None = Field(
        default=None, description="Model confidence at time of enqueue."
    )
    predicted_label: str | None = Field(
        default=None, description="Model prediction at time of enqueue."
    )
    created_at: datetime = Field(
        description="When the request was created (aware UTC)."
    )
    status: Literal["pending", "labeled", "skipped"] = Field(
        default="pending", description="Current lifecycle state."
    )

    @field_validator("bucket_start_ts", "bucket_end_ts", "created_at", mode="after")
    @classmethod
    def _ensure_aware_utc(cls, v: datetime) -> datetime:
        """Normalize timestamp fields to aware UTC.

        Runs *after* pydantic's own datetime parsing, so the value is
        always a ``datetime`` here. This means ISO strings (naive, or
        with a non-UTC offset) are normalised too, not only values that
        were already ``datetime`` instances.
        """
        from taskclf.core.time import ts_utc_aware_get

        return ts_utc_aware_get(v)

ActiveLabelingQueue dataclass

Manages a persisted queue of labeling requests.

State lives in a single JSON file; mutations are atomic (write-to-temp then rename).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `queue_path` | `Path` | Path to the JSON file backing the queue. | *required* |
| `max_asks_per_day` | `int` | Upper bound on pending items served per calendar day (UTC). | `DEFAULT_LABEL_MAX_ASKS_PER_DAY` |
Source code in src/taskclf/labels/queue.py
@dataclass(eq=False)
class ActiveLabelingQueue:
    """Manages a persisted queue of labeling requests.

    State lives in a single JSON file; mutations are atomic
    (write-to-temp, fsync, then rename).

    Args:
        queue_path: Path to the JSON file backing the queue.
        max_asks_per_day: Upper bound on pending items served per
            calendar day (UTC).
    """

    queue_path: Path
    max_asks_per_day: int = DEFAULT_LABEL_MAX_ASKS_PER_DAY
    # In-memory copy of every request (any status); mirrored to queue_path
    # by _save() after each successful mutation.
    _items: list[LabelRequest] = field(default_factory=list, init=False)

    def __post_init__(self) -> None:
        # A missing file means an empty queue; it is created on first save.
        if self.queue_path.exists():
            self._load()

    def _load(self) -> None:
        """Replace the in-memory items with the contents of ``queue_path``."""
        raw = json.loads(self.queue_path.read_text())
        self._items = [LabelRequest.model_validate(r) for r in raw]

    def _save(self) -> None:
        """Persist all items to ``queue_path`` atomically.

        The JSON payload is written to a temp file in the same directory,
        flushed and fsynced, then moved over ``queue_path`` with
        ``os.replace`` so readers never observe a partially written file.
        On any failure the temp file is removed and the error re-raised.
        """
        self.queue_path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(
            [r.model_dump(mode="json") for r in self._items],
            indent=2,
            default=str,
        )
        fd, tmp = tempfile.mkstemp(dir=str(self.queue_path.parent), suffix=".tmp")
        try:
            # fdopen takes ownership of fd, so it is closed exactly once
            # (including on error). Unlike a single os.write, the buffered
            # write always writes the whole payload.
            with os.fdopen(fd, "wb") as fh:
                fh.write(payload.encode())
                fh.flush()
                # Make the data durable before the rename publishes it.
                os.fsync(fh.fileno())
            os.replace(tmp, str(self.queue_path))
        except BaseException:
            if os.path.exists(tmp):
                os.unlink(tmp)
            raise

    def _bucket_key(self, user_id: str, bucket_start_ts: datetime) -> tuple[str, str]:
        """Dedup key: ``(user_id, UTC-normalised ISO bucket start)``."""
        from taskclf.core.time import ts_utc_aware_get

        return (user_id, ts_utc_aware_get(bucket_start_ts).isoformat())

    def _existing_keys(self) -> set[tuple[str, str]]:
        """Dedup keys of *pending* requests only.

        Buckets whose request was already labeled or skipped are not
        included, so they can be enqueued again.
        """
        return {
            self._bucket_key(r.user_id, r.bucket_start_ts)
            for r in self._items
            if r.status == "pending"
        }

    def enqueue_low_confidence(
        self,
        predictions_df: pd.DataFrame,
        threshold: float = DEFAULT_LABEL_CONFIDENCE_THRESHOLD,
    ) -> int:
        """Add buckets whose model confidence is below *threshold*.

        *predictions_df* must have columns: ``user_id``,
        ``bucket_start_ts``, ``bucket_end_ts``, ``confidence``,
        ``predicted_label``. Buckets that already have a pending request
        are skipped.

        Returns the number of newly enqueued items. The queue file is
        rewritten only if that number is non-zero.
        """
        import pandas as pd

        existing = self._existing_keys()
        added = 0
        now = datetime.now(tz=timezone.utc)

        for _, row in predictions_df.iterrows():
            # Skip-if-confident: rows where the comparison is False
            # (including NaN confidence) are enqueued.
            if row["confidence"] >= threshold:
                continue
            start_ts = pd.Timestamp(row["bucket_start_ts"]).to_pydatetime()
            key = self._bucket_key(row["user_id"], start_ts)
            if key in existing:
                continue
            req = LabelRequest(
                request_id=str(uuid.uuid4()),
                user_id=row["user_id"],
                bucket_start_ts=start_ts,
                bucket_end_ts=pd.Timestamp(row["bucket_end_ts"]).to_pydatetime(),
                reason="low_confidence",
                confidence=float(row["confidence"]),
                predicted_label=str(row["predicted_label"]),
                created_at=now,
                status="pending",
            )
            self._items.append(req)
            existing.add(key)
            added += 1

        if added:
            self._save()
        return added

    def enqueue_drift(
        self,
        buckets: Sequence[dict],
    ) -> int:
        """Add drift-flagged buckets.

        Each dict in *buckets* must contain ``user_id``,
        ``bucket_start_ts``, ``bucket_end_ts``, and optionally
        ``predicted_label`` and ``confidence``. Buckets that already have
        a pending request are skipped.

        Returns the number of newly enqueued items. The queue file is
        rewritten only if that number is non-zero.
        """
        import pandas as pd

        existing = self._existing_keys()
        added = 0
        now = datetime.now(tz=timezone.utc)

        for b in buckets:
            ts = pd.Timestamp(b["bucket_start_ts"]).to_pydatetime()
            key = self._bucket_key(b["user_id"], ts)
            if key in existing:
                continue
            req = LabelRequest(
                request_id=str(uuid.uuid4()),
                user_id=b["user_id"],
                bucket_start_ts=ts,
                bucket_end_ts=pd.Timestamp(b["bucket_end_ts"]).to_pydatetime(),
                reason="drift",
                confidence=b.get("confidence"),
                predicted_label=b.get("predicted_label"),
                created_at=now,
                status="pending",
            )
            self._items.append(req)
            existing.add(key)
            added += 1

        if added:
            self._save()
        return added

    def get_pending(
        self,
        user_id: str | None = None,
        limit: int | None = None,
    ) -> list[LabelRequest]:
        """Return pending items, respecting the daily ask cap.

        Items are sorted by confidence ascending (lowest first) so the
        most uncertain buckets surface first. Items without a confidence
        are sorted as if it were ``0.0``, so they come first.

        Args:
            user_id: Filter to a specific user (``None`` = all users).
            limit: Maximum items to return (capped by daily limit).
                Negative values are treated as ``0``.

        Returns:
            List of pending ``LabelRequest`` instances.
        """
        pending = [r for r in self._items if r.status == "pending"]
        if user_id is not None:
            pending = [r for r in pending if r.user_id == user_id]

        pending.sort(key=lambda r: r.confidence if r.confidence is not None else 0.0)

        # NOTE(review): "served today" counts answered (labeled/skipped)
        # items whose *created_at* falls on today's UTC date. There is no
        # answered-at timestamp, so an item enqueued earlier but answered
        # today does not count toward the cap. Confirm this is intended.
        today = datetime.now(tz=timezone.utc).date()
        served_today = sum(
            1
            for r in self._items
            if r.status in ("labeled", "skipped") and r.created_at.date() == today
        )
        daily_remaining = max(0, self.max_asks_per_day - served_today)

        cap = daily_remaining
        if limit is not None:
            # Clamp so a negative limit yields nothing instead of a
            # negative slice that would return items from the front.
            cap = min(cap, max(0, limit))

        return pending[:cap]

    def mark_done(
        self,
        request_id: str,
        status: Literal["labeled", "skipped"] = "labeled",
    ) -> LabelRequest | None:
        """Transition a request to *status* and persist the queue.

        Returns the updated request, or ``None`` if *request_id* was
        not found.

        Raises:
            ValueError: If *status* is not ``"labeled"`` or ``"skipped"``.
        """
        # model_copy(update=...) does not re-run pydantic validation, so
        # an invalid status would be persisted and later make _load fail.
        if status not in ("labeled", "skipped"):
            raise ValueError(
                f"status must be 'labeled' or 'skipped', got {status!r}"
            )
        for i, r in enumerate(self._items):
            if r.request_id == request_id:
                updated = r.model_copy(update={"status": status})
                self._items[i] = updated
                self._save()
                return updated
        return None

    @property
    def all_items(self) -> list[LabelRequest]:
        """All items currently in the queue (any status), as a shallow copy."""
        return list(self._items)

all_items property

All items currently in the queue (any status).

enqueue_low_confidence(predictions_df, threshold=DEFAULT_LABEL_CONFIDENCE_THRESHOLD)

Add buckets whose model confidence is below threshold.

predictions_df must have columns: user_id, bucket_start_ts, bucket_end_ts, confidence, predicted_label.

Returns the number of newly enqueued items.

Source code in src/taskclf/labels/queue.py
def enqueue_low_confidence(
    self,
    predictions_df: pd.DataFrame,
    threshold: float = DEFAULT_LABEL_CONFIDENCE_THRESHOLD,
) -> int:
    """Queue predictions the model was unsure about.

    A row is queued unless its ``confidence`` is at least *threshold*.
    Rows whose ``(user_id, bucket_start_ts)`` already has a pending
    request are skipped.

    Args:
        predictions_df: Frame with the columns ``user_id``,
            ``bucket_start_ts``, ``bucket_end_ts``, ``confidence`` and
            ``predicted_label``.
        threshold: Confidence cut-off; rows at or above it are ignored.

    Returns:
        The count of requests appended by this call. The backing file is
        only rewritten when that count is non-zero.
    """
    import pandas as pd

    pending_keys = self._existing_keys()
    enqueued_at = datetime.now(tz=timezone.utc)
    size_before = len(self._items)

    for _, row in predictions_df.iterrows():
        # Written as skip-if-confident, so rows where the comparison is
        # False (e.g. a NaN confidence) are still queued.
        if row["confidence"] >= threshold:
            continue
        start_ts = pd.Timestamp(row["bucket_start_ts"]).to_pydatetime()
        dedup_key = self._bucket_key(row["user_id"], start_ts)
        if dedup_key in pending_keys:
            continue
        self._items.append(
            LabelRequest(
                request_id=str(uuid.uuid4()),
                user_id=row["user_id"],
                bucket_start_ts=start_ts,
                bucket_end_ts=pd.Timestamp(row["bucket_end_ts"]).to_pydatetime(),
                reason="low_confidence",
                confidence=float(row["confidence"]),
                predicted_label=str(row["predicted_label"]),
                created_at=enqueued_at,
                status="pending",
            )
        )
        pending_keys.add(dedup_key)

    added = len(self._items) - size_before
    if added:
        self._save()
    return added

enqueue_drift(buckets)

Add drift-flagged buckets.

Each dict in buckets must contain user_id, bucket_start_ts, bucket_end_ts, and optionally predicted_label and confidence.

Returns the number of newly enqueued items.

Source code in src/taskclf/labels/queue.py
def enqueue_drift(
    self,
    buckets: Sequence[dict],
) -> int:
    """Queue buckets that drift detection flagged.

    Every mapping in *buckets* needs ``user_id``, ``bucket_start_ts`` and
    ``bucket_end_ts``. The keys ``predicted_label`` and ``confidence``
    are optional and default to ``None``. A bucket whose
    ``(user_id, bucket_start_ts)`` already has a pending request is
    skipped.

    Returns:
        The count of requests appended by this call. The backing file is
        only rewritten when that count is non-zero.
    """
    import pandas as pd

    pending_keys = self._existing_keys()
    enqueued_at = datetime.now(tz=timezone.utc)
    size_before = len(self._items)

    for bucket in buckets:
        start_ts = pd.Timestamp(bucket["bucket_start_ts"]).to_pydatetime()
        dedup_key = self._bucket_key(bucket["user_id"], start_ts)
        if dedup_key in pending_keys:
            continue
        self._items.append(
            LabelRequest(
                request_id=str(uuid.uuid4()),
                user_id=bucket["user_id"],
                bucket_start_ts=start_ts,
                bucket_end_ts=pd.Timestamp(bucket["bucket_end_ts"]).to_pydatetime(),
                reason="drift",
                confidence=bucket.get("confidence"),
                predicted_label=bucket.get("predicted_label"),
                created_at=enqueued_at,
                status="pending",
            )
        )
        pending_keys.add(dedup_key)

    added = len(self._items) - size_before
    if added:
        self._save()
    return added

get_pending(user_id=None, limit=None)

Return pending items, respecting the daily ask cap.

Items are sorted by confidence ascending (lowest first) so the most uncertain buckets surface first.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `user_id` | `str \| None` | Filter to a specific user (`None` = all users). | `None` |
| `limit` | `int \| None` | Maximum items to return (capped by the daily limit). | `None` |

Returns:

| Type | Description |
|---|---|
| `list[LabelRequest]` | List of pending `LabelRequest` instances. |

Source code in src/taskclf/labels/queue.py
def get_pending(
    self,
    user_id: str | None = None,
    limit: int | None = None,
) -> list[LabelRequest]:
    """Return pending items, respecting the daily ask cap.

    Items are sorted by confidence ascending (lowest first) so the
    most uncertain buckets surface first. Items without a confidence
    are sorted as if it were ``0.0``, so they come first.

    Args:
        user_id: Filter to a specific user (``None`` = all users).
        limit: Maximum items to return (capped by daily limit).
            Negative values are treated as ``0``.

    Returns:
        List of pending ``LabelRequest`` instances.
    """
    pending = [r for r in self._items if r.status == "pending"]
    if user_id is not None:
        pending = [r for r in pending if r.user_id == user_id]

    pending.sort(key=lambda r: r.confidence if r.confidence is not None else 0.0)

    # NOTE(review): "served today" counts answered (labeled/skipped)
    # items whose *created_at* falls on today's UTC date. There is no
    # answered-at timestamp, so an item enqueued earlier but answered
    # today does not count toward the cap. Confirm this is intended.
    today = datetime.now(tz=timezone.utc).date()
    served_today = sum(
        1
        for r in self._items
        if r.status in ("labeled", "skipped") and r.created_at.date() == today
    )
    daily_remaining = max(0, self.max_asks_per_day - served_today)

    cap = daily_remaining
    if limit is not None:
        # Clamp so a negative limit yields nothing instead of a negative
        # slice that would return items from the front of the list.
        cap = min(cap, max(0, limit))

    return pending[:cap]

mark_done(request_id, status='labeled')

Transition a request to status.

Returns the updated request, or None if request_id was not found.

Source code in src/taskclf/labels/queue.py
def mark_done(
    self,
    request_id: str,
    status: Literal["labeled", "skipped"] = "labeled",
) -> LabelRequest | None:
    """Transition a request to *status* and persist the queue.

    Returns the updated request, or ``None`` if *request_id* was
    not found.

    Raises:
        ValueError: If *status* is not ``"labeled"`` or ``"skipped"``.
    """
    # model_copy(update=...) does not re-run pydantic validation, so an
    # invalid status would otherwise be persisted and break reloading.
    if status not in ("labeled", "skipped"):
        raise ValueError(
            f"status must be 'labeled' or 'skipped', got {status!r}"
        )
    for i, r in enumerate(self._items):
        if r.request_id == request_id:
            updated = r.model_copy(update={"status": status})
            self._items[i] = updated
            self._save()
            return updated
    return None