From a7fcadd10ab67a9cc938eb2818aae33d5be0238a Mon Sep 17 00:00:00 2001 From: "Vadim B. Mikheev" Date: Sat, 21 Oct 2000 15:43:36 +0000 Subject: [PATCH] WAL --- src/backend/access/gist/gist.c | 27 +- src/backend/access/hash/hash.c | 30 +- src/backend/access/heap/heapam.c | 55 +- src/backend/access/nbtree/nbtinsert.c | 44 +- src/backend/access/nbtree/nbtpage.c | 15 +- src/backend/access/nbtree/nbtree.c | 1239 +++++++++++++----------- src/backend/access/rtree/rtree.c | 27 +- src/backend/access/transam/rmgr.c | 59 +- src/backend/access/transam/xact.c | 38 +- src/backend/access/transam/xlog.c | 181 +++- src/backend/access/transam/xlogutils.c | 4 + src/backend/bootstrap/bootstrap.c | 4 +- src/backend/postmaster/postmaster.c | 16 +- src/backend/storage/buffer/bufmgr.c | 7 +- src/backend/storage/page/bufpage.c | 35 +- src/backend/storage/smgr/smgr.c | 25 +- src/include/access/nbtree.h | 12 +- src/include/access/rmgr.h | 11 +- src/include/access/xlog.h | 11 + src/include/storage/bufpage.h | 3 +- src/include/storage/itemid.h | 6 +- 21 files changed, 1167 insertions(+), 682 deletions(-) diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 640c189886..560f28743f 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -6,7 +6,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.62 2000/07/14 22:17:28 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.63 2000/10/21 15:43:09 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -23,6 +23,12 @@ #include "miscadmin.h" #include "utils/syscache.h" +#ifdef XLOG +#include "access/xlogutils.h" +void gist_redo(XLogRecPtr lsn, XLogRecord *record); +void gist_undo(XLogRecPtr lsn, XLogRecord *record); +void gist_desc(char *buf, uint8 xl_info, char* rec); +#endif /* non-export function prototypes */ static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, @@ -1344,3 +1350,22 @@ int_range_out(INTRANGE *r) } #endif /* defined GISTDEBUG */ + +#ifdef XLOG +void +gist_redo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "gist_redo: unimplemented"); +} + +void +gist_undo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "gist_undo: unimplemented"); +} + +void +gist_desc(char *buf, uint8 xl_info, char* rec) +{ +} +#endif diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index cb740bbde9..8db80d5154 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.42 2000/07/14 22:17:28 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.43 2000/10/21 15:43:11 vadim Exp $ * * NOTES * This file contains only the public interface routines. @@ -25,9 +25,16 @@ #include "executor/executor.h" #include "miscadmin.h" - bool BuildingHash = false; +#ifdef XLOG +#include "access/xlogutils.h" +void hash_redo(XLogRecPtr lsn, XLogRecord *record); +void hash_undo(XLogRecPtr lsn, XLogRecord *record); +void hash_desc(char *buf, uint8 xl_info, char* rec); +#endif + + /* * hashbuild() -- build a new hash index. * @@ -478,3 +485,22 @@ hashdelete(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +#ifdef XLOG +void +hash_redo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "hash_redo: unimplemented"); +} + +void +hash_undo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "hash_undo: unimplemented"); +} + +void +hash_desc(char *buf, uint8 xl_info, char* rec) +{ +} +#endif diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index fa6afa8057..076fa115f0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.89 2000/10/20 11:01:02 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.90 2000/10/21 15:43:14 vadim Exp $ * * * INTERFACE ROUTINES @@ -86,12 +86,14 @@ #include "utils/inval.h" #include "utils/relcache.h" -#ifdef XLOG /* comments are in heap_update */ +#ifdef XLOG #include "access/xlogutils.h" void heap_redo(XLogRecPtr lsn, XLogRecord *record); void heap_undo(XLogRecPtr lsn, XLogRecord *record); +void heap_desc(char *buf, uint8 xl_info, char* rec); +/* comments are in heap_update */ static xl_heaptid _locked_tuple_; static void _heap_unlock_tuple(void *data); @@ -2480,4 +2482,53 @@ HeapPageCleanup(Buffer buffer) PageRepairFragmentation(page); } +static void +out_target(char *buf, xl_heaptid *target) +{ + sprintf(buf + strlen(buf), "node %u/%u; cid %u; tid %u/%u", + target->node.tblNode, target->node.relNode, + target->cid, + ItemPointerGetBlockNumber(&(target->tid)), + ItemPointerGetOffsetNumber(&(target->tid))); +} + +void +heap_desc(char *buf, uint8 xl_info, char* rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_HEAP_INSERT) + { + xl_heap_insert *xlrec = (xl_heap_insert*) rec; + strcat(buf, "insert: "); + out_target(buf, &(xlrec->target)); + } + else if (info == XLOG_HEAP_DELETE) + { + xl_heap_delete *xlrec = (xl_heap_delete*) rec; + strcat(buf, "delete: "); + out_target(buf, &(xlrec->target)); + } + else if (info == XLOG_HEAP_UPDATE) + { + xl_heap_update *xlrec = (xl_heap_update*) rec; + strcat(buf, "update: "); + out_target(buf, &(xlrec->target)); + sprintf(buf + strlen(buf), "; new %u/%u", + ItemPointerGetBlockNumber(&(xlrec->newtid)), + ItemPointerGetOffsetNumber(&(xlrec->newtid))); + } + else if (info == XLOG_HEAP_MOVE) + { + xl_heap_move *xlrec = (xl_heap_move*) rec; + strcat(buf, "move: "); + out_target(buf, &(xlrec->target)); + sprintf(buf + strlen(buf), "; new %u/%u", + ItemPointerGetBlockNumber(&(xlrec->newtid)), + ItemPointerGetOffsetNumber(&(xlrec->newtid))); + } + else + strcat(buf, "UNKNOWN"); +} + #endif /* XLOG */ diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 0105739b5c..4b92f2f561 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.66 2000/10/13 12:05:20 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.67 2000/10/21 15:43:18 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -527,12 +527,13 @@ _bt_insertonpg(Relation rel, { char xlbuf[sizeof(xl_btree_insert) + sizeof(CommandId) + sizeof(RelFileNode)]; - xl_btree_insert *xlrec = xlbuf; + xl_btree_insert *xlrec = (xl_btree_insert*)xlbuf; int hsize = SizeOfBtreeInsert; BTItemData truncitem; BTItem xlitem = btitem; Size xlsize = IndexTupleDSize(btitem->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); + XLogRecPtr recptr; xlrec->target.node = rel->rd_node; ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff); @@ -555,7 +556,7 @@ _bt_insertonpg(Relation rel, xlsize = sizeof(BTItemData); } - XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, xlbuf, hsize, (char*) xlitem, xlsize); PageSetLSN(page, recptr); @@ -785,17 +786,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { char xlbuf[sizeof(xl_btree_split) + sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ]; - xl_btree_split *xlrec = xlbuf; + xl_btree_split *xlrec = (xl_btree_split*) xlbuf; int hsize = SizeOfBtreeSplit; int flag = (newitemonleft) ? XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; + BlockNumber blkno; + XLogRecPtr recptr; xlrec->target.node = rel->rd_node; - ItemPointerSet(&(xlrec->target.tid), itup_blkno, itup_off); + ItemPointerSet(&(xlrec->target.tid), *itup_blkno, *itup_off); if (P_ISLEAF(lopaque)) { CommandId cid = GetCurrentCommandId(); - memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId)); + memcpy(xlbuf + hsize, &cid, sizeof(CommandId)); hsize += sizeof(CommandId); memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); hsize += sizeof(RelFileNode); @@ -814,7 +817,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * Actually, seems that in non-leaf splits newitem shouldn't * go to first data key position on left page. */ - if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque)) + if (! P_ISLEAF(lopaque) && *itup_off == P_FIRSTDATAKEY(lopaque)) { BTItemData truncitem = *newitem; truncitem.bti_itup.t_info = sizeof(BTItemData); @@ -828,20 +831,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, memcpy(xlbuf + hsize, (char*) newitem, itemsz); hsize += itemsz; } - xlrec->otherblk = BufferGetBlockNumber(rbuf); + blkno = BufferGetBlockNumber(rbuf); + BlockIdSet(&(xlrec->otherblk), blkno); } else - xlrec->otherblk = BufferGetBlockNumber(buf); + { + blkno = BufferGetBlockNumber(buf); + BlockIdSet(&(xlrec->otherblk), blkno); + } - xlrec->rightblk = ropaque->btpo_next; + BlockIdSet(&(xlrec->rightblk), ropaque->btpo_next); /* * Dirrect access to page is not good but faster - we should * implement some new func in page API. */ - XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, - hsize, (char*)rightpage + (PageHeader) rightpage)->pd_upper, - ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->upper); + recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, + hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper, + ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); PageSetLSN(leftpage, recptr); PageSetSUI(leftpage, ThisStartUpID); @@ -1070,7 +1077,7 @@ static Buffer _bt_getstackbuf(Relation rel, BTStack stack) { BlockNumber blkno; - Buffer buf, newbuf; + Buffer buf; OffsetNumber start, offnum, maxoff; @@ -1236,6 +1243,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) xl_btree_newroot xlrec; Page metapg = BufferGetPage(metabuf); BTMetaPageData *metad = BTPageGetMeta(metapg); + XLogRecPtr recptr; xlrec.node = rel->rd_node; BlockIdSet(&(xlrec.rootblk), rootblknum); @@ -1244,10 +1252,10 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * Dirrect access to page is not good but faster - we should * implement some new func in page API. */ - XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, - &xlrec, SizeOfBtreeNewroot, - (char*)rootpage + (PageHeader) rootpage)->pd_upper, - ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, + (char*)&xlrec, SizeOfBtreeNewroot, + (char*)rootpage + ((PageHeader) rootpage)->pd_upper, + ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper); metad->btm_root = rootblknum; (metad->btm_level)++; diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 41acd11659..3cf0e68885 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.40 2000/10/21 15:43:18 vadim Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -171,13 +171,14 @@ _bt_getroot(Relation rel, int access) #ifdef XLOG /* XLOG stuff */ { - xl_btree_newroot xlrec; + xl_btree_newroot xlrec; + XLogRecPtr recptr; xlrec.node = rel->rd_node; BlockIdSet(&(xlrec.rootblk), rootblkno); - XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, - &xlrec, SizeOfBtreeNewroot, NULL, 0); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, + (char*)&xlrec, SizeOfBtreeNewroot, NULL, 0); PageSetLSN(rootpage, recptr); PageSetSUI(rootpage, ThisStartUpID); @@ -404,10 +405,12 @@ _bt_pagedel(Relation rel, ItemPointer tid) /* XLOG stuff */ { xl_btree_delete xlrec; + XLogRecPtr recptr; + xlrec.target.node = rel->rd_node; xlrec.target.tid = *tid; - XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, - (char*) xlrec, SizeOfBtreeDelete, NULL, 0); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, + (char*) &xlrec, SizeOfBtreeDelete, NULL, 0); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 60ea3162d6..7c71549673 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -32,6 +32,14 @@ bool BuildingBtree = false; /* see comment in btbuild() */ bool FastBuild = true; /* use sort/build instead of insertion * build */ +#ifdef XLOG +#include "access/xlogutils.h" + +void btree_redo(XLogRecPtr lsn, XLogRecord *record); +void btree_undo(XLogRecPtr lsn, XLogRecord *record); +void btree_desc(char *buf, uint8 xl_info, char* rec); +#endif + static void _bt_restscan(IndexScanDesc scan); /* @@ -732,471 +740,158 @@ _bt_restscan(IndexScanDesc scan) } #ifdef XLOG -void btree_redo(XLogRecPtr lsn, XLogRecord *record) + +static bool +_bt_cleanup_page(Page page, RelFileNode hnode) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + OffsetNumber offno; + ItemId lp; + BTItem item; + bool result = false; - if (info == XLOG_BTREE_DELETE) - btree_xlog_delete(true, lsn, record); - else if (info == XLOG_BTREE_INSERT) - btree_xlog_insert(true, lsn, record); - else if (info == XLOG_BTREE_SPLIT) - btree_xlog_split(true, false, lsn, record); /* new item on the right */ - else if (info == XLOG_BTREE_SPLEFT) - btree_xlog_split(true, true, lsn, record); /* new item on the left */ - else if (info == XLOG_BTREE_NEWROOT) - btree_xlog_newroot(true, lsn, record); - else - elog(STOP, "btree_redo: unknown op code %u", info); -} - -void btree_undo(XLogRecPtr lsn, XLogRecord *record) -{ - uint8 info = record->xl_info & ~XLR_INFO_MASK; - - if (info == XLOG_BTREE_DELETE) - btree_xlog_delete(false, lsn, record); - else if (info == XLOG_BTREE_INSERT) - btree_xlog_insert(false, lsn, record); - else if (info == XLOG_BTREE_SPLIT) - btree_xlog_split(false, false, lsn, record);/* new item on the right */ - else if (info == XLOG_BTREE_SPLEFT) - btree_xlog_split(false, true, lsn, record); /* new item on the left */ - else if (info == XLOG_BTREE_NEWROOT) - btree_xlog_newroot(false, lsn, record); - else - elog(STOP, "btree_undo: unknown op code %u", info); -} - -static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) -{ - xl_btree_delete *xlrec; - Relation *reln; - Buffer buffer; - Page page; - - if (!redo) - return; - - xlrec = (xl_btree_delete*) XLogRecGetData(record); - reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_delete_redo: block unfound"); - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_delete_redo: uninitialized page"); - - PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid))); - - return; -} - -static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) -{ - xl_btree_insert *xlrec; - Relation *reln; - Buffer buffer; - Page page; - BTPageOpaque pageop; - - xlrec = (xl_btree_insert*) XLogRecGetData(record); - reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer((redo) ? true : false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_insert_%s: uninitialized page", - (redo) ? "redo" : "undo"); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - if (redo) + for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); + lp = PageGetItemId(page, offno); + item = (BTItem) PageGetItem(page, lp); + if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))) + offno = OffsetNumberNext(offno); else { - Size hsize = SizeOfBtreeInsert; - RelFileNode hnode; - - if (P_ISLEAF(pageop)) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + - sizeof(CommandId), sizeof(RelFileNode)); - } - - if (! _bt_add_item(page, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - (char*)xlrec + hsize, - record->xl_len - hsize, - hnode)) - elog(STOP, "btree_insert_redo: failed to add item"); - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); + PageIndexTupleDelete(page, offno); + maxoff = PageGetMaxOffsetNumber(page); + result = true; } } - else - { - BTItemData btdata; - if (XLByteLT(PageGetLSN(page), lsn)) - elog(STOP, "btree_insert_undo: bad page LSN"); + return(result); +} + +static bool +_bt_add_item(Page page, OffsetNumber offno, + char* item, Size size, RelFileNode hnode) +{ + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (offno > PageGetMaxOffsetNumber(page) + 1) + { + if (! (pageop->btpo_flags & BTP_REORDER)) + { + elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); + pageop->btpo_flags |= BTP_REORDER; + } + offno = PageGetMaxOffsetNumber(page) + 1; + } + + if (PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + { + /* ops, not enough space - try to deleted dead tuples */ + bool result; if (! P_ISLEAF(pageop)) - { - UnlockAndReleaseBuffer(buffer); - return; - } - - memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + - sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); - - _bt_del_item(reln, buffer, &btdata, true, lsn, record); - + return(false); + result = _bt_cleanup_page(page, hnode); + if (!result || PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + return(false); } - return; + return(true); } +/* + * Remove from left sibling items belonging to right sibling + * and change P_HIKEY + */ static void -btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) +_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) { - xl_btree_split *xlrec; - Relation *reln; - BlockNumber blkno; - BlockNumber parent; - Buffer buffer; - Page page; - BTPageOpaque pageop; - char *op = (redo) ? "redo" : "undo"; - bool isleaf; + char *xlrec = (char*) XLogRecGetData(record); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + Size hsize = SizeOfBtreeSplit; + RelFileNode hnode; + BTItemData btdata; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offno; + char *item; + Size itemsz; + char *previtem = NULL; + char *lhikey = NULL; + Size lhisize = 0; - xlrec = (xl_btree_split*) XLogRecGetData(record); - reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); - if (!RelationIsValid(reln)) - return; - - /* Left (original) sibling */ - blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : - BlockIdGetBlockNumber(xlrec->otherblk); - buffer = XLogReadBuffer(false, reln, blkno); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_split_%s: lost left sibling", op); - - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_split_%s: uninitialized left sibling", op); - - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - isleaf = P_ISLEAF(pageop); - parent = pageop->btpo_parent; - - if (redo) + if (pageop->btpo_flags & BTP_LEAF) { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else - { - /* Delete items related to new right sibling */ - _bt_fix_left_page(page, record, onleft); - - if (onleft) - { - BTItemData btdata; - Size hsize = SizeOfBtreeSplit; - Size itemsz; - RelFileNode hnode; - - pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk); - if (isleaf) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId), sizeof(RelFileNode)); - } - else - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - - if (! _bt_add_item(page, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - (char*)xlrec + hsize, - itemsz, - hnode)) - elog(STOP, "btree_split_redo: failed to add item"); - } - else - pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } - } - else /* undo */ - { - if (XLByteLT(PageGetLSN(page), lsn)) - elog(STOP, "btree_split_undo: bad left sibling LSN"); - - if (! isleaf || ! onleft) - UnlockAndReleaseBuffer(buffer); - else - { - BTItemData btdata; - - memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); - - _bt_del_item(reln, buffer, &btdata, false, lsn, record); - } - } - - /* Right (new) sibling */ - blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : - ItemPointerGetBlockNumber(&(xlrec->target.tid)); - buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_split_%s: lost right sibling", op); - - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - { - if (!redo) - elog(STOP, "btree_split_undo: uninitialized right sibling"); - PageInit(page, BufferGetPageSize(buffer), 0); - } - - if (redo) - { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else - { - Size hsize = SizeOfBtreeSplit; - BTItemData btdata; - Size itemsz; - - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (isleaf) - { - pageop->btpo_flags |= BTP_LEAF; - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - } - else - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - if (onleft) /* skip target item */ - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - for (char* item = (char*)xlrec + hsize; - item < (char*)record + record->xl_len; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, - LP_USED) == InvalidOffsetNumber) - elog(STOP, "btree_split_redo: can't add item to right sibling"); - item += itemsz; - } - - pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : - BlockIdGetBlockNumber(xlrec->otherblk); - pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk); - pageop->btpo_parent = parent; - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } - } - else /* undo */ - { - if (XLByteLT(PageGetLSN(page), lsn)) - elog(STOP, "btree_split_undo: bad right sibling LSN"); - - if (! isleaf || onleft) - UnlockAndReleaseBuffer(buffer); - else - { - char tbuf[BLCKSZ]; - int cnt; - char *item; - Size itemsz; - - item = (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId) + sizeof(RelFileNode); - for (cnt = 0; item < (char*)record + record->xl_len; ) - { - BTItem btitem = (BTItem) - (tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); - memcpy(btitem, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - item += itemsz; - cnt++; - } - cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (cnt < 0) - elog(STOP, "btree_split_undo: target item unfound in right sibling"); - - item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData))); - - _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record); - } - } - - /* Right (next) page */ - blkno = BlockIdGetBlockNumber(xlrec->rightblk); - buffer = XLogReadBuffer(false, reln, blkno); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_split_%s: lost next right page", op); - - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_split_%s: uninitialized next right page", op); - - if (redo) - { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else - { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : - ItemPointerGetBlockNumber(&(xlrec->target.tid)); - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } - } - else /* undo */ - { - if (XLByteLT(PageGetLSN(page), lsn)) - elog(STOP, "btree_split_undo: bad next right page LSN"); - - UnlockAndReleaseBuffer(buffer); - } - -} - -static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) -{ - xl_btree_newroot *xlrec; - Relation *reln; - Buffer buffer; - Page page; - Buffer metabuf; - Page metapg; - - if (!redo) - return; - - xlrec = (xl_btree_newroot*) XLogRecGetData(record); - reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk))); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_newroot_redo: no root page"); - metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_newroot_redo: no metapage"); - page = (Page) BufferGetPage(buffer); - - if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) - { - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - pageop->btpo_flags |= BTP_ROOT; - pageop->btpo_prev = pageop->btpo_next = P_NONE; - pageop->btpo_parent = BTREE_METAPAGE; - - if (record->xl_len == SizeOfBtreeNewroot) /* no childs */ - pageop->btpo_flags |= BTP_LEAF; - else - { - BTItemData btdata; - Size itemsz; - - for (char* item = (char*)xlrec + SizeOfBtreeNewroot; - item < (char*)record + record->xl_len; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, - LP_USED) == InvalidOffsetNumber) - elog(STOP, "btree_newroot_redo: can't add item"); - item += itemsz; - } - } - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); } else - UnlockAndReleaseBuffer(buffer); - - metapg = BufferGetPage(metabuf); - if (PageIsNew((PageHeader) metapg)) { - BTMetaPageData md; - - _bt_pageinit(metapg, BufferGetPageSize(metabuf)); - md.btm_magic = BTREE_MAGIC; - md.btm_version = BTREE_VERSION; - md.btm_root = P_NONE; - md.btm_level = 0; - memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md)); + lhikey = (char*)xlrec + hsize; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += lhisize; } - if (XLByteLT(PageGetLSN(metapg), lsn)) - { - BTMetaPageData *metad = BTPageGetMeta(metapg); + if (! P_RIGHTMOST(pageop)) + PageIndexTupleDelete(page, P_HIKEY); - metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); - (metad->btm_level)++; - PageSetLSN(metapg, lsn); - PageSetSUI(metapg, ThisStartUpID); - UnlockAndWriteBuffer(metabuf); + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; } - else - UnlockAndReleaseBuffer(metabuf); + + for (item = (char*)xlrec + hsize; ; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + for (offno = P_FIRSTDATAKEY(pageop); + offno <= maxoff; + offno = OffsetNumberNext(offno)) + { + ItemId lp = PageGetItemId(page, offno); + BTItem btitem = (BTItem) PageGetItem(page, lp); + + if (BTItemSame(&btdata, btitem)) + { + PageIndexTupleDelete(page, offno); + break; + } + } + + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + + if (item + itemsz < (char*)record + record->xl_len) + { + previtem = item; + item += itemsz; + } + else + break; + } + + /* time to insert hi-key */ + if (pageop->btpo_flags & BTP_LEAF) + { + lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + } + + if (! _bt_add_item(page, + P_HIKEY, + lhikey, + lhisize, + hnode)) + elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); return; } @@ -1298,158 +993,530 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, return; } -static bool -_bt_add_item(Page page, OffsetNumber offno, - char* item, Size size, RelFileNode hnode) -{ - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - if (offno > PageGetMaxOffsetNumber(page) + 1) - { - if (! (pageop->btpo_flags & BTP_REORDER)) - { - elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); - pageop->btpo_flags |= BTP_REORDER; - } - offno = PageGetMaxOffsetNumber(page) + 1; - } - - if (PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) - { - /* ops, not enough space - try to deleted dead tuples */ - bool result; - - if (! P_ISLEAF(pageop)) - return(false); - result = _bt_cleanup_page(page, hnode); - if (!result || PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) - return(false); - } - - return(true); -} - -static bool -_bt_cleanup_page(Page page, RelFileNode hnode) -{ - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offno; - ItemId lp; - BTItem item; - bool result = false; - - for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) - { - lp = PageGetItemId(page, offno); - item = (BTItem) PageGetItem(page, lp); - if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)) - offno = OffsetNumberNext(offno); - else - { - PageIndexTupleDelete(page, offno); - maxoff = PageGetMaxOffsetNumber(page); - result = true; - } - } - - return(result); -} - -/* - * Remove from left sibling items belonging to right sibling - * and change P_HIKEY - */ static void -_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) +btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { - char *xlrec = (char*) XLogRecGetData(record); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - Size hsize = SizeOfBtreeSplit; - RelFileNode hnode; - BTItemData btdata; - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offno; - char *item; - Size itemsz; - char *previtem = NULL; - char *lhikey = NULL; - Size lhisize = 0; + xl_btree_delete *xlrec; + Relation reln; + Buffer buffer; + Page page; - if (pageop->btpo_flags & BTP_LEAF) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId), sizeof(RelFileNode)); - } - else - { - lhikey = (char*)xlrec + hsize; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += lhisize; - } + if (!redo) + return; - if (! P_RIGHTMOST(pageop)) - PageIndexTupleDelete(page, P_HIKEY); + xlrec = (xl_btree_delete*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_delete_redo: block unfound"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_delete_redo: uninitialized page"); - if (onleft) /* skip target item */ - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - for (item = (char*)xlrec + hsize; ; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - for (offno = P_FIRSTDATAKEY(pageop); - offno <= maxoff; - offno = OffsetNumberNext(offno)) - { - ItemId lp = PageGetItemId(page, offno); - BTItem btitem = (BTItem) PageGetItem(page, lp); - - if (BTItemSame(&btdata, btitem)) - { - PageIndexTupleDelete(page, offno); - break; - } - } - - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - - if (item + itemsz < (char*)record + record->xl_len) - { - previtem = item; - item += itemsz; - } - else - break; - } - - /* time to insert hi-key */ - if (pageop->btpo_flags & BTP_LEAF) - { - lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - } - - if (! _bt_add_item(page, - P_HIKEY, - lhikey, - lhisize, - &hnode)) - elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); + PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid))); return; } +static void +btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_insert *xlrec; + Relation reln; + Buffer buffer; + Page page; + BTPageOpaque pageop; + + xlrec = (xl_btree_insert*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer((redo) ? true : false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_insert_%s: uninitialized page", + (redo) ? "redo" : "undo"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeInsert; + RelFileNode hnode; + + if (P_ISLEAF(pageop)) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId), sizeof(RelFileNode)); + } + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + record->xl_len - hsize, + hnode)) + elog(STOP, "btree_insert_redo: failed to add item"); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else + { + BTItemData btdata; + + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_insert_undo: bad page LSN"); + + if (! P_ISLEAF(pageop)) + { + UnlockAndReleaseBuffer(buffer); + return; + } + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, true, lsn, record); + + } + + return; +} + +static void +btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_split *xlrec; + Relation reln; + BlockNumber blkno; + BlockNumber parent; + Buffer buffer; + Page page; + BTPageOpaque pageop; + char *op = (redo) ? "redo" : "undo"; + bool isleaf; + + xlrec = (xl_btree_split*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + + /* Left (original) sibling */ + blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(&(xlrec->otherblk)); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost left sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized left sibling", op); + + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + isleaf = P_ISLEAF(pageop); + parent = pageop->btpo_parent; + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + /* Delete items related to new right sibling */ + _bt_fix_left_page(page, record, onleft); + + if (onleft) + { + BTItemData btdata; + Size hsize = SizeOfBtreeSplit; + Size itemsz; + RelFileNode hnode; + + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk)); + if (isleaf) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); + } + else + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + itemsz, + hnode)) + elog(STOP, "btree_split_redo: failed to add item"); + } + else + pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad left sibling LSN"); + + if (! isleaf || ! onleft) + UnlockAndReleaseBuffer(buffer); + else + { + BTItemData btdata; + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, false, lsn, record); + } + } + + /* Right (new) sibling */ + blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost right sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + if (!redo) + elog(STOP, "btree_split_undo: uninitialized right sibling"); + PageInit(page, BufferGetPageSize(buffer), 0); + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeSplit; + BTItemData btdata; + Size itemsz; + char *item; + + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (isleaf) + { + pageop->btpo_flags |= BTP_LEAF; + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + } + else + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + for (item = (char*)xlrec + hsize; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_split_redo: can't add item to right sibling"); + item += itemsz; + } + + pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(&(xlrec->otherblk)); + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk)); + pageop->btpo_parent = parent; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad right sibling LSN"); + + if (! isleaf || onleft) + UnlockAndReleaseBuffer(buffer); + else + { + char tbuf[BLCKSZ]; + int cnt; + char *item; + Size itemsz; + + item = (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode); + for (cnt = 0; item < (char*)record + record->xl_len; ) + { + BTItem btitem = (BTItem) + (tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); + memcpy(btitem, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + item += itemsz; + cnt++; + } + cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (cnt < 0) + elog(STOP, "btree_split_undo: target item unfound in right sibling"); + + item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData))); + + _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record); + } + } + + /* Right (next) page */ + blkno = BlockIdGetBlockNumber(&(xlrec->rightblk)); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost next right page", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized next right page", op); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = (onleft) ? + BlockIdGetBlockNumber(&(xlrec->otherblk)) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad next right page LSN"); + + UnlockAndReleaseBuffer(buffer); + } + +} + +static void +btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_newroot *xlrec; + Relation reln; + Buffer buffer; + Page page; + Buffer metabuf; + Page metapg; + + if (!redo) + return; + + xlrec = (xl_btree_newroot*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no root page"); + metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no metapage"); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) + { + BTPageOpaque pageop; + + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + pageop->btpo_flags |= BTP_ROOT; + pageop->btpo_prev = pageop->btpo_next = P_NONE; + pageop->btpo_parent = BTREE_METAPAGE; + + if (record->xl_len == SizeOfBtreeNewroot) /* no childs */ + pageop->btpo_flags |= BTP_LEAF; + else + { + BTItemData btdata; + Size itemsz; + char *item; + + for (item = (char*)xlrec + SizeOfBtreeNewroot; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_newroot_redo: can't add item"); + item += itemsz; + } + } + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + + metapg = BufferGetPage(metabuf); + if (PageIsNew((PageHeader) metapg)) + { + BTMetaPageData md; + + _bt_pageinit(metapg, BufferGetPageSize(metabuf)); + md.btm_magic = BTREE_MAGIC; + md.btm_version = BTREE_VERSION; + md.btm_root = P_NONE; + md.btm_level = 0; + memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md)); + } + + if (XLByteLT(PageGetLSN(metapg), lsn)) + { + BTMetaPageData *metad = BTPageGetMeta(metapg); + + metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); + (metad->btm_level)++; + PageSetLSN(metapg, lsn); + PageSetSUI(metapg, ThisStartUpID); + UnlockAndWriteBuffer(metabuf); + } + else + UnlockAndReleaseBuffer(metabuf); + + return; +} + +void +btree_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(true, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(true, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(true, false, lsn, record); /* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(true, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(true, lsn, record); + else + elog(STOP, "btree_redo: unknown op code %u", info); +} + +void +btree_undo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(false, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(false, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(false, false, lsn, record);/* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(false, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(false, lsn, record); + else + elog(STOP, "btree_undo: unknown op code %u", info); +} + +static void +out_target(char *buf, xl_btreetid *target) +{ + sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", + target->node.tblNode, target->node.relNode, + ItemPointerGetBlockNumber(&(target->tid)), + ItemPointerGetOffsetNumber(&(target->tid))); +} + +void +btree_desc(char *buf, uint8 xl_info, char* rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_INSERT) + { + xl_btree_insert *xlrec = (xl_btree_insert*) rec; + strcat(buf, "insert: "); + out_target(buf, &(xlrec->target)); + } + else if (info == XLOG_BTREE_DELETE) + { + xl_btree_delete *xlrec = (xl_btree_delete*) rec; + strcat(buf, "delete: "); + out_target(buf, &(xlrec->target)); + } + else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT) + { + xl_btree_split *xlrec = (xl_btree_split*) rec; + sprintf(buf + strlen(buf), "split(%s): ", + (info == XLOG_BTREE_SPLIT) ? "right" : "left"); + out_target(buf, &(xlrec->target)); + sprintf(buf + strlen(buf), "; oth %u; rgh %u", + BlockIdGetBlockNumber(&xlrec->otherblk), + BlockIdGetBlockNumber(&xlrec->rightblk)); + } + else if (info == XLOG_BTREE_NEWROOT) + { + xl_btree_newroot *xlrec = (xl_btree_newroot*) rec; + sprintf(buf + strlen(buf), "root: node %u/%u; blk %u", + xlrec->node.tblNode, xlrec->node.relNode, + BlockIdGetBlockNumber(&xlrec->rootblk)); + } + else + strcat(buf, "UNKNOWN"); +} + #endif diff --git a/src/backend/access/rtree/rtree.c b/src/backend/access/rtree/rtree.c index d36bf79a8d..60f6a2f6ca 100644 --- a/src/backend/access/rtree/rtree.c +++ b/src/backend/access/rtree/rtree.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.53 2000/07/30 20:43:40 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.54 2000/10/21 15:43:20 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -22,6 +22,12 @@ #include "executor/executor.h" #include "miscadmin.h" +#ifdef XLOG +#include "access/xlogutils.h" +void rtree_redo(XLogRecPtr lsn, XLogRecord *record); +void rtree_undo(XLogRecPtr lsn, XLogRecord *record); +void rtree_desc(char *buf, uint8 xl_info, char* rec); +#endif typedef struct SPLITVEC { @@ -1066,3 +1072,22 @@ _rtdump(Relation r) } #endif /* defined RTDEBUG */ + +#ifdef XLOG +void +rtree_redo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "rtree_redo: unimplemented"); +} + +void +rtree_undo(XLogRecPtr lsn, XLogRecord *record) +{ + elog(STOP, "rtree_undo: unimplemented"); +} + +void +rtree_desc(char *buf, uint8 xl_info, char* rec) +{ +} +#endif diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 4efb53ea08..147256653a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -1,4 +1,59 @@ #include "postgres.h" -#include "access/rmgr.h" +#include "access/xlog.h" -RmgrData *RmgrTable = NULL; +#ifdef XLOG +extern void xlog_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void xlog_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void xlog_desc(char *buf, uint8 xl_info, char* rec); + +extern void xact_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void xact_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void xact_desc(char *buf, uint8 xl_info, char* rec); + +extern void smgr_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void smgr_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void smgr_desc(char *buf, uint8 xl_info, char* rec); + +extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void heap_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void heap_desc(char *buf, uint8 xl_info, char* rec); + +extern void btree_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void btree_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void btree_desc(char *buf, uint8 xl_info, char* rec); + +extern void hash_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void hash_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void hash_desc(char *buf, uint8 xl_info, char* rec); + +extern void rtree_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void rtree_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void rtree_desc(char *buf, uint8 xl_info, char* rec); + +extern void gist_redo(XLogRecPtr lsn, XLogRecord *rptr); +extern void gist_undo(XLogRecPtr lsn, XLogRecord *rptr); +extern void gist_desc(char *buf, uint8 xl_info, char* rec); + +RmgrData RmgrTable[] = { +{"XLOG", xlog_redo, xlog_undo, xlog_desc}, +{"Transaction", xact_redo, xact_undo, xact_desc}, +{"Storage", smgr_redo, smgr_undo, smgr_desc}, +{"Reserved 3", NULL, NULL, NULL}, +{"Reserved 4", NULL, NULL, NULL}, +{"Reserved 5", NULL, NULL, NULL}, +{"Reserved 6", NULL, NULL, NULL}, +{"Reserved 7", NULL, NULL, NULL}, +{"Reserved 8", NULL, NULL, NULL}, +{"Reserved 9", NULL, NULL, NULL}, +{"Heap", heap_redo, heap_undo, heap_desc}, +{"Btree", btree_redo, btree_undo, btree_desc}, +{"Hash", hash_redo, hash_undo, hash_desc}, +{"Rtree", rtree_redo, rtree_undo, rtree_desc}, +{"Gist", gist_redo, gist_undo, gist_desc} +}; + +#else + +RmgrData RmgrTable[] = {}; + +#endif diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 681d856517..315d813e92 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.73 2000/10/20 11:01:04 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.74 2000/10/21 15:43:22 vadim Exp $ * * NOTES * Transaction aborts can now occur two ways: @@ -224,6 +224,7 @@ int CommitDelay; void xact_redo(XLogRecPtr lsn, XLogRecord *record); void xact_undo(XLogRecPtr lsn, XLogRecord *record); +void xact_desc(char *buf, uint8 xl_info, char* rec); static void (*_RollbackFunc)(void*) = NULL; static void *_RollbackData = NULL; @@ -692,6 +693,7 @@ RecordTransactionCommit() TransactionIdCommit(xid); #ifdef XLOG + if (MyLastRecPtr.xlogid != 0 || MyLastRecPtr.xrecoff != 0) { xl_xact_commit xlrec; struct timeval delay; @@ -711,6 +713,9 @@ RecordTransactionCommit() delay.tv_sec = 0; delay.tv_usec = CommitDelay; (void) select(0, NULL, NULL, NULL, &delay); + XLogFlush(recptr); + MyLastRecPtr.xlogid = 0; + MyLastRecPtr.xrecoff = 0; } #endif /* @@ -823,7 +828,7 @@ RecordTransactionAbort() TransactionIdAbort(xid); #ifdef XLOG - if (SharedBufferChanged) + if (MyLastRecPtr.xlogid != 0 || MyLastRecPtr.xrecoff != 0) { xl_xact_abort xlrec; XLogRecPtr recptr; @@ -1176,6 +1181,8 @@ AbortTransaction() AtEOXact_Files(); /* Here we'll rollback xaction changes */ + MyLastRecPtr.xlogid = 0; + MyLastRecPtr.xrecoff = 0; AtAbort_Locks(); @@ -1748,6 +1755,33 @@ xact_undo(XLogRecPtr lsn, XLogRecord *record) else if (info != XLOG_XACT_ABORT) elog(STOP, "xact_redo: unknown op code %u", info); } + +void +xact_desc(char *buf, uint8 xl_info, char* rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_XACT_COMMIT) + { + xl_xact_commit *xlrec = (xl_xact_commit*) rec; + struct tm *tm = localtime(&xlrec->xtime); + + sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); + } + else if (info == XLOG_XACT_ABORT) + { + xl_xact_abort *xlrec = (xl_xact_abort*) rec; + struct tm *tm = localtime(&xlrec->xtime); + + sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); + } + else + strcat(buf, "UNKNOWN"); +} void XactPushRollback(void (*func) (void *), void* data) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8c3cd117cb..bae7af83c3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.18 2000/10/20 11:01:04 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.19 2000/10/21 15:43:22 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -27,6 +27,8 @@ #include "storage/spin.h" #include "storage/s_lock.h" +#include "miscadmin.h" + void UpdateControlFile(void); int XLOGShmemSize(void); void XLOGShmemInit(void); @@ -41,6 +43,9 @@ uint32 XLOGbuffers = 0; XLogRecPtr MyLastRecPtr = {0, 0}; bool StopIfError = false; bool InRecovery = false; +StartUpID ThisStartUpID = 0; + +int XLOG_DEBUG = 1; SPINLOCK ControlFileLockId; SPINLOCK XidGenLockId; @@ -93,6 +98,7 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ uint32 XLogCacheByte; uint32 XLogCacheBlck; + StartUpID ThisStartUpID; #ifdef HAS_TEST_AND_SET slock_t insert_lck; slock_t info_lck; @@ -137,16 +143,20 @@ static ControlFileData *ControlFile = NULL; typedef struct CheckPoint { - XLogRecPtr redo; /* next RecPtr available when we */ - /* began to create CheckPoint */ - /* (i.e. REDO start point) */ - XLogRecPtr undo; /* first record of oldest in-progress */ - /* transaction when we started */ - /* (i.e. UNDO end point) */ - TransactionId nextXid; - Oid nextOid; + XLogRecPtr redo; /* next RecPtr available when we */ + /* began to create CheckPoint */ + /* (i.e. REDO start point) */ + XLogRecPtr undo; /* first record of oldest in-progress */ + /* transaction when we started */ + /* (i.e. UNDO end point) */ + StartUpID ThisStartUpID; + TransactionId nextXid; + Oid nextOid; + bool Shutdown; } CheckPoint; +#define XLOG_CHECKPOINT 0x00 + /* * We break each log file in 16Mb segments */ @@ -190,6 +200,7 @@ static int XLogFileInit(uint32 log, uint32 seg); static int XLogFileOpen(uint32 log, uint32 seg, bool econt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer); static char *str_time(time_t tnow); +static void xlog_outrec(char *buf, XLogRecord *record); static XLgwrResult LgwrResult = {{0, 0}, {0, 0}}; static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; @@ -225,6 +236,13 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 if (len == 0 || len > MAXLOGRECSZ) elog(STOP, "XLogInsert: invalid record len %u", len); + if (IsBootstrapProcessingMode()) + { + RecPtr.xlogid = 0; + RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */ + return (RecPtr); + } + /* obtain xlog insert lock */ if (TAS(&(XLogCtl->insert_lck))) /* busy */ { @@ -310,6 +328,23 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 MyProc->logRec = RecPtr; SpinRelease(SInvalLock); } + Insert->PrevRecord = RecPtr; + + if (XLOG_DEBUG) + { + char buf[8192]; + + sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff); + xlog_outrec(buf, record); + if (hdr != NULL) + { + strcat(buf, " - "); + RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, hdr); + } + strcat(buf, "\n"); + write(2, buf, strlen(buf)); + } + MyLastRecPtr = RecPtr; /* begin of record */ Insert->currpos += SizeOfXLogRecord; if (freespace > 0) @@ -330,7 +365,7 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 Insert->currpos += wlen; } Insert->currpos = ((char *) Insert->currpage) + - DOUBLEALIGN(Insert->currpos - ((char *) Insert->currpage)); + MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); len = hdrlen + buflen; } @@ -391,7 +426,7 @@ nbuf: /* we don't store info in subrecord' xl_info */ subrecord->xl_info = 0; Insert->currpos = ((char *) Insert->currpage) + - DOUBLEALIGN(Insert->currpos - ((char *) Insert->currpage)); + MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); } freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; @@ -738,7 +773,7 @@ XLogFileInit(uint32 log, uint32 seg) fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) - elog(STOP, "Open(logfile %u seg %u) failed: %d", + elog(STOP, "Init(logfile %u seg %u) failed: %d", logId, logSeg, errno); if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1)) @@ -777,6 +812,7 @@ XLogFileOpen(uint32 log, uint32 seg, bool econt) logId, logSeg); return (fd); } + abort(); elog(STOP, "Open(logfile %u seg %u) failed: %d", logId, logSeg, errno); } @@ -876,7 +912,7 @@ got_record:; XLogSubRecord *subrecord; uint32 len = record->xl_len; - if (DOUBLEALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + + if (MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ) { elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", @@ -938,7 +974,7 @@ got_record:; buffer += subrecord->xl_len; if (subrecord->xl_info & XLR_TO_BE_CONTINUED) { - if (DOUBLEALIGN(subrecord->xl_len) + + if (MAXALIGN(subrecord->xl_len) + SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) { elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", @@ -949,26 +985,26 @@ got_record:; } break; } - if (BLCKSZ - SizeOfXLogRecord >= DOUBLEALIGN(subrecord->xl_len) + + if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) + SizeOfXLogPHD + SizeOfXLogSubRecord) { nextRecord = (XLogRecord *) ((char *) subrecord + - DOUBLEALIGN(subrecord->xl_len) + SizeOfXLogSubRecord); + MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord); } EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + SizeOfXLogPHD + SizeOfXLogSubRecord + - DOUBLEALIGN(subrecord->xl_len); + MAXALIGN(subrecord->xl_len); ReadRecPtr = *RecPtr; return (record); } - if (BLCKSZ - SizeOfXLogRecord >= DOUBLEALIGN(record->xl_len) + + if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) nextRecord = (XLogRecord *) ((char *) record + - DOUBLEALIGN(record->xl_len) + SizeOfXLogRecord); + MAXALIGN(record->xl_len) + SizeOfXLogRecord); EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xrecoff = RecPtr->xrecoff + - DOUBLEALIGN(record->xl_len) + SizeOfXLogRecord; + MAXALIGN(record->xl_len) + SizeOfXLogRecord; ReadRecPtr = *RecPtr; return (record); @@ -1130,7 +1166,7 @@ BootStrapXLOG() char buffer[BLCKSZ]; CheckPoint checkPoint; -#ifdef NOT_USED +#ifdef XLOG XLogPageHeader page = (XLogPageHeader) buffer; XLogRecord *record; @@ -1146,8 +1182,9 @@ BootStrapXLOG() checkPoint.undo = checkPoint.redo; checkPoint.nextXid = FirstTransactionId; checkPoint.nextOid = BootstrapObjectIdData; + checkPoint.ThisStartUpID = 0; -#ifdef NOT_USED +#ifdef XLOG memset(buffer, 0, BLCKSZ); page->xlp_magic = XLOG_PAGE_MAGIC; @@ -1213,7 +1250,7 @@ str_time(time_t tnow) void StartupXLOG() { -#ifdef NOT_USED +#ifdef XLOG XLogCtlInsert *Insert; CheckPoint checkPoint; XLogRecPtr RecPtr, @@ -1291,7 +1328,7 @@ StartupXLOG() elog(LOG, "Data Base System was interrupted being in production at %s", str_time(ControlFile->time)); -#ifdef NOT_USED +#ifdef XLOG LastRec = RecPtr = ControlFile->checkPoint; if (!XRecOffIsValid(RecPtr.xrecoff)) @@ -1312,17 +1349,20 @@ StartupXLOG() checkPoint.nextXid, checkPoint.nextOid); if (checkPoint.nextXid < FirstTransactionId || checkPoint.nextOid < BootstrapObjectIdData) -#ifdef XLOG + +#ifdef XLOG_2 elog(STOP, "Invalid NextTransactionId/NextOid"); #else elog(LOG, "Invalid NextTransactionId/NextOid"); #endif -#ifdef XLOG +#ifdef XLOG_2 ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; #endif + ThisStartUpID = checkPoint.ThisStartUpID; + if (XLByteLT(RecPtr, checkPoint.redo)) elog(STOP, "Invalid redo in checkPoint record"); if (checkPoint.undo.xrecoff == 0) @@ -1364,10 +1404,23 @@ StartupXLOG() ReadRecPtr.xlogid, ReadRecPtr.xrecoff); do { -#ifdef XLOG +#ifdef XLOG_2 if (record->xl_xid >= ShmemVariableCache->nextXid) ShmemVariableCache->nextXid = record->xl_xid + 1; #endif + if (XLOG_DEBUG) + { + char buf[8192]; + + sprintf(buf, "REDO @ %u/%u: ", ReadRecPtr.xlogid, ReadRecPtr.xrecoff); + xlog_outrec(buf, record); + strcat(buf, " - "); + RmgrTable[record->xl_rmid].rm_desc(buf, + record->xl_info, XLogRecGetData(record)); + strcat(buf, "\n"); + write(2, buf, strlen(buf)); + } + RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); record = ReadRecord(NULL, buffer); } while (record->xl_len != 0); @@ -1422,6 +1475,7 @@ StartupXLOG() if (recovery > 0) { +#ifdef NOT_USED int i; /* @@ -1429,21 +1483,37 @@ StartupXLOG() */ for (i = 0; i <= RM_MAX_ID; i++) RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL); +#endif CreateCheckPoint(true); StopIfError = sie_saved; } -#endif /* NOT_USED */ +#endif /* XLOG */ ControlFile->state = DB_IN_PRODUCTION; ControlFile->time = time(NULL); UpdateControlFile(); + ThisStartUpID++; + XLogCtl->ThisStartUpID = ThisStartUpID; + elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL))); return; } +/* + * Postmaster uses it to set ThisStartUpID from XLogCtlData + * located in shmem after successful startup. + */ +void SetThisStartUpID(void); + +void +SetThisStartUpID(void) +{ + ThisStartUpID = XLogCtl->ThisStartUpID; +} + /* * This func must be called ONCE on system shutdown */ @@ -1461,7 +1531,7 @@ ShutdownXLOG() void CreateCheckPoint(bool shutdown) { -#ifdef NOT_USED +#ifdef XLOG CheckPoint checkPoint; XLogRecPtr recptr; XLogCtlInsert *Insert = &XLogCtl->Insert; @@ -1475,6 +1545,8 @@ CreateCheckPoint(bool shutdown) ControlFile->time = time(NULL); UpdateControlFile(); } + checkPoint.ThisStartUpID = ThisStartUpID; + checkPoint.Shutdown = shutdown; /* Get REDO record ptr */ while (TAS(&(XLogCtl->insert_lck))) @@ -1517,20 +1589,21 @@ CreateCheckPoint(bool shutdown) if (shutdown && checkPoint.undo.xrecoff != 0) elog(STOP, "Active transaction while data base is shutting down"); - recptr = XLogInsert(RM_XLOG_ID, (char *) &checkPoint, sizeof(checkPoint), NULL, 0); + recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, (char *) &checkPoint, + sizeof(checkPoint), NULL, 0); if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr)) elog(STOP, "XLog concurrent activity while data base is shutting down"); XLogFlush(recptr); -#endif /* NOT_USED */ +#endif /* XLOG */ SpinAcquire(ControlFileLockId); if (shutdown) ControlFile->state = DB_SHUTDOWNED; -#ifdef NOT_USED +#ifdef XLOG ControlFile->checkPoint = MyLastRecPtr; #else ControlFile->checkPoint.xlogid = 0; @@ -1543,3 +1616,47 @@ CreateCheckPoint(bool shutdown) return; } + +void xlog_redo(XLogRecPtr lsn, XLogRecord *record); +void xlog_undo(XLogRecPtr lsn, XLogRecord *record); +void xlog_desc(char *buf, uint8 xl_info, char* rec); + +void +xlog_redo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void +xlog_undo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void +xlog_desc(char *buf, uint8 xl_info, char* rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_CHECKPOINT) + { + CheckPoint *checkpoint = (CheckPoint*) rec; + sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; " + "sui %u; xid %u; oid %u; %s", + checkpoint->redo.xlogid, checkpoint->redo.xrecoff, + checkpoint->undo.xlogid, checkpoint->undo.xrecoff, + checkpoint->ThisStartUpID, checkpoint->nextXid, + checkpoint->nextOid, + (checkpoint->Shutdown) ? "shutdown" : "online"); + } + else + strcat(buf, "UNKNOWN"); +} + +static void +xlog_outrec(char *buf, XLogRecord *record) +{ + sprintf(buf + strlen(buf), "prev %u/%u; xprev %u/%u; xid %u: %s", + record->xl_prev.xlogid, record->xl_prev.xrecoff, + record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff, + record->xl_xid, + RmgrTable[record->xl_rmid].rm_name); +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index a507c39bc2..5140cfaf34 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -276,6 +276,9 @@ _xl_init_rel_cache(void) _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt); memset(_xlpgcarr, 0, sizeof(XLogRelDesc) * _xlcnt); + _xlrelarr[0].moreRecently = &(_xlrelarr[0]); + _xlrelarr[0].lessRecently = &(_xlrelarr[0]); + memset(&ctl, 0, (int) sizeof(ctl)); ctl.keysize = sizeof(RelFileNode); ctl.datasize = sizeof(XLogRelDesc*); @@ -383,6 +386,7 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode) hentry->rdesc = res; res->reldata.rd_unlinked = true; /* look smgropen */ + res->reldata.rd_fd = -1; res->reldata.rd_fd = smgropen(DEFAULT_SMGR, &(res->reldata)); } diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 1620839e9a..275433dda3 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.93 2000/09/06 14:15:14 petere Exp $ + * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.94 2000/10/21 15:43:24 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -345,6 +345,7 @@ BootstrapMain(int argc, char *argv[]) if (IsUnderPostmaster && xloginit) { + SetProcessingMode(NormalProcessing); StartupXLOG(); proc_exit(0); } @@ -360,6 +361,7 @@ BootstrapMain(int argc, char *argv[]) if (IsUnderPostmaster && !xloginit) { + SetProcessingMode(NormalProcessing); ShutdownXLOG(); proc_exit(0); } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 6f9a16af35..5276b6dcf1 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.172 2000/10/16 14:52:08 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.173 2000/10/21 15:43:26 vadim Exp $ * * NOTES * @@ -220,6 +220,10 @@ extern char *optarg; extern int optind, opterr; +extern char XLogDir[]; +extern char ControlFilePath[]; +extern void SetThisStartUpID(void); + /* * postmaster.c - function prototypes */ @@ -600,6 +604,10 @@ PostmasterMain(int argc, char *argv[]) /* set up shared memory and semaphores */ reset_shared(PostPortName); + /* Init XLOG pathes */ + snprintf(XLogDir, MAXPGPATH, "%s/pg_xlog", DataDir); + snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", DataDir); + /* * Initialize the list of active backends. This list is only used for * garbage collecting the backend processes. @@ -1449,6 +1457,12 @@ reaper(SIGNAL_ARGS) abort(); ShutdownPID = ShutdownDataBase(); } + + /* + * Startup succeeded - remember its ID + */ + SetThisStartUpID(); + pqsignal(SIGCHLD, reaper); return; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ae4a29d607..459a98daee 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.88 2000/10/20 11:01:07 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.89 2000/10/21 15:43:27 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -609,7 +609,7 @@ BufferAlloc(Relation reln, } /* record the database name and relation name for this buffer */ - strcpy(buf->blind.dbname, DatabaseName); + strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery"); strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln)); buf->relId = reln->rd_lockInfo.lockRelId; @@ -1168,8 +1168,9 @@ BufferSync() SpinRelease(BufMgrLock); } - +#ifndef XLOG LocalBufferSync(); +#endif } diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index dd1b44d334..6f043bf944 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/page/bufpage.c,v 1.32 2000/10/20 11:28:39 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/page/bufpage.c,v 1.33 2000/10/21 15:43:29 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -245,7 +245,11 @@ itemidcompare(const void *itemidp1, const void *itemidp2) /* * PageRepairFragmentation - * Frees fragmented space on a page. + * + * Frees fragmented space on a page. + * It doesn't remove unused line pointers! Please don't change this. + * This routine is usable for heap pages only. + * */ void PageRepairFragmentation(Page page) @@ -264,6 +268,8 @@ PageRepairFragmentation(Page page) for (i = 0; i < nline; i++) { lp = ((PageHeader) page)->pd_linp + i; + if ((*lp).lp_flags & LP_DELETE) /* marked for deletion */ + (*lp).lp_flags &= ~(LP_USED | LP_DELETE); if ((*lp).lp_flags & LP_USED) nused++; } @@ -343,6 +349,31 @@ PageGetFreeSpace(Page page) return space; } +/* + * PageRepairFragmentation un-useful for index page cleanup because + * of it doesn't remove line pointers. This routine could be more + * effective but ... no time -:) + */ +void +IndexPageCleanup(Buffer buffer) +{ + Page page = (Page) BufferGetPage(buffer); + ItemId lp; + OffsetNumber maxoff; + OffsetNumber i; + + maxoff = PageGetMaxOffsetNumber(page); + for (i = 0; i < maxoff; i++) + { + lp = ((PageHeader) page)->pd_linp + i; + if ((*lp).lp_flags & LP_DELETE) /* marked for deletion */ + { + PageIndexTupleDelete(page, i + 1); + maxoff--; + } + } +} + /* *---------------------------------------------------------------- * PageIndexTupleDelete diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 8bcb13fd4e..65bc5595a8 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.40 2000/10/16 14:52:12 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.41 2000/10/21 15:43:31 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -539,3 +539,26 @@ smgriswo(int16 smgrno) } #endif + +#ifdef XLOG +#include "access/xlog.h" + +void smgr_redo(XLogRecPtr lsn, XLogRecord *record); +void smgr_undo(XLogRecPtr lsn, XLogRecord *record); +void smgr_desc(char *buf, uint8 xl_info, char* rec); + +void +smgr_redo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void +smgr_undo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void +smgr_desc(char *buf, uint8 xl_info, char* rec) +{ +} +#endif diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 02a0bb6f5d..23a9c9587c 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: nbtree.h,v 1.45 2000/10/13 12:05:22 vadim Exp $ + * $Id: nbtree.h,v 1.46 2000/10/21 15:43:33 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -266,9 +266,9 @@ typedef struct xl_btree_insert typedef struct xl_btree_split { xl_btreetid target; /* inserted tuple id */ - BlockId otherblk; /* second block participated in split: */ + BlockIdData otherblk; /* second block participated in split: */ /* first one is stored in target' tid */ - BlockId rightblk; /* next right block */ + BlockIdData rightblk; /* next right block */ /* * We log all btitems from the right sibling. If new btitem goes on * the left sibling then we log it too and it will be the first @@ -277,7 +277,7 @@ typedef struct xl_btree_split */ } xl_btree_split; -#define SizeOfBtreeSplit (offsetof(xl_btree_insert, rightblk) + sizeof(BlockId)) +#define SizeOfBtreeSplit (offsetof(xl_btree_split, rightblk) + sizeof(BlockIdData)) /* * New root log record. @@ -285,11 +285,11 @@ typedef struct xl_btree_split typedef struct xl_btree_newroot { RelFileNode node; - BlockId rootblk; + BlockIdData rootblk; /* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */ } xl_btree_newroot; -#define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockId)) +#define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockIdData)) /* end of XLOG stuff */ diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index 1980b31a3f..221c52d2f2 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -2,7 +2,7 @@ * * rmgr.h * - * Resource managers description table + * Resource managers definition * */ #ifndef RMGR_H @@ -10,15 +10,6 @@ typedef uint8 RmgrId; -typedef struct RmgrData -{ - char *rm_name; - void (*rm_redo)(); /* REDO(XLogRecPtr lsn, XLogRecord rptr) */ - void (*rm_undo)(); /* UNDO(XLogRecPtr lsn, XLogRecord rptr) */ -} RmgrData; - -extern RmgrData *RmgrTable; - /* * Built-in resource managers */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c73217f9cc..c77c1cac02 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -90,6 +90,17 @@ typedef XLogPageHeaderData *XLogPageHeader; typedef uint32 StartUpID; extern StartUpID ThisStartUpID; extern bool InRecovery; +extern XLogRecPtr MyLastRecPtr; + +typedef struct RmgrData +{ + char *rm_name; + void (*rm_redo)(XLogRecPtr lsn, XLogRecord *rptr); + void (*rm_undo)(XLogRecPtr lsn, XLogRecord *rptr); + void (*rm_desc)(char *buf, uint8 xl_info, char *rec); +} RmgrData; + +extern RmgrData RmgrTable[]; extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index 85e51122f9..d547f71b73 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: bufpage.h,v 1.33 2000/10/20 11:01:21 vadim Exp $ + * $Id: bufpage.h,v 1.34 2000/10/21 15:43:36 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -324,6 +324,7 @@ extern void PageRestoreTempPage(Page tempPage, Page oldPage); extern void PageRepairFragmentation(Page page); extern Size PageGetFreeSpace(Page page); extern void PageIndexTupleDelete(Page page, OffsetNumber offset); +extern void IndexPageCleanup(Buffer buffer); #endif /* BUFPAGE_H */ diff --git a/src/include/storage/itemid.h b/src/include/storage/itemid.h index a915f70168..2e4ab58809 100644 --- a/src/include/storage/itemid.h +++ b/src/include/storage/itemid.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: itemid.h,v 1.13 2000/10/20 11:01:21 vadim Exp $ + * $Id: itemid.h,v 1.14 2000/10/21 15:43:36 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -31,15 +31,11 @@ typedef ItemIdData *ItemId; */ #define LP_USED 0x01 /* this line pointer is being used */ -#ifdef XLOG - #define LP_DELETE 0x02 /* item is to be deleted */ #define ItemIdDeleted(itemId) \ (((itemId)->lp_flags & LP_DELETE) != 0) -#endif - /* * This bit may be passed to PageAddItem together with * LP_USED & LP_DELETED bits to specify overwrite mode