]> granicus.if.org Git - postgresql/commitdiff
WAL for GiST. It work for online backup and so on, but on
authorTeodor Sigaev <teodor@sigaev.ru>
Tue, 14 Jun 2005 11:45:14 +0000 (11:45 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Tue, 14 Jun 2005 11:45:14 +0000 (11:45 +0000)
recovery after crash (power loss etc) it may say that it can't restore
index and index should be reindexed.

Some refactoring code.

src/backend/access/gist/Makefile
src/backend/access/gist/gist.c
src/backend/access/gist/gistget.c
src/backend/access/gist/gistutil.c [new file with mode: 0644]
src/backend/access/gist/gistxlog.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/include/access/gist_private.h

index f2ec10a90af3a538eb7026d91b962f391f3d14f5..b22f846a23ddc841bf10cb2fbdb1fa2356631fd9 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for access/gist
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.12 2003/11/29 19:51:39 pgsql Exp $
+#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/access/gist
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = gist.o gistget.o gistscan.o
+OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o
 
 all: SUBSYS.o
 
index 2a55c91f0df347138cec17f62d19b2b399fee7ca..4e3faccdf92566c4cdd2df8319bac9dde55f23a3 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "miscadmin.h"
 #include "utils/memutils.h"
 
-
-#undef GIST_PAGEADDITEM
-
-#define ATTSIZE(datum, tupdesc, i, isnull) \
-       ( \
-               (isnull) ? 0 : \
-                       att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
-       )
-
-/* result's status */
-#define INSERTED       0x01
-#define SPLITED                0x02
-
-/* group flags ( in gistSplit ) */
-#define LEFT_ADDED     0x01
-#define RIGHT_ADDED 0x02
-#define BOTH_ADDED     ( LEFT_ADDED | RIGHT_ADDED )
-
-/*
- * This defines only for shorter code, used in gistgetadjusted
- * and gistadjsubkey only
- */
-#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb)      do { \
-       if (isnullkey) {                                                                                          \
-               gistentryinit((evp), rkey, r, NULL,                                               \
-                                         (OffsetNumber) 0, rkeyb, FALSE);                        \
-       } else {                                                                                                          \
-               gistentryinit((evp), okey, r, NULL,                                               \
-                                         (OffsetNumber) 0, okeyb, FALSE);                        \
-       }                                                                                                                         \
-} while(0)
-
-#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
-       FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b);             \
-       FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b);             \
-} while(0);
-
 /* Working state for gistbuild and its callback */
 typedef struct
 {
@@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index,
 static void gistdoinsert(Relation r,
                         IndexTuple itup,
                         GISTSTATE *GISTstate);
-static int gistlayerinsert(Relation r, BlockNumber blkno,
-                               IndexTuple **itup,
-                               int *len,
-                               GISTSTATE *giststate);
-static OffsetNumber gistwritebuffer(Relation r,
-                               Page page,
-                               IndexTuple *itup,
-                               int len,
-                               OffsetNumber off);
-static bool gistnospace(Page page, IndexTuple *itvec, int len);
-static IndexTuple *gistreadbuffer(Buffer buffer, int *len);
-static IndexTuple *gistjoinvector(
-                          IndexTuple *itvec, int *len,
-                          IndexTuple *additvec, int addlen);
-static IndexTuple gistunion(Relation r, IndexTuple *itvec,
-                 int len, GISTSTATE *giststate);
-
-static IndexTuple gistgetadjusted(Relation r,
-                               IndexTuple oldtup,
-                               IndexTuple addtup,
+static void gistfindleaf(GISTInsertState *state,
                                GISTSTATE *giststate);
-static int gistfindgroup(GISTSTATE *giststate,
-                         GISTENTRY *valvec, GIST_SPLITVEC *spl);
-static void gistadjsubkey(Relation r,
-                         IndexTuple *itup, int *len,
-                         GIST_SPLITVEC *v,
-                         GISTSTATE *giststate);
-static IndexTuple gistFormTuple(GISTSTATE *giststate,
-                       Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+
+typedef struct PageLayout {
+       gistxlogPage    block;
+       OffsetNumber    *list;
+       Buffer          buffer; /* to write after all proceed */
+
+       struct PageLayout *next;
+} PageLayout;
+
+
+#define ROTATEDIST(d) do { \
+       PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
+       memset(tmp,0,sizeof(PageLayout)); \
+       tmp->next = (d); \
+       (d)=tmp; \
+} while(0)
+       
+
 static IndexTuple *gistSplit(Relation r,
                  Buffer buffer,
                  IndexTuple *itup,
                  int *len,
+                 PageLayout    **dist,
                  GISTSTATE *giststate);
-static void gistnewroot(Relation r, IndexTuple *itup, int len);
-static void GISTInitBuffer(Buffer b, uint32 f);
-static OffsetNumber gistchoose(Relation r, Page p,
-                  IndexTuple it,
-                  GISTSTATE *giststate);
-static void gistdelete(Relation r, ItemPointer tid);
-
-#ifdef GIST_PAGEADDITEM
-static IndexTuple gist_tuple_replacekey(Relation r,
-                                         GISTENTRY entry, IndexTuple t);
-#endif
-static void gistcentryinit(GISTSTATE *giststate, int nkey,
-                          GISTENTRY *e, Datum k,
-                          Relation r, Page pg,
-                          OffsetNumber o, int b, bool l, bool isNull);
-static void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
-                                                         IndexTuple tuple, Page p, OffsetNumber o,
-                                                         GISTENTRY *attdata, bool *isnull);
-static void gistpenalty(GISTSTATE *giststate, int attno,
-                       GISTENTRY *key1, bool isNull1,
-                       GISTENTRY *key2, bool isNull2,
-                       float *penalty);
+
 
 #undef GISTDEBUG
 
@@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS)
        /* initialize the root page */
        buffer = ReadBuffer(index, P_NEW);
        GISTInitBuffer(buffer, F_LEAF);
+       if ( !index->rd_istemp ) {
+               XLogRecPtr              recptr;
+               XLogRecData             rdata;
+               Page                    page;
+
+               rdata.buffer     = InvalidBuffer;
+               rdata.data       = (char*)&(index->rd_node);
+               rdata.len        = sizeof(RelFileNode);
+               rdata.next       = NULL;
+
+               page = BufferGetPage(buffer);
+
+               START_CRIT_SECTION();
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
+               PageSetLSN(page, recptr);
+               PageSetTLI(page, ThisTimeLineID);
+
+               END_CRIT_SECTION();
+       }
        WriteBuffer(buffer);
 
        /* build the index */
@@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS)
        PG_RETURN_BOOL(true);
 }
 
-#ifdef GIST_PAGEADDITEM
-/*
- * Take a compressed entry, and install it on a page.  Since we now know
- * where the entry will live, we decompress it and recompress it using
- * that knowledge (some compression routines may want to fish around
- * on the page, for example, or do something special for leaf nodes.)
- */
-static OffsetNumber
-gistPageAddItem(GISTSTATE *giststate,
-                               Relation r,
-                               Page page,
-                               Item item,
-                               Size size,
-                               OffsetNumber offsetNumber,
-                               ItemIdFlags flags,
-                               GISTENTRY *dentry,
-                               IndexTuple *newtup)
-{
-       GISTENTRY       tmpcentry;
-       IndexTuple      itup = (IndexTuple) item;
-       OffsetNumber retval;
-       Datum           datum;
-       bool            IsNull;
-
-       /*
-        * recompress the item given that we now know the exact page and
-        * offset for insertion
-        */
-       datum = index_getattr(itup, 1, r->rd_att, &IsNull);
-       gistdentryinit(giststate, 0, dentry, datum,
-                                  (Relation) 0, (Page) 0,
-                                  (OffsetNumber) InvalidOffsetNumber,
-                                  ATTSIZE(datum, r, 1, IsNull),
-                                  FALSE, IsNull);
-       gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page,
-                                  offsetNumber, dentry->bytes, FALSE);
-       *newtup = gist_tuple_replacekey(r, tmpcentry, itup);
-       retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
-                                                offsetNumber, flags);
-       if (retval == InvalidOffsetNumber)
-               elog(ERROR, "failed to add index item to \"%s\"",
-                        RelationGetRelationName(r));
-       return retval;
-}
-#endif
 
 /*
  * Workhouse routine for doing insertion into a GiST index. Note that
@@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate,
 static void
 gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
 {
-       IndexTuple *instup;
-       int                     ret,
-                               len = 1;
+       GISTInsertState state;
 
-       instup = (IndexTuple *) palloc(sizeof(IndexTuple));
-       instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
-       memcpy(instup[0], itup, IndexTupleSize(itup));
+       memset(&state, 0, sizeof(GISTInsertState));
 
-       ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate);
-       if (ret & SPLITED)
-               gistnewroot(r, instup, len);
-}
+       state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
+       state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
+       memcpy(state.itup[0], itup, IndexTupleSize(itup));
+       state.ituplen=1;
+       state.r = r;
+       state.key = itup->t_tid;
+       state.needInsertComplete = true; 
+       state.xlog_mode = false;
 
-static int
-gistlayerinsert(Relation r, BlockNumber blkno,
-                               IndexTuple **itup,              /* in - out, has compressed entry */
-                               int *len,               /* in - out */
-                               GISTSTATE *giststate)
-{
-       Buffer          buffer;
-       Page            page;
-       int                     ret;
-       GISTPageOpaque opaque;
+       state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+       memset( state.stack, 0, sizeof(GISTInsertStack));
+       state.stack->blkno=GIST_ROOT_BLKNO;
 
-       buffer = ReadBuffer(r, blkno);
-       page = (Page) BufferGetPage(buffer);
-       opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+       gistfindleaf(&state, giststate);
+       gistmakedeal(&state, giststate);
+}
 
