diff --git a/contrib/citext/Makefile b/contrib/citext/Makefile
index b9b43ee787f..f8bb56c0975 100644
--- a/contrib/citext/Makefile
+++ b/contrib/citext/Makefile
@@ -11,7 +11,7 @@ DATA = citext--1.4.sql \
citext--1.0--1.1.sql
PGFILEDESC = "citext - case-insensitive character string data type"
-REGRESS = citext
+REGRESS = create_index_acl citext
REGRESS_OPTS += --init-file=$(top_srcdir)/src/test/regress/init_file
ifdef USE_PGXS
diff --git a/contrib/citext/expected/create_index_acl.out b/contrib/citext/expected/create_index_acl.out
new file mode 100644
index 00000000000..f35f60b421d
--- /dev/null
+++ b/contrib/citext/expected/create_index_acl.out
@@ -0,0 +1,78 @@
+-- Each DefineIndex() ACL check uses either the original userid or the table
+-- owner userid; see its header comment. Here, confirm that DefineIndex()
+-- uses its original userid where necessary. The test works by creating
+-- indexes that refer to as many sorts of objects as possible, with the table
+-- owner having as few applicable privileges as possible. (The privileges.sql
+-- regress_sro_user tests look for the opposite defect; they confirm that
+-- DefineIndex() uses the table owner userid where necessary.)
+-- Don't override tablespaces; this version lacks allow_in_place_tablespaces.
+BEGIN;
+CREATE ROLE regress_minimal;
+CREATE SCHEMA s;
+CREATE EXTENSION citext SCHEMA s;
+-- Revoke all conceivably-relevant ACLs within the extension. The system
+-- doesn't check all these ACLs, but this will provide some coverage if that
+-- ever changes.
+REVOKE ALL ON TYPE s.citext FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_lt FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_le FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_eq FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_ge FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_gt FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_cmp FROM PUBLIC;
+-- Functions sufficient for making an index column that has the side effect of
+-- changing search_path at expression planning time.
+CREATE FUNCTION public.setter() RETURNS bool VOLATILE
+ LANGUAGE SQL AS $$SET search_path = s; SELECT true$$;
+CREATE FUNCTION s.const() RETURNS bool IMMUTABLE
+ LANGUAGE SQL AS $$SELECT public.setter()$$;
+CREATE FUNCTION s.index_this_expr(s.citext, bool) RETURNS s.citext IMMUTABLE
+ LANGUAGE SQL AS $$SELECT $1$$;
+REVOKE ALL ON FUNCTION public.setter FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.const FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.index_this_expr FROM PUBLIC;
+-- Even for an empty table, expression planning calls s.const & public.setter.
+GRANT EXECUTE ON FUNCTION public.setter TO regress_minimal;
+GRANT EXECUTE ON FUNCTION s.const TO regress_minimal;
+-- Function for index predicate.
+CREATE FUNCTION s.index_row_if(s.citext) RETURNS bool IMMUTABLE
+ LANGUAGE SQL AS $$SELECT $1 IS NOT NULL$$;
+REVOKE ALL ON FUNCTION s.index_row_if FROM PUBLIC;
+-- Even for an empty table, CREATE INDEX checks ii_Predicate permissions.
+GRANT EXECUTE ON FUNCTION s.index_row_if TO regress_minimal;
+-- Non-extension, non-function objects.
+CREATE COLLATION s.coll (LOCALE="C");
+CREATE TABLE s.x (y s.citext);
+ALTER TABLE s.x OWNER TO regress_minimal;
+-- Empty-table DefineIndex()
+CREATE UNIQUE INDEX u0rows ON s.x USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll s.citext_pattern_ops)
+ WHERE s.index_row_if(y);
+ALTER TABLE s.x ADD CONSTRAINT e0rows EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll WITH s.=)
+ WHERE (s.index_row_if(y));
+-- Make the table nonempty.
+INSERT INTO s.x VALUES ('foo'), ('bar');
+-- If the INSERT runs the planner on index expressions, a search_path change
+-- survives. As of 2022-06, the INSERT reuses a cached plan. It does so even
+-- under debug_discard_caches, since each index is new-in-transaction. If
+-- future work changes a cache lifecycle, this RESET may become necessary.
+RESET search_path;
+-- For a nonempty table, owner needs permissions throughout ii_Expressions.
+GRANT EXECUTE ON FUNCTION s.index_this_expr TO regress_minimal;
+CREATE UNIQUE INDEX u2rows ON s.x USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll s.citext_pattern_ops)
+ WHERE s.index_row_if(y);
+ALTER TABLE s.x ADD CONSTRAINT e2rows EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll WITH s.=)
+ WHERE (s.index_row_if(y));
+-- Shall not find s.coll via search_path, despite the s.const->public.setter
+-- call having set search_path=s during expression planning. Suppress the
+-- message itself, which depends on the database encoding.
+\set VERBOSITY sqlstate
+ALTER TABLE s.x ADD CONSTRAINT underqualified EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE coll WITH s.=)
+ WHERE (s.index_row_if(y));
+ERROR: 42704
+\set VERBOSITY default
+ROLLBACK;
diff --git a/contrib/citext/sql/create_index_acl.sql b/contrib/citext/sql/create_index_acl.sql
new file mode 100644
index 00000000000..a5f4e6b30a8
--- /dev/null
+++ b/contrib/citext/sql/create_index_acl.sql
@@ -0,0 +1,79 @@
+-- Each DefineIndex() ACL check uses either the original userid or the table
+-- owner userid; see its header comment. Here, confirm that DefineIndex()
+-- uses its original userid where necessary. The test works by creating
+-- indexes that refer to as many sorts of objects as possible, with the table
+-- owner having as few applicable privileges as possible. (The privileges.sql
+-- regress_sro_user tests look for the opposite defect; they confirm that
+-- DefineIndex() uses the table owner userid where necessary.)
+
+-- Don't override tablespaces; this version lacks allow_in_place_tablespaces.
+
+BEGIN;
+CREATE ROLE regress_minimal;
+CREATE SCHEMA s;
+CREATE EXTENSION citext SCHEMA s;
+-- Revoke all conceivably-relevant ACLs within the extension. The system
+-- doesn't check all these ACLs, but this will provide some coverage if that
+-- ever changes.
+REVOKE ALL ON TYPE s.citext FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_lt FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_le FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_eq FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_ge FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_gt FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.citext_pattern_cmp FROM PUBLIC;
+-- Functions sufficient for making an index column that has the side effect of
+-- changing search_path at expression planning time.
+CREATE FUNCTION public.setter() RETURNS bool VOLATILE
+ LANGUAGE SQL AS $$SET search_path = s; SELECT true$$;
+CREATE FUNCTION s.const() RETURNS bool IMMUTABLE
+ LANGUAGE SQL AS $$SELECT public.setter()$$;
+CREATE FUNCTION s.index_this_expr(s.citext, bool) RETURNS s.citext IMMUTABLE
+ LANGUAGE SQL AS $$SELECT $1$$;
+REVOKE ALL ON FUNCTION public.setter FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.const FROM PUBLIC;
+REVOKE ALL ON FUNCTION s.index_this_expr FROM PUBLIC;
+-- Even for an empty table, expression planning calls s.const & public.setter.
+GRANT EXECUTE ON FUNCTION public.setter TO regress_minimal;
+GRANT EXECUTE ON FUNCTION s.const TO regress_minimal;
+-- Function for index predicate.
+CREATE FUNCTION s.index_row_if(s.citext) RETURNS bool IMMUTABLE
+ LANGUAGE SQL AS $$SELECT $1 IS NOT NULL$$;
+REVOKE ALL ON FUNCTION s.index_row_if FROM PUBLIC;
+-- Even for an empty table, CREATE INDEX checks ii_Predicate permissions.
+GRANT EXECUTE ON FUNCTION s.index_row_if TO regress_minimal;
+-- Non-extension, non-function objects.
+CREATE COLLATION s.coll (LOCALE="C");
+CREATE TABLE s.x (y s.citext);
+ALTER TABLE s.x OWNER TO regress_minimal;
+-- Empty-table DefineIndex()
+CREATE UNIQUE INDEX u0rows ON s.x USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll s.citext_pattern_ops)
+ WHERE s.index_row_if(y);
+ALTER TABLE s.x ADD CONSTRAINT e0rows EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll WITH s.=)
+ WHERE (s.index_row_if(y));
+-- Make the table nonempty.
+INSERT INTO s.x VALUES ('foo'), ('bar');
+-- If the INSERT runs the planner on index expressions, a search_path change
+-- survives. As of 2022-06, the INSERT reuses a cached plan. It does so even
+-- under debug_discard_caches, since each index is new-in-transaction. If
+-- future work changes a cache lifecycle, this RESET may become necessary.
+RESET search_path;
+-- For a nonempty table, owner needs permissions throughout ii_Expressions.
+GRANT EXECUTE ON FUNCTION s.index_this_expr TO regress_minimal;
+CREATE UNIQUE INDEX u2rows ON s.x USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll s.citext_pattern_ops)
+ WHERE s.index_row_if(y);
+ALTER TABLE s.x ADD CONSTRAINT e2rows EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE s.coll WITH s.=)
+ WHERE (s.index_row_if(y));
+-- Shall not find s.coll via search_path, despite the s.const->public.setter
+-- call having set search_path=s during expression planning. Suppress the
+-- message itself, which depends on the database encoding.
+\set VERBOSITY sqlstate
+ALTER TABLE s.x ADD CONSTRAINT underqualified EXCLUDE USING btree
+ ((s.index_this_expr(y, s.const())) COLLATE coll WITH s.=)
+ WHERE (s.index_row_if(y));
+\set VERBOSITY default
+ROLLBACK;
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8266615ea47..ceb09d788cc 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -15446,6 +15446,10 @@ table2-mapping
json_array_length('[1,2,3,{"f1":1,"f2":[5,6]},4]')
5
+
+
+ jsonb_array_length('[]')
+ 0
@@ -17887,10 +17891,19 @@ SELECT NULLIF(value, '(none)') ...
Returns the length of the requested array dimension.
+ (Produces NULL instead of 0 for empty or missing array dimensions.)
array_length(array[1,2,3], 1)
3
+
+
+ array_length(array[]::int[], 1)
+ NULL
+
+
+ array_length(array['text'], 2)
+ NULL
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index a265409f025..eaa6f4b53cc 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2194,7 +2194,7 @@ HINT: You can then restart the server after making the necessary configuration
Currently, temporary table creation is not allowed during read-only
transactions, so in some cases existing scripts will not run correctly.
This restriction might be relaxed in a later release. This is
- both an SQL Standard compliance issue and a technical issue.
+ both an SQL standard compliance issue and a technical issue.
diff --git a/doc/src/sgml/json.sgml b/doc/src/sgml/json.sgml
index a173368229b..c421d4ba75a 100644
--- a/doc/src/sgml/json.sgml
+++ b/doc/src/sgml/json.sgml
@@ -701,10 +701,10 @@ UPDATE table_name SET jsonb_field[2] = '2';
assigned value can be placed.
--- Where jsonb_field was {}, it is now {'a': [{'b': 1}]}
+-- Where jsonb_field was {}, it is now {"a": [{"b": 1}]}
UPDATE table_name SET jsonb_field['a'][0]['b'] = '1';
--- Where jsonb_field was [], it is now [null, {'a': 1}]
+-- Where jsonb_field was [], it is now [null, {"a": 1}]
UPDATE table_name SET jsonb_field[1]['a'] = '1';
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index 6c94f6a9429..3d3cbb339ce 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -277,9 +277,10 @@
The table also shows that PostgreSQL's Repeatable Read implementation
- does not allow phantom reads. Stricter behavior is permitted by the
- SQL standard: the four isolation levels only define which phenomena
- must not happen, not which phenomena must happen.
+ does not allow phantom reads. This is acceptable under the SQL
+ standard because the standard specifies which anomalies must
+ not occur at certain isolation levels; higher
+ guarantees are acceptable.
The behavior of the available isolation levels is detailed in the
following subsections.
diff --git a/doc/src/sgml/plpgsql.sgml b/doc/src/sgml/plpgsql.sgml
index 4cd4bcba802..22fa317f7b5 100644
--- a/doc/src/sgml/plpgsql.sgml
+++ b/doc/src/sgml/plpgsql.sgml
@@ -3767,7 +3767,7 @@ RAISE ;
After level if any,
- you can write a format
+ you can specify a format string
(which must be a simple string literal, not an expression). The
format string specifies the error message text to be reported.
The format string can be followed
diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c
index 1c881550b65..f7c8e1b3466 100644
--- a/src/backend/access/transam/transam.c
+++ b/src/backend/access/transam/transam.c
@@ -249,10 +249,15 @@ TransactionIdDidAbortForReader(TransactionId transactionId)
*
* This does NOT look into pg_xact but merely probes our local cache
* (and so it's not named TransactionIdDidComplete, which would be the
- * appropriate name for a function that worked that way). The intended
- * use is just to short-circuit TransactionIdIsInProgress calls when doing
- * repeated heapam_visibility.c checks for the same XID. If this isn't
- * extremely fast then it will be counterproductive.
+ * appropriate name for a function that worked that way).
+ *
+ * NB: This is unused, and will be removed in v15. This was used to
+ * short-circuit TransactionIdIsInProgress, but that was wrong for a
+ * transaction that was known to be marked as committed in CLOG but not
+ * yet removed from the proc array. This is kept in backbranches just in
+ * case it is still used by extensions. However, extensions doing
+ * something similar to tuple visibility checks should also be careful to
+ * check the proc array first!
*
* Note:
* Assumes transaction identifier is valid.
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 2065667ce42..ca023623955 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -98,7 +98,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
Oid relId,
const char *accessMethodName, Oid accessMethodId,
bool amcanorder,
- bool isconstraint);
+ bool isconstraint,
+ Oid ddl_userid,
+ int ddl_sec_context,
+ int *ddl_save_nestlevel);
static char *ChooseIndexName(const char *tabname, Oid namespaceId,
List *colnames, List *exclusionOpNames,
bool primary, bool isconstraint);
@@ -397,9 +400,8 @@ CheckIndexCompatible(Oid oldId,
* Compute the operator classes, collations, and exclusion operators for
* the new index, so we can test whether it's compatible with the existing
* one. Note that ComputeIndexAttrs might fail here, but that's OK:
- * DefineIndex would have called this function with the same arguments
- * later on, and it would have failed then anyway. Our attributeList
- * contains only key attributes, thus we're filling ii_NumIndexAttrs and
+ * DefineIndex would have failed later. Our attributeList contains only
+ * key attributes, thus we're filling ii_NumIndexAttrs and
* ii_NumIndexKeyAttrs with same value.
*/
indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
@@ -413,7 +415,7 @@ CheckIndexCompatible(Oid oldId,
coloptions, attributeList,
exclusionOpNames, relationId,
accessMethodName, accessMethodId,
- amcanorder, isconstraint);
+ amcanorder, isconstraint, InvalidOid, 0, NULL);
/* Get the soon-obsolete pg_index tuple. */
@@ -659,6 +661,19 @@ WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
* DefineIndex
* Creates a new index.
*
+ * This function manages the current userid according to the needs of pg_dump.
+ * Recreating old-database catalog entries in new-database is fine, regardless
+ * of which users would have permission to recreate those entries now. That's
+ * just preservation of state. Running opaque expressions, like calling a
+ * function named in a catalog entry or evaluating a pg_node_tree in a catalog
+ * entry, as anyone other than the object owner, is not fine. To adhere to
+ * those principles and to remain fail-safe, use the table owner userid for
+ * most ACL checks. Use the original userid for ACL checks reached without
+ * traversing opaque expressions. (pg_dump can predict such ACL checks from
+ * catalogs.) Overall, this is a mess. Future DDL development should
+ * consider offering one DDL command for catalog setup and a separate DDL
+ * command for steps that run opaque expressions.
+ *
* 'relationId': the OID of the heap relation on which the index is to be
* created
* 'stmt': IndexStmt describing the properties of the new index.
@@ -1184,7 +1199,8 @@ DefineIndex(Oid relationId,
coloptions, allIndexParams,
stmt->excludeOpNames, relationId,
accessMethodName, accessMethodId,
- amcanorder, stmt->isconstraint);
+ amcanorder, stmt->isconstraint, root_save_userid,
+ root_save_sec_context, &root_save_nestlevel);
/*
* We disallow unique indexes on IVM columns of IMMVs.
@@ -1623,11 +1639,8 @@ DefineIndex(Oid relationId,
/*
* Roll back any GUC changes executed by index functions, and keep
- * subsequent changes local to this command. It's barely possible that
- * some index function changed a behavior-affecting GUC, e.g. xmloption,
- * that affects subsequent steps. This improves bug-compatibility with
- * older PostgreSQL versions. They did the AtEOXact_GUC() here for the
- * purpose of clearing the above default_tablespace change.
+ * subsequent changes local to this command. This is essential if some
+ * index function changed a behavior-affecting GUC, e.g. search_path.
*/
AtEOXact_GUC(false, root_save_nestlevel);
root_save_nestlevel = NewGUCNestLevel();
@@ -2282,6 +2295,10 @@ CheckPredicate(Expr *predicate)
* Compute per-index-column information, including indexed column numbers
* or index expressions, opclasses and their options. Note, all output vectors
* should be allocated for all columns, including "including" ones.
+ *
+ * If the caller switched to the table owner, ddl_userid is the role for ACL
+ * checks reached without traversing opaque expressions. Otherwise, it's
+ * InvalidOid, and other ddl_* arguments are undefined.
*/
static void
ComputeIndexAttrs(IndexInfo *indexInfo,
@@ -2295,12 +2312,17 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
const char *accessMethodName,
Oid accessMethodId,
bool amcanorder,
- bool isconstraint)
+ bool isconstraint,
+ Oid ddl_userid,
+ int ddl_sec_context,
+ int *ddl_save_nestlevel)
{
ListCell *nextExclOp;
ListCell *lc;
int attn;
int nkeycols = indexInfo->ii_NumIndexKeyAttrs;
+ Oid save_userid;
+ int save_sec_context;
/* Allocate space for exclusion operator info, if needed */
if (exclusionOpNames)
@@ -2314,6 +2336,9 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
else
nextExclOp = NULL;
+ if (OidIsValid(ddl_userid))
+ GetUserIdAndSecContext(&save_userid, &save_sec_context);
+
/*
* process attributeList
*/
@@ -2450,10 +2475,24 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
}
/*
- * Apply collation override if any
+ * Apply collation override if any. Use of ddl_userid is necessary
+ * due to ACL checks therein, and it's safe because collations don't
+ * contain opaque expressions (or non-opaque expressions).
*/
if (attribute->collation)
+ {
+ if (OidIsValid(ddl_userid))
+ {
+ AtEOXact_GUC(false, *ddl_save_nestlevel);
+ SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
+ }
attcollation = get_collation_oid(attribute->collation, false);
+ if (OidIsValid(ddl_userid))
+ {
+ SetUserIdAndSecContext(save_userid, save_sec_context);
+ *ddl_save_nestlevel = NewGUCNestLevel();
+ }
+ }
/*
* Check we have a collation iff it's a collatable type. The only
@@ -2481,12 +2520,25 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
collationOidP[attn] = attcollation;
/*
- * Identify the opclass to use.
+ * Identify the opclass to use. Use of ddl_userid is necessary due to
+ * ACL checks therein. This is safe despite opclasses containing
+ * opaque expressions (specifically, functions), because only
+ * superusers can define opclasses.
*/
+ if (OidIsValid(ddl_userid))
+ {
+ AtEOXact_GUC(false, *ddl_save_nestlevel);
+ SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
+ }
classOidP[attn] = ResolveOpClass(attribute->opclass,
atttype,
accessMethodName,
accessMethodId);
+ if (OidIsValid(ddl_userid))
+ {
+ SetUserIdAndSecContext(save_userid, save_sec_context);
+ *ddl_save_nestlevel = NewGUCNestLevel();
+ }
/*
* Identify the exclusion operator, if any.
@@ -2500,9 +2552,23 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
/*
* Find the operator --- it must accept the column datatype
- * without runtime coercion (but binary compatibility is OK)
+ * without runtime coercion (but binary compatibility is OK).
+ * Operators contain opaque expressions (specifically, functions).
+ * compatible_oper_opid() boils down to oper() and
+ * IsBinaryCoercible(). PostgreSQL would have security problems
+ * elsewhere if oper() started calling opaque expressions.
*/
+ if (OidIsValid(ddl_userid))
+ {
+ AtEOXact_GUC(false, *ddl_save_nestlevel);
+ SetUserIdAndSecContext(ddl_userid, ddl_sec_context);
+ }
opid = compatible_oper_opid(opname, atttype, atttype, false);
+ if (OidIsValid(ddl_userid))
+ {
+ SetUserIdAndSecContext(save_userid, save_sec_context);
+ *ddl_save_nestlevel = NewGUCNestLevel();
+ }
/*
* Only allow commutative operators to be used in exclusion
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4a2ddd5dff3..5db53b125ee 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -442,16 +442,6 @@ SPI_rollback_and_chain(void)
_SPI_rollback(true);
}
-/*
- * SPICleanup is a no-op, kept for backwards compatibility. We rely on
- * AtEOXact_SPI to cleanup. Extensions should not (need to) fiddle with the
- * internal SPI state directly.
- */
-void
-SPICleanup(void)
-{
-}
-
/*
* Clean up SPI state at transaction commit or abort.
*/
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index fad8c92b2ef..bd303546cce 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
}
}
+/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * We just mark the relation entry as not updatable here if the local
+ * replica identity is found to be insufficient for applying
+ * updates/deletes (inserts don't care!) and leave it to
+ * check_relation_updatable() to throw the actual error if needed.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+ Bitmapset *idkey;
+ LogicalRepRelation *remoterel = &entry->remoterel;
+ int i;
+
+ entry->updatable = true;
+
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ /* fallback to PK if no replica identity */
+ if (idkey == NULL)
+ {
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ /*
+ * If no replica identity index and no PK, the published table must
+ * have replica identity FULL.
+ */
+ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ entry->updatable = false;
+ }
+
+ i = -1;
+ while ((i = bms_next_member(idkey, i)) >= 0)
+ {
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
+
+ attnum = AttrNumberGetAttrOffset(attnum);
+
+ if (entry->attrmap->attnums[attnum] < 0 ||
+ !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+ {
+ entry->updatable = false;
+ break;
+ }
+ }
+}
+
/*
* Open the local relation associated with the remote one.
*
@@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (!entry->localrelvalid)
{
Oid relid;
- Bitmapset *idkey;
TupleDesc desc;
MemoryContext oldctx;
int i;
@@ -316,7 +376,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
- pfree(entry->attrmap);
+ free_attrmap(entry->attrmap);
entry->attrmap = NULL;
}
@@ -373,54 +433,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
bms_free(missingatts);
/*
- * Check that replica identity matches. We allow for stricter replica
- * identity (fewer columns) on subscriber as that will not stop us
- * from finding unique tuple. IE, if publisher has identity
- * (id,timestamp) and subscriber just (id) this will not be a problem,
- * but in the opposite scenario it will.
- *
- * Don't throw any error here just mark the relation entry as not
- * updatable, as replica identity is only for updates and deletes but
- * inserts can be replicated even without it.
+ * Set if the table's replica identity is enough to apply
+ * update/delete.
*/
- entry->updatable = true;
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_IDENTITY_KEY);
- /* fallback to PK if no replica identity */
- if (idkey == NULL)
- {
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
- /*
- * If no replica identity index and no PK, the published table
- * must have replica identity FULL.
- */
- if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
- entry->updatable = false;
- }
-
- i = -1;
- while ((i = bms_next_member(idkey, i)) >= 0)
- {
- int attnum = i + FirstLowInvalidHeapAttributeNumber;
-
- if (!AttrNumberIsForUserDefinedAttr(attnum))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" uses "
- "system columns in REPLICA IDENTITY index",
- remoterel->nspname, remoterel->relname)));
-
- attnum = AttrNumberGetAttrOffset(attnum);
-
- if (entry->attrmap->attnums[attnum] < 0 ||
- !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
- {
- entry->updatable = false;
- break;
- }
- }
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
}
@@ -493,6 +509,40 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
}
}
+/*
+ * Reset the entries in the partition map that refer to remoterel.
+ *
+ * Called when new relation mapping is sent by the publisher to update our
+ * expected view of incoming data from said publisher.
+ *
+ * Note that we don't update the remoterel information in the entry here,
+ * we will update the information in logicalrep_partition_open to avoid
+ * unnecessary work.
+ */
+void
+logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
+{
+ HASH_SEQ_STATUS status;
+ LogicalRepPartMapEntry *part_entry;
+ LogicalRepRelMapEntry *entry;
+
+ if (LogicalRepPartMap == NULL)
+ return;
+
+ hash_seq_init(&status, LogicalRepPartMap);
+ while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+ {
+ entry = &part_entry->relmapentry;
+
+ if (entry->remoterel.remoteid != remoterel->remoteid)
+ continue;
+
+ logicalrep_relmap_free_entry(entry);
+
+ memset(entry, 0, sizeof(LogicalRepRelMapEntry));
+ }
+}
+
/*
* Initialize the partition map cache.
*/
@@ -553,8 +603,20 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
entry = &part_entry->relmapentry;
+ /*
+ * We must always overwrite entry->localrel with the latest partition
+ * Relation pointer, because the Relation pointed to by the old value may
+ * have been cleared after the caller would have closed the partition
+ * relation after the last use of this entry. Note that localrelvalid is
+ * only updated by the relcache invalidation callback, so it may still be
+ * true irrespective of whether the Relation pointed to by localrel has
+ * been cleared or not.
+ */
if (found && entry->localrelvalid)
+ {
+ entry->localrel = partrel;
return entry;
+ }
/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
@@ -565,6 +627,13 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
part_entry->partoid = partOid;
}
+ /* Release the no-longer-useful attrmap, if any. */
+ if (entry->attrmap)
+ {
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+ }
+
if (!entry->remoterel.remoteid)
{
int i;
@@ -624,7 +693,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
attrmap->maplen * sizeof(AttrNumber));
}
- entry->updatable = root->updatable;
+ /* Set if the table's replica identity is enough to apply update/delete. */
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index adcbc36ecef..7190dd94ebf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1191,6 +1191,9 @@ apply_handle_relation(StringInfo s)
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
+
+ /* Also reset all entries in the partition map that refer to remoterel. */
+ logicalrep_partmap_reset_relmap(rel);
}
/*
@@ -1320,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
+ /*
+ * For partitioned tables, we only need to care if the target partition is
+ * updatable (aka has PK or RI defined for it).
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return;
+
/* Updatable, no error. */
if (rel->updatable)
return;
@@ -1673,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot_part;
TupleConversionMap *map;
MemoryContext oldctx;
+ LogicalRepRelMapEntry *part_entry = NULL;
+ AttrMap *attrmap = NULL;
/* ModifyTableState is needed for ExecFindPartition(). */
edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -1704,8 +1716,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
map = partrelinfo->ri_RootToPartitionMap;
if (map != NULL)
- remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+ {
+ attrmap = map->attrMap;
+ remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
remoteslot_part);
+ }
else
{
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -1713,6 +1728,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
}
MemoryContextSwitchTo(oldctx);
+ /* Check if we can do the update or delete on the leaf partition. */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ part_entry = logicalrep_partition_open(relmapentry, partrel,
+ attrmap);
+ check_relation_updatable(part_entry);
+ }
+
switch (operation)
{
case CMD_INSERT:
@@ -1734,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* suitable partition.
*/
{
- AttrMap *attrmap = map ? map->attrMap : NULL;
- LogicalRepRelMapEntry *part_entry;
TupleTableSlot *localslot;
ResultRelInfo *partrelinfo_new;
bool found;
- part_entry = logicalrep_partition_open(relmapentry, partrel,
- attrmap);
-
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e81041ae029..7ecd3afe1b9 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -72,7 +72,7 @@
#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
/* don't overwrite manual choice */
-#elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_SYS_SIGNALFD_H)
+#elif defined(HAVE_SYS_EPOLL_H)
#define WAIT_USE_EPOLL
#elif defined(HAVE_KQUEUE)
#define WAIT_USE_KQUEUE
@@ -84,6 +84,22 @@
#error "no wait set implementation available"
#endif
+/*
+ * By default, we use a self-pipe with poll() and a signalfd with epoll(), if
+ * available. We avoid signalfd on illumos for now based on problem reports.
+ * For testing the choice can also be manually specified.
+ */
+#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
+#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
+/* don't overwrite manual choice */
+#elif defined(WAIT_USE_EPOLL) && defined(HAVE_SYS_SIGNALFD_H) && \
+ !defined(__illumos__)
+#define WAIT_USE_SIGNALFD
+#else
+#define WAIT_USE_SELF_PIPE
+#endif
+#endif
+
/* typedef in latch.h */
struct WaitEventSet
{
@@ -146,12 +162,12 @@ static WaitEventSet *LatchWaitSet;
static volatile sig_atomic_t waiting = false;
#endif
-#ifdef WAIT_USE_EPOLL
+#ifdef WAIT_USE_SIGNALFD
/* On Linux, we'll receive SIGURG via a signalfd file descriptor. */
static int signal_fd = -1;
#endif
-#if defined(WAIT_USE_POLL)
+#ifdef WAIT_USE_SELF_PIPE
/* Read and write ends of the self-pipe */
static int selfpipe_readfd = -1;
static int selfpipe_writefd = -1;
@@ -164,7 +180,7 @@ static void latch_sigurg_handler(SIGNAL_ARGS);
static void sendSelfPipeByte(void);
#endif
-#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
+#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
static void drain(void);
#endif
@@ -190,7 +206,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
void
InitializeLatchSupport(void)
{
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
int pipefd[2];
if (IsUnderPostmaster)
@@ -264,7 +280,7 @@ InitializeLatchSupport(void)
pqsignal(SIGURG, latch_sigurg_handler);
#endif
-#ifdef WAIT_USE_EPOLL
+#ifdef WAIT_USE_SIGNALFD
sigset_t signalfd_mask;
/* Block SIGURG, because we'll receive it through a signalfd. */
@@ -316,7 +332,7 @@ ShutdownLatchSupport(void)
LatchWaitSet = NULL;
}
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
close(selfpipe_readfd);
close(selfpipe_writefd);
selfpipe_readfd = -1;
@@ -324,7 +340,7 @@ ShutdownLatchSupport(void)
selfpipe_owner_pid = InvalidPid;
#endif
-#if defined(WAIT_USE_EPOLL)
+#if defined(WAIT_USE_SIGNALFD)
close(signal_fd);
signal_fd = -1;
#endif
@@ -341,9 +357,12 @@ InitLatch(Latch *latch)
latch->owner_pid = MyProcPid;
latch->is_shared = false;
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
/* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
+#elif defined(WAIT_USE_SIGNALFD)
+ /* Assert InitializeLatchSupport has been called in this process */
+ Assert(signal_fd >= 0);
#elif defined(WAIT_USE_WIN32)
latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (latch->event == NULL)
@@ -405,9 +424,12 @@ OwnLatch(Latch *latch)
/* Sanity checks */
Assert(latch->is_shared);
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
/* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
+#elif defined(WAIT_USE_SIGNALFD)
+ /* Assert InitializeLatchSupport has been called in this process */
+ Assert(signal_fd >= 0);
#endif
if (latch->owner_pid != 0)
@@ -618,7 +640,7 @@ SetLatch(Latch *latch)
return;
else if (owner_pid == MyProcPid)
{
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
if (waiting)
sendSelfPipeByte();
#else
@@ -983,9 +1005,9 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
{
set->latch = latch;
set->latch_pos = event->pos;
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
event->fd = selfpipe_readfd;
-#elif defined(WAIT_USE_EPOLL)
+#elif defined(WAIT_USE_SIGNALFD)
event->fd = signal_fd;
#else
event->fd = PGINVALID_SOCKET;
@@ -2102,7 +2124,7 @@ GetNumRegisteredWaitEvents(WaitEventSet *set)
return set->nevents;
}
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_SELF_PIPE)
/*
* SetLatch uses SIGURG to wake up the process waiting on the latch.
@@ -2153,7 +2175,7 @@ sendSelfPipeByte(void)
#endif
-#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
+#if defined(WAIT_USE_SELF_PIPE) || defined(WAIT_USE_SIGNALFD)
/*
* Read all available data from self-pipe or signalfd.
@@ -2169,7 +2191,7 @@ drain(void)
int rc;
int fd;
-#ifdef WAIT_USE_POLL
+#ifdef WAIT_USE_SELF_PIPE
fd = selfpipe_readfd;
#else
fd = signal_fd;
@@ -2187,7 +2209,7 @@ drain(void)
else
{
waiting = false;
-#ifdef WAIT_USE_POLL
+#ifdef WAIT_USE_SELF_PIPE
elog(ERROR, "read() on self-pipe failed: %m");
#else
elog(ERROR, "read() on signalfd failed: %m");
@@ -2197,7 +2219,7 @@ drain(void)
else if (rc == 0)
{
waiting = false;
-#ifdef WAIT_USE_POLL
+#ifdef WAIT_USE_SELF_PIPE
elog(ERROR, "unexpected EOF on self-pipe");
#else
elog(ERROR, "unexpected EOF on signalfd");
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 57c03cce7d9..89fafdbedc1 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -277,6 +277,11 @@ static ProcArrayStruct *procArray;
static PGPROC *allProcs;
static TMGXACT *allTmGxact;
+/*
+ * Cache to reduce overhead of repeated calls to TransactionIdIsInProgress()
+ */
+static TransactionId cachedXidIsNotInProgress = InvalidTransactionId;
+
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
@@ -1486,7 +1491,7 @@ TransactionIdIsInProgress(TransactionId xid)
* already known to be completed, we can fall out without any access to
* shared memory.
*/
- if (TransactionIdIsKnownCompleted(xid))
+ if (TransactionIdEquals(cachedXidIsNotInProgress, xid))
{
xc_by_known_xact_inc();
return false;
@@ -1644,6 +1649,7 @@ TransactionIdIsInProgress(TransactionId xid)
if (nxids == 0)
{
xc_no_overflow_inc();
+ cachedXidIsNotInProgress = xid;
return false;
}
@@ -1658,7 +1664,10 @@ TransactionIdIsInProgress(TransactionId xid)
xc_slow_answer_inc();
if (TransactionIdDidAbort(xid))
+ {
+ cachedXidIsNotInProgress = xid;
return false;
+ }
/*
* It isn't aborted, so check whether the transaction tree it belongs to
@@ -1676,6 +1685,7 @@ TransactionIdIsInProgress(TransactionId xid)
}
}
+ cachedXidIsNotInProgress = xid;
return false;
}
diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c
index 78b0b9b6d68..0c9f14a0c83 100644
--- a/src/backend/utils/adt/xid8funcs.c
+++ b/src/backend/utils/adt/xid8funcs.c
@@ -36,6 +36,7 @@
#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/lwlock.h"
+#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@@ -810,29 +811,22 @@ pg_xact_status(PG_FUNCTION_ARGS)
{
Assert(TransactionIdIsValid(xid));
- if (TransactionIdIsCurrentTransactionId(xid))
+ /*
+ * Like when doing visiblity checks on a row, check whether the
+ * transaction is still in progress before looking into the CLOG.
+ * Otherwise we would incorrectly return "committed" for a transaction
+ * that is committing and has already updated the CLOG, but hasn't
+ * removed its XID from the proc array yet. (See comment on that race
+ * condition at the top of heapam_visibility.c)
+ */
+ if (TransactionIdIsInProgress(xid))
status = "in progress";
else if (TransactionIdDidCommit(xid))
status = "committed";
- else if (TransactionIdDidAbort(xid))
- status = "aborted";
else
{
- /*
- * The xact is not marked as either committed or aborted in clog.
- *
- * It could be a transaction that ended without updating clog or
- * writing an abort record due to a crash. We can safely assume
- * it's aborted if it isn't committed and is older than our
- * snapshot xmin.
- *
- * Otherwise it must be in-progress (or have been at the time we
- * checked commit/abort status).
- */
- if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
- status = "aborted";
- else
- status = "in progress";
+ /* it must have aborted or crashed */
+ status = "aborted";
}
}
else
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 1859443ed87..973a6d4639d 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -28,6 +28,7 @@ static void check_proper_datallowconn(ClusterInfo *cluster);
static void check_for_prepared_transactions(ClusterInfo *cluster);
static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
+static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
static void check_for_tables_with_oids(ClusterInfo *cluster);
static void check_for_composite_data_type_usage(ClusterInfo *cluster);
static void check_for_reg_data_type_usage(ClusterInfo *cluster);
@@ -157,6 +158,13 @@ check_and_dump_old_cluster(bool live_check, char **sequence_script_file_name)
check_for_removed_data_type_usage(&old_cluster, "12", "tinterval");
}
+ /*
+ * PG 14 changed polymorphic functions from anyarray to
+ * anycompatiblearray.
+ */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1300)
+ check_for_incompatible_polymorphics(&old_cluster);
+
/*
* Pre-PG 12 allowed tables to be declared WITH OIDS, which is not
* supported anymore. Verify there are none, iff applicable.
@@ -1178,6 +1186,135 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
check_ok();
}
+/*
+ * check_for_incompatible_polymorphics()
+ *
+ * Make sure nothing is using old polymorphic functions with
+ * anyarray/anyelement rather than the new anycompatible variants.
+ */
+static void
+check_for_incompatible_polymorphics(ClusterInfo *cluster)
+{
+ PGresult *res;
+ FILE *script = NULL;
+ char output_path[MAXPGPATH];
+ PQExpBufferData old_polymorphics;
+
+ prep_status("Checking for incompatible polymorphic functions");
+
+ snprintf(output_path, sizeof(output_path),
+ "incompatible_polymorphics.txt");
+
+ /* The set of problematic functions varies a bit in different versions */
+ initPQExpBuffer(&old_polymorphics);
+
+ appendPQExpBufferStr(&old_polymorphics,
+ "'array_append(anyarray,anyelement)'"
+ ", 'array_cat(anyarray,anyarray)'"
+ ", 'array_prepend(anyelement,anyarray)'");
+
+ if (GET_MAJOR_VERSION(cluster->major_version) >= 903)
+ appendPQExpBufferStr(&old_polymorphics,
+ ", 'array_remove(anyarray,anyelement)'"
+ ", 'array_replace(anyarray,anyelement,anyelement)'");
+
+ if (GET_MAJOR_VERSION(cluster->major_version) >= 905)
+ appendPQExpBufferStr(&old_polymorphics,
+ ", 'array_position(anyarray,anyelement)'"
+ ", 'array_position(anyarray,anyelement,integer)'"
+ ", 'array_positions(anyarray,anyelement)'"
+ ", 'width_bucket(anyelement,anyarray)'");
+
+ for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ {
+ bool db_used = false;
+ DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
+ PGconn *conn = connectToServer(cluster, active_db->db_name);
+ int ntups;
+ int i_objkind,
+ i_objname;
+
+ /*
+ * The query below hardcodes FirstNormalObjectId as 16384 rather than
+ * interpolating that C #define into the query because, if that
+ * #define is ever changed, the cutoff we want to use is the value
+ * used by pre-version 14 servers, not that of some future version.
+ */
+ res = executeQueryOrDie(conn,
+ /* Aggregate transition functions */
+ "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Aggregate final functions */
+ "UNION ALL "
+ "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Operators */
+ "UNION ALL "
+ "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+ "FROM pg_operator AS op "
+ "WHERE op.oid >= 16384 "
+ "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
+ old_polymorphics.data,
+ old_polymorphics.data,
+ old_polymorphics.data);
+
+ ntups = PQntuples(res);
+
+ i_objkind = PQfnumber(res, "objkind");
+ i_objname = PQfnumber(res, "objname");
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (script == NULL &&
+ (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %s\n",
+ output_path, strerror(errno));
+ if (!db_used)
+ {
+ fprintf(script, "In database: %s\n", active_db->db_name);
+ db_used = true;
+ }
+
+ fprintf(script, " %s: %s\n",
+ PQgetvalue(res, rowno, i_objkind),
+ PQgetvalue(res, rowno, i_objname));
+ }
+
+ PQclear(res);
+ PQfinish(conn);
+ }
+
+ if (script)
+ {
+ fclose(script);
+ pg_log(PG_REPORT, "fatal\n");
+ pg_fatal("Your installation contains user-defined objects that refer to internal\n"
+ "polymorphic functions with arguments of type 'anyarray' or 'anyelement'.\n"
+ "These user-defined objects must be dropped before upgrading and restored\n"
+ "afterwards, changing them to refer to the new corresponding functions with\n"
+ "arguments of type 'anycompatiblearray' and 'anycompatible'.\n"
+ "A list of the problematic objects is in the file:\n"
+ " %s\n\n", output_path);
+ }
+ else
+ check_ok();
+
+ termPQExpBuffer(&old_polymorphics);
+}
+
/*
* Verify that no tables are declared WITH OIDS.
*/
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index ef1964b709d..fc60fdb9584 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -205,7 +205,6 @@ extern void SPI_commit_and_chain(void);
extern void SPI_rollback(void);
extern void SPI_rollback_and_chain(void);
-extern void SPICleanup(void);
extern void AtEOXact_SPI(bool isCommit);
extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
extern bool SPI_inside_nonatomic_context(void);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 3c662d3abcf..10f91490b5c 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
} LogicalRepRelMapEntry;
extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
LOCKMODE lockmode);
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 85696712b38..b2c7727f68d 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1313,7 +1313,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
- if (conn->asyncStatus == PGASYNC_IDLE)
+ if (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
{
resetPQExpBuffer(&conn->errorMessage);
pqPipelineProcessQueue(conn);
@@ -1372,6 +1373,7 @@ int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
+ PGcmdQueueEntry *entry2 = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
@@ -1387,6 +1389,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2 = pqAllocCmdQueueEntry(conn);
+ if (entry2 == NULL)
+ goto sendFailed;
+ }
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1456,6 +1464,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * When pipeline mode is in use, we need a second entry in the command
+ * queue to represent Close Portal message. This allows us later to wait
+ * for the CloseComplete message to be received before getting in IDLE
+ * state.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2->queryclass = PGQUERY_CLOSE;
+ entry2->query = NULL;
+ pqAppendCmdQueueEntry(conn, entry2);
+ }
+
return 1;
sendFailed:
@@ -1702,11 +1724,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
+
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
@@ -2082,19 +2106,22 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF)
- {
- /*
- * We're about to return the NULL that terminates the round of
- * results from the current query; prepare to send the results
- * of the next query when we're called next. Also, since this
- * is the start of the results of the next query, clear any
- * prior error message.
- */
- resetPQExpBuffer(&conn->errorMessage);
- pqPipelineProcessQueue(conn);
- }
break;
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query, if any, when we're called next. If there's
+ * no next element in the command queue, this gets us in IDLE
+ * state.
+ */
+ resetPQExpBuffer(&conn->errorMessage);
+ pqPipelineProcessQueue(conn);
+ res = NULL; /* query is complete */
+ break;
+
case PGASYNC_READY:
/*
@@ -2115,7 +2142,7 @@ PQgetResult(PGconn *conn)
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
- conn->asyncStatus = PGASYNC_IDLE;
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
@@ -2159,6 +2186,22 @@ PQgetResult(PGconn *conn)
break;
}
+ /* If the next command we expect is CLOSE, read and consume it */
+ if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+ conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ if (res && res->resultStatus != PGRES_FATAL_ERROR)
+ {
+ conn->asyncStatus = PGASYNC_BUSY;
+ parseInput(conn);
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ }
+ else
+ /* we won't ever see the Close */
+ pqCommandQueueAdvance(conn);
+ }
+
if (res)
{
int i;
@@ -2967,7 +3010,10 @@ PQexitPipelineMode(PGconn *conn)
if (!conn)
return 0;
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+ (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+ conn->cmd_queue_head == NULL)
return 1;
switch (conn->asyncStatus)
@@ -2984,9 +3030,16 @@ PQexitPipelineMode(PGconn *conn)
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
- default:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK */
break;
+
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while in COPY\n"));
}
/* still work to process */
@@ -3023,6 +3076,10 @@ pqCommandQueueAdvance(PGconn *conn)
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
+ /* If the queue is now empty, reset the tail too */
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_tail = NULL;
+
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3045,15 +3102,35 @@ pqPipelineProcessQueue(PGconn *conn)
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
+
case PGASYNC_IDLE:
+ /*
+ * If we're in IDLE mode and there's some command in the queue,
+ * get us into PIPELINE_IDLE mode and process normally. Otherwise
+ * there's nothing for us to do.
+ */
+ if (conn->cmd_queue_head != NULL)
+ {
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ break;
+ }
+ return;
+
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
/* next query please */
break;
}
- /* Nothing to do if not in pipeline mode, or queue is empty */
- if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
- conn->cmd_queue_head == NULL)
+ /*
+ * If there are no further commands to process in the queue, get us in
+ * "real idle" mode now.
+ */
+ if (conn->cmd_queue_head == NULL)
+ {
+ conn->asyncStatus = PGASYNC_IDLE;
return;
+ }
/* Initialize async result-accumulation state */
pqClearAsyncResult(conn);
@@ -3140,6 +3217,7 @@ PQpipelineSync(PGconn *conn)
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9d74dd0e39d..5311a40a147 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -192,18 +192,6 @@ pqParseInput3(PGconn *conn)
if (conn->asyncStatus != PGASYNC_IDLE)
return;
- /*
- * We're also notionally not-IDLE when in pipeline mode the state
- * says "idle" (so we have completed receiving the results of one
- * query from the server and dispatched them to the application)
- * but another query is queued; yield back control to caller so
- * that they can initiate processing of the next query in the
- * queue.
- */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
- conn->cmd_queue_head != NULL)
- return;
-
/*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
@@ -330,8 +318,24 @@ pqParseInput3(PGconn *conn)
}
break;
case '2': /* Bind Complete */
+ /* Nothing to do for this message type */
+ break;
case '3': /* Close Complete */
- /* Nothing to do for these message types */
+ /*
+ * If we get CloseComplete when waiting for it, consume
+ * the queue element and keep going. A result is not
+ * expected from this message; it is just there so that
+ * we know to wait for it when PQsendQuery is used in
+ * pipeline mode, before going in IDLE state. Failing to
+ * do this makes us receive CloseComplete when IDLE, which
+ * creates problems.
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ pqCommandQueueAdvance(conn);
+ }
+
break;
case 'S': /* parameter status */
if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 70094e5fb70..0cbd611bd98 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -268,7 +268,8 @@ typedef enum
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
} PGAsyncStatusType;
/* Target server type (decoded value of target_session_attrs) */
@@ -354,7 +355,8 @@ typedef enum
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
- PGQUERY_SYNC /* Sync (at end of a pipeline) */
+ PGQUERY_SYNC, /* Sync (at end of a pipeline) */
+ PGQUERY_CLOSE
} PGQueryClass;
/*
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0adaf..dfab924965d 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -581,8 +581,6 @@ test_pipeline_abort(PGconn *conn)
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("exiting pipeline mode didn't seem to work");
- fprintf(stderr, "ok\n");
-
/*-
* Since we fired the pipelines off without a surrounding xact, the results
* should be:
@@ -614,6 +612,8 @@ test_pipeline_abort(PGconn *conn)
}
PQclear(res);
+
+ fprintf(stderr, "ok\n");
}
/* State machine enum for test_pipelined_insert */
@@ -968,6 +968,207 @@ test_prepared(PGconn *conn)
fprintf(stderr, "ok\n");
}
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+ int *n_notices = (int *) arg;
+
+ (*n_notices)++;
+ fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+ PGresult *res;
+ int n_notices = 0;
+
+ fprintf(stderr, "\npipeline idle...\n");
+
+ PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+ /*
+ * Cause a Close message to be sent to the server, and watch libpq's
+ * reaction to the resulting CloseComplete. libpq must not get in IDLE
+ * state until that has been received.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+ res = NULL;
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * Must not have got any notices here; note bug as described in
+ * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+ */
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 1\n");
+
+ /*
+ * Verify that we can send a query using simple query protocol after one
+ * in pipeline mode.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("got unexpected non-null result");
+ /* We can exit pipeline mode now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+ res = PQexec(conn, "SELECT 2");
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ if (res == NULL)
+ pg_fatal("PQexec returned NULL");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from non-pipeline query",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 2\n");
+
+ /*
+ * Case 2: exiting pipeline mode is not OK if a second command is sent.
+ */
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ if (PQsendQuery(conn, "SELECT 2") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ /* read terminating null from first query */
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ /* Try to exit pipeline mode in pipeline-idle state */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQsendQuery(conn, "SELECT 2") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQexitPipelineMode(conn) == 1)
+ pg_fatal("exiting pipeline succeeded when it shouldn't");
+ if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+ strlen("cannot exit pipeline mode")) != 0)
+ pg_fatal("did not get expected error; got: %s",
+ PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from second pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 3\n");
+
+ /* Have a WARNING in the middle of a resultset */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL result received");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
+ fprintf(stderr, "ok - 4\n");
+}
+
static void
test_simple_pipeline(PGconn *conn)
{
@@ -1160,6 +1361,8 @@ test_singlerowmode(PGconn *conn)
if (PQexitPipelineMode(conn) != 1)
pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
}
/*
@@ -1549,6 +1752,7 @@ print_test_list(void)
printf("multi_pipelines\n");
printf("nosync\n");
printf("pipeline_abort\n");
+ printf("pipeline_idle\n");
printf("pipelined_insert\n");
printf("prepared\n");
printf("simple_pipeline\n");
@@ -1630,7 +1834,10 @@ main(int argc, char **argv)
/* Set the trace file, if requested */
if (tracefile != NULL)
{
- trace = fopen(tracefile, "w");
+ if (strcmp(tracefile, "-") == 0)
+ trace = stdout;
+ else
+ trace = fopen(tracefile, "w");
if (trace == NULL)
pg_fatal("could not open file \"%s\": %m", tracefile);
@@ -1650,6 +1857,8 @@ main(int argc, char **argv)
test_nosync(conn);
else if (strcmp(testname, "pipeline_abort") == 0)
test_pipeline_abort(conn);
+ else if (strcmp(testname, "pipeline_idle") == 0)
+ test_pipeline_idle(conn);
else if (strcmp(testname, "pipelined_insert") == 0)
test_pipelined_insert(conn, numrows);
else if (strcmp(testname, "prepared") == 0)
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995a..b02928cad29 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -26,7 +26,8 @@
my @extraargs = ('-r', $numrows);
my $cmptrace = grep(/^$testname$/,
qw(simple_pipeline nosync multi_pipelines prepared singlerow
- pipeline_abort transaction disallowed_in_pipeline)) > 0;
+ pipeline_abort pipeline_idle transaction
+ disallowed_in_pipeline)) > 0;
# For a bunch of tests, generate a libpq trace file too.
my $traceout = "$TestLib::tmp_check/traces/$testname.trace";
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644
index 00000000000..3957ee4dfe1
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
@@ -0,0 +1,93 @@
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 4 Sync
+B 5 ReadyForQuery I
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 13 Query "SELECT 2"
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '2'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 16 Parse "" "SELECT 2" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '2'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 16 Parse "" "SELECT 1" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 16 Parse "" "SELECT 2" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '2'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 49 Parse "" "SELECT pg_catalog.pg_advisory_unlock(1,1)" 0
+F 12 Bind "" "" 0 0 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 6 Close P ""
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 43 RowDescription 1 "pg_advisory_unlock" NNNN 0 NNNN 1 -1 0
+B NN NoticeResponse S "WARNING" V "WARNING" C "01000" M "you don't own a lock of type ExclusiveLock" F "SSSS" L "SSSS" R "SSSS" \x00
+B 11 DataRow 1 1 'f'
+B 13 CommandComplete "SELECT 1"
+B 4 CloseComplete
+F 4 Terminate
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 12339c23de1..14e9138a394 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -1,15 +1,13 @@
# Copyright (c) 2022, PostgreSQL Global Development Group
-# allow use of release 15+ perl namespace in older branches
-# just 'use' the older module name.
-# See PostgresNode.pm for function implementations
-
-package PostgreSQL::Test::Cluster;
+# Allow use of release 15+ Perl package name in older branches, by giving that
+# package the same symbol table as the older package. See PostgresNode::new
+# for supporting heuristics.
use strict;
use warnings;
-
-use PostgresNode;
+BEGIN { *PostgreSQL::Test::Cluster:: = \*PostgresNode::; }
+use PostgresNode ();
1;
diff --git a/src/test/perl/PostgreSQL/Test/Utils.pm b/src/test/perl/PostgreSQL/Test/Utils.pm
index bdbbd6e4706..2d15bbf21d7 100644
--- a/src/test/perl/PostgreSQL/Test/Utils.pm
+++ b/src/test/perl/PostgreSQL/Test/Utils.pm
@@ -1,48 +1,11 @@
# Copyright (c) 2022, PostgreSQL Global Development Group
-# allow use of release 15+ perl namespace in older branches
-# just 'use' the older module name.
-# We export the same names as the v15 module.
-# See TestLib.pm for alias assignment that makes this all work.
-
-package PostgreSQL::Test::Utils;
+# Allow use of release 15+ Perl package name in older branches, by giving that
+# package the same symbol table as the older package.
use strict;
use warnings;
-
-use Exporter 'import';
-
-use TestLib;
-
-our @EXPORT = qw(
- generate_ascii_string
- slurp_dir
- slurp_file
- append_to_file
- check_mode_recursive
- chmod_recursive
- check_pg_config
- dir_symlink
- system_or_bail
- system_log
- run_log
- run_command
- pump_until
-
- command_ok
- command_fails
- command_exit_is
- program_help_ok
- program_version_ok
- program_options_handling_ok
- command_like
- command_like_safe
- command_fails_like
- command_checks_all
-
- $windows_os
- $is_msys2
- $use_unix_sockets
-);
+BEGIN { *PostgreSQL::Test::Utils:: = \*TestLib::; }
+use TestLib ();
1;
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 9e6d4c653b9..241ed8d49e8 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -162,6 +162,17 @@ of finding port numbers, registering instances for cleanup, etc.
sub new
{
my ($class, $name, $pghost, $pgport) = @_;
+
+ # Use release 15+ semantics when the arguments look like (node_name,
+ # %params). We can't use $class to decide, because get_new_node() passes
+ # a v14- argument list regardless of the class. $class might be an
+ # out-of-core subclass. $class->isa('PostgresNode') returns true even for
+ # descendants of PostgreSQL::Test::Cluster, so it doesn't help.
+ return $class->get_new_node(@_[ 1 .. $#_ ])
+ if !$pghost
+ or !$pgport
+ or $pghost =~ /^[a-zA-Z0-9_]$/;
+
my $testname = basename($0);
$testname =~ s/\.[^.]+$//;
@@ -3068,18 +3079,4 @@ sub corrupt_page_checksum
=cut
-# support release 15+ perl module namespace
-
-package PostgreSQL::Test::Cluster; ## no critic (ProhibitMultiplePackages)
-
-sub new
-{
- shift; # remove class param from args
- return PostgresNode->get_new_node(@_);
-}
-
-no warnings 'once';
-
-*get_free_port = *PostgresNode::get_free_port;
-
1;
diff --git a/src/test/perl/TestLib.pm b/src/test/perl/TestLib.pm
index f3ee20af41c..610050e1c4b 100644
--- a/src/test/perl/TestLib.pm
+++ b/src/test/perl/TestLib.pm
@@ -979,46 +979,4 @@ sub command_checks_all
=cut
-# support release 15+ perl module namespace
-
-package PostgreSQL::Test::Utils; ## no critic (ProhibitMultiplePackages)
-
-# we don't want to export anything here, but we want to support things called
-# via this package name explicitly.
-
-# use typeglobs to alias these functions and variables
-
-no warnings qw(once);
-
-*generate_ascii_string = *TestLib::generate_ascii_string;
-*slurp_dir = *TestLib::slurp_dir;
-*slurp_file = *TestLib::slurp_file;
-*append_to_file = *TestLib::append_to_file;
-*check_mode_recursive = *TestLib::check_mode_recursive;
-*chmod_recursive = *TestLib::chmod_recursive;
-*check_pg_config = *TestLib::check_pg_config;
-*dir_symlink = *TestLib::dir_symlink;
-*system_or_bail = *TestLib::system_or_bail;
-*system_log = *TestLib::system_log;
-*run_log = *TestLib::run_log;
-*run_command = *TestLib::run_command;
-*command_ok = *TestLib::command_ok;
-*command_fails = *TestLib::command_fails;
-*command_exit_is = *TestLib::command_exit_is;
-*program_help_ok = *TestLib::program_help_ok;
-*program_version_ok = *TestLib::program_version_ok;
-*program_options_handling_ok = *TestLib::program_options_handling_ok;
-*command_like = *TestLib::command_like;
-*command_like_safe = *TestLib::command_like_safe;
-*command_fails_like = *TestLib::command_fails_like;
-*command_checks_all = *TestLib::command_checks_all;
-
-*windows_os = *TestLib::windows_os;
-*is_msys2 = *TestLib::is_msys2;
-*use_unix_sockets = *TestLib::use_unix_sockets;
-*timeout_default = *TestLib::timeout_default;
-*tmp_check = *TestLib::tmp_check;
-*log_path = *TestLib::log_path;
-*test_logfile = *TestLib::test_log_file;
-
1;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e53bc5b568f..dfe2cb6deae 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -6,7 +6,7 @@
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 69;
+use Test::More tests => 71;
# setup
@@ -841,3 +841,32 @@ BEGIN
$result = $node_subscriber2->safe_psql('postgres',
"SELECT a, b, c FROM tab5 ORDER BY 1");
is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
+
+# Test that replication into the partitioned target table continues to
+# work correctly when the published table is altered.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
+ ALTER TABLE tab5 ADD COLUMN b INT;});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
+
+# Test that replication works correctly as long as the leaf partition
+# has the necessary REPLICA IDENTITY, even though the actual target
+# partitioned table does not.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');