Foreign keys on partitioned tables
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 4 Apr 2018 17:02:31 +0000 (14:02 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 4 Apr 2018 17:02:49 +0000 (14:02 -0300)
Author: Álvaro Herrera
Discussion: https://postgr.es/m/20171231194359.cvojcour423ulha4@alvherre.pgsql
Reviewed-by: Peter Eisentraut
17 files changed:
doc/src/sgml/ref/alter_table.sgml
doc/src/sgml/ref/create_table.sgml
src/backend/catalog/pg_constraint.c
src/backend/commands/tablecmds.c
src/backend/parser/parse_utilcmd.c
src/backend/utils/adt/ri_triggers.c
src/bin/pg_dump/pg_dump.c
src/include/catalog/pg_constraint_fn.h
src/include/commands/tablecmds.h
src/test/regress/expected/alter_table.out
src/test/regress/expected/create_table.out
src/test/regress/expected/foreign_key.out
src/test/regress/expected/inherit.out
src/test/regress/sql/alter_table.sql
src/test/regress/sql/create_table.sql
src/test/regress/sql/foreign_key.sql
src/test/regress/sql/inherit.sql

index 69f3355ededcbe10d951f46b8914a5ef95adde6c..bd2262761e8f4cb50327d7a247c97a95a0816e22 100644 (file)
@@ -368,7 +368,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       specified check constraints).  But the
       database will not assume that the constraint holds for all rows in
       the table, until it is validated by using the <literal>VALIDATE
-      CONSTRAINT</literal> option.
+      CONSTRAINT</literal> option.  Foreign key constraints on partitioned
+      tables may not be declared <literal>NOT VALID</literal> at present.
      </para>
 
      <para>
index 14a43b45e9acfc10ac75febe2c9d35bbb6c5fb3c..d49899c497145846900b5e731f63e06418cffa6d 100644 (file)
@@ -546,9 +546,12 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      </para>
 
      <para>
-      Partitioned tables do not support <literal>EXCLUDE</literal> or
-      <literal>FOREIGN KEY</literal> constraints; however, you can define
-      these constraints on individual partitions.
+      Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
+      however, you can define these constraints on individual partitions.
+      Also, while it's possible to define <literal>PRIMARY KEY</literal>
+      constraints on partitioned tables, it is not supported to create foreign
+      keys cannot that reference them.  This restriction will be lifted in a
+      future release.
      </para>
 
     </listitem>
@@ -907,7 +910,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       must have <literal>REFERENCES</literal> permission on the referenced table
       (either the whole table, or the specific referenced columns).
       Note that foreign key constraints cannot be defined between temporary
-      tables and permanent tables.
+      tables and permanent tables.  Also note that while it is possible to
+      define a foreign key on a partitioned table, it is not possible to
+      declare a foreign key that references a partitioned table.
      </para>
 
      <para>
index 4f1a27a7d343d9a123ada975a5353d5f3f094b65..153522782d45e67c5afce87f64c1966495fdd60f 100644 (file)
@@ -26,6 +26,7 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
+#include "commands/tablecmds.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
        return conOid;
 }
 
