From 9207296265ebb4e2f5379d5fe50b2cef6ab7d330 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 9 Apr 2026 21:24:44 +0800 Subject: [PATCH 1/5] support write parallel. --- cpp/CMakeLists.txt | 7 + cpp/src/common/config/config.h | 2 + cpp/src/common/global.cc | 3 + cpp/src/common/global.h | 14 + cpp/src/common/tablet.cc | 131 +++++++ cpp/src/common/tablet.h | 5 + cpp/src/common/thread_pool.h | 122 ++++++ cpp/src/writer/tsfile_writer.cc | 369 ++++++------------ cpp/src/writer/tsfile_writer.h | 9 +- .../table_view/tsfile_writer_table_test.cc | 45 ++- 10 files changed, 432 insertions(+), 275 deletions(-) create mode 100644 cpp/src/common/thread_pool.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9d2015e48..cc50a1782 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -150,6 +150,13 @@ if (ENABLE_ZLIB) add_definitions(-DENABLE_GZIP) endif() +option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON) +message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}") + +if (ENABLE_THREADS) + add_definitions(-DENABLE_THREADS) +endif() + option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF) message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}") diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h index 81dad924f..e2b2039a7 100644 --- a/cpp/src/common/config/config.h +++ b/cpp/src/common/config/config.h @@ -46,6 +46,8 @@ typedef struct ConfigValue { TSEncoding double_encoding_type_; TSEncoding string_encoding_type_; CompressionType default_compression_type_; + bool parallel_write_enabled_; + int32_t write_thread_count_; // When true, aligned writer enforces page size limit strictly by // interleaving time/value writes and sealing pages together when any side // becomes full. 
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index 91ecedda1..721522c77 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc @@ -54,12 +54,15 @@ void init_config_value() { g_config_value_.int64_encoding_type_ = TS_2DIFF; g_config_value_.float_encoding_type_ = GORILLA; g_config_value_.double_encoding_type_ = GORILLA; + g_config_value_.string_encoding_type_ = PLAIN; // Default compression type is LZ4 #ifdef ENABLE_LZ4 g_config_value_.default_compression_type_ = LZ4; #else g_config_value_.default_compression_type_ = UNCOMPRESSED; #endif + g_config_value_.parallel_write_enabled_ = true; + g_config_value_.write_thread_count_ = 6; // Enforce aligned page size limits strictly by default. g_config_value_.strict_page_size_ = true; } diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h index 7937e9203..01be5a9c0 100644 --- a/cpp/src/common/global.h +++ b/cpp/src/common/global.h @@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() { return static_cast(g_config_value_.default_compression_type_); } +FORCE_INLINE void set_parallel_write_enabled(bool enabled) { + g_config_value_.parallel_write_enabled_ = enabled; +} + +FORCE_INLINE bool get_parallel_write_enabled() { + return g_config_value_.parallel_write_enabled_; +} + +FORCE_INLINE int set_write_thread_count(int32_t count) { + if (count < 1 || count > 64) return E_INVALID_ARG; + g_config_value_.write_thread_count_ = count; + return E_OK; +} + extern int init_common(); extern bool is_timestamp_column_name(const char* time_col_name); extern void cols_to_json(ByteStream* byte_stream, diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 4088a6927..6233abd79 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -19,7 +19,9 @@ #include "tablet.h" +#include #include +#include #include "allocator/alloc_base.h" #include "datatype/date_converter.h" @@ -491,6 +493,135 @@ void Tablet::set_column_categories( } } +namespace { + +template +void 
permute_array(T* arr, const std::vector& perm, uint32_t n) { + std::vector tmp(n); + for (uint32_t i = 0; i < n; i++) tmp[i] = arr[perm[i]]; + std::copy(tmp.begin(), tmp.end(), arr); +} + +void permute_string_column(Tablet::StringColumn* sc, BitMap& bm, + const std::vector& perm, uint32_t n, + uint32_t max_rows) { + Tablet::StringColumn tmp; + tmp.init(max_rows, sc->buf_used > 0 ? sc->buf_used : 64); + for (uint32_t i = 0; i < n; i++) { + uint32_t r = perm[i]; + if (bm.test(r)) { + // Null row — write a zero-length entry to keep offsets valid. + tmp.append(i, "", 0); + } else { + int32_t off = sc->offsets[r]; + uint32_t len = + static_cast(sc->offsets[r + 1] - off); + tmp.append(i, sc->buffer + off, len); + } + } + // Swap contents. + std::swap(sc->offsets, tmp.offsets); + std::swap(sc->buffer, tmp.buffer); + std::swap(sc->buf_capacity, tmp.buf_capacity); + std::swap(sc->buf_used, tmp.buf_used); + tmp.destroy(); +} + +void permute_bitmap(BitMap& bm, const std::vector& perm, + uint32_t n) { + if (!bm.get_bitmap()) return; + uint32_t size_bytes = bm.get_size(); + // Save original bits. + std::vector orig(bm.get_bitmap(), bm.get_bitmap() + size_bytes); + // Clear all bits (= all non-null). + bm.clear_all(); + // Re-set null bits through the permutation. + for (uint32_t i = 0; i < n; i++) { + uint32_t src = perm[i]; + if (orig[src >> 3] & (1 << (src & 7))) { + bm.set(i); + } + } +} + +} // anonymous namespace + +void Tablet::sort_by_device() { + if (id_column_indexes_.empty() || cur_row_size_ <= 1) return; + + const uint32_t n = cur_row_size_; + + // Build permutation sorted by tag column values (stable sort keeps + // timestamp order within each device). 
+ std::vector perm(n); + std::iota(perm.begin(), perm.end(), 0); + + std::stable_sort(perm.begin(), perm.end(), [this](uint32_t a, uint32_t b) { + for (int idx : id_column_indexes_) { + bool a_null = bitmaps_[idx].test(a); + bool b_null = bitmaps_[idx].test(b); + if (a_null != b_null) return a_null > b_null; // null sorts last + if (a_null) continue; // both null — equal on this column + const StringColumn& sc = *value_matrix_[idx].string_col; + int32_t a_off = sc.offsets[a]; + uint32_t a_len = static_cast(sc.offsets[a + 1] - a_off); + int32_t b_off = sc.offsets[b]; + uint32_t b_len = static_cast(sc.offsets[b + 1] - b_off); + uint32_t min_len = std::min(a_len, b_len); + int cmp = (min_len > 0) + ? memcmp(sc.buffer + a_off, sc.buffer + b_off, min_len) + : 0; + if (cmp != 0) return cmp < 0; + if (a_len != b_len) return a_len < b_len; + } + return false; + }); + + // Check if already sorted. + bool sorted = true; + for (uint32_t i = 0; i < n && sorted; i++) { + if (perm[i] != i) sorted = false; + } + if (sorted) return; + + // Apply permutation to timestamps. + permute_array(timestamps_, perm, n); + + // Apply permutation to each column. 
+ uint32_t col_count = static_cast(schema_vec_->size()); + for (uint32_t c = 0; c < col_count; c++) { + TSDataType dt = schema_vec_->at(c).data_type_; + switch (dt) { + case BOOLEAN: + permute_array(value_matrix_[c].bool_data, perm, n); + break; + case INT32: + case DATE: + permute_array(value_matrix_[c].int32_data, perm, n); + break; + case INT64: + case TIMESTAMP: + permute_array(value_matrix_[c].int64_data, perm, n); + break; + case FLOAT: + permute_array(value_matrix_[c].float_data, perm, n); + break; + case DOUBLE: + permute_array(value_matrix_[c].double_data, perm, n); + break; + case STRING: + case TEXT: + case BLOB: + permute_string_column(value_matrix_[c].string_col, bitmaps_[c], + perm, n, max_row_num_); + break; + default: + break; + } + permute_bitmap(bitmaps_[c], perm, n); + } +} + void Tablet::reset_string_columns() { size_t schema_count = schema_vec_->size(); for (size_t c = 0; c < schema_count; c++) { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index beedacc04..a6bebae92 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,7 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + public: // Arrow-style string column: offsets + contiguous buffer. // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] struct StringColumn { @@ -284,6 +285,10 @@ class Tablet { void set_column_categories( const std::vector& column_categories); + // Sort rows so that rows belonging to the same device (same TAG column + // values) are contiguous. Stable sort: preserves timestamp order within + // each device. No-op when there are no TAG columns or ≤1 rows. 
+ void sort_by_device(); std::shared_ptr get_device_id(int i) const; std::vector find_all_device_boundaries() const; diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h new file mode 100644 index 000000000..9285d4ff4 --- /dev/null +++ b/cpp/src/common/thread_pool.h @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef COMMON_THREAD_POOL_H +#define COMMON_THREAD_POOL_H + +#ifdef ENABLE_THREADS + +#include +#include +#include +#include +#include +#include +#include + +namespace common { + +// Unified fixed-size thread pool supporting both fire-and-forget tasks +// (submit void + wait_all) and future-returning tasks (submit). +// Used by both write path (column-parallel encoding) and read path +// (column-parallel decoding). +class ThreadPool { + public: + explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) { + for (size_t i = 0; i < num_threads; i++) { + workers_.emplace_back([this] { worker_loop(); }); + } + } + + ~ThreadPool() { + { + std::lock_guard lk(mu_); + stop_ = true; + } + cv_work_.notify_all(); + for (auto& w : workers_) { + if (w.joinable()) w.join(); + } + } + + // Submit a fire-and-forget task (no return value). 
+ void submit(std::function task) { + { + std::lock_guard lk(mu_); + tasks_.push(std::move(task)); + active_++; + } + cv_work_.notify_one(); + } + + // Submit a task that returns a value via std::future. + template + std::future::type> submit(F&& f) { + using RetType = typename std::result_of::type; + auto task = + std::make_shared>(std::forward(f)); + std::future result = task->get_future(); + { + std::lock_guard lk(mu_); + tasks_.emplace([task]() { (*task)(); }); + active_++; + } + cv_work_.notify_one(); + return result; + } + + // Block until all submitted tasks have completed. + void wait_all() { + std::unique_lock lk(mu_); + cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); }); + } + + private: + void worker_loop() { + while (true) { + std::function task; + { + std::unique_lock lk(mu_); + cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) return; + task = std::move(tasks_.front()); + tasks_.pop(); + } + task(); + { + std::lock_guard lk(mu_); + active_--; + } + cv_done_.notify_one(); + } + } + + std::vector workers_; + std::queue> tasks_; + std::mutex mu_; + std::condition_variable cv_work_; + std::condition_variable cv_done_; + bool stop_; + int active_; +}; + +} // namespace common + +#endif // ENABLE_THREADS + +#endif // COMMON_THREAD_POOL_H diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 657fcabc2..e705bab67 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1136,280 +1136,141 @@ int TsFileWriter::write_table(Tablet& tablet) { return ret; } + // Sort tablet so same-device rows are contiguous. 
+ tablet.sort_by_device(); + auto device_id_end_index_pairs = split_tablet_by_device(tablet); int start_idx = 0; for (auto& device_id_end_index_pair : device_id_end_index_pairs) { auto device_id = device_id_end_index_pair.first; int end_idx = device_id_end_index_pair.second; if (end_idx == 0) continue; - if (table_aligned_) { - SimpleVector value_chunk_writers; - TimeChunkWriter* time_chunk_writer = nullptr; - if (RET_FAIL(do_check_schema_table(device_id, tablet, - time_chunk_writer, - value_chunk_writers))) { - return ret; - } - const bool strict_page_size = - common::g_config_value_.strict_page_size_; - - std::vector field_columns; - field_columns.reserve(tablet.get_column_count()); - for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { - if (tablet.column_categories_[col] == - common::ColumnCategory::FIELD) { - field_columns.push_back(col); - } - } - ASSERT(field_columns.size() == value_chunk_writers.size()); - - const bool has_varlen_field_column = [&]() { - for (uint32_t i = 0; i < field_columns.size(); i++) { - const common::TSDataType t = - tablet.schema_vec_->at(field_columns[i]).data_type_; - if (t == common::STRING || t == common::TEXT || - t == common::BLOB) { - return true; - } - } - return false; - }(); + SimpleVector value_chunk_writers; + TimeChunkWriter* time_chunk_writer = nullptr; + if (RET_FAIL(do_check_schema_table(device_id, tablet, + time_chunk_writer, + value_chunk_writers))) { + return ret; + } - // Keep writers' seal-check behavior consistent across calls. 
- time_chunk_writer->set_enable_page_seal_if_full(strict_page_size); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - strict_page_size); - } + std::vector field_columns; + field_columns.reserve(tablet.get_column_count()); + for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { + if (tablet.column_categories_[col] == + common::ColumnCategory::FIELD) { + field_columns.push_back(col); } + } + ASSERT(field_columns.size() == value_chunk_writers.size()); - if (strict_page_size) { - // Strict: row-based insertion and force aligned page sealing - // when either time or any value page becomes full. - for (int i = start_idx; i < end_idx; i++) { - int32_t time_pages_before = - time_chunk_writer->num_of_pages(); - std::vector value_pages_before( - value_chunk_writers.size(), 0); - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - if (!IS_NULL(value_chunk_writers[k])) { - value_pages_before[k] = - value_chunk_writers[k]->num_of_pages(); - } - } - - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[i]))) { - return ret; - } - - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - const uint32_t tablet_col_idx = field_columns[k]; - if (RET_FAIL(value_write_column(value_chunk_writer, - tablet, tablet_col_idx, - i, i + 1))) { - return ret; - } - } - - if (RET_FAIL(maybe_seal_aligned_pages_together( - time_chunk_writer, value_chunk_writers, - time_pages_before, value_pages_before))) { - return ret; - } - } - } else if (!has_varlen_field_column) { - // Optimization: no string/blob/text columns, so we can - // segment by point-number and seal pages at those boundaries - // in column-based order. 
- const uint32_t points_per_page = - common::g_config_value_.page_writer_max_point_num_; - - time_chunk_writer->set_enable_page_seal_if_full(false); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } - - // Fill the already-unsealed time page first. - uint32_t time_cur_points = time_chunk_writer->get_point_numer(); - if (time_cur_points >= points_per_page && - time_chunk_writer->has_current_page_data()) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c]) && - value_chunk_writers[c]->has_current_page_data()) { - if (RET_FAIL(value_chunk_writers[c] - ->seal_current_page())) { - return ret; - } - } - } - time_cur_points = 0; - } - - const uint32_t first_seg_len = - (time_cur_points > 0 && time_cur_points < points_per_page) - ? (points_per_page - time_cur_points) - : points_per_page; - - // 1) Write time in segments (seal all full segments). - uint32_t seg_start = static_cast(start_idx); - uint32_t seg_len = first_seg_len; - while (static_cast(seg_start) < end_idx) { - const uint32_t seg_end = std::min( - seg_start + seg_len, static_cast(end_idx)); - if (RET_FAIL(time_write_column(time_chunk_writer, tablet, - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast(seg_start) < end_idx) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - - // 2) Write each value column (same segments). 
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - seg_start = static_cast(start_idx); - seg_len = first_seg_len; - while (static_cast(seg_start) < end_idx) { - const uint32_t seg_end = - std::min(seg_start + seg_len, - static_cast(end_idx)); - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast(seg_start) < end_idx) { - if (value_chunk_writer->has_current_page_data() && - RET_FAIL( - value_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - } - } else { - // General non-strict (may have varlen STRING/TEXT/BLOB - // columns): time auto-seals to provide aligned page boundaries; - // value writers skip auto page sealing and are sealed manually - // at recorded time-page boundaries. Attention: since value-side - // auto-seal is disabled, if a varlen value page hits the memory - // threshold earlier, it may not seal immediately and will be - // sealed later at the time-page boundaries (non-strict - // sacrifices the strict page size/memory limit for - // performance). - time_chunk_writer->set_enable_page_seal_if_full(true); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } + // Precompute page boundaries from point counts — no serial write + // needed. The first segment may be shorter if the time page already + // holds data from a previous write_table call. 
+ const uint32_t page_max_points = std::max( + 1, common::g_config_value_.page_writer_max_point_num_); + const uint32_t si = static_cast(start_idx); + const uint32_t ei = static_cast(end_idx); - std::vector time_page_row_ends; - const uint32_t page_max_points = std::max( - 1, common::g_config_value_.page_writer_max_point_num_); - const uint32_t batch_rows = - static_cast(end_idx - start_idx); - time_page_row_ends.reserve(batch_rows / page_max_points + 1); - for (uint32_t r = static_cast(start_idx); - r < static_cast(end_idx); r++) { - const int32_t pages_before = - time_chunk_writer->num_of_pages(); - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[r]))) { - return ret; - } - const int32_t pages_after = - time_chunk_writer->num_of_pages(); - if (pages_after > pages_before) { - const uint32_t boundary_end = r + 1; - if (time_page_row_ends.empty() || - time_page_row_ends.back() != boundary_end) { - time_page_row_ends.push_back(boundary_end); - } - } + uint32_t time_cur_points = time_chunk_writer->get_point_numer(); + const uint32_t first_seg_cap = + (time_cur_points > 0 && time_cur_points < page_max_points) + ? (page_max_points - time_cur_points) + : page_max_points; + + std::vector page_boundaries; // row indices where a page + // should seal + { + uint32_t pos = si; + uint32_t seg_cap = first_seg_cap; + while (pos < ei) { + uint32_t seg_end = std::min(pos + seg_cap, ei); + if (seg_end < ei) { + page_boundaries.push_back(seg_end); } + pos = seg_end; + seg_cap = page_max_points; + } + } - // Write values column-by-column and seal at recorded time - // boundaries. 
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - uint32_t seg_start = static_cast(start_idx); - for (uint32_t boundary_end : time_page_row_ends) { - if (boundary_end <= seg_start) { - continue; - } - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, boundary_end))) { - return ret; - } - if (value_chunk_writer->has_current_page_data() && - RET_FAIL(value_chunk_writer->seal_current_page())) { - return ret; - } - seg_start = boundary_end; - } - if (seg_start < static_cast(end_idx)) { - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, static_cast(end_idx)))) { - return ret; - } - } - } + // Write one column in segments defined by page_boundaries, sealing + // at each boundary. Works for both time and value columns. + auto write_time_in_segments = + [this, &tablet, &page_boundaries, si, ei]( + TimeChunkWriter* tcw) -> int { + int r = E_OK; + uint32_t seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = time_write_column(tcw, tablet, seg_start, + boundary)) != E_OK) + return r; + if ((r = tcw->seal_current_page()) != E_OK) return r; + seg_start = boundary; } - start_idx = end_idx; - } else { - MeasurementNamesFromTablet mnames_getter(tablet); - SimpleVector chunk_writers; - SimpleVector data_types; - if (RET_FAIL(do_check_schema(device_id, mnames_getter, - chunk_writers, data_types))) { + if (seg_start < ei) { + r = time_write_column(tcw, tablet, seg_start, ei); + } + return r; + }; + + auto write_value_in_segments = + [this, &tablet, &page_boundaries, si, ei]( + ValueChunkWriter* vcw, uint32_t col_idx) -> int { + int r = E_OK; + uint32_t seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = value_write_column(vcw, tablet, col_idx, seg_start, + boundary)) != E_OK) + return r; + if (vcw->has_current_page_data() && 
+ (r = vcw->seal_current_page()) != E_OK) + return r; + seg_start = boundary; + } + if (seg_start < ei) { + r = value_write_column(vcw, tablet, col_idx, seg_start, ei); + } + return r; + }; + + // All columns (time + values) write the same row segments and seal + // at the same boundaries — fully parallel. +#ifdef ENABLE_THREADS + if (g_config_value_.parallel_write_enabled_) { + std::vector> futures; + futures.push_back(thread_pool_.submit( + [&write_time_in_segments, time_chunk_writer]() { + return write_time_in_segments(time_chunk_writer); + })); + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + uint32_t col_idx = field_columns[k]; + futures.push_back(thread_pool_.submit( + [&write_value_in_segments, vcw, col_idx]() { + return write_value_in_segments(vcw, col_idx); + })); + } + for (auto& f : futures) { + int r = f.get(); + if (r != E_OK && ret == E_OK) ret = r; + } + if (ret != E_OK) return ret; + } else +#endif + { + if (RET_FAIL(write_time_in_segments(time_chunk_writer))) { return ret; } - ASSERT(chunk_writers.size() == tablet.get_column_count()); - for (uint32_t c = 0; c < chunk_writers.size(); c++) { - ChunkWriter* chunk_writer = chunk_writers[c]; - if (IS_NULL(chunk_writer)) { - continue; - } - if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx, - device_id_end_index_pair.second))) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) { return ret; } } - start_idx = device_id_end_index_pair.second; } + start_idx = end_idx; } record_count_since_last_flush_ += tablet.cur_row_size_; // Reset string column buffers so the tablet can be reused for the next diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 01028e2e2..87646bc7c 100644 --- a/cpp/src/writer/tsfile_writer.h +++ 
b/cpp/src/writer/tsfile_writer.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,9 @@ #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" +#ifdef ENABLE_THREADS +#include "common/thread_pool.h" +#endif namespace storage { class WriteFile; @@ -194,7 +198,10 @@ class TsFileWriter { int64_t record_count_for_next_mem_check_; bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) - bool table_aligned_ = true; +#ifdef ENABLE_THREADS + common::ThreadPool thread_pool_{ + (size_t)common::g_config_value_.write_thread_count_}; +#endif int write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, bool* col_values, diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 477f875e7..52f647360 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -26,15 +26,17 @@ #include "file/tsfile_io_writer.h" #include "file/write_file.h" #include "reader/tsfile_reader.h" +#include "common/global.h" #include "writer/chunk_writer.h" #include "writer/tsfile_table_writer.h" using namespace storage; using namespace common; -class TsFileWriterTableTest : public ::testing::Test { +class TsFileWriterTableTest : public ::testing::TestWithParam { protected: void SetUp() override { libtsfile_init(); + g_config_value_.parallel_write_enabled_ = GetParam(); file_name_ = std::string("tsfile_writer_table_test_") + generate_random_string(10) + std::string(".tsfile"); remove(file_name_.c_str()); @@ -133,7 +135,7 @@ class TsFileWriterTableTest : public ::testing::Test { } }; -TEST_F(TsFileWriterTableTest, WriteTableTest) { +TEST_P(TsFileWriterTableTest, WriteTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -144,7 +146,7 @@ TEST_F(TsFileWriterTableTest, 
WriteTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { +TEST_P(TsFileWriterTableTest, WithoutTagAndMultiPage) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(1); @@ -192,7 +194,7 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDisorderTest) { +TEST_P(TsFileWriterTableTest, WriteDisorderTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -242,7 +244,7 @@ TEST_F(TsFileWriterTableTest, WriteDisorderTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { +TEST_P(TsFileWriterTableTest, WriteTableTestMultiFlush) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 2 * 1024); @@ -255,7 +257,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistColumnTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -283,7 +285,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -295,7 +297,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { +TEST_P(TsFileWriterTableTest, WriterWithMemoryThreshold) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 256 * 1024 * 1024); @@ -305,7 +307,7 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { 
delete table_schema; } -TEST_F(TsFileWriterTableTest, EmptyTagWrite) { +TEST_P(TsFileWriterTableTest, EmptyTagWrite) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(3); @@ -361,7 +363,7 @@ TEST_F(TsFileWriterTableTest, EmptyTagWrite) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { +TEST_P(TsFileWriterTableTest, WritehDataTypeMisMatch) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 256 * 1024 * 1024); @@ -412,7 +414,7 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { tsfile_table_writer_->close(); } -TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { +TEST_P(TsFileWriterTableTest, WriteAndReadSimple) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(2); @@ -467,7 +469,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { delete table_schema; } -TEST_F(TsFileWriterTableTest, DuplicateColumnName) { +TEST_P(TsFileWriterTableTest, DuplicateColumnName) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(3); @@ -505,7 +507,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { +TEST_P(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { std::vector measurement_schemas; std::vector column_categories; for (int i = 0; i < 3; i++) { @@ -637,7 +639,7 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { +TEST_P(TsFileWriterTableTest, MultiDeviceMultiFields) { common::config_set_max_degree_of_index_node(5); auto table_schema = gen_table_schema(0, 1, 100); auto tsfile_table_writer_ = @@ -696,7 +698,7 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { 
+TEST_P(TsFileWriterTableTest, WriteDataWithEmptyField) { std::vector measurement_schemas; std::vector column_categories; for (int i = 0; i < 3; i++) { @@ -773,7 +775,7 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDatatypes) { +TEST_P(TsFileWriterTableTest, MultiDatatypes) { std::vector measurement_schemas; std::vector column_categories; @@ -877,7 +879,7 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, DiffCodecTypes) { +TEST_P(TsFileWriterTableTest, DiffCodecTypes) { std::vector measurement_schemas; std::vector column_categories; @@ -985,7 +987,7 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { +TEST_P(TsFileWriterTableTest, EncodingConfigIntegration) { // 1. Test setting global compression type ASSERT_EQ(E_OK, set_global_compression(SNAPPY)); @@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { } #ifdef ENABLE_MEM_STAT -TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { +TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { TableSchema* table_schema = gen_table_schema(0, 2, 3); auto tsfile_table_writer = std::make_shared(&write_file_, table_schema); @@ -1172,4 +1174,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { delete table_schema; } -#endif \ No newline at end of file +#endif + +INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, ::testing::Values(false)); +INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, ::testing::Values(true)); \ No newline at end of file From c828b0142fba6a8bffd08a1d338f426d3e636b44 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 9 Apr 2026 21:38:28 +0800 Subject: [PATCH 2/5] fix format. 
--- cpp/src/common/tablet.cc | 12 +++++------ cpp/src/writer/tsfile_writer.cc | 20 +++++++++---------- .../table_view/tsfile_writer_table_test.cc | 8 +++++--- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 6233abd79..148a051b5 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -514,8 +514,7 @@ void permute_string_column(Tablet::StringColumn* sc, BitMap& bm, tmp.append(i, "", 0); } else { int32_t off = sc->offsets[r]; - uint32_t len = - static_cast(sc->offsets[r + 1] - off); + uint32_t len = static_cast(sc->offsets[r + 1] - off); tmp.append(i, sc->buffer + off, len); } } @@ -527,8 +526,7 @@ void permute_string_column(Tablet::StringColumn* sc, BitMap& bm, tmp.destroy(); } -void permute_bitmap(BitMap& bm, const std::vector& perm, - uint32_t n) { +void permute_bitmap(BitMap& bm, const std::vector& perm, uint32_t n) { if (!bm.get_bitmap()) return; uint32_t size_bytes = bm.get_size(); // Save original bits. @@ -568,9 +566,9 @@ void Tablet::sort_by_device() { int32_t b_off = sc.offsets[b]; uint32_t b_len = static_cast(sc.offsets[b + 1] - b_off); uint32_t min_len = std::min(a_len, b_len); - int cmp = (min_len > 0) - ? memcmp(sc.buffer + a_off, sc.buffer + b_off, min_len) - : 0; + int cmp = (min_len > 0) ? 
memcmp(sc.buffer + a_off, + sc.buffer + b_off, min_len) + : 0; if (cmp != 0) return cmp < 0; if (a_len != b_len) return a_len < b_len; } diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index e705bab67..a3e72461b 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1148,8 +1148,7 @@ int TsFileWriter::write_table(Tablet& tablet) { SimpleVector value_chunk_writers; TimeChunkWriter* time_chunk_writer = nullptr; - if (RET_FAIL(do_check_schema_table(device_id, tablet, - time_chunk_writer, + if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer, value_chunk_writers))) { return ret; } @@ -1179,7 +1178,7 @@ int TsFileWriter::write_table(Tablet& tablet) { : page_max_points; std::vector page_boundaries; // row indices where a page - // should seal + // should seal { uint32_t pos = si; uint32_t seg_cap = first_seg_cap; @@ -1195,14 +1194,13 @@ int TsFileWriter::write_table(Tablet& tablet) { // Write one column in segments defined by page_boundaries, sealing // at each boundary. Works for both time and value columns. 
- auto write_time_in_segments = - [this, &tablet, &page_boundaries, si, ei]( - TimeChunkWriter* tcw) -> int { + auto write_time_in_segments = [this, &tablet, &page_boundaries, si, + ei](TimeChunkWriter* tcw) -> int { int r = E_OK; uint32_t seg_start = si; for (uint32_t boundary : page_boundaries) { - if ((r = time_write_column(tcw, tablet, seg_start, - boundary)) != E_OK) + if ((r = time_write_column(tcw, tablet, seg_start, boundary)) != + E_OK) return r; if ((r = tcw->seal_current_page()) != E_OK) return r; seg_start = boundary; @@ -1213,9 +1211,9 @@ int TsFileWriter::write_table(Tablet& tablet) { return r; }; - auto write_value_in_segments = - [this, &tablet, &page_boundaries, si, ei]( - ValueChunkWriter* vcw, uint32_t col_idx) -> int { + auto write_value_in_segments = [this, &tablet, &page_boundaries, si, + ei](ValueChunkWriter* vcw, + uint32_t col_idx) -> int { int r = E_OK; uint32_t seg_start = si; for (uint32_t boundary : page_boundaries) { diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 52f647360..00d4405e3 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -20,13 +20,13 @@ #include +#include "common/global.h" #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" #include "file/tsfile_io_writer.h" #include "file/write_file.h" #include "reader/tsfile_reader.h" -#include "common/global.h" #include "writer/chunk_writer.h" #include "writer/tsfile_table_writer.h" using namespace storage; @@ -1176,5 +1176,7 @@ TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { } #endif -INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, ::testing::Values(false)); -INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, ::testing::Values(true)); \ No newline at end of file +INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, + ::testing::Values(false)); 
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, + ::testing::Values(true)); \ No newline at end of file From ec4b47d181765100695a9a0fa173144136583c84 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 10 Apr 2026 09:57:32 +0800 Subject: [PATCH 3/5] fix seg segv. --- cpp/src/common/tablet.cc | 129 -------------------------------- cpp/src/common/tablet.h | 10 ++- cpp/src/writer/tsfile_writer.cc | 11 ++- 3 files changed, 14 insertions(+), 136 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 148a051b5..4088a6927 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -19,9 +19,7 @@ #include "tablet.h" -#include #include -#include #include "allocator/alloc_base.h" #include "datatype/date_converter.h" @@ -493,133 +491,6 @@ void Tablet::set_column_categories( } } -namespace { - -template -void permute_array(T* arr, const std::vector& perm, uint32_t n) { - std::vector tmp(n); - for (uint32_t i = 0; i < n; i++) tmp[i] = arr[perm[i]]; - std::copy(tmp.begin(), tmp.end(), arr); -} - -void permute_string_column(Tablet::StringColumn* sc, BitMap& bm, - const std::vector& perm, uint32_t n, - uint32_t max_rows) { - Tablet::StringColumn tmp; - tmp.init(max_rows, sc->buf_used > 0 ? sc->buf_used : 64); - for (uint32_t i = 0; i < n; i++) { - uint32_t r = perm[i]; - if (bm.test(r)) { - // Null row — write a zero-length entry to keep offsets valid. - tmp.append(i, "", 0); - } else { - int32_t off = sc->offsets[r]; - uint32_t len = static_cast(sc->offsets[r + 1] - off); - tmp.append(i, sc->buffer + off, len); - } - } - // Swap contents. - std::swap(sc->offsets, tmp.offsets); - std::swap(sc->buffer, tmp.buffer); - std::swap(sc->buf_capacity, tmp.buf_capacity); - std::swap(sc->buf_used, tmp.buf_used); - tmp.destroy(); -} - -void permute_bitmap(BitMap& bm, const std::vector& perm, uint32_t n) { - if (!bm.get_bitmap()) return; - uint32_t size_bytes = bm.get_size(); - // Save original bits. 
- std::vector orig(bm.get_bitmap(), bm.get_bitmap() + size_bytes); - // Clear all bits (= all non-null). - bm.clear_all(); - // Re-set null bits through the permutation. - for (uint32_t i = 0; i < n; i++) { - uint32_t src = perm[i]; - if (orig[src >> 3] & (1 << (src & 7))) { - bm.set(i); - } - } -} - -} // anonymous namespace - -void Tablet::sort_by_device() { - if (id_column_indexes_.empty() || cur_row_size_ <= 1) return; - - const uint32_t n = cur_row_size_; - - // Build permutation sorted by tag column values (stable sort keeps - // timestamp order within each device). - std::vector perm(n); - std::iota(perm.begin(), perm.end(), 0); - - std::stable_sort(perm.begin(), perm.end(), [this](uint32_t a, uint32_t b) { - for (int idx : id_column_indexes_) { - bool a_null = bitmaps_[idx].test(a); - bool b_null = bitmaps_[idx].test(b); - if (a_null != b_null) return a_null > b_null; // null sorts last - if (a_null) continue; // both null — equal on this column - const StringColumn& sc = *value_matrix_[idx].string_col; - int32_t a_off = sc.offsets[a]; - uint32_t a_len = static_cast(sc.offsets[a + 1] - a_off); - int32_t b_off = sc.offsets[b]; - uint32_t b_len = static_cast(sc.offsets[b + 1] - b_off); - uint32_t min_len = std::min(a_len, b_len); - int cmp = (min_len > 0) ? memcmp(sc.buffer + a_off, - sc.buffer + b_off, min_len) - : 0; - if (cmp != 0) return cmp < 0; - if (a_len != b_len) return a_len < b_len; - } - return false; - }); - - // Check if already sorted. - bool sorted = true; - for (uint32_t i = 0; i < n && sorted; i++) { - if (perm[i] != i) sorted = false; - } - if (sorted) return; - - // Apply permutation to timestamps. - permute_array(timestamps_, perm, n); - - // Apply permutation to each column. 
- uint32_t col_count = static_cast(schema_vec_->size()); - for (uint32_t c = 0; c < col_count; c++) { - TSDataType dt = schema_vec_->at(c).data_type_; - switch (dt) { - case BOOLEAN: - permute_array(value_matrix_[c].bool_data, perm, n); - break; - case INT32: - case DATE: - permute_array(value_matrix_[c].int32_data, perm, n); - break; - case INT64: - case TIMESTAMP: - permute_array(value_matrix_[c].int64_data, perm, n); - break; - case FLOAT: - permute_array(value_matrix_[c].float_data, perm, n); - break; - case DOUBLE: - permute_array(value_matrix_[c].double_data, perm, n); - break; - case STRING: - case TEXT: - case BLOB: - permute_string_column(value_matrix_[c].string_col, bitmaps_[c], - perm, n, max_row_num_); - break; - default: - break; - } - permute_bitmap(bitmaps_[c], perm, n); - } -} - void Tablet::reset_string_columns() { size_t schema_count = schema_vec_->size(); for (size_t c = 0; c < schema_count; c++) { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index a6bebae92..50750d02b 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -236,6 +236,12 @@ class Tablet { } size_t get_column_count() const { return schema_vec_->size(); } uint32_t get_cur_row_size() const { return cur_row_size_; } + int64_t get_timestamp(uint32_t row_index) const { + return timestamps_[row_index]; + } + bool is_null(uint32_t row_index, uint32_t col_index) const { + return bitmaps_[col_index].test(row_index); + } /** * @brief Adds a timestamp to the specified row. @@ -285,10 +291,6 @@ class Tablet { void set_column_categories( const std::vector& column_categories); - // Sort rows so that rows belonging to the same device (same TAG column - // values) are contiguous. Stable sort: preserves timestamp order within - // each device. No-op when there are no TAG columns or ≤1 rows. 
- void sort_by_device(); std::shared_ptr get_device_id(int i) const; std::vector find_all_device_boundaries() const; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index a3e72461b..2cc903819 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1136,9 +1136,6 @@ int TsFileWriter::write_table(Tablet& tablet) { return ret; } - // Sort tablet so same-device rows are contiguous. - tablet.sort_by_device(); - auto device_id_end_index_pairs = split_tablet_by_device(tablet); int start_idx = 0; for (auto& device_id_end_index_pair : device_id_end_index_pairs) { @@ -1194,9 +1191,14 @@ int TsFileWriter::write_table(Tablet& tablet) { // Write one column in segments defined by page_boundaries, sealing // at each boundary. Works for both time and value columns. + // We control page sealing explicitly at precomputed boundaries, so + // auto-seal must be disabled — otherwise a segment of exactly + // page_max_points would trigger auto-seal AND our explicit seal, + // double-sealing (sealing an empty page → crash). 
auto write_time_in_segments = [this, &tablet, &page_boundaries, si, ei](TimeChunkWriter* tcw) -> int { int r = E_OK; + tcw->set_enable_page_seal_if_full(false); uint32_t seg_start = si; for (uint32_t boundary : page_boundaries) { if ((r = time_write_column(tcw, tablet, seg_start, boundary)) != @@ -1208,6 +1210,7 @@ int TsFileWriter::write_table(Tablet& tablet) { if (seg_start < ei) { r = time_write_column(tcw, tablet, seg_start, ei); } + tcw->set_enable_page_seal_if_full(true); return r; }; @@ -1215,6 +1218,7 @@ int TsFileWriter::write_table(Tablet& tablet) { ei](ValueChunkWriter* vcw, uint32_t col_idx) -> int { int r = E_OK; + vcw->set_enable_page_seal_if_full(false); uint32_t seg_start = si; for (uint32_t boundary : page_boundaries) { if ((r = value_write_column(vcw, tablet, col_idx, seg_start, @@ -1228,6 +1232,7 @@ int TsFileWriter::write_table(Tablet& tablet) { if (seg_start < ei) { r = value_write_column(vcw, tablet, col_idx, seg_start, ei); } + vcw->set_enable_page_seal_if_full(true); return r; }; From 9958a5d609f13ba624b65592722633a768f465f7 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 10 Apr 2026 15:56:03 +0800 Subject: [PATCH 4/5] fix comment. 
--- cpp/CMakeLists.txt | 2 ++ cpp/src/common/global.cc | 9 ++++-- cpp/src/common/global.h | 6 +++- cpp/src/common/thread_pool.h | 1 + cpp/src/writer/tsfile_writer.cc | 31 ++++++++++++++++--- cpp/src/writer/tsfile_writer.h | 7 +++-- .../table_view/tsfile_writer_table_test.cc | 2 +- 7 files changed, 47 insertions(+), 11 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index cc50a1782..3f1d8bdd3 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,8 @@ message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}") if (ENABLE_THREADS) add_definitions(-DENABLE_THREADS) + find_package(Threads REQUIRED) + link_libraries(Threads::Threads) endif() option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF) diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index 721522c77..859bb2520 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc @@ -24,6 +24,8 @@ #endif #include +#include + #include "utils/injection.h" namespace common { @@ -61,8 +63,11 @@ void init_config_value() { #else g_config_value_.default_compression_type_ = UNCOMPRESSED; #endif - g_config_value_.parallel_write_enabled_ = true; - g_config_value_.write_thread_count_ = 6; + unsigned int hw_cores = std::thread::hardware_concurrency(); + if (hw_cores == 0) hw_cores = 1; // fallback if detection fails + g_config_value_.parallel_write_enabled_ = (hw_cores > 1); + g_config_value_.write_thread_count_ = + static_cast(std::min(hw_cores, 64u)); // Enforce aligned page size limits strictly by default. 
g_config_value_.strict_page_size_ = true; } diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h index 01be5a9c0..3a4525b70 100644 --- a/cpp/src/common/global.h +++ b/cpp/src/common/global.h @@ -168,9 +168,13 @@ FORCE_INLINE void set_parallel_write_enabled(bool enabled) { } FORCE_INLINE bool get_parallel_write_enabled() { - return g_config_value_.parallel_write_enabled_; + return g_config_value_.parallel_write_enabled_ && + g_config_value_.write_thread_count_ > 1; } +// Set the number of threads for parallel writes. Must be called before +// constructing TsFileWriter instances — existing writers' thread pools +// are not resized at runtime. FORCE_INLINE int set_write_thread_count(int32_t count) { if (count < 1 || count > 64) return E_INVALID_ARG; g_config_value_.write_thread_count_ = count; diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h index 9285d4ff4..f82aea038 100644 --- a/cpp/src/common/thread_pool.h +++ b/cpp/src/common/thread_pool.h @@ -27,6 +27,7 @@ #include #include #include +#include #include namespace common { diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2cc903819..cee742f10 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1168,7 +1168,25 @@ int TsFileWriter::write_table(Tablet& tablet) { const uint32_t si = static_cast(start_idx); const uint32_t ei = static_cast(end_idx); + // If the current unsealed page is already at or past capacity (from + // a previous write_table call), seal it before starting new segments. 
uint32_t time_cur_points = time_chunk_writer->get_point_numer(); + if (time_cur_points >= page_max_points) { + if (time_chunk_writer->has_current_page_data()) { + if (RET_FAIL(time_chunk_writer->seal_current_page())) { + return ret; + } + } + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + if (!IS_NULL(value_chunk_writers[k]) && + value_chunk_writers[k]->has_current_page_data()) { + if (RET_FAIL(value_chunk_writers[k]->seal_current_page())) { + return ret; + } + } + } + time_cur_points = 0; + } const uint32_t first_seg_cap = (time_cur_points > 0 && time_cur_points < page_max_points) ? (page_max_points - time_cur_points) @@ -1189,12 +1207,15 @@ int TsFileWriter::write_table(Tablet& tablet) { } } - // Write one column in segments defined by page_boundaries, sealing - // at each boundary. Works for both time and value columns. // We control page sealing explicitly at precomputed boundaries, so - // auto-seal must be disabled — otherwise a segment of exactly - // page_max_points would trigger auto-seal AND our explicit seal, - // double-sealing (sealing an empty page → crash). + // auto-seal must be disabled during segmented writes — otherwise a + // segment of exactly page_max_points would trigger auto-seal AND + // our explicit seal, double-sealing (sealing an empty page → crash). + // Note: with auto-seal off, the memory-based threshold + // (page_writer_max_memory_bytes_) is not enforced within a segment. + // For varlen columns (STRING/TEXT/BLOB), individual pages may exceed + // the memory limit. Each segment is still bounded by + // page_max_points rows, keeping pages within a reasonable size. 
auto write_time_in_segments = [this, &tablet, &page_boundaries, si, ei](TimeChunkWriter* tcw) -> int { int r = E_OK; diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 87646bc7c..bf409879e 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -199,8 +199,11 @@ class TsFileWriter { bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) #ifdef ENABLE_THREADS - common::ThreadPool thread_pool_{ - (size_t)common::g_config_value_.write_thread_count_}; + static size_t get_thread_pool_size() { + int32_t count = common::g_config_value_.write_thread_count_; + return count > 0 ? static_cast(count) : size_t{1}; + } + common::ThreadPool thread_pool_{get_thread_pool_size()}; #endif int write_typed_column(ValueChunkWriter* value_chunk_writer, diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 00d4405e3..d1f3b92e4 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -36,7 +36,7 @@ class TsFileWriterTableTest : public ::testing::TestWithParam { protected: void SetUp() override { libtsfile_init(); - g_config_value_.parallel_write_enabled_ = GetParam(); + set_parallel_write_enabled(GetParam()); file_name_ = std::string("tsfile_writer_table_test_") + generate_random_string(10) + std::string(".tsfile"); remove(file_name_.c_str()); From 97e7bae41f2d1f9cb630c6877b98a450625344e7 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 10 Apr 2026 16:13:40 +0800 Subject: [PATCH 5/5] add readme. 
--- cpp/README-zh.md | 29 +++++++++++++++++++++++++++++ cpp/README.md | 27 +++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/cpp/README-zh.md b/cpp/README-zh.md index 2a8c84b74..1ff0372f3 100644 --- a/cpp/README-zh.md +++ b/cpp/README-zh.md @@ -105,6 +105,35 @@ make --- +## 并行写入 + +TsFile C++ 支持基于线程池的列级并行编码,适用于表模型写入路径(`write_table`)。启用后,时间列和所有值列使用预计算的 page 边界并行写入,同时保证各列 page 对齐封盘。 + +### 编译选项 + +并行写入通过 `ENABLE_THREADS` CMake 选项控制(默认开启): + +```bash +cmake .. -DENABLE_THREADS=ON # 开启(默认) +cmake .. -DENABLE_THREADS=OFF # 关闭——编译期剥离所有线程代码 +``` + +### 运行时配置 + +```cpp +#include "common/global.h" + +// 运行时开启或关闭并行写入(单核机器自动禁用) +common::set_parallel_write_enabled(true); + +// 设置工作线程数(必须在创建 TsFileWriter 之前调用) +common::set_write_thread_count(4); +``` + +默认情况下,当机器 CPU 核数大于 1 时自动启用并行写入,线程数设为硬件核数(上限 64)。 + +--- + ## 使用 TsFile 你可以在 `./examples/cpp_examples` 目录下的 `demo_read.cpp` 和 `demo_write.cpp` 中查看读写数据的示例。 diff --git a/cpp/README.md b/cpp/README.md index e328413ca..9ee9d7c97 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -75,6 +75,33 @@ cmake .. -DToolChain=ON make ``` +## Parallel Write + +TsFile C++ supports thread pool-based parallel column encoding for the table write path (`write_table`). When enabled, each column (time and value columns) is written in parallel using precomputed page boundaries, while maintaining aligned page sealing across columns. + +### Build Options + +Parallel write is controlled by the `ENABLE_THREADS` CMake option (ON by default): + +```bash +cmake .. -DENABLE_THREADS=ON # enable (default) +cmake ..
-DENABLE_THREADS=OFF # disable — all thread code is stripped at compile time +``` + +### Runtime Configuration + +```cpp +#include "common/global.h" + +// Enable or disable parallel write at runtime (auto-disabled on single-core machines) +common::set_parallel_write_enabled(true); + +// Set the number of worker threads (must be called before creating TsFileWriter) +common::set_write_thread_count(4); +``` + +By default, parallel write is enabled when the machine has more than one CPU core, and the thread count is set to the number of hardware cores (capped at 64). + ## Use TsFile You can find examples on how to read and write data in `demo_read.cpp` and `demo_write.cpp` located under `./examples/cpp_examples`. There are also examples under `./examples/c_examples`on how to use a C-style API to read and write data in a C environment. You can run `bash build.sh` under `./examples` to generate an executable output under `./examples/build`. \ No newline at end of file