Split ATExecValidateConstraint into reusable pieces
authorÁlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 16 Jan 2025 15:44:24 +0000 (16:44 +0100)
committerÁlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 16 Jan 2025 15:44:24 +0000 (16:44 +0100)
With this, we have separate functions to add validation requests to
ALTER TABLE's phase 3 queue for check and foreign key constraints, which
allows reusing them in future commits -- particularly this will allow us
to perform validation of invalid foreign key constraints in partitioned
tables.

We could have let the check constraint code alone since we don't need to
reuse that for anything at this point, but it seems cleaner and more
consistent to do both at the same time.

Author: Amul Sul <sulamul@gmail.com>
Discussion: https://postgr.es/m/CAAJ_b96Bp=-ZwihPPtuaNX=SrZ0U6ZsXD3+fgARO0JuKa8v2jQ@mail.gmail.com

src/backend/commands/tablecmds.c

index c42a740ccef845b7d85b796d83610fd7011d4b86..d2420a9558cab4f6bd3ca735404dade96ad49b3c 100644 (file)
@@ -403,6 +403,11 @@ static void ATExecAlterChildConstr(Constraint *cmdcon, Relation conrel, Relation
 static ObjectAddress ATExecValidateConstraint(List **wqueue,
                                                                                          Relation rel, char *constrName,
                                                                                          bool recurse, bool recursing, LOCKMODE lockmode);
+static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+                                                                               HeapTuple contuple, LOCKMODE lockmode);
+static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+                                                                                  char *constrName, HeapTuple contuple,
+                                                                                  bool recurse, bool recursing, LOCKMODE lockmode);
 static int     transformColumnNameList(Oid relId, List *colList,
                                                                        int16 *attnums, Oid *atttypids, Oid *attcollids);
 static int     transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -12079,136 +12084,190 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
 
        if (!con->convalidated)
        {
-               AlteredTableInfo *tab;
-               HeapTuple       copyTuple;
-               Form_pg_constraint copy_con;
-
                if (con->contype == CONSTRAINT_FOREIGN)
                {
-                       NewConstraint *newcon;
-                       Constraint *fkconstraint;
+                       QueueFKConstraintValidation(wqueue, conrel, rel, tuple, lockmode);
+               }
+               else if (con->contype == CONSTRAINT_CHECK)
+               {
+                       QueueCheckConstraintValidation(wqueue, conrel, rel, constrName,
+                                                                                  tuple, recurse, recursing, lockmode);
+               }
 
-                       /* Queue validation for phase 3 */
-                       fkconstraint = makeNode(Constraint);
-                       /* for now this is all we need */
-                       fkconstraint->conname = constrName;
+               ObjectAddressSet(address, ConstraintRelationId, con->oid);
+       }
+       else
+               address = InvalidObjectAddress; /* already validated */
 
-                       newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-                       newcon->name = constrName;
-                       newcon->contype = CONSTR_FOREIGN;
-                       newcon->refrelid = con->confrelid;
-                       newcon->refindid = con->conindid;
-                       newcon->conid = con->oid;
-                       newcon->qual = (Node *) fkconstraint;
+       systable_endscan(scan);
 
-                       /* Find or create work queue entry for this table */
-                       tab = ATGetQueueEntry(wqueue, rel);
-                       tab->constraints = lappend(tab->constraints, newcon);
+       table_close(conrel, RowExclusiveLock);
 
-                       /*
-                        * We disallow creating invalid foreign keys to or from
-                        * partitioned tables, so ignoring the recursion bit is okay.
-                        */
-               }
-               else if (con->contype == CONSTRAINT_CHECK)
-               {
-                       List       *children = NIL;
-                       ListCell   *child;
-                       NewConstraint *newcon;
-                       Datum           val;
-                       char       *conbin;
+       return address;
+}
 