+/*
+ * CloneForeignKeyConstraints
+ *             Clone foreign keys from a partitioned table to a newly acquired
+ *             partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes.  The columns may be in different
+ * order, though.
+ *
+ * The *cloned list is appended ClonedConstraint elements describing what was
+ * created.
+ */
+void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+       Relation        pg_constraint;
+       Relation        parentRel;
+       Relation        rel;
+       ScanKeyData     key;
+       SysScanDesc     scan;
+       TupleDesc       tupdesc;
+       HeapTuple       tuple;
+       AttrNumber *attmap;
+
+       parentRel = heap_open(parentId, NoLock);        /* already got lock */
+       /* see ATAddForeignKeyConstraint about lock level */
+       rel = heap_open(relationId, AccessExclusiveLock);
+
+       pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+       tupdesc = RelationGetDescr(pg_constraint);
+
+       /*
+        * The constraint key may differ, if the columns in the partition are
+        * different.  This map is used to convert them.
+        */
+       attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+                                                                               RelationGetDescr(parentRel),
+                                                                               gettext_noop("could not convert row type"));
+
+       ScanKeyInit(&key,
+                               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+                               F_OIDEQ, ObjectIdGetDatum(parentId));
+       scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
+                                                         NULL, 1, &key);
+
+       while ((tuple = systable_getnext(scan)) != NULL)
+       {
+               Form_pg_constraint      constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+               AttrNumber      conkey[INDEX_MAX_KEYS];
+               AttrNumber      mapped_conkey[INDEX_MAX_KEYS];
+               AttrNumber      confkey[INDEX_MAX_KEYS];
+               Oid                     conpfeqop[INDEX_MAX_KEYS];
+               Oid                     conppeqop[INDEX_MAX_KEYS];
+               Oid                     conffeqop[INDEX_MAX_KEYS];
+               Constraint *fkconstraint;
+               ClonedConstraint *newc;
+               Oid                     constrOid;
+               ObjectAddress parentAddr,
+                                       childAddr;
+               int                     nelem;
+               int                     i;
+               ArrayType  *arr;
+               Datum           datum;
+               bool            isnull;
+
+               /* only foreign keys */
+               if (constrForm->contype != CONSTRAINT_FOREIGN)
+                       continue;
+
+               ObjectAddressSet(parentAddr, ConstraintRelationId,
+                                                HeapTupleGetOid(tuple));
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conkey");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != INT2OID)
+                       elog(ERROR, "conkey is not a 1-D smallint array");
+               memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+               for (i = 0; i < nelem; i++)
+                       mapped_conkey[i] = attmap[conkey[i] - 1];
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null confkey");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != INT2OID)
+                       elog(ERROR, "confkey is not a 1-D smallint array");
+               memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conpfeqop");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conpfeqop is not a 1-D OID array");
+               memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conpfeqop");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conpfeqop is not a 1-D OID array");
+               memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conppeqop");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conppeqop is not a 1-D OID array");
+               memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+               datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
+                                                       tupdesc, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conffeqop");
+               arr = DatumGetArrayTypeP(datum);
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem < 1 ||
+                       nelem > INDEX_MAX_KEYS ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conffeqop is not a 1-D OID array");
+               memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+               constrOid =
+                       CreateConstraintEntry(NameStr(constrForm->conname),
+                                                                 constrForm->connamespace,
+                                                                 CONSTRAINT_FOREIGN,
+                                                                 constrForm->condeferrable,
+                                                                 constrForm->condeferred,
+                                                                 constrForm->convalidated,
+                                                                 HeapTupleGetOid(tuple),
+                                                                 relationId,
+                                                                 mapped_conkey,
+                                                                 nelem,
+                                                                 InvalidOid,   /* not a domain constraint */
+                                                                 constrForm->conindid, /* same index */
+                                                                 constrForm->confrelid, /* same foreign rel */
+                                                                 confkey,
+                                                                 conpfeqop,
+                                                                 conppeqop,
+                                                                 conffeqop,
+                                                                 nelem,
+                                                                 constrForm->confupdtype,
+                                                                 constrForm->confdeltype,
+                                                                 constrForm->confmatchtype,
+                                                                 NULL,
+                                                                 NULL,
+                                                                 NULL,
+                                                                 NULL,
+                                                                 false,
+                                                                 1, false, true);
+
+               ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+               recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+               fkconstraint = makeNode(Constraint);
+               /* for now this is all we need */
+               fkconstraint->fk_upd_action = constrForm->confupdtype;
+               fkconstraint->fk_del_action = constrForm->confdeltype;
+               fkconstraint->deferrable = constrForm->condeferrable;
+               fkconstraint->initdeferred = constrForm->condeferred;
+
+               createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
+                                                                constrOid, constrForm->conindid, false);
+
+               if (cloned)
+               {
+                       /*
+                        * Feed back caller about the constraints we created, so that they can
+                        * set up constraint verification.
+                        */
+                       newc = palloc(sizeof(ClonedConstraint));
+                       newc->relid = relationId;
+                       newc->refrelid = constrForm->confrelid;
+                       newc->conindid = constrForm->conindid;
+                       newc->conid = constrOid;
+                       newc->constraint = fkconstraint;
+
+                       *cloned = lappend(*cloned, newc);
+               }
+       }
+       systable_endscan(scan);
+
+       pfree(attmap);
+
+       if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               PartitionDesc   partdesc = RelationGetPartitionDesc(rel);
+               int                     i;
+
+               for (i = 0; i < partdesc->nparts; i++)
+                       CloneForeignKeyConstraints(RelationGetRelid(rel),
+                                                                          partdesc->oids[i],
+                                                                          cloned);
+       }
+
+       heap_close(rel, NoLock);                /* keep lock till commit */
+       heap_close(parentRel, NoLock);
+       heap_close(pg_constraint, RowShareLock);
+}
 
 /*
  * Test whether given name is currently used as a constraint name
index c0f987cc8144cf3d380c59242e1013e94b9c4c44..ec2f9616edd068aadf06745cc2eb6d27486a2771 100644 (file)
@@ -338,9 +338,6 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
 static void validateForeignKeyConstraint(char *conname,
                                                         Relation rel, Relation pkrel,
                                                         Oid pkindOid, Oid constraintOid);
-static void createForeignKeyTriggers(Relation rel, Oid refRelOid,
-                                                Constraint *fkconstraint,
-                                                Oid constraintOid, Oid indexOid);
 static void ATController(AlterTableStmt *parsetree,
                         Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
@@ -411,8 +408,10 @@ static ObjectAddress ATAddCheckConstraint(List **wqueue,
                                         Constraint *constr,
                                         bool recurse, bool recursing, bool is_readd,
                                         LOCKMODE lockmode);
-static ObjectAddress ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
-                                                 Constraint *fkconstraint, LOCKMODE lockmode);
+static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab,
+                                                 Relation rel, Constraint *fkconstraint, Oid parentConstr,
+                                                 bool recurse, bool recursing,
+                                                 LOCKMODE lockmode);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
                                         DropBehavior behavior,
                                         bool recurse, bool recursing,
@@ -505,6 +504,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
  * relkind: relkind to assign to the new relation
  * ownerId: if not InvalidOid, use this as the new relation's owner.
  * typaddress: if not null, it's set to the pg_type entry's address.
+ * queryString: for error reporting
  *
  * Note that permissions checks are done against current user regardless of
  * ownerId.  A nonzero ownerId is used when someone is creating a relation
@@ -908,8 +908,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
        }
 
        /*
-        * If we're creating a partition, create now all the indexes and triggers
-        * defined in the parent.
+        * If we're creating a partition, create now all the indexes, triggers,
+        * FKs defined in the parent.
         *
         * We can't do it earlier, because DefineIndex wants to know the partition
         * key which we just stored.
@@ -961,6 +961,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
                if (parent->trigdesc != NULL)
                        CloneRowTriggersToPartition(parent, rel);
 
+               /*
+                * And foreign keys too.  Note that because we're freshly creating the
+                * table, there is no need to verify these new constraints.
+                */
+               CloneForeignKeyConstraints(parentId, relationId, NULL);
+
                heap_close(parent, NoLock);
        }
 
@@ -7025,7 +7031,9 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
                                                                                 RelationGetNamespace(rel),
                                                                                 NIL);
 
-                       address = ATAddForeignKeyConstraint(tab, rel, newConstraint,
+                       address = ATAddForeignKeyConstraint(wqueue, tab, rel,
+                                                                                               newConstraint, InvalidOid,
+                                                                                               recurse, false,
                                                                                                lockmode);
                        break;
 
@@ -7180,8 +7188,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * We do permissions checks here, however.
  */
 static ObjectAddress
-ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
-                                                 Constraint *fkconstraint, LOCKMODE lockmode)
+ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+                                                 Constraint *fkconstraint, Oid parentConstr,
+                                                 bool recurse, bool recursing, LOCKMODE lockmode)
 {
        Relation        pkrel;
        int16           pkattnum[INDEX_MAX_KEYS];
@@ -7220,6 +7229,21 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
                                 errmsg("cannot reference partitioned table \"%s\"",
                                                RelationGetRelationName(pkrel))));
 
+       if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               if (!recurse)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("foreign key referencing partitioned table \"%s\" must not be ONLY",
+                                                       RelationGetRelationName(pkrel))));
+               if (fkconstraint->skip_validation && !fkconstraint->initially_valid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot add NOT VALID foreign key to relation \"%s\"",
+                                                       RelationGetRelationName(pkrel)),
+                                        errdetail("This feature is not yet supported on partitioned tables.")));
+       }
+
        if (pkrel->rd_rel->relkind != RELKIND_RELATION)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -7527,7 +7551,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
                                                                          fkconstraint->deferrable,
                                                                          fkconstraint->initdeferred,
                                                                          fkconstraint->initially_valid,
