Use a more granular approach to follow update chains
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 27 Nov 2013 20:50:33 +0000 (17:50 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 28 Nov 2013 14:54:25 +0000 (11:54 -0300)
Instead of simply checking the KEYS_UPDATED bit, we need to check
whether each lock held on the future version of the tuple conflicts with
the lock we're trying to acquire.

Per bug report #8434 by Tomonari Katsumata

src/backend/access/heap/heapam.c

index 493df9f6ccdb104e69950a68ae55080ce7c1751e..68c14925ed53cd550dfb7752052413bb7da1db77 100644 (file)
@@ -4777,6 +4777,83 @@ l5:
    *result_xmax = new_xmax;
 }
 
+/*
+ * Subroutine for heap_lock_updated_tuple_rec.
+ *
+ * Given an hypothetical multixact status held by the transaction identified
+ * with the given xid, does the current transaction need to wait, fail, or can
+ * it continue if it wanted to acquire a lock of the given mode?  "needwait"
+ * is set to true if waiting is necessary; if it can continue, then
+ * HeapTupleMayBeUpdated is returned.  In case of a conflict, a different
+ * HeapTupleSatisfiesUpdate return code is returned.
+ *
+ * The held status is said to be hypothetical because it might correspond to a
+ * lock held by a single Xid, i.e. not a real MultiXactId; we express it this
+ * way for simplicity of API.
+ */
+static HTSU_Result
+test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
+                          LockTupleMode mode, bool *needwait)
+{
+   MultiXactStatus wantedstatus;
+
+   *needwait = false;
+   wantedstatus = get_mxact_status_for_lock(mode, false);
+
+   /*
+    * Note: we *must* check TransactionIdIsInProgress before
+    * TransactionIdDidAbort/Commit; see comment at top of tqual.c for an
+    * explanation.
+    */
+   if (TransactionIdIsCurrentTransactionId(xid))
+   {
+       /*
+        * Updated by our own transaction? Just return failure.  This shouldn't
+        * normally happen.
+        */
+       return HeapTupleSelfUpdated;
+   }
+   else if (TransactionIdIsInProgress(xid))
+   {
+       /*
+        * If the locking transaction is running, what we do depends on whether
+        * the lock modes conflict: if they do, then we must wait for it to
+        * finish; otherwise we can fall through to lock this tuple version
+        * without waiting.
+        */
+       if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
+                               LOCKMODE_from_mxstatus(wantedstatus)))
+       {
+           *needwait = true;
+       }
+
+       /*
+        * If we set needwait above, then this value doesn't matter; otherwise,
+        * this value signals to caller that it's okay to proceed.
+        */
+       return HeapTupleMayBeUpdated;
+   }
+   else if (TransactionIdDidAbort(xid))
+       return HeapTupleMayBeUpdated;
+   else if (TransactionIdDidCommit(xid))
+   {
+       /*
+        * If the updating transaction committed, what we do depends on whether
+        * the lock modes conflict: if they do, then we must report error to
+        * caller.  But if they don't, we can fall through to lock it.
+        */
+       if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
+                               LOCKMODE_from_mxstatus(wantedstatus)))
+           /* bummer */
+           return HeapTupleUpdated;
+
+       return HeapTupleMayBeUpdated;
+   }
+
+   /* Not in progress, not aborted, not committed -- must have crashed */
+   return HeapTupleMayBeUpdated;
+}
+
 
 /*
  * Recursive part of heap_lock_updated_tuple
@@ -4794,7 +4871,8 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
    Buffer      buf;
    uint16      new_infomask,
                new_infomask2,
-               old_infomask;
+               old_infomask,
+               old_infomask2;
    TransactionId xmax,
                new_xmax;
    TransactionId priorXmax = InvalidTransactionId;
@@ -4836,45 +4914,107 @@ l4:
        }
 
        old_infomask = mytup.t_data->t_infomask;
+       old_infomask2 = mytup.t_data->t_infomask2;
        xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
 
        /*
-        * If this tuple is updated and the key has been modified (or
-        * deleted), what we do depends on the status of the updating
-        * transaction: if it's live, we sleep until it finishes; if it has
-        * committed, we have to fail (i.e. return HeapTupleUpdated); if it
-        * aborted, we ignore it. For updates that didn't touch the key, we
-        * can just plough ahead.
+        * If this tuple version has been updated or locked by some concurrent
+        * transaction(s), what we do depends on whether our lock mode
+        * conflicts with what those other transactions hold, and also on the
+        * status of them.
         */
