]> granicus.if.org Git - postgresql/commitdiff
Reduce size of critial section during vacuum full, critical
authorTeodor Sigaev <teodor@sigaev.ru>
Wed, 17 May 2006 16:34:59 +0000 (16:34 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Wed, 17 May 2006 16:34:59 +0000 (16:34 +0000)
sections now isn't nested. All user-defined functions now is
called outside critsections. Small improvements in WAL
protocol.

TODO: improve XLOG replay

src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index 4ce461d4463ba739439d6b2eaafd3a02d8f4c1a8..d207b7ecfa7f72e7c1205e5c23b8978536b412a5 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.134 2006/05/10 23:18:38 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -347,7 +347,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                 * Form index tuples vector to split:
                 * remove old tuple if t's needed and add new tuples to vector
                 */
-               itvec = gistextractbuffer(state->stack->buffer, &tlen);
+               itvec = gistextractpage(state->stack->page, &tlen);
                if ( !is_leaf ) {
                        /* on inner page we should remove old tuple */
                        int pos = state->stack->childoffnum - FirstOffsetNumber;
@@ -501,7 +501,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                        }
 
                        rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
-                                                                       offs, noffs, false,
+                                                                       offs, noffs, 
                                                                        state->itup, state->ituplen,
                                                                        &(state->key));
 
@@ -1157,7 +1157,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
                XLogRecData *rdata;
 
                rdata = formUpdateRdata(r->rd_node, buffer,
-                                                               NULL, 0, false,
+                                                               NULL, 0,
                                                                itup, len, key);
 
                recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
index d5d6405100b3e803a7022da690026944d3603ad5..ca5a9d652d22a2b8e133fb483b6591da93da9d6b 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
+ *                     $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -112,18 +112,17 @@ gistfitpage(IndexTuple *itvec, int len) {
  * Read buffer into itup vector
  */
 IndexTuple *
-gistextractbuffer(Buffer buffer, int *len /* out */ )
+gistextractpage(Page page, int *len /* out */ )
 {
        OffsetNumber i,
                                maxoff;
        IndexTuple *itvec;
-       Page            p = (Page) BufferGetPage(buffer);
 
-       maxoff = PageGetMaxOffsetNumber(p);
+       maxoff = PageGetMaxOffsetNumber(page);
        *len = maxoff;
        itvec = palloc(sizeof(IndexTuple) * maxoff);
        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-               itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+               itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
 
        return itvec;
 }
index e81c0ebf487fb53ffe06e69fd86075152b895953..9b32304d1ae7c231ebb789ff4e4991c15422077f 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -47,23 +47,235 @@ typedef struct
        bool            emptypage;
 } ArrayTuple;
 
