Clean up and document btree code for ordering keys. Neat stuff,

actually, but who could understand it with no comments?  Fix bug
while at it: _bt_orderkeys would try to invoke comparisons on
NULL inputs, given the right sort of redundant quals.
This commit is contained in:
Tom Lane 2000-07-25 04:47:59 +00:00
parent da1ad323b7
commit 916b2321ad
3 changed files with 282 additions and 189 deletions

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.61 2000/07/21 06:42:32 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.62 2000/07/25 04:47:58 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -374,7 +374,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
BTItem btitem; BTItem btitem;
IndexTuple itup; IndexTuple itup;
BTScanOpaque so; BTScanOpaque so;
Size keysok; bool continuescan;
rel = scan->relation; rel = scan->relation;
so = (BTScanOpaque) scan->opaque; so = (BTScanOpaque) scan->opaque;
@ -396,16 +396,14 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup; itup = &btitem->bti_itup;
if (_bt_checkkeys(scan, itup, &keysok)) if (_bt_checkkeys(scan, itup, dir, &continuescan))
{ {
/* tuple passes all scan key conditions, so return it */ /* tuple passes all scan key conditions, so return it */
Assert(keysok == so->numberOfKeys);
return FormRetrieveIndexResult(current, &(itup->t_tid)); return FormRetrieveIndexResult(current, &(itup->t_tid));
} }
/* This tuple doesn't pass, but there might be more that do */ /* This tuple doesn't pass, but there might be more that do */
} while (keysok >= so->numberOfFirstKeys || } while (continuescan);
(keysok == ((Size) -1) && ScanDirectionIsBackward(dir)));
/* No more items, so close down the current-item info */ /* No more items, so close down the current-item info */
ItemPointerSetInvalid(current); ItemPointerSetInvalid(current);
@ -442,11 +440,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
RegProcedure proc; RegProcedure proc;
int32 result; int32 result;
BTScanOpaque so; BTScanOpaque so;
Size keysok; bool continuescan;
bool strategyCheck; ScanKey scankeys = NULL;
ScanKey scankeys = 0;
int keysCount = 0; int keysCount = 0;
int *nKeyIs = 0; int *nKeyIs = NULL;
int i, int i,
j; j;
StrategyNumber strat_total; StrategyNumber strat_total;
@ -455,71 +452,74 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
so = (BTScanOpaque) scan->opaque; so = (BTScanOpaque) scan->opaque;
/* /*
* Order the keys in the qualification and be sure that the scan * Order the scan keys in our canonical fashion and eliminate any
* exploits the tree order. * redundant keys.
*/
_bt_orderkeys(rel, so);
/*
* Quit now if _bt_orderkeys() discovered that the scan keys can
* never be satisfied (eg, x == 1 AND x > 2).
*/
if (! so->qual_ok)
return (RetrieveIndexResult) NULL;
/*
* Examine the scan keys to discover where we need to start the scan.
*/ */
so->numberOfFirstKeys = 0; /* may be changed by _bt_orderkeys */
so->qual_ok = 1; /* may be changed by _bt_orderkeys */
scan->scanFromEnd = false; scan->scanFromEnd = false;
strategyCheck = false; strat_total = BTEqualStrategyNumber;
if (so->numberOfKeys > 0) if (so->numberOfKeys > 0)
{ {
_bt_orderkeys(rel, so);
if (so->qual_ok)
strategyCheck = true;
}
strat_total = BTEqualStrategyNumber;
if (strategyCheck)
{
AttrNumber attno;
nKeyIs = (int *) palloc(so->numberOfKeys * sizeof(int)); nKeyIs = (int *) palloc(so->numberOfKeys * sizeof(int));
for (i = 0; i < so->numberOfKeys; i++) for (i = 0; i < so->numberOfKeys; i++)
{ {
attno = so->keyData[i].sk_attno; AttrNumber attno = so->keyData[i].sk_attno;
if (attno == keysCount)
/* ignore keys for already-determined attrs */
if (attno <= keysCount)
continue; continue;
/* if we didn't find a boundary for the preceding attr, quit */
if (attno > keysCount + 1) if (attno > keysCount + 1)
break; break;
strat = _bt_getstrat(rel, attno, strat = _bt_getstrat(rel, attno,
so->keyData[i].sk_procedure); so->keyData[i].sk_procedure);
/*
* Can we use this key as a starting boundary for this attr?
*
* We can use multiple keys if they look like, say, = >= =
* but we have to stop after accepting a > or < boundary.
*/
if (strat == strat_total || if (strat == strat_total ||
strat == BTEqualStrategyNumber) strat == BTEqualStrategyNumber)
{ {
nKeyIs[keysCount++] = i; nKeyIs[keysCount++] = i;
continue;
} }
if (ScanDirectionIsBackward(dir) && else if (ScanDirectionIsBackward(dir) &&
(strat == BTLessStrategyNumber || (strat == BTLessStrategyNumber ||
strat == BTLessEqualStrategyNumber)) strat == BTLessEqualStrategyNumber))
{ {
nKeyIs[keysCount++] = i; nKeyIs[keysCount++] = i;
strat_total = strat; strat_total = strat;
if (strat == BTLessStrategyNumber) if (strat == BTLessStrategyNumber)
break; break;
continue;
} }
if (ScanDirectionIsForward(dir) && else if (ScanDirectionIsForward(dir) &&
(strat == BTGreaterStrategyNumber || (strat == BTGreaterStrategyNumber ||
strat == BTGreaterEqualStrategyNumber)) strat == BTGreaterEqualStrategyNumber))
{ {
nKeyIs[keysCount++] = i; nKeyIs[keysCount++] = i;
strat_total = strat; strat_total = strat;
if (strat == BTGreaterStrategyNumber) if (strat == BTGreaterStrategyNumber)
break; break;
continue;
} }
} }
if (!keysCount) if (keysCount == 0)
scan->scanFromEnd = true; scan->scanFromEnd = true;
} }
else else
scan->scanFromEnd = true; scan->scanFromEnd = true;
if (so->qual_ok == 0)
return (RetrieveIndexResult) NULL;
/* if we just need to walk down one edge of the tree, do that */ /* if we just need to walk down one edge of the tree, do that */
if (scan->scanFromEnd) if (scan->scanFromEnd)
{ {
@ -529,16 +529,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
} }
/* /*
* Okay, we want something more complicated. What we'll do is use the * We want to start the scan somewhere within the index. Set up a
* first item in the scan key passed in (which has been correctly * scankey we can use to search for the correct starting point.
* ordered to take advantage of index ordering) to position ourselves
* at the right place in the scan.
*/ */
scankeys = (ScanKey) palloc(keysCount * sizeof(ScanKeyData)); scankeys = (ScanKey) palloc(keysCount * sizeof(ScanKeyData));
for (i = 0; i < keysCount; i++) for (i = 0; i < keysCount; i++)
{ {
j = nKeyIs[i]; j = nKeyIs[i];
/* _bt_orderkeys disallows it, but it's place to add some code latter */ /* _bt_orderkeys disallows it, but it's place to add some code later */
if (so->keyData[j].sk_flags & SK_ISNULL) if (so->keyData[j].sk_flags & SK_ISNULL)
{ {
pfree(nKeyIs); pfree(nKeyIs);
@ -578,6 +576,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
blkno = BufferGetBlockNumber(buf); blkno = BufferGetBlockNumber(buf);
page = BufferGetPage(buf); page = BufferGetPage(buf);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys); offnum = _bt_binsrch(rel, buf, keysCount, scankeys);
ItemPointerSet(current, blkno, offnum); ItemPointerSet(current, blkno, offnum);
@ -595,7 +594,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* call _bt_endpoint() to set up a scan starting at that index endpoint, * call _bt_endpoint() to set up a scan starting at that index endpoint,
* as appropriate for the desired scan type. * as appropriate for the desired scan type.
* *
* it's yet other place to add some code latter for is(not)null ... * it's yet other place to add some code later for is(not)null ...
*---------- *----------
*/ */
@ -737,13 +736,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
itup = &btitem->bti_itup; itup = &btitem->bti_itup;
/* is the first item actually acceptable? */ /* is the first item actually acceptable? */
if (_bt_checkkeys(scan, itup, &keysok)) if (_bt_checkkeys(scan, itup, dir, &continuescan))
{ {
/* yes, return it */ /* yes, return it */
res = FormRetrieveIndexResult(current, &(itup->t_tid)); res = FormRetrieveIndexResult(current, &(itup->t_tid));
} }
else if (keysok >= so->numberOfFirstKeys || else if (continuescan)
(keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
{ {
/* no, but there might be another one that is */ /* no, but there might be another one that is */
res = _bt_next(scan, dir); res = _bt_next(scan, dir);
@ -906,7 +904,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
IndexTuple itup; IndexTuple itup;
BTScanOpaque so; BTScanOpaque so;
RetrieveIndexResult res; RetrieveIndexResult res;
Size keysok; bool continuescan;
rel = scan->relation; rel = scan->relation;
current = &(scan->currentItemData); current = &(scan->currentItemData);
@ -1012,13 +1010,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
itup = &(btitem->bti_itup); itup = &(btitem->bti_itup);
/* see if we picked a winner */ /* see if we picked a winner */
if (_bt_checkkeys(scan, itup, &keysok)) if (_bt_checkkeys(scan, itup, dir, &continuescan))
{ {
/* yes, return it */ /* yes, return it */
res = FormRetrieveIndexResult(current, &(itup->t_tid)); res = FormRetrieveIndexResult(current, &(itup->t_tid));
} }
else if (keysok >= so->numberOfFirstKeys || else if (continuescan)
(keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
{ {
/* no, but there might be another one that is */ /* no, but there might be another one that is */
res = _bt_next(scan, dir); res = _bt_next(scan, dir);

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.39 2000/07/21 19:21:00 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.40 2000/07/25 04:47:59 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -145,88 +145,148 @@ _bt_formitem(IndexTuple itup)
return btitem; return btitem;
} }
/* /*----------
* _bt_orderkeys() -- Put keys in a sensible order for conjunctive quals. * _bt_orderkeys() -- Put keys in a sensible order for conjunctive quals.
* *
* The order of the keys in the qual match the ordering imposed by * After this routine runs, the scan keys are ordered by index attribute
* the index. This routine only needs to be called if there is * (all quals for attr 1, then all for attr 2, etc) and within each attr
* more than one qual clause using this index. * the keys are ordered by constraint type: ">", ">=", "=", "<=", "<".
* Furthermore, redundant keys are eliminated: we keep only the tightest
* >/>= bound and the tightest </<= bound, and if there's an = key then
* that's the only one returned. (So, we return either a single = key,
* or one or two boundary-condition keys for each attr.)
*
* As a byproduct of this work, we can detect contradictory quals such
* as "x = 1 AND x > 2". If we see that, we return so->quals_ok = FALSE,
* indicating the scan need not be run at all since no tuples can match.
*
* Another byproduct is to determine how many quals must be satisfied to
* continue the scan. _bt_checkkeys uses this. For example, if the quals
* are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
* (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
* But once we reach tuples like (1,4,z) we can stop scanning because no
* later tuples could match. This is reflected by setting
* so->numberOfRequiredKeys to the number of leading keys that must be
* matched to continue the scan. numberOfRequiredKeys is equal to the
* number of leading "=" keys plus the key(s) for the first non "="
* attribute, which can be seen to be correct by considering the above
* example.
*
* The initial ordering of the keys is expected to be by attribute already
* (see group_clauses_by_indexkey() in indxpath.c). The task here is to
* standardize the appearance of multiple keys for the same attribute.
*
* XXX this routine is one of many places that fail to handle SK_COMMUTE
* scankeys properly. Currently, the planner is careful never to generate
* any indexquals that would require SK_COMMUTE to be set. Someday we ought
* to try to fix this, though it's not real critical as long as indexable
* operators all have commutators...
*
* Note: this routine invokes comparison operators via OidFunctionCallN,
* ie, without caching function lookups. No point in trying to be smarter,
* since these comparisons are executed only when the user expresses a
* hokey qualification, and happen only once per scan anyway.
*----------
*/ */
void void
_bt_orderkeys(Relation relation, BTScanOpaque so) _bt_orderkeys(Relation relation, BTScanOpaque so)
{ {
ScanKey xform; ScanKeyData xform[BTMaxStrategyNumber];
ScanKeyData *cur; bool init[BTMaxStrategyNumber];
uint16 numberOfKeys = so->numberOfKeys;
ScanKey key;
ScanKey cur;
StrategyMap map; StrategyMap map;
int nbytes;
Datum test; Datum test;
int i, int i,
j; j;
int init[BTMaxStrategyNumber + 1]; AttrNumber attno;
ScanKey key; uint16 new_numberOfKeys;
uint16 numberOfKeys = so->numberOfKeys; bool allEqualSoFar;
uint16 new_numberOfKeys = 0;
AttrNumber attno = 1; so->qual_ok = true;
bool equalStrategyEnd, so->numberOfRequiredKeys = 0;
underEqualStrategy;
if (numberOfKeys < 1) if (numberOfKeys < 1)
return; return; /* done if qual-less scan */
key = so->keyData; key = so->keyData;
cur = &key[0]; cur = &key[0];
/* check input keys are correctly ordered */
if (cur->sk_attno != 1) if (cur->sk_attno != 1)
elog(ERROR, "_bt_orderkeys: key(s) for attribute 1 missed"); elog(ERROR, "_bt_orderkeys: key(s) for attribute 1 missed");
/* We can short-circuit most of the work if there's just one key */
if (numberOfKeys == 1) if (numberOfKeys == 1)
{ {
/* /*
* We don't use indices for 'A is null' and 'A is not null' * We don't use indices for 'A is null' and 'A is not null'
* currently and 'A < = > <> NULL' is non-sense' - so qual is not * currently and 'A < = > <> NULL' will always fail - so qual is
* Ok. - vadim 03/21/97 * not Ok if comparison value is NULL. - vadim 03/21/97
*/ */
if (cur->sk_flags & SK_ISNULL) if (cur->sk_flags & SK_ISNULL)
so->qual_ok = 0; so->qual_ok = false;
so->numberOfFirstKeys = 1; so->numberOfRequiredKeys = 1;
return; return;
} }
/* get space for the modified array of keys */ /*
nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData); * Otherwise, do the full set of pushups.
xform = (ScanKey) palloc(nbytes); */
new_numberOfKeys = 0;
allEqualSoFar = true;
MemSet(xform, 0, nbytes); /*
* Initialize for processing of keys for attr 1.
*
* xform[i] holds a copy of the current scan key of strategy type i+1,
* if any; init[i] is TRUE if we have found such a key for this attr.
*/
attno = 1;
map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation), map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
BTMaxStrategyNumber, BTMaxStrategyNumber,
attno); attno);
for (j = 0; j <= BTMaxStrategyNumber; j++) MemSet(xform, 0, sizeof(xform)); /* not really necessary */
init[j] = 0; MemSet(init, 0, sizeof(init));
equalStrategyEnd = false; /*
underEqualStrategy = true; * Loop iterates from 0 to numberOfKeys inclusive; we use the last
/* check each key passed in */ * pass to handle after-last-key processing. Actual exit from the
for (i = 0;;) * loop is at the "break" statement below.
*/
for (i = 0; ; cur++, i++)
{ {
if (i < numberOfKeys) if (i < numberOfKeys)
cur = &key[i]; {
/* See comments above: any NULL implies cannot match qual */
if (cur->sk_flags & SK_ISNULL) /* see comments above */ if (cur->sk_flags & SK_ISNULL)
so->qual_ok = 0; {
so->qual_ok = false;
/* Quit processing so we don't try to invoke comparison
* routines on NULLs.
*/
return;
}
}
/*
* If we are at the end of the keys for a particular attr,
* finish up processing and emit the cleaned-up keys.
*/
if (i == numberOfKeys || cur->sk_attno != attno) if (i == numberOfKeys || cur->sk_attno != attno)
{ {
if (cur->sk_attno != attno + 1 && i < numberOfKeys) bool priorAllEqualSoFar = allEqualSoFar;
/* check input keys are correctly ordered */
if (i < numberOfKeys && cur->sk_attno != attno + 1)
elog(ERROR, "_bt_orderkeys: key(s) for attribute %d missed", elog(ERROR, "_bt_orderkeys: key(s) for attribute %d missed",
attno + 1); attno + 1);
underEqualStrategy = (!equalStrategyEnd);
/* /*
* If = has been specified, no other key will be used. In case * If = has been specified, no other key will be used. In case
* of key < 2 && key == 1 and so on we have to set qual_ok to * of key > 2 && key == 1 and so on we have to set qual_ok to
* 0 * false before discarding the other keys.
*/ */
if (init[BTEqualStrategyNumber - 1]) if (init[BTEqualStrategyNumber - 1])
{ {
@ -236,187 +296,222 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
eq = &xform[BTEqualStrategyNumber - 1]; eq = &xform[BTEqualStrategyNumber - 1];
for (j = BTMaxStrategyNumber; --j >= 0;) for (j = BTMaxStrategyNumber; --j >= 0;)
{ {
if (j == (BTEqualStrategyNumber - 1) || init[j] == 0) if (! init[j] ||
j == (BTEqualStrategyNumber - 1))
continue; continue;
chk = &xform[j]; chk = &xform[j];
test = OidFunctionCall2(chk->sk_procedure, test = OidFunctionCall2(chk->sk_procedure,
eq->sk_argument, chk->sk_argument); eq->sk_argument,
chk->sk_argument);
if (!DatumGetBool(test)) if (!DatumGetBool(test))
so->qual_ok = 0; so->qual_ok = false;
} }
init[BTLessStrategyNumber - 1] = 0; init[BTLessStrategyNumber - 1] = false;
init[BTLessEqualStrategyNumber - 1] = 0; init[BTLessEqualStrategyNumber - 1] = false;
init[BTGreaterEqualStrategyNumber - 1] = 0; init[BTGreaterEqualStrategyNumber - 1] = false;
init[BTGreaterStrategyNumber - 1] = 0; init[BTGreaterStrategyNumber - 1] = false;
} }
else else
equalStrategyEnd = true; {
/*
* No "=" for this key, so we're done with required keys
*/
allEqualSoFar = false;
}
/* only one of <, <= */ /* keep only one of <, <= */
if (init[BTLessStrategyNumber - 1] if (init[BTLessStrategyNumber - 1]
&& init[BTLessEqualStrategyNumber - 1]) && init[BTLessEqualStrategyNumber - 1])
{ {
ScanKeyData *lt, ScanKeyData *lt = &xform[BTLessStrategyNumber - 1];
*le; ScanKeyData *le = &xform[BTLessEqualStrategyNumber - 1];
lt = &xform[BTLessStrategyNumber - 1];
le = &xform[BTLessEqualStrategyNumber - 1];
/*
* DO NOT use the cached function stuff here -- this is
* key ordering, happens only when the user expresses a
* hokey qualification, and gets executed only once,
* anyway. The transform maps are hard-coded, and can't
* be initialized in the correct way.
*/
test = OidFunctionCall2(le->sk_procedure, test = OidFunctionCall2(le->sk_procedure,
lt->sk_argument, le->sk_argument); lt->sk_argument,
le->sk_argument);
if (DatumGetBool(test)) if (DatumGetBool(test))
init[BTLessEqualStrategyNumber - 1] = 0; init[BTLessEqualStrategyNumber - 1] = false;
else else
init[BTLessStrategyNumber - 1] = 0; init[BTLessStrategyNumber - 1] = false;
} }
/* only one of >, >= */ /* keep only one of >, >= */
if (init[BTGreaterStrategyNumber - 1] if (init[BTGreaterStrategyNumber - 1]
&& init[BTGreaterEqualStrategyNumber - 1]) && init[BTGreaterEqualStrategyNumber - 1])
{ {
ScanKeyData *gt, ScanKeyData *gt = &xform[BTGreaterStrategyNumber - 1];
*ge; ScanKeyData *ge = &xform[BTGreaterEqualStrategyNumber - 1];
gt = &xform[BTGreaterStrategyNumber - 1];
ge = &xform[BTGreaterEqualStrategyNumber - 1];
/* see note above on function cache */
test = OidFunctionCall2(ge->sk_procedure, test = OidFunctionCall2(ge->sk_procedure,
gt->sk_argument, ge->sk_argument); gt->sk_argument,
ge->sk_argument);
if (DatumGetBool(test)) if (DatumGetBool(test))
init[BTGreaterEqualStrategyNumber - 1] = 0; init[BTGreaterEqualStrategyNumber - 1] = false;
else else
init[BTGreaterStrategyNumber - 1] = 0; init[BTGreaterStrategyNumber - 1] = false;
} }
/* okay, reorder and count */ /*
* Emit the cleaned-up keys back into the key[] array in the
* correct order. Note we are overwriting our input here!
* It's OK because (a) xform[] is a physical copy of the keys
* we want, (b) we cannot emit more keys than we input, so
* we won't overwrite as-yet-unprocessed keys.
*/
for (j = BTMaxStrategyNumber; --j >= 0;) for (j = BTMaxStrategyNumber; --j >= 0;)
{
if (init[j]) if (init[j])
key[new_numberOfKeys++] = xform[j]; memcpy(&key[new_numberOfKeys++], &xform[j],
sizeof(ScanKeyData));
}
if (underEqualStrategy) /*
so->numberOfFirstKeys = new_numberOfKeys; * If all attrs before this one had "=", include these keys
* into the required-keys count.
*/
if (priorAllEqualSoFar)
so->numberOfRequiredKeys = new_numberOfKeys;
/*
* Exit loop here if done.
*/
if (i == numberOfKeys) if (i == numberOfKeys)
break; break;
/* initialization for new attno */ /* Re-initialize for new attno */
attno = cur->sk_attno; attno = cur->sk_attno;
MemSet(xform, 0, nbytes);
map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation), map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
BTMaxStrategyNumber, BTMaxStrategyNumber,
attno); attno);
/* haven't looked at any strategies yet */ MemSet(xform, 0, sizeof(xform)); /* not really necessary */
for (j = 0; j <= BTMaxStrategyNumber; j++) MemSet(init, 0, sizeof(init));
init[j] = 0;
} }
/*
* OK, figure out which strategy this key corresponds to
*/
for (j = BTMaxStrategyNumber; --j >= 0;) for (j = BTMaxStrategyNumber; --j >= 0;)
{ {
if (cur->sk_procedure == map->entry[j].sk_procedure) if (cur->sk_procedure == map->entry[j].sk_procedure)
break; break;
} }
if (j < 0)
elog(ERROR, "_bt_orderkeys: unable to identify operator %u",
cur->sk_procedure);
/* have we seen one of these before? */ /* have we seen one of these before? */
if (init[j]) if (init[j])
{ {
/* yup, use the appropriate value */ /* yup, keep the more restrictive value */
test = FunctionCall2(&cur->sk_func, test = FunctionCall2(&cur->sk_func,
cur->sk_argument, xform[j].sk_argument); cur->sk_argument,
xform[j].sk_argument);
if (DatumGetBool(test)) if (DatumGetBool(test))
xform[j].sk_argument = cur->sk_argument; xform[j].sk_argument = cur->sk_argument;
else if (j == (BTEqualStrategyNumber - 1)) else if (j == (BTEqualStrategyNumber - 1))
so->qual_ok = 0;/* key == a && key == b, but a != b */ so->qual_ok = false; /* key == a && key == b, but a != b */
} }
else else
{ {
/* nope, use this value */ /* nope, so remember this scankey */
memmove(&xform[j], cur, sizeof(*cur)); memcpy(&xform[j], cur, sizeof(ScanKeyData));
init[j] = 1; init[j] = true;
} }
i++;
} }
so->numberOfKeys = new_numberOfKeys; so->numberOfKeys = new_numberOfKeys;
pfree(xform);
} }
/* /*
* Test whether an indextuple satisfies all the scankey conditions * Test whether an indextuple satisfies all the scankey conditions.
* *
* If not ("false" return), the number of conditions satisfied is * If the tuple fails to pass the qual, we also determine whether there's
* returned in *keysok. Given proper ordering of the scankey conditions, * any need to continue the scan beyond this tuple, and set *continuescan
* we can use this to determine whether it's worth continuing the scan. * accordingly. See comments for _bt_orderkeys(), above, about how this is
* See _bt_orderkeys(). * done.
*
* HACK: *keysok == (Size) -1 means we stopped evaluating because we found
* a NULL value in the index tuple. It's not quite clear to me why this
* case has to be treated specially --- tgl 7/00.
*/ */
bool bool
_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok) _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
ScanDirection dir, bool *continuescan)
{ {
BTScanOpaque so = (BTScanOpaque) scan->opaque; BTScanOpaque so = (BTScanOpaque) scan->opaque;
Size keysz = so->numberOfKeys; Size keysz = so->numberOfKeys;
TupleDesc tupdesc; TupleDesc tupdesc;
ScanKey key; ScanKey key;
Datum datum; Size keysok;
bool isNull;
Datum test;
*keysok = 0; *continuescan = true;
/* If no keys, always scan the whole index */
if (keysz == 0) if (keysz == 0)
return true; return true;
key = so->keyData;
tupdesc = RelationGetDescr(scan->relation); tupdesc = RelationGetDescr(scan->relation);
key = so->keyData;
keysok = 0;
IncrIndexProcessed(); IncrIndexProcessed();
while (keysz > 0) while (keysz > 0)
{ {
Datum datum;
bool isNull;
Datum test;
datum = index_getattr(tuple, datum = index_getattr(tuple,
key[0].sk_attno, key->sk_attno,
tupdesc, tupdesc,
&isNull); &isNull);
/* btree doesn't support 'A is null' clauses, yet */ /* btree doesn't support 'A is null' clauses, yet */
if (key[0].sk_flags & SK_ISNULL) if (key->sk_flags & SK_ISNULL)
{
/* we shouldn't get here, really; see _bt_orderkeys() */
*continuescan = false;
return false; return false;
}
if (isNull) if (isNull)
{ {
if (*keysok < so->numberOfFirstKeys) /*
*keysok = -1; * Since NULLs are sorted after non-NULLs, we know we have
* reached the upper limit of the range of values for this
* index attr. On a forward scan, we can stop if this qual
* is one of the "must match" subset. On a backward scan,
* however, we should keep going.
*/
if (keysok < so->numberOfRequiredKeys &&
ScanDirectionIsForward(dir))
*continuescan = false;
/*
* In any case, this indextuple doesn't match the qual.
*/
return false; return false;
} }
if (key[0].sk_flags & SK_COMMUTE) if (key->sk_flags & SK_COMMUTE)
{ test = FunctionCall2(&key->sk_func,
test = FunctionCall2(&key[0].sk_func, key->sk_argument, datum);
key[0].sk_argument, datum);
}
else else
test = FunctionCall2(&key->sk_func,
datum, key->sk_argument);
if (DatumGetBool(test) == !!(key->sk_flags & SK_NEGATE))
{ {
test = FunctionCall2(&key[0].sk_func, /*
datum, key[0].sk_argument); * Tuple fails this qual. If it's a required qual, then
* we can conclude no further tuples will pass, either.
*/
if (keysok < so->numberOfRequiredKeys)
*continuescan = false;
return false;
} }
if (DatumGetBool(test) == !!(key[0].sk_flags & SK_NEGATE)) keysok++;
return false;
(*keysok)++;
key++; key++;
keysz--; keysz--;
} }
/* If we get here, the tuple passes all quals. */
return true; return true;
} }

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nbtree.h,v 1.39 2000/07/21 06:42:35 tgl Exp $ * $Id: nbtree.h,v 1.40 2000/07/25 04:47:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -47,13 +47,12 @@ typedef struct BTPageOpaqueData
typedef BTPageOpaqueData *BTPageOpaque; typedef BTPageOpaqueData *BTPageOpaque;
/* /*
* ScanOpaqueData is used to remember which buffers we're currently * BTScanOpaqueData is used to remember which buffers we're currently
* examining in the scan. We keep these buffers locked and pinned * examining in the scan. We keep these buffers pinned (but not locked,
* and recorded in the opaque entry of the scan in order to avoid * see nbtree.c) and recorded in the opaque entry of the scan to avoid
* doing a ReadBuffer() for every tuple in the index. This avoids * doing a ReadBuffer() for every tuple in the index.
* semop() calls, which are expensive.
* *
* And it's used to remember actual scankey info (we need in it * And it's used to remember actual scankey info (we need it
* if some scankeys evaled at runtime). * if some scankeys evaled at runtime).
* *
* curHeapIptr & mrkHeapIptr are heap iptr-s from current/marked * curHeapIptr & mrkHeapIptr are heap iptr-s from current/marked
@ -69,11 +68,12 @@ typedef struct BTScanOpaqueData
Buffer btso_mrkbuf; Buffer btso_mrkbuf;
ItemPointerData curHeapIptr; ItemPointerData curHeapIptr;
ItemPointerData mrkHeapIptr; ItemPointerData mrkHeapIptr;
uint16 qual_ok; /* 0 for quals like key == 1 && key > 2 */ /* these fields are set by _bt_orderkeys(), which see for more info: */
uint16 numberOfKeys; /* number of keys */ bool qual_ok; /* false if qual can never be satisfied */
uint16 numberOfFirstKeys; /* number of keys for 1st uint16 numberOfKeys; /* number of scan keys */
* attribute */ uint16 numberOfRequiredKeys; /* number of keys that must be matched
ScanKey keyData; /* key descriptor */ * to continue the scan */
ScanKey keyData; /* array of scan keys */
} BTScanOpaqueData; } BTScanOpaqueData;
typedef BTScanOpaqueData *BTScanOpaque; typedef BTScanOpaqueData *BTScanOpaque;
@ -276,7 +276,8 @@ extern ScanKey _bt_mkscankey_nodata(Relation rel);
extern void _bt_freeskey(ScanKey skey); extern void _bt_freeskey(ScanKey skey);
extern void _bt_freestack(BTStack stack); extern void _bt_freestack(BTStack stack);
extern void _bt_orderkeys(Relation relation, BTScanOpaque so); extern void _bt_orderkeys(Relation relation, BTScanOpaque so);
extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok); extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
ScanDirection dir, bool *continuescan);
extern BTItem _bt_formitem(IndexTuple itup); extern BTItem _bt_formitem(IndexTuple itup);
/* /*