-       if (!(opaque->flags & F_LEAF))
+static bool
+gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
+       bool is_splitted = false;
+
+       if (gistnospace(state->stack->page, state->itup, state->ituplen))
        {
-        /*
-         * This is an internal page, so continue to walk down the
-         * tree. We find the child node that has the minimum insertion
-         * penalty and recursively invoke ourselves to modify that
-         * node. Once the recursive call returns, we may need to
-         * adjust the parent node for two reasons: the child node
-         * split, or the key in this node needs to be adjusted for the
-         * newly inserted key below us.
-         */
-               ItemId          iid;
-               BlockNumber nblkno;
-               ItemPointerData oldtid;
-               IndexTuple      oldtup;
-               OffsetNumber child;
-
-               child = gistchoose(r, page, *(*itup), giststate);
-               iid = PageGetItemId(page, child);
-               oldtup = (IndexTuple) PageGetItem(page, iid);
-               nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+               /* no space for insertion */
+               IndexTuple *itvec,
+                                  *newitup;
+               int                     tlen,olen;
+               PageLayout      *dist=NULL, *ptr;
+
+               memset(&dist, 0, sizeof(PageLayout));
+               is_splitted = true;
+               itvec = gistextractbuffer(state->stack->buffer, &tlen);
+               olen=tlen;
+               itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
+               newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+
+               if ( !state->r->rd_istemp && !state->xlog_mode) {
+                       gistxlogPageSplit       xlrec;
+                       XLogRecPtr              recptr;
+                       XLogRecData             *rdata;
+                       int i, npage = 0, cur=1;
+
+                       ptr=dist;
+                       while( ptr ) {
+                               npage++;
+                               ptr=ptr->next;
+                       }
 
-               /*
-                * After this call: 1. if child page was splited, then itup
-                * contains keys for each page 2. if  child page wasn't splited,
-                * then itup contains additional for adjustment of current key
-                */
-               ret = gistlayerinsert(r, nblkno, itup, len, giststate);
+                       rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
+
+                       xlrec.node = state->r->rd_node;
+                       xlrec.origblkno = state->stack->blkno;
+                       xlrec.npage = npage;
+                       xlrec.nitup = state->ituplen;
+                       xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+                       xlrec.key = state->key;
+                       xlrec.pathlen = (uint16)state->pathlen;
+
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].data   = (char *) &xlrec;
+                       rdata[0].len    = sizeof( gistxlogPageSplit );
+                       rdata[0].next   = NULL;
+
+                       if ( state->pathlen>=0 ) {
+                               rdata[0].next   = &(rdata[1]);
+                               rdata[1].buffer = InvalidBuffer;
+                               rdata[1].data   = (char *) (state->path);
+                               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
+                               rdata[1].next   = NULL;
+                               cur++;
+                       }
+                       
+                       /* new tuples */        
+                       for(i=0;i<state->ituplen;i++) {
+                               rdata[cur].buffer = InvalidBuffer;
+                               rdata[cur].data   = (char*)(state->itup[i]);
+                               rdata[cur].len  = IndexTupleSize(state->itup[i]);
+                               rdata[cur-1].next = &(rdata[cur]);
+                               cur++;
+                       }
 
-               /* nothing inserted in child */
-               if (!(ret & INSERTED))
-               {
-                       ReleaseBuffer(buffer);
-                       return 0x00;
-               }
+                       /* new page layout */
+                       ptr=dist;
+                       while(ptr) {
+                               rdata[cur].buffer = InvalidBuffer;
+                               rdata[cur].data   = (char*)&(ptr->block);
+                               rdata[cur].len  = sizeof(gistxlogPage);
+                               rdata[cur-1].next = &(rdata[cur]);
+                               cur++;
+
+                               rdata[cur].buffer = InvalidBuffer;
+                               rdata[cur].data   = (char*)(ptr->list);
+                               rdata[cur].len    = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+                               if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+                                       rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+                               rdata[cur-1].next = &(rdata[cur]);
+                               rdata[cur].next=NULL;
+                               cur++;
+                               
+                               ptr=ptr->next;
+                       }
 
-               /* child did not split */
-               if (!(ret & SPLITED))
-               {
-                       IndexTuple      newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
+                       START_CRIT_SECTION();
 
-                       if (!newtup)
-                       {
-                               /* not need to update key */
-                               ReleaseBuffer(buffer);
-                               return 0x00;
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+                       ptr = dist;
+                       while(ptr) {    
+                               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+                               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+                               ptr=ptr->next;
                        }
 
-                       (*itup)[0] = newtup;
+                       END_CRIT_SECTION();
                }
 
-        /*
-         * This node's key has been modified, either because a child
-         * split occurred or because we needed to adjust our key for
-         * an insert in a child node. Therefore, remove the old
-         * version of this node's key.
-         */
-               ItemPointerSet(&oldtid, blkno, child);
-               gistdelete(r, &oldtid);
-
-               /*
-                * if child was splitted, new key for child will be inserted in
-                * the end list of child, so we must say to any scans that page is
-                * changed beginning from 'child' offset
-                */
-               if (ret & SPLITED)
-                       gistadjscans(r, GISTOP_SPLIT, blkno, child);
-       }
+               ptr = dist;
+               while(ptr) {
+                       WriteBuffer(ptr->buffer);
+                       ptr=ptr->next;
+               }
 
-       ret = INSERTED;
+               state->itup = newitup;
+               state->ituplen = tlen;                  /* now tlen >= 2 */
 
-       if (gistnospace(page, (*itup), *len))
-       {
-               /* no space for insertion */
-               IndexTuple *itvec,
-                                  *newitup;
-               int                     tlen,
-                                       oldlen;
-
-               ret |= SPLITED;
-               itvec = gistreadbuffer(buffer, &tlen);
-               itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
-               oldlen = *len;
-               newitup = gistSplit(r, buffer, itvec, &tlen, giststate);
-               ReleaseBuffer(buffer);
-               *itup = newitup;
-               *len = tlen;                    /* now tlen >= 2 */
+               if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
+                       gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
+                       state->needInsertComplete=false;
+               }
+               if ( state->xlog_mode ) 
+                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(state->stack->buffer);
        }
        else
        {
                /* enough space */
-               OffsetNumber off,
-                                       l;
+               OffsetNumber off, l;
 
-               off = (PageIsEmpty(page)) ?
+               off = (PageIsEmpty(state->stack->page)) ?
                        FirstOffsetNumber
                        :
-                       OffsetNumberNext(PageGetMaxOffsetNumber(page));
-               l = gistwritebuffer(r, page, *itup, *len, off);
-               WriteBuffer(buffer);
+                       OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
+               l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
+               if ( !state->r->rd_istemp && !state->xlog_mode) {
+                       gistxlogEntryUpdate     xlrec;
+                       XLogRecPtr              recptr;
+                       XLogRecData             *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
+                       int i, cur=0;
+                       
+                       xlrec.node = state->r->rd_node;
+                       xlrec.blkno = state->stack->blkno;
+                       xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+                       xlrec.key = state->key;
+                       xlrec.pathlen = (uint16)state->pathlen;
+
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].data   = (char *) &xlrec;
+                       rdata[0].len    = sizeof( gistxlogEntryUpdate );
+                       rdata[0].next   = NULL;
+
+                       if ( state->pathlen>=0 ) {
+                               rdata[0].next   = &(rdata[1]);
+                               rdata[1].buffer = InvalidBuffer;
+                               rdata[1].data   = (char *) (state->path);
+                               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
+                               rdata[1].next   = NULL;
+                               cur++;
+                       }
+
+                       for(i=1; i<=state->ituplen; i++) { /* adding tuples */
+                               rdata[i+cur].buffer = InvalidBuffer;
+                               rdata[i+cur].data   = (char*)(state->itup[i-1]);
+                               rdata[i+cur].len        = IndexTupleSize(state->itup[i-1]);
+                               rdata[i+cur].next       = NULL;
+                               rdata[i-1+cur].next = &(rdata[i+cur]);
+                       }       
+                       
+                       START_CRIT_SECTION();
+
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                       PageSetLSN(state->stack->page, recptr);
+                       PageSetTLI(state->stack->page, ThisTimeLineID);
+
+                       END_CRIT_SECTION();
+               }
+
+               if ( state->stack->blkno == GIST_ROOT_BLKNO ) 
+                        state->needInsertComplete=false;
 
-               if (*len > 1)
-               {                                               /* previous insert ret & SPLITED != 0 */
+               if ( state->xlog_mode ) 
+                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+               WriteBuffer(state->stack->buffer);
+
+               if (state->ituplen > 1)
+               {                                               /* previous is_splitted==true */
                        /*
                         * child was splited, so we must form union for insertion in
                         * parent
                         */
-                       IndexTuple      newtup = gistunion(r, (*itup), *len, giststate);
-                       ItemPointerSet(&(newtup->t_tid), blkno, 1);
-                       (*itup)[0] = newtup;
-                       *len = 1;
+                       IndexTuple      newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
+                       ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
+                       state->itup[0] = newtup;
+                       state->ituplen = 1;
                }
        }
-
-       return ret;
-}
-
-/*
- * Write itup vector to page, has no control of free space
- */
-static OffsetNumber
-gistwritebuffer(Relation r, Page page, IndexTuple *itup,
-                               int len, OffsetNumber off)
-{
-       OffsetNumber l = InvalidOffsetNumber;
-       int                     i;
-
-       for (i = 0; i < len; i++)
-       {
-#ifdef GIST_PAGEADDITEM
-               GISTENTRY       tmpdentry;
-               IndexTuple      newtup;
-               bool            IsNull;
-
-               l = gistPageAddItem(giststate, r, page,
-                                                       (Item) itup[i], IndexTupleSize(itup[i]),
-                                                       off, LP_USED, &tmpdentry, &newtup);
-               off = OffsetNumberNext(off);
-#else
-               l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
-                                               off, LP_USED);
-               if (l == InvalidOffsetNumber)
-                       elog(ERROR, "failed to add index item to \"%s\"",
-                                RelationGetRelationName(r));
-#endif
-       }
-       return l;
+       return is_splitted;
 }
 
