# Makefile for access/gist
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $
+# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.14 2005/06/20 10:29:36 teodor Exp $
#
#-------------------------------------------------------------------------
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o
+OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o
all: SUBSYS.o
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.120 2005/06/20 10:29:36 teodor Exp $
*
*-------------------------------------------------------------------------
*/
GISTSTATE giststate;
int numindexattrs;
double indtuples;
- MemoryContext tmpCxt;
+ MemoryContext tmpCtx;
} GISTBuildState;
GISTSTATE *giststate);
-typedef struct PageLayout {
- gistxlogPage block;
- OffsetNumber *list;
- Buffer buffer; /* to write after all proceed */
-
- struct PageLayout *next;
-} PageLayout;
-
-
#define ROTATEDIST(d) do { \
- PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
- memset(tmp,0,sizeof(PageLayout)); \
+ SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
+ memset(tmp,0,sizeof(SplitedPageLayout)); \
tmp->next = (d); \
(d)=tmp; \
} while(0)
-static IndexTuple *gistSplit(Relation r,
- Buffer buffer,
- IndexTuple *itup,
- int *len,
- PageLayout **dist,
- GISTSTATE *giststate);
-
-
-#undef GISTDEBUG
-
-#ifdef GISTDEBUG
-static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff);
-#endif
-
/*
* Create and return a temporary memory context for use by GiST. We
* _always_ invoke user-provided methods in a temporary memory
initGISTstate(&buildstate.giststate, index);
/* initialize the root page */
- buffer = ReadBuffer(index, P_NEW);
+ buffer = gistReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF);
if ( !index->rd_istemp ) {
XLogRecPtr recptr;
* create a temporary memory context that is reset once for each
* tuple inserted into the index
*/
- buildstate.tmpCxt = createTempGistContext();
+ buildstate.tmpCtx = createTempGistContext();
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
gistbuildCallback, (void *) &buildstate);
/* okay, all heap tuples are indexed */
- MemoryContextDelete(buildstate.tmpCxt);
+ MemoryContextDelete(buildstate.tmpCtx);
/* since we just counted the # of tuples, may as well update stats */
IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);
freeGISTstate(&buildstate.giststate);
-#ifdef GISTDEBUG
- gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0);
-#endif
PG_RETURN_VOID();
}
IndexTuple itup;
GISTENTRY tmpcentry;
int i;
- MemoryContext oldCxt;
+ MemoryContext oldCtx;
/* GiST cannot index tuples with leading NULLs */
if (isnull[0])
return;
- oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt);
+ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
/* immediately compress keys to normalize */
for (i = 0; i < buildstate->numindexattrs; i++)
gistdoinsert(index, itup, &buildstate->giststate);
buildstate->indtuples += 1;
- MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(buildstate->tmpCxt);
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextReset(buildstate->tmpCtx);
}
/*
GISTSTATE giststate;
GISTENTRY tmpentry;
int i;
- MemoryContext oldCxt;
- MemoryContext insertCxt;
+ MemoryContext oldCtx;
+ MemoryContext insertCtx;
/*
* Since GIST is not marked "amconcurrent" in pg_am, caller should
if (isnull[0])
PG_RETURN_BOOL(false);
- insertCxt = createTempGistContext();
- oldCxt = MemoryContextSwitchTo(insertCxt);
+ insertCtx = createTempGistContext();
+ oldCtx = MemoryContextSwitchTo(insertCtx);
initGISTstate(&giststate, r);
/* cleanup */
freeGISTstate(&giststate);
- MemoryContextSwitchTo(oldCxt);
- MemoryContextDelete(insertCxt);
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextDelete(insertCtx);
PG_RETURN_BOOL(true);
}
state.r = r;
state.key = itup->t_tid;
state.needInsertComplete = true;
- state.xlog_mode = false;
state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
memset( state.stack, 0, sizeof(GISTInsertStack));
IndexTuple *itvec,
*newitup;
int tlen,olen;
- PageLayout *dist=NULL, *ptr;
+ SplitedPageLayout *dist=NULL, *ptr;
- memset(&dist, 0, sizeof(PageLayout));
is_splitted = true;
itvec = gistextractbuffer(state->stack->buffer, &tlen);
olen=tlen;
itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
- if ( !state->r->rd_istemp && !state->xlog_mode) {
- gistxlogPageSplit xlrec;
- XLogRecPtr recptr;
- XLogRecData *rdata;
- int i, npage = 0, cur=1;
-
- ptr=dist;
- while( ptr ) {
- npage++;
- ptr=ptr->next;
- }
-
- rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
-
- xlrec.node = state->r->rd_node;
- xlrec.origblkno = state->stack->blkno;
- xlrec.npage = npage;
- xlrec.nitup = state->ituplen;
- xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
- xlrec.key = state->key;
- xlrec.pathlen = (uint16)state->pathlen;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof( gistxlogPageSplit );
- rdata[0].next = NULL;
-
- if ( state->pathlen>=0 ) {
- rdata[0].next = &(rdata[1]);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) (state->path);
- rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
- rdata[1].next = NULL;
- cur++;
- }
-
- /* new tuples */
- for(i=0;i<state->ituplen;i++) {
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)(state->itup[i]);
- rdata[cur].len = IndexTupleSize(state->itup[i]);
- rdata[cur-1].next = &(rdata[cur]);
- cur++;
+ if ( !state->r->rd_istemp ) {
+ OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
+ XLogRecPtr recptr;
+ XLogRecData *rdata;
+
+ if ( state->stack->todelete ) {
+ offs[0] = state->stack->childoffnum;
+ noffs=1;
}
- /* new page layout */
- ptr=dist;
- while(ptr) {
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)&(ptr->block);
- rdata[cur].len = sizeof(gistxlogPage);
- rdata[cur-1].next = &(rdata[cur]);
- cur++;
-
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)(ptr->list);
- rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
- if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
- rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
- rdata[cur-1].next = &(rdata[cur]);
- rdata[cur].next=NULL;
- cur++;
-
- ptr=ptr->next;
- }
+ rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
+ offs, noffs, state->itup, state->ituplen,
+ &(state->key), state->path, state->pathlen, dist);
START_CRIT_SECTION();
state->ituplen = tlen; /* now tlen >= 2 */
if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
- gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
+ gistnewroot(state->r, state->itup, state->ituplen, &(state->key));
state->needInsertComplete=false;
}
- if ( state->xlog_mode )
- LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(state->stack->buffer);
}
else
{
/* enough space */
OffsetNumber off, l;
+ bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
off = (PageIsEmpty(state->stack->page)) ?
FirstOffsetNumber
:
OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
- if ( !state->r->rd_istemp && !state->xlog_mode) {
- gistxlogEntryUpdate xlrec;
- XLogRecPtr recptr;
- XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
- int i, cur=0;
-
- xlrec.node = state->r->rd_node;
- xlrec.blkno = state->stack->blkno;
- xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
- xlrec.key = state->key;
- xlrec.pathlen = (uint16)state->pathlen;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof( gistxlogEntryUpdate );
- rdata[0].next = NULL;
-
- if ( state->pathlen>=0 ) {
- rdata[0].next = &(rdata[1]);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) (state->path);
- rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
- rdata[1].next = NULL;
- cur++;
+ if ( !state->r->rd_istemp ) {
+ OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
+ XLogRecPtr recptr;
+ XLogRecData *rdata;
+
+ if ( state->stack->todelete ) {
+ offs[0] = state->stack->childoffnum;
+ noffs=1;
}
+
+ rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
+ offs, noffs, false, state->itup, state->ituplen,
+ &(state->key), state->path, state->pathlen);
- for(i=1; i<=state->ituplen; i++) { /* adding tuples */
- rdata[i+cur].buffer = InvalidBuffer;
- rdata[i+cur].data = (char*)(state->itup[i-1]);
- rdata[i+cur].len = IndexTupleSize(state->itup[i-1]);
- rdata[i+cur].next = NULL;
- rdata[i-1+cur].next = &(rdata[i+cur]);
- }
-
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
if ( state->stack->blkno == GIST_ROOT_BLKNO )
state->needInsertComplete=false;
-
- if ( state->xlog_mode )
- LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(state->stack->buffer);
if (state->ituplen > 1)
* parent
*/
IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
- ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
+ ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
state->itup[0] = newtup;
state->ituplen = 1;
+ } else if (is_leaf) {
+ /* itup[0] store key to adjust parent, we set it to valid
+ to correct check by GistTupleIsInvalid macro in gistgetadjusted() */
+ ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
+ GistTupleSetValid( state->itup[0] );
}
}
return is_splitted;
/* walk down */
while( true ) {
- GISTPageOpaque opaque;
-
- state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+ state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno);
state->stack->page = (Page) BufferGetPage(state->stack->buffer);
- opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
-
- if (!(opaque->flags & F_LEAF))
+
+ if (!GistPageIsLeaf(state->stack->page))
{
/*
* This is an internal page, so continue to walk down the
state->pathlen++;
ptr=ptr->parent;
}
- state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
+ state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen));
ptr = state->stack;
state->pathlen=0;
while( ptr ) {
* then itup contains additional for adjustment of current key
*/
- is_splitted = gistplacetopage(state, giststate );
+ is_splitted = gistplacetopage(state, giststate);
/* pop page from stack */
state->stack = state->stack->parent;
* an insert in a child node. Therefore, remove the old
* version of this node's key.
*/
+
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
if ( !state->r->rd_istemp )
/* release all buffers */
while( state->stack ) {
- if ( state->xlog_mode )
- LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(state->stack->buffer);
state->stack = state->stack->parent;
}
/* say to xlog that insert is completed */
- if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
- gistxlogInsertComplete xlrec;
- XLogRecData rdata;
-
- xlrec.node = state->r->rd_node;
- xlrec.key = state->key;
-
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof( gistxlogInsertComplete );
- rdata.next = NULL;
-
- START_CRIT_SECTION();
+ if ( state->needInsertComplete && !state->r->rd_istemp )
+ gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
+}
- XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
+static void
+gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) {
+ int i;
- END_CRIT_SECTION();
- }
+ for(i=0;i<len;i++)
+ arr[i] = reasloffset[ arr[i] ];
}
/*
* gistSplit -- split a page in the tree.
*/
-static IndexTuple *
+IndexTuple *
gistSplit(Relation r,
Buffer buffer,
IndexTuple *itup, /* contains compressed entry */
int *len,
- PageLayout **dist,
+ SplitedPageLayout **dist,
GISTSTATE *giststate)
{
Page p;
GISTPageOpaque opaque;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
- int i,
+ int i, fakeoffset,
nlen;
+ OffsetNumber *realoffset;
+ IndexTuple *cleaneditup = itup;
+ int lencleaneditup = *len;
p = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
*/
if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
{
- leftbuf = ReadBuffer(r, P_NEW);
- GISTInitBuffer(leftbuf, opaque->flags);
+ leftbuf = gistReadBuffer(r, P_NEW);
+ GISTInitBuffer(leftbuf, opaque->flags&F_LEAF);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
}
left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
}
- rightbuf = ReadBuffer(r, P_NEW);
- GISTInitBuffer(rightbuf, opaque->flags);
+ rightbuf = gistReadBuffer(r, P_NEW);
+ GISTInitBuffer(rightbuf, opaque->flags&F_LEAF);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
/* generate the item array */
+ realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
entryvec->n = *len + 1;
+ fakeoffset = FirstOffsetNumber;
for (i = 1; i <= *len; i++)
{
Datum datum;
bool IsNull;
+ if (!GistPageIsLeaf(p) && GistTupleIsInvalid( itup[i - 1] )) {
+ entryvec->n--;
+ /* remember position of invalid tuple */
+ realoffset[ entryvec->n ] = i;
+ continue;
+ }
+
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, 0, &(entryvec->vector[i]),
+ gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
+ realoffset[ fakeoffset ] = i;
+ fakeoffset++;
}
- /*
- * now let the user-defined picksplit function set up the split
- * vector; in entryvec have no null value!!
- */
- FunctionCall2(&giststate->picksplitFn[0],
- PointerGetDatum(entryvec),
- PointerGetDatum(&v));
-
- /* compatibility with old code */
- if (v.spl_left[v.spl_nleft - 1] == InvalidOffsetNumber)
- v.spl_left[v.spl_nleft - 1] = (OffsetNumber) *len;
- if (v.spl_right[v.spl_nright - 1] == InvalidOffsetNumber)
- v.spl_right[v.spl_nright - 1] = (OffsetNumber) *len;
-
- v.spl_lattr[0] = v.spl_ldatum;
- v.spl_rattr[0] = v.spl_rdatum;
- v.spl_lisnull[0] = false;
- v.spl_risnull[0] = false;
-
- /*
- * if index is multikey, then we must to try get smaller bounding box
- * for subkey(s)
- */
- if (r->rd_att->natts > 1)
- {
- int MaxGrpId;
-
- v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1));
- v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1));
- v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1));
-
- MaxGrpId = gistfindgroup(giststate, entryvec->vector, &v);
-
- /* form union of sub keys for each page (l,p) */
- gistunionsubkey(r, giststate, itup, &v);
-
- /*
- * if possible, we insert equivalent tuples with control by
- * penalty for a subkey(s)
- */
- if (MaxGrpId > 1)
- gistadjsubkey(r, itup, len, &v, giststate);
+ /*
+ * if it was invalid tuple then we need special processing. If
+ * it's possible, we move all invalid tuples on right page.
+ * We should remember, that union with invalid tuples
+ * is a invalid tuple.
+ */
+ if ( entryvec->n != *len + 1 ) {
+ lencleaneditup = entryvec->n-1;
+ cleaneditup = (IndexTuple*)palloc(lencleaneditup * sizeof(IndexTuple));
+ for(i=1;i<entryvec->n;i++)
+ cleaneditup[i-1] = itup[ realoffset[ i ]-1 ];
+
+ if ( gistnospace( left, cleaneditup, lencleaneditup ) ) {
+ /* no space on left to put all good tuples, so picksplit */
+ gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
+ v.spl_leftvalid = true;
+ v.spl_rightvalid = false;
+ gistToRealOffset( v.spl_left, v.spl_nleft, realoffset );
+ gistToRealOffset( v.spl_right, v.spl_nright, realoffset );
+ } else {
+ /* we can try to store all valid tuples on one page */
+ v.spl_right = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) );
+ v.spl_left = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) );
+
+ if ( lencleaneditup==0 ) {
+ /* all tuples are invalid, so moves half of its to right */
+ v.spl_leftvalid = v.spl_rightvalid = false;
+ v.spl_nright = 0;
+ v.spl_nleft = 0;
+ for(i=1;i<=*len;i++)
+ if ( i-1<*len/2 )
+ v.spl_left[ v.spl_nleft++ ] = i;
+ else
+ v.spl_right[ v.spl_nright++ ] = i;
+ } else {
+ /* we will not call gistUserPicksplit, just put good
+ tuples on left and invalid on right */
+ v.spl_nleft = lencleaneditup;
+ v.spl_nright = 0;
+ for(i=1;i<entryvec->n;i++)
+ v.spl_left[i-1] = i;
+ gistToRealOffset( v.spl_left, v.spl_nleft, realoffset );
+ v.spl_lattr[0] = v.spl_ldatum = (Datum)0;
+ v.spl_rattr[0] = v.spl_rdatum = (Datum)0;
+ v.spl_lisnull[0] = true;
+ v.spl_risnull[0] = true;
+ gistunionsubkey(r, giststate, itup, &v, true);
+ v.spl_leftvalid = true;
+ v.spl_rightvalid = false;
+ }
+ }
+ } else {
+ /* there is no invalid tuples, so usial processing */
+ gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+ v.spl_leftvalid = v.spl_rightvalid = true;
}
+
/* form left and right vector */
- lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft);
- rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright);
+ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1));
+ rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1));
for (i = 0; i < v.spl_nleft; i++)
lvectup[i] = itup[v.spl_left[i] - 1];
for (i = 0; i < v.spl_nright; i++)
rvectup[i] = itup[v.spl_right[i] - 1];
+ /* place invalid tuples on right page if itsn't done yet */
+ for (fakeoffset = entryvec->n; fakeoffset < *len+1 && lencleaneditup; fakeoffset++) {
+ rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
+ }
/* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
{
int i;
- PageLayout *d, *origd=*dist;
+ SplitedPageLayout *d, *origd=*dist;
nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
nlen = 1;
newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
- newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
- ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
+ newtup[0] = ( v.spl_rightvalid ) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+ : gist_form_invalid_tuple( rbknum );
+ ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
}
if (gistnospace(left, lvectup, v.spl_nleft))
int llen = v.spl_nleft;
IndexTuple *lntup;
int i;
- PageLayout *d, *origd=*dist;
+ SplitedPageLayout *d, *origd=*dist;
lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
nlen += 1;
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
- newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
- ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
+ newtup[nlen - 1] = ( v.spl_leftvalid ) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+ : gist_form_invalid_tuple( lbknum );
+ ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
}
+ GistClearTuplesDeleted(p);
+
*len = nlen;
return newtup;
}
void
-gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
+gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key)
{
Buffer buffer;
Page page;
- buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
+ buffer = gistReadBuffer(r, GIST_ROOT_BLKNO);
GISTInitBuffer(buffer, 0);
page = BufferGetPage(buffer);
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
- if ( !xlog_mode && !r->rd_istemp ) {
- gistxlogEntryUpdate xlrec;
+ if ( !r->rd_istemp ) {
XLogRecPtr recptr;
- XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
- int i;
+ XLogRecData *rdata;
- xlrec.node = r->rd_node;
- xlrec.blkno = GIST_ROOT_BLKNO;
- xlrec.todeleteoffnum = InvalidOffsetNumber;
- xlrec.key = *key;
- xlrec.pathlen=0;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof( gistxlogEntryUpdate );
- rdata[0].next = NULL;
-
- for(i=1; i<=len; i++) {
- rdata[i].buffer = InvalidBuffer;
- rdata[i].data = (char*)(itup[i-1]);
- rdata[i].len = IndexTupleSize(itup[i-1]);
- rdata[i].next = NULL;
- rdata[i-1].next = &(rdata[i]);
- }
+ rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
+ NULL, 0, false, itup, len,
+ key, NULL, 0);
START_CRIT_SECTION();
END_CRIT_SECTION();
}
- if ( xlog_mode )
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
-
-/*
- * Bulk deletion of all index entries pointing to a set of heap tuples.
- * The set of target tuples is specified via a callback routine that tells
- * whether any given heap tuple (identified by ItemPointer) is being deleted.
- *
- * Result: a palloc'd struct containing statistical info for VACUUM displays.
- */
-Datum
-gistbulkdelete(PG_FUNCTION_ARGS)
-{
- Relation rel = (Relation) PG_GETARG_POINTER(0);
- IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
- void *callback_state = (void *) PG_GETARG_POINTER(2);
- IndexBulkDeleteResult *result;
- BlockNumber num_pages;
- double tuples_removed;
- double num_index_tuples;
- IndexScanDesc iscan;
-
- tuples_removed = 0;
- num_index_tuples = 0;
-
- /*
- * Since GIST is not marked "amconcurrent" in pg_am, caller should
- * have acquired exclusive lock on index relation. We need no locking
- * here.
- */
-
- /*
- * XXX generic implementation --- should be improved!
- */
-
- /* walk through the entire index */
- iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL);
- /* including killed tuples */
- iscan->ignore_killed_tuples = false;
-
- while (index_getnext_indexitem(iscan, ForwardScanDirection))
- {
- vacuum_delay_point();
-
- if (callback(&iscan->xs_ctup.t_self, callback_state))
- {
- ItemPointerData indextup = iscan->currentItemData;
- BlockNumber blkno;
- OffsetNumber offnum;
- Buffer buf;
- Page page;
-
- blkno = ItemPointerGetBlockNumber(&indextup);
- offnum = ItemPointerGetOffsetNumber(&indextup);
-
- /* adjust any scans that will be affected by this deletion */
- gistadjscans(rel, GISTOP_DEL, blkno, offnum);
-
- /* delete the index tuple */
- buf = ReadBuffer(rel, blkno);
- page = BufferGetPage(buf);
-
- PageIndexTupleDelete(page, offnum);
- if ( !rel->rd_istemp ) {
- gistxlogEntryUpdate xlrec;
- XLogRecPtr recptr;
- XLogRecData rdata;
-
- xlrec.node = rel->rd_node;
- xlrec.blkno = blkno;
- xlrec.todeleteoffnum = offnum;
- xlrec.pathlen=0;
- ItemPointerSetInvalid( &(xlrec.key) );
-
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof( gistxlogEntryUpdate );
- rdata.next = NULL;
-
- START_CRIT_SECTION();
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
-
- END_CRIT_SECTION();
- }
-
- WriteBuffer(buf);
-
- tuples_removed += 1;
- }
- else
- num_index_tuples += 1;
- }
-
- index_endscan(iscan);
-
- /* return statistics */
- num_pages = RelationGetNumberOfBlocks(rel);
-
- result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
- result->num_pages = num_pages;
- result->num_index_tuples = num_index_tuples;
- result->tuples_removed = tuples_removed;
-
- PG_RETURN_POINTER(result);
-}
-
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
/* no work */
}
-#ifdef GISTDEBUG
-static void
-gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
-{
- Buffer buffer;
- Page page;
- GISTPageOpaque opaque;
- IndexTuple which;
- ItemId iid;
- OffsetNumber i,
- maxoff;
- BlockNumber cblk;
- char *pred;
-
- pred = (char *) palloc(sizeof(char) * level + 1);
- MemSet(pred, '\t', level);
- pred[level] = '\0';
-
- buffer = ReadBuffer(r, blk);
- page = (Page) BufferGetPage(buffer);
- opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
-
- maxoff = PageGetMaxOffsetNumber(page);
-
- elog(DEBUG4, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred,
- coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk,
- (int) maxoff, PageGetFreeSpace(page));
-
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- {
- iid = PageGetItemId(page, i);
- which = (IndexTuple) PageGetItem(page, iid);
- cblk = ItemPointerGetBlockNumber(&(which->t_tid));
-#ifdef PRINTTUPLE
- elog(DEBUG4, "%s Tuple. blk: %d size: %d", pred, (int) cblk,
- IndexTupleSize(which));
-#endif
-
- if (!(opaque->flags & F_LEAF))
- gist_dumptree(r, level + 1, cblk, i);
- }
- ReleaseBuffer(buffer);
- pfree(pred);
-}
-#endif /* defined GISTDEBUG */
-
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $
*
*-------------------------------------------------------------------------
*/
{
Page p;
OffsetNumber n;
- GISTPageOpaque po;
GISTScanOpaque so;
GISTSTACK *stk;
IndexTuple it;
}
p = BufferGetPage(so->curbuf);
- po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ItemPointerIsValid(&scan->currentItemData) == false)
{
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
stk->block);
p = BufferGetPage(so->curbuf);
- po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir))
n = OffsetNumberPrev(stk->offset);
continue;
}
- if (po->flags & F_LEAF)
+ if (GistPageIsLeaf(p))
{
/*
* We've found a matching index entry in a leaf page, so
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
child_block);
p = BufferGetPage(so->curbuf);
- po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir))
n = PageGetMaxOffsetNumber(p);
IncrIndexProcessed();
+ /*
+ * Tuple doesn't restore after crash recovery because of inclomplete insert
+ */
+ if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(tuple) )
+ return true;
+
while (keySize > 0)
{
Datum datum;
{
OffsetNumber maxoff;
IndexTuple it;
- GISTPageOpaque po;
GISTScanOpaque so;
MemoryContext oldcxt;
Page p;
so = (GISTScanOpaque) scan->opaque;
p = BufferGetPage(so->curbuf);
maxoff = PageGetMaxOffsetNumber(p);
- po = (GISTPageOpaque) PageGetSpecialPointer(p);
/*
* Make sure we're in a short-lived memory context when we invoke
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "miscadmin.h"
+#include "storage/freespace.h"
/* group flags ( in gistadjsubkey ) */
#define LEFT_ADDED 0x01
GistEntryVector *evec;
int i;
GISTENTRY centry[INDEX_MAX_KEYS];
+ IndexTuple res;
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+ for(i = 0; i<len; i++)
+ if ( GistTupleIsInvalid( itvec[i] ) )
+ return gist_form_invalid_tuple( InvalidBlockNumber );
+
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum;
}
}
- return index_form_tuple(giststate->tupdesc, attr, isnull);
+ res = index_form_tuple(giststate->tupdesc, attr, isnull);
+ GistTupleSetValid( res );
+ return res;
}
IndexTuple newtup = NULL;
int i;
+ if ( GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup) )
+ return gist_form_invalid_tuple( ItemPointerGetBlockNumber( &(oldtup->t_tid) ) );
+
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
+
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldatt, oldisnull);
}
void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
{
int lr;
isnull = spl->spl_risnull;
}
- evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+ evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
- for (i = 1; i < r->rd_att->natts; i++)
+ for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
{
int j;
Datum datum;
void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
- int *len,
+ int len,
GIST_SPLITVEC *v,
GISTSTATE *giststate)
{
ev1p = &(evec->vector[1]);
/* add equivalent tuple */
- for (i = 0; i < *len; i++)
+ for (i = 0; i < len; i++)
{
Datum datum;
maxoff = PageGetMaxOffsetNumber(p);
*which_grow = -1.0;
- which = -1;
+ which = InvalidOffsetNumber;
sum_grow = 1;
gistDeCompressAtt(giststate, r,
it, NULL, (OffsetNumber) 0,
{
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+ if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(itup) ) {
+ elog(LOG, "It's desirable to vacuum or reindex GiST index '%s' due to crash recovery",
+ RelationGetRelationName(r));
+ continue;
+ }
sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++)
}
}
+ if ( which == InvalidOffsetNumber )
+ which = FirstOffsetNumber;
+
return which;
}
GISTENTRY centry[INDEX_MAX_KEYS];
Datum compatt[INDEX_MAX_KEYS];
int i;
+ IndexTuple res;
for (i = 0; i < r->rd_att->natts; i++)
{
}
}
- return index_form_tuple(giststate->tupdesc, compatt, isnull);
+ res = index_form_tuple(giststate->tupdesc, compatt, isnull);
+ GistTupleSetValid(res);
+ return res;
}
void
opaque->flags = f;
}
+void
+gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+ IndexTuple *itup, int len, GISTSTATE *giststate) {
+ /*
+ * now let the user-defined picksplit function set up the split
+ * vector; in entryvec have no null value!!
+ */
+ FunctionCall2(&giststate->picksplitFn[0],
+ PointerGetDatum(entryvec),
+ PointerGetDatum(v));
+
+ /* compatibility with old code */
+ if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber)
+ v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
+ if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber)
+ v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
+
+ v->spl_lattr[0] = v->spl_ldatum;
+ v->spl_rattr[0] = v->spl_rdatum;
+ v->spl_lisnull[0] = false;
+ v->spl_risnull[0] = false;
+
+ /*
+ * if index is multikey, then we must to try get smaller bounding box
+ * for subkey(s)
+ */
+ if (r->rd_att->natts > 1)
+ {
+ int MaxGrpId;
+
+ v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n);
+ v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n);
+ v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n);
+
+ MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
+
+ /* form union of sub keys for each page (l,p) */
+ gistunionsubkey(r, giststate, itup, v, false);
+
+ /*
+ * if possible, we insert equivalent tuples with control by
+ * penalty for a subkey(s)
+ */
+ if (MaxGrpId > 1)
+ gistadjsubkey(r, itup, len, v, giststate);
+ }
+}
+
+Buffer
+gistReadBuffer(Relation r, BlockNumber blkno) {
+ Buffer buffer = InvalidBuffer;
+
+ if ( blkno != P_NEW ) {
+ buffer = ReadBuffer(r, blkno);
+ } else {
+ Page page;
+
+ while(true) {
+ blkno = GetFreeIndexPage(&r->rd_node);
+ if (blkno == InvalidBlockNumber)
+ break;
+
+ buffer = ReadBuffer(r, blkno);
+ page = BufferGetPage(buffer);
+ if ( GistPageIsDeleted( page ) ) {
+ GistPageSetNonDeleted( page );
+ return buffer;
+ }
+ ReleaseBuffer( buffer );
+ }
+
+ buffer = ReadBuffer(r, P_NEW);
+ }
+
+ return buffer;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * gistvacuum.c
+ * interface routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.1 2005/06/20 10:29:36 teodor Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "storage/freespace.h"
+#include "storage/smgr.h"
+
+/* filled by gistbulkdelete, cleared by gistvacuumpcleanup */
+static bool needFullVacuum = false;
+
+
+typedef struct {
+ GISTSTATE giststate;
+ Relation index;
+ MemoryContext opCtx;
+ IndexBulkDeleteResult *result;
+
+ /* path to root */
+ BlockNumber *path;
+ int pathlen;
+ int curpathlen;
+} GistVacuum;
+
+static void
+shiftPath(GistVacuum *gv, BlockNumber blkno) {
+ if ( gv->pathlen == 0 ) {
+ gv->pathlen = 8;
+ gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
+ } else if ( gv->pathlen == gv->curpathlen ) {
+ gv->pathlen *= 2;
+ gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
+ }
+
+ if ( gv->curpathlen )
+ memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen );
+ gv->curpathlen++;
+ gv->path[0] = blkno;
+}
+
+static void
+unshiftPath(GistVacuum *gv) {
+ gv->curpathlen--;
+ if ( gv->curpathlen )
+ memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen );
+}
+
+typedef struct {
+ IndexTuple *itup;
+ int ituplen;
+ bool emptypage;
+} ArrayTuple;
+
+
+static ArrayTuple
+gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
+ ArrayTuple res = {NULL, 0, false};
+ Buffer buffer;
+ Page page;
+ OffsetNumber i, maxoff;
+ ItemId iid;
+ int lenaddon=4, curlenaddon=0, ntodelete=0;
+ IndexTuple idxtuple, *addon=NULL;
+ bool needwrite=false;
+ OffsetNumber *todelete=NULL;
+ ItemPointerData *completed=NULL;
+ int ncompleted=0, lencompleted=16;
+
+ buffer = ReadBuffer(gv->index, blkno);
+ page = (Page) BufferGetPage(buffer);
+ maxoff = PageGetMaxOffsetNumber(page);
+
+
+ if ( GistPageIsLeaf(page) ) {
+ if ( GistTuplesDeleted(page) ) {
+ needunion = needwrite = true;
+ GistClearTuplesDeleted(page);
+ }
+ } else {
+ todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) );
+ completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted );
+ addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon);
+
+ shiftPath(gv, blkno);
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
+ ArrayTuple chldtuple;
+ bool needchildunion;
+
+ iid = PageGetItemId(page, i);
+ idxtuple = (IndexTuple) PageGetItem(page, iid);
+ needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
+
+ if ( needchildunion )
+ elog(DEBUG2,"gistVacuumUpdate: Need union for block %u", ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
+
+ chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
+ needchildunion );
+ if ( chldtuple.ituplen || chldtuple.emptypage ) {
+ /* adjust any scans that will be affected by this deletion */
+ gistadjscans(gv->index, GISTOP_DEL, blkno, i);
+ PageIndexTupleDelete(page, i);
+ todelete[ ntodelete++ ] = i;
+ i--; maxoff--;
+ needwrite=needunion=true;
+
+ if ( chldtuple.ituplen ) {
+ while( curlenaddon + chldtuple.ituplen >= lenaddon ) {
+ lenaddon*=2;
+ addon=(IndexTuple*)repalloc( addon, sizeof(IndexTuple)*lenaddon );
+ }
+
+ memcpy( addon + curlenaddon, chldtuple.itup, chldtuple.ituplen * sizeof(IndexTuple) );
+
+ curlenaddon += chldtuple.ituplen;
+
+ if ( chldtuple.ituplen > 1 ) {
+ /* child was splitted, so we need mark completion insert(split) */
+ int j;
+
+ while( ncompleted + chldtuple.ituplen > lencompleted ) {
+ lencompleted*=2;
+ completed = (ItemPointerData*)repalloc(completed, sizeof(ItemPointerData) * lencompleted);
+ }
+ for(j=0;j<chldtuple.ituplen;j++) {
+ ItemPointerCopy( &(chldtuple.itup[j]->t_tid), completed + ncompleted );
+ ncompleted++;
+ }
+ }
+ pfree( chldtuple.itup );
+ }
+ }
+ }
+
+ if ( curlenaddon ) {
+ /* insert updated tuples */
+ if (gistnospace(page, addon, curlenaddon)) {
+ /* there is no space on page to insert tuples */
+ IndexTuple *vec;
+ SplitedPageLayout *dist=NULL,*ptr;
+ int i;
+ MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ vec = gistextractbuffer(buffer, &(res.ituplen));
+ vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
+ res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
+ MemoryContextSwitchTo(oldCtx);
+
+ vec = (IndexTuple*)palloc( sizeof(IndexTuple) * res.ituplen );
+ for(i=0;i<res.ituplen;i++) {
+ vec[i] = (IndexTuple)palloc( IndexTupleSize(res.itup[i]) );
+ memcpy( vec[i], res.itup[i], IndexTupleSize(res.itup[i]) );
+ }
+ res.itup = vec;
+
+ if ( !gv->index->rd_istemp ) {
+ XLogRecPtr recptr;
+ XLogRecData *rdata;
+ ItemPointerData key; /* set key for incomplete insert */
+
+ ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+ oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ /* path is need to recovery because there is new pages, in a case of
+ crash it's needed to add inner tuple pointers on parent page */
+ rdata = formSplitRdata(gv->index->rd_node, blkno,
+ todelete, ntodelete, addon, curlenaddon,
+ &key, gv->path, gv->curpathlen, dist);
+
+ MemoryContextSwitchTo(oldCtx);
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+ ptr = dist;
+ while(ptr) {
+ PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+ PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+ ptr=ptr->next;
+ }
+
+ END_CRIT_SECTION();
+
+ }
+
+ ptr = dist;
+ while(ptr) {
+ WriteBuffer(ptr->buffer);
+ ptr=ptr->next;
+ }
+
+ if ( blkno == GIST_ROOT_BLKNO ) {
+ ItemPointerData key; /* set key for incomplete insert */
+
+ ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+ oldCtx = MemoryContextSwitchTo(gv->opCtx);
+ gistnewroot(gv->index, res.itup, res.ituplen, &key);
+ MemoryContextSwitchTo(oldCtx);
+ }
+
+ needwrite=false;
+
+ MemoryContextReset(gv->opCtx);
+
+ needunion = false; /* gistSplit already forms unions */
+ } else {
+ OffsetNumber off = (PageIsEmpty(page)) ?
+ FirstOffsetNumber
+ :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ /* enough free space */
+ gistfillbuffer(gv->index, page, addon, curlenaddon, off);
+ }
+ }
+ unshiftPath(gv);
+ }
+
+ if ( needunion ) {
+ /* forms union for page or check empty*/
+ if ( PageIsEmpty(page) ) {
+ if ( blkno == GIST_ROOT_BLKNO ) {
+ needwrite=true;
+ GistPageSetLeaf( page );
+ } else {
+ needwrite=true;
+ res.emptypage=true;
+ GistPageSetDeleted( page );
+ gv->result->pages_deleted++;
+ }
+ } else {
+ IndexTuple *vec, tmp;
+ int veclen=0;
+ MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ vec = gistextractbuffer(buffer, &veclen);
+ tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
+ MemoryContextSwitchTo(oldCtx);
+
+ res.itup=(IndexTuple*)palloc( sizeof(IndexTuple) );
+ res.ituplen = 1;
+ res.itup[0] = (IndexTuple)palloc( IndexTupleSize(tmp) );
+ memcpy( res.itup[0], tmp, IndexTupleSize(tmp) );
+
+ ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
+ GistTupleSetValid( res.itup[0] );
+
+ MemoryContextReset(gv->opCtx);
+ }
+ }
+
+ if ( needwrite ) {
+ if ( !gv->index->rd_istemp ) {
+ XLogRecData *rdata;
+ XLogRecPtr recptr;
+ MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ /* In a vacuum, it's not need to push path, because
+ there is no new inserted keys */
+ rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
+ res.emptypage, addon, curlenaddon, NULL, NULL, 0);
+ MemoryContextSwitchTo(oldCtx);
+
+
+ START_CRIT_SECTION();
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ END_CRIT_SECTION();
+ MemoryContextReset(gv->opCtx);
+ }
+ WriteBuffer( buffer );
+ } else
+ ReleaseBuffer( buffer );
+
+ if ( ncompleted && !gv->index->rd_istemp )
+ gistxlogInsertCompletion( gv->index->rd_node, completed, ncompleted );
+
+ for(i=0;i<curlenaddon;i++)
+ pfree( addon[i] );
+ if (addon) pfree(addon);
+ if (todelete) pfree(todelete);
+ if (completed) pfree(completed);
+ return res;
+}
+
+/*
+ * For usial vacuum just update FSM, for full vacuum
+ * reforms parent tuples if some of childs was deleted or changed,
+ * update invalid tuples (they can exsist from last crash recovery only),
+ * tries to get smaller index
+ */
+
+Datum
+gistvacuumcleanup(PG_FUNCTION_ARGS) {
+ Relation rel = (Relation) PG_GETARG_POINTER(0);
+ IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
+ BlockNumber npages, blkno;
+ BlockNumber nFreePages, *freePages, maxFreePages;
+ BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO;
+
+ /* LockRelation(rel, AccessExclusiveLock); */
+
+ /* gistVacuumUpdate may cause hard work */
+ if ( info->vacuum_full ) {
+ GistVacuum gv;
+ ArrayTuple res;
+
+ gv.index = rel;
+ initGISTstate(&(gv.giststate), rel);
+ gv.opCtx = createTempGistContext();
+ gv.result = stats;
+
+ gv.path=NULL;
+ gv.pathlen = gv.curpathlen = 0;
+
+ /* walk through the entire index for update tuples */
+ res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false );
+ /* cleanup */
+ if (res.itup) {
+ int i;
+ for(i=0;i<res.ituplen;i++)
+ pfree( res.itup[i] );
+ pfree( res.itup );
+ }
+ if ( gv.path )
+ pfree( gv.path );
+ freeGISTstate(&(gv.giststate));
+ MemoryContextDelete(gv.opCtx);
+ } else if (needFullVacuum) {
+ elog(NOTICE,"It's desirable to vacuum full or reindex GiST index '%s' due to crash recovery",
+ RelationGetRelationName(rel));
+ }
+
+ needFullVacuum = false;
+
+ /* try to find deleted pages */
+ npages = RelationGetNumberOfBlocks(rel);
+ maxFreePages = RelationGetNumberOfBlocks(rel);
+ if ( maxFreePages > MaxFSMPages )
+ maxFreePages = MaxFSMPages;
+ nFreePages = 0;
+ freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages);
+ for(blkno=GIST_ROOT_BLKNO+1;blkno<npages;blkno++) {
+ Buffer buffer = ReadBuffer(rel, blkno);
+ Page page=(Page)BufferGetPage(buffer);
+
+ if ( GistPageIsDeleted(page) ) {
+ if (nFreePages < maxFreePages) {
+ freePages[ nFreePages ] = blkno;
+ nFreePages++;
+ }
+ } else
+ lastFilledBlock = blkno;
+ ReleaseBuffer(buffer);
+ }
+ lastBlock = npages-1;
+
+ if ( nFreePages > 0 ) {
+ if ( info->vacuum_full ) { /* try to truncate index */
+ int i;
+ for(i=0;i<nFreePages;i++)
+ if ( freePages[i] >= lastFilledBlock ) {
+ nFreePages = i;
+ break;
+ }
+
+ if ( lastBlock > lastFilledBlock )
+ RelationTruncate( rel, lastFilledBlock+1 );
+ stats->pages_removed = lastBlock - lastFilledBlock;
+ }
+
+ if ( nFreePages > 0 )
+ RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages );
+ }
+ pfree( freePages );
+
+ /* return statistics */
+ stats->pages_free = nFreePages;
+ stats->num_pages = RelationGetNumberOfBlocks(rel);
+
+ /* UnlockRelation(rel, AccessExclusiveLock); */
+
+ PG_RETURN_POINTER(stats);
+}
+
+typedef struct GistBDItem {
+ BlockNumber blkno;
+ struct GistBDItem *next;
+} GistBDItem;
+
+/*
+ * Bulk deletion of all index entries pointing to a set of heap tuples and
+ * update invalid tuples after crash recovery.
+ * The set of target tuples is specified via a callback routine that tells
+ * whether any given heap tuple (identified by ItemPointer) is being deleted.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+gistbulkdelete(PG_FUNCTION_ARGS) {
+ Relation rel = (Relation) PG_GETARG_POINTER(0);
+ IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
+ void* callback_state = (void *) PG_GETARG_POINTER(2);
+ IndexBulkDeleteResult *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult));
+ GistBDItem *stack, *ptr;
+ MemoryContext opCtx = createTempGistContext();
+
+ stack = (GistBDItem*) palloc(sizeof(GistBDItem));
+
+ stack->blkno = GIST_ROOT_BLKNO;
+ stack->next = NULL;
+ needFullVacuum = false;
+
+ while( stack ) {
+ Buffer buffer = ReadBuffer(rel, stack->blkno);
+ Page page = (Page) BufferGetPage(buffer);
+ OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page);
+ IndexTuple idxtuple;
+ ItemId iid;
+ OffsetNumber *todelete = NULL;
+ int ntodelete = 0;
+
+ if ( GistPageIsLeaf(page) ) {
+ ItemPointerData heapptr;
+
+ todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) );
+
+ for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
+ iid = PageGetItemId(page, i);
+ idxtuple = (IndexTuple) PageGetItem(page, iid);
+ heapptr = idxtuple->t_tid;
+
+ if ( callback(&heapptr, callback_state) ) {
+ gistadjscans(rel, GISTOP_DEL, stack->blkno, i);
+ PageIndexTupleDelete(page, i);
+ todelete[ ntodelete++ ] = i;
+ i--; maxoff--;
+ result->tuples_removed += 1;
+ } else
+ result->num_index_tuples += 1;
+ }
+ } else {
+ for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
+ iid = PageGetItemId(page, i);
+ idxtuple = (IndexTuple) PageGetItem(page, iid);
+
+ ptr = (GistBDItem*) palloc(sizeof(GistBDItem));
+ ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) );
+ ptr->next = stack->next;
+ stack->next = ptr;
+
+ if ( GistTupleIsInvalid(idxtuple) )
+ needFullVacuum = true;
+ }
+ }
+
+ if ( ntodelete && todelete ) {
+ GistMarkTuplesDeleted(page);
+
+ if (!rel->rd_istemp ) {
+ XLogRecData *rdata;
+ XLogRecPtr recptr;
+ MemoryContext oldCtx = MemoryContextSwitchTo(opCtx);
+
+ rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
+ false, NULL, 0, NULL, NULL, 0);
+ MemoryContextSwitchTo(oldCtx);
+
+ START_CRIT_SECTION();
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ END_CRIT_SECTION();
+
+ MemoryContextReset(opCtx);
+ }
+
+ WriteBuffer( buffer );
+ } else
+ ReleaseBuffer( buffer );
+
+ if ( todelete )
+ pfree( todelete );
+
+ ptr = stack->next;
+ pfree( stack );
+ stack = ptr;
+ }
+
+ MemoryContextDelete( opCtx );
+
+ result->num_pages = RelationGetNumberOfBlocks(rel);
+
+
+ PG_RETURN_POINTER( result );
+}
+
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "utils/memutils.h"
+
typedef struct {
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
BlockNumber *path;
+ OffsetNumber *todelete;
} EntryUpdateRecord;
typedef struct {
NewPage *page;
IndexTuple *itup;
BlockNumber *path;
+ OffsetNumber *todelete;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
BlockNumber *blkno;
int pathlen;
BlockNumber *path;
+ XLogRecPtr lsn;
} gistIncompleteInsert;
#define ItemPointerEQ( a, b ) \
( \
- ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+ ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
)
static void
-pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
BlockNumber *path, int pathlen,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
ninsert->node = node;
ninsert->key = key;
+ ninsert->lsn = lsn;
if ( lenblk && blkno ) {
ninsert->lenblk = lenblk;
}
Assert( ninsert->lenblk>0 );
- if ( path && ninsert->pathlen ) {
+ if ( path && pathlen ) {
ninsert->pathlen = pathlen;
ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
decoded->data = (gistxlogEntryUpdate*)begin;
if ( decoded->data->pathlen ) {
- addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+ addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
} else
decoded->path = NULL;
+ if ( decoded->data->ntodelete ) {
+ decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
+ addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+ } else
+ decoded->todelete = NULL;
+
decoded->len=0;
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
}
}
-
+/*
+ * redo any page update (except page split)
+ */
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
EntryUpdateRecord xlrec;
}
}
- if ( isnewroot )
- GISTInitBuffer(buffer, 0);
- else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
- PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+ if ( xlrec.data->isemptypage ) {
+ while( !PageIsEmpty(page) )
+ PageIndexTupleDelete( page, FirstOffsetNumber );
+
+ if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
+ GistPageSetLeaf( page );
+ else
+ GistPageSetDeleted( page );
+ } else {
+ if ( isnewroot )
+ GISTInitBuffer(buffer, 0);
+ else if ( xlrec.data->ntodelete ) {
+ int i;
+ for(i=0; i < xlrec.data->ntodelete ; i++)
+ PageIndexTupleDelete(page, xlrec.todelete[i]);
+ if ( GistPageIsLeaf(page) )
+ GistMarkTuplesDeleted(page);
+ }
- /* add tuples */
- if ( xlrec.len > 0 ) {
- OffsetNumber off = (PageIsEmpty(page)) ?
- FirstOffsetNumber
- :
- OffsetNumberNext(PageGetMaxOffsetNumber(page));
+ /* add tuples */
+ if ( xlrec.len > 0 ) {
+ OffsetNumber off = (PageIsEmpty(page)) ?
+ FirstOffsetNumber
+ :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
- gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+ gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+ }
+
+ /* special case: leafpage, nothing to insert, nothing to delete, then
+ vacuum marks page */
+ if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
+ GistClearTuplesDeleted(page);
}
PageSetLSN(page, lsn);
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
- pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
&(xlrec.data->blkno), 1,
xlrec.path, xlrec.data->pathlen,
NULL);
decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
if ( decoded->data->pathlen ) {
- addpath = sizeof(BlockNumber) * decoded->data->pathlen;
- decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+ addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
+ decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
} else
decoded->path = NULL;
+ if ( decoded->data->ntodelete ) {
+ decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath);
+ addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+ } else
+ decoded->todelete = NULL;
+
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
for(i=0;i<decoded->data->nitup;i++) {
Assert( ptr - begin < record->xl_len );
return;
}
- if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
- PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+ if ( xlrec.data->ntodelete ) {
+ int i;
+ for(i=0; i < xlrec.data->ntodelete ; i++)
+ PageIndexTupleDelete(page, xlrec.todelete[i]);
+ }
itup = gistextractbuffer(buffer, &len);
itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ /* read and fill all pages */
for(i=0;i<xlrec.data->npage;i++) {
int j;
NewPage *newpage = xlrec.page + i;
- /* prepare itup vector */
+ /* prepare itup vector per page */
for(j=0;j<newpage->header->num;j++)
institup[j] = itup[ newpage->offnum[j] - 1 ];
if (!BufferIsValid(newpage->buffer))
elog(PANIC, "gistRedoPageSplitRecord: lost page");
newpage->page = (Page) BufferGetPage(newpage->buffer);
- if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
+ if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+ LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(newpage->buffer);
newpage->is_ok=true;
continue; /* good page */
} else {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
- pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
xlrec.path, xlrec.data->pathlen,
&xlrec);
WriteBuffer(buffer);
}
+static void
+gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
+ char *begin = XLogRecGetData(record), *ptr;
+ gistxlogInsertComplete *xlrec;
+
+ xlrec = (gistxlogInsertComplete*)begin;
+
+ ptr = begin + sizeof( gistxlogInsertComplete );
+ while( ptr - begin < record->xl_len ) {
+ Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) );
+ forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) );
+ ptr += sizeof(ItemPointerData);
+ }
+}
+
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
gistRedoCreateIndex(lsn, record);
break;
case XLOG_GIST_INSERT_COMPLETE:
- forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node,
- ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+ gistRedoCompleteInsert(lsn, record);
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
out_target(buf, xlrec->node, xlrec->key);
- sprintf(buf + strlen(buf), "; block number %u; update offset %u;",
- xlrec->blkno, xlrec->todeleteoffnum);
+ sprintf(buf + strlen(buf), "; block number %u",
+ xlrec->blkno);
}
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
- sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages",
- xlrec->origblkno, xlrec->todeleteoffnum,
+ sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages",
+ xlrec->origblkno,
xlrec->nitup, xlrec->npage);
}
((RelFileNode*)rec)->relNode);
break;
case XLOG_GIST_INSERT_COMPLETE:
- strcat(buf, "insert_complete: ");
- out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key);
+ sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
+ ((gistxlogInsertComplete*)rec)->node.spcNode,
+ ((gistxlogInsertComplete*)rec)->node.dbNode,
+ ((gistxlogInsertComplete*)rec)->node.relNode);
default:
elog(PANIC, "gist_desc: unknown op code %u", info);
}
}
+IndexTuple
+gist_form_invalid_tuple(BlockNumber blkno) {
+ /* we don't alloc space for null's bitmap, this is invalid tuple,
+ be carefull in read and write code */
+ Size size = IndexInfoFindDataOffset(0);
+ IndexTuple tuple=(IndexTuple)palloc0( size );
+
+ tuple->t_info |= size;
+
+ ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
+ GistTupleSetInvalid( tuple );
+
+ return tuple;
+}
-#ifdef GIST_INCOMPLETE_INSERT
static void
gistContinueInsert(gistIncompleteInsert *insert) {
- GISTSTATE giststate;
- GISTInsertState state;
- int i;
+ IndexTuple *itup;
+ int i, lenitup;
MemoryContext oldCxt;
+ Relation index;
+
oldCxt = MemoryContextSwitchTo(opCtx);
- state.r = XLogOpenRelation(insert->node);
- if (!RelationIsValid(state.r))
+ index = XLogOpenRelation(insert->node);
+ if (!RelationIsValid(index))
return;
- initGISTstate(&giststate, state.r);
+ elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
+ insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
- state.needInsertComplete=false;
- ItemPointerSetInvalid( &(state.key) );
- state.path=NULL;
- state.pathlen=0;
- state.xlog_mode = true;
+ /* needed vector itup never will be more than initial lenblkno+2,
+ because during this processing Indextuple can be only smaller */
+ lenitup = insert->lenblk;
+ itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
- /* form union tuples */
- state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
- state.ituplen = insert->lenblk;
- for(i=0;i<insert->lenblk;i++) {
- int len=0;
- IndexTuple *itup;
- Buffer buffer;
- Page page;
+ for(i=0;i<insert->lenblk;i++)
+ itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
- buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
- if (!BufferIsValid(buffer))
- elog(PANIC, "gistContinueInsert: block unfound");
- page = (Page) BufferGetPage(buffer);
- if ( PageIsNew((PageHeader)page) )
- elog(PANIC, "gistContinueInsert: uninitialized page");
+ if ( insert->pathlen==0 ) {
+ /*it was split root, so we should only make new root*/
+ Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
+ Page page;
- itup = gistextractbuffer(buffer, &len);
- state.itup[i] = gistunion(state.r, itup, len, &giststate);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistContinueInsert: root block unfound");
- ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
-
+ GISTInitBuffer(buffer, 0);
+ page = BufferGetPage(buffer);
+ gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- }
-
- if ( insert->pathlen==0 ) {
- /*it was split root, so we should only make new root*/
- gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
- MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(opCtx);
- return;
- }
+ WriteBuffer(buffer);
+ } else {
+ Buffer *buffers;
+ Page *pages;
+ int numbuffer;
+
+ buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
+ pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
- /* form stack */
- state.stack=NULL;
- for(i=0;i<insert->pathlen;i++) {
- int j,len=0;
- IndexTuple *itup;
- GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
-
- top->blkno = insert->path[i];
- top->buffer = XLogReadBuffer(false, state.r, top->blkno);
- if (!BufferIsValid(top->buffer))
- elog(PANIC, "gistContinueInsert: block unfound");
- top->page = (Page) BufferGetPage(top->buffer);
- if ( PageIsNew((PageHeader)(top->page)) )
- elog(PANIC, "gistContinueInsert: uninitialized page");
-
- top->todelete = false;
-
- /* find childoffnum */
- itup = gistextractbuffer(top->buffer, &len);
- top->childoffnum=InvalidOffsetNumber;
- for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
- BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) );
+ for(i=0;i<insert->pathlen;i++) {
+ int j, k, pituplen=0, childfound=0;
+
+ numbuffer=1;
+ buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
+ if (!BufferIsValid(buffers[numbuffer-1]))
+ elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]);
+ pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
+ if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
+ elog(PANIC, "gistContinueInsert: uninitialized page");
+
+ pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
- if ( i==0 ) {
- int k;
- for(k=0;k<insert->lenblk;k++)
- if ( insert->blkno[k] == blkno ) {
- top->childoffnum = j+1;
+ /* remove old IndexTuples */
+ for(j=0;j<pituplen && childfound<lenitup;j++) {
+ BlockNumber blkno;
+ ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
+ IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
+
+ blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
+
+ for(k=0;k<lenitup;k++)
+ if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
+ PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
+ j--; pituplen--;
+ childfound++;
break;
}
- } else if ( insert->path[i-1]==blkno )
- top->childoffnum = j+1;
- }
+ }
- if ( top->childoffnum==InvalidOffsetNumber ) {
- elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
- return;
+ if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
+ /* no space left on page, so we should split */
+ buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+ if (!BufferIsValid(buffers[numbuffer]))
+ elog(PANIC, "gistContinueInsert: can't create new block");
+ GISTInitBuffer(buffers[numbuffer], 0);
+ pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+ gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
+ numbuffer++;
+
+ if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
+ IndexTuple *parentitup;
+
+ parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
+
+ /* we split root, just copy tuples from old root to new page */
+ if ( i+1 != insert->pathlen )
+ elog(PANIC,"gistContinueInsert: can't restore index '%s'",
+ RelationGetRelationName( index ));
+
+ /* fill new page */
+ buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+ if (!BufferIsValid(buffers[numbuffer]))
+ elog(PANIC, "gistContinueInsert: can't create new block");
+ GISTInitBuffer(buffers[numbuffer], 0);
+ pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+ gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
+ numbuffer++;
+
+ /* fill root page */
+ GISTInitBuffer(buffers[0], 0);
+ for(j=1;j<numbuffer;j++) {
+ IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+ if ( InvalidOffsetNumber == PageAddItem(pages[0],
+ (Item)tuple,
+ IndexTupleSize( tuple ),
+ (OffsetNumber)j,
+ LP_USED) )
+ elog( PANIC,"gistContinueInsert: can't restore index '%s'",
+ RelationGetRelationName( index ));
+ }
+ }
+ } else
+ gistfillbuffer( index, pages[numbuffer-1], itup, lenitup,
+ (PageIsEmpty(pages[numbuffer-1])) ?
+ FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(pages[numbuffer-1])) );
+
+ lenitup=numbuffer;
+ for(j=0;j<numbuffer;j++) {
+ itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+ PageSetLSN(pages[j], insert->lsn);
+ PageSetTLI(pages[j], ThisTimeLineID);
+ LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
+ WriteBuffer( buffers[j] );
+ }
}
-
- if ( i==0 )
- PageIndexTupleDelete(top->page, top->childoffnum);
-
- /* install item on right place in stack */
- top->parent=NULL;
- if ( state.stack ) {
- GISTInsertStack *ptr = state.stack;
- while( ptr->parent )
- ptr = ptr->parent;
- ptr->parent=top;
- } else
- state.stack = top;
}
- /* Good. Now we can continue insert */
-
- gistmakedeal(&state, &giststate);
-
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
}
-#endif
void
gist_xlog_startup(void) {
incomplete_inserts=NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- "GiST insert in xlog temporary context",
+ "GiST recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
foreach(l, incomplete_inserts) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
- char buf[1024];
-
- *buf='\0';
- out_target(buf, insert->node, insert->key);
- elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
-#ifdef GIST_INCOMPLETE_INSERT
gistContinueInsert(insert);
-#endif
}
MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
}
+
+XLogRecData *
+formSplitRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
+
+ XLogRecData *rdata;
+ gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
+ SplitedPageLayout *ptr;
+ int npage = 0, cur=1, i;
+
+ ptr=dist;
+ while( ptr ) {
+ npage++;
+ ptr=ptr->next;
+ }
+
+ rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3));
+
+ xlrec->node = node;
+ xlrec->origblkno = blkno;
+ xlrec->npage = (uint16)npage;
+ xlrec->nitup = (uint16)ituplen;
+ xlrec->ntodelete = (uint16)ntodelete;
+ xlrec->pathlen = (uint16)pathlen;
+ if ( key )
+ xlrec->key = *key;
+ else
+ ItemPointerSetInvalid( &(xlrec->key) );
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) xlrec;
+ rdata[0].len = sizeof( gistxlogPageSplit );
+ rdata[0].next = NULL;
+
+ if ( pathlen ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)path;
+ rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ if ( ntodelete ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)todelete;
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ /* new tuples */
+ for(i=0;i<ituplen;i++) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(itup[i]);
+ rdata[cur].len = IndexTupleSize(itup[i]);
+ rdata[cur].next = NULL;
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+ }
+
+ ptr=dist;
+ while(ptr) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)&(ptr->block);
+ rdata[cur].len = sizeof(gistxlogPage);
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(ptr->list);
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+ if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+ rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].next=NULL;
+ cur++;
+ ptr=ptr->next;
+ }
+
+ return rdata;
+}
+
+
+XLogRecData *
+formUpdateRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete, bool emptypage,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen) {
+ XLogRecData *rdata;
+ gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
+
+ xlrec->node = node;
+ xlrec->blkno = blkno;
+ if ( key )
+ xlrec->key = *key;
+ else
+ ItemPointerSetInvalid( &(xlrec->key) );
+
+ if ( emptypage ) {
+ xlrec->isemptypage = true;
+ xlrec->ntodelete = 0;
+ xlrec->pathlen = 0;
+
+ rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
+ rdata->buffer = InvalidBuffer;
+ rdata->data = (char*)xlrec;
+ rdata->len = sizeof(gistxlogEntryUpdate);
+ rdata->next = NULL;
+ } else {
+ int cur=1,i;
+
+ xlrec->isemptypage = false;
+ xlrec->ntodelete = ntodelete;
+ xlrec->pathlen = pathlen;
+
+ rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) );
+
+ rdata->buffer = InvalidBuffer;
+ rdata->data = (char*)xlrec;
+ rdata->len = sizeof(gistxlogEntryUpdate);
+ rdata->next = NULL;
+
+ if ( pathlen ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)path;
+ rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ if ( ntodelete ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)todelete;
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ /* new tuples */
+ for(i=0;i<ituplen;i++) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(itup[i]);
+ rdata[cur].len = IndexTupleSize(itup[i]);
+ rdata[cur].next = NULL;
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+ }
+ }
+
+ return rdata;
+}
+
+XLogRecPtr
+gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
+ gistxlogInsertComplete xlrec;
+ XLogRecData rdata[2];
+ XLogRecPtr recptr;
+
+ Assert(len>0);
+ xlrec.node = node;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof( gistxlogInsertComplete );
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) keys;
+ rdata[1].len = sizeof( ItemPointerData ) * len;
+ rdata[1].next = NULL;
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
+
+ END_CRIT_SECTION();
+
+ return recptr;
+}
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.46 2005/05/17 03:34:18 neilc Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $
*
*-------------------------------------------------------------------------
*/
* Page opaque data in a GiST index page.
*/
#define F_LEAF (1 << 0)
+#define F_DELETED (1 << 1)
+#define F_TUPLES_DELETED (1 << 2)
typedef struct GISTPageOpaqueData
{
* spl_left */
int spl_lattrsize[INDEX_MAX_KEYS];
bool spl_lisnull[INDEX_MAX_KEYS];
+ bool spl_leftvalid;
OffsetNumber *spl_right; /* array of entries that go right */
int spl_nright; /* size of the array */
* spl_right */
int spl_rattrsize[INDEX_MAX_KEYS];
bool spl_risnull[INDEX_MAX_KEYS];
+ bool spl_rightvalid;
int *spl_idgrp;
int *spl_ngrp; /* number in each group */
bool leafkey;
} GISTENTRY;
-#define GIST_LEAF(entry) (((GISTPageOpaque) PageGetSpecialPointer((entry)->page))->flags & F_LEAF)
+#define GistPageIsLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_LEAF)
+#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page))
+#define GistPageSetLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_LEAF)
+#define GistPageSetNonLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_LEAF)
+
+#define GistPageIsDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_DELETED)
+#define GistPageSetDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_DELETED)
+#define GistPageSetNonDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_DELETED)
+
+#define GistTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_TUPLES_DELETED)
+#define GistMarkTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_TUPLES_DELETED)
+#define GistClearTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_TUPLES_DELETED)
/*
* Vector of GISTENTRY structs; user-defined methods union and pick
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.4 2005/06/20 10:29:36 teodor Exp $
*
*-------------------------------------------------------------------------
*/
int ituplen; /* length of itup */
GISTInsertStack *stack;
bool needInsertComplete;
- bool xlog_mode;
/* pointer to heap tuple */
ItemPointerData key;
RelFileNode node;
BlockNumber blkno;
- /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */
- OffsetNumber todeleteoffnum;
+ uint16 ntodelete;
uint16 pathlen;
+ bool isemptypage;
/*
- * It used to identify compliteness of insert.
+ * It used to identify completeness of insert.
* Sets to leaf itup
*/
ItemPointerData key;
/* follow:
- * 1. path to root (BlockNumber)
- * 2. tuples to insert
+ * 1. path to root (BlockNumber)
+ * 2. todelete OffsetNumbers
+ * 3. tuples to insert
*/
} gistxlogEntryUpdate;
typedef struct gistxlogPageSplit {
RelFileNode node;
BlockNumber origblkno; /*splitted page*/
- OffsetNumber todeleteoffnum;
+ uint16 ntodelete;
uint16 pathlen;
- int npage;
- int nitup;
+ uint16 npage;
+ uint16 nitup;
/* see comments on gistxlogEntryUpdate */
ItemPointerData key;
/* follow:
* 1. path to root (BlockNumber)
- * 2. tuples to insert
- * 3. gistxlogPage and array of OffsetNumber per page
+ * 2. todelete OffsetNumbers
+ * 3. tuples to insert
+ * 4. gistxlogPage and array of OffsetNumber per page
*/
} gistxlogPageSplit;
typedef struct gistxlogInsertComplete {
RelFileNode node;
- ItemPointerData key;
+ /* follows ItemPointerData key to clean */
} gistxlogInsertComplete;
-#define XLOG_GIST_CREATE_INDEX 0x50
+#define XLOG_GIST_CREATE_INDEX 0x50
+
+/*
+ * mark tuples on inner pages during recovery
+ */
+#define TUPLE_IS_VALID 0xffff
+#define TUPLE_IS_INVALID 0xfffe
+
+#define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
+#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
+#define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
/* gist.c */
extern Datum gistbuild(PG_FUNCTION_ARGS);
extern Datum gistinsert(PG_FUNCTION_ARGS);
-extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode);
+extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key);
extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
+typedef struct SplitedPageLayout {
+ gistxlogPage block;
+ OffsetNumber *list;
+ Buffer buffer; /* to write after all proceed */
+
+ struct SplitedPageLayout *next;
+} SplitedPageLayout;
+
+IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
+ int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
/* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
+extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
+
+extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete, bool emptypage,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen);
+
+extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen, SplitedPageLayout *dist );
+
+extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
/* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS);
extern Datum gistgetmulti(PG_FUNCTION_ARGS);
/* gistutil.c */
+extern Buffer gistReadBuffer(Relation r, BlockNumber blkno);
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off);
extern bool gistnospace(Page page, IndexTuple *itvec, int len);
extern int gistfindgroup(GISTSTATE *giststate,
GISTENTRY *valvec, GIST_SPLITVEC *spl);
extern void gistadjsubkey(Relation r,
- IndexTuple *itup, int *len,
+ IndexTuple *itup, int len,
GIST_SPLITVEC *v,
GISTSTATE *giststate);
extern IndexTuple gistFormTuple(GISTSTATE *giststate,
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *isnull);
extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
- IndexTuple *itvec, GIST_SPLITVEC *spl);
+ IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull);
+void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+ IndexTuple *itup, int len, GISTSTATE *giststate);
+
+/* gistvacuum.c */
+extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
+extern Datum gistvacuumcleanup(PG_FUNCTION_ARGS);
#endif /* GIST_PRIVATE_H */
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.278 2005/06/18 19:33:42 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.279 2005/06/20 10:29:37 teodor Exp $
*
*-------------------------------------------------------------------------
*/
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 200506181
+#define CATALOG_VERSION_NO 200506201
#endif
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.34 2005/06/13 23:14:49 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.35 2005/06/20 10:29:37 teodor Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
DATA(insert OID = 405 ( hash 1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate ));
DESCR("hash index access method");
#define HASH_AM_OID 405
-DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete - gistcostestimate ));
+DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.368 2005/06/17 22:32:48 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.369 2005/06/20 10:29:37 teodor Exp $
*
* NOTES
* The script catalog/genbki.sh reads this file and generates .bki
DESCR("gist(internal)");
DATA(insert OID = 776 ( gistbulkdelete PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistbulkdelete - _null_ ));
DESCR("gist(internal)");
+DATA(insert OID = 2561 ( gistvacuumcleanup PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistvacuumcleanup - _null_ ));
DATA(insert OID = 772 ( gistcostestimate PGNSP PGUID 12 f f t f v 7 2278 "2281 2281 2281 2281 2281 2281 2281" _null_ _null_ _null_ gistcostestimate - _null_ ));
DESCR("gist(internal)");