Tom Lane 81b9b5ce49 Make gistvacuumcleanup() count the actual number of index tuples.
Previously, it just returned the heap tuple count, which might be only an
estimate, and would be completely the wrong thing if the index is partial.
Since this function scans every index page anyway to find free pages,
it's practically free to count the surviving index tuples.  Let's do that
and return an accurate count.

This is easily visible as a wrong reltuples value for a partial GiST
index following VACUUM, so back-patch to all supported branches.

Andrey Borodin, reviewed by Michail Nikolaev

Discussion: https://postgr.es/m/151956654251.6915.675951950408204404.pgcf@coridan.postgresql.org
2018-03-02 11:22:42 -05:00

275 lines
6.7 KiB
C

/*-------------------------------------------------------------------------
*
* gistvacuum.c
* vacuuming routines for the postgres GiST index access method.
*
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gist/gistvacuum.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
/*
 * VACUUM cleanup: update the FSM and count the index tuples.
 *
 * Every page of the index is read once.  Pages that are new or marked
 * deleted are recorded in the free space map; on leaf pages the line
 * pointers are added up so that an actual tuple count can be returned.
 *
 * The scan includes the root page: in an index that has not grown beyond a
 * single page the root is itself a leaf and holds every index tuple, so
 * leaving it out would report a tuple count of zero.  The root is never
 * recorded as a free page, though.
 *
 * info:  vacuum parameters; info->index is the index being processed.
 * stats: result of an earlier gistbulkdelete() call, or NULL if there was
 *		  none, in which case a zeroed result struct is palloc'd here.
 *
 * Result: stats with pages_free, num_pages, num_index_tuples and
 * estimated_count filled in.  In ANALYZE ONLY mode stats is handed back
 * unchanged (so possibly NULL).
 */
IndexBulkDeleteResult *
gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	rel = info->index;
	BlockNumber npages,
				blkno;
	BlockNumber totFreePages;
	double		tuplesCount;
	bool		needLock;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/* Set up all-zero stats if gistbulkdelete wasn't called */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	/*
	 * Need lock unless it's local to this backend.
	 */
	needLock = !RELATION_IS_LOCAL(rel);

	/* fetch the number of pages to scan, under the extension lock */
	if (needLock)
		LockRelationForExtension(rel, ExclusiveLock);
	npages = RelationGetNumberOfBlocks(rel);
	if (needLock)
		UnlockRelationForExtension(rel, ExclusiveLock);

	totFreePages = 0;
	tuplesCount = 0;

	/* try to find deleted pages, counting leaf tuples along the way */
	for (blkno = GIST_ROOT_BLKNO; blkno < npages; blkno++)
	{
		Buffer		buffer;
		Page		page;

		vacuum_delay_point();

		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									info->strategy);
		LockBuffer(buffer, GIST_SHARE);
		page = (Page) BufferGetPage(buffer);

		if (PageIsNew(page) || GistPageIsDeleted(page))
		{
			/*
			 * The root block must never be handed out for reuse, so keep it
			 * out of the FSM and out of the free-page count no matter what
			 * state it is in.
			 */
			if (blkno != GIST_ROOT_BLKNO)
			{
				totFreePages++;
				RecordFreeIndexPage(rel, blkno);
			}
		}
		else if (GistPageIsLeaf(page))
		{
			/* count tuples in index (considering only leaf tuples) */
			tuplesCount += PageGetMaxOffsetNumber(page);
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Finally, vacuum the FSM */
	IndexFreeSpaceMapVacuum(info->index);

	/* return statistics */
	stats->pages_free = totFreePages;

	/* re-read the index size now that the scan is over */
	if (needLock)
		LockRelationForExtension(rel, ExclusiveLock);
	stats->num_pages = RelationGetNumberOfBlocks(rel);
	if (needLock)
		UnlockRelationForExtension(rel, ExclusiveLock);

	stats->num_index_tuples = tuplesCount;
	stats->estimated_count = false;

	return stats;
}
/*
* Work item for the page traversal done by gistbulkdelete(): one index page
* that still has to be visited.  Items are chained through "next" and the
* chain is used as a stack.
*/
typedef struct GistBDItem
{
/*
* LSN read from the buffer of the page whose downlink led to this item.
* A right sibling queued by pushStackIfSplited() inherits the value of the
* item it was queued for; the item for the root is zero-filled.
*/
GistNSN parentlsn;
/* block number of the page to visit */
BlockNumber blkno;
/* next item on the stack, or NULL at the bottom */
struct GistBDItem *next;
} GistBDItem;
/*
 * Queue the right sibling of "page" if the page looks like it was split
 * after its parent was examined.
 *
 * "stack" is the item describing "page".  A split is assumed when the page
 * has its follow-right flag set or when its NSN is newer than the parent LSN
 * remembered in the item.  In that case a new item for the right-linked page
 * is allocated and linked in directly behind "stack", inheriting the same
 * parent LSN.
 *
 * Nothing is queued for the root block, for an item that carries no valid
 * parent LSN, or for a page without a right link.
 */
