Rewrite hash join to use simple linked lists instead of a fixed-size hashtable. This should prevent 'hashtable out of memory' errors, unless you really do run out of memory. Note: target size for hashtable is now taken from -S postmaster switch, not -B, since it is local memory in the backend rather than shared memory.
This commit is contained in:
parent d261a5ec86
commit 26069a58e8
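Before the diff itself, a minimal sketch of the commit's core idea, going only on what the message and patch state: each hash bucket becomes the head of a singly-linked list of palloc'd tuples, so insertion is a constant-time list push instead of an append into a fixed-size arena that can overflow. This is an illustrative sketch, not the committed code; the struct here is deliberately simplified.

typedef struct HashJoinTupleData
{
	struct HashJoinTupleData *next;	/* next tuple in the same bucket */
	/* in the real patch, a HeapTupleData image and tuple body follow */
} HashJoinTupleData;

typedef HashJoinTupleData *HashJoinTuple;

static void
sketch_insert(HashJoinTuple *buckets, int bucketno, HashJoinTuple tuple)
{
	/* O(1) push onto the bucket's chain; no fixed-size overflow arena */
	tuple->next = buckets[bucketno];
	buckets[bucketno] = tuple;
}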
@@ -6,7 +6,7 @@
* Copyright (c) 1994, Regents of the University of California
*
*
* $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
* $Id: nodeHash.c,v 1.35 1999/05/18 21:33:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,11 +22,6 @@
#include <stdio.h>
#include <math.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <unistd.h>

#include "postgres.h"
#include "miscadmin.h"
@@ -34,17 +29,12 @@
#include "executor/executor.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "storage/ipc.h"
#include "utils/hsearch.h"
#include "utils/portal.h"

extern int NBuffers;
extern int SortMem;

static int hashFunc(Datum key, int len, bool byVal);
static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
static void * absHashTableAlloc(int size, HashJoinTable hashtable);
static void ExecHashOverflowInsert(HashJoinTable hashtable,
HashBucket bucket,
HeapTuple heapTuple);

/* ----------------------------------------------------------------
* ExecHash
@@ -63,11 +53,7 @@ ExecHash(Hash *node)
HashJoinTable hashtable;
TupleTableSlot *slot;
ExprContext *econtext;

int nbatch;
File *batches = NULL;
RelativeAddr *batchPos;
int *batchSizes;
int i;

/* ----------------
@@ -79,27 +65,25 @@ ExecHash(Hash *node)
estate = node->plan.state;
outerNode = outerPlan(node);

hashtable = node->hashtable;
hashtable = hashstate->hashtable;
if (hashtable == NULL)
elog(ERROR, "ExecHash: hash table is NULL.");

nbatch = hashtable->nbatch;

if (nbatch > 0)
{ /* if needs hash partition */
/* --------------
* allocate space for the file descriptors of batch files
* then open the batch files in the current processes.
* --------------
{
/* ----------------
* Open temp files for inner batches, if needed.
* Note that file buffers are palloc'd in regular executor context.
* ----------------
*/
batches = (File *) palloc(nbatch * sizeof(File));
for (i = 0; i < nbatch; i++)
{
batches[i] = OpenTemporaryFile();
File tfile = OpenTemporaryFile();
Assert(tfile >= 0);
hashtable->innerBatchFile[i] = BufFileCreate(tfile);
}
hashstate->hashBatches = batches;
batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
}

/* ----------------
@@ -110,7 +94,7 @@ ExecHash(Hash *node)
econtext = hashstate->cstate.cs_ExprContext;

/* ----------------
* get tuple and insert into the hash table
* get all inner tuples and insert into the hash table (or temp files)
* ----------------
*/
for (;;)
@@ -118,26 +102,11 @@ ExecHash(Hash *node)
slot = ExecProcNode(outerNode, (Plan *) node);
if (TupIsNull(slot))
break;

econtext->ecxt_innertuple = slot;
ExecHashTableInsert(hashtable, econtext, hashkey,
hashstate->hashBatches);

ExecHashTableInsert(hashtable, econtext, hashkey);
ExecClearTuple(slot);
}

/*
* end of build phase, flush all the last pages of the batches.
*/
for (i = 0; i < nbatch; i++)
{
if (FileSeek(batches[i], 0L, SEEK_END) < 0)
perror("FileSeek");
if (FileWrite(batches[i], ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ) < 0)
perror("FileWrite");
NDirectFileWrite++;
}

/* ---------------------
* Return the slot so that we have the tuple descriptor
* when we need to save/restore them. -Jeff 11 July 1991
@@ -173,10 +142,10 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
*/
hashstate = makeNode(HashState);
node->hashstate = hashstate;
hashstate->hashBatches = NULL;
hashstate->hashtable = NULL;

/* ----------------
* Miscellanious initialization
* Miscellaneous initialization
*
* + assign node's base_id
* + assign debugging hooks and
@@ -186,7 +155,6 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
ExecAssignExprContext(estate, &hashstate->cstate);

#define HASH_NSLOTS 1
/* ----------------
* initialize our result slot
* ----------------
@@ -214,6 +182,7 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
int
ExecCountSlotsHash(Hash *node)
{
#define HASH_NSLOTS 1
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
HASH_NSLOTS;
@@ -230,16 +199,12 @@ ExecEndHash(Hash *node)
{
HashState *hashstate;
Plan *outerPlan;
File *batches;

/* ----------------
* get info from the hash state
* ----------------
*/
hashstate = node->hashstate;
batches = hashstate->hashBatches;
if (batches != NULL)
pfree(batches);

/* ----------------
* free projection info. no need to free result type info
@@ -256,21 +221,6 @@ ExecEndHash(Hash *node)
ExecEndNode(outerPlan, (Plan *) node);
}

static RelativeAddr
hashTableAlloc(int size, HashJoinTable hashtable)
{
RelativeAddr p = hashtable->top;
hashtable->top += MAXALIGN(size);
return p;
}

static void *
absHashTableAlloc(int size, HashJoinTable hashtable)
{
RelativeAddr p = hashTableAlloc(size, hashtable);
return ABSADDR(p);
}

/* ----------------------------------------------------------------
* ExecHashTableCreate
@@ -285,22 +235,19 @@ HashJoinTable
ExecHashTableCreate(Hash *node)
{
Plan *outerNode;
int HashTBSize;
int nbatch;
int ntuples;
int tupsize;
int pages;
int sqrtpages;
IpcMemoryId shmid;
double inner_rel_bytes;
double hash_table_bytes;
int nbatch;
HashJoinTable hashtable;
HashBucket bucket;
int nbuckets;
int totalbuckets;
int bucketsize;
int i;
RelativeAddr *outerbatchPos;
RelativeAddr *innerbatchPos;
int *innerbatchSizes;
Portal myPortal;
char myPortalName[64];
MemoryContext oldcxt;

/* ----------------
* Get information about the size of the relation to be hashed
@@ -314,38 +261,48 @@ ExecHashTableCreate(Hash *node)
ntuples = outerNode->plan_size;
if (ntuples <= 0) /* force a plausible size if no info */
ntuples = 1000;
tupsize = outerNode->plan_width + sizeof(HeapTupleData);
pages = (int) ceil((double) ntuples * tupsize * FUDGE_FAC / BLCKSZ);
/* estimate tupsize based on footprint of tuple in hashtable...
* but what about palloc overhead?
*/
tupsize = MAXALIGN(outerNode->plan_width) +
MAXALIGN(sizeof(HashJoinTupleData));
inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;

/*
* Max hashtable size is NBuffers pages, but not less than
* Target hashtable size is SortMem kilobytes, but not less than
* sqrt(estimated inner rel size), so as to avoid horrible performance.
* XXX since the hashtable is not allocated in shared mem anymore,
* it would probably be more appropriate to drive this from -S than -B.
*/
sqrtpages = (int) ceil(sqrt((double) pages));
HashTBSize = NBuffers;
if (sqrtpages > HashTBSize)
HashTBSize = sqrtpages;
hash_table_bytes = sqrt(inner_rel_bytes);
if (hash_table_bytes < (SortMem * 1024L))
hash_table_bytes = SortMem * 1024L;

/*
* Count the number of hash buckets we want for the whole relation,
* and the number we can actually fit in the allowed memory.
* NOTE: FUDGE_FAC here determines the fraction of the hashtable space
* saved for overflow records. Need a better approach...
* for an average bucket load of NTUP_PER_BUCKET (per virtual bucket!).
*/
totalbuckets = (int) ceil((double) ntuples / NTUP_PER_BUCKET);
bucketsize = MAXALIGN(NTUP_PER_BUCKET * tupsize + sizeof(*bucket));
nbuckets = (int) ((HashTBSize * BLCKSZ) / (bucketsize * FUDGE_FAC));
totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);

/*
* Count the number of buckets we think will actually fit in the
* target memory size, at a loading of NTUP_PER_BUCKET (physical buckets).
* NOTE: FUDGE_FAC here determines the fraction of the hashtable space
* reserved to allow for nonuniform distribution of hash values.
* Perhaps this should be a different number from the other uses of
* FUDGE_FAC, but since we have no real good way to pick either one...
*/
bucketsize = NTUP_PER_BUCKET * tupsize;
nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
if (nbuckets <= 0)
nbuckets = 1;

if (totalbuckets <= nbuckets)
{
/* We have enough space, so no batching. In theory we could
* even reduce HashTBSize, but as long as we don't have a way
* to deal with overflow-space overrun, best to leave the
* extra space available for overflow.
* even reduce nbuckets, but since that could lead to poor
* behavior if estimated ntuples is much less than reality,
* it seems better to make more buckets instead of fewer.
*/
nbuckets = totalbuckets;
totalbuckets = nbuckets;
nbatch = 0;
}
else
@@ -356,7 +313,8 @@ ExecHashTableCreate(Hash *node)
* of groups we will use for the part of the data that doesn't
* fall into the first nbuckets hash buckets.
*/
nbatch = (int) ceil((double) (pages - HashTBSize) / HashTBSize);
nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
hash_table_bytes);
if (nbatch <= 0)
nbatch = 1;
}
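The new sizing logic in the hunk above reduces to a little arithmetic. The following is a hedged restatement, not the committed code: variable names follow the patch, the helper itself is hypothetical, and FUDGE_FAC/NTUP_PER_BUCKET are the patch's tuning constants.

#include <math.h>

/* Sketch of the new sizing rule: the memory target is
 * max(SortMem KB, sqrt(inner relation bytes)); the bucket count is what
 * fits at NTUP_PER_BUCKET tuples per bucket with FUDGE_FAC slop; any
 * excess data spills into ceil(excess / target) batches. */
static void
sketch_sizing(double inner_rel_bytes, long sort_mem_kb,
              int tupsize, int ntup_per_bucket, double fudge_fac,
              int *nbuckets, int *nbatch)
{
	double hash_table_bytes = sqrt(inner_rel_bytes);

	if (hash_table_bytes < sort_mem_kb * 1024.0)
		hash_table_bytes = sort_mem_kb * 1024.0;
	*nbuckets = (int) (hash_table_bytes / (ntup_per_bucket * tupsize * fudge_fac));
	if (*nbuckets <= 0)
		*nbuckets = 1;
	*nbatch = (inner_rel_bytes <= hash_table_bytes) ? 0 :
		(int) ceil((inner_rel_bytes - hash_table_bytes) / hash_table_bytes);
}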
@@ -374,89 +332,116 @@ ExecHashTableCreate(Hash *node)
#endif

/* ----------------
* in non-parallel machines, we don't need to put the hash table
* in the shared memory. We just palloc it. The space needed
* is the hash area itself plus nbatch+1 I/O buffer pages.
* ----------------
*/
hashtable = (HashJoinTable) palloc((HashTBSize + nbatch + 1) * BLCKSZ);
shmid = 0;

if (hashtable == NULL)
elog(ERROR, "not enough memory for hashjoin.");
/* ----------------
* initialize the hash table header
* Initialize the hash table control block.
* The hashtable control block is just palloc'd from executor memory.
* ----------------
*/
hashtable = (HashJoinTable) palloc(sizeof(HashTableData));
hashtable->nbuckets = nbuckets;
hashtable->totalbuckets = totalbuckets;
hashtable->bucketsize = bucketsize;
hashtable->shmid = shmid;
hashtable->top = MAXALIGN(sizeof(HashTableData));
hashtable->bottom = HashTBSize * BLCKSZ;
/*
* hashtable->readbuf has to be maxaligned!!!
* Note there are nbatch additional pages available after readbuf;
* these are used for buffering the outgoing batch data.
*/
hashtable->readbuf = hashtable->bottom;
hashtable->batch = hashtable->bottom + BLCKSZ;
hashtable->buckets = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
hashtable->pcount = hashtable->nprocess = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->innerBatchSize = NULL;
hashtable->outerBatchSize = NULL;

/* ----------------
* Create a named portal in which to keep the hashtable working storage.
* Each hashjoin must have its own portal, so be wary of name conflicts.
* ----------------
*/
i = 0;
do {
i++;
sprintf(myPortalName, "<hashtable %d>", i);
myPortal = GetPortalByName(myPortalName);
} while (PortalIsValid(myPortal));
myPortal = CreatePortal(myPortalName);
Assert(PortalIsValid(myPortal));
hashtable->myPortal = (void*) myPortal; /* kluge for circular includes */
hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal);
hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal);

/* Allocate data that will live for the life of the hashjoin */

oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

if (nbatch > 0)
{
/* ---------------
* allocate and initialize the outer batches
* allocate and initialize the file arrays in hashCxt
* ---------------
*/
outerbatchPos = (RelativeAddr *)
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
hashtable->innerBatchFile = (BufFile **)
palloc(nbatch * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
palloc(nbatch * sizeof(BufFile *));
hashtable->innerBatchSize = (long *)
palloc(nbatch * sizeof(long));
hashtable->outerBatchSize = (long *)
palloc(nbatch * sizeof(long));
for (i = 0; i < nbatch; i++)
{
outerbatchPos[i] = -1;
hashtable->innerBatchFile[i] = NULL;
hashtable->outerBatchFile[i] = NULL;
hashtable->innerBatchSize[i] = 0;
hashtable->outerBatchSize[i] = 0;
}
hashtable->outerbatchPos = RELADDR(outerbatchPos);
/* ---------------
* allocate and initialize the inner batches
* ---------------
*/
innerbatchPos = (RelativeAddr *)
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
innerbatchSizes = (int *)
absHashTableAlloc(nbatch * sizeof(int), hashtable);
for (i = 0; i < nbatch; i++)
{
innerbatchPos[i] = -1;
innerbatchSizes[i] = 0;
}
hashtable->innerbatchPos = RELADDR(innerbatchPos);
hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
}
else
{
hashtable->outerbatchPos = (RelativeAddr) NULL;
hashtable->innerbatchPos = (RelativeAddr) NULL;
hashtable->innerbatchSizes = (RelativeAddr) NULL;
/* The files will not be opened until later... */
}

hashtable->overflownext = hashtable->top + bucketsize * nbuckets;
Assert(hashtable->overflownext < hashtable->bottom);
/* ----------------
* initialize each hash bucket
* ----------------
/* Prepare portal for the first-scan space allocations;
* allocate the hashbucket array therein, and set each bucket "empty".
*/
bucket = (HashBucket) ABSADDR(hashtable->top);
MemoryContextSwitchTo(hashtable->batchCxt);
StartPortalAllocMode(DefaultAllocMode, 0);

hashtable->buckets = (HashJoinTuple *)
palloc(nbuckets * sizeof(HashJoinTuple));

if (hashtable->buckets == NULL)
elog(ERROR, "Insufficient memory for hash table.");

for (i = 0; i < nbuckets; i++)
{
bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
bucket->bottom = bucket->top;
bucket->firstotuple = bucket->lastotuple = -1;
bucket = (HashBucket) ((char *) bucket + bucketsize);
hashtable->buckets[i] = NULL;
}

MemoryContextSwitchTo(oldcxt);

return hashtable;
}

/* ----------------------------------------------------------------
* ExecHashTableDestroy
*
* destroy a hash table
* ----------------------------------------------------------------
*/
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
int i;

/* Make sure all the temp files are closed */
for (i = 0; i < hashtable->nbatch; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}

/* Destroy the portal to release all working memory */
/* cast here is a kluge for circular includes... */
PortalDestroy((Portal*) & hashtable->myPortal);

/* And drop the control block */
pfree(hashtable);
}

/* ----------------------------------------------------------------
* ExecHashTableInsert
*
@@ -467,32 +452,11 @@ ExecHashTableCreate(Hash *node)
void
ExecHashTableInsert(HashJoinTable hashtable,
ExprContext *econtext,
Var *hashkey,
File *batches)
Var *hashkey)
{
TupleTableSlot *slot;
HeapTuple heapTuple;
HashBucket bucket;
int bucketno;
int nbatch;
int batchno;
char *buffer;
RelativeAddr *batchPos;
int *batchSizes;
char *pos;

nbatch = hashtable->nbatch;
batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);

slot = econtext->ecxt_innertuple;
heapTuple = slot->val;

#ifdef HJDEBUG
printf("Inserting ");
#endif

bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
int bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
TupleTableSlot *slot = econtext->ecxt_innertuple;
HeapTuple heapTuple = slot->val;

/* ----------------
* decide whether to put the tuple in the hash table or a tmp file
@@ -504,22 +468,24 @@ ExecHashTableInsert(HashJoinTable hashtable,
* put the tuple in hash table
* ---------------
*/
bucket = (HashBucket)
(ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
if (((char *) MAXALIGN(ABSADDR(bucket->bottom)) - (char *) bucket)
+ heapTuple->t_len + HEAPTUPLESIZE > hashtable->bucketsize)
ExecHashOverflowInsert(hashtable, bucket, heapTuple);
else
{
memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)),
heapTuple,
HEAPTUPLESIZE);
memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)) + HEAPTUPLESIZE,
heapTuple->t_data,
heapTuple->t_len);
bucket->bottom = ((RelativeAddr) MAXALIGN(bucket->bottom) +
heapTuple->t_len + HEAPTUPLESIZE);
}
HashJoinTuple hashTuple;
int hashTupleSize;

hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len;
hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
hashTupleSize);
if (hashTuple == NULL)
elog(ERROR, "Insufficient memory for hash table.");
memcpy((char *) & hashTuple->htup,
(char *) heapTuple,
sizeof(hashTuple->htup));
hashTuple->htup.t_data = (HeapTupleHeader)
(((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple)));
memcpy((char *) hashTuple->htup.t_data,
(char *) heapTuple->t_data,
heapTuple->t_len);
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
}
else
{
@@ -527,31 +493,14 @@ ExecHashTableInsert(HashJoinTable hashtable,
* put the tuple into a tmp file for other batches
* -----------------
*/
batchno = (nbatch * (bucketno - hashtable->nbuckets)) /
int batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
(hashtable->totalbuckets - hashtable->nbuckets);
buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
batchSizes[batchno]++;
pos = (char *)
ExecHashJoinSaveTuple(heapTuple,
buffer,
batches[batchno],
(char *) ABSADDR(batchPos[batchno]));
batchPos[batchno] = RELADDR(pos);
hashtable->innerBatchSize[batchno]++;
ExecHashJoinSaveTuple(heapTuple,
hashtable->innerBatchFile[batchno]);
}
}
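The insert path above partitions overflow tuples by mapping "virtual" bucket numbers at or beyond nbuckets onto temp-file batches. A hedged restatement of that mapping (the helper name is mine; the formula is taken from the patch, cf. ExecHashJoinGetBatch below):

/* Sketch, not committed code: buckets [0, nbuckets) live in memory
 * (batch 0); virtual buckets [nbuckets, totalbuckets) are spread evenly
 * across nbatch temp files. Returns 0 for in-memory, else the 1-based
 * batch number. */
static int
sketch_batch_for_bucket(int bucketno, int nbuckets, int totalbuckets, int nbatch)
{
	if (bucketno < nbuckets || nbatch == 0)
		return 0;
	return (nbatch * (bucketno - nbuckets)) / (totalbuckets - nbuckets) + 1;
}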

/* ----------------------------------------------------------------
* ExecHashTableDestroy
*
* destroy a hash table
* ----------------------------------------------------------------
*/
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
pfree(hashtable);
}

/* ----------------------------------------------------------------
* ExecHashGetBucket
*
@@ -567,12 +516,12 @@ ExecHashGetBucket(HashJoinTable hashtable,
Datum keyval;
bool isNull;


/* ----------------
* Get the join attribute value of the tuple
* ----------------
*
* ...It's quick hack - use ExecEvalExpr instead of ExecEvalVar:
* hashkey may be T_ArrayRef, not just T_Var. - vadim 04/22/97
* ----------------
*/
keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL);

@@ -603,62 +552,6 @@ ExecHashGetBucket(HashJoinTable hashtable,
return bucketno;
}

/* ----------------------------------------------------------------
* ExecHashOverflowInsert
*
* insert into the overflow area of a hash bucket
* ----------------------------------------------------------------
*/
static void
ExecHashOverflowInsert(HashJoinTable hashtable,
HashBucket bucket,
HeapTuple heapTuple)
{
OverflowTuple otuple;
RelativeAddr newend;
OverflowTuple firstotuple;
OverflowTuple lastotuple;

firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
/* ----------------
* see if we run out of overflow space
* ----------------
*/
newend = (RelativeAddr) MAXALIGN(hashtable->overflownext + sizeof(*otuple)
+ heapTuple->t_len + HEAPTUPLESIZE);
if (newend > hashtable->bottom)
elog(ERROR,
"hash table out of memory. Use -B parameter to increase buffers.");

/* ----------------
* establish the overflow chain
* ----------------
*/
otuple = (OverflowTuple) ABSADDR(hashtable->overflownext);
hashtable->overflownext = newend;
if (firstotuple == NULL)
bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
else
{
lastotuple->next = RELADDR(otuple);
bucket->lastotuple = RELADDR(otuple);
}

/* ----------------
* copy the tuple into the overflow area
* ----------------
*/
otuple->next = -1;
otuple->tuple = RELADDR(MAXALIGN(((char *) otuple + sizeof(*otuple))));
memmove(ABSADDR(otuple->tuple),
heapTuple,
HEAPTUPLESIZE);
memmove(ABSADDR(otuple->tuple) + HEAPTUPLESIZE,
heapTuple->t_data,
heapTuple->t_len);
}

/* ----------------------------------------------------------------
* ExecScanHashBucket
*
@@ -667,95 +560,46 @@ ExecHashOverflowInsert(HashJoinTable hashtable,
*/
HeapTuple
ExecScanHashBucket(HashJoinState *hjstate,
HashBucket bucket,
HeapTuple curtuple,
List *hjclauses,
ExprContext *econtext)
{
HeapTuple heapTuple;
bool qualResult;
OverflowTuple otuple = NULL;
OverflowTuple curotuple;
TupleTableSlot *inntuple;
OverflowTuple firstotuple;
OverflowTuple lastotuple;
HashJoinTable hashtable;
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;

hashtable = hjstate->hj_HashTable;
firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);

/* ----------------
* search the hash bucket
* ----------------
/* hj_CurTuple is NULL to start scanning a new bucket, or the address
* of the last tuple returned from the current bucket.
*/
if (curtuple == NULL || curtuple < (HeapTuple) ABSADDR(bucket->bottom))
if (hashTuple == NULL)
{
if (curtuple == NULL)
heapTuple = (HeapTuple)
MAXALIGN(ABSADDR(bucket->top));
else
heapTuple = (HeapTuple)
MAXALIGN(((char *) curtuple + curtuple->t_len + HEAPTUPLESIZE));

while (heapTuple < (HeapTuple) ABSADDR(bucket->bottom))
{

heapTuple->t_data = (HeapTupleHeader)
((char *) heapTuple + HEAPTUPLESIZE);

inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
hjstate->hj_HashTupleSlot, /* slot */
InvalidBuffer, /* tuple has no buffer */
false); /* do not pfree this tuple */

econtext->ecxt_innertuple = inntuple;
qualResult = ExecQual((List *) hjclauses, econtext);

if (qualResult)
return heapTuple;

heapTuple = (HeapTuple)
MAXALIGN(((char *) heapTuple + heapTuple->t_len + HEAPTUPLESIZE));
}

if (firstotuple == NULL)
return NULL;
otuple = firstotuple;
hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
}
else
{
hashTuple = hashTuple->next;
}

/* ----------------
* search the overflow area of the hash bucket
* ----------------
*/
if (otuple == NULL)
while (hashTuple != NULL)
{
curotuple = hjstate->hj_CurOTuple;
otuple = (OverflowTuple) ABSADDR(curotuple->next);
}

while (otuple != NULL)
{
heapTuple = (HeapTuple) ABSADDR(otuple->tuple);
heapTuple->t_data = (HeapTupleHeader)
((char *) heapTuple + HEAPTUPLESIZE);
HeapTuple heapTuple = & hashTuple->htup;
TupleTableSlot *inntuple;
bool qualResult;

/* insert hashtable's tuple into exec slot so ExecQual sees it */
inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
hjstate->hj_HashTupleSlot, /* slot */
InvalidBuffer, /* SP?? this tuple has
* no buffer */
InvalidBuffer,
false); /* do not pfree this tuple */

econtext->ecxt_innertuple = inntuple;
qualResult = ExecQual((List *) hjclauses, econtext);

qualResult = ExecQual(hjclauses, econtext);

if (qualResult)
{
hjstate->hj_CurOTuple = otuple;
hjstate->hj_CurTuple = hashTuple;
return heapTuple;
}

otuple = (OverflowTuple) ABSADDR(otuple->next);
hashTuple = hashTuple->next;
}

/* ----------------
@@ -819,60 +663,57 @@ hashFunc(Datum key, int len, bool byVal)
* reset hash table header for new batch
*
* ntuples is the number of tuples in the inner relation's batch
* (which we currently don't actually use...)
* ----------------------------------------------------------------
*/
void
ExecHashTableReset(HashJoinTable hashtable, int ntuples)
ExecHashTableReset(HashJoinTable hashtable, long ntuples)
{
MemoryContext oldcxt;
int nbuckets = hashtable->nbuckets;
int i;
HashBucket bucket;

/*
* We can reset the number of hashbuckets since we are going to
* recalculate the hash values of all the tuples in the new batch
* anyway. We might as well spread out the hash values as much as
* we can within the available space. Note we must set nbuckets
* equal to totalbuckets since we will NOT generate any new output
* batches after this point.
* Release all the hash buckets and tuples acquired in the prior pass,
* and reinitialize the portal for a new pass.
*/
hashtable->nbuckets = hashtable->totalbuckets =
(int) (hashtable->bottom / (hashtable->bucketsize * FUDGE_FAC));
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
EndPortalAllocMode();
StartPortalAllocMode(DefaultAllocMode, 0);

/*
* reinitialize the overflow area to empty, and reinit each hash bucket.
* We still use the same number of physical buckets as in the first pass.
* (It could be different; but we already decided how many buckets would
* be appropriate for the allowed memory, so stick with that number.)
* We MUST set totalbuckets to equal nbuckets, because from now on
* no tuples will go out to temp files; there are no more virtual buckets,
* only real buckets. (This implies that tuples will go into different
* bucket numbers than they did on the first pass, but that's OK.)
*/
hashtable->overflownext = hashtable->top + hashtable->bucketsize *
hashtable->nbuckets;
Assert(hashtable->overflownext < hashtable->bottom);
hashtable->totalbuckets = nbuckets;

bucket = (HashBucket) ABSADDR(hashtable->top);
for (i = 0; i < hashtable->nbuckets; i++)
/* Reallocate and reinitialize the hash bucket headers. */
hashtable->buckets = (HashJoinTuple *)
palloc(nbuckets * sizeof(HashJoinTuple));

if (hashtable->buckets == NULL)
elog(ERROR, "Insufficient memory for hash table.");

for (i = 0; i < nbuckets; i++)
{
bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
bucket->bottom = bucket->top;
bucket->firstotuple = bucket->lastotuple = -1;
bucket = (HashBucket) ((char *) bucket + hashtable->bucketsize);
hashtable->buckets[i] = NULL;
}

hashtable->pcount = hashtable->nprocess;
MemoryContextSwitchTo(oldcxt);
}

void
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
{
HashState *hashstate = node->hashstate;

if (hashstate->hashBatches != NULL)
{
pfree(hashstate->hashBatches);
hashstate->hashBatches = NULL;
}

/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (((Plan *) node)->lefttree->chgParam == NULL)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);

}
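To summarize nodeHash.c's new scan protocol before moving to nodeHashjoin.c: ExecScanHashBucket now just walks a bucket's chain, resuming from hj_CurTuple. A self-contained sketch of that protocol, not the committed code (SketchTuple stands in for HashJoinTupleData, and the match callback stands in for the real ExecQual test over the join clauses):

typedef struct SketchTuple
{
	struct SketchTuple *next;	/* next tuple in the same bucket */
	/* a HeapTupleData image follows in the real structure */
} SketchTuple;

static SketchTuple *
sketch_scan_bucket(SketchTuple **buckets, int bucketno,
                   SketchTuple *cur, int (*matches) (SketchTuple *))
{
	/* cur == NULL starts a fresh bucket scan (cf. hj_CurTuple == NULL);
	 * otherwise resume just after the tuple returned last time */
	SketchTuple *t = (cur == NULL) ? buckets[bucketno] : cur->next;

	for (; t != NULL; t = t->next)
	{
		if (matches(t))
			return t;			/* caller records t as hj_CurTuple */
	}
	return NULL;				/* bucket exhausted */
}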
@@ -7,15 +7,12 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.19 1999/05/09 00:53:21 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.20 1999/05/18 21:33:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "postgres.h"

@@ -25,19 +22,15 @@
#include "executor/nodeHashjoin.h"
#include "optimizer/clauses.h" /* for get_leftop */

static TupleTableSlot *
ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate);

static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, char *buffer,
File file, TupleTableSlot *tupleSlot, int *block, char **position);

static int ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable,
int nbatch);

static TupleTableSlot *ExecHashJoinOuterGetTuple(Plan *node, Plan *parent,
HashJoinState *hjstate);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
TupleTableSlot *tupleSlot);
static int ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable);
static int ExecHashJoinNewBatch(HashJoinState *hjstate);



/* ----------------------------------------------------------------
* ExecHashJoin
*
@@ -61,27 +54,14 @@ ExecHashJoin(HashJoin *node)
TupleTableSlot *inntuple;
Var *outerVar;
ExprContext *econtext;

HashJoinTable hashtable;
int bucketno;
HashBucket bucket;
HeapTuple curtuple;

bool qualResult;

TupleTableSlot *outerTupleSlot;
TupleTableSlot *innerTupleSlot;
int nbatch;
int curbatch;
File *outerbatches;
RelativeAddr *outerbatchPos;
Var *innerhashkey;
int batch;
int batchno;
char *buffer;
int i;
bool hashPhaseDone;
char *pos;

/* ----------------
* get information from HashJoin node
@@ -103,8 +83,6 @@ ExecHashJoin(HashJoin *node)
* -----------------
*/
hashtable = hjstate->hj_HashTable;
bucket = hjstate->hj_CurBucket;
curtuple = hjstate->hj_CurTuple;

/* --------------------
* initialize expression context
@@ -121,13 +99,13 @@ ExecHashJoin(HashJoin *node)
if (!isDone)
return result;
}

/* ----------------
* if this is the first call, build the hash table for inner relation
* ----------------
*/
if (!hashPhaseDone)
{ /* if the hash phase not completed */
hashtable = node->hashjointable;
if (hashtable == NULL)
{ /* if the hash table has not been created */
/* ----------------
@@ -143,45 +121,25 @@ ExecHashJoin(HashJoin *node)
* execute the Hash node, to build the hash table
* ----------------
*/
hashNode->hashtable = hashtable;
hashNode->hashstate->hashtable = hashtable;
innerTupleSlot = ExecProcNode((Plan *) hashNode, (Plan *) node);
}
bucket = NULL;
curtuple = NULL;
curbatch = 0;
node->hashdone = true;
/* ----------------
* Open temp files for outer batches, if needed.
* Note that file buffers are palloc'd in regular executor context.
* ----------------
*/
for (i = 0; i < hashtable->nbatch; i++)
{
File tfile = OpenTemporaryFile();
Assert(tfile >= 0);
hashtable->outerBatchFile[i] = BufFileCreate(tfile);
}
}
else if (hashtable == NULL)
return NULL;

nbatch = hashtable->nbatch;
outerbatches = hjstate->hj_OuterBatches;
if (nbatch > 0 && outerbatches == NULL)
{ /* if needs hash partition */
/* -----------------
* allocate space for file descriptors of outer batch files
* then open the batch files in the current process
* -----------------
*/
innerhashkey = hashNode->hashkey;
hjstate->hj_InnerHashKey = innerhashkey;
outerbatches = (File *) palloc(nbatch * sizeof(File));
for (i = 0; i < nbatch; i++)
{
outerbatches[i] = OpenTemporaryFile();
}
hjstate->hj_OuterBatches = outerbatches;

/* ------------------
* get the inner batch file descriptors from the
* hash node
* ------------------
*/
hjstate->hj_InnerBatches = hashNode->hashstate->hashBatches;
}
outerbatchPos = (RelativeAddr *) ABSADDR(hashtable->outerbatchPos);
curbatch = hashtable->curbatch;

/* ----------------
* Now get an outer tuple and probe into the hash table for matches
* ----------------
@@ -189,185 +147,106 @@ ExecHashJoin(HashJoin *node)
outerTupleSlot = hjstate->jstate.cs_OuterTupleSlot;
outerVar = get_leftop(clause);

bucketno = -1; /* if bucketno remains -1, means use old
* outer tuple */
if (TupIsNull(outerTupleSlot))
for (;;)
{

/*
* if the current outer tuple is nil, get a new one
*/
outerTupleSlot = (TupleTableSlot *)
ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);

while (curbatch <= nbatch && TupIsNull(outerTupleSlot))
if (TupIsNull(outerTupleSlot))
{

/*
* if the current batch runs out, switch to new batch
*/
curbatch = ExecHashJoinNewBatch(hjstate);
if (curbatch > nbatch)
outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
(Plan *) node,
hjstate);
if (TupIsNull(outerTupleSlot))
{

/*
* when the last batch runs out, clean up
* when the last batch runs out, clean up and exit
*/
ExecHashTableDestroy(hashtable);
hjstate->hj_HashTable = NULL;
return NULL;
}
else
outerTupleSlot = (TupleTableSlot *)
ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);

/*
* now we have an outer tuple, find the corresponding bucket for
* this tuple from the hash table
*/
econtext->ecxt_outertuple = outerTupleSlot;
hjstate->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
outerVar);
hjstate->hj_CurTuple = NULL;

/* ----------------
* Now we've got an outer tuple and the corresponding hash bucket,
* but this tuple may not belong to the current batch.
* This need only be checked in the first pass.
* ----------------
*/
if (hashtable->curbatch == 0)
{
int batch = ExecHashJoinGetBatch(hjstate->hj_CurBucketNo,
hashtable);
if (batch > 0)
{
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
int batchno = batch - 1;
hashtable->outerBatchSize[batchno]++;
ExecHashJoinSaveTuple(outerTupleSlot->val,
hashtable->outerBatchFile[batchno]);
ExecClearTuple(outerTupleSlot);
continue; /* loop around for a new outer tuple */
}
}
}

/*
* now we get an outer tuple, find the corresponding bucket for
* this tuple from the hash table
* OK, scan the selected hash bucket for matches
*/
econtext->ecxt_outertuple = outerTupleSlot;

#ifdef HJDEBUG
printf("Probing ");
#endif
bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
bucket = (HashBucket) (ABSADDR(hashtable->top)
+ bucketno * hashtable->bucketsize);
}

for (;;)
{
/* ----------------
* Now we've got an outer tuple and the corresponding hash bucket,
* but this tuple may not belong to the current batch.
* ----------------
*/
if (curbatch == 0 && bucketno != -1) /* if this is the first
* pass */
batch = ExecHashJoinGetBatch(bucketno, hashtable, nbatch);
else
batch = 0;
if (batch > 0)
for (;;)
{

curtuple = ExecScanHashBucket(hjstate,
hjclauses,
econtext);
if (curtuple == NULL)
break; /* out of matches */
/*
* if the current outer tuple does not belong to the current
* batch, save to the tmp file for the corresponding batch.
* we've got a match, but still need to test qpqual
*/
buffer = ABSADDR(hashtable->batch) + (batch - 1) * BLCKSZ;
batchno = batch - 1;
pos = ExecHashJoinSaveTuple(outerTupleSlot->val,
buffer,
outerbatches[batchno],
ABSADDR(outerbatchPos[batchno]));

outerbatchPos[batchno] = RELADDR(pos);
}
else if (bucket != NULL)
{
do
inntuple = ExecStoreTuple(curtuple,
hjstate->hj_HashTupleSlot,
InvalidBuffer,
false); /* don't pfree this tuple */
econtext->ecxt_innertuple = inntuple;
qualResult = ExecQual(qual, econtext);
/* ----------------
* if we pass the qual, then save state for next call and
* have ExecProject form the projection, store it
* in the tuple table, and return the slot.
* ----------------
*/
if (qualResult)
{
ProjectionInfo *projInfo;
TupleTableSlot *result;
bool isDone;

/*
* scan the hash bucket for matches
*/
curtuple = ExecScanHashBucket(hjstate,
bucket,
curtuple,
hjclauses,
econtext);

if (curtuple != NULL)
{

/*
* we've got a match, but still need to test qpqual
*/
inntuple = ExecStoreTuple(curtuple,
hjstate->hj_HashTupleSlot,
InvalidBuffer,
false); /* don't pfree this
* tuple */

econtext->ecxt_innertuple = inntuple;

/* ----------------
* test to see if we pass the qualification
* ----------------
*/
qualResult = ExecQual((List *) qual, econtext);

/* ----------------
* if we pass the qual, then save state for next call and
* have ExecProject form the projection, store it
* in the tuple table, and return the slot.
* ----------------
*/
if (qualResult)
{
ProjectionInfo *projInfo;
TupleTableSlot *result;
bool isDone;

hjstate->hj_CurBucket = bucket;
hjstate->hj_CurTuple = curtuple;
hashtable->curbatch = curbatch;
hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot;

projInfo = hjstate->jstate.cs_ProjInfo;
result = ExecProject(projInfo, &isDone);
hjstate->jstate.cs_TupFromTlist = !isDone;
return result;
}
}
hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot;
projInfo = hjstate->jstate.cs_ProjInfo;
result = ExecProject(projInfo, &isDone);
hjstate->jstate.cs_TupFromTlist = !isDone;
return result;
}
while (curtuple != NULL);
}

/* ----------------
* Now the current outer tuple has run out of matches,
* so we free it and get a new outer tuple.
* so we free it and loop around to get a new outer tuple.
* ----------------
*/
outerTupleSlot = (TupleTableSlot *)
ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);

while (curbatch <= nbatch && TupIsNull(outerTupleSlot))
{

/*
* if the current batch runs out, switch to new batch
*/
curbatch = ExecHashJoinNewBatch(hjstate);
if (curbatch > nbatch)
{

/*
* when the last batch runs out, clean up
*/
ExecHashTableDestroy(hashtable);
hjstate->hj_HashTable = NULL;
return NULL;
}
else
outerTupleSlot = (TupleTableSlot *)
ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
}

/* ----------------
* Now get the corresponding hash bucket for the new
* outer tuple.
* ----------------
*/
econtext->ecxt_outertuple = outerTupleSlot;
#ifdef HJDEBUG
printf("Probing ");
#endif
bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
bucket = (HashBucket) (ABSADDR(hashtable->top)
+ bucketno * hashtable->bucketsize);
curtuple = NULL;
ExecClearTuple(outerTupleSlot);
}
}

@@ -399,7 +278,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent)
node->hashjoinstate = hjstate;

/* ----------------
* Miscellanious initialization
* Miscellaneous initialization
*
* + assign node's base_id
* + assign debugging hooks and
@@ -456,22 +335,16 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent)
ExecAssignProjectionInfo((Plan *) node, &hjstate->jstate);

/* ----------------
* XXX comment me
* initialize hash-specific info
* ----------------
*/

node->hashdone = false;

hjstate->hj_HashTable = (HashJoinTable) NULL;
hjstate->hj_HashTableShmId = (IpcMemoryId) 0;
hjstate->hj_CurBucket = (HashBucket) NULL;
hjstate->hj_CurTuple = (HeapTuple) NULL;
hjstate->hj_CurOTuple = (OverflowTuple) NULL;
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurTuple = (HashJoinTuple) NULL;
hjstate->hj_InnerHashKey = (Var *) NULL;
hjstate->hj_OuterBatches = (File *) NULL;
hjstate->hj_InnerBatches = (File *) NULL;
hjstate->hj_OuterReadPos = (char *) NULL;
hjstate->hj_OuterReadBlk = (int) 0;

hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL;
hjstate->jstate.cs_TupFromTlist = (bool) false;
@@ -554,93 +427,69 @@ ExecEndHashJoin(HashJoin *node)
static TupleTableSlot *
ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
TupleTableSlot *slot;
HashJoinTable hashtable;
int curbatch;
File *outerbatches;
char *outerreadPos;
int batchno;
char *outerreadBuf;
int outerreadBlk;

hashtable = hjstate->hj_HashTable;
curbatch = hashtable->curbatch;

if (curbatch == 0)
{ /* if it is the first pass */
slot = ExecProcNode(node, parent);
return slot;
if (! TupIsNull(slot))
return slot;
/*
* We have just reached the end of the first pass.
* Try to switch to a saved batch.
*/
curbatch = ExecHashJoinNewBatch(hjstate);
}

/*
* otherwise, read from the tmp files
* Try to read from a temp file.
* Loop allows us to advance to new batch as needed.
*/
outerbatches = hjstate->hj_OuterBatches;
outerreadPos = hjstate->hj_OuterReadPos;
outerreadBlk = hjstate->hj_OuterReadBlk;
outerreadBuf = ABSADDR(hashtable->readbuf);
batchno = curbatch - 1;
while (curbatch <= hashtable->nbatch)
{
slot = ExecHashJoinGetSavedTuple(hjstate,
hashtable->outerBatchFile[curbatch-1],
hjstate->hj_OuterTupleSlot);
if (! TupIsNull(slot))
return slot;
curbatch = ExecHashJoinNewBatch(hjstate);
}

slot = ExecHashJoinGetSavedTuple(hjstate,
outerreadBuf,
outerbatches[batchno],
hjstate->hj_OuterTupleSlot,
&outerreadBlk,
&outerreadPos);

hjstate->hj_OuterReadPos = outerreadPos;
hjstate->hj_OuterReadBlk = outerreadBlk;

return slot;
/* Out of batches... */
return NULL;
}

/* ----------------------------------------------------------------
* ExecHashJoinGetSavedTuple
*
* read the next tuple from a tmp file using a certain buffer
* read the next tuple from a tmp file
* ----------------------------------------------------------------
*/

static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
char *buffer,
File file,
TupleTableSlot *tupleSlot,
int *block, /* return parameter */
char **position) /* return parameter */
BufFile *file,
TupleTableSlot *tupleSlot)
{
char *bufstart;
char *bufend;
int cc;
HeapTuple heapTuple;
HashJoinTable hashtable;
HeapTupleData htup;
size_t nread;
HeapTuple heapTuple;

hashtable = hjstate->hj_HashTable;
bufend = buffer + *(long *) buffer;
bufstart = (char *) (buffer + sizeof(long));
if ((*position == NULL) || (*position >= bufend))
{
if (*position == NULL)
(*block) = 0;
else
(*block)++;
FileSeek(file, *block * BLCKSZ, SEEK_SET);
cc = FileRead(file, buffer, BLCKSZ);
NDirectFileRead++;
if (cc < 0)
perror("FileRead");
if (cc == 0) /* end of file */
return NULL;
else
(*position) = bufstart;
}
heapTuple = (HeapTuple) (*position);
nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
if (nread == 0)
return NULL; /* end of file */
if (nread != sizeof(HeapTupleData))
elog(ERROR, "Read from hashjoin temp file failed");
heapTuple = palloc(HEAPTUPLESIZE + htup.t_len);
memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
heapTuple->t_data = (HeapTupleHeader)
((char *) heapTuple + HEAPTUPLESIZE);
(*position) = (char *) MAXALIGN(*position +
heapTuple->t_len + HEAPTUPLESIZE);

return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, false);
nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len);
if (nread != (size_t) htup.t_len)
elog(ERROR, "Read from hashjoin temp file failed");
return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true);
}

/* ----------------------------------------------------------------
@@ -652,116 +501,80 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
static int
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
File *innerBatches;
File *outerBatches;
int *innerBatchSizes;
Var *innerhashkey;
HashJoinTable hashtable;
int nbatch;
char *readPos;
int readBlk;
char *readBuf;
HashJoinTable hashtable = hjstate->hj_HashTable;
int nbatch = hashtable->nbatch;
int newbatch = hashtable->curbatch + 1;
long *innerBatchSize = hashtable->innerBatchSize;
long *outerBatchSize = hashtable->outerBatchSize;
BufFile *innerFile;
TupleTableSlot *slot;
ExprContext *econtext;
int i;
int cc;
int newbatch;
Var *innerhashkey;

hashtable = hjstate->hj_HashTable;
outerBatches = hjstate->hj_OuterBatches;
innerBatches = hjstate->hj_InnerBatches;
nbatch = hashtable->nbatch;
newbatch = hashtable->curbatch + 1;

/* ------------------
* this is the last process, so it will do the cleanup and
* batch-switching.
* ------------------
*/
if (newbatch == 1)
{

/*
* if it is end of the first pass, flush all the last pages for
* the batches.
*/
outerBatches = hjstate->hj_OuterBatches;
for (i = 0; i < nbatch; i++)
{
cc = FileSeek(outerBatches[i], 0L, SEEK_END);
if (cc < 0)
perror("FileSeek");
cc = FileWrite(outerBatches[i],
ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ);
NDirectFileWrite++;
if (cc < 0)
perror("FileWrite");
}
}
if (newbatch > 1)
{

/*
* remove the previous outer batch
* We no longer need the previous outer batch file;
* close it right away to free disk space.
*/
FileUnlink(outerBatches[newbatch - 2]);
BufFileClose(hashtable->outerBatchFile[newbatch - 2]);
hashtable->outerBatchFile[newbatch - 2] = NULL;
}

/*
* rebuild the hash table for the new inner batch
*/
innerBatchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
/* --------------
* skip over empty inner batches
* We can skip over any batches that are empty on either side.
* Release associated temp files right away.
* --------------
*/
while (newbatch <= nbatch && innerBatchSizes[newbatch - 1] == 0)
while (newbatch <= nbatch &&
(innerBatchSize[newbatch - 1] == 0L ||
outerBatchSize[newbatch - 1] == 0L))
{
FileUnlink(outerBatches[newbatch - 1]);
FileUnlink(innerBatches[newbatch - 1]);
BufFileClose(hashtable->innerBatchFile[newbatch - 1]);
hashtable->innerBatchFile[newbatch - 1] = NULL;
BufFileClose(hashtable->outerBatchFile[newbatch - 1]);
hashtable->outerBatchFile[newbatch - 1] = NULL;
newbatch++;
}

if (newbatch > nbatch)
{
hashtable->pcount = hashtable->nprocess;
return newbatch; /* no more batches */

return newbatch;
}
ExecHashTableReset(hashtable, innerBatchSizes[newbatch - 1]);
/*
* Rewind inner and outer batch files for this batch,
* so that we can start reading them.
*/
if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0L,
SEEK_SET) != 0L)
elog(ERROR, "Failed to rewind hash temp file");

innerFile = hashtable->innerBatchFile[newbatch - 1];

if (BufFileSeek(innerFile, 0L, SEEK_SET) != 0L)
elog(ERROR, "Failed to rewind hash temp file");

/*
* Reload the hash table with the new inner batch
*/
ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]);

econtext = hjstate->jstate.cs_ExprContext;
innerhashkey = hjstate->hj_InnerHashKey;
readPos = NULL;
readBlk = 0;
readBuf = ABSADDR(hashtable->readbuf);

while ((slot = ExecHashJoinGetSavedTuple(hjstate,
readBuf,
innerBatches[newbatch - 1],
hjstate->hj_HashTupleSlot,
&readBlk,
&readPos))
innerFile,
hjstate->hj_HashTupleSlot))
&& !TupIsNull(slot))
{
econtext->ecxt_innertuple = slot;
ExecHashTableInsert(hashtable, econtext, innerhashkey, NULL);
/* possible bug - glass */
ExecHashTableInsert(hashtable, econtext, innerhashkey);
}

/* -----------------
* only the last process comes to this branch
* now all the processes have finished the build phase
* ----------------
*/

/*
* after we build the hash table, the inner batch is no longer needed
* after we build the hash table, the inner batch file is no longer needed
*/
FileUnlink(innerBatches[newbatch - 1]);
hjstate->hj_OuterReadPos = NULL;
hashtable->pcount = hashtable->nprocess;
BufFileClose(innerFile);
hashtable->innerBatchFile[newbatch - 1] = NULL;

hashtable->curbatch = newbatch;
return newbatch;
@@ -777,63 +590,41 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* ----------------------------------------------------------------
*/
static int
ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable, int nbatch)
ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable)
{
int b;

if (bucketno < hashtable->nbuckets || nbatch == 0)
if (bucketno < hashtable->nbuckets || hashtable->nbatch == 0)
return 0;

b = (float) (bucketno - hashtable->nbuckets) /
(float) (hashtable->totalbuckets - hashtable->nbuckets) *
nbatch;
b = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
(hashtable->totalbuckets - hashtable->nbuckets);
return b + 1;
}

/* ----------------------------------------------------------------
|
||||
* ExecHashJoinSaveTuple
|
||||
*
|
||||
* save a tuple to a tmp file using a buffer.
|
||||
* the first few bytes in a page is an offset to the end
|
||||
* of the page.
|
||||
* save a tuple to a tmp file.
|
||||
*
|
||||
* The data recorded in the file for each tuple is an image of its
|
||||
* HeapTupleData (with meaningless t_data pointer) followed by the
|
||||
* HeapTupleHeader and tuple data.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
|
||||
char *
|
||||
void
|
||||
ExecHashJoinSaveTuple(HeapTuple heapTuple,
|
||||
char *buffer,
|
||||
File file,
|
||||
char *position)
|
||||
BufFile *file)
|
||||
{
|
||||
long *pageend;
|
||||
char *pagestart;
|
||||
char *pagebound;
|
||||
int cc;
|
||||
size_t written;
|
||||
|
||||
pageend = (long *) buffer;
|
||||
pagestart = (char *) (buffer + sizeof(long));
|
||||
pagebound = buffer + BLCKSZ;
|
||||
if (position == NULL)
|
||||
position = pagestart;
|
||||
|
||||
if (position + heapTuple->t_len + HEAPTUPLESIZE >= pagebound)
|
||||
{
|
||||
cc = FileSeek(file, 0L, SEEK_END);
|
||||
if (cc < 0)
|
||||
perror("FileSeek");
|
||||
cc = FileWrite(file, buffer, BLCKSZ);
|
||||
NDirectFileWrite++;
|
||||
if (cc < 0)
|
||||
perror("FileWrite");
|
||||
position = pagestart;
|
||||
*pageend = 0;
|
||||
}
|
||||
memmove(position, heapTuple, HEAPTUPLESIZE);
|
||||
memmove(position + HEAPTUPLESIZE, heapTuple->t_data, heapTuple->t_len);
|
||||
position = (char *) MAXALIGN(position + heapTuple->t_len + HEAPTUPLESIZE);
|
||||
*pageend = position - buffer;
|
||||
|
||||
return position;
|
||||
written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
|
||||
if (written != sizeof(HeapTupleData))
|
||||
elog(ERROR, "Write to hashjoin temp file failed");
|
||||
written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
|
||||
if (written != (size_t) heapTuple->t_len)
|
||||
elog(ERROR, "Write to hashjoin temp file failed");
|
||||
}
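For reference, the read side of this format is ExecHashJoinGetSavedTuple, whose body falls outside this hunk. A minimal sketch of the logic it must perform, assuming only the BufFileRead, palloc and elog primitives used elsewhere in this patch:

	HeapTupleData htup;			/* fixed-size image, written first */
	HeapTuple	heapTuple;
	size_t		nread;

	nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
	if (nread == 0)
		return NULL;			/* end of this batch file */
	if (nread != sizeof(HeapTupleData))
		elog(ERROR, "Read from hashjoin temp file failed");
	/* rebuild a palloc'd tuple; the t_data pointer saved on disk is junk */
	heapTuple = (HeapTuple) palloc(HEAPTUPLESIZE + htup.t_len);
	memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
	heapTuple->t_data = (HeapTupleHeader) ((char *) heapTuple + HEAPTUPLESIZE);
	nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len);
	if (nread != (size_t) htup.t_len)
		elog(ERROR, "Read from hashjoin temp file failed");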

void
@ -855,14 +646,10 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
		ExecHashTableDestroy(hjstate->hj_HashTable);
		hjstate->hj_HashTable = NULL;
	}
	hjstate->hj_CurBucket = (HashBucket) NULL;
	hjstate->hj_CurTuple = (HeapTuple) NULL;
	hjstate->hj_CurOTuple = (OverflowTuple) NULL;

	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurTuple = (HashJoinTuple) NULL;
	hjstate->hj_InnerHashKey = (Var *) NULL;
	hjstate->hj_OuterBatches = (File *) NULL;
	hjstate->hj_InnerBatches = (File *) NULL;
	hjstate->hj_OuterReadPos = (char *) NULL;
	hjstate->hj_OuterReadBlk = (int) 0;

	hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL;
	hjstate->jstate.cs_TupFromTlist = (bool) false;
@ -875,5 +662,4 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
	ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
	if (((Plan *) node)->righttree->chgParam == NULL)
		ExecReScan(((Plan *) node)->righttree, exprCtxt, (Plan *) node);

}

@ -1,85 +1,92 @@
/*-------------------------------------------------------------------------
 *
 * hashjoin.h
 *	  internal structures for hash table and buckets
 *	  internal structures for hash joins
 *
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 * $Id: hashjoin.h,v 1.10 1999/05/09 00:53:18 tgl Exp $
 * $Id: hashjoin.h,v 1.11 1999/05/18 21:33:04 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#ifndef HASHJOIN_H
#define HASHJOIN_H

#include <storage/ipc.h>

/* -----------------
 *	have to use relative address as pointers in the hashtable
 *	because the hashtable may reallocate in different processes
 *
 * XXX: this relative-address stuff is useless on all supported platforms
 * and is an ever-dangerous source of bugs.  Really ought to rip it out.
 * -----------------
 */
typedef int RelativeAddr;

/* ------------------
 *	The relative addresses are always relative to the head of the
 *	hashtable, the following macros convert them to/from absolute address.
 *	NULL is represented as -1 (CAUTION: RELADDR() doesn't handle that!).
 *	CAUTION: ABSADDR evaluates its arg twice!!
 * ------------------
 */
#define ABSADDR(X)	((X) < 0 ? (char*) NULL : (char*)hashtable + (X))
#define RELADDR(X)	((RelativeAddr)((char*)(X) - (char*)hashtable))
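The double-evaluation caution is easy to trip over, since ABSADDR's argument appears both in the test and in the pointer arithmetic. A hypothetical misuse, not code from this file:

	/* expands to ((pos++) < 0 ? (char*) NULL : (char*)hashtable + (pos++)),
	 * so pos is incremented twice and the resulting address is wrong */
	char	   *p = ABSADDR(pos++);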

typedef char **charPP;
typedef int *intP;
#include "access/htup.h"
#include "storage/fd.h"
#include "utils/mcxt.h"

/* ----------------------------------------------------------------
 *				hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable control block which is
 * palloc'd in the executor's context.  All other storage needed for
 * the hashjoin is kept in a private "named portal", one for each hashjoin.
 * This makes it easy and fast to release the storage when we don't need it
 * anymore.
 *
 * The portal manager guarantees that portals will be discarded at end of
 * transaction, so we have no problem with a memory leak if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 *
 * Storage that should live through the entire join is allocated from the
 * portal's "variable context", while storage that is only wanted for the
 * current batch is allocated in the portal's "heap context".  By popping
 * the portal's heap at the end of a batch, we free all the per-batch storage
 * reliably and without tedium.
 * ----------------------------------------------------------------
 */

typedef struct HashJoinTupleData
{
	struct HashJoinTupleData *next;	/* link to next tuple in same bucket */
	HeapTupleData htup;				/* tuple header */
} HashJoinTupleData;

typedef HashJoinTupleData *HashJoinTuple;
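Per the commit message, each bucket is now a plain singly-linked list of these nodes, so inserting a tuple is just a prepend at the list head. A minimal sketch, assuming bucketno has already been computed and hashTuple palloc'd and filled:

	/* link the new tuple at the front of its bucket's list */
	hashTuple->next = hashtable->buckets[bucketno];
	hashtable->buckets[bucketno] = hashTuple;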

typedef struct HashTableData
{
	int			nbuckets;
	int			totalbuckets;
	int			bucketsize;
	IpcMemoryId shmid;
	RelativeAddr top;			/* char* */
	RelativeAddr bottom;		/* char* */
	RelativeAddr overflownext;	/* char* */
	RelativeAddr batch;			/* char* */
	RelativeAddr readbuf;		/* char* */
	int			nbatch;
	RelativeAddr outerbatchPos;	/* RelativeAddr* */
	RelativeAddr innerbatchPos;	/* RelativeAddr* */
	RelativeAddr innerbatchSizes;	/* int* */
	int			curbatch;
	int			nprocess;
	int			pcount;
} HashTableData;				/* real hash table follows here */
	int			nbuckets;		/* buckets in use during this batch */
	int			totalbuckets;	/* total number of (virtual) buckets */
	HashJoinTuple *buckets;		/* buckets[i] is head of list of tuples */
	/* buckets array is per-batch storage, as are all the tuples */

	int			nbatch;			/* number of batches; 0 means 1-pass join */
	int			curbatch;		/* current batch #, or 0 during 1st pass */

	/* all these arrays are allocated for the life of the hash join,
	 * but only if nbatch > 0:
	 */
	BufFile   **innerBatchFile;	/* buffered virtual temp file per batch */
	BufFile   **outerBatchFile;	/* buffered virtual temp file per batch */
	long	   *outerBatchSize;	/* count of tuples in each outer batch file */
	long	   *innerBatchSize;	/* count of tuples in each inner batch file */

	/* During 1st scan of inner relation, we get tuples from executor.
	 * If nbatch > 0 then tuples that don't belong in first nbuckets logical
	 * buckets get dumped into inner-batch temp files.
	 * The same statements apply for the 1st scan of the outer relation,
	 * except we write tuples to outer-batch temp files.
	 * If nbatch > 0 then we do the following for each batch:
	 *	1. Read tuples from inner batch file, load into hash buckets.
	 *	2. Read tuples from outer batch file, match to hash buckets and output.
	 */

	/* Ugly kluge: myPortal ought to be declared as type Portal (ie, PortalD*)
	 * but if we try to include utils/portal.h here, we end up with a
	 * circular dependency of include files!  Until the various node.h files
	 * are restructured in a cleaner way, we have to fake it.  The most
	 * reliable fake seems to be to declare myPortal as void * and then
	 * cast it to the right things in nodeHash.c.
	 */
	void	   *myPortal;		/* where to keep working storage */
	MemoryContext hashCxt;		/* context for whole-hash-join storage */
	MemoryContext batchCxt;		/* context for this-batch-only storage */
} HashTableData;

typedef HashTableData *HashJoinTable;
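Putting the struct's comments together, the multi-batch flow can be sketched as below. The helper names are illustrative stand-ins, not functions from this patch; the real driver logic is in ExecHashJoinNewBatch above, and per-batch storage is reclaimed wholesale by resetting batchCxt:

	for (batch = 1; batch <= hashtable->nbatch; batch++)
	{
		/* free the previous batch's buckets array and tuples in one shot */
		ResetBatchContext(hashtable);			/* hypothetical helper */

		/* 1. reload the hash table from the inner batch file */
		while ((tup = ReadSavedTuple(hashtable->innerBatchFile[batch - 1])) != NULL)
			InsertIntoBucket(hashtable, tup);	/* hypothetical helper */

		/* 2. probe with the outer batch file and emit any matches */
		while ((tup = ReadSavedTuple(hashtable->outerBatchFile[batch - 1])) != NULL)
			ProbeBuckets(hashtable, tup);		/* hypothetical helper */
	}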

typedef struct OverflowTupleData
{
	RelativeAddr tuple;			/* HeapTuple */
	RelativeAddr next;			/* struct OverflowTupleData * */
} OverflowTupleData;			/* real tuple follows here */

typedef OverflowTupleData *OverflowTuple;

typedef struct HashBucketData
{
	RelativeAddr top;			/* HeapTuple */
	RelativeAddr bottom;		/* HeapTuple */
	RelativeAddr firstotuple;	/* OverflowTuple */
	RelativeAddr lastotuple;	/* OverflowTuple */
} HashBucketData;				/* real bucket follows here */

typedef HashBucketData *HashBucket;

#endif	 /* HASHJOIN_H */

@ -6,7 +6,7 @@
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 * $Id: nodeHash.h,v 1.11 1999/02/13 23:21:25 momjian Exp $
 * $Id: nodeHash.h,v 1.12 1999/05/18 21:33:05 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@ -18,7 +18,6 @@
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "storage/fd.h"
#include "utils/syscache.h"

extern TupleTableSlot *ExecHash(Hash *node);
@ -26,15 +25,14 @@ extern bool ExecInitHash(Hash *node, EState *estate, Plan *parent);
extern int	ExecCountSlotsHash(Hash *node);
extern void ExecEndHash(Hash *node);
extern HashJoinTable ExecHashTableCreate(Hash *node);
extern void ExecHashTableInsert(HashJoinTable hashtable, ExprContext *econtext,
					Var *hashkey, File *batches);
extern void ExecHashTableDestroy(HashJoinTable hashtable);
extern void ExecHashTableInsert(HashJoinTable hashtable, ExprContext *econtext,
					Var *hashkey);
extern int	ExecHashGetBucket(HashJoinTable hashtable, ExprContext *econtext,
					Var *hashkey);
extern HeapTuple ExecScanHashBucket(HashJoinState *hjstate, HashBucket bucket,
					HeapTuple curtuple, List *hjclauses,
					ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable, int ntuples);
					Var *hashkey);
extern HeapTuple ExecScanHashBucket(HashJoinState *hjstate, List *hjclauses,
					ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable, long ntuples);
extern void ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent);

#endif	 /* NODEHASH_H */

@ -6,7 +6,7 @@
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 * $Id: nodeHashjoin.h,v 1.11 1999/02/13 23:21:26 momjian Exp $
 * $Id: nodeHashjoin.h,v 1.12 1999/05/18 21:33:05 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@ -21,9 +21,7 @@ extern TupleTableSlot *ExecHashJoin(HashJoin *node);
extern bool ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent);
extern int	ExecCountSlotsHashJoin(HashJoin *node);
extern void ExecEndHashJoin(HashJoin *node);
extern char *ExecHashJoinSaveTuple(HeapTuple heapTuple, char *buffer,
					  File file, char *position);
extern void ExecHashJoinSaveTuple(HeapTuple heapTuple, BufFile *file);
extern void ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent);

#endif	 /* NODEHASHJOIN_H */