From 99cea49d6525e30bc3768e4ffb95377e7584dea1 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 16 Jun 2021 22:53:31 +0200 Subject: [PATCH] Fix copying data into slots with FDW batching Commit b676ac443b optimized handling of tuple slots with bulk inserts into foreign tables, so that the slots are initialized only once and reused for all batches. The data was however copied into the slots only after the initialization, inserting duplicate values when the slot gets reused. Fixed by moving the ExecCopySlot outside the init branch. The existing postgres_fdw tests failed to catch this due to inserting data into foreign tables without unique indexes, and then checking only the number of inserted rows. This adds a new test with both a unique index and a check of inserted values. Reported-by: Alexander Pyhalov Discussion: https://postgr.es/m/7a8cf8d56b3d18e5c0bccd6cd42d04ac%40postgrespro.ru --- .../postgres_fdw/expected/postgres_fdw.out | 82 ++++++++++++++++++- contrib/postgres_fdw/sql/postgres_fdw.sql | 29 ++++++- src/backend/executor/nodeModifyTable.c | 10 ++- 3 files changed, 115 insertions(+), 6 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 1fb26639fc..858e5d4a38 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9761,7 +9761,87 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; (2 rows) -- Clean up -DROP TABLE batch_table, batch_cp_upd_test CASCADE; +DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE; +-- Use partitioning +ALTER SERVER loopback OPTIONS (ADD batch_size '10'); +CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x); +CREATE TABLE batch_table_p0 (LIKE batch_table); +ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x); +CREATE FOREIGN TABLE batch_table_p0f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 2, REMAINDER 0) + SERVER loopback + OPTIONS (table_name 'batch_table_p0'); +CREATE TABLE batch_table_p1 (LIKE batch_table); +ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x); +CREATE FOREIGN TABLE batch_table_p1f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 2, REMAINDER 1) + SERVER loopback + OPTIONS (table_name 'batch_table_p1'); +INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i; +SELECT COUNT(*) FROM batch_table; + count +------- + 50 +(1 row) + +SELECT * FROM batch_table ORDER BY x; + x | field1 | field2 +----+--------+-------- + 1 | test1 | test1 + 2 | test2 | test2 + 3 | test3 | test3 + 4 | test4 | test4 + 5 | test5 | test5 + 6 | test6 | test6 + 7 | test7 | test7 + 8 | test8 | test8 + 9 | test9 | test9 + 10 | test10 | test10 + 11 | test11 | test11 + 12 | test12 | test12 + 13 | test13 | test13 + 14 | test14 | test14 + 15 | test15 | test15 + 16 | test16 | test16 + 17 | test17 | test17 + 18 | test18 | test18 + 19 | test19 | test19 + 20 | test20 | test20 + 21 | test21 | test21 + 22 | test22 | test22 + 23 | test23 | test23 + 24 | test24 | test24 + 25 | test25 | test25 + 26 | test26 | test26 + 27 | test27 | test27 + 28 | test28 | test28 + 29 | test29 | test29 + 30 | test30 | test30 + 31 | test31 | test31 + 32 | test32 | test32 + 33 | test33 | test33 + 34 | test34 | test34 + 35 | test35 | test35 + 36 | test36 | test36 + 37 | test37 | test37 + 38 | test38 | test38 + 39 | test39 | test39 + 40 | test40 | test40 + 41 | test41 | test41 + 42 | test42 | test42 + 43 | test43 | test43 + 44 | test44 | test44 + 45 | test45 | test45 + 46 | test46 | test46 + 47 | test47 | test47 + 48 | test48 | test48 + 49 | test49 | test49 + 50 | test50 | test50 +(50 rows) + +ALTER SERVER loopback OPTIONS (DROP batch_size); -- =================================================================== -- test asynchronous execution -- =================================================================== diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 8cb2148f1f..34a67d7160 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3083,7 +3083,34 @@ UPDATE batch_cp_upd_test t SET a = 1 FROM (VALUES (1), (2)) s(a) WHERE t.a = s.a SELECT tableoid::regclass, * FROM batch_cp_upd_test; -- Clean up -DROP TABLE batch_table, batch_cp_upd_test CASCADE; +DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE; + +-- Use partitioning +ALTER SERVER loopback OPTIONS (ADD batch_size '10'); + +CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x); + +CREATE TABLE batch_table_p0 (LIKE batch_table); +ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x); +CREATE FOREIGN TABLE batch_table_p0f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 2, REMAINDER 0) + SERVER loopback + OPTIONS (table_name 'batch_table_p0'); + +CREATE TABLE batch_table_p1 (LIKE batch_table); +ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x); +CREATE FOREIGN TABLE batch_table_p1f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 2, REMAINDER 1) + SERVER loopback + OPTIONS (table_name 'batch_table_p1'); + +INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i; +SELECT COUNT(*) FROM batch_table; +SELECT * FROM batch_table ORDER BY x; + +ALTER SERVER loopback OPTIONS (DROP batch_size); -- =================================================================== -- test asynchronous execution diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 88c479c6da..143517bc76 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -717,18 +717,20 @@ ExecInsert(ModifyTableState *mtstate, resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = MakeSingleTupleTableSlot(tdesc, slot->tts_ops); - ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], - slot); resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = MakeSingleTupleTableSlot(tdesc, planSlot->tts_ops); - ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], - planSlot); /* remember how many batch slots we initialized */ resultRelInfo->ri_NumSlotsInitialized++; } + ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], + slot); + + ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], + planSlot); + resultRelInfo->ri_NumSlots++; MemoryContextSwitchTo(oldContext);