Fix partition table's REPLICA IDENTITY checking on the subscriber.
In logical replication, we check whether the target table on the subscriber is updatable by comparing the replica identity of the table on the publisher with that of the table on the subscriber. When the target table is a partitioned table, we checked only the replica identity of the partitioned table itself and not that of its partitions. This led to an assertion failure while applying update/delete changes, because those are expected to succeed only when the corresponding partition has a primary key or a replica identity defined.

Fix it by checking the replica identity of the target partition while applying changes.

Reported-by: Shi Yu
Author: Shi Yu, Hou Zhijie
Reviewed-by: Amit Langote, Amit Kapila
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
This commit is contained in:
parent
2253f5b497
commit
26b3455afa
@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if replica identity matches and mark the updatable flag.
|
||||||
|
*
|
||||||
|
* We allow for stricter replica identity (fewer columns) on subscriber as
|
||||||
|
* that will not stop us from finding unique tuple. IE, if publisher has
|
||||||
|
* identity (id,timestamp) and subscriber just (id) this will not be a
|
||||||
|
* problem, but in the opposite scenario it will.
|
||||||
|
*
|
||||||
|
* We just mark the relation entry as not updatable here if the local
|
||||||
|
* replica identity is found to be insufficient for applying
|
||||||
|
* updates/deletes (inserts don't care!) and leave it to
|
||||||
|
* check_relation_updatable() to throw the actual error if needed.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
|
||||||
|
{
|
||||||
|
Bitmapset *idkey;
|
||||||
|
LogicalRepRelation *remoterel = &entry->remoterel;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
entry->updatable = true;
|
||||||
|
|
||||||
|
idkey = RelationGetIndexAttrBitmap(entry->localrel,
|
||||||
|
INDEX_ATTR_BITMAP_IDENTITY_KEY);
|
||||||
|
/* fallback to PK if no replica identity */
|
||||||
|
if (idkey == NULL)
|
||||||
|
{
|
||||||
|
idkey = RelationGetIndexAttrBitmap(entry->localrel,
|
||||||
|
INDEX_ATTR_BITMAP_PRIMARY_KEY);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If no replica identity index and no PK, the published table must
|
||||||
|
* have replica identity FULL.
|
||||||
|
*/
|
||||||
|
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
|
||||||
|
entry->updatable = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
i = -1;
|
||||||
|
while ((i = bms_next_member(idkey, i)) >= 0)
|
||||||
|
{
|
||||||
|
int attnum = i + FirstLowInvalidHeapAttributeNumber;
|
||||||
|
|
||||||
|
if (!AttrNumberIsForUserDefinedAttr(attnum))
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("logical replication target relation \"%s.%s\" uses "
|
||||||
|
"system columns in REPLICA IDENTITY index",
|
||||||
|
remoterel->nspname, remoterel->relname)));
|
||||||
|
|
||||||
|
attnum = AttrNumberGetAttrOffset(attnum);
|
||||||
|
|
||||||
|
if (entry->attrmap->attnums[attnum] < 0 ||
|
||||||
|
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
|
||||||
|
{
|
||||||
|
entry->updatable = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Open the local relation associated with the remote one.
|
* Open the local relation associated with the remote one.
|
||||||
*
|
*
|
||||||
@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
|
|||||||
if (!entry->localrelvalid)
|
if (!entry->localrelvalid)
|
||||||
{
|
{
|
||||||
Oid relid;
|
Oid relid;
|
||||||
Bitmapset *idkey;
|
|
||||||
TupleDesc desc;
|
TupleDesc desc;
|
||||||
MemoryContext oldctx;
|
MemoryContext oldctx;
|
||||||
int i;
|
int i;
|
||||||
@ -366,54 +426,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
|
|||||||
bms_free(missingatts);
|
bms_free(missingatts);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check that replica identity matches. We allow for stricter replica
|
* Set if the table's replica identity is enough to apply
|
||||||
* identity (fewer columns) on subscriber as that will not stop us
|
* update/delete.
|
||||||
* from finding unique tuple. IE, if publisher has identity
|
|
||||||
* (id,timestamp) and subscriber just (id) this will not be a problem,
|
|
||||||
* but in the opposite scenario it will.
|
|
||||||
*
|
|
||||||
* Don't throw any error here just mark the relation entry as not
|
|
||||||
* updatable, as replica identity is only for updates and deletes but
|
|
||||||
* inserts can be replicated even without it.
|
|
||||||
*/
|
*/
|
||||||
entry->updatable = true;
|
logicalrep_rel_mark_updatable(entry);
|
||||||
idkey = RelationGetIndexAttrBitmap(entry->localrel,
|
|
||||||
INDEX_ATTR_BITMAP_IDENTITY_KEY);
|
|
||||||
/* fallback to PK if no replica identity */
|
|
||||||
if (idkey == NULL)
|
|
||||||
{
|
|
||||||
idkey = RelationGetIndexAttrBitmap(entry->localrel,
|
|
||||||
INDEX_ATTR_BITMAP_PRIMARY_KEY);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If no replica identity index and no PK, the published table
|
|
||||||
* must have replica identity FULL.
|
|
||||||
*/
|
|
||||||
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
|
|
||||||
entry->updatable = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
i = -1;
|
|
||||||
while ((i = bms_next_member(idkey, i)) >= 0)
|
|
||||||
{
|
|
||||||
int attnum = i + FirstLowInvalidHeapAttributeNumber;
|
|
||||||
|
|
||||||
if (!AttrNumberIsForUserDefinedAttr(attnum))
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
||||||
errmsg("logical replication target relation \"%s.%s\" uses "
|
|
||||||
"system columns in REPLICA IDENTITY index",
|
|
||||||
remoterel->nspname, remoterel->relname)));
|
|
||||||
|
|
||||||
attnum = AttrNumberGetAttrOffset(attnum);
|
|
||||||
|
|
||||||
if (entry->attrmap->attnums[attnum] < 0 ||
|
|
||||||
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
|
|
||||||
{
|
|
||||||
entry->updatable = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
entry->localrelvalid = true;
|
entry->localrelvalid = true;
|
||||||
}
|
}
|
||||||
@ -651,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
|
|||||||
attrmap->maplen * sizeof(AttrNumber));
|
attrmap->maplen * sizeof(AttrNumber));
|
||||||
}
|
}
|
||||||
|
|
||||||
entry->updatable = root->updatable;
|
/* Set if the table's replica identity is enough to apply update/delete. */
|
||||||
|
logicalrep_rel_mark_updatable(entry);
|
||||||
|
|
||||||
entry->localrelvalid = true;
|
entry->localrelvalid = true;
|
||||||
|
|
||||||
|
@ -1738,6 +1738,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
|
|||||||
static void
|
static void
|
||||||
check_relation_updatable(LogicalRepRelMapEntry *rel)
|
check_relation_updatable(LogicalRepRelMapEntry *rel)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* For partitioned tables, we only need to care if the target partition is
|
||||||
|
* updatable (aka has PK or RI defined for it).
|
||||||
|
*/
|
||||||
|
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
||||||
|
return;
|
||||||
|
|
||||||
/* Updatable, no error. */
|
/* Updatable, no error. */
|
||||||
if (rel->updatable)
|
if (rel->updatable)
|
||||||
return;
|
return;
|
||||||
@ -2121,6 +2128,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
TupleTableSlot *remoteslot_part;
|
TupleTableSlot *remoteslot_part;
|
||||||
TupleConversionMap *map;
|
TupleConversionMap *map;
|
||||||
MemoryContext oldctx;
|
MemoryContext oldctx;
|
||||||
|
LogicalRepRelMapEntry *part_entry = NULL;
|
||||||
|
AttrMap *attrmap = NULL;
|
||||||
|
|
||||||
/* ModifyTableState is needed for ExecFindPartition(). */
|
/* ModifyTableState is needed for ExecFindPartition(). */
|
||||||
edata->mtstate = mtstate = makeNode(ModifyTableState);
|
edata->mtstate = mtstate = makeNode(ModifyTableState);
|
||||||
@ -2152,8 +2161,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
|
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
|
||||||
map = partrelinfo->ri_RootToPartitionMap;
|
map = partrelinfo->ri_RootToPartitionMap;
|
||||||
if (map != NULL)
|
if (map != NULL)
|
||||||
remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
|
{
|
||||||
|
attrmap = map->attrMap;
|
||||||
|
remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
|
||||||
remoteslot_part);
|
remoteslot_part);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
|
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
|
||||||
@ -2161,6 +2173,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
}
|
}
|
||||||
MemoryContextSwitchTo(oldctx);
|
MemoryContextSwitchTo(oldctx);
|
||||||
|
|
||||||
|
/* Check if we can do the update or delete on the leaf partition. */
|
||||||
|
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||||
|
{
|
||||||
|
part_entry = logicalrep_partition_open(relmapentry, partrel,
|
||||||
|
attrmap);
|
||||||
|
check_relation_updatable(part_entry);
|
||||||
|
}
|
||||||
|
|
||||||
switch (operation)
|
switch (operation)
|
||||||
{
|
{
|
||||||
case CMD_INSERT:
|
case CMD_INSERT:
|
||||||
@ -2182,15 +2202,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
* suitable partition.
|
* suitable partition.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
AttrMap *attrmap = map ? map->attrMap : NULL;
|
|
||||||
LogicalRepRelMapEntry *part_entry;
|
|
||||||
TupleTableSlot *localslot;
|
TupleTableSlot *localslot;
|
||||||
ResultRelInfo *partrelinfo_new;
|
ResultRelInfo *partrelinfo_new;
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
part_entry = logicalrep_partition_open(relmapentry, partrel,
|
|
||||||
attrmap);
|
|
||||||
|
|
||||||
/* Get the matching local tuple from the partition. */
|
/* Get the matching local tuple from the partition. */
|
||||||
found = FindReplTupleInLocalRel(estate, partrel,
|
found = FindReplTupleInLocalRel(estate, partrel,
|
||||||
&part_entry->remoterel,
|
&part_entry->remoterel,
|
||||||
|
@ -868,4 +868,18 @@ $result = $node_subscriber2->safe_psql('postgres',
|
|||||||
"SELECT a, b, c FROM tab5 ORDER BY 1");
|
"SELECT a, b, c FROM tab5 ORDER BY 1");
|
||||||
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
|
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
|
||||||
|
|
||||||
|
# Test that replication works correctly as long as the leaf partition
|
||||||
|
# has the necessary REPLICA IDENTITY, even though the actual target
|
||||||
|
# partitioned table does not.
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
|
||||||
|
|
||||||
|
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
|
||||||
|
is($result, qq(4||1), 'updates of tab5 replicated correctly');
|
||||||
|
|
||||||
done_testing();
|
done_testing();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user