-                                                                         InvalidOid,   /* no parent constraint */
+                                                                         parentConstr,
                                                                          RelationGetRelid(rel),
                                                                          fkattnum,
                                                                          numfks,
@@ -7553,10 +7577,12 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
        ObjectAddressSet(address, ConstraintRelationId, constrOid);
 
        /*
-        * Create the triggers that will enforce the constraint.
+        * Create the triggers that will enforce the constraint.  We only want
+        * the action triggers to appear for the parent partitioned relation,
+        * even though the constraints also exist below.
         */
        createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
-                                                        constrOid, indexOid);
+                                                        constrOid, indexOid, !recursing);
 
        /*
         * Tell Phase 3 to check that the constraint is satisfied by existing
@@ -7580,6 +7606,40 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
                tab->constraints = lappend(tab->constraints, newcon);
        }
 
+       /*
+        * When called on a partitioned table, recurse to create the constraint on
+        * the partitions also.
+        */
+       if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               PartitionDesc partdesc;
+
+               partdesc = RelationGetPartitionDesc(rel);
+
+               for (i = 0; i < partdesc->nparts; i++)
+               {
+                       Oid                     partitionId = partdesc->oids[i];
+                       Relation        partition = heap_open(partitionId, lockmode);
+                       AlteredTableInfo *childtab;
+                       ObjectAddress childAddr;
+
+                       CheckTableNotInUse(partition, "ALTER TABLE");
+
+                       /* Find or create work queue entry for this table */
+                       childtab = ATGetQueueEntry(wqueue, partition);
+
+                       childAddr =
+                               ATAddForeignKeyConstraint(wqueue, childtab, partition,
+                                                                                 fkconstraint, constrOid,
+                                                                                 recurse, true, lockmode);
+
+                       /* Record this constraint as dependent on the parent one */
+                       recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO);
+
+                       heap_close(partition, NoLock);
+               }
+       }
+
        /*
         * Close pk table, but keep lock until we've committed.
         */
@@ -7842,8 +7902,8 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
                        heap_close(refrel, NoLock);
 
                        /*
-                        * Foreign keys do not inherit, so we purposely ignore the
-                        * recursion bit here
+                        * We disallow creating invalid foreign keys to or from
+                        * partitioned tables, so ignoring the recursion bit is okay.
                         */
                }
                else if (con->contype == CONSTRAINT_CHECK)
@@ -8489,23 +8549,16 @@ CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint,
 }
 
 /*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
+ * createForeignKeyActionTriggers
+ *             Create the referenced-side "action" triggers that implement a foreign
+ *             key.
  */
 static void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
-                                                Oid constraintOid, Oid indexOid)
+createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
+                                                          Oid constraintOid, Oid indexOid)
 {
-       Oid                     myRelOid;
        CreateTrigStmt *fk_trigger;
 
-       myRelOid = RelationGetRelid(rel);
-
-       /* Make changes-so-far visible */
-       CommandCounterIncrement();
-
        /*
         * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
         * DELETE action on the referenced table.
@@ -8555,7 +8608,8 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
        }
        fk_trigger->args = NIL;
 
-       (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+       (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
+                                                constraintOid,
                                                 indexOid, InvalidOid, InvalidOid, NULL, true, false);
 
        /* Make changes-so-far visible */
@@ -8610,22 +8664,58 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
        }
        fk_trigger->args = NIL;
 
-       (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+       (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
+                                                constraintOid,
                                                 indexOid, InvalidOid, InvalidOid, NULL, true, false);
+}
 
-       /* Make changes-so-far visible */
-       CommandCounterIncrement();
-
-       /*
-        * Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK
-        * action for both INSERTs and UPDATEs on the referencing table.
-        */
+/*
+ * createForeignKeyCheckTriggers
+ *             Create the referencing-side "check" triggers that implement a foreign
+ *             key.
+ */
+static void
+createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+                                                         Constraint *fkconstraint, Oid constraintOid,
+                                                         Oid indexOid)
+{
        CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
                                                 indexOid, true);
        CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
                                                 indexOid, false);
 }
 
+/*
+ * Create the triggers that implement an FK constraint.
+ *
+ * NB: if you change any trigger properties here, see also
+ * ATExecAlterConstraint.
+ */
+void
+createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
+                                                Oid constraintOid, Oid indexOid, bool create_action)
+{
+       /*
+        * For the referenced side, create action triggers, if requested.  (If the
+        * referencing side is partitioned, there is still only one trigger, which
+        * runs on the referenced side and points to the top of the referencing
+        * hierarchy.)
+        */
+       if (create_action)
+               createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
+                                                                          indexOid);
+
+       /*
+        * For the referencing side, create the check triggers.  We only need these
+        * on the partitions.
+        */
+       if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+               createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
+                                                                         fkconstraint, constraintOid, indexOid);
+
+       CommandCounterIncrement();
+}
+
 /*
  * ALTER TABLE DROP CONSTRAINT
  *
@@ -13889,6 +13979,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        bool            found_whole_row;
        Oid                     defaultPartOid;
        List       *partBoundConstraint;
+       List       *cloned;
+       ListCell   *l;
 
        /*
         * We must lock the default partition, because attaching a new partition
@@ -14071,6 +14163,35 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
        /* and triggers */
        CloneRowTriggersToPartition(rel, attachrel);
 
