847 lines
24 KiB
C
847 lines
24 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* gistxlog.c
|
|
* WAL replay logic for GiST.
|
|
*
|
|
*
|
|
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.7 2005/07/01 13:18:17 teodor Exp $
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "access/genam.h"
|
|
#include "access/gist_private.h"
|
|
#include "access/gistscan.h"
|
|
#include "access/heapam.h"
|
|
#include "catalog/index.h"
|
|
#include "commands/vacuum.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/memutils.h"
|
|
|
|
|
|
typedef struct {
|
|
gistxlogEntryUpdate *data;
|
|
int len;
|
|
IndexTuple *itup;
|
|
OffsetNumber *todelete;
|
|
} EntryUpdateRecord;
|
|
|
|
typedef struct {
|
|
gistxlogPage *header;
|
|
IndexTuple *itup;
|
|
} NewPage;
|
|
|
|
typedef struct {
|
|
gistxlogPageSplit *data;
|
|
NewPage *page;
|
|
} PageSplitRecord;
|
|
|
|
/* track for incomplete inserts, idea was taken from nbtxlog.c */
|
|
|
|
typedef struct gistIncompleteInsert {
|
|
RelFileNode node;
|
|
BlockNumber origblkno; /* for splits */
|
|
ItemPointerData key;
|
|
int lenblk;
|
|
BlockNumber *blkno;
|
|
XLogRecPtr lsn;
|
|
BlockNumber *path;
|
|
int pathlen;
|
|
} gistIncompleteInsert;
|
|
|
|
|
|
MemoryContext opCtx;
|
|
MemoryContext insertCtx;
|
|
static List *incomplete_inserts;
|
|
|
|
|
|
#define ItemPointerEQ( a, b ) \
|
|
( \
|
|
ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
|
|
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
|
|
)
|
|
|
|
static void
|
|
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
|
|
BlockNumber *blkno, int lenblk,
|
|
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
|
|
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
|
|
gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
|
|
|
|
ninsert->node = node;
|
|
ninsert->key = key;
|
|
ninsert->lsn = lsn;
|
|
|
|
if ( lenblk && blkno ) {
|
|
ninsert->lenblk = lenblk;
|
|
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
|
|
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
|
|
ninsert->origblkno = *blkno;
|
|
} else {
|
|
int i;
|
|
|
|
Assert( xlinfo );
|
|
ninsert->lenblk = xlinfo->data->npage;
|
|
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
|
|
for(i=0;i<ninsert->lenblk;i++)
|
|
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
|
|
ninsert->origblkno = xlinfo->data->origblkno;
|
|
}
|
|
Assert( ninsert->lenblk>0 );
|
|
|
|
incomplete_inserts = lappend(incomplete_inserts, ninsert);
|
|
MemoryContextSwitchTo(oldCxt);
|
|
}
|
|
|
|
static void
|
|
forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
|
|
ListCell *l;
|
|
|
|
foreach(l, incomplete_inserts) {
|
|
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
|
|
|
|
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
|
|
|
|
/* found */
|
|
pfree( insert->blkno );
|
|
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
|
|
pfree( insert );
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
|
|
char *begin = XLogRecGetData(record), *ptr;
|
|
int i=0, addpath=0;
|
|
|
|
decoded->data = (gistxlogEntryUpdate*)begin;
|
|
|
|
if ( decoded->data->ntodelete ) {
|
|
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
|
|
addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
|
|
} else
|
|
decoded->todelete = NULL;
|
|
|
|
decoded->len=0;
|
|
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
|
|
while( ptr - begin < record->xl_len ) {
|
|
decoded->len++;
|
|
ptr += IndexTupleSize( (IndexTuple)ptr );
|
|
}
|
|
|
|
decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
|
|
|
|
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
|
|
while( ptr - begin < record->xl_len ) {
|
|
decoded->itup[i] = (IndexTuple)ptr;
|
|
ptr += IndexTupleSize( decoded->itup[i] );
|
|
i++;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* redo any page update (except page split)
|
|
*/
|
|
static void
|
|
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
|
|
EntryUpdateRecord xlrec;
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
|
|
decodeEntryUpdateRecord( &xlrec, record );
|
|
|
|
reln = XLogOpenRelation(xlrec.data->node);
|
|
if (!RelationIsValid(reln))
|
|
return;
|
|
buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", xlrec.data->blkno);
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
if ( isnewroot ) {
|
|
if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
return;
|
|
}
|
|
} else {
|
|
if ( PageIsNew((PageHeader) page) )
|
|
elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page blkno %u", xlrec.data->blkno);
|
|
if (XLByteLE(lsn, PageGetLSN(page))) {
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if ( xlrec.data->isemptypage ) {
|
|
while( !PageIsEmpty(page) )
|
|
PageIndexTupleDelete( page, FirstOffsetNumber );
|
|
|
|
if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
|
|
GistPageSetLeaf( page );
|
|
else
|
|
GistPageSetDeleted( page );
|
|
} else {
|
|
if ( isnewroot )
|
|
GISTInitBuffer(buffer, 0);
|
|
else if ( xlrec.data->ntodelete ) {
|
|
int i;
|
|
for(i=0; i < xlrec.data->ntodelete ; i++)
|
|
PageIndexTupleDelete(page, xlrec.todelete[i]);
|
|
if ( GistPageIsLeaf(page) )
|
|
GistMarkTuplesDeleted(page);
|
|
}
|
|
|
|
/* add tuples */
|
|
if ( xlrec.len > 0 )
|
|
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
|
|
|
|
/* special case: leafpage, nothing to insert, nothing to delete, then
|
|
vacuum marks page */
|
|
if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
|
|
GistClearTuplesDeleted(page);
|
|
}
|
|
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
WriteBuffer(buffer);
|
|
|
|
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
|
|
if ( incomplete_inserts != NIL )
|
|
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
|
|
|
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
|
|
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
|
&(xlrec.data->blkno), 1,
|
|
NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
|
|
char *begin = XLogRecGetData(record), *ptr;
|
|
int j,i=0;
|
|
|
|
decoded->data = (gistxlogPageSplit*)begin;
|
|
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
|
|
|
|
ptr=begin+sizeof( gistxlogPageSplit );
|
|
for(i=0;i<decoded->data->npage;i++) {
|
|
Assert( ptr - begin < record->xl_len );
|
|
decoded->page[i].header = (gistxlogPage*)ptr;
|
|
ptr += sizeof(gistxlogPage);
|
|
|
|
decoded->page[i].itup = (IndexTuple*)
|
|
palloc( sizeof(IndexTuple) * decoded->page[i].header->num );
|
|
j=0;
|
|
while(j<decoded->page[i].header->num) {
|
|
Assert( ptr - begin < record->xl_len );
|
|
decoded->page[i].itup[j] = (IndexTuple)ptr;
|
|
ptr += IndexTupleSize((IndexTuple)ptr);
|
|
j++;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
|
|
PageSplitRecord xlrec;
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
int i;
|
|
int flags=0;
|
|
|
|
decodePageSplitRecord( &xlrec, record );
|
|
reln = XLogOpenRelation(xlrec.data->node);
|
|
if (!RelationIsValid(reln))
|
|
return;
|
|
|
|
/* first of all wee need get F_LEAF flag from original page */
|
|
buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", xlrec.data->origblkno);
|
|
page = (Page) BufferGetPage(buffer);
|
|
if ( PageIsNew((PageHeader) page) )
|
|
elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page blkno %u",
|
|
xlrec.data->origblkno);
|
|
|
|
flags = ( GistPageIsLeaf(page) ) ? F_LEAF : 0;
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
|
|
/* loop around all pages */
|
|
for(i=0;i<xlrec.data->npage;i++) {
|
|
NewPage *newpage = xlrec.page + i;
|
|
bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
|
|
|
|
buffer = XLogReadBuffer( !isorigpage, reln, newpage->header->blkno);
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", newpage->header->blkno);
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
if (XLByteLE(lsn, PageGetLSN(page))) {
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
continue;
|
|
}
|
|
|
|
/* ok, clear buffer */
|
|
GISTInitBuffer(buffer, flags);
|
|
|
|
/* and fill it */
|
|
gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);
|
|
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
WriteBuffer(buffer);
|
|
}
|
|
|
|
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
|
|
if ( incomplete_inserts != NIL )
|
|
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
|
|
|
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
|
NULL, 0,
|
|
&xlrec);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
|
|
RelFileNode *node = (RelFileNode*)XLogRecGetData(record);
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
|
|
reln = XLogOpenRelation(*node);
|
|
if (!RelationIsValid(reln))
|
|
return;
|
|
buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistRedoCreateIndex: block unfound");
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
return;
|
|
}
|
|
|
|
GISTInitBuffer(buffer, F_LEAF);
|
|
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
WriteBuffer(buffer);
|
|
}
|
|
|
|
static void
|
|
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
|
|
char *begin = XLogRecGetData(record), *ptr;
|
|
gistxlogInsertComplete *xlrec;
|
|
|
|
xlrec = (gistxlogInsertComplete*)begin;
|
|
|
|
ptr = begin + sizeof( gistxlogInsertComplete );
|
|
while( ptr - begin < record->xl_len ) {
|
|
Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) );
|
|
forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) );
|
|
ptr += sizeof(ItemPointerData);
|
|
}
|
|
}
|
|
|
|
void
|
|
gist_redo(XLogRecPtr lsn, XLogRecord *record)
|
|
{
|
|
uint8 info = record->xl_info & ~XLR_INFO_MASK;
|
|
|
|
MemoryContext oldCxt;
|
|
oldCxt = MemoryContextSwitchTo(opCtx);
|
|
switch (info) {
|
|
case XLOG_GIST_ENTRY_UPDATE:
|
|
case XLOG_GIST_ENTRY_DELETE:
|
|
gistRedoEntryUpdateRecord(lsn, record,false);
|
|
break;
|
|
case XLOG_GIST_NEW_ROOT:
|
|
gistRedoEntryUpdateRecord(lsn, record,true);
|
|
break;
|
|
case XLOG_GIST_PAGE_SPLIT:
|
|
gistRedoPageSplitRecord(lsn, record);
|
|
break;
|
|
case XLOG_GIST_CREATE_INDEX:
|
|
gistRedoCreateIndex(lsn, record);
|
|
break;
|
|
case XLOG_GIST_INSERT_COMPLETE:
|
|
gistRedoCompleteInsert(lsn, record);
|
|
break;
|
|
default:
|
|
elog(PANIC, "gist_redo: unknown op code %u", info);
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldCxt);
|
|
MemoryContextReset(opCtx);
|
|
}
|
|
|
|
static void
|
|
out_target(char *buf, RelFileNode node, ItemPointerData key)
|
|
{
|
|
sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
|
|
node.spcNode, node.dbNode, node.relNode,
|
|
ItemPointerGetBlockNumber(&key),
|
|
ItemPointerGetOffsetNumber(&key));
|
|
}
|
|
|
|
static void
|
|
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
|
|
out_target(buf, xlrec->node, xlrec->key);
|
|
sprintf(buf + strlen(buf), "; block number %u",
|
|
xlrec->blkno);
|
|
}
|
|
|
|
static void
|
|
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
|
|
strcat(buf, "page_split: ");
|
|
out_target(buf, xlrec->node, xlrec->key);
|
|
sprintf(buf + strlen(buf), "; block number %u splits to %d pages",
|
|
xlrec->origblkno, xlrec->npage);
|
|
}
|
|
|
|
void
|
|
gist_desc(char *buf, uint8 xl_info, char *rec)
|
|
{
|
|
uint8 info = xl_info & ~XLR_INFO_MASK;
|
|
|
|
switch (info) {
|
|
case XLOG_GIST_ENTRY_UPDATE:
|
|
strcat(buf, "entry_update: ");
|
|
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
|
|
break;
|
|
case XLOG_GIST_ENTRY_DELETE:
|
|
strcat(buf, "entry_delete: ");
|
|
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
|
|
break;
|
|
case XLOG_GIST_NEW_ROOT:
|
|
strcat(buf, "new_root: ");
|
|
out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
|
|
break;
|
|
case XLOG_GIST_PAGE_SPLIT:
|
|
out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
|
|
break;
|
|
case XLOG_GIST_CREATE_INDEX:
|
|
sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u",
|
|
((RelFileNode*)rec)->spcNode,
|
|
((RelFileNode*)rec)->dbNode,
|
|
((RelFileNode*)rec)->relNode);
|
|
break;
|
|
case XLOG_GIST_INSERT_COMPLETE:
|
|
sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
|
|
((gistxlogInsertComplete*)rec)->node.spcNode,
|
|
((gistxlogInsertComplete*)rec)->node.dbNode,
|
|
((gistxlogInsertComplete*)rec)->node.relNode);
|
|
default:
|
|
elog(PANIC, "gist_desc: unknown op code %u", info);
|
|
}
|
|
}
|
|
|
|
IndexTuple
|
|
gist_form_invalid_tuple(BlockNumber blkno) {
|
|
/* we don't alloc space for null's bitmap, this is invalid tuple,
|
|
be carefull in read and write code */
|
|
Size size = IndexInfoFindDataOffset(0);
|
|
IndexTuple tuple=(IndexTuple)palloc0( size );
|
|
|
|
tuple->t_info |= size;
|
|
|
|
ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
|
|
GistTupleSetInvalid( tuple );
|
|
|
|
return tuple;
|
|
}
|
|
|
|
static Buffer
|
|
gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
|
|
Buffer buffer = XLogReadBuffer( false, r, blkno );
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistXLogReadAndLockBuffer: block %u unfound", blkno);
|
|
if ( PageIsNew( (PageHeader)(BufferGetPage(buffer)) ) )
|
|
elog(PANIC, "gistXLogReadAndLockBuffer: uninitialized page blkno %u", blkno);
|
|
|
|
return buffer;
|
|
}
|
|
|
|
|
|
static void
|
|
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
|
|
GISTInsertStack *top;
|
|
|
|
insert->pathlen = 0;
|
|
insert->path = NULL;
|
|
|
|
if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) {
|
|
int i;
|
|
GISTInsertStack *ptr=top;
|
|
while(ptr) {
|
|
insert->pathlen++;
|
|
ptr = ptr->parent;
|
|
}
|
|
|
|
insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
|
|
|
|
i=0;
|
|
ptr = top;
|
|
while(ptr) {
|
|
insert->path[i] = ptr->blkno;
|
|
i++;
|
|
ptr = ptr->parent;
|
|
}
|
|
} else
|
|
elog(LOG, "gixtxlogFindPath: lost parent for block %u", insert->origblkno);
|
|
}
|
|
|
|
/*
|
|
* Continue insert after crash. In normal situation, there isn't any incomplete
|
|
* inserts, but if it might be after crash, WAL may has not a record of completetion.
|
|
*
|
|
* Although stored LSN in gistIncompleteInsert is a LSN of child page,
|
|
* we can compare it with LSN of parent, because parent is always locked
|
|
* while we change child page (look at gistmakedeal). So if parent's LSN is
|
|
* lesser than stored lsn then changes in parent doesn't do yet.
|
|
*/
|
|
static void
|
|
gistContinueInsert(gistIncompleteInsert *insert) {
|
|
IndexTuple *itup;
|
|
int i, lenitup;
|
|
Relation index;
|
|
|
|
index = XLogOpenRelation(insert->node);
|
|
if (!RelationIsValid(index))
|
|
return;
|
|
|
|
/* needed vector itup never will be more than initial lenblkno+2,
|
|
because during this processing Indextuple can be only smaller */
|
|
lenitup = insert->lenblk;
|
|
itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
|
|
|
|
for(i=0;i<insert->lenblk;i++)
|
|
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
|
|
|
|
if ( insert->origblkno==GIST_ROOT_BLKNO ) {
|
|
/*it was split root, so we should only make new root.
|
|
it can't be simple insert into root, look at call
|
|
pushIncompleteInsert in gistRedoPageSplitRecord */
|
|
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
|
|
Page page;
|
|
|
|
if (!BufferIsValid(buffer))
|
|
elog(PANIC, "gistContinueInsert: root block unfound");
|
|
|
|
page = BufferGetPage(buffer);
|
|
if (XLByteLE(insert->lsn, PageGetLSN(page))) {
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffer);
|
|
return;
|
|
}
|
|
|
|
GISTInitBuffer(buffer, 0);
|
|
page = BufferGetPage(buffer);
|
|
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
|
|
PageSetLSN(page, insert->lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
|
WriteBuffer(buffer);
|
|
} else {
|
|
Buffer *buffers;
|
|
Page *pages;
|
|
int numbuffer;
|
|
|
|
/* construct path */
|
|
gixtxlogFindPath( index, insert );
|
|
|
|
Assert( insert->pathlen > 0 );
|
|
|
|
buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
|
|
pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
|
|
|
|
for(i=0;i<insert->pathlen;i++) {
|
|
int j, k, pituplen=0, childfound=0;
|
|
|
|
numbuffer=1;
|
|
buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
|
|
if (!BufferIsValid(buffers[numbuffer-1]))
|
|
elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]);
|
|
pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
|
|
if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
|
|
elog(PANIC, "gistContinueInsert: uninitialized page blkno %u", insert->path[i]);
|
|
|
|
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) {
|
|
LockBuffer(buffers[numbuffer-1], BUFFER_LOCK_UNLOCK);
|
|
ReleaseBuffer(buffers[numbuffer-1]);
|
|
return;
|
|
}
|
|
|
|
pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
|
|
|
|
/* remove old IndexTuples */
|
|
for(j=0;j<pituplen && childfound<lenitup;j++) {
|
|
BlockNumber blkno;
|
|
ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
|
|
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
|
|
|
|
blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
|
|
|
|
for(k=0;k<lenitup;k++)
|
|
if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
|
|
PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
|
|
j--; pituplen--;
|
|
childfound++;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
|
|
/* no space left on page, so we should split */
|
|
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
|
|
if (!BufferIsValid(buffers[numbuffer]))
|
|
elog(PANIC, "gistContinueInsert: can't create new block");
|
|
GISTInitBuffer(buffers[numbuffer], 0);
|
|
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
|
|
gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
|
|
numbuffer++;
|
|
|
|
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
|
|
IndexTuple *parentitup;
|
|
|
|
/* we split root, just copy tuples from old root to new page */
|
|
parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
|
|
|
|
/* sanity check */
|
|
if ( i+1 != insert->pathlen )
|
|
elog(PANIC,"gistContinueInsert: can't restore index '%s'",
|
|
RelationGetRelationName( index ));
|
|
|
|
/* fill new page */
|
|
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
|
|
if (!BufferIsValid(buffers[numbuffer]))
|
|
elog(PANIC, "gistContinueInsert: can't create new block");
|
|
GISTInitBuffer(buffers[numbuffer], 0);
|
|
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
|
|
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
|
|
numbuffer++;
|
|
|
|
/* fill root page */
|
|
GISTInitBuffer(buffers[0], 0);
|
|
for(j=1;j<numbuffer;j++) {
|
|
IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
|
|
if ( InvalidOffsetNumber == PageAddItem(pages[0],
|
|
(Item)tuple,
|
|
IndexTupleSize( tuple ),
|
|
(OffsetNumber)j,
|
|
LP_USED) )
|
|
elog( PANIC,"gistContinueInsert: can't restore index '%s'",
|
|
RelationGetRelationName( index ));
|
|
}
|
|
}
|
|
} else
|
|
gistfillbuffer( index, pages[numbuffer-1], itup, lenitup, InvalidOffsetNumber);
|
|
|
|
lenitup=numbuffer;
|
|
for(j=0;j<numbuffer;j++) {
|
|
itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
|
|
PageSetLSN(pages[j], insert->lsn);
|
|
PageSetTLI(pages[j], ThisTimeLineID);
|
|
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
|
|
LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
|
|
WriteBuffer( buffers[j] );
|
|
}
|
|
}
|
|
}
|
|
|
|
elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
|
|
insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
|
|
}
|
|
|
|
void
|
|
gist_xlog_startup(void) {
|
|
incomplete_inserts=NIL;
|
|
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
|
|
"GiST recovery temporary context",
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
opCtx = createTempGistContext();
|
|
}
|
|
|
|
void
|
|
gist_xlog_cleanup(void) {
|
|
ListCell *l;
|
|
List *reverse=NIL;
|
|
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
|
|
|
|
/* we should call gistContinueInsert in reverse order */
|
|
|
|
foreach(l, incomplete_inserts)
|
|
reverse = lappend(reverse, lfirst(l));
|
|
|
|
MemoryContextSwitchTo(opCtx);
|
|
foreach(l, reverse) {
|
|
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
|
|
gistContinueInsert(insert);
|
|
MemoryContextReset(opCtx);
|
|
}
|
|
MemoryContextSwitchTo(oldCxt);
|
|
|
|
MemoryContextDelete(opCtx);
|
|
MemoryContextDelete(insertCtx);
|
|
}
|
|
|
|
|
|
XLogRecData *
|
|
formSplitRdata(RelFileNode node, BlockNumber blkno,
|
|
ItemPointer key, SplitedPageLayout *dist ) {
|
|
|
|
XLogRecData *rdata;
|
|
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
|
|
SplitedPageLayout *ptr;
|
|
int npage = 0, cur=1;
|
|
|
|
ptr=dist;
|
|
while( ptr ) {
|
|
npage++;
|
|
ptr=ptr->next;
|
|
}
|
|
|
|
rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + 2));
|
|
|
|
xlrec->node = node;
|
|
xlrec->origblkno = blkno;
|
|
xlrec->npage = (uint16)npage;
|
|
if ( key )
|
|
xlrec->key = *key;
|
|
else
|
|
ItemPointerSetInvalid( &(xlrec->key) );
|
|
|
|
rdata[0].buffer = InvalidBuffer;
|
|
rdata[0].data = (char *) xlrec;
|
|
rdata[0].len = sizeof( gistxlogPageSplit );
|
|
rdata[0].next = NULL;
|
|
|
|
ptr=dist;
|
|
while(ptr) {
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char*)&(ptr->block);
|
|
rdata[cur].len = sizeof(gistxlogPage);
|
|
rdata[cur-1].next = &(rdata[cur]);
|
|
cur++;
|
|
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char*)(ptr->list);
|
|
rdata[cur].len = ptr->lenlist;
|
|
rdata[cur-1].next = &(rdata[cur]);
|
|
rdata[cur].next=NULL;
|
|
cur++;
|
|
ptr=ptr->next;
|
|
}
|
|
|
|
return rdata;
|
|
}
|
|
|
|
|
|
XLogRecData *
|
|
formUpdateRdata(RelFileNode node, BlockNumber blkno,
|
|
OffsetNumber *todelete, int ntodelete, bool emptypage,
|
|
IndexTuple *itup, int ituplen, ItemPointer key ) {
|
|
XLogRecData *rdata;
|
|
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
|
|
|
|
xlrec->node = node;
|
|
xlrec->blkno = blkno;
|
|
if ( key )
|
|
xlrec->key = *key;
|
|
else
|
|
ItemPointerSetInvalid( &(xlrec->key) );
|
|
|
|
if ( emptypage ) {
|
|
xlrec->isemptypage = true;
|
|
xlrec->ntodelete = 0;
|
|
|
|
rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
|
|
rdata->buffer = InvalidBuffer;
|
|
rdata->data = (char*)xlrec;
|
|
rdata->len = sizeof(gistxlogEntryUpdate);
|
|
rdata->next = NULL;
|
|
} else {
|
|
int cur=1,i;
|
|
|
|
xlrec->isemptypage = false;
|
|
xlrec->ntodelete = ntodelete;
|
|
|
|
rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) );
|
|
|
|
rdata->buffer = InvalidBuffer;
|
|
rdata->data = (char*)xlrec;
|
|
rdata->len = sizeof(gistxlogEntryUpdate);
|
|
rdata->next = NULL;
|
|
|
|
if ( ntodelete ) {
|
|
rdata[cur-1].next = &(rdata[cur]);
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char*)todelete;
|
|
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
|
|
rdata[cur].next = NULL;
|
|
cur++;
|
|
}
|
|
|
|
/* new tuples */
|
|
for(i=0;i<ituplen;i++) {
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char*)(itup[i]);
|
|
rdata[cur].len = IndexTupleSize(itup[i]);
|
|
rdata[cur].next = NULL;
|
|
rdata[cur-1].next = &(rdata[cur]);
|
|
cur++;
|
|
}
|
|
}
|
|
|
|
return rdata;
|
|
}
|
|
|
|
XLogRecPtr
|
|
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
|
|
gistxlogInsertComplete xlrec;
|
|
XLogRecData rdata[2];
|
|
XLogRecPtr recptr;
|
|
|
|
Assert(len>0);
|
|
xlrec.node = node;
|
|
|
|
rdata[0].buffer = InvalidBuffer;
|
|
rdata[0].data = (char *) &xlrec;
|
|
rdata[0].len = sizeof( gistxlogInsertComplete );
|
|
rdata[0].next = &(rdata[1]);
|
|
|
|
rdata[1].buffer = InvalidBuffer;
|
|
rdata[1].data = (char *) keys;
|
|
rdata[1].len = sizeof( ItemPointerData ) * len;
|
|
rdata[1].next = NULL;
|
|
|
|
START_CRIT_SECTION();
|
|
|
|
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
|
|
|
|
END_CRIT_SECTION();
|
|
|
|
return recptr;
|
|
}
|