Fix memory counter update in ReorderBuffer.

Commit 5bec1d6bc5 changed the memory usage updates of the
ReorderBufferTXN to zero all at once by subtracting txn->size, rather
than updating it for each change. However, if TOAST reconstruction
data remained in the transaction when freeing it, there were cases
where it further subtracted the memory counter from zero, resulting in
an assertion failure.

This change calculates the memory size for each change and updates the
memory usage to precisely the amount that has been freed.

Backpatch to v17, where this was introducd.

Reviewed-by: Amit Kapila, Shlok Kyal
Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com
Backpatch-through: 17
This commit is contained in:
Masahiko Sawada 2024-08-26 11:00:07 -07:00
parent 09a8407dbf
commit 52f1d6730b
3 changed files with 62 additions and 4 deletions

View File

@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
committing streamed transaction
(17 rows)
/*
* Test concurrent abort with toast data. When streaming the second insertion, we
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
*/
BEGIN;
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
ALTER TABLE stream_test ADD COLUMN i INT;
SAVEPOINT s1;
INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
ROLLBACK TO s1;
COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
count
-------
5
(1 row)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot

View File

@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
/*
* Test concurrent abort with toast data. When streaming the second insertion, we
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
*/
BEGIN;
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
ALTER TABLE stream_test ADD COLUMN i INT;
SAVEPOINT s1;
INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
ROLLBACK TO s1;
COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');

View File

@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
/* All changes must be deallocated */
Assert(txn->size == 0);
pfree(txn);
}
@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
bool found;
dlist_mutable_iter iter;
Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
/*
* Instead of updating the memory counter for individual changes,
* we sum up the size of memory to free so we can update the memory
* counter all together below. This saves costs of maintaining
* the max-heap.
*/
mem_freed += ReorderBufferChangeSize(change);
ReorderBufferReturnChange(rb, change, false);
}
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Cleanup the tuplecids we stored for decoding catalog snapshot access.
* They are always stored in the toplevel transaction.
@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
@ -1616,6 +1628,7 @@ static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */
dlist_delete(&change->node);
/*
* Instead of updating the memory counter for individual changes,
* we sum up the size of memory to free so we can update the memory
* counter all together below. This saves costs of maintaining
* the max-heap.
*/
mem_freed += ReorderBufferChangeSize(change);
ReorderBufferReturnChange(rb, change, false);
}
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Mark the transaction as streamed.
@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
rb->stream_stop(rb, txn, last_lsn);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
}
/* All changes must be deallocated */
Assert(txn->size == 0);
}
/*