Avoid syncing data twice for the 'publish_via_partition_root' option.
authorAmit Kapila <akapila@postgresql.org>
Wed, 29 Mar 2023 05:16:58 +0000 (10:46 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 29 Mar 2023 05:16:58 +0000 (10:46 +0530)
When there are multiple publications for a subscription and one of those
publishes via the parent table by using publish_via_partition_root and the
other one directly publishes the child table, we end up copying the same
data twice during initial synchronization. The reason for this was that we
get both the parent and child tables from the publisher and try to copy
the data for both of them.

This patch extends the function pg_get_publication_tables() to take a
publication list as its input parameter. This allows us to exclude a
partition table whose ancestor is published by the same publication list.

This problem does exist in back-branches but we decide to fix it there in
a separate commit if required. The fix for back-branches requires quite
complicated changes to fetch the required table information from the
publisher as we can't update the function pg_get_publication_tables() in
back-branches. We are not sure whether we want to deviate and complicate
the code in back-branches for this problem as there are no field reports
yet.

Author: Wang wei
Reviewed-by: Peter Smith, Jacob Champion, Kuroda Hayato, Vignesh C, Osumi Takamichi, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com

doc/src/sgml/ref/create_publication.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/subscriptioncmds.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/test/regress/expected/rules.out
src/test/subscription/t/013_partition.pl
src/test/subscription/t/028_row_filter.pl
src/test/subscription/t/031_column_list.pl
src/tools/pgindent/typedefs.list

index a2946feaa32eeb471bfe9010a38de693cb7679e8..b2540c63ff4c7e2657dd875b3cdb270c2f7c59bb 100644 (file)
@@ -201,6 +201,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
           consisting of a different set of partitions.
          </para>
 
+         <para>
+          There can be a case where a subscription combines multiple
+          publications. If a partitioned table is published by any
+          subscribed publications which set
+          <literal>publish_via_partition_root</literal> = true, changes on this
+          partitioned table (or on its partitions) will be published using
+          the identity and schema of this partitioned table rather than
+          that of the individual partitions.
+         </para>
+
          <para>
           This parameter also affects how row filters and column lists are
           chosen for partitions; see below for details.
index a98fcad421f69b6b4a19397110c8e1b21ebfbac1..47637f28ab6427bd876c9a7ffe5bca971f0ecc13 100644 (file)
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+/* Records association between publication and published table */
+typedef struct
+{
+   Oid         relid;          /* OID of published table */
+   Oid         pubid;          /* OID of publication that publishes this
+                                * table. */
+} published_rel;
+
 static void publication_translate_columns(Relation targetrel, List *columns,
                                          int *natts, AttrNumber **attrs);
 
@@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
 }
 
 /*
- * Filter out the partitions whose parent tables were also specified in
- * the publication.
+ * Returns true if the ancestor is in the list of published relations.
+ * Otherwise, returns false.
  */