-       if (!(old_infomask & HEAP_XMAX_INVALID) &&
-           (mytup.t_data->t_infomask2 & HEAP_KEYS_UPDATED))
+       if (!(old_infomask & HEAP_XMAX_INVALID))
        {
-           TransactionId update_xid;
+           TransactionId rawxmax;
+           bool        needwait;
 
-           /*
-            * Note: we *must* check TransactionIdIsInProgress before
-            * TransactionIdDidAbort/Commit; see comment at top of tqual.c for
-            * an explanation.
-            */
-           update_xid = HeapTupleHeaderGetUpdateXid(mytup.t_data);
-           if (TransactionIdIsCurrentTransactionId(update_xid))
+           rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
+           if (old_infomask & HEAP_XMAX_IS_MULTI)
            {
-               UnlockReleaseBuffer(buf);
-               return HeapTupleSelfUpdated;
-           }
-           else if (TransactionIdIsInProgress(update_xid))
-           {
-               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-               /* No LockTupleTuplock here -- see heap_lock_updated_tuple */
-               XactLockTableWait(update_xid);
-               goto l4;
+               int     nmembers;
+               int     i;
+               MultiXactMember *members;
+
+               nmembers = GetMultiXactIdMembers(rawxmax, &members, false);
+               for (i = 0; i < nmembers; i++)
+               {
+                   HTSU_Result res;
+
+                   res = test_lockmode_for_conflict(members[i].status,
+                                                    members[i].xid,
+                                                    mode, &needwait);
+
+                   if (needwait)
+                   {
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       XactLockTableWait(members[i].xid);
+                       pfree(members);
+                       goto l4;
+                   }
+                   if (res != HeapTupleMayBeUpdated)
+                   {
+                       UnlockReleaseBuffer(buf);
+                       pfree(members);
+                       return res;
+                   }
+               }
+               if (members)
+                   pfree(members);
            }
-           else if (TransactionIdDidAbort(update_xid))
-               ;               /* okay to proceed */
-           else if (TransactionIdDidCommit(update_xid))
+           else
            {
-               UnlockReleaseBuffer(buf);
-               return HeapTupleUpdated;
+               HTSU_Result res;
+               MultiXactStatus status;
+
+               /*
+                * For a non-multi Xmax, we first need to compute the
+                * corresponding MultiXactStatus by using the infomask bits.
+                */
+               if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))
+               {
+                   if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
+                       status = MultiXactStatusForKeyShare;
+                   else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
+                       status = MultiXactStatusForShare;
+                   else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
+                   {
+                       if (old_infomask2 & HEAP_KEYS_UPDATED)
+                           status = MultiXactStatusForUpdate;
+                       else
+                           status = MultiXactStatusForNoKeyUpdate;
+                   }
+                   else
+                   {
+                       /*
+                        * LOCK_ONLY present alone (a pg_upgraded tuple
+                        * marked as share-locked in the old cluster) shouldn't
+                        * be seen in the middle of an update chain.
+                        */
+                       elog(ERROR, "invalid lock status in tuple");
+                   }
+               }
+               else
+               {
+                   /* it's an update, but which kind? */
+                   if (old_infomask2 & HEAP_KEYS_UPDATED)
+                       status = MultiXactStatusUpdate;
+                   else
+                       status = MultiXactStatusNoKeyUpdate;
+               }
+
+               res = test_lockmode_for_conflict(status, rawxmax, mode,
+                                                &needwait);
+               if (needwait)
+               {
+                   LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                   XactLockTableWait(rawxmax);
+                   goto l4;
+               }
+               if (res != HeapTupleMayBeUpdated)
+               {
+                   UnlockReleaseBuffer(buf);
+                   return res;
+               }
            }
        }