Allow publishing partition changes via ancestors
authorPeter Eisentraut <peter@eisentraut.org>
Wed, 8 Apr 2020 07:59:27 +0000 (09:59 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Wed, 8 Apr 2020 09:19:23 +0000 (11:19 +0200)
To control whether partition changes are replicated using their own
identity and schema or an ancestor's, add a new parameter that can be
set per publication named 'publish_via_partition_root'.

This allows replicating a partitioned table into a different partition
structure on the subscriber.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

15 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/create_publication.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/utils/cache/relcache.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/catversion.h
src/include/catalog/pg_publication.h
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql
src/test/subscription/t/013_partition.pl

index 0d61d98b1153ea9772634e8b459706f072996896..386c6d7bd1b65b287d6c5d5036422c3ae9604cac 100644 (file)
@@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>If true, <command>TRUNCATE</command> operations are replicated for
        tables in the publication.</entry>
      </row>
+
+     <row>
+      <entry><structfield>pubviaroot</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>If true, operations on a leaf partition are replicated using the
+       identity and schema of its topmost partitioned ancestor mentioned in the
+       publication instead of its own.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index c513621470afce8149d3bb056eca962ccb141efa..eba331a72b584cdce79e8e2a87c0a844cbc09aa6 100644 (file)
    <listitem>
     <para>
      When replicating between partitioned tables, the actual replication
-     originates from the leaf partitions on the publisher, so partitions on
-     the publisher must also exist on the subscriber as valid target tables.
-     (They could either be leaf partitions themselves, or they could be
-     further subpartitioned, or they could even be independent tables.)
+     originates, by default, from the leaf partitions on the publisher, so
+     partitions on the publisher must also exist on the subscriber as valid
+     target tables. (They could either be leaf partitions themselves, or they
+     could be further subpartitioned, or they could even be independent
+     tables.)  Publications can also specify that changes are to be replicated
+     using the identity and schema of the partitioned root table instead of
+     that of the individual leaf partitions in which the changes actually
+     originate (see <xref linkend="sql-createpublication"/>).
     </para>
    </listitem>
   </itemizedlist>
index 597cb28f3397dce294e13c05ba47512aea9f85fc..2c52a8aada1725b13166360bf0e966489ea893ec 100644 (file)
@@ -123,6 +123,26 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>publish_via_partition_root</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          This parameter determines whether changes in a partitioned table (or
+          on its partitions) contained in the publication will be published
+          using the identity and schema of the partitioned table rather than
+          that of the individual partitions that are actually changed; the
+          latter is the default.  Enablings this allows the changes to be
+          replicated into a non-partitioned table or a partitioned table
+          consisting of a different set of partitions.
+         </para>
+
+         <para>
+          If this is enabled, <literal>TRUNCATE</literal> operations performed
+          directly on partitions are not replicated.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
      </para>
index 500a5ae1ee0b9c453a5f5c4e7527ae4fccb08860..68f6887b383a175bfb1706891c03e5c12d3e2675 100644 (file)
@@ -42,8 +42,6 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-static List *get_rel_publications(Oid relid);
-
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
@@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
    return myself;
 }
 
-
-/*
- * Gets list of publication oids for a relation, plus those of ancestors,
- * if any, if the relation is a partition.
- */
+/* Gets list of publication oids for a relation */
 List *
 GetRelationPublications(Oid relid)
