Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/src/sgml/high-availability.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ HINT: You can then restart the server after making the necessary configuration
Currently, temporary table creation is not allowed during read-only
transactions, so in some cases existing scripts will not run correctly.
This restriction might be relaxed in a later release. This is
both an SQL Standard compliance issue and a technical issue.
both an SQL standard compliance issue and a technical issue.
</para>

<para>
Expand Down
7 changes: 4 additions & 3 deletions doc/src/sgml/mvcc.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,10 @@

<para>
The table also shows that PostgreSQL's Repeatable Read implementation
does not allow phantom reads. Stricter behavior is permitted by the
SQL standard: the four isolation levels only define which phenomena
must not happen, not which phenomena <emphasis>must</emphasis> happen.
does not allow phantom reads. This is acceptable under the SQL
standard because the standard specifies which anomalies must
<emphasis>not</emphasis> occur at certain isolation levels; higher
guarantees are acceptable.
The behavior of the available isolation levels is detailed in the
following subsections.
</para>
Expand Down
2 changes: 1 addition & 1 deletion doc/src/sgml/plpgsql.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -3767,7 +3767,7 @@ RAISE ;

<para>
After <replaceable class="parameter">level</replaceable> if any,
you can write a <replaceable class="parameter">format</replaceable>
you can specify a <replaceable class="parameter">format</replaceable> string
(which must be a simple string literal, not an expression). The
format string specifies the error message text to be reported.
The format string can be followed
Expand Down
10 changes: 0 additions & 10 deletions src/backend/executor/spi.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,16 +442,6 @@ SPI_rollback_and_chain(void)
_SPI_rollback(true);
}

/*
* SPICleanup is a no-op, kept for backwards compatibility. We rely on
* AtEOXact_SPI to cleanup. Extensions should not (need to) fiddle with the
* internal SPI state directly.
*/
void
SPICleanup(void)
{
}

/*
* Clean up SPI state at transaction commit or abort.
*/
Expand Down
170 changes: 120 additions & 50 deletions src/backend/replication/logical/relation.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
}
}

/*
* Check if replica identity matches and mark the updatable flag.
*
* We allow for stricter replica identity (fewer columns) on subscriber as
* that will not stop us from finding unique tuple. IE, if publisher has
* identity (id,timestamp) and subscriber just (id) this will not be a
* problem, but in the opposite scenario it will.
*
* We just mark the relation entry as not updatable here if the local
* replica identity is found to be insufficient for applying
* updates/deletes (inserts don't care!) and leave it to
* check_relation_updatable() to throw the actual error if needed.
*/
static void
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
{
Bitmapset *idkey;
LogicalRepRelation *remoterel = &entry->remoterel;
int i;

entry->updatable = true;

idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* fallback to PK if no replica identity */
if (idkey == NULL)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);

/*
* If no replica identity index and no PK, the published table must
* have replica identity FULL.
*/
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
entry->updatable = false;
}

i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;

if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));

attnum = AttrNumberGetAttrOffset(attnum);

if (entry->attrmap->attnums[attnum] < 0 ||
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
{
entry->updatable = false;
break;
}
}
}

/*
* Open the local relation associated with the remote one.
*
Expand Down Expand Up @@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (!entry->localrelvalid)
{
Oid relid;
Bitmapset *idkey;
TupleDesc desc;
MemoryContext oldctx;
int i;
Expand All @@ -316,7 +376,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
pfree(entry->attrmap);
free_attrmap(entry->attrmap);
entry->attrmap = NULL;
}

Expand Down Expand Up @@ -373,54 +433,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
bms_free(missingatts);

/*
* Check that replica identity matches. We allow for stricter replica
* identity (fewer columns) on subscriber as that will not stop us
* from finding unique tuple. IE, if publisher has identity
* (id,timestamp) and subscriber just (id) this will not be a problem,
* but in the opposite scenario it will.
*
* Don't throw any error here just mark the relation entry as not
* updatable, as replica identity is only for updates and deletes but
* inserts can be replicated even without it.
* Set if the table's replica identity is enough to apply
* update/delete.
*/
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* fallback to PK if no replica identity */
if (idkey == NULL)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);

/*
* If no replica identity index and no PK, the published table
* must have replica identity FULL.
*/
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
entry->updatable = false;
}

i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;

if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));

attnum = AttrNumberGetAttrOffset(attnum);

if (entry->attrmap->attnums[attnum] < 0 ||
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
{
entry->updatable = false;
break;
}
}
logicalrep_rel_mark_updatable(entry);

entry->localrelvalid = true;
}
Expand Down Expand Up @@ -493,6 +509,40 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
}
}

/*
* Reset the entries in the partition map that refer to remoterel.
*
* Called when new relation mapping is sent by the publisher to update our
* expected view of incoming data from said publisher.
*
* Note that we don't update the remoterel information in the entry here,
* we will update the information in logicalrep_partition_open to avoid
* unnecessary work.
*/
void
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
{
HASH_SEQ_STATUS status;
LogicalRepPartMapEntry *part_entry;
LogicalRepRelMapEntry *entry;

if (LogicalRepPartMap == NULL)
return;

hash_seq_init(&status, LogicalRepPartMap);
while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
{
entry = &part_entry->relmapentry;

if (entry->remoterel.remoteid != remoterel->remoteid)
continue;

logicalrep_relmap_free_entry(entry);

memset(entry, 0, sizeof(LogicalRepRelMapEntry));
}
}

/*
* Initialize the partition map cache.
*/
Expand Down Expand Up @@ -553,8 +603,20 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,

entry = &part_entry->relmapentry;

/*
* We must always overwrite entry->localrel with the latest partition
* Relation pointer, because the Relation pointed to by the old value may
* have been cleared after the caller would have closed the partition
* relation after the last use of this entry. Note that localrelvalid is
* only updated by the relcache invalidation callback, so it may still be
* true irrespective of whether the Relation pointed to by localrel has
* been cleared or not.
*/
if (found && entry->localrelvalid)
{
entry->localrel = partrel;
return entry;
}

/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
Expand All @@ -565,6 +627,13 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
part_entry->partoid = partOid;
}

/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
free_attrmap(entry->attrmap);
entry->attrmap = NULL;
}

if (!entry->remoterel.remoteid)
{
int i;
Expand Down Expand Up @@ -624,7 +693,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
attrmap->maplen * sizeof(AttrNumber));
}

entry->updatable = root->updatable;
/* Set if the table's replica identity is enough to apply update/delete. */
logicalrep_rel_mark_updatable(entry);

entry->localrelvalid = true;

Expand Down
30 changes: 24 additions & 6 deletions src/backend/replication/logical/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,9 @@ apply_handle_relation(StringInfo s)

rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);

/* Also reset all entries in the partition map that refer to remoterel. */
logicalrep_partmap_reset_relmap(rel);
}

/*
Expand Down Expand Up @@ -1320,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
/*
* For partitioned tables, we only need to care if the target partition is
* updatable (aka has PK or RI defined for it).
*/
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
return;

/* Updatable, no error. */
if (rel->updatable)
return;
Expand Down Expand Up @@ -1673,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot_part;
TupleConversionMap *map;
MemoryContext oldctx;
LogicalRepRelMapEntry *part_entry = NULL;
AttrMap *attrmap = NULL;

/* ModifyTableState is needed for ExecFindPartition(). */
edata->mtstate = mtstate = makeNode(ModifyTableState);
Expand Down Expand Up @@ -1704,15 +1716,26 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
map = partrelinfo->ri_RootToPartitionMap;
if (map != NULL)
remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
{
attrmap = map->attrMap;
remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
remoteslot_part);
}
else
{
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
slot_getallattrs(remoteslot_part);
}
MemoryContextSwitchTo(oldctx);

/* Check if we can do the update or delete on the leaf partition. */
if (operation == CMD_UPDATE || operation == CMD_DELETE)
{
part_entry = logicalrep_partition_open(relmapentry, partrel,
attrmap);
check_relation_updatable(part_entry);
}

switch (operation)
{
case CMD_INSERT:
Expand All @@ -1734,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* suitable partition.
*/
{
AttrMap *attrmap = map ? map->attrMap : NULL;
LogicalRepRelMapEntry *part_entry;
TupleTableSlot *localslot;
ResultRelInfo *partrelinfo_new;
bool found;

part_entry = logicalrep_partition_open(relmapentry, partrel,
attrmap);

/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
Expand Down
1 change: 0 additions & 1 deletion src/include/executor/spi.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ extern void SPI_commit_and_chain(void);
extern void SPI_rollback(void);
extern void SPI_rollback_and_chain(void);

extern void SPICleanup(void);
extern void AtEOXact_SPI(bool isCommit);
extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
extern bool SPI_inside_nonatomic_context(void);
Expand Down
1 change: 1 addition & 0 deletions src/include/replication/logicalrelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
} LogicalRepRelMapEntry;

extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);

extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
LOCKMODE lockmode);
Expand Down
9 changes: 6 additions & 3 deletions src/test/perl/PostgreSQL/Test/Cluster.pm
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@

# Copyright (c) 2022, PostgreSQL Global Development Group

# allow use of release 15+ perl namespace in older branches
# just 'use' the older module name.
# See PostgresNode.pm for function implementations
# Allow use of release 15+ Perl package name in older branches, by giving that
# package the same symbol table as the older package. See PostgresNode::new
# for supporting heuristics.

package PostgreSQL::Test::Cluster;

use strict;
use warnings;

use PostgresNode;
BEGIN { *PostgreSQL::Test::Cluster:: = \*PostgresNode::; }

use Exporter 'import';

1;
Loading
Loading