* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#include "miscadmin.h"
#include "utils/memutils.h"
-
-#undef GIST_PAGEADDITEM
-
-#define ATTSIZE(datum, tupdesc, i, isnull) \
- ( \
- (isnull) ? 0 : \
- att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
- )
-
-/* result's status */
-#define INSERTED 0x01
-#define SPLITED 0x02
-
-/* group flags ( in gistSplit ) */
-#define LEFT_ADDED 0x01
-#define RIGHT_ADDED 0x02
-#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
-
-/*
- * This defines only for shorter code, used in gistgetadjusted
- * and gistadjsubkey only
- */
-#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
- if (isnullkey) { \
- gistentryinit((evp), rkey, r, NULL, \
- (OffsetNumber) 0, rkeyb, FALSE); \
- } else { \
- gistentryinit((evp), okey, r, NULL, \
- (OffsetNumber) 0, okeyb, FALSE); \
- } \
-} while(0)
-
-#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
- FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
- FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
-} while(0);
-
/* Working state for gistbuild and its callback */
typedef struct
{
static void gistdoinsert(Relation r,
IndexTuple itup,
GISTSTATE *GISTstate);
-static int gistlayerinsert(Relation r, BlockNumber blkno,
- IndexTuple **itup,
- int *len,
- GISTSTATE *giststate);
-static OffsetNumber gistwritebuffer(Relation r,
- Page page,
- IndexTuple *itup,
- int len,
- OffsetNumber off);
-static bool gistnospace(Page page, IndexTuple *itvec, int len);
-static IndexTuple *gistreadbuffer(Buffer buffer, int *len);
-static IndexTuple *gistjoinvector(
- IndexTuple *itvec, int *len,
- IndexTuple *additvec, int addlen);
-static IndexTuple gistunion(Relation r, IndexTuple *itvec,
- int len, GISTSTATE *giststate);
-
-static IndexTuple gistgetadjusted(Relation r,
- IndexTuple oldtup,
- IndexTuple addtup,
+static void gistfindleaf(GISTInsertState *state,
GISTSTATE *giststate);
-static int gistfindgroup(GISTSTATE *giststate,
- GISTENTRY *valvec, GIST_SPLITVEC *spl);
-static void gistadjsubkey(Relation r,
- IndexTuple *itup, int *len,
- GIST_SPLITVEC *v,
- GISTSTATE *giststate);
-static IndexTuple gistFormTuple(GISTSTATE *giststate,
- Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+
+typedef struct PageLayout {
+ gistxlogPage block;
+ OffsetNumber *list;
+ Buffer buffer; /* to write after all proceed */
+
+ struct PageLayout *next;
+} PageLayout;
+
+
+#define ROTATEDIST(d) do { \
+ PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
+ memset(tmp,0,sizeof(PageLayout)); \
+ tmp->next = (d); \
+ (d)=tmp; \
+} while(0)
+
+
static IndexTuple *gistSplit(Relation r,
Buffer buffer,
IndexTuple *itup,
int *len,
+ PageLayout **dist,
GISTSTATE *giststate);
-static void gistnewroot(Relation r, IndexTuple *itup, int len);
-static void GISTInitBuffer(Buffer b, uint32 f);
-static OffsetNumber gistchoose(Relation r, Page p,
- IndexTuple it,
- GISTSTATE *giststate);
-static void gistdelete(Relation r, ItemPointer tid);
-
-#ifdef GIST_PAGEADDITEM
-static IndexTuple gist_tuple_replacekey(Relation r,
- GISTENTRY entry, IndexTuple t);
-#endif
-static void gistcentryinit(GISTSTATE *giststate, int nkey,
- GISTENTRY *e, Datum k,
- Relation r, Page pg,
- OffsetNumber o, int b, bool l, bool isNull);
-static void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
- IndexTuple tuple, Page p, OffsetNumber o,
- GISTENTRY *attdata, bool *isnull);
-static void gistpenalty(GISTSTATE *giststate, int attno,
- GISTENTRY *key1, bool isNull1,
- GISTENTRY *key2, bool isNull2,
- float *penalty);
+
#undef GISTDEBUG
/* initialize the root page */
buffer = ReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF);
+ if ( !index->rd_istemp ) {
+ XLogRecPtr recptr;
+ XLogRecData rdata;
+ Page page;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char*)&(index->rd_node);
+ rdata.len = sizeof(RelFileNode);
+ rdata.next = NULL;
+
+ page = BufferGetPage(buffer);
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+
+ END_CRIT_SECTION();
+ }
WriteBuffer(buffer);
/* build the index */
PG_RETURN_BOOL(true);
}
-#ifdef GIST_PAGEADDITEM
-/*
- * Take a compressed entry, and install it on a page. Since we now know
- * where the entry will live, we decompress it and recompress it using
- * that knowledge (some compression routines may want to fish around
- * on the page, for example, or do something special for leaf nodes.)
- */
-static OffsetNumber
-gistPageAddItem(GISTSTATE *giststate,
- Relation r,
- Page page,
- Item item,
- Size size,
- OffsetNumber offsetNumber,
- ItemIdFlags flags,
- GISTENTRY *dentry,
- IndexTuple *newtup)
-{
- GISTENTRY tmpcentry;
- IndexTuple itup = (IndexTuple) item;
- OffsetNumber retval;
- Datum datum;
- bool IsNull;
-
- /*
- * recompress the item given that we now know the exact page and
- * offset for insertion
- */
- datum = index_getattr(itup, 1, r->rd_att, &IsNull);
- gistdentryinit(giststate, 0, dentry, datum,
- (Relation) 0, (Page) 0,
- (OffsetNumber) InvalidOffsetNumber,
- ATTSIZE(datum, r, 1, IsNull),
- FALSE, IsNull);
- gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page,
- offsetNumber, dentry->bytes, FALSE);
- *newtup = gist_tuple_replacekey(r, tmpcentry, itup);
- retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
- offsetNumber, flags);
- if (retval == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- return retval;
-}
-#endif
/*
* Workhouse routine for doing insertion into a GiST index. Note that
static void
gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
{
- IndexTuple *instup;
- int ret,
- len = 1;
+ GISTInsertState state;
- instup = (IndexTuple *) palloc(sizeof(IndexTuple));
- instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
- memcpy(instup[0], itup, IndexTupleSize(itup));
+ memset(&state, 0, sizeof(GISTInsertState));
- ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate);
- if (ret & SPLITED)
- gistnewroot(r, instup, len);
-}
+ state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
+ state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
+ memcpy(state.itup[0], itup, IndexTupleSize(itup));
+ state.ituplen=1;
+ state.r = r;
+ state.key = itup->t_tid;
+ state.needInsertComplete = true;
+ state.xlog_mode = false;
-static int
-gistlayerinsert(Relation r, BlockNumber blkno,
- IndexTuple **itup, /* in - out, has compressed entry */
- int *len, /* in - out */
- GISTSTATE *giststate)
-{
- Buffer buffer;
- Page page;
- int ret;
- GISTPageOpaque opaque;
+ state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+ memset( state.stack, 0, sizeof(GISTInsertStack));
+ state.stack->blkno=GIST_ROOT_BLKNO;
- buffer = ReadBuffer(r, blkno);
- page = (Page) BufferGetPage(buffer);
- opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ gistfindleaf(&state, giststate);
+ gistmakedeal(&state, giststate);
+}
- if (!(opaque->flags & F_LEAF))
+static bool
+gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
+ bool is_splitted = false;
+
+ if (gistnospace(state->stack->page, state->itup, state->ituplen))
{
- /*
- * This is an internal page, so continue to walk down the
- * tree. We find the child node that has the minimum insertion
- * penalty and recursively invoke ourselves to modify that
- * node. Once the recursive call returns, we may need to
- * adjust the parent node for two reasons: the child node
- * split, or the key in this node needs to be adjusted for the
- * newly inserted key below us.
- */
- ItemId iid;
- BlockNumber nblkno;
- ItemPointerData oldtid;
- IndexTuple oldtup;
- OffsetNumber child;
-
- child = gistchoose(r, page, *(*itup), giststate);
- iid = PageGetItemId(page, child);
- oldtup = (IndexTuple) PageGetItem(page, iid);
- nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+ /* no space for insertion */
+ IndexTuple *itvec,
+ *newitup;
+ int tlen,olen;
+ PageLayout *dist=NULL, *ptr;
+
+ memset(&dist, 0, sizeof(PageLayout));
+ is_splitted = true;
+ itvec = gistextractbuffer(state->stack->buffer, &tlen);
+ olen=tlen;
+ itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
+ newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+
+ if ( !state->r->rd_istemp && !state->xlog_mode) {
+ gistxlogPageSplit xlrec;
+ XLogRecPtr recptr;
+ XLogRecData *rdata;
+ int i, npage = 0, cur=1;
+
+ ptr=dist;
+ while( ptr ) {
+ npage++;
+ ptr=ptr->next;
+ }
- /*
- * After this call: 1. if child page was splited, then itup
- * contains keys for each page 2. if child page wasn't splited,
- * then itup contains additional for adjustment of current key
- */
- ret = gistlayerinsert(r, nblkno, itup, len, giststate);
+ rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
+
+ xlrec.node = state->r->rd_node;
+ xlrec.origblkno = state->stack->blkno;
+ xlrec.npage = npage;
+ xlrec.nitup = state->ituplen;
+ xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+ xlrec.key = state->key;
+ xlrec.pathlen = (uint16)state->pathlen;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof( gistxlogPageSplit );
+ rdata[0].next = NULL;
+
+ if ( state->pathlen>=0 ) {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) (state->path);
+ rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
+ rdata[1].next = NULL;
+ cur++;
+ }
+
+ /* new tuples */
+ for(i=0;i<state->ituplen;i++) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(state->itup[i]);
+ rdata[cur].len = IndexTupleSize(state->itup[i]);
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+ }
- /* nothing inserted in child */
- if (!(ret & INSERTED))
- {
- ReleaseBuffer(buffer);
- return 0x00;
- }
+ /* new page layout */
+ ptr=dist;
+ while(ptr) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)&(ptr->block);
+ rdata[cur].len = sizeof(gistxlogPage);
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(ptr->list);
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+ if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+ rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].next=NULL;
+ cur++;
+
+ ptr=ptr->next;
+ }
- /* child did not split */
- if (!(ret & SPLITED))
- {
- IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
+ START_CRIT_SECTION();
- if (!newtup)
- {
- /* not need to update key */
- ReleaseBuffer(buffer);
- return 0x00;
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+ ptr = dist;
+ while(ptr) {
+ PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+ PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+ ptr=ptr->next;
}
- (*itup)[0] = newtup;
+ END_CRIT_SECTION();
}
- /*
- * This node's key has been modified, either because a child
- * split occurred or because we needed to adjust our key for
- * an insert in a child node. Therefore, remove the old
- * version of this node's key.
- */
- ItemPointerSet(&oldtid, blkno, child);
- gistdelete(r, &oldtid);
-
- /*
- * if child was splitted, new key for child will be inserted in
- * the end list of child, so we must say to any scans that page is
- * changed beginning from 'child' offset
- */
- if (ret & SPLITED)
- gistadjscans(r, GISTOP_SPLIT, blkno, child);
- }
+ ptr = dist;
+ while(ptr) {
+ WriteBuffer(ptr->buffer);
+ ptr=ptr->next;
+ }
- ret = INSERTED;
+ state->itup = newitup;
+ state->ituplen = tlen; /* now tlen >= 2 */
- if (gistnospace(page, (*itup), *len))
- {
- /* no space for insertion */
- IndexTuple *itvec,
- *newitup;
- int tlen,
- oldlen;
-
- ret |= SPLITED;
- itvec = gistreadbuffer(buffer, &tlen);
- itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
- oldlen = *len;
- newitup = gistSplit(r, buffer, itvec, &tlen, giststate);
- ReleaseBuffer(buffer);
- *itup = newitup;
- *len = tlen; /* now tlen >= 2 */
+ if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
+ gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
+ state->needInsertComplete=false;
+ }
+ if ( state->xlog_mode )
+ LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(state->stack->buffer);
}
else
{
/* enough space */
- OffsetNumber off,
- l;
+ OffsetNumber off, l;
- off = (PageIsEmpty(page)) ?
+ off = (PageIsEmpty(state->stack->page)) ?
FirstOffsetNumber
:
- OffsetNumberNext(PageGetMaxOffsetNumber(page));
- l = gistwritebuffer(r, page, *itup, *len, off);
- WriteBuffer(buffer);
+ OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
+ l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
+ if ( !state->r->rd_istemp && !state->xlog_mode) {
+ gistxlogEntryUpdate xlrec;
+ XLogRecPtr recptr;
+ XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
+ int i, cur=0;
+
+ xlrec.node = state->r->rd_node;
+ xlrec.blkno = state->stack->blkno;
+ xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+ xlrec.key = state->key;
+ xlrec.pathlen = (uint16)state->pathlen;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof( gistxlogEntryUpdate );
+ rdata[0].next = NULL;
+
+ if ( state->pathlen>=0 ) {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) (state->path);
+ rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
+ rdata[1].next = NULL;
+ cur++;
+ }
+
+ for(i=1; i<=state->ituplen; i++) { /* adding tuples */
+ rdata[i+cur].buffer = InvalidBuffer;
+ rdata[i+cur].data = (char*)(state->itup[i-1]);
+ rdata[i+cur].len = IndexTupleSize(state->itup[i-1]);
+ rdata[i+cur].next = NULL;
+ rdata[i-1+cur].next = &(rdata[i+cur]);
+ }
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ PageSetLSN(state->stack->page, recptr);
+ PageSetTLI(state->stack->page, ThisTimeLineID);
+
+ END_CRIT_SECTION();
+ }
+
+ if ( state->stack->blkno == GIST_ROOT_BLKNO )
+ state->needInsertComplete=false;
- if (*len > 1)
- { /* previous insert ret & SPLITED != 0 */
+ if ( state->xlog_mode )
+ LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(state->stack->buffer);
+
+ if (state->ituplen > 1)
+ { /* previous is_splitted==true */
/*
* child was splited, so we must form union for insertion in
* parent
*/
- IndexTuple newtup = gistunion(r, (*itup), *len, giststate);
- ItemPointerSet(&(newtup->t_tid), blkno, 1);
- (*itup)[0] = newtup;
- *len = 1;
+ IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
+ ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
+ state->itup[0] = newtup;
+ state->ituplen = 1;
}
}
-
- return ret;
-}
-
-/*
- * Write itup vector to page, has no control of free space
- */
-static OffsetNumber
-gistwritebuffer(Relation r, Page page, IndexTuple *itup,
- int len, OffsetNumber off)
-{
- OffsetNumber l = InvalidOffsetNumber;
- int i;
-
- for (i = 0; i < len; i++)
- {
-#ifdef GIST_PAGEADDITEM
- GISTENTRY tmpdentry;
- IndexTuple newtup;
- bool IsNull;
-
- l = gistPageAddItem(giststate, r, page,
- (Item) itup[i], IndexTupleSize(itup[i]),
- off, LP_USED, &tmpdentry, &newtup);
- off = OffsetNumberNext(off);
-#else
- l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
- off, LP_USED);
- if (l == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
-#endif
- }
- return l;
+ return is_splitted;
}
-/*
- * Check space for itup vector on page
- */
-static bool
-gistnospace(Page page, IndexTuple *itvec, int len)
-{
- unsigned int size = 0;
- int i;
-
- for (i = 0; i < len; i++)
- size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
-
- return (PageGetFreeSpace(page) < size);
-}
-
-/*
- * Read buffer into itup vector
- */
-static IndexTuple *
-gistreadbuffer(Buffer buffer, int *len /* out */ )
-{
- OffsetNumber i,
- maxoff;
- IndexTuple *itvec;
- Page p = (Page) BufferGetPage(buffer);
-
- maxoff = PageGetMaxOffsetNumber(p);
- *len = maxoff;
- itvec = palloc(sizeof(IndexTuple) * maxoff);
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
-
- return itvec;
-}
-
-/*
- * join two vectors into one
- */
-static IndexTuple *
-gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
-{
- itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
- memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
- *len += addlen;
- return itvec;
-}
-
-/*
- * Return an IndexTuple containing the result of applying the "union"
- * method to the specified IndexTuple vector.
- */
-static IndexTuple
-gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+static void
+gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
{
- Datum attr[INDEX_MAX_KEYS];
- bool isnull[INDEX_MAX_KEYS];
- GistEntryVector *evec;
- int i;
- GISTENTRY centry[INDEX_MAX_KEYS];
-
- evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
- for (i = 0; i < r->rd_att->natts; i++)
- {
- Datum datum;
- int j;
- int real_len;
+ ItemId iid;
+ IndexTuple oldtup;
+ GISTInsertStack *ptr;
- real_len = 0;
- for (j = 0; j < len; j++)
- {
- bool IsNull;
- datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
- if (IsNull)
- continue;
-
- gistdentryinit(giststate, i,
- &(evec->vector[real_len]),
- datum,
- NULL, NULL, (OffsetNumber) 0,
- ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
- FALSE, IsNull);
- real_len++;
- }
+ /* walk down */
+ while( true ) {
+ GISTPageOpaque opaque;
- /* If this tuple vector was all NULLs, the union is NULL */
- if (real_len == 0)
- {
- attr[i] = (Datum) 0;
- isnull[i] = TRUE;
- }
- else
+ state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+ state->stack->page = (Page) BufferGetPage(state->stack->buffer);
+ opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
+
+ if (!(opaque->flags & F_LEAF))
{
- int datumsize;
-
- if (real_len == 1)
- {
- evec->n = 2;
- gistentryinit(evec->vector[1],
- evec->vector[0].key, r, NULL,
- (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
- }
- else
- evec->n = real_len;
-
- /* Compress the result of the union and store in attr array */
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- gistcentryinit(giststate, i, ¢ry[i], datum,
- NULL, NULL, (OffsetNumber) 0,
- datumsize, FALSE, FALSE);
- isnull[i] = FALSE;
- attr[i] = centry[i].key;
- }
+ /*
+ * This is an internal page, so continue to walk down the
+ * tree. We find the child node that has the minimum insertion
+ * penalty and recursively invoke ourselves to modify that
+ * node. Once the recursive call returns, we may need to
+ * adjust the parent node for two reasons: the child node
+ * split, or the key in this node needs to be adjusted for the
+ * newly inserted key below us.
+ */
+ GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+
+ state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
+
+ iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+ oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+ item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+ item->parent = state->stack;
+ item->todelete = false;
+ state->stack = item;
+ } else
+ break;
}
- return index_form_tuple(giststate->tupdesc, attr, isnull);
-}
-
-
-/*
- * Forms union of oldtup and addtup, if union == oldtup then return NULL
- */
-static IndexTuple
-gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
-{
- GistEntryVector *evec;
- bool neednew = false;
- bool isnull[INDEX_MAX_KEYS];
- Datum attr[INDEX_MAX_KEYS];
- GISTENTRY centry[INDEX_MAX_KEYS],
- oldatt[INDEX_MAX_KEYS],
- addatt[INDEX_MAX_KEYS],
- *ev0p,
- *ev1p;
- bool oldisnull[INDEX_MAX_KEYS],
- addisnull[INDEX_MAX_KEYS];
- IndexTuple newtup = NULL;
- int i;
-
- evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
- evec->n = 2;
- ev0p = &(evec->vector[0]);
- ev1p = &(evec->vector[1]);
-
- gistDeCompressAtt(giststate, r, oldtup, NULL,
- (OffsetNumber) 0, oldatt, oldisnull);
-
- gistDeCompressAtt(giststate, r, addtup, NULL,
- (OffsetNumber) 0, addatt, addisnull);
-
- for (i = 0; i < r->rd_att->natts; i++)
- {
- if (oldisnull[i] && addisnull[i])
- {
- attr[i] = (Datum) 0;
- isnull[i] = TRUE;
- }
- else
- {
- Datum datum;
- int datumsize;
-
- FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
- addisnull[i], addatt[i].key, addatt[i].bytes);
-
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- if (oldisnull[i] || addisnull[i])
- {
- if (oldisnull[i])
- neednew = true;
- }
- else
- {
- bool result;
-
- FunctionCall3(&giststate->equalFn[i],
- ev0p->key,
- datum,
- PointerGetDatum(&result));
+ /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */
- if (!result)
- neednew = true;
- }
-
- gistcentryinit(giststate, i, ¢ry[i], datum,
- NULL, NULL, (OffsetNumber) 0,
- datumsize, FALSE, FALSE);
-
- attr[i] = centry[i].key;
- isnull[i] = FALSE;
- }
+ /* form state->path to work xlog */
+ ptr = state->stack;
+ state->pathlen=1;
+ while( ptr ) {
+ state->pathlen++;
+ ptr=ptr->parent;
}
-
- if (neednew)
- {
- /* need to update key */
- newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
- newtup->t_tid = oldtup->t_tid;
+ state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
+ ptr = state->stack;
+ state->pathlen=0;
+ while( ptr ) {
+ state->path[ state->pathlen ] = ptr->blkno;
+ state->pathlen++;
+ ptr=ptr->parent;
}
-
- return newtup;
+ state->pathlen--;
+ state->path++;
}
-static void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
-{
- int lr;
-
- for (lr = 0; lr < 2; lr++)
- {
- OffsetNumber *entries;
- int i;
- Datum *attr;
- int len,
- *attrsize;
- bool *isnull;
- GistEntryVector *evec;
-
- if (lr)
- {
- attrsize = spl->spl_lattrsize;
- attr = spl->spl_lattr;
- len = spl->spl_nleft;
- entries = spl->spl_left;
- isnull = spl->spl_lisnull;
- }
- else
- {
- attrsize = spl->spl_rattrsize;
- attr = spl->spl_rattr;
- len = spl->spl_nright;
- entries = spl->spl_right;
- isnull = spl->spl_risnull;
- }
-
- evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
- for (i = 1; i < r->rd_att->natts; i++)
- {
- int j;
- Datum datum;
- int datumsize;
- int real_len;
-
- real_len = 0;
- for (j = 0; j < len; j++)
- {
- bool IsNull;
-
- if (spl->spl_idgrp[entries[j]])
- continue;
- datum = index_getattr(itvec[entries[j] - 1], i + 1,
- giststate->tupdesc, &IsNull);
- if (IsNull)
- continue;
- gistdentryinit(giststate, i,
- &(evec->vector[real_len]),
- datum,
- NULL, NULL, (OffsetNumber) 0,
- ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
- FALSE, IsNull);
- real_len++;
-
- }
- if (real_len == 0)
- {
- datum = (Datum) 0;
- datumsize = 0;
- isnull[i] = true;
- }
- else
- {
- /*
- * evec->vector[0].bytes may be not defined, so form union
- * with itself
- */
- if (real_len == 1)
- {
- evec->n = 2;
- memcpy(&(evec->vector[1]), &(evec->vector[0]),
- sizeof(GISTENTRY));
- }
- else
- evec->n = real_len;
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
- isnull[i] = false;
- }
+void
+gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
+ int is_splitted;
+ ItemId iid;
+ IndexTuple oldtup, newtup;
- attr[i] = datum;
- attrsize[i] = datumsize;
- }
- }
-}
+ /* walk up */
+ while( true ) {
+ /*
+ * After this call: 1. if child page was splited, then itup
+ * contains keys for each page 2. if child page wasn't splited,
+ * then itup contains additional for adjustment of current key
+ */
-/*
- * find group in vector with equal value
- */
-static int
-gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
-{
- int i;
- int curid = 1;
-
- /*
- * first key is always not null (see gistinsert), so we may not check
- * for nulls
- */
- for (i = 0; i < spl->spl_nleft; i++)
- {
- int j;
- int len;
- bool result;
-
- if (spl->spl_idgrp[spl->spl_left[i]])
- continue;
- len = 0;
- /* find all equal value in right part */
- for (j = 0; j < spl->spl_nright; j++)
- {
- if (spl->spl_idgrp[spl->spl_right[j]])
- continue;
- FunctionCall3(&giststate->equalFn[0],
- valvec[spl->spl_left[i]].key,
- valvec[spl->spl_right[j]].key,
- PointerGetDatum(&result));
- if (result)
- {
- spl->spl_idgrp[spl->spl_right[j]] = curid;
- len++;
- }
- }
- /* find all other equal value in left part */
- if (len)
- {
- /* add current val to list of equal values */
- spl->spl_idgrp[spl->spl_left[i]] = curid;
- /* searching .. */
- for (j = i + 1; j < spl->spl_nleft; j++)
- {
- if (spl->spl_idgrp[spl->spl_left[j]])
- continue;
- FunctionCall3(&giststate->equalFn[0],
- valvec[spl->spl_left[i]].key,
- valvec[spl->spl_left[j]].key,
- PointerGetDatum(&result));
- if (result)
- {
- spl->spl_idgrp[spl->spl_left[j]] = curid;
- len++;
- }
- }
- spl->spl_ngrp[curid] = len + 1;
- curid++;
- }
- }
+ is_splitted = gistplacetopage(state, giststate );
- return curid;
-}
+ /* pop page from stack */
+ state->stack = state->stack->parent;
+ state->pathlen--;
+ state->path++;
+
+ /* stack is void */
+ if ( ! state->stack )
+ break;
-/*
- * Insert equivalent tuples to left or right page with minimum
- * penalty
- */
-static void
-gistadjsubkey(Relation r,
- IndexTuple *itup, /* contains compressed entry */
- int *len,
- GIST_SPLITVEC *v,
- GISTSTATE *giststate)
-{
- int curlen;
- OffsetNumber *curwpos;
- GISTENTRY entry,
- identry[INDEX_MAX_KEYS],
- *ev0p,
- *ev1p;
- float lpenalty,
- rpenalty;
- GistEntryVector *evec;
- int datumsize;
- bool isnull[INDEX_MAX_KEYS];
- int i,
- j;
- /* clear vectors */
- curlen = v->spl_nleft;
- curwpos = v->spl_left;
- for (i = 0; i < v->spl_nleft; i++)
- {
- if (v->spl_idgrp[v->spl_left[i]] == 0)
+ /* child did not split */
+ if (!is_splitted)
{
- *curwpos = v->spl_left[i];
- curwpos++;
- }
- else
- curlen--;
- }
- v->spl_nleft = curlen;
+ /* parent's tuple */
+ iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+ oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+ newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
+
+ if (!newtup) /* not need to update key */
+ break;
- curlen = v->spl_nright;
- curwpos = v->spl_right;
- for (i = 0; i < v->spl_nright; i++)
- {
- if (v->spl_idgrp[v->spl_right[i]] == 0)
- {
- *curwpos = v->spl_right[i];
- curwpos++;
+ state->itup[0] = newtup;
}
- else
- curlen--;
+
+ /*
+ * This node's key has been modified, either because a child
+ * split occurred or because we needed to adjust our key for
+ * an insert in a child node. Therefore, remove the old
+ * version of this node's key.
+ */
+ gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
+ PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
+ if ( !state->r->rd_istemp )
+ state->stack->todelete = true;
+
+ /*
+ * if child was splitted, new key for child will be inserted in
+ * the end list of child, so we must say to any scans that page is
+ * changed beginning from 'child' offset
+ */
+ if (is_splitted)
+ gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum);
+ } /* while */
+
+ /* release all buffers */
+ while( state->stack ) {
+ if ( state->xlog_mode )
+ LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(state->stack->buffer);
+ state->stack = state->stack->parent;
}
- v->spl_nright = curlen;
- evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
- evec->n = 2;
- ev0p = &(evec->vector[0]);
- ev1p = &(evec->vector[1]);
+ /* say to xlog that insert is completed */
+ if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
+ gistxlogInsertComplete xlrec;
+ XLogRecData rdata;
+
+ xlrec.node = state->r->rd_node;
+ xlrec.key = state->key;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = sizeof( gistxlogInsertComplete );
+ rdata.next = NULL;
- /* add equivalent tuple */
- for (i = 0; i < *len; i++)
- {
- Datum datum;
+ START_CRIT_SECTION();
- if (v->spl_idgrp[i + 1] == 0) /* already inserted */
- continue;
- gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
- identry, isnull);
+ XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
- v->spl_ngrp[v->spl_idgrp[i + 1]]--;
- if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
- (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
- {
- /* force last in group */
- rpenalty = 1.0;
- lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
- }
- else
- {
- /* where? */
- for (j = 1; j < r->rd_att->natts; j++)
- {
- gistentryinit(entry, v->spl_lattr[j], r, NULL,
- (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
- gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
- &identry[j], isnull[j], &lpenalty);
-
- gistentryinit(entry, v->spl_rattr[j], r, NULL,
- (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
- gistpenalty(giststate, j, &entry, v->spl_risnull[j],
- &identry[j], isnull[j], &rpenalty);
-
- if (lpenalty != rpenalty)
- break;
- }
- }
-
- /*
- * add
- * XXX: refactor this to avoid duplicating code
- */
- if (lpenalty < rpenalty)
- {
- v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
- v->spl_left[v->spl_nleft] = i + 1;
- v->spl_nleft++;
- for (j = 1; j < r->rd_att->natts; j++)
- {
- if (isnull[j] && v->spl_lisnull[j])
- {
- v->spl_lattr[j] = (Datum) 0;
- v->spl_lattrsize[j] = 0;
- }
- else
- {
- FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
- isnull[j], identry[j].key, identry[j].bytes);
-
- datum = FunctionCall2(&giststate->unionFn[j],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- v->spl_lattr[j] = datum;
- v->spl_lattrsize[j] = datumsize;
- v->spl_lisnull[j] = false;
- }
- }
- }
- else
- {
- v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
- v->spl_right[v->spl_nright] = i + 1;
- v->spl_nright++;
- for (j = 1; j < r->rd_att->natts; j++)
- {
- if (isnull[j] && v->spl_risnull[j])
- {
- v->spl_rattr[j] = (Datum) 0;
- v->spl_rattrsize[j] = 0;
- }
- else
- {
- FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
- isnull[j], identry[j].key, identry[j].bytes);
-
- datum = FunctionCall2(&giststate->unionFn[j],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- v->spl_rattr[j] = datum;
- v->spl_rattrsize[j] = datumsize;
- v->spl_risnull[j] = false;
- }
- }
- }
+ END_CRIT_SECTION();
}
}
Buffer buffer,
IndexTuple *itup, /* contains compressed entry */
int *len,
+ PageLayout **dist,
GISTSTATE *giststate)
{
Page p;
/* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
{
+ int i;
+ PageLayout *d, *origd=*dist;
+
nlen = v.spl_nright;
- newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate);
+ newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
+ /* XLOG stuff */
+ d=*dist;
+ /* translate offsetnumbers to our */
+ while( d && d!=origd ) {
+ for(i=0;i<d->block.num;i++)
+ d->list[i] = v.spl_right[ d->list[i]-1 ];
+ d=d->next;
+ }
ReleaseBuffer(rightbuf);
}
else
{
OffsetNumber l;
- l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
- WriteBuffer(rightbuf);
-
+ l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
+ /* XLOG stuff */
+ ROTATEDIST(*dist);
+ (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
+ (*dist)->block.num = v.spl_nright;
+ (*dist)->list = v.spl_right;
+ (*dist)->buffer = rightbuf;
+
nlen = 1;
newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
- ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
+ ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
}
if (gistnospace(left, lvectup, v.spl_nleft))
{
int llen = v.spl_nleft;
IndexTuple *lntup;
-
- lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate);
+ int i;
+ PageLayout *d, *origd=*dist;
+
+ lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
+
+ /* XLOG stuff */
+ d=*dist;
+ /* translate offsetnumbers to our */
+ while( d && d!=origd ) {
+ for(i=0;i<d->block.num;i++)
+ d->list[i] = v.spl_left[ d->list[i]-1 ];
+ d=d->next;
+ }
+
ReleaseBuffer(leftbuf);
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
{
OffsetNumber l;
- l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
+ l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
PageRestoreTempPage(left, p);
- WriteBuffer(leftbuf);
-
+ /* XLOG stuff */
+ ROTATEDIST(*dist);
+ (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
+ (*dist)->block.num = v.spl_nleft;
+ (*dist)->list = v.spl_left;
+ (*dist)->buffer = leftbuf;
+
nlen += 1;
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
- ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
+ ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
}
*len = nlen;
return newtup;
}
-static void
-gistnewroot(Relation r, IndexTuple *itup, int len)
-{
- Buffer b;
- Page p;
-
- b = ReadBuffer(r, GIST_ROOT_BLKNO);
- GISTInitBuffer(b, 0);
- p = BufferGetPage(b);
-
- gistwritebuffer(r, p, itup, len, FirstOffsetNumber);
- WriteBuffer(b);
-}
-
-static void
-GISTInitBuffer(Buffer b, uint32 f)
+void
+gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
{
- GISTPageOpaque opaque;
+ Buffer buffer;
Page page;
- Size pageSize;
-
- pageSize = BufferGetPageSize(b);
- page = BufferGetPage(b);
- PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
-
- opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
- opaque->flags = f;
-}
-
-/*
- * find entry with lowest penalty
- */
-static OffsetNumber
-gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
- GISTSTATE *giststate)
-{
- OffsetNumber maxoff;
- OffsetNumber i;
- OffsetNumber which;
- float sum_grow,
- which_grow[INDEX_MAX_KEYS];
- GISTENTRY entry,
- identry[INDEX_MAX_KEYS];
- bool isnull[INDEX_MAX_KEYS];
-
- maxoff = PageGetMaxOffsetNumber(p);
- *which_grow = -1.0;
- which = -1;
- sum_grow = 1;
- gistDeCompressAtt(giststate, r,
- it, NULL, (OffsetNumber) 0,
- identry, isnull);
-
- for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
- {
- int j;
- IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
- sum_grow = 0;
- for (j = 0; j < r->rd_att->natts; j++)
- {
- Datum datum;
- float usize;
- bool IsNull;
-
- datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, j, &entry, datum, r, p, i,
- ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
- FALSE, IsNull);
- gistpenalty(giststate, j, &entry, IsNull,
- &identry[j], isnull[j], &usize);
-
- if (which_grow[j] < 0 || usize < which_grow[j])
- {
- which = i;
- which_grow[j] = usize;
- if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
- which_grow[j + 1] = -1;
- sum_grow += which_grow[j];
- }
- else if (which_grow[j] == usize)
- sum_grow += usize;
- else
- {
- sum_grow = 1;
- break;
- }
- }
+ buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
+ GISTInitBuffer(buffer, 0);
+ page = BufferGetPage(buffer);
+
+ gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+ if ( !xlog_mode && !r->rd_istemp ) {
+ gistxlogEntryUpdate xlrec;
+ XLogRecPtr recptr;
+ XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
+ int i;
+
+ xlrec.node = r->rd_node;
+ xlrec.blkno = GIST_ROOT_BLKNO;
+ xlrec.todeleteoffnum = InvalidOffsetNumber;
+ xlrec.key = *key;
+ xlrec.pathlen=0;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof( gistxlogEntryUpdate );
+ rdata[0].next = NULL;
+
+ for(i=1; i<=len; i++) {
+ rdata[i].buffer = InvalidBuffer;
+ rdata[i].data = (char*)(itup[i-1]);
+ rdata[i].len = IndexTupleSize(itup[i-1]);
+ rdata[i].next = NULL;
+ rdata[i-1].next = &(rdata[i]);
+ }
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+
+ END_CRIT_SECTION();
}
-
- return which;
+ if ( xlog_mode )
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buffer);
}
-/*
- * Retail deletion of a single tuple.
- *
- * NB: this is no longer called externally, but is still needed by
- * gistlayerinsert(). That dependency will have to be fixed if GIST
- * is ever going to allow concurrent insertions.
- */
-static void
-gistdelete(Relation r, ItemPointer tid)
-{
- BlockNumber blkno;
- OffsetNumber offnum;
- Buffer buf;
- Page page;
-
- /*
- * Since GIST is not marked "amconcurrent" in pg_am, caller should
- * have acquired exclusive lock on index relation. We need no locking
- * here.
- */
-
- blkno = ItemPointerGetBlockNumber(tid);
- offnum = ItemPointerGetOffsetNumber(tid);
-
- /* adjust any scans that will be affected by this deletion */
- /* NB: this works only for scans in *this* backend! */
- gistadjscans(r, GISTOP_DEL, blkno, offnum);
-
- /* delete the index tuple */
- buf = ReadBuffer(r, blkno);
- page = BufferGetPage(buf);
-
- PageIndexTupleDelete(page, offnum);
-
- WriteBuffer(buf);
-}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum);
+ if ( !rel->rd_istemp ) {
+ gistxlogEntryUpdate xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata;
+
+ xlrec.node = rel->rd_node;
+ xlrec.blkno = blkno;
+ xlrec.todeleteoffnum = offnum;
+ xlrec.pathlen=0;
+ ItemPointerSetInvalid( &(xlrec.key) );
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = sizeof( gistxlogEntryUpdate );
+ rdata.next = NULL;
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+
+ END_CRIT_SECTION();
+ }
WriteBuffer(buf);
/* no work */
}
-#ifdef GIST_PAGEADDITEM
-/*
- * Given an IndexTuple to be inserted on a page, this routine replaces
- * the key with another key, which may involve generating a new IndexTuple
- * if the sizes don't match or if the null status changes.
- *
- * XXX this only works for a single-column index tuple!
- */
-static IndexTuple
-gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
-{
- bool IsNull;
- Datum datum = index_getattr(t, 1, r->rd_att, &IsNull);
-
- /*
- * If new entry fits in index tuple, copy it in. To avoid worrying
- * about null-value bitmask, pass it off to the general
- * index_form_tuple routine if either the previous or new value is
- * NULL.
- */
- if (!IsNull && DatumGetPointer(entry.key) != NULL &&
- (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull))
- {
- memcpy(DatumGetPointer(datum),
- DatumGetPointer(entry.key),
- entry.bytes);
- /* clear out old size */
- t->t_info &= ~INDEX_SIZE_MASK;
- /* or in new size */
- t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
-
- return t;
- }
- else
- {
- /* generate a new index tuple for the compressed entry */
- TupleDesc tupDesc = r->rd_att;
- IndexTuple newtup;
- bool isnull;
-
- isnull = (DatumGetPointer(entry.key) == NULL);
- newtup = index_form_tuple(tupDesc, &(entry.key), &isnull);
- newtup->t_tid = t->t_tid;
- return newtup;
- }
-}
-#endif
-
-/*
- * initialize a GiST entry with a decompressed version of key
- */
-void
-gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
- Datum k, Relation r, Page pg, OffsetNumber o,
- int b, bool l, bool isNull)
-{
- if (b && !isNull)
- {
- GISTENTRY *dep;
-
- gistentryinit(*e, k, r, pg, o, b, l);
- dep = (GISTENTRY *)
- DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
- PointerGetDatum(e)));
- /* decompressFn may just return the given pointer */
- if (dep != e)
- gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
- dep->bytes, dep->leafkey);
- }
- else
- gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-
-/*
- * initialize a GiST entry with a compressed version of key
- */
-static void
-gistcentryinit(GISTSTATE *giststate, int nkey,
- GISTENTRY *e, Datum k, Relation r,
- Page pg, OffsetNumber o, int b, bool l, bool isNull)
-{
- if (!isNull)
- {
- GISTENTRY *cep;
-
- gistentryinit(*e, k, r, pg, o, b, l);
- cep = (GISTENTRY *)
- DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
- PointerGetDatum(e)));
- /* compressFn may just return the given pointer */
- if (cep != e)
- gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
- cep->bytes, cep->leafkey);
- }
- else
- gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-static IndexTuple
-gistFormTuple(GISTSTATE *giststate, Relation r,
- Datum attdata[], int datumsize[], bool isnull[])
-{
- GISTENTRY centry[INDEX_MAX_KEYS];
- Datum compatt[INDEX_MAX_KEYS];
- int i;
-
- for (i = 0; i < r->rd_att->natts; i++)
- {
- if (isnull[i])
- compatt[i] = (Datum) 0;
- else
- {
- gistcentryinit(giststate, i, ¢ry[i], attdata[i],
- NULL, NULL, (OffsetNumber) 0,
- datumsize[i], FALSE, FALSE);
- compatt[i] = centry[i].key;
- }
- }
-
- return index_form_tuple(giststate->tupdesc, compatt, isnull);
-}
-
-static void
-gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
- OffsetNumber o, GISTENTRY *attdata, bool *isnull)
-{
- int i;
-
- for (i = 0; i < r->rd_att->natts; i++)
- {
- Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
- gistdentryinit(giststate, i, &attdata[i],
- datum, r, p, o,
- ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
- FALSE, isnull[i]);
- }
-}
-
-static void
-gistpenalty(GISTSTATE *giststate, int attno,
- GISTENTRY *key1, bool isNull1,
- GISTENTRY *key2, bool isNull2, float *penalty)
-{
- if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
- *penalty = 0.0;
- else
- FunctionCall3(&giststate->penaltyFn[attno],
- PointerGetDatum(key1),
- PointerGetDatum(key2),
- PointerGetDatum(penalty));
-}
-
#ifdef GISTDEBUG
static void
gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
}
#endif /* defined GISTDEBUG */
-void
-gist_redo(XLogRecPtr lsn, XLogRecord *record)
-{
- elog(PANIC, "gist_redo: unimplemented");
-}
-
-void
-gist_desc(char *buf, uint8 xl_info, char *rec)
-{
-}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * gistutil.c
+ * utilities routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+
+/* group flags ( in gistadjsubkey ) */
+#define LEFT_ADDED 0x01
+#define RIGHT_ADDED 0x02
+#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
+
+
+/*
+ * This defines is only for shorter code, used in gistgetadjusted
+ * and gistadjsubkey only
+ */
+#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
+ if (isnullkey) { \
+ gistentryinit((evp), rkey, r, NULL, \
+ (OffsetNumber) 0, rkeyb, FALSE); \
+ } else { \
+ gistentryinit((evp), okey, r, NULL, \
+ (OffsetNumber) 0, okeyb, FALSE); \
+ } \
+} while(0)
+
+#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
+ FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
+ FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
+} while(0);
+
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+ GISTENTRY *key1, bool isNull1,
+ GISTENTRY *key2, bool isNull2, float *penalty);
+
+/*
+ * Write itup vector to page, has no control of free space
+ */
+OffsetNumber
+gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+ int len, OffsetNumber off)
+{
+ OffsetNumber l = InvalidOffsetNumber;
+ int i;
+
+ for (i = 0; i < len; i++)
+ {
+ l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
+ off, LP_USED);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"",
+ RelationGetRelationName(r));
+ off++;
+ }
+ return l;
+}
+
+/*
+ * Check space for itup vector on page
+ */
+bool
+gistnospace(Page page, IndexTuple *itvec, int len)
+{
+ unsigned int size = 0;
+ int i;
+
+ for (i = 0; i < len; i++)
+ size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
+
+ return (PageGetFreeSpace(page) < size);
+}
+
+/*
+ * Read buffer into itup vector
+ */
+IndexTuple *
+gistextractbuffer(Buffer buffer, int *len /* out */ )
+{
+ OffsetNumber i,
+ maxoff;
+ IndexTuple *itvec;
+ Page p = (Page) BufferGetPage(buffer);
+
+ maxoff = PageGetMaxOffsetNumber(p);
+ *len = maxoff;
+ itvec = palloc(sizeof(IndexTuple) * maxoff);
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+ return itvec;
+}
+
+/*
+ * join two vectors into one
+ */
+IndexTuple *
+gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
+{
+ itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
+ memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
+ *len += addlen;
+ return itvec;
+}
+
+/*
+ * Return an IndexTuple containing the result of applying the "union"
+ * method to the specified IndexTuple vector.
+ */
+IndexTuple
+gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+{
+ Datum attr[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ GistEntryVector *evec;
+ int i;
+ GISTENTRY centry[INDEX_MAX_KEYS];
+
+ evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+ for (i = 0; i < r->rd_att->natts; i++)
+ {
+ Datum datum;
+ int j;
+ int real_len;
+
+ real_len = 0;
+ for (j = 0; j < len; j++)
+ {
+ bool IsNull;
+ datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
+ if (IsNull)
+ continue;
+
+ gistdentryinit(giststate, i,
+ &(evec->vector[real_len]),
+ datum,
+ NULL, NULL, (OffsetNumber) 0,
+ ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+ FALSE, IsNull);
+ real_len++;
+ }
+
+ /* If this tuple vector was all NULLs, the union is NULL */
+ if (real_len == 0)
+ {
+ attr[i] = (Datum) 0;
+ isnull[i] = TRUE;
+ }
+ else
+ {
+ int datumsize;
+
+ if (real_len == 1)
+ {
+ evec->n = 2;
+ gistentryinit(evec->vector[1],
+ evec->vector[0].key, r, NULL,
+ (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
+ }
+ else
+ evec->n = real_len;
+
+ /* Compress the result of the union and store in attr array */
+ datum = FunctionCall2(&giststate->unionFn[i],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ gistcentryinit(giststate, i, ¢ry[i], datum,
+ NULL, NULL, (OffsetNumber) 0,
+ datumsize, FALSE, FALSE);
+ isnull[i] = FALSE;
+ attr[i] = centry[i].key;
+ }
+ }
+
+ return index_form_tuple(giststate->tupdesc, attr, isnull);
+}
+
+
+/*
+ * Forms union of oldtup and addtup, if union == oldtup then return NULL
+ */
+IndexTuple
+gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
+{
+ GistEntryVector *evec;
+ bool neednew = false;
+ bool isnull[INDEX_MAX_KEYS];
+ Datum attr[INDEX_MAX_KEYS];
+ GISTENTRY centry[INDEX_MAX_KEYS],
+ oldatt[INDEX_MAX_KEYS],
+ addatt[INDEX_MAX_KEYS],
+ *ev0p,
+ *ev1p;
+ bool oldisnull[INDEX_MAX_KEYS],
+ addisnull[INDEX_MAX_KEYS];
+ IndexTuple newtup = NULL;
+ int i;
+
+ evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+ evec->n = 2;
+ ev0p = &(evec->vector[0]);
+ ev1p = &(evec->vector[1]);
+
+ gistDeCompressAtt(giststate, r, oldtup, NULL,
+ (OffsetNumber) 0, oldatt, oldisnull);
+
+ gistDeCompressAtt(giststate, r, addtup, NULL,
+ (OffsetNumber) 0, addatt, addisnull);
+
+ for (i = 0; i < r->rd_att->natts; i++)
+ {
+ if (oldisnull[i] && addisnull[i])
+ {
+ attr[i] = (Datum) 0;
+ isnull[i] = TRUE;
+ }
+ else
+ {
+ Datum datum;
+ int datumsize;
+
+ FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
+ addisnull[i], addatt[i].key, addatt[i].bytes);
+
+ datum = FunctionCall2(&giststate->unionFn[i],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ if (oldisnull[i] || addisnull[i])
+ {
+ if (oldisnull[i])
+ neednew = true;
+ }
+ else
+ {
+ bool result;
+
+ FunctionCall3(&giststate->equalFn[i],
+ ev0p->key,
+ datum,
+ PointerGetDatum(&result));
+
+ if (!result)
+ neednew = true;
+ }
+
+ gistcentryinit(giststate, i, ¢ry[i], datum,
+ NULL, NULL, (OffsetNumber) 0,
+ datumsize, FALSE, FALSE);
+
+ attr[i] = centry[i].key;
+ isnull[i] = FALSE;
+ }
+ }
+
+ if (neednew)
+ {
+ /* need to update key */
+ newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
+ newtup->t_tid = oldtup->t_tid;
+ }
+
+ return newtup;
+}
+
+void
+gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+ int lr;
+
+ for (lr = 0; lr < 2; lr++)
+ {
+ OffsetNumber *entries;
+ int i;
+ Datum *attr;
+ int len,
+ *attrsize;
+ bool *isnull;
+ GistEntryVector *evec;
+
+ if (lr)
+ {
+ attrsize = spl->spl_lattrsize;
+ attr = spl->spl_lattr;
+ len = spl->spl_nleft;
+ entries = spl->spl_left;
+ isnull = spl->spl_lisnull;
+ }
+ else
+ {
+ attrsize = spl->spl_rattrsize;
+ attr = spl->spl_rattr;
+ len = spl->spl_nright;
+ entries = spl->spl_right;
+ isnull = spl->spl_risnull;
+ }
+
+ evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+ for (i = 1; i < r->rd_att->natts; i++)
+ {
+ int j;
+ Datum datum;
+ int datumsize;
+ int real_len;
+
+ real_len = 0;
+ for (j = 0; j < len; j++)
+ {
+ bool IsNull;
+
+ if (spl->spl_idgrp[entries[j]])
+ continue;
+ datum = index_getattr(itvec[entries[j] - 1], i + 1,
+ giststate->tupdesc, &IsNull);
+ if (IsNull)
+ continue;
+ gistdentryinit(giststate, i,
+ &(evec->vector[real_len]),
+ datum,
+ NULL, NULL, (OffsetNumber) 0,
+ ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+ FALSE, IsNull);
+ real_len++;
+
+ }
+
+ if (real_len == 0)
+ {
+ datum = (Datum) 0;
+ datumsize = 0;
+ isnull[i] = true;
+ }
+ else
+ {
+ /*
+ * evec->vector[0].bytes may be not defined, so form union
+ * with itself
+ */
+ if (real_len == 1)
+ {
+ evec->n = 2;
+ memcpy(&(evec->vector[1]), &(evec->vector[0]),
+ sizeof(GISTENTRY));
+ }
+ else
+ evec->n = real_len;
+ datum = FunctionCall2(&giststate->unionFn[i],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+ isnull[i] = false;
+ }
+
+ attr[i] = datum;
+ attrsize[i] = datumsize;
+ }
+ }
+}
+
+/*
+ * find group in vector with equal value
+ */
+int
+gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
+{
+ int i;
+ int curid = 1;
+
+ /*
+ * first key is always not null (see gistinsert), so we may not check
+ * for nulls
+ */
+ for (i = 0; i < spl->spl_nleft; i++)
+ {
+ int j;
+ int len;
+ bool result;
+
+ if (spl->spl_idgrp[spl->spl_left[i]])
+ continue;
+ len = 0;
+ /* find all equal value in right part */
+ for (j = 0; j < spl->spl_nright; j++)
+ {
+ if (spl->spl_idgrp[spl->spl_right[j]])
+ continue;
+ FunctionCall3(&giststate->equalFn[0],
+ valvec[spl->spl_left[i]].key,
+ valvec[spl->spl_right[j]].key,
+ PointerGetDatum(&result));
+ if (result)
+ {
+ spl->spl_idgrp[spl->spl_right[j]] = curid;
+ len++;
+ }
+ }
+ /* find all other equal value in left part */
+ if (len)
+ {
+ /* add current val to list of equal values */
+ spl->spl_idgrp[spl->spl_left[i]] = curid;
+ /* searching .. */
+ for (j = i + 1; j < spl->spl_nleft; j++)
+ {
+ if (spl->spl_idgrp[spl->spl_left[j]])
+ continue;
+ FunctionCall3(&giststate->equalFn[0],
+ valvec[spl->spl_left[i]].key,
+ valvec[spl->spl_left[j]].key,
+ PointerGetDatum(&result));
+ if (result)
+ {
+ spl->spl_idgrp[spl->spl_left[j]] = curid;
+ len++;
+ }
+ }
+ spl->spl_ngrp[curid] = len + 1;
+ curid++;
+ }
+ }
+
+ return curid;
+}
+
+/*
+ * Insert equivalent tuples to left or right page with minimum
+ * penalty
+ */
+void
+gistadjsubkey(Relation r,
+ IndexTuple *itup, /* contains compressed entry */
+ int *len,
+ GIST_SPLITVEC *v,
+ GISTSTATE *giststate)
+{
+ int curlen;
+ OffsetNumber *curwpos;
+ GISTENTRY entry,
+ identry[INDEX_MAX_KEYS],
+ *ev0p,
+ *ev1p;
+ float lpenalty,
+ rpenalty;
+ GistEntryVector *evec;
+ int datumsize;
+ bool isnull[INDEX_MAX_KEYS];
+ int i,
+ j;
+
+ /* clear vectors */
+ curlen = v->spl_nleft;
+ curwpos = v->spl_left;
+ for (i = 0; i < v->spl_nleft; i++)
+ {
+ if (v->spl_idgrp[v->spl_left[i]] == 0)
+ {
+ *curwpos = v->spl_left[i];
+ curwpos++;
+ }
+ else
+ curlen--;
+ }
+ v->spl_nleft = curlen;
+
+ curlen = v->spl_nright;
+ curwpos = v->spl_right;
+ for (i = 0; i < v->spl_nright; i++)
+ {
+ if (v->spl_idgrp[v->spl_right[i]] == 0)
+ {
+ *curwpos = v->spl_right[i];
+ curwpos++;
+ }
+ else
+ curlen--;
+ }
+ v->spl_nright = curlen;
+
+ evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+ evec->n = 2;
+ ev0p = &(evec->vector[0]);
+ ev1p = &(evec->vector[1]);
+
+ /* add equivalent tuple */
+ for (i = 0; i < *len; i++)
+ {
+ Datum datum;
+
+ if (v->spl_idgrp[i + 1] == 0) /* already inserted */
+ continue;
+ gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
+ identry, isnull);
+
+ v->spl_ngrp[v->spl_idgrp[i + 1]]--;
+ if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
+ (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
+ {
+ /* force last in group */
+ rpenalty = 1.0;
+ lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
+ }
+ else
+ {
+ /* where? */
+ for (j = 1; j < r->rd_att->natts; j++)
+ {
+ gistentryinit(entry, v->spl_lattr[j], r, NULL,
+ (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
+ gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
+ &identry[j], isnull[j], &lpenalty);
+
+ gistentryinit(entry, v->spl_rattr[j], r, NULL,
+ (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
+ gistpenalty(giststate, j, &entry, v->spl_risnull[j],
+ &identry[j], isnull[j], &rpenalty);
+
+ if (lpenalty != rpenalty)
+ break;
+ }
+ }
+
+ /*
+ * add
+ * XXX: refactor this to avoid duplicating code
+ */
+ if (lpenalty < rpenalty)
+ {
+ v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
+ v->spl_left[v->spl_nleft] = i + 1;
+ v->spl_nleft++;
+ for (j = 1; j < r->rd_att->natts; j++)
+ {
+ if (isnull[j] && v->spl_lisnull[j])
+ {
+ v->spl_lattr[j] = (Datum) 0;
+ v->spl_lattrsize[j] = 0;
+ }
+ else
+ {
+ FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
+ isnull[j], identry[j].key, identry[j].bytes);
+
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ v->spl_lattr[j] = datum;
+ v->spl_lattrsize[j] = datumsize;
+ v->spl_lisnull[j] = false;
+ }
+ }
+ }
+ else
+ {
+ v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
+ v->spl_right[v->spl_nright] = i + 1;
+ v->spl_nright++;
+ for (j = 1; j < r->rd_att->natts; j++)
+ {
+ if (isnull[j] && v->spl_risnull[j])
+ {
+ v->spl_rattr[j] = (Datum) 0;
+ v->spl_rattrsize[j] = 0;
+ }
+ else
+ {
+ FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
+ isnull[j], identry[j].key, identry[j].bytes);
+
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ v->spl_rattr[j] = datum;
+ v->spl_rattrsize[j] = datumsize;
+ v->spl_risnull[j] = false;
+ }
+ }
+ }
+ }
+}
+
+/*
+ * find entry with lowest penalty
+ */
+OffsetNumber
+gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
+ GISTSTATE *giststate)
+{
+ OffsetNumber maxoff;
+ OffsetNumber i;
+ OffsetNumber which;
+ float sum_grow,
+ which_grow[INDEX_MAX_KEYS];
+ GISTENTRY entry,
+ identry[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+
+ maxoff = PageGetMaxOffsetNumber(p);
+ *which_grow = -1.0;
+ which = -1;
+ sum_grow = 1;
+ gistDeCompressAtt(giststate, r,
+ it, NULL, (OffsetNumber) 0,
+ identry, isnull);
+
+ for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
+ {
+ int j;
+ IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+ sum_grow = 0;
+ for (j = 0; j < r->rd_att->natts; j++)
+ {
+ Datum datum;
+ float usize;
+ bool IsNull;
+
+ datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
+ gistdentryinit(giststate, j, &entry, datum, r, p, i,
+ ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
+ FALSE, IsNull);
+ gistpenalty(giststate, j, &entry, IsNull,
+ &identry[j], isnull[j], &usize);
+
+ if (which_grow[j] < 0 || usize < which_grow[j])
+ {
+ which = i;
+ which_grow[j] = usize;
+ if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
+ which_grow[j + 1] = -1;
+ sum_grow += which_grow[j];
+ }
+ else if (which_grow[j] == usize)
+ sum_grow += usize;
+ else
+ {
+ sum_grow = 1;
+ break;
+ }
+ }
+ }
+
+ return which;
+}
+
+/*
+ * initialize a GiST entry with a decompressed version of key
+ */
+void
+gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+ Datum k, Relation r, Page pg, OffsetNumber o,
+ int b, bool l, bool isNull)
+{
+ if (b && !isNull)
+ {
+ GISTENTRY *dep;
+
+ gistentryinit(*e, k, r, pg, o, b, l);
+ dep = (GISTENTRY *)
+ DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
+ PointerGetDatum(e)));
+ /* decompressFn may just return the given pointer */
+ if (dep != e)
+ gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
+ dep->bytes, dep->leafkey);
+ }
+ else
+ gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+
+/*
+ * initialize a GiST entry with a compressed version of key
+ */
+void
+gistcentryinit(GISTSTATE *giststate, int nkey,
+ GISTENTRY *e, Datum k, Relation r,
+ Page pg, OffsetNumber o, int b, bool l, bool isNull)
+{
+ if (!isNull)
+ {
+ GISTENTRY *cep;
+
+ gistentryinit(*e, k, r, pg, o, b, l);
+ cep = (GISTENTRY *)
+ DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
+ PointerGetDatum(e)));
+ /* compressFn may just return the given pointer */
+ if (cep != e)
+ gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
+ cep->bytes, cep->leafkey);
+ }
+ else
+ gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+IndexTuple
+gistFormTuple(GISTSTATE *giststate, Relation r,
+ Datum attdata[], int datumsize[], bool isnull[])
+{
+ GISTENTRY centry[INDEX_MAX_KEYS];
+ Datum compatt[INDEX_MAX_KEYS];
+ int i;
+
+ for (i = 0; i < r->rd_att->natts; i++)
+ {
+ if (isnull[i])
+ compatt[i] = (Datum) 0;
+ else
+ {
+ gistcentryinit(giststate, i, ¢ry[i], attdata[i],
+ NULL, NULL, (OffsetNumber) 0,
+ datumsize[i], FALSE, FALSE);
+ compatt[i] = centry[i].key;
+ }
+ }
+
+ return index_form_tuple(giststate->tupdesc, compatt, isnull);
+}
+
+void
+gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
+ OffsetNumber o, GISTENTRY *attdata, bool *isnull)
+{
+ int i;
+
+ for (i = 0; i < r->rd_att->natts; i++)
+ {
+ Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
+ gistdentryinit(giststate, i, &attdata[i],
+ datum, r, p, o,
+ ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
+ FALSE, isnull[i]);
+ }
+}
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+ GISTENTRY *key1, bool isNull1,
+ GISTENTRY *key2, bool isNull2, float *penalty)
+{
+ if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
+ *penalty = 0.0;
+ else
+ FunctionCall3(&giststate->penaltyFn[attno],
+ PointerGetDatum(key1),
+ PointerGetDatum(key2),
+ PointerGetDatum(penalty));
+}
+
+void
+GISTInitBuffer(Buffer b, uint32 f)
+{
+ GISTPageOpaque opaque;
+ Page page;
+ Size pageSize;
+
+ pageSize = BufferGetPageSize(b);
+ page = BufferGetPage(b);
+ PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
+
+ opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ opaque->flags = f;
+}
+
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * gistxlog.c
+ * WAL replay logic for GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+typedef struct {
+ gistxlogEntryUpdate *data;
+ int len;
+ IndexTuple *itup;
+ BlockNumber *path;
+} EntryUpdateRecord;
+
+typedef struct {
+ gistxlogPage *header;
+ OffsetNumber *offnum;
+
+ /* to work with */
+ Page page;
+ Buffer buffer;
+ bool is_ok;
+} NewPage;
+
+typedef struct {
+ gistxlogPageSplit *data;
+ NewPage *page;
+ IndexTuple *itup;
+ BlockNumber *path;
+} PageSplitRecord;
+
+/* track for incomplete inserts, idea was taken from nbtxlog.c */
+
+typedef struct gistIncompleteInsert {
+ RelFileNode node;
+ ItemPointerData key;
+ int lenblk;
+ BlockNumber *blkno;
+ int pathlen;
+ BlockNumber *path;
+} gistIncompleteInsert;
+
+
+MemoryContext opCtx;
+MemoryContext insertCtx;
+static List *incomplete_inserts;
+
+
+#define ItemPointerEQ( a, b ) \
+ ( \
+ ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+ ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
+ )
+
+static void
+pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+ BlockNumber *blkno, int lenblk,
+ BlockNumber *path, int pathlen,
+ PageSplitRecord *xlinfo /* to extract blkno info */ ) {
+ MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
+ gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
+
+ ninsert->node = node;
+ ninsert->key = key;
+
+ if ( lenblk && blkno ) {
+ ninsert->lenblk = lenblk;
+ ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+ memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
+ } else {
+ int i;
+
+ Assert( xlinfo );
+ ninsert->lenblk = xlinfo->data->npage;
+ ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+ for(i=0;i<ninsert->lenblk;i++)
+ ninsert->blkno[i] = xlinfo->page[i].header->blkno;
+ }
+ Assert( ninsert->lenblk>0 );
+
+ if ( path && ninsert->pathlen ) {
+ ninsert->pathlen = pathlen;
+ ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
+ memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
+ } else {
+ ninsert->pathlen = 0;
+ ninsert->path = NULL;
+ }
+
+ incomplete_inserts = lappend(incomplete_inserts, ninsert);
+ MemoryContextSwitchTo(oldCxt);
+}
+
+static void
+forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
+ ListCell *l;
+
+ foreach(l, incomplete_inserts) {
+ gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
+
+ if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
+
+ /* found */
+ if ( insert->path ) pfree( insert->path );
+ pfree( insert->blkno );
+ incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+ pfree( insert );
+ break;
+ }
+ }
+}
+
+static void
+decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
+ char *begin = XLogRecGetData(record), *ptr;
+ int i=0, addpath=0;
+
+ decoded->data = (gistxlogEntryUpdate*)begin;
+
+ if ( decoded->data->pathlen ) {
+ addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+ decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+ } else
+ decoded->path = NULL;
+
+ decoded->len=0;
+ ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+ while( ptr - begin < record->xl_len ) {
+ decoded->len++;
+ ptr += IndexTupleSize( (IndexTuple)ptr );
+ }
+
+ decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
+
+ ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+ while( ptr - begin < record->xl_len ) {
+ decoded->itup[i] = (IndexTuple)ptr;
+ ptr += IndexTupleSize( decoded->itup[i] );
+ i++;
+ }
+}
+
+
+static void
+gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
+ EntryUpdateRecord xlrec;
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ decodeEntryUpdateRecord( &xlrec, record );
+
+ reln = XLogOpenRelation(xlrec.data->node);
+ if (!RelationIsValid(reln))
+ return;
+ buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+ page = (Page) BufferGetPage(buffer);
+
+ if ( isnewroot ) {
+ if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+ } else {
+ if ( PageIsNew((PageHeader) page) )
+ elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+ if (XLByteLE(lsn, PageGetLSN(page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+ }
+
+ if ( isnewroot )
+ GISTInitBuffer(buffer, 0);
+ else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
+ PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+ /* add tuples */
+ if ( xlrec.len > 0 ) {
+ OffsetNumber off = (PageIsEmpty(page)) ?
+ FirstOffsetNumber
+ :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buffer);
+
+ if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+ if ( incomplete_inserts != NIL )
+ forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+ if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
+ pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ &(xlrec.data->blkno), 1,
+ xlrec.path, xlrec.data->pathlen,
+ NULL);
+ }
+}
+
+static void
+decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
+ char *begin = XLogRecGetData(record), *ptr;
+ int i=0, addpath = 0;
+
+ decoded->data = (gistxlogPageSplit*)begin;
+ decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
+ decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
+
+ if ( decoded->data->pathlen ) {
+ addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+ decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+ } else
+ decoded->path = NULL;
+
+ ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
+ for(i=0;i<decoded->data->nitup;i++) {
+ Assert( ptr - begin < record->xl_len );
+ decoded->itup[i] = (IndexTuple)ptr;
+ ptr += IndexTupleSize( decoded->itup[i] );
+ }
+
+ for(i=0;i<decoded->data->npage;i++) {
+ Assert( ptr - begin < record->xl_len );
+ decoded->page[i].header = (gistxlogPage*)ptr;
+ ptr += sizeof(gistxlogPage);
+
+ Assert( ptr - begin < record->xl_len );
+ decoded->page[i].offnum = (OffsetNumber*)ptr;
+ ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num );
+ }
+}
+
+static void
+gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
+ PageSplitRecord xlrec;
+ Relation reln;
+ Buffer buffer;
+ Page page;
+ int i, len=0;
+ IndexTuple *itup, *institup;
+ GISTPageOpaque opaque;
+ bool release=true;
+
+ decodePageSplitRecord( &xlrec, record );
+
+ reln = XLogOpenRelation(xlrec.data->node);
+ if (!RelationIsValid(reln))
+ return;
+ buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+
+ if (XLByteLE(lsn, PageGetLSN(page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+
+ if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
+ PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+ itup = gistextractbuffer(buffer, &len);
+ itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
+ institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
+ opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+
+ for(i=0;i<xlrec.data->npage;i++) {
+ int j;
+ NewPage *newpage = xlrec.page + i;
+
+ /* prepare itup vector */
+ for(j=0;j<newpage->header->num;j++)
+ institup[j] = itup[ newpage->offnum[j] - 1 ];
+
+ if ( newpage->header->blkno == xlrec.data->origblkno ) {
+ /* IncrBufferRefCount(buffer); */
+ newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData));
+ newpage->buffer = buffer;
+ newpage->is_ok=false;
+ } else {
+ newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno);
+ if (!BufferIsValid(newpage->buffer))
+ elog(PANIC, "gistRedoPageSplitRecord: lost page");
+ newpage->page = (Page) BufferGetPage(newpage->buffer);
+ if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ newpage->is_ok=true;
+ continue; /* good page */
+ } else {
+ newpage->is_ok=false;
+ GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF);
+ }
+ }
+ gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber);
+ }
+
+ for(i=0;i<xlrec.data->npage;i++) {
+ NewPage *newpage = xlrec.page + i;
+
+ if ( newpage->is_ok )
+ continue;
+
+ if ( newpage->header->blkno == xlrec.data->origblkno ) {
+ PageRestoreTempPage(newpage->page, page);
+ release = false;
+ }
+
+ PageSetLSN(newpage->page, lsn);
+ PageSetTLI(newpage->page, ThisTimeLineID);
+ LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(newpage->buffer);
+ }
+
+ if ( release ) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ }
+
+ if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+ if ( incomplete_inserts != NIL )
+ forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+ pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ NULL, 0,
+ xlrec.path, xlrec.data->pathlen,
+ &xlrec);
+ }
+}
+
+static void
+gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
+ RelFileNode *node = (RelFileNode*)XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ reln = XLogOpenRelation(*node);
+ if (!RelationIsValid(reln))
+ return;
+ buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistRedoCreateIndex: block unfound");
+ page = (Page) BufferGetPage(buffer);
+
+ if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+
+ GISTInitBuffer(buffer, F_LEAF);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buffer);
+}
+
+void
+gist_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ MemoryContext oldCxt;
+ oldCxt = MemoryContextSwitchTo(opCtx);
+ switch (info) {
+ case XLOG_GIST_ENTRY_UPDATE:
+ case XLOG_GIST_ENTRY_DELETE:
+ gistRedoEntryUpdateRecord(lsn, record,false);
+ break;
+ case XLOG_GIST_NEW_ROOT:
+ gistRedoEntryUpdateRecord(lsn, record,true);
+ break;
+ case XLOG_GIST_PAGE_SPLIT:
+ gistRedoPageSplitRecord(lsn, record);
+ break;
+ case XLOG_GIST_CREATE_INDEX:
+ gistRedoCreateIndex(lsn, record);
+ break;
+ case XLOG_GIST_INSERT_COMPLETE:
+ forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node,
+ ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+ break;
+ default:
+ elog(PANIC, "gist_redo: unknown op code %u", info);
+ }
+
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextReset(opCtx);
+}
+
+static void
+out_target(char *buf, RelFileNode node, ItemPointerData key)
+{
+ sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
+ node.spcNode, node.dbNode, node.relNode,
+ ItemPointerGetBlockNumber(&key),
+ ItemPointerGetOffsetNumber(&key));
+}
+
+static void
+out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
+ out_target(buf, xlrec->node, xlrec->key);
+ sprintf(buf + strlen(buf), "; block number %u; update offset %u;",
+ xlrec->blkno, xlrec->todeleteoffnum);
+}
+
+static void
+out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
+ strcat(buf, "page_split: ");
+ out_target(buf, xlrec->node, xlrec->key);
+ sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages",
+ xlrec->origblkno, xlrec->todeleteoffnum,
+ xlrec->nitup, xlrec->npage);
+}
+
+void
+gist_desc(char *buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ switch (info) {
+ case XLOG_GIST_ENTRY_UPDATE:
+ strcat(buf, "entry_update: ");
+ out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+ break;
+ case XLOG_GIST_ENTRY_DELETE:
+ strcat(buf, "entry_delete: ");
+ out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+ break;
+ case XLOG_GIST_NEW_ROOT:
+ strcat(buf, "new_root: ");
+ out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
+ break;
+ case XLOG_GIST_PAGE_SPLIT:
+ out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
+ break;
+ case XLOG_GIST_CREATE_INDEX:
+ sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u",
+ ((RelFileNode*)rec)->spcNode,
+ ((RelFileNode*)rec)->dbNode,
+ ((RelFileNode*)rec)->relNode);
+ break;
+ case XLOG_GIST_INSERT_COMPLETE:
+ strcat(buf, "insert_complete: ");
+ out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key);
+ default:
+ elog(PANIC, "gist_desc: unknown op code %u", info);
+ }
+}
+
+
+#ifdef GIST_INCOMPLETE_INSERT
+static void
+gistContinueInsert(gistIncompleteInsert *insert) {
+ GISTSTATE giststate;
+ GISTInsertState state;
+ int i;
+ MemoryContext oldCxt;
+ oldCxt = MemoryContextSwitchTo(opCtx);
+
+ state.r = XLogOpenRelation(insert->node);
+ if (!RelationIsValid(state.r))
+ return;
+
+ initGISTstate(&giststate, state.r);
+
+ state.needInsertComplete=false;
+ ItemPointerSetInvalid( &(state.key) );
+ state.path=NULL;
+ state.pathlen=0;
+ state.xlog_mode = true;
+
+ /* form union tuples */
+ state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
+ state.ituplen = insert->lenblk;
+ for(i=0;i<insert->lenblk;i++) {
+ int len=0;
+ IndexTuple *itup;
+ Buffer buffer;
+ Page page;
+
+ buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistContinueInsert: block unfound");
+ page = (Page) BufferGetPage(buffer);
+ if ( PageIsNew((PageHeader)page) )
+ elog(PANIC, "gistContinueInsert: uninitialized page");
+
+ itup = gistextractbuffer(buffer, &len);
+ state.itup[i] = gistunion(state.r, itup, len, &giststate);
+
+ ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
+
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ }
+
+ if ( insert->pathlen==0 ) {
+ /*it was split root, so we should only make new root*/
+ gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextReset(opCtx);
+ return;
+ }
+
+ /* form stack */
+ state.stack=NULL;
+ for(i=0;i<insert->pathlen;i++) {
+ int j,len=0;
+ IndexTuple *itup;
+ GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
+
+ top->blkno = insert->path[i];
+ top->buffer = XLogReadBuffer(false, state.r, top->blkno);
+ if (!BufferIsValid(top->buffer))
+ elog(PANIC, "gistContinueInsert: block unfound");
+ top->page = (Page) BufferGetPage(top->buffer);
+ if ( PageIsNew((PageHeader)(top->page)) )
+ elog(PANIC, "gistContinueInsert: uninitialized page");
+
+ top->todelete = false;
+
+ /* find childoffnum */
+ itup = gistextractbuffer(top->buffer, &len);
+ top->childoffnum=InvalidOffsetNumber;
+ for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
+ BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) );
+
+ if ( i==0 ) {
+ int k;
+ for(k=0;k<insert->lenblk;k++)
+ if ( insert->blkno[k] == blkno ) {
+ top->childoffnum = j+1;
+ break;
+ }
+ } else if ( insert->path[i-1]==blkno )
+ top->childoffnum = j+1;
+ }
+
+ if ( top->childoffnum==InvalidOffsetNumber ) {
+ elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
+ return;
+ }
+
+ if ( i==0 )
+ PageIndexTupleDelete(top->page, top->childoffnum);
+
+ /* install item on right place in stack */
+ top->parent=NULL;
+ if ( state.stack ) {
+ GISTInsertStack *ptr = state.stack;
+ while( ptr->parent )
+ ptr = ptr->parent;
+ ptr->parent=top;
+ } else
+ state.stack = top;
+ }
+
+ /* Good. Now we can continue insert */
+
+ gistmakedeal(&state, &giststate);
+
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextReset(opCtx);
+}
+#endif
+
+void
+gist_xlog_startup(void) {
+ incomplete_inserts=NIL;
+ insertCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "GiST insert in xlog temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ opCtx = createTempGistContext();
+}
+
+void
+gist_xlog_cleanup(void) {
+ ListCell *l;
+
+ foreach(l, incomplete_inserts) {
+ gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
+ char buf[1024];
+
+ *buf='\0';
+ out_target(buf, insert->node, insert->key);
+ elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
+#ifdef GIST_INCOMPLETE_INSERT
+ gistContinueInsert(insert);
+#endif
+ }
+ MemoryContextDelete(opCtx);
+ MemoryContextDelete(insertCtx);
+}
+