-{
-   List       *result = NIL;
-
-   result = get_rel_publications(relid);
-   if (get_rel_relispartition(relid))
-   {
-       List       *ancestors = get_partition_ancestors(relid);
-       ListCell   *lc;
-
-       foreach(lc, ancestors)
-       {
-           Oid         ancestor = lfirst_oid(lc);
-           List       *ancestor_pubs = get_rel_publications(ancestor);
-
-           result = list_concat(result, ancestor_pubs);
-       }
-   }
-
-   return result;
-}
-
-/* Workhorse of GetRelationPublications() */
-static List *
-get_rel_publications(Oid relid)
 {
    List       *result = NIL;
    CatCList   *pubrellist;
@@ -373,9 +343,13 @@ GetAllTablesPublications(void)
 
 /*
  * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(void)
+GetAllTablesPublicationRelations(bool pubviaroot)
 {
    Relation    classRel;
    ScanKeyData key[1];
@@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
        Oid         relid = relForm->oid;
 
-       if (is_publishable_class(relid, relForm))
+       if (is_publishable_class(relid, relForm) &&
+           !(relForm->relispartition && pubviaroot))
            result = lappend_oid(result, relid);
    }
 
    table_endscan(scan);
-   table_close(classRel, AccessShareLock);
+
+   if (pubviaroot)
+   {
+       ScanKeyInit(&key[0],
+                   Anum_pg_class_relkind,
+                   BTEqualStrategyNumber, F_CHAREQ,
+                   CharGetDatum(RELKIND_PARTITIONED_TABLE));
+
+       scan = table_beginscan_catalog(classRel, 1, key);
+
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+           Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+           Oid         relid = relForm->oid;
+
+           if (is_publishable_class(relid, relForm) &&
+               !relForm->relispartition)
+               result = lappend_oid(result, relid);
+       }
+
+       table_endscan(scan);
+       table_close(classRel, AccessShareLock);
+   }
 
    return result;
 }
@@ -433,6 +430,7 @@ GetPublication(Oid pubid)
    pub->pubactions.pubupdate = pubform->pubupdate;
    pub->pubactions.pubdelete = pubform->pubdelete;
    pub->pubactions.pubtruncate = pubform->pubtruncate;
+   pub->pubviaroot = pubform->pubviaroot;
 
    ReleaseSysCache(tup);
 
@@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
         * need those.
         */
        if (publication->alltables)
-           tables = GetAllTablesPublicationRelations();
+           tables = GetAllTablesPublicationRelations(publication->pubviaroot);
        else
            tables = GetPublicationRelations(publication->oid,
+                                            publication->pubviaroot ?
+                                            PUBLICATION_PART_ROOT :
                                             PUBLICATION_PART_LEAF);
        funcctx->user_fctx = (void *) tables;
 
index 494c0bdc282c1eb7352c124e38a9c38950cbf252..771268f70a260bbea0009a5f8691fc1e2a1198d9 100644 (file)
@@ -23,6 +23,7 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
 static void
 parse_publication_options(List *options,
                          bool *publish_given,
-                         bool *publish_insert,
-                         bool *publish_update,
-                         bool *publish_delete,
-                         bool *publish_truncate)
+                         PublicationActions *pubactions,
+                         bool *publish_via_partition_root_given,
+                         bool *publish_via_partition_root)
 {
    ListCell   *lc;
 
    *publish_given = false;
+   *publish_via_partition_root_given = false;
 
-   /* Defaults are true */
-   *publish_insert = true;
-   *publish_update = true;
-   *publish_delete = true;
-   *publish_truncate = true;
+   /* defaults */
+   pubactions->pubinsert = true;
+   pubactions->pubupdate = true;
+   pubactions->pubdelete = true;
+   pubactions->pubtruncate = true;
+   *publish_via_partition_root = false;
 
    /* Parse options */
    foreach(lc, options)
@@ -91,10 +93,10 @@ parse_publication_options(List *options,
             * If publish option was given only the explicitly listed actions
             * should be published.
             */
-           *publish_insert = false;
-           *publish_update = false;
-           *publish_delete = false;
-           *publish_truncate = false;
+           pubactions->pubinsert = false;
+           pubactions->pubupdate = false;
+           pubactions->pubdelete = false;
+           pubactions->pubtruncate = false;
 
            *publish_given = true;
            publish = defGetString(defel);
@@ -110,19 +112,28 @@ parse_publication_options(List *options,
                char       *publish_opt = (char *) lfirst(lc);
 
                if (strcmp(publish_opt, "insert") == 0)
-                   *publish_insert = true;
+                   pubactions->pubinsert = true;
                else if (strcmp(publish_opt, "update") == 0)
-                   *publish_update = true;
+                   pubactions->pubupdate = true;
                else if (strcmp(publish_opt, "delete") == 0)
-                   *publish_delete = true;
+                   pubactions->pubdelete = true;
                else if (strcmp(publish_opt, "truncate") == 0)
-                   *publish_truncate = true;
+                   pubactions->pubtruncate = true;
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_SYNTAX_ERROR),
                             errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
            }
        }
