diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 67da5ff71f..b8bb4f8eb0 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
 					   bool nowait, bool *done);
-static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static void gather_merge_setup(GatherMergeState *gm_state);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void gather_merge_clear_tuples(GatherMergeState *gm_state);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
 					  bool nowait);
 static void load_tuple_array(GatherMergeState *gm_state, int reader);
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	}
 
 	/*
-	 * store the tuple descriptor into gather merge state, so we can use it
-	 * later while initializing the gather merge slots.
+	 * Store the tuple descriptor into gather merge state, so we can use it
+	 * while initializing the gather merge slots.
 	 */
 	if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
 		hasoid = false;
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	gm_state->tupDesc = tupDesc;
 
+	/* Now allocate the workspace for gather merge */
+	gather_merge_setup(gm_state);
+
 	return gm_state;
 }
 
@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
 	/* Make sure any existing workers are gracefully shut down */
 	ExecShutdownGatherMergeWorkers(node);
 
+	/* Free any unused tuples, so we don't leak memory across rescans */
+	gather_merge_clear_tuples(node);
+
 	/* Mark node so that shared state will be rebuilt at next call */
 	node->initialized = false;
 	node->gm_initialized = false;
@@ -370,9 +378,65 @@ ExecReScanGatherMerge(GatherMergeState *node)
 }
 
 /*
- * Initialize the Gather merge tuple read.
+ * Set up the data structures that we'll need for Gather Merge.
  *
- * Pull at least a single tuple from each worker + leader and set up the heap.
+ * We allocate these once on the basis of gm->num_workers, which is an
+ * upper bound for the number of workers we'll actually have.  During
+ * a rescan, we reset the structures to empty.  This approach simplifies
+ * not leaking memory across rescans.
+ *
+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
+ * are for workers.  The values placed into gm_heap correspond to indexes
+ * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
+ * 0 to n-1; it has no entry for the leader.
+ */
+static void
+gather_merge_setup(GatherMergeState *gm_state)
+{
+	GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
+	int			nreaders = gm->num_workers;
+	int			i;
+
+	/*
+	 * Allocate gm_slots for the number of workers + one more slot for leader.
+	 * Slot 0 is always for the leader.  Leader always calls ExecProcNode() to
+	 * read the tuple, and then stores it directly into its gm_slots entry.
+	 * For other slots, code below will call ExecInitExtraTupleSlot() to
+	 * create a slot for the worker's results.  Note that during any single
+	 * scan, we might have fewer than num_workers available workers, in which
+	 * case the extra array entries go unused.
+	 */
+	gm_state->gm_slots = (TupleTableSlot **)
+		palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
+
+	/* Allocate the tuple slot and tuple array for each worker */
+	gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
+		palloc0(nreaders * sizeof(GMReaderTupleBuffer));
+
+	for (i = 0; i < nreaders; i++)
+	{
+		/* Allocate the tuple array with length MAX_TUPLE_STORE */
+		gm_state->gm_tuple_buffers[i].tuple =
+			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+
+		/* Initialize tuple slot for worker */
+		gm_state->gm_slots[i + 1] = ExecInitExtraTupleSlot(gm_state->ps.state);
+		ExecSetSlotDescriptor(gm_state->gm_slots[i + 1],
+							  gm_state->tupDesc);
+	}
+
+	/* Allocate the resources for the merge */
+	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
+											heap_compare_slots,
+											gm_state);
+}
+
+/*
+ * Initialize the Gather Merge.
+ *
+ * Reset data structures to ensure they're empty.  Then pull at least one
+ * tuple from leader + each worker (or set its "done" indicator), and set up
+ * the heap.
  */
 static void
 gather_merge_init(GatherMergeState *gm_state)
@@ -381,38 +445,26 @@ gather_merge_init(GatherMergeState *gm_state)
 	bool		nowait = true;
 	int			i;
 
-	/*
-	 * Allocate gm_slots for the number of workers + one more slot for leader.
-	 * Last slot is always for leader. Leader always calls ExecProcNode() to
-	 * read the tuple which will return the TupleTableSlot. Later it will
-	 * directly get assigned to gm_slot. So just initialize leader gm_slot
-	 * with NULL. For other slots, code below will call
-	 * ExecInitExtraTupleSlot() to create a slot for the worker's results.
-	 */
-	gm_state->gm_slots =
-		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-	gm_state->gm_slots[gm_state->nreaders] = NULL;
+	/* Assert that gather_merge_setup made enough space */
+	Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
 
-	/* Initialize the tuple slot and tuple array for each worker */
-	gm_state->gm_tuple_buffers =
-		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-										gm_state->nreaders);
-	for (i = 0; i < gm_state->nreaders; i++)
+	/* Reset leader's tuple slot to empty */
+	gm_state->gm_slots[0] = NULL;
+
+	/* Reset the tuple slot and tuple array for each worker */
+	for (i = 0; i < nreaders; i++)
 	{
-		/* Allocate the tuple array with length MAX_TUPLE_STORE */
-		gm_state->gm_tuple_buffers[i].tuple =
-			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
-
-		/* Initialize slot for worker */
-		gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
-		ExecSetSlotDescriptor(gm_state->gm_slots[i],
-							  gm_state->tupDesc);
+		/* Reset tuple array to empty */
+		gm_state->gm_tuple_buffers[i].nTuples = 0;
+		gm_state->gm_tuple_buffers[i].readCounter = 0;
+		/* Reset done flag to not-done */
+		gm_state->gm_tuple_buffers[i].done = false;
+		/* Ensure output slot is empty */
+		ExecClearTuple(gm_state->gm_slots[i + 1]);
 	}
 
-	/* Allocate the resources for the merge */
-	gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
-											heap_compare_slots,
-											gm_state);
+	/* Reset binary heap to empty */
+	binaryheap_reset(gm_state->gm_heap);
 
 	/*
 	 * First, try to read a tuple from each worker (including leader) in
@@ -422,14 +474,13 @@ gather_merge_init(GatherMergeState *gm_state)
 	 * least one tuple) to the heap.
 	 */
 reread:
-	for (i = 0; i < nreaders + 1; i++)
+	for (i = 0; i <= nreaders; i++)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		/* ignore this source if already known done */
-		if ((i < nreaders) ?
-			!gm_state->gm_tuple_buffers[i].done :
-			gm_state->need_to_scan_locally)
+		/* skip this source if already known done */
+		if ((i == 0) ? gm_state->need_to_scan_locally :
+			!gm_state->gm_tuple_buffers[i - 1].done)
 		{
 			if (TupIsNull(gm_state->gm_slots[i]))
 			{
@@ -450,9 +501,9 @@
 	}
 
 	/* need not recheck leader, since nowait doesn't matter for it */
-	for (i = 0; i < nreaders; i++)
+	for (i = 1; i <= nreaders; i++)
 	{
-		if (!gm_state->gm_tuple_buffers[i].done &&
+		if (!gm_state->gm_tuple_buffers[i - 1].done &&
 			TupIsNull(gm_state->gm_slots[i]))
 		{
 			nowait = false;
@@ -467,23 +518,23 @@
 }
 
 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slot, and any unused pending tuples,
+ * for each gather merge input.
  */
 static void
-gather_merge_clear_slots(GatherMergeState *gm_state)
+gather_merge_clear_tuples(GatherMergeState *gm_state)
 {
 	int			i;
 
 	for (i = 0; i < gm_state->nreaders; i++)
 	{
-		pfree(gm_state->gm_tuple_buffers[i].tuple);
-		ExecClearTuple(gm_state->gm_slots[i]);
-	}
+		GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
 
-	/* Free tuple array as we don't need it any more */
-	pfree(gm_state->gm_tuple_buffers);
-	/* Free the binaryheap, which was created for sort */
-	binaryheap_free(gm_state->gm_heap);
+		while (tuple_buffer->readCounter < tuple_buffer->nTuples)
+			heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+
+		ExecClearTuple(gm_state->gm_slots[i + 1]);
+	}
 }
 
 /*
@@ -526,7 +577,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
 		if (binaryheap_empty(gm_state->gm_heap))
 		{
 			/* All the queues are exhausted, and so is the heap */
-			gather_merge_clear_slots(gm_state);
+			gather_merge_clear_tuples(gm_state);
 			return NULL;
 		}
 		else
@@ -548,10 +599,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
 	int			i;
 
 	/* Don't do anything if this is the leader. */
-	if (reader == gm_state->nreaders)
+	if (reader == 0)
 		return;
 
-	tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
 
 	/* If there's nothing in the array, reset the counters to zero. */
 	if (tuple_buffer->nTuples == tuple_buffer->readCounter)
@@ -590,7 +641,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	 * If we're being asked to generate a tuple from the leader, then we just
 	 * call ExecProcNode as normal to produce one.
 	 */
-	if (gm_state->nreaders == reader)
+	if (reader == 0)
 	{
 		if (gm_state->need_to_scan_locally)
 		{
@@ -601,7 +652,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 
 			if (!TupIsNull(outerTupleSlot))
 			{
-				gm_state->gm_slots[reader] = outerTupleSlot;
+				gm_state->gm_slots[0] = outerTupleSlot;
 				return true;
 			}
 			/* need_to_scan_locally serves as "done" flag for leader */
@@ -611,7 +662,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	}
 
 	/* Otherwise, check the state of the relevant tuple buffer. */
-	tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
 
 	if (tuple_buffer->nTuples > tuple_buffer->readCounter)
 	{
@@ -621,8 +672,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	else if (tuple_buffer->done)
 	{
 		/* Reader is known to be exhausted. */
-		DestroyTupleQueueReader(gm_state->reader[reader]);
-		gm_state->reader[reader] = NULL;
+		DestroyTupleQueueReader(gm_state->reader[reader - 1]);
+		gm_state->reader[reader - 1] = NULL;
 		return false;
 	}
 	else
@@ -649,14 +700,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	ExecStoreTuple(tup,			/* tuple to store */
 				   gm_state->gm_slots[reader],	/* slot in which to store the
 												 * tuple */
-				   InvalidBuffer,	/* buffer associated with this tuple */
-				   true);		/* pfree this pointer if not from heap */
+				   InvalidBuffer,	/* no buffer associated with tuple */
+				   true);		/* pfree tuple when done with it */
 
 	return true;
 }
 
 /*
- * Attempt to read a tuple from given reader.
+ * Attempt to read a tuple from given worker.
  */
 static HeapTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
@@ -671,7 +722,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 	CHECK_FOR_INTERRUPTS();
 
 	/* Attempt to read a tuple. */
-	reader = gm_state->reader[nreader];
+	reader = gm_state->reader[nreader - 1];
 
 	/* Run TupleQueueReaders in per-tuple context */
 	tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6cf128a7f0..90a60abc4d 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1958,7 +1958,8 @@ typedef struct GatherMergeState
 	int			gm_nkeys;		/* number of sort columns */
 	SortSupport gm_sortkeys;	/* array of length gm_nkeys */
 	struct ParallelExecutorInfo *pei;
-	/* all remaining fields are reinitialized during a rescan: */
+	/* all remaining fields are reinitialized during a rescan */
+	/* (but the arrays are not reallocated, just cleared) */
 	int			nworkers_launched;	/* original number of workers */
 	int			nreaders;		/* number of active workers */
 	TupleTableSlot **gm_slots;	/* array with nreaders+1 entries */
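
The indexing convention the patch adopts (leader at gm_slots[0], workers at gm_slots[1..n], while gm_tuple_buffers[] stays 0-based with no leader entry) is the reason the code above keeps translating a source index with "reader - 1" and "i + 1". The following is a minimal standalone sketch of that mapping only, not part of the patch; MiniGatherMerge, buffered_tuples, and NUM_WORKERS are invented stand-ins for the real executor structures.

/*
 * Standalone illustration (assumed names, not PostgreSQL code) of the
 * leader-at-index-0 convention used by gather_merge_setup/gather_merge_init.
 */
#include <stdio.h>

#define NUM_WORKERS 3			/* plays the role of gm->num_workers */

typedef struct MiniGatherMerge
{
	/* one slot per source: index 0 = leader, 1..NUM_WORKERS = workers */
	const char *gm_slots[NUM_WORKERS + 1];
	/* per-worker tuple buffers: index 0..NUM_WORKERS-1, no leader entry */
	int			gm_tuple_buffers[NUM_WORKERS];
} MiniGatherMerge;

/* Map a source index (0 = leader) to its buffered-tuple count, if any. */
static int
buffered_tuples(const MiniGatherMerge *gm, int reader)
{
	if (reader == 0)
		return 0;				/* leader produces tuples directly, no buffer */
	return gm->gm_tuple_buffers[reader - 1];	/* workers shift down by one */
}

int
main(void)
{
	MiniGatherMerge gm;
	int			i;

	gm.gm_slots[0] = "leader";
	for (i = 1; i <= NUM_WORKERS; i++)
	{
		gm.gm_slots[i] = "worker";
		gm.gm_tuple_buffers[i - 1] = i * 10;	/* arbitrary sample counts */
	}

	for (i = 0; i <= NUM_WORKERS; i++)
		printf("source %d (%s): %d buffered tuples\n",
			   i, gm.gm_slots[i], buffered_tuples(&gm, i));
	return 0;
}

With the leader pinned at index 0, a single test (reader == 0) distinguishes it everywhere, and only the worker-side arrays need the off-by-one adjustment.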