Skip to content

Commit 9207296

Browse files
committed
support write parallel.
1 parent e2fba46 commit 9207296

File tree

10 files changed

+432
-275
lines changed

10 files changed

+432
-275
lines changed

cpp/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ if (ENABLE_ZLIB)
150150
add_definitions(-DENABLE_GZIP)
151151
endif()
152152

153+
option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON)
154+
message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")
155+
156+
if (ENABLE_THREADS)
157+
add_definitions(-DENABLE_THREADS)
158+
endif()
159+
153160
option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF)
154161
message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}")
155162

cpp/src/common/config/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ typedef struct ConfigValue {
4646
TSEncoding double_encoding_type_;
4747
TSEncoding string_encoding_type_;
4848
CompressionType default_compression_type_;
49+
bool parallel_write_enabled_;
50+
int32_t write_thread_count_;
4951
// When true, aligned writer enforces page size limit strictly by
5052
// interleaving time/value writes and sealing pages together when any side
5153
// becomes full.

cpp/src/common/global.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,15 @@ void init_config_value() {
5454
g_config_value_.int64_encoding_type_ = TS_2DIFF;
5555
g_config_value_.float_encoding_type_ = GORILLA;
5656
g_config_value_.double_encoding_type_ = GORILLA;
57+
g_config_value_.string_encoding_type_ = PLAIN;
5758
// Default compression type is LZ4
5859
#ifdef ENABLE_LZ4
5960
g_config_value_.default_compression_type_ = LZ4;
6061
#else
6162
g_config_value_.default_compression_type_ = UNCOMPRESSED;
6263
#endif
64+
g_config_value_.parallel_write_enabled_ = true;
65+
g_config_value_.write_thread_count_ = 6;
6366
// Enforce aligned page size limits strictly by default.
6467
g_config_value_.strict_page_size_ = true;
6568
}

cpp/src/common/global.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() {
163163
return static_cast<uint8_t>(g_config_value_.default_compression_type_);
164164
}
165165

166+
FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
167+
g_config_value_.parallel_write_enabled_ = enabled;
168+
}
169+
170+
FORCE_INLINE bool get_parallel_write_enabled() {
171+
return g_config_value_.parallel_write_enabled_;
172+
}
173+
174+
FORCE_INLINE int set_write_thread_count(int32_t count) {
175+
if (count < 1 || count > 64) return E_INVALID_ARG;
176+
g_config_value_.write_thread_count_ = count;
177+
return E_OK;
178+
}
179+
166180
extern int init_common();
167181
extern bool is_timestamp_column_name(const char* time_col_name);
168182
extern void cols_to_json(ByteStream* byte_stream,

cpp/src/common/tablet.cc

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
#include "tablet.h"
2121

22+
#include <algorithm>
2223
#include <cstdlib>
24+
#include <numeric>
2325

2426
#include "allocator/alloc_base.h"
2527
#include "datatype/date_converter.h"
@@ -491,6 +493,135 @@ void Tablet::set_column_categories(
491493
}
492494
}
493495

