Use streaming I/O in sequential scans.

Instead of calling ReadBuffer() for each block, heap sequential scans
and TID range scans now use the streaming API introduced in b5a9b18cd0.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com
This commit is contained in:
Thomas Munro 2024-04-08 01:48:27 +12:00
parent 6ed83d5fa5
commit b7b0f3f272
2 changed files with 180 additions and 76 deletions

View File

@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
* ----------------------------------------------------------------
*/
/*
* Streaming read API callback for parallel sequential scans. Returns the next
* block the caller wants from the read stream or InvalidBlockNumber when done.
*/
static BlockNumber
heap_scan_stream_read_next_parallel(ReadStream *stream,
void *callback_private_data,
void *per_buffer_data)
{
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
Assert(ScanDirectionIsForward(scan->rs_dir));
Assert(scan->rs_base.rs_parallel);
if (unlikely(!scan->rs_inited))
{
/* parallel scan */
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata,
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
/* may return InvalidBlockNumber if there are no more blocks */
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata,
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
scan->rs_inited = true;
}
else
{
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
scan->rs_base.rs_parallel);
}
return scan->rs_prefetch_block;
}
/*
* Streaming read API callback for serial sequential and TID range scans.
* Returns the next block the caller wants from the read stream or
* InvalidBlockNumber when done.
*/
static BlockNumber
heap_scan_stream_read_next_serial(ReadStream *stream,
void *callback_private_data,
void *per_buffer_data)
{
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
if (unlikely(!scan->rs_inited))
{
scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
scan->rs_inited = true;
}
else
scan->rs_prefetch_block = heapgettup_advance_block(scan,
scan->rs_prefetch_block,
scan->rs_dir);
return scan->rs_prefetch_block;
}
/* ----------------
* initscan - scan code common to heap_beginscan and heap_rescan
* ----------------
@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
/*
* Initialize to ForwardScanDirection because it is most common and
* because heap scans go forward before going backward (e.g. CURSORs).
*/
scan->rs_dir = ForwardScanDirection;
scan->rs_prefetch_block = InvalidBlockNumber;
/* page-at-a-time fields are always invalid when not rs_inited */
/*
@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
/*
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
*
* Read the next block of the scan relation into a buffer and pin that buffer
* before saving it in the scan descriptor.
* Read the next block of the scan relation from the read stream and save it
* in the scan descriptor. It is already pinned.
*/
static inline void
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
{
Assert(scan->rs_read_stream);
/* release previous scan buffer, if any */
if (BufferIsValid(scan->rs_cbuf))
{
@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
*/
CHECK_FOR_INTERRUPTS();
if (unlikely(!scan->rs_inited))
/*
* If the scan direction is changing, reset the prefetch block to the
* current block. Otherwise, we will incorrectly prefetch the blocks
* between the prefetch block and the current block again before
* prefetching blocks in the new, correct scan direction.
*/
if (unlikely(scan->rs_dir != dir))
{
scan->rs_cblock = heapgettup_initial_block(scan, dir);
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
Assert(scan->rs_cblock != InvalidBlockNumber ||
!BufferIsValid(scan->rs_cbuf));
scan->rs_inited = true;
scan->rs_prefetch_block = scan->rs_cblock;
read_stream_reset(scan->rs_read_stream);
}
else
scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
dir);
/* read block if valid */
if (BlockNumberIsValid(scan->rs_cblock))
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
scan->rs_cblock, RBM_NORMAL,
scan->rs_strategy);
scan->rs_dir = dir;
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
if (BufferIsValid(scan->rs_cbuf))
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
}
/*
@ -560,6 +629,7 @@ static pg_noinline BlockNumber
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
{
Assert(!scan->rs_inited);
Assert(scan->rs_base.rs_parallel == NULL);
/* When there are no pages to scan, return InvalidBlockNumber */
if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
if (ScanDirectionIsForward(dir))
{
/* serial scan */
if (scan->rs_base.rs_parallel == NULL)
return scan->rs_startblock;
else
{
/* parallel scan */
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata,
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
/* may return InvalidBlockNumber if there are no more blocks */
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata,
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
}
return scan->rs_startblock;
}
else
{
/* backward parallel scan not supported */
Assert(scan->rs_base.rs_parallel == NULL);
/*
* Disable reporting to syncscan logic in a backwards scan; it's not
* very likely anyone else is doing the same thing at the same time,
@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
static inline BlockNumber
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
{
if (ScanDirectionIsForward(dir))
Assert(scan->rs_base.rs_parallel == NULL);
if (likely(ScanDirectionIsForward(dir)))
{
if (scan->rs_base.rs_parallel == NULL)
block++;
/* wrap back to the start of the heap */
if (block >= scan->rs_nblocks)
block = 0;
/*
* Report our new scan position for synchronization purposes. We don't
* do that when moving backwards, however. That would just mess up any
* other forward-moving scanners.
*
* Note: we do this before checking for end of scan so that the final
* state of the position hint is back at the start of the rel. That's
* not strictly necessary, but otherwise when you run the same query
* multiple times the starting position would shift a little bit
* backwards on every invocation, which is confusing. We don't
* guarantee any specific ordering in general, though.
*/
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
ss_report_location(scan->rs_base.rs_rd, block);
/* we're done if we're back at where we started */
if (block == scan->rs_startblock)
return InvalidBlockNumber;
/* check if the limit imposed by heap_setscanlimits() is met */
if (scan->rs_numblocks != InvalidBlockNumber)
{
block++;
/* wrap back to the start of the heap */
if (block >= scan->rs_nblocks)
block = 0;
/*
* Report our new scan position for synchronization purposes. We
* don't do that when moving backwards, however. That would just
* mess up any other forward-moving scanners.
*
* Note: we do this before checking for end of scan so that the
* final state of the position hint is back at the start of the
* rel. That's not strictly necessary, but otherwise when you run
* the same query multiple times the starting position would shift
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
ss_report_location(scan->rs_base.rs_rd, block);
/* we're done if we're back at where we started */
if (block == scan->rs_startblock)
if (--scan->rs_numblocks == 0)
return InvalidBlockNumber;
/* check if the limit imposed by heap_setscanlimits() is met */
if (scan->rs_numblocks != InvalidBlockNumber)
{
if (--scan->rs_numblocks == 0)
return InvalidBlockNumber;
}
return block;
}
else
{
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
scan->rs_base.rs_parallel);
}
return block;
}
else
{
@ -879,6 +925,7 @@ continue_page:
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
scan->rs_prefetch_block = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
}
@ -974,6 +1021,7 @@ continue_page:
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
scan->rs_prefetch_block = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
}
@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
initscan(scan, key, false);
scan->rs_read_stream = NULL;
/*
* Set up a read stream for sequential scans and TID range scans. This
* should be done after initscan() because initscan() allocates the
* BufferAccessStrategy object passed to the streaming read API.
*/
if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
{
ReadStreamBlockNumberCB cb;
if (scan->rs_base.rs_parallel)
cb = heap_scan_stream_read_next_parallel;
else
cb = heap_scan_stream_read_next_serial;
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
scan->rs_strategy,
scan->rs_base.rs_rd,
MAIN_FORKNUM,
cb,
scan,
0);
}
return (TableScanDesc) scan;
}
@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
Assert(scan->rs_empty_tuples_pending == 0);
/*
* The read stream is reset on rescan. This must be done before
* initscan(), as some state referred to by read_stream_reset() is reset
* in initscan().
*/
if (scan->rs_read_stream)
read_stream_reset(scan->rs_read_stream);
/*
* reinitialize scan descriptor
*/
@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
Assert(scan->rs_empty_tuples_pending == 0);
/*
* Must free the read stream before freeing the BufferAccessStrategy.
*/
if (scan->rs_read_stream)
read_stream_end(scan->rs_read_stream);
/*
* decrement relation reference count and free scan descriptor storage
*/

View File

@ -25,6 +25,7 @@
#include "storage/bufpage.h"
#include "storage/dsm.h"
#include "storage/lockdefs.h"
#include "storage/read_stream.h"
#include "storage/shm_toc.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
@ -70,6 +71,20 @@ typedef struct HeapScanDescData
HeapTupleData rs_ctup; /* current tuple in scan, if any */
/* For scans that stream reads */
ReadStream *rs_read_stream;
/*
* For sequential scans and TID range scans to stream reads. The read
* stream is allocated at the beginning of the scan and reset on rescan or
* when the scan direction changes. The scan direction is saved each time
* a new page is requested. If the scan direction changes from one page to
* the next, the read stream releases all previously pinned buffers and
* resets the prefetch block.
*/
ScanDirection rs_dir;
BlockNumber rs_prefetch_block;
/*
* For parallel scans to store page allocation data. NULL when not
* performing a parallel scan.