core.schema

Feature schema versioning, deterministic hashing, and DataFrame validation. The schema is the versioned contract between feature producers (features.build) and consumers (train, infer). Per AGENTS.md, inference must refuse to run when the schema hash recorded in a model bundle differs from the hash of the feature pipeline that produced the input data.
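That refusal check is not shown on this page; as a rough sketch of the behaviour (hypothetical function and argument names, not the actual infer API):

```python
# Hypothetical sketch of the refusal guard described above; the real check
# lives in the infer path, and the bundle layout is defined by train.
def check_schema_compatibility(bundle_hash: str, pipeline_hash: str) -> None:
    """Raise instead of silently scoring data produced under a different schema."""
    if bundle_hash != pipeline_hash:
        raise RuntimeError(
            f"schema hash mismatch: bundle={bundle_hash!r}, "
            f"pipeline={pipeline_hash!r}; refusing to run inference"
        )
```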

Feature schemas

TaskCLF currently supports three persisted feature contracts:

  • FeatureSchemaV1: original schema, includes user_id in persisted rows and model features
  • FeatureSchemaV2: removes user_id from the schema/model feature contract
  • FeatureSchemaV3: current default; keeps user_id on persisted rows for joins/evaluation while using v2-style model semantics plus keyed title-sketch features

FeatureSchemaV1

Central class that owns the canonical column registry, the schema hash, and both row-level and DataFrame-level validators. FeatureSchemaV1 is implemented as a frozen slotted dataclass with class-level constants (VERSION, COLUMNS, SCHEMA_HASH).

| Attribute | Type | Description |
| --- | --- | --- |
| VERSION | str | "v1" -- schema generation tag |
| COLUMNS | dict[str, type] | Ordered column-name to Python-type mapping (43 columns) |
| SCHEMA_HASH | str | Deterministic hex digest derived from column names + types |

The hash is computed at import time by JSON-serialising the ordered [[name, type_name], ...] pairs and passing them through stable_hash. Any column addition, removal, rename, or type change produces a different hash automatically.
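stable_hash itself is documented in core.hashing; the construction can be approximated with hashlib standing in for it (the pair serialisation matches the description above; the specific digest function is an assumption):

```python
import hashlib
import json

def build_schema_hash(columns: dict[str, type]) -> str:
    # Serialise the ordered [name, type_name] pairs; dict insertion order
    # preserves the registry's column ordering.
    pairs = [[name, typ.__name__] for name, typ in columns.items()]
    payload = json.dumps(pairs, separators=(",", ":"))
    # hashlib.sha256 stands in here for taskclf's stable_hash.
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Any rename or type change perturbs the digest:
h1 = build_schema_hash({"user_id": str, "hour_of_day": int})
h2 = build_schema_hash({"user_id": str, "hour_of_day": float})
```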

Column registry

Columns are grouped by role. All columns are required; nullable fields (e.g. keys_per_min when no input watcher is present) are typed float but may contain None at the Pydantic model level.

Identity and time

| Column | Type | Description |
| --- | --- | --- |
| user_id | str | Pseudonymous user identifier |
| device_id | str | Optional device identifier |
| session_id | str | Hash-based session ID (see features.sessions) |
| bucket_start_ts | datetime | UTC-aligned bucket start |
| bucket_end_ts | datetime | bucket_start_ts + bucket_seconds |

Schema metadata

| Column | Type | Description |
| --- | --- | --- |
| schema_version | str | Must equal FeatureSchemaV1.VERSION |
| schema_hash | str | Must equal FeatureSchemaV1.SCHEMA_HASH |
| source_ids | list | Collector IDs that contributed (e.g. ["aw-watcher-window"]) |

Application context

| Column | Type | Description |
| --- | --- | --- |
| app_id | str | Bundle ID of the dominant app in the bucket |
| app_category | str | Semantic category (e.g. "editor", "browser") |
| window_title_hash | str | Privacy-safe hash of the window title |
| is_browser | bool | Whether the dominant app is a browser |
| is_editor | bool | Whether the dominant app is a code editor |
| is_terminal | bool | Whether the dominant app is a terminal |
| domain_category | str | Browser domain classification (see features.domain) |
| window_title_bucket | int | Hash-bucketed title ID (see features.text) |
| title_repeat_count_session | int | How many times this title hash appeared in the current session |

App-switching metrics

| Column | Type | Description |
| --- | --- | --- |
| app_switch_count_last_5m | int | Unique-app switches in the 5-minute look-back window |
| app_switch_count_last_15m | int | Same metric over 15 minutes |
| app_foreground_time_ratio | float | Fraction of the bucket the dominant app was foreground |
| app_change_count | int | App changes within the bucket itself |
| top2_app_concentration_15m | float | Combined time share of the two most-used apps over the last 15 minutes |

Input activity

| Column | Type | Description |
| --- | --- | --- |
| keys_per_min | float | Keystrokes per minute (aggregate, no raw keys stored) |
| backspace_ratio | float | Fraction of keystrokes that are backspace |
| shortcut_rate | float | Fraction of keystrokes involving modifier keys |
| clicks_per_min | float | Mouse clicks per minute |
| scroll_events_per_min | float | Scroll events per minute |
| mouse_distance | float | Total mouse travel in pixels |
| active_seconds_keyboard | float | Seconds with keyboard activity in the bucket |
| active_seconds_mouse | float | Seconds with mouse activity |
| active_seconds_any | float | Seconds with any input |
| max_idle_run_seconds | float | Longest consecutive idle stretch |
| event_density | float | Active events per second of activity |

Temporal dynamics (rolling)

| Column | Type | Description |
| --- | --- | --- |
| keys_per_min_rolling_5 | float | 5-bucket rolling mean of keys_per_min |
| keys_per_min_rolling_15 | float | 15-bucket rolling mean |
| mouse_distance_rolling_5 | float | 5-bucket rolling mean of mouse_distance |
| mouse_distance_rolling_15 | float | 15-bucket rolling mean |
| keys_per_min_delta | float | Current minus rolling-5 mean |
| clicks_per_min_delta | float | Current minus rolling-5 mean |
| mouse_distance_delta | float | Current minus rolling-5 mean |

Calendar and session

| Column | Type | Description |
| --- | --- | --- |
| hour_of_day | int | 0--23 hour extracted from bucket_start_ts |
| day_of_week | int | 0 (Monday) -- 6 (Sunday) |
| session_length_so_far | float | Minutes elapsed since session start |
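The calendar fields follow Python's own datetime conventions, which is worth a quick check:

```python
from datetime import datetime, timezone

# 2024-01-15 was a Monday; bucket timestamps are UTC-aligned.
bucket_start_ts = datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc)

hour_of_day = bucket_start_ts.hour       # 9
day_of_week = bucket_start_ts.weekday()  # 0 (Monday)
```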

validate_row

Validates a raw dict as a FeatureRow via Pydantic, then checks that schema_version and schema_hash match the current contract.

from taskclf.core.schema import FeatureSchemaV1

row = FeatureSchemaV1.validate_row(raw_dict)
# raises ValueError on schema_version or schema_hash mismatch

Returns the validated FeatureRow on success.

coerce_nullable_numeric

Converts nullable numeric columns from object dtype (caused by None values from FeatureRow.model_dump()) to float64 (with NaN). Call this before validate_dataframe whenever a DataFrame is built from model-dumped rows that may contain None in numeric fields.

The function modifies the DataFrame in place and also returns it for chaining convenience.

import pandas as pd
from taskclf.core.schema import FeatureSchemaV1, coerce_nullable_numeric

df = pd.DataFrame([row.model_dump() for row in feature_rows])
coerce_nullable_numeric(df)
FeatureSchemaV1.validate_dataframe(df)

validate_dataframe

Checks that a DataFrame has exactly the expected columns (no missing, no extra) and that pandas dtype kinds are compatible with the declared Python types.

The dtype compatibility mapping:

| Python type | Accepted pandas dtype kinds |
| --- | --- |
| int | i (signed), u (unsigned) |
| float | f (float), i, u (promotion safe) |
| bool | b (bool), i, u (numpy coercion) |
| str | O (object), U (unicode) |

Types not in this map (e.g. datetime, list) are skipped during dtype checking.
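_check_dataframe_dtypes itself is internal; a minimal sketch that mirrors the mapping above (the names here are illustrative, not the real helper):

```python
import pandas as pd

# Accepted numpy dtype-kind characters per declared Python type,
# mirroring the compatibility table above.
_KIND_MAP: dict[type, str] = {
    int: "iu",      # signed / unsigned integers
    float: "fiu",   # floats, plus ints (safe promotion)
    bool: "biu",    # bools, plus numpy int coercion
    str: "OU",      # object / unicode
}

def check_dtypes(df: pd.DataFrame, columns: dict[str, type]) -> None:
    for name, py_type in columns.items():
        allowed = _KIND_MAP.get(py_type)
        if allowed is None:  # datetime, list, ... are skipped
            continue
        kind = df[name].dtype.kind
        if kind not in allowed:
            raise ValueError(f"{name}: dtype kind {kind!r} not in {allowed!r}")
```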

import pandas as pd
from taskclf.core.schema import FeatureSchemaV1, coerce_nullable_numeric

df = pd.DataFrame([row.model_dump() for row in feature_rows])
coerce_nullable_numeric(df)
FeatureSchemaV1.validate_dataframe(df)  # raises ValueError on mismatch

See also

  • core.types -- FeatureRow Pydantic model
  • core.hashing -- stable_hash used for schema hash computation
  • features.build -- feature computation pipeline that produces schema-conformant rows

taskclf.core.schema

Feature schema versioning, deterministic hashing, and DataFrame validation.

FeatureSchemaV1 dataclass

Schema contract for feature rows (v1).

Holds the canonical column list, computes a deterministic schema hash, and validates individual rows or DataFrames against the contract.

Source code in src/taskclf/core/schema.py
@dataclass(frozen=True, eq=False)
class FeatureSchemaV1:
    """Schema contract for feature rows (v1).

    Holds the canonical column list, computes a deterministic schema hash,
    and validates individual rows or DataFrames against the contract.
    """

    VERSION: ClassVar[Final[str]] = "v1"
    COLUMNS: ClassVar[Final[dict[str, type]]] = _COLUMNS_V1
    SCHEMA_HASH: ClassVar[Final[str]] = _build_schema_hash(_COLUMNS_V1)

    # -- single-row validation ------------------------------------------

    @classmethod
    def validate_row(cls, data: dict[str, Any]) -> FeatureRowBase:
        """Validate *data* as a ``FeatureRow`` and verify schema metadata.

        Args:
            data: Raw dict of field values (e.g. from JSON or ``model_dump()``).

        Returns:
            The validated ``FeatureRow``.

        Raises:
            ValueError: If pydantic validation fails, or ``schema_version`` /
                ``schema_hash`` do not match the current contract.
        """
        row = FeatureRow.model_validate(data)
        if row.schema_version != cls.VERSION:
            raise ValueError(
                f"schema_version mismatch: expected {cls.VERSION!r}, "
                f"got {row.schema_version!r}"
            )
        if row.schema_hash != cls.SCHEMA_HASH:
            raise ValueError(
                f"schema_hash mismatch: expected {cls.SCHEMA_HASH!r}, "
                f"got {row.schema_hash!r}"
            )
        return row

    # -- DataFrame-level validation -------------------------------------

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame) -> None:
        """Check that *df* conforms to the v1 column contract.

        Args:
            df: DataFrame to validate (typically built from ``FeatureRow.model_dump()``).

        Raises:
            ValueError: If columns are missing, unexpected columns are present,
                or pandas dtype kinds do not match the expected Python types.
        """
        expected = set(cls.COLUMNS)
        actual = set(df.columns)

        missing = expected - actual
        if missing:
            raise ValueError(f"Missing columns: {sorted(missing)}")

        extra = actual - expected
        if extra:
            raise ValueError(f"Unexpected columns: {sorted(extra)}")

        _check_dataframe_dtypes(df, cls.COLUMNS)

validate_row(data) classmethod

Validate data as a FeatureRow and verify schema metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict[str, Any] | Raw dict of field values (e.g. from JSON or model_dump()). | required |

Returns:

| Type | Description |
| --- | --- |
| FeatureRowBase | The validated FeatureRow. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If pydantic validation fails, or schema_version / schema_hash do not match the current contract. |


validate_dataframe(df) classmethod

Check that df conforms to the v1 column contract.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame to validate (typically built from FeatureRow.model_dump()). | required |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If columns are missing, unexpected columns are present, or pandas dtype kinds do not match the expected Python types. |


FeatureSchemaV2 dataclass

Schema contract for feature rows (v2).

Identical to FeatureSchemaV1 except user_id has been removed from the column registry. Personalization shifts to calibrators and per-user post-processing.

Source code in src/taskclf/core/schema.py
@dataclass(frozen=True, eq=False)
class FeatureSchemaV2:
    """Schema contract for feature rows (v2).

    Identical to :class:`FeatureSchemaV1` except ``user_id`` has been
    removed from the column registry.  Personalization shifts to
    calibrators and per-user post-processing.
    """

    VERSION: ClassVar[Final[str]] = "v2"
    COLUMNS: ClassVar[Final[dict[str, type]]] = _COLUMNS_V2
    SCHEMA_HASH: ClassVar[Final[str]] = _build_schema_hash(_COLUMNS_V2)

    @classmethod
    def validate_row(cls, data: dict[str, Any]) -> FeatureRowBase:
        """Validate *data* as a ``FeatureRow`` and verify schema metadata.

        Args:
            data: Raw dict of field values (e.g. from JSON or ``model_dump()``).

        Returns:
            The validated ``FeatureRow``.

        Raises:
            ValueError: If pydantic validation fails, or ``schema_version`` /
                ``schema_hash`` do not match the v2 contract.
        """
        row = FeatureRow.model_validate(data)
        if row.schema_version != cls.VERSION:
            raise ValueError(
                f"schema_version mismatch: expected {cls.VERSION!r}, "
                f"got {row.schema_version!r}"
            )
        if row.schema_hash != cls.SCHEMA_HASH:
            raise ValueError(
                f"schema_hash mismatch: expected {cls.SCHEMA_HASH!r}, "
                f"got {row.schema_hash!r}"
            )
        return row

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame) -> None:
        """Check that *df* conforms to the v2 column contract.

        Args:
            df: DataFrame to validate.

        Raises:
            ValueError: If columns are missing, unexpected columns are present,
                or pandas dtype kinds do not match the expected Python types.
        """
        expected = set(cls.COLUMNS)
        actual = set(df.columns)

        missing = expected - actual
        if missing:
            raise ValueError(f"Missing columns: {sorted(missing)}")

        extra = actual - expected
        if extra:
            raise ValueError(f"Unexpected columns: {sorted(extra)}")

        _check_dataframe_dtypes(df, cls.COLUMNS)

validate_row(data) classmethod

Validate data as a FeatureRow and verify schema metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict[str, Any] | Raw dict of field values (e.g. from JSON or model_dump()). | required |

Returns:

| Type | Description |
| --- | --- |
| FeatureRowBase | The validated FeatureRow. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If pydantic validation fails, or schema_version / schema_hash do not match the v2 contract. |


validate_dataframe(df) classmethod

Check that df conforms to the v2 column contract.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame to validate. | required |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If columns are missing, unexpected columns are present, or pandas dtype kinds do not match the expected Python types. |


FeatureSchemaV3 dataclass

Schema contract for feature rows (v3).

Extends FeatureSchemaV1 with high-signal keyed title sketch features while keeping user_id on persisted rows for joins and per-user evaluation.

Source code in src/taskclf/core/schema.py
@dataclass(frozen=True, eq=False)
class FeatureSchemaV3:
    """Schema contract for feature rows (v3).

    Extends :class:`FeatureSchemaV1` with high-signal keyed title sketch
    features while keeping ``user_id`` on persisted rows for joins and
    per-user evaluation.
    """

    VERSION: ClassVar[Final[str]] = "v3"
    COLUMNS: ClassVar[Final[dict[str, type]]] = _COLUMNS_V3
    SCHEMA_HASH: ClassVar[Final[str]] = _build_schema_hash(_COLUMNS_V3)

    @classmethod
    def validate_row(cls, data: dict[str, Any]) -> FeatureRowBase:
        row = FeatureRow.model_validate(data)
        if row.schema_version != cls.VERSION:
            raise ValueError(
                f"schema_version mismatch: expected {cls.VERSION!r}, "
                f"got {row.schema_version!r}"
            )
        if row.schema_hash != cls.SCHEMA_HASH:
            raise ValueError(
                f"schema_hash mismatch: expected {cls.SCHEMA_HASH!r}, "
                f"got {row.schema_hash!r}"
            )
        return row

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame) -> None:
        expected = set(cls.COLUMNS)
        actual = set(df.columns)

        missing = expected - actual
        if missing:
            raise ValueError(f"Missing columns: {sorted(missing)}")

        extra = actual - expected
        if extra:
            raise ValueError(f"Unexpected columns: {sorted(extra)}")

        _check_dataframe_dtypes(df, cls.COLUMNS)

get_feature_schema(schema_version)

Return the schema class for schema_version.

Source code in src/taskclf/core/schema.py
def get_feature_schema(schema_version: str):
    """Return the schema class for *schema_version*."""
    schema = FEATURE_SCHEMA_REGISTRY.get(schema_version)
    if schema is None:
        raise ValueError(f"Unknown schema version: {schema_version!r}")
    return schema

get_feature_storage_dir(schema_version)

Return the processed-feature directory name for schema_version.

Source code in src/taskclf/core/schema.py
def get_feature_storage_dir(schema_version: str) -> str:
    """Return the processed-feature directory name for *schema_version*."""
    return f"features_{schema_version}"

iter_feature_schema_versions(preferred_schema_version=None)

Return schema versions ordered for lookup, newest-first by default.

Source code in src/taskclf/core/schema.py
def iter_feature_schema_versions(
    preferred_schema_version: str | None = None,
) -> tuple[str, ...]:
    """Return schema versions ordered for lookup, newest-first by default."""
    if preferred_schema_version is None:
        return FEATURE_SCHEMA_VERSION_ORDER
    if preferred_schema_version not in FEATURE_SCHEMA_REGISTRY:
        raise ValueError(f"Unknown schema version: {preferred_schema_version!r}")
    return (preferred_schema_version,) + tuple(
        version
        for version in FEATURE_SCHEMA_VERSION_ORDER
        if version != preferred_schema_version
    )

resolve_feature_parquet_path(data_dir, target_date, *, schema_version=None)

Return the first existing feature parquet path for target_date.

When schema_version is provided it is checked first, then older/newer versions are tried as fallbacks. When omitted, lookup proceeds newest-first.

Source code in src/taskclf/core/schema.py
def resolve_feature_parquet_path(
    data_dir: str | PathLike[str],
    target_date: date,
    *,
    schema_version: str | None = None,
) -> Path | None:
    """Return the first existing feature parquet path for *target_date*.

    When *schema_version* is provided it is checked first, then older/newer
    versions are tried as fallbacks. When omitted, lookup proceeds newest-first.
    """
    root = Path(data_dir)
    for version in iter_feature_schema_versions(schema_version):
        candidate = (
            root
            / get_feature_storage_dir(version)
            / f"date={target_date.isoformat()}"
            / "features.parquet"
        )
        if candidate.exists():
            return candidate
    return None
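The on-disk layout this implies is data_dir/features_VERSION/date=ISO-DATE/features.parquet. A self-contained sketch of the fallback lookup against a temporary directory (the v3-first order is an assumption based on the "newest-first" note):

```python
import tempfile
from datetime import date
from pathlib import Path

with tempfile.TemporaryDirectory() as root:
    day = date(2024, 1, 15)
    # Only the v2 partition exists on disk.
    v2_path = Path(root) / "features_v2" / f"date={day.isoformat()}" / "features.parquet"
    v2_path.parent.mkdir(parents=True)
    v2_path.touch()

    # Newest-first fallback: the v3 directory is absent, so lookup lands on v2.
    found = None
    for version in ("v3", "v2", "v1"):  # assumed newest-first order
        candidate = (
            Path(root) / f"features_{version}" / f"date={day.isoformat()}" / "features.parquet"
        )
        if candidate.exists():
            found = candidate
            break
```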

coerce_nullable_numeric(df)

Convert nullable numeric columns from object (None) to float64 (NaN).

When FeatureRow.model_dump() emits None for Optional[float] fields, pandas stores the column as object dtype. This helper coerces those columns to float64 so downstream validation and parquet writing see the correct dtype.

The DataFrame is modified in-place and also returned for convenience.

Source code in src/taskclf/core/schema.py
def coerce_nullable_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Convert nullable numeric columns from object (None) to float64 (NaN).

    When ``FeatureRow.model_dump()`` emits ``None`` for ``Optional[float]``
    fields, pandas stores the column as ``object`` dtype.  This helper
    coerces those columns to ``float64`` so downstream validation and
    parquet writing see the correct dtype.

    The DataFrame is modified **in-place** and also returned for convenience.
    """
    for col, expected_type in _COLUMNS_V3.items():
        if col not in df.columns:
            continue
        if expected_type in (float, int) and df[col].dtype.kind == "O":
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df