diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 5bb609b368..c3aaa8bff0 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set) * - WL_SOCKET_CONNECTED: Wait for socket connection to be established, * can be combined with other WL_SOCKET_* events (on non-Windows * platforms, this is the same as WL_SOCKET_WRITEABLE) + * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer. * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies * * Returns the offset in WaitEventSet->events (starting from 0), which can be @@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) else { Assert(event->fd != PGINVALID_SOCKET); - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); if (event->events & WL_SOCKET_READABLE) epoll_ev.events |= EPOLLIN; if (event->events & WL_SOCKET_WRITEABLE) epoll_ev.events |= EPOLLOUT; + if (event->events & WL_SOCKET_CLOSED) + epoll_ev.events |= EPOLLRDHUP; } /* @@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) } else { - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); pollfd->events = 0; if (event->events & WL_SOCKET_READABLE) pollfd->events |= POLLIN; if (event->events & WL_SOCKET_WRITEABLE) pollfd->events |= POLLOUT; +#ifdef POLLRDHUP + if (event->events & WL_SOCKET_CLOSED) + pollfd->events |= POLLRDHUP; +#endif } Assert(event->fd != PGINVALID_SOCKET); @@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) Assert(event->events != WL_LATCH_SET || set->latch != NULL); Assert(event->events == WL_LATCH_SET || event->events == WL_POSTMASTER_DEATH || - (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))); + (event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED))); if (event->events == WL_POSTMASTER_DEATH) { @@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) * old event mask to the new event mask, since kevent treats readable * and writable as separate events. */ - if (old_events & WL_SOCKET_READABLE) + if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) old_filt_read = true; - if (event->events & WL_SOCKET_READABLE) + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) new_filt_read = true; if (old_events & WL_SOCKET_WRITEABLE) old_filt_write = true; @@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) event); } - Assert(count > 0); + /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */ + if (count == 0) + return; + Assert(count <= 2); rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL); @@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd != PGINVALID_SOCKET); @@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP))) + { + /* remote peer shut down, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events++; returned_events++; } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd >= 0); @@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_READABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_kqueue_event->filter == EVFILT_READ) && + (cur_kqueue_event->flags & EV_EOF)) + { + /* the remote peer has shut down */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if ((cur_event->events & WL_SOCKET_WRITEABLE) && (cur_kqueue_event->filter == EVFILT_WRITE)) { @@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { int errflags = POLLHUP | POLLERR | POLLNVAL; @@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } +#ifdef POLLRDHUP + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_pollfd->revents & (POLLRDHUP | errflags))) + { + /* remote peer closed, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } +#endif + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, } #endif +/* + * Return whether the current build options can report WL_SOCKET_CLOSED. + */ +bool +WaitEventSetCanReportClosed(void) +{ +#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \ + defined(WAIT_USE_EPOLL) || \ + defined(WAIT_USE_KQUEUE) + return true; +#else + return false; +#endif +} + /* * Get the number of wait events registered in a given WaitEventSet. */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 3aa7b33834..0dd79d73fa 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -134,10 +134,11 @@ typedef struct Latch /* avoid having to deal with case on platforms not requiring it */ #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE #endif - +#define WL_SOCKET_CLOSED (1 << 7) #define WL_SOCKET_MASK (WL_SOCKET_READABLE | \ WL_SOCKET_WRITEABLE | \ - WL_SOCKET_CONNECTED) + WL_SOCKET_CONNECTED | \ + WL_SOCKET_CLOSED) typedef struct WaitEvent { @@ -180,5 +181,6 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); extern void InitializeLatchWaitSet(void); extern int GetNumRegisteredWaitEvents(WaitEventSet *set); +extern bool WaitEventSetCanReportClosed(void); #endif /* LATCH_H */