pgstat: revise replication slot API in preparation for shared memory stats.

Previously the pgstat <-> replication slots API was done with on the basis of
names. However, the upcoming move to storing stats in shared memory makes it
more convenient to use a integer as key.

Change the replication slot functions to take the slot rather than the slot
name, and expose ReplicationSlotIndex() to compute the index of an replication
slot. Special handling will be required for restarts, as the index is not
stable across restarts. For now pgstat internally still uses names.

Rename pgstat_report_replslot_{create,drop}() to
pgstat_{create,drop}_replslot() to match the functions for other kinds of
stats.

Reviewed-By: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20220404041516.cctrvpadhuriawlq@alap3.anarazel.de
This commit is contained in:
Andres Freund 2022-04-06 18:26:17 -07:00
parent 8b1dccd37c
commit e41aed674f
6 changed files with 41 additions and 12 deletions

View File

@ -862,7 +862,15 @@ pgstat_vacuum_stat(void)
CHECK_FOR_INTERRUPTS();
if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
pgstat_report_replslot_drop(NameStr(slotentry->slotname));
{
PgStat_MsgReplSlot msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, NameStr(slotentry->slotname));
msg.m_create = false;
msg.m_drop = true;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
}
}

View File

@ -1921,7 +1921,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.total_txns = rb->totalTxns;
repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat);
pgstat_report_replslot(ctx->slot, &repSlotStat);
rb->spillTxns = 0;
rb->spillCount = 0;

View File

@ -356,7 +356,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot_create(NameStr(slot->data.name));
pgstat_create_replslot(slot);
/*
* Now that the slot has been marked as in_use and active, it's safe to
@ -399,6 +399,22 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
return slot;
}
/*
* Return the index of the replication slot in
* ReplicationSlotCtl->replication_slots.
*
* This is mainly useful to have an efficient key for storing replication slot
* stats.
*/
int
ReplicationSlotIndex(ReplicationSlot *slot)
{
Assert(slot >= ReplicationSlotCtl->replication_slots &&
slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
return slot - ReplicationSlotCtl->replication_slots;
}
/*
* Find a previously created slot and mark it as used by this process.
*
@ -746,7 +762,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
* doesn't seem worth doing as in practice this won't happen frequently.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot_drop(NameStr(slot->data.name));
pgstat_drop_replslot(slot);
/*
* We release this at the very end, so that nobody starts trying to create

View File

@ -69,7 +69,7 @@ pgstat_reset_replslot(const char *name)
* Report replication slot statistics.
*/
void
pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
{
PgStat_MsgReplSlot msg;
@ -93,14 +93,17 @@ pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
/*
* Report replication slot creation.
*
* NB: This gets called with ReplicationSlotAllocationLock already held, be
* careful about calling back into slot.c.
*/
void
pgstat_report_replslot_create(const char *slotname)
pgstat_create_replslot(ReplicationSlot *slot)
{
PgStat_MsgReplSlot msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, slotname);
namestrcpy(&msg.m_slotname, NameStr(slot->data.name));
msg.m_create = true;
msg.m_drop = false;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
@ -110,12 +113,12 @@ pgstat_report_replslot_create(const char *slotname)
* Report replication slot drop.
*/
void
pgstat_report_replslot_drop(const char *slotname)
pgstat_drop_replslot(ReplicationSlot *slot)
{
PgStat_MsgReplSlot msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, slotname);
namestrcpy(&msg.m_slotname, NameStr(slot->data.name));
msg.m_create = false;
msg.m_drop = true;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));

View File

@ -1137,9 +1137,10 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id);
*/
extern void pgstat_reset_replslot(const char *name);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
struct ReplicationSlot;
extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_create_replslot(struct ReplicationSlot *slot);
extern void pgstat_drop_replslot(struct ReplicationSlot *slot);
/*

View File

@ -216,6 +216,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);