Improve table locking behavior in the face of current DDL.
authorRobert Haas <rhaas@postgresql.org>
Wed, 30 Nov 2011 15:12:27 +0000 (10:12 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 30 Nov 2011 15:27:00 +0000 (10:27 -0500)
In the previous coding, callers were faced with an awkward choice:
look up the name, do permissions checks, and then lock the table; or
look up the name, lock the table, and then do permissions checks.
The first choice was wrong because the results of the name lookup
and permissions checks might be out-of-date by the time the table
lock was acquired, while the second allowed a user with no privileges
to interfere with access to a table by users who do have privileges
(e.g. if a malicious backend queues up for an AccessExclusiveLock on
a table on which AccessShareLock is already held, further attempts
to access the table will be blocked until the AccessExclusiveLock
is obtained and the malicious backend's transaction rolls back).

To fix, allow callers of RangeVarGetRelid() to pass a callback which
gets executed after performing the name lookup but before acquiring
the relation lock.  If the name lookup is retried (because
invalidation messages are received), the callback will be re-executed
as well, so we get the best of both worlds.  RangeVarGetRelid() is
renamed to RangeVarGetRelidExtended(); callers not wishing to supply
a callback can continue to invoke it as RangeVarGetRelid(), which is
now a macro.  Since the only one caller that uses nowait = true now
passes a callback anyway, the RangeVarGetRelid() macro defaults nowait
as well.  The callback can also be used for supplemental locking - for
example, REINDEX INDEX needs to acquire the table lock before the index
lock to reduce deadlock possibilities.

There's a lot more work to be done here to fix all the cases where this
can be a problem, but this commit provides the general infrastructure
and fixes the following specific cases: REINDEX INDEX, REINDEX TABLE,
LOCK TABLE, and and DROP TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE.

Per discussion with Noah Misch and Alvaro Herrera.

21 files changed:
src/backend/access/heap/heapam.c
src/backend/catalog/aclchk.c
src/backend/catalog/index.c
src/backend/catalog/namespace.c
src/backend/commands/alter.c
src/backend/commands/indexcmds.c
src/backend/commands/lockcmds.c
src/backend/commands/sequence.c
src/backend/commands/tablecmds.c
src/backend/commands/trigger.c
src/backend/commands/vacuum.c
src/backend/parser/parse_relation.c
src/backend/parser/parse_type.c
src/backend/rewrite/rewriteDefine.c
src/backend/tcop/utility.c
src/backend/utils/adt/acl.c
src/backend/utils/adt/regproc.c
src/backend/utils/adt/ruleutils.c
src/include/catalog/index.h
src/include/catalog/namespace.h
src/pl/plpgsql/src/pl_comp.c

index 7b27c2305821b4e5633759f7b3058a90f0e2c07c..245b224686257f3802934e7483ef26d74238a0eb 100644 (file)
@@ -979,7 +979,7 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
    Oid         relOid;
 
    /* Look up and lock the appropriate relation using namespace search */
-   relOid = RangeVarGetRelid(relation, lockmode, false, false);
+   relOid = RangeVarGetRelid(relation, lockmode, false);
 
    /* Let relation_open do the rest */
    return relation_open(relOid, NoLock);
@@ -1001,7 +1001,7 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
    Oid         relOid;
 
    /* Look up and lock the appropriate relation using namespace search */
-   relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false);
+   relOid = RangeVarGetRelid(relation, lockmode, missing_ok);
 
    /* Return NULL on not-found */
    if (!OidIsValid(relOid))
index 9a125bdac68cc483b7f79138a873ff2756ec6fb4..60c0b8a5b234b4e3c33ffa3f373769ba72467372 100644 (file)
@@ -606,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
                RangeVar   *relvar = (RangeVar *) lfirst(cell);
                Oid         relOid;
 
-               relOid = RangeVarGetRelid(relvar, NoLock, false, false);
+               relOid = RangeVarGetRelid(relvar, NoLock, false);
                objects = lappend_oid(objects, relOid);
            }
            break;
index 99e130c1b0d9fea27e7de86225f646907d144601..758872ff4eda584ab4f269c768226e10079ddf08 100644 (file)
@@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation,
                        IndexInfo *indexInfo,
                        Snapshot snapshot,
                        v_i_state *state);
-static Oid IndexGetRelation(Oid indexId);
 static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid);
 static void SetReindexProcessing(Oid heapOid, Oid indexOid);
 static void ResetReindexProcessing(void);
