diff --git a/src/Storages/ExternalStream/ExternalStreamCounter.h b/src/Storages/ExternalStream/ExternalStreamCounter.h index d13d9ef873..888b24137b 100644 --- a/src/Storages/ExternalStream/ExternalStreamCounter.h +++ b/src/Storages/ExternalStream/ExternalStreamCounter.h @@ -67,33 +67,33 @@ class ExternalStreamCounter return written_failed.load(std::memory_order_relaxed); } - inline void addReadBytes(uint64_t bytes) { read_bytes += bytes; } - inline void addReadRows(uint64_t counts) { read_rows += counts; } - inline void addReadFailed(uint64_t amount) { read_failed += amount; } - inline void addWrittenBytes(uint64_t bytes) { written_bytes += bytes; } - inline void addWrittenRows(uint64_t counts) { written_rows += counts; } - inline void addWrittenFailed(uint64_t amount) { written_failed += amount; } - inline void addToMessagesBySize(uint64_t counts) { messages_by_size += counts; } - inline void addToMessagesByRow(uint64_t counts) { messages_by_row += counts; } + inline void addReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes, std::memory_order_relaxed); } + inline void addReadRows(uint64_t counts) { read_rows.fetch_add(counts, std::memory_order_relaxed); } + inline void addReadFailed(uint64_t amount) { read_failed.fetch_add(amount, std::memory_order_relaxed); } + inline void addWrittenBytes(uint64_t bytes) { written_bytes.fetch_add(bytes, std::memory_order_relaxed); } + inline void addWrittenRows(uint64_t counts) { written_rows.fetch_add(counts, std::memory_order_relaxed); } + inline void addWrittenFailed(uint64_t amount) { written_failed.fetch_add(amount, std::memory_order_relaxed); } + inline void addToMessagesBySize(uint64_t counts) { messages_by_size.fetch_add(counts, std::memory_order_relaxed); } + inline void addToMessagesByRow(uint64_t counts) { messages_by_row.fetch_add(counts, std::memory_order_relaxed); } - inline void setReadBytes(uint64_t bytes) { read_bytes = bytes; } - inline void setReadRows(uint64_t counts) { read_rows = counts; } - inline void setReadFailed(uint64_t amount) { read_failed = amount; } - inline void setWrittenBytes(uint64_t bytes) { written_bytes = bytes; } - inline void setWrittenRows(uint64_t counts) { written_rows = counts; } - inline void setWrittenFailed(uint64_t amount) { written_failed = amount; } + inline void setReadBytes(uint64_t bytes) { read_bytes.store(bytes, std::memory_order_relaxed); } + inline void setReadRows(uint64_t counts) { read_rows.store(counts, std::memory_order_relaxed); } + inline void setReadFailed(uint64_t amount) { read_failed.store(amount, std::memory_order_relaxed); } + inline void setWrittenBytes(uint64_t bytes) { written_bytes.store(bytes, std::memory_order_relaxed); } + inline void setWrittenRows(uint64_t counts) { written_rows.store(counts, std::memory_order_relaxed); } + inline void setWrittenFailed(uint64_t amount) { written_failed.store(amount, std::memory_order_relaxed); } std::map getCounters() const { return { - {"ReadBytes", read_bytes.load()}, - {"ReadRows", read_rows.load()}, - {"ReadFailed", read_failed.load()}, - {"WrittenBytes", written_bytes.load()}, - {"WrittenRows", written_rows.load()}, - {"WrittenFailed", written_failed.load()}, - {"MessagesBySize", messages_by_size.load()}, - {"MessagesByRow", messages_by_row.load()}, + {"ReadBytes", read_bytes.load(std::memory_order_relaxed)}, + {"ReadRows", read_rows.load(std::memory_order_relaxed)}, + {"ReadFailed", read_failed.load(std::memory_order_relaxed)}, + {"WrittenBytes", written_bytes.load(std::memory_order_relaxed)}, + {"WrittenRows", written_rows.load(std::memory_order_relaxed)}, + {"WrittenFailed", written_failed.load(std::memory_order_relaxed)}, + {"MessagesBySize", messages_by_size.load(std::memory_order_relaxed)}, + {"MessagesByRow", messages_by_row.load(std::memory_order_relaxed)}, }; }