mirror of https://github.com/postgres/postgres
Improve eviction algorithm in ReorderBuffer using max-heap for many subtransactions.
Previously, when selecting the transaction to evict during logical decoding, we check all transactions to find the largest transaction. This could lead to a significant replication lag especially in the case where there are many subtransactions. This commit improves the eviction algorithm in ReorderBuffer using the max-heap with transaction size as the key to efficiently find the largest transaction. The max-heap starts with empty. While the max-heap is empty, we don't do anything for the max-heap when updating the memory counter. Therefore, we get the largest transaction in O(N) time, where N is the number of transactions including top-level transactions and subtransactions. We build the max-heap just before selecting the largest transactions if the number of transactions being decoded is higher than the threshold, MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also update the max-heap when updating the memory counter. The intention is to efficiently find the largest transaction in O(1) time instead of incurring the cost of memory counter updates (O(log N)). Once the number of transactions got lower than the threshold, we reset the max-heap. The performance benchmark results showed significant speed up (more than x30 speed up on my machine) in decoding a transaction with 100k subtransactions, whereas there is no visible overhead in other cases. Reviewed-by: Amit Kapila, Hayato Kuroda, Vignesh C, Ajin Cherian, Tomas Vondra, Shubham Khanna, Peter Smith, Álvaro Herrera, Euler Taveira Discussion: https://postgr.es/m/CAD21AoAfKTgrBrLq96GcTv9d6k97zaQcDM-rxfKEt4GSe0qnaQ%40mail.gmail.com
This commit is contained in:
parent
7487044d6c
commit
5bec1d6bc5
|
@ -67,6 +67,21 @@
|
|||
* allocator, evicting the oldest changes would make it more likely the
|
||||
* memory gets actually freed.
|
||||
*
|
||||
* We use a max-heap with transaction size as the key to efficiently find
|
||||
* the largest transaction. While the max-heap is empty, we don't update
|
||||
* the max-heap when updating the memory counter. Therefore, we can get
|
||||
* the largest transaction in O(N) time, where N is the number of
|
||||
* transactions including top-level transactions and subtransactions.
|
||||
*
|
||||
* We build the max-heap just before selecting the largest transactions
|
||||
* if the number of transactions being decoded is higher than the threshold,
|
||||
* MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
|
||||
* update the max-heap when updating the memory counter. The intention is
|
||||
* to efficiently find the largest transaction in O(1) time instead of
|
||||
* incurring the cost of memory counter updates (O(log N)). Once the number
|
||||
* of transactions got lower than the threshold, we reset the max-heap
|
||||
* (refer to ReorderBufferMaybeResetMaxHeap() for details).
|
||||
*
|
||||
* We still rely on max_changes_in_memory when loading serialized changes
|
||||
* back into memory. At that point we can't use the memory limit directly
|
||||
* as we load the subxacts independently. One option to deal with this
|
||||
|
@ -107,6 +122,22 @@
|
|||
#include "utils/rel.h"
|
||||
#include "utils/relfilenumbermap.h"
|
||||
|
||||
/*
|
||||
* Threshold of the total number of top-level and sub transactions that
|
||||
* controls whether we use the max-heap for tracking their sizes. Although
|
||||
* using the max-heap to select the largest transaction is effective when
|
||||
* there are many transactions being decoded, maintaining the max-heap while
|
||||
* updating the memory statistics can be costly. Therefore, we use
|
||||
* MaxConnections as the threshold so that we use the max-heap only when
|
||||
* using subtransactions.
|
||||
*/
|
||||
#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
|
||||
|
||||
/*
|
||||
* A macro to check if the max-heap is ready to use and needs to be updated
|
||||
* accordingly.
|
||||
*/
|
||||
#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
|
||||
|
||||
/* entry for a hash table we use to map from xid to our transaction state */
|
||||
typedef struct ReorderBufferTXNByIdEnt
|
||||
|
@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
|||
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
|
||||
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
|
||||
TransactionId xid, XLogSegNo segno);
|
||||
static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
|
||||
static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
|
||||
static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
|
||||
|
||||
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
|
||||
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
|
||||
|
@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
|
|||
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
|
||||
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
|
||||
ReorderBufferChange *change,
|
||||
ReorderBufferTXN *txn,
|
||||
bool addition, Size sz);
|
||||
|
||||
/*
|
||||
|
@ -355,6 +390,17 @@ ReorderBufferAllocate(void)
|
|||
buffer->outbufsize = 0;
|
||||
buffer->size = 0;
|
||||
|
||||
/*
|
||||
* The binaryheap is indexed for faster manipulations.
|
||||
*
|
||||
* We allocate the initial heap size greater than
|
||||
* MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
|
||||
* until the threshold is exceeded.
|
||||
*/
|
||||
buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
|
||||
ReorderBufferTXNSizeCompare,
|
||||
true, NULL);
|
||||
|
||||
buffer->spillTxns = 0;
|
||||
buffer->spillCount = 0;
|
||||
buffer->spillBytes = 0;
|
||||
|
@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
|
|||
{
|
||||
/* update memory accounting info */
|
||||
if (upd_mem)
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, false,
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
|
||||
ReorderBufferChangeSize(change));
|
||||
|
||||
/* free contained data */
|
||||
|
@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
|
|||
txn->nentries_mem++;
|
||||
|
||||
/* update memory accounting information */
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, true,
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
|
||||
ReorderBufferChangeSize(change));
|
||||
|
||||
/* process partial change */
|
||||
|
@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
|||
/* Check we're not mixing changes from different transactions. */
|
||||
Assert(change->txn == txn);
|
||||
|
||||
ReorderBufferReturnChange(rb, change, true);
|
||||
ReorderBufferReturnChange(rb, change, false);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1586,8 +1632,17 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
|||
if (rbtxn_is_serialized(txn))
|
||||
ReorderBufferRestoreCleanup(rb, txn);
|
||||
|
||||
/* Update the memory counter */
|
||||
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
|
||||
|
||||
/* deallocate */
|
||||
ReorderBufferReturnTXN(rb, txn);
|
||||
|
||||
/*
|
||||
* After cleaning up one transaction, the number of transactions might get
|
||||
* lower than the threshold for the max-heap.
|
||||
*/
|
||||
ReorderBufferMaybeResetMaxHeap(rb);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1637,9 +1692,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
|
|||
/* remove the change from it's containing list */
|
||||
dlist_delete(&change->node);
|
||||
|
||||
ReorderBufferReturnChange(rb, change, true);
|
||||
ReorderBufferReturnChange(rb, change, false);
|
||||
}
|
||||
|
||||
/* Update the memory counter */
|
||||
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
|
||||
|
||||
/*
|
||||
* Mark the transaction as streamed.
|
||||
*
|
||||
|
@ -3166,6 +3224,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
|
|||
* decide if we reached the memory limit, the transaction counter allows
|
||||
* us to quickly pick the largest transaction for eviction.
|
||||
*
|
||||
* Either txn or change must be non-NULL at least. We update the memory
|
||||
* counter of txn if it's non-NULL, otherwise change->txn.
|
||||
*
|
||||
* When streaming is enabled, we need to update the toplevel transaction
|
||||
* counters instead - we don't really care about subtransactions as we
|
||||
* can't stream them individually anyway, and we only pick toplevel
|
||||
|
@ -3174,22 +3235,27 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
|
|||
static void
|
||||
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
|
||||
ReorderBufferChange *change,
|
||||
ReorderBufferTXN *txn,
|
||||
bool addition, Size sz)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
ReorderBufferTXN *toptxn;
|
||||
|
||||
Assert(change->txn);
|
||||
Assert(txn || change);
|
||||
|
||||
/*
|
||||
* Ignore tuple CID changes, because those are not evicted when reaching
|
||||
* memory limit. So we just don't count them, because it might easily
|
||||
* trigger a pointless attempt to spill.
|
||||
*/
|
||||
if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
|
||||
if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
|
||||
return;
|
||||
|
||||
txn = change->txn;
|
||||
if (sz == 0)
|
||||
return;
|
||||
|
||||
if (txn == NULL)
|
||||
txn = change->txn;
|
||||
Assert(txn != NULL);
|
||||
|
||||
/*
|
||||
* Update the total size in top level as well. This is later used to
|
||||
|
@ -3204,6 +3270,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
|
|||
|
||||
/* Update the total size in the top transaction. */
|
||||
toptxn->total_size += sz;
|
||||
|
||||
/* Update the max-heap as well if necessary */
|
||||
if (ReorderBufferMaxHeapIsReady(rb))
|
||||
{
|
||||
if ((txn->size - sz) == 0)
|
||||
binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
|
||||
else
|
||||
binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -3213,6 +3288,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
|
|||
|
||||
/* Update the total size in the top transaction. */
|
||||
toptxn->total_size -= sz;
|
||||
|
||||
/* Update the max-heap as well if necessary */
|
||||
if (ReorderBufferMaxHeapIsReady(rb))
|
||||
{
|
||||
if (txn->size == 0)
|
||||
binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
|
||||
else
|
||||
binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
|
||||
}
|
||||
}
|
||||
|
||||
Assert(txn->size <= rb->size);
|
||||
|
@ -3468,34 +3552,123 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/* Compare two transactions by size */
|
||||
static int
|
||||
ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
|
||||
{
|
||||
ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
|
||||
ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
|
||||
|
||||
if (ta->size < tb->size)
|
||||
return -1;
|
||||
if (ta->size > tb->size)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find the largest transaction (toplevel or subxact) to evict (spill to disk).
|
||||
*
|
||||
* XXX With many subtransactions this might be quite slow, because we'll have
|
||||
* to walk through all of them. There are some options how we could improve
|
||||
* that: (a) maintain some secondary structure with transactions sorted by
|
||||
* amount of changes, (b) not looking for the entirely largest transaction,
|
||||
* but e.g. for transaction using at least some fraction of the memory limit,
|
||||
* and (c) evicting multiple transactions at once, e.g. to free a given portion
|
||||
* of the memory limit (e.g. 50%).
|
||||
* Build the max-heap. The heap assembly step is deferred until the end, for
|
||||
* efficiency.
|
||||
*/
|
||||
static ReorderBufferTXN *
|
||||
ReorderBufferLargestTXN(ReorderBuffer *rb)
|
||||
static void
|
||||
ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
ReorderBufferTXNByIdEnt *ent;
|
||||
ReorderBufferTXN *largest = NULL;
|
||||
|
||||
Assert(binaryheap_empty(rb->txn_heap));
|
||||
|
||||
hash_seq_init(&hash_seq, rb->by_txn);
|
||||
while ((ent = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
ReorderBufferTXN *txn = ent->txn;
|
||||
|
||||
/* if the current transaction is larger, remember it */
|
||||
if ((!largest) || (txn->size > largest->size))
|
||||
largest = txn;
|
||||
if (txn->size == 0)
|
||||
continue;
|
||||
|
||||
binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
|
||||
}
|
||||
|
||||
binaryheap_build(rb->txn_heap);
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset the max-heap if the number of transactions got lower than the
|
||||
* threshold.
|
||||
*/
|
||||
static void
|
||||
ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
|
||||
{
|
||||
/*
|
||||
* If we add and remove transactions right around the threshold, we could
|
||||
* easily end up "thrashing". To avoid it, we adapt 10% of transactions to
|
||||
* reset the max-heap.
|
||||
*/
|
||||
if (ReorderBufferMaxHeapIsReady(rb) &&
|
||||
binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
|
||||
binaryheap_reset(rb->txn_heap);
|
||||
}
|
||||
|
||||
/*
|
||||
* Find the largest transaction (toplevel or subxact) to evict (spill to disk)
|
||||
* by doing a linear search or using the max-heap depending on the number of
|
||||
* transactions in ReorderBuffer. Refer to the comments atop this file for the
|
||||
* algorithm details.
|
||||
*/
|
||||
static ReorderBufferTXN *
|
||||
ReorderBufferLargestTXN(ReorderBuffer *rb)
|
||||
{
|
||||
ReorderBufferTXN *largest = NULL;
|
||||
|
||||
if (!ReorderBufferMaxHeapIsReady(rb))
|
||||
{
|
||||
/*
|
||||
* If the number of transactions are small, we scan all transactions
|
||||
* being decoded to get the largest transaction. This saves the cost
|
||||
* of building a max-heap with a small number of transactions.
|
||||
*/
|
||||
if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
ReorderBufferTXNByIdEnt *ent;
|
||||
|
||||
hash_seq_init(&hash_seq, rb->by_txn);
|
||||
while ((ent = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
ReorderBufferTXN *txn = ent->txn;
|
||||
|
||||
/* if the current transaction is larger, remember it */
|
||||
if ((!largest) || (txn->size > largest->size))
|
||||
largest = txn;
|
||||
}
|
||||
|
||||
Assert(largest);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* There are a large number of transactions in ReorderBuffer. We
|
||||
* build the max-heap for efficiently selecting the largest
|
||||
* transactions.
|
||||
*/
|
||||
ReorderBufferBuildMaxHeap(rb);
|
||||
|
||||
/*
|
||||
* The max-heap is ready now. We remain the max-heap at least
|
||||
* until we free up enough transactions to bring the total memory
|
||||
* usage below the limit. The largest transaction is selected
|
||||
* below.
|
||||
*/
|
||||
Assert(ReorderBufferMaxHeapIsReady(rb));
|
||||
}
|
||||
}
|
||||
|
||||
/* Get the largest transaction from the max-heap */
|
||||
if (ReorderBufferMaxHeapIsReady(rb))
|
||||
largest = (ReorderBufferTXN *)
|
||||
DatumGetPointer(binaryheap_first(rb->txn_heap));
|
||||
|
||||
Assert(largest);
|
||||
Assert(largest->size > 0);
|
||||
Assert(largest->size <= rb->size);
|
||||
|
@ -3638,6 +3811,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
|
|||
|
||||
/* We must be under the memory limit now. */
|
||||
Assert(rb->size < logical_decoding_work_mem * 1024L);
|
||||
|
||||
/*
|
||||
* After evicting some transactions, the number of transactions might get
|
||||
* lower than the threshold for the max-heap.
|
||||
*/
|
||||
ReorderBufferMaybeResetMaxHeap(rb);
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3705,11 +3885,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
|||
|
||||
ReorderBufferSerializeChange(rb, txn, fd, change);
|
||||
dlist_delete(&change->node);
|
||||
ReorderBufferReturnChange(rb, change, true);
|
||||
ReorderBufferReturnChange(rb, change, false);
|
||||
|
||||
spilled++;
|
||||
}
|
||||
|
||||
/* Update the memory counter */
|
||||
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
|
||||
|
||||
/* update the statistics iff we have spilled anything */
|
||||
if (spilled)
|
||||
{
|
||||
|
@ -4491,7 +4674,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
|||
* update the accounting too (subtracting the size from the counters). And
|
||||
* we don't want to underflow there.
|
||||
*/
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, true,
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
|
||||
ReorderBufferChangeSize(change));
|
||||
}
|
||||
|
||||
|
@ -4903,9 +5086,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
|||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/* subtract the old change size */
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
|
||||
/* now add the change back, with the correct size */
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, true,
|
||||
ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
|
||||
ReorderBufferChangeSize(change));
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#define REORDERBUFFER_H
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "lib/binaryheap.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "storage/sinval.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
@ -631,6 +632,9 @@ struct ReorderBuffer
|
|||
/* memory accounting */
|
||||
Size size;
|
||||
|
||||
/* Max-heap for sizes of all top-level and sub transactions */
|
||||
binaryheap *txn_heap;
|
||||
|
||||
/*
|
||||
* Statistics about transactions spilled to disk.
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue