diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9d2015e48..3f1d8bdd3 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -150,6 +150,15 @@ if (ENABLE_ZLIB) add_definitions(-DENABLE_GZIP) endif() +option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON) +message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}") + +if (ENABLE_THREADS) + add_definitions(-DENABLE_THREADS) + find_package(Threads REQUIRED) + link_libraries(Threads::Threads) +endif() + option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF) message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}") diff --git a/cpp/README-zh.md b/cpp/README-zh.md index 2a8c84b74..1ff0372f3 100644 --- a/cpp/README-zh.md +++ b/cpp/README-zh.md @@ -105,6 +105,35 @@ make --- +## 并行写入 + +TsFile C++ 支持基于线程池的列级并行编码,适用于表模型写入路径(`write_table`)。启用后,时间列和所有值列使用预计算的 page 边界并行写入,同时保证各列 page 对齐封盘。 + +### 编译选项 + +并行写入通过 `ENABLE_THREADS` CMake 选项控制(默认开启): + +```bash +cmake .. -DENABLE_THREADS=ON # 开启(默认) +cmake .. -DENABLE_THREADS=OFF # 关闭——编译期剥离所有线程代码 +``` + +### 运行时配置 + +```cpp +#include "common/global.h" + +// 运行时开启或关闭并行写入(单核机器自动禁用) +storage::set_parallel_write_enabled(true); + +// 设置工作线程数(必须在创建 TsFileWriter 之前调用) +storage::set_write_thread_count(4); +``` + +默认情况下,当机器 CPU 核数大于 1 时自动启用并行写入,线程数设为硬件核数(上限 64)。 + +--- + ## 使用 TsFile 你可以在 `./examples/cpp_examples` 目录下的 `demo_read.cpp` 和 `demo_write.cpp` 中查看读写数据的示例。 diff --git a/cpp/README.md b/cpp/README.md index e328413ca..9ee9d7c97 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -75,6 +75,33 @@ cmake .. -DToolChain=ON make ``` +## Parallel Write + +TsFile C++ supports thread pool-based parallel column encoding for the table write path (`write_table`). When enabled, each column (time and value columns) is written in parallel using precomputed page boundaries, while maintaining aligned page sealing across columns. + +### Build Options + +Parallel write is controlled by the `ENABLE_THREADS` CMake option (ON by default): + +```bash +cmake .. -DENABLE_THREADS=ON # enable (default) +cmake .. -DENABLE_THREADS=OFF # disable — all thread code is stripped at compile time +``` + +### Runtime Configuration + +```cpp +#include "common/global.h" + +// Enable or disable parallel write at runtime (auto-disabled on single-core machines) +storage::set_parallel_write_enabled(true); + +// Set the number of worker threads (must be called before creating TsFileWriter) +storage::set_write_thread_count(4); +``` + +By default, parallel write is enabled when the machine has more than one CPU core, and the thread count is set to the number of hardware cores (capped at 64). + ## Use TsFile You can find examples on how to read and write data in `demo_read.cpp` and `demo_write.cpp` located under `./examples/cpp_examples`. There are also examples under `./examples/c_examples`on how to use a C-style API to read and write data in a C environment. You can run `bash build.sh` under `./examples` to generate an executable output under `./examples/build`. \ No newline at end of file diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h index 81dad924f..e2b2039a7 100644 --- a/cpp/src/common/config/config.h +++ b/cpp/src/common/config/config.h @@ -46,6 +46,8 @@ typedef struct ConfigValue { TSEncoding double_encoding_type_; TSEncoding string_encoding_type_; CompressionType default_compression_type_; + bool parallel_write_enabled_; + int32_t write_thread_count_; // When true, aligned writer enforces page size limit strictly by // interleaving time/value writes and sealing pages together when any side // becomes full. diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index 91ecedda1..859bb2520 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc @@ -24,6 +24,8 @@ #endif #include +#include + #include "utils/injection.h" namespace common { @@ -54,12 +56,18 @@ void init_config_value() { g_config_value_.int64_encoding_type_ = TS_2DIFF; g_config_value_.float_encoding_type_ = GORILLA; g_config_value_.double_encoding_type_ = GORILLA; + g_config_value_.string_encoding_type_ = PLAIN; // Default compression type is LZ4 #ifdef ENABLE_LZ4 g_config_value_.default_compression_type_ = LZ4; #else g_config_value_.default_compression_type_ = UNCOMPRESSED; #endif + unsigned int hw_cores = std::thread::hardware_concurrency(); + if (hw_cores == 0) hw_cores = 1; // fallback if detection fails + g_config_value_.parallel_write_enabled_ = (hw_cores > 1); + g_config_value_.write_thread_count_ = + static_cast(std::min(hw_cores, 64u)); // Enforce aligned page size limits strictly by default. g_config_value_.strict_page_size_ = true; } diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h index 7937e9203..3a4525b70 100644 --- a/cpp/src/common/global.h +++ b/cpp/src/common/global.h @@ -163,6 +163,24 @@ FORCE_INLINE uint8_t get_global_compression() { return static_cast(g_config_value_.default_compression_type_); } +FORCE_INLINE void set_parallel_write_enabled(bool enabled) { + g_config_value_.parallel_write_enabled_ = enabled; +} + +FORCE_INLINE bool get_parallel_write_enabled() { + return g_config_value_.parallel_write_enabled_ && + g_config_value_.write_thread_count_ > 1; +} + +// Set the number of threads for parallel writes. Must be called before +// constructing TsFileWriter instances — existing writers' thread pools +// are not resized at runtime. +FORCE_INLINE int set_write_thread_count(int32_t count) { + if (count < 1 || count > 64) return E_INVALID_ARG; + g_config_value_.write_thread_count_ = count; + return E_OK; +} + extern int init_common(); extern bool is_timestamp_column_name(const char* time_col_name); extern void cols_to_json(ByteStream* byte_stream, diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index beedacc04..50750d02b 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,7 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + public: // Arrow-style string column: offsets + contiguous buffer. // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] struct StringColumn { @@ -235,6 +236,12 @@ class Tablet { } size_t get_column_count() const { return schema_vec_->size(); } uint32_t get_cur_row_size() const { return cur_row_size_; } + int64_t get_timestamp(uint32_t row_index) const { + return timestamps_[row_index]; + } + bool is_null(uint32_t row_index, uint32_t col_index) const { + return bitmaps_[col_index].test(row_index); + } /** * @brief Adds a timestamp to the specified row. diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h new file mode 100644 index 000000000..f82aea038 --- /dev/null +++ b/cpp/src/common/thread_pool.h @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef COMMON_THREAD_POOL_H +#define COMMON_THREAD_POOL_H + +#ifdef ENABLE_THREADS + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace common { + +// Unified fixed-size thread pool supporting both fire-and-forget tasks +// (submit void + wait_all) and future-returning tasks (submit). +// Used by both write path (column-parallel encoding) and read path +// (column-parallel decoding). +class ThreadPool { + public: + explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) { + for (size_t i = 0; i < num_threads; i++) { + workers_.emplace_back([this] { worker_loop(); }); + } + } + + ~ThreadPool() { + { + std::lock_guard lk(mu_); + stop_ = true; + } + cv_work_.notify_all(); + for (auto& w : workers_) { + if (w.joinable()) w.join(); + } + } + + // Submit a fire-and-forget task (no return value). + void submit(std::function task) { + { + std::lock_guard lk(mu_); + tasks_.push(std::move(task)); + active_++; + } + cv_work_.notify_one(); + } + + // Submit a task that returns a value via std::future. + template + std::future::type> submit(F&& f) { + using RetType = typename std::result_of::type; + auto task = + std::make_shared>(std::forward(f)); + std::future result = task->get_future(); + { + std::lock_guard lk(mu_); + tasks_.emplace([task]() { (*task)(); }); + active_++; + } + cv_work_.notify_one(); + return result; + } + + // Block until all submitted tasks have completed. + void wait_all() { + std::unique_lock lk(mu_); + cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); }); + } + + private: + void worker_loop() { + while (true) { + std::function task; + { + std::unique_lock lk(mu_); + cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) return; + task = std::move(tasks_.front()); + tasks_.pop(); + } + task(); + { + std::lock_guard lk(mu_); + active_--; + } + cv_done_.notify_one(); + } + } + + std::vector workers_; + std::queue> tasks_; + std::mutex mu_; + std::condition_variable cv_work_; + std::condition_variable cv_done_; + bool stop_; + int active_; +}; + +} // namespace common + +#endif // ENABLE_THREADS + +#endif // COMMON_THREAD_POOL_H diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 657fcabc2..cee742f10 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1142,274 +1142,159 @@ int TsFileWriter::write_table(Tablet& tablet) { auto device_id = device_id_end_index_pair.first; int end_idx = device_id_end_index_pair.second; if (end_idx == 0) continue; - if (table_aligned_) { - SimpleVector value_chunk_writers; - TimeChunkWriter* time_chunk_writer = nullptr; - if (RET_FAIL(do_check_schema_table(device_id, tablet, - time_chunk_writer, - value_chunk_writers))) { - return ret; - } - const bool strict_page_size = - common::g_config_value_.strict_page_size_; + SimpleVector value_chunk_writers; + TimeChunkWriter* time_chunk_writer = nullptr; + if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer, + value_chunk_writers))) { + return ret; + } - std::vector field_columns; - field_columns.reserve(tablet.get_column_count()); - for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { - if (tablet.column_categories_[col] == - common::ColumnCategory::FIELD) { - field_columns.push_back(col); - } + std::vector field_columns; + field_columns.reserve(tablet.get_column_count()); + for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { + if (tablet.column_categories_[col] == + common::ColumnCategory::FIELD) { + field_columns.push_back(col); } - ASSERT(field_columns.size() == value_chunk_writers.size()); - - const bool has_varlen_field_column = [&]() { - for (uint32_t i = 0; i < field_columns.size(); i++) { - const common::TSDataType t = - tablet.schema_vec_->at(field_columns[i]).data_type_; - if (t == common::STRING || t == common::TEXT || - t == common::BLOB) { - return true; - } - } - return false; - }(); - - // Keep writers' seal-check behavior consistent across calls. - time_chunk_writer->set_enable_page_seal_if_full(strict_page_size); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - strict_page_size); + } + ASSERT(field_columns.size() == value_chunk_writers.size()); + + // Precompute page boundaries from point counts — no serial write + // needed. The first segment may be shorter if the time page already + // holds data from a previous write_table call. + const uint32_t page_max_points = std::max( + 1, common::g_config_value_.page_writer_max_point_num_); + const uint32_t si = static_cast(start_idx); + const uint32_t ei = static_cast(end_idx); + + // If the current unsealed page is already at or past capacity (from + // a previous write_table call), seal it before starting new segments. + uint32_t time_cur_points = time_chunk_writer->get_point_numer(); + if (time_cur_points >= page_max_points) { + if (time_chunk_writer->has_current_page_data()) { + if (RET_FAIL(time_chunk_writer->seal_current_page())) { + return ret; } } - - if (strict_page_size) { - // Strict: row-based insertion and force aligned page sealing - // when either time or any value page becomes full. - for (int i = start_idx; i < end_idx; i++) { - int32_t time_pages_before = - time_chunk_writer->num_of_pages(); - std::vector value_pages_before( - value_chunk_writers.size(), 0); - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - if (!IS_NULL(value_chunk_writers[k])) { - value_pages_before[k] = - value_chunk_writers[k]->num_of_pages(); - } - } - - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[i]))) { - return ret; - } - - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - const uint32_t tablet_col_idx = field_columns[k]; - if (RET_FAIL(value_write_column(value_chunk_writer, - tablet, tablet_col_idx, - i, i + 1))) { - return ret; - } - } - - if (RET_FAIL(maybe_seal_aligned_pages_together( - time_chunk_writer, value_chunk_writers, - time_pages_before, value_pages_before))) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + if (!IS_NULL(value_chunk_writers[k]) && + value_chunk_writers[k]->has_current_page_data()) { + if (RET_FAIL(value_chunk_writers[k]->seal_current_page())) { return ret; } } - } else if (!has_varlen_field_column) { - // Optimization: no string/blob/text columns, so we can - // segment by point-number and seal pages at those boundaries - // in column-based order. - const uint32_t points_per_page = - common::g_config_value_.page_writer_max_point_num_; - - time_chunk_writer->set_enable_page_seal_if_full(false); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } - - // Fill the already-unsealed time page first. - uint32_t time_cur_points = time_chunk_writer->get_point_numer(); - if (time_cur_points >= points_per_page && - time_chunk_writer->has_current_page_data()) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c]) && - value_chunk_writers[c]->has_current_page_data()) { - if (RET_FAIL(value_chunk_writers[c] - ->seal_current_page())) { - return ret; - } - } - } - time_cur_points = 0; - } - - const uint32_t first_seg_len = - (time_cur_points > 0 && time_cur_points < points_per_page) - ? (points_per_page - time_cur_points) - : points_per_page; - - // 1) Write time in segments (seal all full segments). - uint32_t seg_start = static_cast(start_idx); - uint32_t seg_len = first_seg_len; - while (static_cast(seg_start) < end_idx) { - const uint32_t seg_end = std::min( - seg_start + seg_len, static_cast(end_idx)); - if (RET_FAIL(time_write_column(time_chunk_writer, tablet, - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast(seg_start) < end_idx) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - - // 2) Write each value column (same segments). - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - seg_start = static_cast(start_idx); - seg_len = first_seg_len; - while (static_cast(seg_start) < end_idx) { - const uint32_t seg_end = - std::min(seg_start + seg_len, - static_cast(end_idx)); - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast(seg_start) < end_idx) { - if (value_chunk_writer->has_current_page_data() && - RET_FAIL( - value_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - } - } else { - // General non-strict (may have varlen STRING/TEXT/BLOB - // columns): time auto-seals to provide aligned page boundaries; - // value writers skip auto page sealing and are sealed manually - // at recorded time-page boundaries. Attention: since value-side - // auto-seal is disabled, if a varlen value page hits the memory - // threshold earlier, it may not seal immediately and will be - // sealed later at the time-page boundaries (non-strict - // sacrifices the strict page size/memory limit for - // performance). - time_chunk_writer->set_enable_page_seal_if_full(true); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } - - std::vector time_page_row_ends; - const uint32_t page_max_points = std::max( - 1, common::g_config_value_.page_writer_max_point_num_); - const uint32_t batch_rows = - static_cast(end_idx - start_idx); - time_page_row_ends.reserve(batch_rows / page_max_points + 1); - for (uint32_t r = static_cast(start_idx); - r < static_cast(end_idx); r++) { - const int32_t pages_before = - time_chunk_writer->num_of_pages(); - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[r]))) { - return ret; - } - const int32_t pages_after = - time_chunk_writer->num_of_pages(); - if (pages_after > pages_before) { - const uint32_t boundary_end = r + 1; - if (time_page_row_ends.empty() || - time_page_row_ends.back() != boundary_end) { - time_page_row_ends.push_back(boundary_end); - } - } + } + time_cur_points = 0; + } + const uint32_t first_seg_cap = + (time_cur_points > 0 && time_cur_points < page_max_points) + ? (page_max_points - time_cur_points) + : page_max_points; + + std::vector page_boundaries; // row indices where a page + // should seal + { + uint32_t pos = si; + uint32_t seg_cap = first_seg_cap; + while (pos < ei) { + uint32_t seg_end = std::min(pos + seg_cap, ei); + if (seg_end < ei) { + page_boundaries.push_back(seg_end); } + pos = seg_end; + seg_cap = page_max_points; + } + } - // Write values column-by-column and seal at recorded time - // boundaries. - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - uint32_t seg_start = static_cast(start_idx); - for (uint32_t boundary_end : time_page_row_ends) { - if (boundary_end <= seg_start) { - continue; - } - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, boundary_end))) { - return ret; - } - if (value_chunk_writer->has_current_page_data() && - RET_FAIL(value_chunk_writer->seal_current_page())) { - return ret; - } - seg_start = boundary_end; - } - if (seg_start < static_cast(end_idx)) { - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, static_cast(end_idx)))) { - return ret; - } - } - } + // We control page sealing explicitly at precomputed boundaries, so + // auto-seal must be disabled during segmented writes — otherwise a + // segment of exactly page_max_points would trigger auto-seal AND + // our explicit seal, double-sealing (sealing an empty page → crash). + // Note: with auto-seal off, the memory-based threshold + // (page_writer_max_memory_bytes_) is not enforced within a segment. + // For varlen columns (STRING/TEXT/BLOB), individual pages may exceed + // the memory limit. Each segment is still bounded by + // page_max_points rows, keeping pages within a reasonable size. + auto write_time_in_segments = [this, &tablet, &page_boundaries, si, + ei](TimeChunkWriter* tcw) -> int { + int r = E_OK; + tcw->set_enable_page_seal_if_full(false); + uint32_t seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = time_write_column(tcw, tablet, seg_start, boundary)) != + E_OK) + return r; + if ((r = tcw->seal_current_page()) != E_OK) return r; + seg_start = boundary; } - start_idx = end_idx; - } else { - MeasurementNamesFromTablet mnames_getter(tablet); - SimpleVector chunk_writers; - SimpleVector data_types; - if (RET_FAIL(do_check_schema(device_id, mnames_getter, - chunk_writers, data_types))) { + if (seg_start < ei) { + r = time_write_column(tcw, tablet, seg_start, ei); + } + tcw->set_enable_page_seal_if_full(true); + return r; + }; + + auto write_value_in_segments = [this, &tablet, &page_boundaries, si, + ei](ValueChunkWriter* vcw, + uint32_t col_idx) -> int { + int r = E_OK; + vcw->set_enable_page_seal_if_full(false); + uint32_t seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = value_write_column(vcw, tablet, col_idx, seg_start, + boundary)) != E_OK) + return r; + if (vcw->has_current_page_data() && + (r = vcw->seal_current_page()) != E_OK) + return r; + seg_start = boundary; + } + if (seg_start < ei) { + r = value_write_column(vcw, tablet, col_idx, seg_start, ei); + } + vcw->set_enable_page_seal_if_full(true); + return r; + }; + + // All columns (time + values) write the same row segments and seal + // at the same boundaries — fully parallel. +#ifdef ENABLE_THREADS + if (g_config_value_.parallel_write_enabled_) { + std::vector> futures; + futures.push_back(thread_pool_.submit( + [&write_time_in_segments, time_chunk_writer]() { + return write_time_in_segments(time_chunk_writer); + })); + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + uint32_t col_idx = field_columns[k]; + futures.push_back(thread_pool_.submit( + [&write_value_in_segments, vcw, col_idx]() { + return write_value_in_segments(vcw, col_idx); + })); + } + for (auto& f : futures) { + int r = f.get(); + if (r != E_OK && ret == E_OK) ret = r; + } + if (ret != E_OK) return ret; + } else +#endif + { + if (RET_FAIL(write_time_in_segments(time_chunk_writer))) { return ret; } - ASSERT(chunk_writers.size() == tablet.get_column_count()); - for (uint32_t c = 0; c < chunk_writers.size(); c++) { - ChunkWriter* chunk_writer = chunk_writers[c]; - if (IS_NULL(chunk_writer)) { - continue; - } - if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx, - device_id_end_index_pair.second))) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) { return ret; } } - start_idx = device_id_end_index_pair.second; } + start_idx = end_idx; } record_count_since_last_flush_ += tablet.cur_row_size_; // Reset string column buffers so the tablet can be reused for the next diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 01028e2e2..bf409879e 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,9 @@ #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" +#ifdef ENABLE_THREADS +#include "common/thread_pool.h" +#endif namespace storage { class WriteFile; @@ -194,7 +198,13 @@ class TsFileWriter { int64_t record_count_for_next_mem_check_; bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) - bool table_aligned_ = true; +#ifdef ENABLE_THREADS + static size_t get_thread_pool_size() { + int32_t count = common::g_config_value_.write_thread_count_; + return count > 0 ? static_cast(count) : size_t{1}; + } + common::ThreadPool thread_pool_{get_thread_pool_size()}; +#endif int write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, bool* col_values, diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 477f875e7..d1f3b92e4 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -20,6 +20,7 @@ #include +#include "common/global.h" #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" @@ -31,10 +32,11 @@ using namespace storage; using namespace common; -class TsFileWriterTableTest : public ::testing::Test { +class TsFileWriterTableTest : public ::testing::TestWithParam { protected: void SetUp() override { libtsfile_init(); + set_parallel_write_enabled(GetParam()); file_name_ = std::string("tsfile_writer_table_test_") + generate_random_string(10) + std::string(".tsfile"); remove(file_name_.c_str()); @@ -133,7 +135,7 @@ class TsFileWriterTableTest : public ::testing::Test { } }; -TEST_F(TsFileWriterTableTest, WriteTableTest) { +TEST_P(TsFileWriterTableTest, WriteTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -144,7 +146,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { +TEST_P(TsFileWriterTableTest, WithoutTagAndMultiPage) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(1); @@ -192,7 +194,7 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDisorderTest) { +TEST_P(TsFileWriterTableTest, WriteDisorderTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -242,7 +244,7 @@ TEST_F(TsFileWriterTableTest, WriteDisorderTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { +TEST_P(TsFileWriterTableTest, WriteTableTestMultiFlush) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 2 * 1024); @@ -255,7 +257,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistColumnTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -283,7 +285,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared(&write_file_, table_schema); @@ -295,7 +297,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { +TEST_P(TsFileWriterTableTest, WriterWithMemoryThreshold) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 256 * 1024 * 1024); @@ -305,7 +307,7 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { delete table_schema; } -TEST_F(TsFileWriterTableTest, EmptyTagWrite) { +TEST_P(TsFileWriterTableTest, EmptyTagWrite) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(3); @@ -361,7 +363,7 @@ TEST_F(TsFileWriterTableTest, EmptyTagWrite) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { +TEST_P(TsFileWriterTableTest, WritehDataTypeMisMatch) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared( &write_file_, table_schema, 256 * 1024 * 1024); @@ -412,7 +414,7 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { tsfile_table_writer_->close(); } -TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { +TEST_P(TsFileWriterTableTest, WriteAndReadSimple) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(2); @@ -467,7 +469,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { delete table_schema; } -TEST_F(TsFileWriterTableTest, DuplicateColumnName) { +TEST_P(TsFileWriterTableTest, DuplicateColumnName) { std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(3); @@ -505,7 +507,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { +TEST_P(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { std::vector measurement_schemas; std::vector column_categories; for (int i = 0; i < 3; i++) { @@ -637,7 +639,7 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { +TEST_P(TsFileWriterTableTest, MultiDeviceMultiFields) { common::config_set_max_degree_of_index_node(5); auto table_schema = gen_table_schema(0, 1, 100); auto tsfile_table_writer_ = @@ -696,7 +698,7 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { +TEST_P(TsFileWriterTableTest, WriteDataWithEmptyField) { std::vector measurement_schemas; std::vector column_categories; for (int i = 0; i < 3; i++) { @@ -773,7 +775,7 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDatatypes) { +TEST_P(TsFileWriterTableTest, MultiDatatypes) { std::vector measurement_schemas; std::vector column_categories; @@ -877,7 +879,7 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, DiffCodecTypes) { +TEST_P(TsFileWriterTableTest, DiffCodecTypes) { std::vector measurement_schemas; std::vector column_categories; @@ -985,7 +987,7 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { +TEST_P(TsFileWriterTableTest, EncodingConfigIntegration) { // 1. Test setting global compression type ASSERT_EQ(E_OK, set_global_compression(SNAPPY)); @@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { } #ifdef ENABLE_MEM_STAT -TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { +TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { TableSchema* table_schema = gen_table_schema(0, 2, 3); auto tsfile_table_writer = std::make_shared(&write_file_, table_schema); @@ -1172,4 +1174,9 @@ TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { delete table_schema; } -#endif \ No newline at end of file +#endif + +INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, + ::testing::Values(false)); +INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, + ::testing::Values(true)); \ No newline at end of file