-static List *
-filter_partitions(List *relids)
+static bool
+is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
+{
+   ListCell   *lc;
+
+   foreach(lc, table_infos)
+   {
+       Oid         relid = ((published_rel *) lfirst(lc))->relid;
+
+       if (relid == ancestor)
+           return true;
+   }
+
+   return false;
+}
+
+/*
+ * Filter out the partitions whose parent tables are also present in the list.
+ */
+static void
+filter_partitions(List *table_infos)
 {
-   List       *result = NIL;
    ListCell   *lc;
-   ListCell   *lc2;
 
-   foreach(lc, relids)
+   foreach(lc, table_infos)
    {
        bool        skip = false;
        List       *ancestors = NIL;
-       Oid         relid = lfirst_oid(lc);
+       ListCell   *lc2;
+       published_rel *table_info = (published_rel *) lfirst(lc);
 
-       if (get_rel_relispartition(relid))
-           ancestors = get_partition_ancestors(relid);
+       if (get_rel_relispartition(table_info->relid))
+           ancestors = get_partition_ancestors(table_info->relid);
 
        foreach(lc2, ancestors)
        {
            Oid         ancestor = lfirst_oid(lc2);
 
-           /* Check if the parent table exists in the published table list. */
-           if (list_member_oid(relids, ancestor))
+           if (is_ancestor_member_tableinfos(ancestor, table_infos))
            {
                skip = true;
                break;
            }
        }
 
-       if (!skip)
-           result = lappend_oid(result, relid);
+       if (skip)
+           table_infos = foreach_delete_current(table_infos, lc);
    }
-
-   return result;
 }
 
 /*
@@ -1026,22 +1049,27 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 }
 
 /*
- * Returns information of tables in a publication.
+ * Get information of the tables in the given publication array.
+ *
+ * Returns pubid, relid, column list, row filter for each table.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
-#define NUM_PUBLICATION_TABLES_ELEM    3
+#define NUM_PUBLICATION_TABLES_ELEM    4
    FuncCallContext *funcctx;
-   char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-   Publication *publication;
-   List       *tables;
+   List       *table_infos = NIL;
 
    /* stuff done only on the first call of the function */
    if (SRF_IS_FIRSTCALL())
    {
        TupleDesc   tupdesc;
        MemoryContext oldcontext;
+       ArrayType  *arr;
+       Datum      *elems;
+       int         nelems,
+                   i;
+       bool        viaroot = false;
 
        /* create a function context for cross-call persistence */
        funcctx = SRF_FIRSTCALL_INIT();
@@ -1049,68 +1077,108 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
        /* switch to memory context appropriate for multiple function calls */
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-       publication = GetPublicationByName(pubname, false);
-
        /*
-        * Publications support partitioned tables, although all changes are
-        * replicated using leaf partition identity and schema, so we only
-        * need those.
+        * Deconstruct the parameter into elements where each element is a
+        * publication name.
         */
-       if (publication->alltables)
-       {
-           tables = GetAllTablesPublicationRelations(publication->pubviaroot);
-       }
-       else
+       arr = PG_GETARG_ARRAYTYPE_P(0);
+       deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+                         &elems, NULL, &nelems);
+
+       /* Get Oids of tables from each publication. */
+       for (i = 0; i < nelems; i++)
        {
-           List       *relids,
-                      *schemarelids;
-
-           relids = GetPublicationRelations(publication->oid,
-                                            publication->pubviaroot ?
-                                            PUBLICATION_PART_ROOT :
-                                            PUBLICATION_PART_LEAF);
-           schemarelids = GetAllSchemaPublicationRelations(publication->oid,
-                                                           publication->pubviaroot ?
-                                                           PUBLICATION_PART_ROOT :
-                                                           PUBLICATION_PART_LEAF);
-           tables = list_concat_unique_oid(relids, schemarelids);
+           Publication *pub_elem;
+           List       *pub_elem_tables = NIL;
+           ListCell   *lc;
+
+           pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
 
            /*
-            * If the publication publishes partition changes via their
-            * respective root partitioned tables, we must exclude partitions
-            * in favor of including the root partitioned tables. Otherwise,
-            * the function could return both the child and parent tables
-            * which could cause data of the child table to be
-            * double-published on the subscriber side.
+            * Publications support partitioned tables. If
+            * publish_via_partition_root is false, all changes are replicated
+            * using leaf partition identity and schema, so we only need
+            * those. Otherwise, get the partitioned table itself.
             */
-           if (publication->pubviaroot)
-               tables = filter_partitions(tables);
+           if (pub_elem->alltables)
+               pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+           else
+           {
+               List       *relids,
+                          *schemarelids;
+
+               relids = GetPublicationRelations(pub_elem->oid,
+                                                pub_elem->pubviaroot ?
+                                                PUBLICATION_PART_ROOT :
+                                                PUBLICATION_PART_LEAF);
+               schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+                                                               pub_elem->pubviaroot ?
+                                                               PUBLICATION_PART_ROOT :
+                                                               PUBLICATION_PART_LEAF);
+               pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
+           }
+
+           /*
+            * Record the published table and the corresponding publication so
+            * that we can get row filters and column lists later.
+            *
+            * When a table is published by multiple publications, to obtain
+            * all row filters and column lists, the structure related to this
+            * table will be recorded multiple times.
+            */
+           foreach(lc, pub_elem_tables)
+           {
+               published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
+
+               table_info->relid = lfirst_oid(lc);
+               table_info->pubid = pub_elem->oid;
+               table_infos = lappend(table_infos, table_info);
+           }
+
+           /* At least one publication is using publish_via_partition_root. */
+           if (pub_elem->pubviaroot)
+               viaroot = true;
        }
 
+       /*
+        * If the publication publishes partition changes via their respective
+        * root partitioned tables, we must exclude partitions in favor of
+        * including the root partitioned tables. Otherwise, the function
+        * could return both the child and parent tables which could cause
+        * data of the child table to be double-published on the subscriber
+        * side.
+        */
+       if (viaroot)
+           filter_partitions(table_infos);
+
        /* Construct a tuple descriptor for the result rows. */
        tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
                           OIDOID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+                          OIDOID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
                           INT2VECTOROID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
                           PG_NODE_TREEOID, -1, 0);
 
        funcctx->tuple_desc = BlessTupleDesc(tupdesc);
