Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 89 additions & 24 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <Operators/UnorderedSourceOp.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Stream/Ctx.h>
#include <Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
#include <Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
#include <Storages/DeltaMerge/ScanContext.h>
Expand Down Expand Up @@ -917,6 +918,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSelectQueryInfos()
{
std::unordered_map<TableID, SelectQueryInfo> ret;
bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat;
// Shared read queue for all physical tables
auto shared_read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(max_streams, log);
auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo {
SelectQueryInfo query_info;
/// to avoid null pointer exception
Expand All @@ -934,6 +938,16 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.req_id = fmt::format("{} table_id={}", log->identifier(), table_id);
query_info.keep_order = table_scan.keepOrder();
query_info.is_fast_scan = table_scan.isFastScan();
if (use_unordered_concat)
{
query_info.read_queue = shared_read_queue;
}
else
{
// Different read queue for different physical table
query_info.read_queue
= std::make_shared<DM::ActiveSegmentReadTaskQueue>(max_streams, Logger::get(query_info.req_id));
}
return query_info;
};
RUNTIME_CHECK_MSG(mvcc_query_info->scan_context != nullptr, "Unexpected null scan_context");
Expand Down Expand Up @@ -1062,16 +1076,16 @@ Int32 getMaxAllowRetryForLocalRead(const SelectQueryInfo & query_info)
}
} // namespace

DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
const TableID & table_id,
const SelectQueryInfo & query_info,
DAGPipeline & pipeline,
DM::Remote::DisaggReadSnapshotPtr & disagg_snap,
size_t max_block_size)
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
return table_snap;
return;

assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end());
auto & storage = storages_with_structure_lock[table_id].storage;
Expand All @@ -1089,6 +1103,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
{
try
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
if (!dag_context.is_disaggregated_task)
{
// build local inputstreams
Expand Down Expand Up @@ -1116,6 +1131,12 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams`
// may contain different data other than expected.
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log);

// Only after all streams are built and validated successfully, we add the table snapshot to disagg_snap
if (table_snap)
{
disagg_snap->addTask(table_id, std::move(table_snap));
}
break;
}
catch (RegionException & e)
Expand All @@ -1126,6 +1147,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
{
// clean all streams from local because we are not sure the correctness of those streams
pipeline.streams.clear();
// clean table task from read_queue
query_info.read_queue->resetTableTask(table_id);
if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry)))
continue; // next retry to read from local storage
else
Expand All @@ -1145,20 +1168,19 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
throw;
}
}
return table_snap;
}

DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalExecForPhysicalTable(
void DAGStorageInterpreter::buildLocalExecForPhysicalTable(
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
const TableID & table_id,
const SelectQueryInfo & query_info,
DM::Remote::DisaggReadSnapshotPtr & disagg_snap,
size_t max_block_size)
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
return table_snap;
return;

RUNTIME_CHECK(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end());
auto & storage = storages_with_structure_lock[table_id].storage;
Expand All @@ -1169,6 +1191,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
{
try
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
if (!dag_context.is_disaggregated_task)
{
storage->read(
Expand Down Expand Up @@ -1199,6 +1222,13 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `sourceOps`
// may contain different data other than expected.
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log);

// Only after all sourceOps are built and verified, we add the table snapshot to disagg_snap
if (table_snap != nullptr)
{
RUNTIME_CHECK(disagg_snap != nullptr);
disagg_snap->addTask(table_id, std::move(table_snap));
}
break;
}
catch (RegionException & e)
Expand All @@ -1209,6 +1239,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
{
// clean all operator from local because we are not sure the correctness of those operators
group_builder.reset();
// clean table task from read_queue
query_info.read_queue->resetTableTask(table_id);
if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry)))
continue;
else
Expand All @@ -1228,7 +1260,6 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
throw;
}
}
return table_snap;
}

void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size)
Expand All @@ -1239,6 +1270,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
return;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read");
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
std::shared_ptr<MultiPartitionStreamPool> stream_pool
Expand All @@ -1250,12 +1282,17 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
DAGPipeline current_pipeline;
const TableID physical_table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
auto table_snap
= buildLocalStreamsForPhysicalTable(physical_table_id, query_info, current_pipeline, max_block_size);
if (table_snap)
{
disaggregated_snap->addTask(physical_table_id, std::move(table_snap));
}
RUNTIME_CHECK_MSG(
query_info.read_queue != nullptr,
"read_queue should not be null, table_id={}",
physical_table_id);
buildLocalStreamsForPhysicalTable(
physical_table_id,
query_info,
current_pipeline,
disaggregated_snap,
max_block_size);

if (has_multiple_partitions)
stream_pool->addPartitionStreams(current_pipeline.streams);
else
Expand All @@ -1265,6 +1302,10 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
current_pipeline.streams.end());
}

assert(!table_query_infos.empty()); // check at start of this function
auto read_queue = table_query_infos.begin()->second.read_queue;
read_queue->finishQueueIfEmpty();

LOG_DEBUG(
log,
"local streams built, is_disaggregated_task={} snap_id={}",
Expand Down Expand Up @@ -1306,30 +1347,54 @@ void DAGStorageInterpreter::buildLocalExec(
return;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read");
const bool has_multiple_partitions = table_query_infos.size() > 1;

const bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat;
ConcatBuilderPool builder_pool{max_streams, context.getSettingsRef().dt_enable_unordered_concat};

auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>();
for (const auto & table_query_info : table_query_infos)
{
PipelineExecGroupBuilder builder;
const TableID physical_table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
auto table_snap
= buildLocalExecForPhysicalTable(exec_context, builder, physical_table_id, query_info, max_block_size);
if (table_snap)
{
disaggregated_snap->addTask(physical_table_id, std::move(table_snap));
}
RUNTIME_CHECK_MSG(
query_info.read_queue != nullptr,
"read_queue should not be null, table_id={}",
physical_table_id);
PipelineExecGroupBuilder builder;
buildLocalExecForPhysicalTable(
exec_context,
builder,
physical_table_id,
query_info,
disaggregated_snap,
max_block_size);

if (has_multiple_partitions)
builder_pool.add(builder);
else
group_builder.merge(std::move(builder));
}

LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task);
if (use_unordered_concat)
{
assert(!table_query_infos.empty()); // check at start of this function
auto read_queue = table_query_infos.begin()->second.read_queue;
read_queue->finishQueueIfEmpty();
exec_context.addStorageTaskQueue(read_queue);
}
else
{
for (const auto & table_query_info : table_query_infos)
{
const SelectQueryInfo & query_info = table_query_info.second;
query_info.read_queue->finishQueueIfEmpty();
exec_context.addStorageTaskQueue(query_info.read_queue);
}
}

LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task);
if (dag_context.is_disaggregated_task)
{
// register the snapshot to manager
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ class DAGStorageInterpreter
const RegionException & e,
int num_allow_retry);

DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalStreamsForPhysicalTable(
void buildLocalStreamsForPhysicalTable(
const TableID & table_id,
const SelectQueryInfo & query_info,
DAGPipeline & pipeline,
DM::Remote::DisaggReadSnapshotPtr & disagg_snap,
size_t max_block_size);

DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalExecForPhysicalTable(
void buildLocalExecForPhysicalTable(
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
const TableID & table_id,
const SelectQueryInfo & query_info,
DM::Remote::DisaggReadSnapshotPtr & disagg_snap,
size_t max_block_size);

void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size);
Expand Down
21 changes: 21 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Flash/Pipeline/Schedule/Tasks/OneTimeNotifyFuture.h>
#include <Operators/CTE.h>
#include <Operators/SharedQueue.h>
#include <Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h>

#include <exception>

Expand Down Expand Up @@ -178,6 +179,7 @@ void PipelineExecutorContext::cancel()
bool origin_value = false;
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release))
{
cancelStorageTaskQueues();
cancelSharedQueues();
cancelOneTimeFutures();
if (likely(dag_context))
Expand Down Expand Up @@ -234,6 +236,25 @@ void PipelineExecutorContext::cancelSharedQueues()
shared_queue->cancel();
}

/// Registers a storage read task queue with this executor context so that
/// `cancelStorageTaskQueues` can finish it later.
/// Fails the runtime check (message "query has been cancelled.") when the
/// context is already cancelled; the check is done while holding `mu`.
void PipelineExecutorContext::addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue)
{
    std::lock_guard guard(mu);
    // Refuse new registrations once the query is cancelled.
    RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled.");
    // Callers are expected to pass a valid queue (debug-only check).
    assert(storage_task_queue != nullptr);
    storage_task_queues.emplace_back(storage_task_queue);
}

void PipelineExecutorContext::cancelStorageTaskQueues()
{
std::vector<DM::ActiveSegmentReadTaskQueuePtr> tmp;
{
std::lock_guard lock(mu);
std::swap(tmp, storage_task_queues);
}
for (const auto & storage_task_queue : tmp)
storage_task_queue->finishQueue();
}

void PipelineExecutorContext::addOneTimeFuture(const OneTimeNotifyFuturePtr & future)
{
std::lock_guard lock(mu);
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class OneTimeNotifyFuture;
using OneTimeNotifyFuturePtr = std::shared_ptr<OneTimeNotifyFuture>;
class DAGContext;

// Forward declaration of DM::ActiveSegmentReadTaskQueue and its shared-pointer
// alias, so this header can refer to the queue pointer type without including
// the queue's own header.
namespace DM
{
class ActiveSegmentReadTaskQueue;
using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>;
} // namespace DM


class PipelineExecutorContext : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -146,6 +153,8 @@ class PipelineExecutorContext : private boost::noncopyable

void addOneTimeFuture(const OneTimeNotifyFuturePtr & future);

void addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue);

private:
bool setExceptionPtr(const std::exception_ptr & exception_ptr_);

Expand All @@ -160,6 +169,8 @@ class PipelineExecutorContext : private boost::noncopyable

void cancelOneTimeFutures();

void cancelStorageTaskQueues();

void cancelResultQueueIfNeed();

private:
Expand Down Expand Up @@ -196,5 +207,7 @@ class PipelineExecutorContext : private boost::noncopyable
std::vector<SharedQueuePtr> shared_queues;

std::vector<OneTimeNotifyFuturePtr> one_time_futures;

std::vector<DM::ActiveSegmentReadTaskQueuePtr> storage_task_queues;
};
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ struct Settings
M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingDouble, dt_page_gc_threshold_raft_data, 0.05, "Max valid rate of deciding to do a GC for BlobFile storing PageData in PageStorage") \
M(SettingBool, dt_enable_unordered_concat, false, "") \
M(SettingInt64, enable_version_chain, 1, "Enable version chain or not: 0 - disable, 1 - enabled. " \
"More details are in the comments of `enum class VersionChainMode`." \
"Modifying this configuration requires a restart to reset the in-memory state.") \
Expand Down
Loading