From 86374c9a0e304520044ab8b2cb84849c7db4a9fa Mon Sep 17 00:00:00 2001 From: =?utf8?q?=C3=81lvaro=20Herrera?= Date: Thu, 16 Jan 2025 16:44:24 +0100 Subject: [PATCH] Split ATExecValidateConstraint into reusable pieces With this, we have separate functions to add validation requests to ALTER TABLE's phase 3 queue for check and foreign key constraints, which allows reusing them in future commits -- particularly this will allow us to perform validation of invalid foreign key constraints in partitioned tables. We could have let the check constraint code alone since we don't need to reuse that for anything at this point, but it seems cleaner and more consistent to do both at the same time. Author: Amul Sul Discussion: https://postgr.es/m/CAAJ_b96Bp=-ZwihPPtuaNX=SrZ0U6ZsXD3+fgARO0JuKa8v2jQ@mail.gmail.com --- src/backend/commands/tablecmds.c | 271 +++++++++++++++++++------------ 1 file changed, 165 insertions(+), 106 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c42a740cce..d2420a9558 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -403,6 +403,11 @@ static void ATExecAlterChildConstr(Constraint *cmdcon, Relation conrel, Relation static ObjectAddress ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, bool recurse, bool recursing, LOCKMODE lockmode); +static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, LOCKMODE lockmode); +static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, + char *constrName, HeapTuple contuple, + bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids, Oid *attcollids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -12079,136 +12084,190 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, if (!con->convalidated) { - AlteredTableInfo *tab; - HeapTuple copyTuple; - Form_pg_constraint copy_con; - if (con->contype == CONSTRAINT_FOREIGN) { - NewConstraint *newcon; - Constraint *fkconstraint; + QueueFKConstraintValidation(wqueue, conrel, rel, tuple, lockmode); + } + else if (con->contype == CONSTRAINT_CHECK) + { + QueueCheckConstraintValidation(wqueue, conrel, rel, constrName, + tuple, recurse, recursing, lockmode); + } - /* Queue validation for phase 3 */ - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = constrName; + ObjectAddressSet(address, ConstraintRelationId, con->oid); + } + else + address = InvalidObjectAddress; /* already validated */ - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = constrName; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = con->confrelid; - newcon->refindid = con->conindid; - newcon->conid = con->oid; - newcon->qual = (Node *) fkconstraint; + systable_endscan(scan); - /* Find or create work queue entry for this table */ - tab = ATGetQueueEntry(wqueue, rel); - tab->constraints = lappend(tab->constraints, newcon); + table_close(conrel, RowExclusiveLock); - /* - * We disallow creating invalid foreign keys to or from - * partitioned tables, so ignoring the recursion bit is okay. - */ - } - else if (con->contype == CONSTRAINT_CHECK) - { - List *children = NIL; - ListCell *child; - NewConstraint *newcon; - Datum val; - char *conbin; + return address; +} - /* - * If we're recursing, the parent has already done this, so skip - * it. Also, if the constraint is a NO INHERIT constraint, we - * shouldn't try to look for it in the children. - */ - if (!recursing && !con->connoinherit) - children = find_all_inheritors(RelationGetRelid(rel), - lockmode, NULL); +/* + * QueueFKConstraintValidation + * + * Add an entry to the wqueue to validate the given foreign key constraint in + * Phase 3 and update the convalidated field in the pg_constraint catalog + * for the specified relation. + */ +static void +QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, LOCKMODE lockmode) +{ + Form_pg_constraint con; + AlteredTableInfo *tab; + HeapTuple copyTuple; + Form_pg_constraint copy_con; - /* - * For CHECK constraints, we must ensure that we only mark the - * constraint as validated on the parent if it's already validated - * on the children. - * - * We recurse before validating on the parent, to reduce risk of - * deadlocks. - */ - foreach(child, children) - { - Oid childoid = lfirst_oid(child); - Relation childrel; + con = (Form_pg_constraint) GETSTRUCT(contuple); + Assert(con->contype == CONSTRAINT_FOREIGN); - if (childoid == RelationGetRelid(rel)) - continue; + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + NewConstraint *newcon; + Constraint *fkconstraint; - /* - * If we are told not to recurse, there had better not be any - * child tables, because we can't mark the constraint on the - * parent valid unless it is valid for all child tables. - */ - if (!recurse) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("constraint must be validated on child tables too"))); + /* Queue validation for phase 3 */ + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(con->conname)); - /* find_all_inheritors already got lock */ - childrel = table_open(childoid, NoLock); + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->conname; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = con->confrelid; + newcon->refindid = con->conindid; + newcon->conid = con->oid; + newcon->qual = (Node *) fkconstraint; - ATExecValidateConstraint(wqueue, childrel, constrName, false, - true, lockmode); - table_close(childrel, NoLock); - } + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + tab->constraints = lappend(tab->constraints, newcon); + } - /* Queue validation for phase 3 */ - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = constrName; - newcon->contype = CONSTR_CHECK; - newcon->refrelid = InvalidOid; - newcon->refindid = InvalidOid; - newcon->conid = con->oid; - - val = SysCacheGetAttrNotNull(CONSTROID, tuple, - Anum_pg_constraint_conbin); - conbin = TextDatumGetCString(val); - newcon->qual = (Node *) stringToNode(conbin); - - /* Find or create work queue entry for this table */ - tab = ATGetQueueEntry(wqueue, rel); - tab->constraints = lappend(tab->constraints, newcon); + /* + * We disallow creating invalid foreign keys to or from partitioned + * tables, so ignoring the recursion bit is okay. + */ - /* - * Invalidate relcache so that others see the new validated - * constraint. - */ - CacheInvalidateRelcache(rel); - } + /* + * Now update the catalog, while we have the door open. + */ + copyTuple = heap_copytuple(contuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + + heap_freetuple(copyTuple); +} + +/* + * QueueCheckConstraintValidation + * + * Add an entry to the wqueue to validate the given check constraint in Phase 3 + * and update the convalidated field in the pg_constraint catalog for the + * specified relation and all its inheriting children. + */ +static void +QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, + char *constrName, HeapTuple contuple, + bool recurse, bool recursing, LOCKMODE lockmode) +{ + Form_pg_constraint con; + AlteredTableInfo *tab; + HeapTuple copyTuple; + Form_pg_constraint copy_con; + + List *children = NIL; + ListCell *child; + NewConstraint *newcon; + Datum val; + char *conbin; + + con = (Form_pg_constraint) GETSTRUCT(contuple); + Assert(con->contype == CONSTRAINT_CHECK); + + /* + * If we're recursing, the parent has already done this, so skip it. Also, + * if the constraint is a NO INHERIT constraint, we shouldn't try to look + * for it in the children. + */ + if (!recursing && !con->connoinherit) + children = find_all_inheritors(RelationGetRelid(rel), + lockmode, NULL); + + /* + * For CHECK constraints, we must ensure that we only mark the constraint + * as validated on the parent if it's already validated on the children. + * + * We recurse before validating on the parent, to reduce risk of + * deadlocks. + */ + foreach(child, children) + { + Oid childoid = lfirst_oid(child); + Relation childrel; + + if (childoid == RelationGetRelid(rel)) + continue; /* - * Now update the catalog, while we have the door open. + * If we are told not to recurse, there had better not be any child + * tables, because we can't mark the constraint on the parent valid + * unless it is valid for all child tables. */ - copyTuple = heap_copytuple(tuple); - copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); - copy_con->convalidated = true; - CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); - - InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + if (!recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be validated on child tables too"))); - heap_freetuple(copyTuple); + /* find_all_inheritors already got lock */ + childrel = table_open(childoid, NoLock); - ObjectAddressSet(address, ConstraintRelationId, con->oid); + ATExecValidateConstraint(wqueue, childrel, constrName, false, + true, lockmode); + table_close(childrel, NoLock); } - else - address = InvalidObjectAddress; /* already validated */ - systable_endscan(scan); + /* Queue validation for phase 3 */ + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = constrName; + newcon->contype = CONSTR_CHECK; + newcon->refrelid = InvalidOid; + newcon->refindid = InvalidOid; + newcon->conid = con->oid; - table_close(conrel, RowExclusiveLock); + val = SysCacheGetAttrNotNull(CONSTROID, contuple, + Anum_pg_constraint_conbin); + conbin = TextDatumGetCString(val); + newcon->qual = (Node *) stringToNode(conbin); - return address; -} + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + tab->constraints = lappend(tab->constraints, newcon); + /* + * Invalidate relcache so that others see the new validated constraint. + */ + CacheInvalidateRelcache(rel); + + /* + * Now update the catalog, while we have the door open. + */ + copyTuple = heap_copytuple(contuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + + heap_freetuple(copyTuple); +} /* * transformColumnNameList - transform list of column names -- 2.39.5