-/*
- * Check space for itup vector on page
- */
-static bool
-gistnospace(Page page, IndexTuple *itvec, int len)
-{
-       unsigned int size = 0;
-       int                     i;
-
-       for (i = 0; i < len; i++)
-               size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
-
-       return (PageGetFreeSpace(page) < size);
-}
-
-/*
- * Read buffer into itup vector
- */
-static IndexTuple *
-gistreadbuffer(Buffer buffer, int *len /* out */ )
-{
-       OffsetNumber i,
-                               maxoff;
-       IndexTuple *itvec;
-       Page            p = (Page) BufferGetPage(buffer);
-
-       maxoff = PageGetMaxOffsetNumber(p);
-       *len = maxoff;
-       itvec = palloc(sizeof(IndexTuple) * maxoff);
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-               itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
-
-       return itvec;
-}
-
-/*
- * join two vectors into one
- */
-static IndexTuple *
-gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
-{
-       itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
-       memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
-       *len += addlen;
-       return itvec;
-}
-
-/*
- * Return an IndexTuple containing the result of applying the "union"
- * method to the specified IndexTuple vector.
- */
-static IndexTuple
-gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+static void
+gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
 {
-       Datum           attr[INDEX_MAX_KEYS];
-       bool            isnull[INDEX_MAX_KEYS];
-       GistEntryVector *evec;
-       int                     i;
-       GISTENTRY       centry[INDEX_MAX_KEYS];
-
-       evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
-       for (i = 0; i < r->rd_att->natts; i++)
-       {
-               Datum           datum;
-               int                     j;
-               int                     real_len;
+       ItemId          iid;
+       IndexTuple      oldtup;
+       GISTInsertStack *ptr;
 
-               real_len = 0;
-               for (j = 0; j < len; j++)
-               {
-                       bool            IsNull;
-                       datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
-                       if (IsNull)
-                               continue;
-
-                       gistdentryinit(giststate, i,
-                                                  &(evec->vector[real_len]),
-                                                  datum,
-                                                  NULL, NULL, (OffsetNumber) 0,
-                                                  ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                                                  FALSE, IsNull);
-                       real_len++;
-               }
+       /* walk down */
+       while( true ) { 
+               GISTPageOpaque opaque;
 
-               /* If this tuple vector was all NULLs, the union is NULL */
-               if (real_len == 0)
-               {
-                       attr[i] = (Datum) 0;
-                       isnull[i] = TRUE;
-               }
-               else
+               state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+               state->stack->page = (Page) BufferGetPage(state->stack->buffer);
+               opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
+       
+               if (!(opaque->flags & F_LEAF))
                {
-                       int datumsize;
-
-                       if (real_len == 1)
-                       {
-                               evec->n = 2;
-                               gistentryinit(evec->vector[1],
-                                                         evec->vector[0].key, r, NULL,
-                                                         (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
-                       }
-                       else
-                               evec->n = real_len;
-
-                       /* Compress the result of the union and store in attr array */
-                       datum = FunctionCall2(&giststate->unionFn[i],
-                                                                 PointerGetDatum(evec),
-                                                                 PointerGetDatum(&datumsize));
-
-                       gistcentryinit(giststate, i, &centry[i], datum,
-                                                  NULL, NULL, (OffsetNumber) 0,
-                                                  datumsize, FALSE, FALSE);
-                       isnull[i] = FALSE;
-                       attr[i] = centry[i].key;
-               }
+                       /*
+                       * This is an internal page, so continue to walk down the
+                       * tree. We find the child node that has the minimum insertion
+                       * penalty and recursively invoke ourselves to modify that
+                       * node. Once the recursive call returns, we may need to
+                       * adjust the parent node for two reasons: the child node
+                       * split, or the key in this node needs to be adjusted for the
+                       * newly inserted key below us.
+                       */
+                       GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+       
+                       state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
+
+                       iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+                       oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+                       item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+                       item->parent = state->stack;
+                       item->todelete = false;
+                       state->stack = item;
+               } else 
+                       break;
        }
 
-       return index_form_tuple(giststate->tupdesc, attr, isnull);
-}
-
-
-/*
- * Forms union of oldtup and addtup, if union == oldtup then return NULL
- */
-static IndexTuple
-gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
-{
-       GistEntryVector *evec;
-       bool            neednew = false;
-       bool            isnull[INDEX_MAX_KEYS];
-       Datum           attr[INDEX_MAX_KEYS];
-       GISTENTRY       centry[INDEX_MAX_KEYS],
-                               oldatt[INDEX_MAX_KEYS],
-                               addatt[INDEX_MAX_KEYS],
-                          *ev0p,
-                          *ev1p;
-       bool            oldisnull[INDEX_MAX_KEYS],
-                               addisnull[INDEX_MAX_KEYS];
-       IndexTuple      newtup = NULL;
-       int                     i;
-
-       evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
-       evec->n = 2;
-       ev0p = &(evec->vector[0]);
-       ev1p = &(evec->vector[1]);
-
-       gistDeCompressAtt(giststate, r, oldtup, NULL,
-                                         (OffsetNumber) 0, oldatt, oldisnull);
-
-       gistDeCompressAtt(giststate, r, addtup, NULL,
-                                         (OffsetNumber) 0, addatt, addisnull);
-
-       for (i = 0; i < r->rd_att->natts; i++)
-       {
-               if (oldisnull[i] && addisnull[i])
-               {
-                       attr[i] = (Datum) 0;
-                       isnull[i] = TRUE;
-               }
-               else
-               {
-                       Datum           datum;
-                       int                     datumsize;
-
-                       FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
-                                  addisnull[i], addatt[i].key, addatt[i].bytes);
-
-                       datum = FunctionCall2(&giststate->unionFn[i],
-                                                                 PointerGetDatum(evec),
-                                                                 PointerGetDatum(&datumsize));
-
-                       if (oldisnull[i] || addisnull[i])
-                       {
-                               if (oldisnull[i])
-                                       neednew = true;
-                       }
-                       else
-                       {
-                               bool    result;
-
-                               FunctionCall3(&giststate->equalFn[i],
-                                                         ev0p->key,
-                                                         datum,
-                                                         PointerGetDatum(&result));
+       /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */
 
-                               if (!result)
-                                       neednew = true;
-                       }
-
-                       gistcentryinit(giststate, i, &centry[i], datum,
-                                                  NULL, NULL, (OffsetNumber) 0,
-                                                  datumsize, FALSE, FALSE);
-
-                       attr[i] = centry[i].key;
-                       isnull[i] = FALSE;
-               }
+       /* form state->path to work xlog */
+       ptr = state->stack;
+       state->pathlen=1;
+       while( ptr ) {
+               state->pathlen++;
+               ptr=ptr->parent;
        }
-
-       if (neednew)
-       {
-               /* need to update key */
-               newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
-               newtup->t_tid = oldtup->t_tid;
+       state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
+       ptr = state->stack;
+       state->pathlen=0;
+       while( ptr ) {
+               state->path[ state->pathlen ] = ptr->blkno;
+               state->pathlen++;
+               ptr=ptr->parent;
        }
-
-       return newtup;
+       state->pathlen--;
+       state->path++;
 }
 
-static void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
-{
-       int lr;
-
-       for (lr = 0; lr < 2; lr++)
-       {
-               OffsetNumber *entries;
-               int                     i;
-               Datum      *attr;
-               int                     len,
-                                       *attrsize;
-               bool       *isnull;
-               GistEntryVector *evec;
-
-               if (lr)
-               {
-                       attrsize = spl->spl_lattrsize;
-                       attr = spl->spl_lattr;
-                       len = spl->spl_nleft;
-                       entries = spl->spl_left;
-                       isnull = spl->spl_lisnull;
-               }
-               else
-               {
-                       attrsize = spl->spl_rattrsize;
-                       attr = spl->spl_rattr;
-                       len = spl->spl_nright;
-                       entries = spl->spl_right;
-                       isnull = spl->spl_risnull;
-               }
-
-               evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
-               for (i = 1; i < r->rd_att->natts; i++)
-               {
-                       int                     j;
-                       Datum           datum;
-                       int                     datumsize;
-                       int                     real_len;
-
-                       real_len = 0;
-                       for (j = 0; j < len; j++)
-                       {
-                               bool            IsNull;
-
-                               if (spl->spl_idgrp[entries[j]])
-                                       continue;
-                               datum = index_getattr(itvec[entries[j] - 1], i + 1,
-                                                                         giststate->tupdesc, &IsNull);
-                               if (IsNull)
-                                       continue;
-                               gistdentryinit(giststate, i,
-                                                          &(evec->vector[real_len]),
-                                                          datum,
-                                                          NULL, NULL, (OffsetNumber) 0,
-                                                          ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                                                          FALSE, IsNull);
-                               real_len++;
-
-                       }
 
-                       if (real_len == 0)
-                       {
-                               datum = (Datum) 0;
-                               datumsize = 0;
-                               isnull[i] = true;
-                       }
-                       else
-                       {
-                               /*
-                                * evec->vector[0].bytes may be not defined, so form union
-                                * with itself
-                                */
-                               if (real_len == 1)
-                               {
-                                       evec->n = 2;
-                                       memcpy(&(evec->vector[1]), &(evec->vector[0]),
-                                                  sizeof(GISTENTRY));
-                               }
-                               else
-                                       evec->n = real_len;
-                               datum = FunctionCall2(&giststate->unionFn[i],
-                                                                         PointerGetDatum(evec),
-                                                                         PointerGetDatum(&datumsize));
-                               isnull[i] = false;
-                       }
+void
+gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
+       int                     is_splitted;
+       ItemId          iid;
+       IndexTuple      oldtup, newtup;
 
-                       attr[i] = datum;
-                       attrsize[i] = datumsize;
-               }
-       }
-}
+       /* walk up */
+       while( true ) {
+                /*
+                 * After this call: 1. if child page was splited, then itup
+                 * contains keys for each page 2. if  child page wasn't splited,
+                 * then itup contains additional for adjustment of current key
+                 */
 
-/*
- * find group in vector with equal value
- */
-static int
-gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
-{
-       int                     i;
-       int                     curid = 1;
-
-       /*
-        * first key is always not null (see gistinsert), so we may not check
-        * for nulls
-        */
-       for (i = 0; i < spl->spl_nleft; i++)
-       {
-               int j;
-               int len;
-               bool result;
-
-               if (spl->spl_idgrp[spl->spl_left[i]])
-                       continue;
-               len = 0;
-               /* find all equal value in right part */
-               for (j = 0; j < spl->spl_nright; j++)
-               {
-                       if (spl->spl_idgrp[spl->spl_right[j]])
-                               continue;
-                       FunctionCall3(&giststate->equalFn[0],
-                                                 valvec[spl->spl_left[i]].key,
-                                                 valvec[spl->spl_right[j]].key,
-                                                 PointerGetDatum(&result));
-                       if (result)
-                       {
-                               spl->spl_idgrp[spl->spl_right[j]] = curid;
-                               len++;
-                       }
-               }
-               /* find all other equal value in left part */
-               if (len)
-               {
-                       /* add current val to list of equal values */
-                       spl->spl_idgrp[spl->spl_left[i]] = curid;
-                       /* searching .. */
-                       for (j = i + 1; j < spl->spl_nleft; j++)
-                       {
-                               if (spl->spl_idgrp[spl->spl_left[j]])
-                                       continue;
-                               FunctionCall3(&giststate->equalFn[0],
-                                                         valvec[spl->spl_left[i]].key,
-                                                         valvec[spl->spl_left[j]].key,
-                                                         PointerGetDatum(&result));
-                               if (result)
-                               {
-                                       spl->spl_idgrp[spl->spl_left[j]] = curid;
-                                       len++;
-                               }
-                       }
-                       spl->spl_ngrp[curid] = len + 1;
-                       curid++;
-               }
-       }
+               is_splitted = gistplacetopage(state, giststate );
 
-       return curid;
-}
+               /* pop page from stack */
+               state->stack = state->stack->parent;
+               state->pathlen--;
+               state->path++;
+       
+               /* stack is void */
+               if ( ! state->stack )
+                       break;
 
-/*
- * Insert equivalent tuples to left or right page with minimum
- * penalty
- */
-static void
-gistadjsubkey(Relation r,
-                         IndexTuple *itup, /* contains compressed entry */
-                         int *len,
-                         GIST_SPLITVEC *v,
-                         GISTSTATE *giststate)
-{
-       int                     curlen;
-       OffsetNumber *curwpos;
-       GISTENTRY       entry,
-                               identry[INDEX_MAX_KEYS],
-                          *ev0p,
-                          *ev1p;
-       float           lpenalty,
-                               rpenalty;
-       GistEntryVector *evec;
-       int                     datumsize;
-       bool            isnull[INDEX_MAX_KEYS];
-       int                     i,
-                               j;
 
-       /* clear vectors */
-       curlen = v->spl_nleft;
-       curwpos = v->spl_left;
-       for (i = 0; i < v->spl_nleft; i++)
-       {
-               if (v->spl_idgrp[v->spl_left[i]] == 0)
+               /* child did not split */
+               if (!is_splitted)
                {
-                       *curwpos = v->spl_left[i];
-                       curwpos++;
-               }
-               else
-                       curlen--;
-       }
-       v->spl_nleft = curlen;
+                       /* parent's tuple */
+                       iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+                       oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+                       newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
+       
+                       if (!newtup) /* not need to update key */
+                               break;
 
-       curlen = v->spl_nright;
-       curwpos = v->spl_right;
-       for (i = 0; i < v->spl_nright; i++)
-       {
-               if (v->spl_idgrp[v->spl_right[i]] == 0)
-               {
-                       *curwpos = v->spl_right[i];
-                       curwpos++;
+                       state->itup[0] = newtup;        
                }
-               else
-                       curlen--;
+       
+               /*
+                * This node's key has been modified, either because a child
+                * split occurred or because we needed to adjust our key for
+                * an insert in a child node. Therefore, remove the old
+                * version of this node's key.
+                */
+               gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
+               PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
+               if ( !state->r->rd_istemp ) 
+                       state->stack->todelete = true;
+                               
+               /*
+                * if child was splitted, new key for child will be inserted in
+                * the end list of child, so we must say to any scans that page is
+                * changed beginning from 'child' offset
+                */
+               if (is_splitted)
+                       gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum);
+       } /* while */
+
+       /* release all buffers */
+       while( state->stack ) {
+               if ( state->xlog_mode ) 
+                       LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(state->stack->buffer);
+               state->stack = state->stack->parent;
        }
-       v->spl_nright = curlen;
 
-       evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
-       evec->n = 2;
-       ev0p = &(evec->vector[0]);
-       ev1p = &(evec->vector[1]);
+       /* say to xlog that insert is completed */
+       if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
+               gistxlogInsertComplete  xlrec;
+               XLogRecData             rdata;
+                       
+               xlrec.node = state->r->rd_node;
+               xlrec.key = state->key;
+                       
+               rdata.buffer = InvalidBuffer;
+               rdata.data   = (char *) &xlrec;
+               rdata.len    = sizeof( gistxlogInsertComplete );
+               rdata.next   = NULL;
 
-       /* add equivalent tuple */
-       for (i = 0; i < *len; i++)
-       {
-               Datum           datum;
+               START_CRIT_SECTION();
 
-               if (v->spl_idgrp[i + 1] == 0)   /* already inserted */
-                       continue;
-               gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
-                                                 identry, isnull);
+               XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
 
-               v->spl_ngrp[v->spl_idgrp[i + 1]]--;
-               if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
-                       (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
-               {
-                       /* force last in group */
-                       rpenalty = 1.0;
-                       lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
-               }
-               else
-               {
-                       /* where? */
-                       for (j = 1; j < r->rd_att->natts; j++)
-                       {
-                               gistentryinit(entry, v->spl_lattr[j], r, NULL,
-                                                  (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
-                               gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
-                                                       &identry[j], isnull[j], &lpenalty);
-
-                               gistentryinit(entry, v->spl_rattr[j], r, NULL,
-                                                  (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
-                               gistpenalty(giststate, j, &entry, v->spl_risnull[j],
-                                                       &identry[j], isnull[j], &rpenalty);
-
-                               if (lpenalty != rpenalty)
-                                       break;
-                       }
-               }
-
-               /*
-                * add
-                * XXX: refactor this to avoid duplicating code
-                */
-               if (lpenalty < rpenalty)
-               {
-                       v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
-                       v->spl_left[v->spl_nleft] = i + 1;
-                       v->spl_nleft++;
-                       for (j = 1; j < r->rd_att->natts; j++)
-                       {
-                               if (isnull[j] && v->spl_lisnull[j])
-                               {
-                                       v->spl_lattr[j] = (Datum) 0;
-                                       v->spl_lattrsize[j] = 0;
-                               }
-                               else
-                               {
-                                       FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
-                                                  isnull[j], identry[j].key, identry[j].bytes);
-
-                                       datum = FunctionCall2(&giststate->unionFn[j],
-                                                                                 PointerGetDatum(evec),
-                                                                                 PointerGetDatum(&datumsize));
-
-                                       v->spl_lattr[j] = datum;
-                                       v->spl_lattrsize[j] = datumsize;
-                                       v->spl_lisnull[j] = false;
-                               }
-                       }
-               }
-               else
-               {
-                       v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
-                       v->spl_right[v->spl_nright] = i + 1;
-                       v->spl_nright++;
-                       for (j = 1; j < r->rd_att->natts; j++)
-                       {
-                               if (isnull[j] && v->spl_risnull[j])
-                               {
-                                       v->spl_rattr[j] = (Datum) 0;
-                                       v->spl_rattrsize[j] = 0;
-                               }
-                               else
-                               {
-                                       FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
-                                                  isnull[j], identry[j].key, identry[j].bytes);
-
-                                       datum = FunctionCall2(&giststate->unionFn[j],
-                                                                                 PointerGetDatum(evec),
-                                                                                 PointerGetDatum(&datumsize));
-
-                                       v->spl_rattr[j] = datum;
-                                       v->spl_rattrsize[j] = datumsize;
-                                       v->spl_risnull[j] = false;
-                               }
-                       }
-               }
+               END_CRIT_SECTION();
        }
 }
 
@@ -1105,6 +674,7 @@ gistSplit(Relation r,
                  Buffer buffer,
                  IndexTuple *itup,             /* contains compressed entry */
                  int *len,
+                 PageLayout    **dist,
                  GISTSTATE *giststate)
 {
        Page            p;
@@ -1225,29 +795,57 @@ gistSplit(Relation r,
        /* write on disk (may need another split) */
        if (gistnospace(right, rvectup, v.spl_nright))
        {
+               int i;
+               PageLayout *d, *origd=*dist;
+       
                nlen = v.spl_nright;
-               newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate);
+               newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
+               /* XLOG stuff */
+               d=*dist;
+               /* translate offsetnumbers to our */
+               while( d && d!=origd ) {
+                       for(i=0;i<d->block.num;i++)
+                               d->list[i] = v.spl_right[ d->list[i]-1 ]; 
+                       d=d->next;
+               }
                ReleaseBuffer(rightbuf);
        }
        else
        {
                OffsetNumber l;
 
-               l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
-               WriteBuffer(rightbuf);
-
+               l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
+               /* XLOG stuff */
+               ROTATEDIST(*dist);
+               (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
+               (*dist)->block.num = v.spl_nright;
+               (*dist)->list = v.spl_right;
+               (*dist)->buffer = rightbuf;
                nlen = 1;
                newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
                newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
-               ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
+               ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
        }
 
        if (gistnospace(left, lvectup, v.spl_nleft))
        {
                int                     llen = v.spl_nleft;
                IndexTuple *lntup;
-
-               lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate);
+               int i;
+               PageLayout *d, *origd=*dist;
+
+               lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
+
+               /* XLOG stuff */
+               d=*dist;
+               /* translate offsetnumbers to our */
+               while( d && d!=origd ) {
+                       for(i=0;i<d->block.num;i++)
+                               d->list[i] = v.spl_left[ d->list[i]-1 ]; 
+                       d=d->next;
+               }
+               
                ReleaseBuffer(leftbuf);
 
                newtup = gistjoinvector(newtup, &nlen, lntup, llen);
@@ -1256,151 +854,76 @@ gistSplit(Relation r,
        {
                OffsetNumber l;
 
-               l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
+               l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
                if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
                        PageRestoreTempPage(left, p);
 
-               WriteBuffer(leftbuf);
-
+               /* XLOG stuff */
+               ROTATEDIST(*dist);
+               (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
+               (*dist)->block.num = v.spl_nleft;
+               (*dist)->list = v.spl_left;
+               (*dist)->buffer = leftbuf;
                nlen += 1;
                newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
                newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
-               ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
+               ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
        }
 
        *len = nlen;
        return newtup;
 }
 
-static void
-gistnewroot(Relation r, IndexTuple *itup, int len)
-{
-       Buffer          b;
-       Page            p;
-
-       b = ReadBuffer(r, GIST_ROOT_BLKNO);
-       GISTInitBuffer(b, 0);
-       p = BufferGetPage(b);
-
-       gistwritebuffer(r, p, itup, len, FirstOffsetNumber);
-       WriteBuffer(b);
-}
-
-static void
-GISTInitBuffer(Buffer b, uint32 f)
+void
+gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
 {
-       GISTPageOpaque opaque;
+       Buffer          buffer;
        Page            page;
-       Size            pageSize;
-
-       pageSize = BufferGetPageSize(b);
-       page = BufferGetPage(b);
-       PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
-
-       opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
-       opaque->flags = f;
-}
-
-/*
- * find entry with lowest penalty
- */
-static OffsetNumber
-gistchoose(Relation r, Page p, IndexTuple it,  /* it has compressed entry */
-                  GISTSTATE *giststate)
-{
-       OffsetNumber maxoff;
-       OffsetNumber i;
-       OffsetNumber which;
-       float           sum_grow,
-                               which_grow[INDEX_MAX_KEYS];
-       GISTENTRY       entry,
-                               identry[INDEX_MAX_KEYS];
-       bool            isnull[INDEX_MAX_KEYS];
-
-       maxoff = PageGetMaxOffsetNumber(p);
-       *which_grow = -1.0;
-       which = -1;
-       sum_grow = 1;
-       gistDeCompressAtt(giststate, r,
-                                         it, NULL, (OffsetNumber) 0,
-                                         identry, isnull);
-
-       for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
-       {
-               int                     j;
-               IndexTuple      itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
-               sum_grow = 0;
-               for (j = 0; j < r->rd_att->natts; j++)
-               {
-                       Datum           datum;
-                       float           usize;
-                       bool            IsNull;
-
-                       datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
-                       gistdentryinit(giststate, j, &entry, datum, r, p, i,
-                                                  ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
-                                                  FALSE, IsNull);
-                       gistpenalty(giststate, j, &entry, IsNull,
-                                               &identry[j], isnull[j], &usize);
-
-                       if (which_grow[j] < 0 || usize < which_grow[j])
-                       {
-                               which = i;
-                               which_grow[j] = usize;
-                               if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
-                                       which_grow[j + 1] = -1;
-                               sum_grow += which_grow[j];
-                       }
-                       else if (which_grow[j] == usize)
-                               sum_grow += usize;
-                       else
-                       {
-                               sum_grow = 1;
-                               break;
-                       }
-               }
+       buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
+       GISTInitBuffer(buffer, 0);
+       page = BufferGetPage(buffer);
+
+       gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+       if ( !xlog_mode && !r->rd_istemp ) {
+               gistxlogEntryUpdate     xlrec;
+               XLogRecPtr              recptr;
+               XLogRecData             *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
+               int i;
+                       
+               xlrec.node = r->rd_node;
+               xlrec.blkno = GIST_ROOT_BLKNO;
+               xlrec.todeleteoffnum = InvalidOffsetNumber;
+               xlrec.key = *key;
+               xlrec.pathlen=0;
+                       
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data   = (char *) &xlrec;
+               rdata[0].len    = sizeof( gistxlogEntryUpdate );
+               rdata[0].next   = NULL;
+
+               for(i=1; i<=len; i++) {
+                       rdata[i].buffer = InvalidBuffer;
+                       rdata[i].data   = (char*)(itup[i-1]);
+                       rdata[i].len    = IndexTupleSize(itup[i-1]);
+                       rdata[i].next   = NULL;
+                       rdata[i-1].next = &(rdata[i]);
+               }       
+                       
+               START_CRIT_SECTION();
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
+               PageSetLSN(page, recptr);
+               PageSetTLI(page, ThisTimeLineID);
+
+               END_CRIT_SECTION();
        }
-
-       return which;
+       if ( xlog_mode ) 
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(buffer);
 }
 
-/*
- * Retail deletion of a single tuple.
- *
- * NB: this is no longer called externally, but is still needed by
- * gistlayerinsert().  That dependency will have to be fixed if GIST
- * is ever going to allow concurrent insertions.
- */
-static void
-gistdelete(Relation r, ItemPointer tid)
-{
-       BlockNumber blkno;
-       OffsetNumber offnum;
-       Buffer          buf;
-       Page            page;
-
-       /*
-        * Since GIST is not marked "amconcurrent" in pg_am, caller should
-        * have acquired exclusive lock on index relation.      We need no locking
-        * here.
-        */
-
-       blkno = ItemPointerGetBlockNumber(tid);
-       offnum = ItemPointerGetOffsetNumber(tid);
-
-       /* adjust any scans that will be affected by this deletion */
-       /* NB: this works only for scans in *this* backend! */
-       gistadjscans(r, GISTOP_DEL, blkno, offnum);
-
-       /* delete the index tuple */
-       buf = ReadBuffer(r, blkno);
-       page = BufferGetPage(buf);
-
-       PageIndexTupleDelete(page, offnum);
-
-       WriteBuffer(buf);
-}
 
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
@@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                        page = BufferGetPage(buf);
 
                        PageIndexTupleDelete(page, offnum);
+                       if ( !rel->rd_istemp ) {
+                               gistxlogEntryUpdate     xlrec;
+                               XLogRecPtr              recptr;
+                               XLogRecData             rdata;
+                       
+                               xlrec.node = rel->rd_node;
+                               xlrec.blkno = blkno;
+                               xlrec.todeleteoffnum = offnum;
+                               xlrec.pathlen=0;
+                               ItemPointerSetInvalid( &(xlrec.key) );
+                       
+                               rdata.buffer = InvalidBuffer;
+                               rdata.data   = (char *) &xlrec;
+                               rdata.len    = sizeof( gistxlogEntryUpdate );
+                               rdata.next   = NULL;
+
+                               START_CRIT_SECTION();
+
+                               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
+                               PageSetLSN(page, recptr);
+                               PageSetTLI(page, ThisTimeLineID);
+
+                               END_CRIT_SECTION();
+                       }
 
                        WriteBuffer(buf);
 
@@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate)
        /* no work */
 }
 
-#ifdef GIST_PAGEADDITEM
-/*
- * Given an IndexTuple to be inserted on a page, this routine replaces
- * the key with another key, which may involve generating a new IndexTuple
- * if the sizes don't match or if the null status changes.
- *
- * XXX this only works for a single-column index tuple!
- */
-static IndexTuple
-gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
-{
-       bool            IsNull;
-       Datum           datum = index_getattr(t, 1, r->rd_att, &IsNull);
-
-       /*
-        * If new entry fits in index tuple, copy it in.  To avoid worrying
-        * about null-value bitmask, pass it off to the general
-        * index_form_tuple routine if either the previous or new value is
-        * NULL.
-        */
-       if (!IsNull && DatumGetPointer(entry.key) != NULL &&
-               (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull))
-       {
-               memcpy(DatumGetPointer(datum),
-                          DatumGetPointer(entry.key),
-                          entry.bytes);
-               /* clear out old size */
-               t->t_info &= ~INDEX_SIZE_MASK;
-               /* or in new size */
-               t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
-
-               return t;
-       }
-       else
-       {
-               /* generate a new index tuple for the compressed entry */
-               TupleDesc       tupDesc = r->rd_att;
-               IndexTuple      newtup;
-               bool            isnull;
-
-               isnull = (DatumGetPointer(entry.key) == NULL);
-               newtup = index_form_tuple(tupDesc, &(entry.key), &isnull);
-               newtup->t_tid = t->t_tid;
-               return newtup;
-       }
-}
-#endif
-
-/*
- * initialize a GiST entry with a decompressed version of key
- */
-void
-gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
-                          Datum k, Relation r, Page pg, OffsetNumber o,
-                          int b, bool l, bool isNull)
-{
-       if (b && !isNull)
-       {
-               GISTENTRY  *dep;
-
-               gistentryinit(*e, k, r, pg, o, b, l);
-               dep = (GISTENTRY *)
-                       DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
-                                                                                 PointerGetDatum(e)));
-               /* decompressFn may just return the given pointer */
-               if (dep != e)
-                       gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
-                                                 dep->bytes, dep->leafkey);
-       }
-       else
-               gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-
-/*
- * initialize a GiST entry with a compressed version of key
- */
-static void
-gistcentryinit(GISTSTATE *giststate, int nkey,
-                          GISTENTRY *e, Datum k, Relation r,
-                          Page pg, OffsetNumber o, int b, bool l, bool isNull)
-{
-       if (!isNull)
-       {
-               GISTENTRY  *cep;
-
-               gistentryinit(*e, k, r, pg, o, b, l);
-               cep = (GISTENTRY *)
-                       DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
-                                                                                 PointerGetDatum(e)));
-               /* compressFn may just return the given pointer */
-               if (cep != e)
-                       gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
-                                                 cep->bytes, cep->leafkey);
-       }
-       else
-               gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-static IndexTuple
-gistFormTuple(GISTSTATE *giststate, Relation r,
-                         Datum attdata[], int datumsize[], bool isnull[])
-{
-       GISTENTRY       centry[INDEX_MAX_KEYS];
-       Datum           compatt[INDEX_MAX_KEYS];
-       int                     i;
-
-       for (i = 0; i < r->rd_att->natts; i++)
-       {
-               if (isnull[i])
-                       compatt[i] = (Datum) 0;
-               else
-               {
-                       gistcentryinit(giststate, i, &centry[i], attdata[i],
-                                                  NULL, NULL, (OffsetNumber) 0,
-                                                  datumsize[i], FALSE, FALSE);
-                       compatt[i] = centry[i].key;
-               }
-       }
-
-       return index_form_tuple(giststate->tupdesc, compatt, isnull);
-}
-
-static void
-gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
-                                 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
-{
-       int                     i;
-
-       for (i = 0; i < r->rd_att->natts; i++)
-       {
-               Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
-               gistdentryinit(giststate, i, &attdata[i],
-                                          datum, r, p, o,
-                                          ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
-                                          FALSE, isnull[i]);
-       }
-}
-
-static void
-gistpenalty(GISTSTATE *giststate, int attno,
-                       GISTENTRY *key1, bool isNull1,
-                       GISTENTRY *key2, bool isNull2, float *penalty)
-{
-       if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
-               *penalty = 0.0;
-       else
-               FunctionCall3(&giststate->penaltyFn[attno],
-                                         PointerGetDatum(key1),
-                                         PointerGetDatum(key2),
-                                         PointerGetDatum(penalty));
-}
-
 #ifdef GISTDEBUG
 static void
 gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
@@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
 }
 #endif   /* defined GISTDEBUG */
 