@@ -1301,7 +1300,7 @@ index_drop(Oid indexId)
     * proceeding until we commit and send out a shared-cache-inval notice
     * that will make them update their index lists.
     */
-   heapId = IndexGetRelation(indexId);
+   heapId = IndexGetRelation(indexId, false);
    userHeapRelation = heap_open(heapId, AccessExclusiveLock);
 
    userIndexRelation = index_open(indexId, AccessExclusiveLock);
@@ -2763,8 +2762,8 @@ validate_index_heapscan(Relation heapRelation,
  * IndexGetRelation: given an index's relation OID, get the OID of the
  * relation it is an index on. Uses the system cache.
  */
-static Oid
-IndexGetRelation(Oid indexId)
+Oid
+IndexGetRelation(Oid indexId, bool missing_ok)
 {
    HeapTuple   tuple;
    Form_pg_index index;
@@ -2772,7 +2771,11 @@ IndexGetRelation(Oid indexId)
 
    tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId));
    if (!HeapTupleIsValid(tuple))
+   {
+       if (missing_ok)
+           return InvalidOid;
        elog(ERROR, "cache lookup failed for index %u", indexId);
+   }
    index = (Form_pg_index) GETSTRUCT(tuple);
    Assert(index->indexrelid == indexId);
 
@@ -2800,7 +2803,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
     * Open and lock the parent heap relation.  ShareLock is sufficient since
     * we only need to be sure no schema or data changes are going on.
     */
-   heapId = IndexGetRelation(indexId);
+   heapId = IndexGetRelation(indexId, false);
    heapRelation = heap_open(heapId, ShareLock);
 
    /*
index 6d4d4b1cccf765cb22692e3d1b68db91eacaf13c..25cad3d1528f6ba8eee5a60cb7bc67fc51d25c7d 100644 (file)
@@ -219,10 +219,14 @@ Datum     pg_is_other_temp_schema(PG_FUNCTION_ARGS);
  * otherwise raise an error.
  *
  * If nowait = true, throw an error if we'd have to wait for a lock.
+ *
+ * Callback allows caller to check permissions or acquire additional locks
+ * prior to grabbing the relation lock.
  */
 Oid
-RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
-                bool nowait)
+RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
+                        bool missing_ok, bool nowait,
+                        RangeVarGetRelidCallback callback, void *callback_arg)
 {
    uint64      inval_count;
    Oid         relId;
@@ -308,6 +312,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
            relId = RelnameGetRelid(relation->relname);
        }
 
+       /*
+        * Invoke caller-supplied callback, if any.
+        *
+        * This callback is a good place to check permissions: we haven't taken
+        * the table lock yet (and it's really best to check permissions before
+        * locking anything!), but we've gotten far enough to know what OID we
+        * think we should lock.  Of course, concurrent DDL might things while
+        * we're waiting for the lock, but in that case the callback will be
+        * invoked again for the new OID.
+        */
+       if (callback)
+           callback(relation, relId, oldRelId, callback_arg);
+
        /*
         * If no lock requested, we assume the caller knows what they're
         * doing.  They should have already acquired a heavyweight lock on
index c32122466433349e08fd1222180412c77fb56af3..eb0584e99105f69433cd6dcfdd6f4f6701273c74 100644 (file)
@@ -111,7 +111,7 @@ ExecRenameStmt(RenameStmt *stmt)
                 * in RenameRelation, renameatt, or renametrig.
                 */
                relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
-                                        false, false);
+                                        false);
 
                switch (stmt->renameType)
                {
index 69aa5bf6466a77042c5c6935c98b81bc48b4b28c..386b95bca2096f8fd4dea420681dd6fbd37252d9 100644 (file)
@@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
 static Oid GetIndexOpClass(List *opclass, Oid attrType,
                char *accessMethodName, Oid accessMethodId);
 static char *ChooseIndexNameAddition(List *colnames);
-
+static void RangeVarCallbackForReindexTable(const RangeVar *relation,
+                               Oid relId, Oid oldRelId, void *arg);
+static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
+                               Oid relId, Oid oldRelId, void *arg);
 
 /*
  * CheckIndexCompatible
@@ -129,7 +132,7 @@ CheckIndexCompatible(Oid oldId,
    Datum       d;
 
    /* Caller should already have the relation locked in some way. */
