Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 306 additions & 50 deletions src/backend/distributed/executor/adaptive_executor.c

Large diffs are not rendered by default.

27 changes: 23 additions & 4 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "pg_version_constants.h"

#include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
Expand Down Expand Up @@ -264,11 +265,11 @@ CitusExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;

if (!scanState->finishedRemoteScan)
if (!scanState->executionStarted)
{
bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan);

AdaptiveExecutor(scanState);
AdaptiveExecutorStart(scanState);

if (isMultiTaskPlan)
{
Expand All @@ -279,10 +280,20 @@ CitusExecScan(CustomScanState *node)
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}

scanState->finishedRemoteScan = true;
scanState->executionStarted = true;
}

return ReturnTupleFromTuplestore(scanState);
TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);
if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan)
{
tuplestore_clear(scanState->tuplestorestate);

scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState);

resultSlot = ReturnTupleFromTuplestore(scanState);
}

return resultSlot;
}


Expand Down Expand Up @@ -720,6 +731,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan)

scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
scanState->executionStarted = false;

return (Node *) scanState;
}
Expand Down Expand Up @@ -837,6 +849,13 @@ CitusEndScan(CustomScanState *node)
tuplestore_end(scanState->tuplestorestate);
scanState->tuplestorestate = NULL;
}

/*
* Clean up any in-flight distributed execution. This handles the case
* where an error occurs between batches in the adaptive executor,
* ensuring sessions and connections are properly released.
*/
AdaptiveExecutorEnd(scanState);
}


Expand Down
35 changes: 28 additions & 7 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
plannerRestrictionContext,
int cursorOptions);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);

Expand Down Expand Up @@ -852,7 +853,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
distributedPlan->planId = planId;

/* create final plan by combining local plan with distributed plan */
resultPlan = FinalizePlan(planContext->plan, distributedPlan);
resultPlan = FinalizePlan(planContext->plan, distributedPlan,
planContext->cursorOptions);

/*
* As explained above, force planning costs to be unrealistically high if
Expand Down Expand Up @@ -901,7 +903,9 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
planContext->query,
planContext->boundParams,
planContext->
plannerRestrictionContext);
plannerRestrictionContext,
planContext->
cursorOptions);

return result;
}
Expand All @@ -917,7 +921,8 @@ static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext,
int cursorOptions)
{
MemoryContext savedContext = CurrentMemoryContext;
PlannedStmt *result = NULL;
Expand All @@ -929,6 +934,7 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
planContext->originalQuery = originalQuery;
planContext->query = query;
planContext->plannerRestrictionContext = plannerRestrictionContext;
planContext->cursorOptions = cursorOptions;


PG_TRY();
Expand Down Expand Up @@ -1436,7 +1442,8 @@ GetDistributedPlan(CustomScan *customScan)
* which can be run by the PostgreSQL executor.
*/
PlannedStmt *
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
int cursorOptions)
{
PlannedStmt *finalPlan = NULL;
CustomScan *customScan = makeNode(CustomScan);
Expand Down Expand Up @@ -1497,8 +1504,8 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)

customScan->custom_private = list_make1(distributedPlanData);

/* necessary to avoid extra Result node in PG15 */
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN | CUSTOMPATH_SUPPORT_PROJECTION;
/* CUSTOMPATH_SUPPORT_PROJECTION avoids an extra Result node in PG15+ */
customScan->flags = CUSTOMPATH_SUPPORT_PROJECTION;

/*
* Fast path queries cannot have any subplans by definition, so skip
Expand All @@ -1525,6 +1532,20 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
finalPlan = FinalizeRouterPlan(localPlan, customScan);
}

/*
* For SCROLL cursors, wrap the plan in a Material node so that backward
* scan works correctly with batched execution. PG's standard_planner()
* already added a Material node, but Citus discarded the entire plan tree
* above and replaced it with a CustomScan. Re-apply it here.
*
* Material is lazy (non-blocking): it fetches one tuple at a time from the
* child and appends it to its own tuplestore, so batching is preserved.
*/
if (cursorOptions & CURSOR_OPT_SCROLL)
{
finalPlan->planTree = materialize_finished_plan(finalPlan->planTree);
}

return finalPlan;
}

Expand Down
3 changes: 2 additions & 1 deletion src/backend/distributed/planner/function_call_delegation.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
/* worker will take care of any necessary locking, treat query as read-only */
distributedPlan->modLevel = ROW_MODIFY_READONLY;

return FinalizePlan(planContext->plan, distributedPlan);
return FinalizePlan(planContext->plan, distributedPlan,
planContext->cursorOptions);
}


Expand Down
25 changes: 25 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,31 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_batch_size",
gettext_noop("Maximum number of rows per batch in the adaptive executor."),
gettext_noop("When set to 0 (the default), the batch size is automatically "
"calculated from work_mem and the estimated tuple size. A positive "
"value overrides the automatic calculation with a fixed row count."),
&ExecutorBatchSize,
0, 0, INT_MAX,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_chunk_size",
gettext_noop("Chunk size in bytes for libpq chunked row mode."),
gettext_noop("Controls the chunk size passed to PQsetChunkedRowsMode when "
"fetching rows from workers. Larger values reduce per-result "
"overhead but increase memory usage per fetch. Only effective "
"on PostgreSQL 17 and later."),
&ExecutorChunkSize,
8192, 1, INT_MAX,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_slow_start_interval",
gettext_noop("Time to wait between opening connections to the same worker node"),
Expand Down
12 changes: 12 additions & 0 deletions src/include/distributed/adaptive_executor.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
#ifndef ADAPTIVE_EXECUTOR_H
#define ADAPTIVE_EXECUTOR_H

#include "distributed/citus_custom_scan.h"
#include "distributed/multi_physical_planner.h"


/* GUC, determining whether Citus opens 1 connection per task */
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern bool EnableBinaryProtocol;

/* GUC, number of rows per batch (0 = auto from work_mem) */
extern int ExecutorBatchSize;

/* GUC, libpq chunk size in bytes for chunked row mode (PG17+) */
extern int ExecutorChunkSize;

/* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment;
extern bool PreventIncompleteConnectionEstablishment;

extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern void AdaptiveExecutorStart(CitusScanState *scanState);
extern bool AdaptiveExecutorRun(CitusScanState *scanState);
extern void AdaptiveExecutorEnd(CitusScanState *scanState);
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
Expand Down
6 changes: 6 additions & 0 deletions src/include/distributed/citus_custom_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h"

struct DistributedExecution;

typedef struct CitusScanState
{
CustomScanState customScanState; /* underlying custom scan node */
Expand All @@ -27,7 +29,11 @@ typedef struct CitusScanState
DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
bool executionStarted; /* flag to check whether execution started */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */

/* execution state when using adaptive executor */
struct DistributedExecution *execution;
} CitusScanState;


Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/distributed_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ extern LOCKMODE GetQueryLockMode(Query *query);
extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
struct DistributedPlan *distributedPlan,
int cursorOptions);
extern bool ContainsSingleShardTable(Query *query);
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);

Expand Down
2 changes: 0 additions & 2 deletions src/include/distributed/multi_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ extern int ExecutorLevel;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once);
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);


/*
Expand Down
3 changes: 3 additions & 0 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ def extra_tests(self):
"multi_subquery_in_where_reference_clause": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"adaptive_executor_batching": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"subquery_in_where": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
Expand Down
Loading
Loading