]> granicus.if.org Git - postgresql/commitdiff
Reduce size of critical section and remove call of user-defined functions in
authorTeodor Sigaev <teodor@sigaev.ru>
Wed, 10 May 2006 09:19:54 +0000 (09:19 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Wed, 10 May 2006 09:19:54 +0000 (09:19 +0000)
insertion and deletion, modify gistSplit() to do not use buffers.

 TODO: gistvacuumcleanup and XLOG

src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index 16468fd35a5bce6b442bd3349c6805a5a7857bbf..2272e3339d14d24e50742dab7cf3d8423b0e3d12 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,8 @@ static void gistfindleaf(GISTInsertState *state,
 #define ROTATEDIST(d) do { \
        SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
        memset(tmp,0,sizeof(SplitedPageLayout)); \
+       tmp->block.blkno = InvalidBlockNumber;  \
+       tmp->buffer = InvalidBuffer;    \
        tmp->next = (d); \
        (d)=tmp; \
 } while(0)
@@ -309,52 +311,111 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
        bool            is_splitted = false;
        bool            is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
+
        /*
-        * XXX this code really ought to work by locking, but not modifying,
-        * all the buffers it needs; then starting a critical section; then
-        * modifying the buffers in an already-determined way and writing an
-        * XLOG record to reflect that.  Since it doesn't, we've got to put
-        * a critical section around the entire process, which is horrible
-        * from a robustness point of view.
+        * if (!is_leaf) remove old key:
+        * This node's key has been modified, either because a child split
+        * occurred or because we needed to adjust our key for an insert in a
+        * child node. Therefore, remove the old version of this node's key.
+        *
+        * for WAL replay, in the non-split case we handle this by
+        * setting up a one-element todelete array; in the split case, it's
+        * handled implicitly because the tuple vector passed to gistSplit
+        * won't include this tuple.
         */
-       START_CRIT_SECTION();
-
-       if (!is_leaf)
-
-               /*
-                * This node's key has been modified, either because a child split
-                * occurred or because we needed to adjust our key for an insert in a
-                * child node. Therefore, remove the old version of this node's key.
-                *
-                * Note: for WAL replay, in the non-split case we handle this by
-                * setting up a one-element todelete array; in the split case, it's
-                * handled implicitly because the tuple vector passed to gistSplit
-                * won't include this tuple.
-                */
 
-               PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
 
-       if (gistnospace(state->stack->page, state->itup, state->ituplen))
+       if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum))
        {
                /* no space for insertion */
-               IndexTuple *itvec,
-                                  *newitup;
+               IndexTuple *itvec;
                int                     tlen;
                SplitedPageLayout *dist = NULL,
                                   *ptr;
+               BlockNumber     rrlink = InvalidBlockNumber;
+               GistNSN         oldnsn;
 
                is_splitted = true;
+
+               /*
+                * Form index tuples vector to split:
+                * remove old tuple if t's needed and add new tuples to vector
+                */
                itvec = gistextractbuffer(state->stack->buffer, &tlen);
+               if ( !is_leaf ) {
+                       /* on inner page we should remove old tuple */
+                       int pos = state->stack->childoffnum - FirstOffsetNumber;
+
+                       tlen--; 
+                       if ( pos != tlen ) 
+                               memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) );
+               }
                itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
-               newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+               dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
+
+               state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen);
+               state->ituplen = 0;
+
+               if (state->stack->blkno != GIST_ROOT_BLKNO) {
+                       /* if non-root split then we should not allocate new buffer,
+                          but we must create temporary page to operate */ 
+                       dist->buffer = state->stack->buffer;
+                       dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) );
+
+                       /*clean all flags except F_LEAF */ 
+                       GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+               }
+
+               /* make new pages and fills them */
+               for (ptr = dist; ptr; ptr = ptr->next) {
+                       int i;
+                       char *data;
+
+                       /* get new page */
+                       if ( ptr->buffer == InvalidBuffer ) {
+                               ptr->buffer = gistNewBuffer( state->r );
+                               GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 );
+                               ptr->page = BufferGetPage(ptr->buffer);
+                       }
+                       ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+                       /* fill page, we can do it becouse all this pages are new (ie not linked in tree
+                          or masked by temp page */
+                       data = (char*)(ptr->list); 
+                       for(i=0;i<ptr->block.num;i++) {
+                               if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+                                       elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
+                               data += IndexTupleSize((IndexTuple)data);
+                       }
+
+                       /* set up ItemPointer and remmeber it for parent */
+                       ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+                       state->itup[ state->ituplen ] = ptr->itup;
+                       state->ituplen++;
+               }
+
+               /* saves old rightlink */
+               if ( state->stack->blkno != GIST_ROOT_BLKNO )
+                       rrlink =  GistPageGetOpaque(dist->page)->rightlink;
+
+               START_CRIT_SECTION();
 
                /*
                 * must mark buffers dirty before XLogInsert, even though we'll
-                * still be changing their opaque fields below
+                * still be changing their opaque fields below.
+                * set up right links.
                 */
