]> granicus.if.org Git - postgresql/blob - src/backend/utils/time/snapmgr.c
Rename TransactionChain functions
[postgresql] / src / backend / utils / time / snapmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapmgr.c
4  *              PostgreSQL snapshot manager
5  *
6  * We keep track of snapshots in two ways: those "registered" by resowner.c,
7  * and the "active snapshot" stack.  All snapshots in either of them live in
8  * persistent memory.  When a snapshot is no longer in any of these lists
9  * (tracked by separate refcounts on each snapshot), its memory can be freed.
10  *
11  * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12  * regd_count and list it in RegisteredSnapshots, but this reference is not
13  * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14  * to track this snapshot reference, but that introduces logical circularity
15  * and thus makes it impossible to clean up in a sane fashion.  It's better to
16  * handle this reference as an internally-tracked registration, so that this
17  * module is entirely lower-level than ResourceOwners.
18  *
19  * Likewise, any snapshots that have been exported by pg_export_snapshot
20  * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21  * tracked by any resource owner.
22  *
23  * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24  * is valid, but is not tracked by any resource owner.
25  *
 * The same is true for historic snapshots used during logical decoding;
 * their lifetime is managed separately (as they live longer than one xact.c
 * transaction).
29  *
 * These arrangements let us reset MyPgXact->xmin when there are no snapshots
 * referenced by this transaction, and advance it when the one with the oldest
 * Xmin is no longer referenced.  For simplicity, however, only registered
 * snapshots (not active snapshots) participate in tracking which one is
 * oldest; we don't try to change MyPgXact->xmin except when the
 * active-snapshot stack is empty.
36  *
37  *
38  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  * IDENTIFICATION
42  *        src/backend/utils/time/snapmgr.c
43  *
44  *-------------------------------------------------------------------------
45  */
46 #include "postgres.h"
47
48 #include <sys/stat.h>
49 #include <unistd.h>
50
51 #include "access/transam.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "lib/pairingheap.h"
56 #include "miscadmin.h"
57 #include "storage/predicate.h"
58 #include "storage/proc.h"
59 #include "storage/procarray.h"
60 #include "storage/sinval.h"
61 #include "storage/sinvaladt.h"
62 #include "storage/spin.h"
63 #include "utils/builtins.h"
64 #include "utils/memutils.h"
65 #include "utils/rel.h"
66 #include "utils/resowner_private.h"
67 #include "utils/snapmgr.h"
68 #include "utils/syscache.h"
69 #include "utils/tqual.h"
70
71
/*
 * GUC parameters
 *
 * old_snapshot_threshold is expressed in minutes; -1 disables the feature.
 * SnapMgrShmemSize only reserves room for the per-minute XID map
 * (xid_by_minute) when this is > 0.
 */
int			old_snapshot_threshold; /* number of minutes, -1 disables */

/*
 * Structure for dealing with old_snapshot_threshold implementation.
 *
 * A single instance lives in shared memory under the name
 * "OldSnapshotControlData"; SnapMgrInit creates or attaches to it.
 */
typedef struct OldSnapshotControlData
{
	/*
	 * Variables for old snapshot handling are shared among processes and are
	 * only allowed to move forward.  Each spinlock protects the fields named
	 * in its own comment.
	 */
	slock_t		mutex_current;	/* protect current_timestamp */
	TimestampTz current_timestamp;	/* latest snapshot timestamp */
	slock_t		mutex_latest_xmin;	/* protect latest_xmin and next_map_update */
	TransactionId latest_xmin;	/* latest snapshot xmin */
	TimestampTz next_map_update;	/* latest snapshot valid up to */
	slock_t		mutex_threshold;	/* protect threshold fields */
	TimestampTz threshold_timestamp;	/* earlier snapshot is old */
	TransactionId threshold_xid;	/* earlier xid may be gone */

	/*
	 * Keep one xid per minute for old snapshot error handling.
	 *
	 * Use a circular buffer with a head offset, a count of entries currently
	 * used, and a timestamp corresponding to the xid at the head offset.  A
	 * count_used value of zero means that there are no times stored; a
	 * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
	 * is full and the head must be advanced to add new entries.  Use
	 * timestamps aligned to minute boundaries, since that seems less
	 * surprising than aligning based on the first usage timestamp.  The
	 * latest bucket is effectively stored within latest_xmin.  The circular
	 * buffer is updated when we get a new xmin value that doesn't fall into
	 * the same interval.
	 *
	 * It is OK if the xid for a given time slot is from earlier than
	 * calculated by adding the number of minutes corresponding to the
	 * (possibly wrapped) distance from the head offset to the time of the
	 * head entry, since that just results in the vacuuming of old tuples
	 * being slightly less aggressive.  It would not be OK for it to be off in
	 * the other direction, since it might result in vacuuming tuples that are
	 * still expected to be there.
	 *
	 * Use of an SLRU was considered but not chosen because it is more
	 * heavyweight than is needed for this, and would probably not be any less
	 * code to implement.
	 *
	 * Persistence is not needed.
	 */
	int			head_offset;	/* subscript of oldest tracked time */
	TimestampTz head_timestamp; /* time corresponding to head xid */
	int			count_used;		/* how many slots are in use */

	/*
	 * OLD_SNAPSHOT_TIME_MAP_ENTRIES slots when old_snapshot_threshold > 0;
	 * otherwise SnapMgrShmemSize allocates no storage for this array.
	 */
	TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;

/* Pointer to the shared-memory instance above; set by SnapMgrInit. */
static volatile OldSnapshotControlData *oldSnapshotControl;
130
131
/*
 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
 * mode, and to the latest one taken in a read-committed transaction.
 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
 * instant, even in transaction-snapshot mode.  It should only be used for
 * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
 * whenever a system catalog change occurs.
 *
 * These SnapshotData structs are static to simplify memory allocation
 * (see the hack in GetSnapshotData to avoid repeated malloc/free).  Each is
 * initialized with HeapTupleSatisfiesMVCC as its first member.  Note that
 * CatalogSnapshotData, unlike the other two, is not declared static.
 */
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};

/*
 * Pointers to valid snapshots; NULL means no valid snapshot of that kind is
 * currently held.
 */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;

/*
 * These are updated by GetSnapshotData.  We initialize them this way
 * for the convenience of TransactionIdIsInProgress: even in bootstrap
 * mode, we don't want it to say that BootstrapTransactionId is in progress.
 *
 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
 * InvalidTransactionId, to ensure that no one tries to use a stale
 * value.  Readers should ensure that they have been set to something else
 * before using them.
 */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;

/* (table, ctid) => (cmin, cmax) mapping during timetravel */
static HTAB *tuplecid_data = NULL;
171
/*
 * Elements of the active snapshot stack.
 *
 * Each element here accounts for exactly one active_count on SnapshotData.
 *
 * NB: the code assumes that elements in this list are in non-increasing
 * order of as_level; also, the list must be NULL-terminated.
 */
typedef struct ActiveSnapshotElt
{
	Snapshot	as_snap;		/* snapshot held active by this entry */
	int			as_level;		/* GetCurrentTransactionNestLevel() when pushed */
	struct ActiveSnapshotElt *as_next;	/* next-older entry, or NULL */
} ActiveSnapshotElt;

/* Top of the stack of active snapshots (NULL when the stack is empty) */
static ActiveSnapshotElt *ActiveSnapshot = NULL;

/*
 * Bottom of the stack of active snapshots.  PushActiveSnapshot sets it when
 * pushing onto an empty stack; GetOldestSnapshot reads it.
 */
static ActiveSnapshotElt *OldestActiveSnapshot = NULL;

/*
 * Currently registered Snapshots.  Ordered in a heap by xmin, so that we can
 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
 *
 * Besides snapshots registered through resowner.c, FirstXactSnapshot and
 * CatalogSnapshot are added to this heap directly (see
 * GetTransactionSnapshot and GetNonHistoricCatalogSnapshot).
 */
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
		 void *arg);

static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};

/*
 * First GetTransactionSnapshot call in a transaction?  Becomes true once the
 * transaction's first snapshot has been taken (GetTransactionSnapshot) or
 * imported (SetTransactionSnapshot).
 */
bool		FirstSnapshotSet = false;

/*
 * Remember the serializable transaction snapshot, if any.  We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 *
 * When set, this is a CopySnapshot() copy whose regd_count has been bumped
 * and which is in RegisteredSnapshots without any resource owner.
 */
static Snapshot FirstXactSnapshot = NULL;
211
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"

/* Structure holding info about exported snapshot. */
typedef struct ExportedSnapshot
{
	char	   *snapfile;		/* presumably the file written for this
								 * export -- confirm against the exporter */
	Snapshot	snapshot;		/* the exported snapshot itself */
} ExportedSnapshot;

/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;

/* Prototypes for local functions */
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);

/*
 * Snapshot fields to be serialized.
 *
 * Only these fields need to be sent to the cooperating backend; the
 * remaining ones can (and must) be set by the receiver upon restore.
 * This struct holds only fixed-size fields; the XID arrays themselves are
 * not members of it.
 */
