diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index f439c582a5..ed9a3d6c0e 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate
+ spill slot truncate stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
new file mode 100644
index 0000000000..9a5d7e7c43
--- /dev/null
+++ b/contrib/test_decoding/expected/stream.out
@@ -0,0 +1,94 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+ data
+----------------------------------------------------------
+ opening a streamed block for transaction
+ streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(27 rows)
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+ data
+------------------------------------------
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(13 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out
index 1cf2ae835c..e64d377214 100644
--- a/contrib/test_decoding/expected/truncate.out
+++ b/contrib/test_decoding/expected/truncate.out
@@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT
(9 rows)
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
new file mode 100644
index 0000000000..8abc30de0a
--- /dev/null
+++ b/contrib/test_decoding/sql/stream.sql
@@ -0,0 +1,30 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql
index 5aecdf0881..5633854e0d 100644
--- a/contrib/test_decoding/sql/truncate.sql
+++ b/contrib/test_decoding/sql/truncate.sql
@@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
TRUNCATE tab1, tab2;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index dbef52a3af..34745150e9 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
{
ListCell *option;
TestDecodingData *data;
+ bool enable_streaming = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
@@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "stream-changes") == 0)
+ {
+ if (elem->arg == NULL)
+ continue;
+ else if (!parse_bool(strVal(elem->arg), &enable_streaming))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
elem->arg ? strVal(elem->arg) : "(null)")));
}
}
+
+ ctx->streaming &= enable_streaming;
}
/* cleanup this plugin's resources */
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 791a62b57c..1571d71a5b 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
- Any actions leading to transaction ID assignment are prohibited. That, among others,
- includes writing to tables, performing DDL changes, and
- calling pg_current_xact_id().
+ Note that access to user catalog tables or regular system catalog tables
+ in the output plugins has to be done via the systable_*
+ scan APIs only. Access via the heap_* scan APIs will
+ error out. Additionally, any actions leading to transaction ID assignment
+ are prohibited. That, among others, includes writing to tables, performing
+ DDL changes, and calling pg_current_xact_id().
diff --git a/doc/src/sgml/test-decoding.sgml b/doc/src/sgml/test-decoding.sgml
index 8356a3d67b..fe7c9783fa 100644
--- a/doc/src/sgml/test-decoding.sgml
+++ b/doc/src/sgml/test-decoding.sgml
@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i
+
+ We can also get the changes of the in-progress transaction and the typical
+ output, might be:
+
+
+postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1');
+ lsn | xid | data
+-----------+-----+--------------------------------------------------
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | streaming change for TXN 503
+ 0/16B2300 | 503 | streaming change for TXN 503
+ 0/16B2408 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16BECA8 | 503 | streaming change for TXN 503
+ 0/16BEDB0 | 503 | streaming change for TXN 503
+ 0/16BEEB8 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+(10 rows)
+
+
+
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 5eef225f5c..00169006fb 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg_internal("only heap AM is supported")));
+ /*
+ * We don't expect direct calls to heap_getnext with valid CheckXidAlive
+ * for catalog or regular tables. See detailed comments in xact.c where
+ * these variables are declared. Normally we have such a check at tableam
+ * level API but this is called from many places so we need to ensure it
+ * here.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
/* Note: no locking manipulations needed */
if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
{
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
bufflags |= REGBUF_KEEP_DATA;
+
+ if (IsToastRelation(relation))
+ xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
}
XLogBeginInsert();
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index dba10890aa..c77128087c 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer,
&cmin, &cmax);
+ /*
+ * If we haven't resolved the combocid to cmin/cmax, that means we
+ * have not decoded the combocid yet. That means the cmin is
+ * definitely in the future, and we're not supposed to see the tuple
+ * yet.
+ *
+ * XXX This only applies to decoding of in-progress transactions. In
+ * regular logical decoding we only execute this code at commit time,
+ * at which point we should have seen all relevant combocids. So
+ * ideally, we should error out in this case but in practice, this
+ * won't happen. If we are too worried about this then we can add an
+ * elog inside ResolveCminCmaxDuringDecoding.
+ *
+ * XXX For the streaming case, we can track the largest combocid
+ * assigned, and error out based on this (when unable to resolve
+ * combocid below that observed maximum value).
+ */
if (!resolved)
- elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+ return false;
Assert(cmin != InvalidCommandId);
@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer,
&cmin, &cmax);
- if (!resolved)
- elog(ERROR, "could not resolve combocid to cmax");
-
- Assert(cmax != InvalidCommandId);
+ /*
+ * If we haven't resolved the combocid to cmin/cmax, that means we
+ * have not decoded the combocid yet. That means the cmax is
+ * definitely in the future, and we're still supposed to see the
+ * tuple.
+ *
+ * XXX This only applies to decoding of in-progress transactions. In
+ * regular logical decoding we only execute this code at commit time,
+ * at which point we should have seen all relevant combocids. So
+ * ideally, we should error out in this case but in practice, this
+ * won't happen. If we are too worried about this then we can add an
+ * elog inside ResolveCminCmaxDuringDecoding.
+ *
+ * XXX For the streaming case, we can track the largest combocid
+ * assigned, and error out based on this (when unable to resolve
+ * combocid below that observed maximum value).
+ */
+ if (!resolved || cmax == InvalidCommandId)
+ return true;
if (cmax >= snapshot->curcid)
return true; /* deleted after scan started */
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index dfba5ae39a..e3164e674a 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
+#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation,
sysscan->iscan = NULL;
}
+ /*
+ * If CheckXidAlive is set then set a flag to indicate that system table
+ * scan is in-progress. See detailed comments in xact.c where these
+ * variables are declared.
+ */
+ if (TransactionIdIsValid(CheckXidAlive))
+ bsysscan = true;
+
return sysscan;
}
+/*
+ * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive.
+ *
+ * Error out, if CheckXidAlive is aborted. We can't directly use
+ * TransactionIdDidAbort as after crash such transaction might not have been
+ * marked as aborted. See detailed comments in xact.c where the variable
+ * is declared.
+ */
+static inline void
+HandleConcurrentAbort()
+{
+ if (TransactionIdIsValid(CheckXidAlive) &&
+ !TransactionIdIsInProgress(CheckXidAlive) &&
+ !TransactionIdDidCommit(CheckXidAlive))
+ ereport(ERROR,
+ (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+ errmsg("transaction aborted during system catalog scan")));
+}
+
/*
* systable_getnext --- get next tuple in a heap-or-index scan
*
@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
}
}
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return htup;
}
@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
sysscan->slot,
freshsnap);
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return result;
}
@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
if (sysscan->snapshot)
UnregisterSnapshot(sysscan->snapshot);
+ /*
+ * Reset the bsysscan flag at the end of the systable scan. See
+ * detailed comments in xact.c where these variables are declared.
+ */
+ if (TransactionIdIsValid(CheckXidAlive))
+ bsysscan = false;
+
pfree(sysscan);
}
@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return htup;
}
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 3afb63b1fe..c638319765 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -248,6 +248,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
Relation rel = scan->rs_rd;
const TableAmRoutine *tableam = rel->rd_tableam;
+ /*
+ * We don't expect direct calls to table_tuple_get_latest_tid with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
/*
* Since this can be called with user-supplied TID, don't trust the input
* too much.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d4f7c29847..727d616035 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -82,6 +82,19 @@ bool XactDeferrable;
int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
+/*
+ * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
+ * transaction. Currently, it is used in logical decoding. It's possible
+ * that such transactions can get aborted while the decoding is ongoing in
+ * which case we skip decoding that particular transaction. To ensure that we
+ * check whether the CheckXidAlive is aborted after fetching the tuple from
+ * system tables. We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs because we are checking for the
+ * concurrent aborts only in systable_* APIs.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+bool bsysscan = false;
+
/*
* When running as a parallel worker, we place only a single
* TransactionStateData on the parallel worker's state stack, and the XID
@@ -2680,6 +2693,9 @@ AbortTransaction(void)
/* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel);
+ /* Reset logical streaming state. */
+ ResetLogicalStreamingState();
+
/* If in parallel mode, clean up workers and exit parallel mode. */
if (IsInParallelMode())
{
@@ -4982,6 +4998,9 @@ AbortSubTransaction(void)
/* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel);
+ /* Reset logical streaming state. */
+ ResetLogicalStreamingState();
+
/* Exit from parallel mode, if necessary. */
if (IsInParallelMode())
{
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index f3a1c31a29..f21f61d5e1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change,
+ xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
}
/*
@@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
}
/*
@@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
}
/*
@@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
memcpy(change->data.truncate.relids, xlrec->relids,
xlrec->nrelids * sizeof(Oid));
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
- buf->origptr, change);
+ buf->origptr, change, false);
}
/*
@@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = false;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
- buf->origptr, change);
+ buf->origptr, change, false);
/* move to the next xl_multi_insert_tuple entry */
data += datalen;
@@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 05d24b93da..42f284b33f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex);
}
}
+
+/*
+ * Clear logical streaming state during (sub)transaction abort.
+ */
+void
+ResetLogicalStreamingState(void)
+{
+ CheckXidAlive = InvalidTransactionId;
+ bsysscan = false;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ce6e62152f..5b7afe6d9e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -178,6 +178,21 @@ typedef struct ReorderBufferDiskChange
/* data follows */
} ReorderBufferDiskChange;
+#define IsSpecInsert(action) \
+( \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
+)
+#define IsSpecConfirm(action) \
+( \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
+)
+#define IsInsertOrUpdate(action) \
+( \
+ (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
+ ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
+)
+
/*
* Maximum number of changes kept in memory, per transaction. After that,
* changes are spooled to disk.
@@ -236,6 +251,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -244,6 +260,16 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
ReorderBufferTXN *txn, CommandId cid);
+/*
+ * ---------------------------------------
+ * Streaming support functions
+ * ---------------------------------------
+ */
+static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
+static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
+static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
/* ---------------------------------------
* toast reassembly support
* ---------------------------------------
@@ -367,6 +393,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
dlist_init(&txn->tuplecids);
dlist_init(&txn->subtxns);
+ /* InvalidCommandId is not zero, so set it explicitly */
+ txn->command_id = InvalidCommandId;
+
return txn;
}
@@ -416,13 +445,15 @@ ReorderBufferGetChange(ReorderBuffer *rb)
}
/*
- * Free an ReorderBufferChange.
+ * Free a ReorderBufferChange and update memory accounting, if requested.
*/
void
-ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
+ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
+ bool upd_mem)
{
/* update memory accounting info */
- ReorderBufferChangeMemoryUpdate(rb, change, false);
+ if (upd_mem)
+ ReorderBufferChangeMemoryUpdate(rb, change, false);
/* free contained data */
switch (change->action)
@@ -624,16 +655,102 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
}
/*
- * Queue a change into a transaction so it can be replayed upon commit.
+ * Record the partial change for the streaming of in-progress transactions. We
+ * can stream only complete changes so if we have a partial change like toast
+ * table insert or speculative insert then we mark such a 'txn' so that it
+ * can't be streamed. We also ensure that if the changes in such a 'txn' are
+ * above logical_decoding_work_mem threshold then we stream them as soon as we
+ * have a complete change.
+ */
+static void
+ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferChange *change,
+ bool toast_insert)
+{
+ ReorderBufferTXN *toptxn;
+
+ /*
+ * The partial changes need to be processed only while streaming
+ * in-progress transactions.
+ */
+ if (!ReorderBufferCanStream(rb))
+ return;
+
+ /* Get the top transaction. */
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
+
+ /*
+ * Set the toast insert bit whenever we get toast insert to indicate a
+ * partial change and clear it when we get the insert or update on main
+ * table (Both update and insert will do the insert in the toast table).
+ */
+ if (toast_insert)
+ toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
+ else if (rbtxn_has_toast_insert(toptxn) &&
+ IsInsertOrUpdate(change->action))
+ toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+
+ /*
+ * Set the spec insert bit whenever we get the speculative insert to
+ * indicate the partial change and clear the same on speculative confirm.
+ */
+ if (IsSpecInsert(change->action))
+ toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
+ else if (IsSpecConfirm(change->action))
+ {
+ /*
+ * Speculative confirm change must be preceded by speculative
+ * insertion.
+ */
+ Assert(rbtxn_has_spec_insert(toptxn));
+ toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
+ }
+
+ /*
+ * Stream the transaction if it is serialized before and the changes are
+ * now complete in the top-level transaction.
+ *
+ * The reason for doing the streaming of such a transaction as soon as we
+ * get the complete change for it is that previously it would have reached
+ * the memory threshold and wouldn't get streamed because of incomplete
+ * changes. Delaying such transactions would increase apply lag for them.
+ */
+ if (ReorderBufferCanStartStreaming(rb) &&
+ !(rbtxn_has_incomplete_tuple(toptxn)) &&
+ rbtxn_is_serialized(txn))
+ ReorderBufferStreamTXN(rb, toptxn);
+}
+
+/*
+ * Queue a change into a transaction so it can be replayed upon commit or will be
+ * streamed when we reach logical_decoding_work_mem threshold.
*/
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
- ReorderBufferChange *change)
+ ReorderBufferChange *change, bool toast_insert)
{
ReorderBufferTXN *txn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ /*
+ * While streaming the previous changes we have detected that the
+ * transaction is aborted. So there is no point in collecting further
+ * changes for it.
+ */
+ if (txn->concurrent_abort)
+ {
+ /*
+ * We don't need to update memory accounting for this change as we
+ * have not added it to the queue yet.
+ */
+ ReorderBufferReturnChange(rb, change, false);
+ return;
+ }
+
change->lsn = lsn;
change->txn = txn;
@@ -645,6 +762,9 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
/* update memory accounting information */
ReorderBufferChangeMemoryUpdate(rb, change, true);
+ /* process partial change */
+ ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
+
/* check the memory limits and evict something if needed */
ReorderBufferCheckMemoryLimit(rb);
}
@@ -674,7 +794,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
change->data.msg.message = palloc(message_size);
memcpy(change->data.msg.message, message, message_size);
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
MemoryContextSwitchTo(oldcontext);
}
@@ -763,6 +883,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
#endif
}
+/*
+ * AssertChangeLsnOrder
+ *
+ * Check ordering of changes in the (sub)transaction.
+ */
+static void
+AssertChangeLsnOrder(ReorderBufferTXN *txn)
+{
+#ifdef USE_ASSERT_CHECKING
+ dlist_iter iter;
+ XLogRecPtr prev_lsn = txn->first_lsn;
+
+ dlist_foreach(iter, &txn->changes)
+ {
+ ReorderBufferChange *cur_change;
+
+ cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ Assert(txn->first_lsn != InvalidXLogRecPtr);
+ Assert(cur_change->lsn != InvalidXLogRecPtr);
+ Assert(txn->first_lsn <= cur_change->lsn);
+
+ if (txn->end_lsn != InvalidXLogRecPtr)
+ Assert(cur_change->lsn <= txn->end_lsn);
+
+ Assert(prev_lsn <= cur_change->lsn);
+
+ prev_lsn = cur_change->lsn;
+ }
+#endif
+}
+
/*
* ReorderBufferGetOldestTXN
* Return oldest transaction in reorderbuffer
@@ -1018,6 +1170,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
*iter_state = NULL;
+ /* Check ordering of changes in the toplevel transaction. */
+ AssertChangeLsnOrder(txn);
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
@@ -1032,6 +1187,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
+ /* Check ordering of changes in this subtransaction. */
+ AssertChangeLsnOrder(cur_txn);
+
if (cur_txn->nentries > 0)
nr_txns++;
}
@@ -1148,7 +1306,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
change = dlist_container(ReorderBufferChange, node,
dlist_pop_head_node(&state->old_change));
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
Assert(dlist_is_empty(&state->old_change));
}
@@ -1234,7 +1392,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
change = dlist_container(ReorderBufferChange, node,
dlist_pop_head_node(&state->old_change));
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
Assert(dlist_is_empty(&state->old_change));
}
@@ -1280,7 +1438,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
/*
@@ -1297,7 +1455,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(change->txn == txn);
Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
/*
@@ -1309,6 +1467,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_delete(&txn->base_snapshot_node);
}
+ /*
+ * Cleanup the snapshot for the last streamed run.
+ */
+ if (txn->snapshot_now != NULL)
+ {
+ Assert(rbtxn_is_streamed(txn));
+ ReorderBufferFreeSnap(rb, txn->snapshot_now);
+ }
+
/*
* Remove TXN from its containing list.
*
@@ -1334,6 +1501,91 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferReturnTXN(rb, txn);
}
+/*
+ * Discard changes from a transaction (and subtransactions), after streaming
+ * them. Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
+ */
+static void
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ dlist_mutable_iter iter;
+
+ /* cleanup subtransactions & their changes */
+ dlist_foreach_modify(iter, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+ /*
+ * Subtransactions are always associated to the toplevel TXN, even if
+ * they originally were happening inside another subtxn, so we won't
+ * ever recurse more than one level deep here.
+ */
+ Assert(rbtxn_is_known_subxact(subtxn));
+ Assert(subtxn->nsubtxns == 0);
+
+ ReorderBufferTruncateTXN(rb, subtxn);
+ }
+
+ /* cleanup changes in the toplevel txn */
+ dlist_foreach_modify(iter, &txn->changes)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+
+ /* remove the change from it's containing list */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+
+ /*
+ * Mark the transaction as streamed.
+ *
+ * The toplevel transaction, identified by (toptxn==NULL), is marked as
+ * streamed always, even if it does not contain any changes (that is, when
+ * all the changes are in subtransactions).
+ *
+ * For subtransactions, we only mark them as streamed when there are
+ * changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts for
+ * XIDs the downstream is not aware of. And of course, it always knows
+ * about the toplevel xact (we send the XID in all messages), but we never
+ * stream XIDs of empty subxacts.
+ */
+ if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ txn->txn_flags |= RBTXN_IS_STREAMED;
+
+ /*
+ * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
+ * memory. We could also keep the hash table and update it with new ctid
+ * values, but this seems simpler and good enough for now.
+ */
+ if (txn->tuplecid_hash != NULL)
+ {
+ hash_destroy(txn->tuplecid_hash);
+ txn->tuplecid_hash = NULL;
+ }
+
+ /* If this txn is serialized then clean the disk space. */
+ if (rbtxn_is_serialized(txn))
+ {
+ ReorderBufferRestoreCleanup(rb, txn);
+ txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+ }
+
+ /* also reset the number of entries in the transaction */
+ txn->nentries_mem = 0;
+ txn->nentries = 0;
+}
+
/*
* Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
* HeapTupleSatisfiesHistoricMVCC.
@@ -1485,57 +1737,191 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * Perform the replay of a transaction and its non-aborted subtransactions.
- *
- * Subtransactions previously have to be processed by
- * ReorderBufferCommitChild(), even if previously assigned to the toplevel
- * transaction with ReorderBufferAssignChild.
- *
- * We currently can only decode a transaction's contents when its commit
- * record is read because that's the only place where we know about cache
- * invalidations. Thus, once a toplevel commit is read, we iterate over the top
- * and subtransactions (using a k-way merge) and replay the changes in lsn
- * order.
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
- XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- ReorderBufferTXN *txn;
- volatile Snapshot snapshot_now;
- volatile CommandId command_id = FirstCommandId;
- bool using_subtxn;
- ReorderBufferIterTXNState *volatile iterstate = NULL;
+ /* we should only call this for previously streamed transactions */
+ Assert(rbtxn_is_streamed(txn));
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
+ ReorderBufferStreamTXN(rb, txn);
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
+ rb->stream_commit(rb, txn, txn->final_lsn);
+
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
+ * Set xid to detect concurrent aborts.
+ *
+ * While streaming an in-progress transaction there is a possibility that the
+ * (sub)transaction might get aborted concurrently. In such case if the
+ * (sub)transaction has catalog update then we might decode the tuple using
+ * wrong catalog version. For example, suppose there is one catalog tuple with
+ * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
+ * and after that we will have two tuples (xmin: 500, xmax: 501) and
+ * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
+ * say 502 updates the same catalog tuple then the first tuple will be changed
+ * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
+ * the tuple inserted/updated in 501 after the catalog update, we will see the
+ * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
+ * consider that the tuple is deleted by xid 502 which is not visible to our
+ * snapshot. And when we will try to decode with that catalog tuple, it can
+ * lead to a wrong result or a crash. So, it is necessary to detect
+ * concurrent aborts to allow streaming of in-progress transactions.
+ *
+ * For detecting the concurrent abort we set CheckXidAlive to the current
+ * (sub)transaction's xid for which this change belongs to. And, during
+ * catalog scan we can check the status of the xid and if it is aborted we will
+ * report a specific error so that we can stop streaming current transaction
+ * and discard the already streamed changes on such an error. We might have
+ * already streamed some of the changes for the aborted (sub)transaction, but
+ * that is fine because when we decode the abort we will stream abort message
+ * to truncate the changes in the subscriber.
+ */
+static inline void
+SetupCheckXidLive(TransactionId xid)
+{
+ /*
+ * If the input transaction id is already set as a CheckXidAlive then
+ * nothing to do.
+ */
+ if (TransactionIdEquals(CheckXidAlive, xid))
return;
- txn->final_lsn = commit_lsn;
- txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
- txn->origin_id = origin_id;
- txn->origin_lsn = origin_lsn;
-
/*
- * If this transaction has no snapshot, it didn't make any changes to the
- * database, so there's nothing to decode. Note that
- * ReorderBufferCommitChild will have transferred any snapshots from
- * subtransactions if there were any.
+ * setup CheckXidAlive if it's not committed yet. We don't check if the
+ * xid is aborted. That will happen during catalog access.
*/
- if (txn->base_snapshot == NULL)
+ if (!TransactionIdDidCommit(xid))
+ CheckXidAlive = xid;
+ else
+ CheckXidAlive = InvalidTransactionId;
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying change.
+ */
+static inline void
+ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ if (streaming)
+ rb->stream_change(rb, txn, relation, change);
+ else
+ rb->apply_change(rb, txn, relation, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the truncate.
+ */
+static inline void
+ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ int nrelations, Relation *relations,
+ ReorderBufferChange *change, bool streaming)
+{
+ if (streaming)
+ rb->stream_truncate(rb, txn, nrelations, relations, change);
+ else
+ rb->apply_truncate(rb, txn, nrelations, relations, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the message.
+ */
+static inline void
+ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferChange *change, bool streaming)
+{
+ if (streaming)
+ rb->stream_message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+ else
+ rb->message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+}
+
+/*
+ * Function to store the command id and snapshot at the end of the current
+ * stream so that we can reuse the same while sending the next stream.
+ */
+static inline void
+ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Snapshot snapshot_now, CommandId command_id)
+{
+ txn->command_id = command_id;
+
+ /* Avoid copying if it's already copied. */
+ if (snapshot_now->copied)
+ txn->snapshot_now = snapshot_now;
+ else
+ txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+ txn, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to handle the concurrent
+ * abort of the streaming transaction. This resets the TXN such that it
+ * can be used to stream the remaining data of transaction being processed.
+ */
+static void
+ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Snapshot snapshot_now,
+ CommandId command_id,
+ XLogRecPtr last_lsn,
+ ReorderBufferChange *specinsert)
+{
+ /* Discard the changes that we just streamed */
+ ReorderBufferTruncateTXN(rb, txn);
+
+ /* Free all resources allocated for toast reconstruction */
+ ReorderBufferToastReset(rb, txn);
+
+ /* Return the spec insert change if it is not NULL */
+ if (specinsert != NULL)
{
- Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
- return;
+ ReorderBufferReturnChange(rb, specinsert, true);
+ specinsert = NULL;
}
- snapshot_now = txn->base_snapshot;
+ /* Stop the stream. */
+ rb->stream_stop(rb, txn, last_lsn);
+
+ /* Remember the command ID and snapshot for the streaming run */
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ *
+ * Send data of a transaction (and its subtransactions) to the
+ * output plugin. We iterate over the top and subtransactions (using a k-way
+ * merge) and replay the changes in lsn order.
+ *
+ * If streaming is true then data will be sent using stream API.
+ */
+static void
+ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn,
+ volatile Snapshot snapshot_now,
+ volatile CommandId command_id,
+ bool streaming)
+{
+ bool using_subtxn;
+ MemoryContext ccxt = CurrentMemoryContext;
+ ReorderBufferIterTXNState *volatile iterstate = NULL;
+ volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
+ ReorderBufferChange *volatile specinsert = NULL;
+ volatile bool stream_started = false;
+ ReorderBufferTXN *volatile curtxn = NULL;
/* build data to be able to lookup the CommandIds of catalog tuples */
ReorderBufferBuildTupleCidHash(rb, txn);
@@ -1558,14 +1944,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_TRY();
{
ReorderBufferChange *change;
- ReorderBufferChange *specinsert = NULL;
if (using_subtxn)
- BeginInternalSubTransaction("replay");
+ BeginInternalSubTransaction(streaming ? "stream" : "replay");
else
StartTransactionCommand();
- rb->begin(rb, txn);
+ /* We only need to send begin/commit for non-streamed transactions. */
+ if (!streaming)
+ rb->begin(rb, txn);
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -1573,6 +1960,36 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
Relation relation = NULL;
Oid reloid;
+ /*
+ * We can't call start stream callback before processing first
+ * change.
+ */
+ if (prev_lsn == InvalidXLogRecPtr)
+ {
+ if (streaming)
+ {
+ txn->origin_id = change->origin_id;
+ rb->stream_start(rb, txn, change->lsn);
+ stream_started = true;
+ }
+ }
+
+ /*
+ * Enforce correct ordering of changes, merged from multiple
+ * subtransactions. The changes may have the same LSN due to
+ * MULTI_INSERT xlog records.
+ */
+ Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
+
+ prev_lsn = change->lsn;
+
+ /* Set the current xid to detect concurrent aborts. */
+ if (streaming)
+ {
+ curtxn = change->txn;
+ SetupCheckXidLive(curtxn->xid);
+ }
+
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -1649,7 +2066,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (!IsToastRelation(relation))
{
ReorderBufferToastReplace(rb, txn, relation, change);
- rb->apply_change(rb, txn, relation, change);
+ ReorderBufferApplyChange(rb, txn, relation, change,
+ streaming);
/*
* Only clear reassembled toast chunks if we're sure
@@ -1685,11 +2103,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
*/
if (specinsert != NULL)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
- if (relation != NULL)
+ if (RelationIsValid(relation))
{
RelationClose(relation);
relation = NULL;
@@ -1714,7 +2132,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
/* clear out a pending (and thus failed) speculation */
if (specinsert != NULL)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
@@ -1747,7 +2165,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
relations[nrelations++] = relation;
}
- rb->apply_truncate(rb, txn, nrelations, relations, change);
+ /* Apply the truncate. */
+ ReorderBufferApplyTruncate(rb, txn, nrelations,
+ relations, change,
+ streaming);
for (i = 0; i < nrelations; i++)
RelationClose(relations[i]);
@@ -1756,10 +2177,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
case REORDER_BUFFER_CHANGE_MESSAGE:
- rb->message(rb, txn, change->lsn, true,
- change->data.msg.prefix,
- change->data.msg.message_size,
- change->data.msg.message);
+ ReorderBufferApplyMessage(rb, txn, change, streaming);
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -1790,7 +2208,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
snapshot_now = change->data.snapshot;
}
-
/* and continue with the new one */
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
break;
@@ -1837,7 +2254,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
*/
if (specinsert)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
@@ -1845,14 +2262,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
- /* call commit callback */
- rb->commit(rb, txn, commit_lsn);
+ /*
+ * Done with current changes, send the last message for this set of
+ * changes depending upon streaming mode.
+ */
+ if (streaming)
+ {
+ if (stream_started)
+ {
+ rb->stream_stop(rb, txn, prev_lsn);
+ stream_started = false;
+ }
+ }
+ else
+ rb->commit(rb, txn, commit_lsn);
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
elog(ERROR, "output plugin used XID %u",
GetCurrentTransactionId());
+ /*
+ * Remember the command ID and snapshot for the next set of changes in
+ * streaming mode.
+ */
+ if (streaming)
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ else if (snapshot_now->copied)
+ ReorderBufferFreeSnap(rb, snapshot_now);
+
/* cleanup */
TeardownHistoricSnapshot(false);
@@ -1870,14 +2308,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
+ /*
+ * If we are streaming the in-progress transaction then discard the
+ * changes that we just streamed, and mark the transactions as
+ * streamed (if they contained changes). Otherwise, remove all the
+ * changes and deallocate the ReorderBufferTXN.
+ */
+ if (streaming)
+ {
+ ReorderBufferTruncateTXN(rb, txn);
- /* remove potential on-disk data, and deallocate */
- ReorderBufferCleanupTXN(rb, txn);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ ReorderBufferCleanupTXN(rb, txn);
}
PG_CATCH();
{
+ MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+ ErrorData *errdata = CopyErrorData();
+
/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
if (iterstate)
ReorderBufferIterTXNFinish(rb, iterstate);
@@ -1896,17 +2347,108 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
+ /*
+ * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
+ * abort of the (sub)transaction we are streaming. We need to do the
+ * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ */
+ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
+ {
+ /*
+ * This error can only occur when we are sending the data in
+ * streaming mode and the streaming is not finished yet.
+ */
+ Assert(streaming);
+ Assert(stream_started);
- /* remove potential on-disk data, and deallocate */
- ReorderBufferCleanupTXN(rb, txn);
+ /* Cleanup the temporary error state. */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+ curtxn->concurrent_abort = true;
- PG_RE_THROW();
+ /* Reset the TXN so that it is allowed to stream remaining data. */
+ ReorderBufferResetTXN(rb, txn, snapshot_now,
+ command_id, prev_lsn,
+ specinsert);
+ }
+ else
+ {
+ ReorderBufferCleanupTXN(rb, txn);
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
}
PG_END_TRY();
}
+/*
+ * Perform the replay of a transaction and its non-aborted subtransactions.
+ *
+ * Subtransactions previously have to be processed by
+ * ReorderBufferCommitChild(), even if previously assigned to the toplevel
+ * transaction with ReorderBufferAssignChild.
+ *
+ * This interface is called once a toplevel commit is read for both streamed
+ * as well as non-streamed transactions.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+ Snapshot snapshot_now;
+ CommandId command_id = FirstCommandId;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ /*
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
+ *
+ * Called after everything (origin ID, LSN, ...) is stored in the
+ * transaction to avoid passing that information directly.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ ReorderBufferStreamCommit(rb, txn);
+ return;
+ }
+
+ /*
+ * If this transaction has no snapshot, it didn't make any changes to the
+ * database, so there's nothing to decode. Note that
+ * ReorderBufferCommitChild will have transferred any snapshots from
+ * subtransactions if there were any.
+ */
+ if (txn->base_snapshot == NULL)
+ {
+ Assert(txn->ninvalidations == 0);
+ ReorderBufferCleanupTXN(rb, txn);
+ return;
+ }
+
+ snapshot_now = txn->base_snapshot;
+
+ /* Process and send the changes to output plugin. */
+ ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+ command_id, false);
+}
+
/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
@@ -1931,6 +2473,22 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
if (txn == NULL)
return;
+ /* For streamed transactions notify the remote node about the abort. */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_abort(rb, txn, lsn);
+
+ /*
+ * We might have decoded changes for this transaction that could load
+ * the cache as per the current transaction's view (consider DDL's
+ * happened in this transaction). We don't want the decoding of future
+ * transactions to use those cache entries so execute invalidations.
+ */
+ if (txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ }
+
/* cosmetic... */
txn->final_lsn = lsn;
@@ -2000,6 +2558,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
if (txn == NULL)
return;
+ /* For streamed transactions notify the remote node about the abort. */
+ if (rbtxn_is_streamed(txn))
+ rb->stream_abort(rb, txn, lsn);
+
/* cosmetic... */
txn->final_lsn = lsn;
@@ -2082,7 +2644,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
change->data.snapshot = snap;
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
}
/*
@@ -2131,12 +2693,21 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
change->data.command_id = cid;
change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
}
/*
- * Update the memory accounting info. We track memory used by the whole
- * reorder buffer and the transaction containing the change.
+ * Update memory counters to account for the new or removed change.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
*/
static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
@@ -2144,6 +2715,8 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
bool addition)
{
Size sz;
+ ReorderBufferTXN *txn;
+ ReorderBufferTXN *toptxn = NULL;
Assert(change->txn);
@@ -2155,19 +2728,41 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
return;
+ txn = change->txn;
+
+ /* If streaming supported, update the total size in top level as well. */
+ if (ReorderBufferCanStream(rb))
+ {
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
+ }
+
sz = ReorderBufferChangeSize(change);
if (addition)
{
- change->txn->size += sz;
+ txn->size += sz;
rb->size += sz;
+
+ /* Update the total size in the top transaction. */
+ if (toptxn)
+ toptxn->total_size += sz;
}
else
{
- Assert((rb->size >= sz) && (change->txn->size >= sz));
- change->txn->size -= sz;
+ Assert((rb->size >= sz) && (txn->size >= sz));
+ txn->size -= sz;
rb->size -= sz;
+
+ /* Update the total size in the top transaction. */
+ if (toptxn)
+ toptxn->total_size -= sz;
}
+
+ Assert(txn->size <= rb->size);
+ Assert((txn->size >= 0) && (rb->size >= 0));
}
/*
@@ -2387,6 +2982,51 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
return largest;
}
+/*
+ * Find the largest toplevel transaction to evict (by streaming).
+ *
+ * This can be seen as an optimized version of ReorderBufferLargestTXN, which
+ * should give us the same transaction (because we don't update memory account
+ * for subtransaction with streaming, so it's always 0). But we can simply
+ * iterate over the limited number of toplevel transactions.
+ *
+ * Note that, we skip transactions that contains incomplete changes. There
+ * is a scope of optimization here such that we can select the largest transaction
+ * which has complete changes. But that will make the code and design quite complex
+ * and that might not be worth the benefit. If we plan to stream the transactions
+ * that contains incomplete changes then we need to find a way to partially
+ * stream/truncate the transaction changes in-memory and build a mechanism to
+ * partially truncate the spilled files. Additionally, whenever we partially
+ * stream the transaction we need to maintain the last streamed lsn and next time
+ * we need to restore from that segment and the offset in WAL. As we stream the
+ * changes from the top transaction and restore them subtransaction wise, we need
+ * to even remember the subxact from where we streamed the last change.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+{
+ dlist_iter iter;
+ Size largest_size = 0;
+ ReorderBufferTXN *largest = NULL;
+
+ /* Find the largest top-level transaction. */
+ dlist_foreach(iter, &rb->toplevel_by_lsn)
+ {
+ ReorderBufferTXN *txn;
+
+ txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+ if ((largest != NULL || txn->total_size > largest_size) &&
+ (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
+ {
+ largest = txn;
+ largest_size = txn->total_size;
+ }
+ }
+
+ return largest;
+}
+
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -2419,11 +3059,33 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
/*
* Pick the largest transaction (or subtransaction) and evict it from
- * memory by serializing it to disk.
+ * memory by streaming, if possible. Otherwise, spill to disk.
*/
- txn = ReorderBufferLargestTXN(rb);
+ if (ReorderBufferCanStartStreaming(rb) &&
+ (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+ {
+ /* we know there has to be one, because the size is not zero */
+ Assert(txn && !txn->toptxn);
+ Assert(txn->total_size > 0);
+ Assert(rb->size >= txn->total_size);
- ReorderBufferSerializeTXN(rb, txn);
+ ReorderBufferStreamTXN(rb, txn);
+ }
+ else
+ {
+ /*
+ * Pick the largest transaction (or subtransaction) and evict it
+ * from memory by serializing it to disk.
+ */
+ txn = ReorderBufferLargestTXN(rb);
+
+ /* we know there has to be one, because the size is not zero */
+ Assert(txn);
+ Assert(txn->size > 0);
+ Assert(rb->size >= txn->size);
+
+ ReorderBufferSerializeTXN(rb, txn);
+ }
/*
* After eviction, the transaction should have no entries in memory,
@@ -2501,7 +3163,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
spilled++;
}
@@ -2713,6 +3375,136 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert(ondisk->change.action == change->action);
}
+/* Returns true, if the output plugin supports streaming, false, otherwise. */
+static inline bool
+ReorderBufferCanStream(ReorderBuffer *rb)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+
+ return ctx->streaming;
+}
+
+/* Returns true, if the streaming can be started now, false, otherwise. */
+static inline bool
+ReorderBufferCanStartStreaming(ReorderBuffer *rb)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+ SnapBuild *builder = ctx->snapshot_builder;
+
+ /*
+ * We can't start streaming immediately even if the streaming is enabled
+ * because we previously decoded this transaction and now just are
+ * restarting.
+ */
+ if (ReorderBufferCanStream(rb) &&
+ !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+ {
+ /* We must have a consistent snapshot by this time */
+ Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT);
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ Snapshot snapshot_now;
+ CommandId command_id;
+
+ /* We can never reach here for a subtransaction. */
+ Assert(txn->toptxn == NULL);
+
+ /*
+ * We can't make any assumptions about base snapshot here, similar to what
+ * ReorderBufferCommit() does. That relies on base_snapshot getting
+ * transferred from subxact in ReorderBufferCommitChild(), but that was
+ * not yet called as the transaction is in-progress.
+ *
+ * So just walk the subxacts and use the same logic here. But we only need
+ * to do that once, when the transaction is streamed for the first time.
+ * After that we need to reuse the snapshot from the previous run.
+ *
+ * Unlike DecodeCommit which adds xids of all the subtransactions in
+ * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
+ * but we do add them to subxip array instead via ReorderBufferCopySnap.
+ * This allows the catalog changes made in subtransactions decoded till
+ * now to be visible.
+ */
+ if (txn->snapshot_now == NULL)
+ {
+ dlist_iter subxact_i;
+
+ /* make sure this transaction is streamed for the first time */
+ Assert(!rbtxn_is_streamed(txn));
+
+ /* at the beginning we should have invalid command ID */
+ Assert(txn->command_id == InvalidCommandId);
+
+ dlist_foreach(subxact_i, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+ ReorderBufferTransferSnapToParent(txn, subtxn);
+ }
+
+ /*
+ * If this transaction has no snapshot, it didn't make any changes to
+ * the database till now, so there's nothing to decode.
+ */
+ if (txn->base_snapshot == NULL)
+ {
+ Assert(txn->ninvalidations == 0);
+ return;
+ }
+
+ command_id = FirstCommandId;
+ snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+ txn, command_id);
+ }
+ else
+ {
+ /* the transaction must have been already streamed */
+ Assert(rbtxn_is_streamed(txn));
+
+ /*
+ * Nah, we already have snapshot from the previous streaming run. We
+ * assume new subxacts can't move the LSN backwards, and so can't beat
+ * the LSN condition in the previous branch (so no need to walk
+ * through subxacts again). In fact, we must not do that as we may be
+ * using snapshot half-way through the subxact.
+ */
+ command_id = txn->command_id;
+
+ /*
+ * We can't use txn->snapshot_now directly because after the last
+ * streaming run, we might have got some new sub-transactions. So we
+ * need to add them to the snapshot.
+ */
+ snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
+ txn, command_id);
+
+ /* Free the previously copied snapshot. */
+ Assert(txn->snapshot_now->copied);
+ ReorderBufferFreeSnap(rb, txn->snapshot_now);
+ txn->snapshot_now = NULL;
+ }
+
+ /* Process and send the changes to output plugin. */
+ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
+ command_id, true);
+
+ Assert(dlist_is_empty(&txn->changes));
+ Assert(txn->nentries == 0);
+ Assert(txn->nentries_mem == 0);
+}
+
/*
* Size of a change in memory.
*/
@@ -2813,7 +3605,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
dlist_delete(&cleanup->node);
- ReorderBufferReturnChange(rb, cleanup);
+ ReorderBufferReturnChange(rb, cleanup, true);
}
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
@@ -3522,7 +4314,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_container(ReorderBufferChange, node, it.cur);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
}
@@ -3812,6 +4604,17 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
BlockNumber blockno;
bool updated_mapping = false;
+ /*
+ * Return unresolved if tuplecid_data is not valid. That's because when
+ * streaming in-progress transactions we may run into tuples with the CID
+ * before actually decoding them. Think e.g. about INSERT followed by
+ * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
+ * INSERT. So in such cases, we assume the CID is from the future
+ * command.
+ */
+ if (tuplecid_data == NULL)
+ return false;
+
/* be careful about padding */
memset(&key, 0, sizeof(key));
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 95d18cdb12..aa17f7df84 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -67,6 +67,7 @@
#define XLH_INSERT_LAST_IN_MULTI (1<<1)
#define XLH_INSERT_IS_SPECULATIVE (1<<2)
#define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3)
+#define XLH_INSERT_ON_TOAST_RELATION (1<<4)
/*
* xl_heap_update flag values, 8 bits are available.
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 7ba72c84e0..387eb34a61 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -19,6 +19,7 @@
#include "access/relscan.h"
#include "access/sdir.h"
+#include "access/xact.h"
#include "utils/guc.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
@@ -903,6 +904,15 @@ static inline bool
table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
+
+ /*
+ * We don't expect direct calls to table_scan_getnextslot with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding");
+
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}
@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
TupleTableSlot *slot,
bool *call_again, bool *all_dead)
{
+ /*
+ * We don't expect direct calls to table_index_fetch_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding");
return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
slot, call_again,
@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel,
Snapshot snapshot,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_tuple_fetch_row_version with
+ * valid CheckXidAlive for catalog or regular tables. See detailed
+ * comments in xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding");
+
return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
}
@@ -1713,6 +1738,14 @@ static inline bool
table_scan_bitmap_next_block(TableScanDesc scan,
struct TBMIterateResult *tbmres)
{
+ /*
+ * We don't expect direct calls to table_scan_bitmap_next_block with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
+
return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
tbmres);
}
@@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan,
struct TBMIterateResult *tbmres,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_scan_bitmap_next_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding");
+
return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan,
tbmres,
slot);
@@ -1748,6 +1789,13 @@ static inline bool
table_scan_sample_next_block(TableScanDesc scan,
struct SampleScanState *scanstate)
{
+ /*
+ * We don't expect direct calls to table_scan_sample_next_block with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate);
}
@@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan,
struct SampleScanState *scanstate,
TupleTableSlot *slot)
{
+ /*
+ * We don't expect direct calls to table_scan_sample_next_tuple with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding");
return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate,
slot);
}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 53480116a4..c18554bae2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -81,6 +81,10 @@ typedef enum
/* Synchronous commit level */
extern int synchronous_commit;
+/* used during logical streaming of a transaction */
+extern TransactionId CheckXidAlive;
+extern bool bsysscan;
+
/*
* Miscellaneous flag bits to record events which occur on the top level
* transaction. These flags are only persisted in MyXactFlags and are intended
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index deef31825d..b0fae9808b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void ResetLogicalStreamingState(void);
#endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 42bc817648..1ae17d5f11 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -162,6 +162,9 @@ typedef struct ReorderBufferChange
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004
+#define RBTXN_IS_STREAMED 0x0008
+#define RBTXN_HAS_TOAST_INSERT 0x0010
+#define RBTXN_HAS_SPEC_INSERT 0x0020
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -181,6 +184,40 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
+/* This transaction's changes has toast insert, without main table insert. */
+#define rbtxn_has_toast_insert(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
+)
+/*
+ * This transaction's changes has speculative insert, without speculative
+ * confirm.
+ */
+#define rbtxn_has_spec_insert(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
+)
+
+/* Check whether this transaction has an incomplete change. */
+#define rbtxn_has_incomplete_tuple(txn) \
+( \
+ rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
+)
+
+/*
+ * Has this transaction been streamed to downstream?
+ *
+ * (It's not possible to deduce this from nentries and nentries_mem for
+ * various reasons. For example, all changes may be in subtransactions in
+ * which case we'd have nentries==0 for the toplevel one, which would say
+ * nothing about the streaming. So we maintain this flag, but only for the
+ * toplevel transaction.)
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)
+
typedef struct ReorderBufferTXN
{
/* See above */
@@ -248,6 +285,13 @@ typedef struct ReorderBufferTXN
XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
+ /*
+ * Snapshot/CID from the previous streaming run. Only valid for already
+ * streamed transactions (NULL/InvalidCommandId otherwise).
+ */
+ Snapshot snapshot_now;
+ CommandId command_id;
+
/*
* How many ReorderBufferChange's do we have in this txn.
*
@@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN
* Size of this transaction (changes currently in memory, in bytes).
*/
Size size;
+
+ /* Size of top-transaction including sub-transactions. */
+ Size total_size;
+
+ /* If we have detected concurrent abort then ignore future changes. */
+ bool concurrent_abort;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *);
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
-void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
+void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
-void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
+ XLogRecPtr lsn, ReorderBufferChange *,
+ bool toast_insert);
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);