From 8c58624df46222d4d09c5655d8350f3b037880c8 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 8 Feb 2023 07:58:25 +0530 Subject: [PATCH] Fix the logical replication timeout during large DDLs. DDLs like REFRESH MATERIALIZED VIEW that generate lots of temporary data due to rewrite rules may not be processed by output plugins (for example pgoutput). So, we won't send keep-alive messages for a long time while processing such commands and that can cause the subscriber side to time out. We have previously fixed a similar case for large transactions in commit f95d53eded where the output plugin filters all or most of the changes but did not handle DDLs. We decided not to backpatch this as this adds a new callback in the existing exposed structure and moreover, users can increase the wal_sender_timeout and wal_receiver_timeout to avoid this problem. Author: Wang wei, Hou Zhijie Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com --- src/backend/replication/logical/logical.c | 50 +++++++++++++++++ .../replication/logical/reorderbuffer.c | 20 +++++++ src/backend/replication/pgoutput/pgoutput.c | 54 +++---------------- src/include/replication/reorderbuffer.h | 12 +++++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 89 insertions(+), 48 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1a58dd7649..c3ec97a0a6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +/* callback to 
update txn's progress */ +static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); /* @@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + /* + * Callback to support updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "update_progress_txn"; + state.report_location = lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + ctx->write_xid = txn->xid; + + /* + * Report this change's lsn so replies from clients can give an up-to-date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. 
+ */ + ctx->write_location = lsn; + + ctx->end_xact = false; + + OutputPluginUpdateProgress(ctx, false); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 0468d12936..d5f90a5f5d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, PG_TRY(); { ReorderBufferChange *change; + int changes_count = 0; /* used to accumulate the number of + * changes */ if (using_subtxn) BeginInternalSubTransaction(streaming ? "stream" : "replay"); @@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } + + /* + * It is possible that the data is not sent to downstream for a + * long time either because the output plugin filtered it or there + * is a DDL that generates a lot of data that is not processed by + * the plugin. So, in such cases, the downstream can timeout. To + * avoid that we try to send a keepalive message if required. + * Trying to send a keepalive message after every change has some + * overhead, but testing showed there is no noticeable overhead if + * we do it after every ~100 changes. 
+ */ +#define CHANGES_THRESHOLD 100 + + if (++changes_count >= CHANGES_THRESHOLD) + { + rb->update_progress_txn(rb, txn, change->lsn); + changes_count = 0; + } } /* speculative insertion record must be freed by now */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4938d8888..73b080060d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); -static void update_replication_progress(LogicalDecodingContext *ctx, - bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - update_replication_progress(ctx, !sent_begin_txn); + OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -625,7 +623,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -639,7 +637,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - update_replication_progress(ctx, 
false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - update_replication_progress(ctx, false); - if (!is_publishable_relation(relation)) return; @@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - if (!data->messages) return; @@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } - -/* - * Try to update progress and send a keepalive message if too many changes were - * processed. - * - * For a large transaction, if we don't send any change to the downstream for a - * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. 
- * This can happen when all or most of the changes are either not published or - * got filtered out. - */ -static void -update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) -{ - static int changes_count = 0; - - /* - * We don't want to try sending a keepalive message after processing each - * change as that can have overhead. Tests revealed that there is no - * noticeable overhead in doing it after continuously processing 100 or so - * changes. - */ -#define CHANGES_THRESHOLD 100 - - /* - * If we are at the end of transaction LSN, update progress tracking. - * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we - * try to send a keepalive message if required. - */ - if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) - { - OutputPluginUpdateProgress(ctx, skipped_xact); - changes_count = 0; - } -} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e5db041df1..215d1494e9 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -526,6 +526,12 @@ typedef void (*ReorderBufferStreamTruncateCB) ( Relation relations[], ReorderBufferChange *change); +/* update progress txn callback signature */ +typedef void (*ReorderBufferUpdateProgressTxnCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + struct ReorderBuffer { /* @@ -589,6 +595,12 @@ struct ReorderBuffer ReorderBufferStreamMessageCB stream_message; ReorderBufferStreamTruncateCB stream_truncate; + /* + * Callback to be called when updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ReorderBufferUpdateProgressTxnCB update_progress_txn; + /* * Pointer that will be passed untouched to the callbacks. 
*/ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 07fbb7ccf6..d3224dfc36 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2311,6 +2311,7 @@ ReorderBufferToastEnt ReorderBufferTupleBuf ReorderBufferTupleCidEnt ReorderBufferTupleCidKey +ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId ReparameterizeForeignPathByChild_function