/*-------------------------------------------------------------------------
*
* gistvacuum.c
- * interface routines for the postgres GiST index access method.
+ * vacuuming routines for the postgres GiST index access method.
*
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.47 2010/02/08 04:33:52 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.48 2010/02/08 05:17:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
bool needReindex;
} GistBulkDeleteResult;
-typedef struct
-{
- GISTSTATE giststate;
- Relation index;
- MemoryContext opCtx;
- GistBulkDeleteResult *result;
- BufferAccessStrategy strategy;
-} GistVacuum;
-
-typedef struct
-{
- IndexTuple *itup;
- int ituplen;
- bool emptypage;
-} ArrayTuple;
-
-/*
- * Make union of keys on page
- */
-static IndexTuple
-PageMakeUnionKey(GistVacuum *gv, Buffer buffer)
-{
- Page page = BufferGetPage(buffer);
- IndexTuple *vec,
- tmp,
- res;
- int veclen = 0;
- MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
- vec = gistextractpage(page, &veclen);
-
- /*
- * we call gistunion() in temprorary context because user-defined
- * functions called in gistunion() may do not free all memory
- */
- tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
- MemoryContextSwitchTo(oldCtx);
-
- res = (IndexTuple) palloc(IndexTupleSize(tmp));
- memcpy(res, tmp, IndexTupleSize(tmp));
-
- ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer));
- GistTupleSetValid(res);
-
- MemoryContextReset(gv->opCtx);
-
- return res;
-}
-
-static void
-gistDeleteSubtree(GistVacuum *gv, BlockNumber blkno)
-{
- Buffer buffer;
- Page page;
-
- buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
- gv->strategy);
- LockBuffer(buffer, GIST_EXCLUSIVE);
- page = (Page) BufferGetPage(buffer);
-
- if (!GistPageIsLeaf(page))
- {
- int i;
-
- for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i))
- {
- ItemId iid = PageGetItemId(page, i);
- IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
-
- gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
- }
- }
-
- START_CRIT_SECTION();
-
- MarkBufferDirty(buffer);
-
- page = (Page) BufferGetPage(buffer);
- GistPageSetDeleted(page);
- gv->result->std.pages_deleted++;
-
- if (!gv->index->rd_istemp)
- {
- XLogRecData rdata[2];
- XLogRecPtr recptr;
- gistxlogPageDelete xlrec;
-
- xlrec.node = gv->index->rd_node;
- xlrec.blkno = blkno;
-
- rdata[0].buffer = buffer;
- rdata[0].buffer_std = true;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) &xlrec;
- rdata[1].len = sizeof(gistxlogPageDelete);
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
- }
- else
- PageSetLSN(page, XLogRecPtrForTemp);
-
- END_CRIT_SECTION();
-
- UnlockReleaseBuffer(buffer);
-}
-
-static ArrayTuple
-vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon)
-{
- ArrayTuple res = {NULL, 0, false};
- IndexTuple *vec;
- SplitedPageLayout *dist = NULL,
- *ptr;
- int i,
- veclen = 0;
- BlockNumber blkno = BufferGetBlockNumber(buffer);
- MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
- vec = gistextractpage(tempPage, &veclen);
- vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
- dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate));
-
- MemoryContextSwitchTo(oldCtx);
-
- if (blkno != GIST_ROOT_BLKNO)
- {
- /* if non-root split then we should not allocate new buffer */
- dist->buffer = buffer;
- dist->page = tempPage;
- /* during vacuum we never split leaf page */
- GistPageGetOpaque(dist->page)->flags = 0;
- }
- else
- pfree(tempPage);
-
- res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
- res.ituplen = 0;
-
- /* make new pages and fills them */
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- char *data;
-
- if (ptr->buffer == InvalidBuffer)
- {
- ptr->buffer = gistNewBuffer(gv->index);
- GISTInitBuffer(ptr->buffer, 0);
- ptr->page = BufferGetPage(ptr->buffer);
- }
- ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
-
- data = (char *) (ptr->list);
- for (i = 0; i < ptr->block.num; i++)
- {
- if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
- data += IndexTupleSize((IndexTuple) data);
- }
-
- ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
- res.itup[res.ituplen] = (IndexTuple) palloc(IndexTupleSize(ptr->itup));
- memcpy(res.itup[res.ituplen], ptr->itup, IndexTupleSize(ptr->itup));
- res.ituplen++;
- }
-
- START_CRIT_SECTION();
-
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- MarkBufferDirty(ptr->buffer);
- GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber;
- }
-
- /* restore splitted non-root page */
- if (blkno != GIST_ROOT_BLKNO)
- {
- PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
- dist->page = BufferGetPage(dist->buffer);
- }
-
- if (!gv->index->rd_istemp)
- {
- XLogRecPtr recptr;
- XLogRecData *rdata;
- ItemPointerData key; /* set key for incomplete insert */
- char *xlinfo;
-
- ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
-
- rdata = formSplitRdata(gv->index->rd_node, blkno,
- false, &key, dist);
- xlinfo = rdata->data;
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- PageSetLSN(BufferGetPage(ptr->buffer), recptr);
- PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
- }
-
- pfree(xlinfo);
- pfree(rdata);
- }
- else
- {
- for (ptr = dist; ptr; ptr = ptr->next)
- PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
- }
-
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- /* we must keep the buffer pin on the head page */
- if (BufferGetBlockNumber(ptr->buffer) != blkno)
- UnlockReleaseBuffer(ptr->buffer);
- }
-
- if (blkno == GIST_ROOT_BLKNO)
- {
- ItemPointerData key; /* set key for incomplete insert */
-
- ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
-
- gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
- }
-
- END_CRIT_SECTION();
-
- MemoryContextReset(gv->opCtx);
-
- return res;
-}
-
-static ArrayTuple
-gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
-{
- ArrayTuple res = {NULL, 0, false};
- Buffer buffer;
- Page page,
- tempPage = NULL;
- OffsetNumber i,
- maxoff;
- ItemId iid;
- int lenaddon = 4,
- curlenaddon = 0,
- nOffToDelete = 0,
- nBlkToDelete = 0;
- IndexTuple idxtuple,
- *addon = NULL;
- bool needwrite = false;
- OffsetNumber offToDelete[MaxOffsetNumber];
- BlockNumber blkToDelete[MaxOffsetNumber];
- ItemPointerData *completed = NULL;
- int ncompleted = 0,
- lencompleted = 16;
-
- vacuum_delay_point();
-
- buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
- gv->strategy);
- LockBuffer(buffer, GIST_EXCLUSIVE);
- gistcheckpage(gv->index, buffer);
- page = (Page) BufferGetPage(buffer);
- maxoff = PageGetMaxOffsetNumber(page);
-
- if (GistPageIsLeaf(page))
- {
- if (GistTuplesDeleted(page))
- needunion = needwrite = true;
- }
- else
- {
- completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
- addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
-
- /* get copy of page to work */
- tempPage = PageGetTempPageCopy(page);
-
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- {
- ArrayTuple chldtuple;
- bool needchildunion;
-
- iid = PageGetItemId(tempPage, i);
- idxtuple = (IndexTuple) PageGetItem(tempPage, iid);
- needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
-
- if (needchildunion)
- elog(DEBUG2, "gistVacuumUpdate: need union for block %u",
- ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
-
- chldtuple = gistVacuumUpdate(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
- needchildunion);
- if (chldtuple.ituplen || chldtuple.emptypage)
- {
- /* update tuple or/and inserts new */
- if (chldtuple.emptypage)
- blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
- offToDelete[nOffToDelete++] = i;
- PageIndexTupleDelete(tempPage, i);
- i--;
- maxoff--;
- needwrite = needunion = true;
-
- if (chldtuple.ituplen)
- {
-
- Assert(chldtuple.emptypage == false);
- while (curlenaddon + chldtuple.ituplen >= lenaddon)
- {
- lenaddon *= 2;
- addon = (IndexTuple *) repalloc(addon, sizeof(IndexTuple) * lenaddon);
- }
-
- memcpy(addon + curlenaddon, chldtuple.itup, chldtuple.ituplen * sizeof(IndexTuple));
-
- curlenaddon += chldtuple.ituplen;
-
- if (chldtuple.ituplen > 1)
- {
- /*
- * child was split, so we need mark completion
- * insert(split)
- */
- int j;
-
- while (ncompleted + chldtuple.ituplen > lencompleted)
- {
- lencompleted *= 2;
- completed = (ItemPointerData *) repalloc(completed, sizeof(ItemPointerData) * lencompleted);
- }
- for (j = 0; j < chldtuple.ituplen; j++)
- {
- ItemPointerCopy(&(chldtuple.itup[j]->t_tid), completed + ncompleted);
- ncompleted++;
- }
- }
- pfree(chldtuple.itup);
- }
- }
- }
-
- Assert(maxoff == PageGetMaxOffsetNumber(tempPage));
-
- if (curlenaddon)
- {
- /* insert updated tuples */
- if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber, 0))
- {
- /* there is no space on page to insert tuples */
- res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon);
- tempPage = NULL; /* vacuumSplitPage() free tempPage */
- needwrite = needunion = false; /* gistSplit already forms
- * unions and writes pages */
- }
- else
- /* enough free space */
- gistfillbuffer(tempPage, addon, curlenaddon, InvalidOffsetNumber);
- }
- }
-
- /*
- * If page is empty, we should remove pointer to it before deleting page
- * (except root)
- */
-
- if (blkno != GIST_ROOT_BLKNO && (PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage))))
- {
- /*
- * New version of page is empty, so leave it unchanged, upper call
- * will mark our page as deleted. In case of page split we never will
- * be here...
- *
- * If page was empty it can't become non-empty during processing
- */
- res.emptypage = true;
- UnlockReleaseBuffer(buffer);
- }
- else
- {
- /* write page and remove its childs if it need */
-
- START_CRIT_SECTION();
-
- if (tempPage && needwrite)
- {
- PageRestoreTempPage(tempPage, page);
- tempPage = NULL;
- }
-
- /* Empty index */
- if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO)
- {
- needwrite = true;
- GistPageSetLeaf(page);
- }
-
-
- if (needwrite)
- {
- MarkBufferDirty(buffer);
- GistClearTuplesDeleted(page);
-
- if (!gv->index->rd_istemp)
- {
- XLogRecData *rdata;
- XLogRecPtr recptr;
- char *xlinfo;
-
- rdata = formUpdateRdata(gv->index->rd_node, buffer,
- offToDelete, nOffToDelete,
- addon, curlenaddon, NULL);
- xlinfo = rdata->next->data;
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
-
- pfree(xlinfo);
- pfree(rdata);
- }
- else
- PageSetLSN(page, XLogRecPtrForTemp);
- }
-
- END_CRIT_SECTION();
-
- if (needunion && !PageIsEmpty(page))
- {
- res.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
- res.ituplen = 1;
- res.itup[0] = PageMakeUnionKey(gv, buffer);
- }
-
- UnlockReleaseBuffer(buffer);
-
- /* delete empty children, now we havn't any links to pointed subtrees */
- for (i = 0; i < nBlkToDelete; i++)
- gistDeleteSubtree(gv, blkToDelete[i]);
-
- if (ncompleted && !gv->index->rd_istemp)
- gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
- }
-
-
- for (i = 0; i < curlenaddon; i++)
- pfree(addon[i]);
- if (addon)
- pfree(addon);
- if (completed)
- pfree(completed);
- if (tempPage)
- pfree(tempPage);
-
- return res;
-}
/*
* VACUUM cleanup: update FSM