-   relationId = RangeVarGetRelid(heapRelation, NoLock, false, false);
+   relationId = RangeVarGetRelid(heapRelation, NoLock, false);
    /*
     * We can pretend isconstraint = false unconditionally.  It only serves to
     * decide the text of an error message that should never happen for us.
@@ -1725,33 +1728,74 @@ void
 ReindexIndex(RangeVar *indexRelation)
 {
    Oid         indOid;
-   HeapTuple   tuple;
+   Oid         heapOid = InvalidOid;
+
+   /* lock level used here should match index lock reindex_index() */
+   indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock,
+                                     false, false,
+                                     RangeVarCallbackForReindexIndex,
+                                     (void *) &heapOid);
+
+   reindex_index(indOid, false);
+}
+
+/*
+ * Check permissions on table before acquiring relation lock; also lock
+ * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
+ * deadlocks.
+ */
+static void
+RangeVarCallbackForReindexIndex(const RangeVar *relation,
+                               Oid relId, Oid oldRelId, void *arg)
+{
+   char        relkind;
+   Oid        *heapOid = (Oid *) arg;
 
    /*
-    * XXX: This is not safe in the presence of concurrent DDL. We should
-    * take AccessExclusiveLock here, but that would violate the rule that
-    * indexes should only be locked after their parent tables.  For now,
-    * we live with it.
+    * If we previously locked some other index's heap, and the name we're
+    * looking up no longer refers to that relation, release the now-useless
+    * lock.
     */
-   indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
-   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
-   if (!HeapTupleIsValid(tuple))
-       elog(ERROR, "cache lookup failed for relation %u", indOid);
+   if (relId != oldRelId && OidIsValid(oldRelId))
+   {
+       /* lock level here should match reindex_index() heap lock */
+       UnlockRelationOid(*heapOid, ShareLock);
+       *heapOid = InvalidOid;
+   }
+
+   /* If the relation does not exist, there's nothing more to do. */
+   if (!OidIsValid(relId))
+       return;
 
-   if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
+   /*
+    * If the relation does exist, check whether it's an index.  But note
+    * that the relation might have been dropped between the time we did the
+    * name lookup and now.  In that case, there's nothing to do.
+    */
+   relkind = get_rel_relkind(relId);
+   if (!relkind)
+       return;
+   if (relkind != RELKIND_INDEX)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                errmsg("\"%s\" is not an index",
-                       indexRelation->relname)));
+                errmsg("\"%s\" is not an index", relation->relname)));
 
    /* Check permissions */
-   if (!pg_class_ownercheck(indOid, GetUserId()))
-       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
-                      indexRelation->relname);
-
-   ReleaseSysCache(tuple);
+   if (!pg_class_ownercheck(relId, GetUserId()))
+       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
 
-   reindex_index(indOid, false);
+   /* Lock heap before index to avoid deadlock. */
+   if (relId != oldRelId)
+   {
+       /*
+        * Lock level here should match reindex_index() heap lock.
+        * If the OID isn't valid, it means the index as concurrently dropped,
+        * which is not a problem for us; just return normally.
+        */
+       *heapOid = IndexGetRelation(relId, true);
+       if (OidIsValid(*heapOid))
+           LockRelationOid(*heapOid, ShareLock);
+   }
 }
 
 /*
@@ -1762,27 +1806,10 @@ void
 ReindexTable(RangeVar *relation)
 {
    Oid         heapOid;
-   HeapTuple   tuple;
 
    /* The lock level used here should match reindex_relation(). */
-   heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
-   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
-   if (!HeapTupleIsValid(tuple))       /* shouldn't happen */
-       elog(ERROR, "cache lookup failed for relation %u", heapOid);
-
-   if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
-       ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
-       ereport(ERROR,
-               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                errmsg("\"%s\" is not a table",
-                       relation->relname)));
-
-   /* Check permissions */
-   if (!pg_class_ownercheck(heapOid, GetUserId()))
-       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
-                      relation->relname);
-
-   ReleaseSysCache(tuple);
+   heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
+                                      RangeVarCallbackForReindexTable, NULL);
 
    if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
        ereport(NOTICE,
@@ -1790,6 +1817,37 @@ ReindexTable(RangeVar *relation)
                        relation->relname)));
 }
 
