From: Teodor Sigaev Date: Mon, 20 Jun 2005 10:29:37 +0000 (+0000) Subject: 1. full functional WAL for GiST X-Git-Tag: REL8_1_0BETA1~512 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d544ec8bbd70d50c1b50a00437b8061cabeeb5f2;p=postgresql 1. full functional WAL for GiST 2. improve vacuum for gist - use FSM - full vacuum: - reforms parent tuple if it's needed ( tuples was deleted on child page or parent tuple remains invalid after crash recovery ) - truncate index file if possible 3. fixes bugs and mistakes --- diff --git a/src/backend/access/gist/Makefile b/src/backend/access/gist/Makefile index b22f846a23..12f770ddb8 100644 --- a/src/backend/access/gist/Makefile +++ b/src/backend/access/gist/Makefile @@ -4,7 +4,7 @@ # Makefile for access/gist # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $ +# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.14 2005/06/20 10:29:36 teodor Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/access/gist top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o +OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o all: SUBSYS.o diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 4e3faccdf9..340f6b9b4f 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.120 2005/06/20 10:29:36 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -29,7 +29,7 @@ typedef struct GISTSTATE giststate; int numindexattrs; double indtuples; - MemoryContext tmpCxt; + MemoryContext tmpCtx; } GISTBuildState; @@ -47,37 +47,14 @@ static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate); -typedef struct PageLayout { - gistxlogPage block; - OffsetNumber *list; - Buffer buffer; /* to write after all proceed */ - - struct PageLayout *next; -} PageLayout; - - #define ROTATEDIST(d) do { \ - PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \ - memset(tmp,0,sizeof(PageLayout)); \ + SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \ + memset(tmp,0,sizeof(SplitedPageLayout)); \ tmp->next = (d); \ (d)=tmp; \ } while(0) -static IndexTuple *gistSplit(Relation r, - Buffer buffer, - IndexTuple *itup, - int *len, - PageLayout **dist, - GISTSTATE *giststate); - - -#undef GISTDEBUG - -#ifdef GISTDEBUG -static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff); -#endif - /* * Create and return a temporary memory context for use by GiST. We * _always_ invoke user-provided methods in a temporary memory @@ -124,7 +101,7 @@ gistbuild(PG_FUNCTION_ARGS) initGISTstate(&buildstate.giststate, index); /* initialize the root page */ - buffer = ReadBuffer(index, P_NEW); + buffer = gistReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); if ( !index->rd_istemp ) { XLogRecPtr recptr; @@ -155,23 +132,20 @@ gistbuild(PG_FUNCTION_ARGS) * create a temporary memory context that is reset once for each * tuple inserted into the index */ - buildstate.tmpCxt = createTempGistContext(); + buildstate.tmpCtx = createTempGistContext(); /* do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, gistbuildCallback, (void *) &buildstate); /* okay, all heap tuples are indexed */ - MemoryContextDelete(buildstate.tmpCxt); + MemoryContextDelete(buildstate.tmpCtx); /* since we just counted the # of tuples, may as well update stats */ IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples); freeGISTstate(&buildstate.giststate); -#ifdef GISTDEBUG - gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0); -#endif PG_RETURN_VOID(); } @@ -190,13 +164,13 @@ gistbuildCallback(Relation index, IndexTuple itup; GISTENTRY tmpcentry; int i; - MemoryContext oldCxt; + MemoryContext oldCtx; /* GiST cannot index tuples with leading NULLs */ if (isnull[0]) return; - oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt); + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* immediately compress keys to normalize */ for (i = 0; i < buildstate->numindexattrs; i++) @@ -226,8 +200,8 @@ gistbuildCallback(Relation index, gistdoinsert(index, itup, &buildstate->giststate); buildstate->indtuples += 1; - MemoryContextSwitchTo(oldCxt); - MemoryContextReset(buildstate->tmpCxt); + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(buildstate->tmpCtx); } /* @@ -251,8 +225,8 @@ gistinsert(PG_FUNCTION_ARGS) GISTSTATE giststate; GISTENTRY tmpentry; int i; - MemoryContext oldCxt; - MemoryContext insertCxt; + MemoryContext oldCtx; + MemoryContext insertCtx; /* * Since GIST is not marked "amconcurrent" in pg_am, caller should @@ -264,8 +238,8 @@ gistinsert(PG_FUNCTION_ARGS) if (isnull[0]) PG_RETURN_BOOL(false); - insertCxt = createTempGistContext(); - oldCxt = MemoryContextSwitchTo(insertCxt); + insertCtx = createTempGistContext(); + oldCtx = MemoryContextSwitchTo(insertCtx); initGISTstate(&giststate, r); @@ -289,8 +263,8 @@ gistinsert(PG_FUNCTION_ARGS) /* cleanup */ freeGISTstate(&giststate); - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(insertCxt); + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(insertCtx); PG_RETURN_BOOL(true); } @@ -315,7 +289,6 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) state.r = r; state.key = itup->t_tid; state.needInsertComplete = true; - state.xlog_mode = false; state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); memset( state.stack, 0, sizeof(GISTInsertStack)); @@ -335,80 +308,27 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { IndexTuple *itvec, *newitup; int tlen,olen; - PageLayout *dist=NULL, *ptr; + SplitedPageLayout *dist=NULL, *ptr; - memset(&dist, 0, sizeof(PageLayout)); is_splitted = true; itvec = gistextractbuffer(state->stack->buffer, &tlen); olen=tlen; itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); - if ( !state->r->rd_istemp && !state->xlog_mode) { - gistxlogPageSplit xlrec; - XLogRecPtr recptr; - XLogRecData *rdata; - int i, npage = 0, cur=1; - - ptr=dist; - while( ptr ) { - npage++; - ptr=ptr->next; - } - - rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2)); - - xlrec.node = state->r->rd_node; - xlrec.origblkno = state->stack->blkno; - xlrec.npage = npage; - xlrec.nitup = state->ituplen; - xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; - xlrec.key = state->key; - xlrec.pathlen = (uint16)state->pathlen; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogPageSplit ); - rdata[0].next = NULL; - - if ( state->pathlen>=0 ) { - rdata[0].next = &(rdata[1]); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) (state->path); - rdata[1].len = sizeof( BlockNumber ) * state->pathlen; - rdata[1].next = NULL; - cur++; - } - - /* new tuples */ - for(i=0;iituplen;i++) { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)(state->itup[i]); - rdata[cur].len = IndexTupleSize(state->itup[i]); - rdata[cur-1].next = &(rdata[cur]); - cur++; + if ( !state->r->rd_istemp ) { + OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ]; + XLogRecPtr recptr; + XLogRecData *rdata; + + if ( state->stack->todelete ) { + offs[0] = state->stack->childoffnum; + noffs=1; } - /* new page layout */ - ptr=dist; - while(ptr) { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)&(ptr->block); - rdata[cur].len = sizeof(gistxlogPage); - rdata[cur-1].next = &(rdata[cur]); - cur++; - - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)(ptr->list); - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); - if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) - rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].next=NULL; - cur++; - - ptr=ptr->next; - } + rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, + offs, noffs, state->itup, state->ituplen, + &(state->key), state->path, state->pathlen, dist); START_CRIT_SECTION(); @@ -433,57 +353,36 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { state->ituplen = tlen; /* now tlen >= 2 */ if ( state->stack->blkno == GIST_ROOT_BLKNO ) { - gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode); + gistnewroot(state->r, state->itup, state->ituplen, &(state->key)); state->needInsertComplete=false; } - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(state->stack->buffer); } else { /* enough space */ OffsetNumber off, l; + bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; off = (PageIsEmpty(state->stack->page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); - if ( !state->r->rd_istemp && !state->xlog_mode) { - gistxlogEntryUpdate xlrec; - XLogRecPtr recptr; - XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) ); - int i, cur=0; - - xlrec.node = state->r->rd_node; - xlrec.blkno = state->stack->blkno; - xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; - xlrec.key = state->key; - xlrec.pathlen = (uint16)state->pathlen; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogEntryUpdate ); - rdata[0].next = NULL; - - if ( state->pathlen>=0 ) { - rdata[0].next = &(rdata[1]); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) (state->path); - rdata[1].len = sizeof( BlockNumber ) * state->pathlen; - rdata[1].next = NULL; - cur++; + if ( !state->r->rd_istemp ) { + OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ]; + XLogRecPtr recptr; + XLogRecData *rdata; + + if ( state->stack->todelete ) { + offs[0] = state->stack->childoffnum; + noffs=1; } + + rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno, + offs, noffs, false, state->itup, state->ituplen, + &(state->key), state->path, state->pathlen); - for(i=1; i<=state->ituplen; i++) { /* adding tuples */ - rdata[i+cur].buffer = InvalidBuffer; - rdata[i+cur].data = (char*)(state->itup[i-1]); - rdata[i+cur].len = IndexTupleSize(state->itup[i-1]); - rdata[i+cur].next = NULL; - rdata[i-1+cur].next = &(rdata[i+cur]); - } - START_CRIT_SECTION(); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); @@ -495,9 +394,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { if ( state->stack->blkno == GIST_ROOT_BLKNO ) state->needInsertComplete=false; - - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(state->stack->buffer); if (state->ituplen > 1) @@ -507,9 +403,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { * parent */ IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); - ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber); + ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); state->itup[0] = newtup; state->ituplen = 1; + } else if (is_leaf) { + /* itup[0] store key to adjust parent, we set it to valid + to correct check by GistTupleIsInvalid macro in gistgetadjusted() */ + ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); + GistTupleSetValid( state->itup[0] ); } } return is_splitted; @@ -524,13 +425,10 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) /* walk down */ while( true ) { - GISTPageOpaque opaque; - - state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno); state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page); - - if (!(opaque->flags & F_LEAF)) + + if (!GistPageIsLeaf(state->stack->page)) { /* * This is an internal page, so continue to walk down the @@ -564,7 +462,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) state->pathlen++; ptr=ptr->parent; } - state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen); + state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen)); ptr = state->stack; state->pathlen=0; while( ptr ) { @@ -591,7 +489,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { * then itup contains additional for adjustment of current key */ - is_splitted = gistplacetopage(state, giststate ); + is_splitted = gistplacetopage(state, giststate); /* pop page from stack */ state->stack = state->stack->parent; @@ -623,6 +521,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { * an insert in a child node. Therefore, remove the old * version of this node's key. */ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); if ( !state->r->rd_istemp ) @@ -639,42 +538,32 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { /* release all buffers */ while( state->stack ) { - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(state->stack->buffer); state->stack = state->stack->parent; } /* say to xlog that insert is completed */ - if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) { - gistxlogInsertComplete xlrec; - XLogRecData rdata; - - xlrec.node = state->r->rd_node; - xlrec.key = state->key; - - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof( gistxlogInsertComplete ); - rdata.next = NULL; - - START_CRIT_SECTION(); + if ( state->needInsertComplete && !state->r->rd_istemp ) + gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); +} - XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata); +static void +gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) { + int i; - END_CRIT_SECTION(); - } + for(i=0;iflags); + leftbuf = gistReadBuffer(r, P_NEW); + GISTInitBuffer(leftbuf, opaque->flags&F_LEAF); lbknum = BufferGetBlockNumber(leftbuf); left = (Page) BufferGetPage(leftbuf); } @@ -716,74 +608,99 @@ gistSplit(Relation r, left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); } - rightbuf = ReadBuffer(r, P_NEW); - GISTInitBuffer(rightbuf, opaque->flags); + rightbuf = gistReadBuffer(r, P_NEW); + GISTInitBuffer(rightbuf, opaque->flags&F_LEAF); rbknum = BufferGetBlockNumber(rightbuf); right = (Page) BufferGetPage(rightbuf); /* generate the item array */ + realoffset = palloc((*len + 1) * sizeof(OffsetNumber)); entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY)); entryvec->n = *len + 1; + fakeoffset = FirstOffsetNumber; for (i = 1; i <= *len; i++) { Datum datum; bool IsNull; + if (!GistPageIsLeaf(p) && GistTupleIsInvalid( itup[i - 1] )) { + entryvec->n--; + /* remember position of invalid tuple */ + realoffset[ entryvec->n ] = i; + continue; + } + datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, 0, &(entryvec->vector[i]), + gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]), datum, r, p, i, ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull); + realoffset[ fakeoffset ] = i; + fakeoffset++; } - /* - * now let the user-defined picksplit function set up the split - * vector; in entryvec have no null value!! - */ - FunctionCall2(&giststate->picksplitFn[0], - PointerGetDatum(entryvec), - PointerGetDatum(&v)); - - /* compatibility with old code */ - if (v.spl_left[v.spl_nleft - 1] == InvalidOffsetNumber) - v.spl_left[v.spl_nleft - 1] = (OffsetNumber) *len; - if (v.spl_right[v.spl_nright - 1] == InvalidOffsetNumber) - v.spl_right[v.spl_nright - 1] = (OffsetNumber) *len; - - v.spl_lattr[0] = v.spl_ldatum; - v.spl_rattr[0] = v.spl_rdatum; - v.spl_lisnull[0] = false; - v.spl_risnull[0] = false; - - /* - * if index is multikey, then we must to try get smaller bounding box - * for subkey(s) - */ - if (r->rd_att->natts > 1) - { - int MaxGrpId; - - v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1)); - v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1)); - v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1)); - - MaxGrpId = gistfindgroup(giststate, entryvec->vector, &v); - - /* form union of sub keys for each page (l,p) */ - gistunionsubkey(r, giststate, itup, &v); - - /* - * if possible, we insert equivalent tuples with control by - * penalty for a subkey(s) - */ - if (MaxGrpId > 1) - gistadjsubkey(r, itup, len, &v, giststate); + /* + * if it was invalid tuple then we need special processing. If + * it's possible, we move all invalid tuples on right page. + * We should remember, that union with invalid tuples + * is a invalid tuple. + */ + if ( entryvec->n != *len + 1 ) { + lencleaneditup = entryvec->n-1; + cleaneditup = (IndexTuple*)palloc(lencleaneditup * sizeof(IndexTuple)); + for(i=1;in;i++) + cleaneditup[i-1] = itup[ realoffset[ i ]-1 ]; + + if ( gistnospace( left, cleaneditup, lencleaneditup ) ) { + /* no space on left to put all good tuples, so picksplit */ + gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate); + v.spl_leftvalid = true; + v.spl_rightvalid = false; + gistToRealOffset( v.spl_left, v.spl_nleft, realoffset ); + gistToRealOffset( v.spl_right, v.spl_nright, realoffset ); + } else { + /* we can try to store all valid tuples on one page */ + v.spl_right = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) ); + v.spl_left = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) ); + + if ( lencleaneditup==0 ) { + /* all tuples are invalid, so moves half of its to right */ + v.spl_leftvalid = v.spl_rightvalid = false; + v.spl_nright = 0; + v.spl_nleft = 0; + for(i=1;i<=*len;i++) + if ( i-1<*len/2 ) + v.spl_left[ v.spl_nleft++ ] = i; + else + v.spl_right[ v.spl_nright++ ] = i; + } else { + /* we will not call gistUserPicksplit, just put good + tuples on left and invalid on right */ + v.spl_nleft = lencleaneditup; + v.spl_nright = 0; + for(i=1;in;i++) + v.spl_left[i-1] = i; + gistToRealOffset( v.spl_left, v.spl_nleft, realoffset ); + v.spl_lattr[0] = v.spl_ldatum = (Datum)0; + v.spl_rattr[0] = v.spl_rdatum = (Datum)0; + v.spl_lisnull[0] = true; + v.spl_risnull[0] = true; + gistunionsubkey(r, giststate, itup, &v, true); + v.spl_leftvalid = true; + v.spl_rightvalid = false; + } + } + } else { + /* there is no invalid tuples, so usial processing */ + gistUserPicksplit(r, entryvec, &v, itup, *len, giststate); + v.spl_leftvalid = v.spl_rightvalid = true; } + /* form left and right vector */ - lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft); - rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright); + lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1)); + rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1)); for (i = 0; i < v.spl_nleft; i++) lvectup[i] = itup[v.spl_left[i] - 1]; @@ -791,12 +708,16 @@ gistSplit(Relation r, for (i = 0; i < v.spl_nright; i++) rvectup[i] = itup[v.spl_right[i] - 1]; + /* place invalid tuples on right page if itsn't done yet */ + for (fakeoffset = entryvec->n; fakeoffset < *len+1 && lencleaneditup; fakeoffset++) { + rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1]; + } /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { int i; - PageLayout *d, *origd=*dist; + SplitedPageLayout *d, *origd=*dist; nlen = v.spl_nright; newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); @@ -824,8 +745,9 @@ gistSplit(Relation r, nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); - newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); - ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber); + newtup[0] = ( v.spl_rightvalid ) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull) + : gist_form_invalid_tuple( rbknum ); + ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum); } if (gistnospace(left, lvectup, v.spl_nleft)) @@ -833,7 +755,7 @@ gistSplit(Relation r, int llen = v.spl_nleft; IndexTuple *lntup; int i; - PageLayout *d, *origd=*dist; + SplitedPageLayout *d, *origd=*dist; lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); @@ -867,49 +789,35 @@ gistSplit(Relation r, nlen += 1; newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); - newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); - ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber); + newtup[nlen - 1] = ( v.spl_leftvalid ) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull) + : gist_form_invalid_tuple( lbknum ); + ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum); } + GistClearTuplesDeleted(p); + *len = nlen; return newtup; } void -gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode) +gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key) { Buffer buffer; Page page; - buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO); + buffer = gistReadBuffer(r, GIST_ROOT_BLKNO); GISTInitBuffer(buffer, 0); page = BufferGetPage(buffer); gistfillbuffer(r, page, itup, len, FirstOffsetNumber); - if ( !xlog_mode && !r->rd_istemp ) { - gistxlogEntryUpdate xlrec; + if ( !r->rd_istemp ) { XLogRecPtr recptr; - XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) ); - int i; + XLogRecData *rdata; - xlrec.node = r->rd_node; - xlrec.blkno = GIST_ROOT_BLKNO; - xlrec.todeleteoffnum = InvalidOffsetNumber; - xlrec.key = *key; - xlrec.pathlen=0; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogEntryUpdate ); - rdata[0].next = NULL; - - for(i=1; i<=len; i++) { - rdata[i].buffer = InvalidBuffer; - rdata[i].data = (char*)(itup[i-1]); - rdata[i].len = IndexTupleSize(itup[i-1]); - rdata[i].next = NULL; - rdata[i-1].next = &(rdata[i]); - } + rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO, + NULL, 0, false, itup, len, + key, NULL, 0); START_CRIT_SECTION(); @@ -919,118 +827,9 @@ gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mo END_CRIT_SECTION(); } - if ( xlog_mode ) - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } - -/* - * Bulk deletion of all index entries pointing to a set of heap tuples. - * The set of target tuples is specified via a callback routine that tells - * whether any given heap tuple (identified by ItemPointer) is being deleted. - * - * Result: a palloc'd struct containing statistical info for VACUUM displays. - */ -Datum -gistbulkdelete(PG_FUNCTION_ARGS) -{ - Relation rel = (Relation) PG_GETARG_POINTER(0); - IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); - void *callback_state = (void *) PG_GETARG_POINTER(2); - IndexBulkDeleteResult *result; - BlockNumber num_pages; - double tuples_removed; - double num_index_tuples; - IndexScanDesc iscan; - - tuples_removed = 0; - num_index_tuples = 0; - - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. - */ - - /* - * XXX generic implementation --- should be improved! - */ - - /* walk through the entire index */ - iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL); - /* including killed tuples */ - iscan->ignore_killed_tuples = false; - - while (index_getnext_indexitem(iscan, ForwardScanDirection)) - { - vacuum_delay_point(); - - if (callback(&iscan->xs_ctup.t_self, callback_state)) - { - ItemPointerData indextup = iscan->currentItemData; - BlockNumber blkno; - OffsetNumber offnum; - Buffer buf; - Page page; - - blkno = ItemPointerGetBlockNumber(&indextup); - offnum = ItemPointerGetOffsetNumber(&indextup); - - /* adjust any scans that will be affected by this deletion */ - gistadjscans(rel, GISTOP_DEL, blkno, offnum); - - /* delete the index tuple */ - buf = ReadBuffer(rel, blkno); - page = BufferGetPage(buf); - - PageIndexTupleDelete(page, offnum); - if ( !rel->rd_istemp ) { - gistxlogEntryUpdate xlrec; - XLogRecPtr recptr; - XLogRecData rdata; - - xlrec.node = rel->rd_node; - xlrec.blkno = blkno; - xlrec.todeleteoffnum = offnum; - xlrec.pathlen=0; - ItemPointerSetInvalid( &(xlrec.key) ); - - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof( gistxlogEntryUpdate ); - rdata.next = NULL; - - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); - } - - WriteBuffer(buf); - - tuples_removed += 1; - } - else - num_index_tuples += 1; - } - - index_endscan(iscan); - - /* return statistics */ - num_pages = RelationGetNumberOfBlocks(rel); - - result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - result->num_pages = num_pages; - result->num_index_tuples = num_index_tuples; - result->tuples_removed = tuples_removed; - - PG_RETURN_POINTER(result); -} - void initGISTstate(GISTSTATE *giststate, Relation index) { @@ -1074,49 +873,3 @@ freeGISTstate(GISTSTATE *giststate) /* no work */ } -#ifdef GISTDEBUG -static void -gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) -{ - Buffer buffer; - Page page; - GISTPageOpaque opaque; - IndexTuple which; - ItemId iid; - OffsetNumber i, - maxoff; - BlockNumber cblk; - char *pred; - - pred = (char *) palloc(sizeof(char) * level + 1); - MemSet(pred, '\t', level); - pred[level] = '\0'; - - buffer = ReadBuffer(r, blk); - page = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - - maxoff = PageGetMaxOffsetNumber(page); - - elog(DEBUG4, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, - coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, - (int) maxoff, PageGetFreeSpace(page)); - - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - iid = PageGetItemId(page, i); - which = (IndexTuple) PageGetItem(page, iid); - cblk = ItemPointerGetBlockNumber(&(which->t_tid)); -#ifdef PRINTTUPLE - elog(DEBUG4, "%s Tuple. blk: %d size: %d", pred, (int) cblk, - IndexTupleSize(which)); -#endif - - if (!(opaque->flags & F_LEAF)) - gist_dumptree(r, level + 1, cblk, i); - } - ReleaseBuffer(buffer); - pfree(pred); -} -#endif /* defined GISTDEBUG */ - diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 5b9a94471b..4bce9962f3 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -112,7 +112,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir) { Page p; OffsetNumber n; - GISTPageOpaque po; GISTScanOpaque so; GISTSTACK *stk; IndexTuple it; @@ -127,7 +126,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir) } p = BufferGetPage(so->curbuf); - po = (GISTPageOpaque) PageGetSpecialPointer(p); if (ItemPointerIsValid(&scan->currentItemData) == false) { @@ -169,7 +167,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir) so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, stk->block); p = BufferGetPage(so->curbuf); - po = (GISTPageOpaque) PageGetSpecialPointer(p); if (ScanDirectionIsBackward(dir)) n = OffsetNumberPrev(stk->offset); @@ -182,7 +179,7 @@ gistnext(IndexScanDesc scan, ScanDirection dir) continue; } - if (po->flags & F_LEAF) + if (GistPageIsLeaf(p)) { /* * We've found a matching index entry in a leaf page, so @@ -219,7 +216,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir) so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, child_block); p = BufferGetPage(so->curbuf); - po = (GISTPageOpaque) PageGetSpecialPointer(p); if (ScanDirectionIsBackward(dir)) n = PageGetMaxOffsetNumber(p); @@ -256,6 +252,12 @@ gistindex_keytest(IndexTuple tuple, IncrIndexProcessed(); + /* + * Tuple doesn't restore after crash recovery because of inclomplete insert + */ + if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(tuple) ) + return true; + while (keySize > 0) { Datum datum; @@ -317,7 +319,6 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir) { OffsetNumber maxoff; IndexTuple it; - GISTPageOpaque po; GISTScanOpaque so; MemoryContext oldcxt; Page p; @@ -325,7 +326,6 @@ gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir) so = (GISTScanOpaque) scan->opaque; p = BufferGetPage(so->curbuf); maxoff = PageGetMaxOffsetNumber(p); - po = (GISTPageOpaque) PageGetSpecialPointer(p); /* * Make sure we're in a short-lived memory context when we invoke diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 44391f9f73..735be85f25 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -19,6 +19,7 @@ #include "access/heapam.h" #include "catalog/index.h" #include "miscadmin.h" +#include "storage/freespace.h" /* group flags ( in gistadjsubkey ) */ #define LEFT_ADDED 0x01 @@ -132,9 +133,14 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) GistEntryVector *evec; int i; GISTENTRY centry[INDEX_MAX_KEYS]; + IndexTuple res; evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + for(i = 0; ird_att->natts; i++) { Datum datum; @@ -191,7 +197,9 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) } } - return index_form_tuple(giststate->tupdesc, attr, isnull); + res = index_form_tuple(giststate->tupdesc, attr, isnull); + GistTupleSetValid( res ); + return res; } @@ -215,11 +223,15 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis IndexTuple newtup = NULL; int i; + if ( GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup) ) + return gist_form_invalid_tuple( ItemPointerGetBlockNumber( &(oldtup->t_tid) ) ); + evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); evec->n = 2; ev0p = &(evec->vector[0]); ev1p = &(evec->vector[1]); + gistDeCompressAtt(giststate, r, oldtup, NULL, (OffsetNumber) 0, oldatt, oldisnull); @@ -283,7 +295,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis } void -gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) +gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall) { int lr; @@ -314,9 +326,9 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV isnull = spl->spl_risnull; } - evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - for (i = 1; i < r->rd_att->natts; i++) + for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++) { int j; Datum datum; @@ -448,7 +460,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) void gistadjsubkey(Relation r, IndexTuple *itup, /* contains compressed entry */ - int *len, + int len, GIST_SPLITVEC *v, GISTSTATE *giststate) { @@ -501,7 +513,7 @@ gistadjsubkey(Relation r, ev1p = &(evec->vector[1]); /* add equivalent tuple */ - for (i = 0; i < *len; i++) + for (i = 0; i < len; i++) { Datum datum; @@ -617,7 +629,7 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ maxoff = PageGetMaxOffsetNumber(p); *which_grow = -1.0; - which = -1; + which = InvalidOffsetNumber; sum_grow = 1; gistDeCompressAtt(giststate, r, it, NULL, (OffsetNumber) 0, @@ -627,6 +639,12 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ { int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + + if ( !GistPageIsLeaf(p) && GistTupleIsInvalid(itup) ) { + elog(LOG, "It's desirable to vacuum or reindex GiST index '%s' due to crash recovery", + RelationGetRelationName(r)); + continue; + } sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) @@ -660,6 +678,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ } } + if ( which == InvalidOffsetNumber ) + which = FirstOffsetNumber; + return which; } @@ -721,6 +742,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r, GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; int i; + IndexTuple res; for (i = 0; i < r->rd_att->natts; i++) { @@ -735,7 +757,9 @@ gistFormTuple(GISTSTATE *giststate, Relation r, } } - return index_form_tuple(giststate->tupdesc, compatt, isnull); + res = index_form_tuple(giststate->tupdesc, compatt, isnull); + GistTupleSetValid(res); + return res; } void @@ -783,3 +807,79 @@ GISTInitBuffer(Buffer b, uint32 f) opaque->flags = f; } +void +gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, + IndexTuple *itup, int len, GISTSTATE *giststate) { + /* + * now let the user-defined picksplit function set up the split + * vector; in entryvec have no null value!! + */ + FunctionCall2(&giststate->picksplitFn[0], + PointerGetDatum(entryvec), + PointerGetDatum(v)); + + /* compatibility with old code */ + if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber) + v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1); + if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber) + v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); + + v->spl_lattr[0] = v->spl_ldatum; + v->spl_rattr[0] = v->spl_rdatum; + v->spl_lisnull[0] = false; + v->spl_risnull[0] = false; + + /* + * if index is multikey, then we must to try get smaller bounding box + * for subkey(s) + */ + if (r->rd_att->natts > 1) + { + int MaxGrpId; + + v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n); + v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n); + v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n); + + MaxGrpId = gistfindgroup(giststate, entryvec->vector, v); + + /* form union of sub keys for each page (l,p) */ + gistunionsubkey(r, giststate, itup, v, false); + + /* + * if possible, we insert equivalent tuples with control by + * penalty for a subkey(s) + */ + if (MaxGrpId > 1) + gistadjsubkey(r, itup, len, v, giststate); + } +} + +Buffer +gistReadBuffer(Relation r, BlockNumber blkno) { + Buffer buffer = InvalidBuffer; + + if ( blkno != P_NEW ) { + buffer = ReadBuffer(r, blkno); + } else { + Page page; + + while(true) { + blkno = GetFreeIndexPage(&r->rd_node); + if (blkno == InvalidBlockNumber) + break; + + buffer = ReadBuffer(r, blkno); + page = BufferGetPage(buffer); + if ( GistPageIsDeleted( page ) ) { + GistPageSetNonDeleted( page ); + return buffer; + } + ReleaseBuffer( buffer ); + } + + buffer = ReadBuffer(r, P_NEW); + } + + return buffer; +} diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c new file mode 100644 index 0000000000..8f8e7f7507 --- /dev/null +++ b/src/backend/access/gist/gistvacuum.c @@ -0,0 +1,519 @@ +/*------------------------------------------------------------------------- + * + * gistvacuum.c + * interface routines for the postgres GiST index access method. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.1 2005/06/20 10:29:36 teodor Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/gist_private.h" +#include "access/gistscan.h" +#include "access/heapam.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "utils/memutils.h" +#include "storage/freespace.h" +#include "storage/smgr.h" + +/* filled by gistbulkdelete, cleared by gistvacuumpcleanup */ +static bool needFullVacuum = false; + + +typedef struct { + GISTSTATE giststate; + Relation index; + MemoryContext opCtx; + IndexBulkDeleteResult *result; + + /* path to root */ + BlockNumber *path; + int pathlen; + int curpathlen; +} GistVacuum; + +static void +shiftPath(GistVacuum *gv, BlockNumber blkno) { + if ( gv->pathlen == 0 ) { + gv->pathlen = 8; + gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) ); + } else if ( gv->pathlen == gv->curpathlen ) { + gv->pathlen *= 2; + gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) ); + } + + if ( gv->curpathlen ) + memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen ); + gv->curpathlen++; + gv->path[0] = blkno; +} + +static void +unshiftPath(GistVacuum *gv) { + gv->curpathlen--; + if ( gv->curpathlen ) + memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen ); +} + +typedef struct { + IndexTuple *itup; + int ituplen; + bool emptypage; +} ArrayTuple; + + +static ArrayTuple +gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { + ArrayTuple res = {NULL, 0, false}; + Buffer buffer; + Page page; + OffsetNumber i, maxoff; + ItemId iid; + int lenaddon=4, curlenaddon=0, ntodelete=0; + IndexTuple idxtuple, *addon=NULL; + bool needwrite=false; + OffsetNumber *todelete=NULL; + ItemPointerData *completed=NULL; + int ncompleted=0, lencompleted=16; + + buffer = ReadBuffer(gv->index, blkno); + page = (Page) BufferGetPage(buffer); + maxoff = PageGetMaxOffsetNumber(page); + + + if ( GistPageIsLeaf(page) ) { + if ( GistTuplesDeleted(page) ) { + needunion = needwrite = true; + GistClearTuplesDeleted(page); + } + } else { + todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) ); + completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted ); + addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon); + + shiftPath(gv, blkno); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + ArrayTuple chldtuple; + bool needchildunion; + + iid = PageGetItemId(page, i); + idxtuple = (IndexTuple) PageGetItem(page, iid); + needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false; + + if ( needchildunion ) + elog(DEBUG2,"gistVacuumUpdate: Need union for block %u", ItemPointerGetBlockNumber(&(idxtuple->t_tid))); + + chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)), + needchildunion ); + if ( chldtuple.ituplen || chldtuple.emptypage ) { + /* adjust any scans that will be affected by this deletion */ + gistadjscans(gv->index, GISTOP_DEL, blkno, i); + PageIndexTupleDelete(page, i); + todelete[ ntodelete++ ] = i; + i--; maxoff--; + needwrite=needunion=true; + + if ( chldtuple.ituplen ) { + while( curlenaddon + chldtuple.ituplen >= lenaddon ) { + lenaddon*=2; + addon=(IndexTuple*)repalloc( addon, sizeof(IndexTuple)*lenaddon ); + } + + memcpy( addon + curlenaddon, chldtuple.itup, chldtuple.ituplen * sizeof(IndexTuple) ); + + curlenaddon += chldtuple.ituplen; + + if ( chldtuple.ituplen > 1 ) { + /* child was splitted, so we need mark completion insert(split) */ + int j; + + while( ncompleted + chldtuple.ituplen > lencompleted ) { + lencompleted*=2; + completed = (ItemPointerData*)repalloc(completed, sizeof(ItemPointerData) * lencompleted); + } + for(j=0;jt_tid), completed + ncompleted ); + ncompleted++; + } + } + pfree( chldtuple.itup ); + } + } + } + + if ( curlenaddon ) { + /* insert updated tuples */ + if (gistnospace(page, addon, curlenaddon)) { + /* there is no space on page to insert tuples */ + IndexTuple *vec; + SplitedPageLayout *dist=NULL,*ptr; + int i; + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractbuffer(buffer, &(res.ituplen)); + vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon); + res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate)); + MemoryContextSwitchTo(oldCtx); + + vec = (IndexTuple*)palloc( sizeof(IndexTuple) * res.ituplen ); + for(i=0;iindex->rd_istemp ) { + XLogRecPtr recptr; + XLogRecData *rdata; + ItemPointerData key; /* set key for incomplete insert */ + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + oldCtx = MemoryContextSwitchTo(gv->opCtx); + + /* path is need to recovery because there is new pages, in a case of + crash it's needed to add inner tuple pointers on parent page */ + rdata = formSplitRdata(gv->index->rd_node, blkno, + todelete, ntodelete, addon, curlenaddon, + &key, gv->path, gv->curpathlen, dist); + + MemoryContextSwitchTo(oldCtx); + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + ptr=ptr->next; + } + + END_CRIT_SECTION(); + + } + + ptr = dist; + while(ptr) { + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } + + if ( blkno == GIST_ROOT_BLKNO ) { + ItemPointerData key; /* set key for incomplete insert */ + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + oldCtx = MemoryContextSwitchTo(gv->opCtx); + gistnewroot(gv->index, res.itup, res.ituplen, &key); + MemoryContextSwitchTo(oldCtx); + } + + needwrite=false; + + MemoryContextReset(gv->opCtx); + + needunion = false; /* gistSplit already forms unions */ + } else { + OffsetNumber off = (PageIsEmpty(page)) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + /* enough free space */ + gistfillbuffer(gv->index, page, addon, curlenaddon, off); + } + } + unshiftPath(gv); + } + + if ( needunion ) { + /* forms union for page or check empty*/ + if ( PageIsEmpty(page) ) { + if ( blkno == GIST_ROOT_BLKNO ) { + needwrite=true; + GistPageSetLeaf( page ); + } else { + needwrite=true; + res.emptypage=true; + GistPageSetDeleted( page ); + gv->result->pages_deleted++; + } + } else { + IndexTuple *vec, tmp; + int veclen=0; + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractbuffer(buffer, &veclen); + tmp = gistunion(gv->index, vec, veclen, &(gv->giststate)); + MemoryContextSwitchTo(oldCtx); + + res.itup=(IndexTuple*)palloc( sizeof(IndexTuple) ); + res.ituplen = 1; + res.itup[0] = (IndexTuple)palloc( IndexTupleSize(tmp) ); + memcpy( res.itup[0], tmp, IndexTupleSize(tmp) ); + + ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno); + GistTupleSetValid( res.itup[0] ); + + MemoryContextReset(gv->opCtx); + } + } + + if ( needwrite ) { + if ( !gv->index->rd_istemp ) { + XLogRecData *rdata; + XLogRecPtr recptr; + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + /* In a vacuum, it's not need to push path, because + there is no new inserted keys */ + rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete, + res.emptypage, addon, curlenaddon, NULL, NULL, 0); + MemoryContextSwitchTo(oldCtx); + + + START_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + END_CRIT_SECTION(); + MemoryContextReset(gv->opCtx); + } + WriteBuffer( buffer ); + } else + ReleaseBuffer( buffer ); + + if ( ncompleted && !gv->index->rd_istemp ) + gistxlogInsertCompletion( gv->index->rd_node, completed, ncompleted ); + + for(i=0;ivacuum_full ) { + GistVacuum gv; + ArrayTuple res; + + gv.index = rel; + initGISTstate(&(gv.giststate), rel); + gv.opCtx = createTempGistContext(); + gv.result = stats; + + gv.path=NULL; + gv.pathlen = gv.curpathlen = 0; + + /* walk through the entire index for update tuples */ + res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false ); + /* cleanup */ + if (res.itup) { + int i; + for(i=0;i MaxFSMPages ) + maxFreePages = MaxFSMPages; + nFreePages = 0; + freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages); + for(blkno=GIST_ROOT_BLKNO+1;blkno 0 ) { + if ( info->vacuum_full ) { /* try to truncate index */ + int i; + for(i=0;i= lastFilledBlock ) { + nFreePages = i; + break; + } + + if ( lastBlock > lastFilledBlock ) + RelationTruncate( rel, lastFilledBlock+1 ); + stats->pages_removed = lastBlock - lastFilledBlock; + } + + if ( nFreePages > 0 ) + RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages ); + } + pfree( freePages ); + + /* return statistics */ + stats->pages_free = nFreePages; + stats->num_pages = RelationGetNumberOfBlocks(rel); + + /* UnlockRelation(rel, AccessExclusiveLock); */ + + PG_RETURN_POINTER(stats); +} + +typedef struct GistBDItem { + BlockNumber blkno; + struct GistBDItem *next; +} GistBDItem; + +/* + * Bulk deletion of all index entries pointing to a set of heap tuples and + * update invalid tuples after crash recovery. + * The set of target tuples is specified via a callback routine that tells + * whether any given heap tuple (identified by ItemPointer) is being deleted. + * + * Result: a palloc'd struct containing statistical info for VACUUM displays. + */ +Datum +gistbulkdelete(PG_FUNCTION_ARGS) { + Relation rel = (Relation) PG_GETARG_POINTER(0); + IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); + void* callback_state = (void *) PG_GETARG_POINTER(2); + IndexBulkDeleteResult *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult)); + GistBDItem *stack, *ptr; + MemoryContext opCtx = createTempGistContext(); + + stack = (GistBDItem*) palloc(sizeof(GistBDItem)); + + stack->blkno = GIST_ROOT_BLKNO; + stack->next = NULL; + needFullVacuum = false; + + while( stack ) { + Buffer buffer = ReadBuffer(rel, stack->blkno); + Page page = (Page) BufferGetPage(buffer); + OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page); + IndexTuple idxtuple; + ItemId iid; + OffsetNumber *todelete = NULL; + int ntodelete = 0; + + if ( GistPageIsLeaf(page) ) { + ItemPointerData heapptr; + + todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) ); + + for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) { + iid = PageGetItemId(page, i); + idxtuple = (IndexTuple) PageGetItem(page, iid); + heapptr = idxtuple->t_tid; + + if ( callback(&heapptr, callback_state) ) { + gistadjscans(rel, GISTOP_DEL, stack->blkno, i); + PageIndexTupleDelete(page, i); + todelete[ ntodelete++ ] = i; + i--; maxoff--; + result->tuples_removed += 1; + } else + result->num_index_tuples += 1; + } + } else { + for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) { + iid = PageGetItemId(page, i); + idxtuple = (IndexTuple) PageGetItem(page, iid); + + ptr = (GistBDItem*) palloc(sizeof(GistBDItem)); + ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) ); + ptr->next = stack->next; + stack->next = ptr; + + if ( GistTupleIsInvalid(idxtuple) ) + needFullVacuum = true; + } + } + + if ( ntodelete && todelete ) { + GistMarkTuplesDeleted(page); + + if (!rel->rd_istemp ) { + XLogRecData *rdata; + XLogRecPtr recptr; + MemoryContext oldCtx = MemoryContextSwitchTo(opCtx); + + rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete, + false, NULL, 0, NULL, NULL, 0); + MemoryContextSwitchTo(oldCtx); + + START_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + END_CRIT_SECTION(); + + MemoryContextReset(opCtx); + } + + WriteBuffer( buffer ); + } else + ReleaseBuffer( buffer ); + + if ( todelete ) + pfree( todelete ); + + ptr = stack->next; + pfree( stack ); + stack = ptr; + } + + MemoryContextDelete( opCtx ); + + result->num_pages = RelationGetNumberOfBlocks(rel); + + + PG_RETURN_POINTER( result ); +} + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b99ab24761..b6c0696e1a 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -22,11 +22,13 @@ #include "miscadmin.h" #include "utils/memutils.h" + typedef struct { gistxlogEntryUpdate *data; int len; IndexTuple *itup; BlockNumber *path; + OffsetNumber *todelete; } EntryUpdateRecord; typedef struct { @@ -44,6 +46,7 @@ typedef struct { NewPage *page; IndexTuple *itup; BlockNumber *path; + OffsetNumber *todelete; } PageSplitRecord; /* track for incomplete inserts, idea was taken from nbtxlog.c */ @@ -55,6 +58,7 @@ typedef struct gistIncompleteInsert { BlockNumber *blkno; int pathlen; BlockNumber *path; + XLogRecPtr lsn; } gistIncompleteInsert; @@ -65,12 +69,12 @@ static List *incomplete_inserts; #define ItemPointerEQ( a, b ) \ ( \ - ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \ + ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ ) static void -pushIncompleteInsert(RelFileNode node, ItemPointerData key, +pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, BlockNumber *blkno, int lenblk, BlockNumber *path, int pathlen, PageSplitRecord *xlinfo /* to extract blkno info */ ) { @@ -79,6 +83,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key, ninsert->node = node; ninsert->key = key; + ninsert->lsn = lsn; if ( lenblk && blkno ) { ninsert->lenblk = lenblk; @@ -95,7 +100,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key, } Assert( ninsert->lenblk>0 ); - if ( path && ninsert->pathlen ) { + if ( path && pathlen ) { ninsert->pathlen = pathlen; ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); @@ -135,11 +140,17 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { decoded->data = (gistxlogEntryUpdate*)begin; if ( decoded->data->pathlen ) { - addpath = sizeof(BlockNumber) * decoded->data->pathlen; + addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); } else decoded->path = NULL; + if ( decoded->data->ntodelete ) { + decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath); + addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + } else + decoded->todelete = NULL; + decoded->len=0; ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; while( ptr - begin < record->xl_len ) { @@ -157,7 +168,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { } } - +/* + * redo any page update (except page split) + */ static void gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { EntryUpdateRecord xlrec; @@ -191,19 +204,39 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { } } - if ( isnewroot ) - GISTInitBuffer(buffer, 0); - else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) - PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + if ( xlrec.data->isemptypage ) { + while( !PageIsEmpty(page) ) + PageIndexTupleDelete( page, FirstOffsetNumber ); + + if ( xlrec.data->blkno == GIST_ROOT_BLKNO ) + GistPageSetLeaf( page ); + else + GistPageSetDeleted( page ); + } else { + if ( isnewroot ) + GISTInitBuffer(buffer, 0); + else if ( xlrec.data->ntodelete ) { + int i; + for(i=0; i < xlrec.data->ntodelete ; i++) + PageIndexTupleDelete(page, xlrec.todelete[i]); + if ( GistPageIsLeaf(page) ) + GistMarkTuplesDeleted(page); + } - /* add tuples */ - if ( xlrec.len > 0 ) { - OffsetNumber off = (PageIsEmpty(page)) ? - FirstOffsetNumber - : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); + /* add tuples */ + if ( xlrec.len > 0 ) { + OffsetNumber off = (PageIsEmpty(page)) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); - gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + } + + /* special case: leafpage, nothing to insert, nothing to delete, then + vacuum marks page */ + if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 ) + GistClearTuplesDeleted(page); } PageSetLSN(page, lsn); @@ -216,7 +249,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) - pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, &(xlrec.data->blkno), 1, xlrec.path, xlrec.data->pathlen, NULL); @@ -233,11 +266,17 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup ); if ( decoded->data->pathlen ) { - addpath = sizeof(BlockNumber) * decoded->data->pathlen; - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit )); } else decoded->path = NULL; + if ( decoded->data->ntodelete ) { + decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath); + addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + } else + decoded->todelete = NULL; + ptr=begin+sizeof( gistxlogPageSplit ) + addpath; for(i=0;idata->nitup;i++) { Assert( ptr - begin < record->xl_len ); @@ -285,19 +324,23 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { return; } - if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) - PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + if ( xlrec.data->ntodelete ) { + int i; + for(i=0; i < xlrec.data->ntodelete ; i++) + PageIndexTupleDelete(page, xlrec.todelete[i]); + } itup = gistextractbuffer(buffer, &len); itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup); institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len ); opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + /* read and fill all pages */ for(i=0;inpage;i++) { int j; NewPage *newpage = xlrec.page + i; - /* prepare itup vector */ + /* prepare itup vector per page */ for(j=0;jheader->num;j++) institup[j] = itup[ newpage->offnum[j] - 1 ]; @@ -311,9 +354,9 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { if (!BufferIsValid(newpage->buffer)) elog(PANIC, "gistRedoPageSplitRecord: lost page"); newpage->page = (Page) BufferGetPage(newpage->buffer); - if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); + if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { + LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(newpage->buffer); newpage->is_ok=true; continue; /* good page */ } else { @@ -350,7 +393,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { if ( incomplete_inserts != NIL ) forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, NULL, 0, xlrec.path, xlrec.data->pathlen, &xlrec); @@ -386,6 +429,21 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { WriteBuffer(buffer); } +static void +gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + gistxlogInsertComplete *xlrec; + + xlrec = (gistxlogInsertComplete*)begin; + + ptr = begin + sizeof( gistxlogInsertComplete ); + while( ptr - begin < record->xl_len ) { + Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) ); + forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) ); + ptr += sizeof(ItemPointerData); + } +} + void gist_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -408,8 +466,7 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) gistRedoCreateIndex(lsn, record); break; case XLOG_GIST_INSERT_COMPLETE: - forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, - ((gistxlogInsertComplete*)XLogRecGetData(record))->key ); + gistRedoCompleteInsert(lsn, record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); @@ -431,16 +488,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key) static void out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) { out_target(buf, xlrec->node, xlrec->key); - sprintf(buf + strlen(buf), "; block number %u; update offset %u;", - xlrec->blkno, xlrec->todeleteoffnum); + sprintf(buf + strlen(buf), "; block number %u", + xlrec->blkno); } static void out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) { strcat(buf, "page_split: "); out_target(buf, xlrec->node, xlrec->key); - sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", - xlrec->origblkno, xlrec->todeleteoffnum, + sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages", + xlrec->origblkno, xlrec->nitup, xlrec->npage); } @@ -472,135 +529,172 @@ gist_desc(char *buf, uint8 xl_info, char *rec) ((RelFileNode*)rec)->relNode); break; case XLOG_GIST_INSERT_COMPLETE: - strcat(buf, "insert_complete: "); - out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); + sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u", + ((gistxlogInsertComplete*)rec)->node.spcNode, + ((gistxlogInsertComplete*)rec)->node.dbNode, + ((gistxlogInsertComplete*)rec)->node.relNode); default: elog(PANIC, "gist_desc: unknown op code %u", info); } } +IndexTuple +gist_form_invalid_tuple(BlockNumber blkno) { + /* we don't alloc space for null's bitmap, this is invalid tuple, + be carefull in read and write code */ + Size size = IndexInfoFindDataOffset(0); + IndexTuple tuple=(IndexTuple)palloc0( size ); + + tuple->t_info |= size; + + ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); + GistTupleSetInvalid( tuple ); + + return tuple; +} -#ifdef GIST_INCOMPLETE_INSERT static void gistContinueInsert(gistIncompleteInsert *insert) { - GISTSTATE giststate; - GISTInsertState state; - int i; + IndexTuple *itup; + int i, lenitup; MemoryContext oldCxt; + Relation index; + oldCxt = MemoryContextSwitchTo(opCtx); - state.r = XLogOpenRelation(insert->node); - if (!RelationIsValid(state.r)) + index = XLogOpenRelation(insert->node); + if (!RelationIsValid(index)) return; - initGISTstate(&giststate, state.r); + elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index", + insert->node.spcNode, insert->node.dbNode, insert->node.relNode); - state.needInsertComplete=false; - ItemPointerSetInvalid( &(state.key) ); - state.path=NULL; - state.pathlen=0; - state.xlog_mode = true; + /* needed vector itup never will be more than initial lenblkno+2, + because during this processing Indextuple can be only smaller */ + lenitup = insert->lenblk; + itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/)); - /* form union tuples */ - state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk); - state.ituplen = insert->lenblk; - for(i=0;ilenblk;i++) { - int len=0; - IndexTuple *itup; - Buffer buffer; - Page page; + for(i=0;ilenblk;i++) + itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); - buffer = XLogReadBuffer(false, state.r, insert->blkno[i]); - if (!BufferIsValid(buffer)) - elog(PANIC, "gistContinueInsert: block unfound"); - page = (Page) BufferGetPage(buffer); - if ( PageIsNew((PageHeader)page) ) - elog(PANIC, "gistContinueInsert: uninitialized page"); + if ( insert->pathlen==0 ) { + /*it was split root, so we should only make new root*/ + Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); + Page page; - itup = gistextractbuffer(buffer, &len); - state.itup[i] = gistunion(state.r, itup, len, &giststate); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistContinueInsert: root block unfound"); - ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber ); - + GISTInitBuffer(buffer, 0); + page = BufferGetPage(buffer); + gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - - if ( insert->pathlen==0 ) { - /*it was split root, so we should only make new root*/ - gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true); - MemoryContextSwitchTo(oldCxt); - MemoryContextReset(opCtx); - return; - } + WriteBuffer(buffer); + } else { + Buffer *buffers; + Page *pages; + int numbuffer; + + buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) ); + pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) ); - /* form stack */ - state.stack=NULL; - for(i=0;ipathlen;i++) { - int j,len=0; - IndexTuple *itup; - GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) ); - - top->blkno = insert->path[i]; - top->buffer = XLogReadBuffer(false, state.r, top->blkno); - if (!BufferIsValid(top->buffer)) - elog(PANIC, "gistContinueInsert: block unfound"); - top->page = (Page) BufferGetPage(top->buffer); - if ( PageIsNew((PageHeader)(top->page)) ) - elog(PANIC, "gistContinueInsert: uninitialized page"); - - top->todelete = false; - - /* find childoffnum */ - itup = gistextractbuffer(top->buffer, &len); - top->childoffnum=InvalidOffsetNumber; - for(j=0;jchildoffnum==InvalidOffsetNumber;j++) { - BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); + for(i=0;ipathlen;i++) { + int j, k, pituplen=0, childfound=0; + + numbuffer=1; + buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]); + if (!BufferIsValid(buffers[numbuffer-1])) + elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]); + pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] ); + if ( PageIsNew((PageHeader)(pages[numbuffer-1])) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]); - if ( i==0 ) { - int k; - for(k=0;klenblk;k++) - if ( insert->blkno[k] == blkno ) { - top->childoffnum = j+1; + /* remove old IndexTuples */ + for(j=0;jt_tid) ); + + for(k=0;kt_tid) ) == blkno ) { + PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber); + j--; pituplen--; + childfound++; break; } - } else if ( insert->path[i-1]==blkno ) - top->childoffnum = j+1; - } + } - if ( top->childoffnum==InvalidOffsetNumber ) { - elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes"); - return; + if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) { + /* no space left on page, so we should split */ + buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + if (!BufferIsValid(buffers[numbuffer])) + elog(PANIC, "gistContinueInsert: can't create new block"); + GISTInitBuffer(buffers[numbuffer], 0); + pages[numbuffer] = BufferGetPage( buffers[numbuffer] ); + gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber ); + numbuffer++; + + if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) { + IndexTuple *parentitup; + + parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen); + + /* we split root, just copy tuples from old root to new page */ + if ( i+1 != insert->pathlen ) + elog(PANIC,"gistContinueInsert: can't restore index '%s'", + RelationGetRelationName( index )); + + /* fill new page */ + buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + if (!BufferIsValid(buffers[numbuffer])) + elog(PANIC, "gistContinueInsert: can't create new block"); + GISTInitBuffer(buffers[numbuffer], 0); + pages[numbuffer] = BufferGetPage( buffers[numbuffer] ); + gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber); + numbuffer++; + + /* fill root page */ + GISTInitBuffer(buffers[0], 0); + for(j=1;jlsn); + PageSetTLI(pages[j], ThisTimeLineID); + LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK); + WriteBuffer( buffers[j] ); + } } - - if ( i==0 ) - PageIndexTupleDelete(top->page, top->childoffnum); - - /* install item on right place in stack */ - top->parent=NULL; - if ( state.stack ) { - GISTInsertStack *ptr = state.stack; - while( ptr->parent ) - ptr = ptr->parent; - ptr->parent=top; - } else - state.stack = top; } - /* Good. Now we can continue insert */ - - gistmakedeal(&state, &giststate); - MemoryContextSwitchTo(oldCxt); MemoryContextReset(opCtx); } -#endif void gist_xlog_startup(void) { incomplete_inserts=NIL; insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST insert in xlog temporary context", + "GiST recovery temporary context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -613,16 +707,194 @@ gist_xlog_cleanup(void) { foreach(l, incomplete_inserts) { gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); - char buf[1024]; - - *buf='\0'; - out_target(buf, insert->node, insert->key); - elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf); -#ifdef GIST_INCOMPLETE_INSERT gistContinueInsert(insert); -#endif } MemoryContextDelete(opCtx); MemoryContextDelete(insertCtx); } + +XLogRecData * +formSplitRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen, SplitedPageLayout *dist ) { + + XLogRecData *rdata; + gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit)); + SplitedPageLayout *ptr; + int npage = 0, cur=1, i; + + ptr=dist; + while( ptr ) { + npage++; + ptr=ptr->next; + } + + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3)); + + xlrec->node = node; + xlrec->origblkno = blkno; + xlrec->npage = (uint16)npage; + xlrec->nitup = (uint16)ituplen; + xlrec->ntodelete = (uint16)ntodelete; + xlrec->pathlen = (uint16)pathlen; + if ( key ) + xlrec->key = *key; + else + ItemPointerSetInvalid( &(xlrec->key) ); + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) xlrec; + rdata[0].len = sizeof( gistxlogPageSplit ); + rdata[0].next = NULL; + + if ( pathlen ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)path; + rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); + rdata[cur].next = NULL; + cur++; + } + + if ( ntodelete ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)todelete; + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete); + rdata[cur].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;iblock); + rdata[cur].len = sizeof(gistxlogPage); + rdata[cur-1].next = &(rdata[cur]); + cur++; + + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(ptr->list); + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); + if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) + rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].next=NULL; + cur++; + ptr=ptr->next; + } + + return rdata; +} + + +XLogRecData * +formUpdateRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, bool emptypage, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen) { + XLogRecData *rdata; + gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate)); + + xlrec->node = node; + xlrec->blkno = blkno; + if ( key ) + xlrec->key = *key; + else + ItemPointerSetInvalid( &(xlrec->key) ); + + if ( emptypage ) { + xlrec->isemptypage = true; + xlrec->ntodelete = 0; + xlrec->pathlen = 0; + + rdata = (XLogRecData*)palloc( sizeof(XLogRecData) ); + rdata->buffer = InvalidBuffer; + rdata->data = (char*)xlrec; + rdata->len = sizeof(gistxlogEntryUpdate); + rdata->next = NULL; + } else { + int cur=1,i; + + xlrec->isemptypage = false; + xlrec->ntodelete = ntodelete; + xlrec->pathlen = pathlen; + + rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) ); + + rdata->buffer = InvalidBuffer; + rdata->data = (char*)xlrec; + rdata->len = sizeof(gistxlogEntryUpdate); + rdata->next = NULL; + + if ( pathlen ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)path; + rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); + rdata[cur].next = NULL; + cur++; + } + + if ( ntodelete ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)todelete; + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete); + rdata[cur].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i0); + xlrec.node = node; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogInsertComplete ); + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) keys; + rdata[1].len = sizeof( ItemPointerData ) * len; + rdata[1].next = NULL; + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); + + END_CRIT_SECTION(); + + return recptr; +} diff --git a/src/include/access/gist.h b/src/include/access/gist.h index 33110b71b6..bf9c1c712b 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -9,7 +9,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.46 2005/05/17 03:34:18 neilc Exp $ + * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -36,6 +36,8 @@ * Page opaque data in a GiST index page. */ #define F_LEAF (1 << 0) +#define F_DELETED (1 << 1) +#define F_TUPLES_DELETED (1 << 2) typedef struct GISTPageOpaqueData { @@ -56,6 +58,7 @@ typedef struct GIST_SPLITVEC * spl_left */ int spl_lattrsize[INDEX_MAX_KEYS]; bool spl_lisnull[INDEX_MAX_KEYS]; + bool spl_leftvalid; OffsetNumber *spl_right; /* array of entries that go right */ int spl_nright; /* size of the array */ @@ -64,6 +67,7 @@ typedef struct GIST_SPLITVEC * spl_right */ int spl_rattrsize[INDEX_MAX_KEYS]; bool spl_risnull[INDEX_MAX_KEYS]; + bool spl_rightvalid; int *spl_idgrp; int *spl_ngrp; /* number in each group */ @@ -86,7 +90,18 @@ typedef struct GISTENTRY bool leafkey; } GISTENTRY; -#define GIST_LEAF(entry) (((GISTPageOpaque) PageGetSpecialPointer((entry)->page))->flags & F_LEAF) +#define GistPageIsLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_LEAF) +#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page)) +#define GistPageSetLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_LEAF) +#define GistPageSetNonLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_LEAF) + +#define GistPageIsDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_DELETED) +#define GistPageSetDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_DELETED) +#define GistPageSetNonDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_DELETED) + +#define GistTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_TUPLES_DELETED) +#define GistMarkTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_TUPLES_DELETED) +#define GistClearTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_TUPLES_DELETED) /* * Vector of GISTENTRY structs; user-defined methods union and pick diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 479f221176..2a563e1dd6 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.4 2005/06/20 10:29:36 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -94,7 +94,6 @@ typedef struct { int ituplen; /* length of itup */ GISTInsertStack *stack; bool needInsertComplete; - bool xlog_mode; /* pointer to heap tuple */ ItemPointerData key; @@ -142,19 +141,20 @@ typedef struct gistxlogEntryUpdate { RelFileNode node; BlockNumber blkno; - /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */ - OffsetNumber todeleteoffnum; + uint16 ntodelete; uint16 pathlen; + bool isemptypage; /* - * It used to identify compliteness of insert. + * It used to identify completeness of insert. * Sets to leaf itup */ ItemPointerData key; /* follow: - * 1. path to root (BlockNumber) - * 2. tuples to insert + * 1. path to root (BlockNumber) + * 2. todelete OffsetNumbers + * 3. tuples to insert */ } gistxlogEntryUpdate; @@ -163,18 +163,19 @@ typedef struct gistxlogEntryUpdate { typedef struct gistxlogPageSplit { RelFileNode node; BlockNumber origblkno; /*splitted page*/ - OffsetNumber todeleteoffnum; + uint16 ntodelete; uint16 pathlen; - int npage; - int nitup; + uint16 npage; + uint16 nitup; /* see comments on gistxlogEntryUpdate */ ItemPointerData key; /* follow: * 1. path to root (BlockNumber) - * 2. tuples to insert - * 3. gistxlogPage and array of OffsetNumber per page + * 2. todelete OffsetNumbers + * 3. tuples to insert + * 4. gistxlogPage and array of OffsetNumber per page */ } gistxlogPageSplit; @@ -188,32 +189,65 @@ typedef struct gistxlogPage { typedef struct gistxlogInsertComplete { RelFileNode node; - ItemPointerData key; + /* follows ItemPointerData key to clean */ } gistxlogInsertComplete; -#define XLOG_GIST_CREATE_INDEX 0x50 +#define XLOG_GIST_CREATE_INDEX 0x50 + +/* + * mark tuples on inner pages during recovery + */ +#define TUPLE_IS_VALID 0xffff +#define TUPLE_IS_INVALID 0xfffe + +#define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) +#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) +#define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistinsert(PG_FUNCTION_ARGS); -extern Datum gistbulkdelete(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); -extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode); +extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key); extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); +typedef struct SplitedPageLayout { + gistxlogPage block; + OffsetNumber *list; + Buffer buffer; /* to write after all proceed */ + + struct SplitedPageLayout *next; +} SplitedPageLayout; + +IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup, + int *len, SplitedPageLayout **dist, GISTSTATE *giststate); /* gistxlog.c */ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(char *buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); +extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); + +extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, bool emptypage, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen); + +extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen, SplitedPageLayout *dist ); + +extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgetmulti(PG_FUNCTION_ARGS); /* gistutil.c */ +extern Buffer gistReadBuffer(Relation r, BlockNumber blkno); extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, int len, OffsetNumber off); extern bool gistnospace(Page page, IndexTuple *itvec, int len); @@ -230,7 +264,7 @@ extern IndexTuple gistgetadjusted(Relation r, extern int gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl); extern void gistadjsubkey(Relation r, - IndexTuple *itup, int *len, + IndexTuple *itup, int len, GIST_SPLITVEC *v, GISTSTATE *giststate); extern IndexTuple gistFormTuple(GISTSTATE *giststate, @@ -247,10 +281,16 @@ extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, OffsetNumber o, GISTENTRY *attdata, bool *isnull); extern void gistunionsubkey(Relation r, GISTSTATE *giststate, - IndexTuple *itvec, GIST_SPLITVEC *spl); + IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall); extern void GISTInitBuffer(Buffer b, uint32 f); extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, int b, bool l, bool isNull); +void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, + IndexTuple *itup, int len, GISTSTATE *giststate); + +/* gistvacuum.c */ +extern Datum gistbulkdelete(PG_FUNCTION_ARGS); +extern Datum gistvacuumcleanup(PG_FUNCTION_ARGS); #endif /* GIST_PRIVATE_H */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index bdbaa83ace..458cddd134 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.278 2005/06/18 19:33:42 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.279 2005/06/20 10:29:37 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 200506181 +#define CATALOG_VERSION_NO 200506201 #endif diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h index f473277b46..2cb2264328 100644 --- a/src/include/catalog/pg_am.h +++ b/src/include/catalog/pg_am.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.34 2005/06/13 23:14:49 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.35 2005/06/20 10:29:37 teodor Exp $ * * NOTES * the genbki.sh script reads this file and generates .bki @@ -112,7 +112,7 @@ DESCR("b-tree index access method"); DATA(insert OID = 405 ( hash 1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate )); DESCR("hash index access method"); #define HASH_AM_OID 405 -DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete - gistcostestimate )); +DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate )); DESCR("GiST index access method"); #define GIST_AM_OID 783 diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a05a4f3a62..f219065b61 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.368 2005/06/17 22:32:48 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.369 2005/06/20 10:29:37 teodor Exp $ * * NOTES * The script catalog/genbki.sh reads this file and generates .bki @@ -1092,6 +1092,7 @@ DATA(insert OID = 782 ( gistbuild PGNSP PGUID 12 f f t f v 3 2278 "2281 228 DESCR("gist(internal)"); DATA(insert OID = 776 ( gistbulkdelete PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistbulkdelete - _null_ )); DESCR("gist(internal)"); +DATA(insert OID = 2561 ( gistvacuumcleanup PGNSP PGUID 12 f f t f v 3 2281 "2281 2281 2281" _null_ _null_ _null_ gistvacuumcleanup - _null_ )); DATA(insert OID = 772 ( gistcostestimate PGNSP PGUID 12 f f t f v 7 2278 "2281 2281 2281 2281 2281 2281 2281" _null_ _null_ _null_ gistcostestimate - _null_ )); DESCR("gist(internal)");