pg_createsubscriber: Add -R publications option.
authorAmit Kapila <akapila@postgresql.org>
Thu, 20 Mar 2025 06:51:54 +0000 (12:21 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 20 Mar 2025 06:51:54 +0000 (12:21 +0530)
This patch introduces a new '-R'/'--remove' option in the
'pg_createsubscriber' utility to specify the object types to be removed
from the subscriber. Currently, we add support to specify 'publications'
as an object type. In the future, other object types like failover-slots
could be added.

This feature allows optionally to remove publications on the subscriber
that were replicated from the primary server (before running this tool)
during physical replication. Users may want to retain these publications
in case they want some pre-existing subscribers to point to the newly
created subscriber.

Author: Shubham Khanna <khannashubham1197@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: David G. Johnston <david.g.johnston@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAHv8RjL4OvoYafofTb_U_JD5HuyoNowBoGpMfnEbhDSENA74Kg@mail.gmail.com

doc/src/sgml/ref/pg_createsubscriber.sgml
src/bin/pg_basebackup/pg_createsubscriber.c
src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

index b4b996236e470e54c948b169b197c150094c1c91..380d0b1c35cf205cae4e4960c13d4eff6adfece8 100644 (file)
@@ -146,6 +146,29 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-R</option></term>
+     <term><option>--remove</option></term>
+     <listitem>
+      <para>
+       Remove all objects of the specified type from specified databases on the
+       target server.
+      </para>
+      <para>
+       publications: The "all tables" publications established for this
+       subscriber are always removed; specifying this object type causes all
+       other publications replicated from the source server to be dropped as
+       well.
+      </para>
+      <para>
+       The objects selected to be dropped are individually logged and do show
+       up in a --dry-run.  There is no opportunity to affect or stop the
+       dropping of the selected objects so consider taking a backup of them
+       using pg_dump.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-s <replaceable class="parameter">dir</replaceable></option></term>
      <term><option>--socketdir=<replaceable class="parameter">dir</replaceable></option></term>
index 6baf92e80246813cfc5f4c05838b696290e26468..d067eb44e6cb3f314c8ab01a261a699ed389355c 100644 (file)
@@ -29,6 +29,7 @@
 #include "getopt_long.h"
 
 #define    DEFAULT_SUB_PORT    "50432"
+#define    OBJECTTYPE_PUBLICATIONS  0x0001
 
 /* Command-line options */
 struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
    SimpleStringList sub_names; /* list of subscription names */
    SimpleStringList replslot_names;    /* list of replication slot names */
    int         recovery_timeout;   /* stop recovery after this time */
+   SimpleStringList objecttypes_to_remove; /* list of object types to remove */
 };
 
 /* per-database publication/subscription info */
@@ -68,6 +70,8 @@ struct LogicalRepInfos
 {
    struct LogicalRepInfo *dbinfo;
    bool        two_phase;      /* enable-two-phase option */
+   bits32      objecttypes_to_remove;  /* flags indicating which object types
+                                        * to remove on subscriber */
 };
 
 static void cleanup_objects_atexit(void);
@@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
                                  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, const char *pubname,
+                            const char *dbname, bool *made_publication);
+static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
                                     const char *lsn);
@@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
            if (conn != NULL)
            {
                if (dbinfo->made_publication)
-                   drop_publication(conn, dbinfo);
+                   drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+                                    &dbinfo->made_publication);
                if (dbinfo->made_replslot)
                    drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
                disconnect_database(conn, false);
@@ -241,6 +248,8 @@ usage(void)
    printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
    printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
    printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
+   printf(_("  -R, --remove=OBJECTTYPE         remove all objects of the specified type from specified\n"
+            "                                  databases on the subscriber; accepts: publications\n"));
    printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
    printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
    printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
@@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
         */
        check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
 
-       /*
-        * Since the publication was created before the consistent LSN, it is
-        * available on the subscriber when the physical replica is promoted.
-        * Remove publications from the subscriber because it has no use.
-        */
-       drop_publication(conn, &dbinfo[i]);
+       /* Check and drop the required publications in the given database. */
+       check_and_drop_publications(conn, &dbinfo[i]);
 
        create_subscription(conn, &dbinfo[i]);
 
@@ -1663,10 +1668,11 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop the specified publication in the given database.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication(PGconn *conn, const char *pubname, const char *dbname,
+                bool *made_publication)
 {
    PQExpBuffer str = createPQExpBuffer();
    PGresult   *res;
@@ -1674,10 +1680,10 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
    Assert(conn != NULL);
 
-   pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+   pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
    pg_log_info("dropping publication \"%s\" in database \"%s\"",
-               dbinfo->pubname, dbinfo->dbname);
+               pubname, dbname);
 
    appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