+/*
+ * Check permissions on table before acquiring relation lock.
+ */
+static void
+RangeVarCallbackForReindexTable(const RangeVar *relation,
+                               Oid relId, Oid oldRelId, void *arg)
+{
+   char        relkind;
+
+   /* Nothing to do if the relation was not found. */
+   if (!OidIsValid(relId))
+       return;
+
+   /*
+    * If the relation does exist, check whether it's an index.  But note
+    * that the relation might have been dropped between the time we did the
+    * name lookup and now.  In that case, there's nothing to do.
+    */
+   relkind = get_rel_relkind(relId);
+   if (!relkind)
+       return;
+   if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
+       ereport(ERROR,
+               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                errmsg("\"%s\" is not a table", relation->relname)));
+
+   /* Check permissions */
+   if (!pg_class_ownercheck(relId, GetUserId()))
+       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
+}
+
 /*
  * ReindexDatabase
  *     Recreate indexes of a database.
index 875f868b969233344f3286d9491072c38fe3e9b2..91642ce03de2e4fb6e54bb4401fd5bdd63cae104 100644 (file)
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
-static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
-                bool recurse);
-
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
+                            Oid oldrelid, void *arg);
 
 /*
  * LOCK TABLE
@@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt)
    foreach(p, lockstmt->relations)
    {
        RangeVar   *rv = (RangeVar *) lfirst(p);
-       Relation    rel;
        bool        recurse = interpretInhOption(rv->inhOpt);
        Oid         reloid;
 
-       reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
-       rel = relation_open(reloid, NoLock);
+       reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false,
+                                         lockstmt->nowait,
+                                         RangeVarCallbackForLockTable,
+                                         (void *) &lockstmt->mode);
 
-       LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
+       if (recurse)
+           LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
    }
 }
 
 /*
- * Apply LOCK TABLE recursively over an inheritance tree
+ * Before acquiring a table lock on the named table, check whether we have
+ * permission to do so.
  */
 static void
-LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
+RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
+                            void *arg)
 {
+   LOCKMODE    lockmode = * (LOCKMODE *) arg;
+   char        relkind;
    AclResult   aclresult;
-   Oid         reloid = RelationGetRelid(rel);
 
-   /* Verify adequate privilege */
-   if (lockmode == AccessShareLock)
-       aclresult = pg_class_aclcheck(reloid, GetUserId(),
-                                     ACL_SELECT);
-   else
-       aclresult = pg_class_aclcheck(reloid, GetUserId(),
-                                     ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
-   if (aclresult != ACLCHECK_OK)
-       aclcheck_error(aclresult, ACL_KIND_CLASS,
-                      RelationGetRelationName(rel));
+   if (!OidIsValid(relid))
+       return;         /* doesn't exist, so no permissions check */
+   relkind = get_rel_relkind(relid);
+   if (!relkind)
+       return;         /* woops, concurrently dropped; no permissions check */
 
    /* Currently, we only allow plain tables to be locked */
-   if (rel->rd_rel->relkind != RELKIND_RELATION)
+   if (relkind != RELKIND_RELATION)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is not a table",
-                       RelationGetRelationName(rel))));
+                       rv->relname)));
 
