Fix unfairness in all-cached parallel seq scan.
Commit b5a9b18c introduced block streaming infrastructure with a special fast path for all-cached scans, and commit b7b0f3f2 connected the infrastructure up to sequential scans. One of the fast path micro-optimizations had an unintended consequence: it interfered with parallel sequential scan's block range allocator (from commit 56788d21), which has its own ramp-up and ramp-down algorithm when handing out groups of pages to workers. A scan of an all-cached table could give extra blocks to one worker, when others had finished. In some plans (probably already very bad plans, such as the one reported by Alexander), the unfairness could be magnified. An internal buffer of 16 block numbers is removed, keeping just a single block buffer for technical reasons. Back-patch to 17. Reported-by: Alexander Lakhin <exclusion@gmail.com> Discussion: https://postgr.es/m/63a63690-dd92-c809-0b47-af05459e95d1%40gmail.com
This commit is contained in:
parent
ecd56459cf
commit
4effd0844d
@ -117,13 +117,10 @@ struct ReadStream
|
||||
bool advice_enabled;
|
||||
|
||||
/*
|
||||
* Small buffer of block numbers, useful for 'ungetting' to resolve flow
|
||||
* control problems when I/Os are split. Also useful for batch-loading
|
||||
* block numbers in the fast path.
|
||||
* One-block buffer to support 'ungetting' a block number, to resolve flow
|
||||
* control problems when I/Os are split.
|
||||
*/
|
||||
BlockNumber blocknums[16];
|
||||
int16 blocknums_count;
|
||||
int16 blocknums_next;
|
||||
BlockNumber buffered_blocknum;
|
||||
|
||||
/*
|
||||
* The callback that will tell us which block numbers to read, and an
|
||||
@ -167,23 +164,23 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
|
||||
}
|
||||
|
||||
/*
|
||||
* Ask the callback which block it would like us to read next, with a small
|
||||
* buffer in front to allow read_stream_unget_block() to work and to allow the
|
||||
* fast path to skip this function and work directly from the array.
|
||||
* Ask the callback which block it would like us to read next, with a one block
|
||||
* buffer in front to allow read_stream_unget_block() to work.
|
||||
*/
|
||||
static inline BlockNumber
|
||||
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
|
||||
{
|
||||
if (stream->blocknums_next < stream->blocknums_count)
|
||||
return stream->blocknums[stream->blocknums_next++];
|
||||
BlockNumber blocknum;
|
||||
|
||||
/*
|
||||
* We only bother to fetch one at a time here (but see the fast path which
|
||||
* uses more).
|
||||
*/
|
||||
return stream->callback(stream,
|
||||
stream->callback_private_data,
|
||||
per_buffer_data);
|
||||
blocknum = stream->buffered_blocknum;
|
||||
if (blocknum != InvalidBlockNumber)
|
||||
stream->buffered_blocknum = InvalidBlockNumber;
|
||||
else
|
||||
blocknum = stream->callback(stream,
|
||||
stream->callback_private_data,
|
||||
per_buffer_data);
|
||||
|
||||
return blocknum;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -193,42 +190,12 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data)
|
||||
static inline void
|
||||
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
|
||||
{
|
||||
if (stream->blocknums_next == stream->blocknums_count)
|
||||
{
|
||||
/* Never initialized or entirely consumed. Re-initialize. */
|
||||
stream->blocknums[0] = blocknum;
|
||||
stream->blocknums_count = 1;
|
||||
stream->blocknums_next = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Must be the last value return from blocknums array. */
|
||||
Assert(stream->blocknums_next > 0);
|
||||
stream->blocknums_next--;
|
||||
Assert(stream->blocknums[stream->blocknums_next] == blocknum);
|
||||
}
|
||||
/* We shouldn't ever unget more than one block. */
|
||||
Assert(stream->buffered_blocknum == InvalidBlockNumber);
|
||||
Assert(blocknum != InvalidBlockNumber);
|
||||
stream->buffered_blocknum = blocknum;
|
||||
}
|
||||
|
||||
#ifndef READ_STREAM_DISABLE_FAST_PATH
|
||||
static void
|
||||
read_stream_fill_blocknums(ReadStream *stream)
|
||||
{
|
||||
BlockNumber blocknum;
|
||||
int i = 0;
|
||||
|
||||
do
|
||||
{
|
||||
blocknum = stream->callback(stream,
|
||||
stream->callback_private_data,
|
||||
NULL);
|
||||
stream->blocknums[i++] = blocknum;
|
||||
} while (i < lengthof(stream->blocknums) &&
|
||||
blocknum != InvalidBlockNumber);
|
||||
stream->blocknums_count = i;
|
||||
stream->blocknums_next = 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static void
|
||||
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
|
||||
{
|
||||
@ -530,6 +497,7 @@ read_stream_begin_impl(int flags,
|
||||
stream->queue_size = queue_size;
|
||||
stream->callback = callback;
|
||||
stream->callback_private_data = callback_private_data;
|
||||
stream->buffered_blocknum = InvalidBlockNumber;
|
||||
|
||||
/*
|
||||
* Skip the initial ramp-up phase if the caller says we're going to be
|
||||
@ -649,9 +617,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
||||
Assert(buffer != InvalidBuffer);
|
||||
|
||||
/* Choose the next block to pin. */
|
||||
if (unlikely(stream->blocknums_next == stream->blocknums_count))
|
||||
read_stream_fill_blocknums(stream);
|
||||
next_blocknum = stream->blocknums[stream->blocknums_next++];
|
||||
next_blocknum = read_stream_get_block(stream, NULL);
|
||||
|
||||
if (likely(next_blocknum != InvalidBlockNumber))
|
||||
{
|
||||
@ -827,9 +793,8 @@ read_stream_reset(ReadStream *stream)
|
||||
/* Stop looking ahead. */
|
||||
stream->distance = 0;
|
||||
|
||||
/* Forget buffered block numbers and fast path state. */
|
||||
stream->blocknums_next = 0;
|
||||
stream->blocknums_count = 0;
|
||||
/* Forget buffered block number and fast path state. */
|
||||
stream->buffered_blocknum = InvalidBlockNumber;
|
||||
stream->fast_path = false;
|
||||
|
||||
/* Unpin anything that wasn't consumed. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user