* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gin/gindatapage.c,v 1.6 2007/01/05 22:19:21 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gin/gindatapage.c,v 1.7 2007/06/04 15:56:28 teodor Exp $
*-------------------------------------------------------------------------
*/
static XLogRecData rdata[3];
int sizeofitem = GinSizeOfItem(page);
static ginxlogInsert data;
+ int cnt=0;
*prdata = rdata;
Assert(GinPageIsData(page));
data.isData = TRUE;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
- rdata[0].buffer = buf;
- rdata[0].buffer_std = FALSE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
+ /*
+ * Prevent full page write if child's split occurs. That is needed
+ * to remove incomplete splits while replaying WAL
+ *
+ * data.updateBlkno contains new block number (of newly created right page)
+ * for recently splited page.
+ */
+ if ( data.updateBlkno == InvalidBlockNumber )
+ {
+ rdata[0].buffer = buf;
+ rdata[0].buffer_std = FALSE;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = &rdata[1];
+ cnt++;
+ }
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) &data;
- rdata[1].len = sizeof(ginxlogInsert);
- rdata[1].next = &rdata[2];
+ rdata[cnt].buffer = InvalidBuffer;
+ rdata[cnt].data = (char *) &data;
+ rdata[cnt].len = sizeof(ginxlogInsert);
+ rdata[cnt].next = &rdata[cnt+1];
+ cnt++;
- rdata[2].buffer = InvalidBuffer;
- rdata[2].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
- rdata[2].len = sizeofitem;
- rdata[2].next = NULL;
+ rdata[cnt].buffer = InvalidBuffer;
+ rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
+ rdata[cnt].len = sizeofitem;
+ rdata[cnt].next = NULL;
if (GinPageIsLeaf(page))
{
btree->curitem++;
}
data.nitem = btree->curitem - savedPos;
- rdata[2].len = sizeofitem * data.nitem;
+ rdata[cnt].len = sizeofitem * data.nitem;
}
else
{
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gin/ginentrypage.c,v 1.6 2007/01/05 22:19:21 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginentrypage.c,v 1.7 2007/06/04 15:56:28 teodor Exp $
*-------------------------------------------------------------------------
*/
static XLogRecData rdata[3];
OffsetNumber placed;
static ginxlogInsert data;
+ int cnt=0;
*prdata = rdata;
data.updateBlkno = entryPreparePage(btree, page, off);
data.isData = false;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
- rdata[0].buffer = buf;
- rdata[0].buffer_std = TRUE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
+ /*
+ * Prevent full page write if child's split occurs. That is needed
+ * to remove incomplete splits while replaying WAL
+ *
+ * data.updateBlkno contains new block number (of newly created right page)
+ * for recently splited page.
+ */
+ if ( data.updateBlkno == InvalidBlockNumber )
+ {
+ rdata[0].buffer = buf;
+ rdata[0].buffer_std = TRUE;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = &rdata[1];
+ cnt++;
+ }
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) &data;
- rdata[1].len = sizeof(ginxlogInsert);
- rdata[1].next = &rdata[2];
-
- rdata[2].buffer = InvalidBuffer;
- rdata[2].data = (char *) btree->entry;
- rdata[2].len = IndexTupleSize(btree->entry);
- rdata[2].next = NULL;
+ rdata[cnt].buffer = InvalidBuffer;
+ rdata[cnt].data = (char *) &data;
+ rdata[cnt].len = sizeof(ginxlogInsert);
+ rdata[cnt].next = &rdata[cnt+1];
+ cnt++;
+
+ rdata[cnt].buffer = InvalidBuffer;
+ rdata[cnt].data = (char *) btree->entry;
+ rdata[cnt].len = IndexTupleSize(btree->entry);
+ rdata[cnt].next = NULL;
btree->entry = NULL;
}
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gin/ginget.c,v 1.7 2007/02/01 04:16:08 neilc Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginget.c,v 1.8 2007/06/04 15:56:28 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "catalog/index.h"
#include "utils/memutils.h"
-static OffsetNumber
-findItemInPage(Page page, ItemPointer item, OffsetNumber off)
+static bool
+findItemInPage(Page page, ItemPointer item, OffsetNumber *off)
{
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
int res;
- for (; off <= maxoff; off++)
+ if ( GinPageGetOpaque(page)->flags & GIN_DELETED )
+ /* page was deleted by concurrent vacuum */
+ return false;
+
+ if ( *off > maxoff || *off == InvalidOffsetNumber )
+ res = -1;
+ else
+ res = compareItemPointers(item, (ItemPointer) GinDataPageGetItem(page, *off));
+
+ if ( res == 0 )
+ {
+ /* page isn't changed */
+ return true;
+ }
+ else if ( res > 0 )
{
- res = compareItemPointers(item, (ItemPointer) GinDataPageGetItem(page, off));
- Assert(res >= 0);
+ /*
+ * some items was added before our position, look further to find
+ * it or first greater
+ */
+
+ (*off)++;
+ for (; *off <= maxoff; (*off)++)
+ {
+ res = compareItemPointers(item, (ItemPointer) GinDataPageGetItem(page, *off));
+
+ if (res == 0)
+ return true;
- if (res == 0)
- return off;
+ if (res < 0)
+ {
+ (*off)--;
+ return true;
+ }
+ }
}
+ else
+ {
+ /*
+ * some items was deleted before our position, look from begining
+ * to find it or first greater
+ */
+
+ for(*off = FirstOffsetNumber; *off<= maxoff; (*off)++)
+ {
+ res = compareItemPointers(item, (ItemPointer) GinDataPageGetItem(page, *off));
- return InvalidOffsetNumber;
+ if ( res == 0 )
+ return true;
+
+ if (res < 0)
+ {
+ (*off)--;
+ return true;
+ }
+ }
+ }
+
+ return false;
}
/*
}
else if (entry->buffer != InvalidBuffer)
{
- /* we should find place were we was stopped */
+ /* we should find place where we was stopped */
BlockNumber blkno;
Page page;
page = BufferGetPage(entry->buffer);
/* try to find curItem in current buffer */
- if ((entry->offset = findItemInPage(page, &entry->curItem, entry->offset)) != InvalidOffsetNumber)
+ if ( findItemInPage(page, &entry->curItem, &entry->offset) )
return;
/* walk to right */
LockBuffer(entry->buffer, GIN_SHARE);
page = BufferGetPage(entry->buffer);
- if ((entry->offset = findItemInPage(page, &entry->curItem, FirstOffsetNumber)) != InvalidOffsetNumber)
+ entry->offset = InvalidOffsetNumber;
+ if ( findItemInPage(page, &entry->curItem, &entry->offset) )
return;
}
- elog(ERROR, "Logic error: lost previously founded ItemId");
+ /*
+ * curItem and any greated items was deleted by concurrent vacuum,
+ * so we finished scan with currrent entry
+ */
}
}
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gin/ginvacuum.c,v 1.13 2007/05/31 14:03:09 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginvacuum.c,v 1.14 2007/06/04 15:56:28 teodor Exp $
*-------------------------------------------------------------------------
*/
/*
* We should be sure that we don't concurrent with inserts, insert process
* never release root page until end (but it can unlock it and lock
- * again). If we lock root with with LockBufferForCleanup, new scan
- * process can't begin, but previous may run. ginmarkpos/start* keeps
- * buffer pinned, so we will wait for it. We lock only one posting tree in
- * whole index, so, it's concurrent enough.. Side effect: after this is
- * full complete, tree is unused by any other process
+ * again). New scan can't start but previously started
+ * ones work concurrently.
*/
- LockBufferForCleanup(buffer);
+ if ( isRoot )
+ LockBufferForCleanup(buffer);
+ else
+ LockBuffer(buffer, GIN_EXCLUSIVE);
Assert(GinPageIsData(page));
if (!isParentRoot) /* parent is already locked by
* LockBufferForCleanup() */
LockBuffer(pBuffer, GIN_EXCLUSIVE);
+ if (leftBlkno != InvalidBlockNumber)
+ LockBuffer(lBuffer, GIN_EXCLUSIVE);
START_CRIT_SECTION();
{
BlockNumber rightlink;
- LockBuffer(lBuffer, GIN_EXCLUSIVE);
-
page = BufferGetPage(dBuffer);
rightlink = GinPageGetOpaque(page)->rightlink;
PageDeletePostingItem(parentPage, myoff);
page = BufferGetPage(dBuffer);
+ /*
+ * we shouldn't change rightlink field to save
+ * workability of running search scan
+ */
GinPageGetOpaque(page)->flags = GIN_DELETED;
if (!gvs->index->rd_istemp)
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.6 2007/01/05 22:19:21 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.7 2007/06/04 15:56:28 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
{
ListCell *l;
+ bool found = false;
foreach(l, incomplete_splits)
{
if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
{
incomplete_splits = list_delete_ptr(incomplete_splits, split);
+ found = true;
break;
}
}
+
+ if (!found)
+ {
+ elog(ERROR, "failed to identify corresponding split record for %u/%u/%u",
+ node.relNode, leftBlkno, updateBlkno);
+ }
}
static void
UnlockReleaseBuffer(buffer);
}
- if (!(record->xl_info & XLR_BKP_BLOCK_2) && data->leftBlkno != InvalidBlockNumber)
+ if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
{
buffer = XLogReadBuffer(reln, data->leftBlkno, false);
page = BufferGetPage(buffer);
MemoryContextSwitchTo(topCtx);
MemoryContextDelete(opCtx);
+ incomplete_splits = NIL;
}
bool