-   /*
-    * If requested, recurse to children.  We use find_inheritance_children
-    * not find_all_inheritors to avoid taking locks far in advance of
-    * checking privileges.  This means we'll visit multiply-inheriting
-    * children more than once, but that's no problem.
-    */
-   if (recurse)
+   /* Check permissions. */
+   aclresult = LockTableAclCheck(relid, lockmode);
+   if (aclresult != ACLCHECK_OK)
+       aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname);
+}
+
+/*
+ * Apply LOCK TABLE recursively over an inheritance tree
+ *
+ * We use find_inheritance_children not find_all_inheritors to avoid taking
+ * locks far in advance of checking privileges.  This means we'll visit
+ * multiply-inheriting children more than once, but that's no problem.
+ */
+static void
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+{
+   List       *children;
+   ListCell   *lc;
+
+   children = find_inheritance_children(reloid, NoLock);
+
+   foreach(lc, children)
    {
-       List       *children = find_inheritance_children(reloid, NoLock);
-       ListCell   *lc;
-       Relation    childrel;
+       Oid         childreloid = lfirst_oid(lc);
+       AclResult   aclresult;
+
+       /* Check permissions before acquiring the lock. */
+       aclresult = LockTableAclCheck(childreloid, lockmode);
+       if (aclresult != ACLCHECK_OK)
+       {
+           char       *relname = get_rel_name(childreloid);
+           if (!relname)
+               continue;   /* child concurrently dropped, just skip it */
+           aclcheck_error(aclresult, ACL_KIND_CLASS, relname); 
+       }
 
-       foreach(lc, children)
+       /* We have enough rights to lock the relation; do so. */
+       if (!nowait)
+           LockRelationOid(childreloid, lockmode);
+       else if (!ConditionalLockRelationOid(childreloid, lockmode))
        {
-           Oid         childreloid = lfirst_oid(lc);
-
-           /*
-            * Acquire the lock, to protect against concurrent drops.  Note
-            * that a lock against an already-dropped relation's OID won't
-            * fail.
-            */
-           if (!nowait)
-               LockRelationOid(childreloid, lockmode);
-           else if (!ConditionalLockRelationOid(childreloid, lockmode))
-           {
-               /* try to throw error by name; relation could be deleted... */
-               char       *relname = get_rel_name(childreloid);
-
-               if (relname)
-                   ereport(ERROR,
-                           (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                            errmsg("could not obtain lock on relation \"%s\"",
-                               relname)));
-               else
-                   ereport(ERROR,
-                           (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                    errmsg("could not obtain lock on relation with OID %u",
-                                reloid)));
-           }
-
-           /*
-            * Now that we have the lock, check to see if the relation really
-            * exists or not.
-            */
-           childrel = try_relation_open(childreloid, NoLock);
-           if (!childrel)
-           {
-               /* Release useless lock */
-               UnlockRelationOid(childreloid, lockmode);
-           }
-
-           LockTableRecurse(childrel, lockmode, nowait, recurse);
+           /* try to throw error by name; relation could be deleted... */
+           char       *relname = get_rel_name(childreloid);
+           if (!relname)
+               continue;   /* child concurrently dropped, just skip it */
+           ereport(ERROR,
+                   (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                    errmsg("could not obtain lock on relation \"%s\"",
+                       relname)));
        }
+
+       /*
+        * Even if we got the lock, child might have been concurrently dropped.
+        * If so, we can skip it.
+        */
+       if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid)))
+       {
+           /* Release useless lock */
+           UnlockRelationOid(childreloid, lockmode);
+           continue;
+       }
+
+       LockTableRecurse(childreloid, lockmode, nowait);
    }
+}
 
-   relation_close(rel, NoLock);    /* close rel, keep lock */
+/*
+ * Check whether the current user is permitted to lock this relation.
+ */
+static AclResult
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+{
+   AclResult   aclresult;
+
+   /* Verify adequate privilege */
+   if (lockmode == AccessShareLock)
+       aclresult = pg_class_aclcheck(reloid, GetUserId(),
+                                     ACL_SELECT);
+   else
+       aclresult = pg_class_aclcheck(reloid, GetUserId(),
+                                     ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
+   return aclresult;
 }
index 54660f448055375f41efc42e05bee5f8eb10d264..41e5301fc5f84e128079816fc52fd985e521a75b 100644 (file)
@@ -425,7 +425,7 @@ AlterSequence(AlterSeqStmt *stmt)
    List       *owned_by;
 
    /* Open and lock sequence. */
-   relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
+   relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false);
    init_sequence(relid, &elm, &seqrel);
 
    /* allow ALTER to sequence owner only */
@@ -513,7 +513,7 @@ nextval(PG_FUNCTION_ARGS)
     * whether the performance penalty is material in practice, but for now,
     * we do it this way.
     */
-   relid = RangeVarGetRelid(sequence, NoLock, false, false);
+   relid = RangeVarGetRelid(sequence, NoLock, false);
 
    PG_RETURN_INT64(nextval_internal(relid));
 }
index c4622c04125bbca7d3fa75e538c5737e4aa4fb96..1c3fe6a9630c0470baf48d38e3980978e4c0efcc 100644 (file)
@@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
    {'\0', 0, NULL, NULL, NULL, NULL}
 };
 
+struct DropRelationCallbackState
+{
+   char    relkind;
+   Oid     heapOid;
+};
+
 /* Alter table target-type flags for ATSimplePermissions */
 #define        ATT_TABLE               0x0001
 #define        ATT_VIEW                0x0002
@@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
                   ForkNumber forkNum, char relpersistence);
 static const char *storage_name(char c);
 
