Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion api/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from core.routing import (
RouteDecision,
WORKFLOW_ANNOUNCE_READY_ISSUE,
WORKFLOW_CANCEL_REVIEW_RUNS,
WORKFLOW_PLAN_APPROVED,
needs_triage_bot_author_allowlist,
route_event,
Expand Down Expand Up @@ -133,6 +134,7 @@ def process_webhook_request(
store: StateStore | None = None,
sync_plan_approved: Callable[[Mapping[str, Any]], dict[str, Any] | None] | None = None,
sync_announce_ready_issue: Callable[[Mapping[str, Any]], dict[str, Any]] | None = None,
sync_cancel_review_runs: Callable[[Mapping[str, Any]], dict[str, Any]] | None = None,
triage_bot_author_allowlist: Iterable[str] | None = None,
triage_bot_author_allowlist_loader: Callable[[Mapping[str, Any]], Iterable[str]] | None = None,
) -> WebhookResponse:
Expand Down Expand Up @@ -265,6 +267,25 @@ def process_webhook_request(
body={**base_body, "announce_ready_issue": outcome},
)

# ``cancel-review-runs`` is fully synchronous: the webhook cancels
# any in-flight review runs for the closed PR and never dispatches
# a cloud agent.
if decision.workflow == WORKFLOW_CANCEL_REVIEW_RUNS:
if sync_cancel_review_runs is None:
return WebhookResponse(status=202, body=base_body)
try:
outcome = sync_cancel_review_runs(payload)
except Exception as exc:
logger.exception("Synchronous cancel-review-runs failed")
return WebhookResponse(
status=500,
body={**base_body, "error": f"cancel-review-runs path failed: {exc}"},
)
return WebhookResponse(
status=202,
body={**base_body, "cancel_review_runs": outcome},
)

if builder_registry is None or runner is None or config_factory is None or store is None:
# The webhook handler is partially wired (e.g. unit tests that
# only exercise routing). Keep returning 202 + reason so the
Expand Down Expand Up @@ -357,6 +378,7 @@ def do_POST(self) -> None: # noqa: N802 - signature comes from BaseHTTPRequestH
store=wiring["store"],
sync_plan_approved=wiring["sync_plan_approved"],
sync_announce_ready_issue=wiring["sync_announce_ready_issue"],
sync_cancel_review_runs=wiring["sync_cancel_review_runs"],
triage_bot_author_allowlist_loader=wiring[
"triage_bot_author_allowlist_loader"
],
Expand Down Expand Up @@ -390,6 +412,7 @@ def _build_runtime_wiring(*, body: bytes) -> dict[str, Any]:

from api.cron import build_state_store
from core.builders import build_builder_registry
from core.cancel_runs import cancel_in_flight_review_runs
from core.github_app import fetch_installation_token
from oz.oz_client import ( # type: ignore[import-not-found]
build_agent_config,
Expand Down Expand Up @@ -529,6 +552,16 @@ def sync_announce_ready_issue(
repo_handle, payload=payload
)

store = build_state_store()

def sync_cancel_review_runs(payload: Mapping[str, Any]) -> dict[str, Any]:
return cancel_in_flight_review_runs(
store=store,
canceller=sdk_client.agent.runs.cancel,
payload=payload,
github_client_factory=_mint_github_client,
)

def triage_bot_author_allowlist_loader(payload: Mapping[str, Any]) -> frozenset[str]:
full_name = str(
(payload.get("repository") or {}).get("full_name") or ""
Expand All @@ -549,9 +582,10 @@ def triage_bot_author_allowlist_loader(payload: Mapping[str, Any]) -> frozenset[
"builder_registry": builder_registry,
"runner": runner,
"config_factory": config_factory,
"store": build_state_store(),
"store": store,
"sync_plan_approved": sync_plan_approved,
"sync_announce_ready_issue": sync_announce_ready_issue,
"sync_cancel_review_runs": sync_cancel_review_runs,
"triage_bot_author_allowlist_loader": triage_bot_author_allowlist_loader,
}

Expand Down
233 changes: 233 additions & 0 deletions core/cancel_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""Eagerly cancel in-flight review runs when a pull request closes.

The webhook routes ``pull_request.closed`` to this fully-synchronous
path. The helper scans the in-flight KV records for review runs that
target the closed PR, cancels them via the Oz API, and deletes the KV
record on success so the cron poller never treats the cancellation as
a workflow failure. Cancel failures fail open: the record is left in
place and the cron drains the run with today's semantics.

Webhook deliveries are not ordered, so a stale ``closed`` event can
arrive after the PR was already reopened (and a fresh review run
dispatched). Before cancelling anything, the helper re-checks the
PR's live state on GitHub and skips cancellation unless the PR is
still closed.
"""

from __future__ import annotations

import logging
import time
from typing import Any, Callable, Mapping

from .routing import WORKFLOW_REVIEW_PR
from .state import RunState, StateStore, delete_run_state, list_in_flight_runs
from .workflow_adapters import reconstruct_progress

logger = logging.getLogger(__name__)

# The cancel endpoint returns 409 while a run is still PENDING; one short
# retry covers the dispatch-to-queued transition without blocking the
# webhook for long.
# Seconds ``_cancel_run`` passes to its injected ``sleep`` before the retry.
_PENDING_RETRY_DELAY_SECONDS = 1.0
# Status code on a cancel failure that ``_cancel_run`` retries exactly once.
_PENDING_STATUS_CODE = 409

# Text ``_complete_progress_comment`` passes to ``progress.complete`` after
# a run has been cancelled. Exported via ``__all__``.
CANCELLED_PROGRESS_MESSAGE = (
    "I cancelled the in-progress review run because this pull request was closed."
)


def _status_code(exc: Exception) -> int:
try:
return int(getattr(exc, "status_code", 0) or 0)
except (TypeError, ValueError):
return 0


def _cancel_run(
canceller: Callable[[str], Any],
run_id: str,
*,
sleep: Callable[[float], None],
) -> bool:
for attempt in range(2):
try:
canceller(run_id)
return True
except Exception as exc:
if _status_code(exc) == _PENDING_STATUS_CODE and attempt == 0:
sleep(_PENDING_RETRY_DELAY_SECONDS)
continue
logger.warning(
"cancel-review-runs: failed to cancel run %s; leaving its "
"in-flight record for the cron poller: %s",
run_id,
exc,
)
return False
return False


def _complete_progress_comment(
state: RunState,
*,
github_client_factory: Callable[[int], Any] | None,
) -> None:
if github_client_factory is None:
return
try:
client = github_client_factory(state.installation_id)
repo_handle = client.get_repo(state.repo)
progress = reconstruct_progress(
repo_handle,
state=state,
workflow=WORKFLOW_REVIEW_PR,
)
progress.complete(CANCELLED_PROGRESS_MESSAGE)
except Exception:
logger.exception(
"cancel-review-runs: failed to update progress comment for run %s",
state.run_id,
)


def _live_pr_state(
*,
github_client_factory: Callable[[int], Any],
installation_id: int,
repo_full_name: str,
pr_number: int,
) -> str:
"""Return the PR's current state on GitHub, or ``""`` when unknown."""
try:
pr = (
github_client_factory(installation_id)
.get_repo(repo_full_name)
.get_pull(pr_number)
)
return str(getattr(pr, "state", "") or "").strip().lower()
except Exception:
logger.exception(
"cancel-review-runs: failed to verify state of %s PR #%s",
repo_full_name,
pr_number,
)
return ""


def _matches_pr(state: RunState, *, repo_full_name: str, pr_number: int) -> bool:
    """Tell whether *state* is a review run recorded for the given PR.

    A match requires the review workflow, a case-insensitively equal
    repository slug, and a ``pr_number`` entry in ``state.payload_subset``
    that converts to *pr_number*. A value that cannot be converted with
    ``int()`` counts as no match.
    """
    is_review_run = state.workflow == WORKFLOW_REVIEW_PR
    if not (is_review_run and state.repo.lower() == repo_full_name.lower()):
        return False
    try:
        subset = state.payload_subset or {}
        recorded_number = int(subset.get("pr_number") or 0)
        return recorded_number == pr_number
    except (TypeError, ValueError):
        return False


def _as_mapping(value: Any) -> Mapping[str, Any]:
    """Return *value* when it is a mapping, otherwise an empty mapping."""
    return value if isinstance(value, Mapping) else {}


def _as_int(value: Any) -> int:
    """Coerce *value* with ``int()``; falsy or unconvertible values give ``0``."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def cancel_in_flight_review_runs(
    *,
    store: StateStore,
    canceller: Callable[[str], Any],
    payload: Mapping[str, Any],
    github_client_factory: Callable[[int], Any] | None = None,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Cancel every in-flight review run targeting the closed PR.

    Args:
        store: State store scanned for in-flight runs and from which the
            records of successfully cancelled runs are deleted.
        canceller: Callable invoked with a run id to cancel that run.
        payload: Webhook payload; ``repository.full_name`` and
            ``pull_request.number`` identify the PR, and
            ``installation.id`` (when present and positive) selects the
            GitHub client used for the live-state check.
        github_client_factory: Builds a GitHub client for an installation
            id. When ``None``, both the live-state check and the
            progress-comment update are skipped.
        sleep: Wait function used for the pending-run retry.

    Returns a structured outcome the webhook surfaces in the 202
    response body:

    - ``{"action": "skipped", "reason": ...}`` when the payload is
      missing the repository slug or PR number, or when the PR is not
      verifiably still closed on GitHub (stale ``closed`` delivery
      after a reopen, or a failed state lookup).
    - ``{"action": "noop", ...}`` when no in-flight review run targets
      the PR (the common case).
    - ``{"action": "cancelled", "cancelled_run_ids": [...],
      "failed_run_ids": [...]}`` otherwise. Failed cancels keep their
      KV record so the cron poller drains them as before.
    """
    # Every payload sub-object is type-guarded the same way, so a malformed
    # delivery yields a structured outcome rather than an AttributeError.
    full_name = str(
        _as_mapping(payload.get("repository")).get("full_name") or ""
    ).strip()
    pr_number = _as_int(_as_mapping(payload.get("pull_request")).get("number"))
    if "/" not in full_name or pr_number <= 0:
        return {
            "action": "skipped",
            "reason": "missing repository.full_name or pull_request.number",
        }

    matches = [
        state
        for state in list_in_flight_runs(store)
        if _matches_pr(state, repo_full_name=full_name, pr_number=pr_number)
    ]
    if not matches:
        return {
            "action": "noop",
            "reason": "no in-flight review runs",
            "pr_number": pr_number,
        }

    # Deliveries can arrive late or out of order, so the event alone
    # does not prove the PR is still closed: a delayed ``closed`` delivery
    # after a quick reopen would match by repo/PR number and cancel the
    # review run the reopen just dispatched. Only cancel when the live
    # GitHub state confirms it; an unknown state also skips so a stale
    # delivery can never kill a run dispatched for a reopened PR.
    if github_client_factory is not None:
        installation_id = _as_int(
            _as_mapping(payload.get("installation")).get("id")
        )
        if installation_id <= 0:
            # Fall back to the installation recorded with the matched run.
            installation_id = matches[0].installation_id
        pr_state = _live_pr_state(
            github_client_factory=github_client_factory,
            installation_id=installation_id,
            repo_full_name=full_name,
            pr_number=pr_number,
        )
        if pr_state != "closed":
            return {
                "action": "skipped",
                "reason": (
                    "pull request is no longer closed"
                    if pr_state == "open"
                    else "could not verify pull request is closed"
                ),
                "pr_number": pr_number,
            }

    cancelled_run_ids: list[str] = []
    failed_run_ids: list[str] = []
    for state in matches:
        if not _cancel_run(canceller, state.run_id, sleep=sleep):
            # Fail open: keep the record so the cron poller handles the run.
            failed_run_ids.append(state.run_id)
            continue
        delete_run_state(store, state.run_id)
        cancelled_run_ids.append(state.run_id)
        _complete_progress_comment(
            state, github_client_factory=github_client_factory
        )
    return {
        "action": "cancelled",
        "pr_number": pr_number,
        "cancelled_run_ids": cancelled_run_ids,
        "failed_run_ids": failed_run_ids,
    }


# Names exported by ``from ... import *``: the cancel entry point and the
# progress-comment message it posts. The underscore-prefixed helpers are
# left out.
__all__ = [
    "CANCELLED_PROGRESS_MESSAGE",
    "cancel_in_flight_review_runs",
]
11 changes: 11 additions & 0 deletions core/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

- ``opened`` / ``reopened`` (non-draft) and ``ready_for_review`` route to
``review-pull-request``.
- ``closed`` routes to ``cancel-review-runs`` so the webhook can
eagerly cancel in-flight review runs for the PR.
- ``review_requested`` routes to ``review-pull-request`` when
the requested reviewer is ``oz-agent``.
- ``labeled`` routes to ``review-pull-request`` for the
Expand Down Expand Up @@ -91,6 +93,7 @@
WORKFLOW_CREATE_IMPLEMENTATION_FROM_ISSUE = "create-implementation-from-issue"
WORKFLOW_PLAN_APPROVED = "plan-approved"
WORKFLOW_ANNOUNCE_READY_ISSUE = "announce-ready-issue"
WORKFLOW_CANCEL_REVIEW_RUNS = "cancel-review-runs"

OZ_AGENT_LOGIN = "oz-agent"
OZ_REVIEW_LABEL = "oz-review"
Expand Down Expand Up @@ -419,6 +422,13 @@ def _route_pull_request(payload: dict[str, Any]) -> RouteDecision:
pr = payload.get("pull_request") or {}
if not isinstance(pr, dict):
return RouteDecision(None, "missing pull_request payload")
if action == "closed":
# Both merged and unmerged closes route here so any in-flight
# review run for the PR can be eagerly cancelled.
return RouteDecision(
WORKFLOW_CANCEL_REVIEW_RUNS,
"pull_request closed; cancelling in-flight review runs",
)
if pr.get("state") != "open":
return RouteDecision(None, "pull_request is not open")
if action in {"opened", "reopened"} and not pr.get("draft", False):
Expand Down Expand Up @@ -538,6 +548,7 @@ def route_event(
"RouteDecision",
"TRIAGED_LABEL",
"WORKFLOW_ANNOUNCE_READY_ISSUE",
"WORKFLOW_CANCEL_REVIEW_RUNS",
"WORKFLOW_CREATE_IMPLEMENTATION_FROM_ISSUE",
"WORKFLOW_CREATE_SPEC_FROM_ISSUE",
"WORKFLOW_PLAN_APPROVED",
Expand Down
Loading
Loading