]> granicus.if.org Git - postgresql/blob - src/backend/utils/time/snapmgr.c
Handle unaligned SerializeSnapshot() buffer.
[postgresql] / src / backend / utils / time / snapmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapmgr.c
4  *              PostgreSQL snapshot manager
5  *
6  * We keep track of snapshots in two ways: those "registered" by resowner.c,
7  * and the "active snapshot" stack.  All snapshots in either of them live in
8  * persistent memory.  When a snapshot is no longer in any of these lists
9  * (tracked by separate refcounts on each snapshot), its memory can be freed.
10  *
11  * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12  * regd_count and list it in RegisteredSnapshots, but this reference is not
13  * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14  * to track this snapshot reference, but that introduces logical circularity
15  * and thus makes it impossible to clean up in a sane fashion.  It's better to
16  * handle this reference as an internally-tracked registration, so that this
17  * module is entirely lower-level than ResourceOwners.
18  *
19  * Likewise, any snapshots that have been exported by pg_export_snapshot
20  * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21  * tracked by any resource owner.
22  *
23  * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24  * is valid, but is not tracked by any resource owner.
25  *
 * The same is true for historic snapshots used during logical decoding;
 * their lifetime is managed separately (as they live longer than one xact.c
 * transaction).
29  *
30  * These arrangements let us reset MyPgXact->xmin when there are no snapshots
31  * referenced by this transaction, and advance it when the one with oldest
 * Xmin is no longer referenced.  For simplicity, however, only registered
 * snapshots, not active snapshots, participate in tracking which one is oldest;
34  * we don't try to change MyPgXact->xmin except when the active-snapshot
35  * stack is empty.
36  *
37  *
38  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  * IDENTIFICATION
42  *        src/backend/utils/time/snapmgr.c
43  *
44  *-------------------------------------------------------------------------
45  */
46 #include "postgres.h"
47
48 #include <sys/stat.h>
49 #include <unistd.h>
50
51 #include "access/transam.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "lib/pairingheap.h"
56 #include "miscadmin.h"
57 #include "storage/predicate.h"
58 #include "storage/proc.h"
59 #include "storage/procarray.h"
60 #include "storage/sinval.h"
61 #include "storage/spin.h"
62 #include "utils/builtins.h"
63 #include "utils/memutils.h"
64 #include "utils/rel.h"
65 #include "utils/resowner_private.h"
66 #include "utils/snapmgr.h"
67 #include "utils/syscache.h"
68 #include "utils/tqual.h"
69
70
/*
 * GUC parameters
 */
int         old_snapshot_threshold; /* number of minutes, -1 disables */

/*
 * Structure for dealing with old_snapshot_threshold implementation.
 *
 * One instance lives in shared memory: SnapMgrInit obtains it through
 * ShmemInitStruct with the size computed by SnapMgrShmemSize, which reserves
 * room for the trailing xid_by_minute array only when
 * old_snapshot_threshold > 0.
 */
typedef struct OldSnapshotControlData
{
    /*
     * Variables for old snapshot handling are shared among processes and are
     * only allowed to move forward.  Each spinlock protects the fields named
     * in its comment.
     */
    slock_t     mutex_current;  /* protect current_timestamp */
    TimestampTz current_timestamp;  /* latest snapshot timestamp */
    slock_t     mutex_latest_xmin;  /* protect latest_xmin and
                                     * next_map_update */
    TransactionId latest_xmin;  /* latest snapshot xmin */
    TimestampTz next_map_update;    /* latest snapshot valid up to */
    slock_t     mutex_threshold;    /* protect threshold fields */
    TimestampTz threshold_timestamp;    /* earlier snapshot is old */
    TransactionId threshold_xid;    /* earlier xid may be gone */

    /*
     * Keep one xid per minute for old snapshot error handling.
     *
     * Use a circular buffer with a head offset, a count of entries currently
     * used, and a timestamp corresponding to the xid at the head offset.  A
     * count_used value of zero means that there are no times stored; a
     * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
     * is full and the head must be advanced to add new entries.  Use
     * timestamps aligned to minute boundaries, since that seems less
     * surprising than aligning based on the first usage timestamp.  The
     * latest bucket is effectively stored within latest_xmin.  The circular
     * buffer is updated when we get a new xmin value that doesn't fall into
     * the same interval.
     *
     * It is OK if the xid for a given time slot is from earlier than
     * calculated by adding the number of minutes corresponding to the
     * (possibly wrapped) distance from the head offset to the time of the
     * head entry, since that just results in the vacuuming of old tuples
     * being slightly less aggressive.  It would not be OK for it to be off in
     * the other direction, since it might result in vacuuming tuples that are
     * still expected to be there.
     *
     * Use of an SLRU was considered but not chosen because it is more
     * heavyweight than is needed for this, and would probably not be any less
     * code to implement.
     *
     * Persistence is not needed.
     */
    int         head_offset;    /* subscript of oldest tracked time */
    TimestampTz head_timestamp; /* time corresponding to head xid */
    int         count_used;     /* how many slots are in use */
    /* OLD_SNAPSHOT_TIME_MAP_ENTRIES slots when old_snapshot_threshold > 0 */
    TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;

/* Pointer to the shared-memory instance; set by SnapMgrInit. */
static volatile OldSnapshotControlData *oldSnapshotControl;
130
131
/*
 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
 * mode, and to the latest one taken in a read-committed transaction.
 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
 * instant, even in transaction-snapshot mode.  It should only be used for
 * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
 * whenever a system catalog change occurs.
 *
 * These SnapshotData structs are static to simplify memory allocation
 * (see the hack in GetSnapshotData to avoid repeated malloc/free).  Each is
 * initialized with HeapTupleSatisfiesMVCC as its first member (presumably
 * the visibility-test callback -- confirm against SnapshotData's definition).
 * Note that CatalogSnapshotData is not declared static, so it is visible
 * outside this file.
 */
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};

/*
 * Pointers to valid snapshots; NULL means the corresponding snapshot is not
 * currently valid (for example, InvalidateCatalogSnapshot resets
 * CatalogSnapshot to NULL).  HistoricSnapshot is what GetTransactionSnapshot
 * and GetCatalogSnapshot return while HistoricSnapshotActive() is true.
 */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;
153
/*
 * These are updated by GetSnapshotData.  We initialize them this way
 * for the convenience of TransactionIdIsInProgress: even in bootstrap
 * mode, we don't want it to say that BootstrapTransactionId is in progress.
 *
 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
 * InvalidTransactionId, to ensure that no one tries to use a stale
 * value.  Readers should ensure that they have been set to something else
 * before using them.
 */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;

/* (table, ctid) => (cmin, cmax) mapping during timetravel */
static HTAB *tuplecid_data = NULL;
171
/*
 * Elements of the active snapshot stack.
 *
 * Each element here accounts for exactly one active_count on SnapshotData.
 *
 * NB: the code assumes that elements in this list are in non-increasing
 * order of as_level; also, the list must be NULL-terminated.
 */
typedef struct ActiveSnapshotElt
{
    Snapshot    as_snap;        /* the snapshot this element keeps active */
    int         as_level;       /* transaction nest level when pushed */
    struct ActiveSnapshotElt *as_next;  /* element below this one, or NULL */
} ActiveSnapshotElt;

/* Top of the stack of active snapshots */
static ActiveSnapshotElt *ActiveSnapshot = NULL;

/*
 * Bottom of the stack of active snapshots.  PushActiveSnapshot sets it when
 * pushing onto an empty stack; GetOldestSnapshot consults it.
 */
static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
192
/*
 * Currently registered Snapshots.  Ordered in a heap by xmin, so that we can
 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
 */
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
         void *arg);

/*
 * The heap uses xmin_cmp as its comparator.  Besides resowner-tracked
 * registrations, FirstXactSnapshot (GetTransactionSnapshot,
 * SetTransactionSnapshot) and CatalogSnapshot
 * (GetNonHistoricCatalogSnapshot) are added to it directly.
 */
static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};

/* first GetTransactionSnapshot call in a transaction? */
bool        FirstSnapshotSet = false;

/*
 * Remember the serializable transaction snapshot, if any.  We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 *
 * It is set, with its regd_count incremented, only in transaction-snapshot
 * mode.
 */
static Snapshot FirstXactSnapshot = NULL;
211
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"

/*
 * Format the path of an exported-snapshot file into "path" as
 * pg_snapshots/<xid as 8 hex digits>-<num><suffix>.
 *
 * "path" must be a character array, not a pointer, because sizeof(path)
 * supplies the buffer length to snprintf.
 */
#define XactExportFilePath(path, xid, num, suffix) \
    snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
             xid, num, suffix)

/* Current xact's exported snapshots (a list of Snapshot structs) */
static List *exportedSnapshots = NIL;

/* Prototypes for local functions */
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
226
/*
 * Snapshot fields to be serialized.
 *
 * Only these fields need to be sent to the cooperating backend; the
 * remaining ones can (and must) be set by the receiver upon restore.
 *
 * NOTE(review): the xip/subxip arrays are not part of this struct;
 * presumably they are written after it in the serialized buffer.  That
 * buffer may not be suitably aligned for this struct -- confirm that the
 * serialize/restore code moves it with memcpy rather than dereferencing a
 * cast pointer.
 */
