]> granicus.if.org Git - postgresql/commitdiff
1. fully functional WAL for GiST
author: Teodor Sigaev <teodor@sigaev.ru>
Mon, 20 Jun 2005 10:29:37 +0000 (10:29 +0000)
committer: Teodor Sigaev <teodor@sigaev.ru>
Mon, 20 Jun 2005 10:29:37 +0000 (10:29 +0000)
2. improve vacuum for gist
   - use FSM
   - full vacuum:
      - re-forms the parent tuple if needed
        ( tuples were deleted on the child page, or the parent tuple
          remained invalid after crash recovery )
      - truncate index file if possible
3. fixes bugs and mistakes

src/backend/access/gist/Makefile
src/backend/access/gist/gist.c
src/backend/access/gist/gistget.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistvacuum.c [new file with mode: 0644]
src/backend/access/gist/gistxlog.c
src/include/access/gist.h
src/include/access/gist_private.h
src/include/catalog/catversion.h
src/include/catalog/pg_am.h
src/include/catalog/pg_proc.h

index b22f846a23ddc841bf10cb2fbdb1fa2356631fd9..12f770ddb880693f42fa21453076afae6efd1cee 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for access/gist
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $
+#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.14 2005/06/20 10:29:36 teodor Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/access/gist
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o
+OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o
 
 all: SUBSYS.o
 
index 4e3faccdf92566c4cdd2df8319bac9dde55f23a3..340f6b9b4f1dc17df2ad0982ffc12da65c713ff6 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.120 2005/06/20 10:29:36 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -29,7 +29,7 @@ typedef struct
        GISTSTATE       giststate;
        int                     numindexattrs;
        double          indtuples;
-       MemoryContext tmpCxt;
+       MemoryContext tmpCtx;
 } GISTBuildState;
 
 
@@ -47,37 +47,14 @@ static void gistfindleaf(GISTInsertState *state,
                                GISTSTATE *giststate);
 
 
-typedef struct PageLayout {
-       gistxlogPage    block;
-       OffsetNumber    *list;
-       Buffer          buffer; /* to write after all proceed */
-
-       struct PageLayout *next;
-} PageLayout;
-
-
 #define ROTATEDIST(d) do { \
-       PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
-       memset(tmp,0,sizeof(PageLayout)); \
+       SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
+       memset(tmp,0,sizeof(SplitedPageLayout)); \
        tmp->next = (d); \
        (d)=tmp; \
 } while(0)
        
 
-static IndexTuple *gistSplit(Relation r,
-                 Buffer buffer,
-                 IndexTuple *itup,
-                 int *len,
-                 PageLayout    **dist,
-                 GISTSTATE *giststate);
-
-
-#undef GISTDEBUG
-
-#ifdef GISTDEBUG
-static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff);
-#endif
-
 /*
  * Create and return a temporary memory context for use by GiST. We
  * _always_ invoke user-provided methods in a temporary memory
@@ -124,7 +101,7 @@ gistbuild(PG_FUNCTION_ARGS)
        initGISTstate(&buildstate.giststate, index);
 
        /* initialize the root page */
-       buffer = ReadBuffer(index, P_NEW);
+       buffer = gistReadBuffer(index, P_NEW);
        GISTInitBuffer(buffer, F_LEAF);
        if ( !index->rd_istemp ) {
                XLogRecPtr              recptr;
@@ -155,23 +132,20 @@ gistbuild(PG_FUNCTION_ARGS)
         * create a temporary memory context that is reset once for each
         * tuple inserted into the index
         */
-       buildstate.tmpCxt = createTempGistContext();
+       buildstate.tmpCtx = createTempGistContext();
 
        /* do the heap scan */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo,
                                                                   gistbuildCallback, (void *) &buildstate);
 
        /* okay, all heap tuples are indexed */
-       MemoryContextDelete(buildstate.tmpCxt);
+       MemoryContextDelete(buildstate.tmpCtx);
 
        /* since we just counted the # of tuples, may as well update stats */
        IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);
 
        freeGISTstate(&buildstate.giststate);
 
-#ifdef GISTDEBUG
-       gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0);
-#endif
        PG_RETURN_VOID();
 }
 
@@ -190,13 +164,13 @@ gistbuildCallback(Relation index,
        IndexTuple      itup;
        GISTENTRY       tmpcentry;
        int                     i;
-       MemoryContext oldCxt;
+       MemoryContext oldCtx;
 
        /* GiST cannot index tuples with leading NULLs */
        if (isnull[0])
                return;
 
-       oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt);
+       oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
 
        /* immediately compress keys to normalize */
        for (i = 0; i < buildstate->numindexattrs; i++)
@@ -226,8 +200,8 @@ gistbuildCallback(Relation index,
        gistdoinsert(index, itup, &buildstate->giststate);
 
        buildstate->indtuples += 1;
-       MemoryContextSwitchTo(oldCxt);
-       MemoryContextReset(buildstate->tmpCxt);
+       MemoryContextSwitchTo(oldCtx);
+       MemoryContextReset(buildstate->tmpCtx);
 }
 
 /*
@@ -251,8 +225,8 @@ gistinsert(PG_FUNCTION_ARGS)
        GISTSTATE       giststate;
        GISTENTRY       tmpentry;
        int                     i;
-       MemoryContext oldCxt;
-       MemoryContext insertCxt;
+       MemoryContext oldCtx;
+       MemoryContext insertCtx;
 
        /*
         * Since GIST is not marked "amconcurrent" in pg_am, caller should
@@ -264,8 +238,8 @@ gistinsert(PG_FUNCTION_ARGS)
        if (isnull[0])
                PG_RETURN_BOOL(false);
 
-       insertCxt = createTempGistContext();
-       oldCxt = MemoryContextSwitchTo(insertCxt);
+       insertCtx = createTempGistContext();
+       oldCtx = MemoryContextSwitchTo(insertCtx);
 
        initGISTstate(&giststate, r);
 
@@ -289,8 +263,8 @@ gistinsert(PG_FUNCTION_ARGS)
 
        /* cleanup */
        freeGISTstate(&giststate);
-       MemoryContextSwitchTo(oldCxt);
-       MemoryContextDelete(insertCxt);
+       MemoryContextSwitchTo(oldCtx);
+       MemoryContextDelete(insertCtx);
 
        PG_RETURN_BOOL(true);
 }
@@ -315,7 +289,6 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
        state.r = r;
        state.key = itup->t_tid;
        state.needInsertComplete = true; 
-       state.xlog_mode = false;
 
        state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
        memset( state.stack, 0, sizeof(GISTInsertStack));
@@ -335,80 +308,27 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
                IndexTuple *itvec,
                                   *newitup;
                int                     tlen,olen;
-               PageLayout      *dist=NULL, *ptr;
+               SplitedPageLayout       *dist=NULL, *ptr;
 
-               memset(&dist, 0, sizeof(PageLayout));
                is_splitted = true;
                itvec = gistextractbuffer(state->stack->buffer, &tlen);
                olen=tlen;
                itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
                newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
 
-               if ( !state->r->rd_istemp && !state->xlog_mode) {
-                       gistxlogPageSplit       xlrec;
-                       XLogRecPtr              recptr;
-                       XLogRecData             *rdata;
-                       int i, npage = 0, cur=1;
-
-                       ptr=dist;
-                       while( ptr ) {
-                               npage++;
-                               ptr=ptr->next;
-                       }
-
-                       rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
-
-                       xlrec.node = state->r->rd_node;
-                       xlrec.origblkno = state->stack->blkno;
-                       xlrec.npage = npage;
-                       xlrec.nitup = state->ituplen;
-                       xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
-                       xlrec.key = state->key;
-                       xlrec.pathlen = (uint16)state->pathlen;
-
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].data   = (char *) &xlrec;
-                       rdata[0].len    = sizeof( gistxlogPageSplit );
-                       rdata[0].next   = NULL;
-
-                       if ( state->pathlen>=0 ) {
-                               rdata[0].next   = &(rdata[1]);
-                               rdata[1].buffer = InvalidBuffer;
-                               rdata[1].data   = (char *) (state->path);
-                               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
-                               rdata[1].next   = NULL;
-                               cur++;
-                       }
-                       
-                       /* new tuples */        
-                       for(i=0;i<state->ituplen;i++) {
-                               rdata[cur].buffer = InvalidBuffer;
-                               rdata[cur].data   = (char*)(state->itup[i]);
-                               rdata[cur].len  = IndexTupleSize(state->itup[i]);
-                               rdata[cur-1].next = &(rdata[cur]);
-                               cur++;
+               if ( !state->r->rd_istemp ) {
+                       OffsetNumber    noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
+                       XLogRecPtr      recptr;
+                       XLogRecData     *rdata;
+       
+                       if ( state->stack->todelete ) {
+                               offs[0] = state->stack->childoffnum;
+                               noffs=1;
                        }
 
-                       /* new page layout */
-                       ptr=dist;
-                       while(ptr) {
-                               rdata[cur].buffer = InvalidBuffer;
-                               rdata[cur].data   = (char*)&(ptr->block);
-                               rdata[cur].len  = sizeof(gistxlogPage);
-                               rdata[cur-1].next = &(rdata[cur]);
-                               cur++;
-
-                               rdata[cur].buffer = InvalidBuffer;
-                               rdata[cur].data   = (char*)(ptr->list);
-                               rdata[cur].len    = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
-                               if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
-                                       rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
-                               rdata[cur-1].next = &(rdata[cur]);
-                               rdata[cur].next=NULL;
-                               cur++;
-                               
-                               ptr=ptr->next;
-                       }
+                       rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
+                               offs, noffs, state->itup, state->ituplen, 
+                               &(state->key), state->path, state->pathlen, dist); 
 
                        START_CRIT_SECTION();
 
@@ -433,57 +353,36 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
                state->ituplen = tlen;                  /* now tlen >= 2 */
 
                if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
-                       gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
+                       gistnewroot(state->r, state->itup, state->ituplen, &(state->key));
                        state->needInsertComplete=false;
                }
-               if ( state->xlog_mode ) 
-                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(state->stack->buffer);
        }
        else
        {
                /* enough space */
                OffsetNumber off, l;
+               bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
                off = (PageIsEmpty(state->stack->page)) ?
                        FirstOffsetNumber
                        :
                        OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
                l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
-               if ( !state->r->rd_istemp && !state->xlog_mode) {
-                       gistxlogEntryUpdate     xlrec;
-                       XLogRecPtr              recptr;
-                       XLogRecData             *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
-                       int i, cur=0;
-                       
-                       xlrec.node = state->r->rd_node;
-                       xlrec.blkno = state->stack->blkno;
-                       xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
-                       xlrec.key = state->key;
-                       xlrec.pathlen = (uint16)state->pathlen;
-
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].data   = (char *) &xlrec;
-                       rdata[0].len    = sizeof( gistxlogEntryUpdate );
-                       rdata[0].next   = NULL;
-
-                       if ( state->pathlen>=0 ) {
-                               rdata[0].next   = &(rdata[1]);
-                               rdata[1].buffer = InvalidBuffer;
-                               rdata[1].data   = (char *) (state->path);
-                               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
-                               rdata[1].next   = NULL;
-                               cur++;
+               if ( !state->r->rd_istemp ) {
+                       OffsetNumber    noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
+                       XLogRecPtr      recptr;
+                       XLogRecData     *rdata;
+       
+                       if ( state->stack->todelete ) {
+                               offs[0] = state->stack->childoffnum;
+                               noffs=1;
                        }
+       
+                       rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
+                               offs, noffs, false, state->itup, state->ituplen, 
+                               &(state->key), state->path, state->pathlen); 
 
-                       for(i=1; i<=state->ituplen; i++) { /* adding tuples */
-                               rdata[i+cur].buffer = InvalidBuffer;
-                               rdata[i+cur].data   = (char*)(state->itup[i-1]);
-                               rdata[i+cur].len        = IndexTupleSize(state->itup[i-1]);
-                               rdata[i+cur].next       = NULL;
-                               rdata[i-1+cur].next = &(rdata[i+cur]);
-                       }       
-                       
                        START_CRIT_SECTION();
 
                        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
@@ -495,9 +394,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
 
                if ( state->stack->blkno == GIST_ROOT_BLKNO ) 
                         state->needInsertComplete=false;
-
-               if ( state->xlog_mode ) 
-                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
                WriteBuffer(state->stack->buffer);
 
                if (state->ituplen > 1)
@@ -507,9 +403,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
                         * parent
                         */
                        IndexTuple      newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
-                       ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
+                       ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
                        state->itup[0] = newtup;
                        state->ituplen = 1;
+               } else if (is_leaf) {
+                       /* itup[0] store key to adjust parent, we set it to valid
+                          to correct check by GistTupleIsInvalid macro in gistgetadjusted() */  
+                       ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
+                       GistTupleSetValid( state->itup[0] );
                }
        }
        return is_splitted;
@@ -524,13 +425,10 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
 
        /* walk down */
        while( true ) { 
-               GISTPageOpaque opaque;
-
-               state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+               state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno);
                state->stack->page = (Page) BufferGetPage(state->stack->buffer);
-               opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
-       
-               if (!(opaque->flags & F_LEAF))
+
+               if (!GistPageIsLeaf(state->stack->page))
                {
                        /*
                        * This is an internal page, so continue to walk down the
@@ -564,7 +462,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
                state->pathlen++;
                ptr=ptr->parent;
        }
-       state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
+       state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen));
        ptr = state->stack;
        state->pathlen=0;
        while( ptr ) {
@@ -591,7 +489,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
                  * then itup contains additional for adjustment of current key
                  */
 
-               is_splitted = gistplacetopage(state, giststate );
+               is_splitted = gistplacetopage(state, giststate);
 
                /* pop page from stack */
                state->stack = state->stack->parent;
@@ -623,6 +521,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
                 * an insert in a child node. Therefore, remove the old
                 * version of this node's key.
                 */
+
                gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
                PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
                if ( !state->r->rd_istemp ) 
@@ -639,42 +538,32 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
 
        /* release all buffers */
        while( state->stack ) {
-               if ( state->xlog_mode ) 
-                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(state->stack->buffer);
                state->stack = state->stack->parent;
        }
 
        /* say to xlog that insert is completed */
-       if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
-               gistxlogInsertComplete  xlrec;
-               XLogRecData             rdata;
-                       
-               xlrec.node = state->r->rd_node;
-               xlrec.key = state->key;
-                       
-               rdata.buffer = InvalidBuffer;
-               rdata.data   = (char *) &xlrec;
-               rdata.len    = sizeof( gistxlogInsertComplete );
-               rdata.next   = NULL;
-
-               START_CRIT_SECTION();
+       if ( state->needInsertComplete && !state->r->rd_istemp )
+               gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); 
+}
 
-               XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
+static void 
+gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) {
+       int i;
 
-               END_CRIT_SECTION();
-       }
+       for(i=0;i<len;i++)
+               arr[i] = reasloffset[ arr[i] ]; 
 }
 
 /*
  *     gistSplit -- split a page in the tree.
  */
-static IndexTuple *
+IndexTuple *
 gistSplit(Relation r,
                  Buffer buffer,
                  IndexTuple *itup,             /* contains compressed entry */
                  int *len,
-                 PageLayout    **dist,
+                 SplitedPageLayout     **dist,
                  GISTSTATE *giststate)
 {
        Page            p;
@@ -690,8 +579,11 @@ gistSplit(Relation r,
        GISTPageOpaque opaque;
        GIST_SPLITVEC v;
        GistEntryVector *entryvec;
-       int                     i,
+       int                     i, fakeoffset,
                                nlen;
+       OffsetNumber    *realoffset;
+       IndexTuple      *cleaneditup = itup;
+       int     lencleaneditup = *len;
 
        p = (Page) BufferGetPage(buffer);
        opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
@@ -703,8 +595,8 @@ gistSplit(Relation r,
         */
        if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
        {
-               leftbuf = ReadBuffer(r, P_NEW);
-               GISTInitBuffer(leftbuf, opaque->flags);
+               leftbuf = gistReadBuffer(r, P_NEW);
+               GISTInitBuffer(leftbuf, opaque->flags&F_LEAF);
                lbknum = BufferGetBlockNumber(leftbuf);
                left = (Page) BufferGetPage(leftbuf);
        }
@@ -716,74 +608,99 @@ gistSplit(Relation r,
                left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
        }
 
-       rightbuf = ReadBuffer(r, P_NEW);
-       GISTInitBuffer(rightbuf, opaque->flags);
+       rightbuf = gistReadBuffer(r, P_NEW);
+       GISTInitBuffer(rightbuf, opaque->flags&F_LEAF);
        rbknum = BufferGetBlockNumber(rightbuf);
        right = (Page) BufferGetPage(rightbuf);
 
        /* generate the item array */
+       realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
        entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
        entryvec->n = *len + 1;
 
+       fakeoffset = FirstOffsetNumber;
        for (i = 1; i <= *len; i++)
        {
                Datum           datum;
                bool            IsNull;
 
+               if (!GistPageIsLeaf(p) && GistTupleIsInvalid( itup[i - 1] )) {
+                       entryvec->n--;
+                       /* remember position of invalid tuple */
+                       realoffset[ entryvec->n ] = i;
+                       continue;
+               }
+
                datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
-               gistdentryinit(giststate, 0, &(entryvec->vector[i]),
+               gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
                                           datum, r, p, i,
                                           ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
                                           FALSE, IsNull);
+               realoffset[ fakeoffset ] = i;
+               fakeoffset++;
        }
 
-       /*
-        * now let the user-defined picksplit function set up the split
-        * vector; in entryvec have no null value!!
-        */
-       FunctionCall2(&giststate->picksplitFn[0],
-                                 PointerGetDatum(entryvec),
-                                 PointerGetDatum(&v));
-
-       /* compatibility with old code */
-       if (v.spl_left[v.spl_nleft - 1] == InvalidOffsetNumber)
-               v.spl_left[v.spl_nleft - 1] = (OffsetNumber) *len;
-       if (v.spl_right[v.spl_nright - 1] == InvalidOffsetNumber)
-               v.spl_right[v.spl_nright - 1] = (OffsetNumber) *len;
-
-       v.spl_lattr[0] = v.spl_ldatum;
-       v.spl_rattr[0] = v.spl_rdatum;
-       v.spl_lisnull[0] = false;
-       v.spl_risnull[0] = false;
-
-       /*
-        * if index is multikey, then we must to try get smaller bounding box
-        * for subkey(s)
-        */
-       if (r->rd_att->natts > 1)
-       {
-               int                     MaxGrpId;
-
-               v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1));
-               v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1));
-               v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1));
-
-               MaxGrpId = gistfindgroup(giststate, entryvec->vector, &v);
-
-               /* form union of sub keys for each page (l,p) */
-               gistunionsubkey(r, giststate, itup, &v);
-
-               /*
-                * if possible, we insert equivalent tuples with control by
-                * penalty for a subkey(s)
-                */
-               if (MaxGrpId > 1)
-                       gistadjsubkey(r, itup, len, &v, giststate);
+       /* 
+         * if it was invalid tuple then we need special processing. If
+        * it's possible, we move all invalid tuples on right page.
+         * We should remember, that union with invalid tuples 
+        * is a invalid tuple. 
+         */
+       if ( entryvec->n != *len + 1 ) {
+               lencleaneditup = entryvec->n-1;
+               cleaneditup = (IndexTuple*)palloc(lencleaneditup * sizeof(IndexTuple));
+               for(i=1;i<entryvec->n;i++)
+                       cleaneditup[i-1] = itup[ realoffset[ i ]-1 ];
+
+               if ( gistnospace( left, cleaneditup, lencleaneditup ) ) {
+                       /* no space on left to put all good tuples, so picksplit */ 
+                       gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
+                       v.spl_leftvalid = true;
+                       v.spl_rightvalid = false;
+                       gistToRealOffset( v.spl_left, v.spl_nleft, realoffset );
+                       gistToRealOffset( v.spl_right, v.spl_nright, realoffset );
+                } else { 
+                       /* we can try to store all valid tuples on one page */ 
+                       v.spl_right = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) );
+                       v.spl_left = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) );
+
+                       if ( lencleaneditup==0 ) {
+                               /* all tuples are invalid, so moves half of its to right */
+                               v.spl_leftvalid = v.spl_rightvalid = false;
+                               v.spl_nright = 0;
+                               v.spl_nleft = 0;
+                               for(i=1;i<=*len;i++) 
+                                       if ( i-1<*len/2 )  
+                                               v.spl_left[ v.spl_nleft++ ] = i;
+                                       else
+                                               v.spl_right[ v.spl_nright++ ] = i;
+                       } else { 
+                               /* we will not call gistUserPicksplit, just put good
+                                  tuples on left and invalid on right */
+                               v.spl_nleft = lencleaneditup;
+                               v.spl_nright = 0;
+                               for(i=1;i<entryvec->n;i++)
+                                       v.spl_left[i-1] = i; 
+                               gistToRealOffset( v.spl_left, v.spl_nleft, realoffset );
+                               v.spl_lattr[0] = v.spl_ldatum = (Datum)0;
+                               v.spl_rattr[0] = v.spl_rdatum = (Datum)0;
+                               v.spl_lisnull[0] = true;
+                               v.spl_risnull[0] = true;
+                               gistunionsubkey(r, giststate, itup, &v, true);
+                               v.spl_leftvalid = true;
+                               v.spl_rightvalid = false;
+                       }
+               }
+       } else {
+               /* there is no invalid tuples, so usial processing */ 
+               gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+               v.spl_leftvalid = v.spl_rightvalid = true;
        }
 
+
        /* form left and right vector */
-       lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft);
-       rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright);
+       lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1));
+       rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1));
 
        for (i = 0; i < v.spl_nleft; i++)
                lvectup[i] = itup[v.spl_left[i] - 1];
@@ -791,12 +708,16 @@ gistSplit(Relation r,
        for (i = 0; i < v.spl_nright; i++)
                rvectup[i] = itup[v.spl_right[i] - 1];
 
+       /* place invalid tuples on right page if itsn't done yet */
+       for (fakeoffset = entryvec->n; fakeoffset < *len+1 && lencleaneditup; fakeoffset++) {
+               rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
+       }
 
        /* write on disk (may need another split) */
        if (gistnospace(right, rvectup, v.spl_nright))
        {
                int i;
-               PageLayout *d, *origd=*dist;
+               SplitedPageLayout *d, *origd=*dist;
        
                nlen = v.spl_nright;
                newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
@@ -824,8 +745,9 @@ gistSplit(Relation r,
  
                nlen = 1;
                newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
-               newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
-               ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
+               newtup[0] = ( v.spl_rightvalid ) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+                               : gist_form_invalid_tuple( rbknum );
+               ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
        }
 
        if (gistnospace(left, lvectup, v.spl_nleft))
@@ -833,7 +755,7 @@ gistSplit(Relation r,
                int                     llen = v.spl_nleft;
                IndexTuple *lntup;
                int i;
-               PageLayout *d, *origd=*dist;
+               SplitedPageLayout *d, *origd=*dist;
 
                lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
 
@@ -867,49 +789,35 @@ gistSplit(Relation r,
  
                nlen += 1;
                newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
-               newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
-               ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
+               newtup[nlen - 1] = ( v.spl_leftvalid ) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+                               : gist_form_invalid_tuple( lbknum );
+               ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
        }
 
+       GistClearTuplesDeleted(p);
        *len = nlen;
        return newtup;
 }
 
 void
-gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
+gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key)
 {
        Buffer          buffer;
        Page            page;
 
-       buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
+       buffer = gistReadBuffer(r, GIST_ROOT_BLKNO);
        GISTInitBuffer(buffer, 0);
        page = BufferGetPage(buffer);
 
        gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
-       if ( !xlog_mode && !r->rd_istemp ) {
-               gistxlogEntryUpdate     xlrec;
+       if ( !r->rd_istemp ) {
                XLogRecPtr              recptr;
-               XLogRecData             *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
-               int i;
+               XLogRecData             *rdata;
                        
-               xlrec.node = r->rd_node;
-               xlrec.blkno = GIST_ROOT_BLKNO;
-               xlrec.todeleteoffnum = InvalidOffsetNumber;
-               xlrec.key = *key;
-               xlrec.pathlen=0;
-                       
-               rdata[0].buffer = InvalidBuffer;
-               rdata[0].data   = (char *) &xlrec;
-               rdata[0].len    = sizeof( gistxlogEntryUpdate );
-               rdata[0].next   = NULL;
-
-               for(i=1; i<=len; i++) {
-                       rdata[i].buffer = InvalidBuffer;
-                       rdata[i].data   = (char*)(itup[i-1]);
-                       rdata[i].len    = IndexTupleSize(itup[i-1]);
-                       rdata[i].next   = NULL;
-                       rdata[i-1].next = &(rdata[i]);
-               }       
+               rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
+                       NULL, 0, false, itup, len, 
+                       key, NULL, 0); 
                        
                START_CRIT_SECTION();
 
@@ -919,118 +827,9 @@ gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mo
 
                END_CRIT_SECTION();
        }
-       if ( xlog_mode ) 
-               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        WriteBuffer(buffer);
 }
 
-
-/*
- * Bulk deletion of all index entries pointing to a set of heap tuples.
- * The set of target tuples is specified via a callback routine that tells
- * whether any given heap tuple (identified by ItemPointer) is being deleted.
- *
- * Result: a palloc'd struct containing statistical info for VACUUM displays.
- */
-Datum
-gistbulkdelete(PG_FUNCTION_ARGS)
-{
-       Relation        rel = (Relation) PG_GETARG_POINTER(0);
-       IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
-       void       *callback_state = (void *) PG_GETARG_POINTER(2);
-       IndexBulkDeleteResult *result;
-       BlockNumber num_pages;
-       double          tuples_removed;
-       double          num_index_tuples;
-       IndexScanDesc iscan;
-
-       tuples_removed = 0;
-       num_index_tuples = 0;
-
-       /*
-        * Since GIST is not marked "amconcurrent" in pg_am, caller should
-        * have acquired exclusive lock on index relation.      We need no locking
-        * here.
-        */
-
-       /*
-        * XXX generic implementation --- should be improved!
-        */
-
-       /* walk through the entire index */
-       iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL);
-       /* including killed tuples */
-       iscan->ignore_killed_tuples = false;
-
-       while (index_getnext_indexitem(iscan, ForwardScanDirection))
-       {
-               vacuum_delay_point();
-
-               if (callback(&iscan->xs_ctup.t_self, callback_state))
-               {
-                       ItemPointerData indextup = iscan->currentItemData;
-                       BlockNumber blkno;
-                       OffsetNumber offnum;
-                       Buffer          buf;
-                       Page            page;
-
-                       blkno = ItemPointerGetBlockNumber(&indextup);
-                       offnum = ItemPointerGetOffsetNumber(&indextup);
-
-                       /* adjust any scans that will be affected by this deletion */
-                       gistadjscans(rel, GISTOP_DEL, blkno, offnum);
-
-                       /* delete the index tuple */
-                       buf = ReadBuffer(rel, blkno);
-                       page = BufferGetPage(buf);
-
-                       PageIndexTupleDelete(page, offnum);
-                       if ( !rel->rd_istemp ) {
-                               gistxlogEntryUpdate     xlrec;
-                               XLogRecPtr              recptr;
-                               XLogRecData             rdata;
-                       
-                               xlrec.node = rel->rd_node;
-                               xlrec.blkno = blkno;
-                               xlrec.todeleteoffnum = offnum;
-                               xlrec.pathlen=0;
-                               ItemPointerSetInvalid( &(xlrec.key) );
-                       
-                               rdata.buffer = InvalidBuffer;
-                               rdata.data   = (char *) &xlrec;
-                               rdata.len    = sizeof( gistxlogEntryUpdate );
-                               rdata.next   = NULL;
-
-                               START_CRIT_SECTION();
-
-                               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
-                               PageSetLSN(page, recptr);
-                               PageSetTLI(page, ThisTimeLineID);
-
-                               END_CRIT_SECTION();
-                       }
-
-                       WriteBuffer(buf);
-
-                       tuples_removed += 1;
-               }
-               else
-                       num_index_tuples += 1;
-       }
-
-       index_endscan(iscan);
-
-       /* return statistics */
-       num_pages = RelationGetNumberOfBlocks(rel);
-
-       result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-       result->num_pages = num_pages;
-       result->num_index_tuples = num_index_tuples;
-       result->tuples_removed = tuples_removed;
-
-       PG_RETURN_POINTER(result);
-}
-
 void
 initGISTstate(GISTSTATE *giststate, Relation index)
 {
@@ -1074,49 +873,3 @@ freeGISTstate(GISTSTATE *giststate)
        /* no work */
 }
 
-#ifdef GISTDEBUG
-static void
-gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
-{
-       Buffer          buffer;
-       Page            page;
-       GISTPageOpaque opaque;
-       IndexTuple      which;
-       ItemId          iid;
-       OffsetNumber i,
-                               maxoff;
-       BlockNumber cblk;
-       char       *pred;
-
-       pred = (char *) palloc(sizeof(char) * level + 1);
-       MemSet(pred, '\t', level);
-       pred[level] = '\0';
-
-       buffer = ReadBuffer(r, blk);
-       page = (Page) BufferGetPage(buffer);
-       opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
-
-       maxoff = PageGetMaxOffsetNumber(page);
-
-       elog(DEBUG4, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred,
-                coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk,
-                (int) maxoff, PageGetFreeSpace(page));
-
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-       {
-               iid = PageGetItemId(page, i);
-               which = (IndexTuple) PageGetItem(page, iid);
-               cblk = ItemPointerGetBlockNumber(&(which->t_tid));
-#ifdef PRINTTUPLE
-               elog(DEBUG4, "%s  Tuple. blk: %d size: %d", pred, (int) cblk,
-                        IndexTupleSize(which));
-#endif
-
-               if (!(opaque->flags & F_LEAF))
-                       gist_dumptree(r, level + 1, cblk, i);
-       }
-       ReleaseBuffer(buffer);
-       pfree(pred);
-}
-#endif   /* defined GISTDEBUG */
-
index 5b9a94471b10816aeebc07c54ab601d924d2c3b3..4bce9962f3a3962afbaf562e2dbd1b0ec1f8070e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -112,7 +112,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
 {
        Page            p;
        OffsetNumber n;
-       GISTPageOpaque po;
        GISTScanOpaque so;
        GISTSTACK  *stk;
        IndexTuple      it;
@@ -127,7 +126,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
        }
 
        p = BufferGetPage(so->curbuf);
-       po = (GISTPageOpaque) PageGetSpecialPointer(p);
 
        if (ItemPointerIsValid(&scan->currentItemData) == false)
        {
@@ -169,7 +167,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
                        so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
                                                                                          stk->block);
                        p = BufferGetPage(so->curbuf);
-                       po = (GISTPageOpaque) PageGetSpecialPointer(p);
 
                        if (ScanDirectionIsBackward(dir))
                                n = OffsetNumberPrev(stk->offset);
@@ -182,7 +179,7 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
                        continue;
                }
 
-               if (po->flags & F_LEAF)
+               if (GistPageIsLeaf(p))
                {
                        /*
                         * We've found a matching index entry in a leaf page, so
@@ -219,7 +216,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
                        so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
                                                                                          child_block);
                        p = BufferGetPage(so->curbuf);
-                       po = (GISTPageOpaque) PageGetSpecialPointer(p);
 
                        if (ScanDirectionIsBackward(dir))
                                n = PageGetMaxOffsetNumber(p);
@@ -256,6 +252,12 @@ gistindex_keytest(IndexTuple tuple,
 
        IncrIndexProcessed();
 
+       /*
+         * Tuple was not restored during crash recovery because of an incomplete insert
+         */
+       if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(tuple) ) 
+               return true;
+
        while (keySize > 0)
        {
                Datum           datum;
@@ -317,7 +319,6 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
 {
        OffsetNumber    maxoff;
        IndexTuple              it;
-       GISTPageOpaque  po;
        GISTScanOpaque  so;
        MemoryContext   oldcxt;
        Page                    p;
@@ -325,7 +326,6 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
        so = (GISTScanOpaque) scan->opaque;
        p = BufferGetPage(so->curbuf);
        maxoff = PageGetMaxOffsetNumber(p);
-       po = (GISTPageOpaque) PageGetSpecialPointer(p);
 
        /*
         * Make sure we're in a short-lived memory context when we invoke
index 44391f9f738b820a0b51c37d894e868fb4778149..735be85f25702203b700f707031baa4af593e6d8 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
+#include "storage/freespace.h"
 
 /* group flags ( in gistadjsubkey ) */
 #define LEFT_ADDED      0x01
@@ -132,9 +133,14 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
        GistEntryVector *evec;
        int                     i;
        GISTENTRY       centry[INDEX_MAX_KEYS];
+       IndexTuple      res;
 
        evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
 
+       for(i = 0; i<len; i++) 
+               if ( GistTupleIsInvalid( itvec[i] ) )
+                       return gist_form_invalid_tuple( InvalidBlockNumber );   
+
        for (i = 0; i < r->rd_att->natts; i++)
        {
                Datum           datum;
@@ -191,7 +197,9 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
                }
        }
 
-       return index_form_tuple(giststate->tupdesc, attr, isnull);
+       res = index_form_tuple(giststate->tupdesc, attr, isnull);
+       GistTupleSetValid( res );
+       return res;
 }
 
 
@@ -215,11 +223,15 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
        IndexTuple      newtup = NULL;
        int                     i;
 
+       if ( GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup) )
+               return gist_form_invalid_tuple( ItemPointerGetBlockNumber( &(oldtup->t_tid) ) ); 
        evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
        evec->n = 2;
        ev0p = &(evec->vector[0]);
        ev1p = &(evec->vector[1]);
 
+
        gistDeCompressAtt(giststate, r, oldtup, NULL,
                                          (OffsetNumber) 0, oldatt, oldisnull);
 
@@ -283,7 +295,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
 }
 
 void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
 {
        int lr;
 
@@ -314,9 +326,9 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV
                        isnull = spl->spl_risnull;
                }
 
-               evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+               evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
 
-               for (i = 1; i < r->rd_att->natts; i++)
+               for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
                {
                        int                     j;
                        Datum           datum;
@@ -448,7 +460,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
 void
 gistadjsubkey(Relation r,
                          IndexTuple *itup, /* contains compressed entry */
-                         int *len,
+                         int len,
                          GIST_SPLITVEC *v,
                          GISTSTATE *giststate)
 {
@@ -501,7 +513,7 @@ gistadjsubkey(Relation r,
        ev1p = &(evec->vector[1]);
 
        /* add equivalent tuple */
-       for (i = 0; i < *len; i++)
+       for (i = 0; i < len; i++)
        {
                Datum           datum;
 
@@ -617,7 +629,7 @@ gistchoose(Relation r, Page p, IndexTuple it,       /* it has compressed entry */
 
        maxoff = PageGetMaxOffsetNumber(p);
        *which_grow = -1.0;
-       which = -1;
+       which = InvalidOffsetNumber;
        sum_grow = 1;
        gistDeCompressAtt(giststate, r,
                                          it, NULL, (OffsetNumber) 0,
@@ -627,6 +639,12 @@ gistchoose(Relation r, Page p, IndexTuple it,      /* it has compressed entry */
        {
                int                     j;
                IndexTuple      itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+               
+               if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(itup) ) {
+                       elog(LOG, "It's desirable to vacuum or reindex GiST index '%s' due to crash recovery", 
+                               RelationGetRelationName(r));
+                       continue; 
+               }
 
                sum_grow = 0;
                for (j = 0; j < r->rd_att->natts; j++)
@@ -660,6 +678,9 @@ gistchoose(Relation r, Page p, IndexTuple it,       /* it has compressed entry */
                }
        }
 
+       if ( which == InvalidOffsetNumber )
+               which = FirstOffsetNumber;
+
        return which;
 }
 
@@ -721,6 +742,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
        GISTENTRY       centry[INDEX_MAX_KEYS];
        Datum           compatt[INDEX_MAX_KEYS];
        int                     i;
+       IndexTuple      res;
 
        for (i = 0; i < r->rd_att->natts; i++)
        {
@@ -735,7 +757,9 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
                }
        }
 
-       return index_form_tuple(giststate->tupdesc, compatt, isnull);
+       res = index_form_tuple(giststate->tupdesc, compatt, isnull);
+       GistTupleSetValid(res);
+       return res;
 }
 
 void
@@ -783,3 +807,79 @@ GISTInitBuffer(Buffer b, uint32 f)
        opaque->flags = f;
 }
 
+void
+gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, 
+               IndexTuple *itup, int len, GISTSTATE *giststate) {
+       /*
+        * now let the user-defined picksplit function set up the split
+        * vector; entryvec contains no null values!!
+        */
+       FunctionCall2(&giststate->picksplitFn[0],
+                                 PointerGetDatum(entryvec),
+                                 PointerGetDatum(v));
+
+       /* compatibility with old code */
+       if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber)
+               v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
+       if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber)
+               v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
+
+       v->spl_lattr[0] = v->spl_ldatum;
+       v->spl_rattr[0] = v->spl_rdatum;
+       v->spl_lisnull[0] = false;
+       v->spl_risnull[0] = false;
+
+       /*
+        * if the index is multikey, then we must try to get a smaller bounding box
+        * for subkey(s)
+        */
+       if (r->rd_att->natts > 1)
+       {
+               int                     MaxGrpId;
+
+               v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n);
+               v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n);
+               v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n);
+
+               MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
+
+               /* form union of sub keys for each page (l,p) */
+               gistunionsubkey(r, giststate, itup, v, false);
+
+               /*
+                * if possible, we insert equivalent tuples with control by
+                * penalty for a subkey(s)
+                */
+               if (MaxGrpId > 1)
+                       gistadjsubkey(r, itup, len, v, giststate);
+       }
+}
+
+Buffer  
+gistReadBuffer(Relation r, BlockNumber blkno) {
+       Buffer buffer = InvalidBuffer;
+
+       if ( blkno != P_NEW ) {
+               buffer = ReadBuffer(r, blkno);
+       } else {
+               Page page;
+
+               while(true) {
+                       blkno = GetFreeIndexPage(&r->rd_node);
+                       if (blkno == InvalidBlockNumber)
+                               break;
+
+                       buffer = ReadBuffer(r, blkno);
+                       page = BufferGetPage(buffer);
+                       if ( GistPageIsDeleted( page ) ) {
+                               GistPageSetNonDeleted( page );
+                               return buffer;
+                       }
+                       ReleaseBuffer( buffer );
+               }
+
+               buffer = ReadBuffer(r, P_NEW); 
+       }
+       
+       return buffer;
+}
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
new file mode 100644 (file)
index 0000000..8f8e7f7
--- /dev/null
@@ -0,0 +1,519 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistvacuum.c
+ *       interface routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.1 2005/06/20 10:29:36 teodor Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "storage/freespace.h"
+#include "storage/smgr.h"
+
+/* filled by gistbulkdelete, cleared by gistvacuumcleanup */
+static bool needFullVacuum = false; 
+
+
+typedef struct {
+       GISTSTATE       giststate;
+       Relation        index;
+       MemoryContext   opCtx;
+       IndexBulkDeleteResult   *result;
+
+       /* path to root */
+       BlockNumber     *path;
+       int             pathlen;
+       int             curpathlen;
+} GistVacuum;
+
+static void
+shiftPath(GistVacuum *gv, BlockNumber blkno) {
+       if ( gv->pathlen == 0 ) {
+               gv->pathlen = 8;
+               gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
+       } else if ( gv->pathlen == gv->curpathlen ) {
+               gv->pathlen *= 2;
+               gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
+       }
+
+       if ( gv->curpathlen )
+               memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen ); 
+       gv->curpathlen++;
+       gv->path[0] = blkno;
+}
+
+static void
+unshiftPath(GistVacuum *gv) {
+       gv->curpathlen--;
+       if ( gv->curpathlen )
+               memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen );
+} 
+
+typedef struct {
+       IndexTuple      *itup;
+       int             ituplen;
+       bool            emptypage;
+} ArrayTuple;
+
+
+static ArrayTuple
+gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
+       ArrayTuple      res = {NULL, 0, false};
+       Buffer          buffer;
+       Page            page;
+       OffsetNumber    i, maxoff;
+       ItemId          iid;
+       int             lenaddon=4, curlenaddon=0, ntodelete=0;
+       IndexTuple      idxtuple, *addon=NULL;
+       bool            needwrite=false;
+       OffsetNumber    *todelete=NULL;
+       ItemPointerData *completed=NULL;
+       int             ncompleted=0, lencompleted=16;
+
+       buffer = ReadBuffer(gv->index, blkno);
+       page = (Page) BufferGetPage(buffer);
+       maxoff = PageGetMaxOffsetNumber(page);
+
+
+       if ( GistPageIsLeaf(page) ) {
+               if ( GistTuplesDeleted(page) ) {
+                       needunion = needwrite = true;
+                       GistClearTuplesDeleted(page);
+               }
+       } else {
+               todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) );
+               completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted );
+               addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon);
+
+               shiftPath(gv, blkno);
+               for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
+                       ArrayTuple chldtuple;
+                       bool needchildunion;
+
+                       iid = PageGetItemId(page, i);
+                       idxtuple = (IndexTuple) PageGetItem(page, iid);
+                       needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
+               
+                       if ( needchildunion ) 
+                               elog(DEBUG2,"gistVacuumUpdate: Need union for block %u", ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
+       
+                       chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
+                               needchildunion );
+                       if ( chldtuple.ituplen || chldtuple.emptypage ) {
+                               /* adjust any scans that will be affected by this deletion */
+                               gistadjscans(gv->index, GISTOP_DEL, blkno, i);
+                               PageIndexTupleDelete(page, i);
+                               todelete[ ntodelete++ ] = i;
+                               i--; maxoff--;
+                               needwrite=needunion=true;
+
+                               if ( chldtuple.ituplen ) {
+                                       while( curlenaddon + chldtuple.ituplen >= lenaddon ) {
+                                               lenaddon*=2;
+                                               addon=(IndexTuple*)repalloc( addon, sizeof(IndexTuple)*lenaddon );
+                                       }
+
+                                       memcpy( addon + curlenaddon, chldtuple.itup, chldtuple.ituplen * sizeof(IndexTuple) );
+
+                                       curlenaddon += chldtuple.ituplen;
+
+                                       if ( chldtuple.ituplen > 1 ) {
+                                               /* child was split, so we need to mark the insert (split) as completed */
+                                               int j;
+
+                                               while( ncompleted + chldtuple.ituplen > lencompleted ) {
+                                                       lencompleted*=2;
+                                                       completed = (ItemPointerData*)repalloc(completed, sizeof(ItemPointerData) * lencompleted);
+                                               } 
+                                               for(j=0;j<chldtuple.ituplen;j++) {
+                                                       ItemPointerCopy( &(chldtuple.itup[j]->t_tid), completed + ncompleted ); 
+                                                       ncompleted++; 
+                                               }
+                                       }
+                                       pfree( chldtuple.itup );
+                               }
+                       }
+               }
+
+               if ( curlenaddon ) {
+                       /* insert updated tuples */
+                       if (gistnospace(page, addon, curlenaddon)) {
+                               /* there is no space on page to insert tuples */
+                               IndexTuple      *vec;
+                               SplitedPageLayout       *dist=NULL,*ptr;
+                               int i;
+                               MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); 
+
+                               vec = gistextractbuffer(buffer, &(res.ituplen));
+                               vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
+                               res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate)); 
+                               MemoryContextSwitchTo(oldCtx);
+
+                               vec = (IndexTuple*)palloc( sizeof(IndexTuple) * res.ituplen );
+                               for(i=0;i<res.ituplen;i++) {
+                                       vec[i] = (IndexTuple)palloc( IndexTupleSize(res.itup[i]) );
+                                       memcpy( vec[i], res.itup[i], IndexTupleSize(res.itup[i]) );
+                               }
+                               res.itup = vec; 
+
+                               if ( !gv->index->rd_istemp ) {
+                                       XLogRecPtr              recptr;
+                                       XLogRecData             *rdata;
+                                       ItemPointerData         key; /* set key for incomplete insert */
+
+                                       ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+       
+                                       oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+                                       /* the path is needed for recovery because there are new pages; in case of
+                                          a crash, inner tuple pointers must be added to the parent page */
+                                       rdata = formSplitRdata(gv->index->rd_node, blkno,
+                                               todelete, ntodelete, addon, curlenaddon,
+                                               &key, gv->path, gv->curpathlen, dist);
+
+                                       MemoryContextSwitchTo(oldCtx);
+                                       
+                                       START_CRIT_SECTION();
+                       
+                                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+                                       ptr = dist;
+                                       while(ptr) {
+                                               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+                                               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+                                               ptr=ptr->next;
+                                       }
+
+                                       END_CRIT_SECTION();
+                               
+                               }
+
+                               ptr = dist;
+                               while(ptr) {
+                                       WriteBuffer(ptr->buffer);
+                                       ptr=ptr->next;
+                               }
+
+                               if ( blkno == GIST_ROOT_BLKNO ) { 
+                                       ItemPointerData         key; /* set key for incomplete insert */
+
+                                       ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+                                       oldCtx = MemoryContextSwitchTo(gv->opCtx);
+                                       gistnewroot(gv->index, res.itup, res.ituplen, &key);
+                                       MemoryContextSwitchTo(oldCtx);
+                               }
+
+                               needwrite=false;
+                               MemoryContextReset(gv->opCtx);
+
+                               needunion = false; /* gistSplit already forms unions */
+                       } else {
+                               OffsetNumber off = (PageIsEmpty(page)) ?
+                                       FirstOffsetNumber
+                                       :
+                                       OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+                               /* enough free space */
+                               gistfillbuffer(gv->index, page, addon, curlenaddon, off); 
+                       } 
+               }
+               unshiftPath(gv);
+       }
+
+       if ( needunion ) {
+               /* form the union for the page, or check whether it is empty */
+               if ( PageIsEmpty(page) ) {
+                       if ( blkno == GIST_ROOT_BLKNO ) {
+                               needwrite=true;
+                               GistPageSetLeaf( page );
+                       } else {
+                               needwrite=true;
+                               res.emptypage=true;
+                               GistPageSetDeleted( page );
+                               gv->result->pages_deleted++;
+                       }
+               } else {
+                       IndexTuple      *vec, tmp;
+                       int             veclen=0;
+                       MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+                       vec = gistextractbuffer(buffer, &veclen);
+                       tmp  = gistunion(gv->index, vec, veclen, &(gv->giststate));
+                       MemoryContextSwitchTo(oldCtx);
+
+                       res.itup=(IndexTuple*)palloc( sizeof(IndexTuple) );
+                       res.ituplen = 1;
+                       res.itup[0] = (IndexTuple)palloc( IndexTupleSize(tmp) );
+                       memcpy( res.itup[0], tmp, IndexTupleSize(tmp) );
+
+                       ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
+                       GistTupleSetValid( res.itup[0] );        
+               
+                       MemoryContextReset(gv->opCtx);
+               }
+       }
+
+       if ( needwrite ) {
+               if ( !gv->index->rd_istemp ) {
+                       XLogRecData *rdata;
+                       XLogRecPtr      recptr;
+                       MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+                       /* In a vacuum, it's not need to push path, because
+                          there is no new inserted keys */
+                       rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete, 
+                               res.emptypage, addon, curlenaddon, NULL, NULL, 0);
+                       MemoryContextSwitchTo(oldCtx);
+               
+       
+                       START_CRIT_SECTION();
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                       PageSetLSN(page, recptr);
+                       PageSetTLI(page, ThisTimeLineID);
+                       END_CRIT_SECTION();
+                       MemoryContextReset(gv->opCtx);
+               }
+               WriteBuffer( buffer );
+       } else
+               ReleaseBuffer( buffer );
+
+       if ( ncompleted && !gv->index->rd_istemp )
+               gistxlogInsertCompletion( gv->index->rd_node, completed, ncompleted );
+
+       for(i=0;i<curlenaddon;i++)
+               pfree( addon[i] );
+       if (addon) pfree(addon);
+       if (todelete) pfree(todelete); 
+       if (completed) pfree(completed); 
+       return res;
+}
+
+/*
+ * gistvacuumcleanup -- cleanup pass over a GiST index at the end of VACUUM.
+ *
+ * For a usual vacuum just update the FSM; for a full vacuum also
+ * re-form parent tuples if some of the children were deleted or changed,
+ * update invalid tuples (they can exist only after crash recovery),
+ * and try to make the index smaller by truncating trailing deleted pages.
+ *
+ * fmgr arguments: 0 = index Relation, 1 = IndexVacuumCleanupInfo *,
+ * 2 = IndexBulkDeleteResult * (updated in place and returned).
+ */
+
+Datum
+gistvacuumcleanup(PG_FUNCTION_ARGS) {
+       Relation        rel = (Relation) PG_GETARG_POINTER(0);
+       IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
+       IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
+       BlockNumber npages, blkno;
+       BlockNumber nFreePages, *freePages, maxFreePages;
+       BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO;     
+
+       /* LockRelation(rel, AccessExclusiveLock); */
+
+       /* gistVacuumUpdate may cause hard work */
+       if ( info->vacuum_full ) {
+               GistVacuum      gv;
+               ArrayTuple      res;
+
+               /* set up the working state shared by the recursive update */
+               gv.index = rel;
+               initGISTstate(&(gv.giststate), rel);
+               gv.opCtx = createTempGistContext();
+               gv.result = stats;
+
+               gv.path=NULL;
+               gv.pathlen = gv.curpathlen = 0;
+
+               /* walk through the entire index for update tuples */
+               res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false );
+               /* cleanup */
+               if (res.itup) {
+                       int i;
+                       for(i=0;i<res.ituplen;i++)
+                               pfree( res.itup[i] );
+                       pfree( res.itup );
+               }
+               if ( gv.path )
+                       pfree( gv.path );
+               freeGISTstate(&(gv.giststate));
+               MemoryContextDelete(gv.opCtx);
+       } else if (needFullVacuum) {
+               /*
+                * gistbulkdelete sets needFullVacuum when it meets an invalid
+                * tuple; without VACUUM FULL we only advise the user here.
+                */
+               elog(NOTICE,"It's desirable to vacuum full or reindex GiST index '%s' due to crash recovery", 
+                       RelationGetRelationName(rel));
+       }
+
+       /* the flag has been acted upon (or handled by the full pass): clear it */
+       needFullVacuum = false;
+
+       /* try to find deleted pages */
+       npages = RelationGetNumberOfBlocks(rel);
+       /* remember at most MaxFSMPages free pages, and never more than the index has */
+       maxFreePages = RelationGetNumberOfBlocks(rel);
+       if ( maxFreePages > MaxFSMPages )
+               maxFreePages = MaxFSMPages;
+       nFreePages = 0;
+       freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages);
+       /*
+        * Scan every block after the root.  Deleted pages are collected in
+        * freePages[] (in ascending block order); lastFilledBlock ends up as
+        * the highest block that is not marked deleted.
+        */
+       for(blkno=GIST_ROOT_BLKNO+1;blkno<npages;blkno++) {
+               Buffer  buffer = ReadBuffer(rel, blkno);
+               Page    page=(Page)BufferGetPage(buffer);
+
+               if ( GistPageIsDeleted(page) ) {
+                       if (nFreePages < maxFreePages) {
+                               freePages[ nFreePages ] = blkno;
+                               nFreePages++;
+                       }
+               } else
+                       lastFilledBlock = blkno;
+               ReleaseBuffer(buffer);
+       }
+       lastBlock = npages-1;
+               
+       if ( nFreePages > 0 ) {
+               if ( info->vacuum_full ) { /* try to truncate index */
+                       int i;
+                       /*
+                        * Cut the free list at the first page lying at or beyond
+                        * lastFilledBlock: those pages are removed by the
+                        * truncation below, so they must not be reported as free.
+                        */
+                       for(i=0;i<nFreePages;i++)
+                               if ( freePages[i] >= lastFilledBlock ) {
+                                       nFreePages = i;
+                                       break;
+                               }
+       
+                       /* drop the trailing run of deleted pages, if there is one */
+                       if ( lastBlock > lastFilledBlock )      
+                               RelationTruncate( rel, lastFilledBlock+1 );
+                       /* number of trailing blocks cut off (0 when nothing was truncated) */
+                       stats->pages_removed = lastBlock - lastFilledBlock;
+               }
+               
+               /* report the remaining deleted pages to the free space map */
+               if ( nFreePages > 0 )
+                       RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages );
+       }
+       pfree( freePages ); 
+
+       /* return statistics */
+       stats->pages_free = nFreePages;
+       stats->num_pages = RelationGetNumberOfBlocks(rel);
+
+       /* UnlockRelation(rel, AccessExclusiveLock); */
+
+       PG_RETURN_POINTER(stats);
+}
+
+/*
+ * Element of the singly linked list that gistbulkdelete uses as its stack
+ * of index blocks still to be visited.
+ */
+typedef struct GistBDItem {
+       BlockNumber     blkno;          /* block to visit */
+       struct GistBDItem *next;        /* next pending item, or NULL */
+} GistBDItem;
+
+/*
+ * Bulk deletion of all index entries pointing to a set of heap tuples and
+ * detection of invalid tuples left over from crash recovery.
+ * The set of target tuples is specified via a callback routine that tells
+ * whether any given heap tuple (identified by ItemPointer) is being deleted.
+ *
+ * fmgr arguments: 0 = index Relation, 1 = IndexBulkDeleteCallback,
+ * 2 = opaque state passed through to the callback.
+ *
+ * Side effect: sets the file-level flag needFullVacuum when an invalid
+ * tuple is seen on an inner page (it is reset to false on entry).
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+gistbulkdelete(PG_FUNCTION_ARGS) {
+       Relation        rel = (Relation) PG_GETARG_POINTER(0);
+       IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
+       void* callback_state = (void *) PG_GETARG_POINTER(2);
+       IndexBulkDeleteResult   *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult));       
+       GistBDItem      *stack, *ptr;
+       MemoryContext opCtx = createTempGistContext();
+       
+       /* the traversal uses an explicit stack of blocks, seeded with the root */
+       stack = (GistBDItem*) palloc(sizeof(GistBDItem));
+
+       stack->blkno = GIST_ROOT_BLKNO;
+       stack->next = NULL;
+       needFullVacuum = false;
+
+       while( stack ) {
+               Buffer buffer = ReadBuffer(rel, stack->blkno);
+               Page   page   = (Page) BufferGetPage(buffer);
+               OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page);
+               IndexTuple      idxtuple;
+               ItemId          iid;
+               OffsetNumber *todelete = NULL;
+               int ntodelete = 0;      
+
+               if ( GistPageIsLeaf(page) ) {
+                       /* leaf page: ask the callback about every heap pointer */
+                       ItemPointerData heapptr;
+
+                       todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) );
+
+                       for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
+                               iid = PageGetItemId(page, i);   
+                               idxtuple = (IndexTuple) PageGetItem(page, iid);
+                               heapptr = idxtuple->t_tid;
+
+                               if ( callback(&heapptr, callback_state) ) {
+                                       gistadjscans(rel, GISTOP_DEL, stack->blkno, i);
+                                       PageIndexTupleDelete(page, i);
+                                       /*
+                                        * The offset recorded is the one valid at the
+                                        * moment of this deletion; WAL redo deletes the
+                                        * todelete[] entries one by one in the same order.
+                                        */
+                                       todelete[ ntodelete++ ] = i;
+                                       /*
+                                        * Re-examine the same offset on the next pass and
+                                        * shrink the bound: presumably the following items
+                                        * shift down by one after the delete.
+                                        */
+                                       i--; maxoff--;
+                                       result->tuples_removed += 1;
+                               } else 
+                                       result->num_index_tuples += 1;
+                       }
+               } else {
+                       /* inner page: schedule every child block for a later visit */
+                       for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
+                               iid = PageGetItemId(page, i);
+                               idxtuple = (IndexTuple) PageGetItem(page, iid);
+
+                               /* insert right behind the current top, which is popped below */
+                               ptr = (GistBDItem*) palloc(sizeof(GistBDItem));
+                               ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) );
+                               ptr->next = stack->next;
+                               stack->next = ptr;
+
+                               /* remembered for gistvacuumcleanup */
+                               if ( GistTupleIsInvalid(idxtuple) )
+                                       needFullVacuum = true;
+                       }
+               }
+
+               if ( ntodelete && todelete ) {
+                       /* the page was modified: flag it, WAL-log it, write it out */
+                       GistMarkTuplesDeleted(page);
+
+                       if (!rel->rd_istemp ) {
+                               XLogRecData *rdata;
+                               XLogRecPtr      recptr;
+                               /* build the record data in opCtx so it can be freed by a reset */
+                               MemoryContext oldCtx = MemoryContextSwitchTo(opCtx);
+
+                               rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
+                                       false, NULL, 0, NULL, NULL, 0);
+                               MemoryContextSwitchTo(oldCtx);
+
+                               START_CRIT_SECTION();
+                               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                               PageSetLSN(page, recptr);
+                               PageSetTLI(page, ThisTimeLineID);
+                               END_CRIT_SECTION();
+
+                               MemoryContextReset(opCtx);
+                       }
+
+                       WriteBuffer( buffer );
+               } else
+                       ReleaseBuffer( buffer );
+
+               if ( todelete )
+                       pfree( todelete );
+
+               /* pop the block just processed */
+               ptr = stack->next;
+               pfree( stack );
+               stack = ptr;
+       }
+
+       MemoryContextDelete( opCtx );
+
+       result->num_pages = RelationGetNumberOfBlocks(rel);
+
+
+       PG_RETURN_POINTER( result );
+}
+
index b99ab24761d9b042a4da27a0f71e0c0965d98c92..b6c0696e1af413689af4a3b3ba89a0f53a0ca948 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *           $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *           $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 #include "miscadmin.h"
 #include "utils/memutils.h"
 