+       /*
+        * Clone foreign key constraints, and setup for Phase 3 to verify them.
+        */
+       cloned = NIL;
+       CloneForeignKeyConstraints(RelationGetRelid(rel),
+                                                          RelationGetRelid(attachrel), &cloned);
+       foreach(l, cloned)
+       {
+               ClonedConstraint *cloned = lfirst(l);
+               NewConstraint *newcon;
+               Relation        clonedrel;
+               AlteredTableInfo *parttab;
+
+               clonedrel = relation_open(cloned->relid, NoLock);
+               parttab = ATGetQueueEntry(wqueue, clonedrel);
+
+               newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+               newcon->name = cloned->constraint->conname;
+               newcon->contype = CONSTR_FOREIGN;
+               newcon->refrelid = cloned->refrelid;
+               newcon->refindid = cloned->conindid;
+               newcon->conid = cloned->conid;
+               newcon->qual = (Node *) cloned->constraint;
+
+               parttab->constraints = lappend(parttab->constraints, newcon);
+
+               relation_close(clonedrel, NoLock);
+       }
+
        /*
         * Generate partition constraint from the partition bound specification.
         * If the parent itself is a partition, make sure to include its
index 0fd14f43c6beefec7a1b4e74c857fa9b6ecbd606..513a5dda262398f98493bdd6bd3203b3344330dc 100644 (file)
@@ -749,12 +749,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
                                                         errmsg("foreign key constraints are not supported on foreign tables"),
                                                         parser_errposition(cxt->pstate,
                                                                                                constraint->location)));
-                               if (cxt->ispartitioned)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                        errmsg("foreign key constraints are not supported on partitioned tables"),
-                                                        parser_errposition(cxt->pstate,
-                                                                                               constraint->location)));
 
                                /*
                                 * Fill in the current attribute's name and throw it into the
@@ -868,12 +862,6 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
                                                 errmsg("foreign key constraints are not supported on foreign tables"),
                                                 parser_errposition(cxt->pstate,
                                                                                        constraint->location)));
-                       if (cxt->ispartitioned)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                errmsg("foreign key constraints are not supported on partitioned tables"),
-                                                parser_errposition(cxt->pstate,
-                                                                                       constraint->location)));
                        cxt->fkconstraints = lappend(cxt->fkconstraints, constraint);
                        break;
 
index d0fe65cea9b8f91cce50649cfc960b17e0255555..90ddbe55167ea3bafad33106c51264b028370af0 100644 (file)
@@ -788,20 +788,23 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
                                char            paramname[16];
                                const char *querysep;
                                Oid                     queryoids[RI_MAX_NUMKEYS];
+                               const char *fk_only;
                                int                     i;
 
                                /* ----------
                                 * The query string built is
-                                *      SELECT 1 FROM ONLY <fktable> x WHERE $1 = fkatt1 [AND ...]
+                                *      SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
                                 *                 FOR KEY SHARE OF x
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.
                                 * ----------
                                 */
                                initStringInfo(&querybuf);
+                               fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                                       "" : "ONLY ";
                                quoteRelationName(fkrelname, fk_rel);
