Fix publish_as_relid with multiple publications
authorTomas Vondra <tomas.vondra@postgresql.org>
Wed, 16 Mar 2022 15:42:47 +0000 (16:42 +0100)
committerTomas Vondra <tomas.vondra@postgresql.org>
Wed, 16 Mar 2022 17:05:58 +0000 (18:05 +0100)
Commit 83fd4532a7 allowed publishing of changes via ancestors, for
publications defined with publish_via_partition_root. But the way
the ancestor was determined in get_rel_sync_entry() was incorrect,
simply updating the same variable. So with multiple publications,
replicating different ancestors, the outcome depended on the order
of publications in the list - the value from the last loop was used,
even if it wasn't the top-most ancestor.

This is a probably rare situation, as in most cases publications do
not overlap, so each partition has exactly one candidate ancestor
to replicate as and there's no ambiguity.

Fixed by tracking the "ancestor level" for each publication, and
picking the top-most ancestor. Adds a test case, verifying the
correct ancestor is used for publishing the changes and that this
does not depend on order of publications in the list.

Older releases have another bug in this loop - once all actions are
replicated, the loop is terminated, on the assumption that inspecting
additional publications is unecessary. But that misses the fact that
those additional applications may replicate different ancestors.

Fixed by removal of this break condition. We might still terminate the
loop in some cases (e.g. when replicating all actions and the ancestor
is the partition root).

Backpatch to 13, where publish_via_partition_root was introduced.

Initial report and fix by me, test added by Hou zj. Reviews and
improvements by Amit Kapila.

Author: Tomas Vondra, Hou zj, Amit Kapila
Reviewed-by: Amit Kapila, Hou zj
Discussion: https://postgr.es/m/d26d24dd-2fab-3c48-0162-2b7f84a9c893%40enterprisedb.com

src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/replication/pgoutput/pgoutput.c
src/include/catalog/pg_publication.h
src/test/subscription/t/013_partition.pl

index 25998fbb39b102a64f903434ded0d9caed4eb09b..789b895db89fe1503121dec2c5438c5410c30e0c 100644 (file)
@@ -277,16 +277,21 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
 
 /*
  * Returns the relid of the topmost ancestor that is published via this
- * publication if any, otherwise returns InvalidOid.
+ * publication if any and set its ancestor level to ancestor_level,
+ * otherwise returns InvalidOid.
+ *
+ * The ancestor_level value allows us to compare the results for multiple
+ * publications, and decide which value is higher up.
  *
  * Note that the list of ancestors should be ordered such that the topmost
  * ancestor is at the end of the list.
  */
 Oid
-GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
+GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
 {
    ListCell   *lc;
    Oid         topmost_relid = InvalidOid;
+   int         level = 0;
 
    /*
     * Find the "topmost" ancestor that is in this publication.
@@ -297,13 +302,25 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
        List       *apubids = GetRelationPublications(ancestor);
        List       *aschemaPubids = NIL;
 
+       level++;
+
        if (list_member_oid(apubids, puboid))
+       {
            topmost_relid = ancestor;
+
+           if (ancestor_level)
+               *ancestor_level = level;
+       }
        else
        {
            aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
            if (list_member_oid(aschemaPubids, puboid))
+           {
                topmost_relid = ancestor;
+
+               if (ancestor_level)
+                   *ancestor_level = level;
+           }
        }
 
        list_free(apubids);
index 16b8661a1b7f183abb87b672768548830218206c..1aad2e769cb8d85e0559be8e02fb9dfdc8a34d79 100644 (file)
@@ -323,7 +323,8 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
     */
    if (pubviaroot && relation->rd_rel->relispartition)
    {
-       publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
+       publish_as_relid
+           = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
 
        if (!OidIsValid(publish_as_relid))
            publish_as_relid = relid;
index ea57a0477f057c694834e0de79cef984926539b5..d869f3e93ebf0d2bd666ce45914811cf4038ac70 100644 (file)
@@ -1748,6 +1748,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
        List       *schemaPubids = GetSchemaPublications(schemaId);
        ListCell   *lc;
        Oid         publish_as_relid = relid;
+       int         publish_ancestor_level = 0;
        bool        am_partition = get_rel_relispartition(relid);
        char        relkind = get_rel_relkind(relid);
        List       *rel_publications = NIL;
@@ -1815,11 +1816,28 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
            Publication *pub = lfirst(lc);
            bool        publish = false;
 
+           /*
+            * Under what relid should we publish changes in this publication?
+            * We'll use the top-most relid across all publications. Also track
+            * the ancestor level for this publication.
+            */
+           Oid pub_relid = relid;
+           int ancestor_level = 0;
+
+           /*
+            * If this is a FOR ALL TABLES publication, pick the partition root
+            * and set the ancestor level accordingly.
+            */
            if (pub->alltables)
            {
                publish = true;
                if (pub->pubviaroot && am_partition)
-                   publish_as_relid = llast_oid(get_partition_ancestors(relid));
+               {
+                   List       *ancestors = get_partition_ancestors(relid);
+
+                   pub_relid = llast_oid(ancestors);
+                   ancestor_level = list_length(ancestors);
+               }
            }
 
            if (!publish)
@@ -1835,16 +1853,21 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                if (am_partition)
                {
                    Oid         ancestor;
+                   int         level;
                    List       *ancestors = get_partition_ancestors(relid);
 
                    ancestor = GetTopMostAncestorInPublication(pub->oid,
-                                                              ancestors);
+                                                              ancestors,
+                                                              &level);
 
                    if (ancestor != InvalidOid)
                    {
                        ancestor_published = true;
                        if (pub->pubviaroot)
-                           publish_as_relid = ancestor;
+                       {
+                           pub_relid = ancestor;
+                           ancestor_level = level;
+                       }
                    }
                }
 
@@ -1868,6 +1891,20 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 
                rel_publications = lappend(rel_publications, pub);
+
+               /*
+                * We want to publish the changes as the top-most ancestor
+                * across all publications. So we need to check if the
+                * already calculated level is higher than the new one. If
+                * yes, we can ignore the new value (as it's a child).
+                * Otherwise the new value is an ancestor, so we keep it.
+                */
+               if (publish_ancestor_level > ancestor_level)
+                   continue;
+
+               /* The new value is an ancestor, so let's keep it. */
+               publish_as_relid = pub_relid;
+               publish_ancestor_level = ancestor_level;
            }
        }
 
