From f0c5a6c61479be9da05ce086d1029885a7d55cb3 Mon Sep 17 00:00:00 2001 From: "Vadim B. Mikheev" Date: Wed, 27 Nov 1996 07:27:20 +0000 Subject: [PATCH] Shrinking and other things. --- src/backend/commands/vacuum.c | 1161 ++++++++++++++++++++++++++------- 1 file changed, 926 insertions(+), 235 deletions(-) diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 7fe8648e1e..06140f7c27 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.8 1996/11/06 08:21:40 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.9 1996/11/27 07:27:20 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -27,14 +27,17 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include +#include "storage/shmem.h" #ifdef NEED_RUSAGE # include #else /* NEED_RUSAGE */ @@ -44,26 +47,47 @@ bool VacuumRunning = false; +#ifdef VACUUM_QUIET +static int MESSLEV = DEBUG; +#else +static int MESSLEV = NOTICE; +#endif + +typedef struct { + FuncIndexInfo finfo; + FuncIndexInfo *finfoP; + IndexTupleForm tform; + int natts; +} IndDesc; + /* non-export function prototypes */ static void _vc_init(void); static void _vc_shutdown(void); static void _vc_vacuum(NameData *VacRelP); static VRelList _vc_getrels(Portal p, NameData *VacRelP); -static void _vc_vacone(Portal p, VRelList curvrl); -static void _vc_vacheap(Portal p, VRelList curvrl, Relation onerel); -static void _vc_vacindices(VRelList curvrl, Relation onerel); -static void _vc_vaconeind(VRelList curvrl, Relation indrel); +static void _vc_vacone (VRelList curvrl); +static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl); +static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel); +static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl); +static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel); +static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups); static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex); static void _vc_setpagelock(Relation rel, BlockNumber blkno); -static bool _vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel); -static void _vc_reappage(Portal p, VRelList curvrl, VPageDescr vpc); +static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl); +static void _vc_reappage (VPageList vpl, VPageDescr vpc); +static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew); static void _vc_free(Portal p, VRelList vrl); +static void _vc_getindices (Oid relid, int *nindices, Relation **Irel); +static void _vc_clsindices (int nindices, Relation *Irel); static Relation _vc_getarchrel(Relation heaprel); static void _vc_archive(Relation archrel, HeapTuple htup); static bool _vc_isarchrel(char *rname); +static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc); static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *)); static int _vc_cmp_blk (char *left, char *right); static int _vc_cmp_offno (char *left, char *right); +static bool _vc_enough_space (VPageDescr vpd, Size len); + void vacuum(char *vacrel) @@ -183,7 +207,7 @@ _vc_vacuum(NameData *VacRelP) /* vacuum each heap relation */ for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next) - _vc_vacone(p, cur); + _vc_vacone (cur); _vc_free(p, vrl); @@ -200,7 +224,7 @@ _vc_getrels(Portal p, NameData *VacRelP) Buffer buf; PortalVariableMemory portalmem; MemoryContext old; - VRelList vrl, cur = NULL; + VRelList vrl, cur; Datum d; char *rname; char rkind; @@ -249,6 +273,17 @@ _vc_getrels(Portal p, NameData *VacRelP) continue; } + /* don't vacuum large objects for now - something breaks when we do */ + if ( (strlen(rname) > 4) && rname[0] == 'X' && + rname[1] == 'i' && rname[2] == 'n' && + (rname[3] == 'v' || rname[3] == 'x')) + { + elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", + NAMEDATALEN, rname); + ReleaseBuffer(buf); + continue; + } + d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr, pgcdesc, &n); smgrno = DatumGetInt16(d); @@ -283,8 +318,6 @@ _vc_getrels(Portal p, NameData *VacRelP) cur->vrl_relid = pgctup->t_oid; cur->vrl_attlist = (VAttList) NULL; - cur->vrl_pgdsc = (VPageDescr*) NULL; - cur->vrl_nrepg = 0; cur->vrl_npages = cur->vrl_ntups = 0; cur->vrl_hasindex = false; cur->vrl_next = (VRelList) NULL; @@ -317,7 +350,7 @@ _vc_getrels(Portal p, NameData *VacRelP) * us to lock the entire database during one pass of the vacuum cleaner. */ static void -_vc_vacone(Portal p, VRelList curvrl) +_vc_vacone (VRelList curvrl) { Relation pgclass; TupleDesc pgcdesc; @@ -326,6 +359,12 @@ _vc_vacone(Portal p, VRelList curvrl) HeapScanDesc pgcscan; Relation onerel; ScanKeyData pgckey; + VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */ + VPageListData Fvpl; /* List of pages with space enough for re-using */ + VPageDescr *vpp; + Relation *Irel; + int nindices; + int i; StartTransactionCommand(); @@ -355,14 +394,46 @@ _vc_vacone(Portal p, VRelList curvrl) /* we require the relation to be locked until the indices are cleaned */ RelationSetLockForWrite(onerel); - /* vacuum it */ - _vc_vacheap(p, curvrl, onerel); + /* scan it */ + Vvpl.vpl_npages = Fvpl.vpl_npages = 0; + _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl); - /* if we vacuumed any heap tuples, vacuum the indices too */ - if (curvrl->vrl_nrepg > 0) - _vc_vacindices(curvrl, onerel); + /* Now open/count indices */ + Irel = (Relation *) NULL; + if ( Vvpl.vpl_npages > 0 ) + /* Open all indices of this relation */ + _vc_getindices(curvrl->vrl_relid, &nindices, &Irel); else - curvrl->vrl_hasindex = onerel->rd_rel->relhasindex; + /* Count indices only */ + _vc_getindices(curvrl->vrl_relid, &nindices, NULL); + + if ( nindices > 0 ) + curvrl->vrl_hasindex = true; + else + curvrl->vrl_hasindex = false; + + /* Clean index' relation(s) */ + if ( Irel != (Relation*) NULL ) + { + for (i = 0; i < nindices; i++) + _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups); + } + + if ( Fvpl.vpl_npages > 0 ) /* Try to shrink heap */ + _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel); + else if ( Vvpl.vpl_npages > 0 ) /* Clean pages from Vvpl list */ + _vc_vacheap (curvrl, onerel, &Vvpl); + + /* ok - free Vvpl list of reapped pages */ + if ( Vvpl.vpl_npages > 0 ) + { + vpp = Vvpl.vpl_pgdesc; + for (i = 0; i < Vvpl.vpl_npages; i++, vpp++) + pfree(*vpp); + pfree (Vvpl.vpl_pgdesc); + if ( Fvpl.vpl_npages > 0 ) + pfree (Fvpl.vpl_pgdesc); + } /* all done with this class */ heap_close(onerel); @@ -376,69 +447,46 @@ _vc_vacone(Portal p, VRelList curvrl) CommitTransactionCommand(); } -# define ADJUST_FREE_SPACE(S)\ - ((DOUBLEALIGN(S) == (S)) ? (S) : (DOUBLEALIGN(S) - sizeof(double))) - /* - * _vc_vacheap() -- vacuum an open heap relation + * _vc_scanheap() -- scan an open heap relation * - * This routine sets commit times, vacuums dead tuples, cleans up - * wasted space on the page, and maintains statistics on the number - * of live tuples in a heap. In addition, it records the tids of - * all tuples removed from the heap for any reason. These tids are - * used in a scan of indices on the relation to get rid of dead - * index tuples. + * This routine sets commit times, constructs Vvpl list of + * empty/uninitialized pages and pages with dead tuples and + * ~LP_USED line pointers, constructs Fvpl list of pages + * appropriate for purposes of shrinking and maintains statistics + * on the number of live tuples in a heap. */ static void -_vc_vacheap(Portal p, VRelList curvrl, Relation onerel) +_vc_scanheap (VRelList curvrl, Relation onerel, + VPageList Vvpl, VPageList Fvpl) { int nblocks, blkno; ItemId itemid; HeapTuple htup; Buffer buf; - Page page; + Page page, tempPage = NULL; OffsetNumber offnum, maxoff; - Relation archrel = NULL; - bool isarchived; - int nvac; - int ntups; - bool pgchanged, tupgone; + bool pgchanged, tupgone, dobufrel, notup; AbsoluteTime purgetime, expiretime; RelativeTime preservetime; char *relname; - VPageDescr vpc; - uint32 nunused, nempg, nnepg, nchpg; - Size frsize; + VPageDescr vpc, vp; + uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend; + Size frsize, frsusf; + Size min_tlen = MAXTUPLEN; + Size max_tlen = 0; + int i; struct rusage ru0, ru1; getrusage(RUSAGE_SELF, &ru0); + nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0; + frsize = frsusf = 0; + relname = (RelationGetRelationName(onerel))->data; - nvac = 0; - ntups = 0; - nunused = nempg = nnepg = nchpg = 0; - frsize = 0; - curvrl->vrl_nrepg = 0; - nblocks = RelationGetNumberOfBlocks(onerel); - /* if the relation has an archive, open it */ - if (onerel->rd_rel->relarch != 'n') { - isarchived = true; - archrel = _vc_getarchrel(onerel); - } else - isarchived = false; - - /* don't vacuum large objects for now. - something breaks when we do*/ - if ( (strlen(relname) > 4) && - relname[0] == 'X' && - relname[1] == 'i' && - relname[2] == 'n' && - (relname[3] == 'v' || relname[3] == 'x')) - return; - /* calculate the purge time: tuples that expired before this time will be archived or deleted */ purgetime = GetCurrentTransactionStartTime(); @@ -456,46 +504,53 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel) purgetime = expiretime; vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber)); + vpc->vpd_nusd = 0; for (blkno = 0; blkno < nblocks; blkno++) { buf = ReadBuffer(onerel, blkno); page = BufferGetPage(buf); vpc->vpd_blkno = blkno; vpc->vpd_noff = 0; + vpc->vpd_noff = 0; if (PageIsNew(page)) { elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing", NAMEDATALEN, relname, blkno); PageInit (page, BufferGetPageSize (buf), 0); - vpc->vpd_free = PageGetFreeSpace (page); - vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free); - frsize += vpc->vpd_free; + vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower; + frsize += (vpc->vpd_free - sizeof (ItemIdData)); nnepg++; - _vc_reappage(p, curvrl, vpc); /* No one index should point here */ + nemend++; + _vc_reappage (Vvpl, vpc); WriteBuffer(buf); continue; } if (PageIsEmpty(page)) { - vpc->vpd_free = PageGetFreeSpace (page); - vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free); - frsize += vpc->vpd_free; + vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower; + frsize += (vpc->vpd_free - sizeof (ItemIdData)); nempg++; - _vc_reappage(p, curvrl, vpc); /* No one index should point here */ + nemend++; + _vc_reappage (Vvpl, vpc); ReleaseBuffer(buf); continue; } pgchanged = false; + notup = true; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); + /* + * Collect un-used items too - it's possible to have + * indices pointing here after crash. + */ if (!ItemIdIsUsed(itemid)) { vpc->vpd_voff[vpc->vpd_noff++] = offnum; - nunused++; + nunused++; continue; } @@ -506,11 +561,21 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel) TransactionIdIsValid((TransactionId)htup->t_xmin)) { if (TransactionIdDidAbort(htup->t_xmin)) { - pgchanged = true; tupgone = true; } else if (TransactionIdDidCommit(htup->t_xmin)) { htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin); pgchanged = true; + } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) { + /* + * Not Aborted, Not Committed, Not in Progress - + * so it from crashed process. - vadim 11/26/96 + */ + ncrash++; + tupgone = true; + } + else { + elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u", + NAMEDATALEN, relname, htup->t_xmin, blkno, offnum); } } @@ -532,140 +597,618 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel) if (htup->t_tmax < purgetime) { tupgone = true; - pgchanged = true; } } } + /* + * Is it possible at all ? - vadim 11/26/96 + */ + if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) ) + { + elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \ +DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", + TransactionIdIsValid((TransactionId)htup->t_xmax), + tupgone); + } + if (tupgone) { - ItemId lpp = &(((PageHeader) page)->pd_linp[offnum - 1]); - - /* write the tuple to the archive, if necessary */ - if (isarchived) - _vc_archive(archrel, htup); + ItemId lpp; + + if ( tempPage == (Page) NULL ) + { + Size pageSize; + + pageSize = PageGetPageSize(page); + tempPage = (Page) palloc(pageSize); + memmove (tempPage, page, pageSize); + } + + lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]); /* mark it unused */ lpp->lp_flags &= ~LP_USED; vpc->vpd_voff[vpc->vpd_noff++] = offnum; + nvac++; - ++nvac; } else { ntups++; + notup = false; + if ( htup->t_len < min_tlen ) + min_tlen = htup->t_len; + if ( htup->t_len > max_tlen ) + max_tlen = htup->t_len; } } if (pgchanged) { - PageRepairFragmentation(page); - if ( vpc->vpd_noff > 0 ) { /* there are some new unused tids here */ - vpc->vpd_free = PageGetFreeSpace (page); - vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free); - frsize += vpc->vpd_free; - _vc_reappage(p, curvrl, vpc); - } WriteBuffer(buf); + dobufrel = false; nchpg++; - } else { - if ( vpc->vpd_noff > 0 ) { /* there are only old unused tids here */ - vpc->vpd_free = PageGetFreeSpace (page); - vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free); - frsize += vpc->vpd_free; - _vc_reappage(p, curvrl, vpc); - } - ReleaseBuffer(buf); } + else + dobufrel = true; + if ( tempPage != (Page) NULL ) + { /* Some tuples are gone */ + PageRepairFragmentation(tempPage); + vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower; + frsize += vpc->vpd_free; + _vc_reappage (Vvpl, vpc); + pfree (tempPage); + tempPage = (Page) NULL; + } + else if ( vpc->vpd_noff > 0 ) + { /* there are only ~LP_USED line pointers */ + vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower; + frsize += vpc->vpd_free; + _vc_reappage (Vvpl, vpc); + } + if ( dobufrel ) + ReleaseBuffer(buf); + if ( notup ) + nemend++; + else + nemend = 0; } - if (isarchived) - heap_close(archrel); + pfree (vpc); /* save stats in the rel list for use later */ curvrl->vrl_ntups = ntups; curvrl->vrl_npages = nblocks; + + Vvpl->vpl_nemend = nemend; + Fvpl->vpl_nemend = nemend; + + /* + * Try to make Fvpl keeping in mind that we can't use free space + * of "empty" end-pages and last page if it reapped. + */ + if ( Vvpl->vpl_npages - nemend > 0 ) + { + int nusf; /* blocks usefull for re-using */ + + nusf = Vvpl->vpl_npages - nemend; + if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 ) + nusf--; + + for (i = 0; i < nusf; i++) + { + vp = Vvpl->vpl_pgdesc[i]; + if ( _vc_enough_space (vp, min_tlen) ) + { + _vc_vpinsert (Fvpl, vp); + frsusf += vp->vpd_free; + } + } + } getrusage(RUSAGE_SELF, &ru1); - elog (NOTICE, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \ -Tuples %u: Vac %u, UnUsed %u; FreeSpace %u. Elapsed %u/%u sec.", + elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \ +Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.", NAMEDATALEN, relname, - nblocks, nchpg, curvrl->vrl_nrepg, nempg, nnepg, - ntups, nvac, nunused, frsize, + nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg, + ntups, nvac, ncrash, nunused, min_tlen, max_tlen, + frsize, frsusf, nemend, Fvpl->vpl_npages, ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec); - pfree (vpc); -} +} /* _vc_scanheap */ + /* - * _vc_vacindices() -- vacuum all the indices for a particular heap relation. + * _vc_rpfheap() -- try to repaire relation' fragmentation * - * On entry, curvrl points at the relation currently being vacuumed. - * We already have a write lock on the relation, so we don't need to - * worry about anyone building an index on it while we're doing the - * vacuuming. The tid list for curvrl is sorted in reverse tid order: - * that is, tids on higher page numbers are before those on lower page - * numbers, and tids high on the page are before those low on the page. - * We use this ordering to cut down the search cost when we look at an - * index entry. - * - * We're executing inside the transaction that vacuumed the heap. + * This routine marks dead tuples as unused and tries re-use dead space + * by moving tuples (and inserting indices if needed). It constructs + * Nvpl list of free-ed pages (moved tuples) and clean indices + * for them after committing (in hack-manner - without losing locks + * and freeing memory!) current transaction. It truncates relation + * if some end-blocks are gone away. */ static void -_vc_vacindices(VRelList curvrl, Relation onerel) +_vc_rpfheap (VRelList curvrl, Relation onerel, + VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel) { - Relation pgindex; - TupleDesc pgidesc; - HeapTuple pgitup; - HeapScanDesc pgiscan; - Buffer buf; - Relation indrel; - Oid indoid; - Datum d; - bool n; - int nindices; - ScanKeyData pgikey; + TransactionId myXID; + CommandId myCID; + AbsoluteTime myCTM; + Buffer buf, ToBuf; + int nblocks, blkno; + Page page, ToPage; + OffsetNumber offnum, maxoff, newoff, moff; + ItemId itemid, newitemid; + HeapTuple htup, newtup; + TupleDesc tupdesc; + Datum *idatum; + char *inulls; + InsertIndexResult iresult; + VPageListData Nvpl; + VPageDescr ToVpd, Fvplast, Vvplast, vpc, *vpp; + IndDesc *Idesc, *idcur; + int Fblklast, Vblklast, i; + Size tlen; + int nmoved, Fnpages, Vnpages; + int nchkmvd, ntups; + bool isempty, dowrite; + Relation archrel; + struct rusage ru0, ru1; - /* see if we can dodge doing any work at all */ - if (!(onerel->rd_rel->relhasindex)) - return; + getrusage(RUSAGE_SELF, &ru0); - nindices = 0; - - /* prepare a heap scan on the pg_index relation */ - pgindex = heap_openr(IndexRelationName); - pgidesc = RelationGetTupleDescriptor(pgindex); - - ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid, - ObjectIdEqualRegProcedure, - ObjectIdGetDatum(curvrl->vrl_relid)); - - pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey); - - /* vacuum all the indices */ - while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, &buf))) { - d = (Datum) heap_getattr(pgitup, buf, Anum_pg_index_indexrelid, - pgidesc, &n); - indoid = DatumGetObjectId(d); - indrel = index_open(indoid); - _vc_vaconeind(curvrl, indrel); - heap_close(indrel); - nindices++; + myXID = GetCurrentTransactionId(); + myCID = GetCurrentCommandId(); + + if ( Irel != (Relation*) NULL ) /* preparation for index' inserts */ + { + _vc_mkindesc (onerel, nindices, Irel, &Idesc); + tupdesc = RelationGetTupleDescriptor(onerel); + idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum)); + inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls)); } - heap_endscan(pgiscan); - heap_close(pgindex); - - if (nindices > 0) - curvrl->vrl_hasindex = true; + /* if the relation has an archive, open it */ + if (onerel->rd_rel->relarch != 'n') + { + archrel = _vc_getarchrel(onerel); + /* Archive tuples from "empty" end-pages */ + for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, + i = Vvpl->vpl_nemend; i > 0; i--, vpp-- ) + { + if ( (*vpp)->vpd_noff > 0 ) + { + buf = ReadBuffer(onerel, (*vpp)->vpd_blkno); + page = BufferGetPage(buf); + Assert ( !PageIsEmpty(page) ); + _vc_vacpage (page, *vpp, archrel); + WriteBuffer (buf); + } + } + } else - curvrl->vrl_hasindex = false; -} + archrel = (Relation) NULL; + + Nvpl.vpl_npages = 0; + Fnpages = Fvpl->vpl_npages; + Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1]; + Fblklast = Fvplast->vpd_blkno; + Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend ); + Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend; + Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1]; + Vblklast = Vvplast->vpd_blkno; + Assert ( Vblklast >= Fblklast ); + ToBuf = InvalidBuffer; + nmoved = 0; + + vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber)); + vpc->vpd_nusd = 0; + + nblocks = curvrl->vrl_npages; + for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--) + { + /* if it's reapped page and it was used by me - quit */ + if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 ) + break; + + buf = ReadBuffer(onerel, blkno); + page = BufferGetPage(buf); + + vpc->vpd_noff = 0; + + isempty = PageIsEmpty(page); + + if ( blkno == Vblklast ) /* it's reapped page */ + { + if ( Vvplast->vpd_noff > 0 ) /* there are dead tuples */ + { /* on this page - clean */ + Assert ( ! isempty ); + _vc_vacpage (page, Vvplast, archrel); + dowrite = true; + } + else + Assert ( isempty ); + Assert ( --Vnpages > 0 ); + /* get prev reapped page from Vvpl */ + Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1]; + Vblklast = Vvplast->vpd_blkno; + if ( blkno == Fblklast ) /* this page in Fvpl too */ + { + Assert ( --Fnpages > 0 ); + Assert ( Fvplast->vpd_nusd == 0 ); + /* get prev reapped page from Fvpl */ + Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1]; + Fblklast = Fvplast->vpd_blkno; + } + Assert ( Fblklast <= Vblklast ); + if ( isempty ) + { + ReleaseBuffer(buf); + continue; + } + } + else + { + Assert ( ! isempty ); + dowrite = false; + } + + vpc->vpd_blkno = blkno; + maxoff = PageGetMaxOffsetNumber(page); + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + itemid = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(itemid)) + continue; + + htup = (HeapTuple) PageGetItem(page, itemid); + tlen = htup->t_len; + + /* try to find new page for this tuple */ + if ( ToBuf == InvalidBuffer || + ! _vc_enough_space (ToVpd, tlen) ) + { + if ( ToBuf != InvalidBuffer ) + WriteBuffer(ToBuf); + ToBuf = InvalidBuffer; + for (i=0; i < Fnpages; i++) + { + if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) ) + break; + } + if ( i == Fnpages ) + break; /* can't move item anywhere */ + ToVpd = Fvpl->vpl_pgdesc[i]; + ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno); + ToPage = BufferGetPage(ToBuf); + /* if this page was not used before - clean it */ + if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 ) + _vc_vacpage (ToPage, ToVpd, archrel); + } + + /* copy tuple */ + newtup = (HeapTuple) palloc (tlen); + memmove((char *) newtup, (char *) htup, tlen); + + /* store transaction information */ + TransactionIdStore(myXID, &(newtup->t_xmin)); + newtup->t_cmin = myCID; + StoreInvalidTransactionId(&(newtup->t_xmax)); + newtup->t_tmin = INVALID_ABSTIME; + newtup->t_tmax = CURRENT_ABSTIME; + ItemPointerSetInvalid(&newtup->t_chain); + + /* add tuple to the page */ + newoff = PageAddItem (ToPage, (Item)newtup, tlen, + InvalidOffsetNumber, LP_USED); + if ( newoff == InvalidOffsetNumber ) + { + elog (WARN, "\ +failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)", + tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, + ToVpd->vpd_nusd, ToVpd->vpd_noff); + } + newitemid = PageGetItemId(ToPage, newoff); + pfree (newtup); + newtup = (HeapTuple) PageGetItem(ToPage, newitemid); + ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff); + + /* now logically delete end-tuple */ + TransactionIdStore(myXID, &(htup->t_xmax)); + htup->t_cmax = myCID; + memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid)); + + ToVpd->vpd_nusd++; + nmoved++; + ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower; + vpc->vpd_voff[vpc->vpd_noff++] = offnum; + + /* insert index' tuples if needed */ + if ( Irel != (Relation*) NULL ) + { + for (i = 0, idcur = Idesc; i < nindices; i++, idcur++) + { + FormIndexDatum ( + idcur->natts, + (AttrNumber *)&(idcur->tform->indkey[0]), + newtup, + tupdesc, + InvalidBuffer, + idatum, + inulls, + idcur->finfoP); + iresult = index_insert ( + Irel[i], + idatum, + inulls, + &(newtup->t_ctid), + true); + if (iresult) pfree(iresult); + } + } + + } /* walk along page */ + + if ( vpc->vpd_noff > 0 ) /* some tuples were moved */ + { + _vc_reappage (&Nvpl, vpc); + WriteBuffer(buf); + } + else if ( dowrite ) + WriteBuffer(buf); + else + ReleaseBuffer(buf); + + if ( offnum <= maxoff ) + break; /* some item(s) left */ + + } /* walk along relation */ + + blkno++; /* new number of blocks */ + + if ( ToBuf != InvalidBuffer ) + { + Assert (nmoved > 0); + WriteBuffer(ToBuf); + } + + if ( nmoved > 0 ) + { + /* + * We have to commit our tuple' movings before we'll truncate + * relation, but we shouldn't lose our locks. And so - quick hack: + * flush buffers and record status of current transaction + * as committed, and continue. - vadim 11/13/96 + */ + FlushBufferPool(!TransactionFlushEnabled()); + TransactionIdCommit(myXID); + FlushBufferPool(!TransactionFlushEnabled()); + myCTM = TransactionIdGetCommitTime(myXID); + } + + /* + * Clean uncleaned reapped pages from Vvpl list + * and set commit' times for inserted tuples + */ + nchkmvd = 0; + for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++) + { + Assert ( (*vpp)->vpd_blkno < blkno ); + buf = ReadBuffer(onerel, (*vpp)->vpd_blkno); + page = BufferGetPage(buf); + if ( (*vpp)->vpd_nusd == 0 ) /* this page was not used */ + { + /* noff == 0 in empty pages only - such pages should be re-used */ + Assert ( (*vpp)->vpd_noff > 0 ); + _vc_vacpage (page, *vpp, archrel); + } + else /* this page was used */ + { + ntups = 0; + moff = PageGetMaxOffsetNumber(page); + for (newoff = FirstOffsetNumber; + newoff <= moff; + newoff = OffsetNumberNext(newoff)) + { + itemid = PageGetItemId(page, newoff); + if (!ItemIdIsUsed(itemid)) + continue; + htup = (HeapTuple) PageGetItem(page, itemid); + if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) ) + { + htup->t_tmin = myCTM; + ntups++; + } + } + Assert ( (*vpp)->vpd_nusd == ntups ); + nchkmvd += ntups; + } + WriteBuffer (buf); + } + Assert ( nmoved == nchkmvd ); + + getrusage(RUSAGE_SELF, &ru1); + + elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \ +Elapsed %u/%u sec.", + NAMEDATALEN, (RelationGetRelationName(onerel))->data, + nblocks, blkno, nmoved, + ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, + ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec); + + if ( Nvpl.vpl_npages > 0 ) + { + /* vacuum indices again if needed */ + if ( Irel != (Relation*) NULL ) + { + VPageDescr *vpleft, *vpright, vpsave; + + /* re-sort Nvpl.vpl_pgdesc */ + for (vpleft = Nvpl.vpl_pgdesc, + vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1; + vpleft < vpright; vpleft++, vpright--) + { + vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave; + } + for (i = 0; i < nindices; i++) + _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups); + } + + /* + * clean moved tuples from last page in Nvpl list + * if some tuples left there + */ + if ( vpc->vpd_noff > 0 && offnum <= maxoff ) + { + Assert (vpc->vpd_blkno == blkno - 1); + buf = ReadBuffer(onerel, vpc->vpd_blkno); + page = BufferGetPage (buf); + ntups = 0; + maxoff = offnum; + for (offnum = FirstOffsetNumber; + offnum < maxoff; + offnum = OffsetNumberNext(offnum)) + { + itemid = PageGetItemId(page, offnum); + if (!ItemIdIsUsed(itemid)) + continue; + htup = (HeapTuple) PageGetItem(page, itemid); + Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) ); + itemid->lp_flags &= ~LP_USED; + ntups++; + } + Assert ( vpc->vpd_noff == ntups ); + PageRepairFragmentation(page); + WriteBuffer (buf); + } + + /* now - free new list of reapped pages */ + vpp = Nvpl.vpl_pgdesc; + for (i = 0; i < Nvpl.vpl_npages; i++, vpp++) + pfree(*vpp); + pfree (Nvpl.vpl_pgdesc); + } + + /* truncate relation */ + if ( blkno < nblocks ) + { + blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno); + Assert ( blkno >= 0 ); + curvrl->vrl_npages = blkno; /* set new number of blocks */ + } + + if ( archrel != (Relation) NULL ) + heap_close(archrel); + + if ( Irel != (Relation*) NULL ) /* pfree index' allocations */ + { + pfree (Idesc); + pfree (idatum); + pfree (inulls); + _vc_clsindices (nindices, Irel); + } + + pfree (vpc); + +} /* _vc_rpfheap */ + +/* + * _vc_vacheap() -- free dead tuples + * + * This routine marks dead tuples as unused and truncates relation + * if there are "empty" end-blocks. + */ +static void +_vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl) +{ + Buffer buf; + Page page; + VPageDescr *vpp; + Relation archrel; + int nblocks; + int i; + + nblocks = Vvpl->vpl_npages; + /* if the relation has an archive, open it */ + if (onerel->rd_rel->relarch != 'n') + archrel = _vc_getarchrel(onerel); + else + { + archrel = (Relation) NULL; + nblocks -= Vvpl->vpl_nemend; /* nothing to do with them */ + } + + for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++) + { + if ( (*vpp)->vpd_noff > 0 ) + { + buf = ReadBuffer(onerel, (*vpp)->vpd_blkno); + page = BufferGetPage (buf); + _vc_vacpage (page, *vpp, archrel); + WriteBuffer (buf); + } + } + + /* truncate relation if there are some empty end-pages */ + if ( Vvpl->vpl_nemend > 0 ) + { + Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend ); + nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend; + elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.", + NAMEDATALEN, (RelationGetRelationName(onerel))->data, + curvrl->vrl_npages, nblocks); + + /* + * we have to flush "empty" end-pages (if changed, but who knows it) + * before truncation + */ + FlushBufferPool(!TransactionFlushEnabled()); + + nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks); + Assert ( nblocks >= 0 ); + curvrl->vrl_npages = nblocks; /* set new number of blocks */ + } + + if ( archrel != (Relation) NULL ) + heap_close(archrel); + +} /* _vc_vacheap */ + +/* + * _vc_vacpage() -- free (and archive if needed) dead tuples on a page + * and repaire its fragmentation. + */ +static void +_vc_vacpage (Page page, VPageDescr vpd, Relation archrel) +{ + ItemId itemid; + HeapTuple htup; + int i; + + Assert ( vpd->vpd_nusd == 0 ); + for (i=0; i < vpd->vpd_noff; i++) + { + itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]); + if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) ) + { + htup = (HeapTuple) PageGetItem (page, itemid); + _vc_archive (archrel, htup); + } + itemid->lp_flags &= ~LP_USED; + } + PageRepairFragmentation(page); + +} /* _vc_vacpage */ /* * _vc_vaconeind() -- vacuum one index relation. * - * Curvrl is the VRelList entry for the heap we're currently vacuuming. - * It's locked. Onerel is an index relation on the vacuumed heap. + * Vpl is the VPageList of the heap we're currently vacuuming. + * It's locked. Indrel is an index relation on the vacuumed heap. * We don't set locks on the index relation here, since the indexed * access methods support locking at different granularities. * We let them handle it. @@ -674,7 +1217,7 @@ _vc_vacindices(VRelList curvrl, Relation onerel) * pg_class. */ static void -_vc_vaconeind(VRelList curvrl, Relation indrel) +_vc_vaconeind(VPageList vpl, Relation indrel, int nhtups) { RetrieveIndexResult res; IndexScanDesc iscan; @@ -682,6 +1225,7 @@ _vc_vaconeind(VRelList curvrl, Relation indrel) int nvac; int nitups; int nipages; + VPageDescr vp; struct rusage ru0, ru1; getrusage(RUSAGE_SELF, &ru0); @@ -695,7 +1239,8 @@ _vc_vaconeind(VRelList curvrl, Relation indrel) != (RetrieveIndexResult) NULL) { heapptr = &res->heap_iptr; - if (_vc_tidreapped(heapptr, curvrl, indrel)) { + if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL) + { #if 0 elog(DEBUG, "<%x,%x> -> <%x,%x>", ItemPointerGetBlockNumber(&(res->index_iptr)), @@ -703,6 +1248,12 @@ _vc_vaconeind(VRelList curvrl, Relation indrel) ItemPointerGetBlockNumber(&(res->heap_iptr)), ItemPointerGetOffsetNumber(&(res->heap_iptr))); #endif + if ( vp->vpd_noff == 0 ) + { /* this is EmptyPage !!! */ + elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing", + NAMEDATALEN, indrel->rd_rel->relname.data, + vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr)); + } ++nvac; index_delete(indrel, &res->index_iptr); } else { @@ -721,12 +1272,58 @@ _vc_vaconeind(VRelList curvrl, Relation indrel) getrusage(RUSAGE_SELF, &ru1); - elog (NOTICE, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.", + elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.", NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac, ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec); -} + if ( nitups != nhtups ) + elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)", + nitups, nhtups); + +} /* _vc_vaconeind */ + +/* + * _vc_tidreapped() -- is a particular tid reapped? + * + * vpl->VPageDescr_array is sorted in right order. + */ +static VPageDescr +_vc_tidreapped(ItemPointer itemptr, VPageList vpl) +{ + OffsetNumber ioffno; + OffsetNumber *voff; + VPageDescr vp, *vpp; + VPageDescrData vpd; + + vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr); + ioffno = ItemPointerGetOffsetNumber(itemptr); + + vp = &vpd; + vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc), + vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, + _vc_cmp_blk); + + if ( vpp == (VPageDescr*) NULL ) + return ((VPageDescr)NULL); + vp = *vpp; + + /* ok - we are on true page */ + + if ( vp->vpd_noff == 0 ) { /* this is EmptyPage !!! */ + return (vp); + } + + voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), + vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, + _vc_cmp_offno); + + if ( voff == (OffsetNumber*) NULL ) + return ((VPageDescr)NULL); + + return (vp); + +} /* _vc_tidreapped */ /* * _vc_updstats() -- update pg_class statistics for one relation @@ -771,7 +1368,7 @@ _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex) pgcform->relhasindex = hasindex; /* XXX -- after write, should invalidate relcache in other backends */ - WriteNoReleaseBuffer(buf); + WriteNoReleaseBuffer(buf); /* heap_endscan release scan' buffers ? */ /* that's all, folks */ heap_endscan(sdesc); @@ -788,88 +1385,47 @@ static void _vc_setpagelock(Relation rel, BlockNumber blkno) RelationSetLockForWritePage(rel, &itm); } -/* - * _vc_tidreapped() -- is a particular tid reapped? - * - * VPageDescr array is sorted in right order. - */ -static bool -_vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel) -{ - OffsetNumber ioffno; - OffsetNumber *voff; - VPageDescr vp, *vpp; - VPageDescrData vpd; - - vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr); - ioffno = ItemPointerGetOffsetNumber(itemptr); - - vp = &vpd; - vpp = (VPageDescr*) _vc_find_eq ((char*)(curvrl->vrl_pgdsc), - curvrl->vrl_nrepg, sizeof (VPageDescr), (char*)&vp, - _vc_cmp_blk); - - if ( vpp == (VPageDescr*) NULL ) - return (false); - vp = *vpp; - - /* ok - we are on true page */ - - if ( vp->vpd_noff == 0 ) { /* this is EmptyPage !!! */ - elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing", - NAMEDATALEN, indrel->rd_rel->relname.data, - vpd.vpd_blkno, ioffno); - return (true); - } - - voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), - vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, - _vc_cmp_offno); - - if ( voff == (OffsetNumber*) NULL ) - return (false); - - return (true); - -} /* _vc_tidreapped */ - /* - * _vc_reappage() -- save a page on the array of reaped pages for the current - * entry on the vacuum relation list. + * _vc_reappage() -- save a page on the array of reapped pages. * * As a side effect of the way that the vacuuming loop for a given * relation works, higher pages come after lower pages in the array * (and highest tid on a page is last). */ -static void -_vc_reappage(Portal p, - VRelList curvrl, - VPageDescr vpc) +static void +_vc_reappage(VPageList vpl, VPageDescr vpc) { - PortalVariableMemory pmem; - MemoryContext old; VPageDescr newvpd; - /* allocate a VPageDescrData entry in the portal memory context */ - pmem = PortalGetVariableMemory(p); - old = MemoryContextSwitchTo((MemoryContext) pmem); - if ( curvrl->vrl_nrepg == 0 ) - curvrl->vrl_pgdsc = (VPageDescr*) palloc(100*sizeof(VPageDescr)); - else if ( curvrl->vrl_nrepg % 100 == 0 ) - curvrl->vrl_pgdsc = (VPageDescr*) repalloc(curvrl->vrl_pgdsc, (curvrl->vrl_nrepg+100)*sizeof(VPageDescr)); + /* allocate a VPageDescrData entry */ newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber)); - MemoryContextSwitchTo(old); /* fill it in */ if ( vpc->vpd_noff > 0 ) memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber)); newvpd->vpd_blkno = vpc->vpd_blkno; newvpd->vpd_free = vpc->vpd_free; + newvpd->vpd_nusd = vpc->vpd_nusd; newvpd->vpd_noff = vpc->vpd_noff; - curvrl->vrl_pgdsc[curvrl->vrl_nrepg] = newvpd; - (curvrl->vrl_nrepg)++; + /* insert this page into vpl list */ + _vc_vpinsert (vpl, newvpd); + +} /* _vc_reappage */ + +static void +_vc_vpinsert (VPageList vpl, VPageDescr vpnew) +{ + + /* allocate a VPageDescr entry if needed */ + if ( vpl->vpl_npages == 0 ) + vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr)); + else if ( vpl->vpl_npages % 100 == 0 ) + vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr)); + vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew; + (vpl->vpl_npages)++; + } static void @@ -877,8 +1433,6 @@ _vc_free(Portal p, VRelList vrl) { VRelList p_vrl; VAttList p_val, val; - VPageDescr *vpd; - int i; MemoryContext old; PortalVariableMemory pmem; @@ -895,15 +1449,6 @@ _vc_free(Portal p, VRelList vrl) pfree(p_val); } - /* free page' descriptions */ - if ( vrl->vrl_nrepg > 0 ) { - vpd = vrl->vrl_pgdsc; - for (i = 0; i < vrl->vrl_nrepg; i++) { - pfree(vpd[i]); - } - pfree (vpd); - } - /* free rel list entry */ p_vrl = vrl; vrl = vrl->vrl_next; @@ -1038,3 +1583,149 @@ _vc_cmp_offno (char *left, char *right) return (1); } /* _vc_cmp_offno */ + + +static void +_vc_getindices (Oid relid, int *nindices, Relation **Irel) +{ + Relation pgindex; + Relation irel; + TupleDesc pgidesc; + HeapTuple pgitup; + HeapScanDesc pgiscan; + Datum d; + int i, k; + bool n; + ScanKeyData pgikey; + Oid *ioid; + + *nindices = i = 0; + + ioid = (Oid *) palloc(10*sizeof(Oid)); + + /* prepare a heap scan on the pg_index relation */ + pgindex = heap_openr(IndexRelationName); + pgidesc = RelationGetTupleDescriptor(pgindex); + + ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid, + ObjectIdEqualRegProcedure, + ObjectIdGetDatum(relid)); + + pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey); + + while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) { + d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid, + pgidesc, &n); + i++; + if ( i % 10 == 0 ) + ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid)); + ioid[i-1] = DatumGetObjectId(d); + } + + heap_endscan(pgiscan); + heap_close(pgindex); + + if ( i == 0 ) { /* No one index found */ + pfree(ioid); + return; + } + + if ( Irel != (Relation **) NULL ) + *Irel = (Relation *) palloc(i * sizeof(Relation)); + + for (k = 0; i > 0; ) + { + irel = index_open(ioid[--i]); + if ( irel != (Relation) NULL ) + { + if ( Irel != (Relation **) NULL ) + (*Irel)[k] = irel; + else + index_close (irel); + k++; + } + else + elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]); + } + *nindices = k; + pfree(ioid); + + if ( Irel != (Relation **) NULL && *nindices == 0 ) + { + pfree (*Irel); + *Irel = (Relation *) NULL; + } + +} /* _vc_getindices */ + + +static void +_vc_clsindices (int nindices, Relation *Irel) +{ + + if ( Irel == (Relation*) NULL ) + return; + + while (nindices--) { + index_close (Irel[nindices]); + } + pfree (Irel); + +} /* _vc_clsindices */ + + +static void +_vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc) +{ + IndDesc *idcur; + HeapTuple pgIndexTup; + AttrNumber *attnumP; + int natts; + int i; + + *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc)); + + for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) { + pgIndexTup = + SearchSysCacheTuple(INDEXRELID, + ObjectIdGetDatum(Irel[i]->rd_id), + 0,0,0); + Assert(pgIndexTup); + idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup); + for (attnumP = &(idcur->tform->indkey[0]), natts = 0; + *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS; + attnumP++, natts++); + if (idcur->tform->indproc != InvalidOid) { + idcur->finfoP = &(idcur->finfo); + FIgetnArgs(idcur->finfoP) = natts; + natts = 1; + FIgetProcOid(idcur->finfoP) = idcur->tform->indproc; + *(FIgetname(idcur->finfoP)) = '\0'; + } else + idcur->finfoP = (FuncIndexInfo *) NULL; + + idcur->natts = natts; + } + +} /* _vc_mkindesc */ + + +static bool +_vc_enough_space (VPageDescr vpd, Size len) +{ + + len = DOUBLEALIGN(len); + + if ( len > vpd->vpd_free ) + return (false); + + if ( vpd->vpd_nusd < vpd->vpd_noff ) /* there are free itemid(s) */ + return (true); /* and len <= free_space */ + + /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */ + if ( len <= vpd->vpd_free - sizeof (ItemIdData) ) + return (true); + + return (false); + +} /* _vc_enough_space */