Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf

FROM base AS pg18
RUN MAKEFLAGS="-j $(nproc)" pgenv build 18.1
RUN MAKEFLAGS="-j $(nproc)" pgenv build 18.3
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
Expand Down Expand Up @@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
RUN sudo chown --from=root:root citus:citus -R ~

# sets default pg version
RUN pgenv switch 18.1
RUN pgenv switch 18.3

# make connecting to the coordinator easy
ENV PGPORT=9700
4 changes: 2 additions & 2 deletions .devcontainer/src/test/regress/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tornado = ">=6.5.1,<6.6.0"
zstandard = ">=0.25.0"
construct = "*"
docopt = "==0.6.2"
cryptography = "==44.0.3"
cryptography = "==46.0.5"
pytest = "*"
psycopg = "*"
filelock = "*"
Expand All @@ -25,7 +25,7 @@ pytest-timeout = "*"
pytest-xdist = "*"
pytest-repeat = "*"
pyyaml = "*"
werkzeug = "==3.1.4"
werkzeug = "==3.1.5"
"typing-extensions" = ">=4.13.2,<5"
pyperclip = "==1.9.0"

Expand Down
194 changes: 103 additions & 91 deletions .devcontainer/src/test/regress/Pipfile.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ jobs:
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
style_checker_tools_version: "0.8.33"
sql_snapshot_pg_version: "18.1"
image_suffix: "-v181ce3c"
pg16_version: '{ "major": "16", "full": "16.11" }'
pg17_version: '{ "major": "17", "full": "17.7" }'
pg18_version: '{ "major": "18", "full": "18.1" }'
upgrade_pg_versions: "16.11-17.7-18.1"
sql_snapshot_pg_version: "18.3"
image_suffix: "-vac4338a"
pg16_version: '{ "major": "16", "full": "16.13" }'
pg17_version: '{ "major": "17", "full": "17.9" }'
pg18_version: '{ "major": "18", "full": "18.3" }'
upgrade_pg_versions: "16.13-17.9-18.3"
steps:
# Since GHA jobs need at least one step we use a noop step here.
- name: Set up parameters
Expand Down
5 changes: 3 additions & 2 deletions src/backend/distributed/commands/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,7 @@ CreateTableConversion(TableConversionParameters *params)
}


Oid relam = relation->rd_rel->relam;
relation_close(relation, NoLock);
con->distributionKey =
BuildDistributionKeyFromColumnName(con->relationId, con->distributionColumn,
Expand All @@ -1363,11 +1364,11 @@ CreateTableConversion(TableConversionParameters *params)
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
{
HeapTuple amTuple = SearchSysCache1(AMOID, ObjectIdGetDatum(
relation->rd_rel->relam));
relam));
if (!HeapTupleIsValid(amTuple))
{
ereport(ERROR, (errmsg("cache lookup failed for access method %d",
relation->rd_rel->relam)));
relam)));
}
Form_pg_am amForm = (Form_pg_am) GETSTRUCT(amTuple);
con->originalAccessMethod = NameStr(amForm->amname);
Expand Down
3 changes: 2 additions & 1 deletion src/backend/distributed/commands/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
namespaceName);
}

Oid relationOid = relation->rd_id;
table_close(relation, NoLock);

Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
Expand All @@ -199,7 +200,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
* it on a copy not to interfere with standard process utility.
*/
IndexStmt *copyCreateIndexStatement =
transformIndexStmt(relation->rd_id, copyObject(createIndexStatement),
transformIndexStmt(relationOid, copyObject(createIndexStatement),
createIndexCommand);

/* ensure we copy string into proper context */
Expand Down
93 changes: 86 additions & 7 deletions src/backend/distributed/deparser/deparse_statistics_stmts.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
#include "catalog/namespace.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "parser/parse_expr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/ruleutils.h"

#include "pg_version_constants.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/relay_utility.h"
Expand Down Expand Up @@ -231,6 +235,42 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt)
}


/*
 * See ruleutils.c in postgres for the logic here: decide whether a cooked
 * expression will deparse as a bare function-call-like construct, in which
 * case the caller does not need to wrap it in parentheses.
 */
static bool
looks_like_function(Node *node)
{
	if (node == NULL)
	{
		/* probably shouldn't happen */
		return false;
	}

	NodeTag tag = nodeTag(node);

	if (tag == T_FuncExpr)
	{
		/* OK, unless it's going to deparse as a cast */
		CoercionForm format = ((FuncExpr *) node)->funcformat;
		return format == COERCE_EXPLICIT_CALL || format == COERCE_SQL_SYNTAX;
	}

	/* these are all accepted by func_expr_common_subexpr */
	return tag == T_NullIfExpr ||
		   tag == T_CoalesceExpr ||
		   tag == T_MinMaxExpr ||
		   tag == T_SQLValueFunction ||
		   tag == T_XmlExpr;
}


static void
AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
{
Expand All @@ -240,15 +280,54 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
{
if (!column->name)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("only simple column references are allowed "
"in CREATE STATISTICS")));
/*
* Since these expressions are parser statements, we first call
* transform to get the transformed Expr tree, and then deparse
* the transformed tree. This is similar to the logic found in
* deparse_table_stmts for check constraints.
*/
if (list_length(stmt->relations) != 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot use expressions in CREATE STATISTICS with multiple relations")));
}

