diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 67d86ed83b..c3f666e2f1 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -138,7 +138,8 @@ bool		optimize_bounded_sort = true;
  * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree().  SortTuples also contain the tuple's
+ * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * when memory is used in batch).  SortTuples also contain the tuple's
  * first key column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
@@ -220,11 +221,13 @@ struct Tuplesortstate
 								 * tuples to return? */
 	bool		boundUsed;		/* true if we made use of a bounded heap */
 	int			bound;			/* if bounded, the maximum number of tuples */
+	bool		tuples;			/* Can SortTuple.tuple ever be set? */
 	int64		availMem;		/* remaining memory available, in bytes */
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
-	MemoryContext sortcontext;	/* memory context holding all sort data */
+	MemoryContext sortcontext;	/* memory context holding most sort data */
+	MemoryContext tuplecontext; /* memory context holding tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
 
 	/*
@@ -280,6 +283,15 @@ struct Tuplesortstate
 	int			memtupsize;		/* allocated length of memtuples array */
 	bool		growmemtuples;	/* memtuples' growth still underway? */
 
+	/*
+	 * Memory for tuples is sometimes allocated in batch, rather than
+	 * incrementally.  This implies that incremental memory accounting has
+	 * been abandoned.  Currently, this only happens for the final on-the-fly
+	 * merge step.  Large batch allocations can store tuples (e.g.
+	 * IndexTuples) without palloc() fragmentation and other overhead.
+	 */
+	bool		batchUsed;
+
 	/*
 	 * While building initial runs, this is the current output run number
 	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
@@ -314,6 +326,21 @@ struct Tuplesortstate
 	int			mergefreelist;	/* head of freelist of recycled slots */
 	int			mergefirstfree; /* first slot never used in this merge */
 
+	/*
+	 * Per-tape batch state, when final on-the-fly merge consumes memory from
+	 * just a few large allocations.
+	 *
+	 * Aside from the general benefits of performing fewer individual retail
+	 * palloc() calls, this also helps make merging more cache efficient,
+	 * since each tape's tuples must naturally be accessed sequentially (in
+	 * sorted order).
+	 */
+	int64		spacePerTape;	/* Space (memory) for tuples (not slots) */
+	char	  **mergetuples;	/* Each tape's memory allocation */
+	char	  **mergecurrent;	/* Current offset into each tape's memory */
+	char	  **mergetail;		/* Last item's start point for each tape */
+	char	  **mergeoverflow;	/* Retail palloc() "overflow" for each tape */
+
 	/*
 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
 	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
@@ -389,9 +416,8 @@ struct Tuplesortstate
 	 * tuplesort_begin_datum and used only by the DatumTuple routines.
 	 */
 	Oid			datumType;
-	/* we need typelen and byval in order to know how to copy the Datums. */
+	/* we need typelen in order to know how to copy the Datums. */
 	int			datumTypeLen;
-	bool		datumTypeByVal;
 
 	/*
 	 * Resource snapshot for time of sort start.
@@ -405,7 +431,7 @@ struct Tuplesortstate
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state)		((state)->availMem < 0)
+#define LACKMEM(state)		((state)->availMem < 0 && !(state)->batchUsed)
 #define USEMEM(state,amt)	((state)->availMem -= (amt))
 #define FREEMEM(state,amt)	((state)->availMem += (amt))
@@ -447,7 +473,13 @@ struct Tuplesortstate
  * rather than the originally-requested size.  This is important since
  * palloc can add substantial overhead.  It's not a complete answer since
  * we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3.  As of 9.6, a
+ * separate memory context is used for caller passed tuples.  Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
  */
 
 /* When using this macro, beware of double evaluation of len */
@@ -465,7 +497,14 @@ static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state, bool finalMerge);
+static void batchmemtuples(Tuplesortstate *state);
+static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
+static void mergebatchone(Tuplesortstate *state, int srcTape,
+			  SortTuple *stup, bool *should_free);
+static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
+				   SortTuple *rtup, bool *should_free);
+static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static void mergepreread(Tuplesortstate *state);
 static void mergeprereadone(Tuplesortstate *state, int srcTape);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
@@ -477,6 +516,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
 				Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -543,6 +583,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 {
 	Tuplesortstate *state;
 	MemoryContext sortcontext;
+	MemoryContext tuplecontext;
 	MemoryContext oldcontext;
 
 	/*
@@ -550,11 +591,26 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	 * needed by the sort will live inside this context.
 	 */
 	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
-										"TupleSort",
+										"TupleSort main",
 										ALLOCSET_DEFAULT_MINSIZE,
 										ALLOCSET_DEFAULT_INITSIZE,
 										ALLOCSET_DEFAULT_MAXSIZE);
 
+	/*
+	 * Caller tuple (e.g. IndexTuple) memory context.
+	 *
+	 * A dedicated child context used exclusively for caller passed tuples
+	 * eases memory management.
+	 * Resetting at key points reduces fragmentation.  Note that the
+	 * memtuples array of SortTuples is allocated in the parent context, not
+	 * this context, because there is no need to free memtuples early.
+	 */
+	tuplecontext = AllocSetContextCreate(sortcontext,
+										 "Caller tuples",
+										 ALLOCSET_DEFAULT_MINSIZE,
+										 ALLOCSET_DEFAULT_INITSIZE,
+										 ALLOCSET_DEFAULT_MAXSIZE);
+
 	/*
 	 * Make the Tuplesortstate within the per-sort context.  This way, we
 	 * don't need a separate pfree() operation for it at shutdown.
@@ -571,10 +627,12 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->status = TSS_INITIAL;
 	state->randomAccess = randomAccess;
 	state->bounded = false;
+	state->tuples = true;
 	state->boundUsed = false;
 	state->allowedMem = workMem * (int64) 1024;
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
+	state->tuplecontext = tuplecontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
@@ -587,6 +645,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 					ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
 
 	state->growmemtuples = true;
+	state->batchUsed = false;
 	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -922,7 +981,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	/* lookup necessary attributes of the datum type */
 	get_typlenbyval(datumType, &typlen, &typbyval);
 	state->datumTypeLen = typlen;
-	state->datumTypeByVal = typbyval;
+	state->tuples = !typbyval;
 
 	/* Prepare SortSupport data */
 	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
@@ -1258,7 +1317,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 							  ItemPointer self, Datum *values, bool *isnull)
 {
-	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 	SortTuple	stup;
 	Datum		original;
 	IndexTuple	tuple;
@@ -1273,6 +1332,8 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 								   RelationGetDescr(state->indexRel),
 								   &stup.isnull1);
 
+	MemoryContextSwitchTo(state->sortcontext);
+
 	if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
 	{
 		/*
@@ -1333,7 +1394,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 	SortTuple	stup;
 
 	/*
@@ -1348,7 +1409,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 	 * identical to stup.tuple.
 	 */
 
-	if (isNull || state->datumTypeByVal)
+	if (isNull || !state->tuples)
 	{
 		/*
 		 * Set datum1 to zeroed representation for NULLs (to be consistent, and
@@ -1357,6 +1418,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 		stup.datum1 = !isNull ? val : (Datum) 0;
 		stup.isnull1 = isNull;
 		stup.tuple = NULL;		/* no separate storage */
+		MemoryContextSwitchTo(state->sortcontext);
 	}
 	else
 	{
@@ -1365,6 +1427,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 		stup.isnull1 = false;
 		stup.tuple = DatumGetPointer(original);
 		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+		MemoryContextSwitchTo(state->sortcontext);
 
 		if (!state->sortKeys->abbrev_converter)
 		{
@@ -1670,6 +1733,7 @@ tuplesort_performsort(Tuplesortstate *state)
  * Internal routine to fetch the next tuple in either forward or back
  * direction into *stup.  Returns FALSE if no more tuples.
  * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * Otherwise, caller should not use tuple following next call here.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
@@ -1681,6 +1745,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 	{
 		case TSS_SORTEDINMEM:
 			Assert(forward || state->randomAccess);
+			Assert(!state->batchUsed);
 			*should_free = false;
 			if (forward)
 			{
@@ -1725,6 +1790,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
 		case TSS_SORTEDONTAPE:
 			Assert(forward || state->randomAccess);
+			Assert(!state->batchUsed);
 			*should_free = true;
 			if (forward)
 			{
@@ -1810,7 +1876,9 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
 		case TSS_FINALMERGE:
 			Assert(forward);
-			*should_free = true;
+			Assert(state->batchUsed || !state->tuples);
+			/* For now, assume tuple is stored in tape's batch memory */
+			*should_free = false;
 
 			/*
 			 * This code should match the inner loop of mergeonerun().
@@ -1818,18 +1886,17 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			 */
 			if (state->memtupcount > 0)
 			{
 				int			srcTape = state->memtuples[0].tupindex;
-				Size		tuplen;
 				int			tupIndex;
 				SortTuple  *newtup;
 
+				/*
+				 * Returned tuple is still counted in our memory space most
+				 * of the time.  See mergebatchone() for discussion of why
+				 * caller may occasionally be required to free returned
+				 * tuple, and how preread memory is managed with regard to
+				 * edge cases more generally.
+				 */
 				*stup = state->memtuples[0];
-				/* returned tuple is no longer counted in our memory space */
-				if (stup->tuple)
-				{
-					tuplen = GetMemoryChunkSpace(stup->tuple);
-					state->availMem += tuplen;
-					state->mergeavailmem[srcTape] += tuplen;
-				}
 				tuplesort_heap_siftup(state, false);
 				if ((tupIndex = state->mergenext[srcTape]) == 0)
 				{
 					/*
 					 * out of preloaded data on this tape, try to read more
 					 *
 					 * Unlike mergeonerun(), we only preload from the single
-					 * tape that's run dry.  See mergepreread() comments.
+					 * tape that's run dry, though not before preparing its
+					 * batch memory for a new round of sequential consumption.
+					 * See mergepreread() comments.
 					 */
+					if (state->batchUsed)
+						mergebatchone(state, srcTape, stup, should_free);
+
 					mergeprereadone(state, srcTape);
 
 					/*
 					 * if still no data, we've reached end of run on this tape
 					 */
 					if ((tupIndex = state->mergenext[srcTape]) == 0)
+					{
+						/* Free tape's buffer, avoiding dangling pointer */
+						if (state->batchUsed)
+							mergebatchfreetape(state, srcTape, stup, should_free);
 						return true;
+					}
 				}
 				/* pull next preread tuple from list, insert in heap */
 				newtup = &state->memtuples[tupIndex];
@@ -1912,6 +1989,8 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
  * Fetch the next tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 HeapTuple
 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
@@ -1931,6 +2010,8 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
  * Fetch the next index tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 IndexTuple
 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
@@ -1979,7 +2060,7 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
 	if (state->sortKeys->abbrev_converter && abbrev)
 		*abbrev = stup.datum1;
 
-	if (stup.isnull1 || state->datumTypeByVal)
+	if (stup.isnull1 || !state->tuples)
 	{
 		*val = stup.datum1;
 		*isNull = stup.isnull1;
@@ -2162,6 +2243,10 @@ inittapes(Tuplesortstate *state)
 	state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
 	state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
 	state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
+	state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2320,7 +2405,7 @@ mergeruns(Tuplesortstate *state)
 		/* Tell logtape.c we won't be writing anymore */
 		LogicalTapeSetForgetFreeSpace(state->tapeset);
 		/* Initialize for the final merge pass */
-		beginmerge(state);
+		beginmerge(state, state->tuples);
 		state->status = TSS_FINALMERGE;
 		return;
 	}
@@ -2412,7 +2497,7 @@ mergeonerun(Tuplesortstate *state)
 	 * Start the merge by loading one tuple from each active source tape into
 	 * the heap.  We can also decrease the input run/dummy run counts.
 	 */
-	beginmerge(state);
+	beginmerge(state, false);
 
 	/*
 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2450,6 +2535,12 @@ mergeonerun(Tuplesortstate *state)
 		state->mergeavailslots[srcTape]++;
 	}
 
+	/*
+	 * Reset tuple memory, now that no caller tuples are needed in memory.
+	 * This prevents fragmentation.
+	 */
+	MemoryContextReset(state->tuplecontext);
+
 	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape, and increment its count of real runs.
@@ -2471,9 +2562,12 @@ mergeonerun(Tuplesortstate *state)
  * which tapes contain active input runs in mergeactive[].  Then, load
  * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
+ *
+ * finalMergeBatch indicates if this is the beginning of a final on-the-fly
+ * merge where a batched allocation of tuple memory is required.
  */
 static void
-beginmerge(Tuplesortstate *state)
+beginmerge(Tuplesortstate *state, bool finalMergeBatch)
 {
 	int			activeTapes;
 	int			tapenum;
@@ -2511,6 +2605,18 @@ beginmerge(Tuplesortstate *state)
 	state->mergefreelist = 0;	/* nothing in the freelist */
 	state->mergefirstfree = activeTapes;		/* 1st slot avail for preread */
 
+	if (finalMergeBatch)
+	{
+		/* Free outright the buffers for tapes never actually allocated */
+		FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
+
+		/*
+		 * Grow memtuples one last time, since the palloc() overhead that is
+		 * no longer incurred can make a big difference.
+		 */
+		batchmemtuples(state);
+	}
+
 	/*
 	 * Initialize space allocation to let each active input tape have an
 	 * equal share of preread space.
@@ -2518,7 +2624,7 @@ beginmerge(Tuplesortstate *state)
 	 */
 	Assert(activeTapes > 0);
 	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
 	Assert(slotsPerTape > 0);
-	spacePerTape = state->availMem / activeTapes;
+	spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
 	{
 		if (state->mergeactive[srcTape])
 		{
 			state->mergeavailslots[srcTape] = slotsPerTape;
 			state->mergeavailmem[srcTape] = spacePerTape;
 		}
 	}
 
+	/*
+	 * Preallocate tuple batch memory for each tape.  This is the memory used
+	 * for tuples themselves (not SortTuples), so it's never used by
+	 * pass-by-value datum sorts.  Memory allocation is performed here at
+	 * most once per sort, just in advance of the final on-the-fly merge
+	 * step.
+	 */
+	if (finalMergeBatch)
+		mergebatch(state, spacePerTape);
+
 	/*
 	 * Preread as many tuples as possible (and at least one) from each active
 	 * tape
@@ -2551,10 +2666,318 @@ beginmerge(Tuplesortstate *state)
 				tup->tupindex = state->mergefreelist;
 				state->mergefreelist = tupIndex;
 				state->mergeavailslots[srcTape]++;
+
+#ifdef TRACE_SORT
+				if (trace_sort && finalMergeBatch)
+				{
+					int64		perTapeKB = (spacePerTape + 1023) / 1024;
+					int64		usedSpaceKB;
+					int			usedSlots;
+
+					/*
+					 * Report how effective batchmemtuples() was in balancing
+					 * the number of slots against the need for memory for the
+					 * underlying tuples (e.g. IndexTuples).  The big preread of
+					 * all tapes when switching to FINALMERGE state should be
+					 * fairly representative of memory utilization during the
+					 * final merge step, and in any case is the only point at
+					 * which all tapes are guaranteed to have depleted either
+					 * their batch memory allowance or slot allowance.  Ideally,
+					 * both will be completely depleted for every tape by now.
+					 */
+					usedSpaceKB = (state->mergecurrent[srcTape] -
+								   state->mergetuples[srcTape] + 1023) / 1024;
+					usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
+
+					elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
+						 "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
+						 usedSpaceKB, perTapeKB,
+						 (double) usedSpaceKB / (double) perTapeKB,
+						 usedSlots, slotsPerTape,
+						 (double) usedSlots / (double) slotsPerTape);
+				}
+#endif
 			}
 		}
 	}
 }
 
+/*
+ * batchmemtuples - grow memtuples without palloc overhead
+ *
+ * When called, availMem should be approximately the amount of memory we'd
+ * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
+ * that were allocated with palloc() overhead, and in doing so use up all
+ * allocated slots.  However, though slots and tuple memory are in balance
+ * following the last grow_memtuples() call, that's predicated on the observed
+ * average tuple size for the "final" grow_memtuples() call, which includes
+ * palloc overhead.
+ *
+ * This will perform an actual final grow_memtuples() call without any palloc()
+ * overhead, rebalancing the use of memory between slots and tuples.
+ */
+static void
+batchmemtuples(Tuplesortstate *state)
+{
+	int64		refund;
+	int64		availMemLessRefund;
+	int			memtupsize = state->memtupsize;
+
+	/* For simplicity, assume no memtuples are actually currently counted */
+	Assert(state->memtupcount == 0);
+
+	/*
+	 * Refund STANDARDCHUNKHEADERSIZE per tuple.
+	 *
+	 * This sometimes fails to make memory use perfectly balanced, but it
+	 * should never make the situation worse.  Note that Assert-enabled builds
+	 * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
+	 */
+	refund = memtupsize * STANDARDCHUNKHEADERSIZE;
+	availMemLessRefund = state->availMem - refund;
+
+	/*
+	 * To establish balanced memory use after refunding palloc overhead,
+	 * temporarily have our accounting indicate that we've allocated all
+	 * memory we're allowed to, less that refund, and call grow_memtuples()
+	 * to have it increase the number of slots.
+	 */
+	state->growmemtuples = true;
+	USEMEM(state, availMemLessRefund);
+	(void) grow_memtuples(state);
+	/* Should not matter, but be tidy */
+	FREEMEM(state, availMemLessRefund);
+	state->growmemtuples = false;
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+	{
+		Size		OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
+		Size		NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
+
+		elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
+			 (double) NewKb / (double) OldKb,
+			 memtupsize, OldKb,
+			 state->memtupsize, NewKb);
+	}
+#endif
+}
+
+/*
+ * mergebatch - initialize tuple memory in batch
+ *
+ * This allows sequential access to sorted tuples buffered in memory from
+ * tapes/runs on disk during a final on-the-fly merge step.  Note that the
+ * memory is not used for SortTuples, but for the underlying tuples (e.g.
+ * MinimalTuples).
+ *
+ * Note that when batch memory is used, there is a simple division of space
+ * into large buffers (one per active tape).  The conventional incremental
+ * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
+ * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
+ * performed, which is then immediately detected in a way that is analogous to
+ * LACKMEM().  This keeps each tape's use of memory fair, which is always a
+ * goal.
+ */
+static void
+mergebatch(Tuplesortstate *state, int64 spacePerTape)
+{
+	int			srcTape;
+
+	Assert(state->activeTapes > 0);
+	Assert(state->tuples);
+
+	/*
+	 * For the purposes of tuplesort's memory accounting, the batch allocation
+	 * is special, and regular memory accounting through USEMEM() calls is
+	 * abandoned (see mergeprereadone()).
+	 */
+	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+	{
+		char	   *mergetuples;
+
+		if (!state->mergeactive[srcTape])
+			continue;
+
+		/* Allocate buffer for each active tape */
+		mergetuples = MemoryContextAllocHuge(state->tuplecontext,
+											 spacePerTape);
+
+		/* Initialize state for tape */
+		state->mergetuples[srcTape] = mergetuples;
+		state->mergecurrent[srcTape] = mergetuples;
+		state->mergetail[srcTape] = mergetuples;
+		state->mergeoverflow[srcTape] = NULL;
+	}
+
+	state->batchUsed = true;
+	state->spacePerTape = spacePerTape;
+}
+
+/*
+ * mergebatchone - prepare batch memory for one merge input tape
+ *
+ * This is called following the exhaustion of preread tuples for one input
+ * tape.  All that actually occurs is that the state for the source tape is
+ * reset to indicate that all memory may be reused.
+ *
+ * This routine must deal with fixing up the tuple that is about to be
+ * returned to the client, due to "overflow" allocations.
+ */
+static void
+mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+			  bool *should_free)
+{
+	Assert(state->batchUsed);
+
+	/*
+	 * Tuple about to be returned to caller ("stup") is final preread tuple
+	 * from tape, just removed from the top of the heap.  Special steps
+	 * around memory management must be performed for that tuple, to make
+	 * sure it isn't overwritten early.
+	 */
+	if (!state->mergeoverflow[srcTape])
+	{
+		Size		tupLen;
+
+		/*
+		 * Mark tuple buffer range for reuse, but be careful to move final,
+		 * tail tuple to start of space for next run so that it's available
+		 * to caller when stup is returned, and remains available at least
+		 * until the next tuple is requested.
+		 */
+		tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+		memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
+				tupLen);
+
+		/* Make SortTuple at top of the merge heap point to new tuple */
+		rtup->tuple = (void *) state->mergecurrent[srcTape];
+
+		state->mergetail[srcTape] = state->mergecurrent[srcTape];
+		state->mergecurrent[srcTape] += tupLen;
+	}
+	else
+	{
+		/*
+		 * Handle an "overflow" retail palloc.
+		 *
+		 * This is needed when we run out of tuple memory for the tape.
+		 */
+		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+		state->mergetail[srcTape] = state->mergetuples[srcTape];
+
+		if (rtup->tuple)
+		{
+			Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
+			/* Caller should free palloc'd tuple */
+			*should_free = true;
+		}
+		state->mergeoverflow[srcTape] = NULL;
+	}
+}
+
+/*
+ * mergebatchfreetape - handle final clean-up for batch memory once a tape
+ * is about to become exhausted
+ *
+ * All tuples are returned from the tape, but a single final tuple, *rtup, is
+ * to be passed back to the caller.  Free the tape's batch allocation buffer
+ * while ensuring that the final tuple is managed appropriately.
+ */
+static void
+mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+				   bool *should_free)
+{
+	Assert(state->batchUsed);
+	Assert(state->status == TSS_FINALMERGE);
+
+	/*
+	 * Tuple may or may not already be an overflow allocation from
+	 * mergebatchone()
+	 */
+	if (!*should_free && rtup->tuple)
+	{
+		/*
+		 * Final tuple still in tape's batch allocation.
+		 *
+		 * Return palloc()'d copy to caller, and have it freed in a similar
+		 * manner to overflow allocation.  Otherwise, we'd free batch memory
+		 * and pass back a pointer to garbage.  Note that we deliberately
+		 * allocate this in the parent tuplesort context, to be on the safe
+		 * side.
+		 */
+		Size		tuplen;
+		void	   *oldTuple = rtup->tuple;
+
+		tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+		rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
+		memcpy(rtup->tuple, oldTuple, tuplen);
+		*should_free = true;
+	}
+
+	/* Free spacePerTape-sized buffer */
+	pfree(state->mergetuples[srcTape]);
+}
+
+/*
+ * mergebatchalloc - allocate memory for one tuple using a batch memory
+ * "logical allocation".
+ *
+ * This is used for the final on-the-fly merge phase only.  READTUP() routines
+ * receive memory from here in place of palloc() and USEMEM() calls.
+ *
+ * The tape number is passed, ensuring each tape's tuples are stored in
+ * sorted, contiguous order (while allowing safe reuse of memory made
+ * available to each tape).  This maximizes locality of access as tuples are
+ * returned by final merge.
+ *
+ * Caller must not subsequently attempt to free memory returned here.  In
+ * general, only mergebatch* functions know about how memory returned from
+ * here should be freed, and this function's caller must ensure that batch
+ * memory management code will definitely have the opportunity to do the
+ * right thing during the final on-the-fly merge.
+ */
+static void *
+mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+	Size		reserve_tuplen = MAXALIGN(tuplen);
+	char	   *ret;
+
+	/* Should overflow at most once before mergebatchone() call: */
+	Assert(state->mergeoverflow[tapenum] == NULL);
+	Assert(state->batchUsed);
+
+	/* It should be possible to use precisely spacePerTape memory at once */
+	if (state->mergecurrent[tapenum] + reserve_tuplen <=
+		state->mergetuples[tapenum] + state->spacePerTape)
+	{
+		/*
+		 * Usual case -- caller is returned pointer into its tape's buffer,
+		 * and an offset from that point is recorded as where tape has
+		 * consumed up to for current round of preloading.
+		 */
+		ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
+		state->mergecurrent[tapenum] += reserve_tuplen;
+	}
+	else
+	{
+		/*
+		 * Allocate memory, and record as tape's overflow allocation.  This
+		 * will be detected quickly, in a similar fashion to a LACKMEM()
+		 * condition, and should not happen again before a new round of
+		 * preloading for caller's tape.  Note that we deliberately allocate
+		 * this in the parent tuplesort context, to be on the safe side.
+		 *
+		 * Sometimes, this does not happen because merging runs out of slots
+		 * before running out of memory.
+		 */
+		ret = state->mergeoverflow[tapenum] =
+			MemoryContextAlloc(state->sortcontext, tuplen);
+	}
+
+	return ret;
+}
+
 /*
  * mergepreread - load tuples from merge input tapes
  *
@@ -2576,7 +2999,9 @@ beginmerge(Tuplesortstate *state)
  * that state and so no point in scanning through all the tapes to fix one.
  * (Moreover, there may be quite a lot of inactive tapes in that state, since
 * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
+ * merge we can expect most of the tapes to be active.  Plus, only
+ * FINALMERGE state has to consider memory management for a batch
+ * allocation.)
  */
 static void
 mergepreread(Tuplesortstate *state)
@@ -2605,9 +3030,20 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
 
 	if (!state->mergeactive[srcTape])
 		return;					/* tape's run is already exhausted */
+
+	/*
+	 * Manage per-tape availMem.  Only actually matters when batch memory not
+	 * in use.
+	 */
 	priorAvail = state->availMem;
 	state->availMem = state->mergeavailmem[srcTape];
-	while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+
+	/*
+	 * When batch memory is used in the final on-the-fly merge, only the
+	 * mergeoverflow test is relevant; otherwise, only the LACKMEM() test is
+	 * relevant.
+	 */
+	while ((state->mergeavailslots[srcTape] > 0 &&
+			state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
 		   state->mergenext[srcTape] == 0)
 	{
 		/* read next tuple, if any */
@@ -3093,6 +3529,42 @@ markrunend(Tuplesortstate *state, int tapenum)
 	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 }
 
+/*
+ * Get memory for tuple from within READTUP() routine.  Allocate
+ * memory and account for that, or consume from tape's batch
+ * allocation.
+ *
+ * Memory returned here in the final on-the-fly merge case is recycled
+ * from tape's batch allocation.  Otherwise, callers must pfree() or
+ * reset tuple child memory context, and account for that with a
+ * FREEMEM().  Currently, this only ever needs to happen in WRITETUP()
+ * routines.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+	if (state->batchUsed)
+	{
+		/*
+		 * No USEMEM() call, because during final on-the-fly merge
+		 * accounting is based on tape-private state.
+		 * ("Overflow" allocations are detected as an indication that a new
+		 * round of preloading is required.  Preloading marks existing
+		 * contents of tape's batch buffer for reuse.)
+		 */
+		return mergebatchalloc(state, tapenum, tuplen);
+	}
+	else
+	{
+		char	   *ret;
+
+		/* Batch allocation yet to be performed */
+		ret = MemoryContextAlloc(state->tuplecontext, tuplen);
+		USEMEM(state, GetMemoryChunkSpace(ret));
+		return ret;
+	}
+}
+
 /*
  * Routines specialized for HeapTuple (actually MinimalTuple) case
@@ -3171,6 +3643,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 	Datum		original;
 	MinimalTuple tuple;
 	HeapTupleData htup;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
 	/* copy the tuple into sort storage */
 	tuple = ExecCopySlotMinimalTuple(slot);
@@ -3184,6 +3657,8 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 								state->tupDesc,
 								&stup->isnull1);
 
+	MemoryContextSwitchTo(oldcontext);
+
 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
 	{
 		/*
@@ -3266,11 +3741,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
 	unsigned int tupbodylen = len - sizeof(int);
 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-	MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
 	HeapTupleData htup;
 
-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	/* read in the tuple proper */
 	tuple->t_len = tuplen;
 	LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3409,12 +3883,15 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
 	HeapTuple	tuple = (HeapTuple) tup;
 	Datum		original;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
 	/* copy the tuple into sort storage */
 	tuple = heap_copytuple(tuple);
 	stup->tuple = (void *) tuple;
 	USEMEM(state, GetMemoryChunkSpace(tuple));
 
+	MemoryContextSwitchTo(oldcontext);
+
 	/*
 	 * set up first-column key value, and potentially abbreviate, if it's a
 	 * simple column
@@ -3501,9 +3978,10 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup, int tapenum,
 			   unsigned int tuplen)
 {
 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
-	HeapTuple	tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
+												  tapenum,
+												  t_len + HEAPTUPLESIZE);
 
-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	/* Reconstruct the HeapTupleData header */
 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
 	tuple->t_len = t_len;
@@ -3722,7 +4200,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 	Datum		original;
 
 	/* copy the tuple into sort storage */
-	newtuple = (IndexTuple) palloc(tuplen);
+	newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
 	memcpy(newtuple, tuple, tuplen);
 	USEMEM(state, GetMemoryChunkSpace(newtuple));
 	stup->tuple = (void *) newtuple;
@@ -3804,9 +4282,8 @@ readtup_index(Tuplesortstate *state, SortTuple *stup, int tapenum,
 			  unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
-	IndexTuple	tuple = (IndexTuple) palloc(tuplen);
+	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
 
-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	LogicalTapeReadExact(state->tapeset, tapenum,
 						 tuple, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
@@ -3864,7 +4341,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		waddr = NULL;
 		tuplen = 0;
 	}
-	else if (state->datumTypeByVal)
+	else if (!state->tuples)
 	{
 		waddr = &stup->datum1;
 		tuplen = sizeof(Datum);
@@ -3906,7 +4383,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 		stup->isnull1 = true;
 		stup->tuple = NULL;
 	}
-	else if (state->datumTypeByVal)
+	else if (!state->tuples)
 	{
 		Assert(tuplen == sizeof(Datum));
 		LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3916,14 +4393,13 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 		stup->isnull1 = false;
 	}
 	else
 	{
-		void	   *raddr = palloc(tuplen);
+		void	   *raddr = readtup_alloc(state, tapenum, tuplen);
 
 		LogicalTapeReadExact(state->tapeset, tapenum,
 							 raddr, tuplen);
 		stup->datum1 = PointerGetDatum(raddr);
 		stup->isnull1 = false;
 		stup->tuple = raddr;
-		USEMEM(state, GetMemoryChunkSpace(raddr));
 	}
 
 	if (state->randomAccess)	/* need trailing length word? */
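
---

Reviewer's note (not part of the patch): mergebatchalloc() and mergebatchone()
together implement a per-tape bump allocator with a single retail "overflow"
slot.  The standalone C sketch below condenses that logic for readers.  It is
illustrative only: TapeBatch, batch_alloc() and batch_reset() are invented
names, plain malloc() stands in for MemoryContextAllocHuge() and
MemoryContextAlloc(), and ALIGN_UP() stands in for MAXALIGN().

#include <stdlib.h>
#include <string.h>

/* stand-in for MAXALIGN(); assumes 8-byte alignment */
#define ALIGN_UP(len)	(((len) + 7) & ~(size_t) 7)

typedef struct TapeBatch
{
	char	   *buf;			/* cf. state->mergetuples[tapenum] */
	char	   *current;		/* cf. state->mergecurrent[tapenum] */
	char	   *tail;			/* cf. state->mergetail[tapenum] */
	char	   *overflow;		/* cf. state->mergeoverflow[tapenum] */
	size_t		space;			/* cf. state->spacePerTape */
} TapeBatch;

/*
 * Bump-allocate tuplen bytes from the tape's buffer, or fall back to a
 * retail "overflow" allocation once the buffer is exhausted -- the analog
 * of hitting LACKMEM() under conventional accounting.  A non-NULL overflow
 * pointer is what tells the preread loop to stop consuming from this tape.
 */
static void *
batch_alloc(TapeBatch *tape, size_t tuplen)
{
	size_t		reserve = ALIGN_UP(tuplen);

	if (tape->current + reserve <= tape->buf + tape->space)
	{
		tape->tail = tape->current;		/* remember last tuple's start */
		tape->current += reserve;
		return tape->tail;
	}
	tape->overflow = malloc(tuplen);
	return tape->overflow;
}

/*
 * Make the whole buffer reusable for the next preread round, keeping the
 * final (about-to-be-returned) tuple alive by moving it to the front of
 * the buffer -- the memmove() trick mergebatchone() plays.  Returns the
 * surviving tuple's new address, so the caller can fix up its pointer.
 */
static void *
batch_reset(TapeBatch *tape)
{
	size_t		lastlen = tape->current - tape->tail;

	memmove(tape->buf, tape->tail, lastlen);
	tape->tail = tape->buf;
	tape->current = tape->buf + lastlen;
	return tape->buf;
}

The essential invariant matches the patch: within one preread round a tape's
tuples are laid out contiguously in read (i.e. sorted) order, and a reset
never frees anything -- the still-referenced final tuple is relocated rather
than pfree()'d, except in the overflow and tape-exhaustion edge cases handled
by mergebatchone()/mergebatchfreetape().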
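A worked example of the batchmemtuples() refund arithmetic, under assumptions
that are not stated in the patch (a 64-bit build without assertions, where
STANDARDCHUNKHEADERSIZE is 16 bytes and sizeof(SortTuple) is 24 bytes): with
memtupsize = 1,048,576 slots, refund = 1,048,576 * 16 = 16 MB.  The
USEMEM(state, availMemLessRefund) call temporarily makes availMem equal to
that 16 MB refund, so grow_memtuples() sees exactly the chunk-header overhead
that batch allocation will never pay, and can convert it into on the order of
16 MB / 24 bytes ~= 700,000 additional SortTuple slots; the subsequent
FREEMEM() puts the accounting back where it started.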