]> granicus.if.org Git - postgresql/commitdiff
Repair a number of places that didn't bother to check whether PageAddItem
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 7 Mar 2001 21:20:26 +0000 (21:20 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 7 Mar 2001 21:20:26 +0000 (21:20 +0000)
succeeds or not.  Revise rtree page split algorithm to take care about
making a feasible split --- ie, will the incoming tuple actually fit?
Failure to make a feasible split, combined with failure to notice the
failure, account for Jim Stone's recent bug report.  I suspect that
hash and gist indices may have the same type of bug, but at least now
we'll get error messages rather than silent failures if so.  Also clean
up rtree code to use Datum rather than char* where appropriate.

src/backend/access/gist/gist.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashovfl.c
src/backend/access/hash/hashpage.c
src/backend/access/rtree/rtree.c
src/backend/commands/sequence.c

index c7bfa9c9626396ec991b759be083dde07f486414..9e3f935bd67985239881ba1db0d0b0bb4d9b89d2 100644 (file)
@@ -6,7 +6,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.70 2001/02/22 21:48:48 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.71 2001/03/07 21:20:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -395,6 +395,9 @@ gistPageAddItem(GISTSTATE *giststate,
        *newtup = gist_tuple_replacekey(r, tmpcentry, itup);
        retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
                                                offsetNumber, flags);
+       if (retval == InvalidOffsetNumber)
+               elog(ERROR, "gist: failed to add index item to %s",
+                        RelationGetRelationName(r));
        /* be tidy */
        if (tmpcentry.pred && tmpcentry.pred != dentry->pred
                && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
index e8e750739af4dc9618b9e211ce05732add80ed22..5439dce2148366166d9a56dec40bca4240ac2808 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.21 2001/01/24 19:42:47 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.22 2001/03/07 21:20:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -230,7 +230,10 @@ _hash_pgaddtup(Relation rel,
        _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
 
        itup_off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-       PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED);
+       if (PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED)
+               == InvalidOffsetNumber)
+               elog(ERROR, "_hash_pgaddtup: failed to add index item to %s",
+                        RelationGetRelationName(rel));
 
        /* write the buffer, but hold our lock */
        _hash_wrtnorelbuf(rel, buf);
index cd129e6cb796aeb0082817db925093d144d732d1..8e2ed1bb8afc0a5c9b2302c59ed359ce7a90647f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.28 2001/01/24 19:42:47 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.29 2001/03/07 21:20:26 tgl Exp $
  *
  * NOTES
  *       Overflow pages look like ordinary relation pages.
@@ -564,7 +564,10 @@ _hash_squeezebucket(Relation rel,
                 * page.
                 */
                woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
-               PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED);
+               if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
+                       == InvalidOffsetNumber)
+                       elog(ERROR, "_hash_squeezebucket: failed to add index item to %s",
+                                RelationGetRelationName(rel));
 
                /*
                 * delete the tuple from the "read" page. PageIndexTupleDelete
index 4fb2cf82b083d2fb979af49e046d6502290ec757..156db9078a83763d58eb310c9f1b8e7040c97119 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.29 2001/01/24 19:42:47 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.30 2001/03/07 21:20:26 tgl Exp $
  *
  * NOTES
  *       Postgres hash pages look like ordinary relation pages.  The opaque
@@ -619,7 +619,10 @@ _hash_splitpage(Relation rel,
                        }
 
                        noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
-                       PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED);
+                       if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED)
+                               == InvalidOffsetNumber)
+                               elog(ERROR, "_hash_splitpage: failed to add index item to %s",
+                                        RelationGetRelationName(rel));
                        _hash_wrtnorelbuf(rel, nbuf);
 
                        /*
index 9f3948657c9af06cf867c892edb8f968c1e7447e..45382d5ef3cd441dce716e91e109dd0d2a1f5a07 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.59 2001/01/29 00:39:15 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.60 2001/03/07 21:20:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/rtree.h"
+#include "access/xlogutils.h"
 #include "catalog/index.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
 
-#include "access/xlogutils.h"
 
+/*
+ * XXX We assume that all datatypes indexable in rtrees are pass-by-reference.
+ * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and
+ * do something with the various datum-pfreeing code.  However, it's not that
+ * unreasonable an assumption in practice.
+ */
+#define IndexTupleGetDatum(itup)  \
+       PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData))
+
+/*
+ * Space-allocation macros.  Note we count the item's line pointer in its size.
+ */
+#define RTPageAvailSpace  \
+       (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \
+        - MAXALIGN(sizeof(RTreePageOpaqueData)))
+#define IndexTupleTotalSize(itup)  \
+       (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData))
+#define IndexTupleAttSize(itup)  \
+       (IndexTupleSize(itup) - sizeof(IndexTupleData))
+
+/* results of rtpicksplit() */
 typedef struct SPLITVEC
 {
        OffsetNumber *spl_left;
        int                     spl_nleft;
-       char       *spl_ldatum;
+       Datum           spl_ldatum;
        OffsetNumber *spl_right;
        int                     spl_nright;
-       char       *spl_rdatum;
+       Datum           spl_rdatum;
 } SPLITVEC;
 
 typedef struct RTSTATE
