Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions aw_query/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
flood,
limit_events,
merge_events_by_keys,
merge_subwatcher_fields,
period_union,
simplify_string,
sort_by_duration,
Expand Down Expand Up @@ -227,6 +228,14 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]:
return merge_events_by_keys(events, keys)


@q2_function(merge_subwatcher_fields)
@q2_typecheck
def q2_merge_subwatcher_fields(
base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins"
) -> List[Event]:
return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict)


@q2_function(chunk_events_by_key)
@q2_typecheck
def q2_chunk_events_by_key(events: list, key: str) -> List[Event]:
Expand Down
2 changes: 2 additions & 0 deletions aw_transform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limit_events,
)
from .split_url_events import split_url_events
from .merge_subwatcher_fields import merge_subwatcher_fields
from .simplify import simplify_string
from .flood import flood
from .classify import categorize, tag, Rule
Expand All @@ -33,6 +34,7 @@
"heartbeat_reduce",
"heartbeat_merge",
"merge_events_by_keys",
"merge_subwatcher_fields",
"chunk_events_by_key",
"limit_events",
"filter_keyvals",
Expand Down
105 changes: 105 additions & 0 deletions aw_transform/merge_subwatcher_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
from copy import deepcopy
from typing import List, Optional

from aw_core.models import Event

from .filter_period_intersect import _get_event_period

logger = logging.getLogger(__name__)


def merge_subwatcher_fields(
base_events: List[Event],
subwatcher_events: List[Event],
keys: List[str],
conflict: str = "base_wins",
) -> List[Event]:
"""
For each event in *base_events*, find the longest-overlapping event in
*subwatcher_events* and copy the named *keys* from that subwatcher event
into the base event's ``data`` dict.

Timestamps, durations, and event count of *base_events* are **unchanged**
— no phantom events are created. This makes duration/app/title aggregations
stay correct, unlike the ``concat`` workaround.

This is the backend primitive that lets every client (webui, native UIs,
exporters) categorize by subwatcher fields (browser ``url``/``$domain``;
editor ``project``/``file``/``language``) without bespoke per-watcher
client-side code.

Args:
base_events: The canonical window/afk-filtered stream to enrich.
subwatcher_events: Events from a subwatcher bucket (e.g. aw-watcher-vim,
aw-watcher-web). Should already be clipped via
``filter_period_intersect`` before passing here.
keys: Which keys to copy from the subwatcher event into the base event.
Keys already present in the base event are left untouched when
``conflict="base_wins"`` (default).
conflict: ``"base_wins"`` (default) — base event's existing keys are
never overwritten; subwatcher fields are purely additive.
``"sub_wins"`` — subwatcher fields overwrite base fields.

Returns:
A new list of base events with subwatcher fields injected. Events in
*base_events* that have no overlapping subwatcher event are returned
with their original data unchanged.

Example::

window_events = query_bucket(bid_window)
editor_events = flood(query_bucket(bid_editor))
editor_events = filter_period_intersect(editor_events, window_events)
window_events = merge_subwatcher_fields(
window_events, editor_events, ["project", "file", "language"]
)
# Now categorize(window_events, ...) can match on "project"/"file"

Note on N:1 overlap:
When multiple subwatcher events overlap a single base event, the one
with the **longest overlap duration** is used (attach-longest strategy).
This matches heartbeat granularity and avoids splitting base events.
"""
if conflict not in ("base_wins", "sub_wins"):
raise ValueError(
f"conflict must be 'base_wins' or 'sub_wins', got {conflict!r}"
)
if not subwatcher_events or not keys:
return [deepcopy(e) for e in base_events]

# Build a sorted copy so we can do a linear scan
sub_sorted = sorted(subwatcher_events, key=lambda e: e.timestamp)

result: List[Event] = []
for base in base_events:
base_period = _get_event_period(base)
best_sub: Optional[Event] = None
best_overlap_secs: float = 0.0

for sub in sub_sorted:
sub_period = _get_event_period(sub)
# Once sub starts after base ends we can stop
if sub_period.start >= base_period.end:
break
# Skip sub events that end before base starts
if sub_period.end <= base_period.start:
continue
ip = base_period.intersection(sub_period)
if ip:
overlap_secs = ip.duration.total_seconds()
if overlap_secs > best_overlap_secs:
best_overlap_secs = overlap_secs
best_sub = sub

enriched = deepcopy(base)
if best_sub is not None:
for key in keys:
if key in best_sub.data:
if conflict == "base_wins" and key in enriched.data:
pass # base keeps its value
else:
enriched.data[key] = best_sub.data[key]
result.append(enriched)

