From 1fde66a0c55370c5ad0fdc7f3963272c8deb8607 Mon Sep 17 00:00:00 2001 From: Chris Bartholomew Date: Thu, 19 Mar 2026 09:33:21 -0400 Subject: [PATCH 1/2] Fix orphaned batch_retain parents when child fails via unhandled exception When a child retain operation fails with an unhandled exception (e.g. a DB constraint violation), the memory engine's transaction is rolled back entirely, including any call to _maybe_update_parent_operation. The poller's fallback _mark_failed then updates the child status but leaves the parent batch_retain permanently stuck in 'pending'. Fix: perform _mark_failed's status update inside a transaction and call a new poller-level _maybe_update_parent_operation after marking the child failed. This mirrors the memory engine's own parent-update logic and ensures the parent is resolved to completed/failed regardless of how the child failure was detected. The poller's implementation locks the parent row, checks all siblings, and only finalises the parent once all siblings have reached a terminal state. Errors in parent propagation are logged but do not affect the child failure path, which is the critical state change. 
--- .../hindsight_api/worker/poller.py | 107 ++++++++++++++++-- 1 file changed, 97 insertions(+), 10 deletions(-) diff --git a/hindsight-api-slim/hindsight_api/worker/poller.py b/hindsight-api-slim/hindsight_api/worker/poller.py index 775a7f8f2..9f556da9c 100644 --- a/hindsight-api-slim/hindsight_api/worker/poller.py +++ b/hindsight-api-slim/hindsight_api/worker/poller.py @@ -302,19 +302,106 @@ async def _mark_completed(self, operation_id: str, schema: str | None): ) async def _mark_failed(self, operation_id: str, error_message: str, schema: str | None): - """Mark a task as failed with error message.""" + """Mark a task as failed with error message, then propagate to parent if applicable.""" table = fq_table("async_operations", schema) # Truncate error message if too long (max 5000 chars in schema) error_message = error_message[:5000] if len(error_message) > 5000 else error_message - await self._pool.execute( - f""" - UPDATE {table} - SET status = 'failed', error_message = $2, completed_at = now(), updated_at = now() - WHERE operation_id = $1 - """, - operation_id, - error_message, - ) + + async with self._pool.acquire() as conn: + async with conn.transaction(): + await conn.execute( + f""" + UPDATE {table} + SET status = 'failed', error_message = $2, completed_at = now(), updated_at = now() + WHERE operation_id = $1 + """, + operation_id, + error_message, + ) + await self._maybe_update_parent_operation(operation_id, schema, conn) + + async def _maybe_update_parent_operation(self, child_operation_id: str, schema: str | None, conn) -> None: + """If this operation is a child of a batch_retain, update the parent status when all siblings are done. + + Must be called within an active transaction that has already updated the child's status. + The memory engine has an equivalent method that runs inside task execution transactions. 
+ This poller-level version handles the case where a task fails via an unhandled exception + that bypasses the memory engine's own failure path (e.g. a DB constraint violation that + rolls back the engine's transaction before it can update the parent). + """ + import json + import uuid + + table = fq_table("async_operations", schema) + + try: + row = await conn.fetchrow( + f"SELECT result_metadata, bank_id FROM {table} WHERE operation_id = $1", + uuid.UUID(child_operation_id), + ) + if not row: + return + + result_metadata = row["result_metadata"] or {} + if isinstance(result_metadata, str): + result_metadata = json.loads(result_metadata) + parent_operation_id = result_metadata.get("parent_operation_id") + if not parent_operation_id: + return + + bank_id = row["bank_id"] + + # Lock parent to prevent concurrent sibling updates + parent_row = await conn.fetchrow( + f"SELECT operation_id FROM {table} WHERE operation_id = $1 AND bank_id = $2 FOR UPDATE", + uuid.UUID(parent_operation_id), + bank_id, + ) + if not parent_row: + return + + # Check whether all siblings are done + siblings = await conn.fetch( + f""" + SELECT status FROM {table} + WHERE bank_id = $1 + AND result_metadata::jsonb @> $2::jsonb + """, + bank_id, + json.dumps({"parent_operation_id": parent_operation_id}), + ) + if not siblings or not all(s["status"] in ("completed", "failed") for s in siblings): + return + + any_failed = any(s["status"] == "failed" for s in siblings) + if any_failed: + await conn.execute( + f""" + UPDATE {table} + SET status = 'failed', error_message = $2, updated_at = now() + WHERE operation_id = $1 + """, + uuid.UUID(parent_operation_id), + "One or more sub-batches failed", + ) + else: + await conn.execute( + f""" + UPDATE {table} + SET status = 'completed', updated_at = now(), completed_at = now() + WHERE operation_id = $1 + """, + uuid.UUID(parent_operation_id), + ) + logger.info( + f"Poller updated parent operation {parent_operation_id} to " + f"{'failed' if any_failed else 
'completed'} (all siblings done)" + ) + except Exception as e: + # Log but don't re-raise — the child has already been marked failed, + # which is the critical state change. A stuck parent will be caught on + # the next run or via monitoring. + logger.error(f"Failed to update parent operation for child {child_operation_id}: {e}") async def _schedule_retry(self, operation_id: str, retry_at: "Any", error_message: str, schema: str | None): """Reset task to pending with a future retry timestamp.""" From c63b9edbfb4221c6416c36516fb6ef8523258f8b Mon Sep 17 00:00:00 2001 From: Chris Bartholomew Date: Thu, 19 Mar 2026 09:37:56 -0400 Subject: [PATCH 2/2] Add tests for _mark_failed parent propagation in WorkerPoller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests cover the new _maybe_update_parent_operation logic: - Last sibling fails → parent batch_retain becomes failed - Sole child fails → parent becomes failed - Sibling still pending → parent stays pending (no premature resolution) - No parent in result_metadata → safe no-op - End-to-end: unhandled exception via execute_task propagates to parent --- hindsight-api-slim/tests/test_worker.py | 246 +++++++++++++++++++++++- 1 file changed, 237 insertions(+), 9 deletions(-) diff --git a/hindsight-api-slim/tests/test_worker.py b/hindsight-api-slim/tests/test_worker.py index 64bafddee..219260691 100644 --- a/hindsight-api-slim/tests/test_worker.py +++ b/hindsight-api-slim/tests/test_worker.py @@ -58,10 +58,14 @@ async def pool(pg0_db_url): async def clean_operations(pool): """Clean up async_operations table before and after tests.""" # Clean before test - covers both 'test-worker-' and 'test_worker_recovery' patterns - await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-worker-%' OR bank_id LIKE 'test_worker_%'") + await pool.execute( + "DELETE FROM async_operations WHERE bank_id LIKE 'test-worker-%' OR bank_id LIKE 'test_worker_%'" + ) yield # Clean after test 
- await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-worker-%' OR bank_id LIKE 'test_worker_%'") + await pool.execute( + "DELETE FROM async_operations WHERE bank_id LIKE 'test-worker-%' OR bank_id LIKE 'test_worker_%'" + ) class TestBrokerTaskBackend: @@ -387,9 +391,7 @@ async def failing_executor(task_dict): "SELECT status, error_message, retry_count FROM async_operations WHERE operation_id = $1", op_id, ) - assert row["status"] == "failed", ( - f"Expected 'failed' for plain exception, got '{row['status']}'" - ) + assert row["status"] == "failed", f"Expected 'failed' for plain exception, got '{row['status']}'" assert row["error_message"] is not None assert row["retry_count"] == 0 # not incremented; plain exception = immediate fail @@ -1265,7 +1267,9 @@ async def blocking_executor(task_dict: dict): for i in range(2): op_id = uuid.uuid4() task_ids.append(str(op_id)) - payload = json.dumps({"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": bank_id}) + payload = json.dumps( + {"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": bank_id} + ) await pool.execute( """ INSERT INTO async_operations (operation_id, bank_id, operation_type, status, task_payload) @@ -1294,7 +1298,9 @@ async def blocking_executor(task_dict: dict): for i in range(2): op_id = uuid.uuid4() task_ids.append(str(op_id)) - payload = json.dumps({"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": bank_id}) + payload = json.dumps( + {"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": bank_id} + ) await pool.execute( """ INSERT INTO async_operations (operation_id, bank_id, operation_type, status, task_payload) @@ -1364,7 +1370,9 @@ async def controlled_executor(task_dict: dict): await _ensure_bank(pool, bank_id) for i in range(10): op_id = uuid.uuid4() - payload = json.dumps({"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": 
bank_id}) + payload = json.dumps( + {"type": "test", "operation_type": "retain", "operation_id": str(op_id), "bank_id": bank_id} + ) await pool.execute( """ INSERT INTO async_operations (operation_id, bank_id, operation_type, status, task_payload) @@ -1396,7 +1404,7 @@ async def controlled_executor(task_dict: dict): completed = 0 while completed < 10 and len(tasks_started) < 10: # Release the next batch - events_to_release = list(task_events.values())[completed:completed+3] + events_to_release = list(task_events.values())[completed : completed + 3] for event in events_to_release: event.set() completed += len(events_to_release) @@ -1417,3 +1425,223 @@ async def controlled_executor(task_dict: dict): await asyncio.wait_for(poll_task, timeout=1.0) except asyncio.CancelledError: pass + + +class TestMarkFailedParentPropagation: + """Tests for _mark_failed parent propagation in WorkerPoller. + + When a child retain operation fails via an unhandled exception, the memory + engine's transaction is rolled back entirely — including any call to + _maybe_update_parent_operation inside the engine. The poller's fallback + _mark_failed must detect this and finalise the parent batch_retain itself. 
+ """ + + async def _insert_op( + self, + pool, + *, + op_id: "uuid.UUID", + bank_id: str, + operation_type: str, + status: str, + result_metadata: dict | None = None, + ) -> None: + meta_json = json.dumps(result_metadata if result_metadata is not None else {}) + await pool.execute( + """ + INSERT INTO async_operations + (operation_id, bank_id, operation_type, status, result_metadata) + VALUES ($1, $2, $3, $4, $5::jsonb) + """, + op_id, + bank_id, + operation_type, + status, + meta_json, + ) + + @pytest.mark.asyncio + async def test_mark_failed_finalises_parent_when_last_sibling_fails(self, pool, clean_operations): + """When the last pending child fails, parent batch_retain is marked failed.""" + from hindsight_api.worker import WorkerPoller + + bank_id = f"test-worker-{uuid.uuid4().hex[:8]}" + await _ensure_bank(pool, bank_id) + + parent_id = uuid.uuid4() + child1_id = uuid.uuid4() + child2_id = uuid.uuid4() + + # Parent batch_retain still pending + await self._insert_op(pool, op_id=parent_id, bank_id=bank_id, operation_type="batch_retain", status="pending") + + # child1 already completed + await self._insert_op( + pool, + op_id=child1_id, + bank_id=bank_id, + operation_type="retain", + status="completed", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + + # child2 still processing — this is the one that will fail + await self._insert_op( + pool, + op_id=child2_id, + bank_id=bank_id, + operation_type="retain", + status="processing", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + + poller = WorkerPoller(pool=pool, worker_id="test-worker-1", executor=lambda x: None) + await poller._mark_failed(str(child2_id), "DB constraint violation", schema=None) + + # child2 must be failed + child2_row = await pool.fetchrow( + "SELECT status, error_message FROM async_operations WHERE operation_id = $1", child2_id + ) + assert child2_row["status"] == "failed" + assert "DB constraint violation" in child2_row["error_message"] + + # parent must now 
be failed (all siblings done, at least one failed) + parent_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", parent_id) + assert parent_row["status"] == "failed", ( + f"Parent should be 'failed' when last sibling fails, got '{parent_row['status']}'" + ) + + @pytest.mark.asyncio + async def test_mark_failed_finalises_parent_when_last_sibling_is_sole_child(self, pool, clean_operations): + """When the only child fails, parent batch_retain becomes failed.""" + from hindsight_api.worker import WorkerPoller + + bank_id = f"test-worker-{uuid.uuid4().hex[:8]}" + await _ensure_bank(pool, bank_id) + + parent_id = uuid.uuid4() + child_id = uuid.uuid4() + + await self._insert_op(pool, op_id=parent_id, bank_id=bank_id, operation_type="batch_retain", status="pending") + await self._insert_op( + pool, + op_id=child_id, + bank_id=bank_id, + operation_type="retain", + status="processing", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + + poller = WorkerPoller(pool=pool, worker_id="test-worker-1", executor=lambda x: None) + await poller._mark_failed(str(child_id), "unexpected error", schema=None) + + parent_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", parent_id) + assert parent_row["status"] == "failed" + + @pytest.mark.asyncio + async def test_mark_failed_does_not_finalise_parent_when_siblings_still_pending(self, pool, clean_operations): + """Parent is NOT updated while other siblings are still processing/pending.""" + from hindsight_api.worker import WorkerPoller + + bank_id = f"test-worker-{uuid.uuid4().hex[:8]}" + await _ensure_bank(pool, bank_id) + + parent_id = uuid.uuid4() + child1_id = uuid.uuid4() + child2_id = uuid.uuid4() + + await self._insert_op(pool, op_id=parent_id, bank_id=bank_id, operation_type="batch_retain", status="pending") + + # child1 is the one failing + await self._insert_op( + pool, + op_id=child1_id, + bank_id=bank_id, + operation_type="retain", + 
status="processing", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + # child2 is still pending — not done yet + await self._insert_op( + pool, + op_id=child2_id, + bank_id=bank_id, + operation_type="retain", + status="pending", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + + poller = WorkerPoller(pool=pool, worker_id="test-worker-1", executor=lambda x: None) + await poller._mark_failed(str(child1_id), "early failure", schema=None) + + # child1 is failed + child1_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", child1_id) + assert child1_row["status"] == "failed" + + # parent must still be pending (child2 not done) + parent_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", parent_id) + assert parent_row["status"] == "pending", ( + f"Parent should remain 'pending' while siblings are outstanding, got '{parent_row['status']}'" + ) + + @pytest.mark.asyncio + async def test_mark_failed_no_parent_is_safe(self, pool, clean_operations): + """Operations without a parent (no result_metadata parent_operation_id) fail cleanly.""" + from hindsight_api.worker import WorkerPoller + + bank_id = f"test-worker-{uuid.uuid4().hex[:8]}" + await _ensure_bank(pool, bank_id) + + op_id = uuid.uuid4() + await self._insert_op(pool, op_id=op_id, bank_id=bank_id, operation_type="retain", status="processing") + + poller = WorkerPoller(pool=pool, worker_id="test-worker-1", executor=lambda x: None) + # Must not raise + await poller._mark_failed(str(op_id), "standalone failure", schema=None) + + row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", op_id) + assert row["status"] == "failed" + + @pytest.mark.asyncio + async def test_unhandled_exception_via_execute_task_propagates_to_parent(self, pool, clean_operations): + """End-to-end: executor raises a plain exception, poller calls _mark_failed, + which then resolves the parent batch_retain to 
failed.""" + from hindsight_api.worker import WorkerPoller + from hindsight_api.worker.poller import ClaimedTask + + bank_id = f"test-worker-{uuid.uuid4().hex[:8]}" + await _ensure_bank(pool, bank_id) + + parent_id = uuid.uuid4() + child_id = uuid.uuid4() + + await self._insert_op(pool, op_id=parent_id, bank_id=bank_id, operation_type="batch_retain", status="pending") + await self._insert_op( + pool, + op_id=child_id, + bank_id=bank_id, + operation_type="retain", + status="processing", + result_metadata={"parent_operation_id": str(parent_id)}, + ) + + async def crashing_executor(task_dict): + raise RuntimeError("Simulated DB constraint violation — transaction rolled back") + + poller = WorkerPoller(pool=pool, worker_id="test-worker-1", executor=crashing_executor) + + task_dict = {"type": "retain", "operation_id": str(child_id), "bank_id": bank_id} + claimed_task = ClaimedTask(operation_id=str(child_id), task_dict=task_dict, schema=None) + await poller.execute_task(claimed_task) + + completed = await poller.wait_for_active_tasks(timeout=5.0) + assert completed, "Task did not complete within timeout" + + child_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", child_id) + assert child_row["status"] == "failed" + + parent_row = await pool.fetchrow("SELECT status FROM async_operations WHERE operation_id = $1", parent_id) + assert parent_row["status"] == "failed", ( + f"Parent batch_retain should be 'failed' after child fails via unhandled exception, " + f"got '{parent_row['status']}'" + )