@@ -44,14 +65,14 @@ typedef struct RTSTATE
 /* non-export function prototypes */
 static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
                   RTSTATE *rtstate);
-static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
+static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size,
                  RTSTATE *rtstate);
-static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
+static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack,
                IndexTuple itup, RTSTATE *rtstate);
 static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
                        IndexTuple rtup, RTSTATE *rtstate);
 static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
-static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
+static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
                  RTSTATE *rtstate);
 static void RTInitBuffer(Buffer b, uint32 f);
 static OffsetNumber choose(Relation r, Page p, IndexTuple it,
@@ -295,7 +316,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
        RTSTACK    *stack;
        InsertIndexResult res;
        RTreePageOpaque opaque;
-       char       *datum;
+       Datum           datum;
 
        blk = P_ROOT;
        buffer = InvalidBuffer;
@@ -332,7 +353,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
        if (nospace(page, itup))
        {
                /* need to do a split */
-               res = dosplit(r, buffer, stack, itup, rtstate);
+               res = rtdosplit(r, buffer, stack, itup, rtstate);
                freestack(stack);
                WriteBuffer(buffer);    /* don't forget to release buffer! */
                return res;
@@ -351,14 +372,16 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
                                                OffsetNumberNext(PageGetMaxOffsetNumber(page)),
                                                LP_USED);
        }
+       if (l == InvalidOffsetNumber)
+               elog(ERROR, "rtdoinsert: failed to add index item to %s",
+                        RelationGetRelationName(r));
 
        WriteBuffer(buffer);
 
-       datum = (((char *) itup) + sizeof(IndexTupleData));
+       datum = IndexTupleGetDatum(itup);
 
        /* now expand the page boundary in the parent to include the new child */
-       rttighten(r, stack, datum,
-                         (IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
+       rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate);
        freestack(stack);
 
        /* build and return an InsertIndexResult for this insertion */
@@ -371,12 +394,12 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
 static void
 rttighten(Relation r,
                  RTSTACK *stk,
-                 char *datum,
+                 Datum datum,
                  int att_size,
                  RTSTATE *rtstate)
 {
-       char       *oldud;
-       char       *tdatum;
+       Datum           oldud;
+       Datum           tdatum;
        Page            p;
        float           old_size,
                                newd_size;
@@ -388,20 +411,15 @@ rttighten(Relation r,
        b = ReadBuffer(r, stk->rts_blk);
        p = BufferGetPage(b);
 
-       oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
-       oldud += sizeof(IndexTupleData);
+       oldud = IndexTupleGetDatum(PageGetItem(p,
+                                                                                  PageGetItemId(p, stk->rts_child)));
 
-       FunctionCall2(&rtstate->sizeFn,
-                                 PointerGetDatum(oldud),
+       FunctionCall2(&rtstate->sizeFn, oldud,
                                  PointerGetDatum(&old_size));
 
-       datum = (char *)
-               DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                         PointerGetDatum(oldud),
-                                                                         PointerGetDatum(datum)));
+       datum = FunctionCall2(&rtstate->unionFn, oldud, datum);
 
-       FunctionCall2(&rtstate->sizeFn,
-                                 PointerGetDatum(datum),
+       FunctionCall2(&rtstate->sizeFn, datum,
                                  PointerGetDatum(&newd_size));
 
        if (newd_size != old_size)
@@ -415,45 +433,46 @@ rttighten(Relation r,
                         * This is an internal page, so 'oldud' had better be a union
                         * (constant-length) key, too.  (See comment below.)
                         */
-                       Assert(VARSIZE(datum) == VARSIZE(oldud));
-                       memmove(oldud, datum, VARSIZE(datum));
+                       Assert(VARSIZE(DatumGetPointer(datum)) ==
+                                  VARSIZE(DatumGetPointer(oldud)));
+                       memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
+                                       VARSIZE(DatumGetPointer(datum)));
                }
                else
-                       memmove(oldud, datum, att_size);
+               {
+                       memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
+                                       att_size);
+               }
                WriteBuffer(b);
 
                /*
                 * The user may be defining an index on variable-sized data (like
                 * polygons).  If so, we need to get a constant-sized datum for
                 * insertion on the internal page.      We do this by calling the
-                * union proc, which is guaranteed to return a rectangle.
+                * union proc, which is required to return a rectangle.
                 */
+               tdatum = FunctionCall2(&rtstate->unionFn, datum, datum);
 
-               tdatum = (char *)
-                       DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                 PointerGetDatum(datum),
-                                                                                 PointerGetDatum(datum)));
                rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
-               pfree(tdatum);
+               pfree(DatumGetPointer(tdatum));
        }
        else
                ReleaseBuffer(b);
-       pfree(datum);
+       pfree(DatumGetPointer(datum));
 }
 
 /*
- *     dosplit -- split a page in the tree.
+ *     rtdosplit -- split a page in the tree.
  *
- *       This is the quadratic-cost split algorithm Guttman describes in
- *       his paper.  The reason we chose it is that you can implement this
- *       with less information about the data types on which you're operating.
+ *       rtpicksplit does the interesting work of choosing the split.
+ *       This routine just does the bit-pushing.
  */
 static InsertIndexResult
-dosplit(Relation r,
-               Buffer buffer,
-               RTSTACK *stack,
-               IndexTuple itup,
-               RTSTATE *rtstate)
+rtdosplit(Relation r,
+                 Buffer buffer,
+                 RTSTACK *stack,
+                 IndexTuple itup,
+                 RTSTATE *rtstate)
 {
        Page            p;
        Buffer          leftbuf,
@@ -476,14 +495,15 @@ dosplit(Relation r,
        InsertIndexResult res;
        char       *isnull;
        SPLITVEC        v;
+       OffsetNumber *spl_left,
+                          *spl_right;
        TupleDesc       tupDesc;
 
-       isnull = (char *) palloc(r->rd_rel->relnatts);
-       for (blank = 0; blank < r->rd_rel->relnatts; blank++)
-               isnull[blank] = ' ';
        p = (Page) BufferGetPage(buffer);
        opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
 
+       rtpicksplit(r, p, &v, itup, rtstate);
+
        /*
         * The root of the tree is the first block in the relation.  If we're
         * about to split the root, we need to do some hocus-pocus to enforce
@@ -510,8 +530,8 @@ dosplit(Relation r,
        rbknum = BufferGetBlockNumber(rightbuf);
        right = (Page) BufferGetPage(rightbuf);
 
-       picksplit(r, p, &v, itup, rtstate);
-
+       spl_left = v.spl_left;
+       spl_right = v.spl_right;
        leftoff = rightoff = FirstOffsetNumber;
        maxoff = PageGetMaxOffsetNumber(p);
        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
@@ -519,19 +539,24 @@ dosplit(Relation r,
                itemid = PageGetItemId(p, i);
                item = (IndexTuple) PageGetItem(p, itemid);
 
-               if (i == *(v.spl_left))
+               if (i == *spl_left)
                {
-                       PageAddItem(left, (Item) item, IndexTupleSize(item),
-                                               leftoff, LP_USED);
+                       if (PageAddItem(left, (Item) item, IndexTupleSize(item),
+                                                       leftoff, LP_USED) == InvalidOffsetNumber)
+                               elog(ERROR, "rtdosplit: failed to copy index item in %s",
+                                        RelationGetRelationName(r));
                        leftoff = OffsetNumberNext(leftoff);
-                       v.spl_left++;           /* advance in left split vector */
+                       spl_left++;                     /* advance in left split vector */
                }
                else
                {
-                       PageAddItem(right, (Item) item, IndexTupleSize(item),
-                                               rightoff, LP_USED);
+                       Assert(i == *spl_right);
+                       if (PageAddItem(right, (Item) item, IndexTupleSize(item),
+                                                       rightoff, LP_USED) == InvalidOffsetNumber)
+                               elog(ERROR, "rtdosplit: failed to copy index item in %s",
+                                        RelationGetRelationName(r));
                        rightoff = OffsetNumberNext(rightoff);
-                       v.spl_right++;          /* advance in right split vector */
+                       spl_right++;            /* advance in right split vector */
                }
        }
 
@@ -539,21 +564,34 @@ dosplit(Relation r,
        res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
 
        /* now insert the new index tuple */
-       if (*(v.spl_left) != FirstOffsetNumber)
+       if (*spl_left == maxoff+1)
        {
-               PageAddItem(left, (Item) itup, IndexTupleSize(itup),
-                                       leftoff, LP_USED);
+               if (PageAddItem(left, (Item) itup, IndexTupleSize(itup),
+                                               leftoff, LP_USED) == InvalidOffsetNumber)
+                       elog(ERROR, "rtdosplit: failed to add index item to %s",
+                                RelationGetRelationName(r));
                leftoff = OffsetNumberNext(leftoff);
                ItemPointerSet(&(res->pointerData), lbknum, leftoff);
+               spl_left++;
        }
        else
        {
-               PageAddItem(right, (Item) itup, IndexTupleSize(itup),
-                                       rightoff, LP_USED);
+               Assert(*spl_right == maxoff+1);
+               if (PageAddItem(right, (Item) itup, IndexTupleSize(itup),
+                                               rightoff, LP_USED) == InvalidOffsetNumber)
+                       elog(ERROR, "rtdosplit: failed to add index item to %s",
+                                RelationGetRelationName(r));
                rightoff = OffsetNumberNext(rightoff);
                ItemPointerSet(&(res->pointerData), rbknum, rightoff);
+               spl_right++;
        }
 
+       /* Make sure we consumed all of the split vectors, and release 'em */
+       Assert(*spl_left == InvalidOffsetNumber);
+       Assert(*spl_right == InvalidOffsetNumber);
+       pfree(v.spl_left);
+       pfree(v.spl_right);
+
        if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
                PageRestoreTempPage(left, p);
        WriteBuffer(leftbuf);
@@ -579,10 +617,14 @@ dosplit(Relation r,
        rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
 
        tupDesc = r->rd_att;
+       isnull = (char *) palloc(r->rd_rel->relnatts);
+       for (blank = 0; blank < r->rd_rel->relnatts; blank++)
+               isnull[blank] = ' ';
+
        ltup = (IndexTuple) index_formtuple(tupDesc,
-                                                                         (Datum *) &(v.spl_ldatum), isnull);
+                                                                               &(v.spl_ldatum), isnull);
        rtup = (IndexTuple) index_formtuple(tupDesc,
-                                                                         (Datum *) &(v.spl_rdatum), isnull);
+                                                                               &(v.spl_rdatum), isnull);
        pfree(isnull);
 
        /* set pointers to new child pages in the internal index tuples */
@@ -607,9 +649,9 @@ rtintinsert(Relation r,
        IndexTuple      old;
        Buffer          b;
        Page            p;
-       char       *ldatum,
-                          *rdatum,
-                          *newdatum;
+       Datum           ldatum,
+                               rdatum,
+                               newdatum;
        InsertIndexResult res;
 
        if (stk == (RTSTACK *) NULL)
@@ -623,7 +665,7 @@ rtintinsert(Relation r,
        old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
 
        /*
-        * This is a hack.      Right now, we force rtree keys to be constant
+        * This is a hack.      Right now, we force rtree internal keys to be constant
         * size. To fix this, need delete the old key and add both left and
         * right for the two new pages.  The insertion of left may force a
         * split if the new left key is bigger than the old key.
@@ -637,30 +679,30 @@ rtintinsert(Relation r,
 
        if (nospace(p, rtup))
        {
-               newdatum = (((char *) ltup) + sizeof(IndexTupleData));
+               newdatum = IndexTupleGetDatum(ltup);
                rttighten(r, stk->rts_parent, newdatum,
-                          (IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
-               res = dosplit(r, b, stk->rts_parent, rtup, rtstate);
+                                 IndexTupleAttSize(ltup), rtstate);
+               res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate);
                WriteBuffer(b);                 /* don't forget to release buffer!  -
                                                                 * 01/31/94 */
                pfree(res);
        }
        else
        {
-               PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
-                                       PageGetMaxOffsetNumber(p), LP_USED);
+               if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
+                                               PageGetMaxOffsetNumber(p),
+                                               LP_USED) == InvalidOffsetNumber)
+                       elog(ERROR, "rtintinsert: failed to add index item to %s",
+                                RelationGetRelationName(r));
                WriteBuffer(b);
-               ldatum = (((char *) ltup) + sizeof(IndexTupleData));
-               rdatum = (((char *) rtup) + sizeof(IndexTupleData));
-               newdatum = (char *)
-                       DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                 PointerGetDatum(ldatum),
-                                                                                 PointerGetDatum(rdatum)));
+               ldatum = IndexTupleGetDatum(ltup);
+               rdatum = IndexTupleGetDatum(rtup);
+               newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum);
 
                rttighten(r, stk->rts_parent, newdatum,
-                          (IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);
+                                 IndexTupleAttSize(rtup), rtstate);
 
-               pfree(newdatum);
+               pfree(DatumGetPointer(newdatum));
        }
 }
 
@@ -673,33 +715,64 @@ rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
        b = ReadBuffer(r, P_ROOT);
        RTInitBuffer(b, 0);
        p = BufferGetPage(b);
-       PageAddItem(p, (Item) lt, IndexTupleSize(lt),
-                               FirstOffsetNumber, LP_USED);
-       PageAddItem(p, (Item) rt, IndexTupleSize(rt),
-                               OffsetNumberNext(FirstOffsetNumber), LP_USED);
+       if (PageAddItem(p, (Item) lt, IndexTupleSize(lt),
+                                       FirstOffsetNumber,
+                                       LP_USED) == InvalidOffsetNumber)
+               elog(ERROR, "rtnewroot: failed to add index item to %s",
+                        RelationGetRelationName(r));
+       if (PageAddItem(p, (Item) rt, IndexTupleSize(rt),
+                                       OffsetNumberNext(FirstOffsetNumber),
+                                       LP_USED) == InvalidOffsetNumber)
+               elog(ERROR, "rtnewroot: failed to add index item to %s",
+                        RelationGetRelationName(r));
        WriteBuffer(b);
 }
 
+/*
+ * Choose how to split an rtree page into two pages.
+ *
+ * We return two vectors of index item numbers, one for the items to be
+ * put on the left page, one for the items to be put on the right page.
+ * In addition, the item to be added (itup) is listed in the appropriate
+ * vector.  It is represented by item number N+1 (N = # of items on page).
+ *
+ * Both vectors appear in sequence order with a terminating sentinel value
+ * of InvalidOffsetNumber.
+ *
+ * The bounding-box datums for the two new pages are also returned in *v.
+ *
+ * This is the quadratic-cost split algorithm Guttman describes in
+ * his paper.  The reason we chose it is that you can implement this
+ * with less information about the data types on which you're operating.
+ *
+ * We must also deal with a consideration not found in Guttman's algorithm:
+ * variable-length data.  In particular, the incoming item might be
+ * large enough that not just any split will work.  In the worst case,
+ * our "split" may have to be the new item on one page and all the existing
+ * items on the other.  Short of that, we have to take care that we do not
+ * make a split that leaves both pages too full for the new item.
+ */
 static void
