Refactor pg_receivexlog main loop code, for readability.
authorFujii Masao <fujii@postgresql.org>
Fri, 4 Jul 2014 03:00:48 +0000 (12:00 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 4 Jul 2014 03:00:48 +0000 (12:00 +0900)
Previously the source codes for receiving the data and for
polling the socket were included in pg_receivexlog main loop.
This commit splits out them as separate functions. This is
useful for improving the readability of main loop code and
making the future pg_receivexlog-related patch simpler.

src/bin/pg_basebackup/receivelog.c

index d76e605e21e9746a0fcf50d74e7964e66544290e..4aa35da2fce359d4e735a00ed37cb744c69ff781 100644 (file)
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                 uint32 timeline, char *basedir,
               stream_stop_callback stream_stop, int standby_message_timeout,
                 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                         uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        int         bytes_written;
        int64       now;
        int         hdr_len;
-
-       if (copybuf != NULL)
-       {
-           PQfreemem(copybuf);
-           copybuf = NULL;
-       }
+       long        sleeptime;
 
        /*
         * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            last_status = now;
        }
 
-       r = PQgetCopyData(conn, &copybuf, 1);
-       if (r == 0)
+       /*
+        * Compute how long send/receive loops should sleep
+        */
+       if (standby_message_timeout && still_sending)
        {
-           /*
-            * No data available. Wait for some to appear, but not longer than
-            * the specified timeout, so that we can ping the server.
-            */
-           fd_set      input_mask;
-           struct timeval timeout;
-           struct timeval *timeoutptr;
-
-           FD_ZERO(&input_mask);
-           FD_SET(PQsocket(conn), &input_mask);
-           if (standby_message_timeout && still_sending)
+           int64       targettime;
+           long        secs;
+           int         usecs;
+
+           targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+           feTimestampDifference(now,
+                                 targettime,
+                                 &secs,
+                                 &usecs);
+           /* Always sleep at least 1 sec */
+           if (secs <= 0)
            {
-               int64       targettime;
-               long        secs;
-               int         usecs;
-
-               targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-               feTimestampDifference(now,
-                                     targettime,
-                                     &secs,
-                                     &usecs);
-               if (secs <= 0)
-                   timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-               else
-                   timeout.tv_sec = secs;
-               timeout.tv_usec = usecs;
-               timeoutptr = &timeout;
+               secs = 1;
+               usecs = 0;
            }
-           else
-               timeoutptr = NULL;
 
-           r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-           if (r == 0 || (r < 0 && errno == EINTR))
-           {
-               /*
-                * Got a timeout or signal. Continue the loop and either
-                * deliver a status packet to the server or just go back into
-                * blocking.
-                */
-               continue;
-           }
-           else if (r < 0)
-           {
-               fprintf(stderr, _("%s: select() failed: %s\n"),
-                       progname, strerror(errno));
-               goto error;
-           }
-           /* Else there is actually data on the socket */
-           if (PQconsumeInput(conn) == 0)
-           {
-               fprintf(stderr,
-                       _("%s: could not receive data from WAL stream: %s"),
-                       progname, PQerrorMessage(conn));
-               goto error;
-           }
-           continue;
+           sleeptime = secs * 1000 + usecs / 1000;
        }
+       else
+           sleeptime = -1;
+
+       r = CopyStreamReceive(conn, sleeptime, &copybuf);
+       if (r == 0)
+           continue;
        if (r == -1)
+           goto error;
+       if (r == -2)
        {
            PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            }
            if (copybuf != NULL)
                PQfreemem(copybuf);
+           copybuf = NULL;
            *stoppos = blockpos;
            return res;
        }
-       if (r == -2)
-       {
-           fprintf(stderr, _("%s: could not read COPY data: %s"),
-                   progname, PQerrorMessage(conn));
-           goto error;
-       }
 
        /* Check the message type. */
        if (copybuf[0] == 'k')
@@ -1056,3 +1019,115 @@ error:
        PQfreemem(copybuf);
    return NULL;
 }
+
+/*
+ * Wait until we can read CopyData message, or timeout.
+ *
+ * Returns 1 if data has become available for reading, 0 if timed out
+ * or interrupted by signal, and -1 on an error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+   int         ret;
+   fd_set      input_mask;
+   struct timeval timeout;
+   struct timeval *timeoutptr;
+
+   if (PQsocket(conn) < 0)
+   {
+       fprintf(stderr, _("%s: socket not open"), progname);
+       return -1;
+   }
+
+   FD_ZERO(&input_mask);
+   FD_SET(PQsocket(conn), &input_mask);
+
+   if (timeout_ms < 0)
+       timeoutptr = NULL;
+   else
+   {
+       timeout.tv_sec = timeout_ms / 1000L;
+       timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+       timeoutptr = &timeout;
+   }
+
+   ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+   if (ret == 0 || (ret < 0 && errno == EINTR))
+       return 0;       /* Got a timeout or signal */
+   else if (ret < 0)
+   {
+       fprintf(stderr, _("%s: select() failed: %s\n"),
+               progname, strerror(errno));
+       return -1;
+   }
+
+   return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next CopyStreamReceive call.
+ *
+ * 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+   static char    *copybuf = NULL;
+   int         rawlen;
+
+   if (copybuf != NULL)
+       PQfreemem(copybuf);
+   copybuf = NULL;
+   *buffer = NULL;
+
+   /* Try to receive a CopyData message */
+   rawlen = PQgetCopyData(conn, &copybuf, 1);
+   if (rawlen == 0)
+   {
+       /*
+        * No data available. Wait for some to appear, but not longer than
+        * the specified timeout, so that we can ping the server.
+        */
+       if (timeout > 0)
+       {
+           int     ret;
+
+           ret = CopyStreamPoll(conn, timeout);
+           if (ret <= 0)
+               return ret;
+       }
+
+       /* Else there is actually data on the socket */
+       if (PQconsumeInput(conn) == 0)
+       {
+           fprintf(stderr,
+                   _("%s: could not receive data from WAL stream: %s"),
+                   progname, PQerrorMessage(conn));
+           return -1;
+       }
+
+       /* Now that we've consumed some input, try again */
+       rawlen = PQgetCopyData(conn, &copybuf, 1);
+       if (rawlen == 0)
+           return 0;
+   }
+   if (rawlen == -1)           /* end-of-streaming or error */
+       return -2;
+   if (rawlen == -2)
+   {
+       fprintf(stderr, _("%s: could not read COPY data: %s"),
+               progname, PQerrorMessage(conn));
+       return -1;
+   }
+
+   /* Return received messages to caller */
+   *buffer = copybuf;
+   return rawlen;
+}