1 /*-------------------------------------------------------------------------
4 * PostgreSQL snapshot manager
6 * We keep track of snapshots in two ways: those "registered" by resowner.c,
7 * and the "active snapshot" stack. All snapshots in either of them live in
8 * persistent memory. When a snapshot is no longer in any of these lists
9 * (tracked by separate refcounts on each snapshot), its memory can be freed.
11 * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12 * regd_count and list it in RegisteredSnapshots, but this reference is not
13 * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14 * to track this snapshot reference, but that introduces logical circularity
15 * and thus makes it impossible to clean up in a sane fashion. It's better to
16 * handle this reference as an internally-tracked registration, so that this
17 * module is entirely lower-level than ResourceOwners.
19 * Likewise, any snapshots that have been exported by pg_export_snapshot
20 * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21 * tracked by any resource owner.
23 * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24 * is valid, but is not tracked by any resource owner.
26 * The same is true for historic snapshots used during logical decoding,
27 * their lifetime is managed separately (as they live longer than one xact.c
30 * These arrangements let us reset MyPgXact->xmin when there are no snapshots
31 * referenced by this transaction, and advance it when the one with oldest
32 * Xmin is no longer referenced. For simplicity however, only registered
33 * snapshots not active snapshots participate in tracking which one is oldest;
34 * we don't try to change MyPgXact->xmin except when the active-snapshot
38 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
39 * Portions Copyright (c) 1994, Regents of the University of California
42 * src/backend/utils/time/snapmgr.c
44 *-------------------------------------------------------------------------
51 #include "access/transam.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "lib/pairingheap.h"
56 #include "miscadmin.h"
57 #include "storage/predicate.h"
58 #include "storage/proc.h"
59 #include "storage/procarray.h"
60 #include "storage/sinval.h"
61 #include "storage/sinvaladt.h"
62 #include "storage/spin.h"
63 #include "utils/builtins.h"
64 #include "utils/memutils.h"
65 #include "utils/rel.h"
66 #include "utils/resowner_private.h"
67 #include "utils/snapmgr.h"
68 #include "utils/syscache.h"
69 #include "utils/tqual.h"
75 int old_snapshot_threshold; /* number of minutes, -1 disables */
78 * Structure for dealing with old_snapshot_threshold implementation.
80 typedef struct OldSnapshotControlData
83 * Variables for old snapshot handling are shared among processes and are
84 * only allowed to move forward.
86 slock_t mutex_current; /* protect current_timestamp */
87 TimestampTz current_timestamp; /* latest snapshot timestamp */
88 slock_t mutex_latest_xmin; /* protect latest_xmin and next_map_update */
89 TransactionId latest_xmin; /* latest snapshot xmin */
90 TimestampTz next_map_update; /* latest snapshot valid up to */
91 slock_t mutex_threshold; /* protect threshold fields */
92 TimestampTz threshold_timestamp; /* earlier snapshot is old */
93 TransactionId threshold_xid; /* earlier xid may be gone */
96 * Keep one xid per minute for old snapshot error handling.
98 * Use a circular buffer with a head offset, a count of entries currently
99 * used, and a timestamp corresponding to the xid at the head offset. A
100 * count_used value of zero means that there are no times stored; a
101 * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
102 * is full and the head must be advanced to add new entries. Use
103 * timestamps aligned to minute boundaries, since that seems less
104 * surprising than aligning based on the first usage timestamp. The
105 * latest bucket is effectively stored within latest_xmin. The circular
106 * buffer is updated when we get a new xmin value that doesn't fall into
109 * It is OK if the xid for a given time slot is from earlier than
110 * calculated by adding the number of minutes corresponding to the
111 * (possibly wrapped) distance from the head offset to the time of the
112 * head entry, since that just results in the vacuuming of old tuples
113 * being slightly less aggressive. It would not be OK for it to be off in
114 * the other direction, since it might result in vacuuming tuples that are
115 * still expected to be there.
117 * Use of an SLRU was considered but not chosen because it is more
118 * heavyweight than is needed for this, and would probably not be any less
121 * Persistence is not needed.
123 int head_offset; /* subscript of oldest tracked time */
124 TimestampTz head_timestamp; /* time corresponding to head xid */
125 int count_used; /* how many slots are in use */
126 TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
127 } OldSnapshotControlData;
129 static volatile OldSnapshotControlData *oldSnapshotControl;
133 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
134 * mode, and to the latest one taken in a read-committed transaction.
135 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
136 * instant, even in transaction-snapshot mode. It should only be used for
137 * special-purpose code (say, RI checking.) CatalogSnapshot points to an
138 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
139 * whenever a system catalog change occurs.
141 * These SnapshotData structs are static to simplify memory allocation
142 * (see the hack in GetSnapshotData to avoid repeated malloc/free).
144 static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
145 static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
146 SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};
148 /* Pointers to valid snapshots */
149 static Snapshot CurrentSnapshot = NULL;
150 static Snapshot SecondarySnapshot = NULL;
151 static Snapshot CatalogSnapshot = NULL;
152 static Snapshot HistoricSnapshot = NULL;
155 * These are updated by GetSnapshotData. We initialize them this way
156 * for the convenience of TransactionIdIsInProgress: even in bootstrap
157 * mode, we don't want it to say that BootstrapTransactionId is in progress.
159 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
160 * InvalidTransactionId, to ensure that no one tries to use a stale
161 * value. Readers should ensure that it has been set to something else
164 TransactionId TransactionXmin = FirstNormalTransactionId;
165 TransactionId RecentXmin = FirstNormalTransactionId;
166 TransactionId RecentGlobalXmin = InvalidTransactionId;
167 TransactionId RecentGlobalDataXmin = InvalidTransactionId;
169 /* (table, ctid) => (cmin, cmax) mapping during timetravel */
170 static HTAB *tuplecid_data = NULL;
173 * Elements of the active snapshot stack.
175 * Each element here accounts for exactly one active_count on SnapshotData.
177 * NB: the code assumes that elements in this list are in non-increasing
178 * order of as_level; also, the list must be NULL-terminated.
180 typedef struct ActiveSnapshotElt
184 struct ActiveSnapshotElt *as_next;
187 /* Top of the stack of active snapshots */
188 static ActiveSnapshotElt *ActiveSnapshot = NULL;
190 /* Bottom of the stack of active snapshots */
191 static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
194 * Currently registered Snapshots. Ordered in a heap by xmin, so that we can
195 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
197 static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
200 static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
202 /* first GetTransactionSnapshot call in a transaction? */
203 bool FirstSnapshotSet = false;
206 * Remember the serializable transaction snapshot, if any. We cannot trust
207 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
208 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
210 static Snapshot FirstXactSnapshot = NULL;
212 /* Define pathname of exported-snapshot files */
213 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
215 /* Structure holding info about exported snapshot. */
216 typedef struct ExportedSnapshot
222 /* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
223 static List *exportedSnapshots = NIL;
225 /* Prototypes for local functions */
226 static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
227 static Snapshot CopySnapshot(Snapshot snapshot);
228 static void FreeSnapshot(Snapshot snapshot);
229 static void SnapshotResetXmin(void);
232 * Snapshot fields to be serialized.
234 * Only these fields need to be sent to the cooperating backend; the
235 * remaining ones can (and must) be set by the receiver upon restore.
237 typedef struct SerializedSnapshotData
244 bool takenDuringRecovery;
246 TimestampTz whenTaken;
248 } SerializedSnapshotData;
251 SnapMgrShmemSize(void)
255 size = offsetof(OldSnapshotControlData, xid_by_minute);
256 if (old_snapshot_threshold > 0)
257 size = add_size(size, mul_size(sizeof(TransactionId),
258 OLD_SNAPSHOT_TIME_MAP_ENTRIES));
264 * Initialize for managing old snapshot detection.
272 * Create or attach to the OldSnapshotControlData structure.
274 oldSnapshotControl = (volatile OldSnapshotControlData *)
275 ShmemInitStruct("OldSnapshotControlData",
276 SnapMgrShmemSize(), &found);
280 SpinLockInit(&oldSnapshotControl->mutex_current);
281 oldSnapshotControl->current_timestamp = 0;
282 SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
283 oldSnapshotControl->latest_xmin = InvalidTransactionId;
284 oldSnapshotControl->next_map_update = 0;
285 SpinLockInit(&oldSnapshotControl->mutex_threshold);
286 oldSnapshotControl->threshold_timestamp = 0;
287 oldSnapshotControl->threshold_xid = InvalidTransactionId;
288 oldSnapshotControl->head_offset = 0;
289 oldSnapshotControl->head_timestamp = 0;
290 oldSnapshotControl->count_used = 0;
295 * GetTransactionSnapshot
296 * Get the appropriate snapshot for a new query in a transaction.
298 * Note that the return value may point at static storage that will be modified
299 * by future calls and by CommandCounterIncrement(). Callers should call
300 * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
304 GetTransactionSnapshot(void)
307 * Return historic snapshot if doing logical decoding. We'll never need a
308 * non-historic transaction snapshot in this (sub-)transaction, so there's
309 * no need to be careful to set one up for later calls to
310 * GetTransactionSnapshot().
312 if (HistoricSnapshotActive())
314 Assert(!FirstSnapshotSet);
315 return HistoricSnapshot;
318 /* First call in transaction? */
319 if (!FirstSnapshotSet)
322 * Don't allow catalog snapshot to be older than xact snapshot. Must
323 * do this first to allow the empty-heap Assert to succeed.
325 InvalidateCatalogSnapshot();
327 Assert(pairingheap_is_empty(&RegisteredSnapshots));
328 Assert(FirstXactSnapshot == NULL);
330 if (IsInParallelMode())
332 "cannot take query snapshot during a parallel operation");
335 * In transaction-snapshot mode, the first snapshot must live until
336 * end of xact regardless of what the caller does with it, so we must
337 * make a copy of it rather than returning CurrentSnapshotData
338 * directly. Furthermore, if we're running in serializable mode,
339 * predicate.c needs to wrap the snapshot fetch in its own processing.
341 if (IsolationUsesXactSnapshot())
343 /* First, create the snapshot in CurrentSnapshotData */
344 if (IsolationIsSerializable())
345 CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
347 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
348 /* Make a saved copy */
349 CurrentSnapshot = CopySnapshot(CurrentSnapshot);
350 FirstXactSnapshot = CurrentSnapshot;
351 /* Mark it as "registered" in FirstXactSnapshot */
352 FirstXactSnapshot->regd_count++;
353 pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
356 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
358 FirstSnapshotSet = true;
359 return CurrentSnapshot;
362 if (IsolationUsesXactSnapshot())
363 return CurrentSnapshot;
365 /* Don't allow catalog snapshot to be older than xact snapshot. */
366 InvalidateCatalogSnapshot();
368 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
370 return CurrentSnapshot;
375 * Get a snapshot that is up-to-date as of the current instant,
376 * even if we are executing in transaction-snapshot mode.
379 GetLatestSnapshot(void)
382 * We might be able to relax this, but nothing that could otherwise work
385 if (IsInParallelMode())
387 "cannot update SecondarySnapshot during a parallel operation");
390 * So far there are no cases requiring support for GetLatestSnapshot()
391 * during logical decoding, but it wouldn't be hard to add if required.
393 Assert(!HistoricSnapshotActive());
395 /* If first call in transaction, go ahead and set the xact snapshot */
396 if (!FirstSnapshotSet)
397 return GetTransactionSnapshot();
399 SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
401 return SecondarySnapshot;
407 * Get the transaction's oldest known snapshot, as judged by the LSN.
408 * Will return NULL if there are no active or registered snapshots.
411 GetOldestSnapshot(void)
413 Snapshot OldestRegisteredSnapshot = NULL;
414 XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
416 if (!pairingheap_is_empty(&RegisteredSnapshots))
418 OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
419 pairingheap_first(&RegisteredSnapshots));
420 RegisteredLSN = OldestRegisteredSnapshot->lsn;
423 if (OldestActiveSnapshot != NULL)
425 XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
427 if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
428 return OldestActiveSnapshot->as_snap;
431 return OldestRegisteredSnapshot;
436 * Get a snapshot that is sufficiently up-to-date for scan of the
437 * system catalog with the specified OID.
440 GetCatalogSnapshot(Oid relid)
443 * Return historic snapshot while we're doing logical decoding, so we can
444 * see the appropriate state of the catalog.
446 * This is the primary reason for needing to reset the system caches after
447 * finishing decoding.
449 if (HistoricSnapshotActive())
450 return HistoricSnapshot;
452 return GetNonHistoricCatalogSnapshot(relid);
456 * GetNonHistoricCatalogSnapshot
457 * Get a snapshot that is sufficiently up-to-date for scan of the system
458 * catalog with the specified OID, even while historic snapshots are set
462 GetNonHistoricCatalogSnapshot(Oid relid)
465 * If the caller is trying to scan a relation that has no syscache, no
466 * catcache invalidations will be sent when it is updated. For a few key
467 * relations, snapshot invalidations are sent instead. If we're trying to
468 * scan a relation for which neither catcache nor snapshot invalidations
469 * are sent, we must refresh the snapshot every time.
471 if (CatalogSnapshot &&
472 !RelationInvalidatesSnapshotsOnly(relid) &&
473 !RelationHasSysCache(relid))
474 InvalidateCatalogSnapshot();
476 if (CatalogSnapshot == NULL)
478 /* Get new snapshot. */
479 CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
482 * Make sure the catalog snapshot will be accounted for in decisions
483 * about advancing PGXACT->xmin. We could apply RegisterSnapshot, but
484 * that would result in making a physical copy, which is overkill; and
485 * it would also create a dependency on some resource owner, which we
486 * do not want for reasons explained at the head of this file. Instead
487 * just shove the CatalogSnapshot into the pairing heap manually. This
488 * has to be reversed in InvalidateCatalogSnapshot, of course.
490 * NB: it had better be impossible for this to throw error, since the
491 * CatalogSnapshot pointer is already valid.
493 pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
496 return CatalogSnapshot;
500 * InvalidateCatalogSnapshot
501 * Mark the current catalog snapshot, if any, as invalid
503 * We could change this API to allow the caller to provide more fine-grained
504 * invalidation details, so that a change to relation A wouldn't prevent us
505 * from using our cached snapshot to scan relation B, but so far there's no
506 * evidence that the CPU cycles we spent tracking such fine details would be
510 InvalidateCatalogSnapshot(void)
514 pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
515 CatalogSnapshot = NULL;
521 * InvalidateCatalogSnapshotConditionally
522 * Drop catalog snapshot if it's the only one we have
524 * This is called when we are about to wait for client input, so we don't
525 * want to continue holding the catalog snapshot if it might mean that the
526 * global xmin horizon can't advance. However, if there are other snapshots
527 * still active or registered, the catalog snapshot isn't likely to be the
528 * oldest one, so we might as well keep it.
531 InvalidateCatalogSnapshotConditionally(void)
533 if (CatalogSnapshot &&
534 ActiveSnapshot == NULL &&
535 pairingheap_is_singular(&RegisteredSnapshots))
536 InvalidateCatalogSnapshot();
540 * SnapshotSetCommandId
541 * Propagate CommandCounterIncrement into the static snapshots, if set
544 SnapshotSetCommandId(CommandId curcid)
546 if (!FirstSnapshotSet)
550 CurrentSnapshot->curcid = curcid;
551 if (SecondarySnapshot)
552 SecondarySnapshot->curcid = curcid;
553 /* Should we do the same with CatalogSnapshot? */
557 * SetTransactionSnapshot
558 * Set the transaction's snapshot from an imported MVCC snapshot.
560 * Note that this is very closely tied to GetTransactionSnapshot --- it
561 * must take care of all the same considerations as the first-snapshot case
562 * in GetTransactionSnapshot.
565 SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
566 int sourcepid, PGPROC *sourceproc)
568 /* Caller should have checked this already */
569 Assert(!FirstSnapshotSet);
571 /* Better do this to ensure following Assert succeeds. */
572 InvalidateCatalogSnapshot();
574 Assert(pairingheap_is_empty(&RegisteredSnapshots));
575 Assert(FirstXactSnapshot == NULL);
576 Assert(!HistoricSnapshotActive());
579 * Even though we are not going to use the snapshot it computes, we must
580 * call GetSnapshotData, for two reasons: (1) to be sure that
581 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
582 * RecentXmin and RecentGlobalXmin. (We could alternatively include those
583 * two variables in exported snapshot files, but it seems better to have
584 * snapshot importers compute reasonably up-to-date values for them.)
586 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
589 * Now copy appropriate fields from the source snapshot.
591 CurrentSnapshot->xmin = sourcesnap->xmin;
592 CurrentSnapshot->xmax = sourcesnap->xmax;
593 CurrentSnapshot->xcnt = sourcesnap->xcnt;
594 Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
595 memcpy(CurrentSnapshot->xip, sourcesnap->xip,
596 sourcesnap->xcnt * sizeof(TransactionId));
597 CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
598 Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
599 memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
600 sourcesnap->subxcnt * sizeof(TransactionId));
601 CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
602 CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
603 /* NB: curcid should NOT be copied, it's a local matter */
606 * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
607 * TransactionXmin. There is a race condition: to make sure we are not
608 * causing the global xmin to go backwards, we have to test that the
609 * source transaction is still running, and that has to be done
610 * atomically. So let procarray.c do it.
612 * Note: in serializable mode, predicate.c will do this a second time. It
613 * doesn't seem worth contorting the logic here to avoid two calls,
614 * especially since it's not clear that predicate.c *must* do this.
616 if (sourceproc != NULL)
618 if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
620 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
621 errmsg("could not import the requested snapshot"),
622 errdetail("The source transaction is not running anymore.")));
624 else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
626 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
627 errmsg("could not import the requested snapshot"),
628 errdetail("The source process with PID %d is not running anymore.",
632 * In transaction-snapshot mode, the first snapshot must live until end of
633 * xact, so we must make a copy of it. Furthermore, if we're running in
634 * serializable mode, predicate.c needs to do its own processing.
636 if (IsolationUsesXactSnapshot())
638 if (IsolationIsSerializable())
639 SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
641 /* Make a saved copy */
642 CurrentSnapshot = CopySnapshot(CurrentSnapshot);
643 FirstXactSnapshot = CurrentSnapshot;
644 /* Mark it as "registered" in FirstXactSnapshot */
645 FirstXactSnapshot->regd_count++;
646 pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
649 FirstSnapshotSet = true;
654 * Copy the given snapshot.
656 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
657 * to 0. The returned snapshot has the copied flag set.
660 CopySnapshot(Snapshot snapshot)
666 Assert(snapshot != InvalidSnapshot);
668 /* We allocate any XID arrays needed in the same palloc block. */
669 size = subxipoff = sizeof(SnapshotData) +
670 snapshot->xcnt * sizeof(TransactionId);
671 if (snapshot->subxcnt > 0)
672 size += snapshot->subxcnt * sizeof(TransactionId);
674 newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
675 memcpy(newsnap, snapshot, sizeof(SnapshotData));
677 newsnap->regd_count = 0;
678 newsnap->active_count = 0;
679 newsnap->copied = true;
681 /* setup XID array */
682 if (snapshot->xcnt > 0)
684 newsnap->xip = (TransactionId *) (newsnap + 1);
685 memcpy(newsnap->xip, snapshot->xip,
686 snapshot->xcnt * sizeof(TransactionId));
692 * Setup subXID array. Don't bother to copy it if it had overflowed,
693 * though, because it's not used anywhere in that case. Except if it's a
694 * snapshot taken during recovery; all the top-level XIDs are in subxip as
695 * well in that case, so we mustn't lose them.
697 if (snapshot->subxcnt > 0 &&
698 (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
700 newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
701 memcpy(newsnap->subxip, snapshot->subxip,
702 snapshot->subxcnt * sizeof(TransactionId));
705 newsnap->subxip = NULL;
712 * Free the memory associated with a snapshot.
715 FreeSnapshot(Snapshot snapshot)
717 Assert(snapshot->regd_count == 0);
718 Assert(snapshot->active_count == 0);
719 Assert(snapshot->copied);
726 * Set the given snapshot as the current active snapshot
728 * If the passed snapshot is a statically-allocated one, or it is possibly
729 * subject to a future command counter update, create a new long-lived copy
730 * with active refcount=1. Otherwise, only increment the refcount.
733 PushActiveSnapshot(Snapshot snap)
735 ActiveSnapshotElt *newactive;
737 Assert(snap != InvalidSnapshot);
739 newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
742 * Checking SecondarySnapshot is probably useless here, but it seems
745 if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
746 newactive->as_snap = CopySnapshot(snap);
748 newactive->as_snap = snap;
750 newactive->as_next = ActiveSnapshot;
751 newactive->as_level = GetCurrentTransactionNestLevel();
753 newactive->as_snap->active_count++;
755 ActiveSnapshot = newactive;
756 if (OldestActiveSnapshot == NULL)
757 OldestActiveSnapshot = ActiveSnapshot;
762 * As above, except forcibly copy the presented snapshot.
764 * This should be used when the ActiveSnapshot has to be modifiable, for
765 * example if the caller intends to call UpdateActiveSnapshotCommandId.
766 * The new snapshot will be released when popped from the stack.
769 PushCopiedSnapshot(Snapshot snapshot)
771 PushActiveSnapshot(CopySnapshot(snapshot));
775 * UpdateActiveSnapshotCommandId
777 * Update the current CID of the active snapshot. This can only be applied
778 * to a snapshot that is not referenced elsewhere.
781 UpdateActiveSnapshotCommandId(void)
783 CommandId save_curcid,
786 Assert(ActiveSnapshot != NULL);
787 Assert(ActiveSnapshot->as_snap->active_count == 1);
788 Assert(ActiveSnapshot->as_snap->regd_count == 0);
791 * Don't allow modification of the active snapshot during parallel
792 * operation. We share the snapshot to worker backends at the beginning
793 * of parallel operation, so any change to the snapshot can lead to
794 * inconsistencies. We have other defenses against
795 * CommandCounterIncrement, but there are a few places that call this
796 * directly, so we put an additional guard here.
798 save_curcid = ActiveSnapshot->as_snap->curcid;
799 curcid = GetCurrentCommandId(false);
800 if (IsInParallelMode() && save_curcid != curcid)
801 elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
802 ActiveSnapshot->as_snap->curcid = curcid;
808 * Remove the topmost snapshot from the active snapshot stack, decrementing the
809 * reference count, and free it if this was the last reference.
812 PopActiveSnapshot(void)
814 ActiveSnapshotElt *newstack;
816 newstack = ActiveSnapshot->as_next;
818 Assert(ActiveSnapshot->as_snap->active_count > 0);
820 ActiveSnapshot->as_snap->active_count--;
822 if (ActiveSnapshot->as_snap->active_count == 0 &&
823 ActiveSnapshot->as_snap->regd_count == 0)
824 FreeSnapshot(ActiveSnapshot->as_snap);
826 pfree(ActiveSnapshot);
827 ActiveSnapshot = newstack;
828 if (ActiveSnapshot == NULL)
829 OldestActiveSnapshot = NULL;
836 * Return the topmost snapshot in the Active stack.
839 GetActiveSnapshot(void)
841 Assert(ActiveSnapshot != NULL);
843 return ActiveSnapshot->as_snap;
848 * Return whether there is at least one snapshot in the Active stack
851 ActiveSnapshotSet(void)
853 return ActiveSnapshot != NULL;
858 * Register a snapshot as being in use by the current resource owner
860 * If InvalidSnapshot is passed, it is not registered.
863 RegisterSnapshot(Snapshot snapshot)
865 if (snapshot == InvalidSnapshot)
866 return InvalidSnapshot;
868 return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
872 * RegisterSnapshotOnOwner
873 * As above, but use the specified resource owner
876 RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
880 if (snapshot == InvalidSnapshot)
881 return InvalidSnapshot;
883 /* Static snapshot? Create a persistent copy */
884 snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
886 /* and tell resowner.c about it */
887 ResourceOwnerEnlargeSnapshots(owner);
889 ResourceOwnerRememberSnapshot(owner, snap);
891 if (snap->regd_count == 1)
892 pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
900 * Decrement the reference count of a snapshot, remove the corresponding
901 * reference from CurrentResourceOwner, and free the snapshot if no more
905 UnregisterSnapshot(Snapshot snapshot)
907 if (snapshot == NULL)
910 UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
914 * UnregisterSnapshotFromOwner
915 * As above, but use the specified resource owner
918 UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
920 if (snapshot == NULL)
923 Assert(snapshot->regd_count > 0);
924 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
926 ResourceOwnerForgetSnapshot(owner, snapshot);
928 snapshot->regd_count--;
929 if (snapshot->regd_count == 0)
930 pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
932 if (snapshot->regd_count == 0 && snapshot->active_count == 0)
934 FreeSnapshot(snapshot);
940 * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
941 * by xmin, so that the snapshot with smallest xmin is at the top.
944 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
946 const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
947 const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
949 if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
951 else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
960 * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
961 * Note we can do this without locking because we assume that storing an Xid
964 * Even if there are some remaining snapshots, we may be able to advance our
965 * PGXACT->xmin to some degree. This typically happens when a portal is
966 * dropped. For efficiency, we only consider recomputing PGXACT->xmin when
967 * the active snapshot stack is empty; this allows us not to need to track
968 * which active snapshot is oldest.
970 * Note: it's tempting to use GetOldestSnapshot() here so that we can include
971 * active snapshots in the calculation. However, that compares by LSN not
972 * xmin so it's not entirely clear that it's the same thing. Also, we'd be
973 * critically dependent on the assumption that the bottommost active snapshot
974 * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
975 * not actually critical, but this would be.)
978 SnapshotResetXmin(void)
980 Snapshot minSnapshot;
982 if (ActiveSnapshot != NULL)
985 if (pairingheap_is_empty(&RegisteredSnapshots))
987 MyPgXact->xmin = InvalidTransactionId;
991 minSnapshot = pairingheap_container(SnapshotData, ph_node,
992 pairingheap_first(&RegisteredSnapshots));
994 if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
995 MyPgXact->xmin = minSnapshot->xmin;
999 * AtSubCommit_Snapshot
1002 AtSubCommit_Snapshot(int level)
1004 ActiveSnapshotElt *active;
1007 * Relabel the active snapshots set in this subtransaction as though they
1008 * are owned by the parent subxact.
1010 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1012 if (active->as_level < level)
1014 active->as_level = level - 1;
1019 * AtSubAbort_Snapshot
1020 * Clean up snapshots after a subtransaction abort
1023 AtSubAbort_Snapshot(int level)
1025 /* Forget the active snapshots set by this subtransaction */
1026 while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1028 ActiveSnapshotElt *next;
1030 next = ActiveSnapshot->as_next;
1033 * Decrement the snapshot's active count. If it's still registered or
1034 * marked as active by an outer subtransaction, we can't free it yet.
1036 Assert(ActiveSnapshot->as_snap->active_count >= 1);
1037 ActiveSnapshot->as_snap->active_count -= 1;
1039 if (ActiveSnapshot->as_snap->active_count == 0 &&
1040 ActiveSnapshot->as_snap->regd_count == 0)
1041 FreeSnapshot(ActiveSnapshot->as_snap);
1043 /* and free the stack element */
1044 pfree(ActiveSnapshot);
1046 ActiveSnapshot = next;
1047 if (ActiveSnapshot == NULL)
1048 OldestActiveSnapshot = NULL;
1051 SnapshotResetXmin();
1056 * Snapshot manager's cleanup function for end of transaction
1059 AtEOXact_Snapshot(bool isCommit, bool resetXmin)
1062 * In transaction-snapshot mode we must release our privately-managed
1063 * reference to the transaction snapshot. We must remove it from
1064 * RegisteredSnapshots to keep the check below happy. But we don't bother
1065 * to do FreeSnapshot, for two reasons: the memory will go away with
1066 * TopTransactionContext anyway, and if someone has left the snapshot
1067 * stacked as active, we don't want the code below to be chasing through a
1070 if (FirstXactSnapshot != NULL)
1072 Assert(FirstXactSnapshot->regd_count > 0);
1073 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
1074 pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
1076 FirstXactSnapshot = NULL;
1079 * If we exported any snapshots, clean them up.
1081 if (exportedSnapshots != NIL)
1086 * Get rid of the files. Unlink failure is only a WARNING because (1)
1087 * it's too late to abort the transaction, and (2) leaving a leaked
1088 * file around has little real consequence anyway.
1090 * We also need to remove the snapshots from RegisteredSnapshots to
1091 * prevent a warning below.
1093 * As with the FirstXactSnapshot, we don't need to free resources of
1094 * the snapshot iself as it will go away with the memory context.
1096 foreach(lc, exportedSnapshots)
1098 ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
1100 if (unlink(esnap->snapfile))
1101 elog(WARNING, "could not unlink file \"%s\": %m",
1104 pairingheap_remove(&RegisteredSnapshots,
1105 &esnap->snapshot->ph_node);
1108 exportedSnapshots = NIL;
1111 /* Drop catalog snapshot if any */
1112 InvalidateCatalogSnapshot();
1114 /* On commit, complain about leftover snapshots */
1117 ActiveSnapshotElt *active;
1119 if (!pairingheap_is_empty(&RegisteredSnapshots))
1120 elog(WARNING, "registered snapshots seem to remain after cleanup");
1122 /* complain about unpopped active snapshots */
1123 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1124 elog(WARNING, "snapshot %p still active", active);
1128 * And reset our state. We don't need to free the memory explicitly --
1129 * it'll go away with TopTransactionContext.
1131 ActiveSnapshot = NULL;
1132 OldestActiveSnapshot = NULL;
1133 pairingheap_reset(&RegisteredSnapshots);
1135 CurrentSnapshot = NULL;
1136 SecondarySnapshot = NULL;
1138 FirstSnapshotSet = false;
1141 * During normal commit processing, we call ProcArrayEndTransaction() to
1142 * reset the PgXact->xmin. That call happens prior to the call to
1143 * AtEOXact_Snapshot(), so we need not touch xmin here at all.
1146 SnapshotResetXmin();
1148 Assert(resetXmin || MyPgXact->xmin == 0);
1154 * Export the snapshot to a file so that other backends can import it.
1155 * Returns the token (the file name) that can be used to import this
1159 ExportSnapshot(Snapshot snapshot)
1161 TransactionId topXid;
1162 TransactionId *children;
1163 ExportedSnapshot *esnap;
1169 MemoryContext oldcxt;
1170 char path[MAXPGPATH];
1171 char pathtmp[MAXPGPATH];
1174 * It's tempting to call RequireTransactionBlock here, since it's not very
1175 * useful to export a snapshot that will disappear immediately afterwards.
1176 * However, we haven't got enough information to do that, since we don't
1177 * know if we're at top level or not. For example, we could be inside a
1178 * plpgsql function that is going to fire off other transactions via
1179 * dblink. Rather than disallow perfectly legitimate usages, don't make a
1182 * Also note that we don't make any restriction on the transaction's
1183 * isolation level; however, importers must check the level if they are
1188 * Get our transaction ID if there is one, to include in the snapshot.
1190 topXid = GetTopTransactionIdIfAny();
1193 * We cannot export a snapshot from a subtransaction because there's no
1194 * easy way for importers to verify that the same subtransaction is still
1197 if (IsSubTransaction())
1199 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1200 errmsg("cannot export a snapshot from a subtransaction")));
1203 * We do however allow previous committed subtransactions to exist.
1204 * Importers of the snapshot must see them as still running, so get their
1205 * XIDs to add them to the snapshot.
1207 nchildren = xactGetCommittedChildren(&children);
1210 * Generate file path for the snapshot. We start numbering of snapshots
1211 * inside the transaction from 1.
1213 snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
1214 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
1217 * Copy the snapshot into TopTransactionContext, add it to the
1218 * exportedSnapshots list, and mark it pseudo-registered. We do this to
1219 * ensure that the snapshot's xmin is honored for the rest of the
1222 snapshot = CopySnapshot(snapshot);
1224 oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1225 esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
1226 esnap->snapfile = pstrdup(path);
1227 esnap->snapshot = snapshot;
1228 exportedSnapshots = lappend(exportedSnapshots, esnap);
1229 MemoryContextSwitchTo(oldcxt);
1231 snapshot->regd_count++;
1232 pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
1235 * Fill buf with a text serialization of the snapshot, plus identification
1236 * data about this transaction. The format expected by ImportSnapshot is
1237 * pretty rigid: each line must be fieldname:value.
1239 initStringInfo(&buf);
1241 appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
1242 appendStringInfo(&buf, "pid:%d\n", MyProcPid);
1243 appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
1244 appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
1245 appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
1247 appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
1248 appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
1251 * We must include our own top transaction ID in the top-xid data, since
1252 * by definition we will still be running when the importing transaction
1253 * adopts the snapshot, but GetSnapshotData never includes our own XID in
1254 * the snapshot. (There must, therefore, be enough room to add it.)
1256 * However, it could be that our topXid is after the xmax, in which case
1257 * we shouldn't include it because xip[] members are expected to be before
1258 * xmax. (We need not make the same check for subxip[] members, see
1261 addTopXid = (TransactionIdIsValid(topXid) &&
1262 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
1263 appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
1264 for (i = 0; i < snapshot->xcnt; i++)
1265 appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
1267 appendStringInfo(&buf, "xip:%u\n", topXid);
1270 * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
1271 * we have to cope with possible overflow.
1273 if (snapshot->suboverflowed ||
1274 snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
1275 appendStringInfoString(&buf, "sof:1\n");
1278 appendStringInfoString(&buf, "sof:0\n");
1279 appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
1280 for (i = 0; i < snapshot->subxcnt; i++)
1281 appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
1282 for (i = 0; i < nchildren; i++)
1283 appendStringInfo(&buf, "sxp:%u\n", children[i]);
1285 appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
1288 * Now write the text representation into a file. We first write to a
1289 * ".tmp" filename, and rename to final filename if no error. This
1290 * ensures that no other backend can read an incomplete file
1291 * (ImportSnapshot won't allow it because of its valid-characters check).
1293 snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
1294 if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
1296 (errcode_for_file_access(),
1297 errmsg("could not create file \"%s\": %m", pathtmp)));
1299 if (fwrite(buf.data, buf.len, 1, f) != 1)
1301 (errcode_for_file_access(),
1302 errmsg("could not write to file \"%s\": %m", pathtmp)));
1304 /* no fsync() since file need not survive a system crash */
1308 (errcode_for_file_access(),
1309 errmsg("could not write to file \"%s\": %m", pathtmp)));
1312 * Now that we have written everything into a .tmp file, rename the file
1313 * to remove the .tmp suffix.
1315 if (rename(pathtmp, path) < 0)
1317 (errcode_for_file_access(),
1318 errmsg("could not rename file \"%s\" to \"%s\": %m",
1322 * The basename of the file is what we return from pg_export_snapshot().
1323 * It's already in path in a textual format and we know that the path
1324 * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
1325 * and pstrdup it so as not to return the address of a local variable.
1327 return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
1331 * pg_export_snapshot
1332 * SQL-callable wrapper for ExportSnapshot.
1335 pg_export_snapshot(PG_FUNCTION_ARGS)
1339 snapshotName = ExportSnapshot(GetActiveSnapshot());
1340 PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1345 * Parsing subroutines for ImportSnapshot: parse a line with the given
1346 * prefix followed by a value, and advance *s to the next line. The
1347 * filename is provided for use in error messages.
1350 parseIntFromText(const char *prefix, char **s, const char *filename)
1353 int prefixlen = strlen(prefix);
1356 if (strncmp(ptr, prefix, prefixlen) != 0)
1358 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1359 errmsg("invalid snapshot data in file \"%s\"", filename)));
1361 if (sscanf(ptr, "%d", &val) != 1)
1363 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1364 errmsg("invalid snapshot data in file \"%s\"", filename)));
1365 ptr = strchr(ptr, '\n');
1368 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1369 errmsg("invalid snapshot data in file \"%s\"", filename)));
1374 static TransactionId
1375 parseXidFromText(const char *prefix, char **s, const char *filename)
1378 int prefixlen = strlen(prefix);
1381 if (strncmp(ptr, prefix, prefixlen) != 0)
1383 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1384 errmsg("invalid snapshot data in file \"%s\"", filename)));
1386 if (sscanf(ptr, "%u", &val) != 1)
1388 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1389 errmsg("invalid snapshot data in file \"%s\"", filename)));
1390 ptr = strchr(ptr, '\n');
1393 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1394 errmsg("invalid snapshot data in file \"%s\"", filename)));
1400 parseVxidFromText(const char *prefix, char **s, const char *filename,
1401 VirtualTransactionId *vxid)
1404 int prefixlen = strlen(prefix);
1406 if (strncmp(ptr, prefix, prefixlen) != 0)
1408 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1409 errmsg("invalid snapshot data in file \"%s\"", filename)));
1411 if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1413 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1414 errmsg("invalid snapshot data in file \"%s\"", filename)));
1415 ptr = strchr(ptr, '\n');
1418 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1419 errmsg("invalid snapshot data in file \"%s\"", filename)));
1425 * Import a previously exported snapshot. The argument should be a
1426 * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
1427 * This is called by "SET TRANSACTION SNAPSHOT 'foo'".
1430 ImportSnapshot(const char *idstr)
1432 char path[MAXPGPATH];
1434 struct stat stat_buf;
1438 VirtualTransactionId src_vxid;
1443 SnapshotData snapshot;
1446 * Must be at top level of a fresh transaction. Note in particular that
1447 * we check we haven't acquired an XID --- if we have, it's conceivable
1448 * that the snapshot would show it as not running, making for very screwy
1451 if (FirstSnapshotSet ||
1452 GetTopTransactionIdIfAny() != InvalidTransactionId ||
1455 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1456 errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1459 * If we are in read committed mode then the next query would execute with
1460 * a new snapshot thus making this function call quite useless.
1462 if (!IsolationUsesXactSnapshot())
1464 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1465 errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1468 * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
1469 * this mainly to prevent reading arbitrary files.
1471 if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1473 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1474 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1476 /* OK, read the file */
1477 snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1479 f = AllocateFile(path, PG_BINARY_R);
1482 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1483 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1485 /* get the size of the file so that we know how much memory we need */
1486 if (fstat(fileno(f), &stat_buf))
1487 elog(ERROR, "could not stat file \"%s\": %m", path);
1489 /* and read the file into a palloc'd string */
1490 filebuf = (char *) palloc(stat_buf.st_size + 1);
1491 if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1492 elog(ERROR, "could not read file \"%s\": %m", path);
1494 filebuf[stat_buf.st_size] = '\0';
1499 * Construct a snapshot struct by parsing the file content.
1501 memset(&snapshot, 0, sizeof(snapshot));
1503 parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
1504 src_pid = parseIntFromText("pid:", &filebuf, path);
1505 /* we abuse parseXidFromText a bit here ... */
1506 src_dbid = parseXidFromText("dbid:", &filebuf, path);
1507 src_isolevel = parseIntFromText("iso:", &filebuf, path);
1508 src_readonly = parseIntFromText("ro:", &filebuf, path);
1510 snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1511 snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1513 snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1515 /* sanity-check the xid count before palloc */
1516 if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1518 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1519 errmsg("invalid snapshot data in file \"%s\"", path)));
1521 snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1522 for (i = 0; i < xcnt; i++)
1523 snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1525 snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1527 if (!snapshot.suboverflowed)
1529 snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1531 /* sanity-check the xid count before palloc */
1532 if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1534 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1535 errmsg("invalid snapshot data in file \"%s\"", path)));
1537 snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1538 for (i = 0; i < xcnt; i++)
1539 snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1543 snapshot.subxcnt = 0;
1544 snapshot.subxip = NULL;
1547 snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1550 * Do some additional sanity checking, just to protect ourselves. We
1551 * don't trouble to check the array elements, just the most critical
1554 if (!VirtualTransactionIdIsValid(src_vxid) ||
1555 !OidIsValid(src_dbid) ||
1556 !TransactionIdIsNormal(snapshot.xmin) ||
1557 !TransactionIdIsNormal(snapshot.xmax))
1559 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1560 errmsg("invalid snapshot data in file \"%s\"", path)));
1563 * If we're serializable, the source transaction must be too, otherwise
1564 * predicate.c has problems (SxactGlobalXmin could go backwards). Also, a
1565 * non-read-only transaction can't adopt a snapshot from a read-only
1566 * transaction, as predicate.c handles the cases very differently.
1568 if (IsolationIsSerializable())
1570 if (src_isolevel != XACT_SERIALIZABLE)
1572 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1573 errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1574 if (src_readonly && !XactReadOnly)
1576 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1577 errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1581 * We cannot import a snapshot that was taken in a different database,
1582 * because vacuum calculates OldestXmin on a per-database basis; so the
1583 * source transaction's xmin doesn't protect us from data loss. This
1584 * restriction could be removed if the source transaction were to mark its
1585 * xmin as being globally applicable. But that would require some
1586 * additional syntax, since that has to be known when the snapshot is
1587 * initially taken. (See pgsql-hackers discussion of 2011-10-21.)
1589 if (src_dbid != MyDatabaseId)
1591 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1592 errmsg("cannot import a snapshot from a different database")));
1594 /* OK, install the snapshot */
1595 SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
1599 * XactHasExportedSnapshots
1600 * Test whether current transaction has exported any snapshots.
1603 XactHasExportedSnapshots(void)
1605 return (exportedSnapshots != NIL);
1609 * DeleteAllExportedSnapshotFiles
1610 * Clean up any files that have been left behind by a crashed backend
1611 * that had exported snapshots before it died.
1613 * This should be called during database startup or crash recovery.
1616 DeleteAllExportedSnapshotFiles(void)
1618 char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1620 struct dirent *s_de;
1623 * Problems in reading the directory, or unlinking files, are reported at
1624 * LOG level. Since we're running in the startup process, ERROR level
1625 * would prevent database start, and it's not important enough for that.
1627 s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
1629 while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
1631 if (strcmp(s_de->d_name, ".") == 0 ||
1632 strcmp(s_de->d_name, "..") == 0)
1635 snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1637 if (unlink(buf) != 0)
1639 (errcode_for_file_access(),
1640 errmsg("could not remove file \"%s\": %m", buf)));
1647 * ThereAreNoPriorRegisteredSnapshots
1648 * Is the registered snapshot count less than or equal to one?
1650 * Don't use this to settle important decisions. While zero registrations and
1651 * no ActiveSnapshot would confirm a certain idleness, the system makes no
1652 * guarantees about the significance of one registered snapshot.
1655 ThereAreNoPriorRegisteredSnapshots(void)
1657 if (pairingheap_is_empty(&RegisteredSnapshots) ||
1658 pairingheap_is_singular(&RegisteredSnapshots))
1666 * Return a timestamp that is exactly on a minute boundary.
1668 * If the argument is already aligned, return that value, otherwise move to
1669 * the next minute boundary following the given time.
1672 AlignTimestampToMinuteBoundary(TimestampTz ts)
1674 TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1676 return retval - (retval % USECS_PER_MINUTE);
1680 * Get current timestamp for snapshots
1682 * This is basically GetCurrentTimestamp(), but with a guarantee that
1683 * the result never moves backward.
1686 GetSnapshotCurrentTimestamp(void)
1688 TimestampTz now = GetCurrentTimestamp();
1691 * Don't let time move backward; if it hasn't advanced, use the old value.
1693 SpinLockAcquire(&oldSnapshotControl->mutex_current);
1694 if (now <= oldSnapshotControl->current_timestamp)
1695 now = oldSnapshotControl->current_timestamp;
1697 oldSnapshotControl->current_timestamp = now;
1698 SpinLockRelease(&oldSnapshotControl->mutex_current);
1704 * Get timestamp through which vacuum may have processed based on last stored
1705 * value for threshold_timestamp.
1707 * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1708 * that ever changes, we could get rid of the spinlock here.
1711 GetOldSnapshotThresholdTimestamp(void)
1713 TimestampTz threshold_timestamp;
1715 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1716 threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1717 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1719 return threshold_timestamp;
1723 SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1725 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1726 oldSnapshotControl->threshold_timestamp = ts;
1727 oldSnapshotControl->threshold_xid = xlimit;
1728 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1732 * TransactionIdLimitedForOldSnapshots
1734 * Apply old snapshot limit, if any. This is intended to be called for page
1735 * pruning and table vacuuming, to allow old_snapshot_threshold to override
1736 * the normal global xmin value. Actual testing for snapshot too old will be
1737 * based on whether a snapshot timestamp is prior to the threshold timestamp
1738 * set in this function.
1741 TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
1744 if (TransactionIdIsNormal(recentXmin)
1745 && old_snapshot_threshold >= 0
1746 && RelationAllowsEarlyPruning(relation))
1748 TimestampTz ts = GetSnapshotCurrentTimestamp();
1749 TransactionId xlimit = recentXmin;
1750 TransactionId latest_xmin;
1751 TimestampTz update_ts;
1752 bool same_ts_as_threshold = false;
1754 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1755 latest_xmin = oldSnapshotControl->latest_xmin;
1756 update_ts = oldSnapshotControl->next_map_update;
1757 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1760 * Zero threshold always overrides to latest xmin, if valid. Without
1761 * some heuristic it will find its own snapshot too old on, for
1762 * example, a simple UPDATE -- which would make it useless for most
1763 * testing, but there is no principled way to ensure that it doesn't
1764 * fail in this way. Use a five-second delay to try to get useful
1765 * testing behavior, but this may need adjustment.
1767 if (old_snapshot_threshold == 0)
1769 if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
1770 && TransactionIdFollows(latest_xmin, xlimit))
1771 xlimit = latest_xmin;
1773 ts -= 5 * USECS_PER_SEC;
1774 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1779 ts = AlignTimestampToMinuteBoundary(ts)
1780 - (old_snapshot_threshold * USECS_PER_MINUTE);
1782 /* Check for fast exit without LW locking. */
1783 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1784 if (ts == oldSnapshotControl->threshold_timestamp)
1786 xlimit = oldSnapshotControl->threshold_xid;
1787 same_ts_as_threshold = true;
1789 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1791 if (!same_ts_as_threshold)
1793 if (ts == update_ts)
1795 xlimit = latest_xmin;
1796 if (NormalTransactionIdFollows(xlimit, recentXmin))
1797 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1801 LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1803 if (oldSnapshotControl->count_used > 0
1804 && ts >= oldSnapshotControl->head_timestamp)
1808 offset = ((ts - oldSnapshotControl->head_timestamp)
1809 / USECS_PER_MINUTE);
1810 if (offset > oldSnapshotControl->count_used - 1)
1811 offset = oldSnapshotControl->count_used - 1;
1812 offset = (oldSnapshotControl->head_offset + offset)
1813 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1814 xlimit = oldSnapshotControl->xid_by_minute[offset];
1816 if (NormalTransactionIdFollows(xlimit, recentXmin))
1817 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1820 LWLockRelease(OldSnapshotTimeMapLock);
1825 * Failsafe protection against vacuuming work of active transaction.
1827 * This is not an assertion because we avoid the spinlock for
1828 * performance, leaving open the possibility that xlimit could advance
1829 * and be more current; but it seems prudent to apply this limit. It
1830 * might make pruning a tiny bit less aggressive than it could be, but
1831 * protects against data loss bugs.
1833 if (TransactionIdIsNormal(latest_xmin)
1834 && TransactionIdPrecedes(latest_xmin, xlimit))
1835 xlimit = latest_xmin;
1837 if (NormalTransactionIdFollows(xlimit, recentXmin))
1845 * Take care of the circular buffer that maps time to xid.
1848 MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1851 TransactionId latest_xmin;
1852 TimestampTz update_ts;
1853 bool map_update_required = false;
1855 /* Never call this function when old snapshot checking is disabled. */
1856 Assert(old_snapshot_threshold >= 0);
1858 ts = AlignTimestampToMinuteBoundary(whenTaken);
1861 * Keep track of the latest xmin seen by any process. Update mapping with
1862 * a new value when we have crossed a bucket boundary.
1864 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1865 latest_xmin = oldSnapshotControl->latest_xmin;
1866 update_ts = oldSnapshotControl->next_map_update;
1869 oldSnapshotControl->next_map_update = ts;
1870 map_update_required = true;
1872 if (TransactionIdFollows(xmin, latest_xmin))
1873 oldSnapshotControl->latest_xmin = xmin;
1874 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1876 /* We only needed to update the most recent xmin value. */
1877 if (!map_update_required)
1880 /* No further tracking needed for 0 (used for testing). */
1881 if (old_snapshot_threshold == 0)
1885 * We don't want to do something stupid with unusual values, but we don't
1886 * want to litter the log with warnings or break otherwise normal
1887 * processing for this feature; so if something seems unreasonable, just
1888 * log at DEBUG level and return without doing anything.
1893 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1897 if (!TransactionIdIsNormal(xmin))
1900 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1901 (unsigned long) xmin);
1905 LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1907 Assert(oldSnapshotControl->head_offset >= 0);
1908 Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1909 Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1910 Assert(oldSnapshotControl->count_used >= 0);
1911 Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1913 if (oldSnapshotControl->count_used == 0)
1915 /* set up first entry for empty mapping */
1916 oldSnapshotControl->head_offset = 0;
1917 oldSnapshotControl->head_timestamp = ts;
1918 oldSnapshotControl->count_used = 1;
1919 oldSnapshotControl->xid_by_minute[0] = xmin;
1921 else if (ts < oldSnapshotControl->head_timestamp)
1923 /* old ts; log it at DEBUG */
1924 LWLockRelease(OldSnapshotTimeMapLock);
1926 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1930 else if (ts <= (oldSnapshotControl->head_timestamp +
1931 ((oldSnapshotControl->count_used - 1)
1932 * USECS_PER_MINUTE)))
1934 /* existing mapping; advance xid if possible */
1935 int bucket = (oldSnapshotControl->head_offset
1936 + ((ts - oldSnapshotControl->head_timestamp)
1937 / USECS_PER_MINUTE))
1938 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1940 if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1941 oldSnapshotControl->xid_by_minute[bucket] = xmin;
1945 /* We need a new bucket, but it might not be the very next one. */
1946 int advance = ((ts - oldSnapshotControl->head_timestamp)
1947 / USECS_PER_MINUTE);
1949 oldSnapshotControl->head_timestamp = ts;
1951 if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1953 /* Advance is so far that all old data is junk; start over. */
1954 oldSnapshotControl->head_offset = 0;
1955 oldSnapshotControl->count_used = 1;
1956 oldSnapshotControl->xid_by_minute[0] = xmin;
1960 /* Store the new value in one or more buckets. */
1963 for (i = 0; i < advance; i++)
1965 if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1967 /* Map full and new value replaces old head. */
1968 int old_head = oldSnapshotControl->head_offset;
1970 if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
1971 oldSnapshotControl->head_offset = 0;
1973 oldSnapshotControl->head_offset = old_head + 1;
1974 oldSnapshotControl->xid_by_minute[old_head] = xmin;
1978 /* Extend map to unused entry. */
1979 int new_tail = (oldSnapshotControl->head_offset
1980 + oldSnapshotControl->count_used)
1981 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1983 oldSnapshotControl->count_used++;
1984 oldSnapshotControl->xid_by_minute[new_tail] = xmin;
1990 LWLockRelease(OldSnapshotTimeMapLock);
1995 * Setup a snapshot that replaces normal catalog snapshots that allows catalog
1996 * access to behave just like it did at a certain point in the past.
1998 * Needed for logical decoding.
2001 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2003 Assert(historic_snapshot != NULL);
2005 /* setup the timetravel snapshot */
2006 HistoricSnapshot = historic_snapshot;
2008 /* setup (cmin, cmax) lookup hash */
2009 tuplecid_data = tuplecids;
2014 * Make catalog snapshots behave normally again.
2017 TeardownHistoricSnapshot(bool is_error)
2019 HistoricSnapshot = NULL;
2020 tuplecid_data = NULL;
2024 HistoricSnapshotActive(void)
2026 return HistoricSnapshot != NULL;
2030 HistoricSnapshotGetTupleCids(void)
2032 Assert(HistoricSnapshotActive());
2033 return tuplecid_data;
2037 * EstimateSnapshotSpace
2038 * Returns the size needed to store the given snapshot.
2040 * We are exporting only required fields from the Snapshot, stored in
2041 * SerializedSnapshotData.
2044 EstimateSnapshotSpace(Snapshot snap)
2048 Assert(snap != InvalidSnapshot);
2049 Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
2051 /* We allocate any XID arrays needed in the same palloc block. */
2052 size = add_size(sizeof(SerializedSnapshotData),
2053 mul_size(snap->xcnt, sizeof(TransactionId)));
2054 if (snap->subxcnt > 0 &&
2055 (!snap->suboverflowed || snap->takenDuringRecovery))
2056 size = add_size(size,
2057 mul_size(snap->subxcnt, sizeof(TransactionId)));
2064 * Dumps the serialized snapshot (extracted from given snapshot) onto the
2065 * memory location at start_address.
2068 SerializeSnapshot(Snapshot snapshot, char *start_address)
2070 SerializedSnapshotData serialized_snapshot;
2072 Assert(snapshot->subxcnt >= 0);
2074 /* Copy all required fields */
2075 serialized_snapshot.xmin = snapshot->xmin;
2076 serialized_snapshot.xmax = snapshot->xmax;
2077 serialized_snapshot.xcnt = snapshot->xcnt;
2078 serialized_snapshot.subxcnt = snapshot->subxcnt;
2079 serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2080 serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2081 serialized_snapshot.curcid = snapshot->curcid;
2082 serialized_snapshot.whenTaken = snapshot->whenTaken;
2083 serialized_snapshot.lsn = snapshot->lsn;
2086 * Ignore the SubXID array if it has overflowed, unless the snapshot was
2087 * taken during recovery - in that case, top-level XIDs are in subxip as
2088 * well, and we mustn't lose them.
2090 if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2091 serialized_snapshot.subxcnt = 0;
2093 /* Copy struct to possibly-unaligned buffer */
2094 memcpy(start_address,
2095 &serialized_snapshot, sizeof(SerializedSnapshotData));
2097 /* Copy XID array */
2098 if (snapshot->xcnt > 0)
2099 memcpy((TransactionId *) (start_address +
2100 sizeof(SerializedSnapshotData)),
2101 snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2104 * Copy SubXID array. Don't bother to copy it if it had overflowed,
2105 * though, because it's not used anywhere in that case. Except if it's a
2106 * snapshot taken during recovery; all the top-level XIDs are in subxip as
2107 * well in that case, so we mustn't lose them.
2109 if (serialized_snapshot.subxcnt > 0)
2111 Size subxipoff = sizeof(SerializedSnapshotData) +
2112 snapshot->xcnt * sizeof(TransactionId);
2114 memcpy((TransactionId *) (start_address + subxipoff),
2115 snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2121 * Restore a serialized snapshot from the specified address.
2123 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2124 * to 0. The returned snapshot has the copied flag set.
2127 RestoreSnapshot(char *start_address)
2129 SerializedSnapshotData serialized_snapshot;
2132 TransactionId *serialized_xids;
2134 memcpy(&serialized_snapshot, start_address,
2135 sizeof(SerializedSnapshotData));
2136 serialized_xids = (TransactionId *)
2137 (start_address + sizeof(SerializedSnapshotData));
2139 /* We allocate any XID arrays needed in the same palloc block. */
2140 size = sizeof(SnapshotData)
2141 + serialized_snapshot.xcnt * sizeof(TransactionId)
2142 + serialized_snapshot.subxcnt * sizeof(TransactionId);
2144 /* Copy all required fields */
2145 snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2146 snapshot->satisfies = HeapTupleSatisfiesMVCC;
2147 snapshot->xmin = serialized_snapshot.xmin;
2148 snapshot->xmax = serialized_snapshot.xmax;
2149 snapshot->xip = NULL;
2150 snapshot->xcnt = serialized_snapshot.xcnt;
2151 snapshot->subxip = NULL;
2152 snapshot->subxcnt = serialized_snapshot.subxcnt;
2153 snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2154 snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2155 snapshot->curcid = serialized_snapshot.curcid;
2156 snapshot->whenTaken = serialized_snapshot.whenTaken;
2157 snapshot->lsn = serialized_snapshot.lsn;
2159 /* Copy XIDs, if present. */
2160 if (serialized_snapshot.xcnt > 0)
2162 snapshot->xip = (TransactionId *) (snapshot + 1);
2163 memcpy(snapshot->xip, serialized_xids,
2164 serialized_snapshot.xcnt * sizeof(TransactionId));
2167 /* Copy SubXIDs, if present. */
2168 if (serialized_snapshot.subxcnt > 0)
2170 snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2171 serialized_snapshot.xcnt;
2172 memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2173 serialized_snapshot.subxcnt * sizeof(TransactionId));
2176 /* Set the copied flag so that the caller will set refcounts correctly. */
2177 snapshot->regd_count = 0;
2178 snapshot->active_count = 0;
2179 snapshot->copied = true;
2185 * Install a restored snapshot as the transaction snapshot.
2187 * The second argument is of type void * so that snapmgr.h need not include
2188 * the declaration for PGPROC.
2191 RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2193 SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);