static void
pushStackIfSplited(Page page, GistBDItem *stack)
{
	GISTPageOpaque opaque = GistPageGetOpaque(page);
	GistBDItem *sibling;

	/* the root block is never followed to the right */
	if (stack->blkno == GIST_ROOT_BLKNO)
		return;

	/* no parent LSN to compare the page's NSN against */
	if (XLogRecPtrIsInvalid(stack->parentlsn))
		return;

	/* neither sign of a split is present */
	if (!GistFollowRight(page) && stack->parentlsn >= GistPageGetNSN(page))
		return;

	/* sanity check */
	if (opaque->rightlink == InvalidBlockNumber)
		return;

	/* split page detected, install right link to the stack */
	sibling = (GistBDItem *) palloc(sizeof(GistBDItem));
	sibling->blkno = opaque->rightlink;
	sibling->parentlsn = stack->parentlsn;
	sibling->next = stack->next;
	stack->next = sibling;
}
/*
 * Bulk deletion of all index entries pointing to a set of heap tuples, plus
 * a check for invalid inner tuples left behind after an upgrade.
 *
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * The index is walked from the root with an explicit stack of pages still to
 * be visited.  Inner pages only contribute their children to the stack (and
 * a LOG message for every tuple marked invalid); leaf pages are re-locked
 * exclusively and have the tuples selected by the callback removed.
 *
 * info:		   vacuum parameters; info->index is the index to process.
 * stats:		   result of a previous call, or NULL on the first call.
 * callback:	   returns true for heap TIDs whose index entries must go.
 * callback_state: opaque argument passed through to the callback.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	GistBDItem *stack;

	/* first time through? */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	/* we'll re-count the tuples each time */
	stats->estimated_count = false;
	stats->num_index_tuples = 0;

	/* the traversal starts with just the root on the stack */
	stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
	stack->blkno = GIST_ROOT_BLKNO;

	while (stack != NULL)
	{
		GistBDItem *rest;
		Buffer		buffer;
		Page		page;
		OffsetNumber off,
					lastoff;

		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
									RBM_NORMAL, info->strategy);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(rel, buffer);
		page = (Page) BufferGetPage(buffer);

		if (!GistPageIsLeaf(page))
		{
			/* check for split proceeded after look at parent */
			pushStackIfSplited(page, stack);

			/*
			 * Inner page: schedule a visit of every child and complain about
			 * tuples that carry the invalid mark.
			 */
			lastoff = PageGetMaxOffsetNumber(page);
			for (off = FirstOffsetNumber; off <= lastoff; off = OffsetNumberNext(off))
			{
				ItemId		itemid = PageGetItemId(page, off);
				IndexTuple	downlink = (IndexTuple) PageGetItem(page, itemid);
				GistBDItem *child = (GistBDItem *) palloc(sizeof(GistBDItem));

				child->blkno = ItemPointerGetBlockNumber(&(downlink->t_tid));
				child->parentlsn = BufferGetLSNAtomic(buffer);
				child->next = stack->next;
				stack->next = child;

				if (GistTupleIsInvalid(downlink))
					ereport(LOG,
							(errmsg("index \"%s\" contains an inner tuple marked as invalid",
									RelationGetRelationName(rel)),
							 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
							 errhint("Please REINDEX it.")));
			}
		}
		else
		{
			OffsetNumber todelete[MaxOffsetNumber];
			int			ntodelete = 0;

			/* swap the share lock for an exclusive one before modifying */
			LockBuffer(buffer, GIST_UNLOCK);
			LockBuffer(buffer, GIST_EXCLUSIVE);

			page = (Page) BufferGetPage(buffer);
			if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
			{
				/* only the root can become non-leaf during relock */
				UnlockReleaseBuffer(buffer);
				/* go around again and treat the root as an inner page */
				continue;
			}

			/*
			 * The page may have been split after its parent was looked at;
			 * that has to be checked now that the page is locked again.
			 */
			pushStackIfSplited(page, stack);

			/*
			 * Remove deletable tuples from page
			 */
			lastoff = PageGetMaxOffsetNumber(page);
			for (off = FirstOffsetNumber; off <= lastoff; off = OffsetNumberNext(off))
			{
				ItemId		itemid = PageGetItemId(page, off);
				IndexTuple	itup = (IndexTuple) PageGetItem(page, itemid);

				if (!callback(&(itup->t_tid), callback_state))
					stats->num_index_tuples += 1;
				else
					todelete[ntodelete++] = off;
			}

			stats->tuples_removed += ntodelete;

			if (ntodelete > 0)
			{
				START_CRIT_SECTION();

				MarkBufferDirty(buffer);

				PageIndexMultiDelete(page, todelete, ntodelete);
				GistMarkTuplesDeleted(page);

				if (RelationNeedsWAL(rel))
				{
					XLogRecPtr	recptr;

					recptr = gistXLogUpdate(buffer,
											todelete, ntodelete,
											NULL, 0, InvalidBuffer);
					PageSetLSN(page, recptr);
				}
				else
					PageSetLSN(page, gistGetFakeLSN(rel));

				END_CRIT_SECTION();
			}
		}

		UnlockReleaseBuffer(buffer);

		/* pop the page just processed */
		rest = stack->next;
		pfree(stack);
		stack = rest;

		vacuum_delay_point();
	}

	return stats;
}