Seems I was too optimistic in supposing that sinval's maxMsgNum could be
read and written without a lock. The value itself is atomic, sure, but on processors with weak memory ordering it's possible for a reader to see the value change before it sees the associated message written into the buffer array. Fix by introducing a spinlock that's used just to read and write maxMsgNum. (We could do this with less overhead if we recognized a concept of "memory access barrier"; is it worth introducing such a thing? At the moment probably not --- I can't measure any clear slowdown from adding the spinlock, so this solution is probably fine.) Per buildfarm results.
This commit is contained in:
parent
fad153ec45
commit
dab421d2f0
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -23,6 +23,7 @@
|
||||
#include "storage/proc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/sinvaladt.h"
|
||||
#include "storage/spin.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -84,10 +85,20 @@
|
||||
* can operate in parallel with one or more readers, because the writer
|
||||
* has no need to touch anyone's ProcState, except in the infrequent cases
|
||||
* when SICleanupQueue is needed. The only point of overlap is that
|
||||
* the writer might change maxMsgNum while readers are looking at it.
|
||||
* This should be okay: we are assuming that fetching or storing an int
|
||||
* is atomic, an assumption also made elsewhere in Postgres. However
|
||||
* readers mustn't assume that maxMsgNum isn't changing under them.
|
||||
* the writer wants to change maxMsgNum while readers need to read it.
|
||||
* We deal with that by having a spinlock that readers must take for just
|
||||
* long enough to read maxMsgNum, while writers take it for just long enough
|
||||
* to write maxMsgNum. (The exact rule is that you need the spinlock to
|
||||
* read maxMsgNum if you are not holding SInvalWriteLock, and you need the
|
||||
* spinlock to write maxMsgNum unless you are holding both locks.)
|
||||
*
|
||||
* Note: since maxMsgNum is an int and hence presumably atomically readable/
|
||||
* writable, the spinlock might seem unnecessary. The reason it is needed
|
||||
* is to provide a memory barrier: we need to be sure that messages written
|
||||
* to the array are actually there before maxMsgNum is increased, and that
|
||||
* readers will see that data after fetching maxMsgNum. Multiprocessors
|
||||
* that have weak memory-ordering guarantees can fail without the memory
|
||||
* barrier instructions that are included in the spinlock sequences.
|
||||
*/
|
||||
|
||||
|
||||
@ -153,6 +164,8 @@ typedef struct SISeg
|
||||
int lastBackend; /* index of last active procState entry, +1 */
|
||||
int maxBackends; /* size of procState array */
|
||||
|
||||
slock_t msgnumLock; /* spinlock protecting maxMsgNum */
|
||||
|
||||
/*
|
||||
* Circular buffer holding shared-inval messages
|
||||
*/
|
||||
@ -209,12 +222,13 @@ CreateSharedInvalidationState(void)
|
||||
if (found)
|
||||
return;
|
||||
|
||||
/* Clear message counters, save size of procState array */
|
||||
/* Clear message counters, save size of procState array, init spinlock */
|
||||
shmInvalBuffer->minMsgNum = 0;
|
||||
shmInvalBuffer->maxMsgNum = 0;
|
||||
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
|
||||
shmInvalBuffer->lastBackend = 0;
|
||||
shmInvalBuffer->maxBackends = MaxBackends;
|
||||
SpinLockInit(&shmInvalBuffer->msgnumLock);
|
||||
|
||||
/* The buffer[] array is initially all unused, so we need not fill it */
|
||||
|
||||
@ -365,6 +379,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
|
||||
{
|
||||
int nthistime = Min(n, WRITE_QUANTUM);
|
||||
int numMsgs;
|
||||
int max;
|
||||
|
||||
n -= nthistime;
|
||||
|
||||
@ -388,10 +403,21 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
|
||||
/*
|
||||
* Insert new message(s) into proper slot of circular buffer
|
||||
*/
|
||||
max = segP->maxMsgNum;
|
||||
while (nthistime-- > 0)
|
||||
{
|
||||
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
|
||||
segP->maxMsgNum++;
|
||||
segP->buffer[max % MAXNUMMESSAGES] = *data++;
|
||||
max++;
|
||||
}
|
||||
|
||||
/* Update current value of maxMsgNum using spinlock */
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile SISeg *vsegP = segP;
|
||||
|
||||
SpinLockAcquire(&vsegP->msgnumLock);
|
||||
vsegP->maxMsgNum = max;
|
||||
SpinLockRelease(&vsegP->msgnumLock);
|
||||
}
|
||||
|
||||
LWLockRelease(SInvalWriteLock);
|
||||
@ -431,6 +457,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
|
||||
{
|
||||
SISeg *segP;
|
||||
ProcState *stateP;
|
||||
int max;
|
||||
int n;
|
||||
|
||||
LWLockAcquire(SInvalReadLock, LW_SHARED);
|
||||
@ -438,6 +465,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
|
||||
segP = shmInvalBuffer;
|
||||
stateP = &segP->procState[MyBackendId - 1];
|
||||
|
||||
/* Fetch current value of maxMsgNum using spinlock */
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile SISeg *vsegP = segP;
|
||||
|
||||
SpinLockAcquire(&vsegP->msgnumLock);
|
||||
max = vsegP->maxMsgNum;
|
||||
SpinLockRelease(&vsegP->msgnumLock);
|
||||
}
|
||||
|
||||
if (stateP->resetState)
|
||||
{
|
||||
/*
|
||||
@ -445,7 +482,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
|
||||
* since the reset, as well; and that means we should clear the
|
||||
* signaled flag, too.
|
||||
*/
|
||||
stateP->nextMsgNum = segP->maxMsgNum;
|
||||
stateP->nextMsgNum = max;
|
||||
stateP->resetState = false;
|
||||
stateP->signaled = false;
|
||||
LWLockRelease(SInvalReadLock);
|
||||
@ -459,13 +496,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
|
||||
* There may be other backends that haven't read the message(s), so we
|
||||
* cannot delete them here. SICleanupQueue() will eventually remove them
|
||||
* from the queue.
|
||||
*
|
||||
* Note: depending on the compiler, we might read maxMsgNum only once
|
||||
* here, or each time through the loop. It doesn't matter (as long as
|
||||
* each fetch is atomic).
|
||||
*/
|
||||
n = 0;
|
||||
while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
|
||||
while (n < datasize && stateP->nextMsgNum < max)
|
||||
{
|
||||
data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
|
||||
stateP->nextMsgNum++;
|
||||
@ -474,7 +507,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
|
||||
/*
|
||||
* Reset our "signaled" flag whenever we have caught up completely.
|
||||
*/
|
||||
if (stateP->nextMsgNum >= segP->maxMsgNum)
|
||||
if (stateP->nextMsgNum >= max)
|
||||
stateP->signaled = false;
|
||||
|
||||
LWLockRelease(SInvalReadLock);
|
||||
|
Loading…
x
Reference in New Issue
Block a user