Skip to content

Commit 81dc58d

Browse files
add methods to support finishing existing queue before ending recording
1 parent 561dcf4 commit 81dc58d

4 files changed

Lines changed: 61 additions & 1 deletion

File tree

data_tamer_cpp/include/data_tamer/data_sink.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ class DataSinkBase
9696

9797
void stopThread();
9898

99+
void stopAcceptingSnapshots();
100+
101+
void processQueuedSnapshots();
102+
103+
void startAcceptingSnapshots();
104+
99105
private:
100106
struct Pimpl;
101107
std::unique_ptr<Pimpl> _p;

data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class MCAPSink : public DataSinkBase
5252
/// Stop recording and save the file
5353
void stopRecording();
5454

55+
/// Stop taking snapshots, finish the existing queue, then `stopRecording`
56+
/// will block for at least 250 us to ensure the queue is empty
57+
void finishQueueAndStop();
58+
5559
/**
5660
* @brief restartRecording saves the current file (unless we did it already,
5761
* calling stopRecording) and start recording into a new one.

data_tamer_cpp/src/data_sink.cpp

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct DataSinkBase::Pimpl
2929

3030
std::thread thread;
3131
std::atomic_bool run = true;
32+
std::atomic_bool accept_snapshots = true;
3233
moodycamel::ConcurrentQueue<Snapshot> queue;
3334
};
3435

@@ -41,7 +42,33 @@ DataSinkBase::~DataSinkBase()
4142

4243
bool DataSinkBase::pushSnapshot(const Snapshot& snapshot)
4344
{
44-
return _p->queue.enqueue(snapshot);
45+
if(_p->accept_snapshots)
46+
{
47+
return _p->queue.enqueue(snapshot);
48+
}
49+
else
50+
{
51+
return false;
52+
}
53+
}
54+
55+
void DataSinkBase::stopAcceptingSnapshots()
56+
{
57+
_p->accept_snapshots = false;
58+
}
59+
60+
void DataSinkBase::startAcceptingSnapshots()
61+
{
62+
_p->accept_snapshots = true;
63+
}
64+
65+
void DataSinkBase::processQueuedSnapshots()
66+
{
67+
Snapshot snapshot_copy;
68+
while(_p->queue.try_dequeue(snapshot_copy))
69+
{
70+
this->storeSnapshot(snapshot_copy);
71+
}
4572
}
4673

4774
void DataSinkBase::stopThread()

data_tamer_cpp/src/sinks/mcap_sink.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <sstream>
66
#include <mutex>
77
#include <string>
8+
#include <thread>
89

910
#ifndef USING_ROS2
1011
#define MCAP_IMPLEMENTATION
@@ -151,6 +152,25 @@ void MCAPSink::stopRecording()
151152
writer_.reset();
152153
}
153154

155+
void MCAPSink::finishQueueAndStop()
156+
{
157+
{
158+
std::scoped_lock lk(mutex_);
159+
// stop accepting new snapshots
160+
stopAcceptingSnapshots();
161+
162+
// finish any that are queued
163+
processQueuedSnapshots();
164+
165+
// sleep and process any that were missed by previous processing
166+
std::this_thread::sleep_for(std::chrono::microseconds(250));
167+
processQueuedSnapshots();
168+
}
169+
170+
// now stop the recording as normal
171+
stopRecording();
172+
}
173+
154174
void MCAPSink::restartRecording(const std::string& filepath, bool do_compression)
155175
{
156176
restartRecordingImpl(filepath, do_compression, true);
@@ -175,6 +195,9 @@ void MCAPSink::restartRecordingImpl(const std::string& filepath, bool do_compres
175195
{
176196
addChannel(name, schema);
177197
}
198+
199+
// start accepting snapshots again in case they were stopped
200+
startAcceptingSnapshots();
178201
}
179202

180203
} // namespace DataTamer

0 commit comments

Comments
 (0)