+static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
+                               Oid oldRelOid, void *arg);
+
 
 /* ----------------------------------------------------------------
  *     DefineRelation
@@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop)
    {
        RangeVar   *rel = makeRangeVarFromNameList((List *) lfirst(cell));
        Oid         relOid;
-       HeapTuple   tuple;
-       Form_pg_class classform;
        ObjectAddress obj;
+       struct DropRelationCallbackState    state;
 
        /*
         * These next few steps are a great deal like relation_openrv, but we
@@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop)
         */
        AcceptInvalidationMessages();
 
-       /*
-        * Look up the appropriate relation using namespace search.
-        *
-        * XXX: Doing this without a lock is unsafe in the presence of
-        * concurrent DDL, but acquiring a lock here might violate the rule
-        * that a table must be locked before its corresponding index.
-        * So, for now, we ignore the hazard.
-        */
-       relOid = RangeVarGetRelid(rel, NoLock, true, false);
+       /* Look up the appropriate relation using namespace search. */
+       state.relkind = relkind;
+       state.heapOid = InvalidOid;
+       relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true,
+                                         false,
+                                         RangeVarCallbackForDropRelation,
+                                         (void *) &state);
 
        /* Not there? */
        if (!OidIsValid(relOid))
@@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop)
            continue;
        }
 
-       /*
-        * In DROP INDEX, attempt to acquire lock on the parent table before
-        * locking the index.  index_drop() will need this anyway, and since
-        * regular queries lock tables before their indexes, we risk deadlock
-        * if we do it the other way around.  No error if we don't find a
-        * pg_index entry, though --- that most likely means it isn't an
-        * index, and we'll fail below.
-        */
-       if (relkind == RELKIND_INDEX)
-       {
-           tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid));
-           if (HeapTupleIsValid(tuple))
-           {
-               Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
-
-               LockRelationOid(index->indrelid, AccessExclusiveLock);
-               ReleaseSysCache(tuple);
-           }
-       }
-
-       /* Get the lock before trying to fetch the syscache entry */
-       LockRelationOid(relOid, AccessExclusiveLock);
-
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
-       if (!HeapTupleIsValid(tuple))
-           elog(ERROR, "cache lookup failed for relation %u", relOid);
-       classform = (Form_pg_class) GETSTRUCT(tuple);
-
-       if (classform->relkind != relkind)
-           DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
-
-       /* Allow DROP to either table owner or schema owner */
-       if (!pg_class_ownercheck(relOid, GetUserId()) &&
-           !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
-           aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
-                          rel->relname);
-
-       if (!allowSystemTableMods && IsSystemClass(classform))
-           ereport(ERROR,
-                   (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                    errmsg("permission denied: \"%s\" is a system catalog",
-                           rel->relname)));
-
        /* OK, we're ready to delete this one */
        obj.classId = RelationRelationId;
        obj.objectId = relOid;
        obj.objectSubId = 0;
 
        add_exact_object_address(&obj, objects);
-
-       ReleaseSysCache(tuple);
    }
 
    performMultipleDeletions(objects, drop->behavior);
@@ -842,6 +803,74 @@ RemoveRelations(DropStmt *drop)
    free_object_addresses(objects);
 }
 