+
 typedef struct {
        gistxlogEntryUpdate     *data;
        int                     len;
        IndexTuple              *itup;
        BlockNumber             *path;
+       OffsetNumber            *todelete;
 } EntryUpdateRecord;
 
 typedef struct {
@@ -44,6 +46,7 @@ typedef struct {
        NewPage                 *page;
        IndexTuple              *itup;
        BlockNumber             *path;
+       OffsetNumber            *todelete;
 } PageSplitRecord;
 
 /* track for incomplete inserts, idea was taken from nbtxlog.c */
@@ -55,6 +58,7 @@ typedef struct gistIncompleteInsert {
        BlockNumber     *blkno;
        int             pathlen;
        BlockNumber     *path;
+       XLogRecPtr      lsn;
 } gistIncompleteInsert;
 
 
@@ -65,12 +69,12 @@ static List *incomplete_inserts;
 
 #define ItemPointerEQ( a, b )  \
        ( \
-       ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+       ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
        ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
         )
 
 static void
-pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
                BlockNumber *blkno, int lenblk,
                BlockNumber *path,  int pathlen,
                PageSplitRecord *xlinfo /* to extract blkno info */ ) {
@@ -79,6 +83,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
 
        ninsert->node = node;
        ninsert->key  = key;
+       ninsert->lsn  = lsn;
 
        if ( lenblk && blkno ) {        
                ninsert->lenblk = lenblk;
@@ -95,7 +100,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
        }
        Assert( ninsert->lenblk>0 );
        
-       if ( path && ninsert->pathlen ) {
+       if ( path && pathlen ) {
                ninsert->pathlen = pathlen;
                ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
                memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
@@ -135,11 +140,17 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
        decoded->data = (gistxlogEntryUpdate*)begin;
 
        if ( decoded->data->pathlen ) {
-               addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+               addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
                decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
        } else 
                decoded->path = NULL;
 
+       if ( decoded->data->ntodelete ) {
+               decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
+               addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+       } else 
+               decoded->todelete = NULL;       
+
        decoded->len=0;
        ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
        while( ptr - begin < record->xl_len ) {
@@ -157,7 +168,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
        }
 }
 
-
+/*
+ * redo any page update (except page split)
+ */
 static void
 gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
        EntryUpdateRecord       xlrec;
@@ -191,19 +204,39 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
                }
        }
 
