Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ if (ENABLE_ZLIB)
add_definitions(-DENABLE_GZIP)
endif()

option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON)
message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")

if (ENABLE_THREADS)
add_definitions(-DENABLE_THREADS)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ENABLE_THREADS adds a compile definition, but the build does not link against the platform thread library (e.g., -pthread / Threads::Threads). On many toolchains this will cause unresolved symbols when using std::thread. When ENABLE_THREADS is ON, find_package(Threads REQUIRED) and link Threads::Threads to the relevant targets (at least tsfile and any object libs that end up in it).

Suggested change
add_definitions(-DENABLE_THREADS)
add_definitions(-DENABLE_THREADS)
find_package(Threads REQUIRED)
link_libraries(Threads::Threads)

Copilot uses AI. Check for mistakes.
endif()

option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF)
message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}")

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/common/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef struct ConfigValue {
TSEncoding double_encoding_type_;
TSEncoding string_encoding_type_;
CompressionType default_compression_type_;
bool parallel_write_enabled_;
int32_t write_thread_count_;
// When true, aligned writer enforces page size limit strictly by
// interleaving time/value writes and sealing pages together when any side
// becomes full.
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ void init_config_value() {
g_config_value_.int64_encoding_type_ = TS_2DIFF;
g_config_value_.float_encoding_type_ = GORILLA;
g_config_value_.double_encoding_type_ = GORILLA;
g_config_value_.string_encoding_type_ = PLAIN;
// Default compression type is LZ4
#ifdef ENABLE_LZ4
g_config_value_.default_compression_type_ = LZ4;
#else
g_config_value_.default_compression_type_ = UNCOMPRESSED;
#endif
g_config_value_.parallel_write_enabled_ = true;
g_config_value_.write_thread_count_ = 6;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to detect the number of cores at runtime?
We may forget to disable parallel-write for end scenarios, and it would be better to auto-disable parallel-write when there is only one core.

// Enforce aligned page size limits strictly by default.
g_config_value_.strict_page_size_ = true;
}
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/common/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() {
return static_cast<uint8_t>(g_config_value_.default_compression_type_);
}

// Stores `enabled` into the parallel_write_enabled_ flag of the global
// configuration (g_config_value_). No other setting is touched.
FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
g_config_value_.parallel_write_enabled_ = enabled;
}

// Returns the current value of the parallel_write_enabled_ flag of the global
// configuration. The configured write thread count is not consulted here.
FORCE_INLINE bool get_parallel_write_enabled() {
return g_config_value_.parallel_write_enabled_;
}
Comment on lines +170 to +172
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&& g_config_value_.write_thread_count_ != 1?


// Updates the write thread count held in the global configuration.
//
// Only values in the inclusive range [1, 64] are accepted. An out-of-range
// value leaves the configuration untouched and yields E_INVALID_ARG;
// otherwise the value is stored and E_OK is returned.
//
// NOTE(review): review feedback on this change states that writers which
// already built their thread pool do not pick up later updates -- confirm,
// and if so call this before constructing any writer.
FORCE_INLINE int set_write_thread_count(int32_t count) {
    const bool in_range = (count >= 1) && (count <= 64);
    if (!in_range) {
        return E_INVALID_ARG;
    }
    g_config_value_.write_thread_count_ = count;
    return E_OK;
}
Comment on lines +174 to +178
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_write_thread_count() updates the global config, but TsFileWriter's thread_pool_ is constructed once and never resized. As written, calling this at runtime will not affect existing writers and may mislead API consumers. Either document that it must be called before constructing writers, or adjust the writer/pool to honor runtime updates.

Copilot uses AI. Check for mistakes.

