Add the additional information to the logical replication worker errcontext.

This commits adds both the finish LSN (commit_lsn in case transaction got
committed, prepare_lsn in case of a prepared transaction, etc.) and
replication origin name to the existing error context message.

This will help users in specifying the origin name and transaction finish
LSN to pg_replication_origin_advance() SQL function to skip a particular
transaction.

Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila
Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com
This commit is contained in:
Amit Kapila 2022-03-08 08:08:32 +05:30
parent 4228cabb72
commit d3e8368c4b
2 changed files with 75 additions and 23 deletions

View File

@ -352,11 +352,26 @@
<para> <para>
The resolution can be done either by changing data or permissions on the subscriber so The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the that it does not conflict with the incoming change or by skipping the
transaction that conflicts with the existing data. The transaction can be transaction that conflicts with the existing data. When a conflict produces
skipped by calling the <link linkend="pg-replication-origin-advance"> an error, the replication won't proceed, and the logical replication worker will
emit the following kind of message to the subscriber's server log:
<screen>
ERROR: duplicate key value violates unique constraint "test_pkey"
DETAIL: Key (c)=(1) already exists.
CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
</screen>
The LSN of the transaction that contains the change violating the constraint and
the replication origin name can be found from the server log (LSN 0/14C0378 and
replication origin <literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
can be skipped by calling the
<link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with <function>pg_replication_origin_advance()</function></link> function with
a <parameter>node_name</parameter> corresponding to the subscription name, the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
and a position. The current position of origins can be seen in the next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>. The current
position of origins can be seen in the
<link linkend="view-pg-replication-origin-status"> <link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view. <structname>pg_replication_origin_status</structname></link> system view.
</para> </para>

View File

@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */ /* Remote node information */
int remote_attnum; /* -1 if invalid */ int remote_attnum; /* -1 if invalid */
TransactionId remote_xid; TransactionId remote_xid;
XLogRecPtr finish_lsn;
char *origin_name;
} ApplyErrorCallbackArg; } ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg = static ApplyErrorCallbackArg apply_error_callback_arg =
@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL, .rel = NULL,
.remote_attnum = -1, .remote_attnum = -1,
.remote_xid = InvalidTransactionId, .remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}; };
static MemoryContext ApplyMessageContext = NULL; static MemoryContext ApplyMessageContext = NULL;
@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */ /* Functions for apply error callback */
static void apply_error_callback(void *arg); static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void); static inline void reset_apply_error_context_info(void);
/* /*
@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data; LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data); logicalrep_read_begin(s, &begin_data);
set_apply_error_context_xact(begin_data.xid); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn; remote_final_lsn = begin_data.final_lsn;
@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data); logicalrep_read_begin_prepare(s, &begin_data);
set_apply_error_context_xact(begin_data.xid); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn; remote_final_lsn = begin_data.prepare_lsn;
@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE]; char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data); logicalrep_read_commit_prepared(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid); set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */ /* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE]; char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data); logicalrep_read_rollback_prepared(s, &rollback_data);
set_apply_error_context_xact(rollback_data.xid); set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */ /* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message"))); errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data); logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid); set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction"))); errmsg_internal("invalid transaction ID in streamed replication transaction")));
set_apply_error_context_xact(stream_xid); set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/* /*
* Initialize the worker's stream_fileset if we haven't yet. This will be * Initialize the worker's stream_fileset if we haven't yet. This will be
@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/ */
if (xid == subxid) if (xid == subxid)
{ {
set_apply_error_context_xact(xid); set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid); stream_cleanup_files(MyLogicalRepWorker->subid, xid);
} }
else else
@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false; bool found = false;
char path[MAXPGPATH]; char path[MAXPGPATH];
set_apply_error_context_xact(subxid); set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1; subidx = -1;
begin_replication_step(); begin_replication_step();
@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP"))); errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data); xid = logicalrep_read_stream_commit(s, &commit_data);
set_apply_error_context_xact(xid); set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid); elog(DEBUG1, "received commit for streamed transaction %u", xid);
@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname); myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname); pfree(syncslotname);
/*
* Allocate the origin name in long-lived context for error context
* message.
*/
ReplicationOriginNameForTablesync(MySubscription->oid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
originname);
} }
else else
{ {
@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it. * does some initializations on the upstream so let's still call it.
*/ */
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
/*
* Allocate the origin name in long-lived context for error context
* message.
*/
apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
originname);
} }
/* /*
@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
if (apply_error_callback_arg.command == 0) if (apply_error_callback_arg.command == 0)
return; return;
Assert(errarg->origin_name);
if (errarg->rel == NULL) if (errarg->rel == NULL)
{ {
if (!TransactionIdIsValid(errarg->remote_xid)) if (!TransactionIdIsValid(errarg->remote_xid))
errcontext("processing remote data during \"%s\"", errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
errarg->origin_name,
logicalrep_message_type(errarg->command)); logicalrep_message_type(errarg->command));
else else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
errcontext("processing remote data during \"%s\" in transaction %u", errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
errarg->origin_name,
logicalrep_message_type(errarg->command), logicalrep_message_type(errarg->command),
errarg->remote_xid); errarg->remote_xid);
else
errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
LSN_FORMAT_ARGS(errarg->finish_lsn));
} }
else if (errarg->remote_attnum < 0) else if (errarg->remote_attnum < 0)
errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u", errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
errarg->origin_name,
logicalrep_message_type(errarg->command), logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname, errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname, errarg->rel->remoterel.relname,
errarg->remote_xid); errarg->remote_xid,
LSN_FORMAT_ARGS(errarg->finish_lsn));
else else
errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
errarg->origin_name,
logicalrep_message_type(errarg->command), logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname, errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname, errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum], errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid); errarg->remote_xid,
LSN_FORMAT_ARGS(errarg->finish_lsn));
} }
/* Set transaction information of apply error callback */ /* Set transaction information of apply error callback */
static inline void static inline void
set_apply_error_context_xact(TransactionId xid) set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{ {
apply_error_callback_arg.remote_xid = xid; apply_error_callback_arg.remote_xid = xid;
apply_error_callback_arg.finish_lsn = lsn;
} }
/* Reset all information of apply error callback */ /* Reset all information of apply error callback */
@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0; apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL; apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1; apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId); set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
} }