* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.121 2005/06/20 15:22:37 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.122 2005/06/27 12:45:21 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#include "miscadmin.h"
#include "utils/memutils.h"
+const XLogRecPtr XLogRecPtrForTemp = { 1, 1 };
+
/* Working state for gistbuild and its callback */
typedef struct
{
initGISTstate(&buildstate.giststate, index);
/* initialize the root page */
- buffer = gistReadBuffer(index, P_NEW);
+ buffer = gistNewBuffer(index);
GISTInitBuffer(buffer, F_LEAF);
if ( !index->rd_istemp ) {
XLogRecPtr recptr;
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
- }
+ } else
+ PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
+ LockBuffer(buffer, GIST_UNLOCK);
WriteBuffer(buffer);
/* build the index */
MemoryContext oldCtx;
MemoryContext insertCtx;
- /*
- * Since GIST is not marked "amconcurrent" in pg_am, caller should
- * have acquired exclusive lock on index relation. We need no locking
- * here.
- */
-
/* GiST cannot index tuples with leading NULLs */
if (isnull[0])
PG_RETURN_BOOL(false);
state.key = itup->t_tid;
state.needInsertComplete = true;
- state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
- memset( state.stack, 0, sizeof(GISTInsertStack));
+ state.stack = (GISTInsertStack*)palloc0(sizeof(GISTInsertStack));
state.stack->blkno=GIST_ROOT_BLKNO;
gistfindleaf(&state, giststate);
static bool
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
bool is_splitted = false;
+ bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+
+ if ( !is_leaf )
+ /*
+ * This node's key has been modified, either because a child
+ * split occurred or because we needed to adjust our key for
+ * an insert in a child node. Therefore, remove the old
+ * version of this node's key.
+ */
+
+ PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
+
if (gistnospace(state->stack->page, state->itup, state->ituplen))
{
/* no space for insertion */
XLogRecData *rdata;
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
- &(state->key), state->path, state->pathlen, dist);
+ &(state->key), dist);
START_CRIT_SECTION();
}
END_CRIT_SECTION();
- }
-
- ptr = dist;
- while(ptr) {
- WriteBuffer(ptr->buffer);
- ptr=ptr->next;
+ } else {
+ ptr = dist;
+ while(ptr) {
+ PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+ ptr=ptr->next;
+ }
}
state->itup = newitup;
state->ituplen = tlen; /* now tlen >= 2 */
if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
- gistnewroot(state->r, state->itup, state->ituplen, &(state->key));
+ gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
state->needInsertComplete=false;
+ ptr = dist;
+ while(ptr) {
+ Page page = (Page)BufferGetPage(ptr->buffer);
+ GistPageGetOpaque(page)->rightlink = ( ptr->next ) ?
+ ptr->next->block.blkno : InvalidBlockNumber;
+ LockBuffer( ptr->buffer, GIST_UNLOCK );
+ WriteBuffer(ptr->buffer);
+ ptr=ptr->next;
+ }
+ } else {
+ Page page;
+ BlockNumber rightrightlink = InvalidBlockNumber;
+ SplitedPageLayout *ourpage=NULL;
+ GistNSN oldnsn;
+ GISTPageOpaque opaque;
+
+ /* move origpage to first in chain */
+ if ( dist->block.blkno != state->stack->blkno ) {
+ ptr = dist;
+ while(ptr->next) {
+ if ( ptr->next->block.blkno == state->stack->blkno ) {
+ ourpage = ptr->next;
+ ptr->next = ptr->next->next;
+ ourpage->next = dist;
+ dist = ourpage;
+ break;
+ }
+ ptr=ptr->next;
+ }
+ Assert( ourpage != NULL );
+ } else
+ ourpage = dist;
+
+
+ /* now gets all needed data, and sets nsn's */
+ page = (Page)BufferGetPage(ourpage->buffer);
+ opaque = GistPageGetOpaque(page);
+ rightrightlink = opaque->rightlink;
+ oldnsn = opaque->nsn;
+ opaque->nsn = PageGetLSN(page);
+ opaque->rightlink = ourpage->next->block.blkno;
+
+ /* fill and write all new pages;
+ they aren't linked into the tree yet */
+
+ ptr = ourpage->next;
+ while(ptr) {
+ page = (Page)BufferGetPage(ptr->buffer);
+ GistPageGetOpaque(page)->rightlink = ( ptr->next ) ?
+ ptr->next->block.blkno : rightrightlink;
+ /* only for last set oldnsn */
+ GistPageGetOpaque(page)->nsn = ( ptr->next ) ?
+ opaque->nsn : oldnsn;
+
+ LockBuffer(ptr->buffer, GIST_UNLOCK);
+ WriteBuffer(ptr->buffer);
+ ptr=ptr->next;
+ }
}
- ReleaseBuffer(state->stack->buffer);
+ WriteNoReleaseBuffer( state->stack->buffer );
}
else
{
/* enough space */
- OffsetNumber off, l;
- bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+ OffsetNumber l, off;
+ XLogRecPtr oldlsn;
- off = (PageIsEmpty(state->stack->page)) ?
- FirstOffsetNumber
- :
- OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
+ off = ( PageIsEmpty(state->stack->page) ) ?
+ FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
+
l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
+ oldlsn = PageGetLSN(state->stack->page);
if ( !state->r->rd_istemp ) {
OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
XLogRecPtr recptr;
XLogRecData *rdata;
- if ( state->stack->todelete ) {
+ if ( !is_leaf ) {
+ /*only on inner page we should delete previous version */
offs[0] = state->stack->childoffnum;
noffs=1;
}
rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
offs, noffs, false, state->itup, state->ituplen,
- &(state->key), state->path, state->pathlen);
+ &(state->key));
START_CRIT_SECTION();
PageSetTLI(state->stack->page, ThisTimeLineID);
END_CRIT_SECTION();
- }
+ } else
+ PageSetLSN(state->stack->page, XLogRecPtrForTemp);
if ( state->stack->blkno == GIST_ROOT_BLKNO )
state->needInsertComplete=false;
- WriteBuffer(state->stack->buffer);
+ WriteNoReleaseBuffer(state->stack->buffer);
+
+ if (!is_leaf) /* small optimization: inform scan ablout deleting... */
+ gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
+ state->stack->childoffnum, PageGetLSN(state->stack->page), oldlsn );
if (state->ituplen > 1)
{ /* previous is_splitted==true */
return is_splitted;
}
+/*
+ * returns stack of pages, all pages in stack are pinned, and
+ * leaf is X-locked
+ */
+
static void
gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
{
ItemId iid;
- IndexTuple oldtup;
- GISTInsertStack *ptr;
+ IndexTuple idxtuple;
+ GISTPageOpaque opaque;
+
+ /* Walk down. We don't keep a page locked for long, so
+ we should be ready to recheck the path in a bad case...
+ Remember that page->lsn should never be invalid. */
+ while( true ) {
+
+ if ( XLogRecPtrIsInvalid( state->stack->lsn ) )
+ state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+ LockBuffer( state->stack->buffer, GIST_SHARE );
- /* walk down */
- while( true ) {
- state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno);
state->stack->page = (Page) BufferGetPage(state->stack->buffer);
+ opaque = GistPageGetOpaque(state->stack->page);
+
+ state->stack->lsn = PageGetLSN(state->stack->page);
+ Assert( state->r->rd_istemp || !XLogRecPtrIsInvalid( state->stack->lsn ) );
+
+ if ( state->stack->blkno != GIST_ROOT_BLKNO &&
+ XLByteLT( state->stack->parent->lsn, opaque->nsn) ) {
+ /* a split of this non-root page is detected; go up to the parent to choose the best child */
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
+ ReleaseBuffer( state->stack->buffer );
+ state->stack = state->stack->parent;
+ continue;
+ }
+
if (!GistPageIsLeaf(state->stack->page))
{
* split, or the key in this node needs to be adjusted for the
* newly inserted key below us.
*/
- GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+ GISTInsertStack *item=(GISTInsertStack*)palloc0(sizeof(GISTInsertStack));
state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
- oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
- item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+ idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
+ item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
+
item->parent = state->stack;
- item->todelete = false;
+ item->child = NULL;
+ if ( state->stack )
+ state->stack->child = item;
state->stack = item;
- } else
+ } else {
+ /* be careful: the page may be changed during unlock/lock... */
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
+ LockBuffer( state->stack->buffer, GIST_EXCLUSIVE );
+ state->stack->page = (Page) BufferGetPage(state->stack->buffer);
+ opaque = GistPageGetOpaque(state->stack->page);
+
+ if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
+ /* the only page can become inner instead of leaf is a root page,
+ so for root we should recheck it */
+ if ( !GistPageIsLeaf(state->stack->page) ) {
+ /* very rare situation: during unlock/lock an index
+ with number of pages = 1 has grown */
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
+ continue;
+ }
+ /* we don't need to check root split, because checking
+ leaf/inner is enough to recognize split for root */
+
+ } else if ( XLByteLT( state->stack->parent->lsn, opaque->nsn) ) {
+ /* a split was detected during unlock/lock, so we should
+ find a better child on the parent */
+
+ /* forget buffer */
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
+ ReleaseBuffer( state->stack->buffer );
+
+ state->stack = state->stack->parent;
+ continue;
+ }
+
+ state->stack->lsn = PageGetLSN( state->stack->page );
+
+ /* ok we found a leaf page and it X-locked */
break;
+ }
}
- /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */
+ /* now state->stack->(page, buffer and blkno) points to leaf page */
+}
- /* form state->path to work xlog */
- ptr = state->stack;
- state->pathlen=1;
- while( ptr ) {
- state->pathlen++;
- ptr=ptr->parent;
- }
- state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen));
- ptr = state->stack;
- state->pathlen=0;
- while( ptr ) {
- state->path[ state->pathlen ] = ptr->blkno;
- state->pathlen++;
- ptr=ptr->parent;
+/*
+ * gistReadAndLockBuffer() -- read block 'blkno' of relation 'r' and
+ * return its buffer with a GIST_SHARE lock taken on it.
+ *
+ * Should have the same interface as XLogReadBuffer, so that either
+ * function can be handed to gistFindPath as its read callback.  The
+ * leading bool argument exists only for that reason and is not used.
+ */
+static Buffer
+gistReadAndLockBuffer( bool unused, Relation r, BlockNumber blkno ) {
+ Buffer buffer = ReadBuffer( r, blkno );
+ LockBuffer( buffer, GIST_SHARE );
+ return buffer;
+}
+
+/*
+ * gistFindPath() -- traverse the tree from the root page to find the
+ * page that holds the downlink to block 'child'.
+ *
+ * To prevent deadlocks it should lock only one page at a time: each
+ * page is obtained through myReadBuffer, examined, and then unlocked
+ * and released before the next one is visited.
+ * The function is used both in recovery and in usual mode, so it should
+ * work with different read functions (gistReadAndLockBuffer and
+ * XLogReadBuffer).
+ *
+ * Pages still to be visited are kept in a queue linked through ->next
+ * (top is the entry being examined, tail the last queued one); each
+ * entry records in ->parent the entry it was queued from.
+ *
+ * Returns the stack entry of the closest parent of 'child', with
+ * ->parent links leading back to the root entry and ->child links and
+ * ->childoffnum values filled in along that path, or NULL if no page
+ * with a downlink to 'child' was found.
+ */
+GISTInsertStack*
+gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ) {
+ Page page;
+ Buffer buffer;
+ OffsetNumber i, maxoff;
+ ItemId iid;
+ IndexTuple idxtuple;
+ GISTInsertStack *top, *tail, *ptr;
+ BlockNumber blkno;
+
+ top = tail = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) );
+ top->blkno = GIST_ROOT_BLKNO;
+
+ while( top && top->blkno != child ) {
+ buffer = myReadBuffer(false, r, top->blkno); /* buffer locked */
+ page = (Page)BufferGetPage( buffer );
+ Assert( !GistPageIsLeaf(page) );
+
+ top->lsn = PageGetLSN(page);
+
+ if ( top->parent && XLByteLT( top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
+ GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */) {
+ /* the page's nsn is newer than the lsn remembered for its parent
+ * entry, i.e. the page was split in the meantime: queue its right
+ * sibling too (its offset in the parent is not known) */
+ ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) );
+ ptr->blkno = GistPageGetOpaque(page)->rightlink;
+ ptr->childoffnum = InvalidOffsetNumber;
+ ptr->parent = top;
+ ptr->next = NULL;
+ tail->next = ptr;
+ tail = ptr;
+ }
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) {
+ iid = PageGetItemId(page, i);
+ idxtuple = (IndexTuple) PageGetItem(page, iid);
+ blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+ if ( blkno == child ) {
+ OffsetNumber poff = InvalidOffsetNumber;
+
+ /* found the downlink: walk up from top and make the child links */
+ ptr = top;
+ while( ptr->parent ) {
+ /* set child link */
+ ptr->parent->child = ptr;
+ /* shift childoffnum one entry up: while queued, each entry held
+ * its own offset within its parent; from now on each parent
+ * holds the offset of its child on the path */
+ if ( ptr == top ) {
+ /* first iteration */
+ poff = ptr->parent->childoffnum;
+ ptr->parent->childoffnum = ptr->childoffnum;
+ } else {
+ OffsetNumber tmp = ptr->parent->childoffnum;
+ ptr->parent->childoffnum = poff;
+ poff = tmp;
+ }
+ ptr = ptr->parent;
+ }
+ top->childoffnum = i;
+ LockBuffer( buffer, GIST_UNLOCK );
+ ReleaseBuffer( buffer );
+ return top;
+ } else if ( GistPageGetOpaque(page)->level> 0 ) {
+ /* Append the next inner page to the end of the queue */
+ ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) );
+ ptr->blkno = blkno;
+ ptr->childoffnum = i; /* note: the offset of this entry within its parent is stored in the entry itself! */
+ ptr->parent = top;
+ ptr->next = NULL;
+ tail->next = ptr;
+ tail = ptr;
+ }
+ }
+
+ LockBuffer( buffer, GIST_UNLOCK );
+ ReleaseBuffer( buffer );
+ top = top->next;
}
- state->pathlen--;
- state->path++;
+
+ return NULL;
}
+/*
+ * gistFindCorrectParent() -- X-lock the parent of stack page 'child'.
+ *
+ * child->parent->buffer is locked GIST_EXCLUSIVE first.  If the page's
+ * LSN still equals the one remembered in the stack entry (and a child
+ * offset is known), nothing more is done.  Otherwise the page, and then
+ * its right siblings, are searched for the tuple pointing to
+ * child->blkno; the stack entry is updated in place to describe the
+ * page where it is found, which is returned X-locked.
+ *
+ * If the chain of right links ends without a match, the remaining
+ * ancestor pins are dropped, a new path is built with gistFindPath,
+ * its pages are pinned again, it is installed as child->parent, and
+ * the function calls itself to lock the new parent.
+ */
+
+static void
+gistFindCorrectParent( Relation r, GISTInsertStack *child ) {
+ GISTInsertStack *parent = child->parent;
+
+ LockBuffer( parent->buffer, GIST_EXCLUSIVE );
+ parent->page = (Page)BufferGetPage( parent->buffer );
+
+
+ /* here we don't need to distinguish between a split and a plain page update */
+ if ( parent->childoffnum == InvalidOffsetNumber || !XLByteEQ( parent->lsn, PageGetLSN(parent->page) ) ) {
+ /* parent has changed, look for the child along the right links until found */
+ OffsetNumber i, maxoff;
+ ItemId iid;
+ IndexTuple idxtuple;
+ GISTInsertStack *ptr;
+
+ while(true) {
+ maxoff = PageGetMaxOffsetNumber(parent->page);
+ for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) {
+ iid = PageGetItemId(parent->page, i);
+ idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
+ if ( ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno ) {
+ /* yes!!, found; return with this page still X-locked */
+ parent->childoffnum = i;
+ return;
+ }
+ }
+
+ parent->blkno = GistPageGetOpaque( parent->page )->rightlink;
+ LockBuffer( parent->buffer, GIST_UNLOCK );
+ ReleaseBuffer( parent->buffer );
+ if ( parent->blkno == InvalidBlockNumber )
+ /* end of chain and the parent still wasn't found;
+ a very, very rare situation that arises when the root was split */
+ break;
+ parent->buffer = ReadBuffer( r, parent->blkno );
+ LockBuffer( parent->buffer, GIST_EXCLUSIVE );
+ parent->page = (Page)BufferGetPage( parent->buffer );
+ }
+
+ /* awful!!, we need to search the tree to find the parent ... ,
+ but first we should release all the old parents */
+
+ ptr = child->parent->parent; /* child->parent already released above */
+ while(ptr) {
+ ReleaseBuffer( ptr->buffer );
+ ptr = ptr->parent;
+ }
+
+ /* ok, find new path */
+ ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
+ Assert( ptr!=NULL );
+
+ /* read (pin) all buffers of the new path, as the caller expects */
+ while( ptr ) {
+ ptr->buffer = ReadBuffer( r, ptr->blkno );
+ ptr->page = (Page)BufferGetPage( ptr->buffer );
+ ptr = ptr->parent;
+ }
+
+ /* install new chain of parents to stack */
+ child->parent = parent;
+ parent->child = child;
+
+ /* make a recursive call to lock the new parent in the normal way */
+ gistFindCorrectParent( r, child );
+ }
+
+ return;
+}
+
void
gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
int is_splitted;
* then itup contains additional for adjustment of current key
*/
+ if ( state->stack->parent ) {
+ /* X-lock the parent page before processing the child;
+ gistFindCorrectParent should find and lock it */
+ gistFindCorrectParent( state->r, state->stack );
+ }
is_splitted = gistplacetopage(state, giststate);
- /* pop page from stack */
+ /* parent locked above, so release child buffer */
+ LockBuffer(state->stack->buffer, GIST_UNLOCK );
+ ReleaseBuffer( state->stack->buffer );
+
+ /* pop parent page from stack */
state->stack = state->stack->parent;
- state->pathlen--;
- state->path++;
/* stack is void */
if ( ! state->stack )
break;
-
- /* child did not split */
+ /* child did not split, so we can check whether the parent tuple needs to be updated */
if (!is_splitted)
{
/* parent's tuple */
oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
- if (!newtup) /* not need to update key */
+ if (!newtup) { /* not need to update key */
+ LockBuffer( state->stack->buffer, GIST_UNLOCK );
break;
+ }
state->itup[0] = newtup;
- }
-
- /*
- * This node's key has been modified, either because a child
- * split occurred or because we needed to adjust our key for
- * an insert in a child node. Therefore, remove the old
- * version of this node's key.
- */
-
- gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
- PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
- if ( !state->r->rd_istemp )
- state->stack->todelete = true;
-
- /*
- * if child was splitted, new key for child will be inserted in
- * the end list of child, so we must say to any scans that page is
- * changed beginning from 'child' offset
- */
- if (is_splitted)
- gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum);
+ }
} /* while */
- /* release all buffers */
+ /* release all parent buffers */
while( state->stack ) {
ReleaseBuffer(state->stack->buffer);
state->stack = state->stack->parent;
OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup;
int lencleaneditup = *len;
+ int level;
p = (Page) BufferGetPage(buffer);
- opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
+ opaque = GistPageGetOpaque(p);
+ level = opaque->level;
/*
* The root of the tree is the first block in the relation. If we're
*/
if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
{
- leftbuf = gistReadBuffer(r, P_NEW);
+ leftbuf = gistNewBuffer(r);
GISTInitBuffer(leftbuf, opaque->flags&F_LEAF);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
+ GistPageGetOpaque(left)->level = level;
}
else
{
leftbuf = buffer;
- IncrBufferRefCount(buffer);
+ /* IncrBufferRefCount(buffer); */
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
}
- rightbuf = gistReadBuffer(r, P_NEW);
+ rightbuf = gistNewBuffer(r);
GISTInitBuffer(rightbuf, opaque->flags&F_LEAF);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
+ GistPageGetOpaque(right)->level = level;
/* generate the item array */
realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
{
nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
- ReleaseBuffer(rightbuf);
+ /* ReleaseBuffer(rightbuf); */
}
else
{
IndexTuple *lntup;
lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
- ReleaseBuffer(leftbuf);
+ /* ReleaseBuffer(leftbuf); */
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
}
}
void
-gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key)
+gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
{
- Buffer buffer;
Page page;
+ int level;
- buffer = gistReadBuffer(r, GIST_ROOT_BLKNO);
- GISTInitBuffer(buffer, 0);
+ Assert( BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO );
page = BufferGetPage(buffer);
+ level = GistPageGetOpaque(page)->level;
+ GISTInitBuffer(buffer, 0);
+ GistPageGetOpaque(page)->level = level+1;
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
if ( !r->rd_istemp ) {
XLogRecData *rdata;
rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
- NULL, 0, false, itup, len,
- key, NULL, 0);
+ NULL, 0, false, itup, len, key);
START_CRIT_SECTION();
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
- }
- WriteBuffer(buffer);
+ } else
+ PageSetLSN(page, XLogRecPtrForTemp);
}
void
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
ScanDirection dir);
-static bool gistnext(IndexScanDesc scan, ScanDirection dir);
+static int gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples);
static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
OffsetNumber offset);
+static void
+killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
+ Buffer buffer = so->curbuf;
+
+ for(;;) {
+ Page p;
+ BlockNumber blkno;
+ OffsetNumber offset, maxoff;
+
+ LockBuffer( buffer, GIST_SHARE );
+ p = (Page)BufferGetPage( buffer );
+
+ if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
+ /*
+ * Still on the scan's current buffer and its LSN equals the one
+ * remembered in so->stack: page unchanged, so all is simple.
+ * The offset stored in iptr is used directly to set LP_DELETE
+ * on the item.
+ */
+ offset = ItemPointerGetOffsetNumber(iptr);
+ PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+ SetBufferCommitInfoNeedsSave(buffer);
+ LockBuffer( buffer, GIST_UNLOCK );
+ break;
+ }
+
+ maxoff = PageGetMaxOffsetNumber( p );
+
+ for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) {
+ IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
+
+ if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) {
+ /*
+ * found: mark the item LP_DELETE, unlock, and drop the pin
+ * unless this is the scan's own current buffer.
+ * NOTE(review): the fast path above treats iptr as a position
+ * within the index page, while this test compares iptr with
+ * the tuple's t_tid -- confirm which one the caller passes.
+ */
+ PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+ SetBufferCommitInfoNeedsSave(buffer);
+ LockBuffer( buffer, GIST_UNLOCK );
+ if ( buffer != so->curbuf )
+ ReleaseBuffer( buffer );
+ return;
+ }
+ }
+
+ /* not on this page: follow the right link */
+ /*
+ * ??? is this good? if the tuple was dropped by a concurrent vacuum,
+ * we will read every page along the chain of right links...
+ */
+ blkno = GistPageGetOpaque(p)->rightlink;
+ LockBuffer( buffer, GIST_UNLOCK );
+ if ( buffer != so->curbuf )
+ ReleaseBuffer( buffer );
+
+ if ( blkno==InvalidBlockNumber )
+ /* can't be found, dropped by somebody else */
+ return;
+ buffer = ReadBuffer( r, blkno );
+ }
+}
/*
* gistgettuple() -- Get the next tuple in the scan
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
- Page page;
- OffsetNumber offnum;
GISTScanOpaque so;
+ ItemPointerData tid;
+ bool res;
so = (GISTScanOpaque) scan->opaque;
/*
* If we have produced an index tuple in the past and the executor
* has informed us we need to mark it as "killed", do so now.
- *
- * XXX: right now there is no concurrent access. In the
- * future, we should (a) get a read lock on the page (b) check
- * that the location of the previously-fetched tuple hasn't
- * changed due to concurrent insertions.
*/
- if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
- {
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->curbuf);
- PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
- SetBufferCommitInfoNeedsSave(so->curbuf);
- }
+ if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
+ killtuple(scan->indexRelation, so, &(scan->currentItemData));
/*
* Get the next tuple that matches the search key. If asked to
* skip killed tuples, continue looping until we find a non-killed
* tuple that matches the search key.
*/
- for (;;)
- {
- bool res = gistnext(scan, dir);
-
- if (res == true && scan->ignore_killed_tuples)
- {
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->curbuf);
- if (ItemIdDeleted(PageGetItemId(page, offnum)))
- continue;
- }
+ res = ( gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples) ) ? true : false;
- PG_RETURN_BOOL(res);
- }
+ PG_RETURN_BOOL(res);
}
Datum
ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
int32 max_tids = PG_GETARG_INT32(2);
int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3);
- bool res = true;
- int32 ntids = 0;
- /* XXX generic implementation: loop around guts of gistgettuple */
- while (ntids < max_tids)
- {
- res = gistnext(scan, ForwardScanDirection);
- if (!res)
- break;
- tids[ntids] = scan->xs_ctup.t_self;
- ntids++;
- }
-
- *returned_tids = ntids;
- PG_RETURN_BOOL(res);
+ *returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false);
+
+ PG_RETURN_BOOL(*returned_tids == max_tids);
}
/*
- * Fetch a tuple that matchs the search key; this can be invoked
+ * Fetch tuples that match the search key; this can be invoked
* either to fetch the first such tuple or subsequent matching
* tuples. Returns true iff a matching tuple was found.
*/
-static bool
-gistnext(IndexScanDesc scan, ScanDirection dir)
+static int
+gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples)
{
Page p;
OffsetNumber n;
GISTScanOpaque so;
- GISTSTACK *stk;
+ GISTSearchStack *stk;
IndexTuple it;
+ GISTPageOpaque opaque;
+ bool resetoffset=false;
+ int ntids=0;
so = (GISTScanOpaque) scan->opaque;
{
/* Being asked to fetch the first entry, so start at the root */
Assert(so->curbuf == InvalidBuffer);
- so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
- }
+ Assert(so->stack == NULL);
- p = BufferGetPage(so->curbuf);
+ so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
+
+ stk = so->stack = (GISTSearchStack*) palloc0( sizeof(GISTSearchStack) );
- if (ItemPointerIsValid(&scan->currentItemData) == false)
- {
- if (ScanDirectionIsBackward(dir))
- n = PageGetMaxOffsetNumber(p);
- else
- n = FirstOffsetNumber;
+ stk->next = NULL;
+ stk->block = GIST_ROOT_BLKNO;
+ } else if ( so->curbuf == InvalidBuffer ) {
+ return 0;
}
- else
- {
- n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- if (ScanDirectionIsBackward(dir))
- n = OffsetNumberPrev(n);
- else
- n = OffsetNumberNext(n);
- }
+ for(;;) {
+ /* First of all, we need lock buffer */
+ Assert( so->curbuf != InvalidBuffer );
+ LockBuffer( so->curbuf, GIST_SHARE );
+ p = BufferGetPage(so->curbuf);
+ opaque = GistPageGetOpaque( p );
+ resetoffset = false;
+
+ if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
+ /* page changed from last visit or visit first time , reset offset */
+ so->stack->lsn = PageGetLSN(p);
+ resetoffset = true;
+
+ /* check page split, occured from last visit or visit to parent */
+ if ( !XLogRecPtrIsInvalid( so->stack->parentlsn ) &&
+ XLByteLT( so->stack->parentlsn, opaque->nsn ) &&
+ opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
+ (so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) {
+ /* detect page split, follow right link to add pages */
+
+ stk = (GISTSearchStack*) palloc( sizeof(GISTSearchStack) );
+ stk->next = so->stack->next;
+ stk->block = opaque->rightlink;
+ stk->parentlsn = so->stack->parentlsn;
+ memset( &(stk->lsn), 0, sizeof(GistNSN) );
+ so->stack->next = stk;
+ }
+ }
- for (;;)
- {
- n = gistfindnext(scan, n, dir);
+ /* if page is empty, then just skip it */
+ if ( PageIsEmpty(p) ) {
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ stk = so->stack->next;
+ pfree( so->stack );
+ so->stack = stk;
- if (!OffsetNumberIsValid(n))
- {
- /*
- * We ran out of matching index entries on the current
- * page, so pop the top stack entry and use it to continue
- * the search.
- */
- /* If we're out of stack entries, we're done */
- if (so->stack == NULL)
- {
+ if (so->stack == NULL) {
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
- return false;
+ return ntids;
}
- stk = so->stack;
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
- stk->block);
- p = BufferGetPage(so->curbuf);
+ stk->block);
+ continue;
+ }
+ if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false)
+ {
if (ScanDirectionIsBackward(dir))
- n = OffsetNumberPrev(stk->offset);
+ n = PageGetMaxOffsetNumber(p);
else
- n = OffsetNumberNext(stk->offset);
-
- so->stack = stk->parent;
- pfree(stk);
-
- continue;
+ n = FirstOffsetNumber;
+ }
+ else
+ {
+ n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
+
+ if (ScanDirectionIsBackward(dir))
+ n = OffsetNumberPrev(n);
+ else
+ n = OffsetNumberNext(n);
}
- if (GistPageIsLeaf(p))
+ /* wonderful, we can look at the page */
+
+ for(;;)
{
- /*
- * We've found a matching index entry in a leaf page, so
- * return success. Note that we keep "curbuf" pinned so
- * that we can efficiently resume the index scan later.
- */
- ItemPointerSet(&(scan->currentItemData),
+ n = gistfindnext(scan, n, dir);
+
+ if (!OffsetNumberIsValid(n))
+ {
+ /*
+ * We ran out of matching index entries on the current
+ * page, so pop the top stack entry and use it to continue
+ * the search.
+ */
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ stk = so->stack->next;
+ pfree( so->stack );
+ so->stack = stk;
+
+ /* If we're out of stack entries, we're done */
+
+ if (so->stack == NULL)
+ {
+ ReleaseBuffer(so->curbuf);
+ so->curbuf = InvalidBuffer;
+ return ntids;
+ }
+
+ so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
+ stk->block);
+ /* XXX go up */
+ break;
+ }
+
+ if (GistPageIsLeaf(p))
+ {
+ /*
+ * We've found a matching index entry in a leaf page, so
+ * return success. Note that we keep "curbuf" pinned so
+ * that we can efficiently resume the index scan later.
+ */
+
+ ItemPointerSet(&(scan->currentItemData),
BufferGetBlockNumber(so->curbuf), n);
- it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
- scan->xs_ctup.t_self = it->t_tid;
- return true;
- }
- else
- {
- /*
- * We've found an entry in an internal node whose key is
- * consistent with the search key, so continue the search
- * in the pointed-to child node (i.e. we search depth
- * first). Push the current node onto the stack so we
- * resume searching from this node later.
- */
- BlockNumber child_block;
-
- stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
- stk->offset = n;
- stk->block = BufferGetBlockNumber(so->curbuf);
- stk->parent = so->stack;
- so->stack = stk;
+ if ( ! ( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) {
+ it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
+ tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
+ ntids++;
+
+ if ( ntids == maxtids ) {
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ return ntids;
+ }
+ }
+ }
+ else
+ {
+ /*
+ * We've found an entry in an internal node whose key is
+ * consistent with the search key, so push it to stack
+ */
- it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
- child_block = ItemPointerGetBlockNumber(&(it->t_tid));
+ stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
- so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
- child_block);
- p = BufferGetPage(so->curbuf);
+ it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
+ stk->block = ItemPointerGetBlockNumber(&(it->t_tid));
+ memset( &(stk->lsn), 0, sizeof(GistNSN) );
+ stk->parentlsn = so->stack->lsn;
+
+ stk->next = so->stack->next;
+ so->stack->next = stk;
+
+ }
if (ScanDirectionIsBackward(dir))
- n = PageGetMaxOffsetNumber(p);
+ n = OffsetNumberPrev(n);
else
- n = FirstOffsetNumber;
+ n = OffsetNumberNext(n);
}
}
+
+ return ntids;
}
/*
* Return the offset of the first index entry that is consistent with
* the search key after offset 'n' in the current page. If there are
* no more consistent entries, return InvalidOffsetNumber.
+ * Page should be locked....
*/
static OffsetNumber
gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.58 2005/05/17 03:34:18 neilc Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.59 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
static void gistregscan(IndexScanDesc scan);
static void gistdropscan(IndexScanDesc scan);
static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno,
- OffsetNumber offnum);
-static void adjuststack(GISTSTACK *stk, BlockNumber blkno);
-static void adjustiptr(IndexScanDesc scan, ItemPointer iptr,
- int op, BlockNumber blkno, OffsetNumber offnum);
-static void gistfreestack(GISTSTACK *s);
+ OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
+static void adjustiptr(IndexScanDesc scan, ItemPointer iptr, GISTSearchStack *stk,
+ int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
+static void gistfreestack(GISTSearchStack *s);
/*
* Whenever we start a GiST scan in a backend, we register it in
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
- GISTSTACK *o,
+ GISTSearchStack *o,
*n,
*tmp;
/* copy the parent stack from the current item data */
while (n != NULL)
{
- tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
- tmp->offset = n->offset;
+ tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
+ tmp->lsn = n->lsn;
+ tmp->parentlsn = n->parentlsn;
tmp->block = n->block;
- tmp->parent = o;
+ tmp->next = o;
o = tmp;
- n = n->parent;
+ n = n->next;
}
gistfreestack(so->markstk);
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
- GISTSTACK *o,
+ GISTSearchStack *o,
*n,
*tmp;
/* copy the parent stack from the current item data */
while (n != NULL)
{
- tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
- tmp->offset = n->offset;
+ tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
+ tmp->lsn = n->lsn;
+ tmp->parentlsn = n->parentlsn;
tmp->block = n->block;
- tmp->parent = o;
+ tmp->next = o;
o = tmp;
- n = n->parent;
+ n = n->next;
}
gistfreestack(so->stack);
pfree(scan->opaque);
}
+
gistdropscan(scan);
PG_RETURN_VOID();
}
void
-gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum)
+gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
GISTScanList l;
Oid relid;
+ if ( XLogRecPtrIsInvalid(newlsn) || XLogRecPtrIsInvalid(oldlsn) )
+ return;
+
relid = RelationGetRelid(rel);
for (l = GISTScans; l != NULL; l = l->gsl_next)
{
if (l->gsl_scan->indexRelation->rd_id == relid)
- gistadjone(l->gsl_scan, op, blkno, offnum);
+ gistadjone(l->gsl_scan, op, blkno, offnum, newlsn, oldlsn);
}
}
gistadjone(IndexScanDesc scan,
int op,
BlockNumber blkno,
- OffsetNumber offnum)
+ OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
- GISTScanOpaque so;
-
- adjustiptr(scan, &(scan->currentItemData), op, blkno, offnum);
- adjustiptr(scan, &(scan->currentMarkData), op, blkno, offnum);
-
- so = (GISTScanOpaque) scan->opaque;
+ GISTScanOpaque so = (GISTScanOpaque) scan->opaque ;
- if (op == GISTOP_SPLIT)
- {
- adjuststack(so->stack, blkno);
- adjuststack(so->markstk, blkno);
- }
+ adjustiptr(scan, &(scan->currentItemData), so->stack, op, blkno, offnum, newlsn, oldlsn);
+ adjustiptr(scan, &(scan->currentMarkData), so->markstk, op, blkno, offnum, newlsn, oldlsn);
}
/*
*/
static void
adjustiptr(IndexScanDesc scan,
- ItemPointer iptr,
+ ItemPointer iptr, GISTSearchStack *stk,
int op,
BlockNumber blkno,
- OffsetNumber offnum)
+ OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
OffsetNumber curoff;
GISTScanOpaque so;
{
case GISTOP_DEL:
/* back up one if we need to */
- if (curoff >= offnum)
+ if (curoff >= offnum && XLByteEQ(stk->lsn, oldlsn) ) /* the same vesrion of page */
{
if (curoff > FirstOffsetNumber)
{
else
so->flags |= GS_MRKBEFORE;
}
+ stk->lsn = newlsn;
}
break;
-
- case GISTOP_SPLIT:
- /* back to start of page on split */
- ItemPointerSet(iptr, blkno, FirstOffsetNumber);
- if (iptr == &(scan->currentItemData))
- so->flags &= ~GS_CURBEFORE;
- else
- so->flags &= ~GS_MRKBEFORE;
- break;
-
default:
elog(ERROR, "Bad operation in GiST scan adjust: %d", op);
}
}
}
-/*
- * adjuststack() -- adjust the supplied stack for a split on a page in
- * the index we're scanning.
- *
- * If a page on our parent stack has split, we need to back up to the
- * beginning of the page and rescan it. The reason for this is that
- * the split algorithm for GiSTs doesn't order tuples in any useful
- * way on a single page. This means on that a split, we may wind up
- * looking at some heap tuples more than once. This is handled in the
- * access method update code for heaps; if we've modified the tuple we
- * are looking at already in this transaction, we ignore the update
- * request.
- */
-static void
-adjuststack(GISTSTACK *stk, BlockNumber blkno)
-{
- while (stk != NULL)
- {
- if (stk->block == blkno)
- stk->offset = FirstOffsetNumber;
-
- stk = stk->parent;
- }
-}
-
static void
-gistfreestack(GISTSTACK *s)
+gistfreestack(GISTSearchStack *s)
{
while (s != NULL)
{
- GISTSTACK *p = s->parent;
+ GISTSearchStack *p = s->next;
pfree(s);
s = p;
}
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.3 2005/06/27 12:45:22 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
page = BufferGetPage(b);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
- opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ opaque = GistPageGetOpaque(page);
opaque->flags = f;
+ opaque->nsplited = 0;
+ opaque->level = 0;
+ opaque->rightlink = InvalidBlockNumber;
+ memset( &(opaque->nsn), 0, sizeof(GistNSN) );
}
void
}
Buffer
-gistReadBuffer(Relation r, BlockNumber blkno) {
+gistNewBuffer(Relation r) { /* hand back a buffer for a new page, still locked: a reclaimed deleted page if one can be taken, else a freshly extended one */
Buffer buffer = InvalidBuffer;
+ bool needLock;
- if ( blkno != P_NEW ) {
- buffer = ReadBuffer(r, blkno);
- } else {
- Page page;
-
- while(true) {
- blkno = GetFreeIndexPage(&r->rd_node);
- if (blkno == InvalidBlockNumber)
- break;
+ while(true) { /* first, try the candidate pages offered by GetFreeIndexPage */
+ BlockNumber blkno = GetFreeIndexPage(&r->rd_node);
+ if (blkno == InvalidBlockNumber)
+ break; /* no candidate left: fall through and extend the relation */
- buffer = ReadBuffer(r, blkno);
- page = BufferGetPage(buffer);
+ buffer = ReadBuffer(r, blkno);
+ if ( ConditionalLockBuffer(buffer) ) { /* the page is examined only if the lock was obtained */
+ Page page = BufferGetPage(buffer);
if ( GistPageIsDeleted( page ) ) {
GistPageSetNonDeleted( page );
return buffer;
- }
- ReleaseBuffer( buffer );
+ } else
+ LockBuffer(buffer, GIST_UNLOCK); /* page is not marked deleted: unlock it and keep looking */
}
- buffer = ReadBuffer(r, P_NEW);
+ ReleaseBuffer( buffer ); /* drop the pin on the rejected candidate */
}
-
+
+ needLock = !RELATION_IS_LOCAL(r); /* the extension lock is skipped when RELATION_IS_LOCAL reports the relation as local */
+
+ if (needLock)
+ LockRelationForExtension(r, ExclusiveLock);
+
+ buffer = ReadBuffer(r, P_NEW);
+ LockBuffer(buffer, GIST_EXCLUSIVE); /* the new page is locked before the extension lock is released below */
+
+ if (needLock)
+ UnlockRelationForExtension(r, ExclusiveLock);
+
return buffer;
}
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.2 2005/06/20 15:22:37 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.3 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
Relation index;
MemoryContext opCtx;
IndexBulkDeleteResult *result;
-
- /* path to root */
- BlockNumber *path;
- int pathlen;
- int curpathlen;
} GistVacuum;
-static void
-shiftPath(GistVacuum *gv, BlockNumber blkno) {
- if ( gv->pathlen == 0 ) {
- gv->pathlen = 8;
- gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
- } else if ( gv->pathlen == gv->curpathlen ) {
- gv->pathlen *= 2;
- gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
- }
-
- if ( gv->curpathlen )
- memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen );
- gv->curpathlen++;
- gv->path[0] = blkno;
-}
-
-static void
-unshiftPath(GistVacuum *gv) {
- gv->curpathlen--;
- if ( gv->curpathlen )
- memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen );
-}
-
typedef struct {
IndexTuple *itup;
int ituplen;
bool emptypage;
} ArrayTuple;
-
static ArrayTuple
gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
ArrayTuple res = {NULL, 0, false};
completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted );
addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon);
- shiftPath(gv, blkno);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
ArrayTuple chldtuple;
bool needchildunion;
chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
needchildunion );
if ( chldtuple.ituplen || chldtuple.emptypage ) {
- /* adjust any scans that will be affected by this deletion */
- gistadjscans(gv->index, GISTOP_DEL, blkno, i);
PageIndexTupleDelete(page, i);
todelete[ ntodelete++ ] = i;
i--; maxoff--;
oldCtx = MemoryContextSwitchTo(gv->opCtx);
- /* path is need to recovery because there is new pages, in a case of
- crash it's needed to add inner tuple pointers on parent page */
rdata = formSplitRdata(gv->index->rd_node, blkno,
- &key, gv->path, gv->curpathlen, dist);
+ &key, dist);
MemoryContextSwitchTo(oldCtx);
}
END_CRIT_SECTION();
-
+ } else {
+ ptr = dist;
+ while(ptr) {
+ PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+ ptr=ptr->next;
+ }
}
ptr = dist;
while(ptr) {
+ if ( BufferGetBlockNumber(ptr->buffer) != blkno )
+ LockBuffer( ptr->buffer, GIST_UNLOCK );
WriteBuffer(ptr->buffer);
ptr=ptr->next;
}
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
oldCtx = MemoryContextSwitchTo(gv->opCtx);
- gistnewroot(gv->index, res.itup, res.ituplen, &key);
+ gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
MemoryContextSwitchTo(oldCtx);
+
+ WriteNoReleaseBuffer(buffer);
}
needwrite=false;
needunion = false; /* gistSplit already forms unions */
} else {
+ /* enough free space */
OffsetNumber off = (PageIsEmpty(page)) ?
FirstOffsetNumber
:
OffsetNumberNext(PageGetMaxOffsetNumber(page));
- /* enough free space */
gistfillbuffer(gv->index, page, addon, curlenaddon, off);
}
}
- unshiftPath(gv);
}
if ( needunion ) {
if ( !gv->index->rd_istemp ) {
XLogRecData *rdata;
XLogRecPtr recptr;
- MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+ char *xlinfo;
- /* In a vacuum, it's not need to push path, because
- there is no new inserted keys */
rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
- res.emptypage, addon, curlenaddon, NULL, NULL, 0);
- MemoryContextSwitchTo(oldCtx);
-
+ res.emptypage, addon, curlenaddon, NULL );
+ xlinfo = rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
- MemoryContextReset(gv->opCtx);
- }
+
+ pfree( xlinfo );
+ pfree( rdata );
+ } else
+ PageSetLSN(page, XLogRecPtrForTemp);
WriteBuffer( buffer );
} else
ReleaseBuffer( buffer );
BlockNumber npages, blkno;
BlockNumber nFreePages, *freePages, maxFreePages;
BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO;
-
- /* LockRelation(rel, AccessExclusiveLock); */
+ bool needLock;
/* gistVacuumUpdate may cause hard work */
if ( info->vacuum_full ) {
GistVacuum gv;
ArrayTuple res;
+ LockRelation(rel, AccessExclusiveLock);
+
gv.index = rel;
initGISTstate(&(gv.giststate), rel);
gv.opCtx = createTempGistContext();
gv.result = stats;
- gv.path=NULL;
- gv.pathlen = gv.curpathlen = 0;
-
/* walk through the entire index for update tuples */
res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false );
/* cleanup */
pfree( res.itup[i] );
pfree( res.itup );
}
- if ( gv.path )
- pfree( gv.path );
freeGISTstate(&(gv.giststate));
MemoryContextDelete(gv.opCtx);
} else if (needFullVacuum) {
needFullVacuum = false;
+ needLock = !RELATION_IS_LOCAL(rel);
+ if ( info->vacuum_full )
+ needLock = false; /* relation locked with AccessExclusiveLock */
+
/* try to find deleted pages */
+ if (needLock)
+ LockRelationForExtension(rel, ExclusiveLock);
npages = RelationGetNumberOfBlocks(rel);
- maxFreePages = RelationGetNumberOfBlocks(rel);
+ if (needLock)
+ UnlockRelationForExtension(rel, ExclusiveLock);
+
+ maxFreePages = npages;
if ( maxFreePages > MaxFSMPages )
maxFreePages = MaxFSMPages;
+
nFreePages = 0;
freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages);
for(blkno=GIST_ROOT_BLKNO+1;blkno<npages;blkno++) {
Buffer buffer = ReadBuffer(rel, blkno);
- Page page=(Page)BufferGetPage(buffer);
+ Page page;
+
+ LockBuffer( buffer, GIST_SHARE );
+ page=(Page)BufferGetPage(buffer);
if ( GistPageIsDeleted(page) ) {
if (nFreePages < maxFreePages) {
}
} else
lastFilledBlock = blkno;
+ LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer(buffer);
}
lastBlock = npages-1;
- if ( nFreePages > 0 ) {
- if ( info->vacuum_full ) { /* try to truncate index */
- int i;
- for(i=0;i<nFreePages;i++)
- if ( freePages[i] >= lastFilledBlock ) {
- nFreePages = i;
- break;
- }
+ if ( info->vacuum_full && nFreePages>0 ) { /* try to truncate index */
+ int i;
+ for(i=0;i<nFreePages;i++)
+ if ( freePages[i] >= lastFilledBlock ) {
+ nFreePages = i;
+ break;
+ }
- if ( lastBlock > lastFilledBlock )
- RelationTruncate( rel, lastFilledBlock+1 );
- stats->pages_removed = lastBlock - lastFilledBlock;
- }
-
- if ( nFreePages > 0 )
- RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages );
+ if ( lastBlock > lastFilledBlock )
+ RelationTruncate( rel, lastFilledBlock+1 );
+ stats->pages_removed = lastBlock - lastFilledBlock;
}
+
+ RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages );
pfree( freePages );
/* return statistics */
stats->pages_free = nFreePages;
+ if (needLock)
+ LockRelationForExtension(rel, ExclusiveLock);
stats->num_pages = RelationGetNumberOfBlocks(rel);
+ if (needLock)
+ UnlockRelationForExtension(rel, ExclusiveLock);
- /* UnlockRelation(rel, AccessExclusiveLock); */
+ if (info->vacuum_full)
+ UnlockRelation(rel, AccessExclusiveLock);
PG_RETURN_POINTER(stats);
}
typedef struct GistBDItem {
+ GistNSN parentlsn; /* LSN read from the parent page when this block was queued (a right sibling inherits it); compared with the page's NSN to detect a later split */
BlockNumber blkno;
struct GistBDItem *next;
} GistBDItem;
+/*
+ * Detect a page split that happened after the parent page was examined.
+ *
+ * When the entry on top of 'stack' is not the root, carries a valid
+ * parent LSN that is older than the page's NSN, and the page has a right
+ * link, a new entry for the right sibling is linked in directly behind
+ * 'stack'.  The new entry inherits the parent LSN of 'stack'.
+ * In every other case the stack is left untouched.
+ */
+static void
+pushStackIfSplited(Page page, GistBDItem *stack) {
+	GISTPageOpaque opaque = GistPageGetOpaque(page);
+	GistBDItem *sibling;
+
+	/* the root entry is never followed to the right */
+	if ( stack->blkno == GIST_ROOT_BLKNO )
+		return;
+
+	/* without a known parent LSN there is nothing to compare against */
+	if ( XLogRecPtrIsInvalid( stack->parentlsn ) )
+		return;
+
+	/* the page's NSN is not newer than the parent LSN: no split detected */
+	if ( !XLByteLT( stack->parentlsn, opaque->nsn) )
+		return;
+
+	/* sanity check */
+	if ( opaque->rightlink == InvalidBlockNumber )
+		return;
+
+	/* split page detected, install right link to the stack */
+	sibling = (GistBDItem*) palloc(sizeof(GistBDItem));
+	sibling->blkno = opaque->rightlink;
+	sibling->parentlsn = stack->parentlsn;
+	sibling->next = stack->next;
+	stack->next = sibling;
+}
+
+
/*
* Bulk deletion of all index entries pointing to a set of heap tuples and
- * update invalid tuples after crash recovery.
+ * check invalid tuples after crash recovery.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
void* callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult));
GistBDItem *stack, *ptr;
- MemoryContext opCtx = createTempGistContext();
+ bool needLock;
- stack = (GistBDItem*) palloc(sizeof(GistBDItem));
+ stack = (GistBDItem*) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
- stack->next = NULL;
needFullVacuum = false;
while( stack ) {
Buffer buffer = ReadBuffer(rel, stack->blkno);
- Page page = (Page) BufferGetPage(buffer);
- OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page);
+ Page page;
+ OffsetNumber i, maxoff;
IndexTuple idxtuple;
ItemId iid;
- OffsetNumber *todelete = NULL;
- int ntodelete = 0;
+
+ LockBuffer(buffer, GIST_SHARE);
+ page = (Page) BufferGetPage(buffer);
if ( GistPageIsLeaf(page) ) {
- ItemPointerData heapptr;
+ OffsetNumber *todelete = NULL;
+ int ntodelete = 0;
+
+ LockBuffer(buffer, GIST_UNLOCK);
+ LockBuffer(buffer, GIST_EXCLUSIVE);
+
+ page = (Page) BufferGetPage(buffer);
+ if ( stack->blkno==GIST_ROOT_BLKNO && !GistPageIsLeaf(page) ) {
+ /* only the root can become non-leaf during the relock */
+ LockBuffer(buffer, GIST_UNLOCK);
+ ReleaseBuffer(buffer);
+ /* one more check */
+ continue;
+ }
- todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) );
+ /* a split may have happened after we looked at the parent;
+ we have to check for it after the relock */
+ pushStackIfSplited(page, stack);
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) );
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
- heapptr = idxtuple->t_tid;
- if ( callback(&heapptr, callback_state) ) {
- gistadjscans(rel, GISTOP_DEL, stack->blkno, i);
+ if ( callback(&(idxtuple->t_tid), callback_state) ) {
PageIndexTupleDelete(page, i);
- todelete[ ntodelete++ ] = i;
- i--; maxoff--;
+ todelete[ ntodelete ] = i;
+ i--; maxoff--; ntodelete++;
result->tuples_removed += 1;
+ Assert( maxoff == PageGetMaxOffsetNumber(page) );
} else
result->num_index_tuples += 1;
}
+
+ if ( ntodelete ) {
+ GistMarkTuplesDeleted(page);
+
+ if (!rel->rd_istemp ) {
+ XLogRecData *rdata;
+ XLogRecPtr recptr;
+ gistxlogEntryUpdate *xlinfo;
+
+ rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
+ false, NULL, 0, NULL);
+ xlinfo = (gistxlogEntryUpdate*)rdata->data;
+
+ START_CRIT_SECTION();
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ END_CRIT_SECTION();
+
+ pfree( xlinfo );
+ pfree( rdata );
+ } else
+ PageSetLSN(page, XLogRecPtrForTemp);
+ WriteNoReleaseBuffer( buffer );
+ }
+
+ pfree( todelete );
} else {
+ /* check for split proceeded after look at parent */
+ pushStackIfSplited(page, stack);
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
ptr = (GistBDItem*) palloc(sizeof(GistBDItem));
ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) );
+ ptr->parentlsn = PageGetLSN( page );
ptr->next = stack->next;
stack->next = ptr;
}
}
- if ( ntodelete && todelete ) {
- GistMarkTuplesDeleted(page);
-
- if (!rel->rd_istemp ) {
- XLogRecData *rdata;
- XLogRecPtr recptr;
- MemoryContext oldCtx = MemoryContextSwitchTo(opCtx);
-
- rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
- false, NULL, 0, NULL, NULL, 0);
- MemoryContextSwitchTo(oldCtx);
-
- START_CRIT_SECTION();
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
- END_CRIT_SECTION();
-
- MemoryContextReset(opCtx);
- }
-
- WriteBuffer( buffer );
- } else
- ReleaseBuffer( buffer );
+ LockBuffer( buffer, GIST_UNLOCK );
+ ReleaseBuffer( buffer );
- if ( todelete )
- pfree( todelete );
ptr = stack->next;
pfree( stack );
vacuum_delay_point();
}
- MemoryContextDelete( opCtx );
+ needLock = !RELATION_IS_LOCAL(rel);
+ if (needLock)
+ LockRelationForExtension(rel, ExclusiveLock);
result->num_pages = RelationGetNumberOfBlocks(rel);
-
+ if (needLock)
+ UnlockRelationForExtension(rel, ExclusiveLock);
PG_RETURN_POINTER( result );
}
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.4 2005/06/27 12:45:22 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
- BlockNumber *path;
OffsetNumber *todelete;
} EntryUpdateRecord;
typedef struct {
gistxlogPageSplit *data;
NewPage *page;
- BlockNumber *path;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
- int pathlen;
- BlockNumber *path;
XLogRecPtr lsn;
+ BlockNumber *path;
+ int pathlen;
} gistIncompleteInsert;
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
- BlockNumber *path, int pathlen,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
}
Assert( ninsert->lenblk>0 );
- if ( path && pathlen ) {
- ninsert->pathlen = pathlen;
- ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
- memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
- } else {
- ninsert->pathlen = 0;
- ninsert->path = NULL;
- }
-
incomplete_inserts = lappend(incomplete_inserts, ninsert);
MemoryContextSwitchTo(oldCxt);
}
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
/* found */
- if ( insert->path ) pfree( insert->path );
pfree( insert->blkno );
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
pfree( insert );
decoded->data = (gistxlogEntryUpdate*)begin;
- if ( decoded->data->pathlen ) {
- addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
- decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
- } else
- decoded->path = NULL;
-
if ( decoded->data->ntodelete ) {
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
- addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+ addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
} else
decoded->todelete = NULL;
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
&(xlrec.data->blkno), 1,
- xlrec.path, xlrec.data->pathlen,
NULL);
}
}
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
- int j,i=0, addpath = 0;
+ int j,i=0;
decoded->data = (gistxlogPageSplit*)begin;
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
- if ( decoded->data->pathlen ) {
- addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
- decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
- } else
- decoded->path = NULL;
-
- ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
+ ptr=begin+sizeof( gistxlogPageSplit );
for(i=0;i<decoded->data->npage;i++) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].header = (gistxlogPage*)ptr;
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
- xlrec.path, xlrec.data->pathlen,
&xlrec);
}
}
return tuple;
}
+/*
+ * Fill insert->path and insert->pathlen for an incomplete insert.
+ *
+ * The blocks in insert->blkno[] are tried in order.  For the first one
+ * that gistFindPath() can locate, the returned stack is walked through
+ * its ->parent links and every block number met on the way is stored in
+ * insert->path, so path[0] is the block of the entry gistFindPath()
+ * returned.  If none of the blocks can be located, pathlen stays 0 and
+ * path stays NULL.
+ */
+static void
+gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
+	int i;
+
+	insert->pathlen = 0;
+	insert->path = NULL;
+
+	/* stop after the last block: going further would read outside blkno[] */
+	for(i=0;i<insert->lenblk;i++) {
+		GISTInsertStack *top = gistFindPath(index, insert->blkno[i], XLogReadBuffer);
+		GISTInsertStack *ptr;
+		int n = 0;
+
+		if ( top == NULL )
+			continue;	/* this block was not found, try the next one */
+
+		/* first pass: count the entries to size the array */
+		for(ptr=top; ptr; ptr=ptr->parent)
+			insert->pathlen++;
+
+		insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
+
+		/* second pass: copy the block numbers; a separate index keeps 'i' intact */
+		for(ptr=top; ptr; ptr=ptr->parent)
+			insert->path[n++] = ptr->blkno;
+
+		break;	/* one path is enough */
+	}
+}
+
static void
gistContinueInsert(gistIncompleteInsert *insert) {
IndexTuple *itup;
for(i=0;i<insert->lenblk;i++)
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
+ /* construct path */
+ gixtxlogFindPath( index, insert );
+
if ( insert->pathlen==0 ) {
/*it was split root, so we should only make new root*/
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
- ItemPointer key,
- BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
+ ItemPointer key, SplitedPageLayout *dist ) {
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->npage = (uint16)npage;
- xlrec->pathlen = (uint16)pathlen;
if ( key )
xlrec->key = *key;
else
rdata[0].len = sizeof( gistxlogPageSplit );
rdata[0].next = NULL;
- if ( pathlen ) {
- rdata[cur-1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)path;
- rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
- rdata[cur].next = NULL;
- cur++;
- }
-
ptr=dist;
while(ptr) {
rdata[cur].buffer = InvalidBuffer;
XLogRecData *
formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
- IndexTuple *itup, int ituplen, ItemPointer key,
- BlockNumber *path, int pathlen) {
+ IndexTuple *itup, int ituplen, ItemPointer key ) {
XLogRecData *rdata;
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
if ( emptypage ) {
xlrec->isemptypage = true;
xlrec->ntodelete = 0;
- xlrec->pathlen = 0;
rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
rdata->buffer = InvalidBuffer;
xlrec->isemptypage = false;
xlrec->ntodelete = ntodelete;
- xlrec->pathlen = pathlen;
- rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) );
+ rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) );
rdata->buffer = InvalidBuffer;
rdata->data = (char*)xlrec;
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
- if ( pathlen ) {
- rdata[cur-1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)path;
- rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
- rdata[cur].next = NULL;
- cur++;
- }
-
if ( ntodelete ) {
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.83 2005/06/13 23:14:48 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.84 2005/06/27 12:45:22 teodor Exp $
*
* INTERFACE ROUTINES
* index_open - open an index relation by relation OID
FmgrInfo *procedure;
RELATION_CHECKS;
- GET_REL_PROCEDURE(ambeginscan);
RelationIncrementReferenceCount(indexRelation);
*/
LockRelation(indexRelation, AccessShareLock);
+ /*
+ * LockRelation can clean rd_aminfo structure, so fill procedure
+ * after LockRelation
+ */
+
+ GET_REL_PROCEDURE(ambeginscan);
+
/*
* Tell the AM to open a scan.
*/
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.48 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#include "storage/bufpage.h"
#include "storage/off.h"
#include "utils/rel.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
/*
* amproc indexes for GiST indexes.
#define F_DELETED (1 << 1)
#define F_TUPLES_DELETED (1 << 2)
+typedef XLogRecPtr GistNSN;
+
typedef struct GISTPageOpaqueData
{
- uint32 flags;
+ uint8 flags; /* bit mask of the F_* page flags (F_LEAF, F_DELETED, F_TUPLES_DELETED) */
+
+ /* number of pages the current page was split into by the last split (NOTE(review): reading of the original wording, confirm) */
+ uint8 nsplited;
+
+ /* level of the page; 0 means leaf */
+ uint16 level;
+ BlockNumber rightlink; /* block followed when a split is detected; InvalidBlockNumber when unset */
+
+ /* its only purpose: this value is changed when
+ the page is split. */
+ GistNSN nsn;
} GISTPageOpaqueData;
typedef GISTPageOpaqueData *GISTPageOpaque;
bool leafkey;
} GISTENTRY;
-#define GistPageIsLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_LEAF)
+#define GistPageGetOpaque(page) ( (GISTPageOpaque) PageGetSpecialPointer(page) )
+
+#define GistPageIsLeaf(page) ( GistPageGetOpaque(page)->flags & F_LEAF)
#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page))
-#define GistPageSetLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_LEAF)
-#define GistPageSetNonLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_LEAF)
+#define GistPageSetLeaf(page) ( GistPageGetOpaque(page)->flags |= F_LEAF)
+#define GistPageSetNonLeaf(page) ( GistPageGetOpaque(page)->flags &= ~F_LEAF)
-#define GistPageIsDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_DELETED)
-#define GistPageSetDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_DELETED)
-#define GistPageSetNonDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_DELETED)
+#define GistPageIsDeleted(page) ( GistPageGetOpaque(page)->flags & F_DELETED)
+#define GistPageSetDeleted(page) ( GistPageGetOpaque(page)->flags |= F_DELETED)
+#define GistPageSetNonDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_DELETED)
-#define GistTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_TUPLES_DELETED)
-#define GistMarkTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_TUPLES_DELETED)
-#define GistClearTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_TUPLES_DELETED)
+#define GistTuplesDeleted(page) ( GistPageGetOpaque(page)->flags & F_TUPLES_DELETED)
+#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
+#define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
/*
* Vector of GISTENTRY structs; user-defined methods union and pick
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.5 2005/06/20 15:22:38 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.6 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#include "access/xlogdefs.h"
#include "fmgr.h"
+#define GIST_UNLOCK BUFFER_LOCK_UNLOCK
+#define GIST_SHARE BUFFER_LOCK_SHARE
+#define GIST_EXCLUSIVE BUFFER_LOCK_EXCLUSIVE
+
+
/*
+ * XXX old comment!!!
* When we descend a tree, we keep a stack of parent pointers. This
* allows us to follow a chain of internal node points until we reach
* a leaf node, and then back up the stack to re-examine the internal
* the node's page that we stopped at (i.e. we followed the child
* pointer located at the specified offset).
*/
-typedef struct GISTSTACK
+typedef struct GISTSearchStack
{
- struct GISTSTACK *parent;
- OffsetNumber offset;
+ struct GISTSearchStack *next; /* next entry of the singly linked stack */
BlockNumber block;
-} GISTSTACK;
+ /* to detect that the page has changed */
+ GistNSN lsn;
+ /* to recognize that a split occurred */
+ GistNSN parentlsn;
+} GISTSearchStack;
typedef struct GISTSTATE
{
*/
typedef struct GISTScanOpaqueData
{
- GISTSTACK *stack;
- GISTSTACK *markstk;
+ GISTSearchStack *stack;
+ GISTSearchStack *markstk;
uint16 flags;
GISTSTATE *giststate;
MemoryContext tempCxt;
typedef GISTScanOpaqueData *GISTScanOpaque;
+/* XLog stuff */
+extern const XLogRecPtr XLogRecPtrForTemp;
+
+#define XLOG_GIST_ENTRY_UPDATE 0x00
+#define XLOG_GIST_ENTRY_DELETE 0x10
+#define XLOG_GIST_NEW_ROOT 0x20
+
+typedef struct gistxlogEntryUpdate {
+ RelFileNode node;
+ BlockNumber blkno;
+
+ uint16 ntodelete; /* number of OffsetNumbers in the todelete array that follows */
+ bool isemptypage; /* formUpdateRdata sets this (with ntodelete = 0) when called with emptypage */
+
+ /*
+ * Used to identify the completeness of an insert.
+ * Set to the leaf itup.
+ */
+ ItemPointerData key;
+
+ /* followed by:
+ * 1. todelete OffsetNumbers
+ * 2. tuples to insert
+ */
+} gistxlogEntryUpdate;
+
+#define XLOG_GIST_PAGE_SPLIT 0x30
+
+typedef struct gistxlogPageSplit {
+ RelFileNode node;
+ BlockNumber origblkno; /* the page that was split */
+ uint16 npage; /* number of gistxlogPage entries that follow */
+
+ /* see comments on gistxlogEntryUpdate */
+ ItemPointerData key;
+
+ /* followed by:
+ * 1. gistxlogPage and array of IndexTupleData per page
+ */
+} gistxlogPageSplit;
+
+#define XLOG_GIST_INSERT_COMPLETE 0x40
+
+typedef struct gistxlogPage {
+ BlockNumber blkno;
+ int num; /* presumably the number of index tuples stored for this page; TODO confirm */
+} gistxlogPage;
+
+#define XLOG_GIST_CREATE_INDEX 0x50
+
+typedef struct gistxlogInsertComplete {
+ RelFileNode node;
+ /* followed by the ItemPointerData key to clean */
+} gistxlogInsertComplete;
+
+/* SplitedPageLayout - gistSplit function result */
+typedef struct SplitedPageLayout {
+ gistxlogPage block;
+ IndexTupleData *list;
+ int lenlist;
+ Buffer buffer; /* written out once all processing is done */
+
+ struct SplitedPageLayout *next; /* next page of the split result; the list ends at NULL */
+} SplitedPageLayout;
+
/*
* GISTInsertStack used for locking buffers and transfer arguments during
* insertion
BlockNumber blkno;
Buffer buffer;
Page page;
+
+ /* log sequence number from page->lsn to
+ recognize page update and compare it with page's nsn
+ to recognize page split*/
+ GistNSN lsn;
/* child's offset */
OffsetNumber childoffnum;
- /* pointer to parent */
+ /* pointer to parent and child */
struct GISTInsertStack *parent;
+ struct GISTInsertStack *child;
- bool todelete;
+ /* for gistFindPath */
+ struct GISTInsertStack *next;
} GISTInsertStack;
+#define XLogRecPtrIsInvalid( r ) ( (r).xlogid == 0 && (r).xrecoff == 0 ) /* true when both xlogid and xrecoff are zero; r is evaluated twice, so pass a side-effect-free expression */
+
typedef struct {
Relation r;
IndexTuple *itup; /* in/out, points to compressed entry */
/* pointer to heap tuple */
ItemPointerData key;
-
- /* path to stroe in XLog */
- BlockNumber *path;
- int pathlen;
} GISTInsertState;
/*
* constants tell us what sort of operation changed the index.
*/
#define GISTOP_DEL 0
-#define GISTOP_SPLIT 1
+/* #define GISTOP_SPLIT 1 */
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
)
-/* XLog stuff */
-#define XLOG_GIST_ENTRY_UPDATE 0x00
-#define XLOG_GIST_ENTRY_DELETE 0x10
-#define XLOG_GIST_NEW_ROOT 0x20
-
-typedef struct gistxlogEntryUpdate {
- RelFileNode node;
- BlockNumber blkno;
-
- uint16 ntodelete;
- uint16 pathlen;
- bool isemptypage;
-
- /*
- * It used to identify completeness of insert.
- * Sets to leaf itup
- */
- ItemPointerData key;
-
- /* follow:
- * 1. path to root (BlockNumber)
- * 2. todelete OffsetNumbers
- * 3. tuples to insert
- */
-} gistxlogEntryUpdate;
-
-#define XLOG_GIST_PAGE_SPLIT 0x30
-
-typedef struct gistxlogPageSplit {
- RelFileNode node;
- BlockNumber origblkno; /*splitted page*/
- uint16 pathlen;
- uint16 npage;
-
- /* see comments on gistxlogEntryUpdate */
- ItemPointerData key;
-
- /* follow:
- * 1. path to root (BlockNumber)
- * 2. gistxlogPage and array of IndexTupleData per page
- */
-} gistxlogPageSplit;
-
-typedef struct gistxlogPage {
- BlockNumber blkno;
- int num;
-} gistxlogPage;
-
-
-#define XLOG_GIST_INSERT_COMPLETE 0x40
-
-typedef struct gistxlogInsertComplete {
- RelFileNode node;
- /* follows ItemPointerData key to clean */
-} gistxlogInsertComplete;
-
-#define XLOG_GIST_CREATE_INDEX 0x50
-
/*
* mark tuples on inner pages during recovery
*/
extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key);
extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
+extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
-typedef struct SplitedPageLayout {
- gistxlogPage block;
- IndexTupleData *list;
- int lenlist;
- Buffer buffer; /* to write after all proceed */
-
- struct SplitedPageLayout *next;
-} SplitedPageLayout;
-
-IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
+extern IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
+
+extern GISTInsertStack* gistFindPath( Relation r, BlockNumber child,
+ Buffer (*myReadBuffer)(bool, Relation, BlockNumber) );
/* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec);
extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
- IndexTuple *itup, int ituplen, ItemPointer key,
- BlockNumber *path, int pathlen);
+ IndexTuple *itup, int ituplen, ItemPointer key);
extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber blkno,
- ItemPointer key,
- BlockNumber *path, int pathlen, SplitedPageLayout *dist );
+ ItemPointer key, SplitedPageLayout *dist);
extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
extern Datum gistgetmulti(PG_FUNCTION_ARGS);
/* gistutil.c */
-extern Buffer gistReadBuffer(Relation r, BlockNumber blkno);
+extern Buffer gistNewBuffer(Relation r);
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off);
extern bool gistnospace(Page page, IndexTuple *itvec, int len);
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.26 2004/12/31 22:03:21 pgsql Exp $
+ * $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.27 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#define GISTSCAN_H
#include "access/relscan.h"
+#include "access/xlogdefs.h"
extern Datum gistbeginscan(PG_FUNCTION_ARGS);
extern Datum gistrescan(PG_FUNCTION_ARGS);
extern Datum gistmarkpos(PG_FUNCTION_ARGS);
extern Datum gistrestrpos(PG_FUNCTION_ARGS);
extern Datum gistendscan(PG_FUNCTION_ARGS);
-extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum);
+extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
extern void ReleaseResources_gist(void);
#endif /* GISTSCAN_H */
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.281 2005/06/24 20:53:31 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.282 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 200506241
+#define CATALOG_VERSION_NO 200506271
#endif
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.36 2005/06/24 20:53:31 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.37 2005/06/27 12:45:23 teodor Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
DATA(insert OID = 405 ( hash 1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate ));
DESCR("hash index access method");
#define HASH_AM_OID 405
-DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
+DATA(insert OID = 783 ( gist 100 7 0 f t f f t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DESCR("GiST index access method");
#define GIST_AM_OID 783