diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index fb4472356d..82326c3901 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -352,11 +352,26 @@ The resolution can be done either by changing data or permissions on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the + transaction that conflicts with the existing data. When a conflict produces + an error, the replication won't proceed, and the logical replication worker will + emit the following kind of message to the subscriber's server log: + +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (c)=(1) already exists. +CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378 + + The finish LSN of the transaction that contains the change violating the constraint and + the replication origin name can be found in the server log (LSN 0/14C0378 and + replication origin pg_16395 in the above case). To skip the + transaction, the subscription needs to be disabled temporarily by + ALTER SUBSCRIPTION ... DISABLE first. Then, the transaction + can be skipped by calling the + pg_replication_origin_advance() function with - a node_name corresponding to the subscription name, - and a position. The current position of origins can be seen in the + the node_name (i.e., pg_16395) and the + LSN immediately following the transaction's finish LSN (i.e., 0/14C0379). After that, replication + can be resumed by ALTER SUBSCRIPTION ... ENABLE. The current + position of origins can be seen in the pg_replication_origin_status system view. 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 92aa794706..8653e1d840 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg /* Remote node information */ int remote_attnum; /* -1 if invalid */ TransactionId remote_xid; + XLogRecPtr finish_lsn; + char *origin_name; } ApplyErrorCallbackArg; static ApplyErrorCallbackArg apply_error_callback_arg = @@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .rel = NULL, .remote_attnum = -1, .remote_xid = InvalidTransactionId, + .finish_lsn = InvalidXLogRecPtr, + .origin_name = NULL, }; static MemoryContext ApplyMessageContext = NULL; @@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* Functions for apply error callback */ static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid); +static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); /* @@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid); + set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; @@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid); + set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); remote_final_lsn = begin_data.prepare_lsn; @@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid); + 
set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); - set_apply_error_context_xact(rollback_data.xid); + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid); + set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - set_apply_error_context_xact(stream_xid); + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); /* * Initialize the worker's stream_fileset if we haven't yet. 
This will be @@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s) */ if (xid == subxid) { - set_apply_error_context_xact(xid); + set_apply_error_context_xact(xid, InvalidXLogRecPtr); stream_cleanup_files(MyLogicalRepWorker->subid, xid); } else @@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid); + set_apply_error_context_xact(subxid, InvalidXLogRecPtr); subidx = -1; begin_replication_step(); @@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); - set_apply_error_context_xact(xid); + set_apply_error_context_xact(xid, commit_data.commit_lsn); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg) myslotname = MemoryContextStrdup(ApplyContext, syncslotname); pfree(syncslotname); + + /* + * Allocate the origin name in long-lived context for error context + * message. + */ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } else { @@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg) * does some initializations on the upstream so let's still call it. */ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + /* + * Allocate the origin name in long-lived context for error context + * message. 
+ */ + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); } /* @@ -3651,36 +3673,51 @@ apply_error_callback(void *arg) if (apply_error_callback_arg.command == 0) return; + Assert(errarg->origin_name); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) - errcontext("processing remote data during \"%s\"", + errcontext("processing remote data for replication origin \"%s\" during \"%s\"", + errarg->origin_name, logicalrep_message_type(errarg->command)); - else - errcontext("processing remote data during \"%s\" in transaction %u", + else if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u", + errarg->origin_name, logicalrep_message_type(errarg->command), errarg->remote_xid); + else + errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X", + errarg->origin_name, + logicalrep_message_type(errarg->command), + errarg->remote_xid, + LSN_FORMAT_ARGS(errarg->finish_lsn)); } else if (errarg->remote_attnum < 0) - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u", + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X", + errarg->origin_name, logicalrep_message_type(errarg->command), errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, - errarg->remote_xid); + errarg->remote_xid, + LSN_FORMAT_ARGS(errarg->finish_lsn)); else - errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X", + errarg->origin_name, logicalrep_message_type(errarg->command), 
errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname, errarg->rel->remoterel.attnames[errarg->remote_attnum], - errarg->remote_xid); + errarg->remote_xid, + LSN_FORMAT_ARGS(errarg->finish_lsn)); } /* Set transaction information of apply error callback */ static inline void -set_apply_error_context_xact(TransactionId xid) +set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn) { apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.finish_lsn = lsn; } /* Reset all information of apply error callback */ @@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void) apply_error_callback_arg.command = 0; apply_error_callback_arg.rel = NULL; apply_error_callback_arg.remote_attnum = -1; - set_apply_error_context_xact(InvalidTransactionId); + set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); }