typedef struct SerializedSnapshotData
{
	TransactionId xmin;
	TransactionId xmax;
	uint32		xcnt;			/* number of entries in xip */
	int32		subxcnt;		/* number of entries in subxip */
	bool		suboverflowed;
	bool		takenDuringRecovery;
	CommandId	curcid;
	TimestampTz whenTaken;
	XLogRecPtr	lsn;
} SerializedSnapshotData;
249
250 Size
251 SnapMgrShmemSize(void)
252 {
253         Size            size;
254
255         size = offsetof(OldSnapshotControlData, xid_by_minute);
256         if (old_snapshot_threshold > 0)
257                 size = add_size(size, mul_size(sizeof(TransactionId),
258                                                                            OLD_SNAPSHOT_TIME_MAP_ENTRIES));
259
260         return size;
261 }
262
263 /*
264  * Initialize for managing old snapshot detection.
265  */
266 void
267 SnapMgrInit(void)
268 {
269         bool            found;
270
271         /*
272          * Create or attach to the OldSnapshotControlData structure.
273          */
274         oldSnapshotControl = (volatile OldSnapshotControlData *)
275                 ShmemInitStruct("OldSnapshotControlData",
276                                                 SnapMgrShmemSize(), &found);
277
278         if (!found)
279         {
280                 SpinLockInit(&oldSnapshotControl->mutex_current);
281                 oldSnapshotControl->current_timestamp = 0;
282                 SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
283                 oldSnapshotControl->latest_xmin = InvalidTransactionId;
284                 oldSnapshotControl->next_map_update = 0;
285                 SpinLockInit(&oldSnapshotControl->mutex_threshold);
286                 oldSnapshotControl->threshold_timestamp = 0;
287                 oldSnapshotControl->threshold_xid = InvalidTransactionId;
288                 oldSnapshotControl->head_offset = 0;
289                 oldSnapshotControl->head_timestamp = 0;
290                 oldSnapshotControl->count_used = 0;
291         }
292 }
293
294 /*
295  * GetTransactionSnapshot
296  *              Get the appropriate snapshot for a new query in a transaction.
297  *
298  * Note that the return value may point at static storage that will be modified
299  * by future calls and by CommandCounterIncrement().  Callers should call
300  * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
301  * used very long.
302  */
303 Snapshot
304 GetTransactionSnapshot(void)
305 {
306         /*
307          * Return historic snapshot if doing logical decoding. We'll never need a
308          * non-historic transaction snapshot in this (sub-)transaction, so there's
309          * no need to be careful to set one up for later calls to
310          * GetTransactionSnapshot().
311          */
312         if (HistoricSnapshotActive())
313         {
314                 Assert(!FirstSnapshotSet);
315                 return HistoricSnapshot;
316         }
317
318         /* First call in transaction? */
319         if (!FirstSnapshotSet)
320         {
321                 /*
322                  * Don't allow catalog snapshot to be older than xact snapshot.  Must
323                  * do this first to allow the empty-heap Assert to succeed.
324                  */
325                 InvalidateCatalogSnapshot();
326
327                 Assert(pairingheap_is_empty(&RegisteredSnapshots));
328                 Assert(FirstXactSnapshot == NULL);
329
330                 if (IsInParallelMode())
331                         elog(ERROR,
332                                  "cannot take query snapshot during a parallel operation");
333
334                 /*
335                  * In transaction-snapshot mode, the first snapshot must live until
336                  * end of xact regardless of what the caller does with it, so we must
337                  * make a copy of it rather than returning CurrentSnapshotData
338                  * directly.  Furthermore, if we're running in serializable mode,
339                  * predicate.c needs to wrap the snapshot fetch in its own processing.
340                  */
341                 if (IsolationUsesXactSnapshot())
342                 {
343                         /* First, create the snapshot in CurrentSnapshotData */
344                         if (IsolationIsSerializable())
345                                 CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
346                         else
347                                 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
348                         /* Make a saved copy */
349                         CurrentSnapshot = CopySnapshot(CurrentSnapshot);
350                         FirstXactSnapshot = CurrentSnapshot;
351                         /* Mark it as "registered" in FirstXactSnapshot */
352                         FirstXactSnapshot->regd_count++;
353                         pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
354                 }
355                 else
356                         CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
357
358                 FirstSnapshotSet = true;
359                 return CurrentSnapshot;
360         }
361
362         if (IsolationUsesXactSnapshot())
363                 return CurrentSnapshot;
364
365         /* Don't allow catalog snapshot to be older than xact snapshot. */
366         InvalidateCatalogSnapshot();
367
368         CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
369
370         return CurrentSnapshot;
371 }
372
373 /*
374  * GetLatestSnapshot
375  *              Get a snapshot that is up-to-date as of the current instant,
376  *              even if we are executing in transaction-snapshot mode.
377  */
378 Snapshot
379 GetLatestSnapshot(void)
380 {
381         /*
382          * We might be able to relax this, but nothing that could otherwise work
383          * needs it.
384          */
385         if (IsInParallelMode())
386                 elog(ERROR,
387                          "cannot update SecondarySnapshot during a parallel operation");
388
389         /*
390          * So far there are no cases requiring support for GetLatestSnapshot()
391          * during logical decoding, but it wouldn't be hard to add if required.
392          */
393         Assert(!HistoricSnapshotActive());
394
395         /* If first call in transaction, go ahead and set the xact snapshot */
396         if (!FirstSnapshotSet)
397                 return GetTransactionSnapshot();
398
399         SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
400
401         return SecondarySnapshot;
402 }
403
404 /*
405  * GetOldestSnapshot
406  *
407  *              Get the transaction's oldest known snapshot, as judged by the LSN.
408  *              Will return NULL if there are no active or registered snapshots.
409  */
410 Snapshot
411 GetOldestSnapshot(void)
412 {
413         Snapshot        OldestRegisteredSnapshot = NULL;
414         XLogRecPtr      RegisteredLSN = InvalidXLogRecPtr;
415
416         if (!pairingheap_is_empty(&RegisteredSnapshots))
417         {
418                 OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
419                                                                                                                  pairingheap_first(&RegisteredSnapshots));
420                 RegisteredLSN = OldestRegisteredSnapshot->lsn;
421         }
422
423         if (OldestActiveSnapshot != NULL)
424         {
425                 XLogRecPtr      ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
426
427                 if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
428                         return OldestActiveSnapshot->as_snap;
429         }
430
431         return OldestRegisteredSnapshot;
432 }
433
434 /*
435  * GetCatalogSnapshot
436  *              Get a snapshot that is sufficiently up-to-date for scan of the
437  *              system catalog with the specified OID.
438  */
439 Snapshot
440 GetCatalogSnapshot(Oid relid)
441 {
442         /*
443          * Return historic snapshot while we're doing logical decoding, so we can
444          * see the appropriate state of the catalog.
445          *
446          * This is the primary reason for needing to reset the system caches after
447          * finishing decoding.
448          */
449         if (HistoricSnapshotActive())
450                 return HistoricSnapshot;
451
452         return GetNonHistoricCatalogSnapshot(relid);
453 }
454
455 /*
456  * GetNonHistoricCatalogSnapshot
457  *              Get a snapshot that is sufficiently up-to-date for scan of the system
458  *              catalog with the specified OID, even while historic snapshots are set
459  *              up.
460  */
461 Snapshot
462 GetNonHistoricCatalogSnapshot(Oid relid)
463 {
464         /*
465          * If the caller is trying to scan a relation that has no syscache, no
466          * catcache invalidations will be sent when it is updated.  For a few key
467          * relations, snapshot invalidations are sent instead.  If we're trying to
468          * scan a relation for which neither catcache nor snapshot invalidations
469          * are sent, we must refresh the snapshot every time.
470          */
471         if (CatalogSnapshot &&
472                 !RelationInvalidatesSnapshotsOnly(relid) &&
473                 !RelationHasSysCache(relid))
474                 InvalidateCatalogSnapshot();
475
476         if (CatalogSnapshot == NULL)
477         {
478                 /* Get new snapshot. */
479                 CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
480
481                 /*
482                  * Make sure the catalog snapshot will be accounted for in decisions
483                  * about advancing PGXACT->xmin.  We could apply RegisterSnapshot, but
484                  * that would result in making a physical copy, which is overkill; and
485                  * it would also create a dependency on some resource owner, which we
486                  * do not want for reasons explained at the head of this file. Instead
487                  * just shove the CatalogSnapshot into the pairing heap manually. This
488                  * has to be reversed in InvalidateCatalogSnapshot, of course.
489                  *
490                  * NB: it had better be impossible for this to throw error, since the
491                  * CatalogSnapshot pointer is already valid.
492                  */
493                 pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
494         }
495
496         return CatalogSnapshot;
497 }
498
499 /*
500  * InvalidateCatalogSnapshot
501  *              Mark the current catalog snapshot, if any, as invalid
502  *
503  * We could change this API to allow the caller to provide more fine-grained
504  * invalidation details, so that a change to relation A wouldn't prevent us
505  * from using our cached snapshot to scan relation B, but so far there's no
506  * evidence that the CPU cycles we spent tracking such fine details would be
507  * well-spent.
508  */
509 void
510 InvalidateCatalogSnapshot(void)
511 {
512         if (CatalogSnapshot)
513         {
514                 pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
515                 CatalogSnapshot = NULL;
516                 SnapshotResetXmin();
517         }
518 }
519
520 /*
521  * InvalidateCatalogSnapshotConditionally
522  *              Drop catalog snapshot if it's the only one we have
523  *
524  * This is called when we are about to wait for client input, so we don't
525  * want to continue holding the catalog snapshot if it might mean that the
526  * global xmin horizon can't advance.  However, if there are other snapshots
527  * still active or registered, the catalog snapshot isn't likely to be the
528  * oldest one, so we might as well keep it.
529  */
530 void
531 InvalidateCatalogSnapshotConditionally(void)
532 {
533         if (CatalogSnapshot &&
534                 ActiveSnapshot == NULL &&
535                 pairingheap_is_singular(&RegisteredSnapshots))
536                 InvalidateCatalogSnapshot();
537 }
538
539 /*
540  * SnapshotSetCommandId
541  *              Propagate CommandCounterIncrement into the static snapshots, if set
542  */
543 void
544 SnapshotSetCommandId(CommandId curcid)
545 {
546         if (!FirstSnapshotSet)
547                 return;
548
549         if (CurrentSnapshot)
550                 CurrentSnapshot->curcid = curcid;
551         if (SecondarySnapshot)
552                 SecondarySnapshot->curcid = curcid;
553         /* Should we do the same with CatalogSnapshot? */
554 }
555
/*
 * SetTransactionSnapshot
 *		Set the transaction's snapshot from an imported MVCC snapshot.
 *
 * Note that this is very closely tied to GetTransactionSnapshot --- it
 * must take care of all the same considerations as the first-snapshot case
 * in GetTransactionSnapshot.
 *
 * sourcesnap: copied into CurrentSnapshot for xmin, xmax, xip/xcnt,
 *		subxip/subxcnt, suboverflowed and takenDuringRecovery; curcid is
 *		deliberately not copied.
 * sourcevxid, sourcepid: identify the exporting side.  They are passed to
 *		ProcArrayInstallImportedXmin (when sourceproc is NULL), used in the
 *		error message, and passed to SetSerializableTransactionSnapshot.
 * sourceproc: if non-NULL, ProcArrayInstallRestoredXmin is called with this
 *		PGPROC instead of ProcArrayInstallImportedXmin.
 *
 * Raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) if the xmin could
 * not be installed because the source is no longer running.
 */
