Fix RBM_ZERO_AND_LOCK.

Commit 210622c6 accidentally zeroed out pages even if they were found in
the buffer pool.  It should always lock the page, but it should only
zero pages that were not already valid.  Otherwise, concurrent readers
that hold only a pin could see corrupted page contents changing under
their feet.
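
For context, RBM_ZERO_AND_LOCK is used by callers that intend to
overwrite a page and therefore want it returned zeroed and exclusively
locked instead of read from disk.  A minimal sketch of such a caller
("rel" and "blkno" here are hypothetical, not taken from this commit):

    Buffer      buf;
    Page        page;

    buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
                             RBM_ZERO_AND_LOCK, NULL);
    page = BufferGetPage(buf);
    PageInit(page, BufferGetPageSize(buf), 0);  /* caller fills in the page */
    MarkBufferDirty(buf);
    UnlockReleaseBuffer(buf);

With the bug, that ReadBufferExtended() call would zero the page even
when it was already valid in shared buffers, so another backend holding
only a pin on the same buffer could see its contents wiped out from
under it.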

While here, rename ZeroBuffer() to ZeroAndLockBuffer() to match the RBM_
flag name.
Also restore some useful comments lost by 210622c6's refactoring, and
add some new ones to clarify why we need to use the BM_IO_IN_PROGRESS
infrastructure despite not doing I/O.
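
In essence, the shared-buffer path of the fix looks like this (a
simplified sketch of the patch below, showing only the
RBM_ZERO_AND_LOCK case and omitting local buffers and the
already-valid fast path):

    if (StartBufferIO(bufHdr, true, false))
    {
        /* We hold BM_IO_IN_PROGRESS and BM_VALID is not set, so no other
         * backend can legitimately be looking at the contents yet: zero
         * the page, lock it, then mark it valid and wake any waiters. */
        memset(BufferGetPage(buffer), 0, BLCKSZ);
        LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
        TerminateBufferIO(bufHdr, false, BM_VALID, true);
    }
    else
    {
        /* BM_VALID was set concurrently: leave the contents alone and
         * just take the lock the caller asked for. */
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    }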

Reported-by: Noah Misch <noah@leadboat.com>
Reported-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> (earlier version)
Reviewed-by: Robert Haas <robertmhaas@gmail.com> (earlier version)
Discussion: https://postgr.es/m/20240512171658.7e.nmisch@google.com
Discussion: https://postgr.es/m/7ed10231-ce47-03d5-d3f9-4aea0dc7d5a4%40gmail.com
Author: Thomas Munro
Date:   2024-06-10 11:43:41 +12:00
Parent: 00ac25a3c3
Commit: e656657f2b


--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1010,44 +1010,90 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 }
 
 /*
- * Zero a buffer and lock it, as part of the implementation of
+ * Lock and optionally zero a buffer, as part of the implementation of
  * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK.  The buffer must be already
- * pinned.  It does not have to be valid, but it is valid and locked on
- * return.
+ * pinned.  If the buffer is not already valid, it is zeroed and made valid.
  */
 static void
-ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 {
     BufferDesc *bufHdr;
-    uint32      buf_state;
+    bool        need_to_zero;
+    bool        isLocalBuf = BufferIsLocal(buffer);
 
     Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
 
-    if (BufferIsLocal(buffer))
+    if (already_valid)
+    {
+        /*
+         * If the caller already knew the buffer was valid, we can skip some
+         * header interaction.  The caller just wants to lock the buffer.
+         */
+        need_to_zero = false;
+    }
+    else if (isLocalBuf)
+    {
+        /* Simple case for non-shared buffers. */
         bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+        need_to_zero = (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
+    }
     else
     {
+        /*
+         * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+         * concurrently.  Even though we aren't doing I/O, that ensures that
+         * we don't zero a page that someone else has pinned.  An exclusive
+         * content lock wouldn't be enough, because readers are allowed to
+         * drop the content lock after determining that a tuple is visible
+         * (see buffer access rules in README).
+         */
         bufHdr = GetBufferDescriptor(buffer - 1);
+        need_to_zero = StartBufferIO(bufHdr, true, false);
+    }
+
+    if (need_to_zero)
+    {
+        memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+        /*
+         * Grab the buffer content lock before marking the page as valid, to
+         * make sure that no other backend sees the zeroed page before the
+         * caller has had a chance to initialize it.
+         *
+         * Since no-one else can be looking at the page contents yet, there is
+         * no difference between an exclusive lock and a cleanup-strength
+         * lock.  (Note that we cannot use LockBuffer() or
+         * LockBufferForCleanup() here, because they assert that the buffer is
+         * already valid.)
+         */
+        if (!isLocalBuf)
+            LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+
+        if (isLocalBuf)
+        {
+            /* Only need to adjust flags */
+            uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+            buf_state |= BM_VALID;
+            pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+        }
+        else
+        {
+            /* Set BM_VALID, terminate IO, and wake up any waiters */
+            TerminateBufferIO(bufHdr, false, BM_VALID, true);
+        }
+    }
+    else if (!isLocalBuf)
+    {
+        /*
+         * The buffer is valid, so we can't zero it.  The caller still expects
+         * the page to be locked on return.
+         */
        if (mode == RBM_ZERO_AND_LOCK)
             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
         else
             LockBufferForCleanup(buffer);
     }
-
-    memset(BufferGetPage(buffer), 0, BLCKSZ);
-
-    if (BufferIsLocal(buffer))
-    {
-        buf_state = pg_atomic_read_u32(&bufHdr->state);
-        buf_state |= BM_VALID;
-        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-    }
-    else
-    {
-        buf_state = LockBufHdr(bufHdr);
-        buf_state |= BM_VALID;
-        UnlockBufHdr(bufHdr, buf_state);
-    }
 }
 
 /*
@@ -1185,7 +1231,7 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 
         buffer = PinBufferForBlock(rel, smgr, smgr_persistence,
                                    forkNum, blockNum, strategy, &found);
-        ZeroBuffer(buffer, mode);
+        ZeroAndLockBuffer(buffer, mode, found);
         return buffer;
     }
 