Add a common function to generate the origin name.
authorAmit Kapila <akapila@postgresql.org>
Tue, 11 Oct 2022 05:07:52 +0000 (10:37 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 11 Oct 2022 05:07:52 +0000 (10:37 +0530)
Make a common replication origin name formatting function to replace
multiple snprintf() expressions. This also includes logic previously done
by ReplicationOriginNameForTablesync().

This makes the code to generate the origin name consistent among apply
worker and tablesync worker.

Author: Peter Smith
Reviewed-By: Aleksander Alekseev
Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index f3bfcca434cd5a6d46d69cc94ae77962cf0dde76..97594cd9b18188570adff37a1ac44f9504369056 100644 (file)
@@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
    recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
-   snprintf(originname, sizeof(originname), "pg_%u", subid);
+   ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    replorigin_create(originname);
 
    /*
@@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                     * origin and by this time the origin might be already
                     * removed. For these reasons, passing missing_ok = true.
                     */
-                   ReplicationOriginNameForTablesync(sub->oid, relid, originname,
-                                                     sizeof(originname));
+                   ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
+                                                      sizeof(originname));
                    replorigin_drop_by_name(originname, true, false);
                }
 
@@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                    char        originname[NAMEDATALEN];
                    XLogRecPtr  remote_lsn;
 
-                   snprintf(originname, sizeof(originname), "pg_%u", subid);
+                   ReplicationOriginNameForLogicalRep(subid, InvalidOid,
+                                                      originname, sizeof(originname));
                    originid = replorigin_by_name(originname, false);
                    remote_lsn = replorigin_get_progress(originid, false);
 
@@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
         * worker so passing missing_ok = true. This can happen for the states
         * before SUBREL_STATE_FINISHEDCOPY.
         */
-       ReplicationOriginNameForTablesync(subid, relid, originname,
-                                         sizeof(originname));
+       ReplicationOriginNameForLogicalRep(subid, relid, originname,
+                                          sizeof(originname));
        replorigin_drop_by_name(originname, true, false);
    }
 
@@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    RemoveSubscriptionRel(subid, InvalidOid);
 
    /* Remove the origin tracking if exists. */
-   snprintf(originname, sizeof(originname), "pg_%u", subid);
+   ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    replorigin_drop_by_name(originname, true, false);
 
    /*
index b4a7b4b7f6e75cf76553533962efefdbfa1886b0..94e813ac53cd4643ba7418cdaeeed1be26023737 100644 (file)
@@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
         */
        StartTransactionCommand();
 
-       ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-                                         MyLogicalRepWorker->relid,
-                                         originname,
-                                         sizeof(originname));
+       ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+                                          MyLogicalRepWorker->relid,
+                                          originname,
+                                          sizeof(originname));
 
        /*
         * Resetting the origin session removes the ownership of the slot.
@@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                 * error while dropping we won't restart it to drop the
                 * origin. So passing missing_ok = true.
                 */
-               ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-                                                 rstate->relid,
-                                                 originname,
-                                                 sizeof(originname));
+               ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+                                                  rstate->relid,
+                                                  originname,
+                                                  sizeof(originname));
                replorigin_drop_by_name(originname, true, false);
 
                /*
@@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
             relid, GetSystemIdentifier());
 }
 
-/*
- * Form the origin name for tablesync.
- *
- * Return the name in the supplied buffer.
- */
-void
-ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-                                 char *originname, Size szorgname)
-{
-   snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
-}
-
 /*
  * Start syncing the table in the sync worker.
  *
@@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
           MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
 
    /* Assign the origin tracking record name. */
-   ReplicationOriginNameForTablesync(MySubscription->oid,
-                                     MyLogicalRepWorker->relid,
-                                     originname,
-                                     sizeof(originname));
+   ReplicationOriginNameForLogicalRep(MySubscription->oid,
+                                      MyLogicalRepWorker->relid,
+                                      originname,
+                                      sizeof(originname));
 
    if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    {
index 207a5805ba7d866dc582d9930204344182210510..5250ae7f54c072ef9a17317230036a8da7aea2f6 100644 (file)
@@ -364,6 +364,30 @@ static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
 static inline void reset_apply_error_context_info(void);
 
+/*
+ * Form the origin name for the subscription.
+ *
+ * This is a common function for tablesync and other workers. Tablesync workers
+ * must pass a valid relid. Other callers must pass relid = InvalidOid.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+                                  char *originname, Size szoriginname)
+{
+   if (OidIsValid(relid))
+   {
+       /* Replication origin name for tablesync workers. */
+       snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
+   }
+   else
+   {
+       /* Replication origin name for non-tablesync workers. */
+       snprintf(originname, szoriginname, "pg_%u", suboid);
+   }
+}
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg)
         * Allocate the origin name in long-lived context for error context
         * message.
         */
-       ReplicationOriginNameForTablesync(MySubscription->oid,
-                                         MyLogicalRepWorker->relid,
-                                         originname,
-                                         sizeof(originname));
+       ReplicationOriginNameForLogicalRep(MySubscription->oid,
+                                          MyLogicalRepWorker->relid,
+                                          originname,
+                                          sizeof(originname));
        apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
                                                                   originname);
    }
@@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg)
 
        /* Setup replication origin tracking. */
        StartTransactionCommand();
-       snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+       ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+                                          originname, sizeof(originname));
        originid = replorigin_by_name(originname, true);
        if (!OidIsValid(originid))
            originid = replorigin_create(originname);
index f82bc518c32addc457d31b5565d3663a9eb4596a..2b7114ff6d908fb947e772164d2879beb6e83160 100644 (file)
@@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int logicalrep_sync_worker_count(Oid subid);
 
-extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-                                             char *originname, Size szorgname);
+extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+                                              char *originname, Size szoriginname);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
 extern bool AllTablesyncsReady(void);