-                               appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x",
-                                                                fkrelname);
+                               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                                                                fk_only, fkrelname);
                                querysep = "WHERE";
                                for (i = 0; i < riinfo->nkeys; i++)
                                {
@@ -947,17 +950,21 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
                                char            paramname[16];
                                const char *querysep;
                                Oid                     queryoids[RI_MAX_NUMKEYS];
+                               const char *fk_only;
 
                                /* ----------
                                 * The query string built is
-                                *      DELETE FROM ONLY <fktable> WHERE $1 = fkatt1 [AND ...]
+                                *      DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.
                                 * ----------
                                 */
                                initStringInfo(&querybuf);
+                               fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                                       "" : "ONLY ";
                                quoteRelationName(fkrelname, fk_rel);
-                               appendStringInfo(&querybuf, "DELETE FROM ONLY %s", fkrelname);
+                               appendStringInfo(&querybuf, "DELETE FROM %s%s",
+                                                                fk_only, fkrelname);
                                querysep = "WHERE";
                                for (i = 0; i < riinfo->nkeys; i++)
                                {
@@ -1118,10 +1125,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
                                const char *querysep;
                                const char *qualsep;
                                Oid                     queryoids[RI_MAX_NUMKEYS * 2];
+                               const char *fk_only;
 
                                /* ----------
                                 * The query string built is
-                                *      UPDATE ONLY <fktable> SET fkatt1 = $1 [, ...]
+                                *      UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
                                 *                      WHERE $n = fkatt1 [AND ...]
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.  Note that we are assuming
@@ -1131,8 +1139,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
                                 */
                                initStringInfo(&querybuf);
                                initStringInfo(&qualbuf);
+                               fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                                       "" : "ONLY ";
                                quoteRelationName(fkrelname, fk_rel);
-                               appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+                               appendStringInfo(&querybuf, "UPDATE %s%s SET",
+                                                                fk_only, fkrelname);
                                querysep = "";
                                qualsep = "WHERE";
                                for (i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
@@ -1337,11 +1348,12 @@ ri_setnull(TriggerData *trigdata)
                                char            paramname[16];
                                const char *querysep;
                                const char *qualsep;
+                               const char *fk_only;
                                Oid                     queryoids[RI_MAX_NUMKEYS];
 
                                /* ----------
                                 * The query string built is
-                                *      UPDATE ONLY <fktable> SET fkatt1 = NULL [, ...]
+                                *      UPDATE [ONLY] <fktable> SET fkatt1 = NULL [, ...]
                                 *                      WHERE $1 = fkatt1 [AND ...]
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.
@@ -1349,8 +1361,11 @@ ri_setnull(TriggerData *trigdata)
                                 */
                                initStringInfo(&querybuf);
                                initStringInfo(&qualbuf);
+                               fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                                       "" : "ONLY ";
                                quoteRelationName(fkrelname, fk_rel);
-                               appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+                               appendStringInfo(&querybuf, "UPDATE %s%s SET",
+                                                                fk_only, fkrelname);
                                querysep = "";
                                qualsep = "WHERE";
                                for (i = 0; i < riinfo->nkeys; i++)
@@ -1554,11 +1569,12 @@ ri_setdefault(TriggerData *trigdata)
                                const char *querysep;
                                const char *qualsep;
                                Oid                     queryoids[RI_MAX_NUMKEYS];
+                               const char *fk_only;
                                int                     i;
 
                                /* ----------
                                 * The query string built is
-                                *      UPDATE ONLY <fktable> SET fkatt1 = DEFAULT [, ...]
+                                *      UPDATE [ONLY] <fktable> SET fkatt1 = DEFAULT [, ...]
                                 *                      WHERE $1 = fkatt1 [AND ...]
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.
@@ -1567,7 +1583,10 @@ ri_setdefault(TriggerData *trigdata)
                                initStringInfo(&querybuf);
                                initStringInfo(&qualbuf);
                                quoteRelationName(fkrelname, fk_rel);
-                               appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+                               fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                                       "" : "ONLY ";
+                               appendStringInfo(&querybuf, "UPDATE %s%s SET",
+                                                                fk_only, fkrelname);
                                querysep = "";
                                qualsep = "WHERE";
                                for (i = 0; i < riinfo->nkeys; i++)
@@ -1838,6 +1857,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
        RangeTblEntry *pkrte;
        RangeTblEntry *fkrte;
        const char *sep;
+       const char *fk_only;
        int                     i;
        int                     save_nestlevel;
        char            workmembuf[32];
@@ -1894,8 +1914,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 
        /*----------
         * The query string built is:
-        *      SELECT fk.keycols FROM ONLY relname fk
-        *       LEFT OUTER JOIN ONLY pkrelname pk
+        *      SELECT fk.keycols FROM [ONLY] relname fk
+        *       LEFT OUTER JOIN pkrelname pk
         *       ON (pk.pkkeycol1=fk.keycol1 [AND ...])
         *       WHERE pk.pkkeycol1 IS NULL AND
         * For MATCH SIMPLE:
@@ -1920,9 +1940,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 
        quoteRelationName(pkrelname, pk_rel);
        quoteRelationName(fkrelname, fk_rel);
+       fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+               "" : "ONLY ";
        appendStringInfo(&querybuf,
-                                        " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON",
-                                        fkrelname, pkrelname);
+                                        " FROM %s%s fk LEFT OUTER JOIN %s pk ON",
+                                        fk_only, fkrelname, pkrelname);
 
        strcpy(pkattname, "pk.");
        strcpy(fkattname, "fk.");
@@ -2298,13 +2320,6 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
                        elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
                                 trigger->tgname, RelationGetRelationName(trig_rel));
        }
-       else
-       {
-               if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
-                       riinfo->pk_relid != trigger->tgconstrrelid)
-                       elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
-                                trigger->tgname, RelationGetRelationName(trig_rel));
-       }
 
        return riinfo;
 }
index 64cde3266bf60b894888ca3da712cae96ec9a8ff..d066f4f00b61ea222524621cccefb645028f7d07 100644 (file)
@@ -7116,13 +7116,23 @@ getConstraints(Archive *fout, TableInfo tblinfo[], int numTables)
                                          tbinfo->dobj.name);
 
                resetPQExpBuffer(query);
-               appendPQExpBuffer(query,
-                                                 "SELECT tableoid, oid, conname, confrelid, "
-                                                 "pg_catalog.pg_get_constraintdef(oid) AS condef "
-                                                 "FROM pg_catalog.pg_constraint "
-                                                 "WHERE conrelid = '%u'::pg_catalog.oid "
-                                                 "AND contype = 'f'",
-                                                 tbinfo->dobj.catId.oid);
+               if (fout->remoteVersion >= 110000)
+                       appendPQExpBuffer(query,
+                                                         "SELECT tableoid, oid, conname, confrelid, "
+                                                         "pg_catalog.pg_get_constraintdef(oid) AS condef "
+                                                         "FROM pg_catalog.pg_constraint "
+                                                         "WHERE conrelid = '%u'::pg_catalog.oid "
+                                                         "AND conparentid = 0 "
+                                                         "AND contype = 'f'",
+                                                         tbinfo->dobj.catId.oid);
+               else
+                       appendPQExpBuffer(query,
+                                                         "SELECT tableoid, oid, conname, confrelid, "
+                                                         "pg_catalog.pg_get_constraintdef(oid) AS condef "
+                                                         "FROM pg_catalog.pg_constraint "
+                                                         "WHERE conrelid = '%u'::pg_catalog.oid "
+                                                         "AND contype = 'f'",
+                                                         tbinfo->dobj.catId.oid);
                res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
                ntups = PQntuples(res);
@@ -16374,18 +16384,28 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
        }
        else if (coninfo->contype == 'f')
        {
+               char *only;
+
+               /*
+                * Foreign keys on partitioned tables are always declared as inheriting
+                * to partitions; for all other cases, emit them as applying ONLY
+                * directly to the named table, because that's how they work for
+                * regular inherited tables.
+                */
+               only = tbinfo->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY ";
+
                /*
                 * XXX Potentially wrap in a 'SET CONSTRAINTS OFF' block so that the
                 * current table data is not processed
                 */
-               appendPQExpBuffer(q, "ALTER TABLE ONLY %s\n",
-                                                 fmtQualifiedDumpable(tbinfo));
+               appendPQExpBuffer(q, "ALTER TABLE %s%s\n",
+                                                 only, fmtQualifiedDumpable(tbinfo));
                appendPQExpBuffer(q, "    ADD CONSTRAINT %s %s;\n",
                                                  fmtId(coninfo->dobj.name),
                                                  coninfo->condef);
 
-               appendPQExpBuffer(delq, "ALTER TABLE ONLY %s ",
-                                                 fmtQualifiedDumpable(tbinfo));
+               appendPQExpBuffer(delq, "ALTER TABLE %s%s ",
+                                                 only, fmtQualifiedDumpable(tbinfo));
                appendPQExpBuffer(delq, "DROP CONSTRAINT %s;\n",
                                                  fmtId(coninfo->dobj.name));
 
index 06a2362003ebbcf0fa8f69485d113f1826bd6767..0170e08c450622197c748dda7ce9ee24bf451b20 100644 (file)
@@ -27,6 +27,19 @@ typedef enum ConstraintCategory
        CONSTRAINT_ASSERTION            /* for future expansion */
 } ConstraintCategory;
 
+/*
+ * Used when cloning a foreign key constraint to a partition, so that the
+ * caller can optionally set up a verification pass for it.
+ */
+typedef struct ClonedConstraint
+{
+       Oid             relid;
+       Oid             refrelid;
+       Oid             conindid;
+       Oid             conid;
+       Constraint *constraint;
+} ClonedConstraint;
+
 extern Oid CreateConstraintEntry(const char *constraintName,
                                          Oid constraintNamespace,
                                          char constraintType,
@@ -57,6 +70,9 @@ extern Oid CreateConstraintEntry(const char *constraintName,
                                          bool conNoInherit,
                                          bool is_internal);
 
+extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
+                                                  List **cloned);
+
 extern void RemoveConstraintById(Oid conId);
 extern void RenameConstraintById(Oid conId, const char *newname);
 
index 06e5180a305c2c187db1333347465857e4c6762c..04a961d3835c06748e2034f585396fa915fb660b 100644 (file)
@@ -74,6 +74,10 @@ extern void find_composite_type_dependencies(Oid typeOid,
 
 extern void check_of_type(HeapTuple typetuple);
 
+extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
+                                                Constraint *fkconstraint, Oid constraintOid,
+                                                Oid indexOid, bool create_action);
+
 extern void register_on_commit_action(Oid relid, OnCommitAction action);
 extern void remove_on_commit_action(Oid relid);
 
index a80d16a3946f298def55a84309800cd071e060cd..1d6cc9ca6cdf1d481e18f50e648b7e8d0c6836bb 100644 (file)
@@ -3305,10 +3305,6 @@ CREATE TABLE partitioned (
        a int,
        b int
 ) PARTITION BY RANGE (a, (a+b+1));
-ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
-ERROR:  foreign key constraints are not supported on partitioned tables
-LINE 1: ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
-                                    ^
 ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);
 ERROR:  exclusion constraints are not supported on partitioned tables
 LINE 1: ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);
