Add -F option to pg_receivexlog, for specifying fsync interval.
authorFujii Masao <fujii@postgresql.org>
Fri, 8 Aug 2014 07:50:54 +0000 (16:50 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 8 Aug 2014 07:50:54 +0000 (16:50 +0900)
This allows us to specify the maximum time to issue fsync to ensure
the received WAL file is safely flushed to disk. Without this,
pg_receivexlog always flushes WAL file only when it's closed and
which can cause WAL data to be lost at the event of a crash.

Furuya Osamu, heavily modified by me.

doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index 7c50b01a57b982a28136fca8fa853fa054c36e19..c15776fc5852824d3550c79e7dd5e8fec9c9d104 100644 (file)
@@ -105,6 +105,21 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+        Specifies the maximum time to issue sync commands to ensure the
+        received WAL file is safely flushed to disk, in seconds. The default
+        value is zero, which disables issuing fsyncs except when WAL file is
+        closed. If <literal>-1</literal> is specified, WAL file is flushed as
+        soon as possible, that is, as soon as there are WAL data which has
+        not been flushed yet.
+        </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry>
       <term><option>-v</option></term>
       <term><option>--verbose</option></term>
index 5df2eb8c0db7c01827bbdb405eb82e8978d3c1c3..0b02c4c4014b59a6e09fcd6c121caea7934a64de 100644 (file)
@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
    if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                           param->sysidentifier, param->xlogdir,
                           reached_end_position, standby_message_timeout,
-                          NULL))
+                          NULL, 0))
 
        /*
         * Any errors will already have been reported in the function process,
index 96408389062cd3d3846d841a0d30bf709cf34cd7..0b7af54a7baff7fa935ac1e0c03d3a8946d59a46 100644 (file)
@@ -36,6 +36,7 @@ static char *basedir = NULL;
 static int verbose = 0;
 static int noloop = 0;
 static int standby_message_timeout = 10 * 1000;        /* 10 sec = default */
+static int fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
 
 
@@ -62,6 +63,8 @@ usage(void)
    printf(_("\nOptions:\n"));
    printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
    printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+   printf(_("  -F  --fsync-interval=INTERVAL\n"
+            "                         frequency of syncs to transaction log files (in seconds)\n"));
    printf(_("  -v, --verbose          output verbose messages\n"));
    printf(_("  -V, --version          output version information, then exit\n"));
    printf(_("  -?, --help             show this help, then exit\n"));
@@ -330,7 +333,8 @@ StreamLog(void)
                starttli);
 
    ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-                     stop_streaming, standby_message_timeout, ".partial");
+                     stop_streaming, standby_message_timeout, ".partial",
+                     fsync_interval);
 
    PQfinish(conn);
 }
@@ -360,6 +364,7 @@ main(int argc, char **argv)
        {"port", required_argument, NULL, 'p'},
        {"username", required_argument, NULL, 'U'},
        {"no-loop", no_argument, NULL, 'n'},
+       {"fsync-interval", required_argument, NULL, 'F'},
        {"no-password", no_argument, NULL, 'w'},
        {"password", no_argument, NULL, 'W'},
        {"status-interval", required_argument, NULL, 's'},
@@ -389,7 +394,7 @@ main(int argc, char **argv)
        }
    }
 
-   while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+   while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
                            long_options, &option_index)) != -1)
    {
        switch (c)
@@ -436,6 +441,15 @@ main(int argc, char **argv)
            case 'n':
                noloop = 1;
                break;
+       case 'F':
+           fsync_interval = atoi(optarg) * 1000;
+           if (fsync_interval < -1000)
+           {
+               fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+                       progname, optarg);
+               exit(1);
+           }
+           break;
            case 'v':
                verbose++;
                break;
