[WIP] Add non-blocking asynchronous bag splitting to Rosbag2 recorder/writer #2382

Draft
MichaelOrlov wants to merge 8 commits into rolling from morlov/async_bag_split

Contributor

@MichaelOrlov MichaelOrlov commented Mar 16, 2026

Description

Add non-blocking asynchronous bag splitting to rosbag2 recorder/writer.
At a high level, the PR changes bag splitting from a blocking storage switch into an asynchronous operation that can run without stalling ongoing message writes. The concurrency work ensures the writer can continue accepting messages while the split is being processed, and the recorder now uses that async path directly.
The change also includes test updates for the async path and removes one timing-sensitive assertion that was no longer reliable with asynchronous execution.
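
The idea of a split that does not stall writes can be illustrated with a minimal sketch. Everything in it is hypothetical: `SketchWriter`, `split_async`, and the in-memory "files" are stand-ins chosen for illustration and are not the rosbag2 writer API or the implementation in this PR. It only shows the general pattern of handing the old buffer to a background task while new writes keep landing in a fresh buffer.

```cpp
#include <cstddef>
#include <future>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a bag writer; NOT the rosbag2 SequentialWriter.
class SketchWriter
{
public:
  // Accepts a message without waiting for any split that is in progress.
  void write(std::string msg)
  {
    std::lock_guard<std::mutex> lock(buffer_mutex_);
    pending_.push_back(std::move(msg));
  }

  // Starts a split on a background thread and returns right away.
  // The caller must keep the returned future alive and wait on it
  // before destroying the writer.
  std::shared_future<void> split_async()
  {
    // Take ownership of the messages that belong to the file being closed.
    std::vector<std::string> to_flush;
    {
      std::lock_guard<std::mutex> lock(buffer_mutex_);
      to_flush.swap(pending_);
    }
    return std::async(
      std::launch::async,
      [this, batch = std::move(to_flush)]() mutable {
        // Stand-in for the slow part: flushing and closing the old file.
        std::lock_guard<std::mutex> lock(files_mutex_);
        closed_files_.push_back(std::move(batch));
      }).share();
  }

  std::size_t closed_file_count()
  {
    std::lock_guard<std::mutex> lock(files_mutex_);
    return closed_files_.size();
  }

  std::size_t pending_count()
  {
    std::lock_guard<std::mutex> lock(buffer_mutex_);
    return pending_.size();
  }

private:
  std::mutex buffer_mutex_;
  std::vector<std::string> pending_;  // messages for the current file
  std::mutex files_mutex_;
  std::vector<std::vector<std::string>> closed_files_;  // one entry per closed file
};
```

In this sketch a `write()` issued right after `split_async()` returns goes into the new buffer, whether or not the background task has finished. A caller that needs the split to be complete waits on the returned future.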

Is this a user-facing behavior change?

Yes.
From a user perspective, bag splitting during recording becomes more robust under load.
User-visible impact:

  • recorder-triggered splits no longer block recording in the same way as before
  • incoming messages can continue to be processed while a split is happening
  • the risk of stalls and message loss during long split operations is reduced
  • service- or recorder-triggered splits may complete slightly after the exact request point because execution is asynchronous

This is most relevant for users who are:

  • recording high-rate topics
  • splitting by size or time during active recording
  • triggering bag splits while recording is ongoing
  • using recorder services that depend on split behavior

Backward Compatibility

This change is largely backward compatible at the API and workflow level, but there is an observable behavioral nuance.
Compatible aspects:

  • existing recording workflows continue to work
  • bag splitting remains available through the same recorder flows
  • no user-facing CLI split options are removed or renamed
  • the new async capability is additive at the writer API level

Behavioral difference to call out:

  • split timing is no longer strictly synchronous from the caller’s perspective
  • callers should not assume the new bagfile is opened immediately at the exact instruction boundary
  • if any downstream code relied on split completion being instantaneous/blocking, that assumption may no longer hold

So the practical compatibility summary is:

  • no breaking interface change for normal users
  • minor semantic change for code or tests that depended on synchronous split completion timing

Did you use Generative AI?

Yes. I used Codex gpt-5.4 to help with some tasks and analysis of the problems.

Additional Information

Not backportable due to the API/ABI breaking changes.

This PR depends on the following PRs:

  1. Add "--max-cache-duration" option to the Recorder to facilitate the time bounded snapshot feature (#2289)
  2. Feature: Circular logging by split count (--max-bag-files) (#2218)
  3. Update Rosbag2 filename format to index+name+timestamp (#2265)
  4. Fix for a possible race condition in compression writer on close (#2362)

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Preserve cache wakeup across bag split flushing

When SequentialWriter splits a bag while cache writing is enabled,
writes that arrive after CacheConsumer::begin_flushing() can still be
accepted into MessageCache::producer_buffer_. The split then flushes
the pre-split buffer, opens the new storage, and restarts the consumer
thread.

The race was that the old consumer path could clear data_ready_ while
flushing only the pre-split buffer. If a message was queued into
producer_buffer_ during that window, the restarted consumer could go
back to sleep even though data was already buffered for the new bagfile.
This made
writer_with_cache_splits_when_storage_bagfile_size_gt_max_bagfile_size
flaky, with fake_storage_size_ sometimes staying at 0 for the first
post-split write.

Fix MessageCache::done_flushing() to restore readiness when
producer_buffer_ still contains messages after flushing completes.
This preserves the wakeup for the restarted consumer and ensures
post-split cached messages are written to the new storage promptly.

Also add a regression test covering the flush handoff case where a
message is pushed during flushing and must still wake the consumer
after done_flushing().

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
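
The lost-wakeup scenario described in the commit message above can be modeled with a small sketch. This is a simplified, hypothetical model, not the rosbag2 `MessageCache`: only the names `producer_buffer_`, `data_ready_`, and `done_flushing()` come from the commit message, while `SketchCache`, `swap_out()`, and `mark_consumed()` are invented here to make the ordering explicit.

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Simplified, hypothetical model of a cache with a readiness flag.
class SketchCache
{
public:
  // Producer side: queue a message and signal that data is available.
  void push(std::string msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    producer_buffer_.push_back(std::move(msg));
    data_ready_ = true;
  }

  // Consumer side: take everything buffered so far (the pre-split data).
  std::vector<std::string> swap_out()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<std::string> taken;
    taken.swap(producer_buffer_);
    return taken;
  }

  // Consumer side: clear readiness after writing what it took.
  // If a push() happened since swap_out(), this drops that wakeup.
  void mark_consumed()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    data_ready_ = false;
  }

  // The fix modeled here: restore readiness if messages arrived
  // while the flush was in progress.
  void done_flushing()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!producer_buffer_.empty()) {
      data_ready_ = true;
    }
  }

  bool data_ready()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return data_ready_;
  }

private:
  std::mutex mutex_;
  std::vector<std::string> producer_buffer_;
  bool data_ready_ = false;
};
```

With the sequence push, `swap_out()`, push, `mark_consumed()`, the flag ends up false even though one message is still buffered. Calling `done_flushing()` afterwards sets it back to true, so a restarted consumer in this model would not go to sleep on pending data.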
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
The cached SequentialWriter split test was still asserting a stronger
condition than the implementation guarantees: that the 6th and 11th
writes would already be visible as the first write in a newly opened
bagfile.

That is not stable in cache mode. Split decisions are based on persisted
storage size, while writes continue through the cache and bag splitting
happens asynchronously. Depending on scheduling, the next split may not
be observable at that exact message boundary even though the writer
still drains correctly and produces the expected split files.

Drop the per-message `fake_storage_size_ == 1` assertion and keep the
checks that match the actual contract:
- all expected messages are written
- the storage is reopened the expected number of times
- metadata reports the expected split file set

This keeps the regression coverage while removing a flaky timing
assumption from the test.

Signed-off-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Michael Orlov <morlovmr@gmail.com>
@carlos-apex
Contributor

@MichaelOrlov I continued working on top of your commits. I pushed our changes to the following branch.

rolling...carlos-apex:rosbag2:csv/async-split-nonblocking-upstream

Here is a short description and justification for the new commits.


3e4ac61

This commit changes the order in which pending split timestamps are handled. It could potentially be moved to a follow-up PR with some rework.


3328e9e

This commit addresses a problem where split_bagfile_shared_future_ was not cleaned up properly. The future was expected to be cleaned up as part of wait_for_pending_split. However, that does not happen if the function is never called.

The commit does a lazy clean-up at the beginning of the start_split_bagfile_async function. As a consequence, simultaneous async splits are not allowed. If an async split is already in progress, the new one is rejected.
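
A minimal sketch of this lazy clean-up and rejection pattern follows. It is an illustration under assumptions, not the code from the commit: `SketchSplitter`, `start_split_async`, and the `launch` callable are hypothetical names, and only the idea of checking a stored shared future at the start of the call comes from the description above.

```cpp
#include <chrono>
#include <future>
#include <mutex>
#include <utility>

// Hypothetical illustration; NOT the rosbag2 writer implementation.
class SketchSplitter
{
public:
  // `launch` is any callable that starts the split and returns a
  // std::shared_future<void>. It is invoked only if no split is running.
  // Returns false when a previous split is still in progress.
  template<typename Launcher>
  bool start_split_async(Launcher && launch)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (split_future_.valid()) {
      const auto status = split_future_.wait_for(std::chrono::seconds(0));
      if (status != std::future_status::ready) {
        return false;  // reject: the previous split has not finished
      }
      // Lazy clean-up: drop the finished future left over from last time.
      split_future_ = std::shared_future<void>();
    }
    split_future_ = std::forward<Launcher>(launch)();
    return true;
  }

private:
  std::mutex mutex_;
  std::shared_future<void> split_future_;
};
```

The zero-duration `wait_for` is a non-blocking readiness check, so a rejected request returns immediately instead of queuing behind the running split. Note that a future created with `std::launch::deferred` would report `std::future_status::deferred` here and be treated as still running.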


857ded7

This commit addresses the following TODO by adding a dedicated mutex:

// TODO(morlov): Protect messages_dropped_per_topic_ with mutex since we can call write(msg) concurrently


0888531

This commit adds new tests to exercise split concurrency.


290d6bf

This commit addresses a TSAN violation caught by the tests in the commit above.

@MichaelOrlov
Contributor Author

@carlos-apex Thank you for following up and for your contribution.
Unfortunately, I currently don't have much capacity to finish this PR while I am searching for a new job. I will get back to it when things settle down.
