diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 934aa13f2d..e1f14aeecb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -683,12 +683,14 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) * Update progress tracking (if supported). */ void -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, + bool skipped_xact) { if (!ctx->update_progress) return; - ctx->update_progress(ctx, ctx->write_location, ctx->write_xid); + ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, + skipped_xact); } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 893833ea83..20d0b1e125 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -183,6 +183,36 @@ typedef struct RelationSyncEntry MemoryContext entry_cxt; } RelationSyncEntry; +/* + * Maintain a per-transaction level variable to track whether the transaction + * has sent BEGIN. BEGIN is only sent when the first change in a transaction + * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT + * messages for empty transactions which saves network bandwidth. + * + * This optimization is not used for prepared transactions because if the + * WALSender restarts after prepare of a transaction and before commit prepared + * of the same transaction then we won't be able to figure out if we have + * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is + * because we would have lost the in-memory txndata information that was + * present prior to the restart. This will result in sending a spurious + * COMMIT PREPARED without a corresponding prepared transaction at the + * downstream which would lead to an error when it tries to process it. + * + * XXX We could achieve this optimization by changing protocol to send + * additional information so that downstream can detect that the corresponding + * prepare has not been sent. However, adding such a check for every + * transaction in the downstream could be costly so we might want to do it + * optionally. + * + * We also don't have this optimization for streamed transactions because + * they can contain prepared transactions. + */ +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether BEGIN has + * been sent */ +} PGOutputTxnData; + /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; @@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } /* - * BEGIN callback + * BEGIN callback. + * + * Don't send the BEGIN message here instead postpone it until the first + * change. In logical replication, a common scenario is to replicate a set of + * tables (instead of all tables) and transactions whose changes were on + * the table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. */ static void -pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn) +{ + PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + txn->output_plugin_private = txndata; +} + +/* + * Send BEGIN. + * + * This is called while processing the first change of the transaction. + */ +static void +pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + Assert(txndata); + Assert(!txndata->sent_begin_txn); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + txndata->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -511,7 +567,25 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + bool sent_begin_txn; + + Assert(txndata); + + /* + * We don't need to send the commit message unless some relevant change + * from this transaction has been sent to the downstream. + */ + sent_begin_txn = txndata->sent_begin_txn; + OutputPluginUpdateProgress(ctx, !sent_begin_txn); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid); + return; + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -542,7 +616,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -556,7 +630,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; @@ -1370,6 +1445,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, &action)) break; + /* + * Send BEGIN if we haven't yet. + * + * We send the BEGIN message after ensuring that we will actually + * send the change. This avoids sending a pair of BEGIN/COMMIT + * messages for empty transactions. + */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + /* * Schema should be sent using the original relation because it * also sends the ancestor's relation. @@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); } @@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * messages. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, @@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx, if (!relentry->pubactions.pubsequence) return; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * sequence changes. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_sequence(ctx->out, relation, @@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cffb3482ad..75400a53f2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -242,14 +242,16 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); -static void WalSndKeepalive(bool requestReply); +static void ProcessPendingWrites(void); +static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); -static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool skipped_xact); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); @@ -1399,6 +1401,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* If we have pending write here, go to slow path */ + ProcessPendingWrites(); +} + +/* + * Wait until there is no pending write. Also process replies from the other + * side and check timeouts during that. + */ +static void +ProcessPendingWrites(void) +{ for (;;) { long sleeptime; @@ -1447,9 +1459,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * LogicalDecodingContext 'update_progress' callback. * * Write the current position to the lag tracker (see XLogSendPhysical). + * + * When skipping empty transactions, send a keepalive message if necessary. */ static void -WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool skipped_xact) { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); @@ -1459,12 +1474,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId * avoid flooding the lag tracker when we commit frequently. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (!TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) - return; + if (TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + { + LagTrackerWrite(lsn, now); + sendTime = now; + } - LagTrackerWrite(lsn, now); - sendTime = now; + /* + * When skipping empty transactions in synchronous replication, we send a + * keepalive message to avoid delaying such transactions. + * + * It is okay to check sync_standbys_defined flag without lock here as + * in the worst case we will just send an extra keepalive message when it + * is really not required. + */ + if (skipped_xact && + SyncRepRequested() && + ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined) + { + WalSndKeepalive(false, lsn); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* If we have pending write here, make sure it's actually flushed */ + if (pq_is_send_pending()) + ProcessPendingWrites(); + } } /* @@ -1550,7 +1588,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (MyWalSnd->flush < sentPtr && MyWalSnd->write < sentPtr && !waiting_for_ping_response) - WalSndKeepalive(false); + WalSndKeepalive(false, InvalidXLogRecPtr); /* check whether we're done */ if (loc <= RecentFlushPtr) @@ -2068,7 +2106,7 @@ ProcessStandbyReplyMessage(void) /* Send a reply if the standby requested one. */ if (replyRequested) - WalSndKeepalive(false); + WalSndKeepalive(false, InvalidXLogRecPtr); /* * Update shared state for this WalSender process based on reply data from @@ -3074,7 +3112,7 @@ WalSndDone(WalSndSendDataCallback send_data) proc_exit(0); } if (!waiting_for_ping_response) - WalSndKeepalive(true); + WalSndKeepalive(true, InvalidXLogRecPtr); } /* @@ -3563,18 +3601,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * * If requestReply is set, the message requests the other party to send * a message back to us, for heartbeat purposes. We also set a flag to - * let nearby code that we're waiting for that response, to avoid + * let nearby code know that we're waiting for that response, to avoid * repeated requests. + * + * writePtr is the location up to which the WAL is sent. It is essentially + * the same as sentPtr but in some cases, we need to send keep alive before + * sentPtr is updated like when skipping empty transactions. */ static void -WalSndKeepalive(bool requestReply) +WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) { elog(DEBUG2, "sending replication keepalive"); /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); - pq_sendint64(&output_message, sentPtr); + pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr); pq_sendint64(&output_message, GetCurrentTimestamp()); pq_sendbyte(&output_message, requestReply ? 1 : 0); @@ -3613,7 +3655,7 @@ WalSndKeepaliveIfNecessary(void) wal_sender_timeout / 2); if (last_processing >= ping_time) { - WalSndKeepalive(true); + WalSndKeepalive(true, InvalidXLogRecPtr); /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 1097cc9799..a6ef16ad5b 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr, XLogRecPtr Ptr, - TransactionId xid + TransactionId xid, + bool skipped_xact ); typedef struct LogicalDecodingContext diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index a16bebf76c..fe85d49a03 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks /* Functions in replication/logical/logical.c */ extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); -extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx); +extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact); #endif /* OUTPUT_PLUGIN_H */ diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index eca1c63335..d35a133f15 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -473,6 +473,34 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); $node_publisher->wait_for_catchup('tap_sub'); +# Check that we don't send BEGIN and COMMIT because of empty transaction +# optimization. We have to look for the DEBUG1 log messages about that, so +# temporarily bump up the log verbosity. +$node_publisher->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_publisher->reload; + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so that we are sure that the reloading has taken effect. +$log_location = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$logfile = slurp_file($node_publisher->logfile, $log_location); +ok( $logfile =~ + qr/skipped replication of an empty transaction with XID/, + 'empty transaction is skipped'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$node_publisher->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_publisher->reload; + # note that data are different on provider and subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index b5045ff3c4..d21d929c2d 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql( 'publication_names', 'tap_pub') )); -# 66 67 == B C == BEGIN COMMIT -is( $result, qq(66 -67), +# no message and no BEGIN and COMMIT because of empty transaction optimization +is($result, qq(), 'option messages defaults to false so message (M) is not available on slot' ); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6b77cc64ef..72fafb795b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1616,6 +1616,7 @@ PGMessageField PGModuleMagicFunction PGNoticeHooks PGOutputData +PGOutputTxnData PGPROC PGP_CFB PGP_Context