index 39a963888dcdefcdc33881ab9b7b77b9a657ffc2..e7244390372b9c15c98ba7130c22e8bbdf1a3f5a 100644 (file)
@@ -281,16 +281,6 @@ CREATE TABLE partitioned (
 ) PARTITION BY LIST (a1, a2);  -- fail
 ERROR:  cannot use "list" partition strategy with more than one column
 -- unsupported constraint type for partitioned tables
-CREATE TABLE pkrel (
-       a int PRIMARY KEY
-);
-CREATE TABLE partitioned (
-       a int REFERENCES pkrel(a)
-) PARTITION BY RANGE (a);
-ERROR:  foreign key constraints are not supported on partitioned tables
-LINE 2:  a int REFERENCES pkrel(a)
-               ^
-DROP TABLE pkrel;
 CREATE TABLE partitioned (
        a int,
        EXCLUDE USING gist (a WITH &&)
index e1077824663e1969924856bdce9f6917aa25cf74..b90c4926e29b23ee3b516e3d10628de36797186d 100644 (file)
@@ -1428,3 +1428,214 @@ alter table fktable2 drop constraint fktable2_f1_fkey;
 ERROR:  cannot ALTER TABLE "pktable2" because it has pending trigger events
 commit;
 drop table pktable2, fktable2;
+--
+-- Foreign keys and partitioned tables
+--
+-- partitioned table in the referenced side are not allowed
+CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
+  PARTITION BY RANGE (a, b);
+-- verify with create table first ...
+CREATE TABLE fk_notpartitioned_fk (a int, b int,
+  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
+ERROR:  cannot reference partitioned table "fk_partitioned_pk"
+-- and then with alter table.
+CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
+ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
+  REFERENCES fk_partitioned_pk;
+ERROR:  cannot reference partitioned table "fk_partitioned_pk"
+DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
+-- Creation of a partitioned hierarchy with irregular definitions
+CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
+  PRIMARY KEY (a, b));
+ALTER TABLE fk_notpartitioned_pk DROP COLUMN fdrop1, DROP COLUMN fdrop2;
+CREATE TABLE fk_partitioned_fk (b int, fdrop1 int, a int) PARTITION BY RANGE (a, b);
+ALTER TABLE fk_partitioned_fk DROP COLUMN fdrop1;
+CREATE TABLE fk_partitioned_fk_1 (fdrop1 int, fdrop2 int, a int, fdrop3 int, b int);
+ALTER TABLE fk_partitioned_fk_1 DROP COLUMN fdrop1, DROP COLUMN fdrop2, DROP COLUMN fdrop3;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_1 FOR VALUES FROM (0,0) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk;
+CREATE TABLE fk_partitioned_fk_2 (b int, fdrop1 int, fdrop2 int, a int);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN fdrop1, DROP COLUMN fdrop2;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES FROM (1000,1000) TO (2000,2000);
+CREATE TABLE fk_partitioned_fk_3 (fdrop1 int, fdrop2 int, fdrop3 int, fdrop4 int, b int, a int)
+  PARTITION BY HASH (a);
+ALTER TABLE fk_partitioned_fk_3 DROP COLUMN fdrop1, DROP COLUMN fdrop2,
+       DROP COLUMN fdrop3, DROP COLUMN fdrop4;
+CREATE TABLE fk_partitioned_fk_3_0 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 0);
+CREATE TABLE fk_partitioned_fk_3_1 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 1);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3
+  FOR VALUES FROM (2000,2000) TO (3000,3000);
+-- these inserts, targetting both the partition directly as well as the
+-- partitioned table, should all fail
+INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
+ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(1500, 1501) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk_2 (a,b) VALUES (1500, 1501);
+ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(1500, 1501) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
+ERROR:  insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2500, 2502) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2500, 2502);
+ERROR:  insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2500, 2502) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
+ERROR:  insert or update on table "fk_partitioned_fk_3_0" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2501, 2503) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2501, 2503);
+ERROR:  insert or update on table "fk_partitioned_fk_3_0" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2501, 2503) is not present in table "fk_notpartitioned_pk".
+-- but if we insert the values that make them valid, then they work
+INSERT INTO fk_notpartitioned_pk VALUES (500, 501), (1500, 1501),
+  (2500, 2502), (2501, 2503);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
+-- this update fails because there is no referenced row
+UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
+ERROR:  insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
+-- but we can fix it thusly:
+INSERT INTO fk_notpartitioned_pk (a,b) VALUES (2502, 2503);
+UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
+-- these updates would leave lingering rows in the referencing table; disallow
+UPDATE fk_notpartitioned_pk SET b = 502 WHERE a = 500;
+ERROR:  update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
+DETAIL:  Key (a, b)=(500, 501) is still referenced from table "fk_partitioned_fk".
+UPDATE fk_notpartitioned_pk SET b = 1502 WHERE a = 1500;
+ERROR:  update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
+DETAIL:  Key (a, b)=(1500, 1501) is still referenced from table "fk_partitioned_fk".
+UPDATE fk_notpartitioned_pk SET b = 2504 WHERE a = 2500;
+ERROR:  update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
+DETAIL:  Key (a, b)=(2500, 2502) is still referenced from table "fk_partitioned_fk".
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+-- done.
+DROP TABLE fk_notpartitioned_pk, fk_partitioned_fk;
+-- Test some other exotic foreign key features: MATCH SIMPLE, ON UPDATE/DELETE
+-- actions
+CREATE TABLE fk_notpartitioned_pk (a int, b int, primary key (a, b));
+CREATE TABLE fk_partitioned_fk (a int default 2501, b int default 142857) PARTITION BY LIST (a);
+CREATE TABLE fk_partitioned_fk_1 PARTITION OF fk_partitioned_fk FOR VALUES IN (NULL,500,501,502);
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk MATCH SIMPLE
+  ON DELETE SET NULL ON UPDATE SET NULL;
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+CREATE TABLE fk_partitioned_fk_3 (a int, b int);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3 FOR VALUES IN (2500,2501,2502,2503);
+-- this insert fails
+INSERT INTO fk_partitioned_fk (a, b) VALUES (2502, 2503);
+ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
+-- but since the FK is MATCH SIMPLE, this one doesn't
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, NULL);
+-- now create the referenced row ...
+INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
+--- and now the same insert work
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+-- this always works
+INSERT INTO fk_partitioned_fk (a,b) VALUES (NULL, NULL);
+-- ON UPDATE SET NULL
+SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
+      tableoid       |  a   | b 
+---------------------+------+---
+ fk_partitioned_fk_3 | 2502 |  
+ fk_partitioned_fk_1 |      |  
+(2 rows)
+
+UPDATE fk_notpartitioned_pk SET a = a + 1 WHERE a = 2502;
+SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
+      tableoid       |  a   | b 
+---------------------+------+---
+ fk_partitioned_fk_3 | 2502 |  
+ fk_partitioned_fk_1 |      |  
+ fk_partitioned_fk_1 |      |  
+(3 rows)
+
+-- ON DELETE SET NULL
+INSERT INTO fk_partitioned_fk VALUES (2503, 2503);
+SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
+ count 
+-------
+     2
+(1 row)
+
+DELETE FROM fk_notpartitioned_pk;
+SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
+ count 
+-------
+     3
+(1 row)
+
+-- ON UPDATE/DELETE SET DEFAULT
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk
+  ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
+INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+-- this fails, because the defaults for the referencing table are not present
+-- in the referenced table:
+UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
+ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk".
+-- but inserting the row we can make it work:
+INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
+UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+  a   |   b    
+------+--------
+ 2501 | 142857
+(1 row)
+
+-- ON UPDATE/DELETE CASCADE
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk
+  ON DELETE CASCADE ON UPDATE CASCADE;
+UPDATE fk_notpartitioned_pk SET a = 2502 WHERE a = 2501;
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+  a   |   b    
+------+--------
+ 2502 | 142857
+(1 row)
+
+-- Now you see it ...
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+  a   |   b    
+------+--------
+ 2502 | 142857
+(1 row)
+
+DELETE FROM fk_notpartitioned_pk WHERE b = 142857;
+-- now you don't.
+SELECT * FROM fk_partitioned_fk WHERE a = 142857;
+ a | b 
+---+---
+(0 rows)
+
+-- verify that DROP works
+DROP TABLE fk_partitioned_fk_2;
+-- verify that attaching a table checks that the existing data satisfies the
+-- constraint
+CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
+CREATE TABLE fk_partitioned_fk_2_1 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (0) TO (1000);
+CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (1000) TO (2000);
+INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
+  FOR VALUES IN (1600);
+ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+DETAIL:  Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
+INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
+  FOR VALUES IN (1600);
+-- leave these tables around intentionally
index f56151fc1e7dd2bace7c6308ebf3fedaf67c89a3..5e57b9a4658b60432f989f1d38b700bfb857da40 100644 (file)
@@ -1209,6 +1209,31 @@ Inherits: test_foreign_constraints
 DROP TABLE test_foreign_constraints_inh;
 DROP TABLE test_foreign_constraints;
 DROP TABLE test_primary_constraints;
