granicus.if.org Git - postgresql/commitdiff
Add the "snapshot too old" feature
author: Kevin Grittner <kgrittn@postgresql.org>
Fri, 8 Apr 2016 19:36:30 +0000 (14:36 -0500)
committer: Kevin Grittner <kgrittn@postgresql.org>
Fri, 8 Apr 2016 19:36:30 +0000 (14:36 -0500)
This feature is controlled by a new old_snapshot_threshold GUC.  A
value of -1 disables the feature, and that is the default.  The
value of 0 is just intended for testing.  Above that it is the
age, in minutes, that a snapshot can reach before pruning and vacuum
are allowed to remove dead tuples which the snapshot would
otherwise protect.  The xmin associated with a transaction ID does
still protect dead tuples.  A connection which is using an "old"
snapshot does not get an error unless it accesses a page modified
recently enough that it might not be able to produce accurate
results.

This is similar to the Oracle feature, and we use the same SQLSTATE
and error message for compatibility.

41 files changed:
contrib/bloom/blscan.c
doc/src/sgml/config.sgml
src/backend/access/brin/brin.c
src/backend/access/brin/brin_revmap.c
src/backend/access/gin/ginbtree.c
src/backend/access/gin/gindatapage.c
src/backend/access/gin/ginget.c
src/backend/access/gin/gininsert.c
src/backend/access/gist/gistget.c
src/backend/access/hash/hash.c
src/backend/access/hash/hashsearch.c
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/spgist/spgscan.c
src/backend/commands/vacuum.c
src/backend/commands/vacuumlazy.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/ipc/ipci.c
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/lwlocknames.txt
src/backend/utils/errcodes.txt
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/backend/utils/time/snapmgr.c
src/include/access/brin_revmap.h
src/include/access/gin_private.h
src/include/access/nbtree.h
src/include/storage/bufmgr.h
src/include/utils/rel.h
src/include/utils/snapmgr.h
src/include/utils/snapshot.h
src/test/modules/Makefile
src/test/modules/snapshot_too_old/Makefile [new file with mode: 0644]
src/test/modules/snapshot_too_old/expected/sto_using_cursor.out [new file with mode: 0644]
src/test/modules/snapshot_too_old/expected/sto_using_select.out [new file with mode: 0644]
src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec [new file with mode: 0644]
src/test/modules/snapshot_too_old/specs/sto_using_select.spec [new file with mode: 0644]
src/test/modules/snapshot_too_old/sto.conf [new file with mode: 0644]

diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index ae937f667101951247390f2cecc839b4503fd07f..e75ed3d6136acf026c153e6e23fbf71b9d457c7c 100644
@@ -138,7 +138,8 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
                                                                        blkno, RBM_NORMAL, bas);
 
                LockBuffer(buffer, BUFFER_LOCK_SHARE);
-               page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buffer, scan->xs_snapshot, scan->indexRelation,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
 
                if (!BloomPageIsDeleted(page))
                {
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2f04702b6e531ba51a911a56e6805ed41034757f..f9ba1487822e9f542961591ab2c0f466c8f8d239 100644
@@ -2041,6 +2041,42 @@ include_dir 'conf.d'
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
+       <term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>old_snapshot_threshold</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the minimum time that a snapshot can be used without risk of a
+         <literal>snapshot too old</> error occurring when using the snapshot.
+         This parameter can only be set at server start.
+        </para>
+
+        <para>
+         Beyond the threshold, old data may be vacuumed away.  This can help
+         prevent bloat in the face of snapshots which remain in use for a
+         long time.  To prevent incorrect results due to cleanup of data which
+         would otherwise be visible to the snapshot, an error is generated
+         when the snapshot is older than this threshold and the snapshot is
+         used to read a page which has been modified since the snapshot was
+         built.
+        </para>
+
+        <para>
+         A value of <literal>-1</> disables this feature, and is the default.
+         Useful values for production work probably range from a small number
+         of hours to a few days.  The setting will be coerced to a granularity
+         of minutes, and small numbers (such as <literal>0</> or
+         <literal>1min</>) are only allowed because they may sometimes be
+         useful for testing.  While a setting as high as <literal>60d</> is
+         allowed, please note that in many workloads extreme bloat or
+         transaction ID wraparound may occur in much shorter time frames.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </sect2>
    </sect1>
@@ -3051,6 +3087,10 @@ include_dir 'conf.d'
         You should also consider setting <varname>hot_standby_feedback</>
         on standby server(s) as an alternative to using this parameter.
        </para>
+       <para>
+        This does not prevent cleanup of dead rows which have reached the age
+        specified by <varname>old_snapshot_threshold</>.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -3198,6 +3238,16 @@ include_dir 'conf.d'
         until it eventually reaches the primary.  Standbys make no other use
         of feedback they receive other than to pass upstream.
        </para>
+       <para>
+        This setting does not override the behavior of
+        <varname>old_snapshot_threshold</> on the primary; a snapshot on the
+        standby which exceeds the primary's age threshold can become invalid,
+        resulting in cancellation of transactions on the standby.  This is
+        because <varname>old_snapshot_threshold</> is intended to provide an
+        absolute limit on the time which dead rows can contribute to bloat,
+        which would otherwise be violated because of the configuration of a
+        standby.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 6f6f1b1b4155e79bc0c0fd8708882b7b5a6b64d6..e64c94d35699aecc4f3b9d33517263e29c75cf84 100644
@@ -135,7 +135,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
        MemoryContext tupcxt = NULL;
        MemoryContext oldcxt = NULL;
 
-       revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
+       revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
 
        for (;;)
        {
@@ -152,7 +152,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
                /* normalize the block number to be the first block in the range */
                heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
                brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-                                                                                BUFFER_LOCK_SHARE);
+                                                                                BUFFER_LOCK_SHARE, NULL);
 
                /* if range is unsummarized, there's nothing to do */
                if (!brtup)
@@ -285,7 +285,8 @@ brinbeginscan(Relation r, int nkeys, int norderbys)
        scan = RelationGetIndexScan(r, nkeys, norderbys);
 
        opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
-       opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
+       opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
+                                                                                          scan->xs_snapshot);
        opaque->bo_bdesc = brin_build_desc(r);
        scan->opaque = opaque;
 
@@ -368,7 +369,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
                MemoryContextResetAndDeleteChildren(perRangeCxt);
 
                tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
-                                                                          &off, &size, BUFFER_LOCK_SHARE);
+                                                                          &off, &size, BUFFER_LOCK_SHARE,
+                                                                          scan->xs_snapshot);
                if (tup)
                {
                        tup = brin_copy_tuple(tup, size);
@@ -647,7 +649,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
        /*
         * Initialize our state, including the deformed tuple state.
         */
-       revmap = brinRevmapInitialize(index, &pagesPerRange);
+       revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
        state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
        /*
@@ -1045,7 +1047,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
                 * the same.)
                 */
                phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
-                                                                                &offset, &phsz, BUFFER_LOCK_SHARE);
+                                                                                &offset, &phsz, BUFFER_LOCK_SHARE,
+                                                                                NULL);
                /* the placeholder tuple must exist */
                if (phtup == NULL)
                        elog(ERROR, "missing placeholder tuple");
@@ -1080,7 +1083,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
        BlockNumber pagesPerRange;
        Buffer          buf;
 
-       revmap = brinRevmapInitialize(index, &pagesPerRange);
+       revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 
        /*
         * Scan the revmap to find unsummarized items.
@@ -1095,7 +1098,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
                CHECK_FOR_INTERRUPTS();
 
                tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-                                                                          BUFFER_LOCK_SHARE);
+                                                                          BUFFER_LOCK_SHARE, NULL);
                if (tup == NULL)
                {
                        /* no revmap entry for this heap range. Summarize it. */
diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c
index ce21cbabb7afe9723726457d7d805a6280298949..5ed867cf30bfdfd93dc3f23db25c57f50ed698e1 100644
@@ -68,7 +68,8 @@ static void revmap_physical_extend(BrinRevmap *revmap);
  * brinRevmapTerminate when caller is done with it.
  */
 BrinRevmap *
-brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
+brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
+                                        Snapshot snapshot)
 {
        BrinRevmap *revmap;
        Buffer          meta;
@@ -77,7 +78,7 @@ brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
 
        meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
        LockBuffer(meta, BUFFER_LOCK_SHARE);
-       page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT);
        metadata = (BrinMetaPageData *) PageGetContents(page);
 
        revmap = palloc(sizeof(BrinRevmap));
@@ -187,7 +188,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
  */
 BrinTuple *
 brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
-                                                Buffer *buf, OffsetNumber *off, Size *size, int mode)
+                                                Buffer *buf, OffsetNumber *off, Size *size, int mode,
+                                                Snapshot snapshot)
 {
        Relation        idxRel = revmap->rm_irel;
        BlockNumber mapBlk;
@@ -264,7 +266,8 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
                        *buf = ReadBuffer(idxRel, blk);
                }
                LockBuffer(*buf, mode);
-               page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(*buf, snapshot, idxRel,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
 
                /* If we land on a revmap page, start over */
                if (BRIN_IS_REGULAR_PAGE(page))
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 13258cca0ea798c2a6bab8ca732b1c5b651ae321..e593b2bbe9938ca5b2d160e57a16eb7e1ecaaef8 100644
@@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode)
  * is share-locked, and stack->parent is NULL.
  */
 GinBtreeStack *
