diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index d4c8c5692c..ee231cedcf 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10168,7 +10168,130 @@ SELECT * FROM batch_table ORDER BY x; 50 | test50 | test50 (50 rows) +-- Clean up +DROP TABLE batch_table; +DROP TABLE batch_table_p0; +DROP TABLE batch_table_p1; ALTER SERVER loopback OPTIONS (DROP batch_size); +-- Test that pending inserts are handled properly when needed +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable (a text, b int) + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable (a text, b int); +CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS +$$ +begin + raise notice '%: there are % rows in ftable', + TG_NAME, (SELECT count(*) FROM ftable); + if TG_OP = 'DELETE' then + return OLD; + else + return NEW; + end if; +end; +$$; +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT OR UPDATE OR DELETE ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); +WITH t AS ( + INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +-----+---- + AAA | 42 + BBB | 42 +(2 rows) + +SELECT * FROM ftable; + a | b +-----+---- + AAA | 42 + BBB | 42 +(2 rows) + +DELETE FROM ftable; +WITH t AS ( + UPDATE ltable SET b = b + 100 RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +SELECT * FROM ftable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +DELETE FROM ftable; +WITH t AS ( + DELETE FROM ltable RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +---+--- +(0 rows) + +SELECT * FROM ftable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +DELETE FROM ftable; +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +CREATE TABLE parent (a text, b int) PARTITION BY LIST (a); +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable + PARTITION OF parent + FOR VALUES IN ('AAA') + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable + PARTITION OF parent + FOR VALUES IN ('BBB'); +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); +INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42); +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 2 rows in ftable +SELECT tableoid::regclass, * FROM parent; + tableoid | a | b +----------+-----+---- + ftable | AAA | 42 + ftable | AAA | 42 + ltable | BBB | 42 + ltable | BBB | 42 +(4 rows) + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +DROP TABLE parent; +DROP FUNCTION ftable_rowcount_trigf; -- =================================================================== -- test asynchronous execution -- =================================================================== diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 94a7d367d1..258506b01a 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3205,8 +3205,94 @@ INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, SELECT COUNT(*) FROM batch_table; SELECT * FROM batch_table ORDER BY x; +-- Clean up +DROP TABLE batch_table; +DROP TABLE batch_table_p0; +DROP TABLE batch_table_p1; + ALTER SERVER loopback OPTIONS (DROP batch_size); +-- Test that pending inserts are handled properly when needed +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable (a text, b int) + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable (a text, b int); +CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS +$$ +begin + raise notice '%: there are % rows in ftable', + TG_NAME, (SELECT count(*) FROM ftable); + if TG_OP = 'DELETE' then + return OLD; + else + return NEW; + end if; +end; +$$; +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT OR UPDATE OR DELETE ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); + +WITH t AS ( + INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +WITH t AS ( + UPDATE ltable SET b = b + 100 RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +WITH t AS ( + DELETE FROM ltable RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; + +CREATE TABLE parent (a text, b int) PARTITION BY LIST (a); +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable + PARTITION OF parent + FOR VALUES IN ('AAA') + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable + PARTITION OF parent + FOR VALUES IN ('BBB'); +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); + +INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42); + +SELECT tableoid::regclass, * FROM parent; + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +DROP TABLE parent; +DROP FUNCTION ftable_rowcount_trigf; + -- =================================================================== -- test asynchronous execution -- =================================================================== diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b3ce4bae53..83d21d612b 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1257,6 +1257,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_ChildToRootMap = NULL; resultRelInfo->ri_ChildToRootMapValid = false; resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + resultRelInfo->ri_ModifyTableState = NULL; } /* diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 606c920b06..216da08d0c 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -934,6 +934,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, Assert(partRelInfo->ri_BatchSize >= 1); + /* + * If doing batch insert, setup back-link so we can easily find the + * mtstate again. + */ + if (partRelInfo->ri_BatchSize > 1) + partRelInfo->ri_ModifyTableState = mtstate; + partRelInfo->ri_CopyMultiInsertBuffer = NULL; /* diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index ad11392b99..64a8c2e593 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -127,6 +127,7 @@ CreateExecutorState(void) estate->es_result_relations = NULL; estate->es_opened_result_relations = NIL; estate->es_tuple_routing_result_relations = NIL; + estate->es_insert_pending_result_relations = NIL; estate->es_trig_target_relations = NIL; estate->es_param_list_info = NULL; diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 37ba4755cb..ee0f042204 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -67,6 +67,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate, int numSlots, EState *estate, bool canSetTag); +static void ExecPendingInserts(EState *estate); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -645,6 +646,10 @@ ExecInsert(ModifyTableState *mtstate, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row) { + /* Flush any pending inserts, so rows are visible to the triggers */ + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); + if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) return NULL; /* "do nothing" */ } @@ -678,6 +683,8 @@ ExecInsert(ModifyTableState *mtstate, */ if (resultRelInfo->ri_BatchSize > 1) { + bool flushed = false; + /* * When we've reached the desired batch size, perform the * insertion. @@ -690,6 +697,7 @@ ExecInsert(ModifyTableState *mtstate, resultRelInfo->ri_NumSlots, estate, canSetTag); resultRelInfo->ri_NumSlots = 0; + flushed = true; } oldContext = MemoryContextSwitchTo(estate->es_query_cxt); @@ -732,6 +740,24 @@ ExecInsert(ModifyTableState *mtstate, ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], planSlot); + /* + * If these are the first tuples stored in the buffers, add the + * target rel to the es_insert_pending_result_relations list, + * except in the case where flushing was done above, in which case + * the target rel would already have been added to the list, so no + * need to do this. + */ + if (resultRelInfo->ri_NumSlots == 0 && !flushed) + { + Assert(!list_member_ptr(estate->es_insert_pending_result_relations, + resultRelInfo)); + estate->es_insert_pending_result_relations = + lappend(estate->es_insert_pending_result_relations, + resultRelInfo); + } + Assert(list_member_ptr(estate->es_insert_pending_result_relations, + resultRelInfo)); + resultRelInfo->ri_NumSlots++; MemoryContextSwitchTo(oldContext); @@ -1034,9 +1060,8 @@ ExecBatchInsert(ModifyTableState *mtstate, slot = rslots[i]; /* - * AFTER ROW Triggers or RETURNING expressions might reference the - * tableoid column, so (re-)initialize tts_tableOid before evaluating - * them. + * AFTER ROW Triggers might reference the tableoid column, so + * (re-)initialize tts_tableOid before evaluating them. */ slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); @@ -1107,6 +1132,10 @@ ExecDelete(ModifyTableState *mtstate, { bool dodelete; + /* Flush any pending inserts, so rows are visible to the triggers */ + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); + dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, epqreturnslot); @@ -1410,6 +1439,32 @@ ldelete:; return NULL; } +/* + * ExecPendingInserts -- flushes all pending inserts to the foreign tables + */ +static void +ExecPendingInserts(EState *estate) +{ + ListCell *lc; + + foreach(lc, estate->es_insert_pending_result_relations) + { + ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(lc); + ModifyTableState *mtstate = resultRelInfo->ri_ModifyTableState; + + Assert(mtstate); + ExecBatchInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, mtstate->canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + list_free(estate->es_insert_pending_result_relations); + estate->es_insert_pending_result_relations = NIL; +} + /* * ExecCrossPartitionUpdate --- Move an updated tuple to another partition. * @@ -1634,6 +1689,10 @@ ExecUpdate(ModifyTableState *mtstate, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) { + /* Flush any pending inserts, so rows are visible to the triggers */ + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); + if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, slot)) return NULL; /* "do nothing" */ @@ -2361,9 +2420,6 @@ ExecModifyTable(PlanState *pstate) ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; - PartitionTupleRouting *proute = node->mt_partition_tuple_routing; - List *relinfos = NIL; - ListCell *lc; CHECK_FOR_INTERRUPTS(); @@ -2620,21 +2676,8 @@ ExecModifyTable(PlanState *pstate) /* * Insert remaining tuples for batch insert. */ - if (proute) - relinfos = estate->es_tuple_routing_result_relations; - else - relinfos = estate->es_opened_result_relations; - - foreach(lc, relinfos) - { - resultRelInfo = lfirst(lc); - if (resultRelInfo->ri_NumSlots > 0) - ExecBatchInsert(node, resultRelInfo, - resultRelInfo->ri_Slots, - resultRelInfo->ri_PlanSlots, - resultRelInfo->ri_NumSlots, - estate, node->canSetTag); - } + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); /* * We're done, but fire AFTER STATEMENT triggers before exiting. @@ -3140,6 +3183,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } else resultRelInfo->ri_BatchSize = 1; + + /* + * If doing batch insert, setup back-link so we can easily find the + * mtstate again. + */ + if (resultRelInfo->ri_BatchSize > 1) + resultRelInfo->ri_ModifyTableState = mtstate; } /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3dfac5bd5f..71d0cf44dd 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -524,6 +524,9 @@ typedef struct ResultRelInfo /* for use by copyfrom.c when performing multi-inserts */ struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; + + /* for use by nodeModifyTable.c when performing batch-inserts */ + struct ModifyTableState *ri_ModifyTableState; } ResultRelInfo; /* ---------------- @@ -645,6 +648,12 @@ typedef struct EState int es_jit_flags; struct JitContext *es_jit; struct JitInstrumentation *es_jit_worker_instr; + + /* + * The following list contains ResultRelInfos for foreign tables on which + * batch-inserts are to be executed. + */ + List *es_insert_pending_result_relations; } EState;