Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
examples/**/temp*
.claude

# Byte-compiled / optimized / DLL files
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ asyncio.run(main())

A `Handoff` holds your transport config — the streaming server and notifiers — and is reusable across pages and runs. You decide *what* to watch for per call, so the same `Handoff` serves any number of scenarios.

**Let the library detect the moment** with `handoff.run(page, scenarios=[...])`. A `Scenario` is a pair: a `trigger` that says "stop, a human is needed" and a `complete` that says "OK, they're done." `run` watches every scenario's trigger. If none fires within `timeout` seconds, it returns `HandoffResult(was_blocked=False)` and your script keeps going. If one fires, it starts a local streaming server, surfaces the URL (printed to logs and pushed to your notifiers), and waits until that scenario's `complete` matches — or until `server.session_timeout` elapses, in which case the result has `timed_out=True`. It never raises on timeout; check the result.
**Let the library detect the moment** with `handoff.run(page, scenarios=[...])`. A `Scenario` is a pair: a `trigger` that says "stop, a human is needed" and a `complete` that says "OK, they're done." `run` watches every scenario's trigger. If none fires within `trigger_timeout` seconds, it returns `HandoffResult(was_blocked=False)` and your script keeps going. If one fires, it starts a local streaming server, surfaces the URL (printed to logs and pushed to your notifiers), and waits until that scenario's `complete` matches — or until one of the handoff timers fires (`access_timeout` if the operator never opens the link, `completion_timeout` if they open it but don't finish). On timeout the result has `timed_out=True` and `timeout_cause` set to `"access"` or `"completion"`. It never raises on timeout; check the result.

**Already know a human is needed?** Skip trigger detection and stream right away with `handoff.wait_for_completion(page, on=...)`. This is the right call when something upstream already decided — e.g. an AI agent navigated to the payment page itself — so watching for a trigger would be redundant:

Expand Down Expand Up @@ -194,7 +194,8 @@ Handoff(
host="127.0.0.1", # "0.0.0.0" to expose on LAN
port=8080,
public_base="https://my-tunnel.example.com", # what notifiers link to
session_timeout=600, # max session lifetime / human wait (s)
access_timeout=600, # pre-connect bound (s)
completion_timeout=1800, # post-connect work budget (s)
jpeg_quality=75,
every_nth_frame=1,
),
Expand All @@ -203,7 +204,7 @@ Handoff(

### Access control

The stream URL carries a high-entropy capability token (`…/?t=<token>`): whoever holds the link can view **and control** the page, so treat it like a password. The token is unguessable, decoupled from internal ids, and expires when the handoff finishes or `session_timeout` elapses — a stale link stops working. When exposing beyond loopback (`0.0.0.0`, a tunnel, or a sandbox preview URL), **serve over HTTPS/WSS** so the token isn't readable in transit; set `public_base` to your public `https://` origin and the operator link is built from it. There is no second factor yet — one leaked, still-active link grants control, so deliver it over a trusted channel.
The stream URL carries a high-entropy capability token (`…/?t=<token>`): whoever holds the link can view **and control** the page, so treat it like a password. The token is unguessable, decoupled from internal ids, and expires when the handoff finishes or the worst-case session lifetime (`access_timeout` + `completion_timeout`) elapses — a stale link stops working. When exposing beyond loopback (`0.0.0.0`, a tunnel, or a sandbox preview URL), **serve over HTTPS/WSS** so the token isn't readable in transit; set `public_base` to your public `https://` origin and the operator link is built from it. There is no second factor yet — one leaked, still-active link grants control, so deliver it over a trusted channel.

## Examples

Expand Down
177 changes: 138 additions & 39 deletions browser_handoff/handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from contextlib import suppress
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

from .config import load_file, load_json, load_yaml
from .detection.base import BaseDetection, DetectionResult
Expand Down Expand Up @@ -125,6 +125,17 @@ async def _capture_crop_metrics(
return None


def _resolve_timeout(
per_call: float | None, default: float | None
) -> float | None:
"""Resolve a per-call timeout against the ServerConfig default.

`None` per-call inherits the default; any other value (including
`math.inf`) overrides. `None` on both layers means truly disabled.
"""
return default if per_call is None else per_call


def _detection_tree_has_llm(detection: BaseDetection) -> bool:
"""True if `detection` or any nested child is an LLMDetection.

Expand All @@ -150,16 +161,19 @@ class HandoffResult:
"""Outcome of a Handoff.run() call.

Three terminal states:
- was_blocked=False → no trigger fired within timeout
- was_blocked=False → no trigger fired within trigger_timeout
- was_blocked=True, timed_out=False → human completed the task
- was_blocked=True, timed_out=True → human exceeded session_timeout
- was_blocked=True, timed_out=True → one of the handoff timers fired
"""

was_blocked: bool
"""Whether a trigger fired and a human handoff ran."""

timed_out: bool = False
"""Only meaningful if `was_blocked`: human exceeded session_timeout."""
"""Only meaningful if `was_blocked`: either timer fired."""

timeout_cause: Literal["access", "completion"] | None = None
"""Which timer fired. None unless `timed_out`."""

scenario_name: str | None = None
"""Name of the scenario that fired."""
Expand Down Expand Up @@ -298,28 +312,34 @@ async def run(
page: "Page",
*,
scenarios: list[Scenario] | None = None,
timeout: float = 30.0,
trigger_timeout: float = 30.0,
access_timeout: float | None = None,
completion_timeout: float | None = None,
stream_url: str | None = None,
) -> "HandoffResult":
"""Watch for triggers; on match, run a handoff and await completion.

Registers listeners on every scenario's trigger and waits for
one to fire (within `timeout`) or for `timeout` to elapse.
one to fire (within `trigger_timeout`) or for it to elapse.

Args:
page: Playwright page to monitor.
scenarios: Trigger-completion pairs to watch. Falls back to
the scenarios set on the instance. ValueError if neither.
timeout: Max seconds to wait for any trigger. Does NOT bound
the human-completion phase — that uses
`ServerConfig.session_timeout` (default 600s).
trigger_timeout: Max seconds to wait for any trigger. Does
NOT bound the human-completion phase.
access_timeout: Per-call override for the pre-connect bound.
None inherits `ServerConfig.access_timeout`.
completion_timeout: Per-call override for the post-connect
work budget. None inherits `ServerConfig.completion_timeout`.
stream_url: Optional substrate viewer URL. When set, the
handoff runs in passthrough mode and `stream_url` is
forwarded to `wait_for_completion`.

Returns:
HandoffResult describing what happened. Never raises on
completion-phase timeout — check `result.timed_out`.
handoff-phase timeout — check `result.timed_out` and
`result.timeout_cause`.
"""
scenarios = scenarios if scenarios is not None else self.scenarios
if not scenarios:
Expand Down Expand Up @@ -377,12 +397,14 @@ async def on_trigger(detection: BaseDetection) -> None:

if not trigger_event.is_set():
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(trigger_event.wait(), timeout=timeout)
await asyncio.wait_for(
trigger_event.wait(), timeout=trigger_timeout
)

if matched_scenario is None or matched_result is None:
logger.info(
"handoff.run: no trigger matched within %.1fs (page url=%s)",
timeout, page.url,
trigger_timeout, page.url,
)
return HandoffResult(was_blocked=False)

Expand All @@ -396,6 +418,8 @@ async def on_trigger(detection: BaseDetection) -> None:
reason=matched_result.reason,
name=matched_scenario.name,
stream_url=stream_url,
access_timeout=access_timeout,
completion_timeout=completion_timeout,
)
finally:
for cleanup in cleanups:
Expand All @@ -410,6 +434,8 @@ async def wait_for_completion(
reason: str = "Human intervention required",
name: str = "handoff",
stream_url: str | None = None,
access_timeout: float | None = None,
completion_timeout: float | None = None,
) -> "HandoffResult":
"""Stream the page to a human *now* and wait until `on` matches.

Expand All @@ -426,11 +452,14 @@ async def wait_for_completion(
stream_url: Optional substrate viewer URL. When set, the
wrapper iframes this URL; bh still owns detection,
notification, and lifecycle.
access_timeout: Per-call override of the pre-connect bound.
None inherits `ServerConfig.access_timeout`.
completion_timeout: Per-call override of the post-connect
work budget. None inherits `ServerConfig.completion_timeout`.

Returns:
HandoffResult with `was_blocked=True`. Check `timed_out` for
whether the human finished within session_timeout. Never
raises on timeout.
HandoffResult with `was_blocked=True`. Check `timed_out` /
`timeout_cause` for which timer fired. Never raises on timeout.
"""
context = page.context
start_time = time.time()
Expand All @@ -439,6 +468,11 @@ async def wait_for_completion(
completion_event = asyncio.Event()
completion_reason: str | None = None

resolved_access = _resolve_timeout(access_timeout, self.server.access_timeout)
resolved_completion = _resolve_timeout(
completion_timeout, self.server.completion_timeout
)

# Closure cell — the gated callback is defined before the
# session exists; we patch it in once register_session returns.
session_ref: dict[str, Any] = {"session": None}
Expand Down Expand Up @@ -493,6 +527,8 @@ async def on_completion_detected(detection: BaseDetection) -> None:
viewport_size=viewport_size,
stream_url=stream_url,
crop_metrics=crop_metrics,
access_timeout=resolved_access,
completion_timeout=resolved_completion,
)
session_ref["session"] = session

Expand All @@ -517,33 +553,28 @@ async def on_completion_detected(detection: BaseDetection) -> None:
completion_reason = initial.reason
completion_event.set()

# Lazy install: defer register_listeners until an operator
# opens the wrapper. Closes the substrate-URL leak — the
# in-page watcher only runs after wrapper auth via the
# access token. session_timeout bounds the wait.
timed_out = False
try:
if not completion_event.is_set():
await asyncio.wait_for(
session.presence.wait_until_connected(),
timeout=self.server.session_timeout,
)
# Lazy install gates listener registration on first connect
# (substrate-URL leak defense). The three-way race below
# starts immediately so access_timeout can fire before any
# connect; a separate side task arms listeners on connect.
timeout_cause: Literal["access", "completion"] | None = None

async def install_listeners_after_connect() -> None:
await session.presence.wait_until_connected()
if completion_event.is_set():
return
listener_cleanups.append(
on.register_listeners(page, on_completion_detected)
)

listener_install_task = asyncio.create_task(
install_listeners_after_connect()
)
try:
if not completion_event.is_set():
listener_cleanups.append(
on.register_listeners(page, on_completion_detected)
timeout_cause = await self._await_timeout_cause(
session, completion_event
)

await asyncio.wait_for(
completion_event.wait(),
timeout=self.server.session_timeout,
)
except asyncio.TimeoutError:
timed_out = True
logger.warning(
"Handoff session_timeout: human did not finish "
"within %.0fs", self.server.session_timeout,
)
except asyncio.CancelledError:
# Caller (per-step timeout, ctrl-c, explicit cancel)
# gave up. Push a task_cancelled event so the wrapper
Expand All @@ -554,6 +585,16 @@ async def on_completion_detected(detection: BaseDetection) -> None:
with suppress(Exception):
await server.notify_task_cancelled(session_id)
raise
finally:
listener_install_task.cancel()
with suppress(asyncio.CancelledError, Exception):
await listener_install_task

timed_out = timeout_cause is not None
if timed_out:
logger.warning(
"Handoff %s_timeout fired", timeout_cause,
)

await server.stop_screencast(session_id)

Expand All @@ -565,6 +606,7 @@ async def on_completion_detected(detection: BaseDetection) -> None:
return HandoffResult(
was_blocked=True,
timed_out=timed_out,
timeout_cause=timeout_cause,
scenario_name=name,
trigger_reason=reason,
completion_reason=None if timed_out else completion_reason,
Expand All @@ -579,6 +621,63 @@ async def on_completion_detected(detection: BaseDetection) -> None:
await server.unregister_session(session_id)
await self._release_server()

@staticmethod
async def _await_timeout_cause(
session: "Any", completion_event: asyncio.Event
) -> Literal["access", "completion"] | None:
"""Return the timer that fired, or None if detection matched first.

- "access": pre-first-connect window expired.
- "completion": post-first-connect work budget expired.
- None: detection matched (completion_event set).

Sets `session.access_timer_fired` right before returning "access"
so the WS guard can reject late operator clicks.
"""
async def access_timeout_branch() -> Literal["access"]:
if session.access_timeout is None:
await asyncio.Event().wait() # never fires
try:
await asyncio.wait_for(
session.presence.wait_until_connected(),
timeout=session.access_timeout,
)
# Connect won; retire. Block until outer cancels us.
await asyncio.Event().wait()
except asyncio.TimeoutError:
session.access_timer_fired = True
return "access"
# unreachable
return "access"

async def completion_timeout_branch() -> Literal["completion"]:
await session.presence.wait_until_connected()
if session.completion_timeout is None:
await asyncio.Event().wait() # never fires
await asyncio.sleep(session.completion_timeout)
return "completion"

async def match_branch() -> None:
await completion_event.wait()
return None

tasks = [
asyncio.create_task(access_timeout_branch()),
asyncio.create_task(completion_timeout_branch()),
asyncio.create_task(match_branch()),
]
try:
done, _pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
return done.pop().result()
finally:
for t in tasks:
t.cancel()
for t in tasks:
with suppress(asyncio.CancelledError, Exception):
await t

async def _acquire_server(self) -> StreamingServer:
"""Return the shared streaming server, starting it on first use.

Expand Down
Loading