Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions src/backend/distributed/executor/adaptive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
#include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/sorted_merge.h"
#include "distributed/stats/stat_counters.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_identifier.h"
Expand Down Expand Up @@ -315,6 +316,15 @@ typedef struct DistributedExecution
* fail, such as CREATE INDEX CONCURRENTLY.
*/
bool localExecutionSupported;

/*
* Sorted merge: when useSortedMerge is true, worker results are routed
* to per-task tuple stores. After execution completes, these stores are
* k-way merged into the final scanState->tuplestorestate.
*/
bool useSortedMerge;
Tuplestorestate **perTaskStores;
int perTaskStoreCount;
} DistributedExecution;


Expand Down Expand Up @@ -641,7 +651,10 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
TransactionProperties *
xactProperties,
List *jobIdList,
bool localExecutionSupported);
bool localExecutionSupported,
bool useSortedMerge,
Tuplestorestate **perTaskStores,
int perTaskStoreCount);
static TransactionProperties DecideTaskListTransactionProperties(RowModifyLevel
modLevel,
List *taskList,
Expand Down Expand Up @@ -799,12 +812,45 @@ AdaptiveExecutor(CitusScanState *scanState)
/* Reset Task fields that are only valid for a single execution */
ResetExplainAnalyzeData(taskList);

scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
TupleDestination *defaultTupleDest = NULL;

/*
* When sorted merge is active, route worker results into per-task tuple
* stores. After execution completes, these stores are k-way merged into
* the final scanState->tuplestorestate.
*
* useSortedMerge is a plan-time decision — if the plan says merge, the
* executor must merge, because the combine query plan has no Sort node
* above us. Skipping the merge would produce silently unsorted output.
*
* This applies even under EXPLAIN ANALYZE: the ExplainAnalyzeDestination
* wrapper forwards data tuples (queryNumber == 0) to the per-task
* dispatch, which routes them to the correct per-task store. Plan-fetch
* tuples (queryNumber == 1) are handled entirely within
* ExplainAnalyzeDestPutTuple and never reach the per-task dispatch.
*/
Tuplestorestate **perTaskStores = NULL;
int perTaskStoreCount = 0;

if (distributedPlan->useSortedMerge)
{
TupleDestinationStats *sharedStats = palloc0(sizeof(TupleDestinationStats));
defaultTupleDest = CreatePerTaskDispatchDest(taskList, tupleDescriptor,
sharedStats,
&perTaskStores,
&perTaskStoreCount);

/* final tuplestore created after merge */
scanState->tuplestorestate = NULL;
}
else
{
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
}

bool localExecutionSupported = true;

Expand Down Expand Up @@ -865,7 +911,10 @@ AdaptiveExecutor(CitusScanState *scanState)
defaultTupleDest,
&xactProperties,
jobIdList,
localExecutionSupported);
localExecutionSupported,
distributedPlan->useSortedMerge,
perTaskStores,
perTaskStoreCount);

/*
* Make sure that we acquire the appropriate locks even if the local tasks
Expand Down Expand Up @@ -897,6 +946,30 @@ AdaptiveExecutor(CitusScanState *scanState)

FinishDistributedExecution(execution);

/*
* When sorted merge is active, k-way merge the per-task stores into
* the final tuplestore. This produces globally sorted output that the
* existing ReturnTupleFromTuplestore() path can read unchanged.
*/
if (execution->useSortedMerge && execution->perTaskStoreCount > 0)
{
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

MergePerTaskStoresIntoFinalStore(scanState->tuplestorestate,
execution->perTaskStores,
execution->perTaskStoreCount,
distributedPlan->sortedMergeKeys,
distributedPlan->sortedMergeKeyCount,
tupleDescriptor);

/* free per-task stores — they are no longer needed */
for (int i = 0; i < execution->perTaskStoreCount; i++)
{
tuplestore_end(execution->perTaskStores[i]);
}
}

if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
{
SortTupleStore(scanState);
Expand Down Expand Up @@ -1105,7 +1178,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
executionParams->modLevel, executionParams->taskList,
executionParams->paramListInfo, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
executionParams->jobIdList, executionParams->localExecutionSupported,
false, NULL, 0);

/*
* If current transaction accessed local placements and task list includes
Expand Down Expand Up @@ -1170,7 +1244,10 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
ParamListInfo paramListInfo,
int targetPoolSize, TupleDestination *defaultTupleDest,
TransactionProperties *xactProperties,
List *jobIdList, bool localExecutionSupported)
List *jobIdList, bool localExecutionSupported,
bool useSortedMerge,
Tuplestorestate **perTaskStores,
int perTaskStoreCount)
{
DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution));
Expand Down Expand Up @@ -1200,6 +1277,10 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,

execution->localExecutionSupported = localExecutionSupported;

execution->useSortedMerge = useSortedMerge;
execution->perTaskStores = perTaskStores;
execution->perTaskStoreCount = perTaskStoreCount;

/*
* Since task can have multiple queries, we are not sure how many columns we should
* allocate for. We start with 16, and reallocate when we need more.
Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/executor/multi_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ ParamListInfo executorBoundParams = NULL;
/* sort the returned tuples to get consistent output; used only for testing */
bool SortReturning = false;

/* when true at planning time, enables coordinator sorted merge for ORDER BY */
bool EnableSortedMerge = false;

/*
* How many nested executors have we started? This can happen for SQL
* UDF calls. The outer query starts an executor, then postgres opens
Expand Down
Loading
Loading