* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
*
*-------------------------------------------------------------------------
*/
bool emptypage;
} ArrayTuple;
+/*
+ * Make union of keys on page
+ */
+static IndexTuple
+PageMakeUnionKey(GistVacuum *gv, Buffer buffer) {
+ Page page = BufferGetPage( buffer );
+ IndexTuple *vec,
+ tmp, res;
+ int veclen = 0;
+ MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ vec = gistextractpage(page, &veclen);
+ /* we call gistunion() in temprorary context because user-defined functions called in gistunion()
+ may do not free all memory */
+ tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
+ MemoryContextSwitchTo(oldCtx);
+
+ res = (IndexTuple) palloc(IndexTupleSize(tmp));
+ memcpy(res, tmp, IndexTupleSize(tmp));
+
+ ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer));
+ GistTupleSetValid(res);
+
+ MemoryContextReset(gv->opCtx);
+
+ return res;
+}
+
+static void
+gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
+ Buffer buffer;
+ Page page;
+
+ buffer = ReadBuffer(gv->index, blkno);
+ LockBuffer(buffer, GIST_EXCLUSIVE);
+ page = (Page) BufferGetPage(buffer);
+
+ if ( !GistPageIsLeaf(page) ) {
+ int i;
+
+ for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) {
+ ItemId iid = PageGetItemId(page, i);
+ IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+ gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
+ }
+ }
+
+ START_CRIT_SECTION();
+
+ MarkBufferDirty(buffer);
+
+ page = (Page) BufferGetPage(buffer);
+ GistPageSetDeleted(page);
+ gv->result->std.pages_deleted++;
+
+ if (!gv->index->rd_istemp)
+ {
+ XLogRecData rdata;
+ XLogRecPtr recptr;
+ gistxlogPageDelete xlrec;
+
+ xlrec.node = gv->index->rd_node;
+ xlrec.blkno = blkno;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = sizeof(gistxlogPageDelete);
+ rdata.next = NULL;
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+ else
+ PageSetLSN(page, XLogRecPtrForTemp);
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(buffer);
+}
+
+static Page
+GistPageGetCopyPage( Page page ) {
+ Size pageSize = PageGetPageSize( page );
+ Page tmppage;
+
+ tmppage=(Page)palloc( pageSize );
+ memcpy( tmppage, page, pageSize );
+
+ return tmppage;
+}
+
+static ArrayTuple
+vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) {
+ ArrayTuple res = {NULL, 0, false};
+ IndexTuple *vec;
+ SplitedPageLayout *dist = NULL,
+ *ptr;
+ int i, veclen=0;
+ BlockNumber blkno = BufferGetBlockNumber(buffer);
+ MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+ vec = gistextractpage(tempPage, &veclen);
+ vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+ dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate));
+
+ MemoryContextSwitchTo(oldCtx);
+
+ if (blkno != GIST_ROOT_BLKNO) {
+ /* if non-root split then we should not allocate new buffer */
+ dist->buffer = buffer;
+ dist->page = tempPage;
+ /* during vacuum we never split leaf page */
+ GistPageGetOpaque(dist->page)->flags = 0;
+ } else
+ pfree(tempPage);
+
+ res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+ res.ituplen = 0;
+
+ /* make new pages and fills them */
+ for (ptr = dist; ptr; ptr = ptr->next) {
+ char *data;
+
+ if ( ptr->buffer == InvalidBuffer ) {
+ ptr->buffer = gistNewBuffer( gv->index );
+ GISTInitBuffer( ptr->buffer, 0 );
+ ptr->page = BufferGetPage(ptr->buffer);
+ }
+ ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+ data = (char*)(ptr->list);
+ for(i=0;i<ptr->block.num;i++) {
+ if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+ data += IndexTupleSize((IndexTuple)data);
+ }
+
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+ memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+ res.ituplen++;
+ }
+
+ START_CRIT_SECTION();
+
+ for (ptr = dist; ptr; ptr = ptr->next) {
+ MarkBufferDirty(ptr->buffer);
+ GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber;
+ }
+
+ /* restore splitted non-root page */
+ if (blkno != GIST_ROOT_BLKNO) {
+ PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+ dist->page = BufferGetPage( dist->buffer );
+ }
+
+ if (!gv->index->rd_istemp)
+ {
+ XLogRecPtr recptr;
+ XLogRecData *rdata;
+ ItemPointerData key; /* set key for incomplete
+ * insert */
+ char *xlinfo;
+
+ ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+ rdata = formSplitRdata(gv->index->rd_node, blkno,
+ false, &key, dist);
+ xlinfo = rdata->data;
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+ PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+ }
+
+ pfree(xlinfo);
+ pfree(rdata);
+ }
+ else
+ {
+ for (ptr = dist; ptr; ptr = ptr->next)
+ PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+ }
+
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ /* we must keep the buffer pin on the head page */
+ if (BufferGetBlockNumber(ptr->buffer) != blkno)
+ UnlockReleaseBuffer( ptr->buffer );
+ }
+
+ if (blkno == GIST_ROOT_BLKNO)
+ {
+ ItemPointerData key; /* set key for incomplete
+ * insert */
+
+ ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+ gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
+ }
+
+ END_CRIT_SECTION();
+
+ MemoryContextReset(gv->opCtx);
+
+ return res;
+}
static ArrayTuple
gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
{
ArrayTuple res = {NULL, 0, false};
Buffer buffer;
- Page page;
+ Page page, tempPage = NULL;
OffsetNumber i,
maxoff;
ItemId iid;
int lenaddon = 4,
curlenaddon = 0,
- ntodelete = 0;
+ nOffToDelete = 0,
+ nBlkToDelete = 0;
IndexTuple idxtuple,
*addon = NULL;
bool needwrite = false;
- OffsetNumber todelete[MaxOffsetNumber];
+ OffsetNumber offToDelete[MaxOffsetNumber];
+ BlockNumber blkToDelete[MaxOffsetNumber];
ItemPointerData *completed = NULL;
int ncompleted = 0,
lencompleted = 16;
page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page);
- /*
- * XXX need to reduce scope of changes to page so we can make this
- * critical section less extensive
- */
- START_CRIT_SECTION();
-
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
+ /* get copy of page to work */
+ tempPage = GistPageGetCopyPage(page);
+
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
ArrayTuple chldtuple;
bool needchildunion;
- iid = PageGetItemId(page, i);
- idxtuple = (IndexTuple) PageGetItem(page, iid);
+ iid = PageGetItemId(tempPage, i);
+ idxtuple = (IndexTuple) PageGetItem(tempPage, iid);
needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
if (needchildunion)
needchildunion);
if (chldtuple.ituplen || chldtuple.emptypage)
{
- PageIndexTupleDelete(page, i);
- todelete[ntodelete++] = i;
+ /* update tuple or/and inserts new */
+ if ( chldtuple.emptypage )
+ blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+ offToDelete[nOffToDelete++] = i;
+ PageIndexTupleDelete(tempPage, i);
i--;
maxoff--;
needwrite = needunion = true;
if (chldtuple.ituplen)
{
+
+ Assert( chldtuple.emptypage == false );
while (curlenaddon + chldtuple.ituplen >= lenaddon)
{
lenaddon *= 2;
}
}
}
+
+ Assert( maxoff == PageGetMaxOffsetNumber(tempPage) );
if (curlenaddon)
{
/* insert updated tuples */
- if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
- {
+ if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) {
/* there is no space on page to insert tuples */
- IndexTuple *vec;
- SplitedPageLayout *dist = NULL,
- *ptr;
- int i, veclen=0;
- MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
- vec = gistextractbuffer(buffer, &veclen);
- vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
- dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
-
- MemoryContextSwitchTo(oldCtx);
-
- if (blkno != GIST_ROOT_BLKNO) {
- /* if non-root split then we should not allocate new buffer */
- dist->buffer = buffer;
- dist->page = BufferGetPage(dist->buffer);
- GistPageGetOpaque(dist->page)->flags = 0;
- }
-
- res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
- res.ituplen = 0;
-
- /* make new pages and fills them */
- for (ptr = dist; ptr; ptr = ptr->next) {
- char *data;
-
- if ( ptr->buffer == InvalidBuffer ) {
- ptr->buffer = gistNewBuffer( gv->index );
- GISTInitBuffer( ptr->buffer, 0 );
- ptr->page = BufferGetPage(ptr->buffer);
- }
- ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
-
- data = (char*)(ptr->list);
- for(i=0;i<ptr->block.num;i++) {
- if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
- elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
- data += IndexTupleSize((IndexTuple)data);
- }
-
- ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
- res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
- memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
- res.ituplen++;
-
- MarkBufferDirty(ptr->buffer);
- }
-
- if (!gv->index->rd_istemp)
- {
- XLogRecPtr recptr;
- XLogRecData *rdata;
- ItemPointerData key; /* set key for incomplete
- * insert */
- char *xlinfo;
-
- ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+ res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon);
+ tempPage=NULL; /* vacuumSplitPage() free tempPage */
+ needwrite = needunion = false; /* gistSplit already forms unions and writes pages */
+ } else
+ /* enough free space */
+ gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
+ }
+ }
- rdata = formSplitRdata(gv->index->rd_node, blkno,
- false, &key, dist);
- xlinfo = rdata->data;
+ /*
+ * If page is empty, we should remove pointer to it before
+ * deleting page (except root)
+ */
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- PageSetLSN(BufferGetPage(ptr->buffer), recptr);
- PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
- }
+ if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) {
+ /*
+ * New version of page is empty, so leave it unchanged,
+ * upper call will mark our page as deleted.
+ * In case of page split we never will be here...
+ *
+ * If page was empty it can't become non-empty during processing
+ */
+ res.emptypage = true;
+ UnlockReleaseBuffer(buffer);
+ } else {
+ /* write page and remove its childs if it need */
- pfree(xlinfo);
- pfree(rdata);
- }
- else
- {
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
- }
- }
+ START_CRIT_SECTION();
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- /* we must keep the buffer pin on the head page */
- if (BufferGetBlockNumber(ptr->buffer) != blkno)
- UnlockReleaseBuffer( ptr->buffer );
- }
+ if ( tempPage && needwrite ) {
+ PageRestoreTempPage(tempPage, page);
+ tempPage = NULL;
+ }
- if (blkno == GIST_ROOT_BLKNO)
- {
- ItemPointerData key; /* set key for incomplete
- * insert */
+ /* Empty index */
+ if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO )
+ {
+ needwrite = true;
+ GistPageSetLeaf(page);
+ }
- ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+ if (needwrite)
+ {
+ MarkBufferDirty(buffer);
+ GistClearTuplesDeleted(page);
- oldCtx = MemoryContextSwitchTo(gv->opCtx);
- gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
- MemoryContextSwitchTo(oldCtx);
- }
+ if (!gv->index->rd_istemp)
+ {
+ XLogRecData *rdata;
+ XLogRecPtr recptr;
+ char *xlinfo;
- needwrite = false;
+ rdata = formUpdateRdata(gv->index->rd_node, buffer,
+ offToDelete, nOffToDelete,
+ addon, curlenaddon, NULL);
+ xlinfo = rdata->next->data;
- MemoryContextReset(gv->opCtx);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
- needunion = false; /* gistSplit already forms unions */
+ pfree(xlinfo);
+ pfree(rdata);
}
else
- {
- /* enough free space */
- gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber);
- }
+ PageSetLSN(page, XLogRecPtrForTemp);
}
- }
- if (needunion)
- {
- /* forms union for page or check empty */
- if (PageIsEmpty(page))
- {
- if (blkno == GIST_ROOT_BLKNO)
- {
- needwrite = true;
- GistPageSetLeaf(page);
- }
- else
- {
- needwrite = true;
- res.emptypage = true;
- GistPageSetDeleted(page);
- gv->result->std.pages_deleted++;
- }
- }
- else
- {
- IndexTuple *vec,
- tmp;
- int veclen = 0;
- MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
- vec = gistextractbuffer(buffer, &veclen);
- tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
- MemoryContextSwitchTo(oldCtx);
+ END_CRIT_SECTION();
+ if ( needunion && !PageIsEmpty(page) )
+ {
res.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
res.ituplen = 1;
- res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp));
- memcpy(res.itup[0], tmp, IndexTupleSize(tmp));
-
- ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
- GistTupleSetValid(res.itup[0]);
-
- MemoryContextReset(gv->opCtx);
+ res.itup[0] = PageMakeUnionKey(gv, buffer);
}
- }
- if (needwrite)
- {
- MarkBufferDirty(buffer);
- GistClearTuplesDeleted(page);
-
- if (!gv->index->rd_istemp)
- {
- XLogRecData *rdata;
- XLogRecPtr recptr;
- char *xlinfo;
-
- rdata = formUpdateRdata(gv->index->rd_node, buffer,
- todelete, ntodelete, res.emptypage,
- addon, curlenaddon, NULL);
- xlinfo = rdata->data;
+ UnlockReleaseBuffer(buffer);
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
+ /* delete empty children, now we havn't any links to pointed subtrees */
+ for(i=0;i<nBlkToDelete;i++)
+ gistDeleteSubtree(gv, blkToDelete[i]);
- pfree(xlinfo);
- pfree(rdata);
- }
- else
- PageSetLSN(page, XLogRecPtrForTemp);
+ if (ncompleted && !gv->index->rd_istemp)
+ gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
}
- END_CRIT_SECTION();
-
- UnlockReleaseBuffer(buffer);
-
- if (ncompleted && !gv->index->rd_istemp)
- gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
for (i = 0; i < curlenaddon; i++)
pfree(addon[i]);
pfree(addon);
if (completed)
pfree(completed);
+ if (tempPage)
+ pfree(tempPage);
+
return res;
}
gistxlogPageUpdate *xlinfo;
rdata = formUpdateRdata(rel->rd_node, buffer,
- todelete, ntodelete, false,
+ todelete, ntodelete,
NULL, 0,
NULL);
- xlinfo = (gistxlogPageUpdate *) rdata->data;
+ xlinfo = (gistxlogPageUpdate *) rdata->next->data;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
return;
}
- if (xlrec.data->isemptypage)
+ if (isnewroot)
+ GISTInitBuffer(buffer, 0);
+ else if (xlrec.data->ntodelete)
{
- while (!PageIsEmpty(page))
- PageIndexTupleDelete(page, FirstOffsetNumber);
+ int i;
- if (xlrec.data->blkno == GIST_ROOT_BLKNO)
- GistPageSetLeaf(page);
- else
- GistPageSetDeleted(page);
+ for (i = 0; i < xlrec.data->ntodelete; i++)
+ PageIndexTupleDelete(page, xlrec.todelete[i]);
+ if (GistPageIsLeaf(page))
+ GistMarkTuplesDeleted(page);
}
- else
- {
- if (isnewroot)
- GISTInitBuffer(buffer, 0);
- else if (xlrec.data->ntodelete)
- {
- int i;
- for (i = 0; i < xlrec.data->ntodelete; i++)
- PageIndexTupleDelete(page, xlrec.todelete[i]);
- if (GistPageIsLeaf(page))
- GistMarkTuplesDeleted(page);
- }
+ /* add tuples */
+ if (xlrec.len > 0)
+ gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
- /* add tuples */
- if (xlrec.len > 0)
- gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+ /*
+ * special case: leafpage, nothing to insert, nothing to delete, then
+ * vacuum marks page
+ */
+ if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
+ GistClearTuplesDeleted(page);
- /*
- * special case: leafpage, nothing to insert, nothing to delete, then
- * vacuum marks page
- */
- if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
- GistClearTuplesDeleted(page);
- }
+ if ( !GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO )
+ /* all links on non-leaf root page was deleted by vacuum full,
+ so root page becomes a leaf */
+ GistPageSetLeaf(page);
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn);
UnlockReleaseBuffer(buffer);
}
+static void
+gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
+{
+ gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ reln = XLogOpenRelation(xldata->node);
+ buffer = XLogReadBuffer(reln, xldata->blkno, false);
+ if (!BufferIsValid(buffer))
+ return;
+
+ GISTInitBuffer( buffer, 0 );
+ page = (Page) BufferGetPage(buffer);
+ GistPageSetDeleted(page);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record, false);
break;
+ case XLOG_GIST_PAGE_DELETE:
+ gistRedoPageDeleteRecord(lsn, record);
+ break;
case XLOG_GIST_NEW_ROOT:
gistRedoPageUpdateRecord(lsn, record, true);
break;
static void
out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
{
- appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
- node.spcNode, node.dbNode, node.relNode,
+ appendStringInfo(buf, "rel %u/%u/%u",
+ node.spcNode, node.dbNode, node.relNode);
+ if ( ItemPointerIsValid( &key ) )
+ appendStringInfo(buf, "; tid %u/%u",
ItemPointerGetBlockNumber(&key),
ItemPointerGetOffsetNumber(&key));
}
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
+static void
+out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
+{
+ appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
+ xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+ xlrec->blkno);
+}
+
static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_update: ");
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break;
+ case XLOG_GIST_PAGE_DELETE:
+ out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
+ break;
case XLOG_GIST_NEW_ROOT:
appendStringInfo(buf, "new_root: ");
out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
* we split root, just copy tuples from old root to new
* page
*/
- parentitup = gistextractbuffer(buffers[numbuffer - 1],
+ parentitup = gistextractpage(pages[numbuffer - 1],
&pituplen);
/* sanity check */
*/
XLogRecData *
formUpdateRdata(RelFileNode node, Buffer buffer,
- OffsetNumber *todelete, int ntodelete, bool emptypage,
+ OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen, ItemPointer key)
{
XLogRecData *rdata;
int cur,
i;
- /* ugly wart in API: emptypage causes us to ignore other inputs */
- if (emptypage)
- ntodelete = ituplen = 0;
-
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete;
- xlrec->isemptypage = emptypage;
+
if (key)
xlrec->key = *key;
else
ItemPointerSetInvalid(&(xlrec->key));
- rdata[0].data = (char *) xlrec;
- rdata[0].len = sizeof(gistxlogPageUpdate);
- rdata[0].buffer = InvalidBuffer;
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = true;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) todelete;
- rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ rdata[1].data = (char *) xlrec;
+ rdata[1].len = sizeof(gistxlogPageUpdate);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].next = &(rdata[2]);
+
+ rdata[2].data = (char *) todelete;
+ rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+ rdata[2].buffer = buffer;
+ rdata[2].buffer_std = true;
+ rdata[2].next = NULL;
/* new tuples */
- cur = 2;
+ cur = 3;
for (i = 0; i < ituplen; i++)
{
rdata[cur - 1].next = &(rdata[cur]);