
train.build_dataset

Training dataset builder: join features with labels, apply exclusion rules, split by time, and write X.parquet, y.parquet, and splits.json.

Usage

from pathlib import Path
from taskclf.train.build_dataset import build_training_dataset

manifest = build_training_dataset(
    features_df,
    label_spans,
    output_dir=Path("data/processed/training_dataset"),
    train_ratio=0.70,
    val_ratio=0.15,
    holdout_user_fraction=0.1,
)
print(manifest.total_rows, manifest.train_rows)

Label projection uses project_blocks_to_windows() with the strict containment rules from time_spec.md Section 6: the full window must fall inside a single block, and windows that overlap multiple blocks with conflicting labels are dropped.

Output artifacts

File           Contents
X.parquet      Feature columns + ID columns (user_id, bucket_start_ts, session_id) + schema_version
y.parquet      user_id, bucket_start_ts, label, provenance
splits.json    Train/val/test index lists, holdout users, and metadata (schema versions, class distribution, user count)
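A minimal sketch (not part of the library) of how the three artifacts fit together when read back; it assumes the index lists in splits.json are positional row indices into X.parquet and y.parquet:

```python
import json
from pathlib import Path

import pandas as pd


def take_split(
    X: pd.DataFrame, y: pd.DataFrame, splits: dict, name: str
) -> tuple[pd.DataFrame, pd.Series]:
    """Select one split's feature rows and labels by position."""
    idx = splits[name]
    return X.iloc[idx], y.iloc[idx]["label"]


def load_split(dataset_dir: Path, name: str) -> tuple[pd.DataFrame, pd.Series]:
    """Load the three artifacts and return (X, y) for one split."""
    X = pd.read_parquet(dataset_dir / "X.parquet")
    y = pd.read_parquet(dataset_dir / "y.parquet")
    splits = json.loads((dataset_dir / "splits.json").read_text())
    return take_split(X, y, splits, name)
```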

Exclusion rules

Windows are dropped from the dataset if:

  • They overlap multiple label blocks with conflicting labels or have no covering label.
  • All numeric features are null (no useful signal).
  • They belong to sessions shorter than MIN_BLOCK_DURATION_SECONDS (180s = 3 buckets).
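The session-length rule can be sketched as follows (a hedged illustration: the constant and the 3-bucket threshold come from the rule above, but the grouping logic and default bucket width are assumptions, not the library's _exclude_short_sessions implementation):

```python
import pandas as pd

MIN_BLOCK_DURATION_SECONDS = 180  # 3 buckets at the assumed 60s bucket width


def exclude_short_sessions(
    df: pd.DataFrame, bucket_seconds: int = 60
) -> pd.DataFrame:
    """Drop rows belonging to sessions with too few windows."""
    min_buckets = MIN_BLOCK_DURATION_SECONDS // bucket_seconds
    # Count windows per session and keep only sessions meeting the minimum.
    counts = df.groupby("session_id")["session_id"].transform("size")
    return df[counts >= min_buckets].reset_index(drop=True)
```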

taskclf.train.build_dataset

Training dataset builder: join, exclude, split, and write X/y/splits artifacts.

DatasetManifest

Bases: BaseModel

Summary returned by build_training_dataset().

Source code in src/taskclf/train/build_dataset.py
class DatasetManifest(BaseModel, frozen=True):
    """Summary returned by :func:`build_training_dataset`."""

    x_path: str
    y_path: str
    splits_path: str
    total_rows: int
    train_rows: int
    val_rows: int
    test_rows: int
    excluded_rows: int
    holdout_users: list[str]
    class_distribution: dict[str, int]

build_training_dataset(features_df, label_spans, *, output_dir, train_ratio=0.7, val_ratio=0.15, holdout_user_fraction=0.0, bucket_seconds=DEFAULT_BUCKET_SECONDS)

Join features with labels, apply exclusions, split, and write artifacts.

Label projection uses strict block-to-window containment rules from time_spec.md Section 6 (full window must fall inside a single block; conflicting multi-block overlaps are dropped).

Outputs

  • output_dir/X.parquet -- feature matrix with ID columns and schema_version.
  • output_dir/y.parquet -- labels keyed by user_id and bucket_start_ts.
  • output_dir/splits.json -- train/val/test index lists and metadata.

Parameters:

  • features_df (DataFrame, required) -- Feature DataFrame conforming to FeatureSchemaV1.
  • label_spans (Sequence[LabelSpan], required) -- Label spans to project onto feature windows.
  • output_dir (Path, required) -- Directory to write artifacts into (created if needed).
  • train_ratio (float, default 0.7) -- Fraction of each user's data for training.
  • val_ratio (float, default 0.15) -- Fraction for validation.
  • holdout_user_fraction (float, default 0.0) -- Fraction of users held out entirely for the test set (cold-start evaluation).
  • bucket_seconds (int, default DEFAULT_BUCKET_SECONDS) -- Window width in seconds.

Returns:

  • DatasetManifest -- a manifest with artifact paths and summary statistics.
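The split semantics implied by train_ratio, val_ratio, and holdout_user_fraction can be sketched like this (an illustrative assumption, not the library's split_by_time; in particular, which users are held out is chosen deterministically here only to keep the sketch simple):

```python
import pandas as pd


def split_by_time_sketch(
    df: pd.DataFrame,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    holdout_user_fraction: float = 0.0,
) -> dict:
    """Per-user chronological split, with optional cold-start user holdout."""
    users = sorted(df["user_id"].unique())
    n_holdout = int(len(users) * holdout_user_fraction)
    holdout = set(users[:n_holdout])  # deterministic choice for the sketch

    splits = {"train": [], "val": [], "test": [], "holdout_users": sorted(holdout)}
    for user, group in df.groupby("user_id"):
        # Order each user's rows by time so earlier windows train the model
        # and later windows validate/test it.
        idx = group.sort_values("bucket_start_ts").index.tolist()
        if user in holdout:
            splits["test"].extend(idx)  # held-out users go entirely to test
            continue
        n = len(idx)
        n_train = int(n * train_ratio)
        n_val = int(n * val_ratio)
        splits["train"].extend(idx[:n_train])
        splits["val"].extend(idx[n_train:n_train + n_val])
        splits["test"].extend(idx[n_train + n_val:])
    return splits
```

Splitting chronologically within each user avoids leaking future windows into training, while the user-level holdout measures performance on users the model has never seen.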

Source code in src/taskclf/train/build_dataset.py
def build_training_dataset(
    features_df: pd.DataFrame,
    label_spans: Sequence[LabelSpan],
    *,
    output_dir: Path,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    holdout_user_fraction: float = 0.0,
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS,
) -> DatasetManifest:
    """Join features with labels, apply exclusions, split, and write artifacts.

    Label projection uses strict block-to-window containment rules from
    ``time_spec.md`` Section 6 (full window must fall inside a single
    block; conflicting multi-block overlaps are dropped).

    Outputs:
        ``output_dir/X.parquet`` -- feature matrix with ID columns and
        ``schema_version``.
        ``output_dir/y.parquet`` -- labels keyed by ``user_id`` and
        ``bucket_start_ts``.
        ``output_dir/splits.json`` -- train/val/test index lists and
        metadata.

    Args:
        features_df: Feature DataFrame conforming to ``FeatureSchemaV1``.
        label_spans: Label spans to project onto feature windows.
        output_dir: Directory to write artifacts into (created if needed).
        train_ratio: Fraction of each user's data for training.
        val_ratio: Fraction for validation.
        holdout_user_fraction: Fraction of users held out entirely for
            the test set (cold-start evaluation).
        bucket_seconds: Window width in seconds.

    Returns:
        A :class:`DatasetManifest` with paths and summary statistics.
    """
    features_df = features_df.copy()
    if "user_id" not in features_df.columns:
        features_df["user_id"] = "default-user"

    schema_version = _resolve_schema_version(features_df)
    feature_columns = get_feature_columns(schema_version)

    labeled = project_blocks_to_windows(
        features_df, label_spans, bucket_seconds=bucket_seconds
    )
    pre_exclusion = len(labeled)

    labeled = _exclude_short_sessions(labeled, bucket_seconds=bucket_seconds)
    labeled = _exclude_missing_critical(labeled)
    labeled = labeled.sort_values("bucket_start_ts").reset_index(drop=True)

    excluded = pre_exclusion - len(labeled)

    splits = split_by_time(
        labeled,
        train_ratio=train_ratio,
        val_ratio=val_ratio,
        holdout_user_fraction=holdout_user_fraction,
    )

    id_and_meta = [c for c in _ID_COLUMNS if c in labeled.columns] + ["schema_version"]
    seen = set(id_and_meta)
    x_cols = id_and_meta + [
        c for c in feature_columns if c in labeled.columns and c not in seen
    ]
    x_df = labeled[x_cols]

    provenance_col = "provenance"
    y_cols = [c for c in ("user_id", "bucket_start_ts") if c in labeled.columns]
    y_cols.append("label")
    if provenance_col in labeled.columns:
        y_cols.append(provenance_col)
    y_df = labeled[y_cols]

    output_dir = Path(output_dir)
    x_path = output_dir / "X.parquet"
    y_path = output_dir / "y.parquet"
    splits_path = output_dir / "splits.json"

    write_parquet(x_df, x_path)
    write_parquet(y_df, y_path)

    class_dist = labeled["label"].value_counts().to_dict()

    splits_payload: dict[str, Any] = {
        "train": splits["train"],
        "val": splits["val"],
        "test": splits["test"],
        "holdout_users": splits["holdout_users"],
        "metadata": {
            "feature_schema_version": schema_version,
            "label_schema_version": "labels_v1",
            "total_rows": len(labeled),
            "excluded_rows": excluded,
            "user_count": labeled["user_id"].nunique(),
            "class_distribution": {str(k): int(v) for k, v in class_dist.items()},
            "train_ratio": train_ratio,
            "val_ratio": val_ratio,
            "holdout_user_fraction": holdout_user_fraction,
        },
    }
    splits_path.parent.mkdir(parents=True, exist_ok=True)
    splits_path.write_text(json.dumps(splits_payload, indent=2, default=str))

    return DatasetManifest(
        x_path=str(x_path),
        y_path=str(y_path),
        splits_path=str(splits_path),
        total_rows=len(labeled),
        train_rows=len(splits["train"]),
        val_rows=len(splits["val"]),
        test_rows=len(splits["test"]),
        excluded_rows=excluded,
        holdout_users=splits["holdout_users"],
        class_distribution={str(k): int(v) for k, v in class_dist.items()},
    )