diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 788769dd73..625a7f4273 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); @@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* * If the plugin supports two-phase commits then begin prepare callback is @@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin supports two-phase commits then prepare callback is @@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then commit prepared callback @@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then rollback prepared callback @@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ @@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ @@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); @@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = first_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_start_cb is required */ if (ctx->callbacks.stream_start_cb == NULL) ereport(ERROR, @@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = last_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_stop_cb is required */ if (ctx->callbacks.stream_stop_cb == NULL) ereport(ERROR, @@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = abort_lsn; + ctx->end_xact = true; /* in streaming mode, stream_abort_cb is required */ if (ctx->callbacks.stream_abort_cb == NULL) @@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode with two-phase commits, stream_prepare_cb is required */ if (ctx->callbacks.stream_prepare_cb == NULL) @@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode, stream_commit_cb is required */ if (ctx->callbacks.stream_commit_cb == NULL) @@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + /* in streaming mode, stream_change_cb is required */ if (ctx->callbacks.stream_change_cb == NULL) ereport(ERROR, @@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index b197bfd565..406ad84e1d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -91,6 +91,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); +static void update_replication_progress(LogicalDecodingContext *ctx, + bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -558,7 +560,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - OutputPluginUpdateProgress(ctx, !sent_begin_txn); + update_replication_progress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -597,7 +599,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx, false); + update_replication_progress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -611,7 +613,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx, false); + update_replication_progress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -627,7 +629,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx, false); + update_replication_progress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1360,6 +1362,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; + update_replication_progress(ctx, false); + if (!is_publishable_relation(relation)) return; @@ -1592,6 +1596,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx, false); + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1655,6 +1661,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx, false); + if (!data->messages) return; @@ -1847,7 +1855,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx, false); + update_replication_progress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1868,7 +1876,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx, false); + update_replication_progress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2361,3 +2369,37 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } + +/* + * Try to update progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. + * This can happen when all or most of the changes are either not published or + * got filtered out. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx, skipped_xact); + changes_count = 0; + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 63a818140b..c6c196b2fa 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1482,14 +1482,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); + bool pending_writes = false; + bool end_xact = ctx->end_xact; /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to * avoid flooding the lag tracker when we commit frequently. + * + * We don't have a mechanism to get the ack for any LSN other than end + * xact LSN from the downstream. So, we track lag only for end of + * transaction LSN. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + if (end_xact && TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) { LagTrackerWrite(lsn, now); sendTime = now; @@ -1515,8 +1521,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId /* If we have pending write here, make sure it's actually flushed */ if (pq_is_send_pending()) - ProcessPendingWrites(); + pending_writes = true; } + + /* + * Process pending writes if any or try to send a keepalive if required. + * We don't need to try sending keep alive messages at the transaction end + * as that will be done at a later point in time. This is required only + * for large transactions where we don't send any changes to the + * downstream and the receiver can timeout due to that. + */ + if (pending_writes || (!end_xact && + now >= TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2))) + ProcessPendingWrites(); } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index a6ef16ad5b..edadacd589 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -107,6 +107,8 @@ typedef struct LogicalDecodingContext bool prepared_write; XLogRecPtr write_location; TransactionId write_xid; + /* Are we processing the end LSN of a transaction? */ + bool end_xact; } LogicalDecodingContext;