Skip to content

Commit df74625

Browse files
authored
Merge branch 'main' into sortedMerge
2 parents 9e40e4c + 347d723 commit df74625

20 files changed

Lines changed: 809 additions & 254 deletions

.devcontainer/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
9797
RUN rm .pgenv-staging/config/default.conf
9898

9999
FROM base AS pg18
100-
RUN MAKEFLAGS="-j $(nproc)" pgenv build 18.1
100+
RUN MAKEFLAGS="-j $(nproc)" pgenv build 18.3
101101
RUN rm .pgenv/src/*.tar*
102102
RUN make -C .pgenv/src/postgresql-*/ clean
103103
RUN make -C .pgenv/src/postgresql-*/src/include install
@@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
216216
RUN sudo chown --from=root:root citus:citus -R ~
217217

218218
# sets default pg version
219-
RUN pgenv switch 18.1
219+
RUN pgenv switch 18.3
220220

221221
# make connecting to the coordinator easy
222222
ENV PGPORT=9700

.devcontainer/src/test/regress/Pipfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ tornado = ">=6.5.1,<6.6.0"
1616
zstandard = ">=0.25.0"
1717
construct = "*"
1818
docopt = "==0.6.2"
19-
cryptography = "==44.0.3"
19+
cryptography = "==46.0.5"
2020
pytest = "*"
2121
psycopg = "*"
2222
filelock = "*"
@@ -25,7 +25,7 @@ pytest-timeout = "*"
2525
pytest-xdist = "*"
2626
pytest-repeat = "*"
2727
pyyaml = "*"
28-
werkzeug = "==3.1.4"
28+
werkzeug = "==3.1.5"
2929
"typing-extensions" = ">=4.13.2,<5"
3030
pyperclip = "==1.9.0"
3131

.devcontainer/src/test/regress/Pipfile.lock

Lines changed: 103 additions & 91 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.github/workflows/build_and_test.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ jobs:
3131
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
3232
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
3333
style_checker_tools_version: "0.8.33"
34-
sql_snapshot_pg_version: "18.1"
35-
image_suffix: "-v181ce3c"
36-
pg16_version: '{ "major": "16", "full": "16.11" }'
37-
pg17_version: '{ "major": "17", "full": "17.7" }'
38-
pg18_version: '{ "major": "18", "full": "18.1" }'
39-
upgrade_pg_versions: "16.11-17.7-18.1"
34+
sql_snapshot_pg_version: "18.3"
35+
image_suffix: "-vac4338a"
36+
pg16_version: '{ "major": "16", "full": "16.13" }'
37+
pg17_version: '{ "major": "17", "full": "17.9" }'
38+
pg18_version: '{ "major": "18", "full": "18.3" }'
39+
upgrade_pg_versions: "16.13-17.9-18.3"
4040
steps:
4141
# Since GHA jobs need at least one step we use a noop step here.
4242
- name: Set up parameters

src/backend/distributed/deparser/deparse_statistics_stmts.c

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
#include "catalog/namespace.h"
1616
#include "lib/stringinfo.h"
1717
#include "nodes/nodes.h"
18+
#include "parser/parse_expr.h"
1819
#include "utils/builtins.h"
20+
#include "utils/lsyscache.h"
21+
#include "utils/ruleutils.h"
1922

2023
#include "pg_version_constants.h"
2124

2225
#include "distributed/citus_ruleutils.h"
26+
#include "distributed/commands.h"
2327
#include "distributed/deparser.h"
2428
#include "distributed/listutils.h"
2529
#include "distributed/relay_utility.h"
@@ -231,6 +235,42 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt)
231235
}
232236

233237

238+
/* See ruleutils.c in postgres for the logic here. */
239+
static bool
240+
looks_like_function(Node *node)
241+
{
242+
if (node == NULL)
243+
{
244+
return false; /* probably shouldn't happen */
245+
}
246+
switch (nodeTag(node))
247+
{
248+
case T_FuncExpr:
249+
{
250+
/* OK, unless it's going to deparse as a cast */
251+
return (((FuncExpr *) node)->funcformat == COERCE_EXPLICIT_CALL ||
252+
((FuncExpr *) node)->funcformat == COERCE_SQL_SYNTAX);
253+
}
254+
255+
case T_NullIfExpr:
256+
case T_CoalesceExpr:
257+
case T_MinMaxExpr:
258+
case T_SQLValueFunction:
259+
case T_XmlExpr:
260+
{
261+
/* these are all accepted by func_expr_common_subexpr */
262+
return true;
263+
}
264+
265+
default:
266+
{
267+
break;
268+
}
269+
}
270+
return false;
271+
}
272+
273+
234274
static void
235275
AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
236276
{
@@ -240,15 +280,54 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
240280
{
241281
if (!column->name)
242282
{
243-
ereport(ERROR,
244-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
245-
errmsg("only simple column references are allowed "
246-
"in CREATE STATISTICS")));
283+
/*
284+
* Since these expressions are parser statements, we first call
285+
* transform to get the transformed Expr tree, and then deparse
286+
* the transformed tree. This is similar to the logic found in
287+
* deparse_table_stmts for check constraints.
288+
*/
289+
if (list_length(stmt->relations) != 1)
290+
{
291+
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
292+
errmsg(
293+
"cannot use expressions in CREATE STATISTICS with multiple relations")));
294+
}
295+
296+
RangeVar *rel = (RangeVar *) linitial(stmt->relations);
297+
bool missingOk = false;
298+
Oid relOid = RangeVarGetRelid(rel, AccessShareLock, missingOk);
299+
300+
/* Add table name to the name space in parse state. Otherwise column names
301+
* cannot be found.
302+
*/
303+
Relation relation = table_open(relOid, AccessShareLock);
304+
ParseState *pstate = make_parsestate(NULL);
305+
AddRangeTableEntryToQueryCompat(pstate, relation);
306+
Node *exprCooked = transformExpr(pstate, column->expr,
307+
EXPR_KIND_STATS_EXPRESSION);
308+
309+
char *relationName = get_rel_name(relOid);
310+
List *relationCtx = deparse_context_for(relationName, relOid);
311+
312+
char *exprSql = deparse_expression(exprCooked, relationCtx, false, false);
313+
relation_close(relation, NoLock);
314+
315+
/* Need parens if it's not a bare function call */
316+
if (looks_like_function(exprCooked))
317+
{
318+
appendStringInfoString(buf, exprSql);
319+
}
320+
else
321+
{
322+
appendStringInfo(buf, "(%s)", exprSql);
323+
}
247324
}
325+
else
326+
{
327+
const char *columnName = quote_identifier(column->name);
248328

249-
const char *columnName = quote_identifier(column->name);
250-
251-
appendStringInfoString(buf, columnName);
329+
appendStringInfoString(buf, columnName);
330+
}
252331

253332
if (column != llast(stmt->exprs))
254333
{

src/backend/distributed/planner/multi_logical_optimizer.c

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
6666
double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate */
6767
int CoordinatorAggregationStrategy = COORDINATOR_AGGREGATION_ROW_GATHER;
68+
bool AllowAggregateWorkerCombineOnInternalTypes = true;
6869

6970
/* Constant used throughout file */
7071
static const uint32 masterTableId = 1; /* first range table reference on the master node */
@@ -282,7 +283,7 @@ static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *a
282283
static Oid WorkerPartialAggOid(void);
283284
static Oid WorkerBinaryPartialAggOid(void);
284285
static Oid CoordBinaryCombineAggOid(void);
285-
static bool IsTypeBinarySerializable(Oid transitionType);
286+
static bool IsAggTransTypeBinarySerializable(Form_pg_aggregate aggForm);
286287
static Oid CoordCombineAggOid(void);
287288
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
288289
static Oid TypeOid(Oid schemaId, const char *typeName);
@@ -2133,7 +2134,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
21332134
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
21342135
combine = aggform->aggcombinefn;
21352136
useBinaryCoordinatorCombine = aggform->aggtranstype != InvalidOid &&
2136-
IsTypeBinarySerializable(aggform->aggtranstype);
2137+
IsAggTransTypeBinarySerializable(aggform);
21372138
ReleaseSysCache(aggTuple);
21382139
}
21392140