-       if ( isnewroot )
-               GISTInitBuffer(buffer, 0);
-       else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) 
-               PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+       if ( xlrec.data->isemptypage ) {
+               while( !PageIsEmpty(page) )
+                       PageIndexTupleDelete( page, FirstOffsetNumber );
+               
+               if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
+                       GistPageSetLeaf( page );
+               else
+                       GistPageSetDeleted( page );
+       } else {
+               if ( isnewroot )
+                       GISTInitBuffer(buffer, 0);
+               else if ( xlrec.data->ntodelete ) { 
+                       int i;
+                       for(i=0; i < xlrec.data->ntodelete ; i++)  
+                               PageIndexTupleDelete(page, xlrec.todelete[i]);
+                       if ( GistPageIsLeaf(page) )
+                               GistMarkTuplesDeleted(page);
+               }
 
-       /* add tuples */
-       if ( xlrec.len > 0 ) {
-                OffsetNumber off = (PageIsEmpty(page)) ?  
-                        FirstOffsetNumber
-                        :
-                        OffsetNumberNext(PageGetMaxOffsetNumber(page));
+               /* add tuples */
+               if ( xlrec.len > 0 ) {
+                       OffsetNumber off = (PageIsEmpty(page)) ?  
+                               FirstOffsetNumber
+                               :
+                               OffsetNumberNext(PageGetMaxOffsetNumber(page));
 
-               gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+                       gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+               }
+
+               /* special case: leafpage, nothing to insert, nothing to delete, then
+                  vacuum marks page */
+               if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
+                       GistClearTuplesDeleted(page);   
        }
 
        PageSetLSN(page, lsn);
@@ -216,7 +249,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
                        forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
 
                if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
-                       pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+                       pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, 
                                &(xlrec.data->blkno), 1,
                                xlrec.path, xlrec.data->pathlen,
                                NULL);
@@ -233,11 +266,17 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
        decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
 
        if ( decoded->data->pathlen ) {
-               addpath = sizeof(BlockNumber) * decoded->data->pathlen;
-               decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+               addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
+               decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
        } else 
                decoded->path = NULL;
 
+       if ( decoded->data->ntodelete ) {
+               decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath);
+               addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+       } else 
+               decoded->todelete = NULL;       
+
        ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
        for(i=0;i<decoded->data->nitup;i++) {
                Assert( ptr - begin < record->xl_len );
@@ -285,19 +324,23 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
                return;
        }
        
-       if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
-               PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+       if ( xlrec.data->ntodelete ) { 
+               int i;
+               for(i=0; i < xlrec.data->ntodelete ; i++)  
+                       PageIndexTupleDelete(page, xlrec.todelete[i]);
+       }
 
        itup = gistextractbuffer(buffer, &len);
        itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
        institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
         opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
 
+       /* read and fill all pages */
        for(i=0;i<xlrec.data->npage;i++) {
                int j;
                NewPage *newpage = xlrec.page + i; 
 
-               /* prepare itup vector */
+               /* prepare itup vector per page */
                for(j=0;j<newpage->header->num;j++)
                        institup[j] = itup[ newpage->offnum[j] - 1 ];
 
@@ -311,9 +354,9 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
                        if (!BufferIsValid(newpage->buffer))
                                elog(PANIC, "gistRedoPageSplitRecord: lost page");
                        newpage->page = (Page) BufferGetPage(newpage->buffer);
-                       if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
-                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                               ReleaseBuffer(buffer);
+                       if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+                               LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+                               ReleaseBuffer(newpage->buffer);
                                newpage->is_ok=true;
                                continue; /* good page */
                        } else {
@@ -350,7 +393,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
                if ( incomplete_inserts != NIL )
                        forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
 
-               pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+               pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, 
                                NULL, 0,
                                xlrec.path, xlrec.data->pathlen,
                                &xlrec);
@@ -386,6 +429,21 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
        WriteBuffer(buffer);    
 }
 
+/*
+ * Redo an XLOG_GIST_INSERT_COMPLETE record: the record holds a
+ * gistxlogInsertComplete header followed by a packed array of
+ * ItemPointerData keys; every key is dropped from the list of
+ * incomplete inserts.
+ */
+static void
+gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
+       char *start = XLogRecGetData(record);
+       gistxlogInsertComplete  *header = (gistxlogInsertComplete*)start;
+       char *cur;
+
+       /* step over the keys stored right behind the header */
+       for( cur = start + sizeof( gistxlogInsertComplete );
+            cur - start < record->xl_len;
+            cur += sizeof(ItemPointerData) ) {
+               /* a whole key must still fit into the rest of the record */
+               Assert( record->xl_len - (cur - start) >= sizeof(ItemPointerData) );
+               forgetIncompleteInsert( header->node, *((ItemPointerData*)cur) );
+       }
+}
+
 void
 gist_redo(XLogRecPtr lsn, XLogRecord *record)
 {
@@ -408,8 +466,7 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
                        gistRedoCreateIndex(lsn, record);
                        break;
                case    XLOG_GIST_INSERT_COMPLETE:
-                       forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, 
-                               ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+                       gistRedoCompleteInsert(lsn, record);
                        break;
                default:
                        elog(PANIC, "gist_redo: unknown op code %u", info);
@@ -431,16 +488,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key)
 static void
 out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
        out_target(buf, xlrec->node, xlrec->key);
-       sprintf(buf + strlen(buf), "; block number %u; update offset %u;", 
-               xlrec->blkno, xlrec->todeleteoffnum);
+       sprintf(buf + strlen(buf), "; block number %u", 
+               xlrec->blkno);
 }
 
 static void
 out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
        strcat(buf, "page_split: ");
        out_target(buf, xlrec->node, xlrec->key);
-       sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", 
-               xlrec->origblkno, xlrec->todeleteoffnum,
+       sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages", 
+               xlrec->origblkno, 
                xlrec->nitup, xlrec->npage);
 }
 
@@ -472,135 +529,172 @@ gist_desc(char *buf, uint8 xl_info, char *rec)
                                ((RelFileNode*)rec)->relNode);
                        break;
                case    XLOG_GIST_INSERT_COMPLETE:
-                       strcat(buf, "insert_complete: ");
-                       out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); 
+                       sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u", 
+                               ((gistxlogInsertComplete*)rec)->node.spcNode, 
+                               ((gistxlogInsertComplete*)rec)->node.dbNode, 
+                               ((gistxlogInsertComplete*)rec)->node.relNode);
                default:
                        elog(PANIC, "gist_desc: unknown op code %u", info);
        }
 }
 
+/*
+ * Build a palloc'd index tuple that carries only a block number and is
+ * flagged as invalid.
+ */
+IndexTuple 
+gist_form_invalid_tuple(BlockNumber blkno) {
+       /*
+        * Only the tuple header is allocated -- there is no room for a nulls
+        * bitmap or key data.  Code reading or writing such a tuple has to be
+        * careful about that.
+        */
+       Size hdrlen = IndexInfoFindDataOffset(0);
+       IndexTuple placeholder;
+
+       placeholder = (IndexTuple)palloc0( hdrlen );
+       /* store the tuple's length in its info word */
+       placeholder->t_info |= hdrlen;
+
+       /* record the block number, then mark the tuple invalid */
+       ItemPointerSetBlockNumber(&(placeholder->t_tid), blkno);
+       GistTupleSetInvalid( placeholder );
+
+       return placeholder;
+}
 
-#ifdef GIST_INCOMPLETE_INSERT 
 static void
 gistContinueInsert(gistIncompleteInsert *insert) {
-       GISTSTATE       giststate;
-       GISTInsertState state;
-       int i;
+       IndexTuple   *itup;
+       int i, lenitup;
        MemoryContext oldCxt;
+       Relation index;
+
        oldCxt = MemoryContextSwitchTo(opCtx);
        
-       state.r = XLogOpenRelation(insert->node);
-       if (!RelationIsValid(state.r))
+       index = XLogOpenRelation(insert->node);
+       if (!RelationIsValid(index))
                return;
 
-       initGISTstate(&giststate, state.r);
+       elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
+                insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
 
-       state.needInsertComplete=false;
-       ItemPointerSetInvalid( &(state.key) );
-       state.path=NULL;
-       state.pathlen=0;
-       state.xlog_mode = true;
+       /* needed vector itup never will be more than initial lenblkno+2, 
+           because during this processing Indextuple can be only smaller */ 
+       lenitup = insert->lenblk;       
+       itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
 
-       /* form union tuples */
-       state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
-       state.ituplen = insert->lenblk; 
-       for(i=0;i<insert->lenblk;i++) {
-               int len=0;
-               IndexTuple *itup;
-               Buffer  buffer;
-               Page    page;
+       for(i=0;i<insert->lenblk;i++) 
+               itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
 
-               buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
-               if (!BufferIsValid(buffer))
-                       elog(PANIC, "gistContinueInsert: block unfound");
-               page = (Page) BufferGetPage(buffer);
-               if ( PageIsNew((PageHeader)page) )
-                       elog(PANIC, "gistContinueInsert: uninitialized page");
+       if ( insert->pathlen==0 ) {
+               /*it  was split root, so we should only make new root*/
+               Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
+               Page   page;
 
-               itup = gistextractbuffer(buffer, &len);
-               state.itup[i] = gistunion(state.r, itup, len, &giststate);
+               if (!BufferIsValid(buffer))
+                       elog(PANIC, "gistContinueInsert: root block unfound");
 
-               ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
-               
+               GISTInitBuffer(buffer, 0);
+               page = BufferGetPage(buffer);
+               gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-               ReleaseBuffer(buffer);
-       }
-
-       if ( insert->pathlen==0 ) { 
-               /*it  was split root, so we should only make new root*/
-               gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
-               MemoryContextSwitchTo(oldCxt);
-               MemoryContextReset(opCtx);
-               return;
-       }
+               WriteBuffer(buffer);
+       } else {
+               Buffer  *buffers;
+               Page    *pages;
+               int numbuffer;
+               
+               buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
+               pages  = (Page*)   palloc( sizeof(Page  ) * (insert->lenblk+2/*guarantee root split*/) );
 
-       /* form stack */
-       state.stack=NULL;
-       for(i=0;i<insert->pathlen;i++) {
-               int j,len=0;
-               IndexTuple *itup;
-               GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
-
-               top->blkno = insert->path[i];
-               top->buffer = XLogReadBuffer(false, state.r, top->blkno);
-               if (!BufferIsValid(top->buffer))
-                       elog(PANIC, "gistContinueInsert: block unfound");
-               top->page = (Page) BufferGetPage(top->buffer);
-               if ( PageIsNew((PageHeader)(top->page)) )
-                       elog(PANIC, "gistContinueInsert: uninitialized page");
-
-               top->todelete = false;  
-
-               /* find childoffnum */
-               itup = gistextractbuffer(top->buffer, &len);
-               top->childoffnum=InvalidOffsetNumber;
-               for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
-                       BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); 
+               for(i=0;i<insert->pathlen;i++) {
+                       int     j, k, pituplen=0, childfound=0;
+               
+                       numbuffer=1;
+                       buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
+                       if (!BufferIsValid(buffers[numbuffer-1]))
+                               elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]);
+                       pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
+                       if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
+                               elog(PANIC, "gistContinueInsert: uninitialized page");
+
+                       pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
                        
-                       if ( i==0 ) {
-                               int k; 
-                               for(k=0;k<insert->lenblk;k++)
-                                       if ( insert->blkno[k] == blkno ) {
-                                               top->childoffnum = j+1;
+                       /* remove old IndexTuples */
+                       for(j=0;j<pituplen && childfound<lenitup;j++) {
+                               BlockNumber blkno;
+                               ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
+                               IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
+
+                               blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
+
+                               for(k=0;k<lenitup;k++) 
+                                       if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
+                                               PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
+                                               j--; pituplen--;
+                                               childfound++;
                                                break;
                                        }
-                       } else if ( insert->path[i-1]==blkno )
-                                       top->childoffnum = j+1;
-               }
+                       }
 