+/*
+ * Make union of keys on page 
+ */
+static IndexTuple
+PageMakeUnionKey(GistVacuum *gv, Buffer buffer) {
+       Page    page = BufferGetPage( buffer );
+       IndexTuple *vec,
+                               tmp, res;
+       int                     veclen = 0;
+       MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+       vec = gistextractpage(page, &veclen);
+       /* we call gistunion() in temprorary context because user-defined functions called in gistunion()
+          may do not free all memory */
+       tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
+       MemoryContextSwitchTo(oldCtx);
+
+       res = (IndexTuple) palloc(IndexTupleSize(tmp));
+       memcpy(res, tmp, IndexTupleSize(tmp));
+
+       ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer));
+       GistTupleSetValid(res);
+
+       MemoryContextReset(gv->opCtx);
+
+       return res;
+}
+
+static void
+gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
+       Buffer  buffer;
+       Page    page;
+
+       buffer = ReadBuffer(gv->index, blkno);
+       LockBuffer(buffer, GIST_EXCLUSIVE);
+       page = (Page) BufferGetPage(buffer);
+
+       if ( !GistPageIsLeaf(page) ) {
+               int     i;
+
+               for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) {
+                       ItemId iid = PageGetItemId(page, i);
+                       IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+                       gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid))); 
+               }
+       }
+
+       START_CRIT_SECTION();
+
+       MarkBufferDirty(buffer);
+
+       page = (Page) BufferGetPage(buffer);
+       GistPageSetDeleted(page);
+       gv->result->std.pages_deleted++;
+
+       if (!gv->index->rd_istemp)
+       {
+               XLogRecData rdata;
+               XLogRecPtr      recptr;
+               gistxlogPageDelete      xlrec;
+
+               xlrec.node = gv->index->rd_node;
+               xlrec.blkno = blkno;
+
+               rdata.buffer = InvalidBuffer;
+               rdata.data = (char *) &xlrec;
+               rdata.len = sizeof(gistxlogPageDelete);
+               rdata.next = NULL;
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+               PageSetLSN(page, recptr);
+               PageSetTLI(page, ThisTimeLineID);
+       }
+       else
+               PageSetLSN(page, XLogRecPtrForTemp);
+       
+       END_CRIT_SECTION();
+
+       UnlockReleaseBuffer(buffer);
+}
+
+static Page    
+GistPageGetCopyPage( Page page ) {
+       Size    pageSize = PageGetPageSize( page );
+       Page tmppage;
+
+       tmppage=(Page)palloc( pageSize );
+       memcpy( tmppage, page, pageSize );
+
+       return tmppage;
+}
+
+static ArrayTuple
+vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) {
+       ArrayTuple      res = {NULL, 0, false};
+       IndexTuple *vec;
+       SplitedPageLayout *dist = NULL,
+                                  *ptr;
+       int                     i, veclen=0;
+       BlockNumber     blkno = BufferGetBlockNumber(buffer);
+       MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
+
+       vec = gistextractpage(tempPage, &veclen);
+       vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+       dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate));
+
+       MemoryContextSwitchTo(oldCtx);
+
+       if (blkno != GIST_ROOT_BLKNO) {
+               /* if non-root split then we should not allocate new buffer */
+               dist->buffer = buffer;
+               dist->page = tempPage;
+               /* during vacuum we never split leaf page */
+               GistPageGetOpaque(dist->page)->flags = 0;
+       } else
+               pfree(tempPage);
+
+       res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+       res.ituplen = 0;
+
+       /* make new pages and fills them */
+       for (ptr = dist; ptr; ptr = ptr->next) {
+               char *data;
+
+               if ( ptr->buffer == InvalidBuffer ) {
+                       ptr->buffer = gistNewBuffer( gv->index );
+                       GISTInitBuffer( ptr->buffer, 0 );
+                       ptr->page = BufferGetPage(ptr->buffer);
+               }
+               ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+               data = (char*)(ptr->list);
+               for(i=0;i<ptr->block.num;i++) {
+                       if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+                               elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+                       data += IndexTupleSize((IndexTuple)data);
+               }
+
+               ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+               res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+               memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+               res.ituplen++;
+       }
+
+       START_CRIT_SECTION();
+
+       for (ptr = dist; ptr; ptr = ptr->next) {
+               MarkBufferDirty(ptr->buffer);
+               GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber;
+       }
+
+       /* restore splitted non-root page */
+       if (blkno != GIST_ROOT_BLKNO) {
+               PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+               dist->page = BufferGetPage( dist->buffer );
+       }
+
+       if (!gv->index->rd_istemp)
+       {
+               XLogRecPtr      recptr;
+               XLogRecData *rdata;
+               ItemPointerData key;            /* set key for incomplete
+                                                                        * insert */
+               char       *xlinfo;
+
+               ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+               rdata = formSplitRdata(gv->index->rd_node, blkno,
+                                                                                  false, &key, dist);
+               xlinfo = rdata->data;
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+               for (ptr = dist; ptr; ptr = ptr->next)
+               {
+                       PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+                       PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+               }
+
+               pfree(xlinfo);
+               pfree(rdata);
+       }
+       else
+       {
+               for (ptr = dist; ptr; ptr = ptr->next)
+                       PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+       }
+
+       for (ptr = dist; ptr; ptr = ptr->next)
+       {
+               /* we must keep the buffer pin on the head page */
+               if (BufferGetBlockNumber(ptr->buffer) != blkno)
+                       UnlockReleaseBuffer( ptr->buffer );
+       }
+
+       if (blkno == GIST_ROOT_BLKNO)
+       {
+               ItemPointerData key;            /* set key for incomplete
+                                                                        * insert */
+
+               ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+
+               gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
+       }
+
+       END_CRIT_SECTION();
+
+       MemoryContextReset(gv->opCtx);
+
+       return res;
+}
 
 static ArrayTuple
 gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 {
        ArrayTuple      res = {NULL, 0, false};
        Buffer          buffer;
-       Page            page;
+       Page            page, tempPage = NULL;
        OffsetNumber i,
                                maxoff;
        ItemId          iid;
        int                     lenaddon = 4,
                                curlenaddon = 0,
-                               ntodelete = 0;
+                               nOffToDelete = 0,
+                               nBlkToDelete = 0;
        IndexTuple      idxtuple,
                           *addon = NULL;
        bool            needwrite = false;
-       OffsetNumber todelete[MaxOffsetNumber];
+       OffsetNumber offToDelete[MaxOffsetNumber];
+       BlockNumber  blkToDelete[MaxOffsetNumber];
        ItemPointerData *completed = NULL;
        int                     ncompleted = 0,
                                lencompleted = 16;
@@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        page = (Page) BufferGetPage(buffer);
        maxoff = PageGetMaxOffsetNumber(page);
 
-       /*
-        * XXX need to reduce scope of changes to page so we can make this
-        * critical section less extensive
-        */
-       START_CRIT_SECTION();
-
        if (GistPageIsLeaf(page))
        {
                if (GistTuplesDeleted(page))
@@ -92,13 +298,16 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
                addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
 
+               /* get copy of page to work */
+               tempPage = GistPageGetCopyPage(page);
+
                for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
                {
                        ArrayTuple      chldtuple;
                        bool            needchildunion;
 
-                       iid = PageGetItemId(page, i);
-                       idxtuple = (IndexTuple) PageGetItem(page, iid);
+                       iid = PageGetItemId(tempPage, i);
+                       idxtuple = (IndexTuple) PageGetItem(tempPage, iid);
                        needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
 
                        if (needchildunion)
@@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                                                                                 needchildunion);
                        if (chldtuple.ituplen || chldtuple.emptypage)
                        {
-                               PageIndexTupleDelete(page, i);
-                               todelete[ntodelete++] = i;
+                               /* update tuple or/and inserts new */
+                               if ( chldtuple.emptypage )
+                                       blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+                               offToDelete[nOffToDelete++] = i;
+                               PageIndexTupleDelete(tempPage, i);
                                i--;
                                maxoff--;
                                needwrite = needunion = true;
 
                                if (chldtuple.ituplen)
                                {
+
+                                       Assert( chldtuple.emptypage == false );
                                        while (curlenaddon + chldtuple.ituplen >= lenaddon)
                                        {
                                                lenaddon *= 2;
@@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                                }
                        }
                }
+               
+               Assert( maxoff == PageGetMaxOffsetNumber(tempPage) );
 
                if (curlenaddon)
                {
                        /* insert updated tuples */
-                       if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
-                       {
+                       if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) {
                                /* there is no space on page to insert tuples */
-                               IndexTuple *vec;
-                               SplitedPageLayout *dist = NULL,
-                                                  *ptr;
-                               int                     i, veclen=0;
-                               MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
-                               vec = gistextractbuffer(buffer, &veclen);
-                               vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
-                               dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
-
-                               MemoryContextSwitchTo(oldCtx);
-
-                               if (blkno != GIST_ROOT_BLKNO) {
-                                       /* if non-root split then we should not allocate new buffer */
-                                       dist->buffer = buffer;
-                                       dist->page = BufferGetPage(dist->buffer);
-                                       GistPageGetOpaque(dist->page)->flags = 0;
-                               }
-
-                               res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
-                               res.ituplen = 0;
-
-                               /* make new pages and fills them */
-                               for (ptr = dist; ptr; ptr = ptr->next) {
-                                       char *data;
-
-                                       if ( ptr->buffer == InvalidBuffer ) {
-                                               ptr->buffer = gistNewBuffer( gv->index );
-                                               GISTInitBuffer( ptr->buffer, 0 );
-                                               ptr->page = BufferGetPage(ptr->buffer);
-                                       }
-                                       ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
-
-                                       data = (char*)(ptr->list);
-                                       for(i=0;i<ptr->block.num;i++) {
-                                               if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
-                                                       elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
-                                               data += IndexTupleSize((IndexTuple)data);
-                                       }
-
-                                       ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
-                                       res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
-                                       memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
-                                       res.ituplen++;
-
-                                       MarkBufferDirty(ptr->buffer);
-                               }
-
-                               if (!gv->index->rd_istemp)
-                               {
-                                       XLogRecPtr      recptr;
-                                       XLogRecData *rdata;
-                                       ItemPointerData key;            /* set key for incomplete
-                                                                                                * insert */
-                                       char       *xlinfo;
-
-                                       ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+                               res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon);
+                               tempPage=NULL; /* vacuumSplitPage() free tempPage */
+                               needwrite = needunion = false;          /* gistSplit already forms unions and writes pages */
+                       } else
+                               /* enough free space */
+                               gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
+               }
+       }
 
-                                       rdata = formSplitRdata(gv->index->rd_node, blkno,
-                                                                                  false, &key, dist);
-                                       xlinfo = rdata->data;
+       /* 
+        * If page is empty, we should remove pointer to it before
+        * deleting page (except root)
+        */
 
-                                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
-                                       for (ptr = dist; ptr; ptr = ptr->next)
-                                       {
-                                               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
-                                               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
-                                       }
+       if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) {
+               /*
+                * New version of page is empty, so leave it unchanged,
+                * upper call will mark our page as deleted.
+                * In case of page split we never will be here...
+                *
+                * If page was empty it can't become non-empty during processing 
+                */
+               res.emptypage = true;
+               UnlockReleaseBuffer(buffer);
+       } else {
+               /* write page and remove its childs if it need */
 
-                                       pfree(xlinfo);
-                                       pfree(rdata);
-                               }
-                               else
-                               {
-                                       for (ptr = dist; ptr; ptr = ptr->next)
-                                       {
-                                               PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
-                                       }
-                               }
+               START_CRIT_SECTION();
 
