* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.124 2005/06/29 14:06:14 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.125 2005/06/30 17:52:13 teodor Exp $
*
*-------------------------------------------------------------------------
*/
* Should have the same interface as XLogReadBuffer
*/
static Buffer
-gistReadAndLockBuffer( bool unused, Relation r, BlockNumber blkno ) {
+gistReadAndLockBuffer( Relation r, BlockNumber blkno ) {
Buffer buffer = ReadBuffer( r, blkno );
LockBuffer( buffer, GIST_SHARE );
return buffer;
* returns from the begining of closest parent;
*/
GISTInsertStack*
-gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ) {
+gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, BlockNumber) ) {
Page page;
Buffer buffer;
OffsetNumber i, maxoff;
top->blkno = GIST_ROOT_BLKNO;
while( top && top->blkno != child ) {
- buffer = myReadBuffer(false, r, top->blkno); /* buffer locked */
+ buffer = myReadBuffer(r, top->blkno); /* buffer locked */
page = (Page)BufferGetPage( buffer );
- Assert( !GistPageIsLeaf(page) );
+
+ if ( GistPageIsLeaf(page) ) {
+ /* we can safety go away, follows only leaf pages */
+ LockBuffer( buffer, GIST_UNLOCK );
+ ReleaseBuffer( buffer );
+ return NULL;
+ }
top->lsn = PageGetLSN(page);
LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer( buffer );
return top;
- } else if ( GistPageGetOpaque(page)->level> 0 ) {
+ } else {
/* Install next inner page to the end of stack */
ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) );
ptr->blkno = blkno;
OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup;
int lencleaneditup = *len;
- int level;
p = (Page) BufferGetPage(buffer);
opaque = GistPageGetOpaque(p);
- level = opaque->level;
/*
* The root of the tree is the first block in the relation. If we're
GISTInitBuffer(leftbuf, opaque->flags&F_LEAF);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
- GistPageGetOpaque(left)->level = level;
}
else
{
GISTInitBuffer(rightbuf, opaque->flags&F_LEAF);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
- GistPageGetOpaque(right)->level = level;
/* generate the item array */
realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
{
Page page;
- int level;
Assert( BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO );
page = BufferGetPage(buffer);
- level = GistPageGetOpaque(page)->level;
GISTInitBuffer(buffer, 0);
- GistPageGetOpaque(page)->level = level+1;
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
if ( !r->rd_istemp ) {
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.5 2005/06/28 15:51:00 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.6 2005/06/30 17:52:14 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
typedef struct gistIncompleteInsert {
RelFileNode node;
+ BlockNumber origblkno; /* for splits */
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
+ ninsert->origblkno = *blkno;
} else {
int i;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
for(i=0;i<ninsert->lenblk;i++)
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
+ ninsert->origblkno = xlinfo->data->origblkno;
}
Assert( ninsert->lenblk>0 );
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
+ GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
return tuple;
}
+static Buffer
+gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
+ Buffer buffer = XLogReadBuffer( false, r, blkno );
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistXLogReadAndLockBuffer: block %u unfound", blkno);
+ if ( PageIsNew( (PageHeader)(BufferGetPage(buffer)) ) )
+ elog(PANIC, "gistXLogReadAndLockBuffer: uninitialized page %u", blkno);
+
+ return buffer;
+}
+
+
static void
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
- int i;
GISTInsertStack *top;
insert->pathlen = 0;
insert->path = NULL;
- for(i=0;insert->lenblk;i++) {
- if ( (top=gistFindPath(index, insert->blkno[i], XLogReadBuffer)) != NULL ) {
- GISTInsertStack *ptr=top;
- while(ptr) {
- insert->pathlen++;
- ptr = ptr->parent;
- }
+ if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) {
+ int i;
+ GISTInsertStack *ptr=top;
+ while(ptr) {
+ insert->pathlen++;
+ ptr = ptr->parent;
+ }
- insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
+ insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
- i=0;
- ptr = top;
- while(ptr) {
- insert->path[i] = ptr->blkno;
- i++;
- ptr = ptr->parent;
- }
- break;
+ i=0;
+ ptr = top;
+ while(ptr) {
+ insert->path[i] = ptr->blkno;
+ i++;
+ ptr = ptr->parent;
}
- }
+ } else
+ elog(LOG, "gixtxlogFindPath: lost parent for block %u", insert->origblkno);
}
static void
gistContinueInsert(gistIncompleteInsert *insert) {
IndexTuple *itup;
int i, lenitup;
- MemoryContext oldCxt;
Relation index;
- oldCxt = MemoryContextSwitchTo(opCtx);
-
index = XLogOpenRelation(insert->node);
- if (!RelationIsValid(index))
+ if (!RelationIsValid(index))
return;
- elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
- insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
-
/* needed vector itup never will be more than initial lenblkno+2,
because during this processing Indextuple can be only smaller */
lenitup = insert->lenblk;
itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
- for(i=0;i<insert->lenblk;i++)
+ for(i=0;i<insert->lenblk;i++)
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
- /* construct path */
- gixtxlogFindPath( index, insert );
-
- if ( insert->pathlen==0 ) {
- /*it was split root, so we should only make new root*/
+ if ( insert->origblkno==GIST_ROOT_BLKNO ) {
+ /*it was split root, so we should only make new root.
+ it can't be simple insert into root, look at call
+ pushIncompleteInsert in gistRedoPageSplitRecord */
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
Page page;
if (!BufferIsValid(buffer))
elog(PANIC, "gistContinueInsert: root block unfound");
+ page = BufferGetPage(buffer);
+ if (XLByteLE(insert->lsn, PageGetLSN(page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+
GISTInitBuffer(buffer, 0);
page = BufferGetPage(buffer);
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
+ PageSetLSN(page, insert->lsn);
+ PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
} else {
Buffer *buffers;
Page *pages;
int numbuffer;
-
+
+ /* construct path */
+ gixtxlogFindPath( index, insert );
+
+ Assert( insert->pathlen > 0 );
+
buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
elog(PANIC, "gistContinueInsert: uninitialized page");
+ if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) {
+ LockBuffer(buffers[numbuffer-1], BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffers[numbuffer-1]);
+ return;
+ }
+
pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
/* remove old IndexTuples */
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
IndexTuple *parentitup;
+ /* we split root, just copy tuples from old root to new page */
parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
- /* we split root, just copy tuples from old root to new page */
+ /* sanity check */
if ( i+1 != insert->pathlen )
elog(PANIC,"gistContinueInsert: can't restore index '%s'",
RelationGetRelationName( index ));
itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
PageSetLSN(pages[j], insert->lsn);
PageSetTLI(pages[j], ThisTimeLineID);
+ GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
WriteBuffer( buffers[j] );
}
}
}
- MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(opCtx);
+ elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
+ insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
}
void
void
gist_xlog_cleanup(void) {
ListCell *l;
+ List *reverse=NIL;
+ MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
- foreach(l, incomplete_inserts) {
+ /* we should call gistContinueInsert in reverse order */
+
+ foreach(l, incomplete_inserts)
+ reverse = lappend(reverse, lfirst(l));
+
+ MemoryContextSwitchTo(opCtx);
+ foreach(l, reverse) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
gistContinueInsert(insert);
+ MemoryContextReset(opCtx);
}
+ MemoryContextSwitchTo(oldCxt);
+
MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
}