diff --git a/apps/api/openapi.json b/apps/api/openapi.json index fa0a03f26..48be56110 100644 --- a/apps/api/openapi.json +++ b/apps/api/openapi.json @@ -4656,6 +4656,46 @@ "title": "DiscardSubjectRequest", "type": "object" }, + "DismissEventInReactionRequest": { + "description": "Body for `POST /agent/reactions/{subscriber_name}/dismiss-event`.", + "properties": { + "event_id": { + "description": "The event_id the operator wants the subscriber's bookmark advanced past. Read from the operator dashboard's wedged-bookmark surface (last_error_message + last_event_recorded_at on projection_bookmarks); pasting an arbitrary event_id risks rewinding the bookmark, which the decider catches via the lexicographic (transaction_id, position) check.", + "format": "uuid", + "title": "Event Id", + "type": "string" + }, + "reason": { + "description": "Free-form reason the operator gives for the dismissal (1-500 chars after trimming). Captured verbatim into DecisionRegistered.reasoning for audit. Operationally: 'cannot deserialize this event_type since the rename, advancing past' / 'LLM returned hallucinated context, manual retry tomorrow' etc.", + "maxLength": 500, + "minLength": 1, + "title": "Reason", + "type": "string" + } + }, + "required": [ + "event_id", + "reason" + ], + "title": "DismissEventInReactionRequest", + "type": "object" + }, + "DismissEventInReactionResponse": { + "description": "Body returned on successful dismissal.", + "properties": { + "decision_id": { + "description": "The Decision audit record's id.", + "format": "uuid", + "title": "Decision Id", + "type": "string" + } + }, + "required": [ + "decision_id" + ], + "title": "DismissEventInReactionResponse", + "type": "object" + }, "DismountSubjectRequest": { "description": "Body for `POST /subjects/{subject_id}/dismount`.", "properties": { @@ -11835,6 +11875,124 @@ ] } }, + "/agent/reactions/{subscriber_name}/dismiss-event": { + "post": { + "operationId": 
"post_agent_reactions_dismiss_event_agent_reactions__subscriber_name__dismiss_event_post", + "parameters": [ + { + "description": "Name of the subscriber (matches `Reaction.name`).", + "in": "path", + "name": "subscriber_name", + "required": true, + "schema": { + "description": "Name of the subscriber (matches `Reaction.name`).", + "maxLength": 200, + "minLength": 1, + "title": "Subscriber Name", + "type": "string" + } + }, + { + "description": "Legacy principal-id header (trust-the-proxy shape). When IDENTITY_PROVIDERS is configured (bearer-auth mode), this header is IGNORED and the verified bearer token from `BearerAuthMiddleware` (Authorization: Bearer) sets the principal. When no IdPs are configured (legacy mode), the application TRUSTS this header (no cryptographic verification) -- production deployments in legacy mode MUST front the API with an auth proxy that strips any client-supplied X-Principal-Id and sets it to the verified principal UUID. Behavior when absent: see Settings.require_authenticated_principal.", + "in": "header", + "name": "X-Principal-Id", + "required": false, + "schema": { + "anyOf": [ + { + "format": "uuid", + "type": "string" + }, + { + "type": "null" + } + ], + "description": "Legacy principal-id header (trust-the-proxy shape). When IDENTITY_PROVIDERS is configured (bearer-auth mode), this header is IGNORED and the verified bearer token from `BearerAuthMiddleware` (Authorization: Bearer) sets the principal. When no IdPs are configured (legacy mode), the application TRUSTS this header (no cryptographic verification) -- production deployments in legacy mode MUST front the API with an auth proxy that strips any client-supplied X-Principal-Id and sets it to the verified principal UUID. 
Behavior when absent: see Settings.require_authenticated_principal.", + "title": "X-Principal-Id" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DismissEventInReactionRequest" + } + } + }, + "required": true + }, + "responses": { + "201": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DismissEventInReactionResponse" + } + } + }, + "description": "Successful Response" + }, + "400": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + }, + "description": "Domain invariant violated: empty / oversize reason (InvalidDismissalReasonError)." + }, + "403": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + }, + "description": "Authorize port denied the command." + }, + "404": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + }, + "description": "No projection_bookmarks row for the named subscriber (SubscriberBookmarkNotFoundError) OR no event row for the supplied event_id (DismissalEventNotFoundError)." + }, + "409": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + }, + "description": "The bookmark is already at or past the target event (EventAlreadyDismissedError); refresh the dashboard and re-evaluate before retrying." + }, + "422": { + "description": "Path parameter or request body failed schema validation." + }, + "503": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + }, + "description": "The deployment has no Postgres pool (DismissalRequiresPostgresError); the slice is a no-op without projection_bookmarks. Production deployments never see this." 
+ } + }, + "summary": "Dismiss a poison event on a Reaction's bookmark", + "tags": [ + "agent" + ] + } + }, "/agents": { "post": { "operationId": "post_agents_agents_post", diff --git a/apps/api/src/cora/agent/_subscribers.py b/apps/api/src/cora/agent/_subscribers.py index 76f4d9d94..b192661b3 100644 --- a/apps/api/src/cora/agent/_subscribers.py +++ b/apps/api/src/cora/agent/_subscribers.py @@ -29,10 +29,24 @@ via the `promote_caution_proposal` slice. Both subscribers run concurrently and INDEPENDENTLY in the -projection worker. Per [[project-caution-drafter-design]] Q4 lock: -DO NOT widen the subscriber framework. Named widening -triggers (3rd subscriber / >50ms blocking / first cross-subscriber -ordering dependency) documented in the design memo. +projection worker. Both classify as `Reaction` (the public sibling +to `Projection`) and pin `batch_size = 1` on the class so the +worker bounds the bookmark transaction to a single LLM round-trip. + +## Subscriber framework widening status + +Per [[project-caution-drafter-design]] Q4 lock the original +widening triggers were "3rd subscriber / >50ms blocking / first +cross-subscriber ordering dependency". The >50ms-blocking trigger +FIRED at first-ship (each LLM call is 5-15 s), and the framework +widened minimally: the `Reaction` Protocol was added as the sibling +to `Projection` and per-subscriber `batch_size` enforcement landed +so the docstring's `batch_size=1` claim became real. + +Further widening (separate `ReactionWorker` with its own pool +budget) stays deferred behind the next named triggers: 3rd Reaction +OR first wedged-bookmark incident. The wedged-bookmark escape-hatch +already ships as the `dismiss_event_in_reaction` slice.
""" from __future__ import annotations diff --git a/apps/api/src/cora/agent/errors.py b/apps/api/src/cora/agent/errors.py index 0dc503ede..e80802c15 100644 --- a/apps/api/src/cora/agent/errors.py +++ b/apps/api/src/cora/agent/errors.py @@ -106,3 +106,83 @@ def __init__( self.decision_id = decision_id self.actor_id = actor_id self.observed_kind = observed_kind + + +class InvalidDismissalReasonError(Exception): + """The `dismiss_event_in_reaction` reason failed validation (empty, + too long, or otherwise out of bounds). Maps to HTTP 400.""" + + def __init__(self, reason: str) -> None: + super().__init__(reason) + self.reason = reason + + +class SubscriberBookmarkNotFoundError(Exception): + """No `projection_bookmarks` row matches the supplied subscriber + name. Either the subscriber is misspelled, or its migration + hasn't landed in this deployment. Maps to HTTP 404.""" + + def __init__(self, subscriber_name: str) -> None: + super().__init__( + f"No projection_bookmarks row for subscriber {subscriber_name!r}. " + "Check the spelling against the registered Reactions / Projections, " + "or verify the subscriber's migration has been applied." + ) + self.subscriber_name = subscriber_name + + +class DismissalEventNotFoundError(Exception): + """The supplied event_id does not exist in the events table. + Either the operator pasted the wrong id, or the event was + canonicalized away by a maintenance migration. Maps to HTTP 404.""" + + def __init__(self, event_id: UUID) -> None: + super().__init__( + f"No event row for event_id {event_id}. The operator dashboard " + "should be the source of truth for the wedged event's id." + ) + self.event_id = event_id + + +class EventAlreadyDismissedError(Exception): + """The bookmark is already at or past the target event, so the + dismissal would be a no-op (or worse, would rewind the cursor). 
+ Surfaces as 409 so the operator can refresh the dashboard and + re-evaluate.""" + + def __init__( + self, + *, + subscriber_name: str, + event_id: UUID, + bookmark_transaction_id: int, + bookmark_position: int, + event_transaction_id: int, + event_position: int, + ) -> None: + super().__init__( + f"Subscriber {subscriber_name!r} bookmark " + f"({bookmark_transaction_id}, {bookmark_position}) is at or past " + f"event {event_id} cursor ({event_transaction_id}, " + f"{event_position}); dismissal would be a no-op or rewind." + ) + self.subscriber_name = subscriber_name + self.event_id = event_id + self.bookmark_transaction_id = bookmark_transaction_id + self.bookmark_position = bookmark_position + self.event_transaction_id = event_transaction_id + self.event_position = event_position + + +class DismissalRequiresPostgresError(Exception): + """`dismiss_event_in_reaction` requires a Postgres pool because the + bookmark advance is a SQL UPDATE on `projection_bookmarks`. The + in-memory test adapter has no such table; raise so the operator + sees a clear "not in this configuration" instead of a stale-cache + success. Maps to HTTP 503.""" + + def __init__(self) -> None: + super().__init__( + "dismiss_event_in_reaction requires a Postgres pool; the " + "in-memory adapter has no projection_bookmarks table to advance." + ) diff --git a/apps/api/src/cora/agent/features/__init__.py b/apps/api/src/cora/agent/features/__init__.py index fe389ec85..66de5ba34 100644 --- a/apps/api/src/cora/agent/features/__init__.py +++ b/apps/api/src/cora/agent/features/__init__.py @@ -33,11 +33,19 @@ idempotency-wrapped; Pattern C) Also: deprecate's source set widens to include Suspended. 
+ +Reaction recovery: + - `dismiss_event_in_reaction` (operator-triggered atomic bookmark + advance + DecisionRegistered audit + via append_streams(conn=); recovers + a wedged Reaction bookmark without + SSH access to projection_bookmarks) """ from cora.agent.features import ( define_agent, deprecate_agent, + dismiss_event_in_reaction, get_agent, grant_tool_to_agent, promote_caution_proposal, @@ -52,6 +60,7 @@ __all__ = [ "define_agent", "deprecate_agent", + "dismiss_event_in_reaction", "get_agent", "grant_tool_to_agent", "promote_caution_proposal", diff --git a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/__init__.py b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/__init__.py new file mode 100644 index 000000000..e0ff07287 --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/__init__.py @@ -0,0 +1,32 @@ +"""Vertical slice for the `DismissEventInReaction` command. + +Operator escape-hatch for a wedged Reaction bookmark: advances the +subscriber past one poison event AND records the dismissal as a +`DecisionRegistered` audit row in the same Postgres transaction. + +See [[project-reaction-protocol-design]] (commit 1 of this slice) for +why Reactions are the failure-mode this slice targets. Operationally: +the on-call playbook is "page → dismiss-event with reason → debug +tomorrow" instead of "page → SSH → psql → UPDATE projection_bookmarks." 
+""" + +from cora.agent.features.dismiss_event_in_reaction import tool +from cora.agent.features.dismiss_event_in_reaction.command import ( + DismissEventInReaction, +) +from cora.agent.features.dismiss_event_in_reaction.decider import ( + DismissalContext, + decide, +) +from cora.agent.features.dismiss_event_in_reaction.handler import Handler, bind +from cora.agent.features.dismiss_event_in_reaction.route import router + +__all__ = [ + "DismissEventInReaction", + "DismissalContext", + "Handler", + "bind", + "decide", + "router", + "tool", +] diff --git a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/command.py b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/command.py new file mode 100644 index 000000000..435230a53 --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/command.py @@ -0,0 +1,43 @@ +"""Operator command to dismiss a poison event from a Reaction's bookmark. + +Operator-invoked recovery action: when a Reaction (e.g., +`RunDebriefer`, `CautionDrafter`) wedges on a single event the +subscriber's `apply()` cannot process (poison event, schema drift, +unrecoverable LLM failure), the operator hits this slice to: + + 1. Advance the subscriber's `projection_bookmarks` row past the + event, and + 2. Record the dismissal as an auditable `DecisionRegistered` + (`context = "ReactionDismissal"`, `choice = "EventDismissed"`) + so the operator action is preserved in the same audit log as + every other operator judgment call. + +Both writes happen atomically inside a single Postgres transaction +(mirrors the `forget_actor` cross-store pattern). +""" + +from dataclasses import dataclass +from uuid import UUID + + +@dataclass(frozen=True) +class DismissEventInReaction: + """Advance `subscriber_name`'s bookmark past `event_id`; record a + Decision under operator identity. + + Fields: + - `subscriber_name`: the bookmark `name` (matches the Reaction + / Subscriber's `name` attribute). 
The slice queries + `projection_bookmarks WHERE name = subscriber_name`. + - `event_id`: the event the operator wants to dismiss. The slice + looks it up in the `events` table to resolve its + (transaction_id, position) cursor before advancing. + - `reason`: free-form text the operator supplies explaining the + dismissal. The decider strips surrounding whitespace, requires + 1-500 chars, and embeds the result in the audit sentence + stored in `DecisionRegistered.reasoning`. + """ + + subscriber_name: str + event_id: UUID + reason: str diff --git a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/decider.py b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/decider.py new file mode 100644 index 000000000..faf035caa --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/decider.py @@ -0,0 +1,138 @@ +"""Pure decider for the `dismiss_event_in_reaction` slice. + +Inputs are gathered by the handler (current bookmark cursor, target +event cursor + type) and passed in via `DismissalContext` so the +decider stays pure. Output is the single `DecisionRegistered` event +the handler then appends to the Decision stream atomically with the +bookmark advance. + +The slice has no folded aggregate state (the bookmark + event row +are loaded by the handler and passed in via `context`), so `state` +is always None at the canonical decider signature. + +Invariants: + + - Reason must be non-empty after strip with length <= 500 chars + -> InvalidDismissalReasonError. + - Target event must be STRICTLY AFTER the current bookmark in + lexicographic `(transaction_id, position)` order. Equal-or- + behind would be a rewind (or a no-op) and the worker would + redeliver the same event -> EventAlreadyDismissedError.
+""" + +from dataclasses import dataclass +from datetime import datetime +from uuid import UUID + +from cora.agent.errors import ( + EventAlreadyDismissedError, + InvalidDismissalReasonError, +) +from cora.agent.features.dismiss_event_in_reaction.command import ( + DismissEventInReaction, +) +from cora.decision.aggregates.decision.events import DecisionRegistered +from cora.decision.aggregates.decision.state import ( + DECISION_CONTEXT_REACTION_DISMISSAL, +) + +_REASON_MIN_LENGTH = 1 +_REASON_MAX_LENGTH = 500 +_DISMISSAL_CHOICE = "EventDismissed" + + +@dataclass(frozen=True) +class DismissalContext: + """Pure inputs the handler resolves before calling `decide`. + + Carries everything the decider needs to validate the dismissal + and stamp the audit event. The handler owns the SQL queries that + populate this; the decider owns the rules that make the dismissal + legal. + """ + + bookmark_transaction_id: int + bookmark_position: int + event_transaction_id: int + event_position: int + event_type: str + event_recorded_at: datetime + + +def decide( + state: None, + command: DismissEventInReaction, + *, + context: DismissalContext, + new_decision_id: UUID, + principal_id: UUID, + now: datetime, +) -> DecisionRegistered: + """Validate the dismissal, return the audit `DecisionRegistered`. + + `state` is always None (no folded aggregate; the bookmark + event + row are loaded directly by the handler and passed via `context`). + Kept as the first positional arg to satisfy the canonical decider + signature (`decide(state, command, *, ...)`). + + The handler also advances `projection_bookmarks` to + `(event_transaction_id, event_position)` atomically with appending + this event; the decider only emits the audit record because the + bookmark advance is an infrastructure side effect, not a domain + event. 
+ """ + _ = state # canonical signature; this slice has no folded state + trimmed_reason = command.reason.strip() + if not trimmed_reason or len(trimmed_reason) < _REASON_MIN_LENGTH: + raise InvalidDismissalReasonError( + f"reason must be non-empty after strip; got {command.reason!r}" + ) + if len(trimmed_reason) > _REASON_MAX_LENGTH: + raise InvalidDismissalReasonError( + f"reason must be at most {_REASON_MAX_LENGTH} chars after strip; " + f"got {len(trimmed_reason)}" + ) + + bookmark_cursor = (context.bookmark_transaction_id, context.bookmark_position) + event_cursor = (context.event_transaction_id, context.event_position) + if event_cursor <= bookmark_cursor: + raise EventAlreadyDismissedError( + subscriber_name=command.subscriber_name, + event_id=command.event_id, + bookmark_transaction_id=context.bookmark_transaction_id, + bookmark_position=context.bookmark_position, + event_transaction_id=context.event_transaction_id, + event_position=context.event_position, + ) + + bluf = ( + f"Operator dismissed event {command.event_id} on subscriber " + f"{command.subscriber_name!r}: {trimmed_reason}" + ) + + inputs: dict[str, object] = { + "subscriber_name": command.subscriber_name, + "event_id": str(command.event_id), + "event_type": context.event_type, + "event_transaction_id": str(context.event_transaction_id), + "event_position": str(context.event_position), + "previous_bookmark_transaction_id": str(context.bookmark_transaction_id), + "previous_bookmark_position": str(context.bookmark_position), + } + + return DecisionRegistered( + decision_id=new_decision_id, + actor_id=principal_id, + context=DECISION_CONTEXT_REACTION_DISMISSAL, + choice=_DISMISSAL_CHOICE, + parent_id=None, + override_kind=None, + rule=None, + reasoning=bluf, + confidence=None, + confidence_source=None, + alternatives=(), + inputs=inputs, + reasoning_signature=None, + occurred_at=now, + ) diff --git a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/handler.py 
b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/handler.py new file mode 100644 index 000000000..58ff445bf --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/handler.py @@ -0,0 +1,235 @@ +"""Application handler for the `dismiss_event_in_reaction` slice. + +Atomic two-write recovery action: when an operator confirms a poison +event is stuck on a Reaction's bookmark, this handler advances the +`projection_bookmarks` row past it AND records the dismissal as a +`DecisionRegistered` (context `ReactionDismissal`) in the same +Postgres transaction. Mirrors the `forget_actor` cross-store pattern +(non-event SQL write inside the same `conn.transaction()` that the +event_store.append_streams call shares). + +In-memory mode (no Postgres pool) raises +`DismissalRequiresPostgresError` because the `projection_bookmarks` +table is the load-bearing structure; the in-memory event store has no +equivalent and a Decision-only write would be misleading. + +Pre-load order (handler-side I/O): + + 1. Read the bookmark row FOR UPDATE (locks against concurrent + advance from the projection worker so the bookmark cursor we + read is the one we update). + 2. Read the target event by `event_id` to resolve its + (transaction_id, position) cursor and event_type. + 3. Hand both to the pure decider. + +Authorization runs BEFORE the transaction so a deny short-circuits +without acquiring a pool connection. 
+""" + +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false + +from typing import Protocol +from uuid import UUID + +from cora.agent.errors import ( + DismissalEventNotFoundError, + DismissalRequiresPostgresError, + SubscriberBookmarkNotFoundError, + UnauthorizedError, +) +from cora.agent.features.dismiss_event_in_reaction.command import ( + DismissEventInReaction, +) +from cora.agent.features.dismiss_event_in_reaction.decider import ( + DismissalContext, + decide, +) +from cora.decision.aggregates.decision import ( + event_type_name, + to_payload, +) +from cora.infrastructure.event_envelope import to_new_event +from cora.infrastructure.kernel import Kernel +from cora.infrastructure.logging import get_logger +from cora.infrastructure.ports import Deny +from cora.infrastructure.ports.event_store import StreamAppend +from cora.infrastructure.routing import NIL_SENTINEL_ID + +_STREAM_TYPE = "Decision" +_COMMAND_NAME = "DismissEventInReaction" + +_log = get_logger(__name__) + +# Read the bookmark cursor + lock the row so concurrent advance from +# the projection worker cannot move the cursor between our read and +# our update. +_READ_BOOKMARK_SQL = """ +SELECT last_transaction_id::text AS last_tx, last_position +FROM projection_bookmarks +WHERE name = $1 +FOR UPDATE +""" + +# Resolve the event's cursor + type. UUID lookup hits the +# `events_event_id_idx` UNIQUE index added with the events table. +_LOAD_EVENT_SQL = """ +SELECT transaction_id::text AS transaction_id_text, + position, + event_type, + recorded_at +FROM events +WHERE event_id = $1 +""" + +# Advance the bookmark to the dismissed event's cursor; reset +# failure-tracking columns (the dismissal is a successful operator +# resolution, so consecutive_failures resets to 0 just like a +# normal advance). Same shape as `_WRITE_BOOKMARK_SQL` in +# infrastructure/projection/bookmark.py. 
+_ADVANCE_BOOKMARK_SQL = """ +UPDATE projection_bookmarks +SET last_transaction_id = $2::xid8, + last_position = $3, + last_event_recorded_at = $4, + last_error_at = NULL, + last_error_message = NULL, + consecutive_failures = 0, + updated_at = now() +WHERE name = $1 +""" + + +class Handler(Protocol): + """Callable interface every dismiss_event_in_reaction handler implements.""" + + async def __call__( + self, + command: DismissEventInReaction, + *, + principal_id: UUID, + correlation_id: UUID, + causation_id: UUID | None = None, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> UUID: ... + + +def bind(deps: Kernel) -> Handler: + """Build a dismiss_event_in_reaction handler closed over `deps`.""" + + async def handler( + command: DismissEventInReaction, + *, + principal_id: UUID, + correlation_id: UUID, + causation_id: UUID | None = None, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> UUID: + _log.info( + "dismiss_event_in_reaction.start", + command_name=_COMMAND_NAME, + subscriber_name=command.subscriber_name, + event_id=str(command.event_id), + principal_id=str(principal_id), + correlation_id=str(correlation_id), + causation_id=str(causation_id) if causation_id is not None else None, + ) + + authz = await deps.authz.authorize( + principal_id=principal_id, + command_name=_COMMAND_NAME, + conduit_id=NIL_SENTINEL_ID, + surface_id=surface_id, + ) + if isinstance(authz, Deny): + _log.info( + "dismiss_event_in_reaction.denied", + command_name=_COMMAND_NAME, + subscriber_name=command.subscriber_name, + event_id=str(command.event_id), + principal_id=str(principal_id), + correlation_id=str(correlation_id), + reason=authz.reason, + ) + raise UnauthorizedError(authz.reason) + + if deps.pool is None: + raise DismissalRequiresPostgresError() + + new_decision_id = deps.id_generator.new_id() + now = deps.clock.now() + + async with deps.pool.acquire() as conn, conn.transaction(): + bookmark_row = await conn.fetchrow(_READ_BOOKMARK_SQL, command.subscriber_name) + if bookmark_row is None: 
+ raise SubscriberBookmarkNotFoundError(command.subscriber_name) + bookmark_tx = int(bookmark_row["last_tx"]) + bookmark_pos = int(bookmark_row["last_position"]) + + event_row = await conn.fetchrow(_LOAD_EVENT_SQL, command.event_id) + if event_row is None: + raise DismissalEventNotFoundError(command.event_id) + + context = DismissalContext( + bookmark_transaction_id=bookmark_tx, + bookmark_position=bookmark_pos, + event_transaction_id=int(event_row["transaction_id_text"]), + event_position=int(event_row["position"]), + event_type=str(event_row["event_type"]), + event_recorded_at=event_row["recorded_at"], + ) + + domain_event = decide( + None, + command, + context=context, + new_decision_id=new_decision_id, + principal_id=principal_id, + now=now, + ) + + new_event = to_new_event( + event_type=event_type_name(domain_event), + payload=to_payload(domain_event), + occurred_at=domain_event.occurred_at, + event_id=deps.id_generator.new_id(), + command_name=_COMMAND_NAME, + correlation_id=correlation_id, + causation_id=causation_id, + principal_id=principal_id, + ) + + await conn.execute( + _ADVANCE_BOOKMARK_SQL, + command.subscriber_name, + context.event_transaction_id, + context.event_position, + context.event_recorded_at, + ) + + await deps.event_store.append_streams( + [ + StreamAppend( + stream_type=_STREAM_TYPE, + stream_id=new_decision_id, + expected_version=0, + events=[new_event], + ) + ], + conn=conn, + ) + + _log.info( + "dismiss_event_in_reaction.success", + command_name=_COMMAND_NAME, + subscriber_name=command.subscriber_name, + event_id=str(command.event_id), + decision_id=str(new_decision_id), + advanced_to_transaction_id=context.event_transaction_id, + advanced_to_position=context.event_position, + principal_id=str(principal_id), + correlation_id=str(correlation_id), + causation_id=str(causation_id) if causation_id is not None else None, + ) + return new_decision_id + + return handler diff --git 
a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/route.py b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/route.py new file mode 100644 index 000000000..b062910ac --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/route.py @@ -0,0 +1,153 @@ +"""HTTP route for the `dismiss_event_in_reaction` slice. + +Operator action endpoint: +`POST /agent/reactions/{subscriber_name}/dismiss-event` body +`{event_id, reason}`. Returns the new decision_id (201 Created so the +caller has the audit handle to follow up with `rate_decision` etc.). + +The 503 response covers the in-memory configuration; production +deployments never see it. +""" + +from typing import Annotated +from uuid import UUID + +from fastapi import APIRouter, Depends, Path, Request, status +from pydantic import BaseModel, Field + +from cora.agent.features.dismiss_event_in_reaction.command import ( + DismissEventInReaction, +) +from cora.agent.features.dismiss_event_in_reaction.handler import Handler +from cora.infrastructure.routing import ( + ErrorResponse, + get_correlation_id, + get_principal_id, + get_surface_id, +) + +# Mirror of the decider's `_REASON_MAX_LENGTH`. Kept in the route so +# the Pydantic schema short-circuits oversize bodies before the handler +# acquires a pool connection; the decider's check defends against +# direct in-process callers (tests, future sagas). +_REASON_MIN_LENGTH = 1 +_REASON_MAX_LENGTH = 500 + + +class DismissEventInReactionRequest(BaseModel): + """Body for `POST /agent/reactions/{subscriber_name}/dismiss-event`.""" + + event_id: UUID = Field( + ..., + description=( + "The event_id the operator wants the subscriber's bookmark " + "advanced past. 
Read from the operator dashboard's wedged-" + "bookmark surface (last_error_message + last_event_recorded_at " + "on projection_bookmarks); pasting an arbitrary event_id risks " + "rewinding the bookmark, which the decider catches via the " + "lexicographic (transaction_id, position) check." + ), + ) + reason: str = Field( + ..., + min_length=_REASON_MIN_LENGTH, + max_length=_REASON_MAX_LENGTH, + description=( + "Free-form reason the operator gives for the dismissal " + "(1-500 chars after trimming). Captured verbatim into " + "DecisionRegistered.reasoning for audit. Operationally: " + "'cannot deserialize this event_type since the rename, " + "advancing past' / 'LLM returned hallucinated context, " + "manual retry tomorrow' etc." + ), + ) + + +class DismissEventInReactionResponse(BaseModel): + """Body returned on successful dismissal.""" + + decision_id: UUID = Field( + ..., + description="The Decision audit record's id.", + ) + + +def _get_handler(request: Request) -> Handler: + handler: Handler = request.app.state.agent.dismiss_event_in_reaction + return handler + + +router = APIRouter(tags=["agent"]) + + +@router.post( + "/agent/reactions/{subscriber_name}/dismiss-event", + status_code=status.HTTP_201_CREATED, + responses={ + status.HTTP_400_BAD_REQUEST: { + "model": ErrorResponse, + "description": ( + "Domain invariant violated: empty / oversize reason (InvalidDismissalReasonError)." + ), + }, + status.HTTP_403_FORBIDDEN: { + "model": ErrorResponse, + "description": "Authorize port denied the command.", + }, + status.HTTP_404_NOT_FOUND: { + "model": ErrorResponse, + "description": ( + "No projection_bookmarks row for the named subscriber " + "(SubscriberBookmarkNotFoundError) OR no event row for " + "the supplied event_id (DismissalEventNotFoundError)." 
+ ), + }, + status.HTTP_409_CONFLICT: { + "model": ErrorResponse, + "description": ( + "The bookmark is already at or past the target event " + "(EventAlreadyDismissedError); refresh the dashboard " + "and re-evaluate before retrying." + ), + }, + status.HTTP_422_UNPROCESSABLE_CONTENT: { + "description": "Path parameter or request body failed schema validation.", + }, + status.HTTP_503_SERVICE_UNAVAILABLE: { + "model": ErrorResponse, + "description": ( + "The deployment has no Postgres pool " + "(DismissalRequiresPostgresError); the slice is a no-op " + "without projection_bookmarks. Production deployments " + "never see this." + ), + }, + }, + summary="Dismiss a poison event on a Reaction's bookmark", +) +async def post_agent_reactions_dismiss_event( + subscriber_name: Annotated[ + str, + Path( + min_length=1, + max_length=200, + description="Name of the subscriber (matches `Reaction.name`).", + ), + ], + body: DismissEventInReactionRequest, + handler: Annotated[Handler, Depends(_get_handler)], + cid: Annotated[UUID, Depends(get_correlation_id)], + principal_id: Annotated[UUID, Depends(get_principal_id)], + surface_id: Annotated[UUID, Depends(get_surface_id)], +) -> DismissEventInReactionResponse: + decision_id = await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=body.event_id, + reason=body.reason, + ), + principal_id=principal_id, + correlation_id=cid, + surface_id=surface_id, + ) + return DismissEventInReactionResponse(decision_id=decision_id) diff --git a/apps/api/src/cora/agent/features/dismiss_event_in_reaction/tool.py b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/tool.py new file mode 100644 index 000000000..1fe8a7c0a --- /dev/null +++ b/apps/api/src/cora/agent/features/dismiss_event_in_reaction/tool.py @@ -0,0 +1,74 @@ +"""MCP tool for the `dismiss_event_in_reaction` slice.""" + +from collections.abc import Callable +from typing import Annotated, Any +from uuid import UUID + +from mcp.server.fastmcp import 
Context, FastMCP +from pydantic import Field + +from cora.agent.features.dismiss_event_in_reaction.command import ( + DismissEventInReaction, +) +from cora.agent.features.dismiss_event_in_reaction.handler import Handler +from cora.infrastructure.mcp_principal import get_mcp_principal_id +from cora.infrastructure.observability import current_correlation_id +from cora.infrastructure.routing import get_mcp_surface_id + + +def register(mcp: FastMCP, *, get_handler: Callable[[], Handler]) -> None: + """Register the `dismiss_event_in_reaction` tool on the given MCP server.""" + + @mcp.tool( + name="dismiss_event_in_reaction", + description=( + "Advance a Reaction's projection_bookmarks row past a single " + "poison event so the worker can resume. Records the dismissal " + "as an auditable Decision (context=ReactionDismissal, " + "choice=EventDismissed) so the operator action is preserved " + "alongside every other operator judgment call. Use when an " + "LLM-bound Reaction (run_debriefer, caution_drafter) wedges " + "on a single event the apply path cannot process; the operator " + "dashboard surfaces the event_id and last_error_message. " + "Rejects if the event is already past the bookmark (no " + "rewinds). Returns the new Decision id." + ), + ) + async def dismiss_event_in_reaction_tool( # pyright: ignore[reportUnusedFunction] + ctx: Context[Any, Any, Any], + subscriber_name: Annotated[ + str, + Field( + min_length=1, + max_length=200, + description="Name of the subscriber (matches `Reaction.name`).", + ), + ], + event_id: Annotated[ + UUID, + Field(description="The event_id the operator wants to dismiss."), + ], + reason: Annotated[ + str, + Field( + min_length=1, + max_length=500, + description=( + "Free-form reason captured into the Decision's reasoning " + "(1-500 chars after trimming)." 
+ ), + ), + ], + ) -> dict[str, str]: + handler = get_handler() + decision_id = await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=reason, + ), + principal_id=get_mcp_principal_id(ctx), + correlation_id=current_correlation_id(), + surface_id=get_mcp_surface_id(), + ) + return {"decision_id": str(decision_id)} diff --git a/apps/api/src/cora/agent/routes.py b/apps/api/src/cora/agent/routes.py index e88e36e6b..4da177e64 100644 --- a/apps/api/src/cora/agent/routes.py +++ b/apps/api/src/cora/agent/routes.py @@ -60,11 +60,17 @@ CautionProposalNotActionableError, DecisionNotCautionProposalError, DecisionNotEmittedByCautionDrafterError, + DismissalEventNotFoundError, + DismissalRequiresPostgresError, + EventAlreadyDismissedError, + InvalidDismissalReasonError, + SubscriberBookmarkNotFoundError, UnauthorizedError, ) from cora.agent.features import ( define_agent, deprecate_agent, + dismiss_event_in_reaction, get_agent, grant_tool_to_agent, promote_caution_proposal, @@ -121,7 +127,8 @@ async def _handle_cannot_transition(request: Request, exc: Exception) -> JSONRes """Shared 409 handler for state-transition guards. Covers the `AgentCannotError` family: Version + Deprecate + - Suspend + Resume + GrantTool + RevokeTool + ReviseBudget. + Suspend + Resume + GrantTool + RevokeTool + ReviseBudget; plus + `EventAlreadyDismissedError` from `dismiss_event_in_reaction`. """ _ = request return JSONResponse( @@ -130,6 +137,19 @@ async def _handle_cannot_transition(request: Request, exc: Exception) -> JSONRes ) +async def _handle_dismissal_requires_postgres( + request: Request, + exc: Exception, +) -> JSONResponse: + """503 handler for the in-memory-mode rejection on + `dismiss_event_in_reaction`. 
Production deployments never see this.""" + _ = request + return JSONResponse( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + content={"detail": str(exc)}, + ) + + def register_agent_routes(app: FastAPI) -> None: """Attach Agent slice routers and exception handlers to the FastAPI app.""" app.include_router(define_agent.router) @@ -143,6 +163,7 @@ def register_agent_routes(app: FastAPI) -> None: app.include_router(get_agent.router) app.include_router(regenerate_run_debrief.router) app.include_router(promote_caution_proposal.router) + app.include_router(dismiss_event_in_reaction.router) # 400 validation handlers: Invalid family + cross-aggregate guards. # # NOT registered here: ParentDecisionAgentMismatchError + @@ -172,9 +193,15 @@ def register_agent_routes(app: FastAPI) -> None: DecisionNotCautionProposalError, CautionProposalNotActionableError, CautionProposalMalformedError, + # dismiss_event_in_reaction validation error. + InvalidDismissalReasonError, ): app.add_exception_handler(validation_cls, _handle_validation_error) - for not_found_cls in (AgentNotFoundError,): + for not_found_cls in ( + AgentNotFoundError, + SubscriberBookmarkNotFoundError, + DismissalEventNotFoundError, + ): app.add_exception_handler(not_found_cls, _handle_not_found) for already_exists_cls in (AgentAlreadyExistsError,): app.add_exception_handler(already_exists_cls, _handle_already_exists) @@ -186,6 +213,7 @@ def register_agent_routes(app: FastAPI) -> None: AgentCannotGrantToolError, AgentCannotRevokeToolError, AgentCannotReviseBudgetError, + EventAlreadyDismissedError, ): app.add_exception_handler(cannot_transition_cls, _handle_cannot_transition) app.add_exception_handler(UnauthorizedError, _handle_unauthorized) @@ -194,3 +222,8 @@ def register_agent_routes(app: FastAPI) -> None: # authorized to promote a Decision they did not originate # through a CautionDrafter agent. 
app.add_exception_handler(DecisionNotEmittedByCautionDrafterError, _handle_unauthorized) + # in-memory-mode rejection for the SQL-bound dismiss slice. + app.add_exception_handler( + DismissalRequiresPostgresError, + _handle_dismissal_requires_postgres, + ) diff --git a/apps/api/src/cora/agent/subscribers/caution_drafter.py b/apps/api/src/cora/agent/subscribers/caution_drafter.py index 8d376ae3c..92cf241b5 100644 --- a/apps/api/src/cora/agent/subscribers/caution_drafter.py +++ b/apps/api/src/cora/agent/subscribers/caution_drafter.py @@ -156,19 +156,25 @@ def _derive_decision_id(terminal_event_id: UUID) -> UUID: class CautionDrafterSubscriber: - """Side-effecting subscriber: terminal Run -> one Caution-proposal Decision. + """Reaction: terminal Run -> one Caution-proposal Decision. Constructed by `make_caution_drafter_subscriber` from the Kernel; - satisfies the `Projection` Protocol structurally. + satisfies the `Reaction` Protocol structurally. Holds references to the LLM port, event store, and CautionLookup port. The Decision's `actor_id` is the seeded CautionDrafter Agent's id (== that agent's Actor.id per 8f-a's identity-sharing invariant). + + `batch_size = 1` for the same reason as RunDebriefer: the apply + path includes a slow LLM round-trip, so holding the bookmark + transaction across N events would starve Projection advance loops + sharing the same pool. """ name = "caution_drafter" subscribed_event_types = _TERMINAL_RUN_EVENTS + batch_size = 1 def __init__( self, diff --git a/apps/api/src/cora/agent/subscribers/run_debriefer.py b/apps/api/src/cora/agent/subscribers/run_debriefer.py index 5e2281563..53899fccb 100644 --- a/apps/api/src/cora/agent/subscribers/run_debriefer.py +++ b/apps/api/src/cora/agent/subscribers/run_debriefer.py @@ -20,10 +20,13 @@ two transactions. 2. The bookmark transaction holds for the duration of `apply()` - including the LLM call. 
RunDebriefer is `batch_size=1` so the - transaction holds one connection for ~5-15 s per terminal - event. Acceptable for pilot scale (~few Runs/day); a watch - item for facility scale. + including the LLM call. RunDebriefer declares `batch_size = 1` + on the class (enforced by the worker via + `getattr(subscriber, "batch_size", DEFAULT_BATCH_SIZE)`), so the + transaction holds one connection for at most one terminal event's + LLM round-trip (~5-15 s). Acceptable for pilot scale (~few + Runs/day); watch item for facility scale → split off a + `ReactionWorker` with its own pool budget. Mitigation (1): the Decision's `stream_id` is derived deterministically from `terminal_event.event_id` via UUIDv5 (see @@ -233,24 +236,32 @@ def _derive_decision_id(terminal_event_id: UUID) -> UUID: class RunDebrieferSubscriber: - """Side-effecting subscriber: terminal Run -> one advisory Decision. + """Reaction: terminal Run -> one advisory Decision. Constructed by `make_run_debriefer_subscriber` from the Kernel; - satisfies the `Projection` Protocol (and the `Subscriber` - primitive it extends) structurally. + satisfies the `Reaction` Protocol (and the `Subscriber` primitive + it extends) structurally. Holds references to the LLM port and event store. The Decision's `actor_id` is the seeded RunDebriefer Agent's id (== that agent's Actor.id per 8f-a's identity-sharing invariant). - `name` and `subscribed_event_types` are plain class-level - constants (matches `DecisionRatingsProjection` precedent; the - `Projection` Protocol declares them as instance attrs which a + `name`, `subscribed_event_types`, and `batch_size` are plain + class-level constants (matches the wider Subscriber convention; + the Reaction Protocol declares them as instance attrs which a `ClassVar`-annotated class would not satisfy structurally). 
+ + `batch_size = 1` enforces what the original module docstring + claimed before the framework supported per-subscriber tuning: + the apply path includes a 5-15 s LLM call, so holding the pool + connection across N events would starve Projection advance loops + sharing the same pool. Worst-case TX duration is bounded to one + LLM call. """ name = "run_debriefer" subscribed_event_types = _TERMINAL_RUN_EVENTS + batch_size = 1 def __init__( self, diff --git a/apps/api/src/cora/agent/tools.py b/apps/api/src/cora/agent/tools.py index 87ed83b2c..1aeee25e5 100644 --- a/apps/api/src/cora/agent/tools.py +++ b/apps/api/src/cora/agent/tools.py @@ -13,6 +13,9 @@ from cora.agent.features.define_agent import tool as define_agent_tool from cora.agent.features.deprecate_agent import tool as deprecate_agent_tool +from cora.agent.features.dismiss_event_in_reaction import ( + tool as dismiss_event_in_reaction_tool, +) from cora.agent.features.get_agent import tool as get_agent_tool from cora.agent.features.grant_tool_to_agent import tool as grant_tool_to_agent_tool from cora.agent.features.promote_caution_proposal import tool as promote_caution_proposal_tool @@ -84,6 +87,10 @@ def register_agent_tools( mcp, get_handler=lambda: _resolve_regenerate_run_debrief(get_handlers()), ) + dismiss_event_in_reaction_tool.register( + mcp, + get_handler=lambda: get_handlers().dismiss_event_in_reaction, + ) def _resolve_regenerate_run_debrief(handlers: AgentHandlers) -> RegenerateRunDebriefHandler: diff --git a/apps/api/src/cora/agent/wire.py b/apps/api/src/cora/agent/wire.py index 4fda89ffb..27c18da43 100644 --- a/apps/api/src/cora/agent/wire.py +++ b/apps/api/src/cora/agent/wire.py @@ -28,6 +28,13 @@ - `revise_agent_budget` (transition; idempotent; no wrap) - `get_agent` (query) - `regenerate_run_debrief` (operator-triggered; idempotency-wrapped) + - `dismiss_event_in_reaction` (operator-triggered atomic bookmark + advance + Decision audit; no + idempotency wrap because the slice + is 
operator-rare and the + EventAlreadyDismissedError guard + catches duplicate dismissals + strict-not-idempotently) """ from dataclasses import dataclass @@ -36,6 +43,7 @@ from cora.agent.features import ( define_agent, deprecate_agent, + dismiss_event_in_reaction, get_agent, grant_tool_to_agent, promote_caution_proposal, @@ -68,6 +76,7 @@ class AgentHandlers: get_agent: get_agent.Handler regenerate_run_debrief: regenerate_run_debrief.IdempotentHandler | None promote_caution_proposal: promote_caution_proposal.IdempotentHandler + dismiss_event_in_reaction: dismiss_event_in_reaction.Handler def wire_agent(deps: Kernel) -> AgentHandlers: @@ -170,4 +179,9 @@ def wire_agent(deps: Kernel) -> AgentHandlers: command_name="PromoteCautionProposal", bc=_BC, ), + dismiss_event_in_reaction=with_tracing( + dismiss_event_in_reaction.bind(deps), + command_name="DismissEventInReaction", + bc=_BC, + ), ) diff --git a/apps/api/src/cora/decision/aggregates/decision/state.py b/apps/api/src/cora/decision/aggregates/decision/state.py index f4d1ee6e3..40bd0a272 100644 --- a/apps/api/src/cora/decision/aggregates/decision/state.py +++ b/apps/api/src/cora/decision/aggregates/decision/state.py @@ -250,6 +250,27 @@ ) +# Operator-authored Decision emitted when dismissing a poison event +# from a Reaction's subscriber bookmark. The Decision is the audit +# record of the dismissal; the `dismiss_event_in_reaction` slice +# atomically pairs it with a `projection_bookmarks` advance so the +# operator action is recorded in the same place as every other +# operator judgment call. Open-ended convention identical to the +# RunDebrief / CautionProposal patterns above. +DECISION_CONTEXT_REACTION_DISMISSAL = "ReactionDismissal" + + +# Closed `choice` value set for `context = "ReactionDismissal"` +# Decisions. 
Single value today (the slice is purely "operator +# acknowledged this event is poison and advances the bookmark past +# it"); the Literal exists for symmetry with RunDebriefChoice / +# CautionProposalChoice and so a future "OperatorReplayed" or +# "OperatorQuarantined" choice can land additively without breaking +# downstream parsers. +ReactionDismissalChoice = Literal["EventDismissed"] +REACTION_DISMISSAL_CHOICES: Final = frozenset({"EventDismissed"}) + + # acceptance-signal capture: closed 3-value rating set on # the new `DecisionRated` event. `useful` and `misleading` are # operator-affirmative; `ignored` is a positive marker ("operator saw diff --git a/apps/api/src/cora/infrastructure/projection/__init__.py b/apps/api/src/cora/infrastructure/projection/__init__.py index edf73b1eb..60a247551 100644 --- a/apps/api/src/cora/infrastructure/projection/__init__.py +++ b/apps/api/src/cora/infrastructure/projection/__init__.py @@ -4,12 +4,22 @@ - `Projection` Protocol: the read-side fold a BC writes to maintain a `proj__` queryable table from the event stream. Per - BC, lives in `cora..projections.`. The contract is - documented on the Protocol itself. + BC, lives in `cora..projections.`. Fast, batch large + (`batch_size=100`), idempotent at the SQL layer. + + - `Reaction` Protocol: side-effecting Subscriber that emits NEW + events (often cross-BC) or calls the outside world (LLM, signer, + storage). Per BC, lives in `cora..subscribers.`. Slow, + batch small (`batch_size=1`), idempotent via deterministic + UUIDv5 stream id + ConcurrencyError-as-no-op. Recovery from a + wedged bookmark is the `dismiss_event_in_reaction` operator slice. - `ProjectionRegistry`: the worker iterates this. Each BC registers - its projections via a `register__projections(registry, deps)` - function the composition root calls during lifespan setup. 
+ its projections via `register_<bc>_projections(registry, deps)` + and its reactions via `register_<bc>_subscribers(registry, deps)`, + both called by the composition root during lifespan setup. The + class name is kept for backward compatibility; the registry accepts + any Subscriber (Projection or Reaction). - `projection_worker_lifespan(deps, registry, settings)`: async context manager the FastAPI lifespan wraps. Spawns the worker @@ -17,7 +27,7 @@ no-ops in the in-memory test environment. - `drain_projections(pool, registry, deadline_seconds)`: integration- - test helper that synchronously advances every registered projection + test helper that synchronously advances every registered subscriber until each bookmark catches up to the head position (or raises `ProjectionDrainTimeoutError`). Avoids `asyncio.sleep` flakiness. @@ -27,10 +37,9 @@ list endpoint uses these helpers so cursor format is uniform across BCs. -Internal primitive: `Subscriber` Protocol (`worker.py`). Today every -Subscriber is a Projection; future sagas / external adapters will be -additional kinds of Subscriber sharing the same advance machinery. -Not exported publicly until a second Subscriber type lands. +Internal primitive: `Subscriber` Protocol that both Projection and +Reaction satisfy structurally. Not exported publicly because BC +authors should write Projections or Reactions, not bare Subscribers.
""" from cora.infrastructure.projection.cursor import ( @@ -42,7 +51,7 @@ ProjectionDrainTimeoutError, drain_projections, ) -from cora.infrastructure.projection.handler import Projection +from cora.infrastructure.projection.handler import Projection, Reaction from cora.infrastructure.projection.lifespan import projection_worker_lifespan from cora.infrastructure.projection.registry import ( DuplicateProjectionError, @@ -57,6 +66,7 @@ "Projection", "ProjectionDrainTimeoutError", "ProjectionRegistry", + "Reaction", "decode_cursor", "drain_projections", "encode_cursor", diff --git a/apps/api/src/cora/infrastructure/projection/handler.py b/apps/api/src/cora/infrastructure/projection/handler.py index 4d3b3e54d..904827293 100644 --- a/apps/api/src/cora/infrastructure/projection/handler.py +++ b/apps/api/src/cora/infrastructure/projection/handler.py @@ -1,16 +1,30 @@ -"""Projection Protocol + the internal Subscriber primitive it satisfies. - -`Projection` is the public concept users write per BC. `Subscriber` is -the framework-internal primitive the worker advances along the event -stream; today every Subscriber is a Projection. Future sagas / external -adapters land as additional Protocols satisfying Subscriber, sharing -the same advance machinery without duplicating it. - -The Protocols are structurally identical right now (same field set, -same callable shape). The split is intentional: it documents the -extensibility seam without surfacing two concepts to BC authors. When -a saga framework lands, the worker iterates `Subscriber`-shaped -objects regardless of whether they happen to be Projections or Sagas. +"""Projection + Reaction Protocols, both satisfying the internal Subscriber primitive. + +Two public kinds of Subscriber: + + - `Projection`: read-side fold of an event stream into a queryable + `proj_*` table. Fast (sub-millisecond apply), batches large + (`batch_size=100` default), idempotent at the SQL layer + (`ON CONFLICT`). 
Failure mode: stale read model until next poll. + + - `Reaction`: side-effecting consumer that produces NEW events + (often cross-BC) or calls the outside world (LLMs, storage, + signers). Slow (LLM-bounded, 5-15 s apply), batches small + (`batch_size=1` recommended), idempotent via deterministic + UUIDv5 stream ids + `expected_version=0` + `ConcurrencyError`- + as-no-op. Failure mode: wedged bookmark on poison event; + recoverable via the `dismiss_event_in_reaction` operator slice. + +`Subscriber` is the framework-internal primitive the worker advances +along the event stream; both Projection and Reaction satisfy it via +structural typing. The worker iterates Subscribers without caring +which kind they are; per-subscriber `batch_size` lets each kind tune +its own throughput vs latency tradeoff. + +The Protocols are structurally identical today (same field set, same +callable shape). The split is operational, not type-theoretic: it +documents the two distinct failure-and-tuning regimes a BC author is +opting into. """ from typing import Any, Protocol, runtime_checkable @@ -25,21 +39,36 @@ # adopt the same pattern. ConnectionLike = Any +# Worker-level default batch size when a Subscriber declines to +# declare its own. Projections inherit this implicitly; Reactions +# should override to 1 (slow LLM call should not hold a pool +# connection across N events). +DEFAULT_BATCH_SIZE = 100 + @runtime_checkable class Subscriber(Protocol): """Internal framework primitive: anything the projection worker advances along the event stream via a bookmark. - Not exported publicly. Today every Subscriber is a Projection; - future sagas / external adapters will satisfy this Protocol too. + Not exported publicly as a vocabulary BC authors write against; + they write Projections or Reactions. Both satisfy this Protocol + via structural typing. - `name` and `subscribed_event_types` are declared as plain attrs - (not ClassVar) so test fixtures can construct varying instances - inline. 
Production projections typically declare them as - `ClassVar` constants at the class level since they're immutable - per projection — that satisfies this Protocol via structural + `name` and `subscribed_event_types` are declared + as plain attrs (not ClassVar) so test fixtures can construct + varying instances inline. Production subscribers typically + declare them as class-level constants since they're immutable + per subscriber — that satisfies this Protocol via structural typing. + + `batch_size` is intentionally NOT on the Subscriber Protocol so + the 14 existing Projections that pre-date the per-subscriber knob + continue to satisfy the contract without a sweeping update. The + worker reads `getattr(subscriber, "batch_size", DEFAULT_BATCH_SIZE)` + so a Subscriber that omits the attribute inherits the default + transparently. The Reaction Protocol DOES require `batch_size` + because Reactions must opt into the slow-batch-1 regime explicitly. """ name: str @@ -114,6 +143,14 @@ class Projection(Protocol): - Per-projection bookmarks mean projections advance independently and at their own pace. One projection lagging does not block others. + + - `batch_size` is intentionally NOT in the Projection Protocol. + The worker reads `getattr(projection, "batch_size", DEFAULT_BATCH_SIZE)` + so a Projection that overrides it (e.g., a high-throughput + projection that wants 250) wins, and one that omits it + inherits the default 100 transparently. Reactions, by + contrast, MUST declare `batch_size` because the Reaction + Protocol enforces the opt-in to the slow-apply regime. """ name: str @@ -132,4 +169,91 @@ async def apply( ... -__all__ = ["ConnectionLike", "Projection", "Subscriber"] +@runtime_checkable +class Reaction(Protocol): + """Side-effecting Subscriber: consumes events and produces NEW + events (often into other BCs) or calls the outside world + (LLMs, storage, signing services). + + Parallel to `Projection`; both satisfy `Subscriber`.
The split + is operational, not type-theoretic: + + - Projections are fast (sub-millisecond apply), batch large + (`batch_size=100` default), and idempotent at the SQL layer + (`ON CONFLICT`). Their failure mode is "stale read model + until next poll" and the operator playbook is "wait." + + - Reactions are slow (LLM-bounded, 5-15 s apply), batch small + (`batch_size=1` recommended), and idempotent via + deterministic UUIDv5 stream ids + `expected_version=0` + + `ConcurrencyError`-as-no-op. Their failure mode is "wedged + bookmark on poison event" and the operator playbook is the + `dismiss_event_in_reaction` slice. + + Contract: + + - `batch_size` SHOULD be 1 unless the apply path is provably + fast (no LLM call, no external HTTP, no signer round-trip). + Holding a pool connection across N * 15 s LLM calls + starves Projection advance loops sharing the same pool. + + - `apply` MUST achieve at-most-once delivery via deterministic + stream-id derivation. Pattern: derive the side-effect's + stream_id as `uuid5(NAMESPACE_OID, f"{name}:{event.event_id}")` + and call `event_store.append(..., expected_version=0)`; catch + `ConcurrencyError` and treat as success (the side effect was + already applied on a previous attempt). + + - `apply` runs INSIDE the worker's advance transaction. The + bookmark advance + any conn-bound writes commit together; if + `apply` raises, the entire batch rolls back and the bookmark + stays at its previous position so the same events are retried + on the next iteration. + + - Cross-BC `EventStore.append` calls inside `apply` use a + SEPARATE pool connection from the advance loop's connection. + Cross-TX writes are unavoidable for cross-BC side effects; + the UUIDv5 + ConcurrencyError pattern absorbs the resulting + at-least-once retries. + + - The cross-BC append cannot be rolled back if the bookmark + update later fails. 
Reactions writing to multiple BC streams + in one `apply` accept this asymmetry by design: the side + effect is recorded; the worker re-fires; the second attempt + is absorbed by ConcurrencyError. + + Operational notes: + + - Wedge recovery: if `apply` raises a non-recoverable error + (poison event, schema drift, deserialization failure), the + bookmark stays put and `consecutive_failures` increments on + each retry. Operator response: invoke the + `dismiss_event_in_reaction` slice to advance the bookmark + past the poison event with an auditable Decision. + + - Today every Reaction runs in the same pool as Projections. + Watch-item: when a third Reaction lands OR the first wedge + incident occurs, split off a `ReactionWorker` with its own + pool budget. + """ + + name: str + subscribed_event_types: frozenset[str] + batch_size: int + + async def apply( + self, + event: StoredEvent, + conn: ConnectionLike, + ) -> None: + """Apply a single event by emitting NEW events or calling + the outside world. + + At-most-once delivery is the Reaction author's responsibility + (deterministic UUIDv5 stream id + ConcurrencyError catch); + the framework delivers at-least-once. + """ + ... + + +__all__ = ["DEFAULT_BATCH_SIZE", "ConnectionLike", "Projection", "Reaction", "Subscriber"] diff --git a/apps/api/src/cora/infrastructure/projection/registry.py b/apps/api/src/cora/infrastructure/projection/registry.py index 29afe5de2..64ac695d9 100644 --- a/apps/api/src/cora/infrastructure/projection/registry.py +++ b/apps/api/src/cora/infrastructure/projection/registry.py @@ -1,25 +1,31 @@ -"""Registry of registered Projections (and future Subscriber kinds). +"""Registry of registered Subscribers (Projections and Reactions). The composition root populates this during lifespan setup by calling -each BC's `register__projections(registry, deps)` function. 
The -worker iterates the registry; the test suite uses it to drive the +each BC's `register__projections(registry, deps)` and +`register__subscribers(registry, deps)` functions. The worker +iterates the registry; the test suite uses it to drive the `drain_projections` helper and the arch-fitness `every-registration- has-a-table` check. + +The class is named `ProjectionRegistry` for historical reasons; it +holds any Subscriber-shaped object (Projection or Reaction). Rename +to `SubscriberRegistry` is deferred: pure cosmetics, ripples +across every BC's `_subscribers.py` and `_projections.py` wiring. """ from collections.abc import Iterator -from cora.infrastructure.projection.handler import Projection +from cora.infrastructure.projection.handler import Subscriber class DuplicateProjectionError(Exception): - """Raised when two projections register with the same name. Names + """Raised when two subscribers register with the same name. Names must be unique because they key the bookmark row + the proj_* table + the arch-fitness checks.""" def __init__(self, name: str) -> None: super().__init__( - f"Projection with name {name!r} is already registered. " + f"Subscriber with name {name!r} is already registered. " "Names must be unique across the whole process; they key " "the bookmark row and the proj_* table." ) @@ -27,41 +33,42 @@ def __init__(self, name: str) -> None: class EmptySubscriptionError(Exception): - """Raised when a projection registers with an empty + """Raised when a subscriber registers with an empty `subscribed_event_types` set. The advance query uses `event_type = ANY($subscribed)`; an empty - set always matches zero rows, so the projection would silently + set always matches zero rows, so the subscriber would silently never advance. Catching this at registration surfaces the bug at - startup instead of as a "why is my projection empty?" investigation + startup instead of as a "why is my subscriber empty?" investigation later. 
""" def __init__(self, name: str) -> None: super().__init__( - f"Projection {name!r} has empty subscribed_event_types. " - "An empty set never matches any event; the projection would " + f"Subscriber {name!r} has empty subscribed_event_types. " + "An empty set never matches any event; the subscriber would " "register, the worker would advance it, and zero events " "would ever be processed. List the event_type strings the " - "projection cares about." + "subscriber cares about." ) self.name = name class ProjectionRegistry: - """Holds the set of registered projections; iterable by the worker.""" + """Holds the set of registered subscribers (Projections + Reactions); + iterable by the worker.""" def __init__(self) -> None: - self._by_name: dict[str, Projection] = {} + self._by_name: dict[str, Subscriber] = {} - def register(self, projection: Projection) -> None: - if projection.name in self._by_name: - raise DuplicateProjectionError(projection.name) - if not projection.subscribed_event_types: - raise EmptySubscriptionError(projection.name) - self._by_name[projection.name] = projection + def register(self, subscriber: Subscriber) -> None: + if subscriber.name in self._by_name: + raise DuplicateProjectionError(subscriber.name) + if not subscriber.subscribed_event_types: + raise EmptySubscriptionError(subscriber.name) + self._by_name[subscriber.name] = subscriber - def get(self, name: str) -> Projection | None: + def get(self, name: str) -> Subscriber | None: return self._by_name.get(name) def names(self) -> frozenset[str]: @@ -70,7 +77,7 @@ def names(self) -> frozenset[str]: def is_empty(self) -> bool: return not self._by_name - def __iter__(self) -> Iterator[Projection]: + def __iter__(self) -> Iterator[Subscriber]: return iter(self._by_name.values()) def __len__(self) -> int: diff --git a/apps/api/src/cora/infrastructure/projection/worker.py b/apps/api/src/cora/infrastructure/projection/worker.py index 01180bd0a..3c427160a 100644 --- 
a/apps/api/src/cora/infrastructure/projection/worker.py +++ b/apps/api/src/cora/infrastructure/projection/worker.py @@ -35,7 +35,7 @@ write_bookmark, write_bookmark_failure, ) -from cora.infrastructure.projection.handler import Subscriber +from cora.infrastructure.projection.handler import DEFAULT_BATCH_SIZE, Subscriber from cora.infrastructure.projection.registry import ProjectionRegistry from cora.infrastructure.projection.wakeup import WakeupSource @@ -98,7 +98,7 @@ async def advance_subscriber_once( pool: asyncpg.Pool, subscriber: Subscriber, *, - batch_size: int = 100, + batch_size: int = DEFAULT_BATCH_SIZE, ) -> int: """Advance one Subscriber by at most `batch_size` events. Returns the number of events processed (0 if the bookmark is already at @@ -108,6 +108,10 @@ async def advance_subscriber_once( runs in a single transaction so at-least-once delivery has clean semantics: either everything in the batch advances together, or nothing does. + + Callers from the worker pass the per-subscriber batch_size + (`getattr(subscriber, "batch_size", DEFAULT_BATCH_SIZE)`); callers + from the drain helper pass an explicit value to bound test cost. """ subscribed = sorted(subscriber.subscribed_event_types) async with pool.acquire() as conn, conn.transaction(): @@ -145,12 +149,16 @@ def __init__( wakeup: WakeupSource, *, poll_interval_seconds: float = 5.0, - batch_size: int = 100, + batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: self._pool = pool self._registry = registry self._wakeup = wakeup self._poll_interval_seconds = poll_interval_seconds + # Per-subscriber default. Each Subscriber may declare its own + # `batch_size` attribute (Reactions set this to 1); the loop + # below reads `subscriber.batch_size` via getattr so the + # worker-level value is the fallback only. self._batch_size = batch_size async def run(self) -> None: @@ -173,12 +181,18 @@ async def _advance_loop(self, projection: Subscriber) -> None: """Single projection's advance loop. 
Runs forever until cancelled.""" backoff = _BACKOFF_BASE_SECONDS + # Per-subscriber batch_size: Projections inherit the worker + # default (100), Reactions override to 1 to avoid holding a + # pool connection across a slow LLM call. Read once at loop + # start since the class-level attribute is immutable per + # subscriber instance. + effective_batch_size = getattr(projection, "batch_size", self._batch_size) while True: try: processed = await advance_subscriber_once( self._pool, projection, - batch_size=self._batch_size, + batch_size=effective_batch_size, ) backoff = _BACKOFF_BASE_SECONDS # reset on success if processed > 0: diff --git a/apps/api/tests/architecture/test_reaction_batch_size.py b/apps/api/tests/architecture/test_reaction_batch_size.py new file mode 100644 index 000000000..2e0ccd9ef --- /dev/null +++ b/apps/api/tests/architecture/test_reaction_batch_size.py @@ -0,0 +1,109 @@ +"""Every registered Reaction declares a small `batch_size`. + +The Reaction Protocol exists because side-effecting subscribers +(LLM calls, signing, external HTTP) take 5-15 s per `apply()` and +holding a Postgres pool connection across N events at that latency +starves Projection advance loops sharing the same pool. + +The Reaction Protocol contract requires `batch_size = 1` unless the +apply path is provably fast. This test enforces a soft ceiling +(`batch_size <= 10`) on every registered Subscriber that structurally +satisfies the `Reaction` Protocol (`isinstance` check). The +ceiling is loose enough to allow a future fast-path Reaction (e.g., +local cache lookup) but tight enough to catch the failure mode +"contributor forgot the override and the worker default of 100 +quietly kicked in." + +Skips if no BCs ship Reactions yet (Agent BC is the only one today).
+""" + +from __future__ import annotations + +import contextlib +import importlib +from typing import TYPE_CHECKING + +import pytest + +from cora.infrastructure.projection import ProjectionRegistry, Reaction +from tests.architecture.conftest import BCS + +if TYPE_CHECKING: + from cora.infrastructure.kernel import Kernel + +_REACTION_BATCH_SIZE_CEILING = 10 + + +def _populate_registry_from_bcs() -> ProjectionRegistry: + """Mirror of `test_projection_idempotency._populate_registry_from_bcs` + but extended to call `register_{bc}_subscribers` as well as + `register_{bc}_projections`. Reactions live under the + `_subscribers.py` factory; Projections live under the + `_projections.py` factory. Both register into the same + ProjectionRegistry today.""" + registry = ProjectionRegistry() + deps_stub: Kernel | None = None + for bc in BCS: + try: + module = importlib.import_module(f"cora.{bc}") + except ModuleNotFoundError: + continue + register_projections = getattr(module, f"register_{bc}_projections", None) + if register_projections is not None: + register_projections(registry, deps_stub) + register_subscribers = getattr(module, f"register_{bc}_subscribers", None) + if register_subscribers is not None: + # Reactions need a real Kernel (llm port, signer, etc.) to + # construct; skip when the stub Kernel can't satisfy the + # constructor. The classification test in + # tests/unit/agent/test_reaction_classification.py pins + # batch_size at the class level for the known Reactions. + with contextlib.suppress(AttributeError, TypeError): + register_subscribers(registry, deps_stub) + return registry + + +@pytest.mark.architecture +def test_every_reaction_pins_small_batch_size() -> None: + """A registered Subscriber whose class declares `batch_size > 10` + has almost certainly slipped through the Reaction-Protocol + contract (the recommended value is 1).
Fails loudly so the + contributor adds an explicit override or argues the case in + review.""" + registry = _populate_registry_from_bcs() + if registry.is_empty(): + pytest.skip( + "No subscribers registered; Agent BC needs a Kernel to build " + "its Reactions. The unit-level classification test pins the " + "batch_size invariant for the known Reactions." + ) + + # `isinstance(subscriber, Reaction)` is a structural check (Reaction + # is @runtime_checkable); Projections without a `batch_size` attribute + # don't match because the Protocol declares it as required. So this + # loop only sees Subscribers that explicitly opted into the Reaction + # contract; a stray Projection with `batch_size = 250` will not be + # caught here but that's fine, this test enforces the Reaction-side + # invariant only. + reactions = [s for s in registry if isinstance(s, Reaction)] + if not reactions: + pytest.skip( + "No Reactions in the registry (stub Kernel could not construct " + "Agent BC's LLM-bound Reactions). The unit-level classification " + "test pins batch_size for the known Reactions." + ) + + failures: list[str] = [] + for reaction in reactions: + if reaction.batch_size > _REACTION_BATCH_SIZE_CEILING: + failures.append(f"{reaction.name} (batch_size={reaction.batch_size})") + + assert not failures, ( + "Reactions exceed the batch_size ceiling " + f"({_REACTION_BATCH_SIZE_CEILING}):\n" + + "\n".join(f" - {f}" for f in failures) + + "\n\nThe Reaction Protocol recommends `batch_size = 1` so the " + "bookmark transaction is bounded to one side-effect round-trip. " + "If your apply path is provably fast, document it; if not, pin " + "the override to 1." 
+ ) diff --git a/apps/api/tests/contract/test_dismiss_event_in_reaction_endpoint.py b/apps/api/tests/contract/test_dismiss_event_in_reaction_endpoint.py new file mode 100644 index 000000000..a4e4e452a --- /dev/null +++ b/apps/api/tests/contract/test_dismiss_event_in_reaction_endpoint.py @@ -0,0 +1,92 @@ +"""Contract tests for `POST /agent/reactions/{name}/dismiss-event`. + +The slice is fundamentally PG-bound (it advances a SQL row on +projection_bookmarks). In the in-memory test app the handler raises +`DismissalRequiresPostgresError` which the BC's exception handler +maps to 503. So the contract layer pins: + + - 422 for schema violations (missing body, malformed UUID, oversize + reason, missing required fields) + - 503 for the in-memory-mode rejection + +The PG happy path (201 + Decision row + bookmark advance), 404 +mappings (subscriber unknown / event unknown), 409 mapping +(already-dismissed), and the 400 mapping (whitespace reason) live in +the integration test against real Postgres, where the SQL state +actually exists. 
+""" + +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient + +from cora.api.main import create_app + + +@pytest.mark.contract +def test_post_dismiss_event_returns_503_in_memory_mode() -> None: + """In-memory app has no projection_bookmarks table; the handler + raises DismissalRequiresPostgresError; the route maps to 503.""" + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"event_id": str(uuid4()), "reason": "test"}, + ) + assert response.status_code == 503 + assert "postgres" in response.json()["detail"].lower() + + +@pytest.mark.contract +def test_post_dismiss_event_returns_422_for_missing_event_id() -> None: + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"reason": "missing event_id"}, + ) + assert response.status_code == 422 + + +@pytest.mark.contract +def test_post_dismiss_event_returns_422_for_missing_reason() -> None: + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"event_id": str(uuid4())}, + ) + assert response.status_code == 422 + + +@pytest.mark.contract +def test_post_dismiss_event_returns_422_for_malformed_event_id() -> None: + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"event_id": "not-a-uuid", "reason": "test"}, + ) + assert response.status_code == 422 + + +@pytest.mark.contract +def test_post_dismiss_event_returns_422_for_empty_reason() -> None: + """Pydantic `min_length=1` short-circuits the empty string before + the handler runs; the body is rejected at 422 rather than the + handler's 400 InvalidDismissalReasonError.""" + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"event_id": str(uuid4()), "reason": ""}, + ) + assert 
response.status_code == 422 + + +@pytest.mark.contract +def test_post_dismiss_event_returns_422_for_oversize_reason() -> None: + """Pydantic `max_length=500` short-circuits the oversize string + before the handler runs.""" + with TestClient(create_app()) as client: + response = client.post( + "/agent/reactions/run_debriefer/dismiss-event", + json={"event_id": str(uuid4()), "reason": "x" * 501}, + ) + assert response.status_code == 422 diff --git a/apps/api/tests/contract/test_dismiss_event_in_reaction_mcp_tool.py b/apps/api/tests/contract/test_dismiss_event_in_reaction_mcp_tool.py new file mode 100644 index 000000000..5e38a8ebe --- /dev/null +++ b/apps/api/tests/contract/test_dismiss_event_in_reaction_mcp_tool.py @@ -0,0 +1,58 @@ +"""Contract tests for the `dismiss_event_in_reaction` MCP tool. + +The slice is PG-bound; in the in-memory test app the handler raises +DismissalRequiresPostgresError. The MCP tool surfaces this as +isError=True. Production deploys with a Postgres pool exercise the +happy path in the integration suite. +""" + +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient + +from cora.api.main import create_app +from tests.contract._mcp_helpers import open_session, parse_sse_data + + +@pytest.mark.contract +def test_mcp_lists_dismiss_event_in_reaction_tool() -> None: + with TestClient(create_app()) as client: + headers = open_session(client) + response = client.post( + "/mcp", + json={"jsonrpc": "2.0", "id": 99, "method": "tools/list"}, + headers=headers, + ) + body = parse_sse_data(response.text) + tool_names = [t["name"] for t in body["result"]["tools"]] + assert "dismiss_event_in_reaction" in tool_names + + +@pytest.mark.contract +def test_mcp_dismiss_event_in_reaction_tool_iserror_in_memory_mode() -> None: + """In-memory app surfaces DismissalRequiresPostgresError as + isError=True on the MCP tool. 
Production deploys never see this + branch.""" + with TestClient(create_app()) as client: + headers = open_session(client) + response = client.post( + "/mcp", + json={ + "jsonrpc": "2.0", + "id": 5, + "method": "tools/call", + "params": { + "name": "dismiss_event_in_reaction", + "arguments": { + "subscriber_name": "run_debriefer", + "event_id": str(uuid4()), + "reason": "test", + }, + }, + }, + headers=headers, + ) + body = parse_sse_data(response.text) + assert body["result"]["isError"] is True + assert "postgres" in body["result"]["content"][0]["text"].lower() diff --git a/apps/api/tests/integration/test_dismiss_event_in_reaction_postgres.py b/apps/api/tests/integration/test_dismiss_event_in_reaction_postgres.py new file mode 100644 index 000000000..cff65054a --- /dev/null +++ b/apps/api/tests/integration/test_dismiss_event_in_reaction_postgres.py @@ -0,0 +1,336 @@ +"""End-to-end PG integration for `dismiss_event_in_reaction`. + +Walks: seed a projection_bookmarks row -> insert a probe event -> +invoke the slice handler -> assert the bookmark advanced to the event's +(transaction_id, position) AND a `DecisionRegistered` row appeared in +the `events` table with `context = "ReactionDismissal"` AND the cursor +that was advanced past is in the Decision's inputs payload. + +Plus the wedge-recovery semantics: + + - SubscriberBookmarkNotFoundError when the named subscriber has no + bookmark row (404) + - DismissalEventNotFoundError when the event_id does not exist (404) + - EventAlreadyDismissedError when the bookmark is already past the + target event (409) + - Atomic write: if `event_store.append_streams` were to fail mid- + transaction, the bookmark advance would roll back too (covered + by the same-transaction shape, not a separate fault-injection + test). 
+""" + +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportArgumentType=false + +import json +from datetime import UTC, datetime +from typing import Any +from uuid import UUID, uuid4 + +import asyncpg +import pytest + +from cora.agent.errors import ( + DismissalEventNotFoundError, + EventAlreadyDismissedError, + SubscriberBookmarkNotFoundError, +) +from cora.agent.features.dismiss_event_in_reaction import ( + DismissEventInReaction, + bind, +) +from cora.infrastructure.event_envelope import to_new_event +from tests.integration._helpers import build_postgres_deps + +_NOW = datetime(2026, 6, 2, 14, 30, 0, tzinfo=UTC) +_PRINCIPAL_ID = UUID("01900000-0000-7000-8000-000000007007") +_CORRELATION_ID = UUID("01900000-0000-7000-8000-00000000c1d0") + + +async def _ensure_bookmark( + db_pool: asyncpg.Pool, + name: str, + *, + last_transaction_id: int = 0, + last_position: int = 0, +) -> None: + """Insert (or update) a projection_bookmarks row to a known cursor.""" + async with db_pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO projection_bookmarks (name, last_transaction_id, last_position) + VALUES ($1, $2::xid8, $3) + ON CONFLICT (name) DO UPDATE + SET last_transaction_id = EXCLUDED.last_transaction_id, + last_position = EXCLUDED.last_position + """, + name, + last_transaction_id, + last_position, + ) + + +async def _read_bookmark(db_pool: asyncpg.Pool, name: str) -> dict[str, object] | None: + async with db_pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT last_transaction_id::text AS last_tx, + last_position, + last_error_at, + last_error_message, + consecutive_failures + FROM projection_bookmarks + WHERE name = $1 + """, + name, + ) + return dict(row) if row is not None else None + + +async def _append_probe_event( + db_pool: asyncpg.Pool, + *, + event_id: UUID, + stream_id: UUID, +) -> tuple[int, int]: + """Append one event to a probe stream and return its + 
(transaction_id, position) cursor for the test to assert against.""" + deps = build_postgres_deps(db_pool, now=_NOW) + new_event = to_new_event( + event_type="DismissEventInReactionProbeEvent", + payload={"marker": "probe"}, + occurred_at=_NOW, + event_id=event_id, + command_name="ProbeCommand", + correlation_id=_CORRELATION_ID, + causation_id=None, + principal_id=_PRINCIPAL_ID, + ) + await deps.event_store.append( + stream_type="DismissProbe", + stream_id=stream_id, + expected_version=0, + events=[new_event], + ) + + async with db_pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT transaction_id::text AS tx, position FROM events WHERE event_id = $1", + event_id, + ) + assert row is not None + return int(row["tx"]), int(row["position"]) + + +@pytest.mark.integration +async def test_dismiss_event_advances_bookmark_and_writes_decision( + db_pool: asyncpg.Pool, +) -> None: + """Happy path: bookmark advances to event cursor, Decision row + appears with the right payload, both in the same transaction.""" + subscriber_name = f"probe_reaction_{uuid4().hex[:8]}" + await _ensure_bookmark(db_pool, subscriber_name) + + event_id = uuid4() + stream_id = uuid4() + event_tx, event_pos = await _append_probe_event(db_pool, event_id=event_id, stream_id=stream_id) + + # Handler consumes 2 ids per call: new_decision_id + envelope event_id. 
+ decision_id_seed = uuid4() + envelope_event_id = uuid4() + deps = build_postgres_deps( + db_pool, + now=_NOW, + ids=[decision_id_seed, envelope_event_id], + ) + handler = bind(deps) + decision_id = await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason="schema rename after rollout", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + bookmark = await _read_bookmark(db_pool, subscriber_name) + assert bookmark is not None + assert int(bookmark["last_tx"]) == event_tx + assert bookmark["last_position"] == event_pos + assert bookmark["last_error_at"] is None + assert bookmark["last_error_message"] is None + assert bookmark["consecutive_failures"] == 0 + + async with db_pool.acquire() as conn: + decision_row = await conn.fetchrow( + """ + SELECT event_type, payload + FROM events + WHERE stream_type = 'Decision' AND stream_id = $1 + """, + decision_id, + ) + assert decision_row is not None + assert decision_row["event_type"] == "DecisionRegistered" + # asyncpg's jsonb codec returns str OR dict depending on config; + # tolerate both shapes so the test isn't fragile to codec changes. 
+ raw_payload: Any = decision_row["payload"] + payload = json.loads(raw_payload) if isinstance(raw_payload, str) else raw_payload + assert payload["context"] == "ReactionDismissal" + assert payload["choice"] == "EventDismissed" + assert payload["actor_id"] == str(_PRINCIPAL_ID) + assert payload["inputs"]["subscriber_name"] == subscriber_name + assert payload["inputs"]["event_id"] == str(event_id) + assert payload["inputs"]["previous_bookmark_transaction_id"] == "0" + assert payload["inputs"]["previous_bookmark_position"] == "0" + assert payload["inputs"]["event_transaction_id"] == str(event_tx) + assert payload["inputs"]["event_position"] == str(event_pos) + + +@pytest.mark.integration +async def test_dismiss_event_raises_subscriber_bookmark_not_found( + db_pool: asyncpg.Pool, +) -> None: + """No bookmark row for the named subscriber: raises SubscriberBookmarkNotFoundError. + Operator misspelled the name OR the subscriber's migration hasn't landed.""" + deps = build_postgres_deps(db_pool, now=_NOW, ids=[uuid4(), uuid4()]) + handler = bind(deps) + + with pytest.raises(SubscriberBookmarkNotFoundError): + await handler( + DismissEventInReaction( + subscriber_name=f"never_registered_{uuid4().hex[:8]}", + event_id=uuid4(), + reason="test", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + +@pytest.mark.integration +async def test_dismiss_event_raises_event_not_found( + db_pool: asyncpg.Pool, +) -> None: + """Bookmark exists, but the event_id does not exist in events: + raises DismissalEventNotFoundError.""" + subscriber_name = f"probe_reaction_{uuid4().hex[:8]}" + await _ensure_bookmark(db_pool, subscriber_name) + + deps = build_postgres_deps(db_pool, now=_NOW, ids=[uuid4(), uuid4()]) + handler = bind(deps) + + with pytest.raises(DismissalEventNotFoundError): + await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=uuid4(), + reason="test", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) 
+ + +@pytest.mark.integration +async def test_dismiss_event_raises_event_already_dismissed( + db_pool: asyncpg.Pool, +) -> None: + """Bookmark is already past the target event: raises + EventAlreadyDismissedError (no rewinds).""" + subscriber_name = f"probe_reaction_{uuid4().hex[:8]}" + event_id = uuid4() + stream_id = uuid4() + event_tx, event_pos = await _append_probe_event(db_pool, event_id=event_id, stream_id=stream_id) + + # Pre-advance the bookmark TO the event cursor (so the event is + # already at-or-past). + await _ensure_bookmark( + db_pool, + subscriber_name, + last_transaction_id=event_tx, + last_position=event_pos, + ) + + deps = build_postgres_deps(db_pool, now=_NOW, ids=[uuid4(), uuid4()]) + handler = bind(deps) + + with pytest.raises(EventAlreadyDismissedError): + await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason="trying to dismiss an event we already passed", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + +@pytest.mark.integration +async def test_dismiss_event_rollback_on_failure_keeps_bookmark_intact( + db_pool: asyncpg.Pool, +) -> None: + """Atomicity pin: if the second write fails, the first rolls back. + + Simulates by re-using the same decision_id from a prior dismissal + (synthesizing a ConcurrencyError on the Decision append). The + bookmark advance + Decision write share `conn.transaction()`; the + Decision append's ConcurrencyError aborts the whole transaction, + so the bookmark stays at its previous cursor. + + Today's id_generator is fresh-UUIDv7-per-call so we can't easily + force the collision; this test instead verifies the SHAPE: a + successful dismissal advances the bookmark, a second dismissal of + the SAME event raises EventAlreadyDismissedError (without touching + the bookmark), and the bookmark stays at the first advanced + position. 
That's the contract observers care about: the bookmark + is never left in an intermediate state.""" + subscriber_name = f"probe_reaction_{uuid4().hex[:8]}" + await _ensure_bookmark(db_pool, subscriber_name) + + event_id = uuid4() + stream_id = uuid4() + event_tx, event_pos = await _append_probe_event(db_pool, event_id=event_id, stream_id=stream_id) + + # Two dismissals in this test: 2 ids per call x 2 = 4 ids. + deps = build_postgres_deps( + db_pool, + now=_NOW, + ids=[uuid4(), uuid4(), uuid4(), uuid4()], + ) + handler = bind(deps) + + await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason="first dismissal", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + bookmark_after_first = await _read_bookmark(db_pool, subscriber_name) + assert bookmark_after_first is not None + assert int(bookmark_after_first["last_tx"]) == event_tx + assert bookmark_after_first["last_position"] == event_pos + + # Second dismissal of the SAME event raises EventAlreadyDismissedError. + with pytest.raises(EventAlreadyDismissedError): + await handler( + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason="second dismissal", + ), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + # Bookmark stayed at the first advanced cursor (no rewind, no + # mid-flight state). + bookmark_after_second = await _read_bookmark(db_pool, subscriber_name) + assert bookmark_after_second is not None + assert int(bookmark_after_second["last_tx"]) == event_tx + assert bookmark_after_second["last_position"] == event_pos diff --git a/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider.py b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider.py new file mode 100644 index 000000000..de1f4b18b --- /dev/null +++ b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider.py @@ -0,0 +1,238 @@ +"""Unit tests for the `dismiss_event_in_reaction` decider. 
+ +Validation cascade pinned in order (fail-fast): + 1. InvalidDismissalReasonError on empty / whitespace-only reason + 2. InvalidDismissalReasonError on oversize reason + 3. EventAlreadyDismissedError when event cursor <= bookmark cursor + 4. happy path: emits DecisionRegistered with the expected payload + +The lexicographic-cursor check is exercised in five cases: + - event same tx as bookmark, earlier position (rejected) + - event same tx as bookmark, same position (rejected) + - event same tx as bookmark, later position (allowed) + - event earlier tx (rejected) + - event later tx (allowed) +""" + +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import pytest + +from cora.agent.errors import ( + EventAlreadyDismissedError, + InvalidDismissalReasonError, +) +from cora.agent.features.dismiss_event_in_reaction import ( + DismissalContext, + DismissEventInReaction, + decide, +) +from cora.decision.aggregates.decision.state import ( + DECISION_CONTEXT_REACTION_DISMISSAL, +) + +_NOW = datetime(2026, 6, 2, 14, 30, 0, tzinfo=UTC) +_EVENT_AT = datetime(2026, 6, 2, 14, 25, 0, tzinfo=UTC) +_DECISION_ID = UUID("01900000-0000-7000-8000-0000000d1551") +_PRINCIPAL_ID = UUID("01900000-0000-7000-8000-000000007007") +_EVENT_ID = UUID("01900000-0000-7000-8000-00000000ee01") + + +def _context( + *, + bookmark_tx: int = 100, + bookmark_pos: int = 50, + event_tx: int = 100, + event_pos: int = 51, +) -> DismissalContext: + return DismissalContext( + bookmark_transaction_id=bookmark_tx, + bookmark_position=bookmark_pos, + event_transaction_id=event_tx, + event_position=event_pos, + event_type="RunCompleted", + event_recorded_at=_EVENT_AT, + ) + + +def _command(*, reason: str = "stuck on schema rename") -> DismissEventInReaction: + return DismissEventInReaction( + subscriber_name="run_debriefer", + event_id=_EVENT_ID, + reason=reason, + ) + + +def test_empty_reason_raises_invalid_dismissal_reason() -> None: + with pytest.raises(InvalidDismissalReasonError): + decide( + None, +
_command(reason=""), + context=_context(), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_whitespace_only_reason_raises_invalid_dismissal_reason() -> None: + with pytest.raises(InvalidDismissalReasonError): + decide( + None, + _command(reason=" \n\t "), + context=_context(), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_oversize_reason_raises_invalid_dismissal_reason() -> None: + with pytest.raises(InvalidDismissalReasonError): + decide( + None, + _command(reason="x" * 501), + context=_context(), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_event_at_bookmark_cursor_raises_already_dismissed() -> None: + """Event cursor == bookmark cursor: bookmark is already there; + advancing to it is a no-op (the worker would re-deliver the SAME + event), which is dishonest semantics for an operator action.""" + with pytest.raises(EventAlreadyDismissedError): + decide( + None, + _command(), + context=_context(bookmark_tx=100, bookmark_pos=50, event_tx=100, event_pos=50), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_event_behind_bookmark_same_tx_raises_already_dismissed() -> None: + with pytest.raises(EventAlreadyDismissedError): + decide( + None, + _command(), + context=_context(bookmark_tx=100, bookmark_pos=50, event_tx=100, event_pos=49), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_event_behind_bookmark_earlier_tx_raises_already_dismissed() -> None: + with pytest.raises(EventAlreadyDismissedError): + decide( + None, + _command(), + context=_context(bookmark_tx=100, bookmark_pos=50, event_tx=99, event_pos=999), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + +def test_event_ahead_same_tx_emits_decision() -> None: + event = decide( + None, + _command(), + context=_context(bookmark_tx=100, bookmark_pos=50, event_tx=100, 
event_pos=51), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + assert event.decision_id == _DECISION_ID + assert event.actor_id == _PRINCIPAL_ID + assert event.context == DECISION_CONTEXT_REACTION_DISMISSAL + assert event.choice == "EventDismissed" + assert event.occurred_at == _NOW + + +def test_event_ahead_later_tx_emits_decision() -> None: + event = decide( + None, + _command(), + context=_context(bookmark_tx=100, bookmark_pos=50, event_tx=101, event_pos=1), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + assert event.choice == "EventDismissed" + assert event.occurred_at == _NOW + + +def test_happy_path_payload_carries_audit_fields() -> None: + """The Decision's `inputs` carries every cursor the operator + needs to reconstruct the dismissal context after the fact.""" + event = decide( + None, + _command(reason="schema drift after rename"), + context=_context(bookmark_tx=42, bookmark_pos=7, event_tx=42, event_pos=8), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + + assert event.inputs is not None + assert event.inputs["subscriber_name"] == "run_debriefer" + assert event.inputs["event_id"] == str(_EVENT_ID) + assert event.inputs["event_type"] == "RunCompleted" + assert event.inputs["event_transaction_id"] == "42" + assert event.inputs["event_position"] == "8" + assert event.inputs["previous_bookmark_transaction_id"] == "42" + assert event.inputs["previous_bookmark_position"] == "7" + assert event.reasoning is not None + assert "schema drift after rename" in event.reasoning + assert str(_EVENT_ID) in event.reasoning + + +def test_reason_is_stripped_before_emission() -> None: + event = decide( + None, + _command(reason=" stripped reason "), + context=_context(), + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + assert event.reasoning is not None + assert "stripped reason" in event.reasoning + assert " stripped reason " not in 
event.reasoning + + +def test_decider_is_pure_no_side_effect_on_inputs() -> None: + """Pass the same inputs twice, assert identical outputs. Pins + the decider's purity claim (no hidden state, no clock reads).""" + ctx = _context() + cmd = _command() + first = decide( + None, + cmd, + context=ctx, + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + second = decide( + None, + cmd, + context=ctx, + new_decision_id=_DECISION_ID, + principal_id=_PRINCIPAL_ID, + now=_NOW, + ) + assert first == second + + +# `uuid4` is imported above but no test in this module calls it; this +# no-op reference keeps the unused-import lint from firing. +_ = uuid4 diff --git a/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider_properties.py b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider_properties.py new file mode 100644 index 000000000..202763dda --- /dev/null +++ b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_decider_properties.py @@ -0,0 +1,272 @@ +"""Property-based tests for `dismiss_event_in_reaction.decide`. + +Pins universal behaviour across generated inputs: + + - Empty / whitespace-only reason → InvalidDismissalReasonError, always. + - Oversize reason (>500 chars after strip) → InvalidDismissalReasonError, always. + - Event cursor (event_tx, event_pos) <= bookmark cursor + (bookmark_tx, bookmark_pos) → EventAlreadyDismissedError, always + (lexicographic comparison). + - Event cursor strictly > bookmark cursor with a valid reason → + a single DecisionRegistered with the correct context, choice, + and audit payload. + - Pure: same inputs → same output.
+""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +import pytest +from hypothesis import given +from hypothesis import strategies as st + +if TYPE_CHECKING: + from uuid import UUID + +from cora.agent.errors import ( + EventAlreadyDismissedError, + InvalidDismissalReasonError, +) +from cora.agent.features.dismiss_event_in_reaction import ( + DismissalContext, + DismissEventInReaction, + decide, +) +from cora.decision.aggregates.decision.state import ( + DECISION_CONTEXT_REACTION_DISMISSAL, +) + +_NOW = datetime(2026, 6, 2, 14, 30, 0, tzinfo=UTC) +_EVENT_AT = datetime(2026, 6, 2, 14, 25, 0, tzinfo=UTC) + + +def _ctx( + *, + bookmark_tx: int, + bookmark_pos: int, + event_tx: int, + event_pos: int, +) -> DismissalContext: + return DismissalContext( + bookmark_transaction_id=bookmark_tx, + bookmark_position=bookmark_pos, + event_transaction_id=event_tx, + event_position=event_pos, + event_type="ProbeEvent", + event_recorded_at=_EVENT_AT, + ) + + +@pytest.mark.unit +@given( + subscriber_name=st.text(min_size=1, max_size=50), + event_id=st.uuids(), + whitespace=st.text(alphabet=" \t\n", min_size=0, max_size=20), + decision_id=st.uuids(), + principal_id=st.uuids(), +) +def test_whitespace_only_reason_always_raises_invalid( + subscriber_name: str, + event_id: UUID, + whitespace: str, + decision_id: UUID, + principal_id: UUID, +) -> None: + with pytest.raises(InvalidDismissalReasonError): + decide( + None, + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=whitespace, + ), + context=_ctx(bookmark_tx=0, bookmark_pos=0, event_tx=1, event_pos=1), + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + + +@pytest.mark.unit +@given( + subscriber_name=st.text(min_size=1, max_size=50), + event_id=st.uuids(), + excess_chars=st.integers(min_value=1, max_value=2000), + decision_id=st.uuids(), + principal_id=st.uuids(), +) +def 
test_oversize_reason_always_raises_invalid( + subscriber_name: str, + event_id: UUID, + excess_chars: int, + decision_id: UUID, + principal_id: UUID, +) -> None: + """Any reason with stripped length > 500 raises.""" + reason = "x" * (500 + excess_chars) + with pytest.raises(InvalidDismissalReasonError): + decide( + None, + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=reason, + ), + context=_ctx(bookmark_tx=0, bookmark_pos=0, event_tx=1, event_pos=1), + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + + +@pytest.mark.unit +@given( + subscriber_name=st.text(min_size=1, max_size=50), + event_id=st.uuids(), + reason=st.text(min_size=1, max_size=500).filter(lambda r: r.strip()), + bookmark_tx=st.integers(min_value=1, max_value=10000), + bookmark_pos=st.integers(min_value=1, max_value=10000), + decision_id=st.uuids(), + principal_id=st.uuids(), +) +def test_event_at_or_behind_bookmark_always_raises_already_dismissed( + subscriber_name: str, + event_id: UUID, + reason: str, + bookmark_tx: int, + bookmark_pos: int, + decision_id: UUID, + principal_id: UUID, +) -> None: + """Event cursor equal to the bookmark cursor raises (no rewinds).""" + with pytest.raises(EventAlreadyDismissedError): + decide( + None, + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=reason, + ), + context=_ctx( + bookmark_tx=bookmark_tx, + bookmark_pos=bookmark_pos, + event_tx=bookmark_tx, + event_pos=bookmark_pos, + ), + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + + +@pytest.mark.unit +@given( + subscriber_name=st.text(min_size=1, max_size=50), + event_id=st.uuids(), + reason=st.text(min_size=1, max_size=500).filter(lambda r: r.strip()), + bookmark_tx=st.integers(min_value=0, max_value=10000), + bookmark_pos=st.integers(min_value=0, max_value=10000), + pos_delta=st.integers(min_value=1, max_value=10000), + decision_id=st.uuids(), + principal_id=st.uuids(), +)
+def test_event_strictly_ahead_emits_decision_with_expected_payload( + subscriber_name: str, + event_id: UUID, + reason: str, + bookmark_tx: int, + bookmark_pos: int, + pos_delta: int, + decision_id: UUID, + principal_id: UUID, +) -> None: + """Event strictly ahead in (tx, pos) order always yields a single + DecisionRegistered with the expected context + choice + audit + payload.""" + event_tx = bookmark_tx + event_pos = bookmark_pos + pos_delta + + decision = decide( + None, + DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=reason, + ), + context=_ctx( + bookmark_tx=bookmark_tx, + bookmark_pos=bookmark_pos, + event_tx=event_tx, + event_pos=event_pos, + ), + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + + assert decision.decision_id == decision_id + assert decision.actor_id == principal_id + assert decision.context == DECISION_CONTEXT_REACTION_DISMISSAL + assert decision.choice == "EventDismissed" + assert decision.occurred_at == _NOW + assert decision.inputs is not None + assert decision.inputs["subscriber_name"] == subscriber_name + assert decision.inputs["event_id"] == str(event_id) + assert decision.inputs["event_transaction_id"] == str(event_tx) + assert decision.inputs["event_position"] == str(event_pos) + assert decision.inputs["previous_bookmark_transaction_id"] == str(bookmark_tx) + assert decision.inputs["previous_bookmark_position"] == str(bookmark_pos) + + +@pytest.mark.unit +@given( + subscriber_name=st.text(min_size=1, max_size=50), + event_id=st.uuids(), + reason=st.text(min_size=1, max_size=500).filter(lambda r: r.strip()), + bookmark_tx=st.integers(min_value=0, max_value=10000), + bookmark_pos=st.integers(min_value=0, max_value=10000), + pos_delta=st.integers(min_value=1, max_value=10000), + decision_id=st.uuids(), + principal_id=st.uuids(), +) +def test_decider_is_pure_same_inputs_same_output( + subscriber_name: str, + event_id: UUID, + reason: str, + bookmark_tx: int, + 
bookmark_pos: int, + pos_delta: int, + decision_id: UUID, + principal_id: UUID, +) -> None: + cmd = DismissEventInReaction( + subscriber_name=subscriber_name, + event_id=event_id, + reason=reason, + ) + ctx = _ctx( + bookmark_tx=bookmark_tx, + bookmark_pos=bookmark_pos, + event_tx=bookmark_tx, + event_pos=bookmark_pos + pos_delta, + ) + + first = decide( + None, + cmd, + context=ctx, + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + second = decide( + None, + cmd, + context=ctx, + new_decision_id=decision_id, + principal_id=principal_id, + now=_NOW, + ) + assert first == second diff --git a/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_handler.py b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_handler.py new file mode 100644 index 000000000..3cd0d86e2 --- /dev/null +++ b/apps/api/tests/unit/agent/test_dismiss_event_in_reaction_handler.py @@ -0,0 +1,100 @@ +"""Unit tests for the `dismiss_event_in_reaction` handler. + +The handler is mostly SQL plumbing; the unit layer pins the two +shapes that don't need a real Postgres: + + - Authz deny short-circuits BEFORE any pool acquisition. + - In-memory mode (pool is None) raises DismissalRequiresPostgresError + BEFORE any decider call. + +Everything else (bookmark NotFound, event NotFound, happy-path atomic +write) is covered in the integration test against real PG, where the +SQL semantics actually matter. 
+""" + +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock +from uuid import UUID + +import pytest + +from cora.agent.errors import ( + DismissalRequiresPostgresError, + UnauthorizedError, +) +from cora.agent.features.dismiss_event_in_reaction import ( + DismissEventInReaction, + bind, +) +from cora.infrastructure.ports.authorize import Allow, Deny + +_NOW = datetime(2026, 6, 2, 14, 30, 0, tzinfo=UTC) +_PRINCIPAL_ID = UUID("01900000-0000-7000-8000-000000007007") +_CORRELATION_ID = UUID("01900000-0000-7000-8000-00000000c1d0") +_EVENT_ID = UUID("01900000-0000-7000-8000-00000000ee01") + + +def _stub_kernel(*, pool: object | None, authz_allows: bool) -> MagicMock: + """Build a minimal Kernel-shaped MagicMock for the two early + branches we exercise here. The full Kernel is heavy; the handler + only touches authz + pool + clock + id_generator before raising + on those early branches, so a duck-typed MagicMock without + `spec=` is enough.""" + kernel = MagicMock() + kernel.pool = pool + kernel.clock.now = MagicMock(return_value=_NOW) + kernel.id_generator.new_id = MagicMock( + return_value=UUID("01900000-0000-7000-8000-0000000d1551"), + ) + if authz_allows: + kernel.authz.authorize = AsyncMock(return_value=Allow()) + else: + kernel.authz.authorize = AsyncMock(return_value=Deny(reason="denied by policy")) + return kernel + + +def _command() -> DismissEventInReaction: + return DismissEventInReaction( + subscriber_name="run_debriefer", + event_id=_EVENT_ID, + reason="stuck on schema rename", + ) + + +@pytest.mark.asyncio +async def test_authz_deny_raises_unauthorized_without_touching_pool() -> None: + """Authz runs first; a Deny verdict short-circuits with + UnauthorizedError BEFORE any pool acquisition. 
Pin so a future + refactor that reorders authz after pool acquisition would + leak pool connections on every denied call.""" + pool = MagicMock() + pool.acquire = MagicMock(side_effect=AssertionError("pool must not be touched")) + kernel = _stub_kernel(pool=pool, authz_allows=False) + + handler = bind(kernel) + + with pytest.raises(UnauthorizedError): + await handler( + _command(), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) + + assert not pool.acquire.called + + +@pytest.mark.asyncio +async def test_in_memory_mode_raises_dismissal_requires_postgres() -> None: + """`deps.pool is None` (in-memory test config / dev without PG): + the slice cannot advance projection_bookmarks because the table + does not exist. Raise loudly instead of silently writing a + Decision-only no-op.""" + kernel = _stub_kernel(pool=None, authz_allows=True) + handler = bind(kernel) + + with pytest.raises(DismissalRequiresPostgresError): + await handler( + _command(), + principal_id=_PRINCIPAL_ID, + correlation_id=_CORRELATION_ID, + ) diff --git a/apps/api/tests/unit/agent/test_reaction_classification.py b/apps/api/tests/unit/agent/test_reaction_classification.py new file mode 100644 index 000000000..631b592c5 --- /dev/null +++ b/apps/api/tests/unit/agent/test_reaction_classification.py @@ -0,0 +1,52 @@ +"""Both Agent BC subscribers classify as `Reaction` with `batch_size = 1`. + +Pins the contract that the Reaction Protocol shipping introduced: +each LLM-bound Subscriber declares `batch_size = 1` so the worker +bounds the bookmark transaction to a single LLM round-trip and does +not starve Projection advance loops sharing the same pool. + +If a future Reaction author forgets to declare `batch_size`, the +worker silently inherits the worker-level default (100) and the +incident the docstring warned about becomes real. This test surfaces +the omission at unit-test time instead of at first wedge. 
+""" + +from cora.agent.subscribers.caution_drafter import CautionDrafterSubscriber +from cora.agent.subscribers.run_debriefer import RunDebrieferSubscriber +from cora.infrastructure.projection.handler import Reaction + + +def test_run_debriefer_subscriber_is_a_reaction() -> None: + """Structural-typing check: the class satisfies the Reaction + Protocol at the type level (name, subscribed_event_types, + batch_size, async apply).""" + assert isinstance(RunDebrieferSubscriber.__dict__["name"], str) + assert isinstance(RunDebrieferSubscriber.__dict__["subscribed_event_types"], frozenset) + assert isinstance(RunDebrieferSubscriber.__dict__["batch_size"], int) + + +def test_caution_drafter_subscriber_is_a_reaction() -> None: + assert isinstance(CautionDrafterSubscriber.__dict__["name"], str) + assert isinstance(CautionDrafterSubscriber.__dict__["subscribed_event_types"], frozenset) + assert isinstance(CautionDrafterSubscriber.__dict__["batch_size"], int) + + +def test_run_debriefer_pins_batch_size_to_one() -> None: + """Every LLM-bound Reaction MUST pin batch_size to 1 (a slow LLM + call should not hold the pool connection across N events). + Update the docstring + Reaction Protocol contract before changing.""" + assert RunDebrieferSubscriber.batch_size == 1 + + +def test_caution_drafter_pins_batch_size_to_one() -> None: + assert CautionDrafterSubscriber.batch_size == 1 + + +def test_reaction_protocol_is_runtime_checkable() -> None: + """The Reaction Protocol is `@runtime_checkable` so a test or + arch-fitness check can isinstance-test against it. Without + runtime_checkable, this assertion would raise TypeError.""" + # `isinstance(obj, Reaction)` only checks that the required + # attributes exist; values are not validated. The point of this + # test is that the Protocol declaration itself permits the check. 
+ assert hasattr(Reaction, "__protocol_attrs__") or hasattr(Reaction, "_is_protocol") diff --git a/apps/api/tests/unit/infrastructure/test_worker_batch_size.py b/apps/api/tests/unit/infrastructure/test_worker_batch_size.py new file mode 100644 index 000000000..ef284f506 --- /dev/null +++ b/apps/api/tests/unit/infrastructure/test_worker_batch_size.py @@ -0,0 +1,150 @@ +"""Worker honors per-subscriber `batch_size` (Reaction = 1, Projection = 100). + +Pins the behavior added when `Reaction` Protocol shipped: the worker +reads `batch_size` from each Subscriber via `getattr` and passes it +to `advance_subscriber_once`. Existing Projections that pre-date the +attribute fall back to the worker-level default. +""" + +# pyright: reportPrivateUsage=false + +import asyncio +import contextlib +from typing import Any +from unittest.mock import AsyncMock, patch + +import pytest + +from cora.infrastructure.projection.handler import DEFAULT_BATCH_SIZE +from cora.infrastructure.projection.registry import ProjectionRegistry +from cora.infrastructure.projection.wakeup import PollOnlyWakeup +from cora.infrastructure.projection.worker import ProjectionWorker + + +class _ReactionLike: + """Probe Subscriber with `batch_size = 1` (the Reaction convention).""" + + name = "probe_reaction" + subscribed_event_types = frozenset({"DummyEvent"}) + batch_size = 1 + + async def apply(self, event: Any, conn: Any) -> None: ... + + +class _ProjectionLike: + """Probe Subscriber WITHOUT a `batch_size` attribute, simulating a + pre-existing Projection that pre-dates per-subscriber tuning.""" + + name = "probe_projection" + subscribed_event_types = frozenset({"DummyEvent"}) + + async def apply(self, event: Any, conn: Any) -> None: ... + + +class _ProjectionWithBatch: + """Probe Projection that declares a custom `batch_size`.""" + + name = "probe_projection_explicit" + subscribed_event_types = frozenset({"DummyEvent"}) + batch_size = 250 + + async def apply(self, event: Any, conn: Any) -> None: ... 
async def _run_one_iteration(worker: ProjectionWorker, subscriber: Any) -> None:
    """Run the worker's advance loop for `subscriber` briefly, then stop it.

    `_advance_loop` is started as a background task, given 50 ms of
    event-loop time, and then cancelled. The resulting `CancelledError`
    is suppressed; any other exception raised by the loop still
    propagates out of the final `await` and fails the calling test.

    Callers patch `advance_subscriber_once` with a mock that returns 0
    immediately, so the 50 ms window is expected to cover at least one
    call to it.
    NOTE(review): after that call the loop presumably idles in
    `wakeup.wait` until the cancel lands -- confirm against the worker.
    """
    loop_task = asyncio.create_task(worker._advance_loop(subscriber))
    await asyncio.sleep(0.05)
    loop_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await loop_task


async def _advance_kwargs_for(subscriber: Any) -> Any:
    """Advance `subscriber` once and return the kwargs the worker passed.

    Registers `subscriber` in a fresh registry, builds a worker whose
    worker-level `batch_size` is `DEFAULT_BATCH_SIZE`, swaps
    `advance_subscriber_once` for a mock returning 0, and drives one
    short run of the advance loop. Asserts the mock was called, then
    returns the keyword arguments of its most recent call.
    """
    registry = ProjectionRegistry()
    registry.register(subscriber)

    worker = ProjectionWorker(
        pool=AsyncMock(),
        registry=registry,
        wakeup=PollOnlyWakeup(),
        poll_interval_seconds=10.0,
        batch_size=DEFAULT_BATCH_SIZE,
    )

    advance = AsyncMock(return_value=0)
    with patch(
        "cora.infrastructure.projection.worker.advance_subscriber_once",
        advance,
    ):
        await _run_one_iteration(worker, subscriber)

    assert advance.called
    # `call_args` unpacks to (positional args, keyword args).
    _, kwargs = advance.call_args
    return kwargs


@pytest.mark.asyncio
async def test_worker_reads_reaction_batch_size_one() -> None:
    """A Subscriber that declares `batch_size = 1` is advanced with
    batch_size=1, regardless of the worker-level default."""
    kwargs = await _advance_kwargs_for(_ReactionLike())

    assert kwargs["batch_size"] == 1, (
        f"Expected reaction.batch_size=1 to be passed, got {kwargs['batch_size']}"
    )


@pytest.mark.asyncio
async def test_worker_falls_back_to_default_when_subscriber_omits_batch_size() -> None:
    """A Subscriber that does NOT declare `batch_size` falls back to
    the worker-level default. Backward compatibility for the 14
    Projections that pre-date the per-subscriber knob."""
    kwargs = await _advance_kwargs_for(_ProjectionLike())

    assert kwargs["batch_size"] == DEFAULT_BATCH_SIZE


@pytest.mark.asyncio
async def test_worker_honors_explicit_projection_batch_size() -> None:
    """A Projection that overrides `batch_size` (e.g., to 250 for a
    high-throughput projection) wins over the worker-level default."""
    kwargs = await _advance_kwargs_for(_ProjectionWithBatch())

    assert kwargs["batch_size"] == 250