+       else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
+       {
+           if (*publish_via_partition_root_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           *publish_via_partition_root_given = true;
+           *publish_via_partition_root = defGetBoolean(defel);
+       }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
    Datum       values[Natts_pg_publication];
    HeapTuple   tup;
    bool        publish_given;
-   bool        publish_insert;
-   bool        publish_update;
-   bool        publish_delete;
-   bool        publish_truncate;
+   PublicationActions pubactions;
+   bool        publish_via_partition_root_given;
+   bool        publish_via_partition_root;
    AclResult   aclresult;
 
    /* must have CREATE privilege on database */
@@ -183,9 +193,9 @@ CreatePublication(CreatePublicationStmt *stmt)
    values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
 
    parse_publication_options(stmt->options,
-                             &publish_given, &publish_insert,
-                             &publish_update, &publish_delete,
-                             &publish_truncate);
+                             &publish_given, &pubactions,
+                             &publish_via_partition_root_given,
+                             &publish_via_partition_root);
 
    puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
                                Anum_pg_publication_oid);
@@ -193,13 +203,15 @@ CreatePublication(CreatePublicationStmt *stmt)
    values[Anum_pg_publication_puballtables - 1] =
        BoolGetDatum(stmt->for_all_tables);
    values[Anum_pg_publication_pubinsert - 1] =
-       BoolGetDatum(publish_insert);
+       BoolGetDatum(pubactions.pubinsert);
    values[Anum_pg_publication_pubupdate - 1] =
-       BoolGetDatum(publish_update);
+       BoolGetDatum(pubactions.pubupdate);
    values[Anum_pg_publication_pubdelete - 1] =
-       BoolGetDatum(publish_delete);
+       BoolGetDatum(pubactions.pubdelete);
    values[Anum_pg_publication_pubtruncate - 1] =
-       BoolGetDatum(publish_truncate);
+       BoolGetDatum(pubactions.pubtruncate);
+   values[Anum_pg_publication_pubviaroot - 1] =
+       BoolGetDatum(publish_via_partition_root);
 
    tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
    bool        replaces[Natts_pg_publication];
    Datum       values[Natts_pg_publication];
    bool        publish_given;
-   bool        publish_insert;
-   bool        publish_update;
-   bool        publish_delete;
-   bool        publish_truncate;
+   PublicationActions pubactions;
+   bool        publish_via_partition_root_given;
+   bool        publish_via_partition_root;
    ObjectAddress obj;
    Form_pg_publication pubform;
 
    parse_publication_options(stmt->options,
-                             &publish_given, &publish_insert,
-                             &publish_update, &publish_delete,
-                             &publish_truncate);
+                             &publish_given, &pubactions,
+                             &publish_via_partition_root_given,
+                             &publish_via_partition_root);
 
    /* Everything ok, form a new tuple. */
    memset(values, 0, sizeof(values));
@@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 
    if (publish_given)
    {
-       values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
+       values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
        replaces[Anum_pg_publication_pubinsert - 1] = true;
 
-       values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
+       values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
        replaces[Anum_pg_publication_pubupdate - 1] = true;
 
-       values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
+       values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
        replaces[Anum_pg_publication_pubdelete - 1] = true;
 
-       values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+       values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
        replaces[Anum_pg_publication_pubtruncate - 1] = true;
    }
 
+   if (publish_via_partition_root_given)
+   {
+       values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
+       replaces[Anum_pg_publication_pubviaroot - 1] = true;
+   }
+
    tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
                            replaces);
 
index 552a70cffa5590261d2366a2f3b3218fd9a241e2..5fbf2d4367b14f2d25613136082d70a61930926d 100644 (file)
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/tupconvert.h"
+#include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "fmgr.h"
 #include "replication/logical.h"
