* Initialize replication slots, before there's a chance to remove
* required resources.
*/
- StartupReplicationSlots(checkPoint.redo);
+ StartupReplicationSlots();
/*
* Startup logical state, needs to be setup now so we have proper data
slot = MyReplicationSlot;
context = AllocSetContextCreate(CurrentMemoryContext,
- "Changeset Extraction Context",
+ "Logical Decoding Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
* needs to be run before we start crash recovery.
*/
void
-StartupReplicationSlots(XLogRecPtr checkPointRedo)
+StartupReplicationSlots(void)
{
DIR *replication_dir;
struct dirent *replication_de;
HeapTuple tuple;
Datum result;
- check_permissions();
-
- CheckSlotRequirements();
+ Assert(!MyReplicationSlot);
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
+ check_permissions();
+
+ CheckSlotRequirements();
+
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
Datum values[2];
bool nulls[2];
+ Assert(!MyReplicationSlot);
+
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
CheckLogicalDecodingRequirements();
- Assert(!MyReplicationSlot);
-
/*
- * Acquire a logical decoding slot, this will check for conflicting names.
+ * Acquire a logical decoding slot, this will check for conflicting
+ * names. Initially create it as ephemeral - that allows us to nicely
+ * handle errors during initialization because it'll get dropped if this
+ * transaction fails. We'll make it persistent at the end.
*/
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
else
{
CheckLogicalDecodingRequirements();
+ /*
+ * Initially create the slot as ephemeral - that allows us to nicely
+ * handle errors during initialization because it'll get dropped if
+ * this transaction fails. We'll make it persistent at the end.
+ */
ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
}
* If we're using a replication slot we reserve the xmin via that,
* otherwise via the walsender's PGXACT entry.
*
- * XXX: It might make sense to introduce ephemeral slots and always use
- * the slot mechanism.
+ * XXX: It might make sense to generalize the ephemeral slot concept and
+ * always use the slot mechanism to handle the feedback xmin.
*/
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
PhysicalReplicationSlotNewXmin(feedbackXmin);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
-extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);