-                               for (ptr = dist; ptr; ptr = ptr->next)
-                               {
-                                       /* we must keep the buffer pin on the head page */
-                                       if (BufferGetBlockNumber(ptr->buffer) != blkno)
-                                               UnlockReleaseBuffer( ptr->buffer );
-                               }
+               if ( tempPage && needwrite ) {
+                       PageRestoreTempPage(tempPage, page);
+                       tempPage = NULL;
+               }
 
-                               if (blkno == GIST_ROOT_BLKNO)
-                               {
-                                       ItemPointerData key;            /* set key for incomplete
-                                                                                                * insert */
+               /* Empty index */ 
+               if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO )
+               {
+                       needwrite = true;
+                       GistPageSetLeaf(page);
+               }
 
-                                       ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
+       
+               if (needwrite)
+               {
+                       MarkBufferDirty(buffer);
+                       GistClearTuplesDeleted(page);
 
-                                       oldCtx = MemoryContextSwitchTo(gv->opCtx);
-                                       gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
-                                       MemoryContextSwitchTo(oldCtx);
-                               }
+                       if (!gv->index->rd_istemp)
+                       {
+                               XLogRecData *rdata;
+                               XLogRecPtr      recptr;
+                               char       *xlinfo;
 
-                               needwrite = false;
+                               rdata = formUpdateRdata(gv->index->rd_node, buffer,
+                                                                               offToDelete, nOffToDelete,
+                                                                               addon, curlenaddon, NULL);
+                               xlinfo = rdata->next->data;
 
-                               MemoryContextReset(gv->opCtx);
+                               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+                               PageSetLSN(page, recptr);
+                               PageSetTLI(page, ThisTimeLineID);
 
-                               needunion = false;              /* gistSplit already forms unions */
+                               pfree(xlinfo);
+                               pfree(rdata);
                        }
                        else
