Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
363 changes: 361 additions & 2 deletions cpp/mcap/include/mcap/writer.inl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,60 @@
# include <zstd_errors.h>
#endif

#define ALIGNED_WRITE

#include <pthread.h>
#include <sched.h>
#include <sys/resource.h> // setpriority, PRIO_PROCESS
#include <sys/stat.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#ifdef ALIGNED_WRITE
#include <liburing.h>
#include <cstring>
#include <vector>
#include <chrono>
#include <arm_neon.h>
#include <cstddef>
#include <cstdint>
#include <fcntl.h>
#include <unistd.h>
#endif


namespace mcap {


#ifdef ALIGNED_WRITE
/**
 * @brief Copies `size` bytes from `src` to `dst`.
 *
 * Delegates to std::memcpy rather than a hand-written 16-byte NEON loop with a
 * byte-wise tail: the C library routine already selects a vectorised
 * implementation for the target CPU and handles the tail without a scalar loop.
 *
 * @param dst  Destination buffer; must have room for `size` bytes.
 * @param src  Source buffer; must hold at least `size` bytes.
 * @param size Number of bytes to copy. The ranges must not overlap.
 */
void neon_memcpy(uint8_t* dst, const uint8_t* src, size_t size) {
  // memcpy with a null pointer is undefined even for a zero length, so bail out early.
  if (size == 0) {
    return;
  }
  std::memcpy(dst, src, size);
}
/**
 * @brief Fills `size` bytes starting at `dst` with `value`.
 *
 * Delegates to std::memset rather than a hand-written 16-byte NEON loop with a
 * byte-wise tail; the C library routine is already vectorised for the target CPU.
 *
 * @param dst   Destination buffer; must have room for `size` bytes.
 * @param value Byte value to store.
 * @param size  Number of bytes to fill.
 */
void neon_memset(uint8_t* dst, uint8_t value, size_t size) {
  // memset with a null pointer is undefined even for a zero length, so bail out early.
  if (size == 0) {
    return;
  }
  std::memset(dst, value, size);
}
#endif


// IWritable ///////////////////////////////////////////////////////////////////

IWritable::IWritable() noexcept
Expand All @@ -37,12 +89,313 @@ void IWritable::resetCrc() {
crc_ = internal::CRC32_INIT;
}

// FileWriter //////////////////////////////////////////////////////////////////
// FileWriter ///////////////////////////////////////////////////////////////////
#ifdef ALIGNED_WRITE
// Capacity in bytes of each staging buffer allocated by FileWriter::open (140 MiB).
constexpr size_t BUFFER_SIZE = size_t{140} << 20;
// Amount of buffered data, in bytes, that triggers a write submission (100 MiB).
constexpr size_t IORING_BUFF_SIZE = size_t{100} << 20;
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each io_uring submission writes a constant-size chunk, currently 100 MB.

// State for the O_DIRECT / io_uring write path.
// NOTE(review): these are namespace-scope variables rather than FileWriter members,
// so they are shared by every FileWriter in the process — confirm only one is in use.

// Descriptor of the output file; -1 while no file is open.
int fd_{-1};
// io_uring instance used for the writes; only valid while ringInited_ is true.
struct io_uring ring_;
bool ringInited_{false};
// The two staging buffers ("ping" and "pong"), each BUFFER_SIZE bytes.
void* buf_{nullptr};
void* bufpong_{nullptr};
// Current offset in the file at which the next submission is written.
uint64_t writeOffset_{0};
// Number of entries requested from io_uring_queue_init.
static constexpr int QUEUE_DEPTH{128};
// Alignment passed to posix_memalign and used to pad the final write.
static constexpr size_t ALIGNMENT{512};
// mtx / cv / ready hand a submitted write over to worker_thread:
// `ready` is set after a submission and cleared once its completion is consumed.
std::mutex mtx;
std::condition_variable cv;
bool ready{false};
// Loop condition of worker_thread; cleared to ask it to exit.
std::atomic<bool> running{true};
// Number of bytes currently held in the active staging buffer.
uint64_t mcap_buffsize{0};
// Count of Magic-sized writes seen so far; the second one sets final_footer.
int final_footer_count{0};
bool final_footer{false};
// true: buf_ is the active buffer; false: bufpong_ is.
bool ping_buffer_used{true};
// Thread running worker_thread(), started by FileWriter::open.
std::thread workerThread_;
// NOTE(review): never assigned a non-zero value in the visible code.
uint64_t aligned_blocks{0};

// Body of workerThread_; defined below.
void worker_thread();

/**
 * @brief Opens `filename` for O_DIRECT writing and sets up the io_uring write path.
 *
 * Closes any previously opened output via end(), then creates/truncates the file,
 * allocates the two ALIGNMENT-aligned staging buffers, initialises the ring,
 * registers the buffers and the file descriptor with it, resets the write state
 * and starts the completion worker thread.
 *
 * If any step fails, everything acquired so far is released again and no worker
 * thread is started.
 *
 * @param filename Path of the file to create or truncate.
 * @return StatusCode::Success, or StatusCode::OpenFailed with a message naming the
 *         step that failed.
 */
Status FileWriter::open(std::string_view filename) {
  end();

  // A string_view is not guaranteed to be NUL-terminated, but ::open() needs a C string.
  const std::string path(filename);

  // Drop buffers a previous session may have left behind so they are not leaked
  // when the pointers are overwritten below (free(nullptr) is a no-op).
  free(buf_);
  buf_ = nullptr;
  free(bufpong_);
  bufpong_ = nullptr;

  // Releases everything acquired so far and builds the failure status.
  const auto fail = [&path](const std::string& reason) -> Status {
    if (ringInited_) {
      io_uring_queue_exit(&ring_);
      ringInited_ = false;
    }
    if (fd_ >= 0) {
      ::close(fd_);
      fd_ = -1;
    }
    free(buf_);
    buf_ = nullptr;
    free(bufpong_);
    bufpong_ = nullptr;
    return Status(StatusCode::OpenFailed,
                  "failed to open \"" + path + "\" for writing: " + reason);
  };

  fd_ = ::open(path.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_DIRECT, 0644);
  if (fd_ < 0) {
    return fail(std::string("open: ") + strerror(errno));
  }

  // posix_memalign reports failure through its return value, not errno.
  int ret = posix_memalign(&buf_, ALIGNMENT, BUFFER_SIZE);
  if (ret != 0) {
    buf_ = nullptr;
    return fail(std::string("posix_memalign: ") + strerror(ret));
  }
  ret = posix_memalign(&bufpong_, ALIGNMENT, BUFFER_SIZE);
  if (ret != 0) {
    bufpong_ = nullptr;
    return fail(std::string("posix_memalign: ") + strerror(ret));
  }

  ret = io_uring_queue_init(QUEUE_DEPTH, &ring_, 0);
  if (ret < 0) {
    return fail(std::string("io_uring_queue_init: ") + strerror(-ret));
  }
  // Only mark the ring usable once initialisation really succeeded.
  ringInited_ = true;

  // Register both staging buffers with the ring.
  struct iovec iovecs[2];
  iovecs[0].iov_base = buf_;
  iovecs[0].iov_len = BUFFER_SIZE;
  iovecs[1].iov_base = bufpong_;
  iovecs[1].iov_len = BUFFER_SIZE;
  ret = io_uring_register_buffers(&ring_, iovecs, 2);
  if (ret < 0) {
    return fail(std::string("io_uring_register_buffers: ") + strerror(-ret));
  }

  // Register the file descriptor; writes refer to it as fixed-file index 0.
  ret = io_uring_register_files(&ring_, &fd_, 1);
  if (ret < 0) {
    return fail(std::string("io_uring_register_files: ") + strerror(-ret));
  }

  // Start from a clean write state before the worker can observe it.
  size_ = 0;
  writeOffset_ = 0;
  mcap_buffsize = 0;
  ping_buffer_used = true;
  final_footer_count = 0;
  final_footer = false;
  ready = false;
  running = true;

  workerThread_ = std::thread(worker_thread);
  return StatusCode::Success;
}


/**
 * @brief Body of the completion worker thread started by FileWriter::open.
 *
 * Pins itself to CPU 0 and tries to raise its priority (nice -18), then loops:
 * sleep on `cv` until a write has been submitted (`ready`) or shutdown has been
 * requested (`running` cleared), consume one completion from `ring_`, report a
 * failed write on stderr and clear `ready`.
 *
 * `mtx` is held for the whole loop except while blocked in cv.wait(), i.e. it is
 * held while waiting for the completion.
 *
 * NOTE(review): a stop request that changes `running` without holding `mtx` can
 * slip in between the predicate check and the wait; confirm the stopping side
 * takes the lock.
 */
void worker_thread() {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(0, &cpuset);
  const int affinityRc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
  if (affinityRc != 0) {
    // pthread functions return the error number instead of setting errno.
    std::cerr << "pthread_setaffinity_np failed: " << strerror(affinityRc) << "\n";
  }

  const pid_t tid = gettid();  // Linux-specific kernel thread id
  if (setpriority(PRIO_PROCESS, tid, -18) != 0) {
    perror("setpriority failed");
  }

  std::unique_lock<std::mutex> lock(mtx);
  for (;;) {
    // Waking on !running as well is what lets a shutdown request end the thread;
    // waiting on `ready` alone would sleep through it forever.
    cv.wait(lock, [] { return ready || !running; });
    if (!running) {
      break;
    }

    io_uring_cqe* cqe = nullptr;
    const int waitRc = io_uring_wait_cqe(&ring_, &cqe);
    if (waitRc == -EINTR) {
      continue;  // interrupted by a signal: `ready` is still set, so just retry
    }
    if (waitRc < 0) {
      std::cerr << "io_uring_wait_cqe failed\n";
      // Give up on this completion instead of spinning on the same error
      // while holding the mutex.
      ready = false;
      continue;
    }

    if (cqe->res < 0) {
      std::cerr << "Async write error: " << strerror(-cqe->res) << "\n";
    }
    io_uring_cqe_seen(&ring_, cqe);
    ready = false;
  }
}
// Returns the staging buffer currently being filled: buf_ while ping_buffer_used
// is true, bufpong_ otherwise.
inline void* getActiveBuffer() {
  if (ping_buffer_used) {
    return buf_;
  }
  return bufpong_;
}

// Swaps the roles of the two staging buffers by toggling ping_buffer_used.
inline void switchBuffer() {
  if (ping_buffer_used) {
    ping_buffer_used = false;
  } else {
    ping_buffer_used = true;
  }
}

// Returns the staging buffer that is not currently being filled: bufpong_ while
// ping_buffer_used is true, buf_ otherwise.
inline void* getInactiveBuffer() {
  if (ping_buffer_used) {
    return bufpong_;
  }
  return buf_;
}

// Your requested handleWrite equivalent
void FileWriter::handleWrite(const std::byte* data, uint64_t size) {
if (!ringInited_ || !data || size == 0)
return;

// Append incoming data to internal MCAP buffer if size is reasonable
if (mcap_buffsize + size > BUFFER_SIZE) {
std::cerr << "Error: Buffer overflow risk. Data size exceeds allocated buffer.\n";
return;
}
void* activeBuf = getActiveBuffer();
neon_memcpy(static_cast<uint8_t*>(activeBuf) + mcap_buffsize,
reinterpret_cast<const uint8_t*>(data),
size);
mcap_buffsize += size;

// Check for MCAP magic footer (assuming Magic is a byte array and sizeof(Magic) is valid)
if (size == sizeof(Magic) && memcmp(data, Magic, sizeof(Magic)) == 0) {
final_footer_count++;
if (final_footer_count == 2) {
final_footer = true;
final_footer_count=0;
}
}

// Lambda to submit a fixed buffer write using io_uring
auto submit_fixed_write = [&](void* buf, size_t bufSize, off_t offset, int bufIndex) -> bool {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (!sqe) {
std::cerr << "io_uring_get_sqe failed\n";
return false;
}
io_uring_prep_write(sqe, 0, buf,IORING_BUFF_SIZE, offset);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io_uring_prep_write

io_uring_prep_write_fixed - by right should be zero copy. but we have speed issue

sqe->flags |= IOSQE_FIXED_FILE;
return true;
};

if (mcap_buffsize >= IORING_BUFF_SIZE) {
// Align mcap_buffsize down to multiple of ALIGNMENT
size_t remain_mcapbytes = mcap_buffsize -IORING_BUFF_SIZE;

void* activeBuf = getActiveBuffer();
if(ping_buffer_used)
{
if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 0))
return;
}
else
{
if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 1))
return;
}

int ret = io_uring_submit(&ring_);
if (ret < 0) {
std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n";
return;
}
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one(); // response thread

if (remain_mcapbytes > 0) {
void* InactiveBuf = getInactiveBuffer();
void* activeBuf = getActiveBuffer();
std::memcpy(reinterpret_cast<uint8_t*>(InactiveBuf),
reinterpret_cast<uint8_t*>(activeBuf) +IORING_BUFF_SIZE,
remain_mcapbytes);
switchBuffer();
}
mcap_buffsize = remain_mcapbytes;
writeOffset_ +=IORING_BUFF_SIZE;
size_ += IORING_BUFF_SIZE;
}

if (final_footer) {
// Align mcap_buffsize up to next multiple of ALIGNMENT
size_t aligned_size = (mcap_buffsize + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1);
size_t appended_bytes = aligned_size - mcap_buffsize;

// Submit final footer write
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (!sqe) {
std::cerr << "io_uring_get_sqe failed\n";
return;
}
void* activeBuf = getActiveBuffer();
if(ping_buffer_used)
{
if (!submit_fixed_write(activeBuf, aligned_size, writeOffset_, 0))
return;
}
else
{
if (!submit_fixed_write(activeBuf,aligned_size, writeOffset_, 1))
return;
}

int ret = io_uring_submit(&ring_);
if (ret < 0) {
std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n";
return;
} else if (ret == 0) {
std::cerr << "io_uring_submit submitted 0 entries, nothing queued\n";
return;
} else {
std::cout << "Submitted " << ret << " sqe(s) successfully\n";
}

// Wait for completion
struct io_uring_cqe* cqe;
if (io_uring_wait_cqe(&ring_, &cqe) < 0) {
std::cerr << "io_uring_wait_cqe failed\n";
return;
}
if (cqe->res < 0) {
std::cerr << "Async write error 0001: " << strerror(-cqe->res) << "\n";
} else {
std::cout << "Write completed: " << cqe->res << " bytes\n";
}
io_uring_cqe_seen(&ring_, cqe);

writeOffset_ += aligned_size;

// Truncate file to remove padded bytes
struct stat st;
if (fstat(fd_, &st) < 0) {
perror("fstat");
return;
}
off_t current_size = st.st_size;
off_t new_size = current_size - appended_bytes;

if (new_size < 0) {
std::cerr << "Cannot remove more bytes than file size\n";
return;
}

if (ftruncate(fd_, new_size) < 0) {
perror("ftruncate");
return;
} else {
std::cout << "File truncated to " << new_size << " bytes\n";
}


std::cout << " after current_size " << current_size << " bytes\n";
final_footer = false;
}

}


void io_close() {
if (ringInited_) {
io_uring_unregister_buffers(&ring_);
io_uring_unregister_files(&ring_);
io_uring_queue_exit(&ring_);
ringInited_ = false;
}
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
if (buf_) {
free(buf_);
buf_ = nullptr;
}
writeOffset_ = 0;
running = false;
final_footer_count=0;
final_footer=false;
cv.notify_all();
if (workerThread_.joinable()) workerThread_.join();
}
#endif

// Destructor: finishes the writer by delegating all cleanup to end().
FileWriter::~FileWriter() {
  this->end();
}

#ifndef ALIGNED_WRITE
Status FileWriter::open(std::string_view filename) {
end();
file_ = std::fopen(filename.data(), "wb");
Expand All @@ -61,6 +414,8 @@ void FileWriter::handleWrite(const std::byte* data, uint64_t size) {
size_ += size;
}

#endif

void FileWriter::flush() {
if (file_) {
std::fflush(file_);
Expand All @@ -72,6 +427,10 @@ void FileWriter::end() {
std::fclose(file_);
file_ = nullptr;
}
#ifdef ALIGNED_WRITE
io_close();
close(fd_);
#endif
size_ = 0;
}

Expand Down