Rewrite comments in replication slot advance implementation
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 19 Jul 2018 18:15:44 +0000 (14:15 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 19 Jul 2018 18:15:44 +0000 (14:15 -0400)
The code added by 9c7d06d60680 was a bit obscure; clarify that by
rewriting the comments.  Lack of clarity has already caused bugs, so
it's a worthy goal.

Co-authored-by: Arseny Sher <a.sher@postgrespro.ru>
Co-authored-by: Michaël Paquier <michael@paquier.xyz>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Petr Jelínek <petr.jelinek@2ndquadrant.com>
Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad

src/backend/replication/logical/logical.c
src/backend/replication/slotfuncs.c

index c2d0e0c723f457a6c0cfc95b11208bcdaaa73102..3cd4eefb9bf776c8cfb06fa311e23d371011d76d 100644 (file)
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
  *     that, see below).
  *
  * output_plugin_options
- *     contains options passed to the output plugin.
+ *     options passed to the output plugin.
+ *
+ * fast_forward
+ *     bypass the generation of logical changes.
  *
  * read_page, prepare_write, do_write, update_progress
  *     callbacks that have to be filled to perform the use-case dependent,
index 6d7474a976c6c46a4d4379d8fd955616ddb187b9..8782bad4a2102b80daa9618d1d0afae833a95873 100644 (file)
@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 }
 
 /*
- * Helper function for advancing physical replication slot forward.
- * The LSN position to move to is compared simply to the slot's
- * restart_lsn, knowing that any position older than that would be
- * removed by successive checkpoints.
+ * Helper function for advancing our physical replication slot forward.
+ *
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
+ * knowing that any position older than that would be removed by successive
+ * checkpoints.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr moveto)
@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing logical replication slot forward.
+ * Helper function for advancing our logical replication slot forward.
+ *
  * The slot's restart_lsn is used as start point for reading records,
  * while confirmed_lsn is used as base point for the decoding context.
- * The LSN position to move to is checked by doing a per-record scan and
- * logical decoding which makes sure that confirmed_lsn is updated to a
- * LSN which allows the future slot consumer to get consistent logical
- * changes.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
    LogicalDecodingContext *ctx;
    ResourceOwner old_resowner = CurrentResourceOwner;
-   XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
-   XLogRecPtr  retlsn = MyReplicationSlot->data.confirmed_flush;
+   XLogRecPtr  startlsn;
+   XLogRecPtr  retlsn;
 
    PG_TRY();
    {
-       /* restart at slot's confirmed_flush */
+       /*
+        * Create our decoding context in fast_forward mode, passing start_lsn
+        * as InvalidXLogRecPtr, so that we start processing from my slot's
+        * confirmed_flush.
+        */
        ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                    NIL,
-                                   true,
+                                   true,   /* fast_forward */
                                    logical_read_local_xlog_page,
                                    NULL, NULL, NULL);
 
+       /*
+        * Start reading at the slot's restart_lsn, which we know to point to
+        * a valid record.
+        */
+       startlsn = MyReplicationSlot->data.restart_lsn;
+
+       /* Initialize our return value in case we don't do anything */
+       retlsn = MyReplicationSlot->data.confirmed_flush;
+
        /* invalidate non-timetravel entries */
        InvalidateSystemCaches();
 
-       /* Decode until we run out of records */
-       while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-              (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+       /* Decode at least one record, until we run out of records */
+       while ((!XLogRecPtrIsInvalid(startlsn) &&
+               startlsn < moveto) ||
+              (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
+               ctx->reader->EndRecPtr < moveto))
        {
-           XLogRecord *record;
            char       *errm = NULL;
+           XLogRecord *record;
 
+           /*
+            * Read records.  No changes are generated in fast_forward mode,
+            * but snapbuilder/slot statuses are updated properly.
+            */
            record = XLogReadRecord(ctx->reader, startlsn, &errm);
            if (errm)
                elog(ERROR, "%s", errm);
 
-           /*
-            * Now that we've set up the xlog reader state, subsequent calls
-            * pass InvalidXLogRecPtr to say "continue from last record"
-            */
+           /* Read sequentially from now on */
            startlsn = InvalidXLogRecPtr;
 
            /*
-            * The {begin_txn,change,commit_txn}_wrapper callbacks above will
-            * store the description into our tuplestore.
+            * Process the record.  Storage-level changes are ignored in
+            * fast_forward mode, but other modules (such as snapbuilder)
+            * might still have critical updates to do.
             */
-           if (record != NULL)
+           if (record)
                LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-           /* Stop once the moving point wanted by caller has been reached */
+           /* Stop once the requested target has been reached */
            if (moveto <= ctx->reader->EndRecPtr)
                break;
 
@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
            LogicalConfirmReceivedLocation(moveto);
 
            /*
-            * If only the confirmed_flush_lsn has changed the slot won't get
+            * If only the confirmed_flush LSN has changed the slot won't get
             * marked as dirty by the above. Callers on the walsender
             * interface are expected to keep track of their own progress and
             * don't need it written out. But SQL-interface users cannot