-ginFindLeafPage(GinBtree btree, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
 {
        GinBtreeStack *stack;
 
@@ -89,7 +89,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 
                stack->off = InvalidOffsetNumber;
 
-               page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(stack->buffer, snapshot, btree->index,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
 
                access = ginTraverseLock(stack->buffer, searchMode);
 
@@ -115,8 +116,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 
                        stack->buffer = ginStepRight(stack->buffer, btree->index, access);
                        stack->blkno = rightlink;
-                       page = BufferGetPage(stack->buffer, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST);
+                       page = BufferGetPage(stack->buffer, snapshot, btree->index,
+                                                                BGP_TEST_FOR_OLD_SNAPSHOT);
 
                        if (!searchMode && GinPageIsIncompleteSplit(page))
                                ginFinishSplit(btree, stack, false, NULL);
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index 9c501a1af5f17fb119220587a054ea717b1dfc41..ed3d9174f65f54b8ac453e34bd8132a14300a6b1 100644
@@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
        {
                /* search for the leaf page where the first item should go to */
                btree.itemptr = insertdata.items[insertdata.curitem];
-               stack = ginFindLeafPage(&btree, false);
+               stack = ginFindLeafPage(&btree, false, NULL);
 
                ginInsertValue(&btree, stack, &insertdata, buildStats);
        }
@@ -1830,7 +1830,8 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
  * Starts a new scan on a posting tree.
  */
 GinBtreeStack *
-ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
+ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
+                                               Snapshot snapshot)
 {
        GinBtreeStack *stack;
 
@@ -1838,7 +1839,7 @@ ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
 
        btree->fullScan = TRUE;
 
-       stack = ginFindLeafPage(btree, TRUE);
+       stack = ginFindLeafPage(btree, TRUE, snapshot);
 
        return stack;
 }
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 33683278e100135b872d67709d546ea2fc3c3755..b79ba1e62afccb026f48a26d1589f021b231687b 100644
@@ -73,7 +73,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
        Page            page;
 
        /* Descend to the leftmost leaf page */
-       stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
+       stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
        buffer = stack->buffer;
        IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */
 
@@ -146,7 +146,8 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
                if (moveRightIfItNeeded(btree, stack) == false)
                        return true;
 
-               page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(stack->buffer, snapshot, btree->index,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
                itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
 
                /*
@@ -320,7 +321,7 @@ restartScanEntry:
        ginPrepareEntryScan(&btreeEntry, entry->attnum,
                                                entry->queryKey, entry->queryCategory,
                                                ginstate);
-       stackEntry = ginFindLeafPage(&btreeEntry, true);
+       stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
        page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
        needUnlock = TRUE;
 
@@ -385,7 +386,7 @@ restartScanEntry:
                        needUnlock = FALSE;
 
                        stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
-                                                                                       rootPostingTree);
+                                                                                       rootPostingTree, snapshot);
                        entry->buffer = stack->buffer;
 
                        /*
@@ -627,7 +628,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
                        entry->btree.itemptr.ip_posid++;
                }
                entry->btree.fullScan = false;
-               stack = ginFindLeafPage(&entry->btree, true);
+               stack = ginFindLeafPage(&entry->btree, true, snapshot);
 
                /* we don't need the stack, just the buffer. */
                entry->buffer = stack->buffer;
@@ -1335,8 +1336,8 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos)
        ItemPointerSetInvalid(&pos->item);
        for (;;)
        {
-               page = BufferGetPage(pos->pendingBuffer, NULL,
-                                                        NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+                                                        scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
                maxoff = PageGetMaxOffsetNumber(page);
                if (pos->firstOffset > maxoff)
@@ -1516,8 +1517,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
                memset(datumExtracted + pos->firstOffset - 1, 0,
                           sizeof(bool) * (pos->lastOffset - pos->firstOffset));
 
-               page = BufferGetPage(pos->pendingBuffer, NULL,
-                                                        NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+                                                        scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
                for (i = 0; i < so->nkeys; i++)
                {
@@ -1710,7 +1711,8 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
        *ntids = 0;
 
        LockBuffer(metabuffer, GIN_SHARE);
-       page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation,
+                                                BGP_TEST_FOR_OLD_SNAPSHOT);
        blkno = GinPageGetMeta(page)->head;
 
        /*
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 126501149d25f2b73ec8011fc99b2a7933a86cdf..d4bfed06bcc2a710fb49d2e1d158f6f079fc3f23 100644
@@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate,
 
        ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
 
-       stack = ginFindLeafPage(&btree, false);
+       stack = ginFindLeafPage(&btree, false, NULL);
        page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
 
        if (btree.findItem(&btree, stack))
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 13a039947ba0748bfa31cb121fb524a265f6ff3a..24af868466d822a31b60d77d7d054c35b884b61a 100644
@@ -336,7 +336,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
        buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
        LockBuffer(buffer, GIST_SHARE);
        gistcheckpage(scan->indexRelation, buffer);
-       page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT);
        opaque = GistPageGetOpaque(page);
 
        /*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index a5032e1251df741a3bfaeef8aa8ffe1afdc5fb2c..03cd0b006c3fc02cdb716930aba5ef0db0098202 100644
@@ -278,7 +278,8 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 
                buf = so->hashso_curbuf;
                Assert(BufferIsValid(buf));
-               page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buf, scan->xs_snapshot, rel,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
                maxoffnum = PageGetMaxOffsetNumber(page);
                for (offnum = ItemPointerGetOffsetNumber(current);
                         offnum <= maxoffnum;
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index dd1f464e53addfa00e02a00615f2cfa3c07bc898..4c14362c6fe0238d7d8f906f178a5ade8e84be2d 100644
@@ -188,8 +188,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
        /* Read the metapage */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-       page = BufferGetPage(metabuf, NULL, NULL,
-                                                BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(metabuf, scan->xs_snapshot, rel,
+                                                BGP_TEST_FOR_OLD_SNAPSHOT);
        metap = HashPageGetMeta(page);
 
        /*
@@ -242,8 +242,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
        /* Fetch the primary bucket page for the bucket */
        buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
-       page = BufferGetPage(buf, NULL, NULL,
-                                                BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(buf, scan->xs_snapshot, rel,
+                                                BGP_TEST_FOR_OLD_SNAPSHOT);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
        Assert(opaque->hasho_bucket == bucket);
 
@@ -350,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                                        _hash_readnext(rel, &buf, &page, &opaque);
                                        if (BufferIsValid(buf))
                                        {
+                                               TestForOldSnapshot(scan->xs_snapshot, rel, page);
                                                maxoff = PageGetMaxOffsetNumber(page);
                                                offnum = _hash_binsearch(page, so->hashso_sk_hash);
                                        }
@@ -391,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                                        _hash_readprev(rel, &buf, &page, &opaque);
                                        if (BufferIsValid(buf))
                                        {
+                                               TestForOldSnapshot(scan->xs_snapshot, rel, page);
                                                maxoff = PageGetMaxOffsetNumber(page);
                                                offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
                                        }
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 66b23540fe29f556a985e2dc3fadf226dad4ddc0..29fd31a819d278c8e052a398c56fd6f4294614e9 100644
@@ -394,7 +394,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
         */
        LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
-       dp = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       dp = BufferGetPage(buffer, snapshot, scan->rs_rd,
+                                          BGP_TEST_FOR_OLD_SNAPSHOT);
        lines = PageGetMaxOffsetNumber(dp);
        ntup = 0;
 
@@ -537,7 +538,7 @@ heapgettup(HeapScanDesc scan,
 
                LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = PageGetMaxOffsetNumber(dp);
                /* page and lineoff now reference the physically next tid */
 
@@ -582,7 +583,8 @@ heapgettup(HeapScanDesc scan,
 
                LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = PageGetMaxOffsetNumber(dp);
 
                if (!scan->rs_inited)
@@ -616,7 +618,8 @@ heapgettup(HeapScanDesc scan,
                        heapgetpage(scan, page);
 
                /* Since the tuple was previously fetched, needn't lock page here */
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
                lpp = PageGetItemId(dp, lineoff);
                Assert(ItemIdIsNormal(lpp));
@@ -745,7 +748,8 @@ heapgettup(HeapScanDesc scan,
 
                LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = PageGetMaxOffsetNumber((Page) dp);
                linesleft = lines;
                if (backward)
@@ -832,7 +836,8 @@ heapgettup_pagemode(HeapScanDesc scan,
                        lineindex = scan->rs_cindex + 1;
                }
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = scan->rs_ntuples;
                /* page and lineindex now reference the next visible tid */
 
@@ -875,7 +880,8 @@ heapgettup_pagemode(HeapScanDesc scan,
                        page = scan->rs_cblock;         /* current page */
                }
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = scan->rs_ntuples;
 
                if (!scan->rs_inited)
@@ -908,7 +914,8 @@ heapgettup_pagemode(HeapScanDesc scan,
                        heapgetpage(scan, page);
 
                /* Since the tuple was previously fetched, needn't lock page here */
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
                lpp = PageGetItemId(dp, lineoff);
                Assert(ItemIdIsNormal(lpp));
@@ -1027,7 +1034,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 
                heapgetpage(scan, page);
 
-               dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+                                                  BGP_TEST_FOR_OLD_SNAPSHOT);
                lines = scan->rs_ntuples;
                linesleft = lines;
                if (backward)
@@ -1871,7 +1879,7 @@ heap_fetch(Relation relation,
         * Need share lock on buffer to examine tuple commit status.
         */
        LockBuffer(buffer, BUFFER_LOCK_SHARE);
-       page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(buffer, snapshot, relation, BGP_TEST_FOR_OLD_SNAPSHOT);
 
        /*
         * We'd better check for out-of-range offnum in case of VACUUM since the
@@ -2200,7 +2208,8 @@ heap_get_latest_tid(Relation relation,
                 */
                buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
                LockBuffer(buffer, BUFFER_LOCK_SHARE);
-               page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buffer, snapshot, relation,
+                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
 
                /*
                 * Check for bogus item number.  This is not treated as an error
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 19201b0bca5c9be7f2a36bb838414345aabe8385..ce073ccdc2359933ee6e8d23a549196dffcf6051 100644
@@ -92,12 +92,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
         * need to use the horizon that includes slots, otherwise the data-only
         * horizon can be used. Note that the toast relation of user defined
         * relations are *not* considered catalog relations.
+        *
+        * It is OK to apply the old snapshot limit before acquiring the cleanup
+        * lock because the worst that can happen is that we are not quite as
+        * aggressive about the cleanup (by however many transaction IDs are
+        * consumed between this point and acquiring the lock).  This allows us to
+        * save significant overhead in the case where the page is found not to be
+        * prunable.
         */
        if (IsCatalogRelation(relation) ||
                RelationIsAccessibleInLogicalDecoding(relation))
                OldestXmin = RecentGlobalXmin;
        else
-               OldestXmin = RecentGlobalDataXmin;
+               OldestXmin =
+                               TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
+                                                                                                       relation);
 
        Assert(TransactionIdIsValid(OldestXmin));
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index bf7a817551771151f49d63dde592d3c2bf9716e4..3796656e177e3e985f8d01d78eb756bd6e94b117 100644
@@ -119,7 +119,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 
 top:
        /* find the first page containing this key */
-       stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+       stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
 
        offset = InvalidOffsetNumber;
 
@@ -135,7 +135,7 @@ top:
         * precise description.
         */
        buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
-                                               true, stack, BT_WRITE);
+                                               true, stack, BT_WRITE, NULL);
 
        /*
         * If we're not allowing duplicates, make sure the key isn't already in
@@ -1682,7 +1682,8 @@ _bt_insert_parent(Relation rel,
                        elog(DEBUG2, "concurrent ROOT page split");
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
                        /* Find the leftmost page at the next level up */
-                       pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
+                       pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
+                                                                       NULL);
                        /* Set up a phony stack entry pointing there */
                        stack = &fakestack;
                        stack->bts_blkno = BufferGetBlockNumber(pbuf);
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 36b18047615026e0f9cc7e3286463b4787563aa3..9ba61d5fe130ec5330149da4cb7938d133eb8146 100644
@@ -1255,7 +1255,7 @@ _bt_pagedel(Relation rel, Buffer buf)
                                itup_scankey = _bt_mkscankey(rel, targetkey);
                                /* find the leftmost leaf page containing this key */
                                stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-                                                                  false, &lbuf, BT_READ);
+                                                                  false, &lbuf, BT_READ, NULL);
                                /* don't need a pin on the page */
                                _bt_relbuf(rel, lbuf);
 
index 83f790f79175a543b7f25e11b07bcec51bbbc967..470bab0c521e855dfefc5493d3d8104d48337482 100644 (file)
@@ -79,6 +79,10 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * address of the leaf-page buffer, which is read-locked and pinned.
  * No locks are held on the parent pages, however!
  *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
+ *
  * NOTE that the returned buffer is read-locked regardless of the access
  * parameter.  However, access = BT_WRITE will allow an empty root page
  * to be created and returned.  When access = BT_READ, an empty index
@@ -87,7 +91,7 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  */
 BTStack
 _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
-                  Buffer *bufP, int access)
+                  Buffer *bufP, int access, Snapshot snapshot)
 {
        BTStack         stack_in = NULL;
 
@@ -126,7 +130,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
                 */
                *bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
                                                          (access == BT_WRITE), stack_in,
-                                                         BT_READ);
+                                                         BT_READ, snapshot);
 
                /* if this is a leaf page, we're done */
                page = BufferGetPage(*bufP, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
@@ -199,6 +203,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  * On entry, we have the buffer pinned and a lock of the type specified by
  * 'access'.  If we move right, we release the buffer and lock and acquire
  * the same on the right sibling.  Return value is the buffer we stop at.
+ *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place on each page examined while moving right.  This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
  */
 Buffer
 _bt_moveright(Relation rel,
@@ -208,7 +216,8 @@ _bt_moveright(Relation rel,
                          bool nextkey,
                          bool forupdate,
                          BTStack stack,
-                         int access)
+                         int access,
+                         Snapshot snapshot)
 {
        Page            page;
        BTPageOpaque opaque;
@@ -233,7 +242,7 @@ _bt_moveright(Relation rel,
 
        for (;;)
        {
-               page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
                if (P_RIGHTMOST(opaque))
@@ -972,7 +981,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
         * Use the manufactured insertion scan key to descend the tree and
         * position ourselves on the target leaf page.
         */
-       stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
+       stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
+                                          scan->xs_snapshot);
 
        /* don't need to keep the stack around... */
        _bt_freestack(stack);
@@ -1337,8 +1347,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                        /* step right one page */
                        so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
                        /* check for deleted page */
-                       page = BufferGetPage(so->currPos.buf, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST);
+                       page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+                                                                BGP_TEST_FOR_OLD_SNAPSHOT);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                        if (!P_IGNORE(opaque))
                        {
@@ -1412,8 +1422,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                         * it's not half-dead and contains matching tuples. Else loop back
                         * and do it all again.
                         */
-                       page = BufferGetPage(so->currPos.buf, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST);
+                       page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+                                                                BGP_TEST_FOR_OLD_SNAPSHOT);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                        if (!P_IGNORE(opaque))
                        {
@@ -1476,7 +1486,7 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
                /* check for interrupts while we're not holding any buffer lock */
                CHECK_FOR_INTERRUPTS();
                buf = _bt_getbuf(rel, blkno, BT_READ);
-               page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
                /*
@@ -1502,14 +1512,14 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
                                break;
                        blkno = opaque->btpo_next;
                        buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-                       page = BufferGetPage(buf, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST);
+                       page = BufferGetPage(buf, snapshot, rel,
+                                                                BGP_TEST_FOR_OLD_SNAPSHOT);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                }
 
                /* Return to the original page to see what's up */
                buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
-               page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                if (P_ISDELETED(opaque))
                {
@@ -1526,8 +1536,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
                                                 RelationGetRelationName(rel));
                                blkno = opaque->btpo_next;
                                buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-                               page = BufferGetPage(buf, NULL, NULL,
-                                                                        BGP_NO_SNAPSHOT_TEST);
+                               page = BufferGetPage(buf, snapshot, rel,
+                                                                        BGP_TEST_FOR_OLD_SNAPSHOT);
                                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                                if (!P_ISDELETED(opaque))
                                        break;
@@ -1564,7 +1574,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
  * The returned buffer is pinned and read-locked.
  */
 Buffer
-_bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
+_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+                                Snapshot snapshot)
 {
        Buffer          buf;
        Page            page;
@@ -1586,7 +1597,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
        if (!BufferIsValid(buf))
                return InvalidBuffer;
 
-       page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+       page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
        for (;;)
@@ -1605,8 +1616,8 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
                                elog(ERROR, "fell off the end of index \"%s\"",
                                         RelationGetRelationName(rel));
                        buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
-                       page = BufferGetPage(buf, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST);
+                       page = BufferGetPage(buf, snapshot, rel,
+                                                                BGP_TEST_FOR_OLD_SNAPSHOT);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                }
 
@@ -1659,7 +1670,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
         * version of _bt_search().  We don't maintain a stack since we know we
         * won't need it.
         */
-       buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
+       buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
 
        if (!BufferIsValid(buf))
        {
index fafdca31f3965f02675e8412961bedfcd0645a26..7acd71a2911e7610478811a944ca6889b5c4b864 100644 (file)
@@ -341,7 +341,7 @@ redirect:
                }
                /* else new pointer points to the same page, no work needed */
 
-               page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               page = BufferGetPage(buffer, snapshot, index, BGP_TEST_FOR_OLD_SNAPSHOT);
 
                isnull = SpGistPageStoresNulls(page) ? true : false;
 
index 4cb4acf33a505c9fae6e315423eb9667b95de808..93361a0c99c964a506c87e36878e0f3ce94f787e 100644 (file)
@@ -489,7 +489,8 @@ vacuum_set_xid_limits(Relation rel,
         * working on a particular table at any time, and that each vacuum is
         * always an independent transaction.
         */
-       *oldestXmin = GetOldestXmin(rel, true);
+       *oldestXmin =
+               TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
 
        Assert(TransactionIdIsNormal(*oldestXmin));
 
index 3f48ef40701ee1e4b5ae82ce487b076f88286fbf..d0e92b336580e1974a87ca17cda834d38dafec36 100644 (file)
@@ -1660,7 +1660,8 @@ should_attempt_truncation(LVRelStats *vacrelstats)
        possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
        if (possibly_freeable > 0 &&
                (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
-                possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
+                possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
+               old_snapshot_threshold < 0)
                return true;
        else
                return false;
index 9874c3eaa04477fd824c56400d845499ab6d2b1d..c664984d0a11b9f75a71bbcd4612843bbd777efd 100644 (file)
@@ -4114,3 +4114,43 @@ IssuePendingWritebacks(WritebackContext *context)
 
        context->nr_pending = 0;
 }
+
+
+/*
+ * Check whether the given snapshot is too old to have safely read the given
+ * page from the given table.  If so, throw a "snapshot too old" error.
+ *
+ * This test generally needs to be performed after every BufferGetPage() call
+ * that is executed as part of a scan.  It is not needed for calls made for
+ * modifying the page (for example, to position to the right place to insert a
+ * new index tuple or for vacuuming).  To minimize errors of omission, the
+ * BufferGetPage() macro accepts parameters to specify whether the test should
+ * be run, and supply the necessary snapshot and relation parameters.  See the
+ * declaration of BufferGetPage() for more details.
+ *
+ * Note that a NULL snapshot argument is allowed and causes a fast return
+ * without error; this is to support call sites which can be called from
+ * either scans or index modification areas.
+ *
+ * For best performance, keep the tests that are fastest and/or most likely to
+ * exclude a page from old snapshot testing near the front.
+ */
+extern Page
+TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page)
+{
+       Assert(relation != NULL);
+
+       if (old_snapshot_threshold >= 0                 /* feature enabled (-1 disables) */
+        && (snapshot) != NULL                          /* NULL snapshot: no check wanted */
+        && (snapshot)->satisfies == HeapTupleSatisfiesMVCC     /* only MVCC snapshots are tested */
+        && !XLogRecPtrIsInvalid((snapshot)->lsn)       /* snapshot recorded a WAL position */
+        && PageGetLSN(page) > (snapshot)->lsn          /* page LSN is past the snapshot's LSN */
+        && !IsCatalogRelation(relation)                /* catalog relations are exempt */
+        && !RelationIsAccessibleInLogicalDecoding(relation)    /* likewise exempt */
+        && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp()) /* taken before the threshold */
+               ereport(ERROR,
+                               (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+                                errmsg("snapshot too old")));
+
+       return page;            /* reached only when no error was raised; page is unchanged */
+}
index 36a04fc57089370f9bc9d3ede266b2e0cf95580e..c04b17fa8ead59f8190a3fa7d880197e9cff9235 100644 (file)
@@ -43,6 +43,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/snapmgr.h"
 
 
 shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -136,6 +137,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, ReplicationOriginShmemSize());
                size = add_size(size, WalSndShmemSize());
                size = add_size(size, WalRcvShmemSize());
+               size = add_size(size, SnapMgrShmemSize());
                size = add_size(size, BTreeShmemSize());
                size = add_size(size, SyncScanShmemSize());
                size = add_size(size, AsyncShmemSize());
@@ -247,6 +249,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
        /*
         * Set up other modules that need some shared memory space
         */
+       SnapMgrInit();
        BTreeShmemInit();
        SyncScanShmemInit();
        AsyncShmemInit();
index 01cfa9d5f90ddc403af4f4a8b04dd1f37788a1f9..5bc9fd6595e1e2ca29e97585a8e35c1f483c58e4 100644 (file)
@@ -1759,6 +1759,15 @@ GetSnapshotData(Snapshot snapshot)
        snapshot->regd_count = 0;
        snapshot->copied = false;
 
+       /*
+        * Capture the current time and WAL stream location in case this snapshot
+        * becomes old enough to need to fall back on the special "old snapshot"
+        * logic.
+        */
+       snapshot->lsn = GetXLogInsertRecPtr();
+       snapshot->whenTaken = GetSnapshotCurrentTimestamp();
+       MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
+
        return snapshot;
 }
 
index c557cb68d0b3255777af88ac2dfcd8bb11029689..f8996cd21a552089d08978936cd7418f9232aee9 100644 (file)
@@ -46,3 +46,4 @@ CommitTsControlLock                                   38
 CommitTsLock                                           39
 ReplicationOriginLock                          40
 MultiXactTruncationLock                                41
+OldSnapshotTimeMapLock                         42
index 49494f9cd31510e3716145a0c0bedbf40592fc15..be924d58bd56b7beda10c38c19efc12290a87d96 100644 (file)
@@ -417,6 +417,10 @@ Section: Class 58 - System Error (errors external to PostgreSQL itself)
 58P01    E    ERRCODE_UNDEFINED_FILE                                         undefined_file
 58P02    E    ERRCODE_DUPLICATE_FILE                                         duplicate_file
 
+Section: Class 72 - Snapshot Failure
+# (class borrowed from Oracle)
+72000    E    ERRCODE_SNAPSHOT_TOO_OLD                                       snapshot_too_old
+
 Section: Class F0 - Configuration File Error
 
 # (PostgreSQL-specific error class)
index f7ed167d7f87dc196f8ccd5731cf8dbcc46fa05c..fb091bc4a0bdde429fcc3662c4e4b658cac8bf1c 100644 (file)
@@ -2677,6 +2677,17 @@ static struct config_int ConfigureNamesInt[] =
                check_autovacuum_work_mem, NULL, NULL
        },
 
+       {
+               {"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
+                       gettext_noop("A value of -1 disables this feature."),
+                       GUC_UNIT_MIN
+               },
+               &old_snapshot_threshold,
+               -1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60,
+               NULL, NULL, NULL
+       },
+
        {
                {"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
                        gettext_noop("Time between issuing TCP keepalives."),
index bcc86e29d27743e48c43505999837f539f5f53cf..d4dd285ef0a6b52333d630cf0cc5d010df8aa730 100644 (file)
 #effective_io_concurrency = 1          # 1-1000; 0 disables prefetching
 #max_worker_processes = 8
 #max_parallel_degree = 0               # max number of worker processes per node
+#old_snapshot_threshold = -1           # 1min-60d; -1 disables; 0 is immediate
+                                                                       # (change requires restart)
 
 
 #------------------------------------------------------------------------------
index b88e01200419b114309f01ab7dd8ab4e5c9cc3b3..19504c3598747db0dd121bc29e8af9c82a20738c 100644 (file)
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinval.h"
+#include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
 
+/*
+ * GUC parameters
+ */
+int                    old_snapshot_threshold;         /* number of minutes, -1 disables */
+
+/*
+ * Structure for dealing with old_snapshot_threshold implementation.
+ */
+typedef struct OldSnapshotControlData
+{
+       /*
+        * Variables for old snapshot handling are shared among processes and are
+        * only allowed to move forward.
+        */
+       slock_t         mutex_current;                  /* protect current timestamp */
+       int64           current_timestamp;              /* latest snapshot timestamp */
+       slock_t         mutex_latest_xmin;              /* protect latest snapshot xmin */
+       TransactionId latest_xmin;                      /* latest snapshot xmin */
+       slock_t         mutex_threshold;                /* protect threshold fields */
+       int64           threshold_timestamp;    /* earlier snapshot is old */
+       TransactionId threshold_xid;            /* earlier xid may be gone */
+
+       /*
+        * Keep one xid per minute for old snapshot error handling.
+        *
+        * Use a circular buffer with a head offset, a count of entries currently
+        * used, and a timestamp corresponding to the xid at the head offset.  A
+        * count_used value of zero means that there are no times stored; a
+        * count_used value of old_snapshot_threshold means that the buffer is
+        * full and the head must be advanced to add new entries.  Use timestamps
+        * aligned to minute boundaries, since that seems less surprising than
+        * aligning based on the first usage timestamp.
+        *
+        * It is OK if the xid for a given time slot is from earlier than
+        * calculated by adding the number of minutes corresponding to the
+        * (possibly wrapped) distance from the head offset to the time of the
+        * head entry, since that just results in the vacuuming of old tuples
+        * being slightly less aggressive.  It would not be OK for it to be off in
+        * the other direction, since it might result in vacuuming tuples that are
+        * still expected to be there.
+        *
+        * Use of an SLRU was considered but not chosen because it is more
+        * heavyweight than is needed for this, and would probably not be any less
+        * code to implement.
+        *
+        * Persistence is not needed.
+        */
+       int                     head_offset;            /* subscript of oldest tracked time */
+       int64           head_timestamp;         /* time corresponding to head xid */
+       int                     count_used;                     /* how many slots are in use */
+       TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];     /* circular buffer, one xid per minute */
+}      OldSnapshotControlData;
+
+typedef struct OldSnapshotControlData *OldSnapshotControl;
+
+static volatile OldSnapshotControl oldSnapshotControl;
+
+
 /*
  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
  * mode, and to the latest one taken in a read-committed transaction.
@@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL;
 static List *exportedSnapshots = NIL;
 
 /* Prototypes for local functions */
+static int64 AlignTimestampToMinuteBoundary(int64 ts);
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
@@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData
        CommandId       curcid;
 } SerializedSnapshotData;
 
+Size
+SnapMgrShmemSize(void)
+{
+       Size            size;           /* bytes needed for OldSnapshotControlData */
+
+       size = offsetof(OldSnapshotControlData, xid_by_minute); /* fixed part, up to the array */
+       if (old_snapshot_threshold > 0)         /* 0 and -1 need no xid_by_minute slots */
+               size = add_size(size, mul_size(sizeof(TransactionId),
+                                                                          old_snapshot_threshold));   /* one slot per minute */
+
+       return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ */
+void
+SnapMgrInit(void)
+{
+       bool            found;          /* out-parameter of ShmemInitStruct() */
+
+       /*
+        * Create or attach to the OldSnapshotControl structure.
+        */
+       oldSnapshotControl = (OldSnapshotControl)
+               ShmemInitStruct("OldSnapshotControlData",
+                                               SnapMgrShmemSize(), &found);
+
+       if (!found)             /* presumably newly created -- set starting values */
+       {
+               SpinLockInit(&oldSnapshotControl->mutex_current);
+               oldSnapshotControl->current_timestamp = 0;
+               SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
+               oldSnapshotControl->latest_xmin = InvalidTransactionId;
+               SpinLockInit(&oldSnapshotControl->mutex_threshold);
+               oldSnapshotControl->threshold_timestamp = 0;
+               oldSnapshotControl->threshold_xid = InvalidTransactionId;
+               oldSnapshotControl->head_offset = 0;
+               oldSnapshotControl->head_timestamp = 0;
+               oldSnapshotControl->count_used = 0;     /* zero means the time map is empty */
+       }
+}
+
 /*
  * GetTransactionSnapshot
  *             Get the appropriate snapshot for a new query in a transaction.
@@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void)
        return false;
 }
 
+
+/*
+ * Return an int64 timestamp which is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value, otherwise move to
+ * the next minute boundary following the given time.
+ */
+static int64
+AlignTimestampToMinuteBoundary(int64 ts)
+{
+       int64           retval = ts + (USECS_PER_MINUTE - 1);   /* bump so the modulo below rounds up */
+
+       return retval - (retval % USECS_PER_MINUTE);    /* NOTE(review): assumes ts >= 0 -- confirm */
+}
+
+/*
+ * Get current timestamp for snapshots as int64 that never moves backward.
+ */
+int64
+GetSnapshotCurrentTimestamp(void)
+{
+       int64           now = GetCurrentIntegerTimestamp();
+
+       /*
+        * Don't let time move backward; if it hasn't advanced, use the old value.
+        */
+       SpinLockAcquire(&oldSnapshotControl->mutex_current);
+       if (now <= oldSnapshotControl->current_timestamp)
+               now = oldSnapshotControl->current_timestamp;    /* shared value is at least as new */
+       else
+               oldSnapshotControl->current_timestamp = now;    /* advance the shared value */
+       SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+       return now;
+}
+
+/*
+ * Get timestamp through which vacuum may have processed based on last stored
+ * value for threshold_timestamp.
+ *
+ * XXX: So far, we never trust that a 64-bit value can be read atomically; if
+ * that ever changes, we could get rid of the spinlock here.
+ */
+int64
+GetOldSnapshotThresholdTimestamp(void)
+{
+       int64           threshold_timestamp;    /* local copy, read under the spinlock */
+
+       SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+       threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+       SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+       return threshold_timestamp;
+}
+
+static void
+SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
+{
+       SpinLockAcquire(&oldSnapshotControl->mutex_threshold);  /* both fields are stored as a pair */
+       oldSnapshotControl->threshold_timestamp = ts;   /* NOTE(review): no forward-only check here */
+       oldSnapshotControl->threshold_xid = xlimit;     /* -- confirm callers never move these back */
+       SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit, if any.  This is intended to be called for page
+ * pruning and table vacuuming, to allow old_snapshot_threshold to override
+ * the normal global xmin value.  Actual testing for snapshot too old will be
+ * based on whether a snapshot timestamp is prior to the threshold timestamp
+ * set in this function.
+ */
+TransactionId
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+                                                                       Relation relation)
+{
+       if (TransactionIdIsNormal(recentXmin)
+               && old_snapshot_threshold >= 0
+               && RelationNeedsWAL(relation)
+               && !IsCatalogRelation(relation)
+               && !RelationIsAccessibleInLogicalDecoding(relation))
+       {
+               int64           ts = GetSnapshotCurrentTimestamp();
+               TransactionId xlimit = recentXmin;
+               TransactionId latest_xmin = oldSnapshotControl->latest_xmin;    /* unlocked read */
+               bool            same_ts_as_threshold = false;
+
+               /*
+                * Zero threshold always overrides to latest xmin, if valid.  Without
+                * some heuristic it will find its own snapshot too old on, for
+                * example, a simple UPDATE -- which would make it useless for most
+                * testing, but there is no principled way to ensure that it doesn't
+                * fail in this way.  Use a five-second delay to try to get useful
+                * testing behavior, but this may need adjustment.
+                */
+               if (old_snapshot_threshold == 0)
+               {
+                       if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
+                               && TransactionIdFollows(latest_xmin, xlimit))
+                               xlimit = latest_xmin;
+
+                       ts -= 5 * USECS_PER_SEC;
+                       SetOldSnapshotThresholdTimestamp(ts, xlimit);
+
+                       return xlimit;
+               }
+
+               ts = AlignTimestampToMinuteBoundary(ts)
+                        - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+               /* Check for fast exit without LW locking. */
+               SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+               if (ts == oldSnapshotControl->threshold_timestamp)
+               {
+                       xlimit = oldSnapshotControl->threshold_xid;
+                       same_ts_as_threshold = true;
+               }
+               SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+               if (!same_ts_as_threshold)
+               {
+                       LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+                       if (oldSnapshotControl->count_used > 0
+                               && ts >= oldSnapshotControl->head_timestamp)
+                       {
+                               int             offset;
+
+                               offset = ((ts - oldSnapshotControl->head_timestamp)
+                                                 / USECS_PER_MINUTE);
+                               if (offset > oldSnapshotControl->count_used - 1)
+                                       offset = oldSnapshotControl->count_used - 1;    /* clamp to last used slot */
+                               offset = (oldSnapshotControl->head_offset + offset)
+                                               % old_snapshot_threshold;       /* wrap within the circular buffer */
+                               xlimit = oldSnapshotControl->xid_by_minute[offset];
+
+                               if (NormalTransactionIdFollows(xlimit, recentXmin))
+                                       SetOldSnapshotThresholdTimestamp(ts, xlimit);
+                       }
+
+                       LWLockRelease(OldSnapshotTimeMapLock);
+               }
+
+               /*
+                * Failsafe protection against vacuuming work of active transaction.
+                *
+                * This is not an assertion because we avoid the spinlock for
+                * performance, leaving open the possibility that xlimit could advance
+                * and be more current; but it seems prudent to apply this limit.  It
+                * might make pruning a tiny bit less aggressive than it could be, but
+                * protects against data loss bugs.
+                */
+               if (TransactionIdIsNormal(latest_xmin)
+                       && TransactionIdPrecedes(latest_xmin, xlimit))
+                       xlimit = latest_xmin;
+
+               if (NormalTransactionIdFollows(xlimit, recentXmin))
+                       return xlimit;
+       }
+
+       return recentXmin;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ */
+void
+MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+{
+       int64           ts;
+
+       /* Fast exit when old_snapshot_threshold is not used. */
+       if (old_snapshot_threshold < 0)
+               return;
+
+       /* Keep track of the latest xmin seen by any process. */
+       SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+       if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+               oldSnapshotControl->latest_xmin = xmin;
+       SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+       /* No further tracking needed for 0 (used for testing). */
+       if (old_snapshot_threshold == 0)
+               return;
+
+       /*
+        * We don't want to do something stupid with unusual values, but we don't
+        * want to litter the log with warnings or break otherwise normal
+        * processing for this feature; so if something seems unreasonable, just
+        * log at DEBUG level and return without doing anything.
+        */
+       if (whenTaken < 0)
+       {
+               elog(DEBUG1,
+                        "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+                        (long) whenTaken);
+               return;
+       }
+       if (!TransactionIdIsNormal(xmin))
+       {
+               elog(DEBUG1,
+                        "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+                        (unsigned long) xmin);
+               return;
+       }
+
+       /* The map holds one entry per minute, so work in whole minutes. */
+       ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+       LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+       /*
+        * Sanity-check the circular buffer: head_offset indexes xid_by_minute[],
+        * head_timestamp is the (minute-aligned) time of the head entry, and
+        * count_used is the number of consecutive one-minute entries in use,
+        * starting at the head.
+        */
+       Assert(oldSnapshotControl->head_offset >= 0);
+       Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
+       Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+       Assert(oldSnapshotControl->count_used >= 0);
+       Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
+
+       if (oldSnapshotControl->count_used == 0)
+       {
+               /* set up first entry for empty mapping */
+               oldSnapshotControl->head_offset = 0;
+               oldSnapshotControl->head_timestamp = ts;
+               oldSnapshotControl->count_used = 1;
+               oldSnapshotControl->xid_by_minute[0] = xmin;
+       }
+       else if (ts < oldSnapshotControl->head_timestamp)
+       {
+               /* old ts; log it at DEBUG (after dropping the lock) */
+               LWLockRelease(OldSnapshotTimeMapLock);
+               elog(DEBUG1,
+                        "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+                        (long) whenTaken);
+               return;
+       }
+       else if (ts <= (oldSnapshotControl->head_timestamp +
+                                       ((oldSnapshotControl->count_used - 1)
+                                        * USECS_PER_MINUTE)))
+       {
+               /* existing mapping; advance xid if possible */
+               int             bucket = (oldSnapshotControl->head_offset
+                                                 + ((ts - oldSnapshotControl->head_timestamp)
+                                                        / USECS_PER_MINUTE))
+                                                % old_snapshot_threshold;
+
+               if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+                       oldSnapshotControl->xid_by_minute[bucket] = xmin;
+       }
+       else
+       {
+               /* We need a new bucket, but it might not be the very next one. */
+               int             distance_to_new_tail;
+               int             distance_to_current_tail;
+               int             advance;
+
+               /*
+                * The new tail (newest entry) must correspond to "ts".  With one
+                * entry per minute, the distance from the current head to the new
+                * tail is the number of minutes between head_timestamp and ts.  The
+                * distance from the head to the current tail is count_used - 1,
+                * because the head entry itself is 0 minutes after head_timestamp.
+                * The difference is the number of entries we must add or rotate.
+                *
+                * Measuring from the head instead of from the current tail would
+                * write too many buckets whenever more than one is already in use.
+                */
+               distance_to_new_tail =
+                       (ts - oldSnapshotControl->head_timestamp) / USECS_PER_MINUTE;
+               distance_to_current_tail =
+                       oldSnapshotControl->count_used - 1;
+               advance = distance_to_new_tail - distance_to_current_tail;
+               Assert(advance > 0);
+
+               if (advance >= old_snapshot_threshold)
+               {
+                       /* Advance is so far that all old data is junk; start over. */
+                       oldSnapshotControl->head_offset = 0;
+                       oldSnapshotControl->count_used = 1;
+                       oldSnapshotControl->xid_by_minute[0] = xmin;
+                       oldSnapshotControl->head_timestamp = ts;
+               }
+               else
+               {
+                       /* Store the new value in one or more buckets. */
+                       int i;
+
+                       for (i = 0; i < advance; i++)
+                       {
+                               if (oldSnapshotControl->count_used == old_snapshot_threshold)
+                               {
+                                       /* Map full and new value replaces old head. */
+                                       int             old_head = oldSnapshotControl->head_offset;
+
+                                       if (old_head == (old_snapshot_threshold - 1))
+                                               oldSnapshotControl->head_offset = 0;
+                                       else
+                                               oldSnapshotControl->head_offset = old_head + 1;
+                                       oldSnapshotControl->xid_by_minute[old_head] = xmin;
+
+                                       /*
+                                        * The head moved forward by one entry, so the time
+                                        * it represents moves forward by one minute.  This is
+                                        * the only case in which an existing head's timestamp
+                                        * may change; while the map is merely growing, the
+                                        * head entry (and its timestamp) stays put.
+                                        */
+                                       oldSnapshotControl->head_timestamp += USECS_PER_MINUTE;
+                               }
+                               else
+                               {
+                                       /* Extend map to unused entry. */
+                                       int             new_tail = (oldSnapshotControl->head_offset
+                                                                               + oldSnapshotControl->count_used)
+                                                                          % old_snapshot_threshold;
+
+                                       oldSnapshotControl->count_used++;
+                                       oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+                               }
+                       }
+               }
+       }
+
+       LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
+
 /*
  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
  * access to behave just like it did at a certain point in the past.
index 19528bf79ed2cf3f4764c43760472efd09c4ef2c..89054e01281636b50af6cba74299ffc2a6c40fc9 100644 (file)
 #include "storage/itemptr.h"
 #include "storage/off.h"
 #include "utils/relcache.h"
+#include "utils/snapshot.h"
 
 /* struct definition lives in brin_revmap.c */
 typedef struct BrinRevmap BrinRevmap;
 
 extern BrinRevmap *brinRevmapInitialize(Relation idxrel,
-                                        BlockNumber *pagesPerRange);
+                                        BlockNumber *pagesPerRange, Snapshot snapshot);
 extern void brinRevmapTerminate(BrinRevmap *revmap);
 
 extern void brinRevmapExtend(BrinRevmap *revmap,
@@ -34,6 +35,6 @@ extern void brinSetHeapBlockItemptr(Buffer rmbuf, BlockNumber pagesPerRange,
                                                BlockNumber heapBlk, ItemPointerData tid);
 extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap,
                                                 BlockNumber heapBlk, Buffer *buf, OffsetNumber *off,
-                                                Size *size, int mode);
+                                                Size *size, int mode, Snapshot snapshot);
 
 #endif   /* BRIN_REVMAP_H */
index e212c9ff1e0504271bce4147333884e6f19c6728..a7d4a90d4b1d9ce55a24b3351a5b3143c6bc249c 100644 (file)
@@ -703,7 +703,7 @@ typedef struct
  * PostingItem
  */
 
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot);
 extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
 extern void freeGinBtreeStack(GinBtreeStack *stack);
 extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
@@ -731,7 +731,7 @@ extern void GinPageDeletePostingItem(Page page, OffsetNumber offset);
 extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
                                          ItemPointerData *items, uint32 nitem,
                                          GinStatsData *buildStats);
-extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno);
+extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, Snapshot snapshot);
 extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
 extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
 
index 9046b166bd9a56262f9cd02fd22b5a365307b64b..ca5034907dd6b27633aa75513fa63924804a70b9 100644 (file)
@@ -710,17 +710,18 @@ extern int        _bt_pagedel(Relation rel, Buffer buf);
  */
 extern BTStack _bt_search(Relation rel,
                   int keysz, ScanKey scankey, bool nextkey,
-                  Buffer *bufP, int access);
+                  Buffer *bufP, int access, Snapshot snapshot);
 extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
                          ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
