Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ def fq_table(table_name: str) -> str:
return f"{get_current_schema()}.{table_name}"


def _json_default(obj: Any) -> str:
"""JSON serializer for types commonly carried through async task payloads."""
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Tables that must be schema-qualified (for runtime validation)
_PROTECTED_TABLES = frozenset(
[
Expand Down Expand Up @@ -7407,28 +7414,35 @@ async def _submit_async_operation(

operation_id = uuid.uuid4()

# Insert operation record into database
# Build full payload before INSERT so task_payload is included atomically.
# Previously the INSERT omitted task_payload and a separate submit_task call
# did an UPDATE — a crash between the two left a null-payload row that the
# worker's claim query (task_payload IS NOT NULL) could never pick up.
full_payload = {
"type": task_type,
"operation_id": str(operation_id),
"bank_id": bank_id,
**task_payload,
}

# Insert operation record with task_payload in a single atomic statement
async with acquire_with_retry(pool) as conn:
await conn.execute(
f"""
INSERT INTO {fq_table("async_operations")} (operation_id, bank_id, operation_type, result_metadata, status)
VALUES ($1, $2, $3, $4, $5)
INSERT INTO {fq_table("async_operations")} (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
""",
operation_id,
bank_id,
operation_type,
json.dumps(result_metadata or {}),
json.dumps(result_metadata or {}, default=_json_default),
"pending",
json.dumps(full_payload, default=_json_default),
)

# Build and submit task payload
full_payload = {
"type": task_type,
"operation_id": str(operation_id),
"bank_id": bank_id,
**task_payload,
}

# For SyncTaskBackend: executes the task immediately.
# For BrokerTaskBackend: does an idempotent UPDATE (payload already set above),
# kept for symmetry and to support any future notification mechanisms.
await self._task_backend.submit_task(full_payload)

logger.info(f"{operation_type} task queued for bank_id={bank_id}, operation_id={operation_id}")
Expand Down
72 changes: 72 additions & 0 deletions hindsight-api-slim/tests/test_file_retain.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import io
import json
from datetime import datetime, timezone

import pytest
from httpx import ASGITransport, AsyncClient
Expand Down Expand Up @@ -471,6 +472,77 @@ async def read(self):
assert len(doc["original_text"]) > 0


@pytest.mark.asyncio
async def test_async_file_retain_serializes_datetime_timestamp(memory_no_llm_verify, sample_txt_content):
    """Async file retain should accept Python datetimes in task payloads."""
    from hindsight_api.engine.parsers.base import FileParser
    from hindsight_api.models import RequestContext

    bank_id = f"test_file_timestamp_bank_{datetime.now(timezone.utc).timestamp()}"
    timestamp = datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)

    ctx = RequestContext(internal=True)
    await memory_no_llm_verify.get_bank_profile(bank_id, request_context=ctx)

    # Minimal stand-in for an uploaded file: exposes filename/content_type
    # attributes and an async read(), which is all the engine touches.
    class MockFile:
        def __init__(self, content, filename, content_type):
            self.content = content
            self.filename = filename
            self.content_type = content_type

        async def read(self):
            return self.content

    # Trivial parser so the retain path runs without real document parsing.
    class TimestampParser(FileParser):
        async def convert(self, file_data: bytes, filename: str) -> str:
            return file_data.decode("utf-8")

        def supports(self, filename: str, content_type: str | None = None) -> bool:
            return filename.endswith(".txt")

        def name(self) -> str:
            return "timestamp_parser"

    memory_no_llm_verify._parser_registry.register(TimestampParser())

    upload = MockFile(sample_txt_content, "timestamped.txt", "text/plain")
    item = {
        "file": upload,
        "document_id": "timestamped_doc",
        "context": "timestamp test",
        "metadata": {},
        "tags": [],
        "timestamp": timestamp,  # datetime object — must survive JSON serialization
        "parser": ["timestamp_parser"],
    }

    result = await memory_no_llm_verify.submit_async_file_retain(
        bank_id=bank_id,
        file_items=[item],
        document_tags=None,
        request_context=ctx,
    )

    operation_id = result["operation_ids"][0]
    pool = await memory_no_llm_verify._get_pool()
    from hindsight_api.engine.memory_engine import get_current_schema

    schema = get_current_schema()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"""
            SELECT status, task_payload->>'timestamp' AS timestamp
            FROM {schema}.async_operations
            WHERE operation_id = $1
            """,
            operation_id,
        )

    assert row is not None
    assert row["status"] == "completed"
    assert row["timestamp"] == "2024-01-15T10:30:00+00:00"


@pytest.mark.asyncio
async def test_file_conversion_failure_sets_status_to_failed(memory_no_llm_verify, sample_txt_content):
"""Test that when file conversion fails, the operation status is set to 'failed' not 'completed'."""
Expand Down
Loading