adapters.activitywatch

ActivityWatch adapter: data ingestion from AW JSON exports and the AW REST API.

Error handling

All REST API functions raise typed exceptions so callers can distinguish failure modes and respond accordingly:

  • AWConnectionError -- the AW server is unreachable or refused the connection (wraps ConnectionRefusedError, ConnectionResetError, urllib.error.URLError).
  • AWTimeoutError -- the AW server did not respond within the configured timeout (wraps TimeoutError, socket.timeout).
  • AWNotFoundError -- the requested AW resource was not found (HTTP 404).

All REST functions accept an optional timeout keyword argument (default: DEFAULT_AW_TIMEOUT_SECONDS = 10). The tray app exposes this as the aw_timeout_seconds config setting and --aw-timeout CLI flag.

When the tray polls AW and encounters repeated failures, it applies adaptive backoff. Connection-refused errors progressively increase the sleep between polls (exponential, capped at 5 minutes). Timeout errors keep the normal polling interval, since the timeout itself has already served as the wait. After 3 consecutive failures, a WARNING is logged and an aw_unreachable status event is published to the web UI. On recovery, an INFO message is logged and the backoff resets.
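
The backoff policy described above could be sketched roughly as follows. This is not the tray's actual implementation: the PollBackoff class, its method names, and the doubling factor are assumptions made for illustration. Only the 5-minute cap, the 3-failure threshold, and the different treatment of timeouts come from the description.

```python
from dataclasses import dataclass

MAX_BACKOFF_SECONDS = 300  # 5-minute cap, as described above
WARN_AFTER_FAILURES = 3    # threshold for the WARNING / aw_unreachable event


@dataclass
class PollBackoff:
    """Hypothetical tracker: counts failures and computes the next sleep."""

    base_interval: float          # normal polling interval in seconds
    consecutive_failures: int = 0  # any failure type, reset on success
    connection_failures: int = 0   # drives the exponential growth

    def on_success(self) -> float:
        """Reset all counters and return to the normal interval."""
        self.consecutive_failures = 0
        self.connection_failures = 0
        return self.base_interval

    def on_timeout(self) -> float:
        """Timeouts count as failures but do not lengthen the sleep."""
        self.consecutive_failures += 1
        return self.base_interval

    def on_connection_error(self) -> float:
        """Double the sleep per connection failure (assumed factor), capped."""
        self.consecutive_failures += 1
        self.connection_failures += 1
        delay = self.base_interval * 2 ** self.connection_failures
        return min(delay, MAX_BACKOFF_SECONDS)

    @property
    def is_unreachable(self) -> bool:
        """True once the failure streak reaches the warning threshold."""
        return self.consecutive_failures >= WARN_AFTER_FAILURES


backoff = PollBackoff(base_interval=30)
for _ in range(5):
    print(backoff.on_connection_error(), backoff.is_unreachable)
print(backoff.on_success(), backoff.is_unreachable)
```

With a 30-second base interval, the sleeps grow to 60, 120, and 240 seconds and then stay at the 300-second cap until a poll succeeds.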

types

taskclf.adapters.activitywatch.types

Privacy-safe normalized event types for ActivityWatch data.

AWEvent

Bases: BaseModel

A single ActivityWatch window event, normalized and privacy-scrubbed.

Raw app names are mapped to reverse-domain identifiers via taskclf.adapters.activitywatch.mapping.normalize_app. Raw title strings are replaced with a salted hash via taskclf.core.hashing.salted_hash -- the original title is never persisted.

Source code in src/taskclf/adapters/activitywatch/types.py
class AWEvent(BaseModel, frozen=True):
    """A single ActivityWatch window event, normalized and privacy-scrubbed.

    Raw ``app`` names are mapped to reverse-domain identifiers via
    :func:`~taskclf.adapters.activitywatch.mapping.normalize_app`.
    Raw ``title`` strings are replaced with a salted hash via
    :func:`~taskclf.core.hashing.salted_hash` -- the original title
    is never persisted.
    """

    timestamp: datetime = Field(description="Event start (UTC).")
    duration_seconds: float = Field(ge=0, description="Duration in seconds.")
    app_id: str = Field(description="Reverse-domain app identifier.")
    window_title_hash: str = Field(description="Salted SHA-256 of the window title.")
    title_token_sketch: tuple[float, ...] = Field(
        default_factory=lambda: tuple(
            0.0 for _ in range(DEFAULT_TITLE_TOKEN_SKETCH_BUCKETS)
        ),
        description="Keyed token-hash sketch of the window title.",
    )
    title_char3_sketch: tuple[float, ...] = Field(
        default_factory=lambda: tuple(
            0.0 for _ in range(DEFAULT_TITLE_CHAR3_SKETCH_BUCKETS)
        ),
        description="Keyed character 3-gram sketch of the window title.",
    )
    title_char_count: int = Field(default=0, ge=0)
    title_token_count: int = Field(default=0, ge=0)
    title_unique_token_ratio: float = Field(default=0.0, ge=0.0, le=1.0)
    title_digit_ratio: float = Field(default=0.0, ge=0.0, le=1.0)
    title_separator_count: int = Field(default=0, ge=0)
    is_browser: bool = Field(description="True if the app is a web browser.")
    is_editor: bool = Field(description="True if the app is a code editor.")
    is_terminal: bool = Field(description="True if the app is a terminal emulator.")
    app_category: str = Field(
        description="Semantic app category (e.g. 'editor', 'chat')."
    )
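
To make the "salted hash" idea concrete, here is a minimal sketch of how a title could be reduced to a salted SHA-256 digest. The salted_sha256 function below is a hypothetical stand-in. The real taskclf.core.hashing.salted_hash is not shown on this page, and its exact construction (salt placement, separator, encoding) may differ.

```python
import hashlib


def salted_sha256(value: str, *, salt: str) -> str:
    """Return the hex SHA-256 of ``salt`` and ``value`` (assumed construction).

    A separator keeps ("ab", "c") and ("a", "bc") from colliding.
    """
    payload = f"{salt}:{value}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


digest = salted_sha256("Inbox - Mail", salt="example-salt")
print(len(digest))                                                # 64 hex characters
print(digest == salted_sha256("Inbox - Mail", salt="example-salt"))  # deterministic
print(digest == salted_sha256("Inbox - Mail", salt="other-salt"))    # salt-dependent
```

The same title always maps to the same digest under one salt, so events can still be grouped by window, while the digest on its own does not reveal the title text.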

AWInputEvent

Bases: BaseModel

Aggregated keyboard/mouse activity from aw-watcher-input.

Each event covers a short polling interval (typically 5 s) and carries only aggregate counts -- never individual key identities. This makes the type privacy-safe by construction.

The upstream AW fields deltaX/deltaY and scrollX/scrollY are mapped to snake_case for consistency with project conventions.

Source code in src/taskclf/adapters/activitywatch/types.py
class AWInputEvent(BaseModel, frozen=True):
    """Aggregated keyboard/mouse activity from ``aw-watcher-input``.

    Each event covers a short polling interval (typically 5 s) and
    carries only aggregate counts -- never individual key identities.
    This makes the type privacy-safe by construction.

    The upstream AW fields ``deltaX``/``deltaY`` and ``scrollX``/``scrollY``
    are mapped to snake_case for consistency with project conventions.
    """

    timestamp: datetime = Field(description="Interval start (UTC).")
    duration_seconds: float = Field(ge=0, description="Duration in seconds.")
    presses: int = Field(ge=0, description="Key presses in this interval.")
    clicks: int = Field(ge=0, description="Mouse clicks in this interval.")
    delta_x: int = Field(ge=0, description="Absolute horizontal mouse movement (px).")
    delta_y: int = Field(ge=0, description="Absolute vertical mouse movement (px).")
    scroll_x: int = Field(ge=0, description="Absolute horizontal scroll delta.")
    scroll_y: int = Field(ge=0, description="Absolute vertical scroll delta.")
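
The camelCase-to-snake_case mapping mentioned above might look like the sketch below. The raw event layout (a timestamp, a duration, and a data dict) and the raw_input_to_fields helper are assumptions for illustration. The module's own _raw_to_input_event is not shown on this page and may handle more cases.

```python
from datetime import datetime
from typing import Any

# Upstream AW field name -> project field name.
_FIELD_RENAMES = {
    "presses": "presses",
    "clicks": "clicks",
    "deltaX": "delta_x",
    "deltaY": "delta_y",
    "scrollX": "scroll_x",
    "scrollY": "scroll_y",
}


def raw_input_to_fields(raw: dict[str, Any]) -> dict[str, Any]:
    """Flatten one raw input event into AWInputEvent-style keyword fields."""
    data = raw.get("data", {})
    fields: dict[str, Any] = {
        # Replace a trailing "Z" so fromisoformat accepts it on older Pythons.
        "timestamp": datetime.fromisoformat(raw["timestamp"].replace("Z", "+00:00")),
        "duration_seconds": float(raw.get("duration", 0.0)),
    }
    for upstream, local in _FIELD_RENAMES.items():
        # Missing counters default to 0 in this sketch.
        fields[local] = int(data.get(upstream, 0))
    return fields


raw_event = {
    "timestamp": "2024-01-01T09:00:00Z",
    "duration": 5.0,
    "data": {"presses": 12, "clicks": 3, "deltaX": 140, "deltaY": 25, "scrollX": 0, "scrollY": 7},
}
print(raw_input_to_fields(raw_event))
```

The resulting dict carries only counts and timing, which matches the privacy-safe shape of AWInputEvent.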

mapping

taskclf.adapters.activitywatch.mapping

App-name normalization, classification, and category assignment.

ActivityWatch reports the foreground application as a human-readable name (e.g. "Firefox", "Code"). This module maps those names to reverse-domain identifiers, boolean flags, and a semantic category consumed by taskclf.core.types.FeatureRow.

normalize_app(app_name)

Map an AW application name to a reverse-domain ID, flags, and category.

Performs a case-insensitive lookup in KNOWN_APPS. Unknown applications fall back to "unknown.<sanitized_name>" with all flags set to False and category "other".

Parameters:

  • app_name (str, required) -- Application name as reported by ActivityWatch (e.g. "Firefox").

Returns:

  • AppInfo -- A (app_id, is_browser, is_editor, is_terminal, app_category) tuple.

Source code in src/taskclf/adapters/activitywatch/mapping.py
def normalize_app(app_name: str) -> AppInfo:
    """Map an AW application name to a reverse-domain ID, flags, and category.

    Performs a case-insensitive lookup in :data:`KNOWN_APPS`.  Unknown
    applications fall back to ``"unknown.<sanitized_name>"`` with all
    flags set to ``False`` and category ``"other"``.

    Args:
        app_name: Application name as reported by ActivityWatch
            (e.g. ``"Firefox"``).

    Returns:
        A ``(app_id, is_browser, is_editor, is_terminal, app_category)``
        tuple.
    """
    key = app_name.strip().lower()
    if key in KNOWN_APPS:
        return KNOWN_APPS[key]
    sanitized = key.replace(" ", "_").replace("/", "_")
    return (f"unknown.{sanitized}", False, False, False, "other")
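
A short, self-contained demonstration of the lookup and fallback behavior follows. The function body is copied from the source above, but the two KNOWN_APPS entries and the AppInfo alias are hypothetical placeholders, since the real table is not shown on this page.

```python
AppInfo = tuple[str, bool, bool, bool, str]

# Hypothetical entries for illustration; the real table lives in mapping.py.
KNOWN_APPS: dict[str, AppInfo] = {
    "firefox": ("org.mozilla.firefox", True, False, False, "browser"),
    "code": ("com.microsoft.VSCode", False, True, False, "editor"),
}


def normalize_app(app_name: str) -> AppInfo:
    """Same logic as the documented function, run against the table above."""
    key = app_name.strip().lower()
    if key in KNOWN_APPS:
        return KNOWN_APPS[key]
    # Unknown apps: keep a sanitized name, clear all flags.
    sanitized = key.replace(" ", "_").replace("/", "_")
    return (f"unknown.{sanitized}", False, False, False, "other")


print(normalize_app("  Firefox "))   # whitespace and case are ignored
print(normalize_app("My App/Tool"))  # falls back to unknown.my_app_tool
```

Note that only spaces and slashes are replaced in the fallback path, so other punctuation in an unknown application name passes through unchanged.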

client

taskclf.adapters.activitywatch.client

ActivityWatch data access: JSON export parsing and REST API client.

Provides two data-ingestion paths:

  • File-based -- parse_aw_export reads an AW JSON export (the format produced by "Export all buckets as JSON" in the AW web UI or GET /api/0/export).
  • REST-based -- fetch_aw_events queries a running aw-server instance for events in a time range.

Both paths normalize application names via taskclf.adapters.activitywatch.mapping.normalize_app and replace raw window titles with salted hashes so that no sensitive text is ever persisted.

AWConnectionError

Bases: OSError

The ActivityWatch server refused the connection or is unreachable.

Source code in src/taskclf/adapters/activitywatch/client.py
class AWConnectionError(OSError):
    """The ActivityWatch server refused the connection or is unreachable."""

    def __init__(self, url: str) -> None:
        self.url = url
        super().__init__(f"Cannot connect to ActivityWatch at {url}")

AWTimeoutError

Bases: OSError

The ActivityWatch server did not respond within the timeout.

Source code in src/taskclf/adapters/activitywatch/client.py
class AWTimeoutError(OSError):
    """The ActivityWatch server did not respond within the timeout."""

    def __init__(self, url: str, timeout: int) -> None:
        self.url = url
        self.timeout = timeout
        super().__init__(f"ActivityWatch request to {url} timed out after {timeout}s")

AWNotFoundError

Bases: OSError

The ActivityWatch resource was not found (HTTP 404).

Source code in src/taskclf/adapters/activitywatch/client.py
class AWNotFoundError(OSError):
    """The ActivityWatch resource was not found (HTTP 404)."""

    def __init__(self, url: str) -> None:
        self.url = url
        super().__init__(f"ActivityWatch resource not found at {url}")

parse_aw_export(path, *, title_salt)

Parse an ActivityWatch JSON export file into normalized events.

Filters for buckets of type currentwindow (i.e. aw-watcher-window data). Each event's application name is normalized and its window title is replaced with a salted hash.

Parameters:

  • path (Path, required) -- Path to the AW export JSON file.
  • title_salt (str, required) -- Salt used for hashing window titles.

Returns:

  • list[AWEvent] -- Sorted (by timestamp) list of AWEvent instances.

Raises:

  • FileNotFoundError -- If path does not exist.
  • KeyError -- If the JSON structure is missing expected keys.

Source code in src/taskclf/adapters/activitywatch/client.py
def parse_aw_export(path: Path, *, title_salt: str) -> list[AWEvent]:
    """Parse an ActivityWatch JSON export file into normalized events.

    Filters for buckets of type ``currentwindow`` (i.e.
    ``aw-watcher-window`` data).  Each event's application name is
    normalized and its window title is replaced with a salted hash.

    Args:
        path: Path to the AW export JSON file.
        title_salt: Salt used for hashing window titles.

    Returns:
        Sorted (by timestamp) list of :class:`AWEvent` instances.

    Raises:
        FileNotFoundError: If *path* does not exist.
        KeyError: If the JSON structure is missing expected keys.
    """
    raw = json.loads(path.read_text(encoding="utf-8"))

    buckets: dict[str, Any] = raw.get("buckets", raw)

    events: list[AWEvent] = []
    for bucket_id, bucket in buckets.items():
        bucket_type = bucket.get("type", "")
        if bucket_type != _CURRENTWINDOW_TYPE:
            logger.debug("Skipping bucket %s (type=%s)", bucket_id, bucket_type)
            continue

        logger.info(
            "Processing bucket %s (%d events)",
            bucket_id,
            len(bucket.get("events", [])),
        )
        for raw_event in bucket.get("events", []):
            events.append(_raw_event_to_aw_event(raw_event, title_salt=title_salt))

    events.sort(key=lambda e: e.timestamp)
    return events
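
The bucket filtering can be tried on an in-memory export as sketched below. The sample data, the iter_window_events helper, and the exact layout of each event's data dict are assumptions for illustration. The sketch stops before normalization and hashing, which the real function delegates to _raw_event_to_aw_event.

```python
from typing import Any

CURRENTWINDOW_TYPE = "currentwindow"


def iter_window_events(raw: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect raw events from every ``currentwindow`` bucket, oldest first."""
    # Accept both {"buckets": {...}} and a bare {bucket_id: bucket} mapping.
    buckets = raw.get("buckets", raw)
    events: list[dict[str, Any]] = []
    for bucket in buckets.values():
        if bucket.get("type", "") != CURRENTWINDOW_TYPE:
            continue  # e.g. AFK or input buckets
        events.extend(bucket.get("events", []))
    # ISO-8601 strings in one consistent format sort chronologically.
    events.sort(key=lambda e: e["timestamp"])
    return events


export = {
    "buckets": {
        "aw-watcher-window_myhostname": {
            "type": "currentwindow",
            "events": [
                {"timestamp": "2024-01-01T09:05:00+00:00", "duration": 30.0,
                 "data": {"app": "Code", "title": "main.py"}},
                {"timestamp": "2024-01-01T09:00:00+00:00", "duration": 12.5,
                 "data": {"app": "Firefox", "title": "Docs"}},
            ],
        },
        "aw-watcher-afk_myhostname": {
            "type": "afkstatus",
            "events": [{"timestamp": "2024-01-01T09:00:00+00:00", "duration": 60.0,
                        "data": {"status": "not-afk"}}],
        },
    }
}

for event in iter_window_events(export):
    print(event["timestamp"], event["data"]["app"])
```

The fallback in raw.get("buckets", raw) means an export without the outer "buckets" wrapper is handled the same way.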

parse_aw_input_export(path)

Parse aw-watcher-input events from an AW JSON export.

Filters for buckets of type os.hid.input. These events carry only aggregate counts (key presses, mouse clicks, movement, scroll) and contain no sensitive payload.

Parameters:

  • path (Path, required) -- Path to the AW export JSON file.

Returns:

  • list[AWInputEvent] -- Sorted (by timestamp) list of AWInputEvent instances. Empty if no os.hid.input bucket exists in the export.

Source code in src/taskclf/adapters/activitywatch/client.py
def parse_aw_input_export(path: Path) -> list[AWInputEvent]:
    """Parse ``aw-watcher-input`` events from an AW JSON export.

    Filters for buckets of type ``os.hid.input``.  These events carry
    only aggregate counts (key presses, mouse clicks, movement, scroll)
    and contain no sensitive payload.

    Args:
        path: Path to the AW export JSON file.

    Returns:
        Sorted (by timestamp) list of :class:`AWInputEvent` instances.
        Empty if no ``os.hid.input`` bucket exists in the export.
    """
    raw = json.loads(path.read_text(encoding="utf-8"))
    buckets: dict[str, Any] = raw.get("buckets", raw)

    events: list[AWInputEvent] = []
    for bucket_id, bucket in buckets.items():
        bucket_type = bucket.get("type", "")
        if bucket_type != _INPUT_TYPE:
            continue

        logger.info(
            "Processing input bucket %s (%d events)",
            bucket_id,
            len(bucket.get("events", [])),
        )
        for raw_event in bucket.get("events", []):
            events.append(_raw_to_input_event(raw_event))

    events.sort(key=lambda e: e.timestamp)
    return events

list_aw_buckets(host, *, timeout=DEFAULT_AW_TIMEOUT_SECONDS)

List all buckets from a running AW server.

Parameters:

  • host (str, required) -- Base URL of the AW server (e.g. "http://localhost:5600").
  • timeout (int, default: DEFAULT_AW_TIMEOUT_SECONDS) -- Seconds to wait for a response.

Returns:

  • dict[str, dict] -- Dict mapping bucket IDs to their metadata.

Source code in src/taskclf/adapters/activitywatch/client.py
def list_aw_buckets(
    host: str,
    *,
    timeout: int = DEFAULT_AW_TIMEOUT_SECONDS,
) -> dict[str, dict]:
    """List all buckets from a running AW server.

    Args:
        host: Base URL of the AW server (e.g. ``"http://localhost:5600"``).
        timeout: Seconds to wait for a response.

    Returns:
        Dict mapping bucket IDs to their metadata.
    """
    url = f"{host.rstrip('/')}/api/0/buckets/"
    return _api_get(url, timeout=timeout)

find_window_bucket_id(host, *, timeout=DEFAULT_AW_TIMEOUT_SECONDS)

Auto-discover the aw-watcher-window bucket on host.

Parameters:

  • host (str, required) -- Base URL of the AW server.
  • timeout (int, default: DEFAULT_AW_TIMEOUT_SECONDS) -- Seconds to wait for a response.

Returns:

  • str -- The bucket ID whose type is currentwindow.

Raises:

  • ValueError -- If no currentwindow bucket exists on the server.

Source code in src/taskclf/adapters/activitywatch/client.py
def find_window_bucket_id(
    host: str,
    *,
    timeout: int = DEFAULT_AW_TIMEOUT_SECONDS,
) -> str:
    """Auto-discover the ``aw-watcher-window`` bucket on *host*.

    Args:
        host: Base URL of the AW server.
        timeout: Seconds to wait for a response.

    Returns:
        The bucket ID whose ``type`` is ``currentwindow``.

    Raises:
        ValueError: If no ``currentwindow`` bucket exists on the server.
    """
    buckets = list_aw_buckets(host, timeout=timeout)
    for bucket_id, meta in buckets.items():
        if meta.get("type") == _CURRENTWINDOW_TYPE:
            return bucket_id
    raise ValueError(
        f"No bucket with type={_CURRENTWINDOW_TYPE!r} found on {host}. "
        f"Available: {list(buckets.keys())}"
    )

find_input_bucket_id(host, *, timeout=DEFAULT_AW_TIMEOUT_SECONDS)

Auto-discover the aw-watcher-input bucket on host.

Unlike find_window_bucket_id, this returns None when no input bucket exists because aw-watcher-input is an optional watcher that many users don't run.

Parameters:

  • host (str, required) -- Base URL of the AW server.
  • timeout (int, default: DEFAULT_AW_TIMEOUT_SECONDS) -- Seconds to wait for a response.

Returns:

  • str | None -- The bucket ID whose type is os.hid.input, or None.

Source code in src/taskclf/adapters/activitywatch/client.py
def find_input_bucket_id(
    host: str,
    *,
    timeout: int = DEFAULT_AW_TIMEOUT_SECONDS,
) -> str | None:
    """Auto-discover the ``aw-watcher-input`` bucket on *host*.

    Unlike :func:`find_window_bucket_id`, this returns ``None`` when no
    input bucket exists because ``aw-watcher-input`` is an optional
    watcher that many users don't run.

    Args:
        host: Base URL of the AW server.
        timeout: Seconds to wait for a response.

    Returns:
        The bucket ID whose ``type`` is ``os.hid.input``, or ``None``.
    """
    buckets = list_aw_buckets(host, timeout=timeout)
    for bucket_id, meta in buckets.items():
        if meta.get("type") == _INPUT_TYPE:
            return bucket_id
    return None
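
The difference between the two discovery functions (required window bucket, optional input bucket) matters mostly on the caller's side. The sketch below reproduces both lookups against an in-memory bucket listing so it runs without a server. The pick_bucket and require_bucket helpers and the sample listing are hypothetical.

```python
from __future__ import annotations


def pick_bucket(buckets: dict[str, dict], bucket_type: str) -> str | None:
    """Return the first bucket ID of the given type, or None (optional watcher)."""
    for bucket_id, meta in buckets.items():
        if meta.get("type") == bucket_type:
            return bucket_id
    return None


def require_bucket(buckets: dict[str, dict], bucket_type: str) -> str:
    """Like pick_bucket, but a missing bucket is an error (required watcher)."""
    bucket_id = pick_bucket(buckets, bucket_type)
    if bucket_id is None:
        raise ValueError(
            f"No bucket with type={bucket_type!r}. Available: {list(buckets)}"
        )
    return bucket_id


# Sample listing in the shape returned by the buckets endpoint (assumed).
listing = {
    "aw-watcher-window_myhostname": {"type": "currentwindow"},
    "aw-watcher-afk_myhostname": {"type": "afkstatus"},
}

window_id = require_bucket(listing, "currentwindow")
input_id = pick_bucket(listing, "os.hid.input")  # None: watcher not running
print(window_id, input_id)
```

A caller can then skip input-event ingestion when input_id is None, while a missing window bucket stops the run early with a message listing what is available.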

fetch_aw_events(host, bucket_id, start, end, *, title_salt, timeout=DEFAULT_AW_TIMEOUT_SECONDS)

Fetch events from the AW REST API for a time range.

Parameters:

  • host (str, required) -- Base URL of the AW server (e.g. "http://localhost:5600").
  • bucket_id (str, required) -- Bucket to query (e.g. "aw-watcher-window_myhostname").
  • start (datetime, required) -- Inclusive start of the query window (UTC).
  • end (datetime, required) -- Exclusive end of the query window (UTC).
  • title_salt (str, required) -- Salt used for hashing window titles.
  • timeout (int, default: DEFAULT_AW_TIMEOUT_SECONDS) -- Seconds to wait for a response.

Returns:

  • list[AWEvent] -- Sorted list of AWEvent instances.

Source code in src/taskclf/adapters/activitywatch/client.py
def fetch_aw_events(
    host: str,
    bucket_id: str,
    start: datetime,
    end: datetime,
    *,
    title_salt: str,
    timeout: int = DEFAULT_AW_TIMEOUT_SECONDS,
) -> list[AWEvent]:
    """Fetch events from the AW REST API for a time range.

    Args:
        host: Base URL of the AW server (e.g. ``"http://localhost:5600"``).
        bucket_id: Bucket to query (e.g. ``"aw-watcher-window_myhostname"``).
        start: Inclusive start of the query window (UTC).
        end: Exclusive end of the query window (UTC).
        title_salt: Salt used for hashing window titles.
        timeout: Seconds to wait for a response.

    Returns:
        Sorted list of :class:`AWEvent` instances.
    """
    base = host.rstrip("/")
    start_iso = start.isoformat() + "Z" if start.tzinfo is None else start.isoformat()
    end_iso = end.isoformat() + "Z" if end.tzinfo is None else end.isoformat()

    qs = urllib.parse.urlencode(
        {"start": start_iso, "end": end_iso},
        safe=":",
        quote_via=urllib.parse.quote,
    )
    url = f"{base}/api/0/buckets/{bucket_id}/events?{qs}"
    raw_events: list[dict] = _api_get(url, timeout=timeout)

    events = [_raw_event_to_aw_event(e, title_salt=title_salt) for e in raw_events]
    events.sort(key=lambda e: e.timestamp)
    return events
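
To see what URL this produces, the timestamp formatting and query-string construction can be isolated as below. The to_query_iso and events_url helper names are hypothetical, while the formatting and urlencode arguments are the same as in the source above. Naive datetimes are treated as UTC and get a "Z" suffix. Aware datetimes keep their offset, whose "+" is percent-encoded.

```python
import urllib.parse
from datetime import datetime, timezone


def to_query_iso(moment: datetime) -> str:
    """Format a datetime the way the fetch functions do."""
    if moment.tzinfo is None:
        return moment.isoformat() + "Z"  # naive values are assumed to be UTC
    return moment.isoformat()            # aware values keep their offset


def events_url(host: str, bucket_id: str, start: datetime, end: datetime) -> str:
    """Build the events endpoint URL for a time range."""
    qs = urllib.parse.urlencode(
        {"start": to_query_iso(start), "end": to_query_iso(end)},
        safe=":",                        # keep colons readable
        quote_via=urllib.parse.quote,    # encode "+" as %2B, not as a space
    )
    return f"{host.rstrip('/')}/api/0/buckets/{bucket_id}/events?{qs}"


naive = events_url(
    "http://localhost:5600/",
    "aw-watcher-window_myhostname",
    datetime(2024, 1, 1, 9, 0),
    datetime(2024, 1, 1, 10, 0),
)
aware = events_url(
    "http://localhost:5600",
    "aw-watcher-window_myhostname",
    datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc),
)
print(naive)
print(aware)
```

Using quote rather than the default quote_plus matters here: an unencoded "+" in a query string is commonly decoded as a space, which would corrupt the UTC offset.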

fetch_aw_input_events(host, bucket_id, start, end, *, timeout=DEFAULT_AW_TIMEOUT_SECONDS)

Fetch input events from the AW REST API for a time range.

Parameters:

  • host (str, required) -- Base URL of the AW server.
  • bucket_id (str, required) -- Input bucket to query (e.g. "aw-watcher-input_myhostname").
  • start (datetime, required) -- Inclusive start of the query window (UTC).
  • end (datetime, required) -- Exclusive end of the query window (UTC).
  • timeout (int, default: DEFAULT_AW_TIMEOUT_SECONDS) -- Seconds to wait for a response.

Returns:

  • list[AWInputEvent] -- Sorted list of AWInputEvent instances.

Source code in src/taskclf/adapters/activitywatch/client.py
def fetch_aw_input_events(
    host: str,
    bucket_id: str,
    start: datetime,
    end: datetime,
    *,
    timeout: int = DEFAULT_AW_TIMEOUT_SECONDS,
) -> list[AWInputEvent]:
    """Fetch input events from the AW REST API for a time range.

    Args:
        host: Base URL of the AW server.
        bucket_id: Input bucket to query (e.g.
            ``"aw-watcher-input_myhostname"``).
        start: Inclusive start of the query window (UTC).
        end: Exclusive end of the query window (UTC).
        timeout: Seconds to wait for a response.

    Returns:
        Sorted list of :class:`AWInputEvent` instances.
    """
    base = host.rstrip("/")
    start_iso = start.isoformat() + "Z" if start.tzinfo is None else start.isoformat()
    end_iso = end.isoformat() + "Z" if end.tzinfo is None else end.isoformat()

    qs = urllib.parse.urlencode(
        {"start": start_iso, "end": end_iso},
        safe=":",
        quote_via=urllib.parse.quote,
    )
    url = f"{base}/api/0/buckets/{bucket_id}/events?{qs}"
    raw_events: list[dict] = _api_get(url, timeout=timeout)

    events = [_raw_to_input_event(e) for e in raw_events]
    events.sort(key=lambda e: e.timestamp)
    return events