Fix DROP OPERATOR to reset oprcom/oprnegate links to the dropped operator.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 25 Mar 2016 16:33:16 +0000 (12:33 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 25 Mar 2016 16:33:16 +0000 (12:33 -0400)
This avoids leaving dangling links in pg_operator; which while fairly
harmless are also unsightly.

While we're at it, simplify OperatorUpd, which went through
heap_modify_tuple for no very good reason considering it had already made
a tuple copy it could just scribble on.

Roma Sokolov, reviewed by Tomas Vondra, additional hacking by Robert Haas
and myself.

src/backend/catalog/pg_operator.c
src/backend/commands/operatorcmds.c
src/include/catalog/pg_operator_fn.h
src/test/regress/expected/drop_operator.out [new file with mode: 0644]
src/test/regress/parallel_schedule
src/test/regress/serial_schedule
src/test/regress/sql/drop_operator.sql [new file with mode: 0644]

index 3cd18995565e8c881aef4d65fc2541ef7257d14b..5b5cd3fc01bd77b0e933e758171bc0fed470793a 100644 (file)
@@ -54,8 +54,6 @@ static Oid OperatorShellMake(const char *operatorName,
                  Oid leftTypeId,
                  Oid rightTypeId);
 
-static void OperatorUpd(Oid baseId, Oid commId, Oid negId);
-
 static Oid get_other_operator(List *otherOp,
                   Oid otherLeftTypeId, Oid otherRightTypeId,
                   const char *operatorName, Oid operatorNamespace,
@@ -566,7 +564,7 @@ OperatorCreate(const char *operatorName,
        commutatorId = operatorObjectId;
 
    if (OidIsValid(commutatorId) || OidIsValid(negatorId))
-       OperatorUpd(operatorObjectId, commutatorId, negatorId);
+       OperatorUpd(operatorObjectId, commutatorId, negatorId, false);
 
    return address;
 }
@@ -639,125 +637,125 @@ get_other_operator(List *otherOp, Oid otherLeftTypeId, Oid otherRightTypeId,
  * OperatorUpd
  *
  * For a given operator, look up its negator and commutator operators.
- * If they are defined, but their negator and commutator fields
- * (respectively) are empty, then use the new operator for neg or comm.
- * This solves a problem for users who need to insert two new operators
- * which are the negator or commutator of each other.
+ * When isDelete is false, update their negator and commutator fields to
+ * point back to the given operator; when isDelete is true, update those
+ * fields to no longer point back to the given operator.
+ *
+ * The !isDelete case solves a problem for users who need to insert two new
+ * operators that are the negator or commutator of each other, while the
+ * isDelete case is needed so as not to leave dangling OID links behind
+ * after dropping an operator.
  */
-static void
-OperatorUpd(Oid baseId, Oid commId, Oid negId)
+void
+OperatorUpd(Oid baseId, Oid commId, Oid negId, bool isDelete)
 {
-   int         i;
    Relation    pg_operator_desc;
    HeapTuple   tup;
-   bool        nulls[Natts_pg_operator];
-   bool        replaces[Natts_pg_operator];
-   Datum       values[Natts_pg_operator];
-
-   for (i = 0; i < Natts_pg_operator; ++i)
-   {
-       values[i] = (Datum) 0;
-       replaces[i] = false;
-       nulls[i] = false;
-   }
 
    /*
-    * check and update the commutator & negator, if necessary
-    *
-    * We need a CommandCounterIncrement here in case of a self-commutator
-    * operator: we'll need to update the tuple that we just inserted.
+    * If we're making an operator into its own commutator, then we need a
+    * command-counter increment here, since we've just inserted the tuple
+    * we're about to update.  But when we're dropping an operator, we can
+    * skip this because we're at the beginning of the command.
     */
-   CommandCounterIncrement();
+   if (!isDelete)
+       CommandCounterIncrement();
 
+   /* Open the relation. */
    pg_operator_desc = heap_open(OperatorRelationId, RowExclusiveLock);
 
-   tup = SearchSysCacheCopy1(OPEROID, ObjectIdGetDatum(commId));
+   /* Get a writable copy of the commutator's tuple. */
+   if (OidIsValid(commId))
+       tup = SearchSysCacheCopy1(OPEROID, ObjectIdGetDatum(commId));
+   else
+       tup = NULL;
 
-   /*
-    * if the commutator and negator are the same operator, do one update. XXX
-    * this is probably useless code --- I doubt it ever makes sense for
-    * commutator and negator to be the same thing...
-    */
-   if (commId == negId)
+   /* Update the commutator's tuple if need be. */
+   if (HeapTupleIsValid(tup))
    {
-       if (HeapTupleIsValid(tup))
+       Form_pg_operator t = (Form_pg_operator) GETSTRUCT(tup);
+       bool        update_commutator = false;
+
+       /*
+        * Out of due caution, we only change the commutator's oprcom field if
+        * it has the exact value we expected: InvalidOid when creating an
+        * operator, or baseId when dropping one.
+        */
+       if (isDelete && t->oprcom == baseId)
        {
-           Form_pg_operator t = (Form_pg_operator) GETSTRUCT(tup);
-
-           if (!OidIsValid(t->oprcom) || !OidIsValid(t->oprnegate))
-           {
-               if (!OidIsValid(t->oprnegate))
-               {
-                   values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
-                   replaces[Anum_pg_operator_oprnegate - 1] = true;
-               }
-
-               if (!OidIsValid(t->oprcom))
-               {
-                   values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
-                   replaces[Anum_pg_operator_oprcom - 1] = true;
-               }
-
-               tup = heap_modify_tuple(tup,
-                                       RelationGetDescr(pg_operator_desc),
-                                       values,
-                                       nulls,
-                                       replaces);
-
-               simple_heap_update(pg_operator_desc, &tup->t_self, tup);
-
-               CatalogUpdateIndexes(pg_operator_desc, tup);
-           }
+           t->oprcom = InvalidOid;
+           update_commutator = true;
+       }
+       else if (!isDelete && !OidIsValid(t->oprcom))
+       {
+           t->oprcom = baseId;
+           update_commutator = true;
        }
 
-       heap_close(pg_operator_desc, RowExclusiveLock);
-
-       return;
-   }
-
-   /* if commutator and negator are different, do two updates */
-
-   if (HeapTupleIsValid(tup) &&
-       !(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprcom)))
-   {
-       values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
-       replaces[Anum_pg_operator_oprcom - 1] = true;
-
-       tup = heap_modify_tuple(tup,
-                               RelationGetDescr(pg_operator_desc),
-                               values,
-                               nulls,
-                               replaces);
-
-       simple_heap_update(pg_operator_desc, &tup->t_self, tup);
-
-       CatalogUpdateIndexes(pg_operator_desc, tup);
-
-       values[Anum_pg_operator_oprcom - 1] = (Datum) NULL;
-       replaces[Anum_pg_operator_oprcom - 1] = false;
+       /* If any columns were found to need modification, update tuple. */
+       if (update_commutator)
+       {
+           simple_heap_update(pg_operator_desc, &tup->t_self, tup);
+           CatalogUpdateIndexes(pg_operator_desc, tup);
+
+           /*
+            * Do CCI to make the updated tuple visible.  We must do this in
+            * case the commutator is also the negator.  (Which would be a
+            * logic error on the operator definer's part, but that's not a
+            * good reason to fail here.)  We would need a CCI anyway in the
+            * deletion case for a self-commutator with no negator.
+            */
+           CommandCounterIncrement();
+       }
    }
 
-   /* check and update the negator, if necessary */
-
-   tup = SearchSysCacheCopy1(OPEROID, ObjectIdGetDatum(negId));
+   /*
+    * Similarly find and update the negator, if any.
+    */
+   if (OidIsValid(negId))
+       tup = SearchSysCacheCopy1(OPEROID, ObjectIdGetDatum(negId));
+   else
+       tup = NULL;
 
-   if (HeapTupleIsValid(tup) &&
-       !(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprnegate)))
+   if (HeapTupleIsValid(tup))
    {
-       values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
-       replaces[Anum_pg_operator_oprnegate - 1] = true;
+       Form_pg_operator t = (Form_pg_operator) GETSTRUCT(tup);
+       bool        update_negator = false;
 
-       tup = heap_modify_tuple(tup,
-                               RelationGetDescr(pg_operator_desc),
-                               values,
-                               nulls,
-                               replaces);
-
-       simple_heap_update(pg_operator_desc, &tup->t_self, tup);
+       /*
+        * Out of due caution, we only change the negator's oprnegate field if
+        * it has the exact value we expected: InvalidOid when creating an
+        * operator, or baseId when dropping one.
+        */
+       if (isDelete && t->oprnegate == baseId)
+       {
+           t->oprnegate = InvalidOid;
+           update_negator = true;
+       }
+       else if (!isDelete && !OidIsValid(t->oprnegate))
+       {
+           t->oprnegate = baseId;
+           update_negator = true;
+       }
 
-       CatalogUpdateIndexes(pg_operator_desc, tup);
+       /* If any columns were found to need modification, update tuple. */
+       if (update_negator)
+       {
+           simple_heap_update(pg_operator_desc, &tup->t_self, tup);
+           CatalogUpdateIndexes(pg_operator_desc, tup);
+
+           /*
+            * In the deletion case, do CCI to make the updated tuple visible.
+            * We must do this in case the operator is its own negator. (Which
+            * would be a logic error on the operator definer's part, but
+            * that's not a good reason to fail here.)
+            */
+           if (isDelete)
+               CommandCounterIncrement();
+       }
    }
 
+   /* Close relation and release catalog lock. */
    heap_close(pg_operator_desc, RowExclusiveLock);
 }
 
index 664e5d7de74514837729610a9780eb72b7c9a4e1..d32ba2d61f626299bedb73bcb2fac62b3f3baf01 100644 (file)
@@ -341,12 +341,32 @@ RemoveOperatorById(Oid operOid)
 {
    Relation    relation;
    HeapTuple   tup;
+   Form_pg_operator op;
 
    relation = heap_open(OperatorRelationId, RowExclusiveLock);
 
    tup = SearchSysCache1(OPEROID, ObjectIdGetDatum(operOid));
    if (!HeapTupleIsValid(tup)) /* should not happen */
        elog(ERROR, "cache lookup failed for operator %u", operOid);
+   op = (Form_pg_operator) GETSTRUCT(tup);
+
+   /*
+    * Reset links from commutator and negator, if any.  In case of a
+    * self-commutator or self-negator, this means we have to re-fetch the
+    * updated tuple.  (We could optimize away updates on the tuple we're
+    * about to drop, but it doesn't seem worth convoluting the logic for.)
+    */
+   if (OidIsValid(op->oprcom) || OidIsValid(op->oprnegate))
+   {
+       OperatorUpd(operOid, op->oprcom, op->oprnegate, true);
+       if (operOid == op->oprcom || operOid == op->oprnegate)
+       {
+           ReleaseSysCache(tup);
+           tup = SearchSysCache1(OPEROID, ObjectIdGetDatum(operOid));
+           if (!HeapTupleIsValid(tup)) /* should not happen */
+               elog(ERROR, "cache lookup failed for operator %u", operOid);
+       }
+   }
 
    simple_heap_delete(relation, &tup->t_self);
 
index 5315e19feb3d5132b40b90a11ef2fb7158eb4590..58814e21111294f41759c0e8edf9a5118c001ead 100644 (file)
@@ -31,4 +31,6 @@ extern ObjectAddress OperatorCreate(const char *operatorName,
 
 extern ObjectAddress makeOperatorDependencies(HeapTuple tuple, bool isUpdate);
 
+extern void OperatorUpd(Oid baseId, Oid commId, Oid negId, bool isDelete);
+
 #endif   /* PG_OPERATOR_FN_H */
diff --git a/src/test/regress/expected/drop_operator.out b/src/test/regress/expected/drop_operator.out
new file mode 100644 (file)
index 0000000..cc8f5e7
--- /dev/null
@@ -0,0 +1,61 @@
+CREATE OPERATOR === (
+        PROCEDURE = int8eq,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        COMMUTATOR = ===
+);
+CREATE OPERATOR !== (
+        PROCEDURE = int8ne,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        NEGATOR = ===,
+        COMMUTATOR = !==
+);
+DROP OPERATOR !==(bigint, bigint);
+SELECT  ctid, oprcom
+FROM    pg_catalog.pg_operator fk
+WHERE   oprcom != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprcom);
+ ctid | oprcom 
+------+--------
+(0 rows)
+
+SELECT  ctid, oprnegate
+FROM    pg_catalog.pg_operator fk
+WHERE   oprnegate != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprnegate);
+ ctid | oprnegate 
+------+-----------
+(0 rows)
+
+DROP OPERATOR ===(bigint, bigint);
+CREATE OPERATOR <| (
+        PROCEDURE = int8lt,
+        LEFTARG = bigint,
+        RIGHTARG = bigint
+);
+CREATE OPERATOR |> (
+        PROCEDURE = int8gt,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        NEGATOR = <|,
+        COMMUTATOR = <|
+);
+DROP OPERATOR |>(bigint, bigint);
+SELECT  ctid, oprcom
+FROM    pg_catalog.pg_operator fk
+WHERE   oprcom != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprcom);
+ ctid | oprcom 
+------+--------
+(0 rows)
+
+SELECT  ctid, oprnegate
+FROM    pg_catalog.pg_operator fk
+WHERE   oprnegate != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprnegate);
+ ctid | oprnegate 
+------+-----------
+(0 rows)
+
+DROP OPERATOR <|(bigint, bigint);
index 8be4b831a1efc8dd7cbf7f0edde19a333d7ee80b..7c7b58d43d9d109f5498348c9b154b1cf1edf90a 100644 (file)
@@ -84,7 +84,7 @@ test: select_into select_distinct select_distinct_on select_implicit select_havi
 # ----------
 # Another group of parallel tests
 # ----------
-test: brin gin gist spgist privileges security_label collate matview lock replica_identity rowsecurity object_address tablesample groupingsets
+test: brin gin gist spgist privileges security_label collate matview lock replica_identity rowsecurity object_address tablesample groupingsets drop_operator
 
 # ----------
 # Another group of parallel tests
index 1de3da8bac0a985bcaddd354cf3dee487e2f5fbd..1b66516a13f1509f7ee2ce053c2d4a8c72abb6e2 100644 (file)
@@ -89,7 +89,6 @@ test: union
 test: case
 test: join
 test: aggregates
-test: groupingsets
 test: transactions
 ignore: random
 test: random
@@ -114,6 +113,8 @@ test: replica_identity
 test: rowsecurity
 test: object_address
 test: tablesample
+test: groupingsets
+test: drop_operator
 test: alter_generic
 test: alter_operator
 test: misc
diff --git a/src/test/regress/sql/drop_operator.sql b/src/test/regress/sql/drop_operator.sql
new file mode 100644 (file)
index 0000000..cc62cfa
--- /dev/null
@@ -0,0 +1,56 @@
+CREATE OPERATOR === (
+        PROCEDURE = int8eq,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        COMMUTATOR = ===
+);
+
+CREATE OPERATOR !== (
+        PROCEDURE = int8ne,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        NEGATOR = ===,
+        COMMUTATOR = !==
+);
+
+DROP OPERATOR !==(bigint, bigint);
+
+SELECT  ctid, oprcom
+FROM    pg_catalog.pg_operator fk
+WHERE   oprcom != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprcom);
+
+SELECT  ctid, oprnegate
+FROM    pg_catalog.pg_operator fk
+WHERE   oprnegate != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprnegate);
+
+DROP OPERATOR ===(bigint, bigint);
+
+CREATE OPERATOR <| (
+        PROCEDURE = int8lt,
+        LEFTARG = bigint,
+        RIGHTARG = bigint
+);
+
+CREATE OPERATOR |> (
+        PROCEDURE = int8gt,
+        LEFTARG = bigint,
+        RIGHTARG = bigint,
+        NEGATOR = <|,
+        COMMUTATOR = <|
+);
+
+DROP OPERATOR |>(bigint, bigint);
+
+SELECT  ctid, oprcom
+FROM    pg_catalog.pg_operator fk
+WHERE   oprcom != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprcom);
+
+SELECT  ctid, oprnegate
+FROM    pg_catalog.pg_operator fk
+WHERE   oprnegate != 0 AND
+        NOT EXISTS(SELECT 1 FROM pg_catalog.pg_operator pk WHERE pk.oid = fk.oprnegate);
+
+DROP OPERATOR <|(bigint, bigint);