Skip to content

Commit 94cf89b

Browse files
Fix non-atomic async operation creation (#619)
* Fix non-atomic async operation creation in _submit_async_operation Previously the method performed two separate database round-trips: 1. INSERT into async_operations with no task_payload (null) 2. submit_task → UPDATE to set task_payload A process crash or network error between steps 1 and 2 left a row with task_payload IS NULL permanently. The worker's claim query requires task_payload IS NOT NULL, so these orphaned rows could never be picked up and the queue appeared degraded indefinitely. Fix: build full_payload before the INSERT and include task_payload in the same INSERT statement, making operation creation atomic. submit_task is still called afterwards — for SyncTaskBackend it executes the task immediately (unchanged behaviour); for BrokerTaskBackend it becomes an idempotent UPDATE (payload already set) kept for symmetry. * Preserve datetime payloads in atomic async insert
1 parent 4394245 commit 94cf89b

2 files changed

Lines changed: 98 additions & 12 deletions

File tree

hindsight-api-slim/hindsight_api/engine/memory_engine.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ def fq_table(table_name: str) -> str:
6767
return f"{get_current_schema()}.{table_name}"
6868

6969

70+
def _json_default(obj: Any) -> str:
71+
"""JSON serializer for types commonly carried through async task payloads."""
72+
if isinstance(obj, datetime):
73+
return obj.isoformat()
74+
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
75+
76+
7077
# Tables that must be schema-qualified (for runtime validation)
7178
_PROTECTED_TABLES = frozenset(
7279
[
@@ -7407,28 +7414,35 @@ async def _submit_async_operation(
74077414

74087415
operation_id = uuid.uuid4()
74097416

7410-
# Insert operation record into database
7417+
# Build full payload before INSERT so task_payload is included atomically.
7418+
# Previously the INSERT omitted task_payload and a separate submit_task call
7419+
# did an UPDATE — a crash between the two left a null-payload row that the
7420+
# worker's claim query (task_payload IS NOT NULL) could never pick up.
7421+
full_payload = {
7422+
"type": task_type,
7423+
"operation_id": str(operation_id),
7424+
"bank_id": bank_id,
7425+
**task_payload,
7426+
}
7427+
7428+
# Insert operation record with task_payload in a single atomic statement
74117429
async with acquire_with_retry(pool) as conn:
74127430
await conn.execute(
74137431
f"""
7414-
INSERT INTO {fq_table("async_operations")} (operation_id, bank_id, operation_type, result_metadata, status)
7415-
VALUES ($1, $2, $3, $4, $5)
7432+
INSERT INTO {fq_table("async_operations")} (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
7433+
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
74167434
""",
74177435
operation_id,
74187436
bank_id,
74197437
operation_type,
7420-
json.dumps(result_metadata or {}),
7438+
json.dumps(result_metadata or {}, default=_json_default),
74217439
"pending",
7440+
json.dumps(full_payload, default=_json_default),
74227441
)
74237442

7424-
# Build and submit task payload
7425-
full_payload = {
7426-
"type": task_type,
7427-
"operation_id": str(operation_id),
7428-
"bank_id": bank_id,
7429-
**task_payload,
7430-
}
7431-
7443+
# For SyncTaskBackend: executes the task immediately.
7444+
# For BrokerTaskBackend: does an idempotent UPDATE (payload already set above),
7445+
# kept for symmetry and to support any future notification mechanisms.
74327446
await self._task_backend.submit_task(full_payload)
74337447

74347448
logger.info(f"{operation_type} task queued for bank_id={bank_id}, operation_id={operation_id}")

hindsight-api-slim/tests/test_file_retain.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import asyncio
66
import io
77
import json
8+
from datetime import datetime, timezone
89

910
import pytest
1011
from httpx import ASGITransport, AsyncClient
@@ -471,6 +472,77 @@ async def read(self):
471472
assert len(doc["original_text"]) > 0
472473

473474

475+
@pytest.mark.asyncio
async def test_async_file_retain_serializes_datetime_timestamp(memory_no_llm_verify, sample_txt_content):
    """Async file retain should accept Python datetimes in task payloads."""
    from hindsight_api.engine.parsers.base import FileParser
    from hindsight_api.models import RequestContext

    # Minimal stand-in for an uploaded file object: exposes the attributes
    # and async read() the retain path expects.
    class _Upload:
        def __init__(self, content, filename, content_type):
            self.content = content
            self.filename = filename
            self.content_type = content_type

        async def read(self):
            return self.content

    # Trivial parser so the pipeline can convert the .txt upload.
    class TimestampParser(FileParser):
        async def convert(self, file_data: bytes, filename: str) -> str:
            return file_data.decode("utf-8")

        def supports(self, filename: str, content_type: str | None = None) -> bool:
            return filename.endswith(".txt")

        def name(self) -> str:
            return "timestamp_parser"

    engine = memory_no_llm_verify
    context = RequestContext(internal=True)
    # Unique bank per run to avoid cross-test interference.
    bank_id = f"test_file_timestamp_bank_{datetime.now(timezone.utc).timestamp()}"
    timestamp = datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)

    await engine.get_bank_profile(bank_id, request_context=context)
    engine._parser_registry.register(TimestampParser())

    result = await engine.submit_async_file_retain(
        bank_id=bank_id,
        file_items=[
            {
                "file": _Upload(sample_txt_content, "timestamped.txt", "text/plain"),
                "document_id": "timestamped_doc",
                "context": "timestamp test",
                "metadata": {},
                "tags": [],
                # The datetime must survive JSON serialization of the payload.
                "timestamp": timestamp,
                "parser": ["timestamp_parser"],
            }
        ],
        document_tags=None,
        request_context=context,
    )

    from hindsight_api.engine.memory_engine import get_current_schema

    # Read the stored operation row back and check the serialized timestamp.
    pool = await engine._get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            f"""
            SELECT status, task_payload->>'timestamp' AS timestamp
            FROM {get_current_schema()}.async_operations
            WHERE operation_id = $1
            """,
            result["operation_ids"][0],
        )

    assert record is not None
    assert record["status"] == "completed"
    assert record["timestamp"] == "2024-01-15T10:30:00+00:00"
544+
545+
474546
@pytest.mark.asyncio
475547
async def test_file_conversion_failure_sets_status_to_failed(memory_no_llm_verify, sample_txt_content):
476548
"""Test that when file conversion fails, the operation status is set to 'failed' not 'completed'."""

0 commit comments

Comments
 (0)