diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index cb22174270..16a0bee610 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) if (!anyone_alive) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1, + WaitLatch(MyLatch, WL_LATCH_SET, -1, WAIT_EVENT_PARALLEL_FINISH); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } if (pcxt->toc != NULL) diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 96939327c3..8fbc03819d 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len) if (result != SHM_MQ_WOULD_BLOCK) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0, + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_PUT_MESSAGE); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index c3454276bf..712d700481 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) if (status == BGWH_STOPPED) break; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) break; } - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return status; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 726d1b5bd8..89c34b8225 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ? WL_SOCKET_READABLE : WL_SOCKET_WRITEABLE); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_LATCH_SET | io_flag, PQsocket(conn->streamConn), @@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, /* Interrupted? */ if (rc & WL_LATCH_SET) { - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } @@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * the signal arrives in the middle of establishment of * replication connection. */ - ResetLatch(&MyProc->procLatch); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_LATCH_SET, PQsocket(streamConn), 0, WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout? */ if (rc & WL_POSTMASTER_DEATH) exit(1); - /* interrupted */ + /* Interrupted? */ if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); - continue; } if (PQconsumeInput(streamConn) == 0) return NULL; /* trouble */ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5aaf24bfe4..5a3274b2c2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); + /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(MyLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } return; @@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); - CHECK_FOR_INTERRUPTS(); - /* Wait for signal. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); @@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } /* Check worker status. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) CHECK_FOR_INTERRUPTS(); /* Wait for more work. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } } @@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg) } /* Wait for more work. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); @@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); } - - ResetLatch(&MyProc->procLatch); } LogicalRepCtx->launcher_pid = 0; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ed66602935..6e55d2d606 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) if (!worker) return false; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); @@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return false; @@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state) if (MyLogicalRepWorker->relstate == expected_state) return true; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); @@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return false; @@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); @@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return bytesread; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 51a64487cd..999d627c87 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, NAPTIME_PER_CYCLE, @@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; @@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); } - - ResetLatch(&MyProc->procLatch); } } diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 5afb21121b..b4b7d28dd5 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) { cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1); AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET, - &MyProc->procLatch, NULL); + MyLatch, NULL); } /* * Reset my latch before adding myself to the queue and before entering * the caller's predicate loop. */ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* Add myself to the wait queue. */ SpinLockAcquire(&cv->mutex); @@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info); /* Reset latch before testing whether we can return. */ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* * If this process has been taken out of the wait list, then we know diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index 9abfc714a9..553baf0045 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + CHECK_FOR_INTERRUPTS(); + /* * In case of a SIGHUP, just reload the configuration. */