-void
-gist_redo(XLogRecPtr lsn, XLogRecord *record)
-{
-       elog(PANIC, "gist_redo: unimplemented");
-}
-
-void
-gist_desc(char *buf, uint8 xl_info, char *rec)
-{
-}
index e5f77b67926261e4f6c6cfc03123e1aa5995f159..5b9a94471b10816aeebc07c54ab601d924d2c3b3 100644 (file)
@@ -8,14 +8,14 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.47 2005/05/17 03:34:18 neilc Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include "access/gist_private.h"
 #include "access/itup.h"
+#include "access/gist_private.h"
 #include "executor/execdebug.h"
 #include "utils/memutils.h"
 
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
new file mode 100644 (file)
index 0000000..44391f9
--- /dev/null
@@ -0,0 +1,785 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistutil.c
+ *       utilities routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+
+/* group flags ( in gistadjsubkey ) */
+#define LEFT_ADDED      0x01
+#define RIGHT_ADDED 0x02
+#define BOTH_ADDED      ( LEFT_ADDED | RIGHT_ADDED )
+
+
+/*
+ * This defines is only for shorter code, used in gistgetadjusted
+ * and gistadjsubkey only
+ */
+#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb)      do { \
+       if (isnullkey) {                                                                                          \
+               gistentryinit((evp), rkey, r, NULL,                                               \
+                                         (OffsetNumber) 0, rkeyb, FALSE);                        \
+       } else {                                                                                                          \
+               gistentryinit((evp), okey, r, NULL,                                               \
+                                         (OffsetNumber) 0, okeyb, FALSE);                        \
+       }                                                                                                                         \
+} while(0)
+
+#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
+       FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b);             \
+       FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b);             \
+} while(0);
+
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+                       GISTENTRY *key1, bool isNull1,
+                       GISTENTRY *key2, bool isNull2, float *penalty);
+
+/*
+ * Write itup vector to page, has no control of free space
+ */
+OffsetNumber
+gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+                               int len, OffsetNumber off)
+{
+       OffsetNumber l = InvalidOffsetNumber;
+       int                     i;
+
+       for (i = 0; i < len; i++)
+       {
+               l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
+                                               off, LP_USED);
+               if (l == InvalidOffsetNumber)
+                       elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"",
+                                RelationGetRelationName(r));
+               off++;
+       }
+       return l;
+}
+
+/*
+ * Check space for itup vector on page
+ */
+bool
+gistnospace(Page page, IndexTuple *itvec, int len)
+{
+       unsigned int size = 0;
+       int                     i;
+
+       for (i = 0; i < len; i++)
+               size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
+
+       return (PageGetFreeSpace(page) < size);
+}
+
+/*
+ * Read buffer into itup vector
+ */
+IndexTuple *
+gistextractbuffer(Buffer buffer, int *len /* out */ )
+{
+       OffsetNumber i,
+                               maxoff;
+       IndexTuple *itvec;
+       Page            p = (Page) BufferGetPage(buffer);
+
+       maxoff = PageGetMaxOffsetNumber(p);
+       *len = maxoff;
+       itvec = palloc(sizeof(IndexTuple) * maxoff);
+       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+               itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+       return itvec;
+}
+
+/*
+ * join two vectors into one
+ */
+IndexTuple *
+gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
+{
+       itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
+       memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
+       *len += addlen;
+       return itvec;
+}
+
+/*
+ * Return an IndexTuple containing the result of applying the "union"
+ * method to the specified IndexTuple vector.
+ */
+IndexTuple
+gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+{
+       Datum           attr[INDEX_MAX_KEYS];
+       bool            isnull[INDEX_MAX_KEYS];
+       GistEntryVector *evec;
+       int                     i;
+       GISTENTRY       centry[INDEX_MAX_KEYS];
+
+       evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+       for (i = 0; i < r->rd_att->natts; i++)
+       {
+               Datum           datum;
+               int                     j;
+               int                     real_len;
+
+               real_len = 0;
+               for (j = 0; j < len; j++)
+               {
+                       bool            IsNull;
+                       datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
+                       if (IsNull)
+                               continue;
+
+                       gistdentryinit(giststate, i,
+                                                  &(evec->vector[real_len]),
+                                                  datum,
+                                                  NULL, NULL, (OffsetNumber) 0,
+                                                  ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+                                                  FALSE, IsNull);
+                       real_len++;
+               }
+
+               /* If this tuple vector was all NULLs, the union is NULL */
+               if (real_len == 0)
+               {
+                       attr[i] = (Datum) 0;
+                       isnull[i] = TRUE;
+               }
+               else
+               {
+                       int datumsize;
+
+                       if (real_len == 1)
+                       {
+                               evec->n = 2;
+                               gistentryinit(evec->vector[1],
+                                                         evec->vector[0].key, r, NULL,
+                                                         (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
+                       }
+                       else
+                               evec->n = real_len;
+
+                       /* Compress the result of the union and store in attr array */
+                       datum = FunctionCall2(&giststate->unionFn[i],
+                                                                 PointerGetDatum(evec),
+                                                                 PointerGetDatum(&datumsize));
+
+                       gistcentryinit(giststate, i, &centry[i], datum,
+                                                  NULL, NULL, (OffsetNumber) 0,
+                                                  datumsize, FALSE, FALSE);
+                       isnull[i] = FALSE;
+                       attr[i] = centry[i].key;
+               }
+       }
+
+       return index_form_tuple(giststate->tupdesc, attr, isnull);
+}
+
+
+/*
+ * Forms union of oldtup and addtup, if union == oldtup then return NULL
+ */
+IndexTuple
+gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
+{
+       GistEntryVector *evec;
+       bool            neednew = false;
+       bool            isnull[INDEX_MAX_KEYS];
+       Datum           attr[INDEX_MAX_KEYS];
+       GISTENTRY       centry[INDEX_MAX_KEYS],
+                               oldatt[INDEX_MAX_KEYS],
+                               addatt[INDEX_MAX_KEYS],
+                          *ev0p,
+                          *ev1p;
+       bool            oldisnull[INDEX_MAX_KEYS],
+                               addisnull[INDEX_MAX_KEYS];
+       IndexTuple      newtup = NULL;
+       int                     i;
+
+       evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+       evec->n = 2;
+       ev0p = &(evec->vector[0]);
+       ev1p = &(evec->vector[1]);
+
+       gistDeCompressAtt(giststate, r, oldtup, NULL,
+                                         (OffsetNumber) 0, oldatt, oldisnull);
+
+       gistDeCompressAtt(giststate, r, addtup, NULL,
+                                         (OffsetNumber) 0, addatt, addisnull);
+
+       for (i = 0; i < r->rd_att->natts; i++)
+       {
+               if (oldisnull[i] && addisnull[i])
+               {
+                       attr[i] = (Datum) 0;
+                       isnull[i] = TRUE;
+               }
+               else
+               {
+                       Datum           datum;
+                       int                     datumsize;
+
+                       FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
+                                  addisnull[i], addatt[i].key, addatt[i].bytes);
+
+                       datum = FunctionCall2(&giststate->unionFn[i],
+                                                                 PointerGetDatum(evec),
+                                                                 PointerGetDatum(&datumsize));
+
+                       if (oldisnull[i] || addisnull[i])
+                       {
+                               if (oldisnull[i])
+                                       neednew = true;
+                       }
+                       else
+                       {
+                               bool    result;
+
+                               FunctionCall3(&giststate->equalFn[i],
+                                                         ev0p->key,
+                                                         datum,
+                                                         PointerGetDatum(&result));
+
+                               if (!result)
+                                       neednew = true;
+                       }
+
+                       gistcentryinit(giststate, i, &centry[i], datum,
+                                                  NULL, NULL, (OffsetNumber) 0,
+                                                  datumsize, FALSE, FALSE);
+
+                       attr[i] = centry[i].key;
+                       isnull[i] = FALSE;
+               }
+       }
+
+       if (neednew)
+       {
+               /* need to update key */
+               newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
+               newtup->t_tid = oldtup->t_tid;
+       }
+
+       return newtup;
+}
+
+void
+gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+       int lr;
+
+       for (lr = 0; lr < 2; lr++)
+       {
+               OffsetNumber *entries;
+               int                     i;
+               Datum      *attr;
+               int                     len,
+                                       *attrsize;
+               bool       *isnull;
+               GistEntryVector *evec;
+
+               if (lr)
+               {
+                       attrsize = spl->spl_lattrsize;
+                       attr = spl->spl_lattr;
+                       len = spl->spl_nleft;
+                       entries = spl->spl_left;
+                       isnull = spl->spl_lisnull;
+               }
+               else
+               {
+                       attrsize = spl->spl_rattrsize;
+                       attr = spl->spl_rattr;
+                       len = spl->spl_nright;
+                       entries = spl->spl_right;
+                       isnull = spl->spl_risnull;
+               }
+
+               evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+               for (i = 1; i < r->rd_att->natts; i++)
+               {
+                       int                     j;
+                       Datum           datum;
+                       int                     datumsize;
+                       int                     real_len;
+
+                       real_len = 0;
+                       for (j = 0; j < len; j++)
+                       {
+                               bool            IsNull;
+
+                               if (spl->spl_idgrp[entries[j]])
+                                       continue;
+                               datum = index_getattr(itvec[entries[j] - 1], i + 1,
+                                                                         giststate->tupdesc, &IsNull);
+                               if (IsNull)
+                                       continue;
+                               gistdentryinit(giststate, i,
+                                                          &(evec->vector[real_len]),
+                                                          datum,
+                                                          NULL, NULL, (OffsetNumber) 0,
+                                                          ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+                                                          FALSE, IsNull);
+                               real_len++;
+
+                       }
+
+                       if (real_len == 0)
+                       {
+                               datum = (Datum) 0;
+                               datumsize = 0;
+                               isnull[i] = true;
+                       }
+                       else
+                       {
+                               /*
+                                * evec->vector[0].bytes may be not defined, so form union
+                                * with itself
+                                */
+                               if (real_len == 1)
+                               {
+                                       evec->n = 2;
+                                       memcpy(&(evec->vector[1]), &(evec->vector[0]),
+                                                  sizeof(GISTENTRY));
+                               }
+                               else
+                                       evec->n = real_len;
+                               datum = FunctionCall2(&giststate->unionFn[i],
+                                                                         PointerGetDatum(evec),
+                                                                         PointerGetDatum(&datumsize));
+                               isnull[i] = false;
+                       }
+
+                       attr[i] = datum;
+                       attrsize[i] = datumsize;
+               }
+       }
+}
+
+/*
+ * find group in vector with equal value
+ */
+int
+gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
+{
+       int                     i;
+       int                     curid = 1;
+
+       /*
+        * first key is always not null (see gistinsert), so we may not check
+        * for nulls
+        */
+       for (i = 0; i < spl->spl_nleft; i++)
+       {
+               int j;
+               int len;
+               bool result;
+
+               if (spl->spl_idgrp[spl->spl_left[i]])
+                       continue;
+               len = 0;
+               /* find all equal value in right part */
+               for (j = 0; j < spl->spl_nright; j++)
+               {
+                       if (spl->spl_idgrp[spl->spl_right[j]])
+                               continue;
+                       FunctionCall3(&giststate->equalFn[0],
+                                                 valvec[spl->spl_left[i]].key,
+                                                 valvec[spl->spl_right[j]].key,
+                                                 PointerGetDatum(&result));
+                       if (result)
+                       {
+                               spl->spl_idgrp[spl->spl_right[j]] = curid;
+                               len++;
+                       }
+               }
+               /* find all other equal value in left part */
+               if (len)
+               {
+                       /* add current val to list of equal values */
+                       spl->spl_idgrp[spl->spl_left[i]] = curid;
+                       /* searching .. */
+                       for (j = i + 1; j < spl->spl_nleft; j++)
+                       {
+                               if (spl->spl_idgrp[spl->spl_left[j]])
+                                       continue;
+                               FunctionCall3(&giststate->equalFn[0],
+                                                         valvec[spl->spl_left[i]].key,
+                                                         valvec[spl->spl_left[j]].key,
+                                                         PointerGetDatum(&result));
+                               if (result)
+                               {
+                                       spl->spl_idgrp[spl->spl_left[j]] = curid;
+                                       len++;
+                               }
+                       }
+                       spl->spl_ngrp[curid] = len + 1;
+                       curid++;
+               }
+       }
+
+       return curid;
+}
+
+/*
+ * Insert equivalent tuples to left or right page with minimum
+ * penalty
+ */
+void
+gistadjsubkey(Relation r,
+                         IndexTuple *itup, /* contains compressed entry */
+                         int *len,
+                         GIST_SPLITVEC *v,
+                         GISTSTATE *giststate)
+{
+       int                     curlen;
+       OffsetNumber *curwpos;
+       GISTENTRY       entry,
+                               identry[INDEX_MAX_KEYS],
+                          *ev0p,
+                          *ev1p;
+       float           lpenalty,
+                               rpenalty;
+       GistEntryVector *evec;
+       int                     datumsize;
+       bool            isnull[INDEX_MAX_KEYS];
+       int                     i,
+                               j;
+
+       /* clear vectors */
+       curlen = v->spl_nleft;
+       curwpos = v->spl_left;
+       for (i = 0; i < v->spl_nleft; i++)
+       {
+               if (v->spl_idgrp[v->spl_left[i]] == 0)
+               {
+                       *curwpos = v->spl_left[i];
+                       curwpos++;
+               }
+               else
+                       curlen--;
+       }
+       v->spl_nleft = curlen;
+
+       curlen = v->spl_nright;
+       curwpos = v->spl_right;
+       for (i = 0; i < v->spl_nright; i++)
+       {
+               if (v->spl_idgrp[v->spl_right[i]] == 0)
+               {
+                       *curwpos = v->spl_right[i];
+                       curwpos++;
+               }
+               else
+                       curlen--;
+       }
+       v->spl_nright = curlen;
+
+       evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+       evec->n = 2;
+       ev0p = &(evec->vector[0]);
+       ev1p = &(evec->vector[1]);
+
+       /* add equivalent tuple */
+       for (i = 0; i < *len; i++)
+       {
+               Datum           datum;
+
+               if (v->spl_idgrp[i + 1] == 0)   /* already inserted */
+                       continue;
+               gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
+                                                 identry, isnull);
+
+               v->spl_ngrp[v->spl_idgrp[i + 1]]--;
+               if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
+                       (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
+               {
+                       /* force last in group */
+                       rpenalty = 1.0;
+                       lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
+               }
+               else
+               {
+                       /* where? */
+                       for (j = 1; j < r->rd_att->natts; j++)
+                       {
+                               gistentryinit(entry, v->spl_lattr[j], r, NULL,
+                                                  (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
+                               gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
+                                                       &identry[j], isnull[j], &lpenalty);
+
+                               gistentryinit(entry, v->spl_rattr[j], r, NULL,
+                                                  (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
+                               gistpenalty(giststate, j, &entry, v->spl_risnull[j],
+                                                       &identry[j], isnull[j], &rpenalty);
+
+                               if (lpenalty != rpenalty)
+                                       break;
+                       }
+               }
+
+               /*
+                * add
+                * XXX: refactor this to avoid duplicating code
+                */
+               if (lpenalty < rpenalty)
+               {
+                       v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
+                       v->spl_left[v->spl_nleft] = i + 1;
+                       v->spl_nleft++;
+                       for (j = 1; j < r->rd_att->natts; j++)
+                       {
+                               if (isnull[j] && v->spl_lisnull[j])
+                               {
+                                       v->spl_lattr[j] = (Datum) 0;
+                                       v->spl_lattrsize[j] = 0;
+                               }
+                               else
+                               {
+                                       FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
+                                                  isnull[j], identry[j].key, identry[j].bytes);
+
+                                       datum = FunctionCall2(&giststate->unionFn[j],
+                                                                                 PointerGetDatum(evec),
+                                                                                 PointerGetDatum(&datumsize));
+
+                                       v->spl_lattr[j] = datum;
+                                       v->spl_lattrsize[j] = datumsize;
+                                       v->spl_lisnull[j] = false;
+                               }
+                       }
+               }
+               else
+               {
+                       v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
+                       v->spl_right[v->spl_nright] = i + 1;
+                       v->spl_nright++;
+                       for (j = 1; j < r->rd_att->natts; j++)
+                       {
+                               if (isnull[j] && v->spl_risnull[j])
+                               {
+                                       v->spl_rattr[j] = (Datum) 0;
+                                       v->spl_rattrsize[j] = 0;
+                               }
+                               else
+                               {
+                                       FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
+                                                  isnull[j], identry[j].key, identry[j].bytes);
+
+                                       datum = FunctionCall2(&giststate->unionFn[j],
+                                                                                 PointerGetDatum(evec),
+                                                                                 PointerGetDatum(&datumsize));
+
+                                       v->spl_rattr[j] = datum;
+                                       v->spl_rattrsize[j] = datumsize;
+                                       v->spl_risnull[j] = false;
+                               }
+                       }
+               }
+       }
+}
+
+/*
+ * find entry with lowest penalty
+ */
+OffsetNumber
+gistchoose(Relation r, Page p, IndexTuple it,  /* it has compressed entry */
+                  GISTSTATE *giststate)
+{
+       OffsetNumber maxoff;
+       OffsetNumber i;
+       OffsetNumber which;
+       float           sum_grow,
+                               which_grow[INDEX_MAX_KEYS];
+       GISTENTRY       entry,
+                               identry[INDEX_MAX_KEYS];
+       bool            isnull[INDEX_MAX_KEYS];
+
+       maxoff = PageGetMaxOffsetNumber(p);
+       *which_grow = -1.0;
+       which = -1;
+       sum_grow = 1;
+       gistDeCompressAtt(giststate, r,
+                                         it, NULL, (OffsetNumber) 0,
+                                         identry, isnull);
+
+       for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
+       {
+               int                     j;
+               IndexTuple      itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+               sum_grow = 0;
+               for (j = 0; j < r->rd_att->natts; j++)
+               {
+                       Datum           datum;
+                       float           usize;
+                       bool            IsNull;
+
+                       datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
+                       gistdentryinit(giststate, j, &entry, datum, r, p, i,
+                                                  ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
+                                                  FALSE, IsNull);
+                       gistpenalty(giststate, j, &entry, IsNull,
+                                               &identry[j], isnull[j], &usize);
+
+                       if (which_grow[j] < 0 || usize < which_grow[j])
+                       {
+                               which = i;
+                               which_grow[j] = usize;
+                               if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
+                                       which_grow[j + 1] = -1;
+                               sum_grow += which_grow[j];
+                       }
+                       else if (which_grow[j] == usize)
+                               sum_grow += usize;
+                       else
+                       {
+                               sum_grow = 1;
+                               break;
+                       }
+               }
+       }
+
+       return which;
+}
+
+/*
+ * initialize a GiST entry with a decompressed version of key
+ */
+void
+gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+                          Datum k, Relation r, Page pg, OffsetNumber o,
+                          int b, bool l, bool isNull)
+{
+       if (b && !isNull)
+       {
+               GISTENTRY  *dep;
+
+               gistentryinit(*e, k, r, pg, o, b, l);
+               dep = (GISTENTRY *)
+                       DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
+                                                                                 PointerGetDatum(e)));
+               /* decompressFn may just return the given pointer */
+               if (dep != e)
+                       gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
+                                                 dep->bytes, dep->leafkey);
+       }
+       else
+               gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+
+/*
+ * initialize a GiST entry with a compressed version of key
+ */
+void
+gistcentryinit(GISTSTATE *giststate, int nkey,
+                          GISTENTRY *e, Datum k, Relation r,
+                          Page pg, OffsetNumber o, int b, bool l, bool isNull)
+{
+       if (!isNull)
+       {
+               GISTENTRY  *cep;
+
+               gistentryinit(*e, k, r, pg, o, b, l);
+               cep = (GISTENTRY *)
+                       DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
+                                                                                 PointerGetDatum(e)));
+               /* compressFn may just return the given pointer */
+               if (cep != e)
+                       gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
+                                                 cep->bytes, cep->leafkey);
+       }
+       else
+               gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+IndexTuple
+gistFormTuple(GISTSTATE *giststate, Relation r,
+                         Datum attdata[], int datumsize[], bool isnull[])
+{
+       GISTENTRY       centry[INDEX_MAX_KEYS];
+       Datum           compatt[INDEX_MAX_KEYS];
+       int                     i;
+
+       for (i = 0; i < r->rd_att->natts; i++)
+       {
+               if (isnull[i])
+                       compatt[i] = (Datum) 0;
+               else
+               {
+                       gistcentryinit(giststate, i, &centry[i], attdata[i],
+                                                  NULL, NULL, (OffsetNumber) 0,
+                                                  datumsize[i], FALSE, FALSE);
+                       compatt[i] = centry[i].key;
+               }
+       }
+
+       return index_form_tuple(giststate->tupdesc, compatt, isnull);
+}
+
+void
+gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
+                                 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
+{
+       int                     i;
+
+       for (i = 0; i < r->rd_att->natts; i++)
+       {
+               Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
+               gistdentryinit(giststate, i, &attdata[i],
+                                          datum, r, p, o,
+                                          ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
+                                          FALSE, isnull[i]);
+       }
+}
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+                       GISTENTRY *key1, bool isNull1,
+                       GISTENTRY *key2, bool isNull2, float *penalty)
+{
+       if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
+               *penalty = 0.0;
+       else
+               FunctionCall3(&giststate->penaltyFn[attno],
+                                         PointerGetDatum(key1),
+                                         PointerGetDatum(key2),
+                                         PointerGetDatum(penalty));
+}
+
+void
+GISTInitBuffer(Buffer b, uint32 f)
+{
+       GISTPageOpaque opaque;
+       Page            page;
+       Size            pageSize;
+
+       pageSize = BufferGetPageSize(b);
+       page = BufferGetPage(b);
+       PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
+
+       opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+       opaque->flags = f;
+}
+
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
new file mode 100644 (file)
index 0000000..b99ab24
--- /dev/null
@@ -0,0 +1,628 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistxlog.c
+ *       WAL replay logic for GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *           $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+typedef struct {
+       gistxlogEntryUpdate     *data;
+       int                     len;
+       IndexTuple              *itup;
+       BlockNumber             *path;
+} EntryUpdateRecord;
+
+typedef struct {
+       gistxlogPage    *header;
+       OffsetNumber    *offnum;
+
+       /* to work with */
+       Page    page;
+       Buffer  buffer;
+       bool    is_ok;
+} NewPage;
+
+typedef struct {
+       gistxlogPageSplit       *data;
+       NewPage                 *page;
+       IndexTuple              *itup;
+       BlockNumber             *path;
+} PageSplitRecord;
+
+/* track for incomplete inserts, idea was taken from nbtxlog.c */
+
+typedef struct gistIncompleteInsert {
+       RelFileNode     node;
+       ItemPointerData key;
+       int             lenblk;
+       BlockNumber     *blkno;
+       int             pathlen;
+       BlockNumber     *path;
+} gistIncompleteInsert;
+
+
+MemoryContext opCtx;
+MemoryContext insertCtx;
+static List *incomplete_inserts;
+
+
+#define ItemPointerEQ( a, b )  \
+       ( \
+       ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+       ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
+        )
+
+static void
+pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+               BlockNumber *blkno, int lenblk,
+               BlockNumber *path,  int pathlen,
+               PageSplitRecord *xlinfo /* to extract blkno info */ ) {
+       MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
+       gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
+
+       ninsert->node = node;
+       ninsert->key  = key;
+
+       if ( lenblk && blkno ) {        
+               ninsert->lenblk = lenblk;
+               ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+               memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
+       } else {
+               int i;
+
+               Assert( xlinfo );
+               ninsert->lenblk = xlinfo->data->npage;
+               ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+               for(i=0;i<ninsert->lenblk;i++)
+                       ninsert->blkno[i] = xlinfo->page[i].header->blkno;
+       }
+       Assert( ninsert->lenblk>0 );
+       
+       if ( path && ninsert->pathlen ) {
+               ninsert->pathlen = pathlen;
+               ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
+               memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
+       } else { 
+               ninsert->pathlen = 0;
+               ninsert->path = NULL;
+       }
+
+       incomplete_inserts = lappend(incomplete_inserts, ninsert);
+       MemoryContextSwitchTo(oldCxt);
+}
+
+static void
+forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
+       ListCell   *l;
+
+       foreach(l, incomplete_inserts) {
+               gistIncompleteInsert    *insert = (gistIncompleteInsert*) lfirst(l);
+
+               if (  RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
+                       
+                       /* found */
+                       if ( insert->path ) pfree( insert->path );
+                       pfree( insert->blkno );
+                       incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+                       pfree( insert );
+                       break;
+               } 
+       }
+}
+
+static void
+decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
+       char *begin = XLogRecGetData(record), *ptr;
+       int i=0, addpath=0;
+
+       decoded->data = (gistxlogEntryUpdate*)begin;
+
+       if ( decoded->data->pathlen ) {
+               addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+               decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+       } else 
+               decoded->path = NULL;
+
+       decoded->len=0;
+       ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+       while( ptr - begin < record->xl_len ) {
+               decoded->len++;
+               ptr += IndexTupleSize( (IndexTuple)ptr );
+       }  
+
+       decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
+       
+       ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+       while( ptr - begin < record->xl_len ) {
+               decoded->itup[i] = (IndexTuple)ptr;
+               ptr += IndexTupleSize( decoded->itup[i] );
+               i++;
+       }
+}
+
+
+static void
+gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
+       EntryUpdateRecord       xlrec;
+       Relation        reln;
+       Buffer          buffer;
+       Page            page;
+
+       decodeEntryUpdateRecord( &xlrec, record );
+
+       reln = XLogOpenRelation(xlrec.data->node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+       page = (Page) BufferGetPage(buffer);
+
+       if ( isnewroot ) {
+               if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       return;
+               }
+       } else { 
+               if ( PageIsNew((PageHeader) page) )
+                       elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+               if (XLByteLE(lsn, PageGetLSN(page))) {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       return;
+               }
+       }
+
+       if ( isnewroot )
+               GISTInitBuffer(buffer, 0);
+       else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) 
+               PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+       /* add tuples */
+       if ( xlrec.len > 0 ) {
+                OffsetNumber off = (PageIsEmpty(page)) ?  
+                        FirstOffsetNumber
+                        :
+                        OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+               gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+       }
+
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(buffer);
+
+       if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+               if ( incomplete_inserts != NIL )
+                       forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+               if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
+                       pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+                               &(xlrec.data->blkno), 1,
+                               xlrec.path, xlrec.data->pathlen,
+                               NULL);
+       }
+}
+
+static void
+decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
+       char *begin = XLogRecGetData(record), *ptr;
+       int i=0, addpath = 0;
+
+       decoded->data = (gistxlogPageSplit*)begin;
+       decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
+       decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
+
+       if ( decoded->data->pathlen ) {
+               addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+               decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+       } else 
+               decoded->path = NULL;
+
+       ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
+       for(i=0;i<decoded->data->nitup;i++) {
+               Assert( ptr - begin < record->xl_len );
+               decoded->itup[i] = (IndexTuple)ptr;
+               ptr += IndexTupleSize( decoded->itup[i] );
+       }
+
+       for(i=0;i<decoded->data->npage;i++) {
+               Assert( ptr - begin < record->xl_len );
+               decoded->page[i].header = (gistxlogPage*)ptr;
+               ptr += sizeof(gistxlogPage);
+
+               Assert( ptr - begin < record->xl_len );
+               decoded->page[i].offnum = (OffsetNumber*)ptr;
+               ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num );
+       }
+}
+       
+static void
+gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
+       PageSplitRecord xlrec;
+       Relation        reln;
+       Buffer          buffer;
+       Page            page;
+       int             i, len=0;
+       IndexTuple      *itup, *institup;
+       GISTPageOpaque opaque;
+       bool release=true;
+
+       decodePageSplitRecord( &xlrec, record );
+
+       reln = XLogOpenRelation(xlrec.data->node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+               elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+
+       if (XLByteLE(lsn, PageGetLSN(page))) {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+               return;
+       }
+       
+       if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
+               PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+       itup = gistextractbuffer(buffer, &len);
+       itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
+       institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
+        opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+
+       for(i=0;i<xlrec.data->npage;i++) {
+               int j;
+               NewPage *newpage = xlrec.page + i; 
+
+               /* prepare itup vector */
+               for(j=0;j<newpage->header->num;j++)
+                       institup[j] = itup[ newpage->offnum[j] - 1 ];
+
+               if ( newpage->header->blkno == xlrec.data->origblkno ) {
+                       /* IncrBufferRefCount(buffer); */
+                       newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData));
+                       newpage->buffer = buffer;
+                       newpage->is_ok=false; 
+               } else {
+                       newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno);
+                       if (!BufferIsValid(newpage->buffer))
+                               elog(PANIC, "gistRedoPageSplitRecord: lost page");
+                       newpage->page = (Page) BufferGetPage(newpage->buffer);
+                       if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                               ReleaseBuffer(buffer);
+                               newpage->is_ok=true;
+                               continue; /* good page */
+                       } else {
+                               newpage->is_ok=false;
+                               GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF);
+                       }
+               }
+               gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber);
+       }
+
+       for(i=0;i<xlrec.data->npage;i++) {
+               NewPage *newpage = xlrec.page + i;
+
+               if ( newpage->is_ok )
+                       continue;
+
+               if ( newpage->header->blkno == xlrec.data->origblkno ) { 
+                       PageRestoreTempPage(newpage->page, page);
+                       release = false;
+               }
+
+               PageSetLSN(newpage->page, lsn);
+               PageSetTLI(newpage->page, ThisTimeLineID);
+               LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+               WriteBuffer(newpage->buffer);   
+       }
+
+       if ( release ) {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+       }
+
+       if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+               if ( incomplete_inserts != NIL )
+                       forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+               pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+                               NULL, 0,
+                               xlrec.path, xlrec.data->pathlen,
+                               &xlrec);
+       }
+}
+
+static void
+gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
+       RelFileNode     *node = (RelFileNode*)XLogRecGetData(record);
+       Relation        reln;
+       Buffer          buffer;
+       Page            page;
+
+       reln = XLogOpenRelation(*node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "gistRedoCreateIndex: block unfound");
+       page = (Page) BufferGetPage(buffer);
+
+       if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+               return;
+       }
+
+       GISTInitBuffer(buffer, F_LEAF);
+
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(buffer);    
+}
+
+void
+gist_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8           info = record->xl_info & ~XLR_INFO_MASK;
+
+       MemoryContext oldCxt;
+       oldCxt = MemoryContextSwitchTo(opCtx);
+       switch (info) {
+               case    XLOG_GIST_ENTRY_UPDATE:
+               case    XLOG_GIST_ENTRY_DELETE:
+                       gistRedoEntryUpdateRecord(lsn, record,false);
+                       break;
+               case    XLOG_GIST_NEW_ROOT:
+                       gistRedoEntryUpdateRecord(lsn, record,true);
+                       break;
+               case    XLOG_GIST_PAGE_SPLIT:
+                       gistRedoPageSplitRecord(lsn, record);   
+                       break;
+               case    XLOG_GIST_CREATE_INDEX:
+                       gistRedoCreateIndex(lsn, record);
+                       break;
+               case    XLOG_GIST_INSERT_COMPLETE:
+                       forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, 
+                               ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+                       break;
+               default:
+                       elog(PANIC, "gist_redo: unknown op code %u", info);
+       }
+
+       MemoryContextSwitchTo(oldCxt);
+       MemoryContextReset(opCtx);
+}
+
+static void
+out_target(char *buf, RelFileNode node, ItemPointerData key)
+{
+        sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
+                 node.spcNode, node.dbNode, node.relNode,
+                        ItemPointerGetBlockNumber(&key),
+                        ItemPointerGetOffsetNumber(&key));
+}
+
+static void
+out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
+       out_target(buf, xlrec->node, xlrec->key);
+       sprintf(buf + strlen(buf), "; block number %u; update offset %u;", 
+               xlrec->blkno, xlrec->todeleteoffnum);
+}
+
+static void
+out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
+       strcat(buf, "page_split: ");
+       out_target(buf, xlrec->node, xlrec->key);
+       sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", 
+               xlrec->origblkno, xlrec->todeleteoffnum,
+               xlrec->nitup, xlrec->npage);
+}
+
+void
+gist_desc(char *buf, uint8 xl_info, char *rec)
+{
+       uint8           info = xl_info & ~XLR_INFO_MASK;
+
+       switch (info) {
+               case    XLOG_GIST_ENTRY_UPDATE:
+                       strcat(buf, "entry_update: ");
+                       out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+                       break;
+               case    XLOG_GIST_ENTRY_DELETE:
+                       strcat(buf, "entry_delete: ");
+                       out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+                       break;
+               case    XLOG_GIST_NEW_ROOT:
+                       strcat(buf, "new_root: ");
+                       out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
+                       break;
+               case    XLOG_GIST_PAGE_SPLIT:
+                       out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
+                       break;
+               case    XLOG_GIST_CREATE_INDEX:
+                       sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u", 
+                               ((RelFileNode*)rec)->spcNode, 
+                               ((RelFileNode*)rec)->dbNode, 
+                               ((RelFileNode*)rec)->relNode);
+                       break;
+               case    XLOG_GIST_INSERT_COMPLETE:
+                       strcat(buf, "insert_complete: ");
+                       out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); 
+               default:
+                       elog(PANIC, "gist_desc: unknown op code %u", info);
+       }
+}
+
+
+#ifdef GIST_INCOMPLETE_INSERT 
+static void
+gistContinueInsert(gistIncompleteInsert *insert) {
+       GISTSTATE       giststate;
+       GISTInsertState state;
+       int i;
+       MemoryContext oldCxt;
+       oldCxt = MemoryContextSwitchTo(opCtx);
+       
+       state.r = XLogOpenRelation(insert->node);
+       if (!RelationIsValid(state.r))
+               return;
+
+       initGISTstate(&giststate, state.r);
+
+       state.needInsertComplete=false;
+       ItemPointerSetInvalid( &(state.key) );
+       state.path=NULL;
+       state.pathlen=0;
+       state.xlog_mode = true;
+
+       /* form union tuples */
+       state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
+       state.ituplen = insert->lenblk; 
+       for(i=0;i<insert->lenblk;i++) {
+               int len=0;
+               IndexTuple *itup;
+               Buffer  buffer;
+               Page    page;
+
+               buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
+               if (!BufferIsValid(buffer))
+                       elog(PANIC, "gistContinueInsert: block unfound");
+               page = (Page) BufferGetPage(buffer);
+               if ( PageIsNew((PageHeader)page) )
+                       elog(PANIC, "gistContinueInsert: uninitialized page");
+
+               itup = gistextractbuffer(buffer, &len);
+               state.itup[i] = gistunion(state.r, itup, len, &giststate);
+
+               ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
+               
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+       }
+
+       if ( insert->pathlen==0 ) { 
+               /*it  was split root, so we should only make new root*/
+               gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
+               MemoryContextSwitchTo(oldCxt);
+               MemoryContextReset(opCtx);
+               return;
+       }
+
+       /* form stack */
+       state.stack=NULL;
+       for(i=0;i<insert->pathlen;i++) {
+               int j,len=0;
+               IndexTuple *itup;
+               GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
+
+               top->blkno = insert->path[i];
+               top->buffer = XLogReadBuffer(false, state.r, top->blkno);
+               if (!BufferIsValid(top->buffer))
+                       elog(PANIC, "gistContinueInsert: block unfound");
+               top->page = (Page) BufferGetPage(top->buffer);
+               if ( PageIsNew((PageHeader)(top->page)) )
+                       elog(PANIC, "gistContinueInsert: uninitialized page");
+
+               top->todelete = false;  
+
+               /* find childoffnum */
+               itup = gistextractbuffer(top->buffer, &len);
+               top->childoffnum=InvalidOffsetNumber;
+               for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
+                       BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); 
+                       
+                       if ( i==0 ) {
+                               int k; 
+                               for(k=0;k<insert->lenblk;k++)
+                                       if ( insert->blkno[k] == blkno ) {
+                                               top->childoffnum = j+1;
+                                               break;
+                                       }
+                       } else if ( insert->path[i-1]==blkno )
+                                       top->childoffnum = j+1;
+               }
+
+               if ( top->childoffnum==InvalidOffsetNumber ) {
+                       elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
+                       return;
+               }
+
+               if ( i==0 ) 
+                       PageIndexTupleDelete(top->page, top->childoffnum);
+                       
+               /* install item on right place in stack */
+               top->parent=NULL;
+               if ( state.stack ) {
+                       GISTInsertStack *ptr = state.stack;
+                       while( ptr->parent )
+                               ptr = ptr->parent;
+                       ptr->parent=top;
+               } else
+                       state.stack = top;
+       }
+
+       /* Good. Now we can continue insert */
+
+       gistmakedeal(&state, &giststate);
+
+       MemoryContextSwitchTo(oldCxt);
+       MemoryContextReset(opCtx);
+}
+#endif
+
+void
+gist_xlog_startup(void) {
+       incomplete_inserts=NIL;
+       insertCtx = AllocSetContextCreate(CurrentMemoryContext,
+               "GiST insert in xlog  temporary context",       
+                                 ALLOCSET_DEFAULT_MINSIZE,
+                                 ALLOCSET_DEFAULT_INITSIZE,
+                                 ALLOCSET_DEFAULT_MAXSIZE);
+       opCtx = createTempGistContext();
+}
+
+void
+gist_xlog_cleanup(void) {
+       ListCell   *l;
+
+       foreach(l, incomplete_inserts) {
+               gistIncompleteInsert    *insert = (gistIncompleteInsert*) lfirst(l);
+               char buf[1024];
+
+               *buf='\0';
+               out_target(buf, insert->node, insert->key);
+               elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
+#ifdef GIST_INCOMPLETE_INSERT 
+               gistContinueInsert(insert);
+#endif
+       }
+       MemoryContextDelete(opCtx);
+       MemoryContextDelete(insertCtx); 
+}
+
index 5fe442fd80d05cb9dce964d7b32c13b0446cb939..3b8bcb131d0be70ea21c4cf290710ff7ff300c02 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Resource managers definition
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.19 2005/06/08 15:50:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.20 2005/06/14 11:45:14 teodor Exp $
  */
 #include "postgres.h"
 