-picksplit(Relation r,
-                 Page page,
-                 SPLITVEC *v,
-                 IndexTuple itup,
-                 RTSTATE *rtstate)
+rtpicksplit(Relation r,
+                       Page page,
+                       SPLITVEC *v,
+                       IndexTuple itup,
+                       RTSTATE *rtstate)
 {
-       OffsetNumber maxoff;
+       OffsetNumber maxoff,
+                               newitemoff;
        OffsetNumber i,
                                j;
        IndexTuple      item_1,
                                item_2;
-       char       *datum_alpha,
-                          *datum_beta;
-       char       *datum_l,
-                          *datum_r;
-       char       *union_d,
-                          *union_dl,
-                          *union_dr;
-       char       *inter_d;
+       Datum           datum_alpha,
+                               datum_beta;
+       Datum           datum_l,
+                               datum_r;
+       Datum           union_d,
+                               union_dl,
+                               union_dr;
+       Datum           inter_d;
        bool            firsttime;
        float           size_alpha,
                                size_beta,
@@ -714,9 +787,26 @@ picksplit(Relation r,
                                seed_2 = 0;
        OffsetNumber *left,
                           *right;
+       Size            newitemsz,
+                               item_1_sz,
+                               item_2_sz,
+                               left_avail_space,
+                               right_avail_space;
+
+       /*
+        * First, make sure the new item is not so large that we can't possibly
+        * fit it on a page, even by itself.  (It's sufficient to make this test
+        * here, since any oversize tuple must lead to a page split attempt.)
+        */
+       newitemsz = IndexTupleTotalSize(itup);
+       if (newitemsz > RTPageAvailSpace)
+               elog(ERROR, "rtree: index item size %lu exceeds maximum %lu",
+                        (unsigned long) newitemsz, (unsigned long) RTPageAvailSpace);
 
        maxoff = PageGetMaxOffsetNumber(page);
+       newitemoff = OffsetNumberNext(maxoff); /* phony index for new item */
 
+       /* Make arrays big enough for worst case, including sentinel */
        nbytes = (maxoff + 2) * sizeof(OffsetNumber);
        v->spl_left = (OffsetNumber *) palloc(nbytes);
        v->spl_right = (OffsetNumber *) palloc(nbytes);
@@ -727,42 +817,46 @@ picksplit(Relation r,
        for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
        {
                item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-               datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
+               datum_alpha = IndexTupleGetDatum(item_1);
+               item_1_sz = IndexTupleTotalSize(item_1);
+
                for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
                {
                        item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
-                       datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
+                       datum_beta = IndexTupleGetDatum(item_2);
+                       item_2_sz = IndexTupleTotalSize(item_2);
+
+                       /*
+                        * Ignore seed pairs that don't leave room for the new item
+                        * on either split page.
+                        */
+                       if (newitemsz + item_1_sz > RTPageAvailSpace &&
+                               newitemsz + item_2_sz > RTPageAvailSpace)
+                               continue;
 
                        /* compute the wasted space by unioning these guys */
-                       union_d = (char *)
-                               DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                         PointerGetDatum(datum_alpha),
-                                                                                         PointerGetDatum(datum_beta)));
-                       FunctionCall2(&rtstate->sizeFn,
-                                                 PointerGetDatum(union_d),
+                       union_d = FunctionCall2(&rtstate->unionFn,
+                                                                       datum_alpha, datum_beta);
+                       FunctionCall2(&rtstate->sizeFn, union_d,
                                                  PointerGetDatum(&size_union));
-                       inter_d = (char *)
-                               DatumGetPointer(FunctionCall2(&rtstate->interFn,
-                                                                                         PointerGetDatum(datum_alpha),
-                                                                                         PointerGetDatum(datum_beta)));
+                       inter_d = FunctionCall2(&rtstate->interFn,
+                                                                       datum_alpha, datum_beta);
                        /* The interFn may return a NULL pointer (not an SQL null!)
                         * to indicate no intersection.  sizeFn must cope with this.
                         */
-                       FunctionCall2(&rtstate->sizeFn,
-                                                 PointerGetDatum(inter_d),
+                       FunctionCall2(&rtstate->sizeFn, inter_d,
                                                  PointerGetDatum(&size_inter));
                        size_waste = size_union - size_inter;
 
-                       if (union_d != (char *) NULL)
-                               pfree(union_d);
-                       if (inter_d != (char *) NULL)
-                               pfree(inter_d);
+                       if (DatumGetPointer(union_d) != NULL)
+                               pfree(DatumGetPointer(union_d));
+                       if (DatumGetPointer(inter_d) != NULL)
+                               pfree(DatumGetPointer(inter_d));
 
                        /*
                         * are these a more promising split that what we've already
                         * seen?
                         */
-
                        if (size_waste > waste || firsttime)
                        {
                                waste = size_waste;
@@ -773,29 +867,36 @@ picksplit(Relation r,
                }
        }
 
-       left = v->spl_left;
-       v->spl_nleft = 0;
-       right = v->spl_right;
-       v->spl_nright = 0;
+       if (firsttime)
+       {
+               /*
+                * There is no possible split except to put the new item on its
+                * own page.  Since we still have to compute the union rectangles,
+                * we play dumb and run through the split algorithm anyway,
+                * setting seed_1 = first item on page and seed_2 = new item.
+                */
+               seed_1 = FirstOffsetNumber;
+               seed_2 = newitemoff;
+       }
 
        item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
-       datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
-       datum_l = (char *)
-               DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                         PointerGetDatum(datum_alpha),
-                                                                         PointerGetDatum(datum_alpha)));
-       FunctionCall2(&rtstate->sizeFn,
-                                 PointerGetDatum(datum_l),
-                                 PointerGetDatum(&size_l));
-       item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
-       datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
-       datum_r = (char *)
-               DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                         PointerGetDatum(datum_beta),
-                                                                         PointerGetDatum(datum_beta)));
-       FunctionCall2(&rtstate->sizeFn,
-                                 PointerGetDatum(datum_r),
-                                 PointerGetDatum(&size_r));
+       datum_alpha = IndexTupleGetDatum(item_1);
+       datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha);
+       FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l));
+       left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1);
+
+       if (seed_2 == newitemoff)
+       {
+               item_2 = itup;
+               /* Needn't leave room for new item in calculations below */
+               newitemsz = 0;
+       }
+       else
+               item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
+       datum_beta = IndexTupleGetDatum(item_2);
+       datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta);
+       FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r));
+       right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2);
 
        /*
         * Now split up the regions between the two seeds.      An important
@@ -808,14 +909,20 @@ picksplit(Relation r,
         * is handled at the very end, when we have placed all the existing
         * tuples and i == maxoff + 1.
         */