+/*
+ * Before acquiring a table lock, check whether we have sufficient rights.
+ * In the case of DROP INDEX, also try to lock the table before the index.
+ */
+static void
+RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
+                               void *arg)
+{
+   HeapTuple   tuple;
+   struct DropRelationCallbackState *state;
+   char        relkind;
+   Form_pg_class classform;
+
+   state = (struct DropRelationCallbackState *) arg;
+   relkind = state->relkind;
+
+   /*
+    * If we previously locked some other index's heap, and the name we're
+    * looking up no longer refers to that relation, release the now-useless
+    * lock.
+    */
+   if (relOid != oldRelOid && OidIsValid(state->heapOid))
+   {
+       UnlockRelationOid(state->heapOid, AccessExclusiveLock);
+       state->heapOid = InvalidOid;
+   }
+
+   /* Didn't find a relation, so need for locking or permission checks. */
+   if (!OidIsValid(relOid))
+       return;
+
+   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+   if (!HeapTupleIsValid(tuple))
+       return;                 /* concurrently dropped, so nothing to do */
+   classform = (Form_pg_class) GETSTRUCT(tuple);
+
+   if (classform->relkind != relkind)
+       DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
+
+   /* Allow DROP to either table owner or schema owner */
+   if (!pg_class_ownercheck(relOid, GetUserId()) &&
+       !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
+       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                      rel->relname);
+
+   if (!allowSystemTableMods && IsSystemClass(classform))
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("permission denied: \"%s\" is a system catalog",
+                       rel->relname)));
+
+   ReleaseSysCache(tuple);
+
+   /*
+    * In DROP INDEX, attempt to acquire lock on the parent table before
+    * locking the index.  index_drop() will need this anyway, and since
+    * regular queries lock tables before their indexes, we risk deadlock
+    * if we do it the other way around.  No error if we don't find a
+    * pg_index entry, though --- the relation may have been droppd.
+    */
+   if (relkind == RELKIND_INDEX && relOid != oldRelOid)
+   {
+       state->heapOid = IndexGetRelation(relOid, true);
+       if (OidIsValid(state->heapOid))
+           LockRelationOid(state->heapOid, AccessExclusiveLock);
+   }
+}
+
 /*
  * ExecuteTruncate
  *     Executes a TRUNCATE command.
index 5589528e5ca3d63083c3f21fe157b5ddda75cd40..b205deca29f6cef42b889bd710ca6067c4961b42 100644 (file)
@@ -201,8 +201,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
         * we might end up creating a pg_constraint entry referencing a
         * nonexistent table.
         */
-       constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
-                                      false);
+       constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false);
    }
 
    /* permission checks */
index 5eec56950cc400669523145dbeecadfd5db5ce80..e70dbedbd056369cccc9b55acbca3a0209203b23 100644 (file)
@@ -330,7 +330,7 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
         * alternative, since we're going to commit this transaction and
         * begin a new one between now and then.
         */
-       relid = RangeVarGetRelid(vacrel, NoLock, false, false);
+       relid = RangeVarGetRelid(vacrel, NoLock, false);
 
        /* Make a relation list entry for this guy */
        oldcontext = MemoryContextSwitchTo(vac_context);
index 9ac28c916cd608fef10dc7b54df583a254116c6b..35e4baa7240d8deac8a612b987a3b1ba7b178351 100644 (file)
@@ -282,7 +282,7 @@ searchRangeTable(ParseState *pstate, RangeVar *relation)
    if (!relation->schemaname)
        cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup);
    if (!cte)
-       relId = RangeVarGetRelid(relation, NoLock, true, false);
+       relId = RangeVarGetRelid(relation, NoLock, true);
 
    /* Now look for RTEs matching either the relation/CTE or the alias */
    for (levelsup = 0;
index 2122032ce223ccaaf0eee3e908a9f35fc31ec42a..91028c44ca262295b678fb1d797f126e4bb24f3b 100644 (file)
@@ -115,7 +115,7 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
         * of concurrent DDL.  But taking a lock would carry a performance
         * penalty and would also require a permissions check.
         */
-       relid = RangeVarGetRelid(rel, NoLock, false, false);
+       relid = RangeVarGetRelid(rel, NoLock, false);
        attnum = get_attnum(relid, field);
        if (attnum == InvalidAttrNumber)
            ereport(ERROR,
index 17db70e74a3162d4caa65a2e25ac42dbecaddf18..a9bb8d51f2c5c70de4658701297aaa2583cc18bc 100644 (file)
@@ -203,8 +203,7 @@ DefineRule(RuleStmt *stmt, const char *queryString)
     * Find and lock the relation.  Lock level should match
     * DefineQueryRewrite.
     */
-   relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false,
-                            false);
+   relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
 
    /* ... and execute */
    DefineQueryRewrite(stmt->rulename,
index 6f88c476adae1a9564ea337ee6b42716c5f48159..36d9edc2aaf20a57ace57f2589b18095d821c6f6 100644 (file)
@@ -84,7 +84,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
     * AccessExclusiveLock) before verifying that the user has permissions
     * is not appealing either.
     */
-   relOid = RangeVarGetRelid(rel, NoLock, false, false);
+   relOid = RangeVarGetRelid(rel, NoLock, false);
 
    tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
    if (!HeapTupleIsValid(tuple))       /* should not happen */
index e79ba50ad3315ab25dd5e5030a83ad5c627be84d..48fb673df251cd811b7e954aaca4de9283fe8884 100644 (file)
@@ -1940,7 +1940,7 @@ convert_table_name(text *tablename)
    relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
 
    /* We might not even have permissions on this relation; don't lock it. */
-   return RangeVarGetRelid(relrv, NoLock, false, false);
+   return RangeVarGetRelid(relrv, NoLock, false);
 }
 
 /*
index 0d42a39ec431faf03e9c8b884d43dc14333b799d..c050f5ec910e4e8d019bad266b3583ab712b2ffc 100644 (file)
@@ -824,8 +824,7 @@ regclassin(PG_FUNCTION_ARGS)
    names = stringToQualifiedNameList(class_name_or_oid);
 
    /* We might not even have permissions on this relation; don't lock it. */
