diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 24ab1b2b21..b2fe2d04cc 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1836,7 +1836,7 @@ PerformWalRecovery(void) */ if (waitLSN && (XLogRecoveryCtl->lastReplayedEndRecPtr >= - pg_atomic_read_u64(&waitLSN->minLSN))) + pg_atomic_read_u64(&waitLSN->minWaitedLSN))) WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr); /* Else, try to fetch the next WAL record */ diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c index 63e9ebf173..51a34d422e 100644 --- a/src/backend/commands/waitlsn.c +++ b/src/backend/commands/waitlsn.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * waitlsn.c - * Implements waiting for the given LSN, which is used in + * Implements waiting for the given replay LSN, which is used in * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). * * Copyright (c) 2024, PostgreSQL Global Development Group @@ -26,21 +26,17 @@ #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" +#include "utils/fmgrprotos.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" -#include "utils/fmgrprotos.h" #include "utils/wait_event_types.h" -/* Add to / delete from shared memory array */ -static void addLSNWaiter(XLogRecPtr lsn); -static void deleteLSNWaiter(void); +static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); struct WaitLSNState *waitLSN = NULL; -static volatile sig_atomic_t haveShmemItem = false; -/* - * Report the amount of shared memory space needed for WaitLSNState - */ +/* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) { @@ -51,7 +47,7 @@ WaitLSNShmemSize(void) return size; } -/* Initialize the WaitLSNState in the shared memory */ +/* Initialize the WaitLSNState in the shared memory. */ void WaitLSNShmemInit(void) { @@ -62,81 +58,93 @@ WaitLSNShmemInit(void) &found); if (!found) { - SpinLockInit(&waitLSN->mutex); - waitLSN->numWaitedProcs = 0; - pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX); + SpinLockInit(&waitLSN->waitersHeapMutex); + pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); + memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); } } /* - * Add the information about the LSN waiter backend to the shared memory - * array. + * Comparison function for waitLSN->waitersHeap heap. Waiting processes are + * ordered by lsn, so that the waiter with smallest lsn is at the top. + */ +static int +lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Update waitLSN->minWaitedLSN according to the current state of + * waitLSN->waitersHeap. + */ +static void +updateMinWaitedLSN(void) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; + + if (!pairingheap_is_empty(&waitLSN->waitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); + + minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; + } + + pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); +} + +/* + * Put the current process into the heap of LSN waiters. */ static void addLSNWaiter(XLogRecPtr lsn) { - WaitLSNProcInfo cur; - int i; + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - cur.procnum = MyProcNumber; - cur.waitLSN = lsn; + Assert(!procInfo->inHeap); - SpinLockAcquire(&waitLSN->mutex); + procInfo->procnum = MyProcNumber; + procInfo->waitLSN = lsn; - for (i = 0; i < waitLSN->numWaitedProcs; i++) - { - if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN) - { - WaitLSNProcInfo tmp; + SpinLockAcquire(&waitLSN->waitersHeapMutex); - tmp = waitLSN->procInfos[i]; - waitLSN->procInfos[i] = cur; - cur = tmp; - } - } - waitLSN->procInfos[i] = cur; - waitLSN->numWaitedProcs++; + pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = true; + updateMinWaitedLSN(); - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); } /* - * Delete the information about the LSN waiter backend from the shared memory - * array. + * Remove the current process from the heap of LSN waiters if it's there. */ static void deleteLSNWaiter(void) { - int i; - bool found = false; + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - SpinLockAcquire(&waitLSN->mutex); + SpinLockAcquire(&waitLSN->waitersHeapMutex); - for (i = 0; i < waitLSN->numWaitedProcs; i++) + if (!procInfo->inHeap) { - if (waitLSN->procInfos[i].procnum == MyProcNumber) - found = true; - - if (found && i < waitLSN->numWaitedProcs - 1) - { - waitLSN->procInfos[i] = waitLSN->procInfos[i + 1]; - } - } - - if (!found) - { - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); return; } - waitLSN->numWaitedProcs--; - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = false; + updateMinWaitedLSN(); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); } /* @@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) { int i; int *wakeUpProcNums; - int numWakeUpProcs; + int numWakeUpProcs = 0; wakeUpProcNums = palloc(sizeof(int) * MaxBackends); - SpinLockAcquire(&waitLSN->mutex); + SpinLockAcquire(&waitLSN->waitersHeapMutex); /* - * Remember processes, whose waited LSNs are already replayed. We should - * set their latches later after spinlock release. + * Iterate the pairing heap of waiting processes till we find LSN not yet + * replayed. Record the process numbers to set their latches later. */ - for (i = 0; i < waitLSN->numWaitedProcs; i++) + while (!pairingheap_is_empty(&waitLSN->waitersHeap)) { + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + if (!XLogRecPtrIsInvalid(currentLSN) && - waitLSN->procInfos[i].waitLSN > currentLSN) + procInfo->waitLSN > currentLSN) break; - wakeUpProcNums[i] = waitLSN->procInfos[i].procnum; + wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; + (void) pairingheap_remove_first(&waitLSN->waitersHeap); + procInfo->inHeap = false; } - /* - * Immediately remove those processes from the shmem array. Otherwise, - * shmem array items will be here till corresponding processes wake up and - * delete themselves. - */ - numWakeUpProcs = i; - for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++) - waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs]; - waitLSN->numWaitedProcs -= numWakeUpProcs; + updateMinWaitedLSN(); - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); - - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); /* * Set latches for processes, whose waited LSNs are already replayed. This @@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) void WaitLSNCleanup(void) { - if (haveShmemItem) + if (waitLSN->procInfos[MyProcNumber].inHeap) deleteLSNWaiter(); } @@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) Assert(waitLSN); /* Should be only called by a backend */ - Assert(MyBackendType == B_BACKEND); + Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); if (!RecoveryInProgress()) ereport(ERROR, @@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); addLSNWaiter(targetLSN); - haveShmemItem = true; for (;;) { @@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) if (targetLSN > currentLSN) { deleteLSNWaiter(); - haveShmemItem = false; ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", LSN_FORMAT_ARGS(targetLSN), LSN_FORMAT_ARGS(currentLSN)))); } - else - { - haveShmemItem = false; - } } Datum diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c index fe1deba13e..7858e5e076 100644 --- a/src/backend/lib/pairingheap.c +++ b/src/backend/lib/pairingheap.c @@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg) pairingheap *heap; heap = (pairingheap *) palloc(sizeof(pairingheap)); + pairingheap_initialize(heap, compare, arg); + + return heap; +} + +/* + * pairingheap_initialize + * + * Same as pairingheap_allocate(), but initializes the pairing heap in-place + * rather than allocating a new chunk of memory. Useful to store the pairing + * heap in a shared memory. + */ +void +pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, + void *arg) +{ heap->ph_compare = compare; heap->ph_arg = arg; heap->ph_root = NULL; - - return heap; } /* diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h index 10ef63f0c0..0d80248682 100644 --- a/src/include/commands/waitlsn.h +++ b/src/include/commands/waitlsn.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * waitlsn.h - * Declarations for LSN waiting routines. + * Declarations for LSN replay waiting routines. * * Copyright (c) 2024, PostgreSQL Global Development Group * @@ -12,23 +12,57 @@ #ifndef WAIT_LSN_H #define WAIT_LSN_H +#include "lib/pairingheap.h" #include "postgres.h" #include "port/atomics.h" #include "storage/spin.h" #include "tcop/dest.h" -/* Shared memory structures */ +/* + * WaitLSNProcInfo – the shared memory structure representing information + * about the single process, which may wait for LSN replay. An item of + * waitLSN->procInfos array. + */ typedef struct WaitLSNProcInfo { + /* + * A process number, same as the index of this item in waitLSN->procInfos. + * Stored for convenience. + */ int procnum; + + /* LSN, which this process is waiting for */ XLogRecPtr waitLSN; + + /* A pairing heap node for participation in waitLSN->waitersHeap */ + pairingheap_node phNode; + + /* A flag indicating that this item is added to waitLSN->waitersHeap */ + bool inHeap; } WaitLSNProcInfo; +/* + * WaitLSNState - the shared memory state for the replay LSN waiting facility. + */ typedef struct WaitLSNState { - pg_atomic_uint64 minLSN; - slock_t mutex; - int numWaitedProcs; + /* + * The minimum LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after replaying a + * WAL record. + */ + pg_atomic_uint64 minWaitedLSN; + + /* + * A pairing heap of waiting processes order by LSN values (least LSN is + * on top). + */ + pairingheap waitersHeap; + + /* A mutex protecting the pairing heap above */ + slock_t waitersHeapMutex; + + /* An array with per-process information, indexed by the process number */ WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; } WaitLSNState; diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h index 7eade81535..9e1c26033a 100644 --- a/src/include/lib/pairingheap.h +++ b/src/include/lib/pairingheap.h @@ -77,6 +77,9 @@ typedef struct pairingheap extern pairingheap *pairingheap_allocate(pairingheap_comparator compare, void *arg); +extern void pairingheap_initialize(pairingheap *heap, + pairingheap_comparator compare, + void *arg); extern void pairingheap_free(pairingheap *heap); extern void pairingheap_add(pairingheap *heap, pairingheap_node *node); extern pairingheap_node *pairingheap_first(pairingheap *heap);