-               if ( top->childoffnum==InvalidOffsetNumber ) {
-                       elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
-                       return;
+                       if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) { 
+                               /* no space left on page, so we should split */
+                               buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+                               if (!BufferIsValid(buffers[numbuffer]))
+                                       elog(PANIC, "gistContinueInsert: can't create new block");
+                               GISTInitBuffer(buffers[numbuffer], 0);
+                               pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+                               gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
+                               numbuffer++;
+
+                               if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
+                                       IndexTuple *parentitup;
+
+                                       parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
+
+                                       /* we split root, just copy tuples from old root to new page */
+                                       if ( i+1 != insert->pathlen )
+                                               elog(PANIC,"gistContinueInsert: can't restore index '%s'",
+                                                       RelationGetRelationName( index ));
+
+                                       /* fill new page */ 
+                                       buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+                                       if (!BufferIsValid(buffers[numbuffer]))
+                                               elog(PANIC, "gistContinueInsert: can't create new block");
+                                       GISTInitBuffer(buffers[numbuffer], 0);
+                                       pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+                                       gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
+                                       numbuffer++;
+
+                                       /* fill root page */
+                                       GISTInitBuffer(buffers[0], 0);
+                                       for(j=1;j<numbuffer;j++) {
+                                               IndexTuple  tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+                                               if ( InvalidOffsetNumber == PageAddItem(pages[0], 
+                                                               (Item)tuple,
+                                                               IndexTupleSize( tuple ),
+                                                               (OffsetNumber)j,
+                                                               LP_USED) )
+                                                       elog( PANIC,"gistContinueInsert: can't restore index '%s'",
+                                                                       RelationGetRelationName( index ));
+                                               }
+                               }
+                       } else 
+                               gistfillbuffer( index, pages[numbuffer-1], itup, lenitup, 
+                                       (PageIsEmpty(pages[numbuffer-1])) ? 
+                                               FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(pages[numbuffer-1])) );
+
+                       lenitup=numbuffer;
+                       for(j=0;j<numbuffer;j++) {
+                               itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+                               PageSetLSN(pages[j], insert->lsn);
+                               PageSetTLI(pages[j], ThisTimeLineID);
+                               LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
+                               WriteBuffer( buffers[j] );
+                       }
                }
-
-               if ( i==0 ) 
-                       PageIndexTupleDelete(top->page, top->childoffnum);
-                       
-               /* install item on right place in stack */
-               top->parent=NULL;
-               if ( state.stack ) {
-                       GISTInsertStack *ptr = state.stack;
-                       while( ptr->parent )
-                               ptr = ptr->parent;
-                       ptr->parent=top;
-               } else
-                       state.stack = top;
        }
 
-       /* Good. Now we can continue insert */
-
-       gistmakedeal(&state, &giststate);
-
        MemoryContextSwitchTo(oldCxt);
        MemoryContextReset(opCtx);
 }
-#endif
 
 void
 gist_xlog_startup(void) {
        incomplete_inserts=NIL;
        insertCtx = AllocSetContextCreate(CurrentMemoryContext,
-               "GiST insert in xlog  temporary context",       
+               "GiST recovery temporary context",      
                                  ALLOCSET_DEFAULT_MINSIZE,
                                  ALLOCSET_DEFAULT_INITSIZE,
                                  ALLOCSET_DEFAULT_MAXSIZE);
@@ -613,16 +707,194 @@ gist_xlog_cleanup(void) {
 
        foreach(l, incomplete_inserts) {
                gistIncompleteInsert    *insert = (gistIncompleteInsert*) lfirst(l);
-               char buf[1024];
-
-               *buf='\0';
-               out_target(buf, insert->node, insert->key);
-               elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
-#ifdef GIST_INCOMPLETE_INSERT 
                gistContinueInsert(insert);
-#endif
        }
        MemoryContextDelete(opCtx);
        MemoryContextDelete(insertCtx); 
 }
 
+
+/*
+ * formSplitRdata
+ *     Build the XLogRecData chain for a page-split WAL record.
+ *
+ * The chain is laid out as:
+ *   - the gistxlogPageSplit header (always);
+ *   - the path of BlockNumbers, if pathlen != 0;
+ *   - the OffsetNumbers to delete, if ntodelete != 0;
+ *   - the ituplen new index tuples;
+ *   - for every page in dist: its gistxlogPage header followed by its
+ *     array of block.num OffsetNumbers, padded to a MAXALIGN'ed length.
+ *
+ * key may be NULL, in which case an invalid item pointer is recorded.
+ *
+ * Returns a palloc'd array whose element 0 is the head of the chain.
+ * As a side effect, ptr->list of a dist entry may be reallocated (and
+ * updated in place) so that it is long enough for the padded length.
+ *
+ * NOTE(review): the path and todelete entries are logged with a
+ * MAXALIGN'ed length but point straight at the caller's arrays, so the
+ * caller presumably has to supply buffers at least that long -- confirm.
+ */
+XLogRecData *
+formSplitRdata(RelFileNode node, BlockNumber blkno, 
+               OffsetNumber *todelete, int ntodelete, 
+               IndexTuple *itup, int ituplen, ItemPointer key, 
+               BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
+               
+       XLogRecData     *rdata;
+       gistxlogPageSplit       *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
+       SplitedPageLayout       *ptr;
+       int npage = 0, cur=1, i;
+
+       /* count the pages produced by the split */
+       ptr=dist;
+       while( ptr ) {
+               npage++;
+               ptr=ptr->next;
+       }
+
+       /*
+        * Worst case: header + path + todelete (3 entries), one entry per
+        * new tuple and two entries (page header, offset list) per page.
+        */
+       rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3));
+
+       xlrec->node = node;
+       xlrec->origblkno = blkno;
+       xlrec->npage = (uint16)npage;
+       xlrec->nitup = (uint16)ituplen;
+       xlrec->ntodelete = (uint16)ntodelete;
+       xlrec->pathlen = (uint16)pathlen;
+       if ( key )
+               xlrec->key = *key;
+       else
+               ItemPointerSetInvalid( &(xlrec->key) );
+       
+       rdata[0].buffer = InvalidBuffer;
+       rdata[0].data   = (char *) xlrec;
+       rdata[0].len    = sizeof( gistxlogPageSplit );
+       rdata[0].next   = NULL;
+
+       if ( pathlen ) {
+               rdata[cur-1].next   = &(rdata[cur]);
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data = (char*)path;
+               rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+               rdata[cur].next = NULL;
+               cur++;
+       }
+
+       if ( ntodelete ) {
+               rdata[cur-1].next   = &(rdata[cur]);
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data = (char*)todelete;
+               rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+               rdata[cur].next = NULL;
+               cur++;
+       }
+
+       /* new tuples */
+       for(i=0;i<ituplen;i++) {
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data   = (char*)(itup[i]);
+               rdata[cur].len  = IndexTupleSize(itup[i]);
+               rdata[cur].next  = NULL;
+               rdata[cur-1].next = &(rdata[cur]);
+               cur++;
+       }
+
+       /* per-page layout: gistxlogPage header, then its offset list */
+       ptr=dist;
+       while(ptr) {
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data   = (char*)&(ptr->block);
+               rdata[cur].len  = sizeof(gistxlogPage);
+               rdata[cur].next = NULL;
+               rdata[cur-1].next = &(rdata[cur]);
+               cur++;
+
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].len    = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+               /*
+                * Grow the offset array to the padded length.  repalloc() may
+                * move the chunk, so the new address must be stored back into
+                * ptr->list; otherwise ptr->list could be left pointing at
+                * freed memory while only rdata knows the new location.
+                */
+               if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+                       ptr->list = (OffsetNumber*)repalloc( ptr->list, rdata[cur].len );
+               rdata[cur].data   = (char*)(ptr->list);
+               rdata[cur-1].next = &(rdata[cur]);
+               rdata[cur].next=NULL;
+               cur++;
+               ptr=ptr->next;
+       }
+
+       return rdata;    
+}
+
+
+/*
+ * formUpdateRdata
+ *     Build the XLogRecData chain for a page-update WAL record.
+ *
+ * When emptypage is set, the chain consists of the gistxlogEntryUpdate
+ * header alone, with ntodelete and pathlen recorded as zero.  Otherwise
+ * the header is followed by the path (if pathlen != 0), the offsets to
+ * delete (if ntodelete != 0) and the ituplen new index tuples.
+ *
+ * key may be NULL, in which case an invalid item pointer is recorded.
+ *
+ * Returns a palloc'd array whose element 0 is the head of the chain.
+ */
+XLogRecData *
+formUpdateRdata(RelFileNode node, BlockNumber blkno, 
+               OffsetNumber *todelete, int ntodelete, bool emptypage,
+               IndexTuple *itup, int ituplen, ItemPointer key, 
+               BlockNumber *path, int pathlen) {
+       gistxlogEntryUpdate     *hdr = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
+       XLogRecData     *chain;
+       XLogRecData     *tail;
+       int     n;
+
+       /* fixed-size record header */
+       hdr->node = node;
+       hdr->blkno = blkno;
+       if ( !key )
+               ItemPointerSetInvalid( &(hdr->key) );
+       else
+               hdr->key = *key;
+       hdr->isemptypage = emptypage ? true : false;
+       hdr->ntodelete = emptypage ? 0 : ntodelete;
+       hdr->pathlen = emptypage ? 0 : pathlen;
+
+       /* header only, or header + path + todelete + one entry per tuple */
+       chain = (XLogRecData*)palloc( emptypage ?
+                       sizeof(XLogRecData) :
+                       sizeof(XLogRecData) * ( 3 + ituplen ) );
+
+       chain->buffer = InvalidBuffer;
+       chain->data = (char*)hdr;
+       chain->len = sizeof(gistxlogEntryUpdate);
+       chain->next = NULL;
+
+       if ( emptypage )
+               return chain;
+
+       /* tail always points at the last element linked into the chain */
+       tail = chain;
+
+       if ( pathlen ) {
+               tail->next = tail + 1;
+               tail++;
+               tail->buffer = InvalidBuffer;
+               tail->data = (char*)path;
+               tail->len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+               tail->next = NULL;
+       }
+
+       if ( ntodelete ) {
+               tail->next = tail + 1;
+               tail++;
+               tail->buffer = InvalidBuffer;
+               tail->data = (char*)todelete;
+               tail->len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+               tail->next = NULL;
+       }
+
+       /* new tuples */
+       for(n=0;n<ituplen;n++) {
+               XLogRecData     *entry = tail + 1;
+
+               entry->buffer = InvalidBuffer;
+               entry->data = (char*)(itup[n]);
+               entry->len = IndexTupleSize(itup[n]);
+               entry->next = NULL;
+               tail->next = entry;
+               tail = entry;
+       }
+
+       return chain;
+}
+
+/*
+ * gistxlogInsertCompletion
+ *     Write an XLOG_GIST_INSERT_COMPLETE record for the given relation.
+ *
+ * The record is the gistxlogInsertComplete header followed by the array
+ * of len ItemPointerData keys.  len must be positive.  The XLogInsert()
+ * call is wrapped in a critical section; its result is returned.
+ */
+XLogRecPtr 
+gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
+       gistxlogInsertComplete  hdr;
+       XLogRecData     hdrdata;
+       XLogRecData     keydata;
+       XLogRecPtr      result;
+
+       Assert(len>0);
+
+       hdr.node = node;
+
+       /* second (last) chain element: the keys themselves */
+       keydata.buffer = InvalidBuffer;
+       keydata.data   = (char *) keys;
+       keydata.len    = sizeof( ItemPointerData ) * len;
+       keydata.next   = NULL;
+
+       /* first chain element: the fixed-size header */
+       hdrdata.buffer = InvalidBuffer;
+       hdrdata.data   = (char *) &hdr;
+       hdrdata.len    = sizeof( gistxlogInsertComplete );
+       hdrdata.next   = &keydata;
+
+       START_CRIT_SECTION();
+
+       result = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &hdrdata);
+
+       END_CRIT_SECTION();
+
+       return result;
+}
index 33110b71b6aa1e5577f2542773ca35f2b49e1e20..bf9c1c712bb7ca7a274a57276a7099aa7ef81819 100644 (file)
@@ -9,7 +9,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.46 2005/05/17 03:34:18 neilc Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,6 +36,8 @@
  * Page opaque data in a GiST index page.
  */
 #define F_LEAF                 (1 << 0)
