diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 4ab2d47bf8..a76f77601e 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl committing streamed transaction (17 rows) +/* + * Test concurrent abort with toast data. When streaming the second insertion, we + * detect that the subtransaction was aborted, and reset the transaction while having + * the TOAST changes in memory, resulting in deallocating both decoded changes and + * TOAST reconstruction data. Memory usage counters must be updated correctly. + */ +BEGIN; +INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +ALTER TABLE stream_test ADD COLUMN i INT; +SAVEPOINT s1; +INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i); +ROLLBACK TO s1; +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + count +------- + 5 +(1 row) + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 4feec62972..7f43f0c2ab 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +/* + * Test concurrent abort with toast data. When streaming the second insertion, we + * detect that the subtransaction was aborted, and reset the transaction while having + * the TOAST changes in memory, resulting in deallocating both decoded changes and + * TOAST reconstruction data. Memory usage counters must be updated correctly. + */ +BEGIN; +INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); +ALTER TABLE stream_test ADD COLUMN i INT; +SAVEPOINT s1; +INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i); +ROLLBACK TO s1; +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 00a8327e77..b3139c41e2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); + /* All changes must be deallocated */ + Assert(txn->size == 0); + pfree(txn); } @@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. */ Assert(change->txn == txn); + /* + * Instead of updating the memory counter for individual changes, + * we sum up the size of memory to free so we can update the memory + * counter all together below. This saves costs of maintaining + * the max-heap. + */ + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update the memory counter */ + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); - /* Update the memory counter */ - ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); - /* deallocate */ ReorderBufferReturnTXN(rb, txn); } @@ -1616,6 +1628,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* remove the change from it's containing list */ dlist_delete(&change->node); + /* + * Instead of updating the memory counter for individual changes, + * we sum up the size of memory to free so we can update the memory + * counter all together below. This saves costs of maintaining + * the max-heap. + */ + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } /* Update the memory counter */ - ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); /* * Mark the transaction as streamed. @@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, rb->stream_stop(rb, txn, last_lsn); ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); } + + /* All changes must be deallocated */ + Assert(txn->size == 0); } /*