diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a23a3d57e2..885a2d70ae 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1579,8 +1579,91 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
node. If incoming data violates any constraints the replication will
stop. This is referred to as a conflict. When
replicating UPDATE or DELETE
- operations, missing data will not produce a conflict and such operations
- will simply be skipped.
+ operations, missing data is also considered a
+ conflict, but it does not result in an error; such
+ operations are simply skipped.
+
+
+
+ Additional logging is triggered in the following conflict
+ cases:
+
+
+ insert_exists
+
+
+ Inserting a row that violates a NOT DEFERRABLE
+ unique constraint. Note that to log the origin and commit
+ timestamp details of the conflicting key,
+ track_commit_timestamp
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+
+
+
+
+ update_differ
+
+
+ Updating a row that was previously modified by another origin.
+ Note that this conflict can only be detected when
+ track_commit_timestamp
+ is enabled on the subscriber. Currently, the update is always applied
+ regardless of the origin of the local row.
+
+
+
+
+ update_exists
+
+
+ The updated value of a row violates a NOT DEFERRABLE
+ unique constraint. Note that to log the origin and commit
+ timestamp details of the conflicting key,
+ track_commit_timestamp
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually. Note that when updating a
+ partitioned table, if the updated row value satisfies another partition
+ constraint resulting in the row being inserted into a new partition, the
+ insert_exists conflict may arise if the new row
+ violates a NOT DEFERRABLE unique constraint.
+
+
+
+
+ update_missing
+
+
+ The tuple to be updated was not found. The update will simply be
+ skipped in this scenario.
+
+
+
+
+ delete_differ
+
+
+ Deleting a row that was previously modified by another origin. Note that
+ this conflict can only be detected when
+ track_commit_timestamp
+ is enabled on the subscriber. Currently, the delete is always applied
+ regardless of the origin of the local row.
+
+
+
+
+ delete_missing
+
+
+ The tuple to be deleted was not found. The delete will simply be
+ skipped in this scenario.
+
+
+
+
+ Note that there are other conflict scenarios, such as exclusion constraint
+ violations. Currently, we do not provide additional details for them in the
+ log.
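+
+ For example, to make the origin and commit timestamp details described
+ above available, enable
+ track_commit_timestamp
+ on the subscriber; note that changing this parameter requires a server
+ restart:
+
+track_commit_timestamp = on
+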
@@ -1597,7 +1680,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
- A conflict will produce an error and will stop the replication; it must be
+ A conflict that produces an error will stop the replication; it must be
resolved manually by the user. Details about the conflict can be found in
the subscriber's server log.
@@ -1609,8 +1692,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
an error, the replication won't proceed, and the logical replication worker will
emit the following kind of message to the subscriber's server log:
-ERROR: duplicate key value violates unique constraint "test_pkey"
-DETAIL: Key (c)=(1) already exists.
+ERROR: conflict detected on relation "public.test": conflict=insert_exists
+DETAIL: Key already exists in unique index "t_pkey", modified locally in transaction 740 at 2024-06-26 10:47:04.727375+08.
+Key (c)=(1); existing local tuple (1, 'local'); remote tuple (1, 'remote').
CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
The LSN of the transaction that contains the change violating the constraint and
@@ -1636,6 +1720,15 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
Please note that skipping the whole transaction includes skipping changes that
might not violate any constraint. This can easily make the subscriber
inconsistent.
+ The additional details regarding conflicting rows, such as their origin and
+ commit timestamp, can be seen in the DETAIL line of the
+ log. But note that this information is only available when
+ track_commit_timestamp
+ is enabled on the subscriber. Users can use this information to decide
+ whether to retain the local change or adopt the remote change. For
+ instance, the DETAIL line in the above log indicates that
+ the existing row was modified locally. Users can manually perform a
+ remote-change-win, i.e., overwrite the local change with the remote one.
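+ For example, a simple way to perform a remote-change-win for the
+ insert_exists conflict shown above (assuming the table and key from
+ that log) is to delete the conflicting local row, after which the
+ apply worker will succeed when it retries the failed transaction:
+
+test_sub=# DELETE FROM test WHERE c = 1;
+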
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index de751e8e4a..43c95d6109 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -154,8 +154,9 @@ IndexScanEnd(IndexScanDesc scan)
*
* Construct a string describing the contents of an index entry, in the
* form "(key_name, ...)=(key_value, ...)". This is currently used
- * for building unique-constraint and exclusion-constraint error messages,
- * so only key columns of the index are checked and printed.
+ * for building unique-constraint, exclusion-constraint, and logical
+ * replication conflict error messages, so only key columns of the index
+ * are checked and printed.
*
* Note that if the user does not have permissions to view all of the
* columns involved then a NULL is returned. Returning a partial key seems
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index a819b4197c..33759056e3 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2631,8 +2631,9 @@ CompareIndexInfo(const IndexInfo *info1, const IndexInfo *info2,
* Add extra state to IndexInfo record
*
* For unique indexes, we usually don't want to add info to the IndexInfo for
- * checking uniqueness, since the B-Tree AM handles that directly. However,
- * in the case of speculative insertion, additional support is required.
+ * checking uniqueness, since the B-Tree AM handles that directly. However, in
+ * the case of speculative insertion and conflict detection in logical
+ * replication, additional support is required.
*
* Do this processing here rather than in BuildIndexInfo() to not incur the
* overhead in the common non-speculative cases.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c..403a3f4055 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
ii = BuildIndexInfo(indexDesc);
/*
- * If the indexes are to be used for speculative insertion, add extra
- * information required by unique index entries.
+ * If the indexes are to be used for speculative insertion or conflict
+ * detection in logical replication, add extra information required by
+ * unique index entries.
*/
if (speculative && ii->ii_Unique)
BuildSpeculativeIndexInfo(indexDesc, ii);
@@ -519,14 +520,18 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
*
* Note that this doesn't lock the values in any way, so it's
* possible that a conflicting tuple is inserted immediately
- * after this returns. But this can be used for a pre-check
- * before insertion.
+ * after this returns. This can be used for either a pre-check
+ * before insertion or a re-check after finding a conflict.
+ *
+ * 'tupleid' should be the TID of the tuple that has been recently
+ * inserted (or can be invalid if we haven't inserted a new tuple yet).
+ * This tuple will be excluded from conflict checking.
* ----------------------------------------------------------------
*/
bool
ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate, ItemPointer conflictTid,
- List *arbiterIndexes)
+ ItemPointer tupleid, List *arbiterIndexes)
{
int i;
int numIndices;
@@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
satisfiesConstraint =
check_exclusion_or_unique_constraint(heapRelation, indexRelation,
- indexInfo, &invalidItemPtr,
+ indexInfo, tupleid,
values, isnull, estate, false,
CEOUC_WAIT, true,
conflictTid);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d63c..29e186fa73 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -88,11 +88,6 @@ static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
AclMode requiredPerms);
static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(Oid reloid,
- TupleTableSlot *slot,
- TupleDesc tupdesc,
- Bitmapset *modifiedCols,
- int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
/* end of local decls */
@@ -2210,7 +2205,7 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
* column involved, that subset will be returned with a key identifying which
* columns they are.
*/
-static char *
+char *
ExecBuildSlotValueDescription(Oid reloid,
TupleTableSlot *slot,
TupleDesc tupdesc,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..1086cbc962 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
return skey_attoff;
}
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+ bool refetch = false;
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ break;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ break;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+
+ return refetch;
+}
+
/*
* Search the relation 'rel' for tuple using the index.
*
@@ -260,34 +306,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
table_endscan(scan);
@@ -480,6 +474,89 @@ retry:
return found;
}
+/*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If a conflicting tuple is found, return true; otherwise, return false.
+ *
+ * We lock the tuple so that it cannot be deleted before the caller fetches
+ * the required information. Note that if the tuple is deleted before the
+ * lock is acquired, we retry the search for a conflicting tuple.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+ Oid conflictindex, TupleTableSlot *slot,
+ TupleTableSlot **conflictslot)
+{
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointerData conflictTid;
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ *conflictslot = NULL;
+
+retry:
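+ /*
+ * Pass the slot's own TID as 'tupleid' so that the tuple we have just
+ * inserted is excluded from the conflict check; see
+ * ExecCheckIndexConstraints().
+ */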
+ if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+ &conflictTid, &slot->tts_tid,
+ list_make1_oid(conflictindex)))
+ {
+ if (*conflictslot)
+ ExecDropSingleTupleTableSlot(*conflictslot);
+
+ *conflictslot = NULL;
+ return false;
+ }
+
+ *conflictslot = table_slot_create(rel, NULL);
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+ *conflictslot,
+ GetCurrentCommandId(false),
+ LockTupleShare,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
+
+ return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for a conflict with the
+ * tuple in 'remoteslot', and report the conflict if one is found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+ ConflictType type, List *recheckIndexes,
+ TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+ /* Check all the unique indexes for a conflict */
+ foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ {
+ TupleTableSlot *conflictslot;
+
+ if (list_member_oid(recheckIndexes, uniqueidx) &&
+ FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+ &conflictslot))
+ {
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+ }
+}
+
/*
* Insert tuple represented in the slot to the relation, update the indexes,
* and execute any constraints and per-row triggers.
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (!skip_tuple)
{
List *recheckIndexes = NIL;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
/* OK, store the tuple and create index entries for it */
simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
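+
+ /* The conflict-arbiter indexes were collected by InitConflictIndexes() */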
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, false, false,
- NULL, NIL, false);
+ slot, estate, false,
+ conflictindexes ? true : false,
+ &conflict,
+ conflictindexes, false);
+
+ /*
+ * Checks the conflict indexes to fetch the conflicting local tuple
+ * and reports the conflict. We perform this check here, instead of
+ * performing an additional index scan before the actual insertion and
+ * reporting the conflict if any conflicting tuples are found. This is
+ * to avoid the overhead of executing the extra scan for each INSERT
+ * operation, even when no conflict arises, which could introduce
+ * significant overhead to replication, particularly in cases where
+ * conflicts are rare.
+ *
+ * XXX OTOH, this could lead to clean-up effort for dead tuples added
+ * in heap and index in case of conflicts. But as conflicts shouldn't
+ * be frequent, we prefer to save the performance overhead of an extra
+ * scan before each insertion.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+ recheckIndexes, NULL, slot);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
{
List *recheckIndexes = NIL;
TU_UpdateIndexes update_indexes;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
&update_indexes);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, true, false,
- NULL, NIL,
+ slot, estate, true,
+ conflictindexes ? true : false,
+ &conflict, conflictindexes,
(update_indexes == TU_Summarizing));
+ /*
+ * Refer to the comments above the call to CheckAndReportConflict() in
+ * ExecSimpleRelationInsert to understand why this check is done at
+ * this point.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+ recheckIndexes, searchslot, slot);
+
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo,
NULL, NULL,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4913e49319..8bf4c80d4a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context,
/* Perform a speculative insertion. */
uint32 specToken;
ItemPointerData conflictTid;
+ ItemPointerData invalidItemPtr;
bool specConflict;
List *arbiterIndexes;
+ ItemPointerSetInvalid(&invalidItemPtr);
arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
/*
@@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context,
CHECK_FOR_INTERRUPTS();
specConflict = false;
if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
- &conflictTid, arbiterIndexes))
+ &conflictTid, &invalidItemPtr,
+ arbiterIndexes))
{
/* committed conflict tuple found */
if (onconflict == ONCONFLICT_UPDATE)
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index ba03eeff1c..1e08bbbd4e 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = \
applyparallelworker.o \
+ conflict.o \
decode.o \
launcher.o \
logical.o \
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
new file mode 100644
index 0000000000..0bc7959980
--- /dev/null
+++ b/src/backend/replication/logical/conflict.c
@@ -0,0 +1,488 @@
+/*-------------------------------------------------------------------------
+ * conflict.c
+ * Support routines for logging conflicts.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/conflict.c
+ *
+ * This file contains the code for logging conflicts on the subscriber during
+ * logical replication.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/commit_ts.h"
+#include "access/tableam.h"
+#include "executor/executor.h"
+#include "replication/conflict.h"
+#include "replication/logicalrelation.h"
+#include "storage/lmgr.h"
+#include "utils/lsyscache.h"
+
+static const char *const ConflictTypeNames[] = {
+ [CT_INSERT_EXISTS] = "insert_exists",
+ [CT_UPDATE_DIFFER] = "update_differ",
+ [CT_UPDATE_EXISTS] = "update_exists",
+ [CT_UPDATE_MISSING] = "update_missing",
+ [CT_DELETE_DIFFER] = "delete_differ",
+ [CT_DELETE_MISSING] = "delete_missing"
+};
+
+static int errcode_apply_conflict(ConflictType type);
+static int errdetail_apply_conflict(EState *estate,
+ ResultRelInfo *relinfo,
+ ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot,
+ Oid indexoid, TransactionId localxmin,
+ RepOriginId localorigin,
+ TimestampTz localts);
+static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
+ ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot,
+ Oid indexoid);
+static char *build_index_value_desc(EState *estate, Relation localrel,
+ TupleTableSlot *slot, Oid indexoid);
+
+/*
+ * Get the xmin and commit timestamp data (origin and timestamp) associated
+ * with the provided local tuple.
+ *
+ * Return true if the commit timestamp data was found, false otherwise.
+ */
+bool
+GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
+ RepOriginId *localorigin, TimestampTz *localts)
+{
+ Datum xminDatum;
+ bool isnull;
+
+ xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
+ &isnull);
+ *xmin = DatumGetTransactionId(xminDatum);
+ Assert(!isnull);
+
+ /*
+ * The commit timestamp data is not available if track_commit_timestamp is
+ * disabled.
+ */
+ if (!track_commit_timestamp)
+ {
+ *localorigin = InvalidRepOriginId;
+ *localts = 0;
+ return false;
+ }
+
+ return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
+}
+
+/*
+ * This function is used to report a conflict while applying replication
+ * changes.
+ *
+ * 'searchslot' should contain the tuple used to search the local tuple to be
+ * updated or deleted.
+ *
+ * 'localslot' should contain the existing local tuple, if any, that conflicts
+ * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
+ * transaction information related to this existing local tuple.
+ *
+ * 'remoteslot' should contain the remote new tuple, if any.
+ *
+ * The 'indexoid' represents the OID of the unique index that triggered the
+ * constraint violation error. We use this to report the key values of the
+ * conflicting tuple.
+ *
+ * The caller must ensure that the index with the OID 'indexoid' is locked so
+ * that we can fetch and display the conflicting key value.
+ */
+void
+ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
+ ConflictType type, TupleTableSlot *searchslot,
+ TupleTableSlot *localslot, TupleTableSlot *remoteslot,
+ Oid indexoid, TransactionId localxmin,
+ RepOriginId localorigin, TimestampTz localts)
+{
+ Relation localrel = relinfo->ri_RelationDesc;
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ localslot, remoteslot, indexoid,
+ localxmin, localorigin, localts));
+}
+
+/*
+ * Find all unique indexes to check for a conflict and store them into
+ * ResultRelInfo.
+ */
+void
+InitConflictIndexes(ResultRelInfo *relInfo)
+{
+ List *uniqueIndexes = NIL;
+
+ for (int i = 0; i < relInfo->ri_NumIndices; i++)
+ {
+ Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
+
+ if (indexRelation == NULL)
+ continue;
+
+ /* Detect conflict only for unique indexes */
+ if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
+ continue;
+
+ /* Don't support conflict detection for deferrable indexes */
+ if (!indexRelation->rd_index->indimmediate)
+ continue;
+
+ uniqueIndexes = lappend_oid(uniqueIndexes,
+ RelationGetRelid(indexRelation));
+ }
+
+ relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
+}
+
+/*
+ * Add SQLSTATE error code to the current conflict report.
+ */
+static int
+errcode_apply_conflict(ConflictType type)
+{
+ switch (type)
+ {
+ case CT_INSERT_EXISTS:
+ case CT_UPDATE_EXISTS:
+ return errcode(ERRCODE_UNIQUE_VIOLATION);
+ case CT_UPDATE_DIFFER:
+ case CT_UPDATE_MISSING:
+ case CT_DELETE_DIFFER:
+ case CT_DELETE_MISSING:
+ return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
+ }
+
+ Assert(false);
+ return 0; /* silence compiler warning */
+}
+
+/*
+ * Add an errdetail() line showing conflict detail.
+ *
+ * The DETAIL line consists of two parts:
+ * 1. Explanation of the conflict type, including the origin and commit
+ * timestamp of the existing local tuple.
+ * 2. Display of conflicting key, existing local tuple, remote new tuple, and
+ * replica identity columns, if any. The remote old tuple is excluded as its
+ * information is covered in the replica identity columns.
+ */
+static int
+errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
+ ConflictType type, TupleTableSlot *searchslot,
+ TupleTableSlot *localslot, TupleTableSlot *remoteslot,
+ Oid indexoid, TransactionId localxmin,
+ RepOriginId localorigin, TimestampTz localts)
+{
+ StringInfoData err_detail;
+ char *val_desc;
+ char *origin_name;
+
+ initStringInfo(&err_detail);
+
+ /* First, construct a detailed message describing the type of conflict */
+ switch (type)
+ {
+ case CT_INSERT_EXISTS:
+ case CT_UPDATE_EXISTS:
+ Assert(OidIsValid(indexoid));
+
+ if (localts)
+ {
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
+ get_rel_name(indexoid),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
+ get_rel_name(indexoid), origin_name,
+ localxmin, timestamptz_to_str(localts));
+
+ /*
+ * The origin that modified this row has been removed. This
+ * can happen if the origin was created by a different apply
+ * worker and its associated subscription and origin were
+ * dropped after updating the row, or if the origin was
+ * manually dropped by the user.
+ */
+ else
+ appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
+ get_rel_name(indexoid),
+ localxmin, timestamptz_to_str(localts));
+ }
+ else
+ appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
+ get_rel_name(indexoid), localxmin);
+
+ break;
+
+ case CT_UPDATE_DIFFER:
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
+ origin_name, localxmin, timestamptz_to_str(localts));
+
+ /* The origin that modified this row has been removed. */
+ else
+ appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+
+ break;
+
+ case CT_UPDATE_MISSING:
+ appendStringInfo(&err_detail, _("Could not find the row to be updated."));
+ break;
+
+ case CT_DELETE_DIFFER:
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
+ origin_name, localxmin, timestamptz_to_str(localts));
+
+ /* The origin that modified this row has been removed. */
+ else
+ appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+
+ break;
+
+ case CT_DELETE_MISSING:
+ appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
+ break;
+ }
+
+ Assert(err_detail.len > 0);
+
+ val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
+ localslot, remoteslot, indexoid);
+
+ /*
+ * Next, append the key values, existing local tuple, remote tuple and
+ * replica identity columns after the message.
+ */
+ if (val_desc)
+ appendStringInfo(&err_detail, "\n%s", val_desc);
+
+ return errdetail_internal("%s", err_detail.data);
+}
+
+/*
+ * Helper function to build the additional details for conflicting key,
+ * existing local tuple, remote tuple, and replica identity columns.
+ *
+ * If the return value is NULL, it indicates that the current user lacks
+ * permissions to view the columns involved.
+ */
+static char *
+build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
+ ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot,
+ Oid indexoid)
+{
+ Relation localrel = relinfo->ri_RelationDesc;
+ Oid relid = RelationGetRelid(localrel);
+ TupleDesc tupdesc = RelationGetDescr(localrel);
+ StringInfoData tuple_value;
+ char *desc = NULL;
+
+ Assert(searchslot || localslot || remoteslot);
+
+ initStringInfo(&tuple_value);
+
+ /*
+ * Report the conflicting key values in the case of a unique constraint
+ * violation.
+ */
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ {
+ Assert(OidIsValid(indexoid) && localslot);
+
+ desc = build_index_value_desc(estate, localrel, localslot, indexoid);
+
+ if (desc)
+ appendStringInfo(&tuple_value, _("Key %s"), desc);
+ }
+
+ if (localslot)
+ {
+ /*
+ * The 'modifiedCols' only applies to the new tuple, hence we pass
+ * NULL for the existing local tuple.
+ */
+ desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
+ NULL, 64);
+
+ if (desc)
+ {
+ if (tuple_value.len > 0)
+ {
+ appendStringInfoString(&tuple_value, "; ");
+ appendStringInfo(&tuple_value, _("existing local tuple %s"),
+ desc);
+ }
+ else
+ {
+ appendStringInfo(&tuple_value, _("Existing local tuple %s"),
+ desc);
+ }
+ }
+ }
+
+ if (remoteslot)
+ {
+ Bitmapset *modifiedCols;
+
+ /*
+ * Although logical replication doesn't maintain the bitmap for the
+ * columns being inserted, we still use it to create 'modifiedCols'
+ * for consistency with other calls to ExecBuildSlotValueDescription.
+ *
+ * Note that generated columns are formed locally on the subscriber.
+ */
+ modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
+ ExecGetUpdatedCols(relinfo, estate));
+ desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
+ modifiedCols, 64);
+
+ if (desc)
+ {
+ if (tuple_value.len > 0)
+ {
+ appendStringInfoString(&tuple_value, "; ");
+ appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
+ }
+ else
+ {
+ appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
+ }
+ }
+ }
+
+ if (searchslot)
+ {
+ /*
+ * Note that while an index other than the replica identity may be used
+ * (see IsIndexUsableForReplicaIdentityFull for details) to find the tuple
+ * when applying an update or delete, such an index scan may not result
+ * in a unique tuple, and we still compare the complete tuple in such
+ * cases; thus, such indexes are not used here.
+ */
+ Oid replica_index = GetRelationIdentityOrPK(localrel);
+
+ Assert(type != CT_INSERT_EXISTS);
+
+ /*
+ * If the table has a valid replica identity index, build the index
+ * key value string. Otherwise, construct the full tuple value for
+ * REPLICA IDENTITY FULL cases.
+ */
+ if (OidIsValid(replica_index))
+ desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
+ else
+ desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
+
+ if (desc)
+ {
+ if (tuple_value.len > 0)
+ {
+ appendStringInfoString(&tuple_value, "; ");
+ appendStringInfo(&tuple_value, OidIsValid(replica_index)
+ ? _("replica identity %s")
+ : _("replica identity full %s"), desc);
+ }
+ else
+ {
+ appendStringInfo(&tuple_value, OidIsValid(replica_index)
+ ? _("Replica identity %s")
+ : _("Replica identity full %s"), desc);
+ }
+ }
+ }
+
+ if (tuple_value.len == 0)
+ return NULL;
+
+ appendStringInfoChar(&tuple_value, '.');
+ return tuple_value.data;
+}
+
+/*
+ * Helper function to construct a string describing the contents of an index
+ * entry. See BuildIndexValueDescription for details.
+ *
+ * The caller must ensure that the index with the OID 'indexoid' is locked so
+ * that we can fetch and display the conflicting key value.
+ */
+static char *
+build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
+ Oid indexoid)
+{
+ char *index_value;
+ Relation indexDesc;
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ TupleTableSlot *tableslot = slot;
+
+ if (!tableslot)
+ return NULL;
+
+ Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ indexDesc = index_open(indexoid, NoLock);
+
+ /*
+ * If the slot is a virtual slot, copy it into a heap tuple slot as
+ * FormIndexDatum only works with heap tuple slots.
+ */
+ if (TTS_IS_VIRTUAL(slot))
+ {
+ tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+ tableslot = ExecCopySlot(tableslot, slot);
+ }
+
+ /*
+ * Initialize ecxt_scantuple for potential use in FormIndexDatum when
+ * index expressions are present.
+ */
+ GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+ /*
+ * The values/nulls arrays passed to BuildIndexValueDescription should be
+ * the results of FormIndexDatum, which are the "raw" input to the index
+ * AM.
+ */
+ FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 3dec36a6de..3d36249d8a 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -2,6 +2,7 @@
backend_sources += files(
'applyparallelworker.c',
+ 'conflict.c',
'decode.c',
'launcher.c',
'logical.c',
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 245e9be6f2..cdea6295d8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -167,6 +167,7 @@
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
+#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
@@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
EState *estate = edata->estate;
/* We must open indexes here. */
- ExecOpenIndices(relinfo, false);
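+ /*
+ * Pass true for the speculative parameter so that the extra information
+ * required for detecting conflicts on unique indexes is initialized; see
+ * ExecOpenIndices().
+ */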
+ ExecOpenIndices(relinfo, true);
+ InitConflictIndexes(relinfo);
/* Do the insert. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
@@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata,
MemoryContext oldctx;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(relinfo, false);
+ ExecOpenIndices(relinfo, true);
found = FindReplTupleInLocalRel(edata, localrel,
&relmapentry->remoterel,
localindexoid,
remoteslot, &localslot);
- ExecClearTuple(remoteslot);
/*
* Tuple found.
@@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(localrel, &estate->es_tupleTable);
+ slot_store_data(newslot, relmapentry, newtup);
+
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot, localslot, newslot,
+ InvalidOid, localxmin, localorigin, localts);
+ }
+
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_data(remoteslot, localslot, relmapentry, newtup);
@@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, remoteslot);
+ InitConflictIndexes(relinfo);
+
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
@@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
else
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, relmapentry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
+ remoteslot, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
+
+ /*
+ * Report the conflict if the tuple was modified by a different
+ * origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER,
+ remoteslot, localslot, NULL,
+ InvalidOid, localxmin, localorigin, localts);
+
EvalPlanQualSetSlot(&epqstate, localslot);
/* Do the actual delete. */
@@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
* The tuple to be deleted could not be found. Do nothing except for
* emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be deleted "
- "in replication target relation \"%s\"",
- RelationGetRelationName(localrel));
+ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
+ remoteslot, NULL, NULL,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
}
/* Cleanup. */
@@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
+ RepOriginId localorigin;
+ TransactionId localxmin;
+ TimestampTz localts;
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3023,19 +3066,43 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part, &localslot);
if (!found)
{
+ TupleTableSlot *newslot = localslot;
+
+ /* Store the new tuple for conflict reporting */
+ slot_store_data(newslot, part_entry, newtup);
+
/*
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
- *
- * XXX should this be promoted to ereport(LOG) perhaps?
*/
- elog(DEBUG1,
- "logical replication did not find row to be updated "
- "in replication target relation's partition \"%s\"",
- RelationGetRelationName(partrel));
+ ReportApplyConflict(estate, partrelinfo,
+ LOG, CT_UPDATE_MISSING,
+ remoteslot_part, NULL, newslot,
+ InvalidOid, InvalidTransactionId,
+ InvalidRepOriginId, 0);
+
return;
}
+ /*
+ * Report the conflict if the tuple was modified by a
+ * different origin.
+ */
+ if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
+ localorigin != replorigin_session_origin)
+ {
+ TupleTableSlot *newslot;
+
+ /* Store the new tuple for conflict reporting */
+ newslot = table_slot_create(partrel, &estate->es_tupleTable);
+ slot_store_data(newslot, part_entry, newtup);
+
+ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER,
+ remoteslot_part, localslot, newslot,
+ InvalidOid, localxmin, localorigin,
+ localts);
+ }
+
/*
* Apply the update to the local tuple, putting the result in
* remoteslot_part.
@@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
MemoryContextSwitchTo(oldctx);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
- ExecOpenIndices(partrelinfo, false);
/*
* Does the updated tuple still satisfy the current
@@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* work already done above to find the local tuple in the
* partition.
*/
+ ExecOpenIndices(partrelinfo, true);
+ InitConflictIndexes(partrelinfo);
+
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
@@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
get_namespace_name(RelationGetNamespace(partrel_new)),
RelationGetRelationName(partrel_new));
+ ExecOpenIndices(partrelinfo, false);
+
/* DELETE old tuple found in the old partition. */
EvalPlanQualSetSlot(&epqstate, localslot);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 046a7fb69b..69c3ebff00 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -228,6 +228,10 @@ extern void ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
+extern char *ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot,
+ TupleDesc tupdesc,
+ Bitmapset *modifiedCols,
+ int maxfieldlen);
extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
extern ExecRowMark *ExecFindRowMark(EState *estate, Index rti, bool missing_ok);
extern ExecAuxRowMark *ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist);
@@ -643,6 +647,7 @@ extern List *ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
extern bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
EState *estate, ItemPointer conflictTid,
+ ItemPointer tupleid,
List *arbiterIndexes);
extern void check_exclusion_constraint(Relation heap, Relation index,
IndexInfo *indexInfo,
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
new file mode 100644
index 0000000000..02cb84da7e
--- /dev/null
+++ b/src/include/replication/conflict.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ * conflict.h
+ * Exports for conflicts logging.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONFLICT_H
+#define CONFLICT_H
+
+#include "nodes/execnodes.h"
+#include "utils/timestamp.h"
+
+/*
+ * Conflict types that could occur while applying remote changes.
+ */
+typedef enum
+{
+ /* The row to be inserted violates unique constraint */
+ CT_INSERT_EXISTS,
+
+ /* The row to be updated was modified by a different origin */
+ CT_UPDATE_DIFFER,
+
+ /* The updated row value violates unique constraint */
+ CT_UPDATE_EXISTS,
+
+ /* The row to be updated is missing */
+ CT_UPDATE_MISSING,
+
+ /* The row to be deleted was modified by a different origin */
+ CT_DELETE_DIFFER,
+
+ /* The row to be deleted is missing */
+ CT_DELETE_MISSING,
+
+ /*
+ * Other conflicts, such as exclusion constraint violations, involve more
+ * complex rules than simple equality checks. These conflicts are left for
+ * future improvements.
+ */
+} ConflictType;
+
+extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
+ TransactionId *xmin,
+ RepOriginId *localorigin,
+ TimestampTz *localts);
+extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *localslot,
+ TupleTableSlot *remoteslot,
+ Oid indexoid, TransactionId localxmin,
+ RepOriginId localorigin, TimestampTz localts);
+extern void InitConflictIndexes(ResultRelInfo *relInfo);
+
+#endif
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 471e981962..d377e7ae2b 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -331,13 +331,8 @@ is( $result, qq(1|bar
2|baz),
'update works with REPLICA IDENTITY FULL and a primary key');
-# Check that subscriber handles cases where update/delete target tuple
-# is missing. We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
-$node_subscriber->reload;
-
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk");
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full WHERE a = 25");
# Note that the current location of the log file is not grabbed immediately
# after reloading the configuration, but after sending one SQL command to
@@ -346,16 +341,21 @@ my $log_location = -s $node_subscriber->logfile;
$node_publisher->safe_psql('postgres',
"UPDATE tab_full_pk SET b = 'quux' WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full SET a = a + 1 WHERE a = 25");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2");
$node_publisher->wait_for_catchup('tap_sub');
my $logfile = slurp_file($node_subscriber->logfile, $log_location);
ok( $logfile =~
- qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/,
+ qr/conflict detected on relation "public.tab_full_pk": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(1, quux\); replica identity \(a\)=\(1\)/m,
'update target row is missing');
ok( $logfile =~
- qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/,
+ qr/conflict detected on relation "public.tab_full": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(26\); replica identity full \(25\)/m,
+ 'update target row is missing in tab_full');
+ok( $logfile =~
+ qr/conflict detected on relation "public.tab_full_pk": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(2\)/m,
'delete target row is missing');
$node_subscriber->append_conf('postgresql.conf',
@@ -517,7 +517,7 @@ is($result, qq(1052|1|1002),
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_full");
-is($result, qq(21|0|100), 'check replicated insert after alter publication');
+is($result, qq(19|0|100), 'check replicated insert after alter publication');
# check restart on rename
$oldpid = $node_publisher->safe_psql('postgres',
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 29580525a9..cf91542ed0 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -343,13 +343,6 @@ $result =
$node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
is($result, qq(), 'truncate of tab1 replicated');
-# Check that subscriber handles cases where update/delete target tuple
-# is missing. We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
- "log_min_messages = debug1");
-$node_subscriber1->reload;
-
$node_publisher->safe_psql('postgres',
"INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')");
@@ -372,22 +365,18 @@ $node_publisher->wait_for_catchup('sub2');
my $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
ok( $logfile =~
- qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/,
+ qr/conflict detected on relation "public.tab1_2_2": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(null, 4, quux\); replica identity \(a\)=\(4\)/,
'update target row is missing in tab1_2_2');
ok( $logfile =~
- qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/,
+ qr/conflict detected on relation "public.tab1_1": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(1\)/,
'delete target row is missing in tab1_1');
ok( $logfile =~
- qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/,
+ qr/conflict detected on relation "public.tab1_2_2": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(4\)/,
'delete target row is missing in tab1_2_2');
ok( $logfile =~
- qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/,
+ qr/conflict detected on relation "public.tab1_def": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(10\)/,
'delete target row is missing in tab1_def');
-$node_subscriber1->append_conf('postgresql.conf',
- "log_min_messages = warning");
-$node_subscriber1->reload;
-
# Tests for replication using root table identity and schema
# publisher
@@ -773,13 +762,6 @@ pub_tab2|3|yyy
pub_tab2|5|zzz
xxx_c|6|aaa), 'inserts into tab2 replicated');
-# Check that subscriber handles cases where update/delete target tuple
-# is missing. We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
- "log_min_messages = debug1");
-$node_subscriber1->reload;
-
$node_subscriber1->safe_psql('postgres', "DELETE FROM tab2");
# Note that the current location of the log file is not grabbed immediately
@@ -796,15 +778,34 @@ $node_publisher->wait_for_catchup('sub2');
$logfile = slurp_file($node_subscriber1->logfile(), $log_location);
ok( $logfile =~
- qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/,
+ qr/conflict detected on relation "public.tab2_1": conflict=update_missing.*\n.*DETAIL:.* Could not find the row to be updated.*\n.*Remote tuple \(pub_tab2, quux, 5\); replica identity \(a\)=\(5\)/,
'update target row is missing in tab2_1');
ok( $logfile =~
- qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
+ qr/conflict detected on relation "public.tab2_1": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(1\)/,
'delete target row is missing in tab2_1');
+# Enable track_commit_timestamp to detect conflicts when attempting to
+# update a row that was previously modified by a different origin.
$node_subscriber1->append_conf('postgresql.conf',
- "log_min_messages = warning");
-$node_subscriber1->reload;
+ 'track_commit_timestamp = on');
+$node_subscriber1->restart;
+
+$node_subscriber1->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (3, 'yyy')");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab2 SET b = 'quux' WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+
+$logfile = slurp_file($node_subscriber1->logfile(), $log_location);
+ok( $logfile =~
+ qr/conflict detected on relation "public.tab2_1": conflict=update_differ.*\n.*DETAIL:.* Updating the row that was modified locally in transaction [0-9]+ at .*\n.*Existing local tuple \(yyy, null, 3\); remote tuple \(pub_tab2, quux, 3\); replica identity \(a\)=\(3\)/,
+ 'updating a tuple that was modified by a different origin');
+
+# The remaining tests no longer test conflict detection.
+$node_subscriber1->append_conf('postgresql.conf',
+ 'track_commit_timestamp = off');
+$node_subscriber1->restart;
# Test that replication continues to work correctly after altering the
# partition of a partitioned target table.
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
index 0ab57a4b5b..2f099a74f3 100644
--- a/src/test/subscription/t/029_on_error.pl
+++ b/src/test/subscription/t/029_on_error.pl
@@ -30,7 +30,7 @@ sub test_skip_lsn
# ERROR with its CONTEXT when retrieving this information.
my $contents = slurp_file($node_subscriber->logfile, $offset);
$contents =~
- qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+ qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Key already exists in unique index "tbl_pkey", modified by .*origin.* transaction \d+ at .*\n.*Key \(i\)=\(\d+\); existing local tuple .*; remote tuple .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
or die "could not get error-LSN";
my $lsn = $1;
@@ -83,6 +83,7 @@ $node_subscriber->append_conf(
'postgresql.conf',
qq[
max_prepared_transactions = 10
+track_commit_timestamp = on
]);
$node_subscriber->start;
@@ -93,6 +94,7 @@ $node_publisher->safe_psql(
'postgres',
qq[
CREATE TABLE tbl (i INT, t BYTEA);
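+-- REPLICA IDENTITY FULL is needed to publish UPDATEs, since tbl has no
+-- primary key on the publisher.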
+ALTER TABLE tbl REPLICA IDENTITY FULL;
INSERT INTO tbl VALUES (1, NULL);
]);
$node_subscriber->safe_psql(
@@ -144,13 +146,14 @@ COMMIT;
test_skip_lsn($node_publisher, $node_subscriber,
"(2, NULL)", "2", "test skipping transaction");
-# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
-# PREPARE the transaction, raising an error. Then skip the transaction.
+# Test for PREPARE and COMMIT PREPARED. Update the data and PREPARE the
+# transaction, raising an error on the subscriber due to violation of the
+# unique constraint on tbl. Then skip the transaction.
$node_publisher->safe_psql(
'postgres',
qq[
BEGIN;
-INSERT INTO tbl VALUES (1, NULL);
+UPDATE tbl SET i = 2;
PREPARE TRANSACTION 'gtx';
COMMIT PREPARED 'gtx';
]);
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index 056561f008..01536a13e7 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -27,9 +27,14 @@ my $stderr;
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
$node_A->init(allows_streaming => 'logical');
$node_A->start;
+
# node_B
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
$node_B->init(allows_streaming => 'logical');
+
+# Enable track_commit_timestamp to detect conflicts when attempting to
+# update a row that was previously modified by a different origin.
+$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on');
$node_B->start;
# Create table on node_A
@@ -139,6 +144,48 @@ is($result, qq(),
'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
);
+###############################################################################
+# Check that the conflict can be detected when attempting to update or
+# delete a row that was previously modified by a different source.
+###############################################################################
+
+$node_B->safe_psql('postgres', "DELETE FROM tab;");
+
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(32), 'The node_A data replicated to node_B');
+
+# The update should update the row on node B that was inserted by node A.
+$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;");
+
+$node_B->wait_for_log(
+ qr/conflict detected on relation "public.tab": conflict=update_differ.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(32\); remote tuple \(33\); replica identity \(a\)=\(32\)/
+);
+
+$node_B->safe_psql('postgres', "DELETE FROM tab;");
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);");
+
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(33), 'The node_A data replicated to node_B');
+
+# The delete should remove the row on node B that was inserted by node A.
+$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;");
+
+$node_B->wait_for_log(
+ qr/conflict detected on relation "public.tab": conflict=delete_differ.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(33\); replica identity \(a\)=\(33\)/
+);
+
+# The remaining tests no longer test conflict detection.
+$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off');
+$node_B->restart;
+
###############################################################################
# Specifying origin = NONE indicates that the publisher should only replicate the
# changes that are generated locally from node_B, but in this case since the
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 547d14b3e7..6d424c8918 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -467,6 +467,7 @@ ConditionVariableMinimallyPadded
ConditionalStack
ConfigData
ConfigVariable
+ConflictType
ConnCacheEntry
ConnCacheKey
ConnParams