
features.build

Feature computation pipeline: convert normalised ActivityWatch events into bucketed, schema-validated FeatureRow instances and optionally write them to Parquet.

Pipeline overview

The feature build pipeline operates in three modes:

  1. Batch (build_features_from_aw_events) -- converts a sorted sequence of normalised Event objects (plus optional AWInputEvent objects) into per-bucket FeatureRow instances.
  2. Dummy (generate_dummy_features) -- produces synthetic feature rows for a given date, useful for testing and development.
  3. File (build_features_for_date) -- fetches events from a running ActivityWatch server (or generates dummy features when no server is available) and writes them to the Parquet partition layout.

The batch pipeline follows this data flow:

```text
Events ──► bucket by time ──► dominant app per bucket ──► context features
Input events ──► bucket ──► aggregate (keys/clicks/mouse) ─┘
                        ┌──────────────────────────────────┘
              session detection ──► dynamics tracker ──► FeatureRow
```

build_features_from_aw_events

Core function that converts normalised events into per-bucket feature rows. Events are grouped into fixed-width time buckets (default 60 s). For each bucket, the dominant application (longest total duration) determines the context columns (app ID, category, title hash, flags).
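As a rough sketch of this step (`align_to_bucket` and `dominant_app` here are hypothetical stand-ins for the real helpers, with events reduced to `(app_id, timestamp, duration)` tuples; the actual `Event` objects carry more metadata):

```python
from collections import defaultdict
from datetime import datetime, timezone


def align_to_bucket(ts: datetime, bucket_seconds: int = 60) -> datetime:
    """Floor a timestamp to the start of its fixed-width bucket."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(epoch - epoch % bucket_seconds, tz=timezone.utc)


def dominant_app(events, bucket_seconds: int = 60):
    """Map each bucket start to the app with the longest total duration."""
    durations = defaultdict(lambda: defaultdict(float))
    for app_id, ts, dur in events:  # (app_id, event start, duration in seconds)
        durations[align_to_bucket(ts, bucket_seconds)][app_id] += dur
    return {bucket: max(apps, key=apps.get) for bucket, apps in durations.items()}
```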

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `events` | `Sequence[Event]` | -- | Sorted normalised events |
| `user_id` | `str` | `"default-user"` | Pseudonymous user identifier |
| `device_id` | `str \| None` | `None` | Optional device identifier |
| `input_events` | `Sequence[AWInputEvent] \| None` | `None` | Input watcher events for keyboard/mouse features |
| `bucket_seconds` | `int` | `DEFAULT_BUCKET_SECONDS` | Width of each time bucket |
| `session_start` | `datetime \| None` | `None` | Fixed session start (online mode); `None` triggers auto-detection |
| `idle_gap_seconds` | `float` | `DEFAULT_IDLE_GAP_SECONDS` | Minimum gap that splits sessions (batch mode) |
| `schema_version` | `str` | `"v3"` | Feature schema to emit (`"v1"`, `"v2"`, or `"v3"`) |

When input_events is None, all keyboard/mouse feature columns (keys_per_min, clicks_per_min, etc.) are set to None.

When session_start is None (batch mode), sessions are detected automatically from idle gaps via features.sessions.detect_session_boundaries. In online mode, the caller passes the known session start to avoid resetting the session each poll cycle.
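The batch-mode rule can be sketched as follows; `detect_session_starts` is an illustrative simplification of `features.sessions.detect_session_boundaries`, operating on `(start, duration)` pairs instead of full `Event` objects:

```python
from datetime import datetime, timedelta


def detect_session_starts(events, idle_gap_seconds: float = 300.0):
    """Return session-opening timestamps: the first event, plus every
    event whose gap since the previous event's end exceeds the idle gap.

    `events` is a sorted list of (start datetime, duration in seconds).
    """
    starts = []
    prev_end = None
    for start, duration in events:
        if prev_end is None or (start - prev_end).total_seconds() > idle_gap_seconds:
            starts.append(start)  # idle gap exceeded: a new session begins
        prev_end = start + timedelta(seconds=duration)
    return starts
```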

Sub-modules invoked per bucket:

| Sub-module | Columns produced |
| --- | --- |
| `features.windows` | `app_switch_count_last_5m`, `app_switch_count_last_15m`, `app_entropy_5m`, `app_entropy_15m`, `top2_app_concentration_15m` |
| `features.sessions` | `session_id`, `session_length_so_far` |
| `features.domain` | `domain_category` |
| `features.text` | `window_title_bucket`, `title_repeat_count_session`, keyed title-sketch features (v3) |
| `features.dynamics` | Rolling means and deltas (7 columns) |
| (inline) | `app_dwell_time_seconds` |

app_dwell_time_seconds is computed directly in build_features_from_aw_events rather than in a sub-module. It tracks how long the current dominant app has been continuously in the foreground across consecutive buckets: when the dominant app stays the same, the bucket's foreground seconds are added to the running total; when it changes, the counter resets to the new app's foreground time.
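A minimal sketch of that accumulation rule (`dwell_per_bucket` is a hypothetical helper; the real logic is inlined in `build_features_from_aw_events`):

```python
def dwell_per_bucket(dominant: list[tuple[str, float]]) -> list[float]:
    """Given (dominant_app, foreground_seconds) per bucket, return the
    dwell value emitted at each bucket: the counter accumulates while the
    same app stays dominant and resets when the dominant app changes."""
    prev_app = None
    dwell = 0.0
    out = []
    for app, secs in dominant:
        dwell = dwell + secs if app == prev_app else secs
        prev_app = app
        out.append(dwell)
    return out
```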

app_entropy_5m and app_entropy_15m are Shannon entropy values of the app duration distribution over the last 5 and 15 minutes respectively. They are computed by features.windows.app_entropy_in_window. A single focused app yields entropy 0; uniform usage across N apps yields log2(N). These features capture how "scattered" the user's app usage is within a time window.
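The underlying computation is standard Shannon entropy over duration shares; a self-contained sketch (not the actual `app_entropy_in_window` implementation, which also handles the windowing):

```python
import math


def app_entropy(durations: dict[str, float]) -> float:
    """Shannon entropy (in bits) of the app-duration distribution."""
    total = sum(durations.values())
    if total <= 0:
        return 0.0
    entropy = 0.0
    for d in durations.values():
        if d > 0:
            p = d / total          # this app's share of foreground time
            entropy -= p * math.log2(p)
    return entropy
```

A single app gives 0.0; four apps used equally give `log2(4) = 2.0`.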

top2_app_concentration_15m is the combined time share of the two most-used apps over the last 15 minutes. It is computed by features.windows.top2_app_concentration_in_window. A value of 1.0 means at most two apps were used; lower values indicate more fragmented usage across three or more apps.
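A sketch of the concentration metric itself, leaving aside the 15-minute windowing done by the real function:

```python
def top2_concentration(durations: dict[str, float]) -> float:
    """Share of total foreground time taken by the two most-used apps."""
    total = sum(durations.values())
    if total <= 0:
        return 0.0
    top2 = sorted(durations.values(), reverse=True)[:2]
    return sum(top2) / total
```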

```python
from taskclf.features.build import build_features_from_aw_events

rows = build_features_from_aw_events(
    events,
    user_id="user-001",
    input_events=input_events,
)
```

generate_dummy_features

Creates n_rows synthetic FeatureRow instances spanning hours 9--17 of the given date. Cycles through 10 dummy applications to produce realistic variety. Useful for pipeline testing without real ActivityWatch data.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `date` | `date` | -- | Calendar date to generate buckets for |
| `n_rows` | `int` | `DEFAULT_DUMMY_ROWS` | Number of rows to generate |
| `user_id` | `str` | `"dummy-user-001"` | User identifier |
| `device_id` | `str \| None` | `None` | Optional device identifier |

build_features_for_date

Builds feature rows for a given date and writes them to Parquet using the partitioned layout:

data_dir/features_<schema_version>/date=YYYY-MM-DD/features.parquet

When aw_host is provided (and synthetic is False), events are fetched live from a running ActivityWatch server via the REST API. Both aw-watcher-window and aw-watcher-input buckets are queried automatically. Without aw_host (or with synthetic=True), dummy features are generated for testing.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `date` | `date` | -- | Calendar date to build features for |
| `data_dir` | `Path` | -- | Root of processed data |
| `aw_host` | `str \| None` | `None` | AW server URL; `None` falls back to dummy generation |
| `title_salt` | `str \| None` | `None` | Optional process override for title hashing; defaults to local `.title_secret` |
| `user_id` | `str` | `"default-user"` | Pseudonymous user identifier |
| `device_id` | `str \| None` | `None` | Optional device identifier |
| `synthetic` | `bool` | `False` | Force dummy feature generation |
| `schema_version` | `str` | `"v3"` | Output schema version |

When aw_host is set and title_salt is omitted, the builder resolves the per-install local secret from UserConfig(data_dir).title_secret. New builds default to features_v3/.
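Assuming `get_feature_storage_dir` simply maps a schema version to `features_<version>` (an assumption; only the resulting layout is documented above), the partition path can be sketched as:

```python
from datetime import date
from pathlib import Path


def feature_partition_path(
    data_dir: Path, day: date, schema_version: str = "v3"
) -> Path:
    """Partition path used by the builder for one day's feature rows."""
    return (
        data_dir
        / f"features_{schema_version}"
        / f"date={day.isoformat()}"
        / "features.parquet"
    )
```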

See also

taskclf.features.build

Feature computation pipeline: build bucketed feature rows and write to parquet.

`generate_dummy_features(date, n_rows=DEFAULT_DUMMY_ROWS, *, user_id='dummy-user-001', device_id=None, schema_version=LATEST_FEATURE_SCHEMA_VERSION)`

Create n_rows synthetic FeatureRow instances spanning date.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `date` | `date` | The calendar date to generate buckets for (hours 9-17). | *required* |
| `n_rows` | `int` | Number of rows to generate. | `DEFAULT_DUMMY_ROWS` |
| `user_id` | `str` | User identifier for all generated rows. | `'dummy-user-001'` |
| `device_id` | `str \| None` | Optional device identifier. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[FeatureRowBase]` | Validated `FeatureRow` instances with dummy app/keyboard/mouse data. |

Source code in src/taskclf/features/build.py
def generate_dummy_features(
    date: dt.date,
    n_rows: int = DEFAULT_DUMMY_ROWS,
    *,
    user_id: str = "dummy-user-001",
    device_id: str | None = None,
    schema_version: str = LATEST_FEATURE_SCHEMA_VERSION,
) -> list[FeatureRowBase]:
    """Create *n_rows* synthetic FeatureRow instances spanning *date*.

    Args:
        date: The calendar date to generate buckets for (hours 9-17).
        n_rows: Number of rows to generate.
        user_id: User identifier for all generated rows.
        device_id: Optional device identifier.

    Returns:
        Validated ``FeatureRow`` instances with dummy app/keyboard/mouse data.
    """
    rows: list[FeatureRowBase] = []
    schema = get_feature_schema(schema_version)
    day_of_week = date.weekday()
    session_start = dt.datetime(
        date.year, date.month, date.day, 9, 0, tzinfo=dt.timezone.utc
    )
    sid = stable_hash(f"{user_id}:{session_start.isoformat()}")

    tracker = DynamicsTracker()
    title_counts: dict[str, int] = defaultdict(int)

    for i in range(n_rows):
        hour = 9 + (i * 8 // max(n_rows, 1))
        minute = (i * 7) % 60
        ts = dt.datetime(
            date.year, date.month, date.day, hour, minute, tzinfo=dt.timezone.utc
        )
        end_ts = ts + dt.timedelta(seconds=DEFAULT_BUCKET_SECONDS)

        app_id, is_browser, is_editor, is_terminal, app_category = _DUMMY_APPS[
            i % len(_DUMMY_APPS)
        ]
        title_hash = stable_hash(f"window-title-{app_id}-{i}")
        title_counts[title_hash] += 1

        keys = float(40 + i * 10)
        clicks = float(3 + i % 8)
        mouse_dist = float(200 + i * 50)
        dynamics = tracker.update(keys, clicks, mouse_dist)

        rows.append(
            _make_feature_row(
                user_id=user_id,
                device_id=device_id,
                session_id=sid,
                bucket_start_ts=ts,
                bucket_end_ts=end_ts,
                schema_version=schema.VERSION,
                schema_hash=schema.SCHEMA_HASH,
                source_ids=[f"dummy-{i:03d}"],
                app_id=app_id,
                app_category=app_category,
                window_title_hash=title_hash,
                is_browser=is_browser,
                is_editor=is_editor,
                is_terminal=is_terminal,
                app_switch_count_last_5m=i % 5,
                app_foreground_time_ratio=round(0.5 + (i % 5) * 0.1, 2),
                app_change_count=i % 4,
                app_dwell_time_seconds=round(
                    DEFAULT_BUCKET_SECONDS * (0.5 + (i % 5) * 0.1), 2
                ),
                idle_return_indicator=(i == 0),
                app_entropy_5m=round(0.5 + (i % 5) * 0.3, 2),
                app_entropy_15m=round(0.8 + (i % 5) * 0.25, 2),
                top2_app_concentration_15m=round(0.6 + (i % 5) * 0.08, 4),
                keys_per_min=keys,
                backspace_ratio=round(0.05 + (i % 5) * 0.02, 2),
                shortcut_rate=round(0.1 + (i % 3) * 0.05, 2),
                clicks_per_min=clicks,
                scroll_events_per_min=float(i % 6),
                mouse_distance=mouse_dist,
                active_seconds_keyboard=float(20 + (i % 8) * 5),
                active_seconds_mouse=float(15 + (i % 9) * 5),
                active_seconds_any=float(30 + (i % 6) * 5),
                max_idle_run_seconds=float(5 + (i % 4) * 5),
                event_density=round(1.5 + (i % 5) * 0.3, 2),
                domain_category=classify_domain(None, is_browser=is_browser),
                window_title_bucket=title_hash_bucket(
                    title_hash, DEFAULT_TITLE_HASH_BUCKETS
                ),
                title_repeat_count_session=title_counts[title_hash],
                keys_per_min_rolling_5=dynamics["keys_per_min_rolling_5"],
                keys_per_min_rolling_15=dynamics["keys_per_min_rolling_15"],
                mouse_distance_rolling_5=dynamics["mouse_distance_rolling_5"],
                mouse_distance_rolling_15=dynamics["mouse_distance_rolling_15"],
                keys_per_min_delta=dynamics["keys_per_min_delta"],
                clicks_per_min_delta=dynamics["clicks_per_min_delta"],
                mouse_distance_delta=dynamics["mouse_distance_delta"],
                app_switch_count_last_15m=i % 8,
                hour_of_day=hour,
                day_of_week=day_of_week,
                session_length_so_far=float(i * 5),
            )
        )

    return rows

`build_features_from_aw_events(events, *, user_id='default-user', device_id=None, input_events=None, bucket_seconds=DEFAULT_BUCKET_SECONDS, session_start=None, idle_gap_seconds=DEFAULT_IDLE_GAP_SECONDS, schema_version=LATEST_FEATURE_SCHEMA_VERSION)`

Convert normalised events into per-bucket `FeatureRow` instances.

Events are grouped into fixed-width time buckets. For each bucket the dominant application (longest total duration) is selected and its metadata (app ID, title hash, flags) is used to populate the context columns.

When input_events from aw-watcher-input are provided, keyboard and mouse features (keys_per_min, clicks_per_min, scroll_events_per_min, mouse_distance) are computed by aggregating the 5-second input samples that fall within each bucket. Without input events those fields remain None.

Session detection is performed automatically via idle-gap analysis (see `taskclf.features.sessions.detect_session_boundaries`). In online mode the caller may pass a known session_start to avoid resetting the session each poll cycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `events` | `Sequence[Event]` | Sorted, normalised events satisfying the `taskclf.core.types.Event` protocol (e.g. from `taskclf.adapters.activitywatch.client.parse_aw_export`). | *required* |
| `user_id` | `str` | Random UUID identifying the user (not PII). | `'default-user'` |
| `device_id` | `str \| None` | Optional device identifier. | `None` |
| `input_events` | `Sequence[AWInputEvent] \| None` | Optional sorted input events from `aw-watcher-input`. When provided, keyboard/mouse feature columns are populated; otherwise they remain `None`. | `None` |
| `bucket_seconds` | `int` | Width of each time bucket in seconds (default 60). | `DEFAULT_BUCKET_SECONDS` |
| `session_start` | `datetime \| None` | If provided, used as the session start for every bucket (online mode). When `None` (batch mode), sessions are detected from idle gaps in `events`. | `None` |
| `idle_gap_seconds` | `float` | Minimum gap in seconds that splits sessions (only used when `session_start` is `None`). | `DEFAULT_IDLE_GAP_SECONDS` |

Returns:

| Type | Description |
| --- | --- |
| `list[FeatureRowBase]` | Validated `FeatureRow` instances ordered by `bucket_start_ts`. |

Source code in src/taskclf/features/build.py
def build_features_from_aw_events(
    events: Sequence[Event],
    *,
    user_id: str = "default-user",
    device_id: str | None = None,
    input_events: Sequence[AWInputEvent] | None = None,
    bucket_seconds: int = DEFAULT_BUCKET_SECONDS,
    session_start: dt.datetime | None = None,
    idle_gap_seconds: float = DEFAULT_IDLE_GAP_SECONDS,
    schema_version: str = LATEST_FEATURE_SCHEMA_VERSION,
) -> list[FeatureRowBase]:
    """Convert normalised events into per-bucket :class:`FeatureRow` instances.

    Events are grouped into fixed-width time buckets.  For each bucket
    the *dominant* application (longest total duration) is selected and
    its metadata (app ID, title hash, flags) is used to populate the
    context columns.

    When *input_events* from ``aw-watcher-input`` are provided, keyboard
    and mouse features (``keys_per_min``, ``clicks_per_min``,
    ``scroll_events_per_min``, ``mouse_distance``) are computed by
    aggregating the 5-second input samples that fall within each bucket.
    Without input events those fields remain ``None``.

    Session detection is performed automatically via idle-gap analysis
    (see :func:`~taskclf.features.sessions.detect_session_boundaries`).
    In online mode the caller may pass a known *session_start* to
    avoid resetting the session each poll cycle.

    Args:
        events: Sorted, normalised events satisfying the
            :class:`~taskclf.core.types.Event` protocol (e.g. from
            :func:`~taskclf.adapters.activitywatch.client.parse_aw_export`).
        user_id: Random UUID identifying the user (not PII).
        device_id: Optional device identifier.
        input_events: Optional sorted input events from
            ``aw-watcher-input``.  When provided, keyboard/mouse feature
            columns are populated; otherwise they remain ``None``.
        bucket_seconds: Width of each time bucket in seconds (default 60).
        session_start: If provided, used as the session start for every
            bucket (online mode).  When ``None`` (batch mode), sessions
            are detected from idle gaps in *events*.
        idle_gap_seconds: Minimum gap in seconds that splits sessions
            (only used when *session_start* is ``None``).

    Returns:
        Validated ``FeatureRow`` instances ordered by ``bucket_start_ts``.
    """
    if not events:
        return []
    schema = get_feature_schema(schema_version)

    bucket_events: dict[dt.datetime, list[Event]] = defaultdict(list)
    for ev in events:
        bucket_ts = align_to_bucket(ev.timestamp, bucket_seconds)
        bucket_events[bucket_ts].append(ev)

    bucket_input_events: dict[dt.datetime, list[AWInputEvent]] = defaultdict(list)
    if input_events:
        for ie in input_events:
            ie_bucket = align_to_bucket(ie.timestamp, bucket_seconds)
            bucket_input_events[ie_bucket].append(ie)

    has_input = bool(input_events)

    sorted_buckets = sorted(bucket_events.keys())
    all_events_sorted = sorted(events, key=lambda e: e.timestamp)

    if session_start is not None:
        session_starts: list[dt.datetime] = [
            align_to_bucket(session_start, bucket_seconds),
        ]
    else:
        session_starts = [
            align_to_bucket(ts, bucket_seconds)
            for ts in detect_session_boundaries(
                all_events_sorted,
                idle_gap_seconds=idle_gap_seconds,
            )
        ]

    # Pre-compute session_id for each session start
    session_id_map: dict[dt.datetime, str] = {
        ss: stable_hash(f"{user_id}:{ss.isoformat()}") for ss in session_starts
    }

    dynamics = DynamicsTracker(
        rolling_5=DEFAULT_ROLLING_WINDOW_5,
        rolling_15=DEFAULT_ROLLING_WINDOW_15,
    )
    session_title_counts: dict[dt.datetime, dict[str, int]] = defaultdict(
        lambda: defaultdict(int)
    )

    prev_dominant_app: str | None = None
    current_dwell: float = 0.0

    rows: list[FeatureRowBase] = []
    for bucket_ts in sorted_buckets:
        evs = bucket_events[bucket_ts]

        app_durations: dict[str, float] = defaultdict(float)
        for ev in evs:
            app_durations[ev.app_id] += ev.duration_seconds
        dominant_app_id = max(app_durations, key=app_durations.get)  # type: ignore[arg-type]

        dominant_ev = next(ev for ev in evs if ev.app_id == dominant_app_id)

        foreground_ratio = min(app_durations[dominant_app_id] / bucket_seconds, 1.0)

        dominant_foreground_secs = app_durations[dominant_app_id]
        if dominant_app_id == prev_dominant_app:
            current_dwell += dominant_foreground_secs
        else:
            current_dwell = dominant_foreground_secs
        prev_dominant_app = dominant_app_id

        sorted_evs = sorted(evs, key=lambda e: e.timestamp)
        change_count = sum(
            1 for a, b in zip(sorted_evs, sorted_evs[1:]) if a.app_id != b.app_id
        )

        switch_count = app_switch_count_in_window(
            all_events_sorted,
            bucket_ts,
            bucket_seconds=bucket_seconds,
        )
        switch_count_15m = app_switch_count_in_window(
            all_events_sorted,
            bucket_ts,
            window_minutes=DEFAULT_APP_SWITCH_WINDOW_15M,
            bucket_seconds=bucket_seconds,
        )

        entropy_5m = app_entropy_in_window(
            all_events_sorted,
            bucket_ts,
            window_minutes=DEFAULT_ROLLING_WINDOW_5,
            bucket_seconds=bucket_seconds,
        )
        entropy_15m = app_entropy_in_window(
            all_events_sorted,
            bucket_ts,
            window_minutes=DEFAULT_APP_SWITCH_WINDOW_15M,
            bucket_seconds=bucket_seconds,
        )

        top2_conc_15m = top2_app_concentration_in_window(
            all_events_sorted,
            bucket_ts,
            window_minutes=DEFAULT_APP_SWITCH_WINDOW_15M,
            bucket_seconds=bucket_seconds,
        )

        cur_session = session_start_for_bucket(bucket_ts, session_starts)
        elapsed_minutes = (bucket_ts - cur_session).total_seconds() / 60.0
        sid = session_id_map[cur_session]

        input_agg = _aggregate_input_for_bucket(
            bucket_ts,
            bucket_input_events.get(bucket_ts, []),
            bucket_seconds,
        )

        # Title clustering (item 39)
        title_hash = dominant_ev.window_title_hash
        session_title_counts[cur_session][title_hash] += 1
        w_title_bucket = title_hash_bucket(title_hash, DEFAULT_TITLE_HASH_BUCKETS)

        # Domain classification (item 38)
        domain_cat = classify_domain(None, is_browser=dominant_ev.is_browser)

        # Temporal dynamics (item 40)
        dyn = dynamics.update(
            input_agg["keys_per_min"],
            input_agg["clicks_per_min"],
            input_agg["mouse_distance"],
        )

        source_ids = ["aw-watcher-window"]
        if has_input:
            source_ids.append("aw-watcher-input")

        rows.append(
            _make_feature_row(
                user_id=user_id,
                device_id=device_id,
                session_id=sid,
                bucket_start_ts=bucket_ts,
                bucket_end_ts=bucket_ts + dt.timedelta(seconds=bucket_seconds),
                schema_version=schema.VERSION,
                schema_hash=schema.SCHEMA_HASH,
                source_ids=source_ids,
                app_id=dominant_app_id,
                app_category=dominant_ev.app_category,
                window_title_hash=title_hash,
                is_browser=dominant_ev.is_browser,
                is_editor=dominant_ev.is_editor,
                is_terminal=dominant_ev.is_terminal,
                app_switch_count_last_5m=switch_count,
                app_foreground_time_ratio=round(foreground_ratio, 4),
                app_change_count=change_count,
                app_dwell_time_seconds=round(current_dwell, 2),
                idle_return_indicator=(bucket_ts == cur_session),
                app_entropy_5m=entropy_5m,
                app_entropy_15m=entropy_15m,
                top2_app_concentration_15m=top2_conc_15m,
                keys_per_min=input_agg["keys_per_min"],
                backspace_ratio=None,
                shortcut_rate=None,
                clicks_per_min=input_agg["clicks_per_min"],
                scroll_events_per_min=input_agg["scroll_events_per_min"],
                mouse_distance=input_agg["mouse_distance"],
                active_seconds_keyboard=input_agg["active_seconds_keyboard"],
                active_seconds_mouse=input_agg["active_seconds_mouse"],
                active_seconds_any=input_agg["active_seconds_any"],
                max_idle_run_seconds=input_agg["max_idle_run_seconds"],
                event_density=input_agg["event_density"],
                domain_category=domain_cat,
                window_title_bucket=w_title_bucket,
                title_repeat_count_session=session_title_counts[cur_session][
                    title_hash
                ],
                keys_per_min_rolling_5=dyn["keys_per_min_rolling_5"],
                keys_per_min_rolling_15=dyn["keys_per_min_rolling_15"],
                mouse_distance_rolling_5=dyn["mouse_distance_rolling_5"],
                mouse_distance_rolling_15=dyn["mouse_distance_rolling_15"],
                keys_per_min_delta=dyn["keys_per_min_delta"],
                clicks_per_min_delta=dyn["clicks_per_min_delta"],
                mouse_distance_delta=dyn["mouse_distance_delta"],
                app_switch_count_last_15m=switch_count_15m,
                hour_of_day=bucket_ts.hour,
                day_of_week=bucket_ts.weekday(),
                session_length_so_far=round(elapsed_minutes, 2),
                **_dominant_title_feature_payload(dominant_ev, schema_version),
            )
        )

    return rows

`build_features_for_date(date, data_dir, *, aw_host=None, title_salt=None, user_id='default-user', device_id=None, synthetic=False, schema_version=LATEST_FEATURE_SCHEMA_VERSION)`

Build feature rows for date, validate, and write to parquet.

When aw_host is provided (and synthetic is False), events are fetched live from a running ActivityWatch server. Otherwise dummy/synthetic rows are generated for testing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `date` | `date` | Calendar date to build features for. | *required* |
| `data_dir` | `Path` | Root of processed data (e.g. `Path("data/processed")`). Output lands at `data_dir/features_<schema_version>/date=YYYY-MM-DD/features.parquet`. | *required* |
| `aw_host` | `str \| None` | Base URL of a running AW server (e.g. `"http://localhost:5600"`). When `None` or `synthetic` is `True`, dummy features are generated. | `None` |
| `title_salt` | `str \| None` | Optional process override for title hashing. When omitted and `aw_host` is set, the local `.title_secret` is used. | `None` |
| `user_id` | `str` | Pseudonymous user identifier. | `'default-user'` |
| `device_id` | `str \| None` | Optional device identifier. | `None` |
| `synthetic` | `bool` | Force dummy feature generation even if `aw_host` is set. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Path` | Path of the written parquet file. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If generated data fails the selected feature-schema validation. |

Source code in src/taskclf/features/build.py
def build_features_for_date(
    date: dt.date,
    data_dir: Path,
    *,
    aw_host: str | None = None,
    title_salt: str | None = None,
    user_id: str = "default-user",
    device_id: str | None = None,
    synthetic: bool = False,
    schema_version: str = LATEST_FEATURE_SCHEMA_VERSION,
) -> Path:
    """Build feature rows for *date*, validate, and write to parquet.

    When *aw_host* is provided (and *synthetic* is ``False``), events
    are fetched live from a running ActivityWatch server.  Otherwise
    dummy/synthetic rows are generated for testing.

    Args:
        date: Calendar date to build features for.
        data_dir: Root of processed data (e.g. ``Path("data/processed")``).
            Output lands at
            ``data_dir/features_<schema_version>/date=YYYY-MM-DD/features.parquet``.
        aw_host: Base URL of a running AW server
            (e.g. ``"http://localhost:5600"``).  When ``None`` or
            *synthetic* is ``True``, dummy features are generated.
        title_salt: Optional process override for title hashing.  When omitted
            and *aw_host* is set, the local ``.title_secret`` is used.
        user_id: Pseudonymous user identifier.
        device_id: Optional device identifier.
        synthetic: Force dummy feature generation even if *aw_host* is
            set.

    Returns:
        Path of the written parquet file.

    Raises:
        ValueError: If generated data fails the selected feature-schema validation.
    """
    schema = get_feature_schema(schema_version)
    if not synthetic and aw_host is not None:
        if not title_salt:
            from taskclf.core.config import UserConfig

            title_salt = UserConfig(data_dir).title_secret
        rows = _fetch_aw_features_for_date(
            date,
            aw_host=aw_host,
            title_salt=title_salt,
            user_id=user_id,
            device_id=device_id,
            schema_version=schema_version,
        )
        if not rows:
            logger.debug("No AW events found for %s — writing empty parquet", date)
    else:
        rows = generate_dummy_features(
            date,
            user_id=user_id,
            device_id=device_id,
            schema_version=schema_version,
        )

    df = pd.DataFrame([r.model_dump() for r in rows])

    if not df.empty:
        coerce_nullable_numeric(df)
        schema.validate_dataframe(df)

    out_path = (
        data_dir
        / get_feature_storage_dir(schema_version)
        / f"date={date.isoformat()}"
        / "features.parquet"
    )
    return write_parquet(df, out_path)