blkno, RBM_NORMAL, bas);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buffer, scan->xs_snapshot, scan->indexRelation,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
if (!BloomPageIsDeleted(page))
{
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
+ <term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>old_snapshot_threshold</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the minimum time that a snapshot can be used without risk of a
+ <literal>snapshot too old</> error occurring when using the snapshot.
+ This parameter can only be set at server start.
+ </para>
+
+ <para>
+ Beyond the threshold, old data may be vacuumed away. This can help
+ prevent bloat in the face of snapshots which remain in use for a
+ long time. To prevent incorrect results due to cleanup of data which
+ would otherwise be visible to the snapshot, an error is generated
+ when the snapshot is older than this threshold and the snapshot is
+ used to read a page which has been modified since the snapshot was
+ built.
+ </para>
+
+ <para>
+ A value of <literal>-1</> disables this feature, and is the default.
+ Useful values for production work probably range from a small number
+ of hours to a few days. The setting will be coerced to a granularity
+ of minutes, and small numbers (such as <literal>0</> or
+ <literal>1min</>) are only allowed because they may sometimes be
+ useful for testing. While a setting as high as <literal>60d</> is
+ allowed, please note that in many workloads extreme bloat or
+ transaction ID wraparound may occur in much shorter time frames.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect2>
</sect1>
You should also consider setting <varname>hot_standby_feedback</>
on standby server(s) as an alternative to using this parameter.
</para>
+ <para>
+ This does not prevent cleanup of dead rows which have reached the age
+ specified by <varname>old_snapshot_threshold</>.
+ </para>
</listitem>
</varlistentry>
until it eventually reaches the primary. Standbys make no other use
of feedback they receive other than to pass upstream.
</para>
+ <para>
+ This setting does not override the behavior of
+ <varname>old_snapshot_threshold</> on the primary; a snapshot on the
+ standby which exceeds the primary's age threshold can become invalid,
+ resulting in cancellation of transactions on the standby. This is
+ because <varname>old_snapshot_threshold</> is intended to provide an
+ absolute limit on the time which dead rows can contribute to bloat,
+ which would otherwise be violated because of the configuration of a
+ standby.
+ </para>
</listitem>
</varlistentry>
MemoryContext tupcxt = NULL;
MemoryContext oldcxt = NULL;
- revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
+ revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
for (;;)
{
/* normalize the block number to be the first block in the range */
heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
- BUFFER_LOCK_SHARE);
+ BUFFER_LOCK_SHARE, NULL);
/* if range is unsummarized, there's nothing to do */
if (!brtup)
scan = RelationGetIndexScan(r, nkeys, norderbys);
opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
- opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
+ opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
+ scan->xs_snapshot);
opaque->bo_bdesc = brin_build_desc(r);
scan->opaque = opaque;
MemoryContextResetAndDeleteChildren(perRangeCxt);
tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
- &off, &size, BUFFER_LOCK_SHARE);
+ &off, &size, BUFFER_LOCK_SHARE,
+ scan->xs_snapshot);
if (tup)
{
tup = brin_copy_tuple(tup, size);
/*
* Initialize our state, including the deformed tuple state.
*/
- revmap = brinRevmapInitialize(index, &pagesPerRange);
+ revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
state = initialize_brin_buildstate(index, revmap, pagesPerRange);
/*
* the same.)
*/
phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
- &offset, &phsz, BUFFER_LOCK_SHARE);
+ &offset, &phsz, BUFFER_LOCK_SHARE,
+ NULL);
/* the placeholder tuple must exist */
if (phtup == NULL)
elog(ERROR, "missing placeholder tuple");
BlockNumber pagesPerRange;
Buffer buf;
- revmap = brinRevmapInitialize(index, &pagesPerRange);
+ revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
/*
* Scan the revmap to find unsummarized items.
CHECK_FOR_INTERRUPTS();
tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
- BUFFER_LOCK_SHARE);
+ BUFFER_LOCK_SHARE, NULL);
if (tup == NULL)
{
/* no revmap entry for this heap range. Summarize it. */
* brinRevmapTerminate when caller is done with it.
*/
BrinRevmap *
-brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
+brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
+ Snapshot snapshot)
{
BrinRevmap *revmap;
Buffer meta;
meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
LockBuffer(meta, BUFFER_LOCK_SHARE);
- page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT);
metadata = (BrinMetaPageData *) PageGetContents(page);
revmap = palloc(sizeof(BrinRevmap));
*/
BrinTuple *
brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
- Buffer *buf, OffsetNumber *off, Size *size, int mode)
+ Buffer *buf, OffsetNumber *off, Size *size, int mode,
+ Snapshot snapshot)
{
Relation idxRel = revmap->rm_irel;
BlockNumber mapBlk;
*buf = ReadBuffer(idxRel, blk);
}
LockBuffer(*buf, mode);
- page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(*buf, snapshot, idxRel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
/* If we land on a revmap page, start over */
if (BRIN_IS_REGULAR_PAGE(page))
* is share-locked, and stack->parent is NULL.
*/
GinBtreeStack *
-ginFindLeafPage(GinBtree btree, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
{
GinBtreeStack *stack;
stack->off = InvalidOffsetNumber;
- page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(stack->buffer, snapshot, btree->index,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
access = ginTraverseLock(stack->buffer, searchMode);
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
- page = BufferGetPage(stack->buffer, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(stack->buffer, snapshot, btree->index,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishSplit(btree, stack, false, NULL);
{
/* search for the leaf page where the first item should go to */
btree.itemptr = insertdata.items[insertdata.curitem];
- stack = ginFindLeafPage(&btree, false);
+ stack = ginFindLeafPage(&btree, false, NULL);
ginInsertValue(&btree, stack, &insertdata, buildStats);
}
* Starts a new scan on a posting tree.
*/
GinBtreeStack *
-ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
+ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
+ Snapshot snapshot)
{
GinBtreeStack *stack;
btree->fullScan = TRUE;
- stack = ginFindLeafPage(btree, TRUE);
+ stack = ginFindLeafPage(btree, TRUE, snapshot);
return stack;
}
Page page;
/* Descend to the leftmost leaf page */
- stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
+ stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
buffer = stack->buffer;
IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */
if (moveRightIfItNeeded(btree, stack) == false)
return true;
- page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(stack->buffer, snapshot, btree->index,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
/*
ginPrepareEntryScan(&btreeEntry, entry->attnum,
entry->queryKey, entry->queryCategory,
ginstate);
- stackEntry = ginFindLeafPage(&btreeEntry, true);
+ stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
needUnlock = TRUE;
needUnlock = FALSE;
stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
- rootPostingTree);
+ rootPostingTree, snapshot);
entry->buffer = stack->buffer;
/*
entry->btree.itemptr.ip_posid++;
}
entry->btree.fullScan = false;
- stack = ginFindLeafPage(&entry->btree, true);
+ stack = ginFindLeafPage(&entry->btree, true, snapshot);
/* we don't need the stack, just the buffer. */
entry->buffer = stack->buffer;
ItemPointerSetInvalid(&pos->item);
for (;;)
{
- page = BufferGetPage(pos->pendingBuffer, NULL,
- NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+ scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
maxoff = PageGetMaxOffsetNumber(page);
if (pos->firstOffset > maxoff)
memset(datumExtracted + pos->firstOffset - 1, 0,
sizeof(bool) * (pos->lastOffset - pos->firstOffset));
- page = BufferGetPage(pos->pendingBuffer, NULL,
- NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
+ scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);
for (i = 0; i < so->nkeys; i++)
{
*ntids = 0;
LockBuffer(metabuffer, GIN_SHARE);
- page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
blkno = GinPageGetMeta(page)->head;
/*
ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
- stack = ginFindLeafPage(&btree, false);
+ stack = ginFindLeafPage(&btree, false, NULL);
page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
if (btree.findItem(&btree, stack))
buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(scan->indexRelation, buffer);
- page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = GistPageGetOpaque(page);
/*
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
- page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, scan->xs_snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
maxoffnum = PageGetMaxOffsetNumber(page);
for (offnum = ItemPointerGetOffsetNumber(current);
offnum <= maxoffnum;
/* Read the metapage */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
- page = BufferGetPage(metabuf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(metabuf, scan->xs_snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
metap = HashPageGetMeta(page);
/*
/* Fetch the primary bucket page for the bucket */
buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
- page = BufferGetPage(buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, scan->xs_snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
_hash_readnext(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
+ TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
_hash_readprev(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
+ TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
}
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- dp = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(buffer, snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber(dp);
if (!scan->rs_inited)
heapgetpage(scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines;
if (backward)
lineindex = scan->rs_cindex + 1;
}
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
page = scan->rs_cblock; /* current page */
}
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
heapgetpage(scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
heapgetpage(scan, page);
- dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
lines = scan->rs_ntuples;
linesleft = lines;
if (backward)
* Need share lock on buffer to examine tuple commit status.
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buffer, snapshot, relation, BGP_TEST_FOR_OLD_SNAPSHOT);
/*
* We'd better check for out-of-range offnum in case of VACUUM since the
*/
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buffer, snapshot, relation,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
/*
* Check for bogus item number. This is not treated as an error
* need to use the horizon that includes slots, otherwise the data-only
* horizon can be used. Note that the toast relation of user defined
* relations are *not* considered catalog relations.
+ *
+ * It is OK to apply the old snapshot limit before acquiring the cleanup
+ * lock because the worst that can happen is that we are not quite as
+ * aggressive about the cleanup (by however many transaction IDs are
+ * consumed between this point and acquiring the lock). This allows us to
+ * save significant overhead in the case where the page is found not to be
+ * prunable.
*/
if (IsCatalogRelation(relation) ||
RelationIsAccessibleInLogicalDecoding(relation))
OldestXmin = RecentGlobalXmin;
else
- OldestXmin = RecentGlobalDataXmin;
+ OldestXmin =
+ TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
+ relation);
Assert(TransactionIdIsValid(OldestXmin));
top:
/* find the first page containing this key */
- stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
offset = InvalidOffsetNumber;
* precise description.
*/
buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
- true, stack, BT_WRITE);
+ true, stack, BT_WRITE, NULL);
/*
* If we're not allowing duplicates, make sure the key isn't already in
elog(DEBUG2, "concurrent ROOT page split");
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* Find the leftmost page at the next level up */
- pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
+ pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
+ NULL);
/* Set up a phony stack entry pointing there */
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
- false, &lbuf, BT_READ);
+ false, &lbuf, BT_READ, NULL);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);
* address of the leaf-page buffer, which is read-locked and pinned.
* No locks are held on the parent pages, however!
*
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree. This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
+ *
* NOTE that the returned buffer is read-locked regardless of the access
* parameter. However, access = BT_WRITE will allow an empty root page
* to be created and returned. When access = BT_READ, an empty index
*/
BTStack
_bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
- Buffer *bufP, int access)
+ Buffer *bufP, int access, Snapshot snapshot)
{
BTStack stack_in = NULL;
*/
*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
(access == BT_WRITE), stack_in,
- BT_READ);
+ BT_READ, snapshot);
/* if this is a leaf page, we're done */
page = BufferGetPage(*bufP, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
* On entry, we have the buffer pinned and a lock of the type specified by
* 'access'. If we move right, we release the buffer and lock and acquire
* the same on the right sibling. Return value is the buffer we stop at.
+ *
+ * If the snapshot parameter is not NULL, "old snapshot" checking will take
+ * place during the descent through the tree. This is not needed when
+ * positioning for an insert or delete, so NULL is used for those cases.
*/
Buffer
_bt_moveright(Relation rel,
bool nextkey,
bool forupdate,
BTStack stack,
- int access)
+ int access,
+ Snapshot snapshot)
{
Page page;
BTPageOpaque opaque;
for (;;)
{
- page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_RIGHTMOST(opaque))
* Use the manufactured insertion scan key to descend the tree and
* position ourselves on the target leaf page.
*/
- stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
+ stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
+ scan->xs_snapshot);
/* don't need to keep the stack around... */
_bt_freestack(stack);
/* step right one page */
so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
/* check for deleted page */
- page = BufferGetPage(so->currPos.buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
* it's not half-dead and contains matching tuples. Else loop back
* and do it all again.
*/
- page = BufferGetPage(so->currPos.buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
/* check for interrupts while we're not holding any buffer lock */
CHECK_FOR_INTERRUPTS();
buf = _bt_getbuf(rel, blkno, BT_READ);
- page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
break;
blkno = opaque->btpo_next;
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
- page = BufferGetPage(buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
/* Return to the original page to see what's up */
buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
- page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISDELETED(opaque))
{
RelationGetRelationName(rel));
blkno = opaque->btpo_next;
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
- page = BufferGetPage(buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_ISDELETED(opaque))
break;
* The returned buffer is pinned and read-locked.
*/
Buffer
-_bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
+_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+ Snapshot snapshot)
{
Buffer buf;
Page page;
if (!BufferIsValid(buf))
return InvalidBuffer;
- page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
for (;;)
elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
- page = BufferGetPage(buf, NULL, NULL,
- BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buf, snapshot, rel,
+ BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
* version of _bt_search(). We don't maintain a stack since we know we
* won't need it.
*/
- buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
+ buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
if (!BufferIsValid(buf))
{
}
/* else new pointer points to the same page, no work needed */
- page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ page = BufferGetPage(buffer, snapshot, index, BGP_TEST_FOR_OLD_SNAPSHOT);
isnull = SpGistPageStoresNulls(page) ? true : false;
* working on a particular table at any time, and that each vacuum is
* always an independent transaction.
*/
- *oldestXmin = GetOldestXmin(rel, true);
+ *oldestXmin =
+ TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
Assert(TransactionIdIsNormal(*oldestXmin));
possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
if (possibly_freeable > 0 &&
(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
- possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
+ possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
+ old_snapshot_threshold < 0)
return true;
else
return false;
context->nr_pending = 0;
}
+
+
+/*
+ * Check whether the given snapshot is too old to have safely read the given
+ * page from the given table. If so, throw a "snapshot too old" error.
+ *
+ * This test generally needs to be performed after every BufferGetPage() call
+ * that is executed as part of a scan. It is not needed for calls made for
+ * modifying the page (for example, to position to the right place to insert a
+ * new index tuple or for vacuuming). To minimize errors of omission, the
+ * BufferGetPage() macro accepts parameters to specify whether the test should
+ * be run, and supply the necessary snapshot and relation parameters. See the
+ * declaration of BufferGetPage() for more details.
+ *
+ * Note that a NULL snapshot argument is allowed and causes a fast return
+ * without error; this is to support call sites which can be called from
+ * either scans or index modification areas.
+ *
+ * For best performance, keep the tests that are fastest and/or most likely to
+ * exclude a page from old snapshot testing near the front.
+ */
+/*
+ * NOTE(review): "extern" on a function *definition* is legal but unusual;
+ * confirm this matches the matching header declaration. The parentheses
+ * around each (snapshot) use look like residue from a macro form of this
+ * test -- harmless, but could be dropped.
+ */
+extern Page
+TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page)
+{
+ Assert(relation != NULL);
+
+ /*
+ * Error out only when every condition holds: the feature is enabled
+ * (old_snapshot_threshold >= 0), the snapshot is a non-NULL MVCC
+ * snapshot with a valid LSN that is older than the page's LSN, the
+ * relation is neither a catalog nor needed for logical decoding (such
+ * relations are not subject to early pruning; see the OldestXmin
+ * selection in the pruning code), and the snapshot was taken before the
+ * current "old snapshot" threshold timestamp. Per the header comment,
+ * the cheapest / most selective tests are placed first.
+ */
+ if (old_snapshot_threshold >= 0
+ && (snapshot) != NULL
+ && (snapshot)->satisfies == HeapTupleSatisfiesMVCC
+ && !XLogRecPtrIsInvalid((snapshot)->lsn)
+ && PageGetLSN(page) > (snapshot)->lsn
+ && !IsCatalogRelation(relation)
+ && !RelationIsAccessibleInLogicalDecoding(relation)
+ && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
+ ereport(ERROR,
+ (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+ errmsg("snapshot too old")));
+
+ return page;
+}
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
+#include "utils/snapmgr.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
+ size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
/*
* Set up other modules that need some shared memory space
*/
+ SnapMgrInit();
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
snapshot->regd_count = 0;
snapshot->copied = false;
+ /*
+ * Capture the current time and WAL stream location in case this snapshot
+ * becomes old enough to need to fall back on the special "old snapshot"
+ * logic.
+ */
+ snapshot->lsn = GetXLogInsertRecPtr();
+ snapshot->whenTaken = GetSnapshotCurrentTimestamp();
+ MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
+
return snapshot;
}
CommitTsLock 39
ReplicationOriginLock 40
MultiXactTruncationLock 41
+OldSnapshotTimeMapLock 42
58P01 E ERRCODE_UNDEFINED_FILE undefined_file
58P02 E ERRCODE_DUPLICATE_FILE duplicate_file
+Section: Class 72 - Snapshot Failure
+# (class borrowed from Oracle)
+72000 E ERRCODE_SNAPSHOT_TOO_OLD snapshot_too_old
+
Section: Class F0 - Configuration File Error
# (PostgreSQL-specific error class)
check_autovacuum_work_mem, NULL, NULL
},
+ {
+ {"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
+ gettext_noop("A value of -1 disables this feature."),
+ GUC_UNIT_MIN
+ },
+ &old_snapshot_threshold,
+ -1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60,
+ NULL, NULL, NULL
+ },
+
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
gettext_noop("Time between issuing TCP keepalives."),
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8
#max_parallel_degree = 0 # max number of worker processes per node
+#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate
+ # (change requires restart)
#------------------------------------------------------------------------------
#include "access/transam.h"
#include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
#include "lib/pairingheap.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/resowner_private.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
+/*
+ * GUC parameters
+ */
+int old_snapshot_threshold; /* number of minutes, -1 disables */
+
+/*
+ * Structure for dealing with old_snapshot_threshold implementation.
+ */
+typedef struct OldSnapshotControlData
+{
+ /*
+ * Variables for old snapshot handling are shared among processes and are
+ * only allowed to move forward.
+ */
+ slock_t mutex_current; /* protect current timestamp */
+ int64 current_timestamp; /* latest snapshot timestamp */
+ slock_t mutex_latest_xmin; /* protect latest snapshot xmin */
+ TransactionId latest_xmin; /* latest snapshot xmin */
+ slock_t mutex_threshold; /* protect threshold fields */
+ int64 threshold_timestamp; /* earlier snapshot is old */
+ TransactionId threshold_xid; /* earlier xid may be gone */
+
+ /*
+ * Keep one xid per minute for old snapshot error handling.
+ *
+ * Use a circular buffer with a head offset, a count of entries currently
+ * used, and a timestamp corresponding to the xid at the head offset. A
+ * count_used value of zero means that there are no times stored; a
+ * count_used value of old_snapshot_threshold means that the buffer is
+ * full and the head must be advanced to add new entries. Use timestamps
+ * aligned to minute boundaries, since that seems less surprising than
+ * aligning based on the first usage timestamp.
+ *
+ * It is OK if the xid for a given time slot is from earlier than
+ * calculated by adding the number of minutes corresponding to the
+ * (possibly wrapped) distance from the head offset to the time of the
+ * head entry, since that just results in the vacuuming of old tuples
+ * being slightly less aggressive. It would not be OK for it to be off in
+ * the other direction, since it might result in vacuuming tuples that are
+ * still expected to be there.
+ *
+ * Use of an SLRU was considered but not chosen because it is more
+ * heavyweight than is needed for this, and would probably not be any less
+ * code to implement.
+ *
+ * Persistence is not needed.
+ */
+ int head_offset; /* subscript of oldest tracked time */
+ int64 head_timestamp; /* time corresponding to head xid */
+ int count_used; /* how many slots are in use */
+ TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
+} OldSnapshotControlData;
+
+/*
+ * Pointer typedef; the struct itself lives in shared memory (allocated
+ * via ShmemInitStruct in SnapMgrInit). NOTE(review): pointer-hiding
+ * typedefs are usually discouraged; kept here for consistency with the
+ * surrounding code.
+ */
+typedef struct OldSnapshotControlData *OldSnapshotControl;
+
+/*
+ * Process-local pointer to the shared structure. NOTE(review): presumably
+ * "volatile" follows the project's spinlock-access convention -- confirm
+ * it is still required with the current spinlock barrier semantics.
+ */
+static volatile OldSnapshotControl oldSnapshotControl;
+
+
/*
* CurrentSnapshot points to the only snapshot taken in transaction-snapshot
* mode, and to the latest one taken in a read-committed transaction.
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
+static int64 AlignTimestampToMinuteBoundary(int64 ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
CommandId curcid;
} SerializedSnapshotData;
+/*
+ * Report the amount of shared memory needed for old snapshot tracking.
+ *
+ * This is the OldSnapshotControlData header plus, when old_snapshot_threshold
+ * is positive, one TransactionId slot per minute of the threshold for the
+ * time-to-xid map.  No map entries are needed for a threshold of 0 (which
+ * tracks only latest_xmin) or -1 (feature disabled).
+ */
+Size
+SnapMgrShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(OldSnapshotControlData, xid_by_minute);
+ if (old_snapshot_threshold > 0)
+ size = add_size(size, mul_size(sizeof(TransactionId),
+ old_snapshot_threshold));
+
+ return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ *
+ * Sets up (or attaches to) the OldSnapshotControl structure in shared
+ * memory.  Must run after shared memory is created and before any use of
+ * the old snapshot machinery.
+ */
+void
+SnapMgrInit(void)
+{
+ bool found;
+
+ /*
+ * Create or attach to the OldSnapshotControl structure.
+ */
+ oldSnapshotControl = (OldSnapshotControl)
+ ShmemInitStruct("OldSnapshotControlData",
+ SnapMgrShmemSize(), &found);
+
+ if (!found)
+ {
+ /*
+ * First process through; initialize all spinlocks and fields.  The
+ * xid_by_minute map entries need no initialization because
+ * count_used == 0 marks the whole map as empty.
+ */
+ SpinLockInit(&oldSnapshotControl->mutex_current);
+ oldSnapshotControl->current_timestamp = 0;
+ SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
+ oldSnapshotControl->latest_xmin = InvalidTransactionId;
+ SpinLockInit(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = 0;
+ oldSnapshotControl->threshold_xid = InvalidTransactionId;
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = 0;
+ oldSnapshotControl->count_used = 0;
+ }
+}
+
/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction.
return false;
}
+
+/*
+ * Return an int64 timestamp which is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value, otherwise move to
+ * the next minute boundary following the given time.
+ *
+ * NOTE(review): the add-then-truncate trick relies on C's truncation toward
+ * zero and so rounds up as described only for ts >= 0; callers pass current
+ * timestamps (and MaintainOldSnapshotTimeMapping rejects negative values
+ * before calling here), but confirm if a new caller could pass a negative ts.
+ */
+static int64
+AlignTimestampToMinuteBoundary(int64 ts)
+{
+ int64 retval = ts + (USECS_PER_MINUTE - 1);
+
+ return retval - (retval % USECS_PER_MINUTE);
+}
+
+/*
+ * Get current timestamp for snapshots as int64 that never moves backward.
+ *
+ * The system clock can be set backward; to keep snapshot ages monotonic we
+ * remember the last value handed out (under mutex_current) and return it
+ * again whenever the clock reads earlier than that.
+ */
+int64
+GetSnapshotCurrentTimestamp(void)
+{
+ int64 now = GetCurrentIntegerTimestamp();
+
+ /*
+ * Don't let time move backward; if it hasn't advanced, use the old value.
+ */
+ SpinLockAcquire(&oldSnapshotControl->mutex_current);
+ if (now <= oldSnapshotControl->current_timestamp)
+ now = oldSnapshotControl->current_timestamp;
+ else
+ oldSnapshotControl->current_timestamp = now;
+ SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+ return now;
+}
+
+/*
+ * Get timestamp through which vacuum may have processed based on last stored
+ * value for threshold_timestamp.
+ *
+ * A snapshot older than this value may draw a "snapshot too old" error when
+ * it reads a recently-modified page.
+ *
+ * XXX: So far, we never trust that a 64-bit value can be read atomically; if
+ * that ever changes, we could get rid of the spinlock here.
+ */
+int64
+GetOldSnapshotThresholdTimestamp(void)
+{
+ int64 threshold_timestamp;
+
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ return threshold_timestamp;
+}
+
+/*
+ * Record the timestamp through which vacuum may have processed, together
+ * with the matching xid limit.  Both fields are updated under a single
+ * spinlock acquisition so that readers always see a consistent pair.
+ */
+static void
+SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
+{
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = ts;
+ oldSnapshotControl->threshold_xid = xlimit;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit, if any.  This is intended to be called for page
+ * pruning and table vacuuming, to allow old_snapshot_threshold to override
+ * the normal global xmin value.  Actual testing for snapshot too old will be
+ * based on whether a snapshot timestamp is prior to the threshold timestamp
+ * set in this function.
+ *
+ * Returns recentXmin unchanged when the feature is off, the relation is not
+ * eligible (unlogged, catalog, or visible to logical decoding), or no newer
+ * limit was found.
+ */
+TransactionId
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ Relation relation)
+{
+ if (TransactionIdIsNormal(recentXmin)
+ && old_snapshot_threshold >= 0
+ && RelationNeedsWAL(relation)
+ && !IsCatalogRelation(relation)
+ && !RelationIsAccessibleInLogicalDecoding(relation))
+ {
+ int64 ts = GetSnapshotCurrentTimestamp();
+ TransactionId xlimit = recentXmin;
+ /* Unlocked read; justified in the "Failsafe" comment below. */
+ TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
+ bool same_ts_as_threshold = false;
+
+ /*
+ * Zero threshold always overrides to latest xmin, if valid. Without
+ * some heuristic it will find its own snapshot too old on, for
+ * example, a simple UPDATE -- which would make it useless for most
+ * testing, but there is no principled way to ensure that it doesn't
+ * fail in this way. Use a five-second delay to try to get useful
+ * testing behavior, but this may need adjustment.
+ */
+ if (old_snapshot_threshold == 0)
+ {
+ if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
+ && TransactionIdFollows(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+
+ ts -= 5 * USECS_PER_SEC;
+ SetOldSnapshotThresholdTimestamp(ts, xlimit);
+
+ return xlimit;
+ }
+
+ /* The oldest timestamp a snapshot may carry without being "too old". */
+ ts = AlignTimestampToMinuteBoundary(ts)
+ - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+ /* Check for fast exit without LW locking. */
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ if (ts == oldSnapshotControl->threshold_timestamp)
+ {
+ xlimit = oldSnapshotControl->threshold_xid;
+ same_ts_as_threshold = true;
+ }
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ if (!same_ts_as_threshold)
+ {
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+ if (oldSnapshotControl->count_used > 0
+ && ts >= oldSnapshotControl->head_timestamp)
+ {
+ int offset;
+
+ /* Pick the map entry for ts, clamped to the newest entry. */
+ offset = ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE);
+ if (offset > oldSnapshotControl->count_used - 1)
+ offset = oldSnapshotControl->count_used - 1;
+ offset = (oldSnapshotControl->head_offset + offset)
+ % old_snapshot_threshold;
+ xlimit = oldSnapshotControl->xid_by_minute[offset];
+
+ if (NormalTransactionIdFollows(xlimit, recentXmin))
+ SetOldSnapshotThresholdTimestamp(ts, xlimit);
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+ }
+
+ /*
+ * Failsafe protection against vacuuming work of active transaction.
+ *
+ * This is not an assertion because we avoid the spinlock for
+ * performance, leaving open the possibility that xlimit could advance
+ * and be more current; but it seems prudent to apply this limit. It
+ * might make pruning a tiny bit less aggressive than it could be, but
+ * protects against data loss bugs.
+ */
+ if (TransactionIdIsNormal(latest_xmin)
+ && TransactionIdPrecedes(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+
+ if (NormalTransactionIdFollows(xlimit, recentXmin))
+ return xlimit;
+ }
+
+ return recentXmin;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ *
+ * Called with each snapshot's whenTaken/xmin pair so that, for each minute,
+ * the map records the newest xmin observed during that minute.  Vacuum and
+ * pruning consult the map (via TransactionIdLimitedForOldSnapshots) to
+ * decide how aggressive they may be; per the invariant documented on
+ * OldSnapshotControlData, an entry may be older than correct (merely less
+ * aggressive) but must never be newer.
+ */
+void
+MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+{
+ int64 ts;
+
+ /* Fast exit when old_snapshot_threshold is not used. */
+ if (old_snapshot_threshold < 0)
+ return;
+
+ /* Keep track of the latest xmin seen by any process. */
+ SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+ if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+ oldSnapshotControl->latest_xmin = xmin;
+ SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+ /* No further tracking needed for 0 (used for testing). */
+ if (old_snapshot_threshold == 0)
+ return;
+
+ /*
+ * We don't want to do something stupid with unusual values, but we don't
+ * want to litter the log with warnings or break otherwise normal
+ * processing for this feature; so if something seems unreasonable, just
+ * log at DEBUG level and return without doing anything.
+ */
+ if (whenTaken < 0)
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ if (!TransactionIdIsNormal(xmin))
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+ (unsigned long) xmin);
+ return;
+ }
+
+ ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+ Assert(oldSnapshotControl->head_offset >= 0);
+ Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
+ Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+ Assert(oldSnapshotControl->count_used >= 0);
+ Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
+
+ if (oldSnapshotControl->count_used == 0)
+ {
+ /* set up first entry for empty mapping */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = ts;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ }
+ else if (ts < oldSnapshotControl->head_timestamp)
+ {
+ /* old ts; log it at DEBUG */
+ LWLockRelease(OldSnapshotTimeMapLock);
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ else if (ts <= (oldSnapshotControl->head_timestamp +
+ ((oldSnapshotControl->count_used - 1)
+ * USECS_PER_MINUTE)))
+ {
+ /* existing mapping; advance xid if possible */
+ int bucket = (oldSnapshotControl->head_offset
+ + ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE))
+ % old_snapshot_threshold;
+
+ if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+ oldSnapshotControl->xid_by_minute[bucket] = xmin;
+ }
+ else
+ {
+ /* We need a new bucket, but it might not be the very next one. */
+ int distance_to_new_tail;
+ int advance;
+ int i;
+
+ /*
+ * The new tail of the map must correspond to ts. The current tail is
+ * (count_used - 1) minutes past head_timestamp, so the number of new
+ * entries needed is the minute distance from the head to ts minus the
+ * entries already covering part of that distance. (Computing the
+ * advance as the full distance from the head, as a previous version
+ * of this code did, over-advanced whenever count_used > 1 and stamped
+ * this too-new xmin into buckets for older minutes -- the unsafe
+ * direction for vacuum. Likewise, head_timestamp must only jump to
+ * ts when the map is restarted; otherwise it stays put when extending
+ * and advances one minute per discarded head entry when full.)
+ */
+ distance_to_new_tail = ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE);
+ advance = distance_to_new_tail - (oldSnapshotControl->count_used - 1);
+ Assert(advance > 0);
+
+ if (advance >= old_snapshot_threshold)
+ {
+ /* Advance is so far that all old data is junk; start over. */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = ts;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ }
+ else
+ {
+ /* Store the new value in one or more buckets. */
+ for (i = 0; i < advance; i++)
+ {
+ if (oldSnapshotControl->count_used == old_snapshot_threshold)
+ {
+ /* Map full; discard old head, which becomes the new tail. */
+ int old_head = oldSnapshotControl->head_offset;
+
+ if (old_head == (old_snapshot_threshold - 1))
+ oldSnapshotControl->head_offset = 0;
+ else
+ oldSnapshotControl->head_offset = old_head + 1;
+ oldSnapshotControl->xid_by_minute[old_head] = xmin;
+ /* The head now corresponds to a one-minute-later time. */
+ oldSnapshotControl->head_timestamp += USECS_PER_MINUTE;
+ }
+ else
+ {
+ /* Extend map to unused entry; head and its time unchanged. */
+ int new_tail = (oldSnapshotControl->head_offset
+ + oldSnapshotControl->count_used)
+ % old_snapshot_threshold;
+
+ oldSnapshotControl->count_used++;
+ oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+ }
+ }
+ }
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
/*
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
* access to behave just like it did at a certain point in the past.
#include "storage/itemptr.h"
#include "storage/off.h"
#include "utils/relcache.h"
+#include "utils/snapshot.h"
/* struct definition lives in brin_revmap.c */
typedef struct BrinRevmap BrinRevmap;
extern BrinRevmap *brinRevmapInitialize(Relation idxrel,
- BlockNumber *pagesPerRange);
+ BlockNumber *pagesPerRange, Snapshot snapshot);
extern void brinRevmapTerminate(BrinRevmap *revmap);
extern void brinRevmapExtend(BrinRevmap *revmap,
BlockNumber heapBlk, ItemPointerData tid);
extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap,
BlockNumber heapBlk, Buffer *buf, OffsetNumber *off,
- Size *size, int mode);
+ Size *size, int mode, Snapshot snapshot);
#endif /* BRIN_REVMAP_H */
* PostingItem
*/
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot);
extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats);
-extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno);
+extern GinBtreeStack *ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, Snapshot snapshot);
extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
*/
extern BTStack _bt_search(Relation rel,
int keysz, ScanKey scankey, bool nextkey,
- Buffer *bufP, int access);
+ Buffer *bufP, int access, Snapshot snapshot);
extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
- int access);
+ int access, Snapshot snapshot);
extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey);
extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
Page page, OffsetNumber offnum);
extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
-extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
+extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+ Snapshot snapshot);
/*
* prototypes for functions in nbtutils.c
/*
* BufferGetPage
* Returns the page associated with a buffer.
+ *
+ * agetest will normally be a literal, so use a macro at the outer level to
+ * give the compiler a chance to optimize away the runtime code to check it.
+ *
+ * TestForOldSnapshot(), if it doesn't throw an error, will return the page
+ * argument it is passed, so the same result will go back to this macro's
+ * caller for either agetest value; it is a matter of whether to call the
+ * function to perform the test. For call sites where the check is not needed
+ * (which is the vast majority of them), the snapshot and relation parameters
+ * can, and generally should, be NULL.
*/
#define BufferGetPage(buffer, snapshot, relation, agetest) \
( \
- AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST), \
- ((Page)BufferGetBlock(buffer)) \
+ ( \
+ AssertMacro((agetest) == BGP_NO_SNAPSHOT_TEST || (agetest) == BGP_TEST_FOR_OLD_SNAPSHOT), \
+ ((agetest) == BGP_NO_SNAPSHOT_TEST) \
+ ) ? \
+ ((Page)BufferGetBlock(buffer)) \
+ : \
+ (TestForOldSnapshot(snapshot, relation, (Page)BufferGetBlock(buffer))) \
)
/*
#define REL_H
#include "access/tupdesc.h"
+#include "access/xlog.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "fmgr.h"
#define SNAPMGR_H
#include "fmgr.h"
+#include "utils/relcache.h"
#include "utils/resowner.h"
#include "utils/snapshot.h"
+/* GUC variables */
+extern int old_snapshot_threshold;
+
+
+extern Size SnapMgrShmemSize(void);
+extern void SnapMgrInit(void);
+extern int64 GetSnapshotCurrentTimestamp(void);
+extern int64 GetOldSnapshotThresholdTimestamp(void);
+
extern bool FirstSnapshotSet;
extern TransactionId TransactionXmin;
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
extern bool ThereAreNoPriorRegisteredSnapshots(void);
+extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ Relation relation);
+extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
extern char *ExportSnapshot(Snapshot snapshot);
#define SNAPSHOT_H
#include "access/htup.h"
+#include "access/xlogdefs.h"
#include "lib/pairingheap.h"
#include "storage/buf.h"
uint32 active_count; /* refcount on ActiveSnapshot stack */
uint32 regd_count; /* refcount on RegisteredSnapshots */
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
+
+ int64 whenTaken; /* timestamp when snapshot was taken */
+ XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData;
/*
brin \
commit_ts \
dummy_seclabel \
+ snapshot_too_old \
test_ddl_deparse \
test_extensions \
test_parser \
--- /dev/null
+# src/test/modules/snapshot_too_old/Makefile
+
+EXTRA_CLEAN = ./isolation_output
+
+ISOLATIONCHECKS=sto_using_cursor sto_using_select
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/snapshot_too_old
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+# Disabled because these tests require "old_snapshot_threshold" >= 0, which
+# typical installcheck users do not have (e.g. buildfarm clients).
+installcheck:;
+
+# But it can nonetheless be very helpful to run the tests on a preexisting
+# installation; we allow that, but only if it is requested explicitly.
+installcheck-force: isolationcheck-install-force
+
+check: isolationcheck
+
+submake-isolation:
+ $(MAKE) -C $(top_builddir)/src/test/isolation all
+
+submake-test_snapshot_too_old:
+ $(MAKE) -C $(top_builddir)/src/test/modules/snapshot_too_old
+
+isolationcheck: | submake-isolation submake-test_snapshot_too_old temp-install
+ $(MKDIR_P) isolation_output
+ $(pg_isolation_regress_check) \
+ --temp-config $(top_srcdir)/src/test/modules/snapshot_too_old/sto.conf \
+ --outputdir=./isolation_output \
+ $(ISOLATIONCHECKS)
+
+isolationcheck-install-force: all | submake-isolation submake-test_snapshot_too_old temp-install
+ $(pg_isolation_regress_installcheck) \
+ $(ISOLATIONCHECKS)
+
+.PHONY: check submake-test_snapshot_too_old isolationcheck isolationcheck-install-force
+
+temp-install: EXTRA_INSTALL=src/test/modules/snapshot_too_old
--- /dev/null
+Parsed test spec with 2 sessions
+
+starting permutation: s1decl s1f1 s1sleep s1f2 s2u
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c
+
+1
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: FETCH FIRST FROM cursor1;
+c
+
+1
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1decl s1f1 s1sleep s2u s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c
+
+1
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR: snapshot too old
+
+starting permutation: s1decl s1f1 s2u s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c
+
+1
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR: snapshot too old
+
+starting permutation: s1decl s2u s1f1 s1sleep s1f2
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: FETCH FIRST FROM cursor1;
+c
+
+1
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR: snapshot too old
+
+starting permutation: s2u s1decl s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1decl: DECLARE cursor1 CURSOR FOR SELECT c FROM sto1;
+step s1f1: FETCH FIRST FROM cursor1;
+c
+
+2
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: FETCH FIRST FROM cursor1;
+ERROR: snapshot too old
--- /dev/null
+Parsed test spec with 2 sessions
+
+starting permutation: s1f1 s1sleep s1f2 s2u
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c
+
+1
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c
+
+1
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+
+starting permutation: s1f1 s1sleep s2u s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c
+
+1
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR: snapshot too old
+
+starting permutation: s1f1 s2u s1sleep s1f2
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c
+
+1
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR: snapshot too old
+
+starting permutation: s2u s1f1 s1sleep s1f2
+step s2u: UPDATE sto1 SET c = 1001 WHERE c = 1;
+step s1f1: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+c
+
+2
+step s1sleep: SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold';
+setting pg_sleep
+
+0
+step s1f2: SELECT c FROM sto1 ORDER BY c LIMIT 1;
+ERROR: snapshot too old
--- /dev/null
+# This test provokes a "snapshot too old" error using a cursor.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made. With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising value isn't so hard
+# to identify.
+
+setup
+{
+ CREATE TABLE sto1 (c int NOT NULL);
+ INSERT INTO sto1 SELECT generate_series(1, 1000);
+ CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+ VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+ DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1decl" { DECLARE cursor1 CURSOR FOR SELECT c FROM sto1; }
+step "s1f1" { FETCH FIRST FROM cursor1; }
+step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2" { FETCH FIRST FROM cursor1; }
+teardown { COMMIT; }
+
+session "s2"
+step "s2u" { UPDATE sto1 SET c = 1001 WHERE c = 1; }
--- /dev/null
+# This test provokes a "snapshot too old" error using SELECT statements.
+#
+# The sleep is needed because with a threshold of zero a statement could error
+# on changes it made. With more normal settings no external delay is needed,
+# but we don't want these tests to run long enough to see that, since
+# granularity is in minutes.
+#
+# Since results depend on the value of old_snapshot_threshold, sneak that into
+# the line generated by the sleep, so that a surprising value isn't so hard
+# to identify.
+
+setup
+{
+ CREATE TABLE sto1 (c int NOT NULL);
+ INSERT INTO sto1 SELECT generate_series(1, 1000);
+ CREATE TABLE sto2 (c int NOT NULL);
+}
+setup
+{
+ VACUUM ANALYZE sto1;
+}
+
+teardown
+{
+ DROP TABLE sto1, sto2;
+}
+
+session "s1"
+setup { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1f1" { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+step "s1sleep" { SELECT setting, pg_sleep(6) FROM pg_settings WHERE name = 'old_snapshot_threshold'; }
+step "s1f2" { SELECT c FROM sto1 ORDER BY c LIMIT 1; }
+teardown { COMMIT; }
+
+session "s2"
+step "s2u" { UPDATE sto1 SET c = 1001 WHERE c = 1; }
--- /dev/null
+autovacuum = off
+old_snapshot_threshold = 0
+