-   result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false,
-                             false);
+   result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false);
 
    PG_RETURN_OID(result);
 }
@@ -1298,7 +1297,7 @@ text_regclass(PG_FUNCTION_ARGS)
    rv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
 
    /* We might not even have permissions on this relation; don't lock it. */
-   result = RangeVarGetRelid(rv, NoLock, false, false);
+   result = RangeVarGetRelid(rv, NoLock, false);
 
    PG_RETURN_OID(result);
 }
index 75923a6f2ea92c147f282d3e54ce60f3cb9c6610..29df7488fd64c58adb722cd901020f577f037936 100644 (file)
@@ -390,7 +390,7 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS)
 
    /* Look up view name.  Can't lock it - we might not have privileges. */
    viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
-   viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
+   viewoid = RangeVarGetRelid(viewrel, NoLock, false);
 
    PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0)));
 }
@@ -410,7 +410,7 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS)
 
    /* Look up view name.  Can't lock it - we might not have privileges. */
    viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
-   viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
+   viewoid = RangeVarGetRelid(viewrel, NoLock, false);
 
    PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags)));
 }
@@ -1576,7 +1576,7 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS)
 
    /* Look up table name.  Can't lock it - we might not have privileges. */
    tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
-   tableOid = RangeVarGetRelid(tablerv, NoLock, false, false);
+   tableOid = RangeVarGetRelid(tablerv, NoLock, false);
 
    /* Get the number of the column */
    column = text_to_cstring(columnname);
index 8b78b05a2e68a719e9c19ea5dcbe342d8f524ccc..9efd9afed8d4070fa383c41c7d58fd75a3bc527b 100644 (file)
@@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags);
 
 extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
+extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
 
 #endif   /* INDEX_H */
index 904c6fd97d844059c96d345e62bde216730f959f..d537a48b62ca481d00f15ae4f90d2b9ca9570d8c 100644 (file)
@@ -47,9 +47,16 @@ typedef struct OverrideSearchPath
    bool        addTemp;        /* implicitly prepend temp schema? */
 } OverrideSearchPath;
 
+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId,
+   Oid oldRelId, void *callback_arg);
 
-extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode,
-                bool missing_ok, bool nowait);
+#define RangeVarGetRelid(relation, lockmode, missing_ok) \
+   RangeVarGetRelidExtended(relation, lockmode, missing_ok, false, NULL, NULL)
+
+extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
+                        LOCKMODE lockmode, bool missing_ok, bool nowait,
+                        RangeVarGetRelidCallback callback,
+                        void *callback_arg);
 extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
 extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
 extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
index 275cad7b5c95ae880fba595ca7ae99bc4938ab02..d1be6c9985a06a0957b0d2ff8fa5b31e95ed1b61 100644 (file)
@@ -1707,7 +1707,7 @@ plpgsql_parse_cwordtype(List *idents)
                              strVal(lsecond(idents)),
                              -1);
        /* Can't lock relation - we might not have privileges. */
-       classOid = RangeVarGetRelid(relvar, NoLock, true, false);
+       classOid = RangeVarGetRelid(relvar, NoLock, true);
        if (!OidIsValid(classOid))
            goto done;
        fldname = strVal(lthird(idents));
@@ -1808,7 +1808,7 @@ plpgsql_parse_cwordrowtype(List *idents)
    relvar = makeRangeVar(strVal(linitial(idents)),
                          strVal(lsecond(idents)),
                          -1);
-   classOid = RangeVarGetRelid(relvar, NoLock, false, false);
+   classOid = RangeVarGetRelid(relvar, NoLock, false);
 
    MemoryContextSwitchTo(oldCxt);