]> granicus.if.org Git - postgresql/commitdiff
Fix possible duplicate tuples while GiST scan. Now page is processed
authorTeodor Sigaev <teodor@sigaev.ru>
Sat, 23 Aug 2008 10:43:58 +0000 (10:43 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Sat, 23 Aug 2008 10:43:58 +0000 (10:43 +0000)
at once and ItemPointers are collected in memory.

Remove tuple's killing by killtuple() if tuple was moved to another
page - it could produce unaceptable overhead.

Backpatch up to 8.1 because the bug was introduced by GiST's concurrency support.

src/backend/access/gist/gistget.c
src/backend/access/gist/gistscan.c
src/include/access/gist_private.h

index 1e02ec082f4f0f559367722eda599520b60348c3..feeb84d38d73ff1c29b94671e0e6b7263c38606e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.52.2.1 2005/11/22 18:23:03 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.52.2.2 2008/08/23 10:43:58 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -30,62 +30,38 @@ static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
 static void
 killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
 {
-       Buffer          buffer = so->curbuf;
+       Page        p;
+       OffsetNumber offset;
 
-       for (;;)
+       LockBuffer(so->curbuf, GIST_SHARE);
+       p = (Page) BufferGetPage(so->curbuf);
+       if (XLByteEQ(so->stack->lsn, PageGetLSN(p)))
        {
-               Page            p;
-               BlockNumber blkno;
-               OffsetNumber offset,
-                                       maxoff;
-
-               LockBuffer(buffer, GIST_SHARE);
-               p = (Page) BufferGetPage(buffer);
-
-               if (buffer == so->curbuf && XLByteEQ(so->stack->lsn, PageGetLSN(p)))
-               {
-                       /* page unchanged, so all is simple */
-                       offset = ItemPointerGetOffsetNumber(iptr);
-                       PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
-                       SetBufferCommitInfoNeedsSave(buffer);
-                       LockBuffer(buffer, GIST_UNLOCK);
-                       break;
-               }
-
-               maxoff = PageGetMaxOffsetNumber(p);
+               /* page unchanged, so all is simple */
+               offset = ItemPointerGetOffsetNumber(iptr);
+               PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+               SetBufferCommitInfoNeedsSave(so->curbuf);
+       }
+       else
+       {
+               OffsetNumber maxoff = PageGetMaxOffsetNumber(p);
 
                for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
                {
-                       IndexTuple      ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
+                       IndexTuple  ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
 
                        if (ItemPointerEquals(&(ituple->t_tid), iptr))
                        {
                                /* found */
                                PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
-                               SetBufferCommitInfoNeedsSave(buffer);
-                               LockBuffer(buffer, GIST_UNLOCK);
-                               if (buffer != so->curbuf)
-                                       ReleaseBuffer(buffer);
-                               return;
+                               SetBufferCommitInfoNeedsSave(so->curbuf);
+                               break;
                        }
                }
-
-               /* follow right link */
-
-               /*
-                * ??? is it good? if tuple dropped by concurrent vacuum, we will read
-                * all leaf pages...
-                */
-               blkno = GistPageGetOpaque(p)->rightlink;
-               LockBuffer(buffer, GIST_UNLOCK);
-               if (buffer != so->curbuf)
-                       ReleaseBuffer(buffer);
-
-               if (blkno == InvalidBlockNumber)
-                       /* can't found, dropped by somebody else */
-                       return;
-               buffer = ReadBuffer(r, blkno);
        }
+
+       LockBuffer(so->curbuf, GIST_UNLOCK);
 }
 
 /*
@@ -146,7 +122,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
        GISTSearchStack *stk;
        IndexTuple      it;
        GISTPageOpaque opaque;
-       bool            resetoffset = false;
        int                     ntids = 0;
 
        so = (GISTScanOpaque) scan->opaque;
@@ -171,6 +146,42 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                return 0;
        }
 
+       /*
+        * check stored pointers from last visit 
+        */
+       if ( so->nPageData > 0 ) 
+       {
+               while( ntids < maxtids && so->curPageData < so->nPageData )
+               {
+                       tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ];
+                               
+                       so->curPageData ++;
+                       ntids++;
+               }
+
+               if ( ntids == maxtids )
+                       return ntids;
+               
+               /*
+                * Go to the next page
+                */
+               stk = so->stack->next;
+               pfree(so->stack);
+               so->stack = stk;
+
+               /* If we're out of stack entries, we're done */
+               if (so->stack == NULL)
+               {
+                       ReleaseBuffer(so->curbuf);
+                       so->curbuf = InvalidBuffer;
+                       return ntids;
+               }
+
+               so->curbuf = ReleaseAndReadBuffer(so->curbuf,
+                                                                                 scan->indexRelation,
+                                                                                 stk->block);
+       }
+
        for (;;)
        {
                /* First of all, we need lock buffer */
@@ -178,30 +189,25 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                LockBuffer(so->curbuf, GIST_SHARE);
                p = BufferGetPage(so->curbuf);
                opaque = GistPageGetOpaque(p);
-               resetoffset = false;
 
-               if (XLogRecPtrIsInvalid(so->stack->lsn) || !XLByteEQ(so->stack->lsn, PageGetLSN(p)))
-               {
-                       /* page changed from last visit or visit first time , reset offset */
-                       so->stack->lsn = PageGetLSN(p);
-                       resetoffset = true;
-
-                       /* check page split, occured from last visit or visit to parent */
-                       if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
-                               XLByteLT(so->stack->parentlsn, opaque->nsn) &&
-                               opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
-                               (so->stack->next == NULL || so->stack->next->block != opaque->rightlink)                /* check if already
-                                       added */ )
-                       {
-                               /* detect page split, follow right link to add pages */
+               /* remember lsn to identify page changed for tuple's killing */
+               so->stack->lsn = PageGetLSN(p);
 
-                               stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
-                               stk->next = so->stack->next;
-                               stk->block = opaque->rightlink;
-                               stk->parentlsn = so->stack->parentlsn;
-                               memset(&(stk->lsn), 0, sizeof(GistNSN));
-                               so->stack->next = stk;
-                       }
+               /* check page split, occured from last visit or visit to parent */
+               if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
+                       XLByteLT(so->stack->parentlsn, opaque->nsn) &&
+                       opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
+                       (so->stack->next == NULL || so->stack->next->block != opaque->rightlink)                /* check if already
+                                       added */ )
+               {
+                       /* detect page split, follow right link to add pages */
+
+                       stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
+                       stk->next = so->stack->next;
+                       stk->block = opaque->rightlink;
+                       stk->parentlsn = so->stack->parentlsn;
+                       memset(&(stk->lsn), 0, sizeof(GistNSN));
+                       so->stack->next = stk;
                }
 
                /* if page is empty, then just skip it */
@@ -224,24 +230,13 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                        continue;
                }
 
-               if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false)
-               {
-                       if (ScanDirectionIsBackward(dir))
-                               n = PageGetMaxOffsetNumber(p);
-                       else
-                               n = FirstOffsetNumber;
-               }
+               if (ScanDirectionIsBackward(dir))
+                       n = PageGetMaxOffsetNumber(p);
                else
-               {
-                       n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
-
-                       if (ScanDirectionIsBackward(dir))
-                               n = OffsetNumberPrev(n);
-                       else
-                               n = OffsetNumberNext(n);
-               }
+                       n = FirstOffsetNumber;
 
                /* wonderfull, we can look at page */
+               so->nPageData = so->curPageData = 0;
 
                for (;;)
                {
@@ -249,6 +244,20 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
 
                        if (!OffsetNumberIsValid(n))
                        {
+                               while( ntids < maxtids && so->curPageData < so->nPageData )
+                               {
+                                       tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ];
+                               
+                                       so->curPageData ++;
+                                       ntids++;
+                               }
+
+                               if ( ntids == maxtids )
+                               {
+                                       LockBuffer(so->curbuf, GIST_UNLOCK);
+                                       return ntids;
+                               }
+
                                /*
                                 * We ran out of matching index entries on the current page,
                                 * so pop the top stack entry and use it to continue the
@@ -288,14 +297,8 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                                if (!(ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n))))
                                {
                                        it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
-                                       tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
-                                       ntids++;
-
-                                       if (ntids == maxtids)
-                                       {
-                                               LockBuffer(so->curbuf, GIST_UNLOCK);
-                                               return ntids;
-                                       }
+                                       so->pageData[ so->nPageData ] = it->t_tid;
+                                       so->nPageData ++;
                                }
                        }
                        else
index 97f4d264d1a57e404aa89ccb246a7f512013f927..3604debba0f4e2514812f244f175e0244c61b3a8 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.61 2005/09/22 20:44:36 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.61.2.1 2008/08/23 10:43:58 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -98,6 +98,7 @@ gistrescan(PG_FUNCTION_ARGS)
                        ReleaseBuffer(so->markbuf);
                        so->markbuf = InvalidBuffer;
                }
+
        }
        else
        {
@@ -113,6 +114,8 @@ gistrescan(PG_FUNCTION_ARGS)
                scan->opaque = so;
        }
 
+       so->nPageData = so->curPageData = 0;
+
        /* Update scan key, if a new one is given */
        if (key && scan->numberOfKeys > 0)
        {
@@ -179,6 +182,11 @@ gistmarkpos(PG_FUNCTION_ARGS)
                so->markbuf = so->curbuf;
        }
 
+       so->markNPageData = so->nPageData;
+       so->markCurPageData = so->curPageData;
+       if ( so->markNPageData > 0 )
+               memcpy( so->markPageData, so->pageData, sizeof(ItemPointerData) * so->markNPageData );          
+
        PG_RETURN_VOID();
 }
 
@@ -228,6 +236,11 @@ gistrestrpos(PG_FUNCTION_ARGS)
                so->curbuf = so->markbuf;
        }
 
+       so->nPageData = so->markNPageData;
+       so->curPageData = so->markNPageData;
+       if ( so->markNPageData > 0 )
+               memcpy( so->pageData, so->markPageData, sizeof(ItemPointerData) * so->markNPageData );          
+
        PG_RETURN_VOID();
 }
 
index 1cfa5b92bc2c1fdb12c123855b4ed6dbeaed03ec..c1861180d24a93030ec96cbdc81909c2cb212e77 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.8 2005/10/15 02:49:42 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.8.2.1 2008/08/23 10:43:58 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -73,6 +73,13 @@ typedef struct GISTScanOpaqueData
        MemoryContext tempCxt;
        Buffer          curbuf;
        Buffer          markbuf;
+
+       ItemPointerData pageData[BLCKSZ/sizeof(IndexTupleData)];
+       OffsetNumber    nPageData;
+       OffsetNumber    curPageData;
+       ItemPointerData markPageData[BLCKSZ/sizeof(IndexTupleData)];
+       OffsetNumber    markNPageData;
+       OffsetNumber    markCurPageData;
 } GISTScanOpaqueData;
 
 typedef GISTScanOpaqueData *GISTScanOpaque;