@@ -37,6 +37,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
        {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
        {"Hash", hash_redo, hash_desc, NULL, NULL},
        {"Rtree", rtree_redo, rtree_desc, NULL, NULL},
-       {"Gist", gist_redo, gist_desc, NULL, NULL},
+       {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
        {"Sequence", seq_redo, seq_desc, NULL, NULL}
 };
index db255a7b6f548f60670996352e3869cf07d8c6eb..479f221176b00f9fa7fd8e6367bc5dbee017d013 100644 (file)
@@ -7,15 +7,17 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.2 2005/06/06 17:01:24 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef GIST_PRIVATE_H
 #define GIST_PRIVATE_H
 
+#include "access/itup.h"
 #include "access/gist.h"
 #include "access/xlog.h"
+#include "access/xlogdefs.h"
 #include "fmgr.h"
 
 /*
@@ -66,6 +68,42 @@ typedef struct GISTScanOpaqueData
 
 typedef GISTScanOpaqueData *GISTScanOpaque;
 
+/*
+ * GISTInsertStack used for locking buffers and transfer arguments during
+ * insertion
+ */
+
+typedef struct GISTInsertStack {
+       /* current page */
+       BlockNumber     blkno;   
+       Buffer          buffer;
+       Page            page;
+       
+       /* child's offset */
+       OffsetNumber    childoffnum;
+
+       /* pointer to parent */
+       struct GISTInsertStack  *parent;
+
+       bool todelete;
+} GISTInsertStack;
+
+typedef struct {
+       Relation        r;
+       IndexTuple      *itup; /* in/out, points to compressed entry */
+       int             ituplen; /* length of itup */
+       GISTInsertStack *stack;
+       bool needInsertComplete;
+       bool xlog_mode;
+
+       /* pointer to heap tuple */
+       ItemPointerData key;
+
+       /* path to stroe in XLog */
+       BlockNumber     *path;
+       int             pathlen; 
+} GISTInsertState;
+
 /*
  * When we're doing a scan and updating a tree at the same time, the
  * updates may affect the scan.  We use the flags entry of the scan's
@@ -89,6 +127,72 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
 #define GISTOP_DEL             0
 #define GISTOP_SPLIT   1
 
+#define ATTSIZE(datum, tupdesc, i, isnull) \
+        ( \
+                (isnull) ? 0 : \
+                   att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
+        ) 
+
+/* XLog stuff */
+#define        XLOG_GIST_ENTRY_UPDATE  0x00
+#define        XLOG_GIST_ENTRY_DELETE  0x10
+#define XLOG_GIST_NEW_ROOT     0x20
+
+typedef struct gistxlogEntryUpdate {
+       RelFileNode     node;
+       BlockNumber     blkno;
+
+       /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */ 
+       OffsetNumber    todeleteoffnum;
+       uint16          pathlen;
+
+       /* 
+        * It used to identify compliteness of insert.
+         * Sets to leaf itup 
+         */ 
+       ItemPointerData key;
+
+       /* follow:
+        * 1. path to root (BlockNumber) 
+        * 2. tuples to insert
+         */ 
+} gistxlogEntryUpdate;
+
+#define XLOG_GIST_PAGE_SPLIT   0x30
+
+typedef struct gistxlogPageSplit {
+       RelFileNode     node;
+       BlockNumber     origblkno; /*splitted page*/
+       OffsetNumber    todeleteoffnum;
+       uint16          pathlen;
+       int             npage;
+       int             nitup;
+
+       /* see comments on gistxlogEntryUpdate */
+       ItemPointerData key;
+       /* follow:
+        * 1. path to root (BlockNumber) 
+        * 2. tuples to insert
+        * 3. gistxlogPage and array of OffsetNumber per page
+         */ 
+} gistxlogPageSplit;
+
+typedef struct gistxlogPage {
+       BlockNumber     blkno;
+       int             num;
+} gistxlogPage;        
+
+
+#define XLOG_GIST_INSERT_COMPLETE  0x40
+
+typedef struct gistxlogInsertComplete {
+       RelFileNode     node;
+       ItemPointerData key;
+} gistxlogInsertComplete;
+
+#define XLOG_GIST_CREATE_INDEX 0x50
+
 /* gist.c */
 extern Datum gistbuild(PG_FUNCTION_ARGS);
 extern Datum gistinsert(PG_FUNCTION_ARGS);
@@ -96,15 +200,57 @@ extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
 extern MemoryContext createTempGistContext(void);
 extern void initGISTstate(GISTSTATE *giststate, Relation index);
 extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
-                          Datum k, Relation r, Page pg, OffsetNumber o,
-                          int b, bool l, bool isNull);
+extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode);
+extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 
+/* gistxlog.c */
 extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void gist_desc(char *buf, uint8 xl_info, char *rec);