-                       /*
-                        * If we're recursing, the parent has already done this, so skip
-                        * it.  Also, if the constraint is a NO INHERIT constraint, we
-                        * shouldn't try to look for it in the children.
-                        */
-                       if (!recursing && !con->connoinherit)
-                               children = find_all_inheritors(RelationGetRelid(rel),
-                                                                                          lockmode, NULL);
+/*
+ * QueueFKConstraintValidation
+ *
+ * Add an entry to the wqueue to validate the given foreign key constraint in
+ * Phase 3 and update the convalidated field in the pg_constraint catalog
+ * for the specified relation.
+ */
+static void
+QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+                                                       HeapTuple contuple, LOCKMODE lockmode)
+{
+       Form_pg_constraint con;
+       AlteredTableInfo *tab;
+       HeapTuple       copyTuple;
+       Form_pg_constraint copy_con;
 
-                       /*
-                        * For CHECK constraints, we must ensure that we only mark the
-                        * constraint as validated on the parent if it's already validated
-                        * on the children.
-                        *
-                        * We recurse before validating on the parent, to reduce risk of
-                        * deadlocks.
-                        */
-                       foreach(child, children)
-                       {
-                               Oid                     childoid = lfirst_oid(child);
-                               Relation        childrel;
+       con = (Form_pg_constraint) GETSTRUCT(contuple);
+       Assert(con->contype == CONSTRAINT_FOREIGN);
 
-                               if (childoid == RelationGetRelid(rel))
-                                       continue;
+       if (rel->rd_rel->relkind == RELKIND_RELATION)
+       {
+               NewConstraint *newcon;
+               Constraint *fkconstraint;
 
-                               /*
-                                * If we are told not to recurse, there had better not be any
-                                * child tables, because we can't mark the constraint on the
-                                * parent valid unless it is valid for all child tables.
-                                */
-                               if (!recurse)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                                        errmsg("constraint must be validated on child tables too")));
+               /* Queue validation for phase 3 */
+               fkconstraint = makeNode(Constraint);
+               /* for now this is all we need */
+               fkconstraint->conname = pstrdup(NameStr(con->conname));
 
-                               /* find_all_inheritors already got lock */
-                               childrel = table_open(childoid, NoLock);
+               newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+               newcon->name = fkconstraint->conname;
+               newcon->contype = CONSTR_FOREIGN;
+               newcon->refrelid = con->confrelid;
+               newcon->refindid = con->conindid;
+               newcon->conid = con->oid;
+               newcon->qual = (Node *) fkconstraint;
 
-                               ATExecValidateConstraint(wqueue, childrel, constrName, false,
-                                                                                true, lockmode);
-                               table_close(childrel, NoLock);
-                       }
+               /* Find or create work queue entry for this table */
+               tab = ATGetQueueEntry(wqueue, rel);
+               tab->constraints = lappend(tab->constraints, newcon);
+       }
 
-                       /* Queue validation for phase 3 */
-                       newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-                       newcon->name = constrName;
-                       newcon->contype = CONSTR_CHECK;
-                       newcon->refrelid = InvalidOid;
-                       newcon->refindid = InvalidOid;
-                       newcon->conid = con->oid;
-
-                       val = SysCacheGetAttrNotNull(CONSTROID, tuple,
-                                                                                Anum_pg_constraint_conbin);
-                       conbin = TextDatumGetCString(val);
-                       newcon->qual = (Node *) stringToNode(conbin);
-
-                       /* Find or create work queue entry for this table */
-                       tab = ATGetQueueEntry(wqueue, rel);
-                       tab->constraints = lappend(tab->constraints, newcon);
+       /*
+        * We disallow creating invalid foreign keys to or from partitioned
+        * tables, so ignoring the recursion bit is okay.
+        */
 
