Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions hindsight-api-slim/hindsight_api/worker/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,106 @@ async def _mark_completed(self, operation_id: str, schema: str | None):
)

async def _mark_failed(self, operation_id: str, error_message: str, schema: str | None):
"""Mark a task as failed with error message."""
"""Mark a task as failed with error message, then propagate to parent if applicable."""
table = fq_table("async_operations", schema)
# Truncate error message if too long (max 5000 chars in schema)
error_message = error_message[:5000] if len(error_message) > 5000 else error_message
await self._pool.execute(
f"""
UPDATE {table}
SET status = 'failed', error_message = $2, completed_at = now(), updated_at = now()
WHERE operation_id = $1
""",
operation_id,
error_message,
)

async with self._pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
f"""
UPDATE {table}
SET status = 'failed', error_message = $2, completed_at = now(), updated_at = now()
WHERE operation_id = $1
""",
operation_id,
error_message,
)
await self._maybe_update_parent_operation(operation_id, schema, conn)

async def _maybe_update_parent_operation(self, child_operation_id: str, schema: str | None, conn) -> None:
"""If this operation is a child of a batch_retain, update the parent status when all siblings are done.

Must be called within an active transaction that has already updated the child's status.
The memory engine has an equivalent method that runs inside task execution transactions.
This poller-level version handles the case where a task fails via an unhandled exception
that bypasses the memory engine's own failure path (e.g. a DB constraint violation that
rolls back the engine's transaction before it can update the parent).
"""
import json
import uuid

table = fq_table("async_operations", schema)

try:
row = await conn.fetchrow(
f"SELECT result_metadata, bank_id FROM {table} WHERE operation_id = $1",
uuid.UUID(child_operation_id),
)
if not row:
return

result_metadata = row["result_metadata"] or {}
if isinstance(result_metadata, str):
result_metadata = json.loads(result_metadata)
parent_operation_id = result_metadata.get("parent_operation_id")
if not parent_operation_id:
return

bank_id = row["bank_id"]

# Lock parent to prevent concurrent sibling updates
parent_row = await conn.fetchrow(
f"SELECT operation_id FROM {table} WHERE operation_id = $1 AND bank_id = $2 FOR UPDATE",
uuid.UUID(parent_operation_id),
bank_id,
)
if not parent_row:
return

# Check whether all siblings are done
siblings = await conn.fetch(
f"""
SELECT status FROM {table}
WHERE bank_id = $1
AND result_metadata::jsonb @> $2::jsonb
""",
bank_id,
json.dumps({"parent_operation_id": parent_operation_id}),
)
if not siblings or not all(s["status"] in ("completed", "failed") for s in siblings):
return

any_failed = any(s["status"] == "failed" for s in siblings)
if any_failed:
await conn.execute(
f"""
UPDATE {table}
SET status = 'failed', error_message = $2, updated_at = now()
WHERE operation_id = $1
""",
uuid.UUID(parent_operation_id),
"One or more sub-batches failed",
)
else:
await conn.execute(
f"""
UPDATE {table}
SET status = 'completed', updated_at = now(), completed_at = now()
WHERE operation_id = $1
""",
uuid.UUID(parent_operation_id),
)
logger.info(
f"Poller updated parent operation {parent_operation_id} to "
f"{'failed' if any_failed else 'completed'} (all siblings done)"
)
except Exception as e:
# Log but don't re-raise — the child has already been marked failed,
# which is the critical state change. A stuck parent will be caught on
# the next run or via monitoring.
logger.error(f"Failed to update parent operation for child {child_operation_id}: {e}")

async def _schedule_retry(self, operation_id: str, retry_at: "Any", error_message: str, schema: str | None):
"""Reset task to pending with a future retry timestamp."""
Expand Down
Loading
Loading