-                         int access);
+                         int access, Snapshot snapshot);
 extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
                        ScanKey scankey, bool nextkey);
 extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
                        Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
-extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
+extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+                                                          Snapshot snapshot);
 
 /*
  * prototypes for functions in nbtutils.c
index 4c15934f36b95ba20272d7828299c0894ea502dc..6fea1bc13dcb93520e3b6c812cf10b378cdd9b72 100644 (file)
@@ -180,11 +180,26 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 /*
  * BufferGetPage
  *             Returns the page associated with a buffer.
+ *
+ * agetest will normally be a literal, so use a macro at the outer level to
+ * give the compiler a chance to optimize away the runtime code to check it.
+ *
+ * TestForOldSnapshot(), if it doesn't throw an error, will return the page
+ * argument it is passed, so the same result will go back to this macro's
+ * caller for either agetest value; it is a matter of whether to call the
+ * function to perform the test.  For call sites where the check is not needed
+ * (which is the vast majority of them), the snapshot and relation parameters
+ * can, and generally should, be NULL.
  */
 #define BufferGetPage(buffer, snapshot, relation, agetest) \
 ( \
-       AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST), \
-       ((Page)BufferGetBlock(buffer)) \
+       ( \
+               AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST || (agetest) == BGP_TEST_FOR_OLD_SNAPSHOT), \
+               ((agetest) == BGP_NO_SNAPSHOT_TEST) \
+       ) ? \
+               ((Page)BufferGetBlock(buffer)) \
+       : \
+               (TestForOldSnapshot(snapshot, relation, (Page)BufferGetBlock(buffer))) \
 )
 
 /*
index c7582c2a11ca27ab476a2f652e8a6bf3c8b366dc..b5d82d60042650b628a3bb4087b8c76aa280d099 100644 (file)
@@ -15,6 +15,7 @@
 #define REL_H
 
 #include "access/tupdesc.h"
+#include "access/xlog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "fmgr.h"
index a9e9066dd2958eed32dd49d741f65fcd6a5ae13e..371042a6077069f51a1a0d39fe0391ba78d46dcc 100644 (file)
 #define SNAPMGR_H
 
 #include "fmgr.h"
+#include "utils/relcache.h"
 #include "utils/resowner.h"
 #include "utils/snapshot.h"
 
 
+/* GUC variables */
+extern int     old_snapshot_threshold;
+
+
+extern Size SnapMgrShmemSize(void);
+extern void SnapMgrInit(void);
+extern int64 GetSnapshotCurrentTimestamp(void);
+extern int64 GetOldSnapshotThresholdTimestamp(void);
+
 extern bool FirstSnapshotSet;
 
 extern TransactionId TransactionXmin;