-       funcctx->user_fctx = (void *) tables;
+       funcctx->user_fctx = (void *) table_infos;
 
        MemoryContextSwitchTo(oldcontext);
    }
 
    /* stuff done on every call of the function */
    funcctx = SRF_PERCALL_SETUP();
-   tables = (List *) funcctx->user_fctx;
+   table_infos = (List *) funcctx->user_fctx;
 
-   if (funcctx->call_cntr < list_length(tables))
+   if (funcctx->call_cntr < list_length(table_infos))
    {
        HeapTuple   pubtuple = NULL;
        HeapTuple   rettuple;
-       Oid         relid = list_nth_oid(tables, funcctx->call_cntr);
+       Publication *pub;
+       published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
+       Oid         relid = table_info->relid;
        Oid         schemaid = get_rel_namespace(relid);
        Datum       values[NUM_PUBLICATION_TABLES_ELEM] = {0};
        bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
@@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
         * Form tuple with appropriate data.
         */
 
-       publication = GetPublicationByName(pubname, false);
+       pub = GetPublication(table_info->pubid);
 
-       values[0] = ObjectIdGetDatum(relid);
+       values[0] = ObjectIdGetDatum(pub->oid);
+       values[1] = ObjectIdGetDatum(relid);
 
        /*
         * We don't consider row filters or column lists for FOR ALL TABLES or
         * FOR TABLES IN SCHEMA publications.
         */
-       if (!publication->alltables &&
+       if (!pub->alltables &&
            !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
                                   ObjectIdGetDatum(schemaid),
-                                  ObjectIdGetDatum(publication->oid)))
+                                  ObjectIdGetDatum(pub->oid)))
            pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
                                           ObjectIdGetDatum(relid),
-                                          ObjectIdGetDatum(publication->oid));
+                                          ObjectIdGetDatum(pub->oid));
 
        if (HeapTupleIsValid(pubtuple))
        {
            /* Lookup the column list attribute. */
-           values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+           values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
                                        Anum_pg_publication_rel_prattrs,
-                                       &(nulls[1]));
+                                       &(nulls[2]));
 
            /* Null indicates no filter. */
-           values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+           values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
                                        Anum_pg_publication_rel_prqual,
-                                       &(nulls[2]));
+                                       &(nulls[3]));
        }
        else
        {
-           nulls[1] = true;
            nulls[2] = true;
+           nulls[3] = true;
        }
 
        /* Show all columns when the column list is not specified. */
-       if (nulls[1] == true)
+       if (nulls[2])
        {
            Relation    rel = table_open(relid, AccessShareLock);
            int         nattnums = 0;
@@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
            if (nattnums > 0)
            {
-               values[1] = PointerGetDatum(buildint2vector(attnums, nattnums));
-               nulls[1] = false;
+               values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
+               nulls[2] = false;
            }
 
            table_close(rel, AccessShareLock);
index 8a26ddab1c704c10bd4ce3f391f0202991bfa39e..93a238412aad131e4c5746e3dd1574a87d3ffdf9 100644 (file)
@@ -1936,21 +1936,60 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
    WalRcvExecResult *res;
    StringInfoData cmd;
    TupleTableSlot *slot;
-   Oid         tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
+   Oid         tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
    List       *tablelist = NIL;
-   bool        check_columnlist = (walrcv_server_version(wrconn) >= 150000);
+   int         server_version = walrcv_server_version(wrconn);
+   bool        check_columnlist = (server_version >= 150000);
 
    initStringInfo(&cmd);
-   appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
 
-   /* Get column lists for each relation if the publisher supports it */
-   if (check_columnlist)
-       appendStringInfoString(&cmd, ", t.attnames\n");
+   /* Get the list of tables from the publisher. */
+   if (server_version >= 160000)
+   {
+       StringInfoData pub_names;
 
-   appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
-                          " WHERE t.pubname IN (");
-   get_publications_str(publications, &cmd, true);
-   appendStringInfoChar(&cmd, ')');
+       tableRow[2] = INT2VECTOROID;
+       initStringInfo(&pub_names);
+       get_publications_str(publications, &pub_names, true);
+
+       /*
+        * From version 16, we allowed passing multiple publications to the
+        * function pg_get_publication_tables. This helped to filter out the
+        * partition table whose ancestor is also published in this
+        * publication array.
+        *
+        * Join pg_get_publication_tables with pg_publication to exclude
+        * non-existing publications.
+        *
+        * Note that attrs are always stored in sorted order so we don't need
+        * to worry if different publications have specified them in a
+        * different order. See publication_translate_columns.
+        */
+       appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
+                        "       FROM pg_class c\n"
+                        "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
+                        "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+                        "                FROM pg_publication\n"
+                        "                WHERE pubname IN ( %s )) AS gpt\n"
+                        "             ON gpt.relid = c.oid\n",
+                        pub_names.data);
+
+       pfree(pub_names.data);
+   }
+   else
+   {
+       tableRow[2] = NAMEARRAYOID;
+       appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
+
+       /* Get column lists for each relation if the publisher supports it */
+       if (check_columnlist)
+           appendStringInfoString(&cmd, ", t.attnames\n");
+
+       appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
+                              " WHERE t.pubname IN (");
+       get_publications_str(publications, &cmd, true);
+       appendStringInfoChar(&cmd, ')');
+   }
 
    res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
    pfree(cmd.data);