496+
namespace {
497+
498+
template <typename T>
499+
void permute_array(T* arr, const std::vector<uint32_t>& perm, uint32_t n) {
500+
std::vector<T> tmp(n);
501+
for (uint32_t i = 0; i < n; i++) tmp[i] = arr[perm[i]];
502+
std::copy(tmp.begin(), tmp.end(), arr);
503+
}
504+
505+
void permute_string_column(Tablet::StringColumn* sc, BitMap& bm,
506+
const std::vector<uint32_t>& perm, uint32_t n,
507+
uint32_t max_rows) {
508+
Tablet::StringColumn tmp;
509+
tmp.init(max_rows, sc->buf_used > 0 ? sc->buf_used : 64);
510+
for (uint32_t i = 0; i < n; i++) {
511+
uint32_t r = perm[i];
512+
if (bm.test(r)) {
513+
// Null row — write a zero-length entry to keep offsets valid.
514+
tmp.append(i, "", 0);
515+
} else {
516+
int32_t off = sc->offsets[r];
517+
uint32_t len =
518+
static_cast<uint32_t>(sc->offsets[r + 1] - off);
519+
tmp.append(i, sc->buffer + off, len);
520+
}
521+
}
522+
// Swap contents.
523+
std::swap(sc->offsets, tmp.offsets);
524+
std::swap(sc->buffer, tmp.buffer);
525+
std::swap(sc->buf_capacity, tmp.buf_capacity);
526+
std::swap(sc->buf_used, tmp.buf_used);
527+
tmp.destroy();
528+
}
529+
530+
void permute_bitmap(BitMap& bm, const std::vector<uint32_t>& perm,
531+
uint32_t n) {
532+
if (!bm.get_bitmap()) return;
533+
uint32_t size_bytes = bm.get_size();
534+
// Save original bits.
535+
std::vector<char> orig(bm.get_bitmap(), bm.get_bitmap() + size_bytes);
536+
// Clear all bits (= all non-null).
537+
bm.clear_all();
538+
// Re-set null bits through the permutation.
539+
for (uint32_t i = 0; i < n; i++) {
540+
uint32_t src = perm[i];
541+
if (orig[src >> 3] & (1 << (src & 7))) {
542+
bm.set(i);
543+
}
544+
}
545+
}
546+
547+
} // anonymous namespace
548+
549+
void Tablet::sort_by_device() {
550+
if (id_column_indexes_.empty() || cur_row_size_ <= 1) return;
551+
552+
const uint32_t n = cur_row_size_;
553+
554+
// Build permutation sorted by tag column values (stable sort keeps
555+
// timestamp order within each device).
556+
std::vector<uint32_t> perm(n);
557+
std::iota(perm.begin(), perm.end(), 0);
558+
559+
std::stable_sort(perm.begin(), perm.end(), [this](uint32_t a, uint32_t b) {
560+
for (int idx : id_column_indexes_) {
561+
bool a_null = bitmaps_[idx].test(a);
562+
bool b_null = bitmaps_[idx].test(b);
563+
if (a_null != b_null) return a_null > b_null; // null sorts last
564+
if (a_null) continue; // both null — equal on this column
565+
const StringColumn& sc = *value_matrix_[idx].string_col;
566+
int32_t a_off = sc.offsets[a];
567+
uint32_t a_len = static_cast<uint32_t>(sc.offsets[a + 1] - a_off);
568+
int32_t b_off = sc.offsets[b];
569+
uint32_t b_len = static_cast<uint32_t>(sc.offsets[b + 1] - b_off);
570+
uint32_t min_len = std::min(a_len, b_len);
571+
int cmp = (min_len > 0)
572+
? memcmp(sc.buffer + a_off, sc.buffer + b_off, min_len)
573+
: 0;
574+
if (cmp != 0) return cmp < 0;
575+
if (a_len != b_len) return a_len < b_len;
576+
}
577+
return false;
578+
});
579+
580+
// Check if already sorted.
581+
bool sorted = true;
582+
for (uint32_t i = 0; i < n && sorted; i++) {
583+
if (perm[i] != i) sorted = false;
584+
}
585+
if (sorted) return;
586+
587+
// Apply permutation to timestamps.
588+
permute_array(timestamps_, perm, n);
589+
590+
// Apply permutation to each column.
591+
uint32_t col_count = static_cast<uint32_t>(schema_vec_->size());
592+
for (uint32_t c = 0; c < col_count; c++) {
593+
TSDataType dt = schema_vec_->at(c).data_type_;
594+
switch (dt) {
595+
case BOOLEAN:
596+
permute_array(value_matrix_[c].bool_data, perm, n);
597+
break;
598+
case INT32:
599+
case DATE:
600+
permute_array(value_matrix_[c].int32_data, perm, n);
601+
break;
602+
case INT64:
603+
case TIMESTAMP:
604+
permute_array(value_matrix_[c].int64_data, perm, n);
605+
break;
606+
case FLOAT:
607+
permute_array(value_matrix_[c].float_data, perm, n);
608+
break;
609+
case DOUBLE:
610+
permute_array(value_matrix_[c].double_data, perm, n);
611+
break;
612+
case STRING:
613+
case TEXT:
614+
case BLOB:
615+
permute_string_column(value_matrix_[c].string_col, bitmaps_[c],
616+
perm, n, max_row_num_);
617+
break;
618+
default:
619+
break;
620+
}
621+
permute_bitmap(bitmaps_[c], perm, n);
622+
}
623+
}
624+
494625
void Tablet::reset_string_columns() {
495626
size_t schema_count = schema_vec_->size();
496627
for (size_t c = 0; c < schema_count; c++) {

cpp/src/common/tablet.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class TabletColIterator;
4646
* with their associated metadata such as column names and types.
4747
*/
4848
class Tablet {
49+
public:
4950
// Arrow-style string column: offsets + contiguous buffer.
5051
// string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
5152
struct StringColumn {
@@ -284,6 +285,10 @@ class Tablet {
284285

285286
void set_column_categories(
286287
const std::vector<common::ColumnCategory>& column_categories);
288+
// Sort rows so that rows belonging to the same device (same TAG column
289+
// values) are contiguous. Stable sort: preserves timestamp order within
290+
// each device. No-op when there are no TAG columns or ≤1 rows.
291+
void sort_by_device();
287292
std::shared_ptr<IDeviceID> get_device_id(int i) const;
288293
std::vector<uint32_t> find_all_device_boundaries() const;
289294

cpp/src/common/thread_pool.h

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef COMMON_THREAD_POOL_H
20+
#define COMMON_THREAD_POOL_H
21+
22+
#ifdef ENABLE_THREADS
23+
24+
#include <condition_variable>
25+
#include <functional>
26+
#include <future>
27+
#include <mutex>
28+
#include <queue>
29+
#include <thread>
30+
#include <vector>
31+
32+
namespace common {
33+
34+
// Unified fixed-size thread pool supporting both fire-and-forget tasks
35+
// (submit void + wait_all) and future-returning tasks (submit<F>).
36+
// Used by both write path (column-parallel encoding) and read path
37+
// (column-parallel decoding).
38+
class ThreadPool {
39+
public:
40+
explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) {
41+
for (size_t i = 0; i < num_threads; i++) {
42+
workers_.emplace_back([this] { worker_loop(); });
43+
}
44+
}
45+
46+
~ThreadPool() {
47+
{
48+
std::lock_guard<std::mutex> lk(mu_);
49+
stop_ = true;
50+
}
51+
cv_work_.notify_all();
52+
for (auto& w : workers_) {
53+
if (w.joinable()) w.join();
54+
}
55+
}
56+
57+
// Submit a fire-and-forget task (no return value).
58+
void submit(std::function<void()> task) {
59+
{
60+
std::lock_guard<std::mutex> lk(mu_);
61+
tasks_.push(std::move(task));
62+
active_++;
63+
}
64+
cv_work_.notify_one();
65+
}
66+
67+
// Submit a task that returns a value via std::future.
68+
template <typename F>
69+
std::future<typename std::result_of<F()>::type> submit(F&& f) {
70+
using RetType = typename std::result_of<F()>::type;
71+
auto task =
72+
std::make_shared<std::packaged_task<RetType()>>(std::forward<F>(f));
73+
std::future<RetType> result = task->get_future();
74+
{
75+
std::lock_guard<std::mutex> lk(mu_);
76+
tasks_.emplace([task]() { (*task)(); });
77+
active_++;
78+
}
79+
cv_work_.notify_one();
80+
return result;
81+
}
82+
83+
// Block until all submitted tasks have completed.
84+
void wait_all() {
85+
std::unique_lock<std::mutex> lk(mu_);
86+
cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); });
87+
}
88+
89+
private:
90+
void worker_loop() {
91+
while (true) {
92+
std::function<void()> task;
93+
{
94+
std::unique_lock<std::mutex> lk(mu_);
95+
cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
96+
if (stop_ && tasks_.empty()) return;
97+
task = std::move(tasks_.front());
98+
tasks_.pop();
99+
}
100+
task();
101+
{
102+
std::lock_guard<std::mutex> lk(mu_);
103+
active_--;
104+
}
105+
cv_done_.notify_one();
106+
}
107+
}
108+
109+
std::vector<std::thread> workers_;
110+
std::queue<std::function<void()>> tasks_;
111+
std::mutex mu_;
112+
std::condition_variable cv_work_;
113+
std::condition_variable cv_done_;
114+
bool stop_;
115+
int active_;
116+
};
117+
118+
} // namespace common
119+
120+
#endif // ENABLE_THREADS
121+
122+
#endif // COMMON_THREAD_POOL_H

0 commit comments

Comments
 (0)