diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ee7a18137f..82c1ddcdcb 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); - - logicalrep_pa_worker_stop(slot_no, generation); - + logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); return; @@ -636,8 +627,11 @@ pa_detach_all_error_mq(void) { ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - shm_mq_detach(winfo->error_mq_handle); - winfo->error_mq_handle = NULL; + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } } } @@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) * Make sure the leader apply worker tries to read from our error queue one more * time. This guards against the case where we exit uncleanly without sending * an ErrorResponse, for example because some code calls proc_exit directly. + * + * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks, + * if any. See ParallelWorkerShutdown for details. */ static void pa_shutdown(int code, Datum arg) @@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("bad magic number in dynamic shared memory segment"))); - before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); - /* Look up the shared information. */ shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); MyParallelShared = shared; @@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg) */ logicalrep_worker_attach(worker_slot); + /* + * Register the shutdown callback after we are attached to the worker + * slot. This is to ensure that MyLogicalRepWorker remains valid when this + * callback is invoked. + */ + before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); + SpinLockAcquire(&MyParallelShared->mutex); MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index ceea126231..87b5593d2d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* - * Stop the logical replication parallel apply worker corresponding to the - * input slot number. + * Stop the given logical replication parallel apply worker. * * Node that the function sends SIGINT instead of SIGTERM to the parallel apply * worker so that the worker exits cleanly. */ void -logicalrep_pa_worker_stop(int slot_no, uint16 generation) +logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) { + int slot_no; + uint16 generation; LogicalRepWorker *worker; + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + Assert(slot_no >= 0 && slot_no < max_logical_replication_workers); + /* + * Detach from the error_mq_handle for the parallel apply worker before + * stopping it. This prevents the leader apply worker from trying to + * receive the message from the error queue that might already be detached + * by the parallel apply worker. + */ + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = &LogicalRepCtx->workers[slot_no]; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b57eed052f..343e781896 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); extern void logicalrep_worker_stop(Oid subid, Oid relid); -extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation); +extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);