Fix copying data into slots with FDW batching
authorTomas Vondra <tomas.vondra@postgresql.org>
Wed, 16 Jun 2021 20:53:31 +0000 (22:53 +0200)
committerTomas Vondra <tomas.vondra@postgresql.org>
Wed, 16 Jun 2021 21:49:25 +0000 (23:49 +0200)
Commit b676ac443b optimized handling of tuple slots with bulk inserts
into foreign tables, so that the slots are initialized only once and
reused for all batches. The data was however copied into the slots only
after the initialization, inserting duplicate values when the slot gets
reused. Fixed by moving the ExecCopySlot outside the init branch.

The existing postgres_fdw tests failed to catch this due to inserting
data into foreign tables without unique indexes, and then checking only
the number of inserted rows. This adds a new test with both a unique
index and a check of inserted values.

Reported-by: Alexander Pyhalov
Discussion: https://postgr.es/m/7a8cf8d56b3d18e5c0bccd6cd42d04ac%40postgrespro.ru

contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/sql/postgres_fdw.sql
src/backend/executor/nodeModifyTable.c

index 1fb26639fcb9126c296a6442265cbfbc89ff9fee..858e5d4a388ce3a2166f90c626b73537ff8da92c 100644 (file)
@@ -9761,7 +9761,87 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
 (2 rows)
 
 -- Clean up
-DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE;
+-- Use partitioning
+ALTER SERVER loopback OPTIONS (ADD batch_size '10');
+CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x);
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x);
+CREATE FOREIGN TABLE batch_table_p0f
+   PARTITION OF batch_table
+   FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table_p0');
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x);
+CREATE FOREIGN TABLE batch_table_p1f
+   PARTITION OF batch_table
+   FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table_p1');
+INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i;
+SELECT COUNT(*) FROM batch_table;
+ count 
+-------
+    50
+(1 row)
+
+SELECT * FROM batch_table ORDER BY x;
+ x  | field1 | field2 
+----+--------+--------
+  1 | test1  | test1
+  2 | test2  | test2
+  3 | test3  | test3
+  4 | test4  | test4
+  5 | test5  | test5
+  6 | test6  | test6
+  7 | test7  | test7
+  8 | test8  | test8
+  9 | test9  | test9
+ 10 | test10 | test10
+ 11 | test11 | test11
+ 12 | test12 | test12
+ 13 | test13 | test13
+ 14 | test14 | test14
+ 15 | test15 | test15
+ 16 | test16 | test16
+ 17 | test17 | test17
+ 18 | test18 | test18
+ 19 | test19 | test19
+ 20 | test20 | test20
+ 21 | test21 | test21
+ 22 | test22 | test22
+ 23 | test23 | test23
+ 24 | test24 | test24
+ 25 | test25 | test25
+ 26 | test26 | test26
+ 27 | test27 | test27
+ 28 | test28 | test28
+ 29 | test29 | test29
+ 30 | test30 | test30
+ 31 | test31 | test31
+ 32 | test32 | test32
+ 33 | test33 | test33
+ 34 | test34 | test34
+ 35 | test35 | test35
+ 36 | test36 | test36
+ 37 | test37 | test37
+ 38 | test38 | test38
+ 39 | test39 | test39
+ 40 | test40 | test40
+ 41 | test41 | test41
+ 42 | test42 | test42
+ 43 | test43 | test43
+ 44 | test44 | test44
+ 45 | test45 | test45
+ 46 | test46 | test46
+ 47 | test47 | test47
+ 48 | test48 | test48
+ 49 | test49 | test49
+ 50 | test50 | test50
+(50 rows)
+
+ALTER SERVER loopback OPTIONS (DROP batch_size);
 -- ===================================================================
 -- test asynchronous execution
 -- ===================================================================
index 8cb2148f1f6aa991c648602b45745f437dfc17e3..34a67d716052c151c2a2dbb788eae909ee374982 100644 (file)
@@ -3083,7 +3083,34 @@ UPDATE batch_cp_upd_test t SET a = 1 FROM (VALUES (1), (2)) s(a) WHERE t.a = s.a
 SELECT tableoid::regclass, * FROM batch_cp_upd_test;
 
 -- Clean up
-DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+DROP TABLE batch_table, batch_cp_upd_test, batch_table_p0, batch_table_p1 CASCADE;
+
+-- Use partitioning
+ALTER SERVER loopback OPTIONS (ADD batch_size '10');
+
+CREATE TABLE batch_table ( x int, field1 text, field2 text) PARTITION BY HASH (x);
+
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+ALTER TABLE batch_table_p0 ADD CONSTRAINT p0_pkey PRIMARY KEY (x);
+CREATE FOREIGN TABLE batch_table_p0f
+   PARTITION OF batch_table
+   FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table_p0');
+
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+ALTER TABLE batch_table_p1 ADD CONSTRAINT p1_pkey PRIMARY KEY (x);
+CREATE FOREIGN TABLE batch_table_p1f
+   PARTITION OF batch_table
+   FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table_p1');
+
+INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, 50) i;
+SELECT COUNT(*) FROM batch_table;
+SELECT * FROM batch_table ORDER BY x;
+
+ALTER SERVER loopback OPTIONS (DROP batch_size);
 
 -- ===================================================================
 -- test asynchronous execution
index 88c479c6da33a07ec57f0d2627e304fbf8a960e8..143517bc760cc3e530aaa713ba87b63bd1bc1f61 100644 (file)
@@ -717,18 +717,20 @@ ExecInsert(ModifyTableState *mtstate,
 
                resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
                    MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
-               ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
-                            slot);
 
                resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
                    MakeSingleTupleTableSlot(tdesc, planSlot->tts_ops);
-               ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
-                            planSlot);
 
                /* remember how many batch slots we initialized */
                resultRelInfo->ri_NumSlotsInitialized++;
            }
 
+           ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+                        slot);
+
+           ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+                        planSlot);
+
            resultRelInfo->ri_NumSlots++;
 
            MemoryContextSwitchTo(oldContext);