static void
SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
					   int sourcepid, PGPROC *sourceproc)
{
	/* Caller should have checked this already */
	Assert(!FirstSnapshotSet);

	/* Better do this to ensure following Assert succeeds. */
	InvalidateCatalogSnapshot();

	Assert(pairingheap_is_empty(&RegisteredSnapshots));
	Assert(FirstXactSnapshot == NULL);
	Assert(!HistoricSnapshotActive());

	/*
	 * Even though we are not going to use the snapshot it computes, we must
	 * call GetSnapshotData, for two reasons: (1) to be sure that
	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
	 * RecentXmin and RecentGlobalXmin.  (We could alternatively include those
	 * two variables in exported snapshot files, but it seems better to have
	 * snapshot importers compute reasonably up-to-date values for them.)
	 */
	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

	/*
	 * Now copy appropriate fields from the source snapshot.  The memcpy
	 * targets are the XID arrays allocated by GetSnapshotData above; the
	 * Asserts check that the source counts do not exceed
	 * GetMaxSnapshotXidCount()/GetMaxSnapshotSubxidCount() (presumably the
	 * capacities of those arrays -- confirm in procarray.c).
	 */
	CurrentSnapshot->xmin = sourcesnap->xmin;
	CurrentSnapshot->xmax = sourcesnap->xmax;
	CurrentSnapshot->xcnt = sourcesnap->xcnt;
	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
	memcpy(CurrentSnapshot->xip, sourcesnap->xip,
		   sourcesnap->xcnt * sizeof(TransactionId));
	CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
	memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
		   sourcesnap->subxcnt * sizeof(TransactionId));
	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
	/* NB: curcid should NOT be copied, it's a local matter */

	/*
	 * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
	 * TransactionXmin.  There is a race condition: to make sure we are not
	 * causing the global xmin to go backwards, we have to test that the
	 * source transaction is still running, and that has to be done
	 * atomically. So let procarray.c do it.
	 *
	 * Note: in serializable mode, predicate.c will do this a second time. It
	 * doesn't seem worth contorting the logic here to avoid two calls,
	 * especially since it's not clear that predicate.c *must* do this.
	 */
	if (sourceproc != NULL)
	{
		/* Source identified by its PGPROC. */
		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not import the requested snapshot"),
					 errdetail("The source transaction is not running anymore.")));
	}
	/* Otherwise the source is identified by its virtual transaction ID. */
	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not import the requested snapshot"),
				 errdetail("The source process with PID %d is not running anymore.",
						   sourcepid)));

	/*
	 * In transaction-snapshot mode, the first snapshot must live until end of
	 * xact, so we must make a copy of it.  Furthermore, if we're running in
	 * serializable mode, predicate.c needs to do its own processing.
	 */
	if (IsolationUsesXactSnapshot())
	{
		if (IsolationIsSerializable())
			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
											   sourcepid);
		/* Make a saved copy */
		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
		FirstXactSnapshot = CurrentSnapshot;
		/* Mark it as "registered" in FirstXactSnapshot */
		FirstXactSnapshot->regd_count++;
		pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}

	FirstSnapshotSet = true;
}
651
652 /*
653  * CopySnapshot
654  *              Copy the given snapshot.
655  *
656  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
657  * to 0.  The returned snapshot has the copied flag set.
658  */
659 static Snapshot
660 CopySnapshot(Snapshot snapshot)
661 {
662         Snapshot        newsnap;
663         Size            subxipoff;
664         Size            size;
665
666         Assert(snapshot != InvalidSnapshot);
667
668         /* We allocate any XID arrays needed in the same palloc block. */
669         size = subxipoff = sizeof(SnapshotData) +
670                 snapshot->xcnt * sizeof(TransactionId);
671         if (snapshot->subxcnt > 0)
672                 size += snapshot->subxcnt * sizeof(TransactionId);
673
674         newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
675         memcpy(newsnap, snapshot, sizeof(SnapshotData));
676
677         newsnap->regd_count = 0;
678         newsnap->active_count = 0;
679         newsnap->copied = true;
680
681         /* setup XID array */
682         if (snapshot->xcnt > 0)
683         {
684                 newsnap->xip = (TransactionId *) (newsnap + 1);
685                 memcpy(newsnap->xip, snapshot->xip,
686                            snapshot->xcnt * sizeof(TransactionId));
687         }
688         else
689                 newsnap->xip = NULL;
690
691         /*
692          * Setup subXID array. Don't bother to copy it if it had overflowed,
693          * though, because it's not used anywhere in that case. Except if it's a
694          * snapshot taken during recovery; all the top-level XIDs are in subxip as
695          * well in that case, so we mustn't lose them.
696          */
697         if (snapshot->subxcnt > 0 &&
698                 (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
699         {
700                 newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
701                 memcpy(newsnap->subxip, snapshot->subxip,
702                            snapshot->subxcnt * sizeof(TransactionId));
703         }
704         else
705                 newsnap->subxip = NULL;
706
707         return newsnap;
708 }
709
710 /*
711  * FreeSnapshot
712  *              Free the memory associated with a snapshot.
713  */
714 static void
715 FreeSnapshot(Snapshot snapshot)
716 {
717         Assert(snapshot->regd_count == 0);
718         Assert(snapshot->active_count == 0);
719         Assert(snapshot->copied);
720
721         pfree(snapshot);
722 }
723
724 /*
725  * PushActiveSnapshot
726  *              Set the given snapshot as the current active snapshot
727  *
728  * If the passed snapshot is a statically-allocated one, or it is possibly
729  * subject to a future command counter update, create a new long-lived copy
730  * with active refcount=1.  Otherwise, only increment the refcount.
731  */
732 void
733 PushActiveSnapshot(Snapshot snap)
734 {
735         ActiveSnapshotElt *newactive;
736
737         Assert(snap != InvalidSnapshot);
738
739         newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
740
741         /*
742          * Checking SecondarySnapshot is probably useless here, but it seems
743          * better to be sure.
744          */
745         if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
746                 newactive->as_snap = CopySnapshot(snap);
747         else
748                 newactive->as_snap = snap;
749
750         newactive->as_next = ActiveSnapshot;
751         newactive->as_level = GetCurrentTransactionNestLevel();
752
753         newactive->as_snap->active_count++;
754
755         ActiveSnapshot = newactive;
756         if (OldestActiveSnapshot == NULL)
757                 OldestActiveSnapshot = ActiveSnapshot;
758 }
759
760 /*
761  * PushCopiedSnapshot
762  *              As above, except forcibly copy the presented snapshot.
763  *
764  * This should be used when the ActiveSnapshot has to be modifiable, for
765  * example if the caller intends to call UpdateActiveSnapshotCommandId.
766  * The new snapshot will be released when popped from the stack.
767  */
768 void
769 PushCopiedSnapshot(Snapshot snapshot)
770 {
771         PushActiveSnapshot(CopySnapshot(snapshot));
772 }
773
774 /*
775  * UpdateActiveSnapshotCommandId
776  *
777  * Update the current CID of the active snapshot.  This can only be applied
778  * to a snapshot that is not referenced elsewhere.
779  */
780 void
781 UpdateActiveSnapshotCommandId(void)
782 {
783         CommandId       save_curcid,
784                                 curcid;
785
786         Assert(ActiveSnapshot != NULL);
787         Assert(ActiveSnapshot->as_snap->active_count == 1);
788         Assert(ActiveSnapshot->as_snap->regd_count == 0);
789
790         /*
791          * Don't allow modification of the active snapshot during parallel
792          * operation.  We share the snapshot to worker backends at the beginning
793          * of parallel operation, so any change to the snapshot can lead to
794          * inconsistencies.  We have other defenses against
795          * CommandCounterIncrement, but there are a few places that call this
796          * directly, so we put an additional guard here.
797          */
798         save_curcid = ActiveSnapshot->as_snap->curcid;
799         curcid = GetCurrentCommandId(false);
800         if (IsInParallelMode() && save_curcid != curcid)
801                 elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
802         ActiveSnapshot->as_snap->curcid = curcid;
803 }
804
805 /*
806  * PopActiveSnapshot
807  *
808  * Remove the topmost snapshot from the active snapshot stack, decrementing the
809  * reference count, and free it if this was the last reference.
810  */
811 void
812 PopActiveSnapshot(void)
813 {
814         ActiveSnapshotElt *newstack;
815
816         newstack = ActiveSnapshot->as_next;
817
818         Assert(ActiveSnapshot->as_snap->active_count > 0);
819
820         ActiveSnapshot->as_snap->active_count--;
821
822         if (ActiveSnapshot->as_snap->active_count == 0 &&
823                 ActiveSnapshot->as_snap->regd_count == 0)
824                 FreeSnapshot(ActiveSnapshot->as_snap);
825
826         pfree(ActiveSnapshot);
827         ActiveSnapshot = newstack;
828         if (ActiveSnapshot == NULL)
829                 OldestActiveSnapshot = NULL;
830
831         SnapshotResetXmin();
832 }
833
834 /*
835  * GetActiveSnapshot
836  *              Return the topmost snapshot in the Active stack.
837  */
838 Snapshot
839 GetActiveSnapshot(void)
840 {
841         Assert(ActiveSnapshot != NULL);
842
843         return ActiveSnapshot->as_snap;
844 }
845
846 /*
847  * ActiveSnapshotSet
848  *              Return whether there is at least one snapshot in the Active stack
849  */
850 bool
851 ActiveSnapshotSet(void)
852 {
853         return ActiveSnapshot != NULL;
854 }
855
856 /*
857  * RegisterSnapshot
858  *              Register a snapshot as being in use by the current resource owner
859  *
860  * If InvalidSnapshot is passed, it is not registered.
861  */
862 Snapshot
863 RegisterSnapshot(Snapshot snapshot)
864 {
865         if (snapshot == InvalidSnapshot)
866                 return InvalidSnapshot;
867
868         return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
869 }
870
871 /*
872  * RegisterSnapshotOnOwner
873  *              As above, but use the specified resource owner
874  */
875 Snapshot
876 RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
877 {
878         Snapshot        snap;
879
880         if (snapshot == InvalidSnapshot)
881                 return InvalidSnapshot;
882
883         /* Static snapshot?  Create a persistent copy */
884         snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
885
886         /* and tell resowner.c about it */
887         ResourceOwnerEnlargeSnapshots(owner);
888         snap->regd_count++;
889         ResourceOwnerRememberSnapshot(owner, snap);
890
891         if (snap->regd_count == 1)
892                 pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
893
894         return snap;
895 }
896
897 /*
898  * UnregisterSnapshot
899  *
900  * Decrement the reference count of a snapshot, remove the corresponding
901  * reference from CurrentResourceOwner, and free the snapshot if no more
902  * references remain.
903  */
904 void
905 UnregisterSnapshot(Snapshot snapshot)
906 {
907         if (snapshot == NULL)
908                 return;
909
910         UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
911 }
912
913 /*
914  * UnregisterSnapshotFromOwner
915  *              As above, but use the specified resource owner
916  */
917 void
918 UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
919 {
920         if (snapshot == NULL)
921                 return;
922
923         Assert(snapshot->regd_count > 0);
924         Assert(!pairingheap_is_empty(&RegisteredSnapshots));
925
926         ResourceOwnerForgetSnapshot(owner, snapshot);
927
928         snapshot->regd_count--;
929         if (snapshot->regd_count == 0)
930                 pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
931
932         if (snapshot->regd_count == 0 && snapshot->active_count == 0)
933         {
934                 FreeSnapshot(snapshot);
935                 SnapshotResetXmin();
936         }
937 }
938
939 /*
940  * Comparison function for RegisteredSnapshots heap.  Snapshots are ordered
941  * by xmin, so that the snapshot with smallest xmin is at the top.
942  */
943 static int
944 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
945 {
946         const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
947         const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
948
949         if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
950                 return 1;
951         else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
952                 return -1;
953         else
954                 return 0;
955 }
956
957 /*
958  * SnapshotResetXmin
959  *
960  * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
961  * Note we can do this without locking because we assume that storing an Xid
962  * is atomic.
963  *
964  * Even if there are some remaining snapshots, we may be able to advance our
965  * PGXACT->xmin to some degree.  This typically happens when a portal is
966  * dropped.  For efficiency, we only consider recomputing PGXACT->xmin when
967  * the active snapshot stack is empty; this allows us not to need to track
968  * which active snapshot is oldest.
969  *
970  * Note: it's tempting to use GetOldestSnapshot() here so that we can include
971  * active snapshots in the calculation.  However, that compares by LSN not
972  * xmin so it's not entirely clear that it's the same thing.  Also, we'd be
973  * critically dependent on the assumption that the bottommost active snapshot
974  * stack entry has the oldest xmin.  (Current uses of GetOldestSnapshot() are
975  * not actually critical, but this would be.)
976  */
977 static void
978 SnapshotResetXmin(void)
979 {
980         Snapshot        minSnapshot;
981
982         if (ActiveSnapshot != NULL)
983                 return;
984
985         if (pairingheap_is_empty(&RegisteredSnapshots))
986         {
987                 MyPgXact->xmin = InvalidTransactionId;
988                 return;
989         }
990
991         minSnapshot = pairingheap_container(SnapshotData, ph_node,
992                                                                                 pairingheap_first(&RegisteredSnapshots));
993
994         if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
995                 MyPgXact->xmin = minSnapshot->xmin;
996 }
997
998 /*
999  * AtSubCommit_Snapshot
1000  */
1001 void
1002 AtSubCommit_Snapshot(int level)
1003 {
1004         ActiveSnapshotElt *active;
1005
1006         /*
1007          * Relabel the active snapshots set in this subtransaction as though they
1008          * are owned by the parent subxact.
1009          */
1010         for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1011         {
1012                 if (active->as_level < level)
1013                         break;
1014                 active->as_level = level - 1;
1015         }
1016 }
1017
1018 /*
1019  * AtSubAbort_Snapshot
1020  *              Clean up snapshots after a subtransaction abort
1021  */
1022 void
1023 AtSubAbort_Snapshot(int level)
1024 {
1025         /* Forget the active snapshots set by this subtransaction */
1026         while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1027         {
1028                 ActiveSnapshotElt *next;
1029
1030                 next = ActiveSnapshot->as_next;
1031
1032                 /*
1033                  * Decrement the snapshot's active count.  If it's still registered or
1034                  * marked as active by an outer subtransaction, we can't free it yet.
1035                  */
1036                 Assert(ActiveSnapshot->as_snap->active_count >= 1);
1037                 ActiveSnapshot->as_snap->active_count -= 1;
1038
1039                 if (ActiveSnapshot->as_snap->active_count == 0 &&
1040                         ActiveSnapshot->as_snap->regd_count == 0)
1041                         FreeSnapshot(ActiveSnapshot->as_snap);
1042
1043                 /* and free the stack element */
1044                 pfree(ActiveSnapshot);
1045
1046                 ActiveSnapshot = next;
1047                 if (ActiveSnapshot == NULL)
1048                         OldestActiveSnapshot = NULL;
1049         }
1050
1051         SnapshotResetXmin();
1052 }
1053
/*
 * AtEOXact_Snapshot
 *		Snapshot manager's cleanup function for end of transaction
 *
 * isCommit: true when the transaction commits.  Only then are leftover
 *		registered or active snapshots reported with WARNINGs.
 * resetXmin: when true, our advertised xmin is recomputed via
 *		SnapshotResetXmin() once all snapshot state has been cleared.  When
 *		false, the caller must already have reset MyPgXact->xmin (this is
 *		asserted at the end).
 */
void
AtEOXact_Snapshot(bool isCommit, bool resetXmin)
{
	/*
	 * In transaction-snapshot mode we must release our privately-managed
	 * reference to the transaction snapshot.  We must remove it from
	 * RegisteredSnapshots to keep the check below happy.  But we don't bother
	 * to do FreeSnapshot, for two reasons: the memory will go away with
	 * TopTransactionContext anyway, and if someone has left the snapshot
	 * stacked as active, we don't want the code below to be chasing through a
	 * dangling pointer.
	 */
	if (FirstXactSnapshot != NULL)
	{
		Assert(FirstXactSnapshot->regd_count > 0);
		Assert(!pairingheap_is_empty(&RegisteredSnapshots));
		pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}
	FirstXactSnapshot = NULL;

	/*
	 * If we exported any snapshots, clean them up.
	 */
	if (exportedSnapshots != NIL)
	{
		ListCell   *lc;

		/*
		 * Get rid of the files.  Unlink failure is only a WARNING because (1)
		 * it's too late to abort the transaction, and (2) leaving a leaked
		 * file around has little real consequence anyway.
		 *
		 * We also need to remove the snapshots from RegisteredSnapshots to
		 * prevent a warning below.
		 *
		 * As with the FirstXactSnapshot, we don't need to free resources of
		 * the snapshot itself as it will go away with the memory context.
		 */
		foreach(lc, exportedSnapshots)
		{
			ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);

			if (unlink(esnap->snapfile))
				elog(WARNING, "could not unlink file \"%s\": %m",
					 esnap->snapfile);

			pairingheap_remove(&RegisteredSnapshots,
							   &esnap->snapshot->ph_node);
		}

		/*
		 * ExportSnapshot builds this list in TopTransactionContext, so just
		 * forget it.
		 */
		exportedSnapshots = NIL;
	}

	/* Drop catalog snapshot if any */
	InvalidateCatalogSnapshot();

	/* On commit, complain about leftover snapshots */
	if (isCommit)
	{
		ActiveSnapshotElt *active;

		if (!pairingheap_is_empty(&RegisteredSnapshots))
			elog(WARNING, "registered snapshots seem to remain after cleanup");

		/* complain about unpopped active snapshots */
		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
			elog(WARNING, "snapshot %p still active", active);
	}

	/*
	 * And reset our state.  We don't need to free the memory explicitly --
	 * it'll go away with TopTransactionContext.
	 */
	ActiveSnapshot = NULL;
	OldestActiveSnapshot = NULL;
	pairingheap_reset(&RegisteredSnapshots);

	CurrentSnapshot = NULL;
	SecondarySnapshot = NULL;

	FirstSnapshotSet = false;

	/*
	 * During normal commit processing, we call ProcArrayEndTransaction() to
	 * reset the PgXact->xmin. That call happens prior to the call to
	 * AtEOXact_Snapshot(), so we need not touch xmin here at all.  Callers
	 * that have not reset xmin that way pass resetXmin = true.
	 */
	if (resetXmin)
		SnapshotResetXmin();

	Assert(resetXmin || MyPgXact->xmin == 0);
}
1150
1151
/*
 * ExportSnapshot
 *		Export the snapshot to a file so that other backends can import it.
 *		Returns the token (the file name) that can be used to import this
 *		snapshot.
 *
 * A private copy of "snapshot" stays pseudo-registered for the rest of the
 * transaction.  The file is removed again at end of transaction by
 * AtEOXact_Snapshot.  The returned string is palloc'd in the caller's
 * current memory context.
 */
char *
ExportSnapshot(Snapshot snapshot)
{
	TransactionId topXid;
	TransactionId *children;
	ExportedSnapshot *esnap;
	int			nchildren;
	int			addTopXid;
	StringInfoData buf;
	FILE	   *f;
	int			i;
	MemoryContext oldcxt;
	char		path[MAXPGPATH];
	char		pathtmp[MAXPGPATH];

	/*
	 * It's tempting to call RequireTransactionBlock here, since it's not very
	 * useful to export a snapshot that will disappear immediately afterwards.
	 * However, we haven't got enough information to do that, since we don't
	 * know if we're at top level or not.  For example, we could be inside a
	 * plpgsql function that is going to fire off other transactions via
	 * dblink.  Rather than disallow perfectly legitimate usages, don't make a
	 * check.
	 *
	 * Also note that we don't make any restriction on the transaction's
	 * isolation level; however, importers must check the level if they are
	 * serializable.
	 */

	/*
	 * Get our transaction ID if there is one, to include in the snapshot.
	 */
	topXid = GetTopTransactionIdIfAny();

	/*
	 * We cannot export a snapshot from a subtransaction because there's no
	 * easy way for importers to verify that the same subtransaction is still
	 * running.
	 */
	if (IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("cannot export a snapshot from a subtransaction")));

	/*
	 * We do however allow previously committed subtransactions to exist.
	 * Importers of the snapshot must see them as still running, so get their
	 * XIDs to add them to the snapshot.
	 */
	nchildren = xactGetCommittedChildren(&children);

	/*
	 * Generate file path for the snapshot.  We start numbering of snapshots
	 * inside the transaction from 1.  (This must happen before the lappend
	 * below, since the sequence number is the current list length + 1.)
	 */
	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
			 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);

	/*
	 * Copy the snapshot into TopTransactionContext, add it to the
	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
	 * ensure that the snapshot's xmin is honored for the rest of the
	 * transaction.
	 */
	snapshot = CopySnapshot(snapshot);

	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
	esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
	esnap->snapfile = pstrdup(path);
	esnap->snapshot = snapshot;
	exportedSnapshots = lappend(exportedSnapshots, esnap);
	MemoryContextSwitchTo(oldcxt);

	snapshot->regd_count++;
	pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);

	/*
	 * Fill buf with a text serialization of the snapshot, plus identification
	 * data about this transaction.  The format expected by ImportSnapshot is
	 * pretty rigid: each line must be fieldname:value, and the fields must
	 * appear in exactly the order written here.
	 */
	initStringInfo(&buf);

	appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
	appendStringInfo(&buf, "pid:%d\n", MyProcPid);
	appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
	appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
	appendStringInfo(&buf, "ro:%d\n", XactReadOnly);

	appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
	appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);

	/*
	 * We must include our own top transaction ID in the top-xid data, since
	 * by definition we will still be running when the importing transaction
	 * adopts the snapshot, but GetSnapshotData never includes our own XID in
	 * the snapshot.  (There must, therefore, be enough room to add it.)
	 *
	 * However, it could be that our topXid is after the xmax, in which case
	 * we shouldn't include it because xip[] members are expected to be before
	 * xmax.  (We need not make the same check for subxip[] members, see
	 * snapshot.h.)
	 */
	addTopXid = (TransactionIdIsValid(topXid) &&
				 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
	appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
	for (i = 0; i < snapshot->xcnt; i++)
		appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
	if (addTopXid)
		appendStringInfo(&buf, "xip:%u\n", topXid);

	/*
	 * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
	 * we have to cope with possible overflow: if the combined list would not
	 * fit, we just mark the snapshot overflowed and omit sxcnt/sxp lines.
	 */
	if (snapshot->suboverflowed ||
		snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
		appendStringInfoString(&buf, "sof:1\n");
	else
	{
		appendStringInfoString(&buf, "sof:0\n");
		appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
		for (i = 0; i < snapshot->subxcnt; i++)
			appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
		for (i = 0; i < nchildren; i++)
			appendStringInfo(&buf, "sxp:%u\n", children[i]);
	}
	appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);

	/*
	 * Now write the text representation into a file.  We first write to a
	 * ".tmp" filename, and rename to final filename if no error.  This
	 * ensures that no other backend can read an incomplete file
	 * (ImportSnapshot won't allow it because of its valid-characters check).
	 */
	snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
	if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", pathtmp)));

	if (fwrite(buf.data, buf.len, 1, f) != 1)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/* no fsync() since file need not survive a system crash */

	if (FreeFile(f))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/*
	 * Now that we have written everything into a .tmp file, rename the file
	 * to remove the .tmp suffix.
	 */
	if (rename(pathtmp, path) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						pathtmp, path)));

	/*
	 * The basename of the file is what we return from pg_export_snapshot().
	 * It's already in path in a textual format and we know that the path
	 * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
	 * and pstrdup it so as not to return the address of a local variable.
	 */
	return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
}
1329
1330 /*
1331  * pg_export_snapshot
1332  *              SQL-callable wrapper for ExportSnapshot.
1333  */
1334 Datum
1335 pg_export_snapshot(PG_FUNCTION_ARGS)
1336 {
1337         char       *snapshotName;
1338
1339         snapshotName = ExportSnapshot(GetActiveSnapshot());
1340         PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1341 }
1342
1343
1344 /*
1345  * Parsing subroutines for ImportSnapshot: parse a line with the given
1346  * prefix followed by a value, and advance *s to the next line.  The
1347  * filename is provided for use in error messages.
1348  */
1349 static int
1350 parseIntFromText(const char *prefix, char **s, const char *filename)
1351 {
1352         char       *ptr = *s;
1353         int                     prefixlen = strlen(prefix);
1354         int                     val;
1355
1356         if (strncmp(ptr, prefix, prefixlen) != 0)
1357                 ereport(ERROR,
1358                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1359                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1360         ptr += prefixlen;
1361         if (sscanf(ptr, "%d", &val) != 1)
1362                 ereport(ERROR,
1363                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1364                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1365         ptr = strchr(ptr, '\n');
1366         if (!ptr)
1367                 ereport(ERROR,
1368                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1369                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1370         *s = ptr + 1;
1371         return val;
1372 }
1373
1374 static TransactionId
1375 parseXidFromText(const char *prefix, char **s, const char *filename)
1376 {
1377         char       *ptr = *s;
1378         int                     prefixlen = strlen(prefix);
1379         TransactionId val;
1380
1381         if (strncmp(ptr, prefix, prefixlen) != 0)
1382                 ereport(ERROR,
1383                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1384                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1385         ptr += prefixlen;
1386         if (sscanf(ptr, "%u", &val) != 1)
1387                 ereport(ERROR,
1388                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1389                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1390         ptr = strchr(ptr, '\n');
1391         if (!ptr)
1392                 ereport(ERROR,
1393                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1394                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1395         *s = ptr + 1;
1396         return val;
1397 }
1398
1399 static void
1400 parseVxidFromText(const char *prefix, char **s, const char *filename,
1401                                   VirtualTransactionId *vxid)
1402 {
1403         char       *ptr = *s;
1404         int                     prefixlen = strlen(prefix);
1405
1406         if (strncmp(ptr, prefix, prefixlen) != 0)
1407                 ereport(ERROR,
1408                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1409                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1410         ptr += prefixlen;
1411         if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1412                 ereport(ERROR,
1413                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1414                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1415         ptr = strchr(ptr, '\n');
1416         if (!ptr)
1417                 ereport(ERROR,
1418                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1419                                  errmsg("invalid snapshot data in file \"%s\"", filename)));
1420         *s = ptr + 1;
1421 }
1422
1423 /*
1424  * ImportSnapshot
1425  *              Import a previously exported snapshot.  The argument should be a
1426  *              filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
1427  *              This is called by "SET TRANSACTION SNAPSHOT 'foo'".
1428  */
1429 void
1430 ImportSnapshot(const char *idstr)
1431 {
1432         char            path[MAXPGPATH];
1433         FILE       *f;
1434         struct stat stat_buf;
1435         char       *filebuf;
1436         int                     xcnt;
1437         int                     i;
1438         VirtualTransactionId src_vxid;
1439         int                     src_pid;
1440         Oid                     src_dbid;
1441         int                     src_isolevel;
1442         bool            src_readonly;
1443         SnapshotData snapshot;
1444
1445         /*
1446          * Must be at top level of a fresh transaction.  Note in particular that
1447          * we check we haven't acquired an XID --- if we have, it's conceivable
1448          * that the snapshot would show it as not running, making for very screwy
1449          * behavior.
1450          */
1451         if (FirstSnapshotSet ||
1452                 GetTopTransactionIdIfAny() != InvalidTransactionId ||
1453                 IsSubTransaction())
1454                 ereport(ERROR,
1455                                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1456                                  errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1457
1458         /*
1459          * If we are in read committed mode then the next query would execute with
1460          * a new snapshot thus making this function call quite useless.
1461          */
1462         if (!IsolationUsesXactSnapshot())
1463                 ereport(ERROR,
1464                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1465                                  errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1466
1467         /*
1468          * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
1469          * this mainly to prevent reading arbitrary files.
1470          */
1471         if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1472                 ereport(ERROR,
1473                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1474                                  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1475
1476         /* OK, read the file */
1477         snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1478
1479         f = AllocateFile(path, PG_BINARY_R);
1480         if (!f)
1481                 ereport(ERROR,
1482                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1483                                  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1484
1485         /* get the size of the file so that we know how much memory we need */
1486         if (fstat(fileno(f), &stat_buf))
1487                 elog(ERROR, "could not stat file \"%s\": %m", path);
1488
1489         /* and read the file into a palloc'd string */
1490         filebuf = (char *) palloc(stat_buf.st_size + 1);
1491         if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1492                 elog(ERROR, "could not read file \"%s\": %m", path);
1493
1494         filebuf[stat_buf.st_size] = '\0';
1495
1496         FreeFile(f);
1497
1498         /*
1499          * Construct a snapshot struct by parsing the file content.
1500          */
1501         memset(&snapshot, 0, sizeof(snapshot));
1502
1503         parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
1504         src_pid = parseIntFromText("pid:", &filebuf, path);
1505         /* we abuse parseXidFromText a bit here ... */
1506         src_dbid = parseXidFromText("dbid:", &filebuf, path);
1507         src_isolevel = parseIntFromText("iso:", &filebuf, path);
1508         src_readonly = parseIntFromText("ro:", &filebuf, path);
1509
1510         snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1511         snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1512
1513         snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1514
1515         /* sanity-check the xid count before palloc */
1516         if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1517                 ereport(ERROR,
1518                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1519                                  errmsg("invalid snapshot data in file \"%s\"", path)));
1520
1521         snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1522         for (i = 0; i < xcnt; i++)
1523                 snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1524
1525         snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1526
1527         if (!snapshot.suboverflowed)
1528         {
1529                 snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1530
1531                 /* sanity-check the xid count before palloc */
1532                 if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1533                         ereport(ERROR,
1534                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1535                                          errmsg("invalid snapshot data in file \"%s\"", path)));
1536
1537                 snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1538                 for (i = 0; i < xcnt; i++)
1539                         snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1540         }
1541         else
1542         {
1543                 snapshot.subxcnt = 0;
1544                 snapshot.subxip = NULL;
1545         }
1546
1547         snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1548
1549         /*
1550          * Do some additional sanity checking, just to protect ourselves.  We
1551          * don't trouble to check the array elements, just the most critical
1552          * fields.
1553          */
1554         if (!VirtualTransactionIdIsValid(src_vxid) ||
1555                 !OidIsValid(src_dbid) ||
1556                 !TransactionIdIsNormal(snapshot.xmin) ||
1557                 !TransactionIdIsNormal(snapshot.xmax))
1558                 ereport(ERROR,
1559                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1560                                  errmsg("invalid snapshot data in file \"%s\"", path)));
1561
1562         /*
1563          * If we're serializable, the source transaction must be too, otherwise
1564          * predicate.c has problems (SxactGlobalXmin could go backwards).  Also, a
1565          * non-read-only transaction can't adopt a snapshot from a read-only
1566          * transaction, as predicate.c handles the cases very differently.
1567          */
1568         if (IsolationIsSerializable())
1569         {
1570                 if (src_isolevel != XACT_SERIALIZABLE)
1571                         ereport(ERROR,
1572                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1573                                          errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1574                 if (src_readonly && !XactReadOnly)
1575                         ereport(ERROR,
1576                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1577                                          errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1578         }
1579
1580         /*
1581          * We cannot import a snapshot that was taken in a different database,
1582          * because vacuum calculates OldestXmin on a per-database basis; so the
1583          * source transaction's xmin doesn't protect us from data loss.  This
1584          * restriction could be removed if the source transaction were to mark its
1585          * xmin as being globally applicable.  But that would require some
1586          * additional syntax, since that has to be known when the snapshot is
1587          * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
1588          */
1589         if (src_dbid != MyDatabaseId)
1590                 ereport(ERROR,
1591                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1592                                  errmsg("cannot import a snapshot from a different database")));
1593
1594         /* OK, install the snapshot */
1595         SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
1596 }
1597
1598 /*
1599  * XactHasExportedSnapshots
1600  *              Test whether current transaction has exported any snapshots.
1601  */
1602 bool
1603 XactHasExportedSnapshots(void)
1604 {
1605         return (exportedSnapshots != NIL);
1606 }
1607
1608 /*
1609  * DeleteAllExportedSnapshotFiles
1610  *              Clean up any files that have been left behind by a crashed backend
1611  *              that had exported snapshots before it died.
1612  *
1613  * This should be called during database startup or crash recovery.
1614  */
1615 void
1616 DeleteAllExportedSnapshotFiles(void)
1617 {
1618         char            buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1619         DIR                *s_dir;
1620         struct dirent *s_de;
1621
1622         /*
1623          * Problems in reading the directory, or unlinking files, are reported at
1624          * LOG level.  Since we're running in the startup process, ERROR level
1625          * would prevent database start, and it's not important enough for that.
1626          */
1627         s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
1628
1629         while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
1630         {
1631                 if (strcmp(s_de->d_name, ".") == 0 ||
1632                         strcmp(s_de->d_name, "..") == 0)
1633                         continue;
1634
1635                 snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1636
1637                 if (unlink(buf) != 0)
1638                         ereport(LOG,
1639                                         (errcode_for_file_access(),
1640                                          errmsg("could not remove file \"%s\": %m", buf)));
1641         }
1642
1643         FreeDir(s_dir);
1644 }
1645
1646 /*
1647  * ThereAreNoPriorRegisteredSnapshots
1648  *              Is the registered snapshot count less than or equal to one?
1649  *
1650  * Don't use this to settle important decisions.  While zero registrations and
1651  * no ActiveSnapshot would confirm a certain idleness, the system makes no
1652  * guarantees about the significance of one registered snapshot.
1653  */
1654 bool
1655 ThereAreNoPriorRegisteredSnapshots(void)
1656 {
1657         if (pairingheap_is_empty(&RegisteredSnapshots) ||
1658                 pairingheap_is_singular(&RegisteredSnapshots))
1659                 return true;
1660
1661         return false;
1662 }
1663
1664
1665 /*
1666  * Return a timestamp that is exactly on a minute boundary.
1667  *
1668  * If the argument is already aligned, return that value, otherwise move to
1669  * the next minute boundary following the given time.
1670  */
1671 static TimestampTz
1672 AlignTimestampToMinuteBoundary(TimestampTz ts)
1673 {
1674         TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1675
1676         return retval - (retval % USECS_PER_MINUTE);
1677 }
1678
1679 /*
1680  * Get current timestamp for snapshots
1681  *
1682  * This is basically GetCurrentTimestamp(), but with a guarantee that
1683  * the result never moves backward.
1684  */
1685 TimestampTz
1686 GetSnapshotCurrentTimestamp(void)
1687 {
1688         TimestampTz now = GetCurrentTimestamp();
1689
1690         /*
1691          * Don't let time move backward; if it hasn't advanced, use the old value.
1692          */
1693         SpinLockAcquire(&oldSnapshotControl->mutex_current);
1694         if (now <= oldSnapshotControl->current_timestamp)
1695                 now = oldSnapshotControl->current_timestamp;
1696         else
1697                 oldSnapshotControl->current_timestamp = now;
1698         SpinLockRelease(&oldSnapshotControl->mutex_current);
1699
1700         return now;
1701 }
1702
1703 /*
1704  * Get timestamp through which vacuum may have processed based on last stored
1705  * value for threshold_timestamp.
1706  *
1707  * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1708  * that ever changes, we could get rid of the spinlock here.
1709  */
1710 TimestampTz
1711 GetOldSnapshotThresholdTimestamp(void)
1712 {
1713         TimestampTz threshold_timestamp;
1714
1715         SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1716         threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1717         SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1718
1719         return threshold_timestamp;
1720 }
1721
1722 static void
1723 SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1724 {
1725         SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1726         oldSnapshotControl->threshold_timestamp = ts;
1727         oldSnapshotControl->threshold_xid = xlimit;
1728         SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1729 }
1730
/*
 * TransactionIdLimitedForOldSnapshots
 *
 * Apply old snapshot limit, if any.  This is intended to be called for page
 * pruning and table vacuuming, to allow old_snapshot_threshold to override
 * the normal global xmin value.  Actual testing for snapshot too old will be
 * based on whether a snapshot timestamp is prior to the threshold timestamp
 * set in this function.
 *
 * recentXmin: the horizon the caller would otherwise use.  Nothing is done
 *		unless it is a normal xid.
 * relation: checked with RelationAllowsEarlyPruning(); if that is false the
 *		limit is not applied.
 *
 * Returns recentXmin itself, or a later xid (taken from latest_xmin,
 * threshold_xid, or the time->xid map).  The returned value never precedes
 * recentXmin.  As a side effect, SetOldSnapshotThresholdTimestamp() may
 * record a new (threshold timestamp, xid) pair.
 */
TransactionId
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
									Relation relation)
{
	if (TransactionIdIsNormal(recentXmin)
		&& old_snapshot_threshold >= 0
		&& RelationAllowsEarlyPruning(relation))
	{
		TimestampTz ts = GetSnapshotCurrentTimestamp();
		TransactionId xlimit = recentXmin;
		TransactionId latest_xmin;
		TimestampTz update_ts;
		bool		same_ts_as_threshold = false;

		/* Read latest_xmin and next_map_update together under their lock. */
		SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
		latest_xmin = oldSnapshotControl->latest_xmin;
		update_ts = oldSnapshotControl->next_map_update;
		SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);

		/*
		 * Zero threshold always overrides to latest xmin, if valid.  Without
		 * some heuristic it will find its own snapshot too old on, for
		 * example, a simple UPDATE -- which would make it useless for most
		 * testing, but there is no principled way to ensure that it doesn't
		 * fail in this way.  Use a five-second delay to try to get useful
		 * testing behavior, but this may need adjustment.
		 */
		if (old_snapshot_threshold == 0)
		{
			if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
				&& TransactionIdFollows(latest_xmin, xlimit))
				xlimit = latest_xmin;

			ts -= 5 * USECS_PER_SEC;
			SetOldSnapshotThresholdTimestamp(ts, xlimit);

			return xlimit;
		}

		/*
		 * Round "now" up to a minute boundary, then step back
		 * old_snapshot_threshold minutes: ts is the minute whose xid limit
		 * we want.
		 */
		ts = AlignTimestampToMinuteBoundary(ts)
			- (old_snapshot_threshold * USECS_PER_MINUTE);

		/* Check for fast exit without LW locking. */
		SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
		if (ts == oldSnapshotControl->threshold_timestamp)
		{
			xlimit = oldSnapshotControl->threshold_xid;
			same_ts_as_threshold = true;
		}
		SpinLockRelease(&oldSnapshotControl->mutex_threshold);

		if (!same_ts_as_threshold)
		{
			if (ts == update_ts)
			{
				/* ts equals next_map_update: use latest_xmin, skip the map */
				xlimit = latest_xmin;
				if (NormalTransactionIdFollows(xlimit, recentXmin))
					SetOldSnapshotThresholdTimestamp(ts, xlimit);
			}
			else
			{
				LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);

				if (oldSnapshotControl->count_used > 0
					&& ts >= oldSnapshotControl->head_timestamp)
				{
					int			offset;

					/*
					 * Whole minutes from head_timestamp to ts, clamped to the
					 * newest used entry, then mapped into the circular
					 * buffer starting at head_offset.
					 */
					offset = ((ts - oldSnapshotControl->head_timestamp)
							  / USECS_PER_MINUTE);
					if (offset > oldSnapshotControl->count_used - 1)
						offset = oldSnapshotControl->count_used - 1;
					offset = (oldSnapshotControl->head_offset + offset)
						% OLD_SNAPSHOT_TIME_MAP_ENTRIES;
					xlimit = oldSnapshotControl->xid_by_minute[offset];

					if (NormalTransactionIdFollows(xlimit, recentXmin))
						SetOldSnapshotThresholdTimestamp(ts, xlimit);
				}

				LWLockRelease(OldSnapshotTimeMapLock);
			}
		}

		/*
		 * Failsafe protection against vacuuming work of active transaction.
		 *
		 * This is not an assertion because we avoid the spinlock for
		 * performance, leaving open the possibility that xlimit could advance
		 * and be more current; but it seems prudent to apply this limit.  It
		 * might make pruning a tiny bit less aggressive than it could be, but
		 * protects against data loss bugs.
		 */
		if (TransactionIdIsNormal(latest_xmin)
			&& TransactionIdPrecedes(latest_xmin, xlimit))
			xlimit = latest_xmin;

		/* Only ever move the horizon forward relative to recentXmin. */
		if (NormalTransactionIdFollows(xlimit, recentXmin))
			return xlimit;
	}

	return recentXmin;
}
1843
1844 /*
1845  * Take care of the circular buffer that maps time to xid.
1846  */
1847 void
1848 MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1849 {
1850         TimestampTz ts;
1851         TransactionId latest_xmin;
1852         TimestampTz update_ts;
1853         bool            map_update_required = false;
1854
1855         /* Never call this function when old snapshot checking is disabled. */
1856         Assert(old_snapshot_threshold >= 0);
1857
1858         ts = AlignTimestampToMinuteBoundary(whenTaken);
1859
1860         /*
1861          * Keep track of the latest xmin seen by any process. Update mapping with
1862          * a new value when we have crossed a bucket boundary.
1863          */
1864         SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1865         latest_xmin = oldSnapshotControl->latest_xmin;
1866         update_ts = oldSnapshotControl->next_map_update;
1867         if (ts > update_ts)
1868         {
1869                 oldSnapshotControl->next_map_update = ts;
1870                 map_update_required = true;
1871         }
1872         if (TransactionIdFollows(xmin, latest_xmin))
1873                 oldSnapshotControl->latest_xmin = xmin;
1874         SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1875
1876         /* We only needed to update the most recent xmin value. */
1877         if (!map_update_required)
1878                 return;
1879
1880         /* No further tracking needed for 0 (used for testing). */
1881         if (old_snapshot_threshold == 0)
1882                 return;
1883
1884         /*
1885          * We don't want to do something stupid with unusual values, but we don't
1886          * want to litter the log with warnings or break otherwise normal
1887          * processing for this feature; so if something seems unreasonable, just
1888          * log at DEBUG level and return without doing anything.
1889          */
1890         if (whenTaken < 0)
1891         {
1892                 elog(DEBUG1,
1893                          "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1894                          (long) whenTaken);
1895                 return;
1896         }
1897         if (!TransactionIdIsNormal(xmin))
1898         {
1899                 elog(DEBUG1,
1900                          "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1901                          (unsigned long) xmin);
1902                 return;
1903         }
1904
1905         LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1906
1907         Assert(oldSnapshotControl->head_offset >= 0);
1908         Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1909         Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1910         Assert(oldSnapshotControl->count_used >= 0);
1911         Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1912
1913         if (oldSnapshotControl->count_used == 0)
1914         {
1915                 /* set up first entry for empty mapping */
1916                 oldSnapshotControl->head_offset = 0;
1917                 oldSnapshotControl->head_timestamp = ts;
1918                 oldSnapshotControl->count_used = 1;
1919                 oldSnapshotControl->xid_by_minute[0] = xmin;
1920         }
1921         else if (ts < oldSnapshotControl->head_timestamp)
1922         {
1923                 /* old ts; log it at DEBUG */
1924                 LWLockRelease(OldSnapshotTimeMapLock);
1925                 elog(DEBUG1,
1926                          "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1927                          (long) whenTaken);
1928                 return;
1929         }
1930         else if (ts <= (oldSnapshotControl->head_timestamp +
1931                                         ((oldSnapshotControl->count_used - 1)
1932                                          * USECS_PER_MINUTE)))
1933         {
1934                 /* existing mapping; advance xid if possible */
1935                 int                     bucket = (oldSnapshotControl->head_offset
1936                                                           + ((ts - oldSnapshotControl->head_timestamp)
1937                                                                  / USECS_PER_MINUTE))
1938                 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1939
1940                 if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1941                         oldSnapshotControl->xid_by_minute[bucket] = xmin;
1942         }
1943         else
1944         {
1945                 /* We need a new bucket, but it might not be the very next one. */
1946                 int                     advance = ((ts - oldSnapshotControl->head_timestamp)
1947                                                            / USECS_PER_MINUTE);
1948
1949                 oldSnapshotControl->head_timestamp = ts;
1950
1951                 if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1952                 {
1953                         /* Advance is so far that all old data is junk; start over. */
1954                         oldSnapshotControl->head_offset = 0;
1955                         oldSnapshotControl->count_used = 1;
1956                         oldSnapshotControl->xid_by_minute[0] = xmin;
1957                 }
1958                 else
1959                 {
1960                         /* Store the new value in one or more buckets. */
1961                         int                     i;
1962
1963                         for (i = 0; i < advance; i++)
1964                         {
1965                                 if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1966                                 {
1967                                         /* Map full and new value replaces old head. */
1968                                         int                     old_head = oldSnapshotControl->head_offset;
1969
1970                                         if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
1971                                                 oldSnapshotControl->head_offset = 0;
1972                                         else
1973                                                 oldSnapshotControl->head_offset = old_head + 1;
1974                                         oldSnapshotControl->xid_by_minute[old_head] = xmin;
1975                                 }
1976                                 else
1977                                 {
1978                                         /* Extend map to unused entry. */
1979                                         int                     new_tail = (oldSnapshotControl->head_offset
1980                                                                                         + oldSnapshotControl->count_used)
1981                                         % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1982
1983                                         oldSnapshotControl->count_used++;
1984                                         oldSnapshotControl->xid_by_minute[new_tail] = xmin;
1985                                 }
1986                         }
1987                 }
1988         }
1989
1990         LWLockRelease(OldSnapshotTimeMapLock);
1991 }
1992
1993
1994 /*
1995  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
1996  * access to behave just like it did at a certain point in the past.
1997  *
1998  * Needed for logical decoding.
1999  */
2000 void
2001 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2002 {
2003         Assert(historic_snapshot != NULL);
2004
2005         /* setup the timetravel snapshot */
2006         HistoricSnapshot = historic_snapshot;
2007
2008         /* setup (cmin, cmax) lookup hash */
2009         tuplecid_data = tuplecids;
2010 }
2011
2012
2013 /*
2014  * Make catalog snapshots behave normally again.
2015  */
2016 void
2017 TeardownHistoricSnapshot(bool is_error)
2018 {
2019         HistoricSnapshot = NULL;
2020         tuplecid_data = NULL;
2021 }
2022
2023 bool
2024 HistoricSnapshotActive(void)
2025 {
2026         return HistoricSnapshot != NULL;
2027 }
2028
2029 HTAB *
2030 HistoricSnapshotGetTupleCids(void)
2031 {
2032         Assert(HistoricSnapshotActive());
2033         return tuplecid_data;
2034 }
2035
2036 /*
2037  * EstimateSnapshotSpace
2038  *              Returns the size needed to store the given snapshot.
2039  *
2040  * We are exporting only required fields from the Snapshot, stored in
2041  * SerializedSnapshotData.
2042  */
2043 Size
2044 EstimateSnapshotSpace(Snapshot snap)
2045 {
2046         Size            size;
2047
2048         Assert(snap != InvalidSnapshot);
2049         Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
2050
2051         /* We allocate any XID arrays needed in the same palloc block. */
2052         size = add_size(sizeof(SerializedSnapshotData),
2053                                         mul_size(snap->xcnt, sizeof(TransactionId)));
2054         if (snap->subxcnt > 0 &&
2055                 (!snap->suboverflowed || snap->takenDuringRecovery))
2056                 size = add_size(size,
2057                                                 mul_size(snap->subxcnt, sizeof(TransactionId)));
2058
2059         return size;
2060 }
2061
2062 /*
2063  * SerializeSnapshot
2064  *              Dumps the serialized snapshot (extracted from given snapshot) onto the
2065  *              memory location at start_address.
2066  */
2067 void
2068 SerializeSnapshot(Snapshot snapshot, char *start_address)
2069 {
2070         SerializedSnapshotData serialized_snapshot;
2071
2072         Assert(snapshot->subxcnt >= 0);
2073
2074         /* Copy all required fields */
2075         serialized_snapshot.xmin = snapshot->xmin;
2076         serialized_snapshot.xmax = snapshot->xmax;
2077         serialized_snapshot.xcnt = snapshot->xcnt;
2078         serialized_snapshot.subxcnt = snapshot->subxcnt;
2079         serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2080         serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2081         serialized_snapshot.curcid = snapshot->curcid;
2082         serialized_snapshot.whenTaken = snapshot->whenTaken;
2083         serialized_snapshot.lsn = snapshot->lsn;
2084
2085         /*
2086          * Ignore the SubXID array if it has overflowed, unless the snapshot was
2087          * taken during recovery - in that case, top-level XIDs are in subxip as
2088          * well, and we mustn't lose them.
2089          */
2090         if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2091                 serialized_snapshot.subxcnt = 0;
2092
2093         /* Copy struct to possibly-unaligned buffer */
2094         memcpy(start_address,
2095                    &serialized_snapshot, sizeof(SerializedSnapshotData));
2096
2097         /* Copy XID array */
2098         if (snapshot->xcnt > 0)
2099                 memcpy((TransactionId *) (start_address +
2100                                                                   sizeof(SerializedSnapshotData)),
2101                            snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2102
2103         /*
2104          * Copy SubXID array. Don't bother to copy it if it had overflowed,
2105          * though, because it's not used anywhere in that case. Except if it's a
2106          * snapshot taken during recovery; all the top-level XIDs are in subxip as
2107          * well in that case, so we mustn't lose them.
2108          */
2109         if (serialized_snapshot.subxcnt > 0)
2110         {
2111                 Size            subxipoff = sizeof(SerializedSnapshotData) +
2112                 snapshot->xcnt * sizeof(TransactionId);
2113
2114                 memcpy((TransactionId *) (start_address + subxipoff),
2115                            snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2116         }
2117 }
2118
2119 /*
2120  * RestoreSnapshot
2121  *              Restore a serialized snapshot from the specified address.
2122  *
2123  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2124  * to 0.  The returned snapshot has the copied flag set.
2125  */
2126 Snapshot
2127 RestoreSnapshot(char *start_address)
2128 {
2129         SerializedSnapshotData serialized_snapshot;
2130         Size            size;
2131         Snapshot        snapshot;
2132         TransactionId *serialized_xids;
2133
2134         memcpy(&serialized_snapshot, start_address,
2135                    sizeof(SerializedSnapshotData));
2136         serialized_xids = (TransactionId *)
2137                 (start_address + sizeof(SerializedSnapshotData));
2138
2139         /* We allocate any XID arrays needed in the same palloc block. */
2140         size = sizeof(SnapshotData)
2141                 + serialized_snapshot.xcnt * sizeof(TransactionId)
2142                 + serialized_snapshot.subxcnt * sizeof(TransactionId);
2143
2144         /* Copy all required fields */
2145         snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2146         snapshot->satisfies = HeapTupleSatisfiesMVCC;
2147         snapshot->xmin = serialized_snapshot.xmin;
2148         snapshot->xmax = serialized_snapshot.xmax;
2149         snapshot->xip = NULL;
2150         snapshot->xcnt = serialized_snapshot.xcnt;
2151         snapshot->subxip = NULL;
2152         snapshot->subxcnt = serialized_snapshot.subxcnt;
2153         snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2154         snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2155         snapshot->curcid = serialized_snapshot.curcid;
2156         snapshot->whenTaken = serialized_snapshot.whenTaken;
2157         snapshot->lsn = serialized_snapshot.lsn;
2158
2159         /* Copy XIDs, if present. */
2160         if (serialized_snapshot.xcnt > 0)
2161         {
2162                 snapshot->xip = (TransactionId *) (snapshot + 1);
2163                 memcpy(snapshot->xip, serialized_xids,
2164                            serialized_snapshot.xcnt * sizeof(TransactionId));
2165         }
2166
2167         /* Copy SubXIDs, if present. */
2168         if (serialized_snapshot.subxcnt > 0)
2169         {
2170                 snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2171                         serialized_snapshot.xcnt;
2172                 memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2173                            serialized_snapshot.subxcnt * sizeof(TransactionId));
2174         }
2175
2176         /* Set the copied flag so that the caller will set refcounts correctly. */
2177         snapshot->regd_count = 0;
2178         snapshot->active_count = 0;
2179         snapshot->copied = true;
2180
2181         return snapshot;
2182 }
2183
2184 /*
2185  * Install a restored snapshot as the transaction snapshot.
2186  *
2187  * The second argument is of type void * so that snapmgr.h need not include
2188  * the declaration for PGPROC.
2189  */
2190 void
2191 RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2192 {
2193         SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
2194 }