typedef struct SerializedSnapshotData
{
    TransactionId xmin;
    TransactionId xmax;
    uint32      xcnt;
    int32       subxcnt;
    bool        suboverflowed;
    bool        takenDuringRecovery;
    CommandId   curcid;
    TimestampTz whenTaken;
    XLogRecPtr  lsn;
} SerializedSnapshotData;
245
246 Size
247 SnapMgrShmemSize(void)
248 {
249         Size            size;
250
251         size = offsetof(OldSnapshotControlData, xid_by_minute);
252         if (old_snapshot_threshold > 0)
253                 size = add_size(size, mul_size(sizeof(TransactionId),
254                                                                            OLD_SNAPSHOT_TIME_MAP_ENTRIES));
255
256         return size;
257 }
258
259 /*
260  * Initialize for managing old snapshot detection.
261  */
262 void
263 SnapMgrInit(void)
264 {
265         bool            found;
266
267         /*
268          * Create or attach to the OldSnapshotControlData structure.
269          */
270         oldSnapshotControl = (volatile OldSnapshotControlData *)
271                 ShmemInitStruct("OldSnapshotControlData",
272                                                 SnapMgrShmemSize(), &found);
273
274         if (!found)
275         {
276                 SpinLockInit(&oldSnapshotControl->mutex_current);
277                 oldSnapshotControl->current_timestamp = 0;
278                 SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
279                 oldSnapshotControl->latest_xmin = InvalidTransactionId;
280                 oldSnapshotControl->next_map_update = 0;
281                 SpinLockInit(&oldSnapshotControl->mutex_threshold);
282                 oldSnapshotControl->threshold_timestamp = 0;
283                 oldSnapshotControl->threshold_xid = InvalidTransactionId;
284                 oldSnapshotControl->head_offset = 0;
285                 oldSnapshotControl->head_timestamp = 0;
286                 oldSnapshotControl->count_used = 0;
287         }
288 }
289
290 /*
291  * GetTransactionSnapshot
292  *              Get the appropriate snapshot for a new query in a transaction.
293  *
294  * Note that the return value may point at static storage that will be modified
295  * by future calls and by CommandCounterIncrement().  Callers should call
296  * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
297  * used very long.
298  */
299 Snapshot
300 GetTransactionSnapshot(void)
301 {
302         /*
303          * Return historic snapshot if doing logical decoding. We'll never need a
304          * non-historic transaction snapshot in this (sub-)transaction, so there's
305          * no need to be careful to set one up for later calls to
306          * GetTransactionSnapshot().
307          */
308         if (HistoricSnapshotActive())
309         {
310                 Assert(!FirstSnapshotSet);
311                 return HistoricSnapshot;
312         }
313
314         /* First call in transaction? */
315         if (!FirstSnapshotSet)
316         {
317                 /*
318                  * Don't allow catalog snapshot to be older than xact snapshot.  Must
319                  * do this first to allow the empty-heap Assert to succeed.
320                  */
321                 InvalidateCatalogSnapshot();
322
323                 Assert(pairingheap_is_empty(&RegisteredSnapshots));
324                 Assert(FirstXactSnapshot == NULL);
325
326                 if (IsInParallelMode())
327                         elog(ERROR,
328                                  "cannot take query snapshot during a parallel operation");
329
330                 /*
331                  * In transaction-snapshot mode, the first snapshot must live until
332                  * end of xact regardless of what the caller does with it, so we must
333                  * make a copy of it rather than returning CurrentSnapshotData
334                  * directly.  Furthermore, if we're running in serializable mode,
335                  * predicate.c needs to wrap the snapshot fetch in its own processing.
336                  */
337                 if (IsolationUsesXactSnapshot())
338                 {
339                         /* First, create the snapshot in CurrentSnapshotData */
340                         if (IsolationIsSerializable())
341                                 CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
342                         else
343                                 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
344                         /* Make a saved copy */
345                         CurrentSnapshot = CopySnapshot(CurrentSnapshot);
346                         FirstXactSnapshot = CurrentSnapshot;
347                         /* Mark it as "registered" in FirstXactSnapshot */
348                         FirstXactSnapshot->regd_count++;
349                         pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
350                 }
351                 else
352                         CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
353
354                 FirstSnapshotSet = true;
355                 return CurrentSnapshot;
356         }
357
358         if (IsolationUsesXactSnapshot())
359                 return CurrentSnapshot;
360
361         /* Don't allow catalog snapshot to be older than xact snapshot. */
362         InvalidateCatalogSnapshot();
363
364         CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
365
366         return CurrentSnapshot;
367 }
368
369 /*
370  * GetLatestSnapshot
371  *              Get a snapshot that is up-to-date as of the current instant,
372  *              even if we are executing in transaction-snapshot mode.
373  */
374 Snapshot
375 GetLatestSnapshot(void)
376 {
377         /*
378          * We might be able to relax this, but nothing that could otherwise work
379          * needs it.
380          */
381         if (IsInParallelMode())
382                 elog(ERROR,
383                          "cannot update SecondarySnapshot during a parallel operation");
384
385         /*
386          * So far there are no cases requiring support for GetLatestSnapshot()
387          * during logical decoding, but it wouldn't be hard to add if required.
388          */
389         Assert(!HistoricSnapshotActive());
390
391         /* If first call in transaction, go ahead and set the xact snapshot */
392         if (!FirstSnapshotSet)
393                 return GetTransactionSnapshot();
394
395         SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
396
397         return SecondarySnapshot;
398 }
399
400 /*
401  * GetOldestSnapshot
402  *
403  *              Get the transaction's oldest known snapshot, as judged by the LSN.
404  *              Will return NULL if there are no active or registered snapshots.
405  */
406 Snapshot
407 GetOldestSnapshot(void)
408 {
409         Snapshot        OldestRegisteredSnapshot = NULL;
410         XLogRecPtr      RegisteredLSN = InvalidXLogRecPtr;
411
412         if (!pairingheap_is_empty(&RegisteredSnapshots))
413         {
414                 OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
415                                                                         pairingheap_first(&RegisteredSnapshots));
416                 RegisteredLSN = OldestRegisteredSnapshot->lsn;
417         }
418
419         if (OldestActiveSnapshot != NULL)
420         {
421                 XLogRecPtr      ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
422
423                 if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
424                         return OldestActiveSnapshot->as_snap;
425         }
426
427         return OldestRegisteredSnapshot;
428 }
429
430 /*
431  * GetCatalogSnapshot
432  *              Get a snapshot that is sufficiently up-to-date for scan of the
433  *              system catalog with the specified OID.
434  */
435 Snapshot
436 GetCatalogSnapshot(Oid relid)
437 {
438         /*
439          * Return historic snapshot while we're doing logical decoding, so we can
440          * see the appropriate state of the catalog.
441          *
442          * This is the primary reason for needing to reset the system caches after
443          * finishing decoding.
444          */
445         if (HistoricSnapshotActive())
446                 return HistoricSnapshot;
447
448         return GetNonHistoricCatalogSnapshot(relid);
449 }
450
451 /*
452  * GetNonHistoricCatalogSnapshot
453  *              Get a snapshot that is sufficiently up-to-date for scan of the system
454  *              catalog with the specified OID, even while historic snapshots are set
455  *              up.
456  */
457 Snapshot
458 GetNonHistoricCatalogSnapshot(Oid relid)
459 {
460         /*
461          * If the caller is trying to scan a relation that has no syscache, no
462          * catcache invalidations will be sent when it is updated.  For a few key
463          * relations, snapshot invalidations are sent instead.  If we're trying to
464          * scan a relation for which neither catcache nor snapshot invalidations
465          * are sent, we must refresh the snapshot every time.
466          */
467         if (CatalogSnapshot &&
468                 !RelationInvalidatesSnapshotsOnly(relid) &&
469                 !RelationHasSysCache(relid))
470                 InvalidateCatalogSnapshot();
471
472         if (CatalogSnapshot == NULL)
473         {
474                 /* Get new snapshot. */
475                 CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
476
477                 /*
478                  * Make sure the catalog snapshot will be accounted for in decisions
479                  * about advancing PGXACT->xmin.  We could apply RegisterSnapshot, but
480                  * that would result in making a physical copy, which is overkill; and
481                  * it would also create a dependency on some resource owner, which we
482                  * do not want for reasons explained at the head of this file. Instead
483                  * just shove the CatalogSnapshot into the pairing heap manually. This
484                  * has to be reversed in InvalidateCatalogSnapshot, of course.
485                  *
486                  * NB: it had better be impossible for this to throw error, since the
487                  * CatalogSnapshot pointer is already valid.
488                  */
489                 pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
490         }
491
492         return CatalogSnapshot;
493 }
494
495 /*
496  * InvalidateCatalogSnapshot
497  *              Mark the current catalog snapshot, if any, as invalid
498  *
499  * We could change this API to allow the caller to provide more fine-grained
500  * invalidation details, so that a change to relation A wouldn't prevent us
501  * from using our cached snapshot to scan relation B, but so far there's no
502  * evidence that the CPU cycles we spent tracking such fine details would be
503  * well-spent.
504  */
505 void
506 InvalidateCatalogSnapshot(void)
507 {
508         if (CatalogSnapshot)
509         {
510                 pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
511                 CatalogSnapshot = NULL;
512                 SnapshotResetXmin();
513         }
514 }
515
516 /*
517  * InvalidateCatalogSnapshotConditionally
518  *              Drop catalog snapshot if it's the only one we have
519  *
520  * This is called when we are about to wait for client input, so we don't
521  * want to continue holding the catalog snapshot if it might mean that the
522  * global xmin horizon can't advance.  However, if there are other snapshots
523  * still active or registered, the catalog snapshot isn't likely to be the
524  * oldest one, so we might as well keep it.
525  */
526 void
527 InvalidateCatalogSnapshotConditionally(void)
528 {
529         if (CatalogSnapshot &&
530                 ActiveSnapshot == NULL &&
531                 pairingheap_is_singular(&RegisteredSnapshots))
532                 InvalidateCatalogSnapshot();
533 }
534
535 /*
536  * SnapshotSetCommandId
537  *              Propagate CommandCounterIncrement into the static snapshots, if set
538  */
539 void
540 SnapshotSetCommandId(CommandId curcid)
541 {
542         if (!FirstSnapshotSet)
543                 return;
544
545         if (CurrentSnapshot)
546                 CurrentSnapshot->curcid = curcid;
547         if (SecondarySnapshot)
548                 SecondarySnapshot->curcid = curcid;
549         /* Should we do the same with CatalogSnapshot? */
550 }
551
/*
 * SetTransactionSnapshot
 *      Set the transaction's snapshot from an imported MVCC snapshot.
 *
 * Note that this is very closely tied to GetTransactionSnapshot --- it
 * must take care of all the same considerations as the first-snapshot case
 * in GetTransactionSnapshot.
 *
 * sourcesnap: snapshot whose xmin/xmax, XID arrays and overflow/recovery
 *      flags are copied into CurrentSnapshot.
 * sourcexid: the source transaction's XID.  It is used for the xmin check
 *      when sourceproc is NULL, reported in that error, and passed to
 *      SetSerializableTransactionSnapshot in serializable mode.
 * sourceproc: if not NULL, the xmin check is done against this PGPROC
 *      instead of sourcexid.
 *
 * Raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) if the source
 * transaction is no longer running.
 */
static void
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
                       PGPROC *sourceproc)
{
    /* Caller should have checked this already */
    Assert(!FirstSnapshotSet);

    /* Better do this to ensure following Assert succeeds. */
    InvalidateCatalogSnapshot();

    Assert(pairingheap_is_empty(&RegisteredSnapshots));
    Assert(FirstXactSnapshot == NULL);
    Assert(!HistoricSnapshotActive());

    /*
     * Even though we are not going to use the snapshot it computes, we must
     * call GetSnapshotData, for two reasons: (1) to be sure that
     * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
     * RecentXmin and RecentGlobalXmin.  (We could alternatively include those
     * two variables in exported snapshot files, but it seems better to have
     * snapshot importers compute reasonably up-to-date values for them.)
     */
    CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

    /*
     * Now copy appropriate fields from the source snapshot.  The XID arrays
     * are copied into the arrays GetSnapshotData made sure exist, so the
     * source counts must not exceed the maximums checked by the Asserts.
     * Fields not copied here keep the values GetSnapshotData set.
     */
    CurrentSnapshot->xmin = sourcesnap->xmin;
    CurrentSnapshot->xmax = sourcesnap->xmax;
    CurrentSnapshot->xcnt = sourcesnap->xcnt;
    Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
    memcpy(CurrentSnapshot->xip, sourcesnap->xip,
           sourcesnap->xcnt * sizeof(TransactionId));
    CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
    Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
    memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
           sourcesnap->subxcnt * sizeof(TransactionId));
    CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
    CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
    /* NB: curcid should NOT be copied, it's a local matter */

    /*
     * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
     * TransactionXmin.  There is a race condition: to make sure we are not
     * causing the global xmin to go backwards, we have to test that the
     * source transaction is still running, and that has to be done
     * atomically. So let procarray.c do it.
     *
     * Note: in serializable mode, predicate.c will do this a second time. It
     * doesn't seem worth contorting the logic here to avoid two calls,
     * especially since it's not clear that predicate.c *must* do this.
     */
    if (sourceproc != NULL)
    {
        /* Check against the source backend's PGPROC. */
        if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("could not import the requested snapshot"),
               errdetail("The source transaction is not running anymore.")));
    }
    /* Otherwise check against the source transaction's XID. */
    else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not import the requested snapshot"),
           errdetail("The source transaction %u is not running anymore.",
                     sourcexid)));

    /*
     * In transaction-snapshot mode, the first snapshot must live until end of
     * xact, so we must make a copy of it.  Furthermore, if we're running in
     * serializable mode, predicate.c needs to do its own processing.
     */
    if (IsolationUsesXactSnapshot())
    {
        if (IsolationIsSerializable())
            SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
        /* Make a saved copy */
        CurrentSnapshot = CopySnapshot(CurrentSnapshot);
        FirstXactSnapshot = CurrentSnapshot;
        /* Mark it as "registered" in FirstXactSnapshot */
        FirstXactSnapshot->regd_count++;
        pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
    }

    FirstSnapshotSet = true;
}
646
647 /*
648  * CopySnapshot
649  *              Copy the given snapshot.
650  *
651  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
652  * to 0.  The returned snapshot has the copied flag set.
653  */
654 static Snapshot
655 CopySnapshot(Snapshot snapshot)
656 {
657         Snapshot        newsnap;
658         Size            subxipoff;
659         Size            size;
660
661         Assert(snapshot != InvalidSnapshot);
662
663         /* We allocate any XID arrays needed in the same palloc block. */
664         size = subxipoff = sizeof(SnapshotData) +
665                 snapshot->xcnt * sizeof(TransactionId);
666         if (snapshot->subxcnt > 0)
667                 size += snapshot->subxcnt * sizeof(TransactionId);
668
669         newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
670         memcpy(newsnap, snapshot, sizeof(SnapshotData));
671
672         newsnap->regd_count = 0;
673         newsnap->active_count = 0;
674         newsnap->copied = true;
675
676         /* setup XID array */
677         if (snapshot->xcnt > 0)
678         {
679                 newsnap->xip = (TransactionId *) (newsnap + 1);
680                 memcpy(newsnap->xip, snapshot->xip,
681                            snapshot->xcnt * sizeof(TransactionId));
682         }
683         else
684                 newsnap->xip = NULL;
685
686         /*
687          * Setup subXID array. Don't bother to copy it if it had overflowed,
688          * though, because it's not used anywhere in that case. Except if it's a
689          * snapshot taken during recovery; all the top-level XIDs are in subxip as
690          * well in that case, so we mustn't lose them.
691          */
692         if (snapshot->subxcnt > 0 &&
693                 (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
694         {
695                 newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
696                 memcpy(newsnap->subxip, snapshot->subxip,
697                            snapshot->subxcnt * sizeof(TransactionId));
698         }
699         else
700                 newsnap->subxip = NULL;
701
702         return newsnap;
703 }
704
705 /*
706  * FreeSnapshot
707  *              Free the memory associated with a snapshot.
708  */
709 static void
710 FreeSnapshot(Snapshot snapshot)
711 {
712         Assert(snapshot->regd_count == 0);
713         Assert(snapshot->active_count == 0);
714         Assert(snapshot->copied);
715
716         pfree(snapshot);
717 }
718
719 /*
720  * PushActiveSnapshot
721  *              Set the given snapshot as the current active snapshot
722  *
723  * If the passed snapshot is a statically-allocated one, or it is possibly
724  * subject to a future command counter update, create a new long-lived copy
725  * with active refcount=1.  Otherwise, only increment the refcount.
726  */
727 void
728 PushActiveSnapshot(Snapshot snap)
729 {
730         ActiveSnapshotElt *newactive;
731
732         Assert(snap != InvalidSnapshot);
733
734         newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
735
736         /*
737          * Checking SecondarySnapshot is probably useless here, but it seems
738          * better to be sure.
739          */
740         if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
741                 newactive->as_snap = CopySnapshot(snap);
742         else
743                 newactive->as_snap = snap;
744
745         newactive->as_next = ActiveSnapshot;
746         newactive->as_level = GetCurrentTransactionNestLevel();
747
748         newactive->as_snap->active_count++;
749
750         ActiveSnapshot = newactive;
751         if (OldestActiveSnapshot == NULL)
752                 OldestActiveSnapshot = ActiveSnapshot;
753 }
754
755 /*
756  * PushCopiedSnapshot
757  *              As above, except forcibly copy the presented snapshot.
758  *
759  * This should be used when the ActiveSnapshot has to be modifiable, for
760  * example if the caller intends to call UpdateActiveSnapshotCommandId.
761  * The new snapshot will be released when popped from the stack.
762  */
763 void
764 PushCopiedSnapshot(Snapshot snapshot)
765 {
766         PushActiveSnapshot(CopySnapshot(snapshot));
767 }
768
769 /*
770  * UpdateActiveSnapshotCommandId
771  *
772  * Update the current CID of the active snapshot.  This can only be applied
773  * to a snapshot that is not referenced elsewhere.
774  */
775 void
776 UpdateActiveSnapshotCommandId(void)
777 {
778         CommandId       save_curcid,
779                                 curcid;
780
781         Assert(ActiveSnapshot != NULL);
782         Assert(ActiveSnapshot->as_snap->active_count == 1);
783         Assert(ActiveSnapshot->as_snap->regd_count == 0);
784
785         /*
786          * Don't allow modification of the active snapshot during parallel
787          * operation.  We share the snapshot to worker backends at the beginning
788          * of parallel operation, so any change to the snapshot can lead to
789          * inconsistencies.  We have other defenses against
790          * CommandCounterIncrement, but there are a few places that call this
791          * directly, so we put an additional guard here.
792          */
793         save_curcid = ActiveSnapshot->as_snap->curcid;
794         curcid = GetCurrentCommandId(false);
795         if (IsInParallelMode() && save_curcid != curcid)
796                 elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
797         ActiveSnapshot->as_snap->curcid = curcid;
798 }
799
800 /*
801  * PopActiveSnapshot
802  *
803  * Remove the topmost snapshot from the active snapshot stack, decrementing the
804  * reference count, and free it if this was the last reference.
805  */
806 void
807 PopActiveSnapshot(void)
808 {
809         ActiveSnapshotElt *newstack;
810
811         newstack = ActiveSnapshot->as_next;
812
813         Assert(ActiveSnapshot->as_snap->active_count > 0);
814
815         ActiveSnapshot->as_snap->active_count--;
816
817         if (ActiveSnapshot->as_snap->active_count == 0 &&
818                 ActiveSnapshot->as_snap->regd_count == 0)
819                 FreeSnapshot(ActiveSnapshot->as_snap);
820
821         pfree(ActiveSnapshot);
822         ActiveSnapshot = newstack;
823         if (ActiveSnapshot == NULL)
824                 OldestActiveSnapshot = NULL;
825
826         SnapshotResetXmin();
827 }
828
829 /*
830  * GetActiveSnapshot
831  *              Return the topmost snapshot in the Active stack.
832  */
833 Snapshot
834 GetActiveSnapshot(void)
835 {
836         Assert(ActiveSnapshot != NULL);
837
838         return ActiveSnapshot->as_snap;
839 }
840
841 /*
842  * ActiveSnapshotSet
843  *              Return whether there is at least one snapshot in the Active stack
844  */
845 bool
846 ActiveSnapshotSet(void)
847 {
848         return ActiveSnapshot != NULL;
849 }
850
851 /*
852  * RegisterSnapshot
853  *              Register a snapshot as being in use by the current resource owner
854  *
855  * If InvalidSnapshot is passed, it is not registered.
856  */
857 Snapshot
858 RegisterSnapshot(Snapshot snapshot)
859 {
860         if (snapshot == InvalidSnapshot)
861                 return InvalidSnapshot;
862
863         return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
864 }
865
866 /*
867  * RegisterSnapshotOnOwner
868  *              As above, but use the specified resource owner
869  */
870 Snapshot
871 RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
872 {
873         Snapshot        snap;
874
875         if (snapshot == InvalidSnapshot)
876                 return InvalidSnapshot;
877
878         /* Static snapshot?  Create a persistent copy */
879         snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
880
881         /* and tell resowner.c about it */
882         ResourceOwnerEnlargeSnapshots(owner);
883         snap->regd_count++;
884         ResourceOwnerRememberSnapshot(owner, snap);
885
886         if (snap->regd_count == 1)
887                 pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
888
889         return snap;
890 }
891
892 /*
893  * UnregisterSnapshot
894  *
895  * Decrement the reference count of a snapshot, remove the corresponding
896  * reference from CurrentResourceOwner, and free the snapshot if no more
897  * references remain.
898  */
899 void
900 UnregisterSnapshot(Snapshot snapshot)
901 {
902         if (snapshot == NULL)
903                 return;
904
905         UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
906 }
907
908 /*
909  * UnregisterSnapshotFromOwner
910  *              As above, but use the specified resource owner
911  */
912 void
913 UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
914 {
915         if (snapshot == NULL)
916                 return;
917
918         Assert(snapshot->regd_count > 0);
919         Assert(!pairingheap_is_empty(&RegisteredSnapshots));
920
921         ResourceOwnerForgetSnapshot(owner, snapshot);
922
923         snapshot->regd_count--;
924         if (snapshot->regd_count == 0)
925                 pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
926
927         if (snapshot->regd_count == 0 && snapshot->active_count == 0)
928         {
929                 FreeSnapshot(snapshot);
930                 SnapshotResetXmin();
931         }
932 }
933
934 /*
935  * Comparison function for RegisteredSnapshots heap.  Snapshots are ordered
936  * by xmin, so that the snapshot with smallest xmin is at the top.
937  */
938 static int
939 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
940 {
941         const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
942         const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
943
944         if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
945                 return 1;
946         else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
947                 return -1;
948         else
949                 return 0;
950 }
951
952 /*
953  * SnapshotResetXmin
954  *
955  * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
956  * Note we can do this without locking because we assume that storing an Xid
957  * is atomic.
958  *
959  * Even if there are some remaining snapshots, we may be able to advance our
960  * PGXACT->xmin to some degree.  This typically happens when a portal is
961  * dropped.  For efficiency, we only consider recomputing PGXACT->xmin when
962  * the active snapshot stack is empty; this allows us not to need to track
963  * which active snapshot is oldest.
964  *
965  * Note: it's tempting to use GetOldestSnapshot() here so that we can include
966  * active snapshots in the calculation.  However, that compares by LSN not
967  * xmin so it's not entirely clear that it's the same thing.  Also, we'd be
968  * critically dependent on the assumption that the bottommost active snapshot
969  * stack entry has the oldest xmin.  (Current uses of GetOldestSnapshot() are
970  * not actually critical, but this would be.)
971  */
972 static void
973 SnapshotResetXmin(void)
974 {
975         Snapshot        minSnapshot;
976
977         if (ActiveSnapshot != NULL)
978                 return;
979
980         if (pairingheap_is_empty(&RegisteredSnapshots))
981         {
982                 MyPgXact->xmin = InvalidTransactionId;
983                 return;
984         }
985
986         minSnapshot = pairingheap_container(SnapshotData, ph_node,
987                                                                         pairingheap_first(&RegisteredSnapshots));
988
989         if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
990                 MyPgXact->xmin = minSnapshot->xmin;
991 }
992
993 /*
994  * AtSubCommit_Snapshot
995  */
996 void
997 AtSubCommit_Snapshot(int level)
998 {
999         ActiveSnapshotElt *active;
1000
1001         /*
1002          * Relabel the active snapshots set in this subtransaction as though they
1003          * are owned by the parent subxact.
1004          */
1005         for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1006         {
1007                 if (active->as_level < level)
1008                         break;
1009                 active->as_level = level - 1;
1010         }
1011 }
1012
1013 /*
1014  * AtSubAbort_Snapshot
1015  *              Clean up snapshots after a subtransaction abort
1016  */
1017 void
1018 AtSubAbort_Snapshot(int level)
1019 {
1020         /* Forget the active snapshots set by this subtransaction */
1021         while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1022         {
1023                 ActiveSnapshotElt *next;
1024
1025                 next = ActiveSnapshot->as_next;
1026
1027                 /*
1028                  * Decrement the snapshot's active count.  If it's still registered or
1029                  * marked as active by an outer subtransaction, we can't free it yet.
1030                  */
1031                 Assert(ActiveSnapshot->as_snap->active_count >= 1);
1032                 ActiveSnapshot->as_snap->active_count -= 1;
1033
1034                 if (ActiveSnapshot->as_snap->active_count == 0 &&
1035                         ActiveSnapshot->as_snap->regd_count == 0)
1036                         FreeSnapshot(ActiveSnapshot->as_snap);
1037
1038                 /* and free the stack element */
1039                 pfree(ActiveSnapshot);
1040
1041                 ActiveSnapshot = next;
1042                 if (ActiveSnapshot == NULL)
1043                         OldestActiveSnapshot = NULL;
1044         }
1045
1046         SnapshotResetXmin();
1047 }
1048
1049 /*
1050  * AtEOXact_Snapshot
1051  *              Snapshot manager's cleanup function for end of transaction
1052  */
1053 void
1054 AtEOXact_Snapshot(bool isCommit)
1055 {
1056         /*
1057          * In transaction-snapshot mode we must release our privately-managed
1058          * reference to the transaction snapshot.  We must remove it from
1059          * RegisteredSnapshots to keep the check below happy.  But we don't bother
1060          * to do FreeSnapshot, for two reasons: the memory will go away with
1061          * TopTransactionContext anyway, and if someone has left the snapshot
1062          * stacked as active, we don't want the code below to be chasing through a
1063          * dangling pointer.
1064          */
1065         if (FirstXactSnapshot != NULL)
1066         {
1067                 Assert(FirstXactSnapshot->regd_count > 0);
1068                 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
1069                 pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
1070         }
1071         FirstXactSnapshot = NULL;
1072
1073         /*
1074          * If we exported any snapshots, clean them up.
1075          */
1076         if (exportedSnapshots != NIL)
1077         {
1078                 TransactionId myxid = GetTopTransactionId();
1079                 int                     i;
1080                 char            buf[MAXPGPATH];
1081                 ListCell   *lc;
1082
1083                 /*
1084                  * Get rid of the files.  Unlink failure is only a WARNING because (1)
1085                  * it's too late to abort the transaction, and (2) leaving a leaked
1086                  * file around has little real consequence anyway.
1087                  */
1088                 for (i = 1; i <= list_length(exportedSnapshots); i++)
1089                 {
1090                         XactExportFilePath(buf, myxid, i, "");
1091                         if (unlink(buf))
1092                                 elog(WARNING, "could not unlink file \"%s\": %m", buf);
1093                 }
1094
1095                 /*
1096                  * As with the FirstXactSnapshot, we needn't spend any effort on
1097                  * cleaning up the per-snapshot data structures, but we do need to
1098                  * remove them from RegisteredSnapshots to prevent a warning below.
1099                  */
1100                 foreach(lc, exportedSnapshots)
1101                 {
1102                         Snapshot        snap = (Snapshot) lfirst(lc);
1103
1104                         pairingheap_remove(&RegisteredSnapshots, &snap->ph_node);
1105                 }
1106
1107                 exportedSnapshots = NIL;
1108         }
1109
1110         /* Drop catalog snapshot if any */
1111         InvalidateCatalogSnapshot();
1112
1113         /* On commit, complain about leftover snapshots */
1114         if (isCommit)
1115         {
1116                 ActiveSnapshotElt *active;
1117
1118                 if (!pairingheap_is_empty(&RegisteredSnapshots))
1119                         elog(WARNING, "registered snapshots seem to remain after cleanup");
1120
1121                 /* complain about unpopped active snapshots */
1122                 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1123                         elog(WARNING, "snapshot %p still active", active);
1124         }
1125
1126         /*
1127          * And reset our state.  We don't need to free the memory explicitly --
1128          * it'll go away with TopTransactionContext.
1129          */
1130         ActiveSnapshot = NULL;
1131         OldestActiveSnapshot = NULL;
1132         pairingheap_reset(&RegisteredSnapshots);
1133
1134         CurrentSnapshot = NULL;
1135         SecondarySnapshot = NULL;
1136
1137         FirstSnapshotSet = false;
1138
1139         SnapshotResetXmin();
1140 }
1141
1142
1143 /*
1144  * ExportSnapshot
1145  *              Export the snapshot to a file so that other backends can import it.
1146  *              Returns the token (the file name) that can be used to import this
1147  *              snapshot.
1148  */
1149 char *
1150 ExportSnapshot(Snapshot snapshot)
1151 {
1152         TransactionId topXid;
1153         TransactionId *children;
1154         int                     nchildren;
1155         int                     addTopXid;
1156         StringInfoData buf;
1157         FILE       *f;
1158         int                     i;
1159         MemoryContext oldcxt;
1160         char            path[MAXPGPATH];
1161         char            pathtmp[MAXPGPATH];
1162
1163         /*
1164          * It's tempting to call RequireTransactionChain here, since it's not very
1165          * useful to export a snapshot that will disappear immediately afterwards.
1166          * However, we haven't got enough information to do that, since we don't
1167          * know if we're at top level or not.  For example, we could be inside a
1168          * plpgsql function that is going to fire off other transactions via
1169          * dblink.  Rather than disallow perfectly legitimate usages, don't make a
1170          * check.
1171          *
1172          * Also note that we don't make any restriction on the transaction's
1173          * isolation level; however, importers must check the level if they are
1174          * serializable.
1175          */
1176
1177         /*
1178          * This will assign a transaction ID if we do not yet have one.
1179          */
1180         topXid = GetTopTransactionId();
1181
1182         /*
1183          * We cannot export a snapshot from a subtransaction because there's no
1184          * easy way for importers to verify that the same subtransaction is still
1185          * running.
1186          */
1187         if (IsSubTransaction())
1188                 ereport(ERROR,
1189                                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1190                                  errmsg("cannot export a snapshot from a subtransaction")));
1191
1192         /*
1193          * We do however allow previous committed subtransactions to exist.
1194          * Importers of the snapshot must see them as still running, so get their
1195          * XIDs to add them to the snapshot.
1196          */
1197         nchildren = xactGetCommittedChildren(&children);
1198
1199         /*
1200          * Copy the snapshot into TopTransactionContext, add it to the
1201          * exportedSnapshots list, and mark it pseudo-registered.  We do this to
1202          * ensure that the snapshot's xmin is honored for the rest of the
1203          * transaction.
1204          */
1205         snapshot = CopySnapshot(snapshot);
1206
1207         oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1208         exportedSnapshots = lappend(exportedSnapshots, snapshot);
1209         MemoryContextSwitchTo(oldcxt);
1210
1211         snapshot->regd_count++;
1212         pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
1213
1214         /*
1215          * Fill buf with a text serialization of the snapshot, plus identification
1216          * data about this transaction.  The format expected by ImportSnapshot is
1217          * pretty rigid: each line must be fieldname:value.
1218          */
1219         initStringInfo(&buf);
1220
1221         appendStringInfo(&buf, "xid:%u\n", topXid);
1222         appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
1223         appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
1224         appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
1225
1226         appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
1227         appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
1228
1229         /*
1230          * We must include our own top transaction ID in the top-xid data, since
1231          * by definition we will still be running when the importing transaction
1232          * adopts the snapshot, but GetSnapshotData never includes our own XID in
1233          * the snapshot.  (There must, therefore, be enough room to add it.)
1234          *
1235          * However, it could be that our topXid is after the xmax, in which case
1236          * we shouldn't include it because xip[] members are expected to be before
1237          * xmax.  (We need not make the same check for subxip[] members, see
1238          * snapshot.h.)
1239          */
1240         addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
1241         appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
1242         for (i = 0; i < snapshot->xcnt; i++)
1243                 appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
1244         if (addTopXid)
1245                 appendStringInfo(&buf, "xip:%u\n", topXid);
1246
1247         /*
1248          * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
1249          * we have to cope with possible overflow.
1250          */
1251         if (snapshot->suboverflowed ||
1252                 snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
1253                 appendStringInfoString(&buf, "sof:1\n");
1254         else
1255         {
1256                 appendStringInfoString(&buf, "sof:0\n");
1257                 appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
1258                 for (i = 0; i < snapshot->subxcnt; i++)
1259                         appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
1260                 for (i = 0; i < nchildren; i++)
1261                         appendStringInfo(&buf, "sxp:%u\n", children[i]);
1262         }
1263         appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
1264
1265         /*
1266          * Now write the text representation into a file.  We first write to a
1267          * ".tmp" filename, and rename to final filename if no error.  This
1268          * ensures that no other backend can read an incomplete file
1269          * (ImportSnapshot won't allow it because of its valid-characters check).
1270          */
1271         XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
1272         if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
1273                 ereport(ERROR,
1274                                 (errcode_for_file_access(),
1275                                  errmsg("could not create file \"%s\": %m", pathtmp)));
1276
1277         if (fwrite(buf.data, buf.len, 1, f) != 1)
1278                 ereport(ERROR,
1279                                 (errcode_for_file_access(),
1280                                  errmsg("could not write to file \"%s\": %m", pathtmp)));
1281
1282         /* no fsync() since file need not survive a system crash */
1283
1284         if (FreeFile(f))
1285                 ereport(ERROR,
1286                                 (errcode_for_file_access(),
1287                                  errmsg("could not write to file \"%s\": %m", pathtmp)));
1288
1289         /*
1290          * Now that we have written everything into a .tmp file, rename the file
1291          * to remove the .tmp suffix.
1292          */
1293         XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
1294
1295         if (rename(pathtmp, path) < 0)
1296                 ereport(ERROR,
1297                                 (errcode_for_file_access(),
1298                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1299                                                 pathtmp, path)));
1300
1301         /*
1302          * The basename of the file is what we return from pg_export_snapshot().
1303          * It's already in path in a textual format and we know that the path
1304          * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
1305          * and pstrdup it so as not to return the address of a local variable.
1306          */
1307         return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
1308 }
1309
1310 /*
1311  * pg_export_snapshot
1312  *              SQL-callable wrapper for ExportSnapshot.
1313  */
1314 Datum
1315 pg_export_snapshot(PG_FUNCTION_ARGS)
1316 {
1317         char       *snapshotName;
1318
1319         snapshotName = ExportSnapshot(GetActiveSnapshot());
1320         PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1321 }
1322
1323
1324 /*
1325  * Parsing subroutines for ImportSnapshot: parse a line with the given
1326  * prefix followed by a value, and advance *s to the next line.  The
1327  * filename is provided for use in error messages.
1328  */
1329 static int
1330 parseIntFromText(const char *prefix, char **s, const char *filename)
1331 {
1332         char       *ptr = *s;
1333         int                     prefixlen = strlen(prefix);
1334         int                     val;
1335
1336         if (strncmp(ptr, prefix, prefixlen) != 0)
1337                 ereport(ERROR,
1338                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1339                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1340         ptr += prefixlen;
1341         if (sscanf(ptr, "%d", &val) != 1)
1342                 ereport(ERROR,
1343                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1344                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1345         ptr = strchr(ptr, '\n');
1346         if (!ptr)
1347                 ereport(ERROR,
1348                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1349                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1350         *s = ptr + 1;
1351         return val;
1352 }
1353
1354 static TransactionId
1355 parseXidFromText(const char *prefix, char **s, const char *filename)
1356 {
1357         char       *ptr = *s;
1358         int                     prefixlen = strlen(prefix);
1359         TransactionId val;
1360
1361         if (strncmp(ptr, prefix, prefixlen) != 0)
1362                 ereport(ERROR,
1363                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1364                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1365         ptr += prefixlen;
1366         if (sscanf(ptr, "%u", &val) != 1)
1367                 ereport(ERROR,
1368                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1369                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1370         ptr = strchr(ptr, '\n');
1371         if (!ptr)
1372                 ereport(ERROR,
1373                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1374                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1375         *s = ptr + 1;
1376         return val;
1377 }
1378
1379 /*
1380  * ImportSnapshot
1381  *              Import a previously exported snapshot.  The argument should be a
1382  *              filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
1383  *              This is called by "SET TRANSACTION SNAPSHOT 'foo'".
1384  */
1385 void
1386 ImportSnapshot(const char *idstr)
1387 {
1388         char            path[MAXPGPATH];
1389         FILE       *f;
1390         struct stat stat_buf;
1391         char       *filebuf;
1392         int                     xcnt;
1393         int                     i;
1394         TransactionId src_xid;
1395         Oid                     src_dbid;
1396         int                     src_isolevel;
1397         bool            src_readonly;
1398         SnapshotData snapshot;
1399
1400         /*
1401          * Must be at top level of a fresh transaction.  Note in particular that
1402          * we check we haven't acquired an XID --- if we have, it's conceivable
1403          * that the snapshot would show it as not running, making for very screwy
1404          * behavior.
1405          */
1406         if (FirstSnapshotSet ||
1407                 GetTopTransactionIdIfAny() != InvalidTransactionId ||
1408                 IsSubTransaction())
1409                 ereport(ERROR,
1410                                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1411                 errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1412
1413         /*
1414          * If we are in read committed mode then the next query would execute with
1415          * a new snapshot thus making this function call quite useless.
1416          */
1417         if (!IsolationUsesXactSnapshot())
1418                 ereport(ERROR,
1419                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1420                                  errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1421
1422         /*
1423          * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
1424          * this mainly to prevent reading arbitrary files.
1425          */
1426         if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1427                 ereport(ERROR,
1428                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1429                                  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1430
1431         /* OK, read the file */
1432         snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1433
1434         f = AllocateFile(path, PG_BINARY_R);
1435         if (!f)
1436                 ereport(ERROR,
1437                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1438                                  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1439
1440         /* get the size of the file so that we know how much memory we need */
1441         if (fstat(fileno(f), &stat_buf))
1442                 elog(ERROR, "could not stat file \"%s\": %m", path);
1443
1444         /* and read the file into a palloc'd string */
1445         filebuf = (char *) palloc(stat_buf.st_size + 1);
1446         if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1447                 elog(ERROR, "could not read file \"%s\": %m", path);
1448
1449         filebuf[stat_buf.st_size] = '\0';
1450
1451         FreeFile(f);
1452
1453         /*
1454          * Construct a snapshot struct by parsing the file content.
1455          */
1456         memset(&snapshot, 0, sizeof(snapshot));
1457
1458         src_xid = parseXidFromText("xid:", &filebuf, path);
1459         /* we abuse parseXidFromText a bit here ... */
1460         src_dbid = parseXidFromText("dbid:", &filebuf, path);
1461         src_isolevel = parseIntFromText("iso:", &filebuf, path);
1462         src_readonly = parseIntFromText("ro:", &filebuf, path);
1463
1464         snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1465         snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1466
1467         snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1468
1469         /* sanity-check the xid count before palloc */
1470         if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1471                 ereport(ERROR,
1472                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1473                                  errmsg("invalid snapshot data in file \"%s\"", path)));
1474
1475         snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1476         for (i = 0; i < xcnt; i++)
1477                 snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1478
1479         snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1480
1481         if (!snapshot.suboverflowed)
1482         {
1483                 snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1484
1485                 /* sanity-check the xid count before palloc */
1486                 if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1487                         ereport(ERROR,
1488                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1489                                          errmsg("invalid snapshot data in file \"%s\"", path)));
1490
1491                 snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1492                 for (i = 0; i < xcnt; i++)
1493                         snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1494         }
1495         else
1496         {
1497                 snapshot.subxcnt = 0;
1498                 snapshot.subxip = NULL;
1499         }
1500
1501         snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1502
1503         /*
1504          * Do some additional sanity checking, just to protect ourselves.  We
1505          * don't trouble to check the array elements, just the most critical
1506          * fields.
1507          */
1508         if (!TransactionIdIsNormal(src_xid) ||
1509                 !OidIsValid(src_dbid) ||
1510                 !TransactionIdIsNormal(snapshot.xmin) ||
1511                 !TransactionIdIsNormal(snapshot.xmax))
1512                 ereport(ERROR,
1513                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1514                                  errmsg("invalid snapshot data in file \"%s\"", path)));
1515
1516         /*
1517          * If we're serializable, the source transaction must be too, otherwise
1518          * predicate.c has problems (SxactGlobalXmin could go backwards).  Also, a
1519          * non-read-only transaction can't adopt a snapshot from a read-only
1520          * transaction, as predicate.c handles the cases very differently.
1521          */
1522         if (IsolationIsSerializable())
1523         {
1524                 if (src_isolevel != XACT_SERIALIZABLE)
1525                         ereport(ERROR,
1526                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1527                                          errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1528                 if (src_readonly && !XactReadOnly)
1529                         ereport(ERROR,
1530                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1531                                          errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1532         }
1533
1534         /*
1535          * We cannot import a snapshot that was taken in a different database,
1536          * because vacuum calculates OldestXmin on a per-database basis; so the
1537          * source transaction's xmin doesn't protect us from data loss.  This
1538          * restriction could be removed if the source transaction were to mark its
1539          * xmin as being globally applicable.  But that would require some
1540          * additional syntax, since that has to be known when the snapshot is
1541          * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
1542          */
1543         if (src_dbid != MyDatabaseId)
1544                 ereport(ERROR,
1545                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1546                           errmsg("cannot import a snapshot from a different database")));
1547
1548         /* OK, install the snapshot */
1549         SetTransactionSnapshot(&snapshot, src_xid, NULL);
1550 }
1551
1552 /*
1553  * XactHasExportedSnapshots
1554  *              Test whether current transaction has exported any snapshots.
1555  */
1556 bool
1557 XactHasExportedSnapshots(void)
1558 {
1559         return (exportedSnapshots != NIL);
1560 }
1561
1562 /*
1563  * DeleteAllExportedSnapshotFiles
1564  *              Clean up any files that have been left behind by a crashed backend
1565  *              that had exported snapshots before it died.
1566  *
1567  * This should be called during database startup or crash recovery.
1568  */
1569 void
1570 DeleteAllExportedSnapshotFiles(void)
1571 {
1572         char            buf[MAXPGPATH];
1573         DIR                *s_dir;
1574         struct dirent *s_de;
1575
1576         if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR)))
1577         {
1578                 /*
1579                  * We really should have that directory in a sane cluster setup. But
1580                  * then again if we don't, it's not fatal enough to make it FATAL.
1581                  * Since we're running in the postmaster, LOG is our best bet.
1582                  */
1583                 elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
1584                 return;
1585         }
1586
1587         while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL)
1588         {
1589                 if (strcmp(s_de->d_name, ".") == 0 ||
1590                         strcmp(s_de->d_name, "..") == 0)
1591                         continue;
1592
1593                 snprintf(buf, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1594                 /* Again, unlink failure is not worthy of FATAL */
1595                 if (unlink(buf))
1596                         elog(LOG, "could not unlink file \"%s\": %m", buf);
1597         }
1598
1599         FreeDir(s_dir);
1600 }
1601
1602 bool
1603 ThereAreNoPriorRegisteredSnapshots(void)
1604 {
1605         if (pairingheap_is_empty(&RegisteredSnapshots) ||
1606                 pairingheap_is_singular(&RegisteredSnapshots))
1607                 return true;
1608
1609         return false;
1610 }
1611
1612
1613 /*
1614  * Return a timestamp that is exactly on a minute boundary.
1615  *
1616  * If the argument is already aligned, return that value, otherwise move to
1617  * the next minute boundary following the given time.
1618  */
1619 static TimestampTz
1620 AlignTimestampToMinuteBoundary(TimestampTz ts)
1621 {
1622         TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1623
1624         return retval - (retval % USECS_PER_MINUTE);
1625 }
1626
1627 /*
1628  * Get current timestamp for snapshots
1629  *
1630  * This is basically GetCurrentTimestamp(), but with a guarantee that
1631  * the result never moves backward.
1632  */
1633 TimestampTz
1634 GetSnapshotCurrentTimestamp(void)
1635 {
1636         TimestampTz now = GetCurrentTimestamp();
1637
1638         /*
1639          * Don't let time move backward; if it hasn't advanced, use the old value.
1640          */
1641         SpinLockAcquire(&oldSnapshotControl->mutex_current);
1642         if (now <= oldSnapshotControl->current_timestamp)
1643                 now = oldSnapshotControl->current_timestamp;
1644         else
1645                 oldSnapshotControl->current_timestamp = now;
1646         SpinLockRelease(&oldSnapshotControl->mutex_current);
1647
1648         return now;
1649 }
1650
1651 /*
1652  * Get timestamp through which vacuum may have processed based on last stored
1653  * value for threshold_timestamp.
1654  *
1655  * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1656  * that ever changes, we could get rid of the spinlock here.
1657  */
1658 TimestampTz
1659 GetOldSnapshotThresholdTimestamp(void)
1660 {
1661         TimestampTz threshold_timestamp;
1662
1663         SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1664         threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1665         SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1666
1667         return threshold_timestamp;
1668 }
1669
1670 static void
1671 SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1672 {
1673         SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1674         oldSnapshotControl->threshold_timestamp = ts;
1675         oldSnapshotControl->threshold_xid = xlimit;
1676         SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1677 }
1678
/*
 * TransactionIdLimitedForOldSnapshots
 *
 * Apply old snapshot limit, if any.  This is intended to be called for page
 * pruning and table vacuuming, to allow old_snapshot_threshold to override
 * the normal global xmin value.  Actual testing for snapshot too old will be
 * based on whether a snapshot timestamp is prior to the threshold timestamp
 * set in this function.
 *
 * recentXmin is the xmin horizon the caller would otherwise use; relation is
 * only consulted through RelationAllowsEarlyPruning().
 *
 * Returns recentXmin unchanged when recentXmin is not a normal xid, when
 * old_snapshot_threshold < 0, when the relation does not allow early
 * pruning, or when no later limit is found.  Otherwise returns an xid that
 * follows recentXmin.  As a side effect it may publish a new
 * (threshold_timestamp, threshold_xid) pair via
 * SetOldSnapshotThresholdTimestamp().
 */
TransactionId
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
                                                                        Relation relation)
{
        if (TransactionIdIsNormal(recentXmin)
                && old_snapshot_threshold >= 0
                && RelationAllowsEarlyPruning(relation))
        {
                TimestampTz ts = GetSnapshotCurrentTimestamp();
                TransactionId xlimit = recentXmin;
                TransactionId latest_xmin;
                TimestampTz update_ts;
                bool            same_ts_as_threshold = false;

                /*
                 * Take a consistent copy of the latest xmin seen and of the
                 * timestamp of the most recent map update; both are written under
                 * this same spinlock by MaintainOldSnapshotTimeMapping().
                 */
                SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
                latest_xmin = oldSnapshotControl->latest_xmin;
                update_ts = oldSnapshotControl->next_map_update;
                SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);

                /*
                 * Zero threshold always overrides to latest xmin, if valid.  Without
                 * some heuristic it will find its own snapshot too old on, for
                 * example, a simple UPDATE -- which would make it useless for most
                 * testing, but there is no principled way to ensure that it doesn't
                 * fail in this way.  Use a five-second delay to try to get useful
                 * testing behavior, but this may need adjustment.
                 */
                if (old_snapshot_threshold == 0)
                {
                        /* only move forward, and never past our own xmin */
                        if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
                                && TransactionIdFollows(latest_xmin, xlimit))
                                xlimit = latest_xmin;

                        ts -= 5 * USECS_PER_SEC;
                        SetOldSnapshotThresholdTimestamp(ts, xlimit);

                        return xlimit;
                }

                /*
                 * Threshold time: the minute boundary at or after "now", moved back
                 * by old_snapshot_threshold minutes.
                 */
                ts = AlignTimestampToMinuteBoundary(ts)
                        - (old_snapshot_threshold * USECS_PER_MINUTE);

                /* Check for fast exit without LW locking. */
                /* Same threshold as last time: reuse the xid stored with it. */
                SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
                if (ts == oldSnapshotControl->threshold_timestamp)
                {
                        xlimit = oldSnapshotControl->threshold_xid;
                        same_ts_as_threshold = true;
                }
                SpinLockRelease(&oldSnapshotControl->mutex_threshold);

                if (!same_ts_as_threshold)
                {
                        if (ts == update_ts)
                        {
                                /*
                                 * The threshold equals the timestamp of the most recent map
                                 * update, so use latest_xmin directly instead of consulting
                                 * the map.
                                 */
                                xlimit = latest_xmin;
                                if (NormalTransactionIdFollows(xlimit, recentXmin))
                                        SetOldSnapshotThresholdTimestamp(ts, xlimit);
                        }
                        else
                        {
                                LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);

                                /*
                                 * Look up the bucket for ts: count whole minutes from
                                 * head_timestamp, clamp to the newest used bucket, and wrap
                                 * around the circular buffer.  If the map is empty or ts is
                                 * before head_timestamp, xlimit stays recentXmin.
                                 *
                                 * NOTE(review): this indexing assumes head_timestamp is the
                                 * timestamp of the bucket at head_offset (the oldest one);
                                 * confirm MaintainOldSnapshotTimeMapping keeps that
                                 * invariant.
                                 */
                                if (oldSnapshotControl->count_used > 0
                                        && ts >= oldSnapshotControl->head_timestamp)
                                {
                                        int                     offset;

                                        offset = ((ts - oldSnapshotControl->head_timestamp)
                                                          / USECS_PER_MINUTE);
                                        if (offset > oldSnapshotControl->count_used - 1)
                                                offset = oldSnapshotControl->count_used - 1;
                                        offset = (oldSnapshotControl->head_offset + offset)
                                                % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
                                        xlimit = oldSnapshotControl->xid_by_minute[offset];

                                        /* only publish a limit that is actually an advance */
                                        if (NormalTransactionIdFollows(xlimit, recentXmin))
                                                SetOldSnapshotThresholdTimestamp(ts, xlimit);
                                }

                                LWLockRelease(OldSnapshotTimeMapLock);
                        }
                }

                /*
                 * Failsafe protection against vacuuming work of active transaction.
                 *
                 * This is not an assertion because we avoid the spinlock for
                 * performance, leaving open the possibility that xlimit could advance
                 * and be more current; but it seems prudent to apply this limit.  It
                 * might make pruning a tiny bit less aggressive than it could be, but
                 * protects against data loss bugs.
                 */
                if (TransactionIdIsNormal(latest_xmin)
                        && TransactionIdPrecedes(latest_xmin, xlimit))
                        xlimit = latest_xmin;

                /* never return anything older than what the caller passed in */
                if (NormalTransactionIdFollows(xlimit, recentXmin))
                        return xlimit;
        }

        return recentXmin;
}
1791
1792 /*
1793  * Take care of the circular buffer that maps time to xid.
1794  */
1795 void
1796 MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1797 {
1798         TimestampTz ts;
1799         TransactionId latest_xmin;
1800         TimestampTz update_ts;
1801         bool            map_update_required = false;
1802
1803         /* Never call this function when old snapshot checking is disabled. */
1804         Assert(old_snapshot_threshold >= 0);
1805
1806         ts = AlignTimestampToMinuteBoundary(whenTaken);
1807
1808         /*
1809          * Keep track of the latest xmin seen by any process. Update mapping with
1810          * a new value when we have crossed a bucket boundary.
1811          */
1812         SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1813         latest_xmin = oldSnapshotControl->latest_xmin;
1814         update_ts = oldSnapshotControl->next_map_update;
1815         if (ts > update_ts)
1816         {
1817                 oldSnapshotControl->next_map_update = ts;
1818                 map_update_required = true;
1819         }
1820         if (TransactionIdFollows(xmin, latest_xmin))
1821                 oldSnapshotControl->latest_xmin = xmin;
1822         SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1823
1824         /* We only needed to update the most recent xmin value. */
1825         if (!map_update_required)
1826                 return;
1827
1828         /* No further tracking needed for 0 (used for testing). */
1829         if (old_snapshot_threshold == 0)
1830                 return;
1831
1832         /*
1833          * We don't want to do something stupid with unusual values, but we don't
1834          * want to litter the log with warnings or break otherwise normal
1835          * processing for this feature; so if something seems unreasonable, just
1836          * log at DEBUG level and return without doing anything.
1837          */
1838         if (whenTaken < 0)
1839         {
1840                 elog(DEBUG1,
1841                 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1842                          (long) whenTaken);
1843                 return;
1844         }
1845         if (!TransactionIdIsNormal(xmin))
1846         {
1847                 elog(DEBUG1,
1848                          "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1849                          (unsigned long) xmin);
1850                 return;
1851         }
1852
1853         LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1854
1855         Assert(oldSnapshotControl->head_offset >= 0);
1856         Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1857         Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1858         Assert(oldSnapshotControl->count_used >= 0);
1859         Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1860
1861         if (oldSnapshotControl->count_used == 0)
1862         {
1863                 /* set up first entry for empty mapping */
1864                 oldSnapshotControl->head_offset = 0;
1865                 oldSnapshotControl->head_timestamp = ts;
1866                 oldSnapshotControl->count_used = 1;
1867                 oldSnapshotControl->xid_by_minute[0] = xmin;
1868         }
1869         else if (ts < oldSnapshotControl->head_timestamp)
1870         {
1871                 /* old ts; log it at DEBUG */
1872                 LWLockRelease(OldSnapshotTimeMapLock);
1873                 elog(DEBUG1,
1874                          "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1875                          (long) whenTaken);
1876                 return;
1877         }
1878         else if (ts <= (oldSnapshotControl->head_timestamp +
1879                                         ((oldSnapshotControl->count_used - 1)
1880                                          * USECS_PER_MINUTE)))
1881         {
1882                 /* existing mapping; advance xid if possible */
1883                 int                     bucket = (oldSnapshotControl->head_offset
1884                                                           + ((ts - oldSnapshotControl->head_timestamp)
1885                                                                  / USECS_PER_MINUTE))
1886                 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1887
1888                 if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1889                         oldSnapshotControl->xid_by_minute[bucket] = xmin;
1890         }
1891         else
1892         {
1893                 /* We need a new bucket, but it might not be the very next one. */
1894                 int                     advance = ((ts - oldSnapshotControl->head_timestamp)
1895                                                            / USECS_PER_MINUTE);
1896
1897                 oldSnapshotControl->head_timestamp = ts;
1898
1899                 if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1900                 {
1901                         /* Advance is so far that all old data is junk; start over. */
1902                         oldSnapshotControl->head_offset = 0;
1903                         oldSnapshotControl->count_used = 1;
1904                         oldSnapshotControl->xid_by_minute[0] = xmin;
1905                 }
1906                 else
1907                 {
1908                         /* Store the new value in one or more buckets. */
1909                         int                     i;
1910
1911                         for (i = 0; i < advance; i++)
1912                         {
1913                                 if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1914                                 {
1915                                         /* Map full and new value replaces old head. */
1916                                         int                     old_head = oldSnapshotControl->head_offset;
1917
1918                                         if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
1919                                                 oldSnapshotControl->head_offset = 0;
1920                                         else
1921                                                 oldSnapshotControl->head_offset = old_head + 1;
1922                                         oldSnapshotControl->xid_by_minute[old_head] = xmin;
1923                                 }
1924                                 else
1925                                 {
1926                                         /* Extend map to unused entry. */
1927                                         int                     new_tail = (oldSnapshotControl->head_offset
1928                                                                                         + oldSnapshotControl->count_used)
1929                                         % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1930
1931                                         oldSnapshotControl->count_used++;
1932                                         oldSnapshotControl->xid_by_minute[new_tail] = xmin;
1933                                 }
1934                         }
1935                 }
1936         }
1937
1938         LWLockRelease(OldSnapshotTimeMapLock);
1939 }
1940
1941
1942 /*
1943  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
1944  * access to behave just like it did at a certain point in the past.
1945  *
1946  * Needed for logical decoding.
1947  */
1948 void
1949 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
1950 {
1951         Assert(historic_snapshot != NULL);
1952
1953         /* setup the timetravel snapshot */
1954         HistoricSnapshot = historic_snapshot;
1955
1956         /* setup (cmin, cmax) lookup hash */
1957         tuplecid_data = tuplecids;
1958 }
1959
1960
1961 /*
1962  * Make catalog snapshots behave normally again.
1963  */
1964 void
1965 TeardownHistoricSnapshot(bool is_error)
1966 {
1967         HistoricSnapshot = NULL;
1968         tuplecid_data = NULL;
1969 }
1970
1971 bool
1972 HistoricSnapshotActive(void)
1973 {
1974         return HistoricSnapshot != NULL;
1975 }
1976
1977 HTAB *
1978 HistoricSnapshotGetTupleCids(void)
1979 {
1980         Assert(HistoricSnapshotActive());
1981         return tuplecid_data;
1982 }
1983
1984 /*
1985  * EstimateSnapshotSpace
1986  *              Returns the size needed to store the given snapshot.
1987  *
1988  * We are exporting only required fields from the Snapshot, stored in
1989  * SerializedSnapshotData.
1990  */
1991 Size
1992 EstimateSnapshotSpace(Snapshot snap)
1993 {
1994         Size            size;
1995
1996         Assert(snap != InvalidSnapshot);
1997         Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
1998
1999         /* We allocate any XID arrays needed in the same palloc block. */
2000         size = add_size(sizeof(SerializedSnapshotData),
2001                                         mul_size(snap->xcnt, sizeof(TransactionId)));
2002         if (snap->subxcnt > 0 &&
2003                 (!snap->suboverflowed || snap->takenDuringRecovery))
2004                 size = add_size(size,
2005                                                 mul_size(snap->subxcnt, sizeof(TransactionId)));
2006
2007         return size;
2008 }
2009
2010 /*
2011  * SerializeSnapshot
2012  *              Dumps the serialized snapshot (extracted from given snapshot) onto the
2013  *              memory location at start_address.
2014  */
2015 void
2016 SerializeSnapshot(Snapshot snapshot, char *start_address)
2017 {
2018         SerializedSnapshotData serialized_snapshot;
2019
2020         Assert(snapshot->subxcnt >= 0);
2021
2022         /* Copy all required fields */
2023         serialized_snapshot.xmin = snapshot->xmin;
2024         serialized_snapshot.xmax = snapshot->xmax;
2025         serialized_snapshot.xcnt = snapshot->xcnt;
2026         serialized_snapshot.subxcnt = snapshot->subxcnt;
2027         serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2028         serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2029         serialized_snapshot.curcid = snapshot->curcid;
2030         serialized_snapshot.whenTaken = snapshot->whenTaken;
2031         serialized_snapshot.lsn = snapshot->lsn;
2032
2033         /*
2034          * Ignore the SubXID array if it has overflowed, unless the snapshot was
2035          * taken during recovey - in that case, top-level XIDs are in subxip as
2036          * well, and we mustn't lose them.
2037          */
2038         if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2039                 serialized_snapshot.subxcnt = 0;
2040
2041         /* Copy struct to possibly-unaligned buffer */
2042         memcpy(start_address,
2043                    &serialized_snapshot, sizeof(SerializedSnapshotData));
2044
2045         /* Copy XID array */
2046         if (snapshot->xcnt > 0)
2047                 memcpy((TransactionId *) (start_address +
2048                                                                   sizeof(SerializedSnapshotData)),
2049                            snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2050
2051         /*
2052          * Copy SubXID array. Don't bother to copy it if it had overflowed,
2053          * though, because it's not used anywhere in that case. Except if it's a
2054          * snapshot taken during recovery; all the top-level XIDs are in subxip as
2055          * well in that case, so we mustn't lose them.
2056          */
2057         if (serialized_snapshot.subxcnt > 0)
2058         {
2059                 Size            subxipoff = sizeof(SerializedSnapshotData) +
2060                 snapshot->xcnt * sizeof(TransactionId);
2061
2062                 memcpy((TransactionId *) (start_address + subxipoff),
2063                            snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2064         }
2065 }
2066
2067 /*
2068  * RestoreSnapshot
2069  *              Restore a serialized snapshot from the specified address.
2070  *
2071  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2072  * to 0.  The returned snapshot has the copied flag set.
2073  */
2074 Snapshot
2075 RestoreSnapshot(char *start_address)
2076 {
2077         SerializedSnapshotData serialized_snapshot;
2078         Size            size;
2079         Snapshot        snapshot;
2080         TransactionId *serialized_xids;
2081
2082         memcpy(&serialized_snapshot, start_address,
2083                    sizeof(SerializedSnapshotData));
2084         serialized_xids = (TransactionId *)
2085                 (start_address + sizeof(SerializedSnapshotData));
2086
2087         /* We allocate any XID arrays needed in the same palloc block. */
2088         size = sizeof(SnapshotData)
2089                 + serialized_snapshot.xcnt * sizeof(TransactionId)
2090                 + serialized_snapshot.subxcnt * sizeof(TransactionId);
2091
2092         /* Copy all required fields */
2093         snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2094         snapshot->satisfies = HeapTupleSatisfiesMVCC;
2095         snapshot->xmin = serialized_snapshot.xmin;
2096         snapshot->xmax = serialized_snapshot.xmax;
2097         snapshot->xip = NULL;
2098         snapshot->xcnt = serialized_snapshot.xcnt;
2099         snapshot->subxip = NULL;
2100         snapshot->subxcnt = serialized_snapshot.subxcnt;
2101         snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2102         snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2103         snapshot->curcid = serialized_snapshot.curcid;
2104         snapshot->whenTaken = serialized_snapshot.whenTaken;
2105         snapshot->lsn = serialized_snapshot.lsn;
2106
2107         /* Copy XIDs, if present. */
2108         if (serialized_snapshot.xcnt > 0)
2109         {
2110                 snapshot->xip = (TransactionId *) (snapshot + 1);
2111                 memcpy(snapshot->xip, serialized_xids,
2112                            serialized_snapshot.xcnt * sizeof(TransactionId));
2113         }
2114
2115         /* Copy SubXIDs, if present. */
2116         if (serialized_snapshot.subxcnt > 0)
2117         {
2118                 snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2119                         serialized_snapshot.xcnt;
2120                 memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2121                            serialized_snapshot.subxcnt * sizeof(TransactionId));
2122         }
2123
2124         /* Set the copied flag so that the caller will set refcounts correctly. */
2125         snapshot->regd_count = 0;
2126         snapshot->active_count = 0;
2127         snapshot->copied = true;
2128
2129         return snapshot;
2130 }
2131
2132 /*
2133  * Install a restored snapshot as the transaction snapshot.
2134  *
2135  * The second argument is of type void * so that snapmgr.h need not include
2136  * the declaration for PGPROC.
2137  */
2138 void
2139 RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2140 {
2141         SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
2142 }