544 lines
14 KiB
C
544 lines
14 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* nodeTee.c--
|
|
*
|
|
*
|
|
* Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* DESCRIPTION
|
|
* This code provides support for a tee node, which allows multiple
|
|
* parent in a megaplan.
|
|
*
|
|
* INTERFACE ROUTINES
|
|
* ExecTee
|
|
* ExecInitTee
|
|
* ExecEndTee
|
|
*
|
|
* IDENTIFICATION
|
|
* $Header: /cvsroot/pgsql/src/backend/executor/Attic/nodeTee.c,v 1.24 1998/10/08 18:29:27 momjian Exp $
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/file.h>
|
|
#include "postgres.h"
|
|
|
|
#include "utils/palloc.h"
|
|
#include "utils/relcache.h"
|
|
#include "utils/mcxt.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "storage/smgr.h"
|
|
#include "optimizer/internal.h"
|
|
#include "executor/executor.h"
|
|
#include "executor/nodeTee.h"
|
|
#include "catalog/catalog.h"
|
|
#include "catalog/heap.h"
|
|
#include "tcop/pquery.h"
|
|
#include "access/heapam.h"
|
|
|
|
/* ------------------------------------------------------------------
|
|
* ExecInitTee
|
|
*
|
|
* Create tee state
|
|
*
|
|
* ------------------------------------------------------------------
|
|
*/
|
|
bool
|
|
ExecInitTee(Tee *node, EState *currentEstate, Plan *parent)
|
|
{
|
|
TeeState *teeState;
|
|
Plan *outerPlan;
|
|
int len;
|
|
Relation bufferRel;
|
|
TupleDesc tupType;
|
|
EState *estate;
|
|
|
|
/*
|
|
* it is possible that the Tee has already been initialized since it
|
|
* can be reached by multiple parents. If it is already initialized,
|
|
* simply return and do not initialize the children nodes again
|
|
*/
|
|
if (node->plan.state)
|
|
return TRUE;
|
|
|
|
/* ----------------
|
|
* assign the node's execution state
|
|
* ----------------
|
|
*/
|
|
|
|
/*
|
|
* make a new executor state, because we have a different
|
|
* es_range_table
|
|
*/
|
|
|
|
/* node->plan.state = estate;*/
|
|
|
|
estate = CreateExecutorState();
|
|
estate->es_direction = currentEstate->es_direction;
|
|
estate->es_BaseId = currentEstate->es_BaseId;
|
|
estate->es_BaseId = currentEstate->es_BaseId;
|
|
estate->es_tupleTable = currentEstate->es_tupleTable;
|
|
estate->es_refcount = currentEstate->es_refcount;
|
|
estate->es_junkFilter = currentEstate->es_junkFilter;
|
|
|
|
/*
|
|
* use the range table for Tee subplan since the range tables for the
|
|
* two parents may be different
|
|
*/
|
|
if (node->rtentries)
|
|
estate->es_range_table = node->rtentries;
|
|
else
|
|
estate->es_range_table = currentEstate->es_range_table;
|
|
|
|
node->plan.state = estate;
|
|
|
|
|
|
/* ----------------
|
|
* create teeState structure
|
|
* ----------------
|
|
*/
|
|
teeState = makeNode(TeeState);
|
|
teeState->tee_leftPlace = 0;
|
|
teeState->tee_rightPlace = 0;
|
|
teeState->tee_lastPlace = 0;
|
|
teeState->tee_bufferRel = NULL;
|
|
teeState->tee_leftScanDesc = NULL;
|
|
teeState->tee_rightScanDesc = NULL;
|
|
|
|
|
|
node->teestate = teeState;
|
|
|
|
/* ----------------
|
|
* Miscellanious initialization
|
|
*
|
|
* + assign node's base_id
|
|
* + assign debugging hooks and
|
|
* + create expression context for node
|
|
* ----------------
|
|
*/
|
|
ExecAssignNodeBaseInfo(estate, &(teeState->cstate), parent);
|
|
ExecAssignExprContext(estate, &(teeState->cstate));
|
|
|
|
#define TEE_NSLOTS 2
|
|
/* ----------------
|
|
* initialize tuple slots
|
|
* ----------------
|
|
*/
|
|
ExecInitResultTupleSlot(estate, &(teeState->cstate));
|
|
|
|
/* initialize child nodes */
|
|
outerPlan = outerPlan((Plan *) node);
|
|
ExecInitNode(outerPlan, estate, (Plan *) node);
|
|
|
|
/* ----------------
|
|
* the tuple type info is from the outer plan of this node
|
|
* the result type is also the same as the outerplan
|
|
*/
|
|
ExecAssignResultTypeFromOuterPlan((Plan *) node, &(teeState->cstate));
|
|
ExecAssignProjectionInfo((Plan *) node, &teeState->cstate);
|
|
|
|
/* ---------------------------------------
|
|
initialize temporary relation to buffer tuples
|
|
*/
|
|
tupType = ExecGetResultType(&(teeState->cstate));
|
|
len = ExecTargetListLength(((Plan *) node)->targetlist);
|
|
|
|
/* bufferRel = ExecCreatR(len, tupType, _TEMP_RELATION_ID_); */
|
|
|
|
/*
|
|
* create a catalogued relation even though this is a temporary
|
|
* relation
|
|
*/
|
|
/* cleanup of catalogued relations is easier to do */
|
|
|
|
if (node->teeTableName[0] != '\0')
|
|
{
|
|
Relation r;
|
|
|
|
teeState->tee_bufferRelname = pstrdup(node->teeTableName);
|
|
|
|
/*
|
|
* we are given an tee table name, if a relation by that name
|
|
* exists, then we open it, else we create it and then open it
|
|
*/
|
|
r = RelationNameGetRelation(teeState->tee_bufferRelname);
|
|
|
|
if (RelationIsValid(r))
|
|
bufferRel = heap_openr(teeState->tee_bufferRelname);
|
|
else
|
|
bufferRel = heap_open(
|
|
heap_create_with_catalog(teeState->tee_bufferRelname,
|
|
tupType, RELKIND_RELATION));
|
|
}
|
|
else
|
|
{
|
|
sprintf(teeState->tee_bufferRelname,
|
|
"ttemp_%d", /* 'ttemp' for 'tee' temporary */
|
|
newoid());
|
|
/* bufferRel = ExecCreatR(len, tupType, _TEMP_RELATION_ID); */
|
|
bufferRel = heap_open(
|
|
heap_create_with_catalog(teeState->tee_bufferRelname,
|
|
tupType, RELKIND_RELATION));
|
|
}
|
|
|
|
teeState->tee_bufferRel = bufferRel;
|
|
|
|
/*
|
|
* initialize a memory context for allocating thing like scan
|
|
* descriptors
|
|
*/
|
|
|
|
/*
|
|
* we do this so that on cleanup of the tee, we can free things. if we
|
|
* didn't have our own memory context, we would be in the memory
|
|
* context of the portal that we happen to be using at the moment
|
|
*/
|
|
|
|
teeState->tee_mcxt = (MemoryContext) CreateGlobalMemory(teeState->tee_bufferRelname);
|
|
|
|
/*
|
|
* don't initialize the scan descriptors here because it's not good to
|
|
* initialize scan descriptors on empty rels. Wait until the scan
|
|
* descriptors are needed before initializing them.
|
|
*/
|
|
|
|
teeState->tee_leftScanDesc = NULL;
|
|
teeState->tee_rightScanDesc = NULL;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
int
|
|
ExecCountSlotsTee(Tee *node)
|
|
{
|
|
/* Tee nodes can't have innerPlans */
|
|
return ExecCountSlotsNode(outerPlan(node)) + TEE_NSLOTS;
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
initTeeScanDescs
|
|
initializes the left and right scandescs on the temporary
|
|
relation of a Tee node
|
|
|
|
must open two separate scan descriptors,
|
|
because the left and right scans may be at different points
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
static void
|
|
initTeeScanDescs(Tee *node)
|
|
{
|
|
TeeState *teeState;
|
|
Relation bufferRel;
|
|
ScanDirection dir;
|
|
Snapshot snapshot;
|
|
MemoryContext orig;
|
|
|
|
teeState = node->teestate;
|
|
if (teeState->tee_leftScanDesc && teeState->tee_rightScanDesc)
|
|
return;
|
|
|
|
orig = CurrentMemoryContext;
|
|
MemoryContextSwitchTo(teeState->tee_mcxt);
|
|
|
|
bufferRel = teeState->tee_bufferRel;
|
|
dir = ((Plan *) node)->state->es_direction; /* backwards not handled
|
|
* yet XXX */
|
|
snapshot = ((Plan *) node)->state->es_snapshot;
|
|
|
|
if (teeState->tee_leftScanDesc == NULL)
|
|
{
|
|
teeState->tee_leftScanDesc = heap_beginscan(bufferRel,
|
|
ScanDirectionIsBackward(dir),
|
|
snapshot,
|
|
0, /* num scan keys */
|
|
NULL /* scan keys */
|
|
);
|
|
}
|
|
if (teeState->tee_rightScanDesc == NULL)
|
|
{
|
|
teeState->tee_rightScanDesc = heap_beginscan(bufferRel,
|
|
ScanDirectionIsBackward(dir),
|
|
snapshot,
|
|
0, /* num scan keys */
|
|
NULL /* scan keys */
|
|
);
|
|
}
|
|
|
|
MemoryContextSwitchTo(orig);
|
|
}
|
|
|
|
/* ----------------------------------------------------------------
|
|
* ExecTee(node)
|
|
*
|
|
*
|
|
* A Tee serves to connect a subplan to multiple parents.
|
|
* the subplan is always the outplan of the Tee node.
|
|
*
|
|
* The Tee gets requests from either leftParent or rightParent,
|
|
* fetches the result tuple from the child, and then
|
|
* stored the result into a temporary relation (serving as a queue).
|
|
* leftPlace and rightPlace keep track of where the left and rightParents
|
|
* are.
|
|
* If a parent requests a tuple and that parent is not at the end
|
|
* of the temporary relation, then the request is satisfied from
|
|
* the queue instead of by executing the child plan
|
|
*
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
|
|
TupleTableSlot *
|
|
ExecTee(Tee *node, Plan *parent)
|
|
{
|
|
EState *estate;
|
|
TeeState *teeState;
|
|
int leftPlace,
|
|
rightPlace,
|
|
lastPlace;
|
|
int branch;
|
|
TupleTableSlot *result;
|
|
TupleTableSlot *slot;
|
|
Plan *childNode;
|
|
ScanDirection dir;
|
|
HeapTuple heapTuple;
|
|
Relation bufferRel;
|
|
HeapScanDesc scanDesc;
|
|
|
|
estate = ((Plan *) node)->state;
|
|
teeState = node->teestate;
|
|
leftPlace = teeState->tee_leftPlace;
|
|
rightPlace = teeState->tee_rightPlace;
|
|
lastPlace = teeState->tee_lastPlace;
|
|
bufferRel = teeState->tee_bufferRel;
|
|
|
|
childNode = outerPlan(node);
|
|
|
|
dir = estate->es_direction;
|
|
|
|
/* XXX doesn't handle backwards direction yet */
|
|
|
|
if (parent == node->leftParent)
|
|
branch = leftPlace;
|
|
else if ((parent == node->rightParent) || (parent == (Plan *) node))
|
|
|
|
/*
|
|
* the tee node could be the root node of the plan, in which case,
|
|
* we treat it like a right-parent pull
|
|
*/
|
|
branch = rightPlace;
|
|
else
|
|
{
|
|
elog(ERROR, "A Tee node can only be executed from its left or right parent\n");
|
|
return NULL;
|
|
}
|
|
|
|
if (branch == lastPlace)
|
|
{ /* we're at the end of the queue already,
|
|
* - get a new tuple from the child plan,
|
|
* - store it in the queue, - increment
|
|
* lastPlace, - increment leftPlace or
|
|
* rightPlace as appropriate, - and return
|
|
* result */
|
|
slot = ExecProcNode(childNode, (Plan *) node);
|
|
if (!TupIsNull(slot))
|
|
{
|
|
heapTuple = slot->val;
|
|
|
|
/* insert into temporary relation */
|
|
heap_insert(bufferRel, heapTuple);
|
|
|
|
/*
|
|
* once there is data in the temporary relation, ensure that
|
|
* the left and right scandescs are initialized
|
|
*/
|
|
initTeeScanDescs(node);
|
|
|
|
scanDesc = (parent == node->leftParent) ?
|
|
teeState->tee_leftScanDesc : teeState->tee_rightScanDesc;
|
|
|
|
{
|
|
|
|
/*
|
|
* move the scandesc forward so we don't re-read this
|
|
* tuple later
|
|
*/
|
|
HeapTuple throwAway;
|
|
|
|
/* Buffer buffer; */
|
|
throwAway = heap_getnext(scanDesc, ScanDirectionIsBackward(dir));
|
|
}
|
|
|
|
/*
|
|
* set the shouldFree field of the child's slot so that when
|
|
* the child's slot is free'd, this tuple isn't free'd also
|
|
*/
|
|
|
|
/*
|
|
* does this mean this tuple has to be garbage collected
|
|
* later??
|
|
*/
|
|
slot->ttc_shouldFree = false;
|
|
|
|
teeState->tee_lastPlace = lastPlace + 1;
|
|
}
|
|
result = slot;
|
|
}
|
|
else
|
|
{ /* the desired data already exists in the
|
|
* temporary relation */
|
|
scanDesc = (parent == node->leftParent) ?
|
|
teeState->tee_leftScanDesc : teeState->tee_rightScanDesc;
|
|
|
|
heapTuple = heap_getnext(scanDesc, ScanDirectionIsBackward(dir));
|
|
|
|
/*
|
|
* Increase the pin count on the buffer page, because the tuple
|
|
* stored in the slot also points to it (as well as the scan
|
|
* descriptor). If we don't, ExecStoreTuple will decrease the pin
|
|
* count on the next iteration.
|
|
*/
|
|
|
|
if (scanDesc->rs_cbuf != InvalidBuffer)
|
|
IncrBufferRefCount(scanDesc->rs_cbuf);
|
|
|
|
slot = teeState->cstate.cs_ResultTupleSlot;
|
|
slot->ttc_tupleDescriptor = RelationGetDescr(bufferRel);
|
|
|
|
result = ExecStoreTuple(heapTuple, /* tuple to store */
|
|
slot, /* slot to store in */
|
|
scanDesc->rs_cbuf, /* this tuple's buffer */
|
|
false); /* don't free stuff from
|
|
* heap_getnext */
|
|
|
|
}
|
|
|
|
if (parent == node->leftParent)
|
|
teeState->tee_leftPlace = leftPlace + 1;
|
|
else
|
|
teeState->tee_rightPlace = rightPlace + 1;
|
|
|
|
return result;
|
|
}
|
|
|
|
#ifdef NOT_USED
|
|
/* ----------------------------------------------------------------
|
|
* ExecTeeReScan(node)
|
|
*
|
|
* Rescans the relation.
|
|
* ----------------------------------------------------------------
|
|
*/
|
|
void
|
|
ExecTeeReScan(Tee *node, ExprContext *exprCtxt, Plan *parent)
|
|
{
|
|
|
|
EState *estate;
|
|
TeeState *teeState;
|
|
ScanDirection dir;
|
|
|
|
estate = ((Plan *) node)->state;
|
|
teeState = node->teestate;
|
|
|
|
dir = estate->es_direction;
|
|
|
|
/* XXX doesn't handle backwards direction yet */
|
|
|
|
if (parent == node->leftParent)
|
|
{
|
|
if (teeState->tee_leftScanDesc)
|
|
{
|
|
heap_rescan(teeState->tee_leftScanDesc,
|
|
ScanDirectionIsBackward(dir),
|
|
NULL);
|
|
teeState->tee_leftPlace = 0;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (teeState->tee_rightScanDesc)
|
|
{
|
|
heap_rescan(teeState->tee_leftScanDesc,
|
|
ScanDirectionIsBackward(dir),
|
|
NULL);
|
|
teeState->tee_rightPlace = 0;
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
|
|
|
|
/* ---------------------------------------------------------------------
|
|
* ExecEndTee
|
|
*
|
|
* End the Tee node, and free up any storage
|
|
* since a Tee node can be downstream of multiple parent nodes,
|
|
* only free when both parents are done
|
|
* --------------------------------------------------------------------
|
|
*/
|
|
|
|
void
|
|
ExecEndTee(Tee *node, Plan *parent)
|
|
{
|
|
EState *estate;
|
|
TeeState *teeState;
|
|
int leftPlace,
|
|
rightPlace,
|
|
lastPlace;
|
|
Relation bufferRel;
|
|
MemoryContext orig;
|
|
|
|
estate = ((Plan *) node)->state;
|
|
teeState = node->teestate;
|
|
leftPlace = teeState->tee_leftPlace;
|
|
rightPlace = teeState->tee_rightPlace;
|
|
lastPlace = teeState->tee_lastPlace;
|
|
|
|
if (!node->leftParent || parent == node->leftParent)
|
|
leftPlace = -1;
|
|
|
|
if (!node->rightParent || parent == node->rightParent)
|
|
rightPlace = -1;
|
|
|
|
if (parent == (Plan *) node)
|
|
rightPlace = leftPlace = -1;
|
|
|
|
teeState->tee_leftPlace = leftPlace;
|
|
teeState->tee_rightPlace = rightPlace;
|
|
if ((leftPlace == -1) && (rightPlace == -1))
|
|
{
|
|
/* remove the temporary relations */
|
|
/* and close the scan descriptors */
|
|
|
|
bufferRel = teeState->tee_bufferRel;
|
|
if (bufferRel)
|
|
{
|
|
heap_destroy(bufferRel);
|
|
teeState->tee_bufferRel = NULL;
|
|
if (teeState->tee_mcxt)
|
|
{
|
|
orig = CurrentMemoryContext;
|
|
MemoryContextSwitchTo(teeState->tee_mcxt);
|
|
}
|
|
else
|
|
orig = 0;
|
|
|
|
if (teeState->tee_leftScanDesc)
|
|
{
|
|
heap_endscan(teeState->tee_leftScanDesc);
|
|
teeState->tee_leftScanDesc = NULL;
|
|
}
|
|
if (teeState->tee_rightScanDesc)
|
|
{
|
|
heap_endscan(teeState->tee_rightScanDesc);
|
|
teeState->tee_rightScanDesc = NULL;
|
|
}
|
|
|
|
if (teeState->tee_mcxt)
|
|
{
|
|
MemoryContextSwitchTo(orig);
|
|
teeState->tee_mcxt = NULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|