From d2158b02813bb44116988eeb10d9a32565b58c1c Mon Sep 17 00:00:00 2001 From: Teodor Sigaev Date: Wed, 24 May 2006 11:01:39 +0000 Subject: [PATCH] * Add support NULL to GiST. * some refactoring and simplify code int gistutil.c and gist.c * now in some cases it can be called used-defined picksplit method for non-first column in index, but here is a place to do more. * small fix of docs related to support NULL. --- doc/src/sgml/indexam.sgml | 5 +- doc/src/sgml/indices.sgml | 16 +- src/backend/access/gist/gist.c | 256 +++++++++------- src/backend/access/gist/gistget.c | 19 +- src/backend/access/gist/gistutil.c | 476 ++++++++++++----------------- src/include/access/gist_private.h | 25 +- src/include/catalog/catversion.h | 4 +- src/include/catalog/pg_am.h | 4 +- 8 files changed, 375 insertions(+), 430 deletions(-) diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index 61956cdfcf..70fba4ecc0 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -1,4 +1,4 @@ - + Index Access Method Interface Definition @@ -126,8 +126,7 @@ used to scan for rows with a = 4, which is wrong if the index omits rows where b is null. It is, however, OK to omit rows where the first indexed column is null. - (GiST currently does so.) Thus, - amindexnulls should be set true only if the + Thus, amindexnulls should be set true only if the index access method indexes all rows, including arbitrary combinations of null values. diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml index 9bb19c2cee..10669c0155 100644 --- a/doc/src/sgml/indices.sgml +++ b/doc/src/sgml/indices.sgml @@ -1,4 +1,4 @@ - + Indexes @@ -290,13 +290,13 @@ CREATE INDEX test2_mm_idx ON test2 (major, minor); - A multicolumn GiST index can only be used when there is a query condition - on its leading column. 
Conditions on additional columns restrict the - entries returned by the index, but the condition on the first column is the - most important one for determining how much of the index needs to be - scanned. A GiST index will be relatively ineffective if its first column - has only a few distinct values, even if there are many distinct values in - additional columns. + A multicolumn GiST index can be used with query conditions that + involve any subset of the index's columns. Conditions on additional + columns restrict the entries returned by the index, but the condition on + the first column is the most important one for determining how much of + the index needs to be scanned. A GiST index will be relatively + ineffective if its first column has only a few distinct values, even if + there are many distinct values in additional columns. diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index cb10cbc35b..54ac45ee2f 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.137 2006/05/24 11:01:39 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -181,32 +181,13 @@ gistbuildCallback(Relation index, { GISTBuildState *buildstate = (GISTBuildState *) state; IndexTuple itup; - GISTENTRY tmpcentry; - int i; MemoryContext oldCtx; - /* GiST cannot index tuples with leading NULLs */ - if (isnull[0]) - return; - oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); - /* immediately compress keys to normalize */ - for (i = 0; i < buildstate->numindexattrs; i++) - { - if (isnull[i]) - values[i] = (Datum) 0; - else - { - gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i], - NULL, NULL, (OffsetNumber) 0, - -1 /* size is 
currently bogus */ , TRUE, FALSE); - values[i] = tmpcentry.key; - } - } - /* form an index tuple and point it at the heap tuple */ - itup = index_form_tuple(buildstate->giststate.tupdesc, values, isnull); + itup = gistFormTuple(&buildstate->giststate, index, + values, NULL /* size is currently bogus */, isnull); itup->t_tid = htup->t_self; /* @@ -243,34 +224,16 @@ gistinsert(PG_FUNCTION_ARGS) #endif IndexTuple itup; GISTSTATE giststate; - GISTENTRY tmpentry; - int i; MemoryContext oldCtx; MemoryContext insertCtx; - /* GiST cannot index tuples with leading NULLs */ - if (isnull[0]) - PG_RETURN_BOOL(false); - insertCtx = createTempGistContext(); oldCtx = MemoryContextSwitchTo(insertCtx); initGISTstate(&giststate, r); - /* immediately compress keys to normalize */ - for (i = 0; i < r->rd_att->natts; i++) - { - if (isnull[i]) - values[i] = (Datum) 0; - else - { - gistcentryinit(&giststate, i, &tmpentry, values[i], - NULL, NULL, (OffsetNumber) 0, - -1 /* size is currently bogus */ , TRUE, FALSE); - values[i] = tmpentry.key; - } - } - itup = index_form_tuple(giststate.tupdesc, values, isnull); + itup = gistFormTuple(&giststate, r, + values, NULL /* size is currently bogus */, isnull); itup->t_tid = *ht_ctid; gistdoinsert(r, itup, &giststate); @@ -937,7 +900,147 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) } /* - * gistSplit -- split a page in the tree. + * simple split page + */ +static void +gistSplitHalf(GIST_SPLITVEC *v, int len) { + int i; + + v->spl_nright = v->spl_nleft = 0; + v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); + v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); + for(i = 1; i <= len; i++) + if ( i<len/2 ) + v->spl_right[ v->spl_nright++ ] = i; + else + v->spl_left[ v->spl_nleft++ ] = i; +} + +/* + * if it was invalid tuple then we need special processing. + * We move all invalid tuples on right page. + * + * if there is no place on left page, gistSplit will be called one more + * time for left page. 
+ * + * Normally, we never exec this code, but after crash replay it's possible + * to get 'invalid' tuples (probability is low enough) + */ +static void +gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) { + int i; + static OffsetNumber offInvTuples[ MaxOffsetNumber ]; + int nOffInvTuples = 0; + + for (i = 1; i <= len; i++) + if ( GistTupleIsInvalid(itup[i - 1]) ) + offInvTuples[ nOffInvTuples++ ] = i; + + if ( nOffInvTuples == len ) { + /* corner case, all tuples are invalid */ + v->spl_rightvalid= v->spl_leftvalid = false; + gistSplitHalf( v, len ); + } else { + GistSplitVec gsvp; + + v->spl_right = offInvTuples; + v->spl_nright = nOffInvTuples; + v->spl_rightvalid = false; + + v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); + v->spl_nleft = 0; + for(i = 1; i <= len; i++) + if ( !GistTupleIsInvalid(itup[i - 1]) ) + v->spl_left[ v->spl_nleft++ ] = i; + v->spl_leftvalid = true; + + gsvp.idgrp = NULL; + gsvp.attrsize = v->spl_lattrsize; + gsvp.attr = v->spl_lattr; + gsvp.len = v->spl_nleft; + gsvp.entries = v->spl_left; + gsvp.isnull = v->spl_lisnull; + + gistunionsubkeyvec(giststate, itup, &gsvp, 0); + } +} + +/* + * trys to split page by attno key, in a case of null + * values move its to separate page. 
+ */ +static void +gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate, + GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) { + int i; + static OffsetNumber offNullTuples[ MaxOffsetNumber ]; + int nOffNullTuples = 0; + + + for (i = 1; i <= len; i++) { + Datum datum; + bool IsNull; + + if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) { + gistSplitByInvalid(giststate, v, itup, len); + return; + } + + datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull); + gistdentryinit(giststate, attno, &(entryvec->vector[i]), + datum, r, page, i, + ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull), + FALSE, IsNull); + if ( IsNull ) + offNullTuples[ nOffNullTuples++ ] = i; + } + + v->spl_leftvalid = v->spl_rightvalid = true; + + if ( nOffNullTuples == len ) { + /* + * Corner case: All keys in attno column are null, we should try to + * by keys in next column. It all keys in all columns + * are NULL just split page half by half + */ + v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE; + if ( attno+1 == r->rd_att->natts ) + gistSplitHalf( v, len ); + else + gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1); + } else if ( nOffNullTuples > 0 ) { + int j=0; + + /* + * We don't want to mix NULLs and not-NULLs keys + * on one page, so move nulls to right page + */ + v->spl_right = offNullTuples; + v->spl_nright = nOffNullTuples; + v->spl_risnull[attno] = TRUE; + + v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); + v->spl_nleft = 0; + for(i = 1; i <= len; i++) + if ( j<v->spl_nright && offNullTuples[j] == i ) + j++; + else + v->spl_left[ v->spl_nleft++ ] = i; + + v->spl_idgrp = NULL; + gistunionsubkey(giststate, itup, v, 0); + } else { + /* + * all keys are not-null + */ + gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate); + } +} + +/* + * gistSplit -- split a page in the tree and fill struct + * used for XLOG and real writes buffers. 
Function is recursive, ie + * it will split page until keys will fit in every page. */ SplitedPageLayout * gistSplit(Relation r, @@ -951,77 +1054,14 @@ gistSplit(Relation r, GIST_SPLITVEC v; GistEntryVector *entryvec; int i; - OffsetNumber offInvTuples[ MaxOffsetNumber ]; - int nOffInvTuples = 0; SplitedPageLayout *res = NULL; /* generate the item array */ entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY)); entryvec->n = len + 1; - for (i = 1; i <= len; i++) - { - Datum datum; - bool IsNull; - - if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) - /* remember position of invalid tuple */ - offInvTuples[ nOffInvTuples++ ] = i; - - if ( nOffInvTuples > 0 ) - /* we can safely do not decompress other keys, because - we will do splecial processing, but - it's needed to find another invalid tuples */ - continue; - - datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, 0, &(entryvec->vector[i]), - datum, r, page, i, - ATTSIZE(datum, giststate->tupdesc, 1, IsNull), - FALSE, IsNull); - } - - /* - * if it was invalid tuple then we need special processing. - * We move all invalid tuples on right page. - * - * if there is no place on left page, gistSplit will be called one more - * time for left page. 
- * - * Normally, we never exec this code, but after crash replay it's possible - * to get 'invalid' tuples (probability is low enough) - */ - if (nOffInvTuples > 0) - { - GistSplitVec gsvp; - - v.spl_right = offInvTuples; - v.spl_nright = nOffInvTuples; - v.spl_rightvalid = false; - - v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber)); - v.spl_nleft = 0; - for(i = 1; i <= len; i++) - if ( !GistTupleIsInvalid(itup[i - 1]) ) - v.spl_left[ v.spl_nleft++ ] = i; - v.spl_leftvalid = true; - - gsvp.idgrp = NULL; - gsvp.attrsize = v.spl_lattrsize; - gsvp.attr = v.spl_lattr; - gsvp.len = v.spl_nleft; - gsvp.entries = v.spl_left; - gsvp.isnull = v.spl_lisnull; - - gistunionsubkeyvec(giststate, itup, &gsvp, true); - } - else - { - /* there is no invalid tuples, so usial processing */ - gistUserPicksplit(r, entryvec, &v, itup, len, giststate); - v.spl_leftvalid = v.spl_rightvalid = true; - } - + gistSplitByKey(r, page, itup, len, giststate, + &v, entryvec, 0); /* form left and right vector */ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1)); diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 55d45b4a03..dcb8f2da54 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.56 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.57 2006/05/24 11:01:39 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -361,7 +361,7 @@ gistindex_keytest(IndexTuple tuple, IncrIndexProcessed(); /* - * Tuple doesn't restore after crash recovery because of inclomplete + * Tuple doesn't restore after crash recovery because of incomplete * insert */ if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple)) @@ -378,14 +378,15 @@ gistindex_keytest(IndexTuple 
tuple, key->sk_attno, giststate->tupdesc, &isNull); - /* is the index entry NULL? */ - if (isNull) - { - /* XXX eventually should check if SK_ISNULL */ + + if ( key->sk_flags & SK_ISNULL ) { + /* is the compared-to datum NULL? on non-leaf page it's possible + to have nulls in childs :( */ + + if ( isNull || !GistPageIsLeaf(p) ) + return true; return false; - } - /* is the compared-to datum NULL? */ - if (key->sk_flags & SK_ISNULL) + } else if ( isNull ) return false; gistdentryinit(giststate, key->sk_attno - 1, &de, diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 92798a27d3..3db72aa199 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.14 2006/05/24 11:01:39 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -26,30 +26,18 @@ #define RIGHT_ADDED 0x02 #define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) +static float gistpenalty(GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2); + /* - * This defines is only for shorter code, used in gistgetadjusted - * and gistadjsubkey only + * static *S used for temrorary storage (saves stack and palloc() call) */ -#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ - if (isnullkey) { \ - gistentryinit((evp), rkey, r, NULL, \ - (OffsetNumber) 0, rkeyb, FALSE); \ - } else { \ - gistentryinit((evp), okey, r, NULL, \ - (OffsetNumber) 0, okeyb, FALSE); \ - } \ -} while(0) -#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ - FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ - FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ -} while(0); - - -static 
void gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, float *penalty); +static int attrsizeS[INDEX_MAX_KEYS]; +static Datum attrS[INDEX_MAX_KEYS]; +static bool isnullS[INDEX_MAX_KEYS]; /* * Write itup vector to page, has no control of free space @@ -164,87 +152,115 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { } /* - * Return an IndexTuple containing the result of applying the "union" - * method to the specified IndexTuple vector. + * Make unions of keys in IndexTuple vector, return FALSE if itvec contains + * invalid tuple. Resulting Datums aren't compressed. */ -IndexTuple -gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) -{ - Datum attr[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - GistEntryVector *evec; + +static bool +gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, + Datum *attr, bool *isnull, int *attrsize ) { int i; - GISTENTRY centry[INDEX_MAX_KEYS]; - IndexTuple res; + GistEntryVector *evec; evec = (GistEntryVector *) palloc(((len == 1) ? 
2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - for (i = 0; i < len; i++) - if (GistTupleIsInvalid(itvec[i])) - return gist_form_invalid_tuple(InvalidBlockNumber); + for (i = startkey; i < giststate->tupdesc->natts; i++) { + int j; - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum; - int j; - int real_len; + evec->n = 0; - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; + for (j = 0; j < len; j++) { + Datum datum; + bool IsNull; + + if (GistTupleIsInvalid(itvec[j])) + return FALSE; /* signals that union with invalid tuple => result is invalid */ datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; gistdentryinit(giststate, i, - &(evec->vector[real_len]), + evec->vector + evec->n, datum, NULL, NULL, (OffsetNumber) 0, ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), FALSE, IsNull); - real_len++; + evec->n++; } /* If this tuple vector was all NULLs, the union is NULL */ - if (real_len == 0) - { + if ( evec->n == 0 ) { attr[i] = (Datum) 0; + attrsize[i] = (Datum) 0; isnull[i] = TRUE; - } - else - { - int datumsize; - - if (real_len == 1) - { + } else { + if (evec->n == 1) { evec->n = 2; - gistentryinit(evec->vector[1], - evec->vector[0].key, r, NULL, - (OffsetNumber) 0, evec->vector[0].bytes, FALSE); - } - else - evec->n = real_len; + evec->vector[1] = evec->vector[0]; + } - /* Compress the result of the union and store in attr array */ - datum = FunctionCall2(&giststate->unionFn[i], + /* Make union and store in attr array */ + attr[i] = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), - PointerGetDatum(&datumsize)); + PointerGetDatum(attrsize + i)); - gistcentryinit(giststate, i, &centry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); isnull[i] = FALSE; - attr[i] = centry[i].key; } } - res = index_form_tuple(giststate->tupdesc, attr, isnull); - GistTupleSetValid(res); - return res; + return TRUE; } +/* + * Return an IndexTuple containing the result of applying the "union" + * 
method to the specified IndexTuple vector. + */ +IndexTuple +gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +{ + if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) ) + return gist_form_invalid_tuple(InvalidBlockNumber); + + return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); +} + +/* + * makes union of two key + */ +static void +gistMakeUnionKey( GISTSTATE *giststate, int attno, + GISTENTRY *entry1, bool isnull1, + GISTENTRY *entry2, bool isnull2, + Datum *dst, int *dstsize, bool *dstisnull ) { + + static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ]; + GistEntryVector *evec = (GistEntryVector*)storage; + + evec->n = 2; + + if ( isnull1 && isnull2 ) { + *dstisnull = TRUE; + *dst = (Datum)0; + *dstsize = 0; + } else { + if ( isnull1 == FALSE && isnull2 == FALSE ) { + evec->vector[0] = *entry1; + evec->vector[1] = *entry2; + } else if ( isnull1 == FALSE ) { + evec->vector[0] = *entry1; + evec->vector[1] = *entry1; + } else { + evec->vector[0] = *entry2; + evec->vector[1] = *entry2; + } + + *dstisnull = FALSE; + *dst = FunctionCall2(&giststate->unionFn[attno], + PointerGetDatum(evec), + PointerGetDatum(dstsize)); + } +} /* * Forms union of oldtup and addtup, if union == oldtup then return NULL @@ -252,15 +268,9 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) { - GistEntryVector *evec; - bool neednew = false; - bool isnull[INDEX_MAX_KEYS]; - Datum attr[INDEX_MAX_KEYS]; - GISTENTRY centry[INDEX_MAX_KEYS], - oldatt[INDEX_MAX_KEYS], - addatt[INDEX_MAX_KEYS], - *ev0p, - *ev1p; + bool neednew = FALSE; + GISTENTRY oldentries[INDEX_MAX_KEYS], + addentries[INDEX_MAX_KEYS]; bool oldisnull[INDEX_MAX_KEYS], addisnull[INDEX_MAX_KEYS]; IndexTuple newtup = NULL; @@ -269,147 +279,83 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis if (GistTupleIsInvalid(oldtup) 
|| GistTupleIsInvalid(addtup)) return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid))); - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); - - gistDeCompressAtt(giststate, r, oldtup, NULL, - (OffsetNumber) 0, oldatt, oldisnull); + (OffsetNumber) 0, oldentries, oldisnull); gistDeCompressAtt(giststate, r, addtup, NULL, - (OffsetNumber) 0, addatt, addisnull); + (OffsetNumber) 0, addentries, addisnull); - for (i = 0; i < r->rd_att->natts; i++) - { - if (oldisnull[i] && addisnull[i]) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else - { - Datum datum; - int datumsize; + for(i = 0; i < r->rd_att->natts; i++) { + gistMakeUnionKey( giststate, i, + oldentries + i, oldisnull[i], + addentries + i, addisnull[i], + attrS + i, attrsizeS + i, isnullS + i ); - FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, - addisnull[i], addatt[i].key, addatt[i].bytes); + if ( neednew ) + /* we already need new key, so we can skip check */ + continue; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); + if ( isnullS[i] ) + /* union of key may be NULL if and only if both keys are NULL */ + continue; - if (oldisnull[i] || addisnull[i]) - { - if (oldisnull[i]) - neednew = true; - } - else - { - bool result; + if ( !addisnull[i] ) { + if ( oldisnull[i] ) + neednew = true; + else { + bool result; FunctionCall3(&giststate->equalFn[i], - ev0p->key, - datum, - PointerGetDatum(&result)); + oldentries[i].key, + attrS[i], + PointerGetDatum(&result)); if (!result) neednew = true; } - - gistcentryinit(giststate, i, &centry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - - attr[i] = centry[i].key; - isnull[i] = FALSE; } } if (neednew) { /* need to update key */ - newtup = index_form_tuple(giststate->tupdesc, attr, isnull); + newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); newtup->t_tid = oldtup->t_tid; } return 
newtup; } +/* + * Forms unions of subkeys after page split, but + * uses only tuples aren't in groups of equalent tuples + */ void gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec, - GistSplitVec *gsvp, bool isall) { - int i; - GistEntryVector *evec; + GistSplitVec *gsvp, int startkey) { + IndexTuple *cleanedItVec; + int i, cleanedLen=0; - evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ); + cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len); - for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++) - { - int j; - Datum datum; - int datumsize; - int real_len; + for(i=0;i<gsvp->len;i++) { + if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]]) + continue; - real_len = 0; - for (j = 0; j < gsvp->len; j++) - { - bool IsNull; - - if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]]) - continue; - - datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1, - giststate->tupdesc, &IsNull); - if (IsNull) - continue; - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - - } - - if (real_len == 0) - { - datum = (Datum) 0; - datumsize = 0; - gsvp->isnull[i] = true; - } - else - { - /* - * evec->vector[0].bytes may be not defined, so form union - * with itself - */ - if (real_len == 1) - { - evec->n = 2; - memcpy(&(evec->vector[1]), &(evec->vector[0]), - sizeof(GISTENTRY)); - } - else - evec->n = real_len; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - gsvp->isnull[i] = false; - } - - gsvp->attr[i] = datum; - gsvp->attrsize[i] = datumsize; + cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1]; } + + gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, + gsvp->attr, gsvp->isnull, gsvp->attrsize); + + pfree( cleanedItVec ); } /* - * unions subkey for after user picksplit over first column + * unions subkeys for after user 
picksplit over attno-1 column */ -static void -gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) +void +gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno) { GistSplitVec gsvp; @@ -421,7 +367,7 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) gsvp.entries = spl->spl_left; gsvp.isnull = spl->spl_lisnull; - gistunionsubkeyvec(giststate, itvec, &gsvp, false); + gistunionsubkeyvec(giststate, itvec, &gsvp, attno); gsvp.attrsize = spl->spl_rattrsize; gsvp.attr = spl->spl_rattr; @@ -429,20 +375,20 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) gsvp.entries = spl->spl_right; gsvp.isnull = spl->spl_risnull; - gistunionsubkeyvec(giststate, itvec, &gsvp, false); + gistunionsubkeyvec(giststate, itvec, &gsvp, attno); } /* * find group in vector with equal value */ -int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) +static int +gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno) { int i; int curid = 1; /* - * first key is always not null (see gistinsert), so we may not check for + * attno key is always not null (see gistSplitByKey), so we may not check for * nulls */ for (i = 0; i < spl->spl_nleft; i++) @@ -459,7 +405,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) { if (spl->spl_idgrp[spl->spl_right[j]]) continue; - FunctionCall3(&giststate->equalFn[0], + FunctionCall3(&giststate->equalFn[attno], valvec[spl->spl_left[i]].key, valvec[spl->spl_right[j]].key, PointerGetDatum(&result)); @@ -479,7 +425,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) { if (spl->spl_idgrp[spl->spl_left[j]]) continue; - FunctionCall3(&giststate->equalFn[0], + FunctionCall3(&giststate->equalFn[attno], valvec[spl->spl_left[i]].key, valvec[spl->spl_left[j]].key, PointerGetDatum(&result)); @@ -501,23 +447,20 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY 
*valvec, GIST_SPLITVEC *spl) * Insert equivalent tuples to left or right page with minimum * penalty */ -void +static void gistadjsubkey(Relation r, IndexTuple *itup, /* contains compressed entry */ int len, GIST_SPLITVEC *v, - GISTSTATE *giststate) + GISTSTATE *giststate, + int attno) { int curlen; OffsetNumber *curwpos; GISTENTRY entry, - identry[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - float lpenalty, - rpenalty; - GistEntryVector *evec; - int datumsize; + identry[INDEX_MAX_KEYS]; + float lpenalty = 0, + rpenalty = 0; bool isnull[INDEX_MAX_KEYS]; int i, j; @@ -551,16 +494,9 @@ gistadjsubkey(Relation r, } v->spl_nright = curlen; - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); - /* add equivalent tuple */ for (i = 0; i < len; i++) { - Datum datum; - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ continue; gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, @@ -577,17 +513,17 @@ gistadjsubkey(Relation r, else { /* where? 
*/ - for (j = 1; j < r->rd_att->natts; j++) + for (j = attno+1; j < r->rd_att->natts; j++) { gistentryinit(entry, v->spl_lattr[j], r, NULL, (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j], &lpenalty); + lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j], + &identry[j], isnull[j]); gistentryinit(entry, v->spl_rattr[j], r, NULL, (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j], &rpenalty); + rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j], + &identry[j], isnull[j]); if (lpenalty != rpenalty) break; @@ -600,55 +536,31 @@ gistadjsubkey(Relation r, if (lpenalty < rpenalty) { v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft] = i + 1; - v->spl_nleft++; - for (j = 1; j < r->rd_att->natts; j++) + v->spl_left[v->spl_nleft++] = i + 1; + + for (j = attno+1; j < r->rd_att->natts; j++) { - if (isnull[j] && v->spl_lisnull[j]) - { - v->spl_lattr[j] = (Datum) 0; - v->spl_lattrsize[j] = 0; - } - else - { - FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_lattr[j] = datum; - v->spl_lattrsize[j] = datumsize; - v->spl_lisnull[j] = false; - } + gistentryinit(entry, v->spl_lattr[j], r, NULL, + (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); + gistMakeUnionKey( giststate, j, + &entry, v->spl_lisnull[j], + identry + j, isnull[j], + v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j ); } } else { v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright] = i + 1; - v->spl_nright++; - for (j = 1; j < r->rd_att->natts; j++) + v->spl_right[v->spl_nright++] = i + 1; + + for (j = attno+1; j < r->rd_att->natts; j++) { - if (isnull[j] && v->spl_risnull[j]) - { - v->spl_rattr[j] = 
(Datum) 0; - v->spl_rattrsize[j] = 0; - } - else - { - FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_rattr[j] = datum; - v->spl_rattrsize[j] = datumsize; - v->spl_risnull[j] = false; - } + gistentryinit(entry, v->spl_rattr[j], r, NULL, + (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); + gistMakeUnionKey( giststate, j, + &entry, v->spl_risnull[j], + identry + j, isnull[j], + v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j ); } } } @@ -702,8 +614,8 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); - gistpenalty(giststate, j, &entry, IsNull, - &identry[j], isnull[j], &usize); + usize = gistpenalty(giststate, j, &entry, IsNull, + &identry[j], isnull[j]); if (which_grow[j] < 0 || usize < which_grow[j]) { @@ -796,8 +708,10 @@ gistFormTuple(GISTSTATE *giststate, Relation r, else { gistcentryinit(giststate, i, &centry[i], attdata[i], - NULL, NULL, (OffsetNumber) 0, - datumsize[i], FALSE, FALSE); + r, NULL, (OffsetNumber) 0, + (datumsize) ? datumsize[i] : -1, + (datumsize) ? 
FALSE : TRUE, + FALSE); compatt[i] = centry[i].key; } } @@ -824,29 +738,35 @@ gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, } } -static void +static float gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, float *penalty) + GISTENTRY *orig, bool isNullOrig, + GISTENTRY *add, bool isNullAdd) { - if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) - *penalty = 0.0; - else + float penalty = 0.0; + + if ( giststate->penaltyFn[attno].fn_strict==FALSE || ( isNullOrig == FALSE && isNullAdd == FALSE ) ) FunctionCall3(&giststate->penaltyFn[attno], - PointerGetDatum(key1), - PointerGetDatum(key2), - PointerGetDatum(penalty)); + PointerGetDatum(orig), + PointerGetDatum(add), + PointerGetDatum(&penalty)); + else if ( isNullOrig && isNullAdd ) + penalty = 0.0; + else + penalty = 1e10; /* try to prevent to mix null and non-null value */ + + return penalty; } void -gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, +gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v, IndexTuple *itup, int len, GISTSTATE *giststate) { /* * now let the user-defined picksplit function set up the split vector; in * entryvec have no null value!! 
*/ - FunctionCall2(&giststate->picksplitFn[0], + FunctionCall2(&giststate->picksplitFn[attno], PointerGetDatum(entryvec), PointerGetDatum(v)); @@ -856,16 +776,16 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber) v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); - v->spl_lattr[0] = v->spl_ldatum; - v->spl_rattr[0] = v->spl_rdatum; - v->spl_lisnull[0] = false; - v->spl_risnull[0] = false; + v->spl_lattr[attno] = v->spl_ldatum; + v->spl_rattr[attno] = v->spl_rdatum; + v->spl_lisnull[attno] = false; + v->spl_risnull[attno] = false; /* * if index is multikey, then we must to try get smaller bounding box for * subkey(s) */ - if (giststate->tupdesc->natts > 1) + if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts) { int MaxGrpId; @@ -873,17 +793,17 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n); v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n); - MaxGrpId = gistfindgroup(giststate, entryvec->vector, v); + MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno); /* form union of sub keys for each page (l,p) */ - gistunionsubkey(giststate, itup, v); + gistunionsubkey(giststate, itup, v, attno + 1); /* * if possible, we insert equivalent tuples with control by penalty * for a subkey(s) */ if (MaxGrpId > 1) - gistadjsubkey(r, itup, len, v, giststate); + gistadjsubkey(r, itup, len, v, giststate, attno); } } diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index f08d49dbf9..43a6f62943 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 
teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.16 2006/05/24 11:01:39 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -206,17 +206,6 @@ typedef struct /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 -/* - * When we update a relation on which we're doing a scan, we need to - * check the scan and fix it if the update affected any of the pages - * it touches. Otherwise, we can miss records that we should see. - * The only times we need to do this are for deletions and splits. See - * the code in gistscan.c for how the scan is fixed. These two - * constants tell us what sort of operation changed the index. - */ -#define GISTOP_DEL 0 -/* #define GISTOP_SPLIT 1 */ - #define ATTSIZE(datum, tupdesc, i, isnull) \ ( \ (isnull) ? 0 : \ @@ -291,12 +280,6 @@ extern IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate); -extern int gistfindgroup(GISTSTATE *giststate, - GISTENTRY *valvec, GIST_SPLITVEC *spl); -extern void gistadjsubkey(Relation r, - IndexTuple *itup, int len, - GIST_SPLITVEC *v, - GISTSTATE *giststate); extern IndexTuple gistFormTuple(GISTSTATE *giststate, Relation r, Datum *attdata, int *datumsize, bool *isnull); @@ -321,13 +304,15 @@ typedef struct { } GistSplitVec; extern void gistunionsubkeyvec(GISTSTATE *giststate, - IndexTuple *itvec, GistSplitVec *gsvp, bool isall); + IndexTuple *itvec, GistSplitVec *gsvp, int startkey); +extern void gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, + GIST_SPLITVEC *spl, int attno); extern void GISTInitBuffer(Buffer b, uint32 f); extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, int b, bool l, bool isNull); -void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, +void gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v, IndexTuple *itup, int len, GISTSTATE 
*giststate); /* gistvacuum.c */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 17ce73d015..06f0a125bf 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.333 2006/05/19 19:08:26 alvherre Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.334 2006/05/24 11:01:39 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 200605191 +#define CATALOG_VERSION_NO 200605241 #endif diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h index 07a74c892d..141b42e02d 100644 --- a/src/include/catalog/pg_am.h +++ b/src/include/catalog/pg_am.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.42 2006/05/02 22:25:10 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.43 2006/05/24 11:01:39 teodor Exp $ * * NOTES * the genbki.sh script reads this file and generates .bki @@ -114,7 +114,7 @@ DESCR("b-tree index access method"); DATA(insert OID = 405 ( hash 1 1 0 f f f f f t f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate )); DESCR("hash index access method"); #define HASH_AM_OID 405 -DATA(insert OID = 783 ( gist 100 7 0 f t f f t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate )); +DATA(insert OID = 783 ( gist 100 7 0 f t t t t t t gistinsert gistbeginscan 
gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate )); DESCR("GiST index access method"); #define GIST_AM_OID 783 DATA(insert OID = 2742 ( gin 100 4 0 f f f f t t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ));