Add permissions check: now one must be the Postgres superuser or the

table owner in order to vacuum a table.  This is mainly to prevent
denial-of-service attacks via repeated vacuums.  Allow VACUUM to gather
statistics about system relations, except for pg_statistic itself ---
not clear that it's worth the trouble to make that case work cleanly.
Cope with possible tuple size overflow in pg_statistic tuples; I'm
surprised we never realized that could happen.  Hold a couple of locks
a little longer to try to prevent deadlocks between concurrent VACUUMs.
There still seem to be some problems in that last area though :-(
This commit is contained in:
Tom Lane 1999-11-29 04:43:15 +00:00
parent 8a7f31a7d8
commit d367f626f4

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.127 1999/11/28 02:10:01 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.128 1999/11/29 04:43:15 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -35,6 +35,7 @@
#include "storage/sinval.h" #include "storage/sinval.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/portal.h" #include "utils/portal.h"
@ -393,15 +394,35 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
} }
/* /*
* Open the class, get an exclusive lock on it, and vacuum it * Open the class, get an exclusive lock on it, and check permissions.
*
* Note we choose to treat permissions failure as a NOTICE and keep
* trying to vacuum the rest of the DB --- is this appropriate?
*/ */
onerel = heap_open(relid, AccessExclusiveLock); onerel = heap_open(relid, AccessExclusiveLock);
#ifndef NO_SECURITY
if (!pg_ownercheck(GetPgUserName(), RelationGetRelationName(onerel),
RELNAME))
{
elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
RelationGetRelationName(onerel));
heap_close(onerel, AccessExclusiveLock);
CommitTransactionCommand();
return;
}
#endif
/*
* Set up statistics-gathering machinery.
*/
vacrelstats = (VRelStats *) palloc(sizeof(VRelStats)); vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
vacrelstats->relid = relid; vacrelstats->relid = relid;
vacrelstats->num_pages = vacrelstats->num_tuples = 0; vacrelstats->num_pages = vacrelstats->num_tuples = 0;
vacrelstats->hasindex = false; vacrelstats->hasindex = false;
if (analyze && !IsSystemRelationName(RelationGetRelationName(onerel))) /* we can VACUUM ANALYZE any table except pg_statistic; see vc_updstats */
if (analyze &&
strcmp(RelationGetRelationName(onerel), StatisticRelationName) != 0)
{ {
int attr_cnt, int attr_cnt,
*attnums = NULL; *attnums = NULL;
@ -498,7 +519,7 @@ vc_vacone(Oid relid, bool analyze, List *va_cols)
stats->outfunc = InvalidOid; stats->outfunc = InvalidOid;
} }
vacrelstats->va_natts = attr_cnt; vacrelstats->va_natts = attr_cnt;
/* delete existing pg_statistics rows for relation */ /* delete existing pg_statistic rows for relation */
vc_delstats(relid, ((attnums) ? attr_cnt : 0), attnums); vc_delstats(relid, ((attnums) ? attr_cnt : 0), attnums);
if (attnums) if (attnums)
pfree(attnums); pfree(attnums);
@ -2248,19 +2269,38 @@ vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int *bucket_len
} }
/* /*
* vc_updstats() -- update pg_class statistics for one relation * vc_updstats() -- update statistics for one relation
*
* Statistics are stored in several places: the pg_class row for the
* relation has stats about the whole relation, the pg_attribute rows
* for each attribute store "disbursion", and there is a pg_statistic
* row for each (non-system) attribute. (Disbursion probably ought to
* be moved to pg_statistic, but it's not worth doing unless there's
* another reason to have to change pg_attribute.) Disbursion and
* pg_statistic values are only updated by VACUUM ANALYZE, but we
* always update the stats in pg_class.
* *
* This routine works for both index and heap relation entries in * This routine works for both index and heap relation entries in
* pg_class. We violate no-overwrite semantics here by storing new * pg_class. We violate no-overwrite semantics here by storing new
* values for num_tuples, num_pages, and hasindex directly in the pg_class * values for the statistics columns directly into the pg_class
* tuple that's already on the page. The reason for this is that if * tuple that's already on the page. The reason for this is that if
* we updated these tuples in the usual way, then every tuple in pg_class * we updated these tuples in the usual way, vacuuming pg_class itself
* would be replaced every day. This would make planning and executing * wouldn't work very well --- by the time we got done with a vacuum
* historical queries very expensive. Note that we also don't use * cycle, most of the tuples in pg_class would've been obsoleted.
* any locking while doing updation. * Updating pg_class's own statistics would be especially tricky.
* Of course, this only works for fixed-size never-null columns, but
* these are.
*
* Updates of pg_attribute statistics are handled in the same way
* for the same reasons.
*
* To keep things simple, we punt for pg_statistic, and don't try
* to compute or store rows for pg_statistic itself in pg_statistic.
* This could possibly be made to work, but it's not worth the trouble.
*/ */
static void static void
vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats) vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex,
VRelStats *vacrelstats)
{ {
Relation rd, Relation rd,
ad, ad,
@ -2298,6 +2338,12 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
pgcform->relpages = num_pages; pgcform->relpages = num_pages;
pgcform->relhasindex = hasindex; pgcform->relhasindex = hasindex;
/* invalidate the tuple in the cache and write the buffer */
RelationInvalidateHeapTuple(rd, &rtup);
WriteBuffer(buffer);
heap_close(rd, RowExclusiveLock);
if (vacrelstats != NULL && vacrelstats->va_natts > 0) if (vacrelstats != NULL && vacrelstats->va_natts > 0)
{ {
VacAttrStats *vacattrstats = vacrelstats->vacattrstats; VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
@ -2305,6 +2351,8 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
ad = heap_openr(AttributeRelationName, RowExclusiveLock); ad = heap_openr(AttributeRelationName, RowExclusiveLock);
sd = heap_openr(StatisticRelationName, RowExclusiveLock); sd = heap_openr(StatisticRelationName, RowExclusiveLock);
/* Find pg_attribute rows for this relation */
ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid, ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
F_INT4EQ, relid); F_INT4EQ, relid);
@ -2313,15 +2361,10 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
while (HeapTupleIsValid(atup = heap_getnext(scan, 0))) while (HeapTupleIsValid(atup = heap_getnext(scan, 0)))
{ {
int i; int i;
float32data selratio; /* average ratio of rows selected
* for a random constant */
VacAttrStats *stats; VacAttrStats *stats;
Datum values[Natts_pg_statistic];
char nulls[Natts_pg_statistic];
attp = (Form_pg_attribute) GETSTRUCT(atup); attp = (Form_pg_attribute) GETSTRUCT(atup);
if (attp->attnum <= 0) /* skip system attributes for now, */ if (attp->attnum <= 0) /* skip system attributes for now */
/* they are unique anyway */
continue; continue;
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
@ -2330,12 +2373,15 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
break; break;
} }
if (i >= natts) if (i >= natts)
continue; continue; /* skip attr if no stats collected */
stats = &(vacattrstats[i]); stats = &(vacattrstats[i]);
/* overwrite the existing statistics in the tuple */
if (VacAttrStatsEqValid(stats)) if (VacAttrStatsEqValid(stats))
{ {
float32data selratio; /* average ratio of rows selected
* for a random constant */
/* Compute disbursion */
if (stats->nonnull_cnt == 0 && stats->null_cnt == 0) if (stats->nonnull_cnt == 0 && stats->null_cnt == 0)
{ {
/* empty relation, so put a dummy value in attdisbursion */ /* empty relation, so put a dummy value in attdisbursion */
@ -2383,23 +2429,24 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
else if (selratio > 1.0) else if (selratio > 1.0)
selratio = 1.0; selratio = 1.0;
} }
/* overwrite the existing statistics in the tuple */
attp->attdisbursion = selratio; attp->attdisbursion = selratio;
/* /* invalidate the tuple in the cache and write the buffer */
* Invalidate the cache for the tuple and write the buffer
*/
RelationInvalidateHeapTuple(ad, atup); RelationInvalidateHeapTuple(ad, atup);
WriteNoReleaseBuffer(scan->rs_cbuf); WriteNoReleaseBuffer(scan->rs_cbuf);
/* DO PG_STATISTIC INSERTS */
/* /*
* doing system relations, especially pg_statistic is a * Create pg_statistic tuples for the relation, if we have
* problem * gathered the right data. vc_delstats() previously
* deleted all the pg_statistic tuples for the rel, so we
* just have to insert new ones here.
*
* Note vc_vacone() has seen to it that we won't come here
* when vacuuming pg_statistic itself.
*/ */
if (VacAttrStatsLtGtValid(stats) && stats->initialized if (VacAttrStatsLtGtValid(stats) && stats->initialized)
/* && !IsSystemRelationName(NameData(pgcform->relname))
*/ )
{ {
float32data nullratio; float32data nullratio;
float32data bestratio; float32data bestratio;
@ -2408,6 +2455,8 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
double best_cnt_d = stats->best_cnt, double best_cnt_d = stats->best_cnt,
null_cnt_d = stats->null_cnt, null_cnt_d = stats->null_cnt,
nonnull_cnt_d = stats->nonnull_cnt; /* prevent overflow */ nonnull_cnt_d = stats->nonnull_cnt; /* prevent overflow */
Datum values[Natts_pg_statistic];
char nulls[Natts_pg_statistic];
nullratio = null_cnt_d / (nonnull_cnt_d + null_cnt_d); nullratio = null_cnt_d / (nonnull_cnt_d + null_cnt_d);
bestratio = best_cnt_d / (nonnull_cnt_d + null_cnt_d); bestratio = best_cnt_d / (nonnull_cnt_d + null_cnt_d);
@ -2441,13 +2490,26 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
stup = heap_formtuple(sd->rd_att, values, nulls); stup = heap_formtuple(sd->rd_att, values, nulls);
/* ---------------- /* ----------------
* insert the tuple in the relation. * Watch out for oversize tuple, which can happen if
* all three of the saved data values are long.
* Our fallback strategy is just to not store the
* pg_statistic tuple at all in that case. (We could
* replace the values by NULLs and still store the
* numeric stats, but presently selfuncs.c couldn't
* do anything useful with that case anyway.)
*
* We could reduce the probability of overflow, but not
* prevent it, by storing the data values as compressed
* text; is that worth doing? The problem should go
* away whenever long tuples get implemented...
* ---------------- * ----------------
*/ */
heap_insert(sd, stup); if (MAXALIGN(stup->t_len) <= MaxTupleSize)
{ {
/* OK, store tuple and update indexes too */
Relation irelations[Num_pg_statistic_indices]; Relation irelations[Num_pg_statistic_indices];
heap_insert(sd, stup);
CatalogOpenIndices(Num_pg_statistic_indices, Name_pg_statistic_indices, irelations); CatalogOpenIndices(Num_pg_statistic_indices, Name_pg_statistic_indices, irelations);
CatalogIndexInsert(irelations, Num_pg_statistic_indices, sd, stup); CatalogIndexInsert(irelations, Num_pg_statistic_indices, sd, stup);
CatalogCloseIndices(Num_pg_statistic_indices, irelations); CatalogCloseIndices(Num_pg_statistic_indices, irelations);
@ -2462,22 +2524,14 @@ vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *
} }
} }
heap_endscan(scan); heap_endscan(scan);
heap_close(ad, RowExclusiveLock); /* close rels, but hold locks till upcoming commit */
heap_close(sd, RowExclusiveLock); heap_close(ad, NoLock);
heap_close(sd, NoLock);
} }
/*
* Invalidate the cached pg_class tuple and write the buffer
*/
RelationInvalidateHeapTuple(rd, &rtup);
WriteBuffer(buffer);
heap_close(rd, RowExclusiveLock);
} }
/* /*
* vc_delstats() -- delete pg_statistics rows for a relation * vc_delstats() -- delete pg_statistic rows for a relation
* *
* If a list of attribute numbers is given, only zap stats for those attrs. * If a list of attribute numbers is given, only zap stats for those attrs.
*/ */
@ -2514,7 +2568,13 @@ vc_delstats(Oid relid, int attcnt, int *attnums)
} }
heap_endscan(scan); heap_endscan(scan);
heap_close(pgstatistic, RowExclusiveLock); /*
* Close rel, but *keep* lock; we will need to reacquire it later,
* so there's a possibility of deadlock against another VACUUM process
* if we let go now. Keeping the lock shouldn't delay any common
* operation other than an attempted VACUUM of pg_statistic itself.
*/
heap_close(pgstatistic, NoLock);
} }
/* /*