diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 31f7381f2d..b567b8b59e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, * Record the partial change for the streaming of in-progress transactions. We * can stream only complete changes so if we have a partial change like toast * table insert or speculative insert then we mark such a 'txn' so that it - * can't be streamed. We also ensure that if the changes in such a 'txn' are - * above logical_decoding_work_mem threshold then we stream them as soon as we - * have a complete change. + * can't be streamed. We also ensure that if the changes in such a 'txn' can + * be streamed and are above logical_decoding_work_mem threshold then we stream + * them as soon as we have a complete change. */ static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (ReorderBufferCanStartStreaming(rb) && !(rbtxn_has_partial_change(toptxn)) && - rbtxn_is_serialized(txn)) + rbtxn_is_serialized(txn) && + rbtxn_has_streamable_change(toptxn)) ReorderBufferStreamTXN(rb, toptxn); } @@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, return; } + /* + * The changes that are sent downstream are considered streamable. We + * remember such transactions so that only those will later be considered + * for streaming. + */ + if (change->action == REORDER_BUFFER_CHANGE_INSERT || + change->action == REORDER_BUFFER_CHANGE_UPDATE || + change->action == REORDER_BUFFER_CHANGE_DELETE || + change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT || + change->action == REORDER_BUFFER_CHANGE_TRUNCATE || + change->action == REORDER_BUFFER_CHANGE_MESSAGE) + { + ReorderBufferTXN *toptxn; + + /* get the top transaction */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE; + } + change->lsn = lsn; change->txn = txn; @@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; - /* For streamed transactions notify the remote node about the abort. */ - if (rbtxn_is_streamed(txn)) - rb->stream_abort(rb, txn, lsn); + /* this transaction mustn't be streamed */ + Assert(!rbtxn_is_streamed(txn)); /* cosmetic... */ txn->final_lsn = lsn; @@ -3460,14 +3483,15 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) } /* - * Find the largest toplevel transaction to evict (by streaming). + * Find the largest streamable toplevel transaction to evict (by streaming). * * This can be seen as an optimized version of ReorderBufferLargestTXN, which * should give us the same transaction (because we don't update memory account * for subtransaction with streaming, so it's always 0). But we can simply * iterate over the limited number of toplevel transactions that have a base * snapshot. There is no use of selecting a transaction that doesn't have base - * snapshot because we don't decode such transactions. + * snapshot because we don't decode such transactions. Also, we do not select + * the transaction which doesn't have any streamable change. * * Note that, we skip transactions that contains incomplete changes. There * is a scope of optimization here such that we can select the largest @@ -3483,7 +3507,7 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) * the subxact from where we streamed the last change. */ static ReorderBufferTXN * -ReorderBufferLargestTopTXN(ReorderBuffer *rb) +ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) { dlist_iter iter; Size largest_size = 0; @@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) Assert(txn->base_snapshot != NULL); if ((largest == NULL || txn->total_size > largest_size) && - (txn->total_size > 0) && !(rbtxn_has_partial_change(txn))) + (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) && + rbtxn_has_streamable_change(txn)) { largest = txn; largest_size = txn->total_size; @@ -3547,7 +3572,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * memory by streaming, if possible. Otherwise, spill to disk. */ if (ReorderBufferCanStartStreaming(rb) && - (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) { /* we know there has to be one, because the size is not zero */ Assert(txn && !txn->toptxn); @@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb) * restarting. */ if (ReorderBufferCanStream(rb) && - !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) + !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr)) return true; return false; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b23d8cc4f9..c700b55b1c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -168,14 +168,15 @@ typedef struct ReorderBufferChange } ReorderBufferChange; /* ReorderBufferTXN txn_flags */ -#define RBTXN_HAS_CATALOG_CHANGES 0x0001 -#define RBTXN_IS_SUBXACT 0x0002 -#define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 -#define RBTXN_IS_STREAMED 0x0010 -#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 -#define RBTXN_PREPARE 0x0040 -#define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_HAS_CATALOG_CHANGES 0x0001 +#define RBTXN_IS_SUBXACT 0x0002 +#define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 +#define RBTXN_IS_STREAMED 0x0010 +#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 +#define RBTXN_PREPARE 0x0040 +#define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -207,6 +208,12 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \ ) +/* Does this transaction contain streamable changes? */ +#define rbtxn_has_streamable_change(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \ +) + /* * Has this transaction been streamed to downstream? *