mirror of https://github.com/postgres/postgres
Use streaming I/O in sequential scans.
Instead of calling ReadBuffer() for each block, heap sequential scans
and TID range scans now use the streaming API introduced in b5a9b18cd0.
Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com
This commit is contained in:
parent
6ed83d5fa5
commit
b7b0f3f272
|
@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
|
|||
* ----------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Streaming read API callback for parallel sequential scans. Returns the next
|
||||
* block the caller wants from the read stream or InvalidBlockNumber when done.
|
||||
*/
|
||||
static BlockNumber
|
||||
heap_scan_stream_read_next_parallel(ReadStream *stream,
|
||||
void *callback_private_data,
|
||||
void *per_buffer_data)
|
||||
{
|
||||
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
|
||||
|
||||
Assert(ScanDirectionIsForward(scan->rs_dir));
|
||||
Assert(scan->rs_base.rs_parallel);
|
||||
|
||||
if (unlikely(!scan->rs_inited))
|
||||
{
|
||||
/* parallel scan */
|
||||
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata,
|
||||
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
|
||||
|
||||
/* may return InvalidBlockNumber if there are no more blocks */
|
||||
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata,
|
||||
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
|
||||
scan->rs_inited = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
|
||||
scan->rs_base.rs_parallel);
|
||||
}
|
||||
|
||||
return scan->rs_prefetch_block;
|
||||
}
|
||||
|
||||
/*
|
||||
* Streaming read API callback for serial sequential and TID range scans.
|
||||
* Returns the next block the caller wants from the read stream or
|
||||
* InvalidBlockNumber when done.
|
||||
*/
|
||||
static BlockNumber
|
||||
heap_scan_stream_read_next_serial(ReadStream *stream,
|
||||
void *callback_private_data,
|
||||
void *per_buffer_data)
|
||||
{
|
||||
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
|
||||
|
||||
if (unlikely(!scan->rs_inited))
|
||||
{
|
||||
scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
|
||||
scan->rs_inited = true;
|
||||
}
|
||||
else
|
||||
scan->rs_prefetch_block = heapgettup_advance_block(scan,
|
||||
scan->rs_prefetch_block,
|
||||
scan->rs_dir);
|
||||
|
||||
return scan->rs_prefetch_block;
|
||||
}
|
||||
|
||||
/* ----------------
|
||||
* initscan - scan code common to heap_beginscan and heap_rescan
|
||||
* ----------------
|
||||
|
@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
|
|||
scan->rs_cbuf = InvalidBuffer;
|
||||
scan->rs_cblock = InvalidBlockNumber;
|
||||
|
||||
/*
|
||||
* Initialize to ForwardScanDirection because it is most common and
|
||||
* because heap scans go forward before going backward (e.g. CURSORs).
|
||||
*/
|
||||
scan->rs_dir = ForwardScanDirection;
|
||||
scan->rs_prefetch_block = InvalidBlockNumber;
|
||||
|
||||
/* page-at-a-time fields are always invalid when not rs_inited */
|
||||
|
||||
/*
|
||||
|
@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
|
|||
/*
|
||||
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
|
||||
*
|
||||
* Read the next block of the scan relation into a buffer and pin that buffer
|
||||
* before saving it in the scan descriptor.
|
||||
* Read the next block of the scan relation from the read stream and save it
|
||||
* in the scan descriptor. It is already pinned.
|
||||
*/
|
||||
static inline void
|
||||
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
Assert(scan->rs_read_stream);
|
||||
|
||||
/* release previous scan buffer, if any */
|
||||
if (BufferIsValid(scan->rs_cbuf))
|
||||
{
|
||||
|
@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
|
|||
*/
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (unlikely(!scan->rs_inited))
|
||||
/*
|
||||
* If the scan direction is changing, reset the prefetch block to the
|
||||
* current block. Otherwise, we will incorrectly prefetch the blocks
|
||||
* between the prefetch block and the current block again before
|
||||
* prefetching blocks in the new, correct scan direction.
|
||||
*/
|
||||
if (unlikely(scan->rs_dir != dir))
|
||||
{
|
||||
scan->rs_cblock = heapgettup_initial_block(scan, dir);
|
||||
|
||||
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
|
||||
Assert(scan->rs_cblock != InvalidBlockNumber ||
|
||||
!BufferIsValid(scan->rs_cbuf));
|
||||
|
||||
scan->rs_inited = true;
|
||||
scan->rs_prefetch_block = scan->rs_cblock;
|
||||
read_stream_reset(scan->rs_read_stream);
|
||||
}
|
||||
else
|
||||
scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
|
||||
dir);
|
||||
|
||||
/* read block if valid */
|
||||
if (BlockNumberIsValid(scan->rs_cblock))
|
||||
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
|
||||
scan->rs_cblock, RBM_NORMAL,
|
||||
scan->rs_strategy);
|
||||
scan->rs_dir = dir;
|
||||
|
||||
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
|
||||
if (BufferIsValid(scan->rs_cbuf))
|
||||
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -560,6 +629,7 @@ static pg_noinline BlockNumber
|
|||
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
Assert(!scan->rs_inited);
|
||||
Assert(scan->rs_base.rs_parallel == NULL);
|
||||
|
||||
/* When there are no pages to scan, return InvalidBlockNumber */
|
||||
if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
|
||||
|
@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
|
|||
|
||||
if (ScanDirectionIsForward(dir))
|
||||
{
|
||||
/* serial scan */
|
||||
if (scan->rs_base.rs_parallel == NULL)
|
||||
return scan->rs_startblock;
|
||||
else
|
||||
{
|
||||
/* parallel scan */
|
||||
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata,
|
||||
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
|
||||
|
||||
/* may return InvalidBlockNumber if there are no more blocks */
|
||||
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata,
|
||||
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
|
||||
}
|
||||
return scan->rs_startblock;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* backward parallel scan not supported */
|
||||
Assert(scan->rs_base.rs_parallel == NULL);
|
||||
|
||||
/*
|
||||
* Disable reporting to syncscan logic in a backwards scan; it's not
|
||||
* very likely anyone else is doing the same thing at the same time,
|
||||
|
@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
|
|||
static inline BlockNumber
|
||||
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
|
||||
{
|
||||
if (ScanDirectionIsForward(dir))
|
||||
Assert(scan->rs_base.rs_parallel == NULL);
|
||||
|
||||
if (likely(ScanDirectionIsForward(dir)))
|
||||
{
|
||||
if (scan->rs_base.rs_parallel == NULL)
|
||||
block++;
|
||||
|
||||
/* wrap back to the start of the heap */
|
||||
if (block >= scan->rs_nblocks)
|
||||
block = 0;
|
||||
|
||||
/*
|
||||
* Report our new scan position for synchronization purposes. We don't
|
||||
* do that when moving backwards, however. That would just mess up any
|
||||
* other forward-moving scanners.
|
||||
*
|
||||
* Note: we do this before checking for end of scan so that the final
|
||||
* state of the position hint is back at the start of the rel. That's
|
||||
* not strictly necessary, but otherwise when you run the same query
|
||||
* multiple times the starting position would shift a little bit
|
||||
* backwards on every invocation, which is confusing. We don't
|
||||
* guarantee any specific ordering in general, though.
|
||||
*/
|
||||
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
|
||||
ss_report_location(scan->rs_base.rs_rd, block);
|
||||
|
||||
/* we're done if we're back at where we started */
|
||||
if (block == scan->rs_startblock)
|
||||
return InvalidBlockNumber;
|
||||
|
||||
/* check if the limit imposed by heap_setscanlimits() is met */
|
||||
if (scan->rs_numblocks != InvalidBlockNumber)
|
||||
{
|
||||
block++;
|
||||
|
||||
/* wrap back to the start of the heap */
|
||||
if (block >= scan->rs_nblocks)
|
||||
block = 0;
|
||||
|
||||
/*
|
||||
* Report our new scan position for synchronization purposes. We
|
||||
* don't do that when moving backwards, however. That would just
|
||||
* mess up any other forward-moving scanners.
|
||||
*
|
||||
* Note: we do this before checking for end of scan so that the
|
||||
* final state of the position hint is back at the start of the
|
||||
* rel. That's not strictly necessary, but otherwise when you run
|
||||
* the same query multiple times the starting position would shift
|
||||
* a little bit backwards on every invocation, which is confusing.
|
||||
* We don't guarantee any specific ordering in general, though.
|
||||
*/
|
||||
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
|
||||
ss_report_location(scan->rs_base.rs_rd, block);
|
||||
|
||||
/* we're done if we're back at where we started */
|
||||
if (block == scan->rs_startblock)
|
||||
if (--scan->rs_numblocks == 0)
|
||||
return InvalidBlockNumber;
|
||||
|
||||
/* check if the limit imposed by heap_setscanlimits() is met */
|
||||
if (scan->rs_numblocks != InvalidBlockNumber)
|
||||
{
|
||||
if (--scan->rs_numblocks == 0)
|
||||
return InvalidBlockNumber;
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
else
|
||||
{
|
||||
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
|
||||
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
|
||||
scan->rs_base.rs_parallel);
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -879,6 +925,7 @@ continue_page:
|
|||
|
||||
scan->rs_cbuf = InvalidBuffer;
|
||||
scan->rs_cblock = InvalidBlockNumber;
|
||||
scan->rs_prefetch_block = InvalidBlockNumber;
|
||||
tuple->t_data = NULL;
|
||||
scan->rs_inited = false;
|
||||
}
|
||||
|
@ -974,6 +1021,7 @@ continue_page:
|
|||
ReleaseBuffer(scan->rs_cbuf);
|
||||
scan->rs_cbuf = InvalidBuffer;
|
||||
scan->rs_cblock = InvalidBlockNumber;
|
||||
scan->rs_prefetch_block = InvalidBlockNumber;
|
||||
tuple->t_data = NULL;
|
||||
scan->rs_inited = false;
|
||||
}
|
||||
|
@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
|
|||
|
||||
initscan(scan, key, false);
|
||||
|
||||
scan->rs_read_stream = NULL;
|
||||
|
||||
/*
|
||||
* Set up a read stream for sequential scans and TID range scans. This
|
||||
* should be done after initscan() because initscan() allocates the
|
||||
* BufferAccessStrategy object passed to the streaming read API.
|
||||
*/
|
||||
if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
|
||||
scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
|
||||
{
|
||||
ReadStreamBlockNumberCB cb;
|
||||
|
||||
if (scan->rs_base.rs_parallel)
|
||||
cb = heap_scan_stream_read_next_parallel;
|
||||
else
|
||||
cb = heap_scan_stream_read_next_serial;
|
||||
|
||||
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
|
||||
scan->rs_strategy,
|
||||
scan->rs_base.rs_rd,
|
||||
MAIN_FORKNUM,
|
||||
cb,
|
||||
scan,
|
||||
0);
|
||||
}
|
||||
|
||||
|
||||
return (TableScanDesc) scan;
|
||||
}
|
||||
|
||||
|
@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
|
|||
|
||||
Assert(scan->rs_empty_tuples_pending == 0);
|
||||
|
||||
/*
|
||||
* The read stream is reset on rescan. This must be done before
|
||||
* initscan(), as some state referred to by read_stream_reset() is reset
|
||||
* in initscan().
|
||||
*/
|
||||
if (scan->rs_read_stream)
|
||||
read_stream_reset(scan->rs_read_stream);
|
||||
|
||||
/*
|
||||
* reinitialize scan descriptor
|
||||
*/
|
||||
|
@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
|
|||
|
||||
Assert(scan->rs_empty_tuples_pending == 0);
|
||||
|
||||
/*
|
||||
* Must free the read stream before freeing the BufferAccessStrategy.
|
||||
*/
|
||||
if (scan->rs_read_stream)
|
||||
read_stream_end(scan->rs_read_stream);
|
||||
|
||||
/*
|
||||
* decrement relation reference count and free scan descriptor storage
|
||||
*/
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "storage/bufpage.h"
|
||||
#include "storage/dsm.h"
|
||||
#include "storage/lockdefs.h"
|
||||
#include "storage/read_stream.h"
|
||||
#include "storage/shm_toc.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/snapshot.h"
|
||||
|
@ -70,6 +71,20 @@ typedef struct HeapScanDescData
|
|||
|
||||
HeapTupleData rs_ctup; /* current tuple in scan, if any */
|
||||
|
||||
/* For scans that stream reads */
|
||||
ReadStream *rs_read_stream;
|
||||
|
||||
/*
|
||||
* For sequential scans and TID range scans to stream reads. The read
|
||||
* stream is allocated at the beginning of the scan and reset on rescan or
|
||||
* when the scan direction changes. The scan direction is saved each time
|
||||
* a new page is requested. If the scan direction changes from one page to
|
||||
* the next, the read stream releases all previously pinned buffers and
|
||||
* resets the prefetch block.
|
||||
*/
|
||||
ScanDirection rs_dir;
|
||||
BlockNumber rs_prefetch_block;
|
||||
|
||||
/*
|
||||
* For parallel scans to store page allocation data. NULL when not
|
||||
* performing a parallel scan.
|
||||
|
|
Loading…
Reference in New Issue