From 52d26d560e272613c501e35b24fbf8d710de4b8a Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 30 Jun 2021 09:37:59 +0530
Subject: [PATCH] Allow streaming the changes after speculative aborts.

Until now, we didn't allow to stream the changes in logical replication
till we receive speculative confirm or the next DML change record after
speculative inserts. The reason was that we never use to process
speculative aborts but after commit 4daa140a2f it is possible to process
them so we can allow streaming once we receive speculative abort after
speculative insertion.

We decided to backpatch to 14 where the feature for streaming in progress
transactions have been introduced as this is a minor change and makes that
functionality better.

Author: Amit Kapila
Reviewed-By: Dilip Kumar
Backpatch-through: 14
Discussion: https://postgr.es/m/CAA4eK1KdqmTCtrBR6oFfGELrLLbDLDedL6zACcsUOQuTJBj1vw@mail.gmail.com
---
 src/backend/replication/logical/reorderbuffer.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ad1c2bad01..b8c5e2a44e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -182,9 +182,10 @@ typedef struct ReorderBufferDiskChange
 ( \
 	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
 )
-#define IsSpecConfirm(action) \
+#define IsSpecConfirmOrAbort(action) \
 ( \
-	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
+	(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
+	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
 )
 #define IsInsertOrUpdate(action) \
 ( \
@@ -731,12 +732,13 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	/*
 	 * Indicate a partial change for speculative inserts.  The change will be
-	 * considered as complete once we get the speculative confirm token.
+	 * considered as complete once we get the speculative confirm or abort
+	 * token.
 	 */
 	if (IsSpecInsert(change->action))
 		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
 	else if (rbtxn_has_partial_change(toptxn) &&
-			 IsSpecConfirm(change->action))
+			 IsSpecConfirmOrAbort(change->action))
 		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
 
 	/*