From 533ea0c9b4aeaff457932b34d26d8db69ac3e74b Mon Sep 17 00:00:00 2001 From: BharatDeva <278575558+BharatDeva@users.noreply.github.com> Date: Sun, 17 May 2026 19:24:25 -0500 Subject: [PATCH] Use relaxed ordering for external stream counters ExternalStreamCounter metrics are plain counters and do not publish or order other data. Use explicit std::memory_order_relaxed for add, set, and snapshot operations to match the existing reset/read methods and avoid unnecessary sequential consistency barriers. Fixes #488. Signed-off-by: BharatDeva <278575558+BharatDeva@users.noreply.github.com> --- .../ExternalStream/ExternalStreamCounter.h | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/Storages/ExternalStream/ExternalStreamCounter.h b/src/Storages/ExternalStream/ExternalStreamCounter.h index d13d9ef873..888b24137b 100644 --- a/src/Storages/ExternalStream/ExternalStreamCounter.h +++ b/src/Storages/ExternalStream/ExternalStreamCounter.h @@ -67,33 +67,33 @@ class ExternalStreamCounter return written_failed.load(std::memory_order_relaxed); } - inline void addReadBytes(uint64_t bytes) { read_bytes += bytes; } - inline void addReadRows(uint64_t counts) { read_rows += counts; } - inline void addReadFailed(uint64_t amount) { read_failed += amount; } - inline void addWrittenBytes(uint64_t bytes) { written_bytes += bytes; } - inline void addWrittenRows(uint64_t counts) { written_rows += counts; } - inline void addWrittenFailed(uint64_t amount) { written_failed += amount; } - inline void addToMessagesBySize(uint64_t counts) { messages_by_size += counts; } - inline void addToMessagesByRow(uint64_t counts) { messages_by_row += counts; } + inline void addReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes, std::memory_order_relaxed); } + inline void addReadRows(uint64_t counts) { read_rows.fetch_add(counts, std::memory_order_relaxed); } + inline void addReadFailed(uint64_t amount) { read_failed.fetch_add(amount, std::memory_order_relaxed); } + inline void addWrittenBytes(uint64_t bytes) { written_bytes.fetch_add(bytes, std::memory_order_relaxed); } + inline void addWrittenRows(uint64_t counts) { written_rows.fetch_add(counts, std::memory_order_relaxed); } + inline void addWrittenFailed(uint64_t amount) { written_failed.fetch_add(amount, std::memory_order_relaxed); } + inline void addToMessagesBySize(uint64_t counts) { messages_by_size.fetch_add(counts, std::memory_order_relaxed); } + inline void addToMessagesByRow(uint64_t counts) { messages_by_row.fetch_add(counts, std::memory_order_relaxed); } - inline void setReadBytes(uint64_t bytes) { read_bytes = bytes; } - inline void setReadRows(uint64_t counts) { read_rows = counts; } - inline void setReadFailed(uint64_t amount) { read_failed = amount; } - inline void setWrittenBytes(uint64_t bytes) { written_bytes = bytes; } - inline void setWrittenRows(uint64_t counts) { written_rows = counts; } - inline void setWrittenFailed(uint64_t amount) { written_failed = amount; } + inline void setReadBytes(uint64_t bytes) { read_bytes.store(bytes, std::memory_order_relaxed); } + inline void setReadRows(uint64_t counts) { read_rows.store(counts, std::memory_order_relaxed); } + inline void setReadFailed(uint64_t amount) { read_failed.store(amount, std::memory_order_relaxed); } + inline void setWrittenBytes(uint64_t bytes) { written_bytes.store(bytes, std::memory_order_relaxed); } + inline void setWrittenRows(uint64_t counts) { written_rows.store(counts, std::memory_order_relaxed); } + inline void setWrittenFailed(uint64_t amount) { written_failed.store(amount, std::memory_order_relaxed); } std::map getCounters() const { return { - {"ReadBytes", read_bytes.load()}, - {"ReadRows", read_rows.load()}, - {"ReadFailed", read_failed.load()}, - {"WrittenBytes", written_bytes.load()}, - {"WrittenRows", written_rows.load()}, - {"WrittenFailed", written_failed.load()}, - {"MessagesBySize", messages_by_size.load()}, - {"MessagesByRow", messages_by_row.load()}, + {"ReadBytes", read_bytes.load(std::memory_order_relaxed)}, + {"ReadRows", read_rows.load(std::memory_order_relaxed)}, + {"ReadFailed", read_failed.load(std::memory_order_relaxed)}, + {"WrittenBytes", written_bytes.load(std::memory_order_relaxed)}, + {"WrittenRows", written_rows.load(std::memory_order_relaxed)}, + {"WrittenFailed", written_failed.load(std::memory_order_relaxed)}, + {"MessagesBySize", messages_by_size.load(std::memory_order_relaxed)}, + {"MessagesByRow", messages_by_row.load(std::memory_order_relaxed)}, }; }