Fix EvalPlanQual rechecking during MERGE.
authorDean Rasheed <dean.a.rasheed@gmail.com>
Sat, 30 Sep 2023 09:52:21 +0000 (10:52 +0100)
committerDean Rasheed <dean.a.rasheed@gmail.com>
Sat, 30 Sep 2023 09:52:21 +0000 (10:52 +0100)
Under some circumstances, concurrent MERGE operations could lead to
inconsistent results, that varied according the plan chosen. This was
caused by a lack of rowmarks on the source relation, which meant that
EvalPlanQual rechecking was not guaranteed to return the same source
tuples when re-running the join query.

Fix by ensuring that preprocess_rowmarks() sets up PlanRowMarks for
all non-target relations used in MERGE, in the same way that it does
for UPDATE and DELETE.

Per bug #18103. Back-patch to v15, where MERGE was introduced.

Dean Rasheed, reviewed by Richard Guo.

Discussion: https://postgr.es/m/18103-c4386baab8e355e3%40postgresql.org

src/backend/executor/README
src/backend/executor/nodeModifyTable.c
src/backend/optimizer/plan/planner.c
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/test/isolation/expected/merge-join.out [new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/specs/merge-join.spec [new file with mode: 0644]
src/test/regress/expected/merge.out
src/test/regress/expected/with.out

index 17775a49e2681dc861ccc1709575a9ffb3eb8f25..642d63be613b6637d1b26c4707569bb8a360d12d 100644 (file)
@@ -26,7 +26,7 @@ unnecessarily (for example, Sort does not rescan its input if no parameters
 of the input have changed, since it can just reread its stored sorted data).
 
 For a SELECT, it is only necessary to deliver the top-level result tuples
-to the client.  For INSERT/UPDATE/DELETE, the actual table modification
+to the client.  For INSERT/UPDATE/DELETE/MERGE, the actual table modification
 operations happen in a top-level ModifyTable plan node.  If the query
 includes a RETURNING clause, the ModifyTable node delivers the computed
 RETURNING rows as output, otherwise it returns nothing.  Handling INSERT
@@ -353,8 +353,8 @@ EvalPlanQual (READ COMMITTED Update Checking)
 For simple SELECTs, the executor need only pay attention to tuples that are
 valid according to the snapshot seen by the current transaction (ie, they
 were inserted by a previously committed transaction, and not deleted by any
-previously committed transaction).  However, for UPDATE and DELETE it is not
-cool to modify or delete a tuple that's been modified by an open or
+previously committed transaction).  However, for UPDATE, DELETE, and MERGE it
+is not cool to modify or delete a tuple that's been modified by an open or
 concurrently-committed transaction.  If we are running in SERIALIZABLE
 isolation level then we just raise an error when this condition is seen to
 occur.  In READ COMMITTED isolation level, we must work a lot harder.
@@ -378,14 +378,14 @@ we're doing UPDATE).  If no tuple is returned, then the modified tuple(s)
 fail the quals, so we ignore the current result tuple and continue the
 original query.
 
-In UPDATE/DELETE, only the target relation needs to be handled this way.
+In UPDATE/DELETE/MERGE, only the target relation needs to be handled this way.
 In SELECT FOR UPDATE, there may be multiple relations flagged FOR UPDATE,
 so we obtain lock on the current tuple version in each such relation before
 executing the recheck.
 
 It is also possible that there are relations in the query that are not
-to be locked (they are neither the UPDATE/DELETE target nor specified to
-be locked in SELECT FOR UPDATE/SHARE).  When re-running the test query
+to be locked (they are neither the UPDATE/DELETE/MERGE target nor specified
+to be locked in SELECT FOR UPDATE/SHARE).  When re-running the test query
 we want to use the same rows from these relations that were joined to
 the locked rows.  For ordinary relations this can be implemented relatively
 cheaply by including the row TID in the join outputs and re-fetching that
