diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5a3274b2c2..15dac00ffa 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -81,7 +81,6 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker); /* Flags set by signal handlers */ static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t got_SIGTERM = false; static bool on_commit_launcher_wakeup = false; @@ -634,20 +633,6 @@ logicalrep_worker_onexit(int code, Datum arg) ApplyLauncherWakeup(); } -/* SIGTERM: set flag to exit at next convenient time */ -static void -logicalrep_launcher_sigterm(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_SIGTERM = true; - - /* Waken anything waiting on the process latch */ - SetLatch(MyLatch); - - errno = save_errno; -} - /* SIGHUP: set flag to reload configuration at next convenient time */ static void logicalrep_launcher_sighup(SIGNAL_ARGS) @@ -809,13 +794,14 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); + Assert(LogicalRepCtx->launcher_pid == 0); + LogicalRepCtx->launcher_pid = MyProcPid; + /* Establish signal handlers. */ pqsignal(SIGHUP, logicalrep_launcher_sighup); - pqsignal(SIGTERM, logicalrep_launcher_sigterm); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); - LogicalRepCtx->launcher_pid = MyProcPid; - /* * Establish connection to nailed catalogs (we only ever access * pg_subscription). @@ -823,7 +809,7 @@ ApplyLauncherMain(Datum main_arg) BackgroundWorkerInitializeConnection(NULL, NULL); /* Enter main loop */ - while (!got_SIGTERM) + for (;;) { int rc; List *sublist; @@ -833,6 +819,8 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + CHECK_FOR_INTERRUPTS(); + now = GetCurrentTimestamp(); /* Limit the start retry to once a wal_retrieve_retry_interval */ @@ -909,13 +897,16 @@ ApplyLauncherMain(Datum main_arg) } } - LogicalRepCtx->launcher_pid = 0; + /* Not reachable */ +} - /* ... and if it returns, we're done */ - ereport(DEBUG1, - (errmsg("logical replication launcher shutting down"))); - - proc_exit(0); +/* + * Is current process the logical replication launcher? + */ +bool +IsLogicalLauncher(void) +{ + return LogicalRepCtx->launcher_pid == MyProcPid; } /* diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 1c60b43163..91ca8df2ea 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" @@ -2848,6 +2849,14 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating logical replication worker due to administrator command"))); + else if (IsLogicalLauncher()) + { + ereport(DEBUG1, + (errmsg("logical replication launcher shutting down"))); + + /* The logical replication launcher can be stopped at any time. */ + proc_exit(0); + } else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index d202a237e7..4f3e89e061 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,4 +24,6 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); extern void AtEOXact_ApplyLauncher(bool isCommit); +extern bool IsLogicalLauncher(void); + #endif /* LOGICALLAUNCHER_H */