Fix contrib/seg's GiST picksplit method.

This patch replaces Guttman's generalized split method with a simple
sort-by-center-points algorithm.  Since the data is only one-dimensional
we don't really need the slow and none-too-stable Guttman method.

This is in part a bug fix, since seg has the same size_alpha versus
size_beta typo that was recently fixed in contrib/cube.  It seems
prudent to apply this rather aggressive fix only in HEAD, though.
Back branches will just get the typo fix.

Alexander Korotkov, reviewed by Yeb Havinga
This commit is contained in:
Tom Lane 2010-12-15 21:14:24 -05:00
parent 290f1603b4
commit 2a6ebe70fb

View File

@ -33,6 +33,16 @@ extern void seg_scanner_finish(void);
extern int seg_yydebug;
*/
/*
* Auxiliary data structure for picksplit method.
*/
typedef struct
{
float center;
OffsetNumber index;
SEG *data;
} gseg_picksplit_item;
/*
** Input/Output routines
*/
@ -292,152 +302,104 @@ gseg_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result)
return (result);
}
/*
* Compare function for gseg_picksplit_item: sort by center.
*/
static int
gseg_picksplit_item_cmp(const void *a, const void *b)
{
const gseg_picksplit_item *i1 = (const gseg_picksplit_item *) a;
const gseg_picksplit_item *i2 = (const gseg_picksplit_item *) b;
if (i1->center < i2->center)
return -1;
else if (i1->center == i2->center)
return 0;
else
return 1;
}
/*
** The GiST PickSplit method for segments
** We use Guttman's poly time split algorithm
*/
* The GiST PickSplit method for segments
*
* We used to use Guttman's split algorithm here, but since the data is 1-D
* it's easier and more robust to just sort the segments by center-point and
* split at the middle.
*/
GIST_SPLITVEC *
gseg_picksplit(GistEntryVector *entryvec,
GIST_SPLITVEC *v)
{
OffsetNumber i,
j;
SEG *datum_alpha,
*datum_beta;
int i;
SEG *datum_l,
*datum_r;
SEG *union_d,
*union_dl,
*union_dr;
SEG *inter_d;
bool firsttime;
float size_alpha,
size_beta,
size_union,
size_inter;
float size_waste,
waste;
float size_l,
size_r;
int nbytes;
OffsetNumber seed_1 = 1,
seed_2 = 2;
*datum_r,
*seg;
gseg_picksplit_item *sort_items;
OffsetNumber *left,
*right;
OffsetNumber maxoff;
OffsetNumber firstright;
#ifdef GIST_DEBUG
fprintf(stderr, "picksplit\n");
#endif
maxoff = entryvec->n - 2;
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
/* Valid items in entryvec->vector[] are indexed 1..maxoff */
maxoff = entryvec->n - 1;
firsttime = true;
waste = 0.0;
for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
/*
* Prepare the auxiliary array and sort it.
*/
sort_items = (gseg_picksplit_item *)
palloc(maxoff * sizeof(gseg_picksplit_item));
for (i = 1; i <= maxoff; i++)
{
datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key);
for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
{
datum_beta = (SEG *) DatumGetPointer(entryvec->vector[j].key);
/* compute the wasted space by unioning these guys */
/* size_waste = size_union - size_inter; */
union_d = seg_union(datum_alpha, datum_beta);
rt_seg_size(union_d, &size_union);
inter_d = seg_inter(datum_alpha, datum_beta);
rt_seg_size(inter_d, &size_inter);
size_waste = size_union - size_inter;
/*
* are these a more promising split that what we've already seen?
*/
if (size_waste > waste || firsttime)
{
waste = size_waste;
seed_1 = i;
seed_2 = j;
firsttime = false;
}
}
seg = (SEG *) DatumGetPointer(entryvec->vector[i].key);
/* center calculation is done this way to avoid possible overflow */
sort_items[i - 1].center = seg->lower*0.5f + seg->upper*0.5f;
sort_items[i - 1].index = i;
sort_items[i - 1].data = seg;
}
qsort(sort_items, maxoff, sizeof(gseg_picksplit_item),
gseg_picksplit_item_cmp);
/* sort items below "firstright" will go into the left side */
firstright = maxoff / 2;
v->spl_left = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber));
v->spl_right = (OffsetNumber *) palloc(maxoff * sizeof(OffsetNumber));
left = v->spl_left;
v->spl_nleft = 0;
right = v->spl_right;
v->spl_nright = 0;
datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[seed_1].key);
datum_l = seg_union(datum_alpha, datum_alpha);
rt_seg_size(datum_l, &size_l);
datum_beta = (SEG *) DatumGetPointer(entryvec->vector[seed_2].key);
datum_r = seg_union(datum_beta, datum_beta);
rt_seg_size(datum_r, &size_r);
/*
* Emit segments to the left output page, and compute its bounding box.
*/
datum_l = (SEG *) palloc(sizeof(SEG));
memcpy(datum_l, sort_items[0].data, sizeof(SEG));
*left++ = sort_items[0].index;
v->spl_nleft++;
for (i = 1; i < firstright; i++)
{
datum_l = seg_union(datum_l, sort_items[i].data);
*left++ = sort_items[i].index;
v->spl_nleft++;
}
/*
* Now split up the regions between the two seeds. An important property
* of this split algorithm is that the split vector v has the indices of
* items to be split in order in its left and right vectors. We exploit
* this property by doing a merge in the code that actually splits the
* page.
*
* For efficiency, we also place the new index tuple in this loop. This is
* handled at the very end, when we have placed all the existing tuples
* and i == maxoff + 1.
* Likewise for the right page.
*/
maxoff = OffsetNumberNext(maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
datum_r = (SEG *) palloc(sizeof(SEG));
memcpy(datum_r, sort_items[firstright].data, sizeof(SEG));
*right++ = sort_items[firstright].index;
v->spl_nright++;
for (i = firstright + 1; i < maxoff; i++)
{
/*
* If we've already decided where to place this item, just put it on
* the right list. Otherwise, we need to figure out which page needs
* the least enlargement in order to store the item.
*/
if (i == seed_1)
{
*left++ = i;
v->spl_nleft++;
continue;
}
else if (i == seed_2)
{
*right++ = i;
v->spl_nright++;
continue;
}
/* okay, which page needs least enlargement? */
datum_alpha = (SEG *) DatumGetPointer(entryvec->vector[i].key);
union_dl = seg_union(datum_l, datum_alpha);
union_dr = seg_union(datum_r, datum_alpha);
rt_seg_size(union_dl, &size_alpha);
rt_seg_size(union_dr, &size_beta);
/* pick which page to add it to */
if (size_alpha - size_l < size_beta - size_r)
{
datum_l = union_dl;
size_l = size_alpha;
*left++ = i;
v->spl_nleft++;
}
else
{
datum_r = union_dr;
size_r = size_alpha;
*right++ = i;
v->spl_nright++;
}
datum_r = seg_union(datum_r, sort_items[i].data);
*right++ = sort_items[i].index;
v->spl_nright++;
}
*left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
v->spl_ldatum = PointerGetDatum(datum_l);
v->spl_rdatum = PointerGetDatum(datum_r);