@@ -54,6 +64,9 @@ extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
+extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+                                                                                                                Relation relation);
+extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
 
 extern char *ExportSnapshot(Snapshot snapshot);
 
index 2a563633d2bfa7fd23a6a3d787c74075d216edeb..998e2e593d06ea07eb6b6459332e70d6d7dd3820 100644 (file)
@@ -14,6 +14,7 @@
 #define SNAPSHOT_H
 
 #include "access/htup.h"
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 #include "storage/buf.h"
 
@@ -105,6 +106,9 @@ typedef struct SnapshotData
        uint32          active_count;   /* refcount on ActiveSnapshot stack */
        uint32          regd_count;             /* refcount on RegisteredSnapshots */
        pairingheap_node ph_node;       /* link in the RegisteredSnapshots heap */
+
+       int64           whenTaken;              /* timestamp when snapshot was taken */
+       XLogRecPtr      lsn;                    /* position in the WAL stream when taken */
 } SnapshotData;
 
 /*
index ebdcdc8c2ae9d633e78e65e931ade78020e89830..d086163fabcc427dbdf85fec1ca0a2d56658d0db 100644 (file)
@@ -8,6 +8,7 @@ SUBDIRS = \
                  brin \
                  commit_ts \
                  dummy_seclabel \
+                 snapshot_too_old \
                  test_ddl_deparse \
                  test_extensions \
                  test_parser \
diff --git a/src/test/modules/snapshot_too_old/Makefile b/src/test/modules/snapshot_too_old/Makefile
new file mode 100644 (file)
index 0000000..16339f0
--- /dev/null
@@ -0,0 +1,47 @@
+# src/test/modules/snapshot_too_old/Makefile
+
+EXTRA_CLEAN = ./isolation_output
+
+ISOLATIONCHECKS=sto_using_cursor sto_using_select
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/snapshot_too_old
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+# Disabled because these tests require "old_snapshot_threshold" >= 0, which
+# typical installcheck users do not have (e.g. buildfarm clients).
+installcheck:;
+
+# But it can nonetheless be very helpful to run the tests against a
+# preexisting installation, so allow that, but only if requested explicitly.
+installcheck-force: isolationcheck-install-force
+
+check: isolationcheck
+
+submake-isolation:
+       $(MAKE) -C $(top_builddir)/src/test/isolation all
+
+submake-test_snapshot_too_old:
+       $(MAKE) -C $(top_builddir)/src/test/modules/snapshot_too_old
+
+isolationcheck: | submake-isolation submake-test_snapshot_too_old temp-install
+       $(MKDIR_P) isolation_output
+       $(pg_isolation_regress_check) \
+           --temp-config $(top_srcdir)/src/test/modules/snapshot_too_old/sto.conf \
+           --outputdir=./isolation_output \
+           $(ISOLATIONCHECKS)
+
+isolationcheck-install-force: all | submake-isolation submake-test_snapshot_too_old temp-install
+       $(pg_isolation_regress_installcheck) \
+           $(ISOLATIONCHECKS)
+
+.PHONY: check submake-test_snapshot_too_old isolationcheck isolationcheck-install-force
+
+temp-install: EXTRA_INSTALL=src/test/modules/snapshot_too_old
diff --git a/src/test/modules/snapshot_too_old/expected/sto_using_cursor.out b/src/test/modules/snapshot_too_old/expected/sto_using_cursor.out
new file mode 100644 (file)
index 0000000..8cc29ec
--- /dev/null
@@ -0,0 +1,73 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1decl s1f1 s1sleep s1f2 s2u
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1decl s1f1 s1sleep s2u s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s1decl s1f1 s2u s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s1decl s2u s1f1 s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
+
+starting permutation: s2u s1decl s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c              
+
+2              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR:  snapshot too old
diff --git a/src/test/modules/snapshot_too_old/expected/sto_using_select.out b/src/test/modules/snapshot_too_old/expected/sto_using_select.out
new file mode 100644 (file)
index 0000000..eb15bc2
--- /dev/null
@@ -0,0 +1,55 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1f1 s1sleep s1f2 s2u
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1f1 s1sleep s2u s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
+
+starting permutation: s1f1 s2u s1sleep s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+1              
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
+
+starting permutation: s2u s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c              
+
+2              
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting        pg_sleep       
+
+0                             
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR:  snapshot too old
diff --git a/src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec b/src/test/modules/snapshot_too_old/specs/sto_using_cursor.spec
new file mode 100644 (file)
index 0000000..eac18ca
--- /dev/null
@@ -0,0 +1,37 @@
+# This test provokes a "snapshot too old" error using a cursor.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made.  With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising value isn't so hard
+# to identify.
+
+setup
+{
+    CREATE TABLE sto1 (c int NOT NULL);
+    INSERT INTO sto1 SELECT generate_series(1, 1000);
+    CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+    VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+    DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup                  { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1decl"  { DECLARE cursor1 CURSOR FOR SELECT c FROM sto1; }
+step "s1f1"            { FETCH FIRST FROM cursor1; }
+step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2"            { FETCH FIRST FROM cursor1; }
+teardown               { COMMIT; }
+
+session "s2"
+step "s2u"             { UPDATE sto1 SET c = 1001 WHERE c = 1; }
diff --git a/src/test/modules/snapshot_too_old/specs/sto_using_select.spec b/src/test/modules/snapshot_too_old/specs/sto_using_select.spec
new file mode 100644 (file)
index 0000000..d7c34f3
--- /dev/null
@@ -0,0 +1,36 @@
+# This test provokes a "snapshot too old" error using SELECT statements.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made.  With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising value isn't so hard
+# to identify.
+
+setup
+{
+    CREATE TABLE sto1 (c int NOT NULL);
+    INSERT INTO sto1 SELECT generate_series(1, 1000);
+    CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+    VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+    DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup                  { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1f1"            { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2"            { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+teardown               { COMMIT; }
+
+session "s2"
+step "s2u"             { UPDATE sto1 SET c = 1001 WHERE c = 1; }
diff --git a/src/test/modules/snapshot_too_old/sto.conf b/src/test/modules/snapshot_too_old/sto.conf
new file mode 100644 (file)
index 0000000..ce8048f
--- /dev/null
@@ -0,0 +1,3 @@
+autovacuum = off
+old_snapshot_threshold = 0
+