index d21a178ad5a061e704b3664ea97d994bb76a91fd..f62d28ac6084a7e04bf89e6c82f46394891d3099 100644 (file)
@@ -4284,9 +4284,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
        /*
         * If we have any secondary relations in an UPDATE or DELETE, they need to
-        * be treated like non-locked relations in SELECT FOR UPDATE, ie, the
-        * EvalPlanQual mechanism needs to be told about them.  Locate the
-        * relevant ExecRowMarks.
+        * be treated like non-locked relations in SELECT FOR UPDATE, i.e., the
+        * EvalPlanQual mechanism needs to be told about them.  This also goes for
+        * the source relations in a MERGE.  Locate the relevant ExecRowMarks.
         */
        arowmarks = NIL;
        foreach(l, node->rowMarks)
index 44efb1f4ebcc7807873bf443aa01f9093b2f6956..aafef24cf14ecbdbbbdee18daafed6a816a9d7e1 100644 (file)
@@ -2262,11 +2262,12 @@ preprocess_rowmarks(PlannerInfo *root)
        else
        {
                /*
-                * We only need rowmarks for UPDATE, DELETE, or FOR [KEY]
+                * We only need rowmarks for UPDATE, DELETE, MERGE, or FOR [KEY]
                 * UPDATE/SHARE.
                 */
                if (parse->commandType != CMD_UPDATE &&
-                       parse->commandType != CMD_DELETE)
+                       parse->commandType != CMD_DELETE &&
+                       parse->commandType != CMD_MERGE)
                        return;
        }
 
index cb714f4a196b8b44310064d29ea49b632a986358..869465d6f803a78417ffc325de4cc8c432362903 100644 (file)
@@ -721,8 +721,8 @@ typedef struct EState
  * ExecRowMark -
  *        runtime representation of FOR [KEY] UPDATE/SHARE clauses
  *