index ba72e62e614e02b1e4c782ad02936e47d67b9749..fe773cf9b7d59133b7a51e0edb91f66f3dd32c37 100644 (file)
@@ -134,7 +134,8 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid,
 extern List *GetPubPartitionOptionRelations(List *result,
                                            PublicationPartOpt pub_partopt,
                                            Oid relid);
-extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors);
+extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors,
+                                           int *ancestor_level);
 
 extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
index 5266471a7aeb58c48dcc90c3bceeb66f2c53e8c2..66e63e755ef0febec77ac3227e9c6a2ee8fce961 100644 (file)
@@ -409,6 +409,14 @@ $node_publisher->safe_psql('postgres',
    "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_publisher->safe_psql('postgres',
    "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)");
+
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)");
+
 $node_publisher->safe_psql('postgres',
    "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
 # Note: tab3_1's parent is not in the publication, in which case its
@@ -419,6 +427,11 @@ $node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
 );
 
+# for tab4, we publish changes through the "middle" partitioned table
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)"
+);
+
 # prepare data for the initial sync
 $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
 
@@ -462,10 +475,20 @@ $node_subscriber2->safe_psql('postgres',
 $node_subscriber2->safe_psql('postgres',
    "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
+
+# Note: We create two separate tables, not a partitioned one, so that we can
+# easily identity through which relation were the changes replicated.
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab4 (a int PRIMARY KEY)"
+);
+$node_subscriber2->safe_psql('postgres',
+   "CREATE TABLE tab4_1 (a int PRIMARY KEY)"
+);
 # Publication that sub2 points to now publishes via root, so must update
-# subscription target relations.
+# subscription target relations. We set the list of publications so that
+# the FOR ALL TABLES publication is second (the list order matters).
 $node_subscriber2->safe_psql('postgres',
-   "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+   "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
 
 # Wait for initial sync of all subscriptions
 $node_subscriber1->poll_query_until('postgres', $synced_query)
@@ -487,6 +510,11 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab3 VALUES (1), (0), (3), (5)");
 
+# Insert a row into the leaf partition, should be replicated through the
+# partition root (thanks to the FOR ALL TABLES partition).
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab4 VALUES (0)");
+
 $node_publisher->wait_for_catchup('sub_viaroot');
 $node_publisher->wait_for_catchup('sub2');
 
@@ -525,6 +553,46 @@ sub2_tab3|1
 sub2_tab3|3
 sub2_tab3|5), 'inserts into tab3 replicated');
 
+# tab4 change should be replicated through the root partition, which
+# maps to the tab4 relation on subscriber.
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab4 ORDER BY 1");
+is( $result, qq(0), 'inserts into tab4 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab4_1 ORDER BY 1");
+is( $result, qq(), 'inserts into tab4_1 replicated');
+
+
+# now switch the order of publications in the list, try again, the result
+# should be the same (no dependence on order of pulications)
+$node_subscriber2->safe_psql('postgres',
+   "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_all, pub_lower_level");
+
+# make sure the subscription on the second subscriber is synced, before
+# continuing
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert a change into the leaf partition, should be replicated through
+# the partition root (thanks to the FOR ALL TABLES partition).
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab4 VALUES (1)");
+
+$node_publisher->wait_for_catchup('sub2');
+
+# tab4 change should be replicated through the root partition, which
+# maps to the tab4 relation on subscriber.
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab4 ORDER BY 1");
+is( $result, qq(0
+1), 'inserts into tab4 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a FROM tab4_1 ORDER BY 1");
+is( $result, qq(), 'inserts into tab4_1 replicated');
+
+
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");