@@ -3323,7 +3324,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
33233324
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
33243325
combine = aggform->aggcombinefn;
33253326
useBinaryWorkerAggregate = (OidIsValid(aggform->aggtranstype) &&
3326-
IsTypeBinarySerializable(aggform->aggtranstype));
3327+
IsAggTransTypeBinarySerializable(aggform));
33273328

33283329
ReleaseSysCache(aggTuple);
33293330
}
@@ -3595,6 +3596,18 @@ AggregateEnabledCustom(Aggref *aggregateExpression)
35953596

35963597
bool supportsSafeCombine = typeform->typtype != TYPTYPE_PSEUDO;
35973598

3599+
if (AllowAggregateWorkerCombineOnInternalTypes &&
3600+
typeform->oid == INTERNALOID && !supportsSafeCombine)
3601+
{
3602+
/* check if the type supports a SERIALFUNC/DESERIALFUNC - if it does
3603+
* then we can leverage that for safe transfer of the state across the wire.
3604+
*/
3605+
if (aggform->aggserialfn != InvalidOid && aggform->aggdeserialfn != InvalidOid)
3606+
{
3607+
supportsSafeCombine = true;
3608+
}
3609+
}
3610+
35983611
ReleaseSysCache(aggTuple);
35993612
ReleaseSysCache(typeTuple);
36003613

@@ -3875,8 +3888,20 @@ TypeOid(Oid schemaId, const char *typeName)
38753888

38763889

38773890
static bool
3878-
IsTypeBinarySerializable(Oid transitionType)
3891+
IsAggTransTypeBinarySerializable(Form_pg_aggregate aggForm)
38793892
{
3893+
Oid transitionType = aggForm->aggtranstype;
3894+
3895+
if (AllowAggregateWorkerCombineOnInternalTypes &&
3896+
transitionType == INTERNALOID)
3897+
{
3898+
/* For aggregates with internal transition types, we apply the binary serialization
3899+
* check on the output value of the SERIALFUNC. If a serialfunc exists, Postgres
3900+
* requires that the serialfunc return a bytea - which will be binary serializable
3901+
*/
3902+
return (aggForm->aggserialfn != InvalidOid);
3903+
}
3904+
38803905
HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(transitionType));
38813906
if (!HeapTupleIsValid(typeTuple))
38823907
{

src/backend/distributed/shared_library_init.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,22 @@ RegisterCitusConfigVariables(void)
984984
GUC_STANDARD,
985985
NULL, NULL, NULL);
986986

987+
DefineCustomBoolVariable(
988+
"citus.allow_aggregate_worker_combine_on_internal_types",
989+
gettext_noop("Enables aggregate worker partial aggregates on aggregates that "
990+
"have internal type for the aggregate partial state storage."),
991+
gettext_noop(
992+
"This setting allows the use of pushdown of custom aggregates that have "
993+
"an STYPE that is internal. This is typically okay to do, but if a custom aggregate "
994+
"persists OID information or any node specific data into the state, this can cause "
995+
"weirdness when combining in the coordinator, so this is left as an option to turn off "
996+
"in those cases worker combine functions on internal types."),
997+
&AllowAggregateWorkerCombineOnInternalTypes,
998+
true,
999+
PGC_USERSET,
1000+
GUC_STANDARD,
1001+
NULL, NULL, NULL);
1002+
9871003
DefineCustomBoolVariable(
9881004
"citus.allow_modifications_from_workers_to_replicated_tables",
9891005
gettext_noop("Enables modifications from workers to replicated "

0 commit comments

Comments
 (0)