Consistency improvements for slot and decoding code.
authorAndres Freund <andres@anarazel.de>
Thu, 12 Jun 2014 11:23:46 +0000 (13:23 +0200)
committerAndres Freund <andres@anarazel.de>
Thu, 12 Jun 2014 11:33:27 +0000 (13:33 +0200)
Change the order of checks in similar functions to be the same; remove
a parameter that's not needed anymore; rename a memory context and
expand a couple of comments.

Per review comments from Amit Kapila

src/backend/access/transam/xlog.c
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/replication/slot.h

index 9eca63cbbfa2e5c79673ca08e30599baf59451c0..3f92482b42d4b792c2b6b908a2b635dfc2a04766 100644 (file)
@@ -6270,7 +6270,7 @@ StartupXLOG(void)
     * Initialize replication slots, before there's a chance to remove
     * required resources.
     */
-   StartupReplicationSlots(checkPoint.redo);
+   StartupReplicationSlots();
 
    /*
     * Startup logical state, needs to be setup now so we have proper data
index b82580fbcdf7a5d3dbdbb95ad9cc4bf6e73fa691..9eb5cd5ee4dca95673049eee6e3e4ed3463050c3 100644 (file)
@@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options,
    slot = MyReplicationSlot;
 
    context = AllocSetContextCreate(CurrentMemoryContext,
-                                   "Changeset Extraction Context",
+                                   "Logical Decoding Context",
                                    ALLOCSET_DEFAULT_MINSIZE,
                                    ALLOCSET_DEFAULT_INITSIZE,
                                    ALLOCSET_DEFAULT_MAXSIZE);
index ee0c7c07a97362736993b38d6ac1279c74ba1d55..5671ac1d14fdb4368e71a27a3cca7203106d55a0 100644 (file)
@@ -829,7 +829,7 @@ CheckPointReplicationSlots(void)
  * needs to be run before we start crash recovery.
  */
 void
-StartupReplicationSlots(XLogRecPtr checkPointRedo)
+StartupReplicationSlots(void)
 {
    DIR        *replication_dir;
    struct dirent *replication_de;
index dc94f504ee27692ae996bc72a0a789d0c6ce5ccc..bd4701f97dfa04e2b8ade5c1c16363c31aa76544 100644 (file)
@@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
    HeapTuple   tuple;
    Datum       result;
 
-   check_permissions();
-
-   CheckSlotRequirements();
+   Assert(!MyReplicationSlot);
 
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");
 
+   check_permissions();
+
+   CheckSlotRequirements();
+
    /* acquire replication slot, this will check for conflicting names */
    ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
 
@@ -87,6 +89,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
    Datum       values[2];
    bool        nulls[2];
 
+   Assert(!MyReplicationSlot);
+
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");
 
@@ -94,10 +98,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
    CheckLogicalDecodingRequirements();
 
-   Assert(!MyReplicationSlot);
-
    /*
-    * Acquire a logical decoding slot, this will check for conflicting names.
+    * Acquire a logical decoding slot, this will check for conflicting
+    * names. Initially create it as ephemeral - that allows us to nicely
+    * handle errors during initialization because it'll get dropped if this
+    * transaction fails. We'll make it persistent at the end.
     */
    ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
 
index 088ee2c0976e2091b8b47aaf4279bded2a379a61..318979342ebb593514ec6aac0ffbdc0b25813f6a 100644 (file)
@@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
    else
    {
        CheckLogicalDecodingRequirements();
+       /*
+        * Initially create the slot as ephemeral - that allows us to nicely
+        * handle errors during initialization because it'll get dropped if
+        * this transaction fails. We'll make it persistent at the end.
+        */
        ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
    }
 
@@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void)
     * If we're using a replication slot we reserve the xmin via that,
     * otherwise via the walsender's PGXACT entry.
     *
-    * XXX: It might make sense to introduce ephemeral slots and always use
-    * the slot mechanism.
+    * XXX: It might make sense to generalize the ephemeral slot concept and
+    * always use the slot mechanism to handle the feedback xmin.
     */
    if (MyReplicationSlot != NULL)      /* XXX: persistency configurable? */
        PhysicalReplicationSlotNewXmin(feedbackXmin);
index 341e829bbb33945af5eb5c6460af302c25a31dc5..c129a4a7718e8ee5af55afa14179f2994533bc80 100644 (file)
@@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 
-extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);