
[Detail Bug] Batching API drops already-received records when source iterator raises an exception #43

@detail-app

Description


Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_4c9a3815-6b6d-4803-ae5f-89e6930e707d

Introduced in #1 by @quettabit on Apr 7, 2026

Summary

  • Context: Records successfully received and added to the internal accumulator are silently discarded when the source async iterator raises an exception. The caller receives the exception but has no way to know which records were successfully batched before the failure.
  • Bug: Records in the accumulator are lost when the source async iterator raises a non-control-flow exception (e.g., RuntimeError, IOError) before a batch is yielded.
  • Actual vs. expected: Actual: exception propagates and any accumulated-but-not-yet-yielded records are discarded (e.g., 0/3 yielded when source fails after 3 yields). Expected: exception propagates, but any records already added to the accumulator are yielded as a final partial batch before re-raising.
  • Impact: Data loss on source failure; caller cannot determine which records were processed before the error.

Code with Bug

async def append_record_batches(...) -> AsyncIterable[list[Record]]:
    ...
    try:
        while True:
            ...
            next_task = asyncio.create_task(anext(record_iter, None))
            done, _ = await asyncio.wait({next_task}, timeout=remaining)
            if not done:
                pending_next = next_task
                break
            ...
    finally:
        if pending_next is not None:
            pending_next.cancel()  # <-- BUG 🔴 accumulated records are discarded on source exception (no yield before re-raise)

Explanation

append_record_batches only treats StopAsyncIteration/timeout as normal control flow. If the source iterator raises any other exception mid-stream, the exception propagates immediately and the generator exits without yielding acc (the internal accumulator). As a result, records that were successfully processed via acc.add(record) are lost to the caller.

Empirical repro shows the loss:

  • Immediate failure after yielding 3 records with linger=0: 0/3 records yielded.
  • Failure after linger: only a subset of already-received records are yielded (e.g., 2/5, 3/5).
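The loss can be reproduced with a minimal self-contained sketch. The names below (`failing_source`, `batches`, a plain list in place of the real `acc` object) are simplified stand-ins, not the actual `append_record_batches` implementation, which is elided above:

```python
import asyncio


async def failing_source():
    # Yields three records, then fails mid-stream.
    for i in range(3):
        yield i
    raise RuntimeError("source failed")


async def batches(source, batch_size=10):
    # Simplified stand-in for append_record_batches: accumulate
    # records and yield them in batches. There is no except clause,
    # so a source exception propagates before the partial batch
    # is ever yielded.
    acc = []
    async for record in source:
        acc.append(record)
        if len(acc) >= batch_size:
            yield acc
            acc = []
    if acc:
        yield acc


async def main():
    received = []
    try:
        async for batch in batches(failing_source()):
            received.extend(batch)
    except RuntimeError:
        pass
    return received


print(asyncio.run(main()))  # → [] — 0/3 records despite three successful source yields
```

Even though the source yielded three records successfully, the caller observes none of them, matching the 0/3 case above.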

Recommended Fix

Catch Exception (not BaseException, so cancellation and other control-flow signals still propagate untouched), yield any pending accumulator contents, then re-raise:

async def append_record_batches(...) -> AsyncIterable[list[Record]]:
    ...
    try:
        while True:
            ...
    except Exception:
        if not acc.is_empty():
            yield acc.take()
        raise
    finally:
        if pending_next is not None:
            pending_next.cancel()
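A sketch of how the fix behaves, using the same simplified stand-ins (a plain list replaces the real accumulator with its `is_empty()`/`take()` methods; `batches_fixed` is a hypothetical name, not the actual patched function):

```python
import asyncio


async def failing_source():
    # Yields three records, then fails mid-stream.
    for i in range(3):
        yield i
    raise RuntimeError("source failed")


async def batches_fixed(source, batch_size=10):
    # Simplified stand-in for the fixed append_record_batches:
    # on a source exception, flush the partial batch, then re-raise.
    acc = []
    try:
        async for record in source:
            acc.append(record)
            if len(acc) >= batch_size:
                yield acc
                acc = []
    except Exception:
        if acc:
            yield acc  # final partial batch reaches the caller
        raise
    if acc:
        yield acc


async def main():
    received = []
    try:
        async for batch in batches_fixed(failing_source()):
            received.extend(batch)
    except RuntimeError:
        pass
    return received


print(asyncio.run(main()))  # → [0, 1, 2] — partial batch delivered, then the error propagates
```

Note that yielding inside the `except` block suspends the generator mid-handling; when the caller resumes it, the bare `raise` re-raises the original source exception, so the caller still sees the failure after consuming the partial batch.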

History

This bug was introduced in commit 3dc9795. Subsequent refactoring in commit aaf4bfe restructured the async iteration logic to fix a linger timeout issue, but preserved the missing exception handling for source failures.

Metadata

Labels: detail-bug (flagged by https://detail.dev/)