@@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-                        dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-           dbinfo->made_publication = false;   /* don't try again. */
+                        pubname, dbname, PQresultErrorMessage(res));
+           *made_publication = false;  /* don't try again. */
 
            /*
             * Don't disconnect and exit here. This routine is used by primary
@@ -1708,6 +1714,55 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    destroyPQExpBuffer(str);
 }
 
+/*
+ * Retrieve and drop the publications.
+ *
+ * Since the publications were created before the consistent LSN, they
+ * remain on the subscriber even after the physical replica is
+ * promoted. Remove these publications from the subscriber because
+ * they have no use. Additionally, if requested, drop all pre-existing
+ * publications.
+ */
+static void
+check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+   PGresult   *res;
+   bool        drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
+
+   Assert(conn != NULL);
+
+   if (drop_all_pubs)
+   {
+       pg_log_info("dropping all existing publications in database \"%s\"",
+                   dbinfo->dbname);
+
+       /* Fetch all publication names */
+       res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           pg_log_error("could not obtain publication information: %s",
+                        PQresultErrorMessage(res));
+           PQclear(res);
+           disconnect_database(conn, true);
+       }
+
+       /* Drop each publication */
+       for (int i = 0; i < PQntuples(res); i++)
+           drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
+                            &dbinfo->made_publication);
+
+       PQclear(res);
+   }
+
+   /*
+    * In dry-run mode, we don't create publications, but we still try to drop
+    * those to provide necessary information to the user.
+    */
+   if (!drop_all_pubs || dry_run)
+       drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+                        &dbinfo->made_publication);
+}
+
 /*
  * Create a subscription with some predefined options.
  *
@@ -1914,6 +1969,7 @@ main(int argc, char **argv)
        {"dry-run", no_argument, NULL, 'n'},
        {"subscriber-port", required_argument, NULL, 'p'},
        {"publisher-server", required_argument, NULL, 'P'},
+       {"remove", required_argument, NULL, 'R'},
        {"socketdir", required_argument, NULL, 's'},
        {"recovery-timeout", required_argument, NULL, 't'},
        {"enable-two-phase", no_argument, NULL, 'T'},
@@ -1995,7 +2051,7 @@ main(int argc, char **argv)
 
    get_restricted_token();
 
-   while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+   while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
                            long_options, &option_index)) != -1)
    {
        switch (c)
@@ -2025,6 +2081,12 @@ main(int argc, char **argv)
            case 'P':
                opt.pub_conninfo_str = pg_strdup(optarg);
                break;
+           case 'R':
+               if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
+                   simple_string_list_append(&opt.objecttypes_to_remove, optarg);
+               else
+                   pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
+               break;
            case 's':
                opt.socket_dir = pg_strdup(optarg);
                canonicalize_path(opt.socket_dir);
@@ -2189,6 +2251,19 @@ main(int argc, char **argv)
        exit(1);
    }
 
+   /* Verify the object types specified for removal from the subscriber */
+   for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
+   {
+       if (pg_strcasecmp(cell->val, "publications") == 0)
+           dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
+       else
+       {
+           pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
+           pg_log_error_hint("The valid option is: \"publications\"");
+           exit(1);
+       }
+   }
+
    /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
index c35fa108ce3459afd09d793f6217e390f78fae7a..2c9bd5bdb9ec9630f80685bb3f1d24afe3781d9d 100644 (file)
@@ -329,6 +329,21 @@ $node_p->safe_psql($db1,
    "CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
 $node_p->wait_for_replay_catchup($node_s);
+
+# Create user-defined publications, wait for streaming replication to sync them
+# to the standby, then verify that '--remove'
+# removes them.
+$node_p->safe_psql(
+   $db1, qq(
+   CREATE PUBLICATION test_pub1 FOR ALL TABLES;
+   CREATE PUBLICATION test_pub2 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s);
+
+ok($node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
+   'two pre-existing publications on subscriber');
+
 $node_s->stop;
 
 # dry run mode on node S
@@ -373,7 +388,8 @@ command_ok(
 
 # Run pg_createsubscriber on node S.  --verbose is used twice
 # to show more information.
-# In passing, also test the --enable-two-phase option
+# In passing, also test the --enable-two-phase option and
+# --remove option
 command_ok(
    [
        'pg_createsubscriber',
@@ -389,7 +405,8 @@ command_ok(
        '--replication-slot' => 'replslot2',
        '--database' => $db1,
        '--database' => $db2,
-       '--enable-two-phase'
+       '--enable-two-phase',
+       '--remove' => 'publications',
    ],
    'run pg_createsubscriber on node S');
 
@@ -408,6 +425,10 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+   '0', 'all publications on subscriber have been removed');
+
 # Verify that all subtwophase states are pending or enabled,
 # e.g. there are no subscriptions where subtwophase is disabled ('d')
 is( $node_s->safe_psql(