+-- Test foreign key behavior
+create table inh_fk_1 (a int primary key);
+insert into inh_fk_1 values (1), (2), (3);
+create table inh_fk_2 (x int primary key, y int references inh_fk_1 on delete cascade);
+insert into inh_fk_2 values (11, 1), (22, 2), (33, 3);
+create table inh_fk_2_child () inherits (inh_fk_2);
+insert into inh_fk_2_child values (111, 1), (222, 2);
+delete from inh_fk_1 where a = 1;
+select * from inh_fk_1 order by 1;
+ a 
+---
+ 2
+ 3
+(2 rows)
+
+select * from inh_fk_2 order by 1, 2;
+  x  | y 
+-----+---
+  22 | 2
+  33 | 3
+ 111 | 1
+ 222 | 2
+(4 rows)
+
+drop table inh_fk_1, inh_fk_2, inh_fk_2_child;
 -- Test that parent and child CHECK constraints can be created in either order
 create table p1(f1 int);
 create table p1_c1() inherits(p1);
index 8198d1e9305d70f6d25bcdc786e7cab470532a1f..61799b6499de8a699b36661881074a5b687f9b6e 100644 (file)
@@ -2035,7 +2035,6 @@ CREATE TABLE partitioned (
        a int,
        b int
 ) PARTITION BY RANGE (a, (a+b+1));
-ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
 ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);
 
 -- cannot drop column that is part of the partition key
index 7d67ce05d99404f0212c64f76639b31c89f7ca3c..235bef13dc01dba26b9e3631036fdac2da97c5f4 100644 (file)
@@ -298,14 +298,6 @@ CREATE TABLE partitioned (
 ) PARTITION BY LIST (a1, a2);  -- fail
 
 -- unsupported constraint type for partitioned tables
