From bc062cb938234d8e815d14fecd8d3b2abff3fe88 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 25 Aug 2021 08:32:04 -0400
Subject: [PATCH] Fix broken snapshot handling in parallel workers.

Pengchengliu reported an assertion failure in a parallel woker while
performing a parallel scan using an overflowed snapshot. The proximate
cause is that TransactionXmin was set to an incorrect value.  The
underlying cause is incorrect snapshot handling in parallel.c.

In particular, InitializeParallelDSM() was unconditionally calling
GetTransactionSnapshot(), because I (rhaas) mistakenly thought that
was always retrieving an existing snapshot whereas, at isolation
levels less than REPEATABLE READ, it's actually taking a new one. So
instead do this only at higher isolation levels where there actually
is a single snapshot for the whole transaction.

By itself, this is not a sufficient fix, because we still need to
guarantee that TransactionXmin gets set properly in the workers. The
easiest way to do that seems to be to install the leader's active
snapshot as the transaction snapshot if the leader did not serialize a
transaction snapshot. This doesn't affect the results of future
GetTrasnactionSnapshot() calls since those have to take a new snapshot
anyway; what we care about is the side effect of setting TransactionXmin.

Report by Pengchengliu. Patch by Greg Nancarrow, except for some comment
text which I supplied.

Discussion: https://postgr.es/m/002f01d748ac$eaa781a0$bff684e0$@tju.edu.cn
---
 src/backend/access/transam/parallel.c | 52 +++++++++++++++++++--------
 1 file changed, 38 insertions(+), 14 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 14a8690019..95ab3722d3 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -254,8 +254,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
 		combocidlen = EstimateComboCIDStateSpace();
 		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
-		tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
-		shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+		if (IsolationUsesXactSnapshot())
+		{
+			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+		}
 		asnaplen = EstimateSnapshotSpace(active_snapshot);
 		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
 		tstatelen = EstimateTransactionStateSpace();
@@ -366,11 +369,19 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		SerializeComboCIDState(combocidlen, combocidspace);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
 
-		/* Serialize transaction snapshot and active snapshot. */
-		tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
-		SerializeSnapshot(transaction_snapshot, tsnapspace);
-		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
-					   tsnapspace);
+		/*
+		 * Serialize the transaction snapshot if the transaction
+		 * isolation-level uses a transaction snapshot.
+		 */
+		if (IsolationUsesXactSnapshot())
+		{
+			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+			SerializeSnapshot(transaction_snapshot, tsnapspace);
+			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
+						   tsnapspace);
+		}
+
+		/* Serialize the active snapshot. */
 		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
 		SerializeSnapshot(active_snapshot, asnapspace);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
@@ -1260,6 +1271,8 @@ ParallelWorkerMain(Datum main_arg)
 	char	   *enumblacklistspace;
 	StringInfoData msgbuf;
 	char	   *session_dsm_handle_space;
+	Snapshot	tsnapshot;
+	Snapshot	asnapshot;
 
 	/* Set flag to indicate that we're initializing a parallel worker. */
 	InitializingParallelWorker = true;
@@ -1407,14 +1420,25 @@ ParallelWorkerMain(Datum main_arg)
 		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
 	AttachSession(*(dsm_handle *) session_dsm_handle_space);
 
-	/* Restore transaction snapshot. */
-	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
-	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
-							   fps->parallel_master_pgproc);
-
-	/* Restore active snapshot. */
+	/*
+	 * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
+	 * the leader has serialized the transaction snapshot and we must restore
+	 * it. At lower isolation levels, there is no transaction-lifetime
+	 * snapshot, but we need TransactionXmin to get set to a value which is
+	 * less than or equal to the xmin of every snapshot that will be used by
+	 * this worker. The easiest way to accomplish that is to install the
+	 * active snapshot as the transaction snapshot. Code running in this
+	 * parallel worker might take new snapshots via GetTransactionSnapshot()
+	 * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
+	 * snapshot older than the active snapshot.
+	 */
 	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
-	PushActiveSnapshot(RestoreSnapshot(asnapspace));
+	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
+	asnapshot = RestoreSnapshot(asnapspace);
+	tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
+	RestoreTransactionSnapshot(tsnapshot,
+							   fps->parallel_master_pgproc);
+	PushActiveSnapshot(asnapshot);
 
 	/*
 	 * We've changed which tuples we can see, and must therefore invalidate