- * When doing UPDATE, DELETE, or SELECT FOR [KEY] UPDATE/SHARE, we will have an
- * ExecRowMark for each non-target relation in the query (except inheritance
+ * When doing UPDATE/DELETE/MERGE/SELECT FOR [KEY] UPDATE/SHARE, we will have
+ * an ExecRowMark for each non-target relation in the query (except inheritance
  * parent RTEs, which can be ignored at runtime).  Virtual relations such as
  * subqueries-in-FROM will have an ExecRowMark with relation == NULL.  See
  * PlanRowMark for details about most of the fields.  In addition to fields
index 1b787fe03184abdee3df8c32d6c0e4e01e4e9428..8cafbf3f8a4394d78e9a38917ad532f22427e44b 100644 (file)
@@ -1309,7 +1309,7 @@ typedef struct Limit
  * doing a separate remote query to lock each selected row is usually pretty
  * unappealing, so early locking remains a credible design choice for FDWs.
  *
- * When doing UPDATE, DELETE, or SELECT FOR UPDATE/SHARE, we have to uniquely
+ * When doing UPDATE/DELETE/MERGE/SELECT FOR UPDATE/SHARE, we have to uniquely
  * identify all the source rows, not only those from the target relations, so
  * that we can perform EvalPlanQual rechecking at need.  For plain tables we
  * can just fetch the TID, much as for a target relation; this case is
@@ -1338,7 +1338,7 @@ typedef enum RowMarkType
  * PlanRowMark -
  *        plan-time representation of FOR [KEY] UPDATE/SHARE clauses
  *
- * When doing UPDATE, DELETE, or SELECT FOR UPDATE/SHARE, we create a separate
+ * When doing UPDATE/DELETE/MERGE/SELECT FOR UPDATE/SHARE, we create a separate
  * PlanRowMark node for each non-target relation in the query.  Relations that
  * are not specified as FOR UPDATE/SHARE are marked ROW_MARK_REFERENCE (if
  * regular tables or supported foreign tables) or ROW_MARK_COPY (if not).
diff --git a/src/test/isolation/expected/merge-join.out b/src/test/isolation/expected/merge-join.out
new file mode 100644 (file)
index 0000000..57f048c
--- /dev/null
@@ -0,0 +1,148 @@
+Parsed test spec with 2 sessions
+
+starting permutation: b1 m1 s1 c1 b2 m2 s2 c2
+step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
+step m1: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+step s1: SELECT * FROM tgt;
+id|val
+--+---
+ 1| 10
+ 2| 20
+ 3| 30
+(3 rows)
+
+step c1: COMMIT;
+step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
+step m2: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+step s2: SELECT * FROM tgt;
+id|val
+--+---
+ 1| 10
+ 2| 20
+ 3| 30
+(3 rows)
+
+step c2: COMMIT;
+
+starting permutation: b1 b2 m1 hj ex m2 c1 c2 s1
+step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
+step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
+step m1: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+step hj: SET LOCAL enable_mergejoin = off; SET LOCAL enable_nestloop = off;
+step ex: EXPLAIN (verbose, costs off)
+           MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+QUERY PLAN                                         
+---------------------------------------------------
+Merge on public.tgt                                
+  ->  Hash Left Join                               
+        Output: tgt.ctid, src.val, src.id, src.ctid
+        Inner Unique: true                         
+        Hash Cond: (src.id = tgt.id)               
+        ->  Seq Scan on public.src                 
+              Output: src.val, src.id, src.ctid    
+        ->  Hash                                   
+              Output: tgt.ctid, tgt.id             
+              ->  Seq Scan on public.tgt           
+                    Output: tgt.ctid, tgt.id       
+(11 rows)
+
+step m2: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
+step c1: COMMIT;
+step m2: <... completed>
+step c2: COMMIT;
+step s1: SELECT * FROM tgt;
+id|val
+--+---
+ 1| 10
+ 2| 20
+ 3| 30
+(3 rows)
+
+
+starting permutation: b1 b2 m1 mj ex m2 c1 c2 s1
+step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
+step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
+step m1: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+step mj: SET LOCAL enable_hashjoin = off; SET LOCAL enable_nestloop = off;
+step ex: EXPLAIN (verbose, costs off)
+           MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+QUERY PLAN                                         
+---------------------------------------------------
+Merge on public.tgt                                
+  ->  Merge Left Join                              
+        Output: tgt.ctid, src.val, src.id, src.ctid
+        Inner Unique: true                         
+        Merge Cond: (src.id = tgt.id)              
+        ->  Index Scan using src_pkey on public.src
+              Output: src.val, src.id, src.ctid    
+        ->  Index Scan using tgt_pkey on public.tgt
+              Output: tgt.ctid, tgt.id             
+(9 rows)
+
+step m2: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
+step c1: COMMIT;
+step m2: <... completed>
+step c2: COMMIT;
+step s1: SELECT * FROM tgt;
+id|val
+--+---
+ 1| 10
+ 2| 20
+ 3| 30
+(3 rows)
+
+
+starting permutation: b1 b2 m1 nl ex m2 c1 c2 s1
+step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
+step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
+step m1: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+step nl: SET LOCAL enable_hashjoin = off; SET LOCAL enable_mergejoin = off;
+step ex: EXPLAIN (verbose, costs off)
+           MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
+QUERY PLAN                                         
+---------------------------------------------------
+Merge on public.tgt                                
+  ->  Nested Loop Left Join                        
+        Output: tgt.ctid, src.val, src.id, src.ctid
+        Inner Unique: true                         
+        ->  Seq Scan on public.src                 
+              Output: src.val, src.id, src.ctid    
+        ->  Index Scan using tgt_pkey on public.tgt
+              Output: tgt.ctid, tgt.id             
+              Index Cond: (tgt.id = src.id)        
+(9 rows)
+
+step m2: MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
+step c1: COMMIT;
+step m2: <... completed>
+step c2: COMMIT;
+step s1: SELECT * FROM tgt;
+id|val
+--+---
+ 1| 10
+ 2| 20
+ 3| 30
+(3 rows)
+
index 4fc56ae99c93425edaf576f1cccdf6b21ba68d7c..b2be88ead1d2597c4b54805930ba8e9b310927b7 100644 (file)
@@ -51,6 +51,7 @@ test: merge-insert-update
 test: merge-delete
 test: merge-update
 test: merge-match-recheck
+test: merge-join
 test: delete-abort-savept
 test: delete-abort-savept-2
 test: aborted-keyrevoke
diff --git a/src/test/isolation/specs/merge-join.spec b/src/test/isolation/specs/merge-join.spec
new file mode 100644 (file)
index 0000000..e33a02c
--- /dev/null
@@ -0,0 +1,45 @@
+# MERGE JOIN
+#
+# This test checks the EPQ recheck mechanism during MERGE when joining to a
+# source table using different join methods, per bug #18103
+
+setup
+{
+  CREATE TABLE src (id int PRIMARY KEY, val int);
+  CREATE TABLE tgt (id int PRIMARY KEY, val int);
+  INSERT INTO src SELECT x, x*10 FROM generate_series(1,3) g(x);
+  INSERT INTO tgt SELECT x, x FROM generate_series(1,3) g(x);
+}
+
+teardown
+{
+  DROP TABLE src, tgt;
+}
+
+session s1
+step b1  { BEGIN ISOLATION LEVEL READ COMMITTED; }
+step m1  { MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
+step s1  { SELECT * FROM tgt; }
+step c1  { COMMIT; }
+
+session s2
+step b2  { BEGIN ISOLATION LEVEL READ COMMITTED; }
+step hj  { SET LOCAL enable_mergejoin = off; SET LOCAL enable_nestloop = off; }
+step mj  { SET LOCAL enable_hashjoin = off; SET LOCAL enable_nestloop = off; }
+step nl  { SET LOCAL enable_hashjoin = off; SET LOCAL enable_mergejoin = off; }
+step ex  { EXPLAIN (verbose, costs off)
+           MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
+step m2  { MERGE INTO tgt USING src ON tgt.id = src.id
+             WHEN MATCHED THEN UPDATE SET val = src.val
+             WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
+step s2  { SELECT * FROM tgt; }
+step c2  { COMMIT; }
+
+permutation b1 m1 s1 c1 b2 m2 s2 c2
+permutation b1 b2 m1 hj ex m2 c1 c2 s1
+permutation b1 b2 m1 mj ex m2 c1 c2 s1
+permutation b1 b2 m1 nl ex m2 c1 c2 s1
index 133d42117c0891286f3c2d0a6b0eeebd6bb0f5aa..28a6d0ba98bdc9c09611806ffb94a0a37e897def 100644 (file)
@@ -1829,11 +1829,11 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid
  Merge on public.pa_target t
    Merge on public.pa_targetp t_1
    ->  Hash Left Join
-         Output: s.sid, t_1.tableoid, t_1.ctid
+         Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid
          Inner Unique: true
          Hash Cond: (s.sid = t_1.tid)
          ->  Seq Scan on public.pa_source s
-               Output: s.sid
+               Output: s.sid, s.ctid
          ->  Hash
                Output: t_1.tid, t_1.tableoid, t_1.ctid
                ->  Seq Scan on public.pa_targetp t_1
@@ -1859,11 +1859,11 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid
 --------------------------------------------
  Merge on public.pa_target t
    ->  Hash Left Join
-         Output: s.sid, t.ctid
+         Output: s.sid, s.ctid, t.ctid
          Inner Unique: true
          Hash Cond: (s.sid = t.tid)
          ->  Seq Scan on public.pa_source s
-               Output: s.sid
+               Output: s.sid, s.ctid
          ->  Hash
                Output: t.tid, t.ctid
                ->  Result
index 88e57a2c8774fea8e699681dc84317fd4c34c727..a01efa50a51cbced383cba8c2830453598db9161 100644 (file)
@@ -3015,28 +3015,30 @@ WITH cte_basic AS MATERIALIZED (SELECT 1 a, 'cte_basic val' b)
 MERGE INTO m USING (select 0 k, 'merge source SubPlan' v offset 0) o ON m.k=o.k
 WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_basic WHERE cte_basic.a = m.k LIMIT 1)
 WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
-                           QUERY PLAN                           
-----------------------------------------------------------------
+                            QUERY PLAN                             
+-------------------------------------------------------------------
  Merge on public.m
    CTE cte_basic
      ->  Result
            Output: 1, 'cte_basic val'::text
    ->  Hash Right Join
-         Output: m.ctid, (0), ('merge source SubPlan'::text)
-         Hash Cond: (m.k = (0))
+         Output: m.ctid, o.k, o.v, o.*
+         Hash Cond: (m.k = o.k)
          ->  Seq Scan on public.m
                Output: m.ctid, m.k
          ->  Hash
-               Output: (0), ('merge source SubPlan'::text)
-               ->  Result
-                     Output: 0, 'merge source SubPlan'::text
+               Output: o.k, o.v, o.*
+               ->  Subquery Scan on o
+                     Output: o.k, o.v, o.*
+                     ->  Result
+                           Output: 0, 'merge source SubPlan'::text
    SubPlan 2
      ->  Limit
            Output: ((cte_basic.b || ' merge update'::text))
            ->  CTE Scan on cte_basic
                  Output: (cte_basic.b || ' merge update'::text)
                  Filter: (cte_basic.a = m.k)
-(19 rows)
+(21 rows)
 
 -- InitPlan
 WITH cte_init AS MATERIALIZED (SELECT 1 a, 'cte_init val' b)
@@ -3056,8 +3058,8 @@ WITH cte_init AS MATERIALIZED (SELECT 1 a, 'cte_init val' b)
 MERGE INTO m USING (select 1 k, 'merge source InitPlan' v offset 0) o ON m.k=o.k
 WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_init WHERE a = 1 LIMIT 1)
 WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
-                          QUERY PLAN                           
----------------------------------------------------------------
+                             QUERY PLAN                             
+--------------------------------------------------------------------
  Merge on public.m
    CTE cte_init
      ->  Result
@@ -3069,15 +3071,17 @@ WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
                  Output: (cte_init.b || ' merge update'::text)
                  Filter: (cte_init.a = 1)
    ->  Hash Right Join
-         Output: m.ctid, (1), ('merge source InitPlan'::text)
-         Hash Cond: (m.k = (1))
+         Output: m.ctid, o.k, o.v, o.*
+         Hash Cond: (m.k = o.k)
          ->  Seq Scan on public.m
                Output: m.ctid, m.k
          ->  Hash
-               Output: (1), ('merge source InitPlan'::text)
-               ->  Result
-                     Output: 1, 'merge source InitPlan'::text
-(19 rows)
+               Output: o.k, o.v, o.*
+               ->  Subquery Scan on o
+                     Output: o.k, o.v, o.*
+                     ->  Result
+                           Output: 1, 'merge source InitPlan'::text
+(21 rows)
 
 -- MERGE source comes from CTE:
 WITH merge_source_cte AS MATERIALIZED (SELECT 15 a, 'merge_source_cte val' b)
@@ -3111,14 +3115,14 @@ WHEN NOT MATCHED THEN INSERT VALUES(o.a, o.b || (SELECT merge_source_cte.*::text
      ->  CTE Scan on merge_source_cte merge_source_cte_2
            Output: ((merge_source_cte_2.*)::text || ' merge insert'::text)
    ->  Hash Right Join
-         Output: m.ctid, merge_source_cte.a, merge_source_cte.b
+         Output: m.ctid, merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
          Hash Cond: (m.k = merge_source_cte.a)
          ->  Seq Scan on public.m
                Output: m.ctid, m.k
          ->  Hash
-               Output: merge_source_cte.a, merge_source_cte.b
+               Output: merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
                ->  CTE Scan on merge_source_cte
-                     Output: merge_source_cte.a, merge_source_cte.b
+                     Output: merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
 (20 rows)
 
 DROP TABLE m;