diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index dfcbafabf0..db9ac3d504 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -222,7 +222,9 @@ pattern looks like this:
 
     ExitParallelMode();
 
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers.
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6b..79cc9880bb 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
     MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context so
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+    FixedParallelState *fps;
+    char       *error_queue_space;
+    int         i;
+
+    if (pcxt->nworkers_launched == 0)
+        return;
+
+    WaitForParallelWorkersToFinish(pcxt);
+    WaitForParallelWorkersToExit(pcxt);
+
+    /* Reset a few bits of fixed parallel state to a clean state. */
+    fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+    fps->workers_attached = 0;
+    fps->last_xlog_end = 0;
+
+    /* Recreate error queues. */
+    error_queue_space =
+        shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        char       *start;
+        shm_mq     *mq;
+
+        start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+        mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+        shm_mq_set_receiver(mq, MyProc);
+        pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+    }
+
+    /* Reset number of workers launched. */
+    pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
     /* We might be running in a short-lived memory context. */
     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-    /*
-     * This function can be called for a parallel context for which it has
-     * already been called previously, but only if all of the old workers
-     * have already exited.  When this case arises, we need to do some extra
-     * reinitialization.
-     */
-    if (pcxt->nworkers_launched > 0)
-    {
-        FixedParallelState *fps;
-        char       *error_queue_space;
-
-        /* Clean out old worker handles. */
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            if (pcxt->worker[i].error_mqh != NULL)
-                elog(ERROR, "previously launched worker still alive");
-            if (pcxt->worker[i].bgwhandle != NULL)
-            {
-                pfree(pcxt->worker[i].bgwhandle);
-                pcxt->worker[i].bgwhandle = NULL;
-            }
-        }
-
-        /* Reset a few bits of fixed parallel state to a clean state. */
-        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-        fps->workers_attached = 0;
-        fps->last_xlog_end = 0;
-
-        /* Recreate error queues. */
-        error_queue_space =
-            shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            char       *start;
-            shm_mq     *mq;
-
-            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-            shm_mq_set_receiver(mq, MyProc);
-            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-        }
-
-        /* Reset number of workers launched. */
-        pcxt->nworkers_launched = 0;
-    }
-
     /* Configure a worker. */
     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
              MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     }
 }
 
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have actually exited.  The
+ * difference from WaitForParallelWorkersToFinish is that the latter only
+ * ensures that the last message sent by a worker backend has been received
+ * by the master backend, whereas this function ensures a complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+    int         i;
+
+    /* Wait until the workers actually die. */
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        BgwHandleStatus status;
+
+        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+            continue;
+
+        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+        /*
+         * If the postmaster kicked the bucket, we have no chance of cleaning
+         * up safely -- we won't be able to tell when our workers are actually
+         * dead.  This doesn't necessitate a PANIC since they will all abort
+         * eventually, but we can't safely continue this session.
+         */
+        if (status == BGWH_POSTMASTER_DIED)
+            ereport(FATAL,
+                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                     errmsg("postmaster exited during a parallel transaction")));
+
+        /* Release memory. */
+        pfree(pcxt->worker[i].bgwhandle);
+        pcxt->worker[i].bgwhandle = NULL;
+    }
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
     {
         for (i = 0; i < pcxt->nworkers; ++i)
         {
-            if (pcxt->worker[i].bgwhandle != NULL)
-                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
             if (pcxt->worker[i].error_mqh != NULL)
             {
+                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
                 pfree(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
             }
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
         pcxt->private_memory = NULL;
     }
 
-    /* Wait until the workers actually die. */
-    for (i = 0; i < pcxt->nworkers; ++i)
-    {
-        BgwHandleStatus status;
-
-        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-            continue;
-
-        /*
-         * We can't finish transaction commit or abort until all of the
-         * workers are dead.  This means, in particular, that we can't respond
-         * to interrupts at this stage.
-         */
-        HOLD_INTERRUPTS();
-        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-        RESUME_INTERRUPTS();
-
-        /*
-         * If the postmaster kicked the bucket, we have no chance of cleaning
-         * up safely -- we won't be able to tell when our workers are actually
-         * dead.  This doesn't necessitate a PANIC since they will all abort
-         * eventually, but we can't safely continue this session.
-         */
-        if (status == BGWH_POSTMASTER_DIED)
-            ereport(FATAL,
-                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                     errmsg("postmaster exited during a parallel transaction")));
-
-        /* Release memory. */
-        pfree(pcxt->worker[i].bgwhandle);
-        pcxt->worker[i].bgwhandle = NULL;
-    }
+    /*
+     * We can't finish transaction commit or abort until all of the
+     * workers have exited.  This means, in particular, that we can't respond
+     * to interrupts at this stage.
+     */
+    HOLD_INTERRUPTS();
+    WaitForParallelWorkersToExit(pcxt);
+    RESUME_INTERRUPTS();
 
     /* Free the worker array itself. */
     if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
         case 'X':               /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].bgwhandle);
                 pfree(pcxt->worker[i].error_mqh);
-                pcxt->worker[i].bgwhandle = NULL;
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416..99a9de3cdc 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
                      ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
                      ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+                             bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                      SharedExecutorInstrumentation *instrumentation);
 
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
     shm_mq_handle **responseq;
     char       *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     responseq = (shm_mq_handle **)
         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
 
-    /* Allocate space from the DSM for the queues themselves. */
-    tqueuespace = shm_toc_allocate(pcxt->toc,
-                                   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    /*
+     * If not reinitializing, allocate space from the DSM for the queues;
+     * otherwise, find the already allocated space.
+     */
+    if (!reinitialize)
+        tqueuespace =
+            shm_toc_allocate(pcxt->toc,
+                             PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    else
+        tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
     /* Create the queues, and become the receiver for each. */
     for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     }
 
     /* Add array of queues to shm_toc, so others can find it. */
-    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+    if (!reinitialize)
+        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
 
     /* Return array of handles. */
     return responseq;
 }
 
+/*
+ * Re-initialize the tuple queues that backend workers use to return tuples
+ * to the main backend.
+ */ +shm_mq_handle ** +ExecParallelReinitializeTupleQueues(ParallelContext *pcxt) +{ + return ExecParallelSetupTupleQueues(pcxt, true); +} + /* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. @@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->buffer_usage = bufusage_space; /* Set up tuple queues. */ - pei->tqueue = ExecParallelSetupTupleQueues(pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); /* * If instrumentation options were supplied, allocate space for the diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 9c1533e311..5f589614dc 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -41,6 +41,7 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate); +static void ExecShutdownGatherWorkers(GatherState *node); /* ---------------------------------------------------------------- @@ -150,9 +151,10 @@ ExecGather(GatherState *node) bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ - node->pei = ExecInitParallelPlan(node->ps.lefttree, - estate, - gather->num_workers); + if (!node->pei) + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gather->num_workers); /* * Register backend workers. We might not get as many as we @@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate) gatherstate->need_to_scan_locally, &done); if (done) - ExecShutdownGather(gatherstate); + ExecShutdownGatherWorkers(gatherstate); if (HeapTupleIsValid(tup)) { @@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate) } /* ---------------------------------------------------------------- - * ExecShutdownGather + * ExecShutdownGatherWorkers * - * Destroy the setup for parallel workers. Collect all the - * stats after workers are stopped, else some work done by - * workers won't be accounted. + * Destroy the parallel workers. Collect all the stats after + * workers are stopped, else some work done by workers won't be + * accounted. * ---------------------------------------------------------------- */ void -ExecShutdownGather(GatherState *node) +ExecShutdownGatherWorkers(GatherState *node) { /* Shut down tuple queue funnel before shutting down workers. */ if (node->funnel != NULL) @@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node) /* Now shut down the workers. */ if (node->pei != NULL) - { ExecParallelFinish(node->pei); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGather + * + * Destroy the setup for parallel workers including parallel context. + * Collect all the stats after workers are stopped, else some work + * done by workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGather(GatherState *node) +{ + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. */ + if (node->pei != NULL) + { ExecParallelCleanup(node->pei); node->pei = NULL; } @@ -349,14 +368,21 @@ void ExecReScanGather(GatherState *node) { /* - * Re-initialize the parallel context and workers to perform rescan of - * relation. We want to gracefully shutdown all the workers so that they + * Re-initialize the parallel workers to perform rescan of relation. + * We want to gracefully shutdown all the workers so that they * should be able to propagate any error or other information to master - * backend before dying. + * backend before dying. 
     */
-    ExecShutdownGather(node);
+    ExecShutdownGatherWorkers(node);
 
     node->initialized = false;
 
+    if (node->pei)
+    {
+        ReinitializeParallelDSM(node->pei->pcxt);
+        node->pei->tqueue =
+            ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+    }
+
     ExecReScan(node->ps.lefttree);
 }
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index d4b7c5dd75..411db7964d 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -56,6 +56,7 @@ extern bool InitializingParallelWorker;
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *);
+extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *);
 extern void WaitForParallelWorkersToFinish(ParallelContext *);
 extern void DestroyParallelContext(ParallelContext *);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 505500e76b..23c29ebb90 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,5 +33,6 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                      EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
+extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
 
 #endif   /* EXECPARALLEL_H */
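
For reference, the reuse pattern described in the README.parallel hunk above
looks roughly like the sketch below.  This is illustrative only and not part
of the patch: my_worker_main and relaunch_workers_example are hypothetical
names, and DSM space estimation, data exchange with the workers, and error
handling are omitted.

    #include "access/parallel.h"

    /* Hypothetical worker entrypoint; matches parallel_worker_main_type. */
    static void my_worker_main(dsm_segment *seg, shm_toc *toc);

    static void
    relaunch_workers_example(int nworkers)
    {
        ParallelContext *pcxt;

        EnterParallelMode();
        pcxt = CreateParallelContext(my_worker_main, nworkers);
        /* ... estimate and reserve space for application data here ... */
        InitializeParallelDSM(pcxt);
        /* ... store application data in the DSM segment here ... */

        /* First round of workers. */
        LaunchParallelWorkers(pcxt);
        WaitForParallelWorkersToFinish(pcxt);

        /*
         * Reset the state managed by the parallel context machinery, perform
         * any other necessary resetting of state, then launch a new set.
         */
        ReinitializeParallelDSM(pcxt);
        LaunchParallelWorkers(pcxt);
        WaitForParallelWorkersToFinish(pcxt);

        DestroyParallelContext(pcxt);
        ExitParallelMode();
    }

Note that ReinitializeParallelDSM() itself waits for the previous workers to
finish and exit before resetting the error queues, so the caller only needs
to reset its own shared state between launches.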