diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 16468fd35a..2272e3339d 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,8 @@ static void gistfindleaf(GISTInsertState *state, #define ROTATEDIST(d) do { \ SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \ memset(tmp,0,sizeof(SplitedPageLayout)); \ + tmp->block.blkno = InvalidBlockNumber; \ + tmp->buffer = InvalidBuffer; \ tmp->next = (d); \ (d)=tmp; \ } while(0) @@ -309,52 +311,111 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) bool is_splitted = false; bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + /* - * XXX this code really ought to work by locking, but not modifying, - * all the buffers it needs; then starting a critical section; then - * modifying the buffers in an already-determined way and writing an - * XLOG record to reflect that. Since it doesn't, we've got to put - * a critical section around the entire process, which is horrible - * from a robustness point of view. + * if (!is_leaf) remove old key: + * This node's key has been modified, either because a child split + * occurred or because we needed to adjust our key for an insert in a + * child node. Therefore, remove the old version of this node's key. + * + * for WAL replay, in the non-split case we handle this by + * setting up a one-element todelete array; in the split case, it's + * handled implicitly because the tuple vector passed to gistSplit + * won't include this tuple. */ - START_CRIT_SECTION(); - if (!is_leaf) - /* - * This node's key has been modified, either because a child split - * occurred or because we needed to adjust our key for an insert in a - * child node. Therefore, remove the old version of this node's key. - * - * Note: for WAL replay, in the non-split case we handle this by - * setting up a one-element todelete array; in the split case, it's - * handled implicitly because the tuple vector passed to gistSplit - * won't include this tuple. - */ - - PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); - - if (gistnospace(state->stack->page, state->itup, state->ituplen)) + if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum)) { /* no space for insertion */ - IndexTuple *itvec, - *newitup; + IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; + BlockNumber rrlink = InvalidBlockNumber; + GistNSN oldnsn; is_splitted = true; + + /* + * Form index tuples vector to split: + * remove old tuple if t's needed and add new tuples to vector + */ itvec = gistextractbuffer(state->stack->buffer, &tlen); + if ( !is_leaf ) { + /* on inner page we should remove old tuple */ + int pos = state->stack->childoffnum - FirstOffsetNumber; + + tlen--; + if ( pos != tlen ) + memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) ); + } itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); - newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); + dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); + + state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen); + state->ituplen = 0; + + if (state->stack->blkno != GIST_ROOT_BLKNO) { + /* if non-root split then we should not allocate new buffer, + but we must create temporary page to operate */ + dist->buffer = state->stack->buffer; + dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) ); + + /*clean all flags except F_LEAF */ + GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; + } + + /* make new pages and fills them */ + for (ptr = dist; ptr; ptr = ptr->next) { + int i; + char *data; + + /* get new page */ + if ( ptr->buffer == InvalidBuffer ) { + ptr->buffer = gistNewBuffer( state->r ); + GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 ); + ptr->page = BufferGetPage(ptr->buffer); + } + ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); + + /* fill page, we can do it becouse all this pages are new (ie not linked in tree + or masked by temp page */ + data = (char*)(ptr->list); + for(i=0;iblock.num;i++) { + if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) + elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r)); + data += IndexTupleSize((IndexTuple)data); + } + + /* set up ItemPointer and remmeber it for parent */ + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + state->itup[ state->ituplen ] = ptr->itup; + state->ituplen++; + } + + /* saves old rightlink */ + if ( state->stack->blkno != GIST_ROOT_BLKNO ) + rrlink = GistPageGetOpaque(dist->page)->rightlink; + + START_CRIT_SECTION(); /* * must mark buffers dirty before XLogInsert, even though we'll - * still be changing their opaque fields below + * still be changing their opaque fields below. + * set up right links. */ - for (ptr = dist; ptr; ptr = ptr->next) + for (ptr = dist; ptr; ptr = ptr->next) { MarkBufferDirty(ptr->buffer); + GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? + ptr->next->block.blkno : rrlink; + } + + /* restore splitted non-root page */ + if ( state->stack->blkno != GIST_ROOT_BLKNO ) { + PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) ); + dist->page = BufferGetPage( dist->buffer ); } if (!state->r->rd_istemp) @@ -366,88 +427,44 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) is_leaf, &(state->key), dist); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + for (ptr = dist; ptr; ptr = ptr->next) { - PageSetLSN(BufferGetPage(ptr->buffer), recptr); - PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + PageSetLSN(ptr->page, recptr); + PageSetTLI(ptr->page, ThisTimeLineID); } } else { for (ptr = dist; ptr; ptr = ptr->next) { - PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); + PageSetLSN(ptr->page, XLogRecPtrForTemp); } } - state->itup = newitup; - state->ituplen = tlen; /* now tlen >= 2 */ + /* set up NSN */ + oldnsn = GistPageGetOpaque(dist->page)->nsn; + if ( state->stack->blkno == GIST_ROOT_BLKNO ) + /* if root split we should put initial value */ + oldnsn = PageGetLSN(dist->page); - if (state->stack->blkno == GIST_ROOT_BLKNO) - { + for (ptr = dist; ptr; ptr = ptr->next) { + /* only for last set oldnsn */ + GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ? + PageGetLSN(ptr->page) : oldnsn; + } + + /* + * release buffers, if it was a root split then + * release all buffers because we create all buffers + */ + ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next; + for(; ptr; ptr = ptr->next) + UnlockReleaseBuffer(ptr->buffer); + + if (state->stack->blkno == GIST_ROOT_BLKNO) { gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); state->needInsertComplete = false; - for (ptr = dist; ptr; ptr = ptr->next) - { - Page page = (Page) BufferGetPage(ptr->buffer); - - GistPageGetOpaque(page)->rightlink = (ptr->next) ? - ptr->next->block.blkno : InvalidBlockNumber; - GistPageGetOpaque(page)->nsn = PageGetLSN(page); - UnlockReleaseBuffer(ptr->buffer); - } - } - else - { - Page page; - BlockNumber rightrightlink = InvalidBlockNumber; - SplitedPageLayout *ourpage = NULL; - GistNSN oldnsn; - GISTPageOpaque opaque; - - /* move origpage to first in chain */ - if (dist->block.blkno != state->stack->blkno) - { - ptr = dist; - while (ptr->next) - { - if (ptr->next->block.blkno == state->stack->blkno) - { - ourpage = ptr->next; - ptr->next = ptr->next->next; - ourpage->next = dist; - dist = ourpage; - break; - } - ptr = ptr->next; - } - Assert(ourpage != NULL); - } - else - ourpage = dist; - - /* now gets all needed data, and sets nsn's */ - page = (Page) BufferGetPage(ourpage->buffer); - opaque = GistPageGetOpaque(page); - rightrightlink = opaque->rightlink; - oldnsn = opaque->nsn; - opaque->nsn = PageGetLSN(page); - opaque->rightlink = ourpage->next->block.blkno; - - /* - * fill and release all new pages. They isn't linked into tree yet - */ - for (ptr = ourpage->next; ptr; ptr = ptr->next) - { - page = (Page) BufferGetPage(ptr->buffer); - GistPageGetOpaque(page)->rightlink = (ptr->next) ? - ptr->next->block.blkno : rightrightlink; - /* only for last set oldnsn */ - GistPageGetOpaque(page)->nsn = (ptr->next) ? - opaque->nsn : oldnsn; - - UnlockReleaseBuffer(ptr->buffer); - } } END_CRIT_SECTION(); @@ -455,13 +472,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) else { /* enough space */ - XLogRecPtr oldlsn; + START_CRIT_SECTION(); + if (!is_leaf) + PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); MarkBufferDirty(state->stack->buffer); - oldlsn = PageGetLSN(state->stack->page); if (!state->r->rd_istemp) { OffsetNumber noffs = 0, @@ -921,77 +939,55 @@ gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) arr[i] = reasloffset[arr[i]]; } +static IndexTupleData * +gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { + char *ptr, *ret = palloc(BLCKSZ); + int i; + + ptr = ret; + for (i = 0; i < veclen; i++) { + memcpy(ptr, vec[i], IndexTupleSize(vec[i])); + ptr += IndexTupleSize(vec[i]); + } + + *memlen = ptr - ret; + Assert( *memlen < BLCKSZ ); + return (IndexTupleData*)ret; +} + /* * gistSplit -- split a page in the tree. */ -IndexTuple * +SplitedPageLayout * gistSplit(Relation r, - Buffer buffer, + Page page, IndexTuple *itup, /* contains compressed entry */ - int *len, - SplitedPageLayout **dist, + int len, GISTSTATE *giststate) { - Page p; - Buffer leftbuf, - rightbuf; - Page left, - right; IndexTuple *lvectup, - *rvectup, - *newtup; - BlockNumber lbknum, - rbknum; - GISTPageOpaque opaque; + *rvectup; GIST_SPLITVEC v; GistEntryVector *entryvec; int i, - fakeoffset, - nlen; + fakeoffset; OffsetNumber *realoffset; IndexTuple *cleaneditup = itup; - int lencleaneditup = *len; - - p = (Page) BufferGetPage(buffer); - opaque = GistPageGetOpaque(p); - - /* - * The root of the tree is the first block in the relation. If we're - * about to split the root, we need to do some hocus-pocus to enforce this - * guarantee. - */ - if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO) - { - leftbuf = gistNewBuffer(r); - GISTInitBuffer(leftbuf, opaque->flags & F_LEAF); - lbknum = BufferGetBlockNumber(leftbuf); - left = (Page) BufferGetPage(leftbuf); - } - else - { - leftbuf = buffer; - /* IncrBufferRefCount(buffer); */ - lbknum = BufferGetBlockNumber(buffer); - left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); - } - - rightbuf = gistNewBuffer(r); - GISTInitBuffer(rightbuf, opaque->flags & F_LEAF); - rbknum = BufferGetBlockNumber(rightbuf); - right = (Page) BufferGetPage(rightbuf); + int lencleaneditup = len; + SplitedPageLayout *res = NULL; /* generate the item array */ - realoffset = palloc((*len + 1) * sizeof(OffsetNumber)); - entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY)); - entryvec->n = *len + 1; + realoffset = palloc((len + 1) * sizeof(OffsetNumber)); + entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY)); + entryvec->n = len + 1; fakeoffset = FirstOffsetNumber; - for (i = 1; i <= *len; i++) + for (i = 1; i <= len; i++) { Datum datum; bool IsNull; - if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1])) + if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) { entryvec->n--; /* remember position of invalid tuple */ @@ -1001,7 +997,7 @@ gistSplit(Relation r, datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]), - datum, r, p, i, + datum, r, page, i, ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull); realoffset[fakeoffset] = i; @@ -1013,14 +1009,14 @@ gistSplit(Relation r, * possible, we move all invalid tuples on right page. We should remember, * that union with invalid tuples is a invalid tuple. */ - if (entryvec->n != *len + 1) + if (entryvec->n != len + 1) { lencleaneditup = entryvec->n - 1; cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple)); for (i = 1; i < entryvec->n; i++) cleaneditup[i - 1] = itup[realoffset[i] - 1]; - if (gistnospace(left, cleaneditup, lencleaneditup)) + if (!gistfitpage(cleaneditup, lencleaneditup)) { /* no space on left to put all good tuples, so picksplit */ gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate); @@ -1041,8 +1037,8 @@ gistSplit(Relation r, v.spl_leftvalid = v.spl_rightvalid = false; v.spl_nright = 0; v.spl_nleft = 0; - for (i = 1; i <= *len; i++) - if (i - 1 < *len / 2) + for (i = 1; i <= len; i++) + if (i - 1 < len / 2) v.spl_left[v.spl_nleft++] = i; else v.spl_right[v.spl_nright++] = i; @@ -1071,14 +1067,14 @@ gistSplit(Relation r, else { /* there is no invalid tuples, so usial processing */ - gistUserPicksplit(r, entryvec, &v, itup, *len, giststate); + gistUserPicksplit(r, entryvec, &v, itup, len, giststate); v.spl_leftvalid = v.spl_rightvalid = true; } /* form left and right vector */ - lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1)); - rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1)); + lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1)); + rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1)); for (i = 0; i < v.spl_nleft; i++) lvectup[i] = itup[v.spl_left[i] - 1]; @@ -1087,87 +1083,48 @@ gistSplit(Relation r, rvectup[i] = itup[v.spl_right[i] - 1]; /* place invalid tuples on right page if itsn't done yet */ - for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++) + for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++) { rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1]; } - /* write on disk (may need another split) */ - if (gistnospace(right, rvectup, v.spl_nright)) + /* finalyze splitting (may need another split) */ + if (!gistfitpage(rvectup, v.spl_nright)) { - nlen = v.spl_nright; - newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); - /* ReleaseBuffer(rightbuf); */ + res = gistSplit(r, page, rvectup, v.spl_nright, giststate); } else { - char *ptr; - - gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); - /* XLOG stuff */ - ROTATEDIST(*dist); - (*dist)->block.blkno = BufferGetBlockNumber(rightbuf); - (*dist)->block.num = v.spl_nright; - (*dist)->list = (IndexTupleData *) palloc(BLCKSZ); - ptr = (char *) ((*dist)->list); - for (i = 0; i < v.spl_nright; i++) - { - memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i])); - ptr += IndexTupleSize(rvectup[i]); - } - (*dist)->lenlist = ptr - ((char *) ((*dist)->list)); - (*dist)->buffer = rightbuf; - - nlen = 1; - newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); - newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull) - : gist_form_invalid_tuple(rbknum); - ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum); + ROTATEDIST(res); + res->block.num = v.spl_nright; + res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) ); + res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull) + : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } - if (gistnospace(left, lvectup, v.spl_nleft)) + if (!gistfitpage(lvectup, v.spl_nleft)) { - int llen = v.spl_nleft; - IndexTuple *lntup; + SplitedPageLayout *resptr, *subres; - lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); - /* ReleaseBuffer(leftbuf); */ + resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate); - newtup = gistjoinvector(newtup, &nlen, lntup, llen); + /* install on list's tail */ + while( resptr->next ) + resptr = resptr->next; + + resptr->next = res; + res = subres; } else { - char *ptr; - - gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); - /* XLOG stuff */ - ROTATEDIST(*dist); - (*dist)->block.blkno = BufferGetBlockNumber(leftbuf); - (*dist)->block.num = v.spl_nleft; - (*dist)->list = (IndexTupleData *) palloc(BLCKSZ); - ptr = (char *) ((*dist)->list); - for (i = 0; i < v.spl_nleft; i++) - { - memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i])); - ptr += IndexTupleSize(lvectup[i]); - } - (*dist)->lenlist = ptr - ((char *) ((*dist)->list)); - (*dist)->buffer = leftbuf; - - if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) - PageRestoreTempPage(left, p); - - nlen += 1; - newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); - newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull) - : gist_form_invalid_tuple(lbknum); - ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum); + ROTATEDIST(res); + res->block.num = v.spl_nleft; + res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) ); + res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull) + : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } - GistClearTuplesDeleted(p); - - *len = nlen; - return newtup; + return res; } /* diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index bf0a090ff8..d5d6405100 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.10 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -81,15 +81,31 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup, * Check space for itup vector on page */ bool -gistnospace(Page page, IndexTuple *itvec, int len) +gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete) { - unsigned int size = 0; + unsigned int size = 0, deleted = 0; int i; for (i = 0; i < len; i++) size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); - return (PageGetFreeSpace(page) < size); + if ( todelete != InvalidOffsetNumber ) { + IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete)); + deleted = IndexTupleSize(itup) + sizeof(ItemIdData); + } + + return (PageGetFreeSpace(page) + deleted < size); +} + +bool +gistfitpage(IndexTuple *itvec, int len) { + int i; + Size size=0; + + for(i=0;iopCtx); - vec = gistextractbuffer(buffer, &(res.ituplen)); - vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon); - res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate)); + vec = gistextractbuffer(buffer, &veclen); + vec = gistjoinvector(vec, &veclen, addon, curlenaddon); + dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate)); + MemoryContextSwitchTo(oldCtx); - vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen); - for (i = 0; i < res.ituplen; i++) - { - vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i])); - memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i])); + if (blkno != GIST_ROOT_BLKNO) { + /* if non-root split then we should not allocate new buffer */ + dist->buffer = buffer; + dist->page = BufferGetPage(dist->buffer); + GistPageGetOpaque(dist->page)->flags = 0; } - res.itup = vec; - for (ptr = dist; ptr; ptr = ptr->next) - { + res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen); + res.ituplen = 0; + + /* make new pages and fills them */ + for (ptr = dist; ptr; ptr = ptr->next) { + char *data; + + if ( ptr->buffer == InvalidBuffer ) { + ptr->buffer = gistNewBuffer( gv->index ); + GISTInitBuffer( ptr->buffer, 0 ); + ptr->page = BufferGetPage(ptr->buffer); + } + ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); + + data = (char*)(ptr->list); + for(i=0;iblock.num;i++) { + if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) + elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index)); + data += IndexTupleSize((IndexTuple)data); + } + + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup)); + memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) ); + res.ituplen++; + MarkBufferDirty(ptr->buffer); } @@ -218,10 +239,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) for (ptr = dist; ptr; ptr = ptr->next) { - /* we must keep the buffer lock on the head page */ + /* we must keep the buffer pin on the head page */ if (BufferGetBlockNumber(ptr->buffer) != blkno) - LockBuffer(ptr->buffer, GIST_UNLOCK); - ReleaseBuffer(ptr->buffer); + UnlockReleaseBuffer( ptr->buffer ); } if (blkno == GIST_ROOT_BLKNO) @@ -294,6 +314,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) if (needwrite) { MarkBufferDirty(buffer); + GistClearTuplesDeleted(page); if (!gv->index->rd_istemp) { @@ -570,14 +591,7 @@ gistbulkdelete(PG_FUNCTION_ARGS) /* * Remove deletable tuples from page - * - * XXX try to make this critical section shorter. Could do it - * by separating the callback loop from the actual tuple deletion, - * but that would affect the definition of the todelete[] array - * passed into the WAL record (because the indexes would all be - * pre-deletion). */ - START_CRIT_SECTION(); maxoff = PageGetMaxOffsetNumber(page); @@ -588,13 +602,9 @@ gistbulkdelete(PG_FUNCTION_ARGS) if (callback(&(idxtuple->t_tid), callback_state)) { - PageIndexTupleDelete(page, i); - todelete[ntodelete] = i; - i--; - maxoff--; + todelete[ntodelete] = i-ntodelete; ntodelete++; stats->std.tuples_removed += 1; - Assert(maxoff == PageGetMaxOffsetNumber(page)); } else stats->std.num_index_tuples += 1; @@ -602,10 +612,14 @@ gistbulkdelete(PG_FUNCTION_ARGS) if (ntodelete) { - GistMarkTuplesDeleted(page); + START_CRIT_SECTION(); MarkBufferDirty(buffer); + for(i=0;ird_istemp) { XLogRecData *rdata; @@ -627,9 +641,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) } else PageSetLSN(page, XLogRecPtrForTemp); + + END_CRIT_SECTION(); } - END_CRIT_SECTION(); } else { diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index c74762b7df..a029d8f1ec 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.15 2006/04/03 16:45:50 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -625,7 +625,7 @@ gistContinueInsert(gistIncompleteInsert *insert) } } - if (gistnospace(pages[numbuffer - 1], itup, lenitup)) + if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber)) { /* no space left on page, so we must split */ buffers[numbuffer] = ReadBuffer(index, P_NEW); diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 1bfc90abbc..7e9469f000 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -138,6 +138,8 @@ typedef struct SplitedPageLayout gistxlogPage block; IndexTupleData *list; int lenlist; + IndexTuple itup; /* union key for page */ + Page page; /* to operate */ Buffer buffer; /* to write after all proceed */ struct SplitedPageLayout *next; @@ -234,8 +236,8 @@ extern void freeGISTstate(GISTSTATE *giststate); extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key); -extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, - int *len, SplitedPageLayout **dist, GISTSTATE *giststate); +extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, + int len, GISTSTATE *giststate); extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child); @@ -261,11 +263,16 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgetmulti(PG_FUNCTION_ARGS); /* gistutil.c */ + +#define GiSTPageSize \ + ( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) ) + +extern bool gistfitpage(IndexTuple *itvec, int len); +extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete); extern void gistcheckpage(Relation rel, Buffer buf); extern Buffer gistNewBuffer(Relation r); extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, int len, OffsetNumber off); -extern bool gistnospace(Page page, IndexTuple *itvec, int len); extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ ); extern IndexTuple *gistjoinvector( IndexTuple *itvec, int *len,