-                       /*
-                        * Invalidate relcache so that others see the new validated
-                        * constraint.
-                        */
-                       CacheInvalidateRelcache(rel);
-               }
+       /*
+        * Now update the catalog, while we have the door open.
+        */
+       copyTuple = heap_copytuple(contuple);
+       copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+       copy_con->convalidated = true;
+       CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
+
+       InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
+
+       heap_freetuple(copyTuple);
+}
+
+/*
+ * QueueCheckConstraintValidation
+ *
+ * Add an entry to the wqueue to validate the given check constraint in Phase 3
+ * and update the convalidated field in the pg_constraint catalog for the
+ * specified relation and all its inheriting children.
+ */
+static void
+QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
+                                                          char *constrName, HeapTuple contuple,
+                                                          bool recurse, bool recursing, LOCKMODE lockmode)
+{
+       Form_pg_constraint con;
+       AlteredTableInfo *tab;
+       HeapTuple       copyTuple;
+       Form_pg_constraint copy_con;
+
+       List       *children = NIL;
+       ListCell   *child;
+       NewConstraint *newcon;
+       Datum           val;
+       char       *conbin;
+
+       con = (Form_pg_constraint) GETSTRUCT(contuple);
+       Assert(con->contype == CONSTRAINT_CHECK);
+
+       /*
+        * If we're recursing, the parent has already done this, so skip it. Also,
+        * if the constraint is a NO INHERIT constraint, we shouldn't try to look
+        * for it in the children.
+        */
+       if (!recursing && !con->connoinherit)
+               children = find_all_inheritors(RelationGetRelid(rel),
+                                                                          lockmode, NULL);
+
+       /*
+        * For CHECK constraints, we must ensure that we only mark the constraint
+        * as validated on the parent if it's already validated on the children.
+        *
+        * We recurse before validating on the parent, to reduce risk of
+        * deadlocks.
+        */
+       foreach(child, children)
+       {
+               Oid                     childoid = lfirst_oid(child);
+               Relation        childrel;
+
+               if (childoid == RelationGetRelid(rel))
+                       continue;
 
                /*
-                * Now update the catalog, while we have the door open.
+                * If we are told not to recurse, there had better not be any child
+                * tables, because we can't mark the constraint on the parent valid
+                * unless it is valid for all child tables.
                 */
-               copyTuple = heap_copytuple(tuple);
-               copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
-               copy_con->convalidated = true;
-               CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
-
-               InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
+               if (!recurse)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("constraint must be validated on child tables too")));
 
-               heap_freetuple(copyTuple);
+               /* find_all_inheritors already got lock */
+               childrel = table_open(childoid, NoLock);
 
-               ObjectAddressSet(address, ConstraintRelationId, con->oid);
+               ATExecValidateConstraint(wqueue, childrel, constrName, false,
+                                                                true, lockmode);
+               table_close(childrel, NoLock);
        }
-       else
-               address = InvalidObjectAddress; /* already validated */
 
-       systable_endscan(scan);
+       /* Queue validation for phase 3 */
+       newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+       newcon->name = constrName;
+       newcon->contype = CONSTR_CHECK;
+       newcon->refrelid = InvalidOid;
+       newcon->refindid = InvalidOid;
+       newcon->conid = con->oid;
 
-       table_close(conrel, RowExclusiveLock);
+       val = SysCacheGetAttrNotNull(CONSTROID, contuple,
+                                                                Anum_pg_constraint_conbin);
+       conbin = TextDatumGetCString(val);
+       newcon->qual = (Node *) stringToNode(conbin);
 
-       return address;
-}
+       /* Find or create work queue entry for this table */
+       tab = ATGetQueueEntry(wqueue, rel);
+       tab->constraints = lappend(tab->constraints, newcon);
 
+       /*
+        * Invalidate relcache so that others see the new validated constraint.
+        */
+       CacheInvalidateRelcache(rel);
+
+       /*
+        * Now update the catalog, while we have the door open.
+        */
+       copyTuple = heap_copytuple(contuple);
+       copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+       copy_con->convalidated = true;
+       CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
+
+       InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
+
+       heap_freetuple(copyTuple);
+}
 
 /*
  * transformColumnNameList - transform list of column names