From 4effd0844daf41a962c05188b719df75ca8ced93 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sat, 31 Aug 2024 17:27:38 +1200 Subject: [PATCH] Fix unfairness in all-cached parallel seq scan. Commit b5a9b18c introduced block streaming infrastructure with a special fast path for all-cached scans, and commit b7b0f3f2 connected the infrastructure up to sequential scans. One of the fast path micro-optimizations had an unintended consequence: it interfered with parallel sequential scan's block range allocator (from commit 56788d21), which has its own ramp-up and ramp-down algorithm when handing out groups of pages to workers. A scan of an all-cached table could give extra blocks to one worker, when others had finished. In some plans (probably already very bad plans, such as the one reported by Alexander), the unfairness could be magnified. An internal buffer of 16 block numbers is removed, keeping just a one-block buffer so that a block number can still be 'ungot' to resolve flow control problems when I/Os are split. Back-patch to 17. Reported-by: Alexander Lakhin Discussion: https://postgr.es/m/63a63690-dd92-c809-0b47-af05459e95d1%40gmail.com --- src/backend/storage/aio/read_stream.c | 81 ++++++++------------------- 1 file changed, 23 insertions(+), 58 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index a83c18c2a4..93cdd35fea 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -117,13 +117,10 @@ struct ReadStream bool advice_enabled; /* - * Small buffer of block numbers, useful for 'ungetting' to resolve flow - * control problems when I/Os are split. Also useful for batch-loading - * block numbers in the fast path. + * One-block buffer to support 'ungetting' a block number, to resolve flow + * control problems when I/Os are split. 
*/ - BlockNumber blocknums[16]; - int16 blocknums_count; - int16 blocknums_next; + BlockNumber buffered_blocknum; /* * The callback that will tell us which block numbers to read, and an @@ -167,23 +164,23 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index) } /* - * Ask the callback which block it would like us to read next, with a small - * buffer in front to allow read_stream_unget_block() to work and to allow the - * fast path to skip this function and work directly from the array. + * Ask the callback which block it would like us to read next, with a one block + * buffer in front to allow read_stream_unget_block() to work. */ static inline BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data) { - if (stream->blocknums_next < stream->blocknums_count) - return stream->blocknums[stream->blocknums_next++]; + BlockNumber blocknum; - /* - * We only bother to fetch one at a time here (but see the fast path which - * uses more). - */ - return stream->callback(stream, - stream->callback_private_data, - per_buffer_data); + blocknum = stream->buffered_blocknum; + if (blocknum != InvalidBlockNumber) + stream->buffered_blocknum = InvalidBlockNumber; + else + blocknum = stream->callback(stream, + stream->callback_private_data, + per_buffer_data); + + return blocknum; } /* @@ -193,42 +190,12 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data) static inline void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) { - if (stream->blocknums_next == stream->blocknums_count) - { - /* Never initialized or entirely consumed. Re-initialize. */ - stream->blocknums[0] = blocknum; - stream->blocknums_count = 1; - stream->blocknums_next = 0; - } - else - { - /* Must be the last value return from blocknums array. */ - Assert(stream->blocknums_next > 0); - stream->blocknums_next--; - Assert(stream->blocknums[stream->blocknums_next] == blocknum); - } + /* We shouldn't ever unget more than one block. 
*/ + Assert(stream->buffered_blocknum == InvalidBlockNumber); + Assert(blocknum != InvalidBlockNumber); + stream->buffered_blocknum = blocknum; } -#ifndef READ_STREAM_DISABLE_FAST_PATH -static void -read_stream_fill_blocknums(ReadStream *stream) -{ - BlockNumber blocknum; - int i = 0; - - do - { - blocknum = stream->callback(stream, - stream->callback_private_data, - NULL); - stream->blocknums[i++] = blocknum; - } while (i < lengthof(stream->blocknums) && - blocknum != InvalidBlockNumber); - stream->blocknums_count = i; - stream->blocknums_next = 0; -} -#endif - static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { @@ -530,6 +497,7 @@ read_stream_begin_impl(int flags, stream->queue_size = queue_size; stream->callback = callback; stream->callback_private_data = callback_private_data; + stream->buffered_blocknum = InvalidBlockNumber; /* * Skip the initial ramp-up phase if the caller says we're going to be @@ -649,9 +617,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(buffer != InvalidBuffer); /* Choose the next block to pin. */ - if (unlikely(stream->blocknums_next == stream->blocknums_count)) - read_stream_fill_blocknums(stream); - next_blocknum = stream->blocknums[stream->blocknums_next++]; + next_blocknum = read_stream_get_block(stream, NULL); if (likely(next_blocknum != InvalidBlockNumber)) { @@ -827,9 +793,8 @@ read_stream_reset(ReadStream *stream) /* Stop looking ahead. */ stream->distance = 0; - /* Forget buffered block numbers and fast path state. */ - stream->blocknums_next = 0; - stream->blocknums_count = 0; + /* Forget buffered block number and fast path state. */ + stream->buffered_blocknum = InvalidBlockNumber; stream->fast_path = false; /* Unpin anything that wasn't consumed. */