index c187d47eb27c6b6584d71ccfa646abd495e0f522..db590b6bf076169870a7149b8e40e2ba93a04436 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202303281
+#define CATALOG_VERSION_NO 202303291
 
 #endif
index 7c358cff162ac40f87a5e10a37f1e89d5b0c6300..def8ee2045536ce3e419bbfceb087d4928f17d4b 100644 (file)
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get information of tables in a publication',
-  proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,oid,int2vector,pg_node_tree}',
-  proargmodes => '{i,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
+{ oid => '6119',
+  descr => 'get information of the tables that are part of the specified publications',
+  proname => 'pg_get_publication_tables', prorows => '1000',
+  provariadic => 'text', proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => '_text',
+  proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}',
+  proargmodes => '{v,o,o,o,o}', proargnames => '{pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
index 996d22b7ddf8ec5a3dbcf06320a32dd697556368..c71758db46e926b65013b3e02fbf5b27fc0d0b98 100644 (file)
@@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname,
           WHERE ((a.attrelid = gpt.relid) AND (a.attnum = ANY ((gpt.attrs)::smallint[])))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
+    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
index 11a5c3c03ef80dd757e495ec443db1cb85551684..f03cd24246d2c747e191de29eb17771860f939a8 100644 (file)
@@ -409,10 +409,10 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)");
 $node_publisher->safe_psql('postgres',
-   "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)"
+   "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (-1, 0, 1) PARTITION BY LIST (a)"
 );
 $node_publisher->safe_psql('postgres',
-   "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)");
+   "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (-1, 0, 1)");
 
 $node_publisher->safe_psql('postgres',
    "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
@@ -424,13 +424,13 @@ $node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
 );
 
-# for tab4, we publish changes through the "middle" partitioned table
 $node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)"
 );
 
 # prepare data for the initial sync
 $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (-1)");
 
 # subscriber 1
 $node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
@@ -479,9 +479,10 @@ $node_subscriber2->safe_psql('postgres',
    "CREATE TABLE tab4 (a int PRIMARY KEY)");
 $node_subscriber2->safe_psql('postgres',
    "CREATE TABLE tab4_1 (a int PRIMARY KEY)");
-# Publication that sub2 points to now publishes via root, so must update
-# subscription target relations. We set the list of publications so that
-# the FOR ALL TABLES publication is second (the list order matters).
+# Since we specified publish_via_partition_root in pub_all and
+# pub_lower_level, all partition tables use their root tables' identity and
+# schema. We set the list of publications so that the FOR ALL TABLES
+# publication is second (the list order matters).
 $node_subscriber2->safe_psql('postgres',
    "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
 
@@ -492,6 +493,12 @@ $node_subscriber2->wait_for_subscription_sync;
 # check that data is synced correctly
 $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
 is($result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');
+$result =
+  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
+is($result, qq(-1), 'initial data synced for pub_lower_level and pub_all');
+$result =
+  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
+is($result, qq(), 'initial data synced for pub_lower_level and pub_all');
 
 # insert
 $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
@@ -548,7 +555,8 @@ sub2_tab3|5), 'inserts into tab3 replicated');
 # maps to the tab4 relation on subscriber.
 $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
-is($result, qq(0), 'inserts into tab4 replicated');
+is( $result, qq(-1
+0), 'inserts into tab4 replicated');
 
 $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
@@ -574,7 +582,8 @@ $node_publisher->wait_for_catchup('sub2');
 # maps to the tab4 relation on subscriber.
 $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