@@ -20,6 +22,7 @@
 #include "replication/pgoutput.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -49,6 +52,7 @@ static bool publications_valid;
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                        uint32 hashvalue);
+static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -59,9 +63,31 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 typedef struct RelationSyncEntry
 {
    Oid         relid;          /* relation oid */
-   bool        schema_sent;    /* did we send the schema? */
+
+   /*
+    * Did we send the schema?  If ancestor relid is set, its schema must also
+    * have been sent for this to be true.
+    */
+   bool        schema_sent;
+
    bool        replicate_valid;
    PublicationActions pubactions;
+
+   /*
+    * OID of the relation to publish changes as.  For a partition, this may
+    * be set to one of its ancestors whose schema will be used when
+    * replicating changes, if publish_via_partition_root is set for the
+    * publication.
+    */
+   Oid         publish_as_relid;
+
+   /*
+    * Map used when replicating using an ancestor's schema to convert tuples
+    * from partition's type to the ancestor's; NULL if publish_as_relid is
+    * same as 'relid' or if unnecessary due to partition and the ancestor
+    * having identical TupleDesc.
+    */
+   TupleConversionMap *map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -259,47 +285,71 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
- * Write the relation schema if the current schema hasn't been sent yet.
+ * Write the current schema of the relation and its ancestor (if any) if not
+ * done yet.
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
                  Relation relation, RelationSyncEntry *relentry)
 {
-   if (!relentry->schema_sent)
+   if (relentry->schema_sent)
+       return;
+
+   /* If needed, send the ancestor's schema first. */
+   if (relentry->publish_as_relid != RelationGetRelid(relation))
    {
-       TupleDesc   desc;
-       int         i;
+       Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+       TupleDesc   indesc = RelationGetDescr(relation);
+       TupleDesc   outdesc = RelationGetDescr(ancestor);
+       MemoryContext oldctx;
+
+       /* Map must live as long as the session does. */
+       oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+       relentry->map = convert_tuples_by_name(indesc, outdesc);
+       MemoryContextSwitchTo(oldctx);
+       send_relation_and_attrs(ancestor, ctx);
+       RelationClose(ancestor);
+   }
 
-       desc = RelationGetDescr(relation);
+   send_relation_and_attrs(relation, ctx);
+   relentry->schema_sent = true;
+}
 
-       /*
-        * Write out type info if needed.  We do that only for user-created
-        * types.  We use FirstGenbkiObjectId as the cutoff, so that we only
-        * consider objects with hand-assigned OIDs to be "built in", not for
-        * instance any function or type defined in the information_schema.
-        * This is important because only hand-assigned OIDs can be expected
-        * to remain stable across major versions.
-        */
-       for (i = 0; i < desc->natts; i++)
-       {
-           Form_pg_attribute att = TupleDescAttr(desc, i);
+/*
+ * Sends a relation
+ */
+static void
+send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+{
+   TupleDesc   desc = RelationGetDescr(relation);
+   int         i;
 
-           if (att->attisdropped || att->attgenerated)
-               continue;
+   /*
+    * Write out type info if needed.  We do that only for user-created types.
+    * We use FirstGenbkiObjectId as the cutoff, so that we only consider
+    * objects with hand-assigned OIDs to be "built in", not for instance any
+    * function or type defined in the information_schema. This is important
+    * because only hand-assigned OIDs can be expected to remain stable across
+    * major versions.
+    */
+   for (i = 0; i < desc->natts; i++)
+   {
+       Form_pg_attribute att = TupleDescAttr(desc, i);
 
-           if (att->atttypid < FirstGenbkiObjectId)
-               continue;
+       if (att->attisdropped || att->attgenerated)
+           continue;
 
-           OutputPluginPrepareWrite(ctx, false);
-           logicalrep_write_typ(ctx->out, att->atttypid);
-           OutputPluginWrite(ctx, false);
-       }
+       if (att->atttypid < FirstGenbkiObjectId)
+           continue;
 
        OutputPluginPrepareWrite(ctx, false);
-       logicalrep_write_rel(ctx->out, relation);
+       logicalrep_write_typ(ctx->out, att->atttypid);
        OutputPluginWrite(ctx, false);
-       relentry->schema_sent = true;
    }
+
+   OutputPluginPrepareWrite(ctx, false);
+   logicalrep_write_rel(ctx->out, relation);
+   OutputPluginWrite(ctx, false);
 }
 
 /*
@@ -346,28 +396,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    switch (change->action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
-           OutputPluginPrepareWrite(ctx, true);
-           logicalrep_write_insert(ctx->out, relation,
-                                   &change->data.tp.newtuple->tuple);
-           OutputPluginWrite(ctx, true);
-           break;
+           {
+               HeapTuple   tuple = &change->data.tp.newtuple->tuple;
+
+               /* Switch relation if publishing via root. */
+               if (relentry->publish_as_relid != RelationGetRelid(relation))
+               {
+                   Assert(relation->rd_rel->relispartition);
+                   relation = RelationIdGetRelation(relentry->publish_as_relid);
+                   /* Convert tuple if needed. */
+                   if (relentry->map)
+                       tuple = execute_attr_map_tuple(tuple, relentry->map);
+               }
+
+               OutputPluginPrepareWrite(ctx, true);
+               logicalrep_write_insert(ctx->out, relation, tuple);
+               OutputPluginWrite(ctx, true);
+               break;
+           }
        case REORDER_BUFFER_CHANGE_UPDATE:
            {
                HeapTuple   oldtuple = change->data.tp.oldtuple ?
                &change->data.tp.oldtuple->tuple : NULL;
+               HeapTuple   newtuple = &change->data.tp.newtuple->tuple;
+
+               /* Switch relation if publishing via root. */
+               if (relentry->publish_as_relid != RelationGetRelid(relation))
+               {
+                   Assert(relation->rd_rel->relispartition);
+                   relation = RelationIdGetRelation(relentry->publish_as_relid);
+                   /* Convert tuples if needed. */
+                   if (relentry->map)
+                   {
+                       oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+                       newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+                   }
+               }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_update(ctx->out, relation, oldtuple,
-                                       &change->data.tp.newtuple->tuple);
+               logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
                OutputPluginWrite(ctx, true);
                break;
            }
        case REORDER_BUFFER_CHANGE_DELETE:
            if (change->data.tp.oldtuple)
            {
+               HeapTuple   oldtuple = &change->data.tp.oldtuple->tuple;
+
+               /* Switch relation if publishing via root. */
+               if (relentry->publish_as_relid != RelationGetRelid(relation))
+               {
+                   Assert(relation->rd_rel->relispartition);
+                   relation = RelationIdGetRelation(relentry->publish_as_relid);
+                   /* Convert tuple if needed. */
+                   if (relentry->map)
+                       oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+               }
+
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_delete(ctx->out, relation,
-                                       &change->data.tp.oldtuple->tuple);
+               logicalrep_write_delete(ctx->out, relation, oldtuple);
                OutputPluginWrite(ctx, true);
            }
            else
