diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index aa1552e031..bfbf7f903f 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -61,7 +61,6 @@
 #include "port/pg_lfind.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/rel.h"
@@ -82,7 +81,6 @@ typedef struct ProcArrayStruct
 	int			numKnownAssignedXids;	/* current # of valid entries */
 	int			tailKnownAssignedXids;	/* index of oldest valid element */
 	int			headKnownAssignedXids;	/* index of newest element, + 1 */
-	slock_t		known_assigned_xids_lck;	/* protects head/tail pointers */
 
 	/*
 	 * Highest subxid that has been removed from KnownAssignedXids array to
@@ -441,7 +439,6 @@ CreateSharedProcArray(void)
 		procArray->numKnownAssignedXids = 0;
 		procArray->tailKnownAssignedXids = 0;
 		procArray->headKnownAssignedXids = 0;
-		SpinLockInit(&procArray->known_assigned_xids_lck);
 		procArray->lastOverflowedXid = InvalidTransactionId;
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
@@ -4533,22 +4530,19 @@ KnownAssignedTransactionIdsIdleMaintenance(void)
  * during normal running).  Compressing unused entries out of the array
  * likewise requires exclusive lock.  To add XIDs to the array, we just insert
  * them into slots to the right of the head pointer and then advance the head
- * pointer.  This wouldn't require any lock at all, except that on machines
- * with weak memory ordering we need to be careful that other processors
- * see the array element changes before they see the head pointer change.
- * We handle this by using a spinlock to protect reads and writes of the
- * head/tail pointers.  (We could dispense with the spinlock if we were to
- * create suitable memory access barrier primitives and use those instead.)
- * The spinlock must be taken to read or write the head/tail pointers unless
- * the caller holds ProcArrayLock exclusively.
+ * pointer.  This doesn't require any lock at all, but on machines with weak
+ * memory ordering, we need to be careful that other processors see the array
+ * element changes before they see the head pointer change.  We handle this by
+ * using memory barriers when reading or writing the head/tail pointers (unless
+ * the caller holds ProcArrayLock exclusively).
  *
  * Algorithmic analysis:
  *
  * If we have a maximum of M slots, with N XIDs currently spread across
  * S elements then we have N <= S <= M always.
  *
- *	* Adding a new XID is O(1) and needs little locking (unless compression
- *	  must happen)
+ *	* Adding a new XID is O(1) and needs no lock (unless compression must
+ *	  happen)
 *	* Compressing the array is O(S) and requires exclusive lock
 *	* Removing an XID is O(logS) and requires exclusive lock
 *	* Taking a snapshot is O(S) and requires shared lock
@@ -4778,22 +4772,15 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
 	pArray->numKnownAssignedXids += nxids;
 
 	/*
-	 * Now update the head pointer.  We use a spinlock to protect this
-	 * pointer, not because the update is likely to be non-atomic, but to
-	 * ensure that other processors see the above array updates before they
-	 * see the head pointer change.
-	 *
-	 * If we're holding ProcArrayLock exclusively, there's no need to take the
-	 * spinlock.
+	 * Now update the head pointer.  We use a write barrier to ensure that
+	 * other processors see the above array updates before they see the head
+	 * pointer change.  The barrier isn't required if we're holding
+	 * ProcArrayLock exclusively.
 	 */
-	if (exclusive_lock)
-		pArray->headKnownAssignedXids = head;
-	else
-	{
-		SpinLockAcquire(&pArray->known_assigned_xids_lck);
-		pArray->headKnownAssignedXids = head;
-		SpinLockRelease(&pArray->known_assigned_xids_lck);
-	}
+	if (!exclusive_lock)
+		pg_write_barrier();
+
+	pArray->headKnownAssignedXids = head;
 }
 
 /*
@@ -4815,20 +4802,15 @@ KnownAssignedXidsSearch(TransactionId xid, bool remove)
 	int			tail;
 	int			result_index = -1;
 
-	if (remove)
-	{
-		/* we hold ProcArrayLock exclusively, so no need for spinlock */
-		tail = pArray->tailKnownAssignedXids;
-		head = pArray->headKnownAssignedXids;
-	}
-	else
-	{
-		/* take spinlock to ensure we see up-to-date array contents */
-		SpinLockAcquire(&pArray->known_assigned_xids_lck);
-		tail = pArray->tailKnownAssignedXids;
-		head = pArray->headKnownAssignedXids;
-		SpinLockRelease(&pArray->known_assigned_xids_lck);
-	}
+	tail = pArray->tailKnownAssignedXids;
+	head = pArray->headKnownAssignedXids;
+
+	/*
+	 * Only the startup process removes entries, so we don't need the read
+	 * barrier in that case.
+	 */
+	if (!remove)
+		pg_read_barrier();		/* pairs with KnownAssignedXidsAdd */
 
 	/*
 	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
@@ -5066,13 +5048,11 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
 	 * might miss newly-added xids, but they should be >= xmax so irrelevant
 	 * anyway.
-	 *
-	 * Must take spinlock to ensure we see up-to-date array contents.
 	 */
-	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
-	SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+	pg_read_barrier();			/* pairs with KnownAssignedXidsAdd */
 
 	for (i = tail; i < head; i++)
 	{
@@ -5119,10 +5099,10 @@ KnownAssignedXidsGetOldestXmin(void)
 	/*
 	 * Fetch head just once, since it may change while we loop.
 	 */
-	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
-	SpinLockRelease(&procArray->known_assigned_xids_lck);
+
+	pg_read_barrier();			/* pairs with KnownAssignedXidsAdd */
 
 	for (i = tail; i < head; i++)
 	{
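For reference, here is a minimal standalone sketch (not part of the patch, and not PostgreSQL code) of the barrier pairing the new code relies on: a single writer fills array slots, issues a write barrier, and only then advances the head pointer, while readers load the head and issue a read barrier before trusting any slot below it. The names add_xid() and oldest_xid() are invented for the example, C11 fences stand in for pg_write_barrier()/pg_read_barrier(), and the single-writer assumption mirrors the startup process being the only one that adds entries.

/*
 * Standalone illustration of publish-with-barrier, as in KnownAssignedXidsAdd
 * and the readers that pair with it.  Build with: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define MAX_XIDS 1024

static int xids[MAX_XIDS];		/* array slots published by the writer */
static atomic_int head = 0;		/* index of newest element, + 1 */

/* Writer side: fill the slot, then barrier, then advance head. */
static void
add_xid(int xid)
{
	int			h = atomic_load_explicit(&head, memory_order_relaxed);

	xids[h] = xid;				/* store the element first ... */
	atomic_thread_fence(memory_order_release);	/* ~ pg_write_barrier() */
	atomic_store_explicit(&head, h + 1, memory_order_relaxed);	/* ... then publish */
}

/* Reader side: fetch head once, then barrier, then scan slots below it. */
static int
oldest_xid(void)
{
	int			h = atomic_load_explicit(&head, memory_order_relaxed);
	int			oldest = -1;

	atomic_thread_fence(memory_order_acquire);	/* ~ pg_read_barrier(), pairs with writer */

	for (int i = 0; i < h; i++)
		if (oldest < 0 || xids[i] < oldest)
			oldest = xids[i];
	return oldest;
}

static void *
reader(void *arg)
{
	(void) arg;
	for (int i = 0; i < 100000; i++)
		(void) oldest_xid();	/* every slot below the observed head is valid */
	return NULL;
}

int
main(void)
{
	pthread_t	t;

	pthread_create(&t, NULL, reader, NULL);
	for (int xid = 100; xid < 600; xid++)
		add_xid(xid);
	pthread_join(t, NULL);

	printf("oldest = %d, head = %d\n", oldest_xid(),
		   atomic_load_explicit(&head, memory_order_relaxed));
	return 0;
}

The point of the pairing is the same as in the patch: readers that observe the new head value are guaranteed, via the release/acquire fences, to also observe the array slots written before it, so no lock is needed on the add path.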