recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
- snprintf(originname, sizeof(originname), "pg_%u", subid);
+ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
/*
* origin and by this time the origin might be already
* removed. For these reasons, passing missing_ok = true.
*/
- ReplicationOriginNameForTablesync(sub->oid, relid, originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
+ sizeof(originname));
replorigin_drop_by_name(originname, true, false);
}
char originname[NAMEDATALEN];
XLogRecPtr remote_lsn;
- snprintf(originname, sizeof(originname), "pg_%u", subid);
+ ReplicationOriginNameForLogicalRep(subid, InvalidOid,
+ originname, sizeof(originname));
originid = replorigin_by_name(originname, false);
remote_lsn = replorigin_get_progress(originid, false);
* worker so passing missing_ok = true. This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY.
*/
- ReplicationOriginNameForTablesync(subid, relid, originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(subid, relid, originname,
+ sizeof(originname));
replorigin_drop_by_name(originname, true, false);
}
RemoveSubscriptionRel(subid, InvalidOid);
/* Remove the origin tracking if exists. */
- snprintf(originname, sizeof(originname), "pg_%u", subid);
+ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_drop_by_name(originname, true, false);
/*
*/
StartTransactionCommand();
- ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
/*
* Resetting the origin session removes the ownership of the slot.
* error while dropping we won't restart it to drop the
* origin. So passing missing_ok = true.
*/
- ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
- rstate->relid,
- originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+ rstate->relid,
+ originname,
+ sizeof(originname));
replorigin_drop_by_name(originname, true, false);
/*
relid, GetSystemIdentifier());
}
-/*
- * Form the origin name for tablesync.
- *
- * Return the name in the supplied buffer.
- */
-void
-ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
- char *originname, Size szorgname)
-{
- snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
-}
-
/*
* Start syncing the table in the sync worker.
*
MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
/* Assign the origin tracking record name. */
- ReplicationOriginNameForTablesync(MySubscription->oid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
{
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
+/*
+ * Form the origin name for the subscription.
+ *
+ * This is a common function for tablesync and other workers. Tablesync workers
+ * must pass a valid relid. Other callers must pass relid = InvalidOid.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+ char *originname, Size szoriginname)
+{
+ if (OidIsValid(relid))
+ {
+ /* Replication origin name for tablesync workers. */
+ snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
+ }
+ else
+ {
+ /* Replication origin name for non-tablesync workers. */
+ snprintf(originname, szoriginname, "pg_%u", suboid);
+ }
+}
+
/*
* Should this worker apply changes for given relation.
*
* Allocate the origin name in long-lived context for error context
* message.
*/
- ReplicationOriginNameForTablesync(MySubscription->oid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
+ ReplicationOriginNameForLogicalRep(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
originname);
}
/* Setup replication origin tracking. */
StartTransactionCommand();
- snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ originname, sizeof(originname));
originid = replorigin_by_name(originname, true);
if (!OidIsValid(originid))
originid = replorigin_create(originname);
extern int logicalrep_sync_worker_count(Oid subid);
-extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
- char *originname, Size szorgname);
+extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+ char *originname, Size szoriginname);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
extern bool AllTablesyncsReady(void);