diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index bb91838884..4f1f4fb1ef 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -62,6 +62,7 @@ #include "parser/parse_func.h" #include "parser/parse_type.h" #include "pgstat.h" +#include "tcop/pquery.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -2252,6 +2253,20 @@ ExecuteCallStmt(CallStmt *stmt, ParamListInfo params, bool atomic, DestReceiver if (fcinfo->isnull) elog(ERROR, "procedure returned null record"); + /* + * Ensure there's an active snapshot whilst we execute whatever's + * involved here. Note that this is *not* sufficient to make the + * world safe for TOAST pointers to be included in the returned data: + * the referenced data could have gone away while we didn't hold a + * snapshot. Hence, it's incumbent on PLs that can do COMMIT/ROLLBACK + * to not return TOAST pointers, unless those pointers were fetched + * after the last COMMIT/ROLLBACK in the procedure. + * + * XXX that is a really nasty, hard-to-test requirement. Is there a + * way to remove it? + */ + EnsurePortalSnapshotExists(); + td = DatumGetHeapTupleHeader(retval); tupType = HeapTupleHeaderGetTypeId(td); tupTypmod = HeapTupleHeaderGetTypMod(td); diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index b108168821..97b1a63527 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -251,12 +251,8 @@ _SPI_commit(bool chain) /* Start the actual commit */ _SPI_current->internal_xact = true; - /* - * Before committing, pop all active snapshots to avoid error about - * "snapshot %p still active". - */ - while (ActiveSnapshotSet()) - PopActiveSnapshot(); + /* Release snapshots associated with portals */ + ForgetPortalSnapshots(); if (chain) SaveTransactionCharacteristics(); @@ -313,6 +309,9 @@ _SPI_rollback(bool chain) /* Start the actual rollback */ _SPI_current->internal_xact = true; + /* Release snapshots associated with portals */ + ForgetPortalSnapshots(); + if (chain) SaveTransactionCharacteristics(); @@ -2100,6 +2099,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, uint64 my_processed = 0; SPITupleTable *my_tuptable = NULL; int res = 0; + bool allow_nonatomic = plan->no_snapshots; /* legacy API name */ bool pushed_active_snap = false; ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; @@ -2132,11 +2132,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, * In the first two cases, we can just push the snap onto the stack once * for the whole plan list. * - * But if the plan has no_snapshots set to true, then don't manage - * snapshots at all. The caller should then take care of that. + * Note that snapshot != InvalidSnapshot implies an atomic execution + * context. */ - if (snapshot != InvalidSnapshot && !plan->no_snapshots) + if (snapshot != InvalidSnapshot) { + Assert(!allow_nonatomic); if (read_only) { PushActiveSnapshot(snapshot); @@ -2211,15 +2212,39 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, stmt_list = cplan->stmt_list; /* - * In the default non-read-only case, get a new snapshot, replacing - * any that we pushed in a previous cycle. + * If we weren't given a specific snapshot to use, and the statement + * list requires a snapshot, set that up. */ - if (snapshot == InvalidSnapshot && !read_only && !plan->no_snapshots) + if (snapshot == InvalidSnapshot && + (list_length(stmt_list) > 1 || + (list_length(stmt_list) == 1 && + PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt, + stmt_list))))) { - if (pushed_active_snap) - PopActiveSnapshot(); - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; + /* + * First, ensure there's a Portal-level snapshot. This back-fills + * the snapshot stack in case the previous operation was a COMMIT + * or ROLLBACK inside a procedure or DO block. (We can't put back + * the Portal snapshot any sooner, or we'd break cases like doing + * SET or LOCK just after COMMIT.) It's enough to check once per + * statement list, since COMMIT/ROLLBACK/CALL/DO can't appear + * within a multi-statement list. + */ + EnsurePortalSnapshotExists(); + + /* + * In the default non-read-only case, get a new per-statement-list + * snapshot, replacing any that we pushed in a previous cycle. + * Skip it when doing non-atomic execution, though (we rely + * entirely on the Portal snapshot in that case). + */ + if (!read_only && !allow_nonatomic) + { + if (pushed_active_snap) + PopActiveSnapshot(); + PushActiveSnapshot(GetTransactionSnapshot()); + pushed_active_snap = true; + } } foreach(lc2, stmt_list) @@ -2231,6 +2256,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, _SPI_current->processed = 0; _SPI_current->tuptable = NULL; + /* Check for unsupported cases. */ if (stmt->utilityStmt) { if (IsA(stmt->utilityStmt, CopyStmt)) @@ -2259,9 +2285,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, /* * If not read-only mode, advance the command counter before each - * command and update the snapshot. + * command and update the snapshot. (But skip it if the snapshot + * isn't under our control.) */ - if (!read_only && !plan->no_snapshots) + if (!read_only && pushed_active_snap) { CommandCounterIncrement(); UpdateActiveSnapshotCommandId(); @@ -2295,13 +2322,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, QueryCompletion qc; /* - * If the SPI context is atomic, or we are asked to manage - * snapshots, then we are in an atomic execution context. - * Conversely, to propagate a nonatomic execution context, the - * caller must be in a nonatomic SPI context and manage - * snapshots itself. + * If the SPI context is atomic, or we were not told to allow + * nonatomic operations, tell ProcessUtility this is an atomic + * execution context. */ - if (_SPI_current->atomic || !plan->no_snapshots) + if (_SPI_current->atomic || !allow_nonatomic) context = PROCESS_UTILITY_QUERY; else context = PROCESS_UTILITY_QUERY_NONATOMIC; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e25ad67223..caaa59c7bc 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -199,6 +199,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) ResultRelInfo *resultRelInfo; RangeTblEntry *rte; + /* + * Input functions may need an active snapshot, as may AFTER triggers + * invoked during finish_estate. For safety, ensure an active snapshot + * exists throughout all our usage of the executor. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); @@ -223,6 +230,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) return estate; } +/* + * Finish any operations related to the executor state created by + * create_estate_for_relation(). + */ +static void +finish_estate(EState *estate) +{ + /* Handle any queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + /* Cleanup. */ + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + PopActiveSnapshot(); +} + /* * Executes default values for columns for which we can't map to remote * relation columns. @@ -634,9 +657,6 @@ apply_handle_insert(StringInfo s) RelationGetDescr(rel->localrel), &TTSOpsVirtual); - /* Input functions may need an active snapshot, so get one */ - PushActiveSnapshot(GetTransactionSnapshot()); - /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, newtup.values); @@ -651,13 +671,7 @@ apply_handle_insert(StringInfo s) apply_handle_insert_internal(estate->es_result_relation_info, estate, remoteslot); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -778,8 +792,6 @@ apply_handle_update(StringInfo s) /* Also populate extraUpdatedCols, in case we have generated columns */ fill_extraUpdatedCols(target_rte, rel->localrel); - PushActiveSnapshot(GetTransactionSnapshot()); - /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, @@ -794,13 +806,7 @@ apply_handle_update(StringInfo s) apply_handle_update_internal(estate->es_result_relation_info, estate, remoteslot, &newtup, rel); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -902,8 +908,6 @@ apply_handle_delete(StringInfo s) RelationGetDescr(rel->localrel), &TTSOpsVirtual); - PushActiveSnapshot(GetTransactionSnapshot()); - /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, oldtup.values); @@ -917,13 +921,7 @@ apply_handle_delete(StringInfo s) apply_handle_delete_internal(estate->es_result_relation_info, estate, remoteslot, &rel->remoterel); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -1248,7 +1246,7 @@ apply_handle_truncate(StringInfo s) List *relids = NIL; List *relids_logged = NIL; ListCell *lc; - LOCKMODE lockmode = AccessExclusiveLock; + LOCKMODE lockmode = AccessExclusiveLock; ensure_transaction(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index fa80e0e635..a88a054fb4 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -476,6 +476,13 @@ PortalStart(Portal portal, ParamListInfo params, else PushActiveSnapshot(GetTransactionSnapshot()); + /* + * We could remember the snapshot in portal->portalSnapshot, + * but presently there seems no need to, as this code path + * cannot be used for non-atomic execution. Hence there can't + * be any commit/abort that might destroy the snapshot. + */ + /* * Create QueryDesc in portal's context; for the moment, set * the destination to DestNone. @@ -1114,45 +1121,26 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, bool isTopLevel, bool setHoldSnapshot, DestReceiver *dest, QueryCompletion *qc) { - Node *utilityStmt = pstmt->utilityStmt; - Snapshot snapshot; - /* - * Set snapshot if utility stmt needs one. Most reliable way to do this - * seems to be to enumerate those that do not need one; this is a short - * list. Transaction control, LOCK, and SET must *not* set a snapshot - * since they need to be executable at the start of a transaction-snapshot - * mode transaction without freezing a snapshot. By extension we allow - * SHOW not to set a snapshot. The other stmts listed are just efficiency - * hacks. Beware of listing anything that can modify the database --- if, - * say, it has to update an index with expressions that invoke - * user-defined functions, then it had better have a snapshot. + * Set snapshot if utility stmt needs one. */ - if (!(IsA(utilityStmt, TransactionStmt) || - IsA(utilityStmt, LockStmt) || - IsA(utilityStmt, VariableSetStmt) || - IsA(utilityStmt, VariableShowStmt) || - IsA(utilityStmt, ConstraintsSetStmt) || - /* efficiency hacks from here down */ - IsA(utilityStmt, FetchStmt) || - IsA(utilityStmt, ListenStmt) || - IsA(utilityStmt, NotifyStmt) || - IsA(utilityStmt, UnlistenStmt) || - IsA(utilityStmt, CheckPointStmt))) + if (PlannedStmtRequiresSnapshot(pstmt)) { - snapshot = GetTransactionSnapshot(); + Snapshot snapshot = GetTransactionSnapshot(); + /* If told to, register the snapshot we're using and save in portal */ if (setHoldSnapshot) { snapshot = RegisterSnapshot(snapshot); portal->holdSnapshot = snapshot; } + /* In any case, make the snapshot active and remember it in portal */ PushActiveSnapshot(snapshot); /* PushActiveSnapshot might have copied the snapshot */ - snapshot = GetActiveSnapshot(); + portal->portalSnapshot = GetActiveSnapshot(); } else - snapshot = NULL; + portal->portalSnapshot = NULL; ProcessUtility(pstmt, portal->sourceText, @@ -1166,13 +1154,17 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, MemoryContextSwitchTo(portal->portalContext); /* - * Some utility commands may pop the ActiveSnapshot stack from under us, - * so be careful to only pop the stack if our snapshot is still at the - * top. + * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from + * under us, so don't complain if it's now empty. Otherwise, our snapshot + * should be the top one; pop it. Note that this could be a different + * snapshot from the one we made above; see EnsurePortalSnapshotExists. */ - if (snapshot != NULL && ActiveSnapshotSet() && - snapshot == GetActiveSnapshot()) + if (portal->portalSnapshot != NULL && ActiveSnapshotSet()) + { + Assert(portal->portalSnapshot == GetActiveSnapshot()); PopActiveSnapshot(); + } + portal->portalSnapshot = NULL; } /* @@ -1254,6 +1246,12 @@ PortalRunMulti(Portal portal, * from what holdSnapshot has.) */ PushCopiedSnapshot(snapshot); + + /* + * As for PORTAL_ONE_SELECT portals, it does not seem + * necessary to maintain portal->portalSnapshot here. + */ + active_snapshot_set = true; } else @@ -1690,3 +1688,78 @@ DoPortalRewind(Portal portal) portal->atEnd = false; portal->portalPos = 0; } + +/* + * PlannedStmtRequiresSnapshot - what it says on the tin + */ +bool +PlannedStmtRequiresSnapshot(PlannedStmt *pstmt) +{ + Node *utilityStmt = pstmt->utilityStmt; + + /* If it's not a utility statement, it definitely needs a snapshot */ + if (utilityStmt == NULL) + return true; + + /* + * Most utility statements need a snapshot, and the default presumption + * about new ones should be that they do too. Hence, enumerate those that + * do not need one. + * + * Transaction control, LOCK, and SET must *not* set a snapshot, since + * they need to be executable at the start of a transaction-snapshot-mode + * transaction without freezing a snapshot. By extension we allow SHOW + * not to set a snapshot. The other stmts listed are just efficiency + * hacks. Beware of listing anything that can modify the database --- if, + * say, it has to update an index with expressions that invoke + * user-defined functions, then it had better have a snapshot. + */ + if (IsA(utilityStmt, TransactionStmt) || + IsA(utilityStmt, LockStmt) || + IsA(utilityStmt, VariableSetStmt) || + IsA(utilityStmt, VariableShowStmt) || + IsA(utilityStmt, ConstraintsSetStmt) || + /* efficiency hacks from here down */ + IsA(utilityStmt, FetchStmt) || + IsA(utilityStmt, ListenStmt) || + IsA(utilityStmt, NotifyStmt) || + IsA(utilityStmt, UnlistenStmt) || + IsA(utilityStmt, CheckPointStmt)) + return false; + + return true; +} + +/* + * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed + * + * Generally, we will have an active snapshot whenever we are executing + * inside a Portal, unless the Portal's query is one of the utility + * statements exempted from that rule (see PlannedStmtRequiresSnapshot). + * However, procedures and DO blocks can commit or abort the transaction, + * and thereby destroy all snapshots. This function can be called to + * re-establish the Portal-level snapshot when none exists. + */ +void +EnsurePortalSnapshotExists(void) +{ + Portal portal; + + /* + * Nothing to do if a snapshot is set. (We take it on faith that the + * outermost active snapshot belongs to some Portal; or if there is no + * Portal, it's somebody else's responsibility to manage things.) + */ + if (ActiveSnapshotSet()) + return; + + /* Otherwise, we'd better have an active Portal */ + portal = ActivePortal; + Assert(portal != NULL); + Assert(portal->portalSnapshot == NULL); + + /* Create a new snapshot and make it active */ + PushActiveSnapshot(GetTransactionSnapshot()); + /* PushActiveSnapshot might have copied the snapshot */ + portal->portalSnapshot = GetActiveSnapshot(); +} diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 7072ce48a3..a34abbd80c 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -502,6 +502,9 @@ PortalDrop(Portal portal, bool isTopCommit) portal->cleanup = NULL; } + /* There shouldn't be an active snapshot anymore, except after error */ + Assert(portal->portalSnapshot == NULL || !isTopCommit); + /* * Remove portal from hash table. Because we do this here, we will not * come back to try to remove the portal again if there's any error in the @@ -709,6 +712,8 @@ PreCommit_Portals(bool isPrepare) portal->holdSnapshot = NULL; } portal->resowner = NULL; + /* Clear portalSnapshot too, for cleanliness */ + portal->portalSnapshot = NULL; continue; } @@ -1278,3 +1283,54 @@ HoldPinnedPortals(void) } } } + +/* + * Drop the outer active snapshots for all portals, so that no snapshots + * remain active. + * + * Like HoldPinnedPortals, this must be called when initiating a COMMIT or + * ROLLBACK inside a procedure. This has to be separate from that since it + * should not be run until we're done with steps that are likely to fail. + * + * It's tempting to fold this into PreCommit_Portals, but to do so, we'd + * need to clean up snapshot management in VACUUM and perhaps other places. + */ +void +ForgetPortalSnapshots(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt *hentry; + int numPortalSnaps = 0; + int numActiveSnaps = 0; + + /* First, scan PortalHashTable and clear portalSnapshot fields */ + hash_seq_init(&status, PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) + { + Portal portal = hentry->portal; + + if (portal->portalSnapshot != NULL) + { + portal->portalSnapshot = NULL; + numPortalSnaps++; + } + /* portal->holdSnapshot will be cleaned up in PreCommit_Portals */ + } + + /* + * Now, pop all the active snapshots, which should be just those that were + * portal snapshots. Ideally we'd drive this directly off the portal + * scan, but there's no good way to visit the portals in the correct + * order. So just cross-check after the fact. + */ + while (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + numActiveSnaps++; + } + + if (numPortalSnaps != numActiveSnaps) + elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)", + numPortalSnaps, numActiveSnaps); +} diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index 6220928bd3..5e6575d2eb 100644 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -92,7 +92,7 @@ typedef struct _SPI_plan int magic; /* should equal _SPI_PLAN_MAGIC */ bool saved; /* saved or unsaved plan? */ bool oneshot; /* one-shot plan? */ - bool no_snapshots; /* let the caller handle the snapshots */ + bool no_snapshots; /* allow nonatomic CALL/DO execution */ List *plancache_list; /* one CachedPlanSource per parsetree */ MemoryContext plancxt; /* Context containing _SPI_plan and data */ int cursor_options; /* Cursor options used for planning */ diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h index 437642cc72..1385a007ab 100644 --- a/src/include/tcop/pquery.h +++ b/src/include/tcop/pquery.h @@ -17,6 +17,8 @@ #include "nodes/parsenodes.h" #include "utils/portal.h" +struct PlannedStmt; /* avoid including plannodes.h here */ + extern PGDLLIMPORT Portal ActivePortal; @@ -42,4 +44,8 @@ extern uint64 PortalRunFetch(Portal portal, long count, DestReceiver *dest); +extern bool PlannedStmtRequiresSnapshot(struct PlannedStmt *pstmt); + +extern void EnsurePortalSnapshotExists(void); + #endif /* PQUERY_H */ diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index d41ff2efda..cb40bfa761 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -194,6 +194,14 @@ typedef struct PortalData /* Presentation data, primarily used by the pg_cursors system view */ TimestampTz creation_time; /* time at which this portal was defined */ bool visible; /* include this portal in pg_cursors? */ + + /* + * Outermost ActiveSnapshot for execution of the portal's queries. For + * all but a few utility commands, we require such a snapshot to exist. + * This ensures that TOAST references in query results can be detoasted, + * and helps to reduce thrashing of the process's exposed xmin. + */ + Snapshot portalSnapshot; /* active snapshot, or NULL if none */ } PortalData; /* @@ -237,5 +245,6 @@ extern void PortalCreateHoldStore(Portal portal); extern void PortalHashTableDeleteAll(void); extern bool ThereAreNoReadyPortals(void); extern void HoldPinnedPortals(void); +extern void ForgetPortalSnapshots(void); #endif /* PORTAL_H */ diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index abb18268cb..b5d20c09be 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -2146,7 +2146,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) PLpgSQL_variable *volatile cur_target = stmt->target; volatile LocalTransactionId before_lxid; LocalTransactionId after_lxid; - volatile bool pushed_active_snap = false; volatile int rc; /* @@ -2184,9 +2183,8 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) Assert(!expr->expr_simple_expr); /* - * The procedure call could end transactions, which would upset - * the snapshot management in SPI_execute*, so don't let it do it. - * Instead, we set the snapshots ourselves below. + * Tell SPI to allow non-atomic execution. (The field name is a + * legacy choice.) */ plan->no_snapshots = true; @@ -2328,16 +2326,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) before_lxid = MyProc->lxid; - /* - * Set snapshot only for non-read-only procedures, similar to SPI - * behavior. - */ - if (!estate->readonly_func) - { - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; - } - rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI, estate->readonly_func, 0); } @@ -2372,17 +2360,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt) after_lxid = MyProc->lxid; - if (before_lxid == after_lxid) - { - /* - * If we are still in the same transaction after the call, pop the - * snapshot that we might have pushed. (If it's a new transaction, - * then all the snapshots are gone already.) - */ - if (pushed_active_snap) - PopActiveSnapshot(); - } - else + if (before_lxid != after_lxid) { /* * If we are in a new transaction after the call, we need to build new @@ -4946,6 +4924,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt) * * We just parse and execute the statement normally, but we have to do it * without setting a snapshot, for things like SET TRANSACTION. + * XXX spi.c now handles this correctly, so we no longer need a special case. */ static int exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt) @@ -4954,10 +4933,7 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt) int rc; if (expr->plan == NULL) - { exec_prepare_plan(estate, expr, 0, true); - expr->plan->no_snapshots = true; - } rc = SPI_execute_plan(expr->plan, NULL, NULL, estate->readonly_func, 0); diff --git a/src/test/isolation/expected/plpgsql-toast.out b/src/test/isolation/expected/plpgsql-toast.out index 4f216b94b6..213bddad4f 100644 --- a/src/test/isolation/expected/plpgsql-toast.out +++ b/src/test/isolation/expected/plpgsql-toast.out @@ -235,3 +235,30 @@ s1: NOTICE: length(r) = 6002 s1: NOTICE: length(r) = 9002 s1: NOTICE: length(r) = 12002 step assign6: <... completed> + +starting permutation: fetch-after-commit +pg_advisory_unlock_all + + +pg_advisory_unlock_all + + +s1: NOTICE: length(t) = 6000 +s1: NOTICE: length(t) = 9000 +s1: NOTICE: length(t) = 12000 +step fetch-after-commit: +do $$ + declare + r record; + t text; + begin + insert into test1 values (2, repeat('bar', 3000)); + insert into test1 values (3, repeat('baz', 4000)); + for r in select test1.a from test1 loop + commit; + select b into t from test1 where a = r.a; + raise notice 'length(t) = %', length(t); + end loop; + end; +$$; + diff --git a/src/test/isolation/specs/plpgsql-toast.spec b/src/test/isolation/specs/plpgsql-toast.spec index d360f8fccb..fb40588d4f 100644 --- a/src/test/isolation/specs/plpgsql-toast.spec +++ b/src/test/isolation/specs/plpgsql-toast.spec @@ -131,6 +131,26 @@ do $$ $$; } +# Check that the results of a query can be detoasted just after committing +# (there's no interaction with VACUUM here) +step "fetch-after-commit" +{ +do $$ + declare + r record; + t text; + begin + insert into test1 values (2, repeat('bar', 3000)); + insert into test1 values (3, repeat('baz', 4000)); + for r in select test1.a from test1 loop + commit; + select b into t from test1 where a = r.a; + raise notice 'length(t) = %', length(t); + end loop; + end; +$$; +} + session "s2" setup { @@ -155,3 +175,4 @@ permutation "lock" "assign3" "vacuum" "unlock" permutation "lock" "assign4" "vacuum" "unlock" permutation "lock" "assign5" "vacuum" "unlock" permutation "lock" "assign6" "vacuum" "unlock" +permutation "fetch-after-commit" diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index a04c03a7e2..4b7d637c70 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 51; +use Test::More tests => 54; # setup @@ -67,6 +67,40 @@ $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" ); +# Add set of AFTER replica triggers for testing that they are fired +# correctly. This uses a table that records details of all trigger +# activities. Triggers are marked as enabled for a subset of the +# partition tree. +$node_subscriber1->safe_psql( + 'postgres', qq{ +CREATE TABLE sub1_trigger_activity (tgtab text, tgop text, + tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub1_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger; +}); + # subscriber 2 # # This does not use partitioning. The tables match the leaf tables on @@ -87,6 +121,34 @@ $node_subscriber2->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all" ); +# Add set of AFTER replica triggers for testing that they are fired +# correctly, using the same method as the first subscriber. +$node_subscriber2->safe_psql( + 'postgres', qq{ +CREATE TABLE sub2_trigger_activity (tgtab text, + tgop text, tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub2_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger; +CREATE TRIGGER sub2_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger; +}); + # Wait for initial sync of all subscriptions my $synced_query = "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; @@ -130,6 +192,14 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated'); +# The AFTER trigger of tab1_2 should have recorded one INSERT. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, + qq(tab1_2|INSERT|AFTER|ROW||5), + 'check replica insert after trigger applied on subscriber'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, a FROM tab1_def ORDER BY 1, 2"); is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated'); @@ -161,6 +231,15 @@ $result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1_2_2 ORDER BY 1"); is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly'); +# The AFTER trigger should have recorded the UPDATEs of tab1_2_2. +$result = $node_subscriber1->safe_psql('postgres', + "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6 +tab1_2_2|UPDATE|AFTER|ROW|4|6 +tab1_2_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); is( $result, qq(sub2_tab1_1|2 @@ -170,6 +249,16 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); is($result, qq(sub2_tab1_2|6), 'tab1_2 updated'); +# The AFTER trigger should have recorded the updates of tab1_2. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2|INSERT|AFTER|ROW||5 +tab1_2|UPDATE|AFTER|ROW|4|6 +tab1_2|UPDATE|AFTER|ROW|5|6 +tab1_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, a FROM tab1_def ORDER BY 1"); is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');