diff --git a/aw_query/functions.py b/aw_query/functions.py index b02ce5e..a87d332 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -23,6 +23,7 @@ flood, limit_events, merge_events_by_keys, + merge_subwatcher_fields, period_union, simplify_string, sort_by_duration, @@ -227,6 +228,14 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]: return merge_events_by_keys(events, keys) +@q2_function(merge_subwatcher_fields) +@q2_typecheck +def q2_merge_subwatcher_fields( + base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins" +) -> List[Event]: + return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict) + + @q2_function(chunk_events_by_key) @q2_typecheck def q2_chunk_events_by_key(events: list, key: str) -> List[Event]: diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index 1297e42..d17e0fe 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -11,6 +11,7 @@ limit_events, ) from .split_url_events import split_url_events +from .merge_subwatcher_fields import merge_subwatcher_fields from .simplify import simplify_string from .flood import flood from .classify import categorize, tag, Rule @@ -33,6 +34,7 @@ "heartbeat_reduce", "heartbeat_merge", "merge_events_by_keys", + "merge_subwatcher_fields", "chunk_events_by_key", "limit_events", "filter_keyvals", diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py new file mode 100644 index 0000000..6cb7f3f --- /dev/null +++ b/aw_transform/merge_subwatcher_fields.py @@ -0,0 +1,105 @@ +import logging +from copy import deepcopy +from typing import List, Optional + +from aw_core.models import Event + +from .filter_period_intersect import _get_event_period + +logger = logging.getLogger(__name__) + + +def merge_subwatcher_fields( + base_events: List[Event], + subwatcher_events: List[Event], + keys: List[str], + conflict: str = "base_wins", +) -> List[Event]: + """ + For each event in *base_events*, find the longest-overlapping event in + *subwatcher_events* and copy the named *keys* from that subwatcher event + into the base event's ``data`` dict. + + Timestamps, durations, and event count of *base_events* are **unchanged** + — no phantom events are created. This makes duration/app/title aggregations + stay correct, unlike the ``concat`` workaround. + + This is the backend primitive that lets every client (webui, native UIs, + exporters) categorize by subwatcher fields (browser ``url``/``$domain``; + editor ``project``/``file``/``language``) without bespoke per-watcher + client-side code. + + Args: + base_events: The canonical window/afk-filtered stream to enrich. + subwatcher_events: Events from a subwatcher bucket (e.g. aw-watcher-vim, + aw-watcher-web). Should already be clipped via + ``filter_period_intersect`` before passing here. + keys: Which keys to copy from the subwatcher event into the base event. + Keys already present in the base event are left untouched when + ``conflict="base_wins"`` (default). + conflict: ``"base_wins"`` (default) — base event's existing keys are + never overwritten; subwatcher fields are purely additive. + ``"sub_wins"`` — subwatcher fields overwrite base fields. + + Returns: + A new list of base events with subwatcher fields injected. Events in + *base_events* that have no overlapping subwatcher event are returned + with their original data unchanged. + + Example:: + + window_events = query_bucket(bid_window) + editor_events = flood(query_bucket(bid_editor)) + editor_events = filter_period_intersect(editor_events, window_events) + window_events = merge_subwatcher_fields( + window_events, editor_events, ["project", "file", "language"] + ) + # Now categorize(window_events, ...) can match on "project"/"file" + + Note on N:1 overlap: + When multiple subwatcher events overlap a single base event, the one + with the **longest overlap duration** is used (attach-longest strategy). + This matches heartbeat granularity and avoids splitting base events. + """ + if conflict not in ("base_wins", "sub_wins"): + raise ValueError( + f"conflict must be 'base_wins' or 'sub_wins', got {conflict!r}" + ) + if not subwatcher_events or not keys: + return [deepcopy(e) for e in base_events] + + # Build a sorted copy so we can do a linear scan + sub_sorted = sorted(subwatcher_events, key=lambda e: e.timestamp) + + result: List[Event] = [] + for base in base_events: + base_period = _get_event_period(base) + best_sub: Optional[Event] = None + best_overlap_secs: float = 0.0 + + for sub in sub_sorted: + sub_period = _get_event_period(sub) + # Once sub starts after base ends we can stop + if sub_period.start >= base_period.end: + break + # Skip sub events that end before base starts + if sub_period.end <= base_period.start: + continue + ip = base_period.intersection(sub_period) + if ip: + overlap_secs = ip.duration.total_seconds() + if overlap_secs > best_overlap_secs: + best_overlap_secs = overlap_secs + best_sub = sub + + enriched = deepcopy(base) + if best_sub is not None: + for key in keys: + if key in best_sub.data: + if conflict == "base_wins" and key in enriched.data: + pass # base keeps its value + else: + enriched.data[key] = best_sub.data[key] + result.append(enriched) + + return result diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 3aa119c..a2562d9 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -1,11 +1,13 @@ from pprint import pprint from datetime import datetime, timedelta, timezone +import pytest from aw_core.models import Event from aw_transform import ( filter_period_intersect, filter_keyvals_regex, filter_keyvals, + merge_subwatcher_fields, period_union, sort_by_timestamp, sort_by_duration, @@ -469,3 +471,141 @@ def test_union_no_overlap(): dur = sum((e.duration for e in events_union), timedelta(0)) assert dur == timedelta(hours=5, minutes=0) assert sorted(events_union, key=lambda e: e.timestamp) + + +def test_merge_subwatcher_fields_basic(): + """Subwatcher fields are injected into overlapping base events.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [ + Event( + timestamp=now, + duration=td1h, + data={"app": "vim", "title": "file.py"}, + ) + ] + sub = [ + Event( + timestamp=now, + duration=td1h, + data={"project": "myproject", "file": "file.py", "language": "python"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project", "file", "language"]) + + assert len(result) == 1 + # Original base fields preserved + assert result[0].data["app"] == "vim" + assert result[0].data["title"] == "file.py" + # Subwatcher fields injected + assert result[0].data["project"] == "myproject" + assert result[0].data["language"] == "python" + # Timestamps and durations unchanged + assert result[0].timestamp == now + assert result[0].duration == td1h + + +def test_merge_subwatcher_fields_no_overlap(): + """Base events with no overlapping subwatcher event are returned unchanged.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + # Subwatcher event is entirely after the base event + sub = [ + Event( + timestamp=now + 2 * td1h, + duration=td1h, + data={"project": "other"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 1 + assert "project" not in result[0].data + assert result[0].data["app"] == "vim" + + +def test_merge_subwatcher_fields_base_wins_conflict(): + """With conflict='base_wins' (default), existing base keys are not overwritten.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py", "project": "p"})] + + result = merge_subwatcher_fields( + base, sub, ["file", "project"], conflict="base_wins" + ) + # base's "file" must not be overwritten + assert result[0].data["file"] == "base.py" + # "project" not in base → injected from sub + assert result[0].data["project"] == "p" + + +def test_merge_subwatcher_fields_sub_wins_conflict(): + """With conflict='sub_wins', subwatcher fields overwrite base fields.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py"})] + + result = merge_subwatcher_fields(base, sub, ["file"], conflict="sub_wins") + assert result[0].data["file"] == "sub.py" + + +def test_merge_subwatcher_fields_attach_longest(): + """When multiple subwatcher events overlap a base event, the longest overlap wins.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td30m = timedelta(minutes=30) + td1h = timedelta(hours=1) + + # Base event: 12:00 – 13:00 + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + # Short overlap: 12:00 – 12:30 (30 min) + sub_short = Event(timestamp=now, duration=td30m, data={"project": "short"}) + # Long overlap: 12:30 – 13:00 (30 min) — same overlap here, but added first + # Make one clearly longer: 11:45 – 13:00 (75 min overlap into base) + sub_long = Event( + timestamp=now - timedelta(minutes=15), + duration=td1h + timedelta(minutes=15), + data={"project": "long"}, + ) + result = merge_subwatcher_fields(base, [sub_short, sub_long], ["project"]) + assert result[0].data["project"] == "long" + + +def test_merge_subwatcher_fields_empty_inputs(): + """Empty sub or keys returns a defensive copy of base (not the same list).""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + + # Empty subwatcher list — returns a new list, data unchanged + result = merge_subwatcher_fields(base, [], ["project"]) + assert result[0].data == {"app": "vim"} + assert result is not base + + # Empty keys list + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + result = merge_subwatcher_fields(base, sub, []) + assert "project" not in result[0].data + assert result is not base + + # Both empty + result = merge_subwatcher_fields(base, [], []) + assert result[0].data == {"app": "vim"} + assert result is not base + + +def test_merge_subwatcher_fields_invalid_conflict(): + """Invalid conflict value raises ValueError immediately.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + + with pytest.raises(ValueError, match="conflict must be"): + merge_subwatcher_fields(base, sub, ["project"], conflict="invalid")