@@ -412,10 +499,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
            continue;
 
        /*
-        * Don't send partitioned tables, because partitions should be sent
-        * instead.
+        * Don't send partitions if the publication wants to send only the
+        * root tables through it.
         */
-       if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       if (relation->rd_rel->relispartition &&
+           relentry->publish_as_relid != relid)
            continue;
 
        relids[nrelids++] = relid;
@@ -540,12 +628,15 @@ init_rel_sync_cache(MemoryContext cachectx)
  * This looks up publications that the given relation is directly or
  * indirectly part of (the latter if it's really the relation's ancestor that
  * is part of a publication) and fills up the found entry with the information
- * about which operations to publish.
+ * about which operations to publish and whether to use an ancestor's schema
+ * when publishing.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
 {
    RelationSyncEntry *entry;
+   bool        am_partition = get_rel_relispartition(relid);
+   char        relkind = get_rel_relkind(relid);
    bool        found;
    MemoryContext oldctx;
 
@@ -564,6 +655,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
    {
        List       *pubids = GetRelationPublications(relid);
        ListCell   *lc;
+       Oid         publish_as_relid = relid;
 
        /* Reload publications if needed before use. */
        if (!publications_valid)
@@ -588,8 +680,56 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
        foreach(lc, data->publications)
        {
            Publication *pub = lfirst(lc);
+           bool        publish = false;
+
+           if (pub->alltables)
+           {
+               publish = true;
+               if (pub->pubviaroot && am_partition)
+                   publish_as_relid = llast_oid(get_partition_ancestors(relid));
+           }
+
+           if (!publish)
+           {
+               bool    ancestor_published = false;
+
+               /*
+                * For a partition, check if any of the ancestors are
+                * published.  If so, note down the topmost ancestor that is
+                * published via this publication, which will be used as the
+                * relation via which to publish the partition's changes.
+                */
+               if (am_partition)
+               {
+                   List   *ancestors = get_partition_ancestors(relid);
+                   ListCell *lc2;
+
+                   /* Find the "topmost" ancestor that is in this publication. */
+                   foreach(lc2, ancestors)
+                   {
+                       Oid     ancestor = lfirst_oid(lc2);
+
+                       if (list_member_oid(GetRelationPublications(ancestor),
+                                           pub->oid))
+                       {
+                           ancestor_published = true;
+                           if (pub->pubviaroot)
+                               publish_as_relid = ancestor;
+                       }
+                   }
+               }
+
+               if (list_member_oid(pubids, pub->oid) || ancestor_published)
+                   publish = true;
+           }
 
-           if (pub->alltables || list_member_oid(pubids, pub->oid))
+           /*
+            * Don't publish changes for partitioned tables, because
+            * publishing those of its partitions suffices, unless partition
+            * changes won't be published due to pubviaroot being set.
+            */
+           if (publish &&
+               (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
            {
                entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
                entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
@@ -604,6 +744,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
        list_free(pubids);
 
+       entry->publish_as_relid = publish_as_relid;
        entry->replicate_valid = true;
    }
 
index dfd81f1320e350db37c077e0577265824d5c8d53..9f1f11d0c144cf116699806912f9df70264dfb97 100644 (file)
@@ -44,6 +44,7 @@
 #include "catalog/catalog.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
@@ -5314,6 +5315,20 @@ GetRelationPublicationActions(Relation relation)
 
    /* Fetch the publication membership info. */
    puboids = GetRelationPublications(RelationGetRelid(relation));
+   if (relation->rd_rel->relispartition)
+   {
+       /* Add publications that the ancestors are in too. */
+       List   *ancestors = get_partition_ancestors(RelationGetRelid(relation));
+       ListCell *lc;
+
+       foreach(lc, ancestors)
+       {
+           Oid     ancestor = lfirst_oid(lc);
+
+           puboids = list_concat_unique_oid(puboids,
+                                            GetRelationPublications(ancestor));
+       }
+   }
    puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
 
    foreach(lc, puboids)
index 408637cfec4ef31bdc0d89238399e89a391fd3ba..c579227b1974899d029753bb92560da86cdc039c 100644 (file)
@@ -3868,6 +3868,7 @@ getPublications(Archive *fout)
    int         i_pubupdate;
    int         i_pubdelete;
    int         i_pubtruncate;
+   int         i_pubviaroot;
    int         i,
                ntups;
 
@@ -3879,18 +3880,25 @@ getPublications(Archive *fout)
    resetPQExpBuffer(query);
 
    /* Get the publications. */
-   if (fout->remoteVersion >= 110000)
+   if (fout->remoteVersion >= 130000)
+       appendPQExpBuffer(query,
+                         "SELECT p.tableoid, p.oid, p.pubname, "
+                         "(%s p.pubowner) AS rolname, "
+                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+                         "FROM pg_publication p",
+                         username_subquery);
+   else if (fout->remoteVersion >= 110000)
        appendPQExpBuffer(query,
                          "SELECT p.tableoid, p.oid, p.pubname, "
                          "(%s p.pubowner) AS rolname, "
-                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
                          "FROM pg_publication p",
                          username_subquery);
    else
        appendPQExpBuffer(query,
                          "SELECT p.tableoid, p.oid, p.pubname, "
                          "(%s p.pubowner) AS rolname, "
-                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
                          "FROM pg_publication p",
                          username_subquery);
 
@@ -3907,6 +3915,7 @@ getPublications(Archive *fout)
    i_pubupdate = PQfnumber(res, "pubupdate");
    i_pubdelete = PQfnumber(res, "pubdelete");
    i_pubtruncate = PQfnumber(res, "pubtruncate");
+   i_pubviaroot = PQfnumber(res, "pubviaroot");
 
    pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3929,6 +3938,8 @@ getPublications(Archive *fout)
            (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
        pubinfo[i].pubtruncate =
            (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+       pubinfo[i].pubviaroot =
+           (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
 
        if (strlen(pubinfo[i].rolname) == 0)
            pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -4005,7 +4016,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
        first = false;
    }
 
-   appendPQExpBufferStr(query, "');\n");
+   appendPQExpBufferStr(query, "'");
+
+   if (pubinfo->pubviaroot)
+       appendPQExpBufferStr(query, ", publish_via_partition_root = true");
+
+   appendPQExpBufferStr(query, ");\n");
 
    ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
                 ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
index 3e11166615e105df7f0a9005cd1ee71c34744e4b..61c909e06d837e66bbb8f084acac7cceeeb8f067 100644 (file)
@@ -602,6 +602,7 @@ typedef struct _PublicationInfo
    bool        pubupdate;
    bool        pubdelete;
    bool        pubtruncate;
+   bool        pubviaroot;
 } PublicationInfo;
 
 /*
index 109245fea78c185201638f7c0bf68476da884f56..f05e914b4de52d9333f5b11b79078bc6c3d31d3d 100644 (file)
@@ -5707,7 +5707,7 @@ listPublications(const char *pattern)
    PQExpBufferData buf;
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
-   static const bool translate_columns[] = {false, false, false, false, false, false, false};
+   static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
 
    if (pset.sversion < 100000)
    {
@@ -5738,6 +5738,10 @@ listPublications(const char *pattern)
        appendPQExpBuffer(&buf,
                          ",\n  pubtruncate AS \"%s\"",
                          gettext_noop("Truncates"));
+   if (pset.sversion >= 130000)
+       appendPQExpBuffer(&buf,
+                         ",\n  pubviaroot AS \"%s\"",
+                         gettext_noop("Via root"));
 
    appendPQExpBufferStr(&buf,
                         "\nFROM pg_catalog.pg_publication\n");
@@ -5779,6 +5783,7 @@ describePublications(const char *pattern)
    int         i;
    PGresult   *res;
    bool        has_pubtruncate;
+   bool        has_pubviaroot;
 
    if (pset.sversion < 100000)
    {
@@ -5791,6 +5796,7 @@ describePublications(const char *pattern)
    }
 
    has_pubtruncate = (pset.sversion >= 110000);
+   has_pubviaroot = (pset.sversion >= 130000);
 
    initPQExpBuffer(&buf);
 
@@ -5801,6 +5807,9 @@ describePublications(const char *pattern)
    if (has_pubtruncate)
        appendPQExpBufferStr(&buf,
                             ", pubtruncate");
+   if (has_pubviaroot)
+       appendPQExpBufferStr(&buf,
+                            ", pubviaroot");
    appendPQExpBufferStr(&buf,
                         "\nFROM pg_catalog.pg_publication\n");
 
@@ -5850,6 +5859,8 @@ describePublications(const char *pattern)
 
        if (has_pubtruncate)
            ncols++;
+       if (has_pubviaroot)
+           ncols++;
 
        initPQExpBuffer(&title);
        printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -5862,6 +5873,8 @@ describePublications(const char *pattern)
        printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
        if (has_pubtruncate)
            printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
+       if (has_pubviaroot)
+           printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
 
        printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
        printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -5870,6 +5883,8 @@ describePublications(const char *pattern)
        printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
        if (has_pubtruncate)
            printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
+       if (has_pubviaroot)
+           printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
 
        if (!puballtables)
        {
index 27381d7874f259720a10839d83a990e200bccdcb..13bbddf785ec703993b724136e126985f504d2b3 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202004073
+#define CATALOG_VERSION_NO 202004074
 
 #endif
index bb52e8c5e0853262c15cd085900210225d90bfc8..ec02f48da0fa78a4092a69127bed535636d87ff0 100644 (file)
@@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId)
    /* true if truncates are published */
    bool        pubtruncate;
 
+   /* true if partition changes are published using root schema */
+   bool        pubviaroot;
 } FormData_pg_publication;
 
 /* ----------------
@@ -74,6 +76,7 @@ typedef struct Publication
    Oid         oid;
    char       *name;
    bool        alltables;
+   bool        pubviaroot;
    PublicationActions pubactions;
 } Publication;
 
@@ -99,7 +102,7 @@ typedef enum PublicationPartOpt
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(void);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
index 2634d2c1e142bf5affb4d11ca413456b4f62b361..63d6ab7a4ef265b3a5286453d85f05502e274e91 100644 (file)
@@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo);
 ERROR:  unrecognized publication parameter: "foo"
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
 ERROR:  unrecognized "publish" value: "cluster"
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
+ERROR:  conflicting or redundant options
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f
+                                              List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f
+                                              List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
 (2 rows)
 
 --- adding tables
@@ -83,10 +85,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                        Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | t          | t       | t       | f       | f
+                              Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                              Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                    Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                              Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                    Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
 
@@ -129,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                          Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_parted"
 
@@ -143,6 +145,15 @@ HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | t
+Tables:
+    "public.testpub_parted"
+
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
@@ -159,10 +170,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                           Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                 Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -200,10 +211,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -247,10 +258,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -260,20 +271,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                     List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
--------------+--------------------------+------------+---------+---------+---------+-----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f
+                                           List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                        List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates 
------------------+---------------------------+------------+---------+---------+---------+-----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f
+                                             List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
index 219e04129d2d9e8508c6888a9ba6ae9dfa88c5dd..d844075368d0c56c051beb5effab21383d3a8682 100644 (file)
@@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update);
 -- error cases
 CREATE PUBLICATION testpub_xxx WITH (foo);
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
 
 \dRp
 
@@ -87,6 +88,8 @@ UPDATE testpub_parted1 SET a = 1;
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
index 5db1b21c594a264c72b9b2a077eaa3e4cdc8fb00..208bb556ce41217636757ca33a2b559f52a95900 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 24;
+use Test::More tests => 51;
 
 # setup
 
@@ -48,7 +48,6 @@ $node_subscriber1->safe_psql('postgres',
    "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_subscriber1->safe_psql('postgres',
    "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
-
 $node_subscriber1->safe_psql('postgres',
    "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
 $node_subscriber1->safe_psql('postgres',
@@ -87,6 +86,8 @@ $node_subscriber1->poll_query_until('postgres', $synced_query)
 $node_subscriber2->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";
 
+# Tests for replication using leaf partition identity and schema
+
 # insert
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab1 VALUES (1)");
@@ -260,3 +261,296 @@ is($result, qq(), 'truncate of tab1_1 replicated');
 $result = $node_subscriber2->safe_psql('postgres',
    "SELECT a FROM tab1 ORDER BY 1");
 is($result, qq(), 'truncate of tab1 replicated');
+
+# Tests for replication using root table identity and schema
+
+# publisher
+$node_publisher->safe_psql('postgres',
+   "DROP PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab2_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+   "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)");
+
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)");
+$node_publisher->safe_psql('postgres',
+   "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
+# Note: tab3_1's parent is not in the publication, in which case its
+# changes are published using own identity.
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)");
+
+# subscriber 1
+$node_subscriber1->safe_psql('postgres',
+   "DROP SUBSCRIPTION sub1");
+$node_subscriber1->safe_psql('postgres',
+   "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)");
+$node_subscriber1->safe_psql('postgres',
+   "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+   "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)");
+$node_subscriber1->safe_psql('postgres',
+   "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)");
+$node_subscriber1->safe_psql('postgres',
+   "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot");
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres',
+   "DROP TABLE tab1");
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)");
+# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher.
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)");
+$node_subscriber2->safe_psql('postgres',
+   "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)");
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)");
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)");
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)");
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)");
+# Publication that sub2 points to now publishes via root, so must update
+# subscription target relations.
+$node_subscriber2->safe_psql('postgres',
+   "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+# Wait for initial sync of all subscriptions
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab1 VALUES (1), (0)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab3 VALUES (1), (0), (3), (5)");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|5), 'inserts into tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|5), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|5), 'inserts into tab3 replicated');
+
+# update (replicated as update)
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab2 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab3 SET a = 6 WHERE a = 5");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|6), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|6), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|6), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|6), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|6), 'inserts into tab3 replicated');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab1 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab2 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab3 SET a = 2 WHERE a = 6");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|2
+sub1_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|2
+sub1_tab3_1|3), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|2
+sub2_tab1|3), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|2
+sub2_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|2
+sub2_tab3|3), 'update of tab3 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+   "DELETE FROM tab1");
+$node_publisher->safe_psql('postgres',
+   "DELETE FROM tab2");
+$node_publisher->safe_psql('postgres',
+   "DELETE FROM tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT a FROM tab2");
+is($result, qq(), 'delete tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab2");
+is($result, qq(), 'delete from tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab3");
+is($result, qq(), 'delete from tab3 replicated');
+
+# truncate
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab2 VALUES (1), (2), (5)");
+# these will NOT be replicated
+$node_publisher->safe_psql('postgres',
+   "TRUNCATE tab1_2, tab2_1, tab3_1");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab1_2 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$node_publisher->safe_psql('postgres',
+   "TRUNCATE tab1, tab2, tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab1");
+is($result, qq(), 'truncate of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab3");
+is($result, qq(), 'truncate of tab3 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab3_1");
+is($result, qq(), 'truncate of tab3_1 replicated');