mirror of https://github.com/postgres/postgres
bufmgr: Introduce infrastructure for faster relation extension
The primary bottlenecks for relation extension are: 1) The extension lock is held while acquiring a victim buffer for the new page. Acquiring a victim buffer can require writing out the old page contents including possibly needing to flush WAL. 2) When extending via ReadBuffer() et al, we write a zero page during the extension, and then later write out the actual page contents. This can nearly double the write rate. 3) The existing bulk relation extension infrastructure in hio.c just amortized the cost of acquiring the relation extension lock, but none of the other costs. Unfortunately 1) cannot currently be addressed in a central manner as the callers to ReadBuffer() need to acquire the extension lock. To address that, this this commit moves the responsibility for acquiring the extension lock into bufmgr.c functions. That allows to acquire the relation extension lock for just the required time. This will also allow us to improve relation extension further, without changing callers. The reason we write all-zeroes pages during relation extension is that we hope to get ENOSPC errors earlier that way (largely works, except for CoW filesystems). It is easier to handle out-of-space errors gracefully if the page doesn't yet contain actual tuples. This commit addresses 2), by using the recently introduced smgrzeroextend(), which extends the relation, without dirtying the kernel page cache for all the extended pages. To address 3), this commit introduces a function to extend a relation by multiple blocks at a time. There are three new exposed functions: ExtendBufferedRel() for extending the relation by a single block, ExtendBufferedRelBy() to extend a relation by multiple blocks at once, and ExtendBufferedRelTo() for extending a relation up to a certain size. To avoid duplicating code between ReadBuffer(P_NEW) and the new functions, ReadBuffer(P_NEW) now implements relation extension with ExtendBufferedRel(), using a flag to tell ExtendBufferedRel() that the relation lock is already held. Note that this commit does not yet lead to a meaningful performance or scalability improvement - for that uses of ReadBuffer(P_NEW) will need to be converted to ExtendBuffered*(), which will be done in subsequent commits. Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Discussion: https://postgr.es/m/20221029025420.eplyow6k7tgu6he3@awork3.anarazel.de
This commit is contained in:
parent
8eda731465
commit
31966b151e
|
@ -7776,33 +7776,52 @@ FROM pg_stat_get_backend_idset() AS backendid;
|
|||
<entry>Probe that fires when the two-phase portion of a checkpoint is
|
||||
complete.</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>buffer-extend-start</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int, unsigned int)</literal></entry>
|
||||
<entry>Probe that fires when a relation extension starts.
|
||||
arg0 contains the fork to be extended. arg1, arg2, and arg3 contain the
|
||||
tablespace, database, and relation OIDs identifying the relation. arg4
|
||||
is the ID of the backend which created the temporary relation for a
|
||||
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared
|
||||
buffer. arg5 is the number of blocks the caller would like to extend
|
||||
by.</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>buffer-extend-done</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber)</literal></entry>
|
||||
<entry>Probe that fires when a relation extension is complete.
|
||||
arg0 contains the fork to be extended. arg1, arg2, and arg3 contain the
|
||||
tablespace, database, and relation OIDs identifying the relation. arg4
|
||||
is the ID of the backend which created the temporary relation for a
|
||||
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared
|
||||
buffer. arg5 is the number of blocks the relation was extended by, this
|
||||
can be less than the number in the
|
||||
<literal>buffer-extend-start</literal> due to resource
|
||||
constraints. arg6 contains the BlockNumber of the first new
|
||||
block.</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>buffer-read-start</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool)</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int)</literal></entry>
|
||||
<entry>Probe that fires when a buffer read is started.
|
||||
arg0 and arg1 contain the fork and block numbers of the page (but
|
||||
arg1 will be -1 if this is a relation extension request).
|
||||
arg0 and arg1 contain the fork and block numbers of the page.
|
||||
arg2, arg3, and arg4 contain the tablespace, database, and relation OIDs
|
||||
identifying the relation.
|
||||
arg5 is the ID of the backend which created the temporary relation for a
|
||||
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared buffer.
|
||||
arg6 is true for a relation extension request, false for normal
|
||||
read.</entry>
|
||||
</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>buffer-read-done</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool, bool)</literal></entry>
|
||||
<entry><literal>(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool)</literal></entry>
|
||||
<entry>Probe that fires when a buffer read is complete.
|
||||
arg0 and arg1 contain the fork and block numbers of the page (if this
|
||||
is a relation extension request, arg1 now contains the block number
|
||||
of the newly added block).
|
||||
arg0 and arg1 contain the fork and block numbers of the page.
|
||||
arg2, arg3, and arg4 contain the tablespace, database, and relation OIDs
|
||||
identifying the relation.
|
||||
arg5 is the ID of the backend which created the temporary relation for a
|
||||
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared buffer.
|
||||
arg6 is true for a relation extension request, false for normal
|
||||
read.
|
||||
arg7 is true if the buffer was found in the pool, false if not.</entry>
|
||||
arg6 is true if the buffer was found in the pool, false if not.</entry>
|
||||
</row>
|
||||
<row>
|
||||
<entry><literal>buffer-flush-start</literal></entry>
|
||||
|
|
|
@ -48,6 +48,7 @@
|
|||
#include "storage/buf_internals.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/standby.h"
|
||||
|
@ -450,6 +451,22 @@ static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
|
|||
ForkNumber forkNum, BlockNumber blockNum,
|
||||
ReadBufferMode mode, BufferAccessStrategy strategy,
|
||||
bool *hit);
|
||||
static BlockNumber ExtendBufferedRelCommon(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by);
|
||||
static BlockNumber ExtendBufferedRelShared(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by);
|
||||
static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
|
||||
static void PinBuffer_Locked(BufferDesc *buf);
|
||||
static void UnpinBuffer(BufferDesc *buf);
|
||||
|
@ -785,6 +802,180 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
|
|||
mode, strategy, &hit);
|
||||
}
|
||||
|
||||
/*
|
||||
* Convenience wrapper around ExtendBufferedRelBy() extending by one block.
|
||||
*/
|
||||
Buffer
|
||||
ExtendBufferedRel(ExtendBufferedWhat eb,
|
||||
ForkNumber forkNum,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags)
|
||||
{
|
||||
Buffer buf;
|
||||
uint32 extend_by = 1;
|
||||
|
||||
ExtendBufferedRelBy(eb, forkNum, strategy, flags, extend_by,
|
||||
&buf, &extend_by);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
/*
|
||||
* Extend relation by multiple blocks.
|
||||
*
|
||||
* Tries to extend the relation by extend_by blocks. Depending on the
|
||||
* availability of resources the relation may end up being extended by a
|
||||
* smaller number of pages (unless an error is thrown, always by at least one
|
||||
* page). *extended_by is updated to the number of pages the relation has been
|
||||
* extended to.
|
||||
*
|
||||
* buffers needs to be an array that is at least extend_by long. Upon
|
||||
* completion, the first extend_by array elements will point to a pinned
|
||||
* buffer.
|
||||
*
|
||||
* If EB_LOCK_FIRST is part of flags, the first returned buffer is
|
||||
* locked. This is useful for callers that want a buffer that is guaranteed to
|
||||
* be empty.
|
||||
*/
|
||||
BlockNumber
|
||||
ExtendBufferedRelBy(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by)
|
||||
{
|
||||
Assert((eb.rel != NULL) != (eb.smgr != NULL));
|
||||
Assert(eb.smgr == NULL || eb.relpersistence != 0);
|
||||
Assert(extend_by > 0);
|
||||
|
||||
if (eb.smgr == NULL)
|
||||
{
|
||||
eb.smgr = RelationGetSmgr(eb.rel);
|
||||
eb.relpersistence = eb.rel->rd_rel->relpersistence;
|
||||
}
|
||||
|
||||
return ExtendBufferedRelCommon(eb, fork, strategy, flags,
|
||||
extend_by, InvalidBlockNumber,
|
||||
buffers, extended_by);
|
||||
}
|
||||
|
||||
/*
|
||||
* Extend the relation so it is at least extend_to blocks large, return buffer
|
||||
* (extend_to - 1).
|
||||
*
|
||||
* This is useful for callers that want to write a specific page, regardless
|
||||
* of the current size of the relation (e.g. useful for visibilitymap and for
|
||||
* crash recovery).
|
||||
*/
|
||||
Buffer
|
||||
ExtendBufferedRelTo(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
BlockNumber extend_to,
|
||||
ReadBufferMode mode)
|
||||
{
|
||||
BlockNumber current_size;
|
||||
uint32 extended_by = 0;
|
||||
Buffer buffer = InvalidBuffer;
|
||||
Buffer buffers[64];
|
||||
|
||||
Assert((eb.rel != NULL) != (eb.smgr != NULL));
|
||||
Assert(eb.smgr == NULL || eb.relpersistence != 0);
|
||||
Assert(extend_to != InvalidBlockNumber && extend_to > 0);
|
||||
Assert(mode == RBM_NORMAL || mode == RBM_ZERO_ON_ERROR ||
|
||||
mode == RBM_ZERO_AND_LOCK);
|
||||
|
||||
if (eb.smgr == NULL)
|
||||
{
|
||||
eb.smgr = RelationGetSmgr(eb.rel);
|
||||
eb.relpersistence = eb.rel->rd_rel->relpersistence;
|
||||
}
|
||||
|
||||
/*
|
||||
* If desired, create the file if it doesn't exist. If
|
||||
* smgr_cached_nblocks[fork] is positive then it must exist, no need for
|
||||
* an smgrexists call.
|
||||
*/
|
||||
if ((flags & EB_CREATE_FORK_IF_NEEDED) &&
|
||||
(eb.smgr->smgr_cached_nblocks[fork] == 0 ||
|
||||
eb.smgr->smgr_cached_nblocks[fork] == InvalidBlockNumber) &&
|
||||
!smgrexists(eb.smgr, fork))
|
||||
{
|
||||
LockRelationForExtension(eb.rel, ExclusiveLock);
|
||||
|
||||
/* could have been closed while waiting for lock */
|
||||
if (eb.rel)
|
||||
eb.smgr = RelationGetSmgr(eb.rel);
|
||||
|
||||
/* recheck, fork might have been created concurrently */
|
||||
if (!smgrexists(eb.smgr, fork))
|
||||
smgrcreate(eb.smgr, fork, flags & EB_PERFORMING_RECOVERY);
|
||||
|
||||
UnlockRelationForExtension(eb.rel, ExclusiveLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* If requested, invalidate size cache, so that smgrnblocks asks the
|
||||
* kernel.
|
||||
*/
|
||||
if (flags & EB_CLEAR_SIZE_CACHE)
|
||||
eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;
|
||||
|
||||
/*
|
||||
* Estimate how many pages we'll need to extend by. This avoids acquiring
|
||||
* unnecessarily many victim buffers.
|
||||
*/
|
||||
current_size = smgrnblocks(eb.smgr, fork);
|
||||
|
||||
if (mode == RBM_ZERO_AND_LOCK)
|
||||
flags |= EB_LOCK_TARGET;
|
||||
|
||||
while (current_size < extend_to)
|
||||
{
|
||||
uint32 num_pages = lengthof(buffers);
|
||||
BlockNumber first_block;
|
||||
|
||||
if ((uint64) current_size + num_pages > extend_to)
|
||||
num_pages = extend_to - current_size;
|
||||
|
||||
first_block = ExtendBufferedRelCommon(eb, fork, strategy, flags,
|
||||
num_pages, extend_to,
|
||||
buffers, &extended_by);
|
||||
|
||||
current_size = first_block + extended_by;
|
||||
Assert(current_size <= extend_to);
|
||||
Assert(num_pages != 0 || current_size >= extend_to);
|
||||
|
||||
for (int i = 0; i < extended_by; i++)
|
||||
{
|
||||
if (first_block + i != extend_to - 1)
|
||||
ReleaseBuffer(buffers[i]);
|
||||
else
|
||||
buffer = buffers[i];
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* It's possible that another backend concurrently extended the relation.
|
||||
* In that case read the buffer.
|
||||
*
|
||||
* XXX: Should we control this via a flag?
|
||||
*/
|
||||
if (buffer == InvalidBuffer)
|
||||
{
|
||||
bool hit;
|
||||
|
||||
Assert(extended_by == 0);
|
||||
buffer = ReadBuffer_common(eb.smgr, eb.relpersistence,
|
||||
fork, extend_to - 1, mode, strategy,
|
||||
&hit);
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/*
|
||||
* ReadBuffer_common -- common logic for all ReadBuffer variants
|
||||
|
@ -801,35 +992,38 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
bool found;
|
||||
IOContext io_context;
|
||||
IOObject io_object;
|
||||
bool isExtend;
|
||||
bool isLocalBuf = SmgrIsTemp(smgr);
|
||||
|
||||
*hit = false;
|
||||
|
||||
/*
|
||||
* Backward compatibility path, most code should use ExtendBufferedRel()
|
||||
* instead, as acquiring the extension lock inside ExtendBufferedRel()
|
||||
* scales a lot better.
|
||||
*/
|
||||
if (unlikely(blockNum == P_NEW))
|
||||
{
|
||||
uint32 flags = EB_SKIP_EXTENSION_LOCK;
|
||||
|
||||
Assert(mode == RBM_NORMAL ||
|
||||
mode == RBM_ZERO_AND_LOCK ||
|
||||
mode == RBM_ZERO_ON_ERROR);
|
||||
|
||||
if (mode == RBM_ZERO_AND_LOCK)
|
||||
flags |= EB_LOCK_FIRST;
|
||||
|
||||
return ExtendBufferedRel(EB_SMGR(smgr, relpersistence),
|
||||
forkNum, strategy, flags);
|
||||
}
|
||||
|
||||
/* Make sure we will have room to remember the buffer pin */
|
||||
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
|
||||
|
||||
isExtend = (blockNum == P_NEW);
|
||||
|
||||
TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
|
||||
smgr->smgr_rlocator.locator.spcOid,
|
||||
smgr->smgr_rlocator.locator.dbOid,
|
||||
smgr->smgr_rlocator.locator.relNumber,
|
||||
smgr->smgr_rlocator.backend,
|
||||
isExtend);
|
||||
|
||||
/* Substitute proper block number if caller asked for P_NEW */
|
||||
if (isExtend)
|
||||
{
|
||||
blockNum = smgrnblocks(smgr, forkNum);
|
||||
/* Fail if relation is already at maximum possible length */
|
||||
if (blockNum == P_NEW)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
|
||||
errmsg("cannot extend relation %s beyond %u blocks",
|
||||
relpath(smgr->smgr_rlocator, forkNum),
|
||||
P_NEW)));
|
||||
}
|
||||
smgr->smgr_rlocator.backend);
|
||||
|
||||
if (isLocalBuf)
|
||||
{
|
||||
|
@ -844,8 +1038,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
|
||||
if (found)
|
||||
pgBufferUsage.local_blks_hit++;
|
||||
else if (isExtend)
|
||||
pgBufferUsage.local_blks_written++;
|
||||
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
|
||||
mode == RBM_ZERO_ON_ERROR)
|
||||
pgBufferUsage.local_blks_read++;
|
||||
|
@ -862,8 +1054,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
strategy, &found, io_context);
|
||||
if (found)
|
||||
pgBufferUsage.shared_blks_hit++;
|
||||
else if (isExtend)
|
||||
pgBufferUsage.shared_blks_written++;
|
||||
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
|
||||
mode == RBM_ZERO_ON_ERROR)
|
||||
pgBufferUsage.shared_blks_read++;
|
||||
|
@ -874,175 +1064,91 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
/* if it was already in the buffer pool, we're done */
|
||||
if (found)
|
||||
{
|
||||
if (!isExtend)
|
||||
{
|
||||
/* Just need to update stats before we exit */
|
||||
*hit = true;
|
||||
VacuumPageHit++;
|
||||
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
|
||||
/* Just need to update stats before we exit */
|
||||
*hit = true;
|
||||
VacuumPageHit++;
|
||||
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
|
||||
|
||||
if (VacuumCostActive)
|
||||
VacuumCostBalance += VacuumCostPageHit;
|
||||
if (VacuumCostActive)
|
||||
VacuumCostBalance += VacuumCostPageHit;
|
||||
|
||||
TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
|
||||
smgr->smgr_rlocator.locator.spcOid,
|
||||
smgr->smgr_rlocator.locator.dbOid,
|
||||
smgr->smgr_rlocator.locator.relNumber,
|
||||
smgr->smgr_rlocator.backend,
|
||||
isExtend,
|
||||
found);
|
||||
|
||||
/*
|
||||
* In RBM_ZERO_AND_LOCK mode the caller expects the page to be
|
||||
* locked on return.
|
||||
*/
|
||||
if (!isLocalBuf)
|
||||
{
|
||||
if (mode == RBM_ZERO_AND_LOCK)
|
||||
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
|
||||
LW_EXCLUSIVE);
|
||||
else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
|
||||
LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
|
||||
}
|
||||
|
||||
return BufferDescriptorGetBuffer(bufHdr);
|
||||
}
|
||||
TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
|
||||
smgr->smgr_rlocator.locator.spcOid,
|
||||
smgr->smgr_rlocator.locator.dbOid,
|
||||
smgr->smgr_rlocator.locator.relNumber,
|
||||
smgr->smgr_rlocator.backend,
|
||||
found);
|
||||
|
||||
/*
|
||||
* We get here only in the corner case where we are trying to extend
|
||||
* the relation but we found a pre-existing buffer marked BM_VALID.
|
||||
* This can happen because mdread doesn't complain about reads beyond
|
||||
* EOF (when zero_damaged_pages is ON) and so a previous attempt to
|
||||
* read a block beyond EOF could have left a "valid" zero-filled
|
||||
* buffer. Unfortunately, we have also seen this case occurring
|
||||
* because of buggy Linux kernels that sometimes return an
|
||||
* lseek(SEEK_END) result that doesn't account for a recent write. In
|
||||
* that situation, the pre-existing buffer would contain valid data
|
||||
* that we don't want to overwrite. Since the legitimate case should
|
||||
* always have left a zero-filled buffer, complain if not PageIsNew.
|
||||
* In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
|
||||
* on return.
|
||||
*/
|
||||
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
|
||||
if (!PageIsNew((Page) bufBlock))
|
||||
ereport(ERROR,
|
||||
(errmsg("unexpected data beyond EOF in block %u of relation %s",
|
||||
blockNum, relpath(smgr->smgr_rlocator, forkNum)),
|
||||
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
|
||||
|
||||
/*
|
||||
* We *must* do smgrextend before succeeding, else the page will not
|
||||
* be reserved by the kernel, and the next P_NEW call will decide to
|
||||
* return the same page. Clear the BM_VALID bit, do the StartBufferIO
|
||||
* call that BufferAlloc didn't, and proceed.
|
||||
*/
|
||||
if (isLocalBuf)
|
||||
if (!isLocalBuf)
|
||||
{
|
||||
/* Only need to adjust flags */
|
||||
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
|
||||
|
||||
Assert(buf_state & BM_VALID);
|
||||
buf_state &= ~BM_VALID;
|
||||
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
|
||||
if (mode == RBM_ZERO_AND_LOCK)
|
||||
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
|
||||
LW_EXCLUSIVE);
|
||||
else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
|
||||
LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Loop to handle the very small possibility that someone re-sets
|
||||
* BM_VALID between our clearing it and StartBufferIO inspecting
|
||||
* it.
|
||||
*/
|
||||
do
|
||||
{
|
||||
uint32 buf_state = LockBufHdr(bufHdr);
|
||||
|
||||
Assert(buf_state & BM_VALID);
|
||||
buf_state &= ~BM_VALID;
|
||||
UnlockBufHdr(bufHdr, buf_state);
|
||||
} while (!StartBufferIO(bufHdr, true));
|
||||
}
|
||||
return BufferDescriptorGetBuffer(bufHdr);
|
||||
}
|
||||
|
||||
/*
|
||||
* if we have gotten to this point, we have allocated a buffer for the
|
||||
* page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
|
||||
* if it's a shared buffer.
|
||||
*
|
||||
* Note: if smgrextend fails, we will end up with a buffer that is
|
||||
* allocated but not marked BM_VALID. P_NEW will still select the same
|
||||
* block number (because the relation didn't get any longer on disk) and
|
||||
* so future attempts to extend the relation will find the same buffer (if
|
||||
* it's not been recycled) but come right back here to try smgrextend
|
||||
* again.
|
||||
*/
|
||||
Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
|
||||
|
||||
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
|
||||
|
||||
if (isExtend)
|
||||
{
|
||||
/* new buffers are zero-filled */
|
||||
/*
|
||||
* Read in the page, unless the caller intends to overwrite it and just
|
||||
* wants us to allocate a buffer.
|
||||
*/
|
||||
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
/* don't set checksum for all-zero page */
|
||||
smgrextend(smgr, forkNum, blockNum, bufBlock, false);
|
||||
|
||||
pgstat_count_io_op(io_object, io_context, IOOP_EXTEND);
|
||||
|
||||
/*
|
||||
* NB: we're *not* doing a ScheduleBufferTagForWriteback here;
|
||||
* although we're essentially performing a write. At least on linux
|
||||
* doing so defeats the 'delayed allocation' mechanism, leading to
|
||||
* increased file fragmentation.
|
||||
*/
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Read in the page, unless the caller intends to overwrite it and
|
||||
* just wants us to allocate a buffer.
|
||||
*/
|
||||
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
else
|
||||
instr_time io_start,
|
||||
io_time;
|
||||
|
||||
if (track_io_timing)
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
|
||||
smgrread(smgr, forkNum, blockNum, bufBlock);
|
||||
|
||||
if (track_io_timing)
|
||||
{
|
||||
instr_time io_start,
|
||||
io_time;
|
||||
INSTR_TIME_SET_CURRENT(io_time);
|
||||
INSTR_TIME_SUBTRACT(io_time, io_start);
|
||||
pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
|
||||
INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
|
||||
}
|
||||
|
||||
if (track_io_timing)
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
pgstat_count_io_op(io_object, io_context, IOOP_READ);
|
||||
|
||||
/* check for garbage data */
|
||||
if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
|
||||
PIV_LOG_WARNING | PIV_REPORT_STAT))
|
||||
{
|
||||
if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
|
||||
{
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("invalid page in block %u of relation %s; zeroing out page",
|
||||
blockNum,
|
||||
relpath(smgr->smgr_rlocator, forkNum))));
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
}
|
||||
else
|
||||
INSTR_TIME_SET_ZERO(io_start);
|
||||
|
||||
smgrread(smgr, forkNum, blockNum, bufBlock);
|
||||
|
||||
pgstat_count_io_op(io_object, io_context, IOOP_READ);
|
||||
|
||||
if (track_io_timing)
|
||||
{
|
||||
INSTR_TIME_SET_CURRENT(io_time);
|
||||
INSTR_TIME_SUBTRACT(io_time, io_start);
|
||||
pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
|
||||
INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
|
||||
}
|
||||
|
||||
/* check for garbage data */
|
||||
if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
|
||||
PIV_LOG_WARNING | PIV_REPORT_STAT))
|
||||
{
|
||||
if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
|
||||
{
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("invalid page in block %u of relation %s; zeroing out page",
|
||||
blockNum,
|
||||
relpath(smgr->smgr_rlocator, forkNum))));
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
}
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("invalid page in block %u of relation %s",
|
||||
blockNum,
|
||||
relpath(smgr->smgr_rlocator, forkNum))));
|
||||
}
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("invalid page in block %u of relation %s",
|
||||
blockNum,
|
||||
relpath(smgr->smgr_rlocator, forkNum))));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1085,7 +1191,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
smgr->smgr_rlocator.locator.dbOid,
|
||||
smgr->smgr_rlocator.locator.relNumber,
|
||||
smgr->smgr_rlocator.backend,
|
||||
isExtend,
|
||||
found);
|
||||
|
||||
return BufferDescriptorGetBuffer(bufHdr);
|
||||
|
@ -1219,8 +1324,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||
UnpinBuffer(victim_buf_hdr);
|
||||
|
||||
/*
|
||||
* The victim buffer we acquired peviously is clean and unused,
|
||||
* let it be found again quickly
|
||||
* The victim buffer we acquired peviously is clean and unused, let it
|
||||
* be found again quickly
|
||||
*/
|
||||
StrategyFreeBuffer(victim_buf_hdr);
|
||||
|
||||
|
@ -1633,6 +1738,365 @@ again:
|
|||
return buf;
|
||||
}
|
||||
|
||||
/*
|
||||
* Limit the number of pins a batch operation may additionally acquire, to
|
||||
* avoid running out of pinnable buffers.
|
||||
*
|
||||
* One additional pin is always allowed, as otherwise the operation likely
|
||||
* cannot be performed at all.
|
||||
*
|
||||
* The number of allowed pins for a backend is computed based on
|
||||
* shared_buffers and the maximum number of connections possible. That's very
|
||||
* pessimistic, but outside of toy-sized shared_buffers it should allow
|
||||
* sufficient pins.
|
||||
*/
|
||||
static void
|
||||
LimitAdditionalPins(uint32 *additional_pins)
|
||||
{
|
||||
uint32 max_backends;
|
||||
int max_proportional_pins;
|
||||
|
||||
if (*additional_pins <= 1)
|
||||
return;
|
||||
|
||||
max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
|
||||
max_proportional_pins = NBuffers / max_backends;
|
||||
|
||||
/*
|
||||
* Subtract the approximate number of buffers already pinned by this
|
||||
* backend. We get the number of "overflowed" pins for free, but don't
|
||||
* know the number of pins in PrivateRefCountArray. The cost of
|
||||
* calculating that exactly doesn't seem worth it, so just assume the max.
|
||||
*/
|
||||
max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
|
||||
|
||||
if (max_proportional_pins < 0)
|
||||
max_proportional_pins = 1;
|
||||
|
||||
if (*additional_pins > max_proportional_pins)
|
||||
*additional_pins = max_proportional_pins;
|
||||
}
|
||||
|
||||
/*
|
||||
* Logic shared between ExtendBufferedRelBy(), ExtendBufferedRelTo(). Just to
|
||||
* avoid duplicating the tracing and relpersistence related logic.
|
||||
*/
|
||||
static BlockNumber
|
||||
ExtendBufferedRelCommon(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by)
|
||||
{
|
||||
BlockNumber first_block;
|
||||
|
||||
TRACE_POSTGRESQL_BUFFER_EXTEND_START(fork,
|
||||
eb.smgr->smgr_rlocator.locator.spcOid,
|
||||
eb.smgr->smgr_rlocator.locator.dbOid,
|
||||
eb.smgr->smgr_rlocator.locator.relNumber,
|
||||
eb.smgr->smgr_rlocator.backend,
|
||||
extend_by);
|
||||
|
||||
if (eb.relpersistence == RELPERSISTENCE_TEMP)
|
||||
first_block = ExtendBufferedRelLocal(eb, fork, flags,
|
||||
extend_by, extend_upto,
|
||||
buffers, &extend_by);
|
||||
else
|
||||
first_block = ExtendBufferedRelShared(eb, fork, strategy, flags,
|
||||
extend_by, extend_upto,
|
||||
buffers, &extend_by);
|
||||
*extended_by = extend_by;
|
||||
|
||||
TRACE_POSTGRESQL_BUFFER_EXTEND_DONE(fork,
|
||||
eb.smgr->smgr_rlocator.locator.spcOid,
|
||||
eb.smgr->smgr_rlocator.locator.dbOid,
|
||||
eb.smgr->smgr_rlocator.locator.relNumber,
|
||||
eb.smgr->smgr_rlocator.backend,
|
||||
*extended_by,
|
||||
first_block);
|
||||
|
||||
return first_block;
|
||||
}
|
||||
|
||||
/*
|
||||
* Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
|
||||
* shared buffers.
|
||||
*/
|
||||
static BlockNumber
|
||||
ExtendBufferedRelShared(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by)
|
||||
{
|
||||
BlockNumber first_block;
|
||||
IOContext io_context = IOContextForStrategy(strategy);
|
||||
|
||||
LimitAdditionalPins(&extend_by);
|
||||
|
||||
/*
|
||||
* Acquire victim buffers for extension without holding extension lock.
|
||||
* Writing out victim buffers is the most expensive part of extending the
|
||||
* relation, particularly when doing so requires WAL flushes. Zeroing out
|
||||
* the buffers is also quite expensive, so do that before holding the
|
||||
* extension lock as well.
|
||||
*
|
||||
* These pages are pinned by us and not valid. While we hold the pin they
|
||||
* can't be acquired as victim buffers by another backend.
|
||||
*/
|
||||
for (uint32 i = 0; i < extend_by; i++)
|
||||
{
|
||||
Block buf_block;
|
||||
|
||||
buffers[i] = GetVictimBuffer(strategy, io_context);
|
||||
buf_block = BufHdrGetBlock(GetBufferDescriptor(buffers[i] - 1));
|
||||
|
||||
/* new buffers are zero-filled */
|
||||
MemSet((char *) buf_block, 0, BLCKSZ);
|
||||
}
|
||||
|
||||
/* in case we need to pin an existing buffer below */
|
||||
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
|
||||
|
||||
/*
|
||||
* Lock relation against concurrent extensions, unless requested not to.
|
||||
*
|
||||
* We use the same extension lock for all forks. That's unnecessarily
|
||||
* restrictive, but currently extensions for forks don't happen often
|
||||
* enough to make it worth locking more granularly.
|
||||
*
|
||||
* Note that another backend might have extended the relation by the time
|
||||
* we get the lock.
|
||||
*/
|
||||
if (!(flags & EB_SKIP_EXTENSION_LOCK))
|
||||
{
|
||||
LockRelationForExtension(eb.rel, ExclusiveLock);
|
||||
if (eb.rel)
|
||||
eb.smgr = RelationGetSmgr(eb.rel);
|
||||
}
|
||||
|
||||
/*
|
||||
* If requested, invalidate size cache, so that smgrnblocks asks the
|
||||
* kernel.
|
||||
*/
|
||||
if (flags & EB_CLEAR_SIZE_CACHE)
|
||||
eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;
|
||||
|
||||
first_block = smgrnblocks(eb.smgr, fork);
|
||||
|
||||
/*
|
||||
* Now that we have the accurate relation size, check if the caller wants
|
||||
* us to extend to only up to a specific size. If there were concurrent
|
||||
* extensions, we might have acquired too many buffers and need to release
|
||||
* them.
|
||||
*/
|
||||
if (extend_upto != InvalidBlockNumber)
|
||||
{
|
||||
uint32 orig_extend_by = extend_by;
|
||||
|
||||
if (first_block > extend_upto)
|
||||
extend_by = 0;
|
||||
else if ((uint64) first_block + extend_by > extend_upto)
|
||||
extend_by = extend_upto - first_block;
|
||||
|
||||
for (uint32 i = extend_by; i < orig_extend_by; i++)
|
||||
{
|
||||
BufferDesc *buf_hdr = GetBufferDescriptor(buffers[i] - 1);
|
||||
|
||||
/*
|
||||
* The victim buffer we acquired peviously is clean and unused,
|
||||
* let it be found again quickly
|
||||
*/
|
||||
StrategyFreeBuffer(buf_hdr);
|
||||
UnpinBuffer(buf_hdr);
|
||||
}
|
||||
|
||||
if (extend_by == 0)
|
||||
{
|
||||
if (!(flags & EB_SKIP_EXTENSION_LOCK))
|
||||
UnlockRelationForExtension(eb.rel, ExclusiveLock);
|
||||
*extended_by = extend_by;
|
||||
return first_block;
|
||||
}
|
||||
}
|
||||
|
||||
/* Fail if relation is already at maximum possible length */
|
||||
if ((uint64) first_block + extend_by >= MaxBlockNumber)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
|
||||
errmsg("cannot extend relation %s beyond %u blocks",
|
||||
relpath(eb.smgr->smgr_rlocator, fork),
|
||||
MaxBlockNumber)));
|
||||
|
||||
/*
|
||||
* Insert buffers into buffer table, mark as IO_IN_PROGRESS.
|
||||
*
|
||||
* This needs to happen before we extend the relation, because as soon as
|
||||
* we do, other backends can start to read in those pages.
|
||||
*/
|
||||
for (int i = 0; i < extend_by; i++)
|
||||
{
|
||||
Buffer victim_buf = buffers[i];
|
||||
BufferDesc *victim_buf_hdr = GetBufferDescriptor(victim_buf - 1);
|
||||
BufferTag tag;
|
||||
uint32 hash;
|
||||
LWLock *partition_lock;
|
||||
int existing_id;
|
||||
|
||||
InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
|
||||
hash = BufTableHashCode(&tag);
|
||||
partition_lock = BufMappingPartitionLock(hash);
|
||||
|
||||
LWLockAcquire(partition_lock, LW_EXCLUSIVE);
|
||||
|
||||
existing_id = BufTableInsert(&tag, hash, victim_buf_hdr->buf_id);
|
||||
|
||||
/*
|
||||
* We get here only in the corner case where we are trying to extend
|
||||
* the relation but we found a pre-existing buffer. This can happen
|
||||
* because a prior attempt at extending the relation failed, and
|
||||
* because mdread doesn't complain about reads beyond EOF (when
|
||||
* zero_damaged_pages is ON) and so a previous attempt to read a block
|
||||
* beyond EOF could have left a "valid" zero-filled buffer.
|
||||
* Unfortunately, we have also seen this case occurring because of
|
||||
* buggy Linux kernels that sometimes return an lseek(SEEK_END) result
|
||||
* that doesn't account for a recent write. In that situation, the
|
||||
* pre-existing buffer would contain valid data that we don't want to
|
||||
* overwrite. Since the legitimate cases should always have left a
|
||||
* zero-filled buffer, complain if not PageIsNew.
|
||||
*/
|
||||
if (existing_id >= 0)
|
||||
{
|
||||
BufferDesc *existing_hdr = GetBufferDescriptor(existing_id);
|
||||
Block buf_block;
|
||||
bool valid;
|
||||
|
||||
/*
|
||||
* Pin the existing buffer before releasing the partition lock,
|
||||
* preventing it from being evicted.
|
||||
*/
|
||||
valid = PinBuffer(existing_hdr, strategy);
|
||||
|
||||
LWLockRelease(partition_lock);
|
||||
|
||||
/*
|
||||
* The victim buffer we acquired peviously is clean and unused,
|
||||
* let it be found again quickly
|
||||
*/
|
||||
StrategyFreeBuffer(victim_buf_hdr);
|
||||
UnpinBuffer(victim_buf_hdr);
|
||||
|
||||
buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
|
||||
buf_block = BufHdrGetBlock(existing_hdr);
|
||||
|
||||
if (valid && !PageIsNew((Page) buf_block))
|
||||
ereport(ERROR,
|
||||
(errmsg("unexpected data beyond EOF in block %u of relation %s",
|
||||
existing_hdr->tag.blockNum, relpath(eb.smgr->smgr_rlocator, fork)),
|
||||
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
|
||||
|
||||
/*
|
||||
* We *must* do smgr[zero]extend before succeeding, else the page
|
||||
* will not be reserved by the kernel, and the next P_NEW call
|
||||
* will decide to return the same page. Clear the BM_VALID bit,
|
||||
* do StartBufferIO() and proceed.
|
||||
*
|
||||
* Loop to handle the very small possibility that someone re-sets
|
||||
* BM_VALID between our clearing it and StartBufferIO inspecting
|
||||
* it.
|
||||
*/
|
||||
do
|
||||
{
|
||||
uint32 buf_state = LockBufHdr(existing_hdr);
|
||||
|
||||
buf_state &= ~BM_VALID;
|
||||
UnlockBufHdr(existing_hdr, buf_state);
|
||||
} while (!StartBufferIO(existing_hdr, true));
|
||||
}
|
||||
else
|
||||
{
|
||||
uint32 buf_state;
|
||||
|
||||
buf_state = LockBufHdr(victim_buf_hdr);
|
||||
|
||||
/* some sanity checks while we hold the buffer header lock */
|
||||
Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
|
||||
Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
|
||||
|
||||
victim_buf_hdr->tag = tag;
|
||||
|
||||
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
|
||||
if (eb.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
|
||||
buf_state |= BM_PERMANENT;
|
||||
|
||||
UnlockBufHdr(victim_buf_hdr, buf_state);
|
||||
|
||||
LWLockRelease(partition_lock);
|
||||
|
||||
/* XXX: could combine the locked operations in it with the above */
|
||||
StartBufferIO(victim_buf_hdr, true);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Note: if smgzerorextend fails, we will end up with buffers that are
|
||||
* allocated but not marked BM_VALID. The next relation extension will
|
||||
* still select the same block number (because the relation didn't get any
|
||||
* longer on disk) and so future attempts to extend the relation will find
|
||||
* the same buffers (if they have not been recycled) but come right back
|
||||
* here to try smgrzeroextend again.
|
||||
*
|
||||
* We don't need to set checksum for all-zero pages.
|
||||
*/
|
||||
smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
|
||||
|
||||
/*
|
||||
* Release the file-extension lock; it's now OK for someone else to extend
|
||||
* the relation some more.
|
||||
*
|
||||
* We remove IO_IN_PROGRESS after this, as waking up waiting backends can
|
||||
* take noticeable time.
|
||||
*/
|
||||
if (!(flags & EB_SKIP_EXTENSION_LOCK))
|
||||
UnlockRelationForExtension(eb.rel, ExclusiveLock);
|
||||
|
||||
/* Set BM_VALID, terminate IO, and wake up any waiters */
|
||||
for (int i = 0; i < extend_by; i++)
|
||||
{
|
||||
Buffer buf = buffers[i];
|
||||
BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
|
||||
bool lock = false;
|
||||
|
||||
if (flags & EB_LOCK_FIRST && i == 0)
|
||||
lock = true;
|
||||
else if (flags & EB_LOCK_TARGET)
|
||||
{
|
||||
Assert(extend_upto != InvalidBlockNumber);
|
||||
if (first_block + i + 1 == extend_upto)
|
||||
lock = true;
|
||||
}
|
||||
|
||||
if (lock)
|
||||
LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
|
||||
|
||||
TerminateBufferIO(buf_hdr, false, BM_VALID);
|
||||
}
|
||||
|
||||
pgBufferUsage.shared_blks_written += extend_by;
|
||||
pgstat_count_io_op_n(IOOBJECT_RELATION, io_context, IOOP_EXTEND,
|
||||
extend_by);
|
||||
|
||||
*extended_by = extend_by;
|
||||
|
||||
return first_block;
|
||||
}
|
||||
|
||||
/*
|
||||
* MarkBufferDirty
|
||||
*
|
||||
|
|
|
@ -49,6 +49,9 @@ static int nextFreeLocalBufId = 0;
|
|||
|
||||
static HTAB *LocalBufHash = NULL;
|
||||
|
||||
/* number of local buffers pinned at least once */
|
||||
static int NLocalPinnedBuffers = 0;
|
||||
|
||||
|
||||
static void InitLocalBuffers(void);
|
||||
static Block GetLocalBufferStorage(void);
|
||||
|
@ -273,6 +276,154 @@ GetLocalVictimBuffer(void)
|
|||
return BufferDescriptorGetBuffer(bufHdr);
|
||||
}
|
||||
|
||||
/* see LimitAdditionalPins() */
|
||||
static void
|
||||
LimitAdditionalLocalPins(uint32 *additional_pins)
|
||||
{
|
||||
uint32 max_pins;
|
||||
|
||||
if (*additional_pins <= 1)
|
||||
return;
|
||||
|
||||
/*
|
||||
* In contrast to LimitAdditionalPins() other backends don't play a role
|
||||
* here. We can allow up to NLocBuffer pins in total.
|
||||
*/
|
||||
max_pins = (NLocBuffer - NLocalPinnedBuffers);
|
||||
|
||||
if (*additional_pins >= max_pins)
|
||||
*additional_pins = max_pins;
|
||||
}
|
||||
|
||||
/*
|
||||
* Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
|
||||
* temporary buffers.
|
||||
*/
|
||||
BlockNumber
|
||||
ExtendBufferedRelLocal(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by)
|
||||
{
|
||||
BlockNumber first_block;
|
||||
|
||||
/* Initialize local buffers if first request in this session */
|
||||
if (LocalBufHash == NULL)
|
||||
InitLocalBuffers();
|
||||
|
||||
LimitAdditionalLocalPins(&extend_by);
|
||||
|
||||
for (uint32 i = 0; i < extend_by; i++)
|
||||
{
|
||||
BufferDesc *buf_hdr;
|
||||
Block buf_block;
|
||||
|
||||
buffers[i] = GetLocalVictimBuffer();
|
||||
buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
|
||||
buf_block = LocalBufHdrGetBlock(buf_hdr);
|
||||
|
||||
/* new buffers are zero-filled */
|
||||
MemSet((char *) buf_block, 0, BLCKSZ);
|
||||
}
|
||||
|
||||
first_block = smgrnblocks(eb.smgr, fork);
|
||||
|
||||
if (extend_upto != InvalidBlockNumber)
|
||||
{
|
||||
/*
|
||||
* In contrast to shared relations, nothing could change the relation
|
||||
* size concurrently. Thus we shouldn't end up finding that we don't
|
||||
* need to do anything.
|
||||
*/
|
||||
Assert(first_block <= extend_upto);
|
||||
|
||||
Assert((uint64) first_block + extend_by <= extend_upto);
|
||||
}
|
||||
|
||||
/* Fail if relation is already at maximum possible length */
|
||||
if ((uint64) first_block + extend_by >= MaxBlockNumber)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
|
||||
errmsg("cannot extend relation %s beyond %u blocks",
|
||||
relpath(eb.smgr->smgr_rlocator, fork),
|
||||
MaxBlockNumber)));
|
||||
|
||||
for (int i = 0; i < extend_by; i++)
|
||||
{
|
||||
int victim_buf_id;
|
||||
BufferDesc *victim_buf_hdr;
|
||||
BufferTag tag;
|
||||
LocalBufferLookupEnt *hresult;
|
||||
bool found;
|
||||
|
||||
victim_buf_id = -buffers[i] - 1;
|
||||
victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
|
||||
|
||||
InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
|
||||
|
||||
hresult = (LocalBufferLookupEnt *)
|
||||
hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
|
||||
if (found)
|
||||
{
|
||||
BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
|
||||
uint32 buf_state;
|
||||
|
||||
UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
|
||||
|
||||
existing_hdr = GetLocalBufferDescriptor(hresult->id);
|
||||
PinLocalBuffer(existing_hdr, false);
|
||||
buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
|
||||
|
||||
buf_state = pg_atomic_read_u32(&existing_hdr->state);
|
||||
Assert(buf_state & BM_TAG_VALID);
|
||||
Assert(!(buf_state & BM_DIRTY));
|
||||
buf_state &= BM_VALID;
|
||||
pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
|
||||
}
|
||||
else
|
||||
{
|
||||
uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
|
||||
|
||||
Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
|
||||
|
||||
victim_buf_hdr->tag = tag;
|
||||
|
||||
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
|
||||
|
||||
pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
|
||||
|
||||
hresult->id = victim_buf_id;
|
||||
}
|
||||
}
|
||||
|
||||
/* actually extend relation */
|
||||
smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
|
||||
|
||||
for (int i = 0; i < extend_by; i++)
|
||||
{
|
||||
Buffer buf = buffers[i];
|
||||
BufferDesc *buf_hdr;
|
||||
uint32 buf_state;
|
||||
|
||||
buf_hdr = GetLocalBufferDescriptor(-buf - 1);
|
||||
|
||||
buf_state = pg_atomic_read_u32(&buf_hdr->state);
|
||||
buf_state |= BM_VALID;
|
||||
pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
|
||||
}
|
||||
|
||||
*extended_by = extend_by;
|
||||
|
||||
pgBufferUsage.temp_blks_written += extend_by;
|
||||
pgstat_count_io_op_n(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
|
||||
extend_by);
|
||||
|
||||
return first_block;
|
||||
}
|
||||
|
||||
/*
|
||||
* MarkLocalBufferDirty -
|
||||
* mark a local buffer dirty
|
||||
|
@ -492,6 +643,7 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
|
|||
|
||||
if (LocalRefCount[bufid] == 0)
|
||||
{
|
||||
NLocalPinnedBuffers++;
|
||||
if (adjust_usagecount &&
|
||||
BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
|
||||
{
|
||||
|
@ -513,9 +665,11 @@ UnpinLocalBuffer(Buffer buffer)
|
|||
|
||||
Assert(BufferIsLocal(buffer));
|
||||
Assert(LocalRefCount[buffid] > 0);
|
||||
Assert(NLocalPinnedBuffers > 0);
|
||||
|
||||
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
|
||||
LocalRefCount[buffid]--;
|
||||
if (--LocalRefCount[buffid] == 0)
|
||||
NLocalPinnedBuffers--;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -64,13 +64,19 @@ pgstat_bktype_io_stats_valid(PgStat_BktypeIO *backend_io,
|
|||
|
||||
void
|
||||
pgstat_count_io_op(IOObject io_object, IOContext io_context, IOOp io_op)
|
||||
{
|
||||
pgstat_count_io_op_n(io_object, io_context, io_op, 1);
|
||||
}
|
||||
|
||||
void
|
||||
pgstat_count_io_op_n(IOObject io_object, IOContext io_context, IOOp io_op, uint32 cnt)
|
||||
{
|
||||
Assert((unsigned int) io_object < IOOBJECT_NUM_TYPES);
|
||||
Assert((unsigned int) io_context < IOCONTEXT_NUM_TYPES);
|
||||
Assert((unsigned int) io_op < IOOP_NUM_TYPES);
|
||||
Assert(pgstat_tracks_io_op(MyBackendType, io_object, io_context, io_op));
|
||||
|
||||
PendingIOStats.data[io_object][io_context][io_op]++;
|
||||
PendingIOStats.data[io_object][io_context][io_op] += cnt;
|
||||
|
||||
have_iostats = true;
|
||||
}
|
||||
|
|
|
@ -55,10 +55,12 @@ provider postgresql {
|
|||
probe sort__start(int, bool, int, int, bool, int);
|
||||
probe sort__done(bool, long);
|
||||
|
||||
probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
|
||||
probe buffer__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool, bool);
|
||||
probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int);
|
||||
probe buffer__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
|
||||
probe buffer__flush__start(ForkNumber, BlockNumber, Oid, Oid, Oid);
|
||||
probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid);
|
||||
probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
|
||||
probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
|
||||
|
||||
probe buffer__checkpoint__start(int);
|
||||
probe buffer__checkpoint__sync__start();
|
||||
|
|
|
@ -516,6 +516,7 @@ extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);
|
|||
extern bool pgstat_bktype_io_stats_valid(PgStat_BktypeIO *context_ops,
|
||||
BackendType bktype);
|
||||
extern void pgstat_count_io_op(IOObject io_object, IOContext io_context, IOOp io_op);
|
||||
extern void pgstat_count_io_op_n(IOObject io_object, IOContext io_context, IOOp io_op, uint32 cnt);
|
||||
extern PgStat_IO *pgstat_fetch_stat_io(void);
|
||||
extern const char *pgstat_get_io_context_name(IOContext io_context);
|
||||
extern const char *pgstat_get_io_object_name(IOObject io_object);
|
||||
|
|
|
@ -422,6 +422,13 @@ extern PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr,
|
|||
BlockNumber blockNum);
|
||||
extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
|
||||
BlockNumber blockNum, bool *foundPtr);
|
||||
extern BlockNumber ExtendBufferedRelLocal(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
BlockNumber extend_upto,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by);
|
||||
extern void MarkLocalBufferDirty(Buffer buffer);
|
||||
extern void DropRelationLocalBuffers(RelFileLocator rlocator,
|
||||
ForkNumber forkNum,
|
||||
|
|
|
@ -60,6 +60,53 @@ typedef struct PrefetchBufferResult
|
|||
bool initiated_io; /* If true, a miss resulting in async I/O */
|
||||
} PrefetchBufferResult;
|
||||
|
||||
/*
|
||||
* Flags influencing the behaviour of ExtendBufferedRel*
|
||||
*/
|
||||
typedef enum ExtendBufferedFlags
|
||||
{
|
||||
/*
|
||||
* Don't acquire extension lock. This is safe only if the relation isn't
|
||||
* shared, an access exclusive lock is held or if this is the startup
|
||||
* process.
|
||||
*/
|
||||
EB_SKIP_EXTENSION_LOCK = (1 << 0),
|
||||
|
||||
/* Is this extension part of recovery? */
|
||||
EB_PERFORMING_RECOVERY = (1 << 1),
|
||||
|
||||
/*
|
||||
* Should the fork be created if it does not currently exist? This likely
|
||||
* only ever makes sense for relation forks.
|
||||
*/
|
||||
EB_CREATE_FORK_IF_NEEDED = (1 << 2),
|
||||
|
||||
/* Should the first (possibly only) return buffer be returned locked? */
|
||||
EB_LOCK_FIRST = (1 << 3),
|
||||
|
||||
/* Should the smgr size cache be cleared? */
|
||||
EB_CLEAR_SIZE_CACHE = (1 << 4),
|
||||
|
||||
/* internal flags follow */
|
||||
EB_LOCK_TARGET = (1 << 5),
|
||||
} ExtendBufferedFlags;
|
||||
|
||||
/*
|
||||
* To identify the relation - either relation or smgr + relpersistence has to
|
||||
* be specified. Used via the EB_REL()/EB_SMGR() macros below. This allows us
|
||||
* to use the same function for both crash recovery and normal operation.
|
||||
*/
|
||||
typedef struct ExtendBufferedWhat
|
||||
{
|
||||
Relation rel;
|
||||
struct SMgrRelationData *smgr;
|
||||
char relpersistence;
|
||||
} ExtendBufferedWhat;
|
||||
|
||||
#define EB_REL(p_rel) ((ExtendBufferedWhat){.rel = p_rel})
|
||||
#define EB_SMGR(p_smgr, p_relpersistence) ((ExtendBufferedWhat){.smgr = p_smgr, .relpersistence = p_relpersistence})
|
||||
|
||||
|
||||
/* forward declared, to avoid having to expose buf_internals.h here */
|
||||
struct WritebackContext;
|
||||
|
||||
|
@ -138,6 +185,24 @@ extern void CheckBufferIsPinnedOnce(Buffer buffer);
|
|||
extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
|
||||
BlockNumber blockNum);
|
||||
|
||||
extern Buffer ExtendBufferedRel(ExtendBufferedWhat eb,
|
||||
ForkNumber forkNum,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags);
|
||||
extern BlockNumber ExtendBufferedRelBy(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
uint32 extend_by,
|
||||
Buffer *buffers,
|
||||
uint32 *extended_by);
|
||||
extern Buffer ExtendBufferedRelTo(ExtendBufferedWhat eb,
|
||||
ForkNumber fork,
|
||||
BufferAccessStrategy strategy,
|
||||
uint32 flags,
|
||||
BlockNumber extend_to,
|
||||
ReadBufferMode mode);
|
||||
|
||||
extern void InitBufferPoolAccess(void);
|
||||
extern void AtEOXact_Buffers(bool isCommit);
|
||||
extern void PrintBufferLeakWarning(Buffer buffer);
|
||||
|
|
|
@ -708,6 +708,8 @@ ExprEvalRowtypeCache
|
|||
ExprEvalStep
|
||||
ExprState
|
||||
ExprStateEvalFunc
|
||||
ExtendBufferedFlags
|
||||
ExtendBufferedWhat
|
||||
ExtensibleNode
|
||||
ExtensibleNodeEntry
|
||||
ExtensibleNodeMethods
|
||||
|
|
Loading…
Reference in New Issue