index d28e13b4d8c9f3b9433aee886e2c8596a12ffa29..89b22f20e2a3b373adeb24b26f32a60876125bc4 100644 (file)
@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static int64 last_fsync = -1;      /* timestamp of last WAL file flush */
 static bool still_sending = true;      /* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                 uint32 timeline, char *basedir,
               stream_stop_callback stream_stop, int standby_message_timeout,
-                char *partial_suffix, XLogRecPtr *stoppos);
+                 char *partial_suffix, XLogRecPtr *stoppos,
+                 int fsync_interval);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
                                       XLogRecPtr blockpos, char *basedir, char *partial_suffix,
                                       XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+                               uint32 timeline, char *basedir,
+                               stream_stop_callback stream_stop,
+                               char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+                                        int64 last_status, int fsync_interval,
+                                        XLogRecPtr blockpos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                         uint32 *timeline);
@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
                progname, current_walfile_name, partial_suffix);
 
    lastFlushPosition = pos;
+   last_fsync = feGetCurrentTimestamp();
    return true;
 }
 
@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
  * allows you to tell the difference between partial and completed files,
  * so that you can continue later where you left.
  *
+ * fsync_interval controls how often we flush to the received WAL file,
+ * in milliseconds.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                  char *sysidentifier, char *basedir,
                  stream_stop_callback stream_stop,
-                 int standby_message_timeout, char *partial_suffix)
+                 int standby_message_timeout, char *partial_suffix,
+                 int fsync_interval)
 {
    char        query[128];
    char        slotcmd[128];
@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        /* Stream the WAL */
        res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                               standby_message_timeout, partial_suffix,
-                              &stoppos);
+                              &stoppos, fsync_interval);
        if (res == NULL)
            goto error;
 
@@ -746,7 +760,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 char *basedir, stream_stop_callback stream_stop,
                 int standby_message_timeout, char *partial_suffix,
-                XLogRecPtr *stoppos)
+                XLogRecPtr *stoppos, int fsync_interval)
 {
    char       *copybuf = NULL;
    int64       last_status = -1;
@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        /*
         * Check if we should continue streaming, or abort at this point.
         */
-       if (still_sending && stream_stop(blockpos, timeline, false))
+       if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+                               stream_stop, partial_suffix, stoppos))
+           goto error;
+
+       now = feGetCurrentTimestamp();
+
+       /*
+        * If fsync_interval has elapsed since last WAL flush and we've written
+        * some WAL data, flush them to disk.
+        */
+       if (lastFlushPosition < blockpos &&
+           walfile != -1 &&
+           ((fsync_interval > 0 &&
+             feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+            fsync_interval < 0))
        {
-           if (!close_walfile(basedir, partial_suffix, blockpos))
-           {
-               /* Potential error message is written by close_walfile */
-               goto error;
-           }
-           if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+           if (fsync(walfile) != 0)
            {
-               fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-                       progname, PQerrorMessage(conn));
+               fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+                       progname, current_walfile_name, strerror(errno));
                goto error;
            }
-           still_sending = false;
+
+           lastFlushPosition = blockpos;
+           last_fsync = now;
        }
 
        /*
         * Potentially send a status message to the master
         */
-       now = feGetCurrentTimestamp();
        if (still_sending && standby_message_timeout > 0 &&
            feTimestampDifferenceExceeds(last_status, now,
                                         standby_message_timeout))
@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        }
 
        /*
-        * Compute how long send/receive loops should sleep
+        * Calculate how long send/receive loops should sleep
         */
-       if (standby_message_timeout && still_sending)
-       {
-           int64       targettime;
-           long        secs;
-           int         usecs;
-
-           targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-           feTimestampDifference(now,
-                                 targettime,
-                                 &secs,
-                                 &usecs);
-           /* Always sleep at least 1 sec */
-           if (secs <= 0)
-           {
-               secs = 1;
-               usecs = 0;
-           }
-
-           sleeptime = secs * 1000 + usecs / 1000;
-       }
-       else
-           sleeptime = -1;
+       sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+                                                last_status, fsync_interval, blockpos);
 
        r = CopyStreamReceive(conn, sleeptime, &copybuf);
