From: Marc G. Fournier Date: Fri, 12 Jan 2001 00:12:58 +0000 (+0000) Subject: New feature: X-Git-Tag: REL7_1~858 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=0ad7db4be4b1f0208271c49fc1c8348f11ebc5b3;p=postgresql New feature: 1. Support of variable size keys - new algorithm of insertion to tree (GLI - gist layrered insertion). Previous algorithm was implemented as described in paper by Joseph M. Hellerstein et.al "Generalized Search Trees for Database Systems". This (old) algorithm was not suitable for variable size keys and could be not effective ( walking up-down ) in case of multiple levels split Bug fixed: 1. fixed bug in gistPageAddItem - key values were written to disk uncompressed. This caused failure if decompression function does real job. 2. NULLs handling - we keep NULLs in tree. Right way is to remove them, but we don't know how to inform vacuum about index statistics. This is just cosmetic warning message (like in case with R-Tree), but I'm not sure how to recognize real problem if we remove NULLs and suppress this warning as Tom suggested. 3. various memory leaks This work was done by Teodor Sigaev (teodor@stack.net) and Oleg Bartunov (oleg@sai.msu.su). --- diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 7b30aa72a9..969b3309f7 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -6,7 +6,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.67 2000/11/30 08:46:20 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.68 2001/01/12 00:12:58 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -25,37 +25,61 @@ #include "access/xlogutils.h" +/* result's status */ +#define INSERTED 0x01 +#define SPLITED 0x02 + /* non-export function prototypes */ -static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, - GISTSTATE *GISTstate); -static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk, - IndexTuple tup, - GISTSTATE *giststate); -static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate); -static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk, - char *datum, int att_size, GISTSTATE *giststate); -static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate); -static InsertIndexResult gistSplit(Relation r, Buffer buffer, - GISTSTACK *stack, IndexTuple itup, - GISTSTATE *giststate); -static void gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, - IndexTuple rt); +static void gistdoinsert(Relation r, + IndexTuple itup, + InsertIndexResult *res, + GISTSTATE *GISTstate); +static int gistlayerinsert( Relation r, BlockNumber blkno, + IndexTuple **itup, + int *len, + InsertIndexResult *res, + GISTSTATE *giststate ); +static OffsetNumber gistwritebuffer( Relation r, + Page page, + IndexTuple *itup, + int len, + OffsetNumber off, + GISTSTATE *giststate ); +static int gistnospace( Page page, + IndexTuple *itvec, int len ); +static IndexTuple * gistreadbuffer( Relation r, + Buffer buffer, int *len ); +static IndexTuple * gistjoinvector( + IndexTuple *itvec, int *len, + IndexTuple *additvec, int addlen ); +static IndexTuple gistunion( Relation r, IndexTuple *itvec, + int len, GISTSTATE *giststate ); +static IndexTuple gistgetadjusted( Relation r, + IndexTuple oldtup, + IndexTuple addtup, + GISTSTATE *giststate ); +static IndexTuple * gistSplit(Relation r, + Buffer buffer, + IndexTuple *itup, + int *len, + GISTSTATE *giststate, + InsertIndexResult *res); +static void gistnewroot(GISTSTATE *giststate, Relation r, + IndexTuple *itup, int len); static void GISTInitBuffer(Buffer b, uint32 f); -static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level, - GISTSTATE *giststate, - GISTSTACK **retstack, Buffer *leafbuf); -static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, - GISTSTATE *giststate); -static int gistnospace(Page p, IndexTuple it); -static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t); -static void gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, - Relation r, Page pg, OffsetNumber o, int b, bool l); - +static OffsetNumber gistchoose(Relation r, Page p, + IndexTuple it, + GISTSTATE *giststate); +static IndexTuple gist_tuple_replacekey(Relation r, + GISTENTRY entry, IndexTuple t); +static void gistcentryinit(GISTSTATE *giststate, + GISTENTRY *e, char *pr, + Relation r, Page pg, + OffsetNumber o, int b, bool l); + +#undef GISTDEBUG #ifdef GISTDEBUG -static char *int_range_out(INTRANGE *r); - +static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff); #endif /* @@ -86,7 +110,6 @@ gistbuild(PG_FUNCTION_ARGS) TupleTableSlot *slot; #endif ExprContext *econtext; - InsertIndexResult res = NULL; GISTSTATE giststate; GISTENTRY tmpcentry; Buffer buffer = InvalidBuffer; @@ -223,14 +246,13 @@ gistbuild(PG_FUNCTION_ARGS) * not when you're initializing the whole index at once. */ - res = gistdoinsert(index, itup, &giststate); + gistdoinsert(index, itup, NULL, &giststate); for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) if (compvec[i]) pfree(DatumGetPointer(attdata[i])); pfree(itup); - pfree(res); } /* okay, all heap tuples are indexed */ @@ -274,6 +296,10 @@ gistbuild(PG_FUNCTION_ARGS) } } +#ifdef GISTDEBUG +gist_dumptree(index, 0, GISTP_ROOT, 0); +#endif + PG_RETURN_VOID(); } @@ -324,7 +350,8 @@ gistinsert(PG_FUNCTION_ARGS) * RelationSetLockForWrite(r); */ - res = gistdoinsert(r, itup, &giststate); + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + gistdoinsert(r, itup, &res, &giststate); for (i = 0; i < r->rd_att->natts; i++) if (compvec[i] == TRUE) pfree((char *) datum[i]); @@ -353,6 +380,7 @@ gistPageAddItem(GISTSTATE *giststate, { GISTENTRY tmpcentry; IndexTuple itup = (IndexTuple) item; + OffsetNumber retval; /* * recompress the item given that we now know the exact page and @@ -364,295 +392,353 @@ gistPageAddItem(GISTSTATE *giststate, IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE); gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page, offsetNumber, dentry->bytes, FALSE); - *newtup = gist_tuple_replacekey(r, *dentry, itup); + *newtup = gist_tuple_replacekey(r, tmpcentry, itup); + retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), + offsetNumber, flags); /* be tidy */ - if (tmpcentry.pred != dentry->pred + if (tmpcentry.pred && tmpcentry.pred != dentry->pred && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData))) pfree(tmpcentry.pred); - - return (PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), - offsetNumber, flags)); + return (retval); } +static void +gistdoinsert( Relation r, + IndexTuple itup, + InsertIndexResult *res, + GISTSTATE *giststate ) { + IndexTuple *instup; + int i,ret,len = 1; + + instup = ( IndexTuple* ) palloc( sizeof(IndexTuple) ); + instup[0] = ( IndexTuple ) palloc( IndexTupleSize( itup ) ); + memcpy( instup[0], itup, IndexTupleSize( itup ) ); + + ret = gistlayerinsert(r, GISTP_ROOT, &instup, &len, res, giststate); + if ( ret & SPLITED ) + gistnewroot( giststate, r, instup, len ); -static InsertIndexResult -gistdoinsert(Relation r, - IndexTuple itup, /* itup contains compressed entry */ - GISTSTATE *giststate) -{ - GISTENTRY tmpdentry; - InsertIndexResult res; - OffsetNumber l; - GISTSTACK *stack; - Buffer buffer; - BlockNumber blk; - Page page; - OffsetNumber off; - IndexTuple newtup; + for(i=0;iflags & F_LEAF)) { + /* internal page, so we must walk on tree */ + /* len IS equial 1 */ + ItemId iid; + BlockNumber nblkno; + ItemPointerData oldtid; + IndexTuple oldtup; + + child = gistchoose( r, page, *(*itup), giststate ); + iid = PageGetItemId(page, child); + oldtup = (IndexTuple) PageGetItem(page, iid); + nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + + /* + * After this call: + * 1. if child page was splited, then itup contains + * keys for each page + * 2. if child page wasn't splited, then itup contains + * additional for adjustement of current key + */ + ret = gistlayerinsert( r, nblkno, itup, len, res, giststate ); - /* add the item and write the buffer */ - l = gistPageAddItem(giststate, r, page, (Item) itup, IndexTupleSize(itup), - off, LP_USED, &tmpdentry, &newtup); - WriteBuffer(buffer); + /* nothing inserted in child */ + if ( ! (ret & INSERTED) ) { + ReleaseBuffer(buffer); + return 0x00; + } - /* now expand the page boundary in the parent to include the new child */ - gistAdjustKeys(r, stack, blk, tmpdentry.pred, tmpdentry.bytes, giststate); - gistfreestack(stack); + /* child does not splited */ + if ( ! (ret & SPLITED) ) { + IndexTuple newtup = gistgetadjusted( r, oldtup, (*itup)[0], giststate ); + if ( ! newtup ) { + /* not need to update key */ + ReleaseBuffer(buffer); + return 0x00; + } - /* be tidy */ - if (itup != newtup) - pfree(newtup); - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); + pfree( (*itup)[0] ); /* !!! */ + (*itup)[0] = newtup; + } - /* build and return an InsertIndexResult for this insertion */ - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - ItemPointerSet(&(res->pointerData), blk, l); + /* key is modified, so old version must be deleted */ + ItemPointerSet(&oldtid, blkno, child); + DirectFunctionCall2(gistdelete, + PointerGetDatum(r), + PointerGetDatum(&oldtid)); + } - return res; -} + ret = INSERTED; + + if ( gistnospace(page, (*itup), *len) ) { + /* no space for insertion */ + IndexTuple *itvec; + int tlen; + + ret |= SPLITED; + itvec = gistreadbuffer( r, buffer, &tlen ); + itvec = gistjoinvector( itvec, &tlen, (*itup), *len ); + pfree( (*itup) ); + (*itup) = gistSplit( r, buffer, itvec, &tlen, giststate, + (opaque->flags & F_LEAF) ? res : NULL ); /*res only for inserting in leaf*/ + ReleaseBuffer( buffer ); + pfree( itvec ); + *len = tlen; /* now tlen >= 2 */ + } else { + /* enogth space */ + OffsetNumber off, l; + + off = ( PageIsEmpty(page) ) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + l = gistwritebuffer( r, page, (*itup), *len, off, giststate ); + WriteBuffer(buffer); + /* set res if insert into leaf page, in + this case, len = 1 always */ + if ( res && (opaque->flags & F_LEAF) ) + ItemPointerSet(&((*res)->pointerData), blkno, l); + + if ( *len > 1 ) { /* previos insert ret & SPLITED != 0 */ + int i; + /* child was splited, so we must form union + * for insertion in parent */ + IndexTuple newtup = gistunion(r, (*itup), *len, giststate); + for(i=0; i<*len; i++) + pfree( (*itup)[i] ); + (*itup)[0] = newtup; + *len = 1; + } + } + + return ret; +} -static BlockNumber -gistChooseSubtree(Relation r, IndexTuple itup, /* itup has compressed - * entry */ - int level, - GISTSTATE *giststate, - GISTSTACK **retstack /* out */ , - Buffer *leafbuf /* out */ ) -{ - Buffer buffer; - BlockNumber blk; - GISTSTACK *stack; - Page page; - GISTPageOpaque opaque; - IndexTuple which; +/* + * Write itup vector to page, has no control of free space + */ +static OffsetNumber +gistwritebuffer( Relation r, Page page, IndexTuple *itup, + int len, OffsetNumber off, GISTSTATE *giststate) { + OffsetNumber l = InvalidOffsetNumber; + int i; + GISTENTRY tmpdentry; + IndexTuple newtup; + + for(i=0; iflags & F_LEAF)) - { - GISTSTACK *n; - ItemId iid; - - n = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - n->gs_parent = stack; - n->gs_blk = blk; - n->gs_child = gistchoose(r, page, itup, giststate); - stack = n; - - iid = PageGetItemId(page, n->gs_child); - which = (IndexTuple) PageGetItem(page, iid); - blk = ItemPointerGetBlockNumber(&(which->t_tid)); - } - } while (!(opaque->flags & F_LEAF)); + *len=0; + maxoff = PageGetMaxOffsetNumber(p); + itvec = palloc( sizeof(IndexTuple) * maxoff ); + for(i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + itvec[ (*len)++ ] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - *retstack = stack; - *leafbuf = buffer; + return itvec; +} - return blk; +/* + * join two vectors into one + */ +static IndexTuple * +gistjoinvector( IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen ) { + itvec = (IndexTuple*) repalloc( (void*)itvec, sizeof(IndexTuple) * ( (*len) + addlen ) ); + memmove( &itvec[*len], additvec, sizeof(IndexTuple) * addlen ); + *len += addlen; + return itvec; } +/* + * return union of itup vector + */ +static IndexTuple +gistunion( Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate ) { + bytea *evec; + char *datum; + int datumsize, i; + GISTENTRY centry; + char isnull; + IndexTuple newtup; + + evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ); + VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ; + + for ( i = 0 ; i< len ; i++ ) + gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[i], + (char*) itvec[i] + sizeof(IndexTupleData), + (Relation)NULL, (Page)NULL, (OffsetNumber)NULL, + IndexTupleSize((IndexTuple)itvec[i]) - sizeof(IndexTupleData), FALSE); -static void -gistAdjustKeys(Relation r, - GISTSTACK *stk, - BlockNumber blk, - char *datum, /* datum is uncompressed */ - int att_size, - GISTSTATE *giststate) -{ - char *oldud; - Page p; - Buffer b; - bool result; - bytea *evec; - GISTENTRY centry, - *ev0p, - *ev1p; - int size, - datumsize; - IndexTuple tid; + datum = (char *) + DatumGetPointer(FunctionCall2(&giststate->unionFn, + PointerGetDatum(evec), + PointerGetDatum(&datumsize))); - if (stk == (GISTSTACK *) NULL) - return; + for ( i = 0 ; i< len ; i++ ) + if ( ((GISTENTRY *) VARDATA(evec))[i].pred && + ((GISTENTRY *) VARDATA(evec))[i].pred != + ((char*)( itvec[i] )+ sizeof(IndexTupleData)) ) + pfree( ((GISTENTRY *) VARDATA(evec))[i].pred ); + + pfree( evec ); - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); + gistcentryinit(giststate, ¢ry, datum, + (Relation)NULL, (Page)NULL, (OffsetNumber)NULL, + datumsize, FALSE); + + isnull = (centry.pred) ? ' ' : 'n'; + newtup = (IndexTuple) index_formtuple( r->rd_att, (Datum *) ¢ry.pred, &isnull ); + if (centry.pred != datum) + pfree( datum ); - oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child)); - tid = (IndexTuple) oldud; - size = IndexTupleSize((IndexTuple) oldud) - sizeof(IndexTupleData); - oldud += sizeof(IndexTupleData); + return newtup; +} +/* + * Forms union of oldtup and addtup, if union == oldtup then return NULL + */ +static IndexTuple +gistgetadjusted( Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate ) { + bytea *evec; + char *datum; + int datumsize; + bool result; + char isnull; + GISTENTRY centry, *ev0p, *ev1p; + IndexTuple newtup = NULL; + evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ); VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; - /* insert decompressed oldud into entry vector */ gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[0], - oldud, r, p, stk->gs_child, - size, FALSE); + (char*) oldtup + sizeof(IndexTupleData), (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + IndexTupleSize((IndexTuple)oldtup) - sizeof(IndexTupleData), FALSE); ev0p = &((GISTENTRY *) VARDATA(evec))[0]; - /* insert datum entry into entry vector */ - gistentryinit(((GISTENTRY *) VARDATA(evec))[1], datum, - (Relation) NULL, (Page) NULL, (OffsetNumber) 0, att_size, FALSE); + gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[1], + (char*) addtup + sizeof(IndexTupleData), (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + IndexTupleSize((IndexTuple)addtup) - sizeof(IndexTupleData), FALSE); ev1p = &((GISTENTRY *) VARDATA(evec))[1]; - /* form union of decompressed entries */ datum = (char *) DatumGetPointer(FunctionCall2(&giststate->unionFn, - PointerGetDatum(evec), - PointerGetDatum(&datumsize))); - - /* did union leave decompressed version of oldud unchanged? */ - FunctionCall3(&giststate->equalFn, - PointerGetDatum(ev0p->pred), - PointerGetDatum(datum), - PointerGetDatum(&result)); - if (!result) - { - TupleDesc td = RelationGetDescr(r); + PointerGetDatum(evec), + PointerGetDatum(&datumsize))); + + if ( ! ( ev0p->pred && ev1p->pred ) ) { + result = ( ev0p->pred == NULL && ev1p->pred == NULL ); + } else { + FunctionCall3(&giststate->equalFn, + PointerGetDatum(ev0p->pred), + PointerGetDatum(datum), + PointerGetDatum(&result)); + } - /* compress datum for storage on page */ + if ( result ) { + /* not need to update key */ + pfree( datum ); + } else { gistcentryinit(giststate, ¢ry, datum, ev0p->rel, ev0p->page, - ev0p->offset, datumsize, FALSE); - if (td->attrs[0]->attlen >= 0) - { - memmove(oldud, centry.pred, att_size); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, - giststate); - } - else if (VARSIZE(centry.pred) == VARSIZE(oldud)) - { - memmove(oldud, centry.pred, VARSIZE(centry.pred)); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, - giststate); - } - else - { - - /* - * * new datum is not the same size as the old. * We have to - * delete the old entry and insert the new * one. Note that - * this may cause a split here! - */ - IndexTuple newtup; - ItemPointerData oldtid; - char *isnull; - TupleDesc tupDesc; - InsertIndexResult res; - - /* delete old tuple */ - ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child); - DirectFunctionCall2(gistdelete, - PointerGetDatum(r), - PointerGetDatum(&oldtid)); - - /* generate and insert new tuple */ - tupDesc = r->rd_att; - isnull = (char *) palloc(r->rd_rel->relnatts); - MemSet(isnull, ' ', r->rd_rel->relnatts); - newtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) ¢ry.pred, isnull); - pfree(isnull); - /* set pointer in new tuple to point to current child */ - ItemPointerSet(&oldtid, blk, 1); - newtup->t_tid = oldtid; - - /* inserting the new entry also adjust keys above */ - res = gistentryinsert(r, stk, newtup, giststate); - - /* in stack, set info to point to new tuple */ - stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData)); - stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData)); - - pfree(res); - } - WriteBuffer(b); + ev0p->offset, datumsize, FALSE); + isnull = (centry.pred) ? ' ' : 'n'; + newtup = (IndexTuple) index_formtuple( r->rd_att, (Datum *) ¢ry.pred, &isnull ); + newtup->t_tid = oldtup->t_tid; if (centry.pred != datum) - pfree(datum); + pfree( datum ); } - else - ReleaseBuffer(b); - pfree(evec); -} + if ( ev0p->pred && + ev0p->pred != (char*) oldtup + sizeof(IndexTupleData) ) + pfree( ev0p->pred ); + if ( ev1p->pred && + ev1p->pred != (char*) addtup + sizeof(IndexTupleData) ) + pfree( ev1p->pred ); + pfree( evec ); + + return newtup; +} + /* * gistSplit -- split a page in the tree. - * */ -static InsertIndexResult +static IndexTuple * gistSplit(Relation r, Buffer buffer, - GISTSTACK *stack, - IndexTuple itup, /* contains compressed entry */ - GISTSTATE *giststate) + IndexTuple *itup, /* contains compressed entry */ + int *len, + GISTSTATE *giststate, + InsertIndexResult *res) { Page p; - Buffer leftbuf, - rightbuf; - Page left, - right; - ItemId itemid; - IndexTuple item; - IndexTuple ltup, - rtup, - newtup; - OffsetNumber maxoff; - OffsetNumber i; - OffsetNumber leftoff, - rightoff; - BlockNumber lbknum, - rbknum; - BlockNumber bufblock; + Buffer leftbuf, rightbuf; + Page left, right; + OffsetNumber *spl_left, *spl_right; + IndexTuple *lvectup, *rvectup, *newtup; + int leftoff, rightoff; + BlockNumber lbknum, rbknum; GISTPageOpaque opaque; - int blank; - InsertIndexResult res; - char *isnull; + char isnull; GIST_SPLITVEC v; - TupleDesc tupDesc; bytea *entryvec; bool *decompvec; - IndexTuple item_1; - GISTENTRY tmpdentry, - tmpentry; + GISTENTRY tmpentry; + int i, nlen; - isnull = (char *) palloc(r->rd_rel->relnatts); - for (blank = 0; blank < r->rd_rel->relnatts; blank++) - isnull[blank] = ' '; p = (Page) BufferGetPage(buffer); opaque = (GISTPageOpaque) PageGetSpecialPointer(p); @@ -684,313 +770,138 @@ gistSplit(Relation r, right = (Page) BufferGetPage(rightbuf); /* generate the item array */ - maxoff = PageGetMaxOffsetNumber(p); - entryvec = (bytea *) palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY)); - decompvec = (bool *) palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool)); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + entryvec = (bytea *) palloc(VARHDRSZ + (*len+1) * sizeof(GISTENTRY)); + decompvec = (bool *) palloc(VARHDRSZ + (*len+1) * sizeof(bool)); + VARATT_SIZEP(entryvec) = (*len+1) * sizeof(GISTENTRY) + VARHDRSZ; + for (i = 1; i <= *len; i++) { - item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); gistdentryinit(giststate, &((GISTENTRY *) VARDATA(entryvec))[i], - (((char *) item_1) + sizeof(IndexTupleData)), + (((char *) itup[i-1]) + sizeof(IndexTupleData)), r, p, i, - IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE); + IndexTupleSize(itup[i-1]) - sizeof(IndexTupleData), FALSE); if ((char *) (((GISTENTRY *) VARDATA(entryvec))[i].pred) - == (((char *) item_1) + sizeof(IndexTupleData))) + == (((char *) itup[i-1]) + sizeof(IndexTupleData))) decompvec[i] = FALSE; else decompvec[i] = TRUE; } - /* add the new datum as the last entry */ - gistdentryinit(giststate, &(((GISTENTRY *) VARDATA(entryvec))[maxoff + 1]), - (((char *) itup) + sizeof(IndexTupleData)), - (Relation) NULL, (Page) NULL, - (OffsetNumber) 0, tmpentry.bytes, FALSE); - if ((char *) (((GISTENTRY *) VARDATA(entryvec))[maxoff + 1]).pred != - (((char *) itup) + sizeof(IndexTupleData))) - decompvec[maxoff + 1] = TRUE; - else - decompvec[maxoff + 1] = FALSE; - - VARATT_SIZEP(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ; - /* now let the user-defined picksplit function set up the split vector */ FunctionCall2(&giststate->picksplitFn, - PointerGetDatum(entryvec), - PointerGetDatum(&v)); - - /* compress ldatum and rdatum */ - gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - ((GISTENTRY *) VARDATA(entryvec))[i].bytes, FALSE); - if (v.spl_ldatum != tmpentry.pred) - pfree(v.spl_ldatum); - v.spl_ldatum = tmpentry.pred; - - gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - ((GISTENTRY *) VARDATA(entryvec))[i].bytes, FALSE); - if (v.spl_rdatum != tmpentry.pred) - pfree(v.spl_rdatum); - v.spl_rdatum = tmpentry.pred; + PointerGetDatum(entryvec), + PointerGetDatum(&v)); /* clean up the entry vector: its preds need to be deleted, too */ - for (i = FirstOffsetNumber; i <= maxoff + 1; i = OffsetNumberNext(i)) - if (decompvec[i]) + for (i = 1; i <= *len; i++) + if (decompvec[i] && ((GISTENTRY *) VARDATA(entryvec))[i].pred) pfree(((GISTENTRY *) VARDATA(entryvec))[i].pred); pfree(entryvec); pfree(decompvec); - leftoff = rightoff = FirstOffsetNumber; - maxoff = PageGetMaxOffsetNumber(p); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - itemid = PageGetItemId(p, i); - item = (IndexTuple) PageGetItem(p, itemid); - - if (i == *(v.spl_left)) - { - gistPageAddItem(giststate, r, left, (Item) item, - IndexTupleSize(item), - leftoff, LP_USED, &tmpdentry, &newtup); - leftoff = OffsetNumberNext(leftoff); - v.spl_left++; /* advance in left split vector */ - /* be tidy */ - if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if ((IndexTuple) item != newtup) - pfree(newtup); - } - else - { - gistPageAddItem(giststate, r, right, (Item) item, - IndexTupleSize(item), - rightoff, LP_USED, &tmpdentry, &newtup); - rightoff = OffsetNumberNext(rightoff); - v.spl_right++; /* advance in right split vector */ - /* be tidy */ - if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (item != newtup) - pfree(newtup); + spl_left = v.spl_left; spl_right = v.spl_right; + + /* form left and right vector */ + lvectup = (IndexTuple*) palloc( sizeof( IndexTuple )*v.spl_nleft ); + rvectup = (IndexTuple*) palloc( sizeof( IndexTuple )*v.spl_nright ); + leftoff = rightoff = 0; + for( i=1; i <= *len; i++ ) { + if (i == *(spl_left) || ( i==*len && *(spl_left) != FirstOffsetNumber ) ) { + lvectup[ leftoff++ ] = itup[ i-1 ]; + spl_left++; + } else { + rvectup[ rightoff++ ] = itup[ i-1 ]; + spl_right++; } } - /* build an InsertIndexResult for this insertion */ - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - - /* now insert the new index tuple */ - if (*(v.spl_left) != FirstOffsetNumber) - { - gistPageAddItem(giststate, r, left, (Item) itup, - IndexTupleSize(itup), - leftoff, LP_USED, &tmpdentry, &newtup); - leftoff = OffsetNumberNext(leftoff); - ItemPointerSet(&(res->pointerData), lbknum, leftoff); - /* be tidy */ - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (itup != newtup) - pfree(newtup); - } - else - { - gistPageAddItem(giststate, r, right, (Item) itup, - IndexTupleSize(itup), - rightoff, LP_USED, &tmpdentry, &newtup); - rightoff = OffsetNumberNext(rightoff); - ItemPointerSet(&(res->pointerData), rbknum, rightoff); - /* be tidy */ - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (itup != newtup) - pfree(newtup); + /* write on disk (may be need another split) */ + if ( gistnospace(right, rvectup, v.spl_nright) ) { + nlen = v.spl_nright; + newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate, + ( res && rvectup[ nlen-1 ] == itup[ *len - 1 ] ) ? res : NULL ); + ReleaseBuffer( rightbuf ); + } else { + OffsetNumber l; + + l = gistwritebuffer( r, right, rvectup, v.spl_nright, FirstOffsetNumber, giststate ); + WriteBuffer(rightbuf); + + if ( res ) + ItemPointerSet(&((*res)->pointerData), rbknum, l); + gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + -1, FALSE); + if (v.spl_rdatum != tmpentry.pred) + pfree(v.spl_rdatum); + v.spl_rdatum = tmpentry.pred; + + nlen = 1; + newtup = (IndexTuple*) palloc( sizeof(IndexTuple) * 1); + isnull = ( v.spl_rdatum ) ? ' ' : 'n'; + newtup[0] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_rdatum), &isnull); + ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); } - if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) - PageRestoreTempPage(left, p); - WriteBuffer(leftbuf); - WriteBuffer(rightbuf); - - /* - * Okay, the page is split. We have three things left to do: - * - * 1) Adjust any active scans on this index to cope with changes we - * introduced in its structure by splitting this page. - * - * 2) "Tighten" the bounding box of the pointer to the left page in the - * parent node in the tree, if any. Since we moved a bunch of stuff - * off the left page, we expect it to get smaller. This happens in - * the internal insertion routine. - * - * 3) Insert a pointer to the right page in the parent. This may cause - * the parent to split. If it does, we need to repeat steps one and - * two for each split node in the tree. - */ - - /* adjust active scans */ - gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber); - - tupDesc = r->rd_att; - - ltup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_ldatum), isnull); - rtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_rdatum), isnull); - pfree(isnull); - - /* set pointers to new child pages in the internal index tuples */ - ItemPointerSet(&(ltup->t_tid), lbknum, 1); - ItemPointerSet(&(rtup->t_tid), rbknum, 1); - - gistintinsert(r, stack, ltup, rtup, giststate); - - pfree(ltup); - pfree(rtup); - - return res; -} - -/* -** After a split, we need to overwrite the old entry's key in the parent, -** and install install an entry for the new key into the parent. -*/ -static void -gistintinsert(Relation r, - GISTSTACK *stk, - IndexTuple ltup, /* new version of entry for old page */ - IndexTuple rtup, /* entry for new page */ - GISTSTATE *giststate) -{ - ItemPointerData ltid; - - if (stk == (GISTSTACK *) NULL) - { - gistnewroot(giststate, r, ltup, rtup); - return; + if ( gistnospace(left, lvectup, v.spl_nleft) ) { + int llen = v.spl_nleft; + IndexTuple *lntup; + + lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate, + ( res && lvectup[ llen-1 ] == itup[ *len - 1 ] ) ? res : NULL ); + ReleaseBuffer( leftbuf ); + + newtup = gistjoinvector( newtup, &nlen, lntup, llen ); + pfree( lntup ); + } else { + OffsetNumber l; + + l = gistwritebuffer( r, left, lvectup, v.spl_nleft, FirstOffsetNumber, giststate ); + if ( BufferGetBlockNumber(buffer) != GISTP_ROOT) + PageRestoreTempPage(left, p); + + WriteBuffer(leftbuf); + + if ( res ) + ItemPointerSet(&((*res)->pointerData), lbknum, l); + gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + -1, FALSE); + if (v.spl_ldatum != tmpentry.pred) + pfree(v.spl_ldatum); + v.spl_ldatum = tmpentry.pred; + + nlen += 1; + newtup = (IndexTuple*) repalloc( (void*)newtup, sizeof(IndexTuple) * nlen); + isnull = ( v.spl_ldatum ) ? ' ' : 'n'; + newtup[nlen-1] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_ldatum), &isnull); + ItemPointerSet(&(newtup[nlen-1]->t_tid), lbknum, 1); } - /* remove old left pointer, insert the 2 new entries */ - ItemPointerSet(<id, stk->gs_blk, stk->gs_child); - DirectFunctionCall2(gistdelete, - PointerGetDatum(r), - PointerGetDatum(<id)); - gistentryinserttwo(r, stk, ltup, rtup, giststate); -} - - -/* -** Insert two entries onto one page, handling a split for either one! -*/ -static void -gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate) -{ - Buffer b; - Page p; - InsertIndexResult res; - GISTENTRY tmpentry; - IndexTuple newtup; - - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); - - if (gistnospace(p, ltup)) - { - res = gistSplit(r, b, stk->gs_parent, ltup, giststate); - WriteBuffer(b); /* don't forget to release buffer! - - * 01/31/94 */ - pfree(res); - gistdoinsert(r, rtup, giststate); - } - else - { - gistPageAddItem(giststate, r, p, (Item) ltup, - IndexTupleSize(ltup), InvalidOffsetNumber, - LP_USED, &tmpentry, &newtup); - WriteBuffer(b); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, - tmpentry.bytes, giststate); - /* be tidy */ - if (tmpentry.pred != (((char *) ltup) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (ltup != newtup) - pfree(newtup); - gistentryinsert(r, stk, rtup, giststate); - } -} + /* adjust active scans */ + gistadjscans(r, GISTOP_SPLIT, BufferGetBlockNumber(buffer), FirstOffsetNumber); -/* -** Insert an entry onto a page -*/ -static InsertIndexResult -gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup, - GISTSTATE *giststate) -{ - Buffer b; - Page p; - InsertIndexResult res; - OffsetNumber off; - GISTENTRY tmpentry; - IndexTuple newtup; - - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); + /* !!! pfree */ + pfree( rvectup ); + pfree( lvectup ); + pfree( v.spl_left ); + pfree( v.spl_right ); - if (gistnospace(p, tup)) - { - res = gistSplit(r, b, stk->gs_parent, tup, giststate); - WriteBuffer(b); /* don't forget to release buffer! - - * 01/31/94 */ - return res; - } - else - { - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - off = gistPageAddItem(giststate, r, p, (Item) tup, IndexTupleSize(tup), - InvalidOffsetNumber, LP_USED, &tmpentry, &newtup); - WriteBuffer(b); - ItemPointerSet(&(res->pointerData), stk->gs_blk, off); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, - tmpentry.bytes, giststate); - /* be tidy */ - if (tmpentry.pred != (((char *) tup) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (tup != newtup) - pfree(newtup); - return res; - } + *len = nlen; + return newtup; } - static void -gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, IndexTuple rt) +gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple *itup, int len) { Buffer b; Page p; - GISTENTRY tmpentry; - IndexTuple newtup; b = ReadBuffer(r, GISTP_ROOT); GISTInitBuffer(b, 0); p = BufferGetPage(b); - gistPageAddItem(giststate, r, p, (Item) lt, IndexTupleSize(lt), - FirstOffsetNumber, - LP_USED, &tmpentry, &newtup); - /* be tidy */ - if (tmpentry.pred != (((char *) lt) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (lt != newtup) - pfree(newtup); - gistPageAddItem(giststate, r, p, (Item) rt, IndexTupleSize(rt), - OffsetNumberNext(FirstOffsetNumber), LP_USED, - &tmpentry, &newtup); - /* be tidy */ - if (tmpentry.pred != (((char *) rt) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (rt != newtup) - pfree(newtup); + + gistwritebuffer( r, p, itup, len, FirstOffsetNumber, giststate ); WriteBuffer(b); } @@ -1057,21 +968,15 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ if (which_grow == 0) break; } - if (entry.pred != datum) + if (entry.pred && entry.pred != datum) pfree(entry.pred); } - if (identry.pred != id) + if (identry.pred && identry.pred != id) pfree(identry.pred); return which; } -static int -gistnospace(Page p, IndexTuple it) -{ - return PageGetFreeSpace(p) < IndexTupleSize(it); -} - void gistfreestack(GISTSTACK *s) { @@ -1193,7 +1098,7 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) char *datum = (((char *) t) + sizeof(IndexTupleData)); /* if new entry fits in index tuple, copy it in */ - if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) + if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData) || (Size) entry.bytes == 0 ) { memcpy(datum, entry.pred, entry.bytes); /* clear out old size */ @@ -1208,17 +1113,13 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) /* generate a new index tuple for the compressed entry */ TupleDesc tupDesc = r->rd_att; IndexTuple newtup; - char *isnull; - int blank; + char isnull; - isnull = (char *) palloc(r->rd_rel->relnatts); - for (blank = 0; blank < r->rd_rel->relnatts; blank++) - isnull[blank] = ' '; + isnull = ( entry.pred ) ? ' ' : 'n'; newtup = (IndexTuple) index_formtuple(tupDesc, (Datum *) &(entry.pred), - isnull); + &isnull); newtup->t_tid = t->t_tid; - pfree(isnull); return newtup; } } @@ -1269,81 +1170,46 @@ gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, } } - - #ifdef GISTDEBUG - -/* -** sloppy debugging support routine, requires recompilation with appropriate -** "out" method for the index keys. Could be fixed to find that info -** in the catalogs... -*/ -void -_gistdump(Relation r) +static void +gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) { - Buffer buf; + Buffer buffer; Page page; - OffsetNumber offnum, - maxoff; - BlockNumber blkno; - BlockNumber nblocks; - GISTPageOpaque po; - IndexTuple itup; - BlockNumber itblkno; - OffsetNumber itoffno; - char *datum; - char *itkey; + GISTPageOpaque opaque; + IndexTuple which; + ItemId iid; + OffsetNumber i,maxoff; + BlockNumber cblk; + char *pred; - nblocks = RelationGetNumberOfBlocks(r); - for (blkno = 0; blkno < nblocks; blkno++) - { - buf = ReadBuffer(r, blkno); - page = BufferGetPage(buf); - po = (GISTPageOpaque) PageGetSpecialPointer(page); - maxoff = PageGetMaxOffsetNumber(page); - printf("Page %d maxoff %d <%s>\n", blkno, maxoff, - (po->flags & F_LEAF ? "LEAF" : "INTERNAL")); - - if (PageIsEmpty(page)) - { - ReleaseBuffer(buf); - continue; - } + pred = (char*) palloc( sizeof(char)*level+1 ); + MemSet(pred, '\t', level); + pred[level]='\0'; - for (offnum = FirstOffsetNumber; - offnum <= maxoff; - offnum = OffsetNumberNext(offnum)) - { - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); - itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); - datum = ((char *) itup); - datum += sizeof(IndexTupleData); - /* get out function for type of key, and out it! */ - itkey = (char *) int_range_out((INTRANGE *) datum); - /* itkey = " unable to print"; */ - printf("\t[%d] size %d heap <%d,%d> key:%s\n", - offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); - pfree(itkey); + buffer = ReadBuffer(r, blk); + page = (Page) BufferGetPage(buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + + maxoff = PageGetMaxOffsetNumber( page ); + + elog(NOTICE,"%sPage: %d %s blk: %d maxoff: %d free: %d", pred, coff, ( opaque->flags & F_LEAF ) ? "LEAF" : "INTE", (int)blk, (int)maxoff, PageGetFreeSpace(page)); + + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + iid = PageGetItemId(page, i); + which = (IndexTuple) PageGetItem(page, iid); + cblk = ItemPointerGetBlockNumber(&(which->t_tid)); +#ifdef PRINTTUPLE + elog(NOTICE,"%s Tuple. blk: %d size: %d", pred, (int)cblk, IndexTupleSize( which ) ); +#endif + + if ( ! ( opaque->flags & F_LEAF ) ) { + gist_dumptree( r, level+1, cblk, i ); } - - ReleaseBuffer(buf); } + ReleaseBuffer(buffer); + pfree(pred); } - -static char * -int_range_out(INTRANGE *r) -{ - char *result; - - if (r == NULL) - return NULL; - result = (char *) palloc(80); - snprintf(result, 80, "[%d,%d): %d", r->lower, r->upper, r->flag); - - return result; -} - #endif /* defined GISTDEBUG */ void @@ -1362,3 +1228,4 @@ void gist_desc(char *buf, uint8 xl_info, char* rec) { } +