-is( $result, qq(0
+is( $result, qq(-1
+0
 1), 'inserts into tab4 replicated');
 
 $result =
index def3fbc54bd2a14bf7685d23667eab8f6ac76ca5..b0d4b2d5b17a560c92fe77f881207b5d253d3754 100644 (file)
@@ -235,6 +235,11 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "CREATE TABLE tab_rowfilter_viaroot_part_1 PARTITION OF tab_rowfilter_viaroot_part FOR VALUES FROM (1) TO (20)"
 );
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_parent_sync (a int) PARTITION BY RANGE (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_child_sync PARTITION OF tab_rowfilter_parent_sync FOR VALUES FROM (1) TO (20)"
+);
 
 # setup structure on subscriber
 $node_subscriber->safe_psql('postgres',
@@ -285,6 +290,10 @@ $node_subscriber->safe_psql('postgres',
    "CREATE TABLE tab_rowfilter_viaroot_part (a int)");
 $node_subscriber->safe_psql('postgres',
    "CREATE TABLE tab_rowfilter_viaroot_part_1 (a int)");
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_parent_sync (a int)");
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_child_sync (a int)");
 
 # setup logical replication
 $node_publisher->safe_psql('postgres',
@@ -341,6 +350,15 @@ $node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub_viaroot_2 FOR TABLE tab_rowfilter_viaroot_part_1 WHERE (a < 15) WITH (publish_via_partition_root)"
 );
 
+# two publications, one publishing through ancestor and another one directly
+# publishing the partition, with different row filters
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tap_pub_parent_sync FOR TABLE tab_rowfilter_parent_sync WHERE (a > 15) WITH (publish_via_partition_root)"
+);
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tap_pub_child_sync FOR TABLE tab_rowfilter_child_sync WHERE (a < 15)"
+);
+
 #
 # The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
 # SQL commands are for testing the initial data copy using logical replication.
@@ -362,6 +380,9 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_rowfilter_4 (c) SELECT generate_series(1, 10)");
 
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab_rowfilter_parent_sync(a) VALUES(14), (16)");
+
 # insert data into partitioned table and directly on the partition
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(7000, 101),(15000, 102),(5500, 300)"
@@ -387,7 +408,7 @@ $node_publisher->safe_psql('postgres',
 );
 
 $node_subscriber->safe_psql('postgres',
-   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
+   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1, tap_pub_parent_sync, tap_pub_child_sync"
 );
 
 # wait for initial table synchronization to finish
@@ -512,6 +533,25 @@ is( $result, qq(20
 30
 40), 'check initial data copy from table tab_rowfilter_inherited');
 
+# Check expected replicated rows for tap_pub_parent_sync and
+# tap_pub_child_sync.
+# Since the option publish_via_partition_root of tap_pub_parent_sync is true,
+# so the row filter of tap_pub_parent_sync will be used:
+# tap_pub_parent_sync filter is: (a > 15)
+# tap_pub_child_sync filter is: (a < 15)
+# - INSERT (14)        NO, 14 < 15
+# - INSERT (16)        YES, 16 > 15
+$result =
+  $node_subscriber->safe_psql('postgres',
+   "SELECT a FROM tab_rowfilter_parent_sync ORDER BY 1");
+is( $result, qq(16),
+   'check initial data copy from tab_rowfilter_parent_sync');
+$result =
+  $node_subscriber->safe_psql('postgres',
+   "SELECT a FROM tab_rowfilter_child_sync ORDER BY 1");
+is( $result, qq(),
+   'check initial data copy from tab_rowfilter_child_sync');
+
 # The following commands are executed after CREATE SUBSCRIPTION, so these SQL
 # commands are for testing normal logical replication behavior.
 #
index fd77757352a7ff454cfcf525a0f4dbc47f1afdb1..b67292ba9c6ebc24e6895acfb1791dce101b4fb1 100644 (file)
@@ -959,16 +959,21 @@ $node_publisher->safe_psql(
    CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
    CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-   CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+   CREATE PUBLICATION pub_test_root FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+   CREATE PUBLICATION pub_test_root_1 FOR TABLE test_root_1 (a, b);
 
    -- initial data
    INSERT INTO test_root VALUES (1, 2, 3);
    INSERT INTO test_root VALUES (10, 20, 30);
 ));
 
+# Subscribe to pub_test_root and pub_test_root_1 at the same time, which means
+# that the initial data will be synced once, and only the column list of the
+# parent table (test_root) in the publication pub_test_root will be used for
+# both table sync and data replication.
 $node_subscriber->safe_psql(
    'postgres', qq(
-   CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+   CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
 ));
 
 $node_subscriber->wait_for_subscription_sync;
index 0b7bc45767184097de707f1701e51c1e2dc46cd6..f45bcc52d39543fbb548fbffa487a0fd312daf54 100644 (file)
@@ -3603,6 +3603,7 @@ pthread_mutex_t
 pthread_once_t
 pthread_t
 ptrdiff_t
+published_rel
 pull_var_clause_context
 pull_varattnos_context
 pull_varnos_context