RangeVar *rel = (RangeVar *) linitial(stmt->relations);
bool missingOk = false;
Oid relOid = RangeVarGetRelid(rel, AccessShareLock, missingOk);

/* Add table name to the name space in parse state. Otherwise column names
* cannot be found.
*/
Relation relation = table_open(relOid, AccessShareLock);
ParseState *pstate = make_parsestate(NULL);
AddRangeTableEntryToQueryCompat(pstate, relation);
Node *exprCooked = transformExpr(pstate, column->expr,
EXPR_KIND_STATS_EXPRESSION);

char *relationName = get_rel_name(relOid);
List *relationCtx = deparse_context_for(relationName, relOid);

char *exprSql = deparse_expression(exprCooked, relationCtx, false, false);
relation_close(relation, NoLock);

/* Need parens if it's not a bare function call */
if (looks_like_function(exprCooked))
{
appendStringInfoString(buf, exprSql);
}
else
{
appendStringInfo(buf, "(%s)", exprSql);
}
}
else
{
const char *columnName = quote_identifier(column->name);

const char *columnName = quote_identifier(column->name);

appendStringInfoString(buf, columnName);
appendStringInfoString(buf, columnName);
}

if (column != llast(stmt->exprs))
{
Expand Down
33 changes: 29 additions & 4 deletions src/backend/distributed/planner/multi_logical_optimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate */
int CoordinatorAggregationStrategy = COORDINATOR_AGGREGATION_ROW_GATHER;
bool AllowAggregateWorkerCombineOnInternalTypes = true;

/* Constant used throughout file */
static const uint32 masterTableId = 1; /* first range table reference on the master node */
Expand Down Expand Up @@ -281,7 +282,7 @@ static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *a
static Oid WorkerPartialAggOid(void);
static Oid WorkerBinaryPartialAggOid(void);
static Oid CoordBinaryCombineAggOid(void);
static bool IsTypeBinarySerializable(Oid transitionType);
static bool IsAggTransTypeBinarySerializable(Form_pg_aggregate aggForm);
static Oid CoordCombineAggOid(void);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static Oid TypeOid(Oid schemaId, const char *typeName);
Expand Down Expand Up @@ -2131,7 +2132,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
combine = aggform->aggcombinefn;
useBinaryCoordinatorCombine = aggform->aggtranstype != InvalidOid &&
IsTypeBinarySerializable(aggform->aggtranstype);
IsAggTransTypeBinarySerializable(aggform);
ReleaseSysCache(aggTuple);
}

Expand Down Expand Up @@ -3296,7 +3297,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
combine = aggform->aggcombinefn;
useBinaryWorkerAggregate = (OidIsValid(aggform->aggtranstype) &&
IsTypeBinarySerializable(aggform->aggtranstype));
IsAggTransTypeBinarySerializable(aggform));

ReleaseSysCache(aggTuple);
}
Expand Down Expand Up @@ -3568,6 +3569,18 @@ AggregateEnabledCustom(Aggref *aggregateExpression)

bool supportsSafeCombine = typeform->typtype != TYPTYPE_PSEUDO;

if (AllowAggregateWorkerCombineOnInternalTypes &&
typeform->oid == INTERNALOID && !supportsSafeCombine)
{
/* check if the type supports a SERIALFUNC/DESERIALFUNC - if it does
* then we can leverage that for safe transfer of the state across the wire.
*/
if (aggform->aggserialfn != InvalidOid && aggform->aggdeserialfn != InvalidOid)
{
supportsSafeCombine = true;
}
}

ReleaseSysCache(aggTuple);
ReleaseSysCache(typeTuple);

Expand Down Expand Up @@ -3848,8 +3861,20 @@ TypeOid(Oid schemaId, const char *typeName)


static bool
IsTypeBinarySerializable(Oid transitionType)
IsAggTransTypeBinarySerializable(Form_pg_aggregate aggForm)
{
Oid transitionType = aggForm->aggtranstype;

if (AllowAggregateWorkerCombineOnInternalTypes &&
transitionType == INTERNALOID)
{
/* For aggregates with internal transition types, we apply the binary serialization
* check on the output value of the SERIALFUNC. If a serialfunc exists, Postgres
* requires that the serialfunc return a bytea - which will be binary serializable
*/
return (aggForm->aggserialfn != InvalidOid);
}

HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(transitionType));
if (!HeapTupleIsValid(typeTuple))
{
Expand Down
16 changes: 16 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,22 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.allow_aggregate_worker_combine_on_internal_types",
gettext_noop("Enables aggregate worker partial aggregates on aggregates that "
"have internal type for the aggregate partial state storage."),
gettext_noop(
"This setting allows the use of pushdown of custom aggregates that have "
"an STYPE that is internal. This is typically okay to do, but if a custom aggregate "
"persists OID information or any node specific data into the state, this can cause "
"weirdness when combining in the coordinator, so this is left as an option to turn off "
"in those cases worker combine functions on internal types."),
&AllowAggregateWorkerCombineOnInternalTypes,
true,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.allow_modifications_from_workers_to_replicated_tables",
gettext_noop("Enables modifications from workers to replicated "
Expand Down
Loading
Loading