-               for (ptr = dist; ptr; ptr = ptr->next)
+               for (ptr = dist; ptr; ptr = ptr->next) 
                {
                        MarkBufferDirty(ptr->buffer);
+                       GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
+                               ptr->next->block.blkno : rrlink;
+               }
+
+               /* restore splitted non-root page */
+               if ( state->stack->blkno != GIST_ROOT_BLKNO ) {
+                       PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+                       dist->page = BufferGetPage( dist->buffer );
                }
 
                if (!state->r->rd_istemp)
@@ -366,88 +427,44 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                                                                   is_leaf, &(state->key), dist);
 
                        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+
                        for (ptr = dist; ptr; ptr = ptr->next)
                        {
-                               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
-                               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+                               PageSetLSN(ptr->page, recptr);
+                               PageSetTLI(ptr->page, ThisTimeLineID);
                        }
                }
                else
                {
                        for (ptr = dist; ptr; ptr = ptr->next)
                        {
-                               PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+                               PageSetLSN(ptr->page, XLogRecPtrForTemp);
                        }
                }
 
-               state->itup = newitup;
-               state->ituplen = tlen;  /* now tlen >= 2 */
-
-               if (state->stack->blkno == GIST_ROOT_BLKNO)
-               {
-                       gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
-                       state->needInsertComplete = false;
-                       for (ptr = dist; ptr; ptr = ptr->next)
-                       {
-                               Page            page = (Page) BufferGetPage(ptr->buffer);
+               /* set up NSN */
+               oldnsn = GistPageGetOpaque(dist->page)->nsn;
+               if ( state->stack->blkno == GIST_ROOT_BLKNO )
+                       /* if root split we should put initial value */
+                       oldnsn = PageGetLSN(dist->page);
 
-                               GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-                                       ptr->next->block.blkno : InvalidBlockNumber;
-                               GistPageGetOpaque(page)->nsn = PageGetLSN(page);
-                               UnlockReleaseBuffer(ptr->buffer);
-                       }
+               for (ptr = dist; ptr; ptr = ptr->next) {
+                       /* only for last set oldnsn */
+                       GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
+                               PageGetLSN(ptr->page) : oldnsn;
                }
-               else
-               {
-                       Page            page;
-                       BlockNumber rightrightlink = InvalidBlockNumber;
-                       SplitedPageLayout *ourpage = NULL;
-                       GistNSN         oldnsn;
-                       GISTPageOpaque opaque;
-
-                       /* move origpage to first in chain */
-                       if (dist->block.blkno != state->stack->blkno)
-                       {
-                               ptr = dist;
-                               while (ptr->next)
-                               {
-                                       if (ptr->next->block.blkno == state->stack->blkno)
-                                       {
-                                               ourpage = ptr->next;
-                                               ptr->next = ptr->next->next;
-                                               ourpage->next = dist;
-                                               dist = ourpage;
-                                               break;
-                                       }
-                                       ptr = ptr->next;
-                               }
-                               Assert(ourpage != NULL);
-                       }
-                       else
-                               ourpage = dist;
 
-                       /* now gets all needed data, and sets nsn's */
-                       page = (Page) BufferGetPage(ourpage->buffer);
-                       opaque = GistPageGetOpaque(page);
-                       rightrightlink = opaque->rightlink;
-                       oldnsn = opaque->nsn;
-                       opaque->nsn = PageGetLSN(page);
-                       opaque->rightlink = ourpage->next->block.blkno;
+               /* 
+                * release buffers, if it was a root split then
+                * release all buffers because we create all buffers 
+                */
+               ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next;
+               for(; ptr; ptr = ptr->next)
+                       UnlockReleaseBuffer(ptr->buffer);
 
-                       /*
-                        * fill and release all new pages. They isn't linked into tree yet
-                        */
-                       for (ptr = ourpage->next; ptr; ptr = ptr->next)
-                       {
-                               page = (Page) BufferGetPage(ptr->buffer);
-                               GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-                                       ptr->next->block.blkno : rightrightlink;
-                               /* only for last set oldnsn */
-                               GistPageGetOpaque(page)->nsn = (ptr->next) ?
-                                       opaque->nsn : oldnsn;
-
-                               UnlockReleaseBuffer(ptr->buffer);
-                       }
+               if (state->stack->blkno == GIST_ROOT_BLKNO) {
+                       gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
+                       state->needInsertComplete = false;
                }
 
                END_CRIT_SECTION();
@@ -455,13 +472,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
        else
        {
                /* enough space */
-               XLogRecPtr      oldlsn;
+               START_CRIT_SECTION();
 
+               if (!is_leaf)
+                       PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
                gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
 
                MarkBufferDirty(state->stack->buffer);
 
-               oldlsn = PageGetLSN(state->stack->page);
                if (!state->r->rd_istemp)
                {
                        OffsetNumber noffs = 0,
@@ -921,77 +939,55 @@ gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
                arr[i] = reasloffset[arr[i]];
 }
 
+static IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+       char *ptr, *ret = palloc(BLCKSZ);
+       int i;
+
+       ptr = ret;
+       for (i = 0; i < veclen; i++) {
+               memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+               ptr += IndexTupleSize(vec[i]);
+       }
+
+       *memlen = ptr - ret;
+       Assert( *memlen < BLCKSZ );
+       return (IndexTupleData*)ret;
+}
+
 /*
  *     gistSplit -- split a page in the tree.
  */
-IndexTuple *
+SplitedPageLayout *
 gistSplit(Relation r,
-                 Buffer buffer,
+                 Page page,
                  IndexTuple *itup,             /* contains compressed entry */
-                 int *len,
-                 SplitedPageLayout **dist,
+                 int len,
                  GISTSTATE *giststate)
 {
-       Page            p;
-       Buffer          leftbuf,
-                               rightbuf;
-       Page            left,
-                               right;
        IndexTuple *lvectup,
-                          *rvectup,
-                          *newtup;
-       BlockNumber lbknum,
-                               rbknum;
-       GISTPageOpaque opaque;
+                          *rvectup;
        GIST_SPLITVEC v;
        GistEntryVector *entryvec;
        int                     i,
-                               fakeoffset,
-                               nlen;
+                               fakeoffset;
        OffsetNumber *realoffset;
        IndexTuple *cleaneditup = itup;
-       int                     lencleaneditup = *len;
-
-       p = (Page) BufferGetPage(buffer);
-       opaque = GistPageGetOpaque(p);
-
-       /*
-        * The root of the tree is the first block in the relation.  If we're
-        * about to split the root, we need to do some hocus-pocus to enforce this
-        * guarantee.
-        */
-       if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
-       {
-               leftbuf = gistNewBuffer(r);
-               GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
-               lbknum = BufferGetBlockNumber(leftbuf);
-               left = (Page) BufferGetPage(leftbuf);
-       }
-       else
-       {
-               leftbuf = buffer;
-               /* IncrBufferRefCount(buffer); */
-               lbknum = BufferGetBlockNumber(buffer);
-               left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
-       }
-
-       rightbuf = gistNewBuffer(r);
-       GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
-       rbknum = BufferGetBlockNumber(rightbuf);
-       right = (Page) BufferGetPage(rightbuf);
+       int                     lencleaneditup = len;
+       SplitedPageLayout       *res = NULL;
 
        /* generate the item array */
-       realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
-       entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
-       entryvec->n = *len + 1;
+       realoffset = palloc((len + 1) * sizeof(OffsetNumber));
+       entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
+       entryvec->n = len + 1;
 
        fakeoffset = FirstOffsetNumber;
-       for (i = 1; i <= *len; i++)
+       for (i = 1; i <= len; i++)
        {
                Datum           datum;
                bool            IsNull;
 
-               if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
+               if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
                {
                        entryvec->n--;
                        /* remember position of invalid tuple */
@@ -1001,7 +997,7 @@ gistSplit(Relation r,
 
                datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
                gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
-                                          datum, r, p, i,
+                                          datum, r, page, i,
                                           ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
                                           FALSE, IsNull);
                realoffset[fakeoffset] = i;
@@ -1013,14 +1009,14 @@ gistSplit(Relation r,
         * possible, we move all invalid tuples on right page. We should remember,
         * that union with invalid tuples is a invalid tuple.
         */
-       if (entryvec->n != *len + 1)
+       if (entryvec->n != len + 1)
        {
                lencleaneditup = entryvec->n - 1;
                cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
                for (i = 1; i < entryvec->n; i++)
                        cleaneditup[i - 1] = itup[realoffset[i] - 1];
 
-               if (gistnospace(left, cleaneditup, lencleaneditup))
+               if (!gistfitpage(cleaneditup, lencleaneditup))
                {
                        /* no space on left to put all good tuples, so picksplit */
                        gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
@@ -1041,8 +1037,8 @@ gistSplit(Relation r,
                                v.spl_leftvalid = v.spl_rightvalid = false;
                                v.spl_nright = 0;
                                v.spl_nleft = 0;
-                               for (i = 1; i <= *len; i++)
-                                       if (i - 1 < *len / 2)
+                               for (i = 1; i <= len; i++)
+                                       if (i - 1 < len / 2)
                                                v.spl_left[v.spl_nleft++] = i;
                                        else
                                                v.spl_right[v.spl_nright++] = i;
@@ -1071,14 +1067,14 @@ gistSplit(Relation r,
        else
        {
                /* there is no invalid tuples, so usial processing */
-               gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+               gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
                v.spl_leftvalid = v.spl_rightvalid = true;
        }
 
 
        /* form left and right vector */
-       lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
-       rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
+       lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+       rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
 
        for (i = 0; i < v.spl_nleft; i++)
                lvectup[i] = itup[v.spl_left[i] - 1];
@@ -1087,87 +1083,48 @@ gistSplit(Relation r,
                rvectup[i] = itup[v.spl_right[i] - 1];
 
        /* place invalid tuples on right page if itsn't done yet */
-       for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
+       for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
        {
                rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
        }
 
-       /* write on disk (may need another split) */
-       if (gistnospace(right, rvectup, v.spl_nright))
+       /* finalyze splitting (may need another split) */
+       if (!gistfitpage(rvectup, v.spl_nright))
        {
-               nlen = v.spl_nright;
-               newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
-               /* ReleaseBuffer(rightbuf); */
+               res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
        }
        else
        {
-               char       *ptr;
-
-               gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
-               /* XLOG stuff */
-               ROTATEDIST(*dist);
-               (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
-               (*dist)->block.num = v.spl_nright;
-               (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-               ptr = (char *) ((*dist)->list);
-               for (i = 0; i < v.spl_nright; i++)
-               {
-                       memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
-                       ptr += IndexTupleSize(rvectup[i]);
-               }
-               (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-               (*dist)->buffer = rightbuf;
-
-               nlen = 1;
-               newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
-               newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
-                       : gist_form_invalid_tuple(rbknum);
-               ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
+               ROTATEDIST(res);
+               res->block.num = v.spl_nright;
+               res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
+               res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+                       : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
        }
 
-       if (gistnospace(left, lvectup, v.spl_nleft))
+       if (!gistfitpage(lvectup, v.spl_nleft))
        {
-               int                     llen = v.spl_nleft;
-               IndexTuple *lntup;
+               SplitedPageLayout *resptr, *subres;
 
-               lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
-               /* ReleaseBuffer(leftbuf); */
+               resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
 
-               newtup = gistjoinvector(newtup, &nlen, lntup, llen);
+               /* install on list's tail */ 
+               while( resptr->next )
+                       resptr = resptr->next;
+
+               resptr->next = res;
+               res = subres;
        }
        else
        {
-               char       *ptr;
-
-               gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
-               /* XLOG stuff */
-               ROTATEDIST(*dist);
-               (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
-               (*dist)->block.num = v.spl_nleft;
-               (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-               ptr = (char *) ((*dist)->list);
-               for (i = 0; i < v.spl_nleft; i++)
-               {
-                       memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
-                       ptr += IndexTupleSize(lvectup[i]);
-               }
-               (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-               (*dist)->buffer = leftbuf;
-
-               if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
-                       PageRestoreTempPage(left, p);
-
-               nlen += 1;
-               newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
-               newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
-                       : gist_form_invalid_tuple(lbknum);
-               ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
+               ROTATEDIST(res);
+               res->block.num = v.spl_nleft;
+               res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
+               res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+                       : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
        }
 
-       GistClearTuplesDeleted(p);
-
-       *len = nlen;
-       return newtup;
+       return res;
 }
 
 /*
index bf0a090ff8ba62fb047ba1e815999409a586c12a..d5d6405100b3e803a7022da690026944d3603ad5 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.10 2006/03/05 15:58:20 momjian Exp $
+ *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -81,15 +81,31 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup,
  * Check space for itup vector on page
  */
 bool
-gistnospace(Page page, IndexTuple *itvec, int len)
+gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete)
 {
-       unsigned int size = 0;
+       unsigned int size = 0, deleted = 0;
        int                     i;
 
        for (i = 0; i < len; i++)
                size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
 
-       return (PageGetFreeSpace(page) < size);
+       if ( todelete != InvalidOffsetNumber ) {
+               IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
+               deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+       }
+
+       return (PageGetFreeSpace(page) + deleted < size);
+}
+
+bool
+gistfitpage(IndexTuple *itvec, int len) {
+       int i;
+       Size size=0;
+
+       for(i=0;i<len;i++)
+               size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
+
+       return (size <= GiSTPageSize);
 }
 
 /*
@@ -107,7 +123,7 @@ gistextractbuffer(Buffer buffer, int *len /* out */ )
        *len = maxoff;
        itvec = palloc(sizeof(IndexTuple) * maxoff);
        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-               itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+               itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
        return itvec;
 }
index eafd472c5fd351b05f50bd15804ab0d4fe40f405..e81c0ebf487fb53ffe06e69fd86075152b895953 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.19 2006/05/02 22:25:10 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -85,10 +85,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        if (GistPageIsLeaf(page))
        {
                if (GistTuplesDeleted(page))
-               {
                        needunion = needwrite = true;
-                       GistClearTuplesDeleted(page);
-               }
        }
        else
        {
@@ -157,30 +154,54 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                if (curlenaddon)
                {
                        /* insert updated tuples */
-                       if (gistnospace(page, addon, curlenaddon))
+                       if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
                        {
                                /* there is no space on page to insert tuples */
                                IndexTuple *vec;
                                SplitedPageLayout *dist = NULL,
                                                   *ptr;
-                               int                     i;
+                               int                     i, veclen=0;
                                MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
 
-                               vec = gistextractbuffer(buffer, &(res.ituplen));
-                               vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
-                               res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
+                               vec = gistextractbuffer(buffer, &veclen);
+                               vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+                               dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
+
                                MemoryContextSwitchTo(oldCtx);
 
-                               vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
-                               for (i = 0; i < res.ituplen; i++)
-                               {
-                                       vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
-                                       memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
+                               if (blkno != GIST_ROOT_BLKNO) {
+                                       /* if non-root split then we should not allocate new buffer */
+                                       dist->buffer = buffer;
+                                       dist->page = BufferGetPage(dist->buffer);
+                                       GistPageGetOpaque(dist->page)->flags = 0;
                                }
-                               res.itup = vec;
 
-                               for (ptr = dist; ptr; ptr = ptr->next)
-                               {
+                               res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+                               res.ituplen = 0;
+
+                               /* make new pages and fills them */
+                               for (ptr = dist; ptr; ptr = ptr->next) {
+                                       char *data;
+
+                                       if ( ptr->buffer == InvalidBuffer ) {
+                                               ptr->buffer = gistNewBuffer( gv->index );
+                                               GISTInitBuffer( ptr->buffer, 0 );
+                                               ptr->page = BufferGetPage(ptr->buffer);
+                                       }
+                                       ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+                                       data = (char*)(ptr->list);
+                                       for(i=0;i<ptr->block.num;i++) {
+                                               if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+                                                       elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+                                               data += IndexTupleSize((IndexTuple)data);
+                                       }
+
+                                       ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+                                       res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+                                       memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+                                       res.ituplen++;
+
                                        MarkBufferDirty(ptr->buffer);
                                }
 
@@ -218,10 +239,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 
                                for (ptr = dist; ptr; ptr = ptr->next)
                                {
-                                       /* we must keep the buffer lock on the head page */
+                                       /* we must keep the buffer pin on the head page */
                                        if (BufferGetBlockNumber(ptr->buffer) != blkno)
-                                               LockBuffer(ptr->buffer, GIST_UNLOCK);
-                                       ReleaseBuffer(ptr->buffer);
+                                               UnlockReleaseBuffer( ptr->buffer );
                                }
 
                                if (blkno == GIST_ROOT_BLKNO)
@@ -294,6 +314,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        if (needwrite)
        {
                MarkBufferDirty(buffer);
+               GistClearTuplesDeleted(page);
 
                if (!gv->index->rd_istemp)
                {
@@ -570,14 +591,7 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
                        /*
                         * Remove deletable tuples from page
-                        *
-                        * XXX try to make this critical section shorter.  Could do it
-                        * by separating the callback loop from the actual tuple deletion,
-                        * but that would affect the definition of the todelete[] array
-                        * passed into the WAL record (because the indexes would all be
-                        * pre-deletion).
                         */
-                       START_CRIT_SECTION();
 
                        maxoff = PageGetMaxOffsetNumber(page);
 
@@ -588,13 +602,9 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
                                if (callback(&(idxtuple->t_tid), callback_state))
                                {
-                                       PageIndexTupleDelete(page, i);
-                                       todelete[ntodelete] = i;
-                                       i--;
-                                       maxoff--;
+                                       todelete[ntodelete] = i-ntodelete;
                                        ntodelete++;
                                        stats->std.tuples_removed += 1;
-                                       Assert(maxoff == PageGetMaxOffsetNumber(page));
                                }
                                else
                                        stats->std.num_index_tuples += 1;
@@ -602,10 +612,14 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
                        if (ntodelete)
                        {
-                               GistMarkTuplesDeleted(page);
+                               START_CRIT_SECTION();
 
                                MarkBufferDirty(buffer);
 
+                               for(i=0;i<ntodelete;i++)
+                                       PageIndexTupleDelete(page, todelete[i]);
+                               GistMarkTuplesDeleted(page);
+
                                if (!rel->rd_istemp)
                                {
                                        XLogRecData *rdata;
@@ -627,9 +641,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                                }
                                else
                                        PageSetLSN(page, XLogRecPtrForTemp);
+
+                               END_CRIT_SECTION();
                        }
 
-                       END_CRIT_SECTION();
                }
                else
                {
index c74762b7df18762ef978835bd2265570e7a4dcf6..a029d8f1ec5540c7b08f5c28a577cea72cfb77dc 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.15 2006/04/03 16:45:50 tgl Exp $
+ *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -625,7 +625,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                        }
                        }
 
-                       if (gistnospace(pages[numbuffer - 1], itup, lenitup))
+                       if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
                        {
                                /* no space left on page, so we must split */
                                buffers[numbuffer] = ReadBuffer(index, P_NEW);
index 1bfc90abbcedf69641e0d7bd00d4ce1dcd6ee736..7e9469f000b9e2b7b2898fea4ef0ee16c40e1bd1 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -138,6 +138,8 @@ typedef struct SplitedPageLayout
        gistxlogPage block;
        IndexTupleData *list;
        int                     lenlist;
+       IndexTuple      itup;  /* union key for page */
+       Page            page;                   /* to operate */
        Buffer          buffer;                 /* to write after all proceed */
 
        struct SplitedPageLayout *next;
@@ -234,8 +236,8 @@ extern void freeGISTstate(GISTSTATE *giststate);
 extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
 
-extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
-                 int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
+extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
+                 int len, GISTSTATE *giststate);
 
 extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
 
@@ -261,11 +263,16 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
 /* gistutil.c */
+
+#define GiSTPageSize   \
+    ( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) ) 
+
+extern bool gistfitpage(IndexTuple *itvec, int len);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete);
 extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
                           int len, OffsetNumber off);
-extern bool gistnospace(Page page, IndexTuple *itvec, int len);
 extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
                           IndexTuple *itvec, int *len,