+       left = v->spl_left;
+       v->spl_nleft = 0;
+       right = v->spl_right;
+       v->spl_nright = 0;
 
-       maxoff = OffsetNumberNext(maxoff);
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i))
        {
+               bool    left_feasible,
+                               right_feasible,
+                               choose_left;
 
                /*
                 * If we've already decided where to place this item, just put it
-                * on the right list.  Otherwise, we need to figure out which page
+                * on the correct list.  Otherwise, we need to figure out which page
                 * needs the least enlargement in order to store the item.
                 */
 
@@ -823,58 +930,89 @@ picksplit(Relation r,
                {
                        *left++ = i;
                        v->spl_nleft++;
+                       /* left avail_space & union already includes this one */
                        continue;
                }
-               else if (i == seed_2)
+               if (i == seed_2)
                {
                        *right++ = i;
                        v->spl_nright++;
+                       /* right avail_space & union already includes this one */
                        continue;
                }
 
-               /* okay, which page needs least enlargement? */
-               if (i == maxoff)
+               /* Compute new union datums and sizes for both possible additions */
+               if (i == newitemoff)
+               {
                        item_1 = itup;
+                       /* Needn't leave room for new item anymore */
+                       newitemsz = 0;
+               }
                else
                        item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+               item_1_sz = IndexTupleTotalSize(item_1);
 
-               datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
-               union_dl = (char *)
-                       DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                 PointerGetDatum(datum_l),
-                                                                                 PointerGetDatum(datum_alpha)));
-               union_dr = (char *)
-                       DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                 PointerGetDatum(datum_r),
-                                                                                 PointerGetDatum(datum_alpha)));
-               FunctionCall2(&rtstate->sizeFn,
-                                         PointerGetDatum(union_dl),
+               datum_alpha = IndexTupleGetDatum(item_1);
+               union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
+               union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
+               FunctionCall2(&rtstate->sizeFn, union_dl,
                                          PointerGetDatum(&size_alpha));
-               FunctionCall2(&rtstate->sizeFn,
-                                         PointerGetDatum(union_dr),
+               FunctionCall2(&rtstate->sizeFn, union_dr,
                                          PointerGetDatum(&size_beta));
 
-               /* pick which page to add it to */
-               if (size_alpha - size_l < size_beta - size_r)
+               /*
+                * We prefer the page that shows smaller enlargement of its union area
+                * (Guttman's algorithm), but we must take care that at least one page
+                * will still have room for the new item after this one is added.
+                *
+                * (We know that all the old items together can fit on one page,
+                * so we need not worry about any other problem than failing to fit
+                * the new item.)
+                */
+               left_feasible = (left_avail_space >= item_1_sz &&
+                                                ((left_avail_space - item_1_sz) >= newitemsz ||
+                                                 right_avail_space >= newitemsz));
+               right_feasible = (right_avail_space >= item_1_sz &&
+                                                 ((right_avail_space - item_1_sz) >= newitemsz ||
+                                                  left_avail_space >= newitemsz));
+               if (left_feasible && right_feasible)
+               {
+                       /* Both feasible, use Guttman's algorithm */
+                       choose_left = (size_alpha - size_l < size_beta - size_r);
+               }
+               else if (left_feasible)
+                       choose_left = true;
+               else if (right_feasible)
+                       choose_left = false;
+               else
                {
-                       pfree(datum_l);
-                       pfree(union_dr);
+                       elog(ERROR, "rtpicksplit: failed to find a workable page split");
+                       choose_left = false; /* keep compiler quiet */
+               }
+
+               if (choose_left)
+               {
+                       pfree(DatumGetPointer(datum_l));
+                       pfree(DatumGetPointer(union_dr));
                        datum_l = union_dl;
                        size_l = size_alpha;
+                       left_avail_space -= item_1_sz;
                        *left++ = i;
                        v->spl_nleft++;
                }
                else
                {
-                       pfree(datum_r);
-                       pfree(union_dl);
+                       pfree(DatumGetPointer(datum_r));
+                       pfree(DatumGetPointer(union_dl));
                        datum_r = union_dr;
                        size_r = size_beta;
+                       right_avail_space -= item_1_sz;
                        *right++ = i;
                        v->spl_nright++;
                }
        }
-       *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
+
+       *left = *right = InvalidOffsetNumber; /* add ending sentinels */
 
        v->spl_ldatum = datum_l;
        v->spl_rdatum = datum_r;
@@ -902,34 +1040,28 @@ choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
 {
        OffsetNumber maxoff;
        OffsetNumber i;
-       char       *ud,
-                          *id;
-       char       *datum;
+       Datum           ud,
+                               id;
+       Datum           datum;
        float           usize,
                                dsize;
        OffsetNumber which;
        float           which_grow;
 
-       id = ((char *) it) + sizeof(IndexTupleData);
+       id = IndexTupleGetDatum(it);
        maxoff = PageGetMaxOffsetNumber(p);
        which_grow = -1.0;
        which = -1;
 
        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
        {
-               datum = (char *) PageGetItem(p, PageGetItemId(p, i));
-               datum += sizeof(IndexTupleData);
-               FunctionCall2(&rtstate->sizeFn,
-                                         PointerGetDatum(datum),
+               datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i)));
+               FunctionCall2(&rtstate->sizeFn, datum,
                                          PointerGetDatum(&dsize));
-               ud = (char *)
-                       DatumGetPointer(FunctionCall2(&rtstate->unionFn,
-                                                                                 PointerGetDatum(datum),
-                                                                                 PointerGetDatum(id)));
-               FunctionCall2(&rtstate->sizeFn,
-                                         PointerGetDatum(ud),
+               ud = FunctionCall2(&rtstate->unionFn, datum, id);
+               FunctionCall2(&rtstate->sizeFn, ud,
                                          PointerGetDatum(&usize));
-               pfree(ud);
+               pfree(DatumGetPointer(ud));
                if (which_grow < 0 || usize - dsize < which_grow)
                {
                        which = i;
@@ -1026,7 +1158,7 @@ _rtdump(Relation r)
        IndexTuple      itup;
        BlockNumber itblkno;
        OffsetNumber itoffno;
-       char       *datum;
+       Datum           datum;
        char       *itkey;
 
        nblocks = RelationGetNumberOfBlocks(r);
@@ -1052,10 +1184,9 @@ _rtdump(Relation r)
                        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
                        itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
                        itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
-                       datum = ((char *) itup);
-                       datum += sizeof(IndexTupleData);
+                       datum = IndexTupleGetDatum(itup);
                        itkey = DatumGetCString(DirectFunctionCall1(box_out,
-                                                                                               PointerGetDatum(datum)));
+                                                                                                               datum));
                        printf("\t[%d] size %d heap <%d,%d> key:%s\n",
                                   offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
                        pfree(itkey);
index 5ad9a87f5916404502d6d3ebff7c3383202b3e5f..04398423b67349484e407d38841cc2a9bf7b10f7 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.50 2001/02/13 01:57:12 pjw Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.51 2001/03/07 21:20:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -785,7 +785,8 @@ void seq_redo(XLogRecPtr lsn, XLogRecord *record)
        itemsz = record->xl_len - sizeof(xl_seq_rec);
        itemsz = MAXALIGN(itemsz);
        if (PageAddItem(page, (Item)item, itemsz, 
-                       FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
+                                       FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
+               elog(STOP, "seq_redo: failed to add item to page");
 
        PageSetLSN(page, lsn);
        PageSetSUI(page, ThisStartUpID);