return result
140 changes: 140 additions & 0 deletions tests/test_transforms.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from pprint import pprint
from datetime import datetime, timedelta, timezone

import pytest
from aw_core.models import Event
from aw_transform import (
filter_period_intersect,
filter_keyvals_regex,
filter_keyvals,
merge_subwatcher_fields,
period_union,
sort_by_timestamp,
sort_by_duration,
Expand Down Expand Up @@ -469,3 +471,141 @@ def test_union_no_overlap():
dur = sum((e.duration for e in events_union), timedelta(0))
assert dur == timedelta(hours=5, minutes=0)
assert sorted(events_union, key=lambda e: e.timestamp)


def test_merge_subwatcher_fields_basic():
"""Subwatcher fields are injected into overlapping base events."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)

base = [
Event(
timestamp=now,
duration=td1h,
data={"app": "vim", "title": "file.py"},
)
]
sub = [
Event(
timestamp=now,
duration=td1h,
data={"project": "myproject", "file": "file.py", "language": "python"},
)
]
result = merge_subwatcher_fields(base, sub, ["project", "file", "language"])

assert len(result) == 1
# Original base fields preserved
assert result[0].data["app"] == "vim"
assert result[0].data["title"] == "file.py"
# Subwatcher fields injected
assert result[0].data["project"] == "myproject"
assert result[0].data["language"] == "python"
# Timestamps and durations unchanged
assert result[0].timestamp == now
assert result[0].duration == td1h


def test_merge_subwatcher_fields_no_overlap():
"""Base events with no overlapping subwatcher event are returned unchanged."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)

base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})]
# Subwatcher event is entirely after the base event
sub = [
Event(
timestamp=now + 2 * td1h,
duration=td1h,
data={"project": "other"},
)
]
result = merge_subwatcher_fields(base, sub, ["project"])

assert len(result) == 1
assert "project" not in result[0].data
assert result[0].data["app"] == "vim"


def test_merge_subwatcher_fields_base_wins_conflict():
"""With conflict='base_wins' (default), existing base keys are not overwritten."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)

base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})]
sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py", "project": "p"})]

result = merge_subwatcher_fields(
base, sub, ["file", "project"], conflict="base_wins"
)
# base's "file" must not be overwritten
assert result[0].data["file"] == "base.py"
# "project" not in base → injected from sub
assert result[0].data["project"] == "p"


def test_merge_subwatcher_fields_sub_wins_conflict():
"""With conflict='sub_wins', subwatcher fields overwrite base fields."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)

base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})]
sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py"})]

result = merge_subwatcher_fields(base, sub, ["file"], conflict="sub_wins")
assert result[0].data["file"] == "sub.py"


def test_merge_subwatcher_fields_attach_longest():
"""When multiple subwatcher events overlap a base event, the longest overlap wins."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td30m = timedelta(minutes=30)
td1h = timedelta(hours=1)

# Base event: 12:00 – 13:00
base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})]
# Short overlap: 12:00 – 12:30 (30 min)
sub_short = Event(timestamp=now, duration=td30m, data={"project": "short"})
# Long overlap: 12:30 – 13:00 (30 min) — same overlap here, but added first
# Make one clearly longer: 11:45 – 13:00 (75 min overlap into base)
sub_long = Event(
timestamp=now - timedelta(minutes=15),
duration=td1h + timedelta(minutes=15),
data={"project": "long"},
)
result = merge_subwatcher_fields(base, [sub_short, sub_long], ["project"])
assert result[0].data["project"] == "long"


def test_merge_subwatcher_fields_empty_inputs():
"""Empty sub or keys returns a defensive copy of base (not the same list)."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)
base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})]

# Empty subwatcher list — returns a new list, data unchanged
result = merge_subwatcher_fields(base, [], ["project"])
assert result[0].data == {"app": "vim"}
assert result is not base

# Empty keys list
sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})]
result = merge_subwatcher_fields(base, sub, [])
assert "project" not in result[0].data
assert result is not base

# Both empty
result = merge_subwatcher_fields(base, [], [])
assert result[0].data == {"app": "vim"}
assert result is not base


def test_merge_subwatcher_fields_invalid_conflict():
"""Invalid conflict value raises ValueError immediately."""
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
td1h = timedelta(hours=1)
base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})]
sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})]

with pytest.raises(ValueError, match="conflict must be"):
merge_subwatcher_fields(base, sub, ["project"], conflict="invalid")
Loading