+extern void gist_xlog_startup(void);
+extern void gist_xlog_cleanup(void);
 
 /* gistget.c */
 extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
+/* gistutil.c */
+extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+                                int len, OffsetNumber off);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len);
+extern IndexTuple * gistextractbuffer(Buffer buffer, int *len /* out */ );
+extern IndexTuple * gistjoinvector(
+                           IndexTuple *itvec, int *len,
+                           IndexTuple *additvec, int addlen);
+extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
+                  int len, GISTSTATE *giststate);
+extern IndexTuple gistgetadjusted(Relation r,
+                                IndexTuple oldtup,
+                                IndexTuple addtup,
+                                GISTSTATE *giststate);
+extern int gistfindgroup(GISTSTATE *giststate,
+                          GISTENTRY *valvec, GIST_SPLITVEC *spl);
+extern void gistadjsubkey(Relation r,
+                          IndexTuple *itup, int *len,
+                          GIST_SPLITVEC *v,
+                          GISTSTATE *giststate);
+extern IndexTuple gistFormTuple(GISTSTATE *giststate,
+                        Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+extern OffsetNumber gistchoose(Relation r, Page p,
+                   IndexTuple it,
+                   GISTSTATE *giststate);
+extern void gistcentryinit(GISTSTATE *giststate, int nkey,
+                           GISTENTRY *e, Datum k, 
+                           Relation r, Page pg,
+                           OffsetNumber o, int b, bool l, bool isNull);
+extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
+                              IndexTuple tuple, Page p, OffsetNumber o,
+                              GISTENTRY *attdata, bool *isnull);
+extern void gistunionsubkey(Relation r, GISTSTATE *giststate, 
+                            IndexTuple *itvec, GIST_SPLITVEC *spl);
+extern void GISTInitBuffer(Buffer b, uint32 f);
+extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+                          Datum k, Relation r, Page pg, OffsetNumber o,
+                          int b, bool l, bool isNull);
+
 #endif /* GIST_PRIVATE_H */