-                       {
-                               /* enough free space */
-                               gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber);
-                       }
+                               PageSetLSN(page, XLogRecPtrForTemp);
                }
-       }
 
-       if (needunion)
-       {
-               /* forms union for page  or check empty */
-               if (PageIsEmpty(page))
-               {
-                       if (blkno == GIST_ROOT_BLKNO)
-                       {
-                               needwrite = true;
-                               GistPageSetLeaf(page);
-                       }
-                       else
-                       {
-                               needwrite = true;
-                               res.emptypage = true;
-                               GistPageSetDeleted(page);
-                               gv->result->std.pages_deleted++;
-                       }
-               }
-               else
-               {
-                       IndexTuple *vec,
-                                               tmp;
-                       int                     veclen = 0;
-                       MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
-
-                       vec = gistextractbuffer(buffer, &veclen);
-                       tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
-                       MemoryContextSwitchTo(oldCtx);
+               END_CRIT_SECTION();
 
+               if ( needunion && !PageIsEmpty(page) )
+               {
                        res.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
                        res.ituplen = 1;
-                       res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp));
-                       memcpy(res.itup[0], tmp, IndexTupleSize(tmp));
-
-                       ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
-                       GistTupleSetValid(res.itup[0]);
-
-                       MemoryContextReset(gv->opCtx);
+                       res.itup[0] = PageMakeUnionKey(gv, buffer);
                }
-       }
 
-       if (needwrite)
-       {
-               MarkBufferDirty(buffer);
-               GistClearTuplesDeleted(page);
-
-               if (!gv->index->rd_istemp)
-               {
-                       XLogRecData *rdata;
-                       XLogRecPtr      recptr;
-                       char       *xlinfo;
-
-                       rdata = formUpdateRdata(gv->index->rd_node, buffer,
-                                                                       todelete, ntodelete, res.emptypage,
-                                                                       addon, curlenaddon, NULL);
-                       xlinfo = rdata->data;
+               UnlockReleaseBuffer(buffer);
 
-                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
-                       PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
+               /* delete empty children, now we havn't any links to pointed subtrees */
+               for(i=0;i<nBlkToDelete;i++) 
+                       gistDeleteSubtree(gv, blkToDelete[i]);
 
-                       pfree(xlinfo);
-                       pfree(rdata);
-               }
-               else
-                       PageSetLSN(page, XLogRecPtrForTemp);
+               if (ncompleted && !gv->index->rd_istemp)
+                       gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
        }
 
-       END_CRIT_SECTION();
-
-       UnlockReleaseBuffer(buffer);
-
-       if (ncompleted && !gv->index->rd_istemp)
-               gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
 
        for (i = 0; i < curlenaddon; i++)
                pfree(addon[i]);
@@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                pfree(addon);
        if (completed)
                pfree(completed);
+       if (tempPage)
+               pfree(tempPage);
+
        return res;
 }
 
@@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                                        gistxlogPageUpdate *xlinfo;
 
                                        rdata = formUpdateRdata(rel->rd_node, buffer,
-                                                                                       todelete, ntodelete, false,
+                                                                                       todelete, ntodelete, 
                                                                                        NULL, 0,
                                                                                        NULL);
-                                       xlinfo = (gistxlogPageUpdate *) rdata->data;
+                                       xlinfo = (gistxlogPageUpdate *) rdata->next->data;
 
                                        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                                        PageSetLSN(page, recptr);
index a029d8f1ec5540c7b08f5c28a577cea72cfb77dc..01dab119b2ef01f1e7c5dcb1fa07043640792375 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
+ *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -209,41 +209,33 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
                return;
        }
 
-       if (xlrec.data->isemptypage)
+       if (isnewroot)
+               GISTInitBuffer(buffer, 0);
+       else if (xlrec.data->ntodelete)
        {
-               while (!PageIsEmpty(page))
-                       PageIndexTupleDelete(page, FirstOffsetNumber);
+               int                     i;
 
-               if (xlrec.data->blkno == GIST_ROOT_BLKNO)
-                       GistPageSetLeaf(page);
-               else
-                       GistPageSetDeleted(page);
+               for (i = 0; i < xlrec.data->ntodelete; i++)
+                       PageIndexTupleDelete(page, xlrec.todelete[i]);
+               if (GistPageIsLeaf(page))
+                       GistMarkTuplesDeleted(page);
        }
-       else
-       {
-               if (isnewroot)
-                       GISTInitBuffer(buffer, 0);
-               else if (xlrec.data->ntodelete)
-               {
-                       int                     i;
 
-                       for (i = 0; i < xlrec.data->ntodelete; i++)
-                               PageIndexTupleDelete(page, xlrec.todelete[i]);
-                       if (GistPageIsLeaf(page))
-                               GistMarkTuplesDeleted(page);
-               }
+       /* add tuples */
+       if (xlrec.len > 0)
+               gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
 
-               /* add tuples */
-               if (xlrec.len > 0)
-                       gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+       /*
+        * special case: leafpage, nothing to insert, nothing to delete, then
+        * vacuum marks page
+        */
+       if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
+               GistClearTuplesDeleted(page);
 
-               /*
-                * special case: leafpage, nothing to insert, nothing to delete, then
-                * vacuum marks page
-                */
-               if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
-                       GistClearTuplesDeleted(page);
-       }
+       if ( !GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO )
+               /* all links on non-leaf root page was deleted by vacuum full,
+                  so root page becomes a leaf */
+               GistPageSetLeaf(page);
 
        GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
        PageSetLSN(page, lsn);
@@ -252,6 +244,29 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
        UnlockReleaseBuffer(buffer);
 }
 
+static void
+gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
+{
+       gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+       Relation        reln;
+       Buffer          buffer;
+       Page            page;
+
+       reln = XLogOpenRelation(xldata->node);
+       buffer = XLogReadBuffer(reln, xldata->blkno, false);
+       if (!BufferIsValid(buffer))
+               return;
+
+       GISTInitBuffer( buffer, 0 );
+       page = (Page) BufferGetPage(buffer);
+       GistPageSetDeleted(page);
+
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+       MarkBufferDirty(buffer);
+       UnlockReleaseBuffer(buffer);
+}
+
 static void
 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
 {
@@ -382,6 +397,9 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
                case XLOG_GIST_PAGE_UPDATE:
                        gistRedoPageUpdateRecord(lsn, record, false);
                        break;
+               case XLOG_GIST_PAGE_DELETE:
+                       gistRedoPageDeleteRecord(lsn, record);
+                       break;
                case XLOG_GIST_NEW_ROOT:
                        gistRedoPageUpdateRecord(lsn, record, true);
                        break;
@@ -405,8 +423,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
 static void
 out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
 {
-       appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
-                       node.spcNode, node.dbNode, node.relNode,
+       appendStringInfo(buf, "rel %u/%u/%u",
+                       node.spcNode, node.dbNode, node.relNode);
+       if ( ItemPointerIsValid( &key ) )
+               appendStringInfo(buf, "; tid %u/%u",
                        ItemPointerGetBlockNumber(&key),
                        ItemPointerGetOffsetNumber(&key));
 }
@@ -418,6 +438,14 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
        appendStringInfo(buf, "; block number %u", xlrec->blkno);
 }
 