+#define F_DELETED              (1 << 1)
+#define F_TUPLES_DELETED       (1 << 2)
 
 typedef struct GISTPageOpaqueData
 {
@@ -56,6 +58,7 @@ typedef struct GIST_SPLITVEC
                                                                                                 * spl_left */
        int                     spl_lattrsize[INDEX_MAX_KEYS];
        bool            spl_lisnull[INDEX_MAX_KEYS];
+       bool            spl_leftvalid;
 
        OffsetNumber *spl_right;        /* array of entries that go right */
        int                     spl_nright;             /* size of the array */
@@ -64,6 +67,7 @@ typedef struct GIST_SPLITVEC
                                                                                                 * spl_right */
        int                     spl_rattrsize[INDEX_MAX_KEYS];
        bool            spl_risnull[INDEX_MAX_KEYS];
+       bool            spl_rightvalid;
 
        int                *spl_idgrp;
        int                *spl_ngrp;           /* number in each group */
@@ -86,7 +90,18 @@ typedef struct GISTENTRY
        bool            leafkey;
 } GISTENTRY;
 
-#define GIST_LEAF(entry) (((GISTPageOpaque) PageGetSpecialPointer((entry)->page))->flags & F_LEAF)
+/* GiST opaque data of a page, reached through PageGetSpecialPointer() */
+#define GistPageGetOpaque(page)        ((GISTPageOpaque) PageGetSpecialPointer(page))
+
+/* F_LEAF: test, set and clear */
+#define GistPageIsLeaf(page)   (GistPageGetOpaque(page)->flags & F_LEAF)
+#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page))
+#define GistPageSetLeaf(page)  (GistPageGetOpaque(page)->flags |= F_LEAF)
+#define GistPageSetNonLeaf(page)       (GistPageGetOpaque(page)->flags &= ~F_LEAF)
+
+/* F_DELETED: test, set and clear */
+#define GistPageIsDeleted(page)        (GistPageGetOpaque(page)->flags & F_DELETED)
+#define GistPageSetDeleted(page)       (GistPageGetOpaque(page)->flags |= F_DELETED)
+#define GistPageSetNonDeleted(page)    (GistPageGetOpaque(page)->flags &= ~F_DELETED)
+
+/* F_TUPLES_DELETED: test, set and clear */
+#define GistTuplesDeleted(page)        (GistPageGetOpaque(page)->flags & F_TUPLES_DELETED)
+#define GistMarkTuplesDeleted(page)    (GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
+#define GistClearTuplesDeleted(page)   (GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
 
 /*
  * Vector of GISTENTRY structs; user-defined methods union and pick
index 479f221176b00f9fa7fd8e6367bc5dbee017d013..2a563e1dd65817b7a6ff9e50ba70dc2af0432610 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.4 2005/06/20 10:29:36 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -94,7 +94,6 @@ typedef struct {
        int             ituplen; /* length of itup */
        GISTInsertStack *stack;
        bool needInsertComplete;
-       bool xlog_mode;
 
        /* pointer to heap tuple */
        ItemPointerData key;
@@ -142,19 +141,20 @@ typedef struct gistxlogEntryUpdate {
        RelFileNode     node;
        BlockNumber     blkno;
 
-       /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */ 
-       OffsetNumber    todeleteoffnum;
+       uint16          ntodelete;
        uint16          pathlen;
+       bool            isemptypage;    
 
        /* 
-        * It used to identify compliteness of insert.
+        * It used to identify completeness of insert.
          * Sets to leaf itup 
          */ 
        ItemPointerData key;
 
        /* follow:
-        * 1. path to root (BlockNumber) 
-        * 2. tuples to insert
+        * 1. path to root (BlockNumber)
+        * 2. todelete OffsetNumbers 
+        * 3. tuples to insert
          */ 
 } gistxlogEntryUpdate;
 
@@ -163,18 +163,19 @@ typedef struct gistxlogEntryUpdate {
 typedef struct gistxlogPageSplit {
        RelFileNode     node;
        BlockNumber     origblkno; /*splitted page*/
-       OffsetNumber    todeleteoffnum;
+       uint16          ntodelete;
        uint16          pathlen;
-       int             npage;
-       int             nitup;
+       uint16          npage;
+       uint16          nitup;
 
        /* see comments on gistxlogEntryUpdate */
        ItemPointerData key;
  
        /* follow:
         * 1. path to root (BlockNumber) 
-        * 2. tuples to insert
-        * 3. gistxlogPage and array of OffsetNumber per page
+        * 2. todelete OffsetNumbers 
+        * 3. tuples to insert
+        * 4. gistxlogPage and array of OffsetNumber per page
          */ 
 } gistxlogPageSplit;
 
@@ -188,32 +189,65 @@ typedef struct gistxlogPage {
 
 typedef struct gistxlogInsertComplete {
        RelFileNode     node;
-       ItemPointerData key;
+       /* followed by the ItemPointerData keys to clean */
 } gistxlogInsertComplete;
 
-#define XLOG_GIST_CREATE_INDEX 0x50
+#define        XLOG_GIST_CREATE_INDEX  0x50
+
+/*
+ * mark tuples on inner pages during recovery
+ *
+ * The marker is kept in the offset-number part of the tuple's t_tid:
+ * GistTupleSetValid()/GistTupleSetInvalid() store TUPLE_IS_VALID or
+ * TUPLE_IS_INVALID there, and GistTupleIsInvalid() tests for
+ * TUPLE_IS_INVALID.
+ */
+#define TUPLE_IS_VALID         0xffff
+#define TUPLE_IS_INVALID       0xfffe
+
+#define  GistTupleIsInvalid(itup)      ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
+#define  GistTupleSetValid(itup)       ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
+#define  GistTupleSetInvalid(itup)     ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
 
 /* gist.c */
 extern Datum gistbuild(PG_FUNCTION_ARGS);
 extern Datum gistinsert(PG_FUNCTION_ARGS);
-extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
 extern MemoryContext createTempGistContext(void);
 extern void initGISTstate(GISTSTATE *giststate, Relation index);
 extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode);
+extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key);
 extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 
+/*
+ * One page produced by a split; entries are chained through 'next'.
+ * formSplitRdata() walks this list and logs, for each entry, 'block'
+ * followed by the 'list' array.
+ */
+typedef struct SplitedPageLayout {
+        gistxlogPage    block;  /* per-page header as logged; block.num is
+                                 * the number of entries in 'list' */
+        OffsetNumber    *list;  /* array of block.num OffsetNumbers */
+        Buffer          buffer; /* to be written after all processing is
+                                 * done */
+
+        struct SplitedPageLayout *next; /* next page, or NULL at the end */
+} SplitedPageLayout;
+
+IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
+                  int *len, SplitedPageLayout    **dist, GISTSTATE *giststate);
 /* gistxlog.c */
 extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void gist_desc(char *buf, uint8 xl_info, char *rec);
 extern void gist_xlog_startup(void);
 extern void gist_xlog_cleanup(void);
+extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
+
+extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno,
+                OffsetNumber *todelete, int ntodelete, bool emptypage,
+                IndexTuple *itup, int ituplen, ItemPointer key,
+                BlockNumber *path, int pathlen);
+
+extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber blkno,
+                OffsetNumber *todelete, int ntodelete, 
+                IndexTuple *itup, int ituplen, ItemPointer key,
+                BlockNumber *path, int pathlen, SplitedPageLayout *dist );
+
+extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
 
 /* gistget.c */
 extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
 /* gistutil.c */
+extern Buffer  gistReadBuffer(Relation r, BlockNumber blkno);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
                                 int len, OffsetNumber off);
 extern bool gistnospace(Page page, IndexTuple *itvec, int len);
@@ -230,7 +264,7 @@ extern IndexTuple gistgetadjusted(Relation r,
 extern int gistfindgroup(GISTSTATE *giststate,
                           GISTENTRY *valvec, GIST_SPLITVEC *spl);
 extern void gistadjsubkey(Relation r,
-                          IndexTuple *itup, int *len,
+                          IndexTuple *itup, int len,
                           GIST_SPLITVEC *v,
                           GISTSTATE *giststate);
 extern IndexTuple gistFormTuple(GISTSTATE *giststate,
@@ -247,10 +281,16 @@ extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
                               IndexTuple tuple, Page p, OffsetNumber o,
                               GISTENTRY *attdata, bool *isnull);
 extern void gistunionsubkey(Relation r, GISTSTATE *giststate, 
-                            IndexTuple *itvec, GIST_SPLITVEC *spl);
+                            IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
 extern void GISTInitBuffer(Buffer b, uint32 f);
 extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
                           Datum k, Relation r, Page pg, OffsetNumber o,
                           int b, bool l, bool isNull);
+void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+                IndexTuple *itup, int len, GISTSTATE *giststate);
+
+/* gistvacuum.c */
+extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
+extern Datum gistvacuumcleanup(PG_FUNCTION_ARGS);
 
 #endif /* GIST_PRIVATE_H */
index bdbaa83ace87e0694ef9cd951079bfb8a42f5176..458cddd134c1adc07209ca73912b6ef2f438898e 100644 (file)
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.278 2005/06/18 19:33:42 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.279 2005/06/20 10:29:37 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     200506181
+#define CATALOG_VERSION_NO     200506201
 
 #endif
index f473277b46bafc645e9c1396559e6f5885516bf3..2cb22643280b8c8a16a8b5f44d515933d5844f72 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.34 2005/06/13 23:14:49 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.35 2005/06/20 10:29:37 teodor Exp $
  *
  * NOTES
  *             the genbki.sh script reads this file and generates .bki
@@ -112,7 +112,7 @@ DESCR("b-tree index access method");
 DATA(insert OID = 405 (  hash  1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate ));
 DESCR("hash index access method");
 #define HASH_AM_OID 405
-DATA(insert OID = 783 (  gist  100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete - gistcostestimate ));
+DATA(insert OID = 783 (  gist  100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
 DESCR("GiST index access method");
 #define GIST_AM_OID 783
 
index a05a4f3a62cf8e728606b008db8f3f7260f6c9b5..f219065b61ce0a8cc13454213ff3a5c42676a6ac 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.368 2005/06/17 22:32:48 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.369 2005/06/20 10:29:37 teodor Exp $
  *
  * NOTES
  *       The script catalog/genbki.sh reads this file and generates .bki
@@ -1092,6 +1092,7 @@ DATA(insert OID = 782 (  gistbuild                   PGNSP PGUID 12 f f t f v 3 2278 "2281 228
 DESCR("gist(internal)");
 DATA(insert OID = 776 (  gistbulkdelete    PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistbulkdelete - _null_ ));
 DESCR("gist(internal)");
+DATA(insert OID = 2561 (  gistvacuumcleanup   PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistvacuumcleanup - _null_ ));
 DATA(insert OID = 772 (  gistcostestimate  PGNSP PGUID 12 f f t f v 7 2278 "2281 2281 2281 2281 2281 2281 2281" _null_ _null_ _null_  gistcostestimate - _null_ ));
 DESCR("gist(internal)");