diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f6f8adc72a..f26c8d18a6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7039,7 +7039,7 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { CheckPointRelationMap(); - CheckPointReplicationSlots(); + CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN); CheckPointSnapBuild(); CheckPointLogicalRewriteHeap(); CheckPointReplicationOrigin(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index bb09c4010f..3ded3c1473 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_xmin_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; + slot->last_saved_confirmed_flush = InvalidXLogRecPtr; /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -1572,11 +1573,13 @@ restart: /* * Flush all replication slots to disk. * - * This needn't actually be part of a checkpoint, but it's a convenient - * location. + * It is convenient to flush dirty replication slots at the time of checkpoint. + * Additionally, in case of a shutdown checkpoint, we also identify the slots + * for which the confirmed_flush LSN has been updated since the last time it + * was saved and flush them. */ void -CheckPointReplicationSlots(void) +CheckPointReplicationSlots(bool is_shutdown) { int i; @@ -1601,6 +1604,30 @@ CheckPointReplicationSlots(void) /* save the slot to disk, locking is handled in SaveSlotToPath() */ sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); + + /* + * Slot's data is not flushed each time the confirmed_flush LSN is + * updated as that could lead to frequent writes. However, we decide + * to force a flush of all logical slot's data at the time of shutdown + * if the confirmed_flush LSN is changed since we last flushed it to + * disk. This helps in avoiding an unnecessary retreat of the + * confirmed_flush LSN after restart. + */ + if (is_shutdown && SlotIsLogical(s)) + { + SpinLockAcquire(&s->mutex); + + Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush); + + if (s->data.invalidated == RS_INVAL_NONE && + s->data.confirmed_flush != s->last_saved_confirmed_flush) + { + s->just_dirtied = true; + s->dirty = true; + } + SpinLockRelease(&s->mutex); + } + SaveSlotToPath(s, path, LOG); } LWLockRelease(ReplicationSlotAllocationLock); @@ -1873,11 +1900,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) /* * Successfully wrote, unset dirty bit, unless somebody dirtied again - * already. + * already and remember the confirmed_flush LSN value. */ SpinLockAcquire(&slot->mutex); if (!slot->just_dirtied) slot->dirty = false; + slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2074,6 +2102,7 @@ RestoreSlotFromDisk(const char *name) /* initialize in memory state */ slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; + slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..758ca79a81 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -178,6 +178,13 @@ typedef struct ReplicationSlot XLogRecPtr candidate_xmin_lsn; XLogRecPtr candidate_restart_valid; XLogRecPtr candidate_restart_lsn; + + /* + * This value tracks the last confirmed_flush LSN flushed which is used + * during a shutdown checkpoint to decide if logical's slot data should be + * forcibly flushed or not. + */ + XLogRecPtr last_saved_confirmed_flush; } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) @@ -241,7 +248,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); -extern void CheckPointReplicationSlots(void); +extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index e7328e4894..646d6ffde4 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -43,6 +43,7 @@ tests += { 't/035_standby_logical_decoding.pl', 't/036_truncated_dropped.pl', 't/037_invalid_database.pl', + 't/038_save_logical_slots_shutdown.pl', ], }, } diff --git a/src/test/recovery/t/038_save_logical_slots_shutdown.pl b/src/test/recovery/t/038_save_logical_slots_shutdown.pl new file mode 100644 index 0000000000..de19829560 --- /dev/null +++ b/src/test/recovery/t/038_save_logical_slots_shutdown.pl @@ -0,0 +1,102 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +# Test logical replication slots are always flushed to disk during a shutdown +# checkpoint. + +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +sub compare_confirmed_flush +{ + my ($node, $confirmed_flush_from_log) = @_; + + # Fetch Latest checkpoint location from the control file + my ($stdout, $stderr) = + run_command([ 'pg_controldata', $node->data_dir ]); + my @control_data = split("\n", $stdout); + my $latest_checkpoint = undef; + foreach (@control_data) + { + if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg) + { + $latest_checkpoint = $1; + last; + } + } + die "Latest checkpoint location not found in control file\n" + unless defined($latest_checkpoint); + + # Is it same as the value read from log? + ok( $latest_checkpoint eq $confirmed_flush_from_log, + "Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location" + ); + + return; +} + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('pub'); +$node_publisher->init(allows_streaming => 'logical'); +# Avoid checkpoint during the test, otherwise, the latest checkpoint location +# will change. +$node_publisher->append_conf( + 'postgresql.conf', q{ +checkpoint_timeout = 1h +autovacuum = off +}); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('sub'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); + +# Insert some data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tbl VALUES (generate_series(1, 5));"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" +); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl"); + +is($result, qq(5), "check initial copy was done"); + +my $offset = -s $node_publisher->logfile; + +# Restart the publisher to ensure that the slot will be flushed if required +$node_publisher->restart(); + +# Wait until the walsender creates decoding context +$node_publisher->wait_for_log( + qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./, + $offset); + +# Extract confirmed_flush from the logfile +my $log_contents = slurp_file($node_publisher->logfile, $offset); +$log_contents =~ + qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./ + or die "could not get confirmed_flush_lsn"; + +# Ensure that the slot's confirmed_flush LSN is the same as the +# latest_checkpoint location. +compare_confirmed_flush($node_publisher, $1); + +done_testing();