-CREATE TABLE pkrel (
-       a int PRIMARY KEY
-);
-CREATE TABLE partitioned (
-       a int REFERENCES pkrel(a)
-) PARTITION BY RANGE (a);
-DROP TABLE pkrel;
-
 CREATE TABLE partitioned (
        a int,
        EXCLUDE USING gist (a WITH &&)
index 63d643c9cc7c5dc52f32496953d0803924305149..f3e7058583956001b13bd5325ee65da5557cf1f4 100644 (file)
@@ -1065,3 +1065,157 @@ alter table fktable2 drop constraint fktable2_f1_fkey;
 commit;
 
 drop table pktable2, fktable2;
+
+
+--
+-- Foreign keys and partitioned tables
+--
+
+-- partitioned table in the referenced side are not allowed
+CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
+  PARTITION BY RANGE (a, b);
+-- verify with create table first ...
+CREATE TABLE fk_notpartitioned_fk (a int, b int,
+  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
+-- and then with alter table.
+CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
+ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
+  REFERENCES fk_partitioned_pk;
+DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
+
+-- Creation of a partitioned hierarchy with irregular definitions
+CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
+  PRIMARY KEY (a, b));
+ALTER TABLE fk_notpartitioned_pk DROP COLUMN fdrop1, DROP COLUMN fdrop2;
+CREATE TABLE fk_partitioned_fk (b int, fdrop1 int, a int) PARTITION BY RANGE (a, b);
+ALTER TABLE fk_partitioned_fk DROP COLUMN fdrop1;
+CREATE TABLE fk_partitioned_fk_1 (fdrop1 int, fdrop2 int, a int, fdrop3 int, b int);
+ALTER TABLE fk_partitioned_fk_1 DROP COLUMN fdrop1, DROP COLUMN fdrop2, DROP COLUMN fdrop3;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_1 FOR VALUES FROM (0,0) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk;
+CREATE TABLE fk_partitioned_fk_2 (b int, fdrop1 int, fdrop2 int, a int);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN fdrop1, DROP COLUMN fdrop2;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES FROM (1000,1000) TO (2000,2000);
+
+CREATE TABLE fk_partitioned_fk_3 (fdrop1 int, fdrop2 int, fdrop3 int, fdrop4 int, b int, a int)
+  PARTITION BY HASH (a);
+ALTER TABLE fk_partitioned_fk_3 DROP COLUMN fdrop1, DROP COLUMN fdrop2,
+       DROP COLUMN fdrop3, DROP COLUMN fdrop4;
+CREATE TABLE fk_partitioned_fk_3_0 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 0);
+CREATE TABLE fk_partitioned_fk_3_1 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 1);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3
+  FOR VALUES FROM (2000,2000) TO (3000,3000);
+
+-- these inserts, targetting both the partition directly as well as the
+-- partitioned table, should all fail
+INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
+INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
+INSERT INTO fk_partitioned_fk_2 (a,b) VALUES (1500, 1501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
+INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2500, 2502);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
+INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2501, 2503);
+
+-- but if we insert the values that make them valid, then they work
+INSERT INTO fk_notpartitioned_pk VALUES (500, 501), (1500, 1501),
+  (2500, 2502), (2501, 2503);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
+INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
+
+-- this update fails because there is no referenced row
+UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
+-- but we can fix it thusly:
+INSERT INTO fk_notpartitioned_pk (a,b) VALUES (2502, 2503);
+UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
+
+-- these updates would leave lingering rows in the referencing table; disallow
+UPDATE fk_notpartitioned_pk SET b = 502 WHERE a = 500;
+UPDATE fk_notpartitioned_pk SET b = 1502 WHERE a = 1500;
+UPDATE fk_notpartitioned_pk SET b = 2504 WHERE a = 2500;
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+-- done.
+DROP TABLE fk_notpartitioned_pk, fk_partitioned_fk;
+
+-- Test some other exotic foreign key features: MATCH SIMPLE, ON UPDATE/DELETE
+-- actions
+CREATE TABLE fk_notpartitioned_pk (a int, b int, primary key (a, b));
+CREATE TABLE fk_partitioned_fk (a int default 2501, b int default 142857) PARTITION BY LIST (a);
+CREATE TABLE fk_partitioned_fk_1 PARTITION OF fk_partitioned_fk FOR VALUES IN (NULL,500,501,502);
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk MATCH SIMPLE
+  ON DELETE SET NULL ON UPDATE SET NULL;
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+CREATE TABLE fk_partitioned_fk_3 (a int, b int);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3 FOR VALUES IN (2500,2501,2502,2503);
+
+-- this insert fails
+INSERT INTO fk_partitioned_fk (a, b) VALUES (2502, 2503);
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+-- but since the FK is MATCH SIMPLE, this one doesn't
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, NULL);
+-- now create the referenced row ...
+INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
+--- and now the same insert work
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+-- this always works
+INSERT INTO fk_partitioned_fk (a,b) VALUES (NULL, NULL);
+
+-- ON UPDATE SET NULL
+SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
+UPDATE fk_notpartitioned_pk SET a = a + 1 WHERE a = 2502;
+SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
+
+-- ON DELETE SET NULL
+INSERT INTO fk_partitioned_fk VALUES (2503, 2503);
+SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
+DELETE FROM fk_notpartitioned_pk;
+SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
+
+-- ON UPDATE/DELETE SET DEFAULT
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk
+  ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
+INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
+INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
+-- this fails, because the defaults for the referencing table are not present
+-- in the referenced table:
+UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
+-- but inserting the row we can make it work:
+INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
+UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+
+-- ON UPDATE/DELETE CASCADE
+ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
+ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
+  REFERENCES fk_notpartitioned_pk
+  ON DELETE CASCADE ON UPDATE CASCADE;
+UPDATE fk_notpartitioned_pk SET a = 2502 WHERE a = 2501;
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+
+-- Now you see it ...
+SELECT * FROM fk_partitioned_fk WHERE b = 142857;
+DELETE FROM fk_notpartitioned_pk WHERE b = 142857;
+-- now you don't.
+SELECT * FROM fk_partitioned_fk WHERE a = 142857;
+
+-- verify that DROP works
+DROP TABLE fk_partitioned_fk_2;
+
+-- verify that attaching a table checks that the existing data satisfies the
+-- constraint
+CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
+CREATE TABLE fk_partitioned_fk_2_1 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (0) TO (1000);
+CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (1000) TO (2000);
+INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
+  FOR VALUES IN (1600);
+INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
+  FOR VALUES IN (1600);
+
+-- leave these tables around intentionally
index 9397f72c1380df4b0bf79bfa64655284a0307557..5a48376fc03341991b528b3e2b02ce1db641a747 100644 (file)
@@ -409,6 +409,18 @@ DROP TABLE test_foreign_constraints_inh;
 DROP TABLE test_foreign_constraints;
 DROP TABLE test_primary_constraints;
 
+-- Test foreign key behavior
+create table inh_fk_1 (a int primary key);
+insert into inh_fk_1 values (1), (2), (3);
+create table inh_fk_2 (x int primary key, y int references inh_fk_1 on delete cascade);
+insert into inh_fk_2 values (11, 1), (22, 2), (33, 3);
+create table inh_fk_2_child () inherits (inh_fk_2);
+insert into inh_fk_2_child values (111, 1), (222, 2);
+delete from inh_fk_1 where a = 1;
+select * from inh_fk_1 order by 1;
+select * from inh_fk_2 order by 1, 2;
+drop table inh_fk_1, inh_fk_2, inh_fk_2_child;
+
 -- Test that parent and child CHECK constraints can be created in either order
 create table p1(f1 int);
 create table p1_c1() inherits(p1);