Track last_inactive_time in pg_replication_slots.
This commit adds a new property called last_inactive_time for slots. It is set to 0 whenever a slot is made active/acquired and set to the current timestamp whenever the slot is inactive/released or restored from the disk. Note that we don't set the last_inactive_time for the slots currently being synced from the primary to the standby because such slots are typically inactive as decoding is not allowed on those. The 'last_inactive_time' will be useful on production servers to debug and analyze inactive replication slots. It will also help to know the lifetime of a replication slot - one can know how long a streaming standby, logical subscriber, or replication slot consumer is down. The 'last_inactive_time' will also be useful to implement inactive timeout-based replication slot invalidation in a future commit. Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
This commit is contained in:
parent
0f7863afef
commit
a11f330b55
@ -2523,6 +2523,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
|
|||||||
</para></entry>
|
</para></entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>last_inactive_time</structfield> <type>timestamptz</type>
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
The time at which the slot became inactive.
|
||||||
|
<literal>NULL</literal> if the slot is currently being used.
|
||||||
|
</para></entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
<row>
|
<row>
|
||||||
<entry role="catalog_table_entry"><para role="column_definition">
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
<structfield>conflicting</structfield> <type>bool</type>
|
<structfield>conflicting</structfield> <type>bool</type>
|
||||||
|
@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
|
|||||||
L.wal_status,
|
L.wal_status,
|
||||||
L.safe_wal_size,
|
L.safe_wal_size,
|
||||||
L.two_phase,
|
L.two_phase,
|
||||||
|
L.last_inactive_time,
|
||||||
L.conflicting,
|
L.conflicting,
|
||||||
L.invalidation_reason,
|
L.invalidation_reason,
|
||||||
L.failover,
|
L.failover,
|
||||||
|
@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
|||||||
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||||
slot->candidate_restart_lsn = InvalidXLogRecPtr;
|
slot->candidate_restart_lsn = InvalidXLogRecPtr;
|
||||||
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
|
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
|
||||||
|
slot->last_inactive_time = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create the slot on disk. We haven't actually marked the slot allocated
|
* Create the slot on disk. We haven't actually marked the slot allocated
|
||||||
@ -622,6 +623,11 @@ retry:
|
|||||||
if (SlotIsLogical(s))
|
if (SlotIsLogical(s))
|
||||||
pgstat_acquire_replslot(s);
|
pgstat_acquire_replslot(s);
|
||||||
|
|
||||||
|
/* Reset the last inactive time as the slot is active now. */
|
||||||
|
SpinLockAcquire(&s->mutex);
|
||||||
|
s->last_inactive_time = 0;
|
||||||
|
SpinLockRelease(&s->mutex);
|
||||||
|
|
||||||
if (am_walsender)
|
if (am_walsender)
|
||||||
{
|
{
|
||||||
ereport(log_replication_commands ? LOG : DEBUG1,
|
ereport(log_replication_commands ? LOG : DEBUG1,
|
||||||
@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
|
|||||||
ReplicationSlot *slot = MyReplicationSlot;
|
ReplicationSlot *slot = MyReplicationSlot;
|
||||||
char *slotname = NULL; /* keep compiler quiet */
|
char *slotname = NULL; /* keep compiler quiet */
|
||||||
bool is_logical = false; /* keep compiler quiet */
|
bool is_logical = false; /* keep compiler quiet */
|
||||||
|
TimestampTz now = 0;
|
||||||
|
|
||||||
Assert(slot != NULL && slot->active_pid != 0);
|
Assert(slot != NULL && slot->active_pid != 0);
|
||||||
|
|
||||||
@ -679,6 +686,15 @@ ReplicationSlotRelease(void)
|
|||||||
ReplicationSlotsComputeRequiredXmin(false);
|
ReplicationSlotsComputeRequiredXmin(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set the last inactive time after marking the slot inactive. We don't set
|
||||||
|
* it for the slots currently being synced from the primary to the standby
|
||||||
|
* because such slots are typically inactive as decoding is not allowed on
|
||||||
|
* those.
|
||||||
|
*/
|
||||||
|
if (!(RecoveryInProgress() && slot->data.synced))
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
|
|
||||||
if (slot->data.persistency == RS_PERSISTENT)
|
if (slot->data.persistency == RS_PERSISTENT)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@ -687,9 +703,16 @@ ReplicationSlotRelease(void)
|
|||||||
*/
|
*/
|
||||||
SpinLockAcquire(&slot->mutex);
|
SpinLockAcquire(&slot->mutex);
|
||||||
slot->active_pid = 0;
|
slot->active_pid = 0;
|
||||||
|
slot->last_inactive_time = now;
|
||||||
SpinLockRelease(&slot->mutex);
|
SpinLockRelease(&slot->mutex);
|
||||||
ConditionVariableBroadcast(&slot->active_cv);
|
ConditionVariableBroadcast(&slot->active_cv);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SpinLockAcquire(&slot->mutex);
|
||||||
|
slot->last_inactive_time = now;
|
||||||
|
SpinLockRelease(&slot->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
MyReplicationSlot = NULL;
|
MyReplicationSlot = NULL;
|
||||||
|
|
||||||
@ -2342,6 +2365,18 @@ RestoreSlotFromDisk(const char *name)
|
|||||||
slot->in_use = true;
|
slot->in_use = true;
|
||||||
slot->active_pid = 0;
|
slot->active_pid = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We set the last inactive time after loading the slot from the disk
|
||||||
|
* into memory. Whoever acquires the slot i.e. makes the slot active
|
||||||
|
* will reset it. We don't set it for the slots currently being synced
|
||||||
|
* from the primary to the standby because such slots are typically
|
||||||
|
* inactive as decoding is not allowed on those.
|
||||||
|
*/
|
||||||
|
if (!(RecoveryInProgress() && slot->data.synced))
|
||||||
|
slot->last_inactive_time = GetCurrentTimestamp();
|
||||||
|
else
|
||||||
|
slot->last_inactive_time = 0;
|
||||||
|
|
||||||
restored = true;
|
restored = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
|||||||
Datum
|
Datum
|
||||||
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
#define PG_GET_REPLICATION_SLOTS_COLS 18
|
#define PG_GET_REPLICATION_SLOTS_COLS 19
|
||||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
XLogRecPtr currlsn;
|
XLogRecPtr currlsn;
|
||||||
int slotno;
|
int slotno;
|
||||||
@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
|
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
|
||||||
|
|
||||||
|
if (slot_contents.last_inactive_time > 0)
|
||||||
|
values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
|
||||||
|
else
|
||||||
|
nulls[i++] = true;
|
||||||
|
|
||||||
cause = slot_contents.data.invalidated;
|
cause = slot_contents.data.invalidated;
|
||||||
|
|
||||||
if (SlotIsPhysical(&slot_contents))
|
if (SlotIsPhysical(&slot_contents))
|
||||||
|
@ -57,6 +57,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* yyyymmddN */
|
/* yyyymmddN */
|
||||||
#define CATALOG_VERSION_NO 202403222
|
#define CATALOG_VERSION_NO 202403251
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -11133,9 +11133,9 @@
|
|||||||
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
|
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
|
||||||
proretset => 't', provolatile => 's', prorettype => 'record',
|
proretset => 't', provolatile => 's', prorettype => 'record',
|
||||||
proargtypes => '',
|
proargtypes => '',
|
||||||
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
|
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
|
||||||
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
||||||
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
|
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
|
||||||
prosrc => 'pg_get_replication_slots' },
|
prosrc => 'pg_get_replication_slots' },
|
||||||
{ oid => '3786', descr => 'set up a logical replication slot',
|
{ oid => '3786', descr => 'set up a logical replication slot',
|
||||||
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
|
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
|
||||||
|
@ -201,6 +201,9 @@ typedef struct ReplicationSlot
|
|||||||
* forcibly flushed or not.
|
* forcibly flushed or not.
|
||||||
*/
|
*/
|
||||||
XLogRecPtr last_saved_confirmed_flush;
|
XLogRecPtr last_saved_confirmed_flush;
|
||||||
|
|
||||||
|
/* The time at which this slot becomes inactive */
|
||||||
|
TimestampTz last_inactive_time;
|
||||||
} ReplicationSlot;
|
} ReplicationSlot;
|
||||||
|
|
||||||
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
|
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
|
||||||
|
@ -410,4 +410,156 @@ kill 'CONT', $receiverpid;
|
|||||||
$node_primary3->stop;
|
$node_primary3->stop;
|
||||||
$node_standby3->stop;
|
$node_standby3->stop;
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Testcase start: Check last_inactive_time property of the streaming standby's slot
|
||||||
|
#
|
||||||
|
|
||||||
|
# Initialize primary node
|
||||||
|
my $primary4 = PostgreSQL::Test::Cluster->new('primary4');
|
||||||
|
$primary4->init(allows_streaming => 'logical');
|
||||||
|
$primary4->start;
|
||||||
|
|
||||||
|
# Take backup
|
||||||
|
$backup_name = 'my_backup4';
|
||||||
|
$primary4->backup($backup_name);
|
||||||
|
|
||||||
|
# Create a standby linking to the primary using the replication slot
|
||||||
|
my $standby4 = PostgreSQL::Test::Cluster->new('standby4');
|
||||||
|
$standby4->init_from_backup($primary4, $backup_name, has_streaming => 1);
|
||||||
|
|
||||||
|
my $sb4_slot = 'sb4_slot';
|
||||||
|
$standby4->append_conf('postgresql.conf', "primary_slot_name = '$sb4_slot'");
|
||||||
|
|
||||||
|
my $slot_creation_time = $primary4->safe_psql(
|
||||||
|
'postgres', qq[
|
||||||
|
SELECT current_timestamp;
|
||||||
|
]);
|
||||||
|
|
||||||
|
$primary4->safe_psql(
|
||||||
|
'postgres', qq[
|
||||||
|
SELECT pg_create_physical_replication_slot(slot_name := '$sb4_slot');
|
||||||
|
]);
|
||||||
|
|
||||||
|
# Get last_inactive_time value after the slot's creation. Note that the slot
|
||||||
|
# is still inactive till it's used by the standby below.
|
||||||
|
my $last_inactive_time =
|
||||||
|
capture_and_validate_slot_last_inactive_time($primary4, $sb4_slot, $slot_creation_time);
|
||||||
|
|
||||||
|
$standby4->start;
|
||||||
|
|
||||||
|
# Wait until standby has replayed enough data
|
||||||
|
$primary4->wait_for_catchup($standby4);
|
||||||
|
|
||||||
|
# Now the slot is active so last_inactive_time value must be NULL
|
||||||
|
is( $primary4->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
|
||||||
|
),
|
||||||
|
't',
|
||||||
|
'last inactive time for an active physical slot is NULL');
|
||||||
|
|
||||||
|
# Stop the standby to check its last_inactive_time value is updated
|
||||||
|
$standby4->stop;
|
||||||
|
|
||||||
|
# Let's restart the primary so that the last_inactive_time is set upon
|
||||||
|
# loading the slot from the disk.
|
||||||
|
$primary4->restart;
|
||||||
|
|
||||||
|
is( $primary4->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND last_inactive_time IS NOT NULL;]
|
||||||
|
),
|
||||||
|
't',
|
||||||
|
'last inactive time for an inactive physical slot is updated correctly');
|
||||||
|
|
||||||
|
$standby4->stop;
|
||||||
|
|
||||||
|
# Testcase end: Check last_inactive_time property of the streaming standby's slot
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Testcase start: Check last_inactive_time property of the logical subscriber's slot
|
||||||
|
my $publisher4 = $primary4;
|
||||||
|
|
||||||
|
# Create subscriber node
|
||||||
|
my $subscriber4 = PostgreSQL::Test::Cluster->new('subscriber4');
|
||||||
|
$subscriber4->init;
|
||||||
|
|
||||||
|
# Setup logical replication
|
||||||
|
my $publisher4_connstr = $publisher4->connstr . ' dbname=postgres';
|
||||||
|
$publisher4->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
|
||||||
|
|
||||||
|
$slot_creation_time = $publisher4->safe_psql(
|
||||||
|
'postgres', qq[
|
||||||
|
SELECT current_timestamp;
|
||||||
|
]);
|
||||||
|
|
||||||
|
my $lsub4_slot = 'lsub4_slot';
|
||||||
|
$publisher4->safe_psql('postgres',
|
||||||
|
"SELECT pg_create_logical_replication_slot(slot_name := '$lsub4_slot', plugin := 'pgoutput');"
|
||||||
|
);
|
||||||
|
|
||||||
|
# Get last_inactive_time value after the slot's creation. Note that the slot
|
||||||
|
# is still inactive till it's used by the subscriber below.
|
||||||
|
$last_inactive_time =
|
||||||
|
capture_and_validate_slot_last_inactive_time($publisher4, $lsub4_slot, $slot_creation_time);
|
||||||
|
|
||||||
|
$subscriber4->start;
|
||||||
|
$subscriber4->safe_psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION sub CONNECTION '$publisher4_connstr' PUBLICATION pub WITH (slot_name = '$lsub4_slot', create_slot = false)"
|
||||||
|
);
|
||||||
|
|
||||||
|
# Wait until subscriber has caught up
|
||||||
|
$subscriber4->wait_for_subscription_sync($publisher4, 'sub');
|
||||||
|
|
||||||
|
# Now the slot is active so last_inactive_time value must be NULL
|
||||||
|
is( $publisher4->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
|
||||||
|
),
|
||||||
|
't',
|
||||||
|
'last inactive time for an active logical slot is NULL');
|
||||||
|
|
||||||
|
# Stop the subscriber to check its last_inactive_time value is updated
|
||||||
|
$subscriber4->stop;
|
||||||
|
|
||||||
|
# Let's restart the publisher so that the last_inactive_time is set upon
|
||||||
|
# loading the slot from the disk.
|
||||||
|
$publisher4->restart;
|
||||||
|
|
||||||
|
is( $publisher4->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND last_inactive_time IS NOT NULL;]
|
||||||
|
),
|
||||||
|
't',
|
||||||
|
'last inactive time for an inactive logical slot is updated correctly');
|
||||||
|
|
||||||
|
# Testcase end: Check last_inactive_time property of the logical subscriber's slot
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
$publisher4->stop;
|
||||||
|
$subscriber4->stop;
|
||||||
|
|
||||||
|
# Capture and validate last_inactive_time of a given slot.
|
||||||
|
sub capture_and_validate_slot_last_inactive_time
|
||||||
|
{
|
||||||
|
my ($node, $slot_name, $slot_creation_time) = @_;
|
||||||
|
|
||||||
|
my $last_inactive_time = $node->safe_psql('postgres',
|
||||||
|
qq(SELECT last_inactive_time FROM pg_replication_slots
|
||||||
|
WHERE slot_name = '$slot_name' AND last_inactive_time IS NOT NULL;)
|
||||||
|
);
|
||||||
|
|
||||||
|
# Check that the captured time is sane
|
||||||
|
is( $node->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
qq[SELECT '$last_inactive_time'::timestamptz > to_timestamp(0) AND
|
||||||
|
'$last_inactive_time'::timestamptz >= '$slot_creation_time'::timestamptz;]
|
||||||
|
),
|
||||||
|
't',
|
||||||
|
"last inactive time for an active slot $slot_name is sane");
|
||||||
|
|
||||||
|
return $last_inactive_time;
|
||||||
|
}
|
||||||
|
|
||||||
done_testing();
|
done_testing();
|
||||||
|
@ -1473,11 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
|
|||||||
l.wal_status,
|
l.wal_status,
|
||||||
l.safe_wal_size,
|
l.safe_wal_size,
|
||||||
l.two_phase,
|
l.two_phase,
|
||||||
|
l.last_inactive_time,
|
||||||
l.conflicting,
|
l.conflicting,
|
||||||
l.invalidation_reason,
|
l.invalidation_reason,
|
||||||
l.failover,
|
l.failover,
|
||||||
l.synced
|
l.synced
|
||||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
|
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
|
||||||
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
||||||
pg_roles| SELECT pg_authid.rolname,
|
pg_roles| SELECT pg_authid.rolname,
|
||||||
pg_authid.rolsuper,
|
pg_authid.rolsuper,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user