Use latch instead of select() in walreceiver
author: Peter Eisentraut <peter_e@gmx.net>
Wed, 30 Nov 2016 17:00:00 +0000 (12:00 -0500)
committer: Peter Eisentraut <peter_e@gmx.net>
Fri, 2 Dec 2016 01:23:28 +0000 (20:23 -0500)
Replace the use of poll()/select() with WaitLatchOrSocket(), which is more
portable and flexible.

Also change walreceiver to use its procLatch instead of a custom latch.

From: Petr Jelinek <petr@2ndquadrant.com>

src/backend/postmaster/pgstat.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/include/pgstat.h
src/include/replication/walreceiver.h

index a3921977c572d0d96aea464eed2188ccdc18426e..c7584cb1d3492e89684bf986c163e926a86d3bae 100644 (file)
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
        case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
            event_name = "WalReceiverWaitStart";
            break;
+       case WAIT_EVENT_LIBPQWALRECEIVER_READ:
+           event_name = "LibPQWalReceiverRead";
+           break;
        case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
            event_name = "WalSenderWaitForWAL";
            break;
index f1c843e868c08bedf5a520df3052564f9c7fae15..6c01e7b991853a3388f11a36db605cb2abbb61eb 100644 (file)
 #include "pqexpbuffer.h"
 #include "access/xlog.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/walreceiver.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
 PG_MODULE_MAGIC;
 
 void       _PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
 static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
    PQclear(res);
 }
 
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
-   int         ret;
-
-   Assert(streamConn != NULL);
-   if (PQsocket(streamConn) < 0)
-       ereport(ERROR,
-               (errcode_for_socket_access(),
-                errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
-   /* We use poll(2) if available, otherwise select(2) */
-   {
-#ifdef HAVE_POLL
-       struct pollfd input_fd;
-
-       input_fd.fd = PQsocket(streamConn);
-       input_fd.events = POLLIN | POLLERR;
-       input_fd.revents = 0;
-
-       ret = poll(&input_fd, 1, timeout_ms);
-#else                          /* !HAVE_POLL */
-
-       fd_set      input_mask;
-       struct timeval timeout;
-       struct timeval *ptr_timeout;
-
-       FD_ZERO(&input_mask);
-       FD_SET(PQsocket(streamConn), &input_mask);
-
-       if (timeout_ms < 0)
-           ptr_timeout = NULL;
-       else
-       {
-           timeout.tv_sec = timeout_ms / 1000;
-           timeout.tv_usec = (timeout_ms % 1000) * 1000;
-           ptr_timeout = &timeout;
-       }
-
-       ret = select(PQsocket(streamConn) + 1, &input_mask,
-                    NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-   }
-
-   if (ret == 0 || (ret < 0 && errno == EINTR))
-       return false;
-   if (ret < 0)
-       ereport(ERROR,
-               (errcode_for_socket_access(),
-                errmsg("select() failed: %m")));
-   return true;
-}
-
 /*
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
         */
        while (PQisBusy(streamConn))
        {
+           int         rc;
+
            /*
             * We don't need to break down the sleep into smaller increments,
-            * and check for interrupts after each nap, since we can just
-            * elog(FATAL) within SIGTERM signal handler if the signal arrives
-            * in the middle of establishment of replication connection.
+            * since we'll get interrupted by signals and can either handle
+            * interrupts here or elog(FATAL) within SIGTERM signal handler if
+            * the signal arrives in the middle of establishment of
+            * replication connection.
             */
-           if (!libpq_select(-1))
-               continue;       /* interrupted */
+           ResetLatch(&MyProc->procLatch);
+           rc = WaitLatchOrSocket(&MyProc->procLatch,
+                                  WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+                                  WL_LATCH_SET,
+                                  PQsocket(streamConn),
+                                  0,
+                                  WAIT_EVENT_LIBPQWALRECEIVER_READ);
+           if (rc & WL_POSTMASTER_DEATH)
+               exit(1);
+
+           /* interrupted */
+           if (rc & WL_LATCH_SET)
+           {
+               CHECK_FOR_INTERRUPTS();
+               continue;
+           }
            if (PQconsumeInput(streamConn) == 0)
                return NULL;    /* trouble */
        }
index 2bb3dce1b1c5bab7ac7d1e1aeb59e6e50b7bc7a3..8bfb041560823f2c3ca11ba09b42b7ece190c09f 100644 (file)
@@ -261,7 +261,7 @@ WalReceiverMain(void)
    /* Arrange to clean up at walreceiver exit */
    on_shmem_exit(WalRcvDie, 0);
 
-   OwnLatch(&walrcv->latch);
+   walrcv->latch = &MyProc->procLatch;
 
    /* Properly accept or ignore signals the postmaster might send us */
    pqsignal(SIGHUP, WalRcvSigHupHandler);      /* set flag to read config
@@ -483,7 +483,7 @@ WalReceiverMain(void)
                 * avoiding some system calls.
                 */
                Assert(wait_fd != PGINVALID_SOCKET);
-               rc = WaitLatchOrSocket(&walrcv->latch,
+               rc = WaitLatchOrSocket(walrcv->latch,
                                   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
                                       WL_TIMEOUT | WL_LATCH_SET,
                                       wait_fd,
@@ -491,7 +491,7 @@ WalReceiverMain(void)
                                       WAIT_EVENT_WAL_RECEIVER_MAIN);
                if (rc & WL_LATCH_SET)
                {
-                   ResetLatch(&walrcv->latch);
+                   ResetLatch(walrcv->latch);
                    if (walrcv->force_reply)
                    {
                        /*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
    WakeupRecovery();
    for (;;)
    {
-       ResetLatch(&walrcv->latch);
+       ResetLatch(walrcv->latch);
 
        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
        }
        SpinLockRelease(&walrcv->mutex);
 
-       WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+       WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
                  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
    }
 
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
    /* Ensure that all WAL records received are flushed to disk */
    XLogWalRcvFlush(true);
 
-   DisownLatch(&walrcv->latch);
+   walrcv->latch = NULL;
 
    SpinLockAcquire(&walrcv->mutex);
    Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 
    got_SIGTERM = true;
 
-   SetLatch(&WalRcv->latch);
+   if (WalRcv->latch)
+       SetLatch(WalRcv->latch);
 
    /* Don't joggle the elbow of proc_exit */
    if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
 WalRcvForceReply(void)
 {
    WalRcv->force_reply = true;
-   SetLatch(&WalRcv->latch);
+   if (WalRcv->latch)
+       SetLatch(WalRcv->latch);
 }
 
 /*
index 5f6e423f1f63c74d53ed7607849a9bf81ac6eae9..01111a4c12bcd8f3ddde0ebe47edeab4ab3c304c 100644 (file)
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
        MemSet(WalRcv, 0, WalRcvShmemSize());
        WalRcv->walRcvState = WALRCV_STOPPED;
        SpinLockInit(&WalRcv->mutex);
-       InitSharedLatch(&WalRcv->latch);
+       WalRcv->latch = NULL;
    }
 }
 
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
    if (launch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-   else
-       SetLatch(&walrcv->latch);
+   else if (walrcv->latch)
+       SetLatch(walrcv->latch);
 }
 
 /*
index 0b85b7ad3ae9975eb20e6803c86c498e037ff0de..152ff0620852133323edda179e17539893fbe539 100644 (file)
@@ -763,6 +763,7 @@ typedef enum
    WAIT_EVENT_CLIENT_WRITE,
    WAIT_EVENT_SSL_OPEN_SERVER,
    WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+   WAIT_EVENT_LIBPQWALRECEIVER_READ,
    WAIT_EVENT_WAL_SENDER_WAIT_WAL,
    WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
index cd787c92b3fc9eb2fb8de62d1199ceb64cc6c1c1..afbb8d8b9541e450d08702834406cc7b835be80c 100644 (file)
@@ -127,8 +127,9 @@ typedef struct
     * where to start streaming (after setting receiveStart and
     * receiveStartTLI), and also to tell it to send apply feedback to the
     * primary whenever specially marked commit records are applied.
+    * This is normally mapped to procLatch when walreceiver is running.
     */
-   Latch       latch;
+   Latch      *latch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;