Morph pg_replication_slots.min_safe_lsn to safe_wal_size
The previous definition of the column was almost universally disliked, so provide this updated definition which is more useful for monitoring purposes: a large positive value is good, while zero or a negative value means danger. This should be operationally more convenient. Backpatch to 13, where the new column to pg_replication_slots (and the feature it represents) were added. Author: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Reported-by: Fujii Masao <masao.fujii@oss.nttdata.com> Discussion: https://postgr.es/m/9ddfbf8c-2f67-904d-44ed-cf8bc5916228@oss.nttdata.com
This commit is contained in:
parent
da6b6ff95b
commit
c54b5891f4
@ -11275,10 +11275,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
|
||||
|
||||
<row>
|
||||
<entry role="catalog_table_entry"><para role="column_definition">
|
||||
<structfield>min_safe_lsn</structfield> <type>pg_lsn</type>
|
||||
<structfield>safe_wal_size</structfield> <type>int8</type>
|
||||
</para>
|
||||
<para>
|
||||
The minimum LSN currently available for walsenders.
|
||||
The number of bytes that can be written to WAL such that this slot
|
||||
is not in danger of getting in state "lost". It is NULL for lost
|
||||
slots, as well as if <varname>max_slot_wal_keep_size</varname>
|
||||
is <literal>-1</literal>.
|
||||
</para></entry>
|
||||
</row>
|
||||
</tbody>
|
||||
|
@ -764,8 +764,7 @@ static ControlFileData *ControlFile = NULL;
|
||||
* Convert values of GUCs measured in megabytes to equiv. segment count.
|
||||
* Rounds down.
|
||||
*/
|
||||
#define ConvertToXSegs(x, segsize) \
|
||||
((x) / ((segsize) / (1024 * 1024)))
|
||||
#define ConvertToXSegs(x, segsize) XLogMBVarToSegs((x), (segsize))
|
||||
|
||||
/* The number of bytes in a WAL segment usable for WAL data. */
|
||||
static int UsableBytesInSegment;
|
||||
@ -9513,8 +9512,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
|
||||
XLogSegNo targetSeg; /* segid of targetLSN */
|
||||
XLogSegNo oldestSeg; /* actual oldest segid */
|
||||
XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */
|
||||
XLogSegNo oldestSlotSeg = InvalidXLogRecPtr; /* oldest segid kept by
|
||||
* slot */
|
||||
XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */
|
||||
uint64 keepSegs;
|
||||
|
||||
/*
|
||||
|
@ -879,7 +879,7 @@ CREATE VIEW pg_replication_slots AS
|
||||
L.restart_lsn,
|
||||
L.confirmed_flush_lsn,
|
||||
L.wal_status,
|
||||
L.min_safe_lsn
|
||||
L.safe_wal_size
|
||||
FROM pg_get_replication_slots() AS L
|
||||
LEFT JOIN pg_database D ON (L.datoid = D.oid);
|
||||
|
||||
|
@ -242,6 +242,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
XLogRecPtr currlsn;
|
||||
int slotno;
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
@ -274,6 +275,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
currlsn = GetXLogWriteRecPtr();
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (slotno = 0; slotno < max_replication_slots; slotno++)
|
||||
{
|
||||
@ -282,7 +285,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
WALAvailability walstate;
|
||||
XLogSegNo last_removed_seg;
|
||||
int i;
|
||||
|
||||
if (!slot->in_use)
|
||||
@ -380,6 +382,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
* we looked. If checkpointer signalled the process to
|
||||
* termination, then it's definitely lost; but if a process is
|
||||
* still alive, then "unreserved" seems more appropriate.
|
||||
*
|
||||
* If we do change it, save the state for safe_wal_size below.
|
||||
*/
|
||||
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
|
||||
{
|
||||
@ -387,10 +391,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
pid = slot->active_pid;
|
||||
slot_contents.data.restart_lsn = slot->data.restart_lsn;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
if (pid != 0)
|
||||
{
|
||||
values[i++] = CStringGetTextDatum("unreserved");
|
||||
walstate = WALAVAIL_UNRESERVED;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -398,18 +404,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
break;
|
||||
}
|
||||
|
||||
if (max_slot_wal_keep_size_mb >= 0 &&
|
||||
(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
|
||||
((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
|
||||
{
|
||||
XLogRecPtr min_safe_lsn;
|
||||
|
||||
XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
|
||||
wal_segment_size, min_safe_lsn);
|
||||
values[i++] = Int64GetDatum(min_safe_lsn);
|
||||
}
|
||||
else
|
||||
/*
|
||||
* safe_wal_size is only computed for slots that have not been lost,
|
||||
* and only if there's a configured maximum size.
|
||||
*/
|
||||
if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
{
|
||||
XLogSegNo targetSeg;
|
||||
XLogSegNo keepSegs;
|
||||
XLogSegNo failSeg;
|
||||
XLogRecPtr failLSN;
|
||||
|
||||
XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
|
||||
|
||||
/* determine how many segments slots can be kept by slots ... */
|
||||
keepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
|
||||
/* ... and override by wal_keep_segments as needed */
|
||||
keepSegs = Max(keepSegs, wal_keep_segments);
|
||||
|
||||
/* if currpos reaches failLSN, we lose our segment */
|
||||
failSeg = targetSeg + keepSegs + 1;
|
||||
XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
|
||||
|
||||
values[i++] = Int64GetDatum(failLSN - currlsn);
|
||||
}
|
||||
|
||||
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
|
||||
|
||||
|
@ -121,6 +121,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
|
||||
#define XLByteToPrevSeg(xlrp, logSegNo, wal_segsz_bytes) \
|
||||
logSegNo = ((xlrp) - 1) / (wal_segsz_bytes)
|
||||
|
||||
/*
|
||||
* Convert values of GUCs measured in megabytes to equiv. segment count.
|
||||
* Rounds down.
|
||||
*/
|
||||
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes) \
|
||||
((mbvar) / ((wal_segsz_bytes) / (1024 * 1024)))
|
||||
|
||||
/*
|
||||
* Is an XLogRecPtr within a particular XLOG segment?
|
||||
*
|
||||
|
@ -53,6 +53,6 @@
|
||||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 202005171
|
||||
#define CATALOG_VERSION_NO 202007071
|
||||
|
||||
#endif
|
||||
|
@ -10063,9 +10063,9 @@
|
||||
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
|
||||
proretset => 't', provolatile => 's', prorettype => 'record',
|
||||
proargtypes => '',
|
||||
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
|
||||
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
|
||||
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
||||
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
|
||||
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
|
||||
prosrc => 'pg_get_replication_slots' },
|
||||
{ oid => '3786', descr => 'set up a logical replication slot',
|
||||
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
|
||||
|
@ -28,7 +28,7 @@ $node_master->safe_psql('postgres',
|
||||
|
||||
# The slot state and remain should be null before the first connection
|
||||
my $result = $node_master->safe_psql('postgres',
|
||||
"SELECT restart_lsn IS NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT restart_lsn IS NULL, wal_status is NULL, safe_wal_size is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
|
||||
|
||||
@ -52,9 +52,9 @@ $node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
|
||||
# Stop standby
|
||||
$node_standby->stop;
|
||||
|
||||
# Preparation done, the slot is the state "normal" now
|
||||
# Preparation done, the slot is the state "reserved" now
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "reserved|t", 'check the catching-up state');
|
||||
|
||||
@ -64,7 +64,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
|
||||
|
||||
# The slot is always "safe" when fitting max_wal_size
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "reserved|t",
|
||||
'check that it is safe if WAL fits in max_wal_size');
|
||||
@ -74,7 +74,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
|
||||
|
||||
# The slot is always "safe" when max_slot_wal_keep_size is not set
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "reserved|t", 'check that slot is working');
|
||||
|
||||
@ -94,9 +94,7 @@ max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
|
||||
));
|
||||
$node_master->reload;
|
||||
|
||||
# The slot is in safe state. The distance from the min_safe_lsn should
|
||||
# be as almost (max_slot_wal_keep_size - 1) times large as the segment
|
||||
# size
|
||||
# The slot is in safe state.
|
||||
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
|
||||
@ -110,7 +108,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
|
||||
is($result, "reserved",
|
||||
'check that min_safe_lsn gets close to the current LSN');
|
||||
'check that safe_wal_size gets close to the current LSN');
|
||||
|
||||
# The standby can reconnect to master
|
||||
$node_standby->start;
|
||||
@ -152,9 +150,9 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
|
||||
# Advance WAL again without checkpoint; remain goes to 0.
|
||||
advance_wal($node_master, 1);
|
||||
|
||||
# Slot gets into 'unreserved' state
|
||||
# Slot gets into 'unreserved' state and safe_wal_size is negative
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT wal_status, safe_wal_size <= 0 FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "unreserved|t",
|
||||
'check that the slot state changes to "unreserved"');
|
||||
@ -186,7 +184,7 @@ ok( find_in_log(
|
||||
|
||||
# This slot should be broken
|
||||
$result = $node_master->safe_psql('postgres',
|
||||
"SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
"SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'"
|
||||
);
|
||||
is($result, "rep1|f|t|lost|",
|
||||
'check that the slot became inactive and the state "lost" persists');
|
||||
|
@ -1464,8 +1464,8 @@ pg_replication_slots| SELECT l.slot_name,
|
||||
l.restart_lsn,
|
||||
l.confirmed_flush_lsn,
|
||||
l.wal_status,
|
||||
l.min_safe_lsn
|
||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
|
||||
l.safe_wal_size
|
||||
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
|
||||
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
|
||||
pg_roles| SELECT pg_authid.rolname,
|
||||
pg_authid.rolsuper,
|
||||
|
Loading…
x
Reference in New Issue
Block a user