extern int init_common();
extern bool is_timestamp_column_name(const char* time_col_name);
extern void cols_to_json(ByteStream* byte_stream,
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TabletColIterator;
* with their associated metadata such as column names and types.
*/
class Tablet {
public:
// Arrow-style string column: offsets + contiguous buffer.
// string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
struct StringColumn {
Expand Down Expand Up @@ -235,6 +236,12 @@ class Tablet {
}
size_t get_column_count() const { return schema_vec_->size(); }
uint32_t get_cur_row_size() const { return cur_row_size_; }
/**
 * @brief Returns the timestamp stored for the given row.
 *
 * No bounds check is performed here; the caller must pass a valid row index.
 *
 * @param row_index Index into the timestamp array.
 * @return The value of timestamps_[row_index].
 */
int64_t get_timestamp(uint32_t row_index) const {
return timestamps_[row_index];
}
/**
 * @brief Tests the bitmap bit of one cell.
 *
 * Returns the result of testing bit row_index in the bitmap that belongs to
 * column col_index. Neither index is range-checked here.
 * NOTE(review): presumably a set bit marks the cell as null -- confirm
 * against the bitmap's semantics.
 *
 * @param row_index Row of the cell.
 * @param col_index Column of the cell (selects the bitmap).
 * @return The value reported by the bitmap for that cell.
 */
bool is_null(uint32_t row_index, uint32_t col_index) const {
return bitmaps_[col_index].test(row_index);
}

/**
* @brief Adds a timestamp to the specified row.
Expand Down
122 changes: 122 additions & 0 deletions cpp/src/common/thread_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_THREAD_POOL_H
#define COMMON_THREAD_POOL_H

#ifdef ENABLE_THREADS

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadPool uses std::result_of in the submit(F&&) template but the header does not include <type_traits>, where std::result_of is defined. This makes the header non-self-contained and can fail to compile depending on transitive includes. Add the missing include (or switch to std::invoke_result if/when the project moves off C++11).

Suggested change
#include <thread>
#include <thread>
#include <type_traits>

Copilot uses AI. Check for mistakes.
#include <vector>

namespace common {

// Unified fixed-size thread pool used by both the write path (column-parallel
// encoding) and the read path (column-parallel decoding).
//
// Two submission styles are supported:
//   * submit(std::function<void()>) followed by wait_all() for
//     fire-and-forget tasks;
//   * submit(F&&) for tasks whose result (or exception) is delivered through
//     a std::future.
//
// All public member functions may be called from any thread. The pool is
// neither copyable nor movable because every worker captures `this`.
class ThreadPool {
   public:
    // Starts the worker threads. A request for zero workers is promoted to
    // one: with no worker at all, submitted tasks would never run and
    // wait_all() / future::get() would block forever.
    explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) {
        const size_t worker_count = (num_threads == 0) ? 1 : num_threads;
        workers_.reserve(worker_count);
        for (size_t i = 0; i < worker_count; i++) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Requests shutdown and joins every worker. Tasks that are still queued
    // are executed before the workers exit.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stop_ = true;
        }
        cv_work_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
    }

    // Submit a fire-and-forget task (no return value). An exception thrown by
    // the task is swallowed by the worker; such tasks must report failures
    // through their own state.
    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.push(std::move(task));
            active_++;
        }
        cv_work_.notify_one();
    }

    // Submit a task that returns a value via std::future. An exception thrown
    // by the task is captured and rethrown by future::get().
    // The result type is spelled with decltype/declval instead of
    // std::result_of, which is deprecated in C++17 and removed in C++20.
    template <typename F>
    auto submit(F&& f) -> std::future<decltype(std::declval<F>()())> {
        using RetType = decltype(std::declval<F>()());
        // std::function requires a copyable callable, while packaged_task is
        // move-only, hence the shared_ptr indirection.
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::forward<F>(f));
        std::future<RetType> result = task->get_future();
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.emplace([task]() { (*task)(); });
            active_++;
        }
        cv_work_.notify_one();
        return result;
    }

    // Block until all submitted tasks have completed.
    void wait_all() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); });
    }

   private:
    // Body of every worker thread: pops tasks until shutdown is requested and
    // the queue has been drained.
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            try {
                task();
            } catch (...) {
                // An exception must not leave the thread entry point (that
                // would call std::terminate), and active_ still has to be
                // decremented below or wait_all() would block forever.
            }
            bool drained = false;
            {
                std::lock_guard<std::mutex> lk(mu_);
                drained = (--active_ == 0);
            }
            // active_ counts queued plus running tasks, so it reaches zero
            // exactly when wait_all() may return. notify_all wakes every
            // thread that is blocked in wait_all(), not just one of them.
            if (drained) cv_done_.notify_all();
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mu_;                    // guards tasks_, stop_ and active_
    std::condition_variable cv_work_;  // signalled on new task or shutdown
    std::condition_variable cv_done_;  // signalled when active_ reaches zero
    bool stop_;
    size_t active_;  // tasks submitted but not yet finished
};

} // namespace common

#endif // ENABLE_THREADS

#endif // COMMON_THREAD_POOL_H
Loading
Loading