Use standard interrupt handling in logical replication launcher.
Previously the exit handling was only able to exit from within the main loop, and not from within the backend code it calls. Fix that by using the standard die() SIGTERM handler, and adding the necessary CHECK_FOR_INTERRUPTS() call. This requires adding yet another process-type-specific branch to ProcessInterrupts(), which hints that we probably should generalize that handling. But that's work for another day. Author: Petr Jelinek Reviewed-By: Andres Freund Discussion: https://postgr.es/m/fe072153-babd-3b5d-8052-73527a6eb657@2ndquadrant.com
This commit is contained in:
parent
5fd56b9f5b
commit
2c48f5db64
@ -81,7 +81,6 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
|
||||
|
||||
/* Flags set by signal handlers */
|
||||
static volatile sig_atomic_t got_SIGHUP = false;
|
||||
static volatile sig_atomic_t got_SIGTERM = false;
|
||||
|
||||
static bool on_commit_launcher_wakeup = false;
|
||||
|
||||
@ -634,20 +633,6 @@ logicalrep_worker_onexit(int code, Datum arg)
|
||||
ApplyLauncherWakeup();
|
||||
}
|
||||
|
||||
/* SIGTERM: set flag to exit at next convenient time */
|
||||
static void
|
||||
logicalrep_launcher_sigterm(SIGNAL_ARGS)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
got_SIGTERM = true;
|
||||
|
||||
/* Waken anything waiting on the process latch */
|
||||
SetLatch(MyLatch);
|
||||
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
/* SIGHUP: set flag to reload configuration at next convenient time */
|
||||
static void
|
||||
logicalrep_launcher_sighup(SIGNAL_ARGS)
|
||||
@ -809,13 +794,14 @@ ApplyLauncherMain(Datum main_arg)
|
||||
|
||||
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
|
||||
|
||||
Assert(LogicalRepCtx->launcher_pid == 0);
|
||||
LogicalRepCtx->launcher_pid = MyProcPid;
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGHUP, logicalrep_launcher_sighup);
|
||||
pqsignal(SIGTERM, logicalrep_launcher_sigterm);
|
||||
pqsignal(SIGTERM, die);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
LogicalRepCtx->launcher_pid = MyProcPid;
|
||||
|
||||
/*
|
||||
* Establish connection to nailed catalogs (we only ever access
|
||||
* pg_subscription).
|
||||
@ -823,7 +809,7 @@ ApplyLauncherMain(Datum main_arg)
|
||||
BackgroundWorkerInitializeConnection(NULL, NULL);
|
||||
|
||||
/* Enter main loop */
|
||||
while (!got_SIGTERM)
|
||||
for (;;)
|
||||
{
|
||||
int rc;
|
||||
List *sublist;
|
||||
@ -833,6 +819,8 @@ ApplyLauncherMain(Datum main_arg)
|
||||
TimestampTz now;
|
||||
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
/* Limit the start retry to once a wal_retrieve_retry_interval */
|
||||
@ -909,13 +897,16 @@ ApplyLauncherMain(Datum main_arg)
|
||||
}
|
||||
}
|
||||
|
||||
LogicalRepCtx->launcher_pid = 0;
|
||||
/* Not reachable */
|
||||
}
|
||||
|
||||
/* ... and if it returns, we're done */
|
||||
ereport(DEBUG1,
|
||||
(errmsg("logical replication launcher shutting down")));
|
||||
|
||||
proc_exit(0);
|
||||
/*
|
||||
* Is current process the logical replication launcher?
|
||||
*/
|
||||
bool
|
||||
IsLogicalLauncher(void)
|
||||
{
|
||||
return LogicalRepCtx->launcher_pid == MyProcPid;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -55,6 +55,7 @@
|
||||
#include "pg_getopt.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/logicalworker.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
@ -2848,6 +2849,14 @@ ProcessInterrupts(void)
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_ADMIN_SHUTDOWN),
|
||||
errmsg("terminating logical replication worker due to administrator command")));
|
||||
else if (IsLogicalLauncher())
|
||||
{
|
||||
ereport(DEBUG1,
|
||||
(errmsg("logical replication launcher shutting down")));
|
||||
|
||||
/* The logical replication launcher can be stopped at any time. */
|
||||
proc_exit(0);
|
||||
}
|
||||
else if (RecoveryConflictPending && RecoveryConflictRetryable)
|
||||
{
|
||||
pgstat_report_recovery_conflict(RecoveryConflictReason);
|
||||
|
@ -24,4 +24,6 @@ extern void ApplyLauncherShmemInit(void);
|
||||
extern void ApplyLauncherWakeupAtCommit(void);
|
||||
extern void AtEOXact_ApplyLauncher(bool isCommit);
|
||||
|
||||
extern bool IsLogicalLauncher(void);
|
||||
|
||||
#endif /* LOGICALLAUNCHER_H */
|
||||
|
Loading…
x
Reference in New Issue
Block a user