*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.59 2001/01/29 00:39:15 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.60 2001/03/07 21:20:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "access/genam.h"
#include "access/heapam.h"
#include "access/rtree.h"
+#include "access/xlogutils.h"
#include "catalog/index.h"
#include "executor/executor.h"
#include "miscadmin.h"
-#include "access/xlogutils.h"
+/*
+ * XXX We assume that all datatypes indexable in rtrees are pass-by-reference.
+ * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and
+ * do something with the various datum-pfreeing code. However, it's not that
+ * unreasonable an assumption in practice.
+ */
+#define IndexTupleGetDatum(itup) \
+ PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData))
+
+/*
+ * Space-allocation macros. Note we count the item's line pointer in its size.
+ */
+#define RTPageAvailSpace \
+ (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \
+ - MAXALIGN(sizeof(RTreePageOpaqueData)))
+#define IndexTupleTotalSize(itup) \
+ (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData))
+#define IndexTupleAttSize(itup) \
+ (IndexTupleSize(itup) - sizeof(IndexTupleData))
+
+/* results of rtpicksplit() */
typedef struct SPLITVEC
{
OffsetNumber *spl_left;
int spl_nleft;
- char *spl_ldatum;
+ Datum spl_ldatum;
OffsetNumber *spl_right;
int spl_nright;
- char *spl_rdatum;
+ Datum spl_rdatum;
} SPLITVEC;
typedef struct RTSTATE
/* non-export function prototypes */
static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
RTSTATE *rtstate);
-static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
+static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size,
RTSTATE *rtstate);
-static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
+static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack,
IndexTuple itup, RTSTATE *rtstate);
static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, RTSTATE *rtstate);
static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
-static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
+static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
RTSTATE *rtstate);
static void RTInitBuffer(Buffer b, uint32 f);
static OffsetNumber choose(Relation r, Page p, IndexTuple it,
RTSTACK *stack;
InsertIndexResult res;
RTreePageOpaque opaque;
- char *datum;
+ Datum datum;
blk = P_ROOT;
buffer = InvalidBuffer;
if (nospace(page, itup))
{
/* need to do a split */
- res = dosplit(r, buffer, stack, itup, rtstate);
+ res = rtdosplit(r, buffer, stack, itup, rtstate);
freestack(stack);
WriteBuffer(buffer); /* don't forget to release buffer! */
return res;
OffsetNumberNext(PageGetMaxOffsetNumber(page)),
LP_USED);
}
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "rtdoinsert: failed to add index item to %s",
+ RelationGetRelationName(r));
WriteBuffer(buffer);
- datum = (((char *) itup) + sizeof(IndexTupleData));
+ datum = IndexTupleGetDatum(itup);
/* now expand the page boundary in the parent to include the new child */
- rttighten(r, stack, datum,
- (IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
+ rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate);
freestack(stack);
/* build and return an InsertIndexResult for this insertion */
static void
rttighten(Relation r,
RTSTACK *stk,
- char *datum,
+ Datum datum,
int att_size,
RTSTATE *rtstate)
{
- char *oldud;
- char *tdatum;
+ Datum oldud;
+ Datum tdatum;
Page p;
float old_size,
newd_size;
b = ReadBuffer(r, stk->rts_blk);
p = BufferGetPage(b);
- oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
- oldud += sizeof(IndexTupleData);
+ oldud = IndexTupleGetDatum(PageGetItem(p,
+ PageGetItemId(p, stk->rts_child)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(oldud),
+ FunctionCall2(&rtstate->sizeFn, oldud,
PointerGetDatum(&old_size));
- datum = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(oldud),
- PointerGetDatum(datum)));
+ datum = FunctionCall2(&rtstate->unionFn, oldud, datum);
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(datum),
+ FunctionCall2(&rtstate->sizeFn, datum,
PointerGetDatum(&newd_size));
if (newd_size != old_size)
* This is an internal page, so 'oldud' had better be a union
* (constant-length) key, too. (See comment below.)
*/
- Assert(VARSIZE(datum) == VARSIZE(oldud));
- memmove(oldud, datum, VARSIZE(datum));
+ Assert(VARSIZE(DatumGetPointer(datum)) ==
+ VARSIZE(DatumGetPointer(oldud)));
+ memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
+ VARSIZE(DatumGetPointer(datum)));
}
else
- memmove(oldud, datum, att_size);
+ {
+ memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
+ att_size);
+ }
WriteBuffer(b);
/*
* The user may be defining an index on variable-sized data (like
* polygons). If so, we need to get a constant-sized datum for
* insertion on the internal page. We do this by calling the
- * union proc, which is guaranteed to return a rectangle.
+ * union proc, which is required to return a rectangle.
*/
+ tdatum = FunctionCall2(&rtstate->unionFn, datum, datum);
- tdatum = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum),
- PointerGetDatum(datum)));
rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
- pfree(tdatum);
+ pfree(DatumGetPointer(tdatum));
}
else
ReleaseBuffer(b);
- pfree(datum);
+ pfree(DatumGetPointer(datum));
}
/*
- * dosplit -- split a page in the tree.
+ * rtdosplit -- split a page in the tree.
*
- * This is the quadratic-cost split algorithm Guttman describes in
- * his paper. The reason we chose it is that you can implement this
- * with less information about the data types on which you're operating.
+ * rtpicksplit does the interesting work of choosing the split.
+ * This routine just does the bit-pushing.
*/
static InsertIndexResult
-dosplit(Relation r,
- Buffer buffer,
- RTSTACK *stack,
- IndexTuple itup,
- RTSTATE *rtstate)
+rtdosplit(Relation r,
+ Buffer buffer,
+ RTSTACK *stack,
+ IndexTuple itup,
+ RTSTATE *rtstate)
{
Page p;
Buffer leftbuf,
InsertIndexResult res;
char *isnull;
SPLITVEC v;
+ OffsetNumber *spl_left,
+ *spl_right;
TupleDesc tupDesc;
- isnull = (char *) palloc(r->rd_rel->relnatts);
- for (blank = 0; blank < r->rd_rel->relnatts; blank++)
- isnull[blank] = ' ';
p = (Page) BufferGetPage(buffer);
opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
+ rtpicksplit(r, p, &v, itup, rtstate);
+
/*
* The root of the tree is the first block in the relation. If we're
* about to split the root, we need to do some hocus-pocus to enforce
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
- picksplit(r, p, &v, itup, rtstate);
-
+ spl_left = v.spl_left;
+ spl_right = v.spl_right;
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
- if (i == *(v.spl_left))
+ if (i == *spl_left)
{
- PageAddItem(left, (Item) item, IndexTupleSize(item),
- leftoff, LP_USED);
+ if (PageAddItem(left, (Item) item, IndexTupleSize(item),
+ leftoff, LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtdosplit: failed to copy index item in %s",
+ RelationGetRelationName(r));
leftoff = OffsetNumberNext(leftoff);
- v.spl_left++; /* advance in left split vector */
+ spl_left++; /* advance in left split vector */
}
else
{
- PageAddItem(right, (Item) item, IndexTupleSize(item),
- rightoff, LP_USED);
+ Assert(i == *spl_right);
+ if (PageAddItem(right, (Item) item, IndexTupleSize(item),
+ rightoff, LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtdosplit: failed to copy index item in %s",
+ RelationGetRelationName(r));
rightoff = OffsetNumberNext(rightoff);
- v.spl_right++; /* advance in right split vector */
+ spl_right++; /* advance in right split vector */
}
}
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
/* now insert the new index tuple */
- if (*(v.spl_left) != FirstOffsetNumber)
+ if (*spl_left == maxoff+1)
{
- PageAddItem(left, (Item) itup, IndexTupleSize(itup),
- leftoff, LP_USED);
+ if (PageAddItem(left, (Item) itup, IndexTupleSize(itup),
+ leftoff, LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtdosplit: failed to add index item to %s",
+ RelationGetRelationName(r));
leftoff = OffsetNumberNext(leftoff);
ItemPointerSet(&(res->pointerData), lbknum, leftoff);
+ spl_left++;
}
else
{
- PageAddItem(right, (Item) itup, IndexTupleSize(itup),
- rightoff, LP_USED);
+ Assert(*spl_right == maxoff+1);
+ if (PageAddItem(right, (Item) itup, IndexTupleSize(itup),
+ rightoff, LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtdosplit: failed to add index item to %s",
+ RelationGetRelationName(r));
rightoff = OffsetNumberNext(rightoff);
ItemPointerSet(&(res->pointerData), rbknum, rightoff);
+ spl_right++;
}
+ /* Make sure we consumed all of the split vectors, and release 'em */
+ Assert(*spl_left == InvalidOffsetNumber);
+ Assert(*spl_right == InvalidOffsetNumber);
+ pfree(v.spl_left);
+ pfree(v.spl_right);
+
if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
PageRestoreTempPage(left, p);
WriteBuffer(leftbuf);
rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
tupDesc = r->rd_att;
+ isnull = (char *) palloc(r->rd_rel->relnatts);
+ for (blank = 0; blank < r->rd_rel->relnatts; blank++)
+ isnull[blank] = ' ';
+
ltup = (IndexTuple) index_formtuple(tupDesc,
- (Datum *) &(v.spl_ldatum), isnull);
+ &(v.spl_ldatum), isnull);
rtup = (IndexTuple) index_formtuple(tupDesc,
- (Datum *) &(v.spl_rdatum), isnull);
+ &(v.spl_rdatum), isnull);
pfree(isnull);
/* set pointers to new child pages in the internal index tuples */
IndexTuple old;
Buffer b;
Page p;
- char *ldatum,
- *rdatum,
- *newdatum;
+ Datum ldatum,
+ rdatum,
+ newdatum;
InsertIndexResult res;
if (stk == (RTSTACK *) NULL)
old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
/*
- * This is a hack. Right now, we force rtree keys to be constant
+ * This is a hack. Right now, we force rtree internal keys to be constant
* size. To fix this, need delete the old key and add both left and
* right for the two new pages. The insertion of left may force a
* split if the new left key is bigger than the old key.
if (nospace(p, rtup))
{
- newdatum = (((char *) ltup) + sizeof(IndexTupleData));
+ newdatum = IndexTupleGetDatum(ltup);
rttighten(r, stk->rts_parent, newdatum,
- (IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
- res = dosplit(r, b, stk->rts_parent, rtup, rtstate);
+ IndexTupleAttSize(ltup), rtstate);
+ res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate);
WriteBuffer(b); /* don't forget to release buffer! -
* 01/31/94 */
pfree(res);
}
else
{
- PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
- PageGetMaxOffsetNumber(p), LP_USED);
+ if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
+ PageGetMaxOffsetNumber(p),
+ LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtintinsert: failed to add index item to %s",
+ RelationGetRelationName(r));
WriteBuffer(b);
- ldatum = (((char *) ltup) + sizeof(IndexTupleData));
- rdatum = (((char *) rtup) + sizeof(IndexTupleData));
- newdatum = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(ldatum),
- PointerGetDatum(rdatum)));
+ ldatum = IndexTupleGetDatum(ltup);
+ rdatum = IndexTupleGetDatum(rtup);
+ newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum);
rttighten(r, stk->rts_parent, newdatum,
- (IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);
+ IndexTupleAttSize(rtup), rtstate);
- pfree(newdatum);
+ pfree(DatumGetPointer(newdatum));
}
}
b = ReadBuffer(r, P_ROOT);
RTInitBuffer(b, 0);
p = BufferGetPage(b);
- PageAddItem(p, (Item) lt, IndexTupleSize(lt),
- FirstOffsetNumber, LP_USED);
- PageAddItem(p, (Item) rt, IndexTupleSize(rt),
- OffsetNumberNext(FirstOffsetNumber), LP_USED);
+ if (PageAddItem(p, (Item) lt, IndexTupleSize(lt),
+ FirstOffsetNumber,
+ LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtnewroot: failed to add index item to %s",
+ RelationGetRelationName(r));
+ if (PageAddItem(p, (Item) rt, IndexTupleSize(rt),
+ OffsetNumberNext(FirstOffsetNumber),
+ LP_USED) == InvalidOffsetNumber)
+ elog(ERROR, "rtnewroot: failed to add index item to %s",
+ RelationGetRelationName(r));
WriteBuffer(b);
}
+/*
+ * Choose how to split an rtree page into two pages.
+ *
+ * We return two vectors of index item numbers, one for the items to be
+ * put on the left page, one for the items to be put on the right page.
+ * In addition, the item to be added (itup) is listed in the appropriate
+ * vector. It is represented by item number N+1 (N = # of items on page).
+ *
+ * Both vectors appear in sequence order with a terminating sentinel value
+ * of InvalidOffsetNumber.
+ *
+ * The bounding-box datums for the two new pages are also returned in *v.
+ *
+ * This is the quadratic-cost split algorithm Guttman describes in
+ * his paper. The reason we chose it is that you can implement this
+ * with less information about the data types on which you're operating.
+ *
+ * We must also deal with a consideration not found in Guttman's algorithm:
+ * variable-length data. In particular, the incoming item might be
+ * large enough that not just any split will work. In the worst case,
+ * our "split" may have to be the new item on one page and all the existing
+ * items on the other. Short of that, we have to take care that we do not
+ * make a split that leaves both pages too full for the new item.
+ */
static void
-picksplit(Relation r,
- Page page,
- SPLITVEC *v,
- IndexTuple itup,
- RTSTATE *rtstate)
+rtpicksplit(Relation r,
+ Page page,
+ SPLITVEC *v,
+ IndexTuple itup,
+ RTSTATE *rtstate)
{
- OffsetNumber maxoff;
+ OffsetNumber maxoff,
+ newitemoff;
OffsetNumber i,
j;
IndexTuple item_1,
item_2;
- char *datum_alpha,
- *datum_beta;
- char *datum_l,
- *datum_r;
- char *union_d,
- *union_dl,
- *union_dr;
- char *inter_d;
+ Datum datum_alpha,
+ datum_beta;
+ Datum datum_l,
+ datum_r;
+ Datum union_d,
+ union_dl,
+ union_dr;
+ Datum inter_d;
bool firsttime;
float size_alpha,
size_beta,
seed_2 = 0;
OffsetNumber *left,
*right;
+ Size newitemsz,
+ item_1_sz,
+ item_2_sz,
+ left_avail_space,
+ right_avail_space;
+
+ /*
+ * First, make sure the new item is not so large that we can't possibly
+ * fit it on a page, even by itself. (It's sufficient to make this test
+ * here, since any oversize tuple must lead to a page split attempt.)
+ */
+ newitemsz = IndexTupleTotalSize(itup);
+ if (newitemsz > RTPageAvailSpace)
+ elog(ERROR, "rtree: index item size %lu exceeds maximum %lu",
+ (unsigned long) newitemsz, (unsigned long) RTPageAvailSpace);
maxoff = PageGetMaxOffsetNumber(page);
+ newitemoff = OffsetNumberNext(maxoff); /* phony index for new item */
+ /* Make arrays big enough for worst case, including sentinel */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
{
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
+ datum_alpha = IndexTupleGetDatum(item_1);
+ item_1_sz = IndexTupleTotalSize(item_1);
+
for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
{
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
- datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
+ datum_beta = IndexTupleGetDatum(item_2);
+ item_2_sz = IndexTupleTotalSize(item_2);
+
+ /*
+ * Ignore seed pairs that don't leave room for the new item
+ * on either split page.
+ */
+ if (newitemsz + item_1_sz > RTPageAvailSpace &&
+ newitemsz + item_2_sz > RTPageAvailSpace)
+ continue;
/* compute the wasted space by unioning these guys */
- union_d = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum_alpha),
- PointerGetDatum(datum_beta)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(union_d),
+ union_d = FunctionCall2(&rtstate->unionFn,
+ datum_alpha, datum_beta);
+ FunctionCall2(&rtstate->sizeFn, union_d,
PointerGetDatum(&size_union));
- inter_d = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->interFn,
- PointerGetDatum(datum_alpha),
- PointerGetDatum(datum_beta)));
+ inter_d = FunctionCall2(&rtstate->interFn,
+ datum_alpha, datum_beta);
/* The interFn may return a NULL pointer (not an SQL null!)
* to indicate no intersection. sizeFn must cope with this.
*/
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(inter_d),
+ FunctionCall2(&rtstate->sizeFn, inter_d,
PointerGetDatum(&size_inter));
size_waste = size_union - size_inter;
- if (union_d != (char *) NULL)
- pfree(union_d);
- if (inter_d != (char *) NULL)
- pfree(inter_d);
+ if (DatumGetPointer(union_d) != NULL)
+ pfree(DatumGetPointer(union_d));
+ if (DatumGetPointer(inter_d) != NULL)
+ pfree(DatumGetPointer(inter_d));
/*
* are these a more promising split that what we've already
* seen?
*/
-
if (size_waste > waste || firsttime)
{
waste = size_waste;
}
}
- left = v->spl_left;
- v->spl_nleft = 0;
- right = v->spl_right;
- v->spl_nright = 0;
+ if (firsttime)
+ {
+ /*
+ * There is no possible split except to put the new item on its
+ * own page. Since we still have to compute the union rectangles,
+ * we play dumb and run through the split algorithm anyway,
+ * setting seed_1 = first item on page and seed_2 = new item.
+ */
+ seed_1 = FirstOffsetNumber;
+ seed_2 = newitemoff;
+ }
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
- datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
- datum_l = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum_alpha),
- PointerGetDatum(datum_alpha)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(datum_l),
- PointerGetDatum(&size_l));
- item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
- datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
- datum_r = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum_beta),
- PointerGetDatum(datum_beta)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(datum_r),
- PointerGetDatum(&size_r));
+ datum_alpha = IndexTupleGetDatum(item_1);
+ datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha);
+ FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l));
+ left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1);
+
+ if (seed_2 == newitemoff)
+ {
+ item_2 = itup;
+ /* Needn't leave room for new item in calculations below */
+ newitemsz = 0;
+ }
+ else
+ item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
+ datum_beta = IndexTupleGetDatum(item_2);
+ datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta);
+ FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r));
+ right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2);
/*
* Now split up the regions between the two seeds. An important
* is handled at the very end, when we have placed all the existing
* tuples and i == maxoff + 1.
*/
+ left = v->spl_left;
+ v->spl_nleft = 0;
+ right = v->spl_right;
+ v->spl_nright = 0;
- maxoff = OffsetNumberNext(maxoff);
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i))
{
+ bool left_feasible,
+ right_feasible,
+ choose_left;
/*
* If we've already decided where to place this item, just put it
- * on the right list. Otherwise, we need to figure out which page
+ * on the correct list. Otherwise, we need to figure out which page
* needs the least enlargement in order to store the item.
*/
{
*left++ = i;
v->spl_nleft++;
+ /* left avail_space & union already includes this one */
continue;
}
- else if (i == seed_2)
+ if (i == seed_2)
{
*right++ = i;
v->spl_nright++;
+ /* right avail_space & union already includes this one */
continue;
}
- /* okay, which page needs least enlargement? */
- if (i == maxoff)
+ /* Compute new union datums and sizes for both possible additions */
+ if (i == newitemoff)
+ {
item_1 = itup;
+ /* Needn't leave room for new item anymore */
+ newitemsz = 0;
+ }
else
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ item_1_sz = IndexTupleTotalSize(item_1);
- datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
- union_dl = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum_l),
- PointerGetDatum(datum_alpha)));
- union_dr = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum_r),
- PointerGetDatum(datum_alpha)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(union_dl),
+ datum_alpha = IndexTupleGetDatum(item_1);
+ union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
+ union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
+ FunctionCall2(&rtstate->sizeFn, union_dl,
PointerGetDatum(&size_alpha));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(union_dr),
+ FunctionCall2(&rtstate->sizeFn, union_dr,
PointerGetDatum(&size_beta));
- /* pick which page to add it to */
- if (size_alpha - size_l < size_beta - size_r)
+ /*
+ * We prefer the page that shows smaller enlargement of its union area
+ * (Guttman's algorithm), but we must take care that at least one page
+ * will still have room for the new item after this one is added.
+ *
+ * (We know that all the old items together can fit on one page,
+ * so we need not worry about any other problem than failing to fit
+ * the new item.)
+ */
+ left_feasible = (left_avail_space >= item_1_sz &&
+ ((left_avail_space - item_1_sz) >= newitemsz ||
+ right_avail_space >= newitemsz));
+ right_feasible = (right_avail_space >= item_1_sz &&
+ ((right_avail_space - item_1_sz) >= newitemsz ||
+ left_avail_space >= newitemsz));
+ if (left_feasible && right_feasible)
+ {
+ /* Both feasible, use Guttman's algorithm */
+ choose_left = (size_alpha - size_l < size_beta - size_r);
+ }
+ else if (left_feasible)
+ choose_left = true;
+ else if (right_feasible)
+ choose_left = false;
+ else
{
- pfree(datum_l);
- pfree(union_dr);
+ elog(ERROR, "rtpicksplit: failed to find a workable page split");
+ choose_left = false; /* keep compiler quiet */
+ }
+
+ if (choose_left)
+ {
+ pfree(DatumGetPointer(datum_l));
+ pfree(DatumGetPointer(union_dr));
datum_l = union_dl;
size_l = size_alpha;
+ left_avail_space -= item_1_sz;
*left++ = i;
v->spl_nleft++;
}
else
{
- pfree(datum_r);
- pfree(union_dl);
+ pfree(DatumGetPointer(datum_r));
+ pfree(DatumGetPointer(union_dl));
datum_r = union_dr;
size_r = size_beta;
+ right_avail_space -= item_1_sz;
*right++ = i;
v->spl_nright++;
}
}
- *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
+
+ *left = *right = InvalidOffsetNumber; /* add ending sentinels */
v->spl_ldatum = datum_l;
v->spl_rdatum = datum_r;
{
OffsetNumber maxoff;
OffsetNumber i;
- char *ud,
- *id;
- char *datum;
+ Datum ud,
+ id;
+ Datum datum;
float usize,
dsize;
OffsetNumber which;
float which_grow;
- id = ((char *) it) + sizeof(IndexTupleData);
+ id = IndexTupleGetDatum(it);
maxoff = PageGetMaxOffsetNumber(p);
which_grow = -1.0;
which = -1;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- datum = (char *) PageGetItem(p, PageGetItemId(p, i));
- datum += sizeof(IndexTupleData);
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(datum),
+ datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i)));
+ FunctionCall2(&rtstate->sizeFn, datum,
PointerGetDatum(&dsize));
- ud = (char *)
- DatumGetPointer(FunctionCall2(&rtstate->unionFn,
- PointerGetDatum(datum),
- PointerGetDatum(id)));
- FunctionCall2(&rtstate->sizeFn,
- PointerGetDatum(ud),
+ ud = FunctionCall2(&rtstate->unionFn, datum, id);
+ FunctionCall2(&rtstate->sizeFn, ud,
PointerGetDatum(&usize));
- pfree(ud);
+ pfree(DatumGetPointer(ud));
if (which_grow < 0 || usize - dsize < which_grow)
{
which = i;
IndexTuple itup;
BlockNumber itblkno;
OffsetNumber itoffno;
- char *datum;
+ Datum datum;
char *itkey;
nblocks = RelationGetNumberOfBlocks(r);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
- datum = ((char *) itup);
- datum += sizeof(IndexTupleData);
+ datum = IndexTupleGetDatum(itup);
itkey = DatumGetCString(DirectFunctionCall1(box_out,
- PointerGetDatum(datum)));
+ datum));
printf("\t[%d] size %d heap <%d,%d> key:%s\n",
offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
pfree(itkey);