Add WL_SOCKET_CLOSED for socket shutdown events.
authorThomas Munro <tmunro@postgresql.org>
Mon, 14 Feb 2022 03:29:28 +0000 (16:29 +1300)
committerThomas Munro <tmunro@postgresql.org>
Mon, 14 Feb 2022 03:52:23 +0000 (16:52 +1300)
Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds using POLLRDHUP, if available
* WAIT_USE_EPOLL builds using EPOLLRDHUP
* WAIT_USE_KQUEUE builds using EV_EOF

Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyutinma@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru

src/backend/storage/ipc/latch.c
src/include/storage/latch.h

index 5bb609b368d5751ccf8af88416b8a92295787fd0..c3aaa8bff03cba066f5149e57431adb130aa14be 100644 (file)
@@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *  can be combined with other WL_SOCKET_* events (on non-Windows
  *  platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
    else
    {
        Assert(event->fd != PGINVALID_SOCKET);
-       Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+       Assert(event->events & (WL_SOCKET_READABLE |
+                               WL_SOCKET_WRITEABLE |
+                               WL_SOCKET_CLOSED));
 
        if (event->events & WL_SOCKET_READABLE)
            epoll_ev.events |= EPOLLIN;
        if (event->events & WL_SOCKET_WRITEABLE)
            epoll_ev.events |= EPOLLOUT;
+       if (event->events & WL_SOCKET_CLOSED)
+           epoll_ev.events |= EPOLLRDHUP;
    }
 
    /*
@@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
    }
    else
    {
-       Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+       Assert(event->events & (WL_SOCKET_READABLE |
+                               WL_SOCKET_WRITEABLE |
+                               WL_SOCKET_CLOSED));
        pollfd->events = 0;
        if (event->events & WL_SOCKET_READABLE)
            pollfd->events |= POLLIN;
        if (event->events & WL_SOCKET_WRITEABLE)
            pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+       if (event->events & WL_SOCKET_CLOSED)
+           pollfd->events |= POLLRDHUP;
+#endif
    }
 
    Assert(event->fd != PGINVALID_SOCKET);
@@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
    Assert(event->events != WL_LATCH_SET || set->latch != NULL);
    Assert(event->events == WL_LATCH_SET ||
           event->events == WL_POSTMASTER_DEATH ||
-          (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+          (event->events & (WL_SOCKET_READABLE |
+                            WL_SOCKET_WRITEABLE |
+                            WL_SOCKET_CLOSED)));
 
    if (event->events == WL_POSTMASTER_DEATH)
    {
@@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
         * old event mask to the new event mask, since kevent treats readable
         * and writable as separate events.
         */
-       if (old_events & WL_SOCKET_READABLE)
+       if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
            old_filt_read = true;
-       if (event->events & WL_SOCKET_READABLE)
+       if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
            new_filt_read = true;
        if (old_events & WL_SOCKET_WRITEABLE)
            old_filt_write = true;
@@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
                                     event);
    }
 
-   Assert(count > 0);
+   /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
+   if (count == 0)
+       return;
+
    Assert(count <= 2);
 
    rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                returned_events++;
            }
        }
-       else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+       else if (cur_event->events & (WL_SOCKET_READABLE |
+                                     WL_SOCKET_WRITEABLE |
+                                     WL_SOCKET_CLOSED))
        {
            Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                occurred_events->events |= WL_SOCKET_WRITEABLE;
            }
 
+           if ((cur_event->events & WL_SOCKET_CLOSED) &&
+               (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+           {
+               /* remote peer shut down, or error */
+               occurred_events->events |= WL_SOCKET_CLOSED;
+           }
+
            if (occurred_events->events != 0)
            {
                occurred_events->fd = cur_event->fd;
@@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
            occurred_events++;
            returned_events++;
        }
-       else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+       else if (cur_event->events & (WL_SOCKET_READABLE |
+                                     WL_SOCKET_WRITEABLE |
+                                     WL_SOCKET_CLOSED))
        {
            Assert(cur_event->fd >= 0);
 
@@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                occurred_events->events |= WL_SOCKET_READABLE;
            }
 
+           if ((cur_event->events & WL_SOCKET_CLOSED) &&
+               (cur_kqueue_event->filter == EVFILT_READ) &&
+               (cur_kqueue_event->flags & EV_EOF))
+           {
+               /* the remote peer has shut down */
+               occurred_events->events |= WL_SOCKET_CLOSED;
+           }
+
            if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
                (cur_kqueue_event->filter == EVFILT_WRITE))
            {
@@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                returned_events++;
            }
        }
-       else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+       else if (cur_event->events & (WL_SOCKET_READABLE |
+                                     WL_SOCKET_WRITEABLE |
+                                     WL_SOCKET_CLOSED))
        {
            int         errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                occurred_events->events |= WL_SOCKET_WRITEABLE;
            }
 
+#ifdef POLLRDHUP
+           if ((cur_event->events & WL_SOCKET_CLOSED) &&
+               (cur_pollfd->revents & (POLLRDHUP | errflags)))
+           {
+               /* remote peer closed, or error */
+               occurred_events->events |= WL_SOCKET_CLOSED;
+           }
+#endif
+
            if (occurred_events->events != 0)
            {
                occurred_events->fd = cur_event->fd;
@@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+   defined(WAIT_USE_EPOLL) || \
+   defined(WAIT_USE_KQUEUE)
+   return true;
+#else
+   return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
index 3aa7b338343e3bdf4f2c84c07b9f61bd9beda16e..0dd79d73fa2e18a8d8ee396ff7ad563ee9d45e5d 100644 (file)
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED    (1 << 7)
 #define WL_SOCKET_MASK     (WL_SOCKET_READABLE | \
                             WL_SOCKET_WRITEABLE | \
-                            WL_SOCKET_CONNECTED)
+                            WL_SOCKET_CONNECTED | \
+                            WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int  WaitLatchOrSocket(Latch *latch, int wakeEvents,
                              pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool    WaitEventSetCanReportClosed(void);
 
 #endif                         /* LATCH_H */