Replace replication slot's invalidated_at LSN with an enum

This is mainly useful because the upcoming logical-decoding-on-standby feature
adds further reasons for invalidating slots, and we don't want to end up with
multiple invalidated_* fields, or check different attributes.

Eventually we should consider not resetting restart_lsn when invalidating a
slot due to max_slot_wal_keep_size. But that's a user visible change, so left
for later.

Increases SLOT_VERSION, due to the changed field (with a different alignment,
no less).

Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
This commit is contained in:
Andres Freund 2023-04-07 21:47:25 -07:00
parent d4e71df6d7
commit 15f8203a59
4 changed files with 41 additions and 11 deletions

View File

@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */ #define SLOT_MAGIC 0x1051CA1 /* format identifier */
#define SLOT_VERSION 2 /* version for new files */ #define SLOT_VERSION 3 /* version for new files */
/* Control array for replication slot management */ /* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL; ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
effective_xmin = s->effective_xmin; effective_xmin = s->effective_xmin;
effective_catalog_xmin = s->effective_catalog_xmin; effective_catalog_xmin = s->effective_catalog_xmin;
invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) && invalidated = s->data.invalidated != RS_INVAL_NONE;
XLogRecPtrIsInvalid(s->data.restart_lsn));
SpinLockRelease(&s->mutex); SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */ /* invalidated slots need not apply */
@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
{ {
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn; XLogRecPtr restart_lsn;
bool invalidated;
if (!s->in_use) if (!s->in_use)
continue; continue;
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn; restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&s->mutex); SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
if (restart_lsn != InvalidXLogRecPtr && if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr || (min_required == InvalidXLogRecPtr ||
restart_lsn < min_required)) restart_lsn < min_required))
@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{ {
ReplicationSlot *s; ReplicationSlot *s;
XLogRecPtr restart_lsn; XLogRecPtr restart_lsn;
bool invalidated;
s = &ReplicationSlotCtl->replication_slots[i]; s = &ReplicationSlotCtl->replication_slots[i];
@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */ /* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn; restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&s->mutex); SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
if (restart_lsn == InvalidXLogRecPtr) if (restart_lsn == InvalidXLogRecPtr)
continue; continue;
@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
if (s->data.database != dboid) if (s->data.database != dboid)
continue; continue;
/* NB: intentionally counting invalidated slots */
/* count slots with spinlock held */ /* count slots with spinlock held */
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
(*nslots)++; (*nslots)++;
@ -1069,6 +1082,8 @@ restart:
if (s->data.database != dboid) if (s->data.database != dboid)
continue; continue;
/* NB: intentionally including invalidated slots */
/* acquire slot, so ReplicationSlotDropAcquired can be reused */ /* acquire slot, so ReplicationSlotDropAcquired can be reused */
SpinLockAcquire(&s->mutex); SpinLockAcquire(&s->mutex);
/* can't change while ReplicationSlotControlLock is held */ /* can't change while ReplicationSlotControlLock is held */
@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
{ {
MyReplicationSlot = s; MyReplicationSlot = s;
s->active_pid = MyProcPid; s->active_pid = MyProcPid;
s->data.invalidated_at = restart_lsn; s->data.invalidated = RS_INVAL_WAL_REMOVED;
/*
* XXX: We should consider not overwriting restart_lsn and instead
* just rely on .invalidated.
*/
s->data.restart_lsn = InvalidXLogRecPtr; s->data.restart_lsn = InvalidXLogRecPtr;
/* Let caller know */ /* Let caller know */

View File

@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
nulls[i++] = true; nulls[i++] = true;
/* /*
* If invalidated_at is valid and restart_lsn is invalid, we know for * If the slot has not been invalidated, test availability from
* certain that the slot has been invalidated. Otherwise, test * restart_lsn.
* availability from restart_lsn.
*/ */
if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) && if (slot_contents.data.invalidated != RS_INVAL_NONE)
!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
walstate = WALAVAIL_REMOVED; walstate = WALAVAIL_REMOVED;
else else
walstate = GetWALAvailability(slot_contents.data.restart_lsn); walstate = GetWALAvailability(slot_contents.data.restart_lsn);

View File

@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
RS_TEMPORARY RS_TEMPORARY
} ReplicationSlotPersistency; } ReplicationSlotPersistency;
/*
* Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
* 'invalidated' field is set to a value other than _NONE.
*/
typedef enum ReplicationSlotInvalidationCause
{
RS_INVAL_NONE,
/* required WAL has been removed */
RS_INVAL_WAL_REMOVED,
} ReplicationSlotInvalidationCause;
/* /*
* On-Disk data of a replication slot, preserved across restarts. * On-Disk data of a replication slot, preserved across restarts.
*/ */
@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
/* oldest LSN that might be required by this replication slot */ /* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn; XLogRecPtr restart_lsn;
/* restart_lsn is copied here when the slot is invalidated */ /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
XLogRecPtr invalidated_at; ReplicationSlotInvalidationCause invalidated;
/* /*
* Oldest LSN that the client has acked receipt for. This is used as the * Oldest LSN that the client has acked receipt for. This is used as the

View File

@ -2339,6 +2339,7 @@ ReplicaIdentityStmt
ReplicationKind ReplicationKind
ReplicationSlot ReplicationSlot
ReplicationSlotCtlData ReplicationSlotCtlData
ReplicationSlotInvalidationCause
ReplicationSlotOnDisk ReplicationSlotOnDisk
ReplicationSlotPersistency ReplicationSlotPersistency
ReplicationSlotPersistentData ReplicationSlotPersistentData