-       if (r == 0)
-           continue;
-       if (r == -1)
-           goto error;
-       if (r == -2)
+       while (r != 0)
        {
-           PGresult    *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-                                                    basedir, partial_suffix, stoppos);
-           if (res == NULL)
+           if (r == -1)
                goto error;
-           else
-               return res;
-       }
+           if (r == -2)
+           {
+               PGresult    *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+                                                        basedir, partial_suffix, stoppos);
+               if (res == NULL)
+                   goto error;
+               else
+                   return res;
+           }
 
-       /* Check the message type. */
-       if (copybuf[0] == 'k')
-       {
-           if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
-                                    &last_status))
-               goto error;
-       }
-       else if (copybuf[0] == 'w')
-       {
-           if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-                                   timeline, basedir, stream_stop, partial_suffix))
+           /* Check the message type. */
+           if (copybuf[0] == 'k')
+           {
+               if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+                                        &last_status))
+                   goto error;
+           }
+           else if (copybuf[0] == 'w')
+           {
+               if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+                                       timeline, basedir, stream_stop, partial_suffix))
+                   goto error;
+
+               /*
+                * Check if we should continue streaming, or abort at this point.
+                */
+               if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+                                        stream_stop, partial_suffix, stoppos))
+                   goto error;
+           }
+           else
+           {
+               fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                       progname, copybuf[0]);
                goto error;
-       }
-       else
-       {
-           fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-                   progname, copybuf[0]);
-           goto error;
+           }
+
+           /*
+            * Process the received data, and any subsequent data we
+            * can read without blocking.
+            */
+           r = CopyStreamReceive(conn, 0, &copybuf);
        }
    }
 
@@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
    *stoppos = blockpos;
    return res;
 }
+
+/*
+ * Check if we should continue streaming, or abort at this point.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+                   char *basedir, stream_stop_callback stream_stop,
+                   char *partial_suffix, XLogRecPtr *stoppos)
+{
+   if (still_sending && stream_stop(blockpos, timeline, false))
+   {
+       if (!close_walfile(basedir, partial_suffix, blockpos))
+       {
+           /* Potential error message is written by close_walfile */
+           return false;
+       }
+       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+       {
+           fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                   progname, PQerrorMessage(conn));
+           return false;
+       }
+       still_sending = false;
+   }
+
+   return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+                            int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+   int64       targettime = 0;
+   int64       status_targettime = 0;
+   int64       fsync_targettime = 0;
+   long        sleeptime;
+
+   if (standby_message_timeout && still_sending)
+       status_targettime = last_status +
+           (standby_message_timeout - 1) * ((int64) 1000);
+
+   if (fsync_interval > 0 && lastFlushPosition < blockpos)
+       fsync_targettime = last_fsync +
+           (fsync_interval - 1) * ((int64) 1000);
+
+   if ((status_targettime < fsync_targettime && status_targettime > 0) ||
+       fsync_targettime == 0)
+       targettime = status_targettime;
+   else
+       targettime = fsync_targettime;
+
+   if (targettime > 0)
+   {
+       long        secs;
+       int         usecs;
+
+       feTimestampDifference(now,
+                             targettime,
+                             &secs,
+                             &usecs);
+       /* Always sleep at least 1 sec */
+       if (secs <= 0)
+       {
+           secs = 1;
+           usecs = 0;
+       }
+
+       sleeptime = secs * 1000 + usecs / 1000;
+   }
+   else
+       sleeptime = -1;
+
+   return sleeptime;
+}
index f4789a580ae75df4b97fe40be89ec3fe4817ba70..72f82453733463400e9d0e6364ea8d9e3815c850 100644 (file)
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
                  char *basedir,
                  stream_stop_callback stream_stop,
                  int standby_message_timeout,
-                 char *partial_suffix);
+                 char *partial_suffix,
+                 int fsync_interval);