+static void
+out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
+{
+       appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
+                       xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+                       xlrec->blkno);
+}
+
 static void
 out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
 {
@@ -438,6 +466,9 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
                        appendStringInfo(buf, "page_update: ");
                        out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
                        break;
+               case XLOG_GIST_PAGE_DELETE:
+                       out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
+                       break;
                case XLOG_GIST_NEW_ROOT:
                        appendStringInfo(buf, "new_root: ");
                        out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
@@ -643,7 +674,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                         * we split root, just copy tuples from old root to new
                                         * page
                                         */
-                                       parentitup = gistextractbuffer(buffers[numbuffer - 1],
+                                       parentitup = gistextractpage(pages[numbuffer - 1],
                                                                                                   &pituplen);
 
                                        /* sanity check */
@@ -796,7 +827,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
  */
 XLogRecData *
 formUpdateRdata(RelFileNode node, Buffer buffer,
-                               OffsetNumber *todelete, int ntodelete, bool emptypage,
+                               OffsetNumber *todelete, int ntodelete,
                                IndexTuple *itup, int ituplen, ItemPointer key)
 {
        XLogRecData *rdata;
@@ -804,35 +835,37 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
        int                     cur,
                                i;
 
-       /* ugly wart in API: emptypage causes us to ignore other inputs */
-       if (emptypage)
-               ntodelete = ituplen = 0;
-
-       rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+       rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
        xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
        xlrec->node = node;
        xlrec->blkno = BufferGetBlockNumber(buffer);
        xlrec->ntodelete = ntodelete;
-       xlrec->isemptypage = emptypage;
+
        if (key)
                xlrec->key = *key;
        else
                ItemPointerSetInvalid(&(xlrec->key));
 
-       rdata[0].data = (char *) xlrec;
-       rdata[0].len = sizeof(gistxlogPageUpdate);
-       rdata[0].buffer = InvalidBuffer;
+       rdata[0].buffer = buffer;
+       rdata[0].buffer_std = true;
+       rdata[0].data = NULL;
+       rdata[0].len = 0;
        rdata[0].next = &(rdata[1]);
 
-       rdata[1].data = (char *) todelete;
-       rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
-       rdata[1].buffer = buffer;
-       rdata[1].buffer_std = true;
-       rdata[1].next = NULL;
+       rdata[1].data = (char *) xlrec;
+       rdata[1].len = sizeof(gistxlogPageUpdate);
+       rdata[1].buffer = InvalidBuffer;
+       rdata[1].next = &(rdata[2]);
+
+       rdata[2].data = (char *) todelete;
+       rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+       rdata[2].buffer = buffer;
+       rdata[2].buffer_std = true;
+       rdata[2].next = NULL;
 
        /* new tuples */
-       cur = 2;
+       cur = 3;
        for (i = 0; i < ituplen; i++)
        {
                rdata[cur - 1].next = &(rdata[cur]);
index 7e9469f000b9e2b7b2898fea4ef0ee16c40e1bd1..a866277fe9f462df637c9ad55b1096027513c2cb 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -85,20 +85,21 @@ extern const XLogRecPtr XLogRecPtrForTemp;
 #define XLOG_GIST_PAGE_SPLIT           0x30
 #define XLOG_GIST_INSERT_COMPLETE      0x40
 #define XLOG_GIST_CREATE_INDEX         0x50
+#define XLOG_GIST_PAGE_DELETE          0x60
 
 typedef struct gistxlogPageUpdate
 {
        RelFileNode node;
        BlockNumber blkno;
 
-       uint16          ntodelete;
-       bool            isemptypage;
-
        /*
         * It used to identify completeness of insert. Sets to leaf itup
         */
        ItemPointerData key;
 
+       /* number of deleted offsets */
+       uint16          ntodelete;
+
        /*
         * follow: 1. todelete OffsetNumbers 2. tuples to insert
         */
@@ -131,6 +132,11 @@ typedef struct gistxlogInsertComplete
        /* follows ItemPointerData key to clean */
 } gistxlogInsertComplete;
 
+typedef struct gistxlogPageDelete
+{
+       RelFileNode node;
+       BlockNumber blkno;
+} gistxlogPageDelete;
 
 /* SplitedPageLayout - gistSplit function result */
 typedef struct SplitedPageLayout
@@ -249,7 +255,7 @@ extern void gist_xlog_cleanup(void);
 extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
 
 extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
-                               OffsetNumber *todelete, int ntodelete, bool emptypage,
+                               OffsetNumber *todelete, int ntodelete, 
                                IndexTuple *itup, int ituplen, ItemPointer key);
 
 extern XLogRecData *formSplitRdata(RelFileNode node,
@@ -273,7 +279,7 @@ extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
                           int len, OffsetNumber off);
-extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
+extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
                           IndexTuple *itvec, int *len,
                           IndexTuple *additvec, int addlen);