]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/reorderbuffer.c
Phase 3 of pgindent updates.
[postgresql] / src / backend / replication / logical / reorderbuffer.c
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *        PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  *        This module gets handed individual pieces of transactions in the order
15  *        they are written to the WAL and is responsible to reassemble them into
16  *        toplevel transaction sized pieces. When a transaction is completely
17  *        reassembled - signalled by reading the transaction commit record - it
18  *        will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  *        individual changes. The output plugins rely on snapshots built by
20  *        snapbuild.c which hands them to us.
21  *
22  *        Transactions and subtransactions/savepoints in postgres are not
23  *        immediately linked to each other from outside the performing
24  *        backend. Only at commit/abort (or special xact_assignment records) they
25  *        are linked together. Which means that we will have to splice together a
26  *        toplevel transaction from its subtransactions. To do that efficiently we
27  *        build a binary heap indexed by the smallest current lsn of the individual
28  *        subtransactions' changestreams. As the individual streams are inherently
29  *        ordered by LSN - since that is where we build them from - the transaction
30  *        can easily be reassembled by always using the subtransaction with the
31  *        smallest current LSN from the heap.
32  *
33  *        In order to cope with large transactions - which can be several times as
34  *        big as the available memory - this module supports spooling the contents
35  *        of large transactions to disk. When the transaction is replayed the
36  *        contents of individual (sub-)transactions will be read from disk in
37  *        chunks.
38  *
39  *        This module also has to deal with reassembling toast records from the
40  *        individual chunks stored in WAL. When a new (or initial) version of a
41  *        tuple is stored in WAL it will always be preceded by the toast chunks
42  *        emitted for the columns stored out of line. Within a single toplevel
43  *        transaction there will be no other data carrying records between a row's
44  *        toast chunks and the row data itself. See ReorderBufferToast* for
45  *        details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "pgstat.h"
62 #include "replication/logical.h"
63 #include "replication/reorderbuffer.h"
64 #include "replication/slot.h"
65 #include "replication/snapbuild.h"      /* just for SnapBuildSnapDecRefcount */
66 #include "storage/bufmgr.h"
67 #include "storage/fd.h"
68 #include "storage/sinval.h"
69 #include "utils/builtins.h"
70 #include "utils/combocid.h"
71 #include "utils/memdebug.h"
72 #include "utils/memutils.h"
73 #include "utils/rel.h"
74 #include "utils/relfilenodemap.h"
75 #include "utils/tqual.h"
76
77
78 /* entry for a hash table we use to map from xid to our transaction state */
79 typedef struct ReorderBufferTXNByIdEnt
80 {
81         TransactionId xid;
82         ReorderBufferTXN *txn;
83 } ReorderBufferTXNByIdEnt;
84
85 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
86 typedef struct ReorderBufferTupleCidKey
87 {
88         RelFileNode relnode;
89         ItemPointerData tid;
90 } ReorderBufferTupleCidKey;
91
92 typedef struct ReorderBufferTupleCidEnt
93 {
94         ReorderBufferTupleCidKey key;
95         CommandId       cmin;
96         CommandId       cmax;
97         CommandId       combocid;               /* just for debugging */
98 } ReorderBufferTupleCidEnt;
99
100 /* k-way in-order change iteration support structures */
101 typedef struct ReorderBufferIterTXNEntry
102 {
103         XLogRecPtr      lsn;
104         ReorderBufferChange *change;
105         ReorderBufferTXN *txn;
106         int                     fd;
107         XLogSegNo       segno;
108 } ReorderBufferIterTXNEntry;
109
110 typedef struct ReorderBufferIterTXNState
111 {
112         binaryheap *heap;
113         Size            nr_txns;
114         dlist_head      old_change;
115         ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
116 } ReorderBufferIterTXNState;
117
118 /* toast datastructures */
119 typedef struct ReorderBufferToastEnt
120 {
121         Oid                     chunk_id;               /* toast_table.chunk_id */
122         int32           last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
123                                                                  * have seen */
124         Size            num_chunks;             /* number of chunks we've already seen */
125         Size            size;                   /* combined size of chunks seen */
126         dlist_head      chunks;                 /* linked list of chunks */
127         struct varlena *reconstructed;  /* reconstructed varlena now pointed to in
128                                                                          * main tup */
129 } ReorderBufferToastEnt;
130
131 /* Disk serialization support datastructures */
132 typedef struct ReorderBufferDiskChange
133 {
134         Size            size;
135         ReorderBufferChange change;
136         /* data follows */
137 } ReorderBufferDiskChange;
138
139 /*
140  * Maximum number of changes kept in memory, per transaction. After that,
141  * changes are spooled to disk.
142  *
143  * The current value should be sufficient to decode the entire transaction
144  * without hitting disk in OLTP workloads, while starting to spool to disk in
145  * other workloads reasonably fast.
146  *
147  * At some point in the future it probably makes sense to have a more elaborate
148  * resource management here, but it's not entirely clear what that would look
149  * like.
150  */
151 static const Size max_changes_in_memory = 4096;
152
153 /*
154  * We use a very simple form of a slab allocator for frequently allocated
155  * objects, simply keeping a fixed number in a linked list when unused,
156  * instead pfree()ing them. Without that in many workloads aset.c becomes a
157  * major bottleneck, especially when spilling to disk while decoding batch
158  * workloads.
159  */
160 static const Size max_cached_tuplebufs = 4096 * 2;      /* ~8MB */
161
162 /* ---------------------------------------
163  * primary reorderbuffer support routines
164  * ---------------------------------------
165  */
166 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
167 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
168 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
169                                           TransactionId xid, bool create, bool *is_new,
170                                           XLogRecPtr lsn, bool create_as_top);
171
172 static void AssertTXNLsnOrder(ReorderBuffer *rb);
173
174 /* ---------------------------------------
175  * support functions for lsn-order iterating over the ->changes of a
176  * transaction and its subtransactions
177  *
178  * used for iteration over the k-way heap merge of a transaction and its
179  * subtransactions
180  * ---------------------------------------
181  */
182 static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
183 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
184 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
185                                                    ReorderBufferIterTXNState *state);
186 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
187
188 /*
189  * ---------------------------------------
190  * Disk serialization support functions
191  * ---------------------------------------
192  */
193 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
196                                                          int fd, ReorderBufferChange *change);
197 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
198                                                         int *fd, XLogSegNo *segno);
199 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200                                                    char *change);
201 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
202
203 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
204 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
205                                           ReorderBufferTXN *txn, CommandId cid);
206
207 /* ---------------------------------------
208  * toast reassembly support
209  * ---------------------------------------
210  */
211 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
212 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
213 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
214                                                   Relation relation, ReorderBufferChange *change);
215 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
216                                                           Relation relation, ReorderBufferChange *change);
217
218
219 /*
220  * Allocate a new ReorderBuffer
221  */
222 ReorderBuffer *
223 ReorderBufferAllocate(void)
224 {
225         ReorderBuffer *buffer;
226         HASHCTL         hash_ctl;
227         MemoryContext new_ctx;
228
229         /* allocate memory in own context, to have better accountability */
230         new_ctx = AllocSetContextCreate(CurrentMemoryContext,
231                                                                         "ReorderBuffer",
232                                                                         ALLOCSET_DEFAULT_SIZES);
233
234         buffer =
235                 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
236
237         memset(&hash_ctl, 0, sizeof(hash_ctl));
238
239         buffer->context = new_ctx;
240
241         buffer->change_context = SlabContextCreate(new_ctx,
242                                                                                            "Change",
243                                                                                            SLAB_DEFAULT_BLOCK_SIZE,
244                                                                                            sizeof(ReorderBufferChange));
245
246         buffer->txn_context = SlabContextCreate(new_ctx,
247                                                                                         "TXN",
248                                                                                         SLAB_DEFAULT_BLOCK_SIZE,
249                                                                                         sizeof(ReorderBufferTXN));
250
251         hash_ctl.keysize = sizeof(TransactionId);
252         hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
253         hash_ctl.hcxt = buffer->context;
254
255         buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
256                                                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
257
258         buffer->by_txn_last_xid = InvalidTransactionId;
259         buffer->by_txn_last_txn = NULL;
260
261         buffer->nr_cached_tuplebufs = 0;
262
263         buffer->outbuf = NULL;
264         buffer->outbufsize = 0;
265
266         buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
267
268         dlist_init(&buffer->toplevel_by_lsn);
269         slist_init(&buffer->cached_tuplebufs);
270
271         return buffer;
272 }
273
274 /*
275  * Free a ReorderBuffer
276  */
277 void
278 ReorderBufferFree(ReorderBuffer *rb)
279 {
280         MemoryContext context = rb->context;
281
282         /*
283          * We free separately allocated data by entirely scrapping reorderbuffer's
284          * memory context.
285          */
286         MemoryContextDelete(context);
287 }
288
289 /*
290  * Get an unused, possibly preallocated, ReorderBufferTXN.
291  */
292 static ReorderBufferTXN *
293 ReorderBufferGetTXN(ReorderBuffer *rb)
294 {
295         ReorderBufferTXN *txn;
296
297         txn = (ReorderBufferTXN *)
298                 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
299
300         memset(txn, 0, sizeof(ReorderBufferTXN));
301
302         dlist_init(&txn->changes);
303         dlist_init(&txn->tuplecids);
304         dlist_init(&txn->subtxns);
305
306         return txn;
307 }
308
309 /*
310  * Free a ReorderBufferTXN.
311  *
312  * Deallocation might be delayed for efficiency purposes, for details check
313  * the comments above max_cached_changes's definition.
314  */
315 static void
316 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
317 {
318         /* clean the lookup cache if we were cached (quite likely) */
319         if (rb->by_txn_last_xid == txn->xid)
320         {
321                 rb->by_txn_last_xid = InvalidTransactionId;
322                 rb->by_txn_last_txn = NULL;
323         }
324
325         /* free data that's contained */
326
327         if (txn->tuplecid_hash != NULL)
328         {
329                 hash_destroy(txn->tuplecid_hash);
330                 txn->tuplecid_hash = NULL;
331         }
332
333         if (txn->invalidations)
334         {
335                 pfree(txn->invalidations);
336                 txn->invalidations = NULL;
337         }
338
339         pfree(txn);
340 }
341
342 /*
343  * Get an unused, possibly preallocated, ReorderBufferChange.
344  */
345 ReorderBufferChange *
346 ReorderBufferGetChange(ReorderBuffer *rb)
347 {
348         ReorderBufferChange *change;
349
350         change = (ReorderBufferChange *)
351                 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
352
353         memset(change, 0, sizeof(ReorderBufferChange));
354         return change;
355 }
356
357 /*
358  * Free an ReorderBufferChange.
359  *
360  * Deallocation might be delayed for efficiency purposes, for details check
361  * the comments above max_cached_changes's definition.
362  */
363 void
364 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
365 {
366         /* free contained data */
367         switch (change->action)
368         {
369                 case REORDER_BUFFER_CHANGE_INSERT:
370                 case REORDER_BUFFER_CHANGE_UPDATE:
371                 case REORDER_BUFFER_CHANGE_DELETE:
372                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
373                         if (change->data.tp.newtuple)
374                         {
375                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
376                                 change->data.tp.newtuple = NULL;
377                         }
378
379                         if (change->data.tp.oldtuple)
380                         {
381                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
382                                 change->data.tp.oldtuple = NULL;
383                         }
384                         break;
385                 case REORDER_BUFFER_CHANGE_MESSAGE:
386                         if (change->data.msg.prefix != NULL)
387                                 pfree(change->data.msg.prefix);
388                         change->data.msg.prefix = NULL;
389                         if (change->data.msg.message != NULL)
390                                 pfree(change->data.msg.message);
391                         change->data.msg.message = NULL;
392                         break;
393                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
394                         if (change->data.snapshot)
395                         {
396                                 ReorderBufferFreeSnap(rb, change->data.snapshot);
397                                 change->data.snapshot = NULL;
398                         }
399                         break;
400                         /* no data in addition to the struct itself */
401                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
402                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
403                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
404                         break;
405         }
406
407         pfree(change);
408 }
409
410 /*
411  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
412  * least a tuple of size tuple_len (excluding header overhead).
413  */
414 ReorderBufferTupleBuf *
415 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
416 {
417         ReorderBufferTupleBuf *tuple;
418         Size            alloc_len;
419
420         alloc_len = tuple_len + SizeofHeapTupleHeader;
421
422         /*
423          * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
424          * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
425          * generated for oldtuples can be bigger, as they don't have out-of-line
426          * toast columns.
427          */
428         if (alloc_len < MaxHeapTupleSize)
429                 alloc_len = MaxHeapTupleSize;
430
431
432         /* if small enough, check the slab cache */
433         if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
434         {
435                 rb->nr_cached_tuplebufs--;
436                 tuple = slist_container(ReorderBufferTupleBuf, node,
437                                                                 slist_pop_head_node(&rb->cached_tuplebufs));
438                 Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
439 #ifdef USE_ASSERT_CHECKING
440                 memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
441                 VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
442 #endif
443                 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
444 #ifdef USE_ASSERT_CHECKING
445                 memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
446                 VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
447 #endif
448         }
449         else
450         {
451                 tuple = (ReorderBufferTupleBuf *)
452                         MemoryContextAlloc(rb->context,
453                                                            sizeof(ReorderBufferTupleBuf) +
454                                                            MAXIMUM_ALIGNOF + alloc_len);
455                 tuple->alloc_tuple_size = alloc_len;
456                 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
457         }
458
459         return tuple;
460 }
461
462 /*
463  * Free an ReorderBufferTupleBuf.
464  *
465  * Deallocation might be delayed for efficiency purposes, for details check
466  * the comments above max_cached_changes's definition.
467  */
468 void
469 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
470 {
471         /* check whether to put into the slab cache, oversized tuples never are */
472         if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
473                 rb->nr_cached_tuplebufs < max_cached_tuplebufs)
474         {
475                 rb->nr_cached_tuplebufs++;
476                 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
477                 VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
478                 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
479                 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
480                 VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
481         }
482         else
483         {
484                 pfree(tuple);
485         }
486 }
487
488 /*
489  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
490  * If create is true, and a transaction doesn't already exist, create it
491  * (with the given LSN, and as top transaction if that's specified);
492  * when this happens, is_new is set to true.
493  */
494 static ReorderBufferTXN *
495 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
496                                           bool *is_new, XLogRecPtr lsn, bool create_as_top)
497 {
498         ReorderBufferTXN *txn;
499         ReorderBufferTXNByIdEnt *ent;
500         bool            found;
501
502         Assert(TransactionIdIsValid(xid));
503         Assert(!create || lsn != InvalidXLogRecPtr);
504
505         /*
506          * Check the one-entry lookup cache first
507          */
508         if (TransactionIdIsValid(rb->by_txn_last_xid) &&
509                 rb->by_txn_last_xid == xid)
510         {
511                 txn = rb->by_txn_last_txn;
512
513                 if (txn != NULL)
514                 {
515                         /* found it, and it's valid */
516                         if (is_new)
517                                 *is_new = false;
518                         return txn;
519                 }
520
521                 /*
522                  * cached as non-existent, and asked not to create? Then nothing else
523                  * to do.
524                  */
525                 if (!create)
526                         return NULL;
527                 /* otherwise fall through to create it */
528         }
529
530         /*
531          * If the cache wasn't hit or it yielded an "does-not-exist" and we want
532          * to create an entry.
533          */
534
535         /* search the lookup table */
536         ent = (ReorderBufferTXNByIdEnt *)
537                 hash_search(rb->by_txn,
538                                         (void *) &xid,
539                                         create ? HASH_ENTER : HASH_FIND,
540                                         &found);
541         if (found)
542                 txn = ent->txn;
543         else if (create)
544         {
545                 /* initialize the new entry, if creation was requested */
546                 Assert(ent != NULL);
547
548                 ent->txn = ReorderBufferGetTXN(rb);
549                 ent->txn->xid = xid;
550                 txn = ent->txn;
551                 txn->first_lsn = lsn;
552                 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
553
554                 if (create_as_top)
555                 {
556                         dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
557                         AssertTXNLsnOrder(rb);
558                 }
559         }
560         else
561                 txn = NULL;                             /* not found and not asked to create */
562
563         /* update cache */
564         rb->by_txn_last_xid = xid;
565         rb->by_txn_last_txn = txn;
566
567         if (is_new)
568                 *is_new = !found;
569
570         Assert(!create || txn != NULL);
571         return txn;
572 }
573
574 /*
575  * Queue a change into a transaction so it can be replayed upon commit.
576  */
577 void
578 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
579                                                  ReorderBufferChange *change)
580 {
581         ReorderBufferTXN *txn;
582
583         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584
585         change->lsn = lsn;
586         Assert(InvalidXLogRecPtr != lsn);
587         dlist_push_tail(&txn->changes, &change->node);
588         txn->nentries++;
589         txn->nentries_mem++;
590
591         ReorderBufferCheckSerializeTXN(rb, txn);
592 }
593
594 /*
595  * Queue message into a transaction so it can be processed upon commit.
596  */
597 void
598 ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
599                                                   Snapshot snapshot, XLogRecPtr lsn,
600                                                   bool transactional, const char *prefix,
601                                                   Size message_size, const char *message)
602 {
603         if (transactional)
604         {
605                 MemoryContext oldcontext;
606                 ReorderBufferChange *change;
607
608                 Assert(xid != InvalidTransactionId);
609
610                 oldcontext = MemoryContextSwitchTo(rb->context);
611
612                 change = ReorderBufferGetChange(rb);
613                 change->action = REORDER_BUFFER_CHANGE_MESSAGE;
614                 change->data.msg.prefix = pstrdup(prefix);
615                 change->data.msg.message_size = message_size;
616                 change->data.msg.message = palloc(message_size);
617                 memcpy(change->data.msg.message, message, message_size);
618
619                 ReorderBufferQueueChange(rb, xid, lsn, change);
620
621                 MemoryContextSwitchTo(oldcontext);
622         }
623         else
624         {
625                 ReorderBufferTXN *txn = NULL;
626                 volatile Snapshot snapshot_now = snapshot;
627
628                 if (xid != InvalidTransactionId)
629                         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
630
631                 /* setup snapshot to allow catalog access */
632                 SetupHistoricSnapshot(snapshot_now, NULL);
633                 PG_TRY();
634                 {
635                         rb->message(rb, txn, lsn, false, prefix, message_size, message);
636
637                         TeardownHistoricSnapshot(false);
638                 }
639                 PG_CATCH();
640                 {
641                         TeardownHistoricSnapshot(true);
642                         PG_RE_THROW();
643                 }
644                 PG_END_TRY();
645         }
646 }
647
648
649 static void
650 AssertTXNLsnOrder(ReorderBuffer *rb)
651 {
652 #ifdef USE_ASSERT_CHECKING
653         dlist_iter      iter;
654         XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
655
656         dlist_foreach(iter, &rb->toplevel_by_lsn)
657         {
658                 ReorderBufferTXN *cur_txn;
659
660                 cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
661                 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
662
663                 if (cur_txn->end_lsn != InvalidXLogRecPtr)
664                         Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
665
666                 if (prev_first_lsn != InvalidXLogRecPtr)
667                         Assert(prev_first_lsn < cur_txn->first_lsn);
668
669                 Assert(!cur_txn->is_known_as_subxact);
670                 prev_first_lsn = cur_txn->first_lsn;
671         }
672 #endif
673 }
674
675 ReorderBufferTXN *
676 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
677 {
678         ReorderBufferTXN *txn;
679
680         if (dlist_is_empty(&rb->toplevel_by_lsn))
681                 return NULL;
682
683         AssertTXNLsnOrder(rb);
684
685         txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
686
687         Assert(!txn->is_known_as_subxact);
688         Assert(txn->first_lsn != InvalidXLogRecPtr);
689         return txn;
690 }
691
692 void
693 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
694 {
695         rb->current_restart_decoding_lsn = ptr;
696 }
697
698 void
699 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
700                                                  TransactionId subxid, XLogRecPtr lsn)
701 {
702         ReorderBufferTXN *txn;
703         ReorderBufferTXN *subtxn;
704         bool            new_top;
705         bool            new_sub;
706
707         txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
708         subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
709
710         if (new_sub)
711         {
712                 /*
713                  * we assign subtransactions to top level transaction even if we don't
714                  * have data for it yet, assignment records frequently reference xids
715                  * that have not yet produced any records. Knowing those aren't top
716                  * level xids allows us to make processing cheaper in some places.
717                  */
718                 dlist_push_tail(&txn->subtxns, &subtxn->node);
719                 txn->nsubtxns++;
720         }
721         else if (!subtxn->is_known_as_subxact)
722         {
723                 subtxn->is_known_as_subxact = true;
724                 Assert(subtxn->nsubtxns == 0);
725
726                 /* remove from lsn order list of top-level transactions */
727                 dlist_delete(&subtxn->node);
728
729                 /* add to toplevel transaction */
730                 dlist_push_tail(&txn->subtxns, &subtxn->node);
731                 txn->nsubtxns++;
732         }
733         else if (new_top)
734         {
735                 elog(ERROR, "existing subxact assigned to unknown toplevel xact");
736         }
737 }
738
739 /*
740  * Associate a subtransaction with its toplevel transaction at commit
741  * time. There may be no further changes added after this.
742  */
743 void
744 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
745                                                  TransactionId subxid, XLogRecPtr commit_lsn,
746                                                  XLogRecPtr end_lsn)
747 {
748         ReorderBufferTXN *txn;
749         ReorderBufferTXN *subtxn;
750
751         subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
752                                                                    InvalidXLogRecPtr, false);
753
754         /*
755          * No need to do anything if that subtxn didn't contain any changes
756          */
757         if (!subtxn)
758                 return;
759
760         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
761
762         if (txn == NULL)
763                 elog(ERROR, "subxact logged without previous toplevel record");
764
765         /*
766          * Pass our base snapshot to the parent transaction if it doesn't have
767          * one, or ours is older. That can happen if there are no changes in the
768          * toplevel transaction but in one of the child transactions. This allows
769          * the parent to simply use its base snapshot initially.
770          */
771         if (subtxn->base_snapshot != NULL &&
772                 (txn->base_snapshot == NULL ||
773                  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
774         {
775                 txn->base_snapshot = subtxn->base_snapshot;
776                 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
777                 subtxn->base_snapshot = NULL;
778                 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
779         }
780
781         subtxn->final_lsn = commit_lsn;
782         subtxn->end_lsn = end_lsn;
783
784         if (!subtxn->is_known_as_subxact)
785         {
786                 subtxn->is_known_as_subxact = true;
787                 Assert(subtxn->nsubtxns == 0);
788
789                 /* remove from lsn order list of top-level transactions */
790                 dlist_delete(&subtxn->node);
791
792                 /* add to subtransaction list */
793                 dlist_push_tail(&txn->subtxns, &subtxn->node);
794                 txn->nsubtxns++;
795         }
796 }
797
798
799 /*
800  * Support for efficiently iterating over a transaction's and its
801  * subtransactions' changes.
802  *
803  * We do this by doing a k-way merge between transactions/subtransactions. For that
804  * we model the current heads of the different transactions as a binary heap
805  * so we easily know which (sub-)transaction has the change with the smallest
806  * lsn next.
807  *
808  * We assume the changes in individual transactions are already sorted by LSN.
809  */
810
811 /*
812  * Binary heap comparison function.
813  */
814 static int
815 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
816 {
817         ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
818         XLogRecPtr      pos_a = state->entries[DatumGetInt32(a)].lsn;
819         XLogRecPtr      pos_b = state->entries[DatumGetInt32(b)].lsn;
820
821         if (pos_a < pos_b)
822                 return 1;
823         else if (pos_a == pos_b)
824                 return 0;
825         return -1;
826 }
827
828 /*
829  * Allocate & initialize an iterator which iterates in lsn order over a
830  * transaction and all its subtransactions.
831  */
832 static ReorderBufferIterTXNState *
833 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
834 {
835         Size            nr_txns = 0;
836         ReorderBufferIterTXNState *state;
837         dlist_iter      cur_txn_i;
838         int32           off;
839
840         /*
841          * Calculate the size of our heap: one element for every transaction that
842          * contains changes.  (Besides the transactions already in the reorder
843          * buffer, we count the one we were directly passed.)
844          */
845         if (txn->nentries > 0)
846                 nr_txns++;
847
848         dlist_foreach(cur_txn_i, &txn->subtxns)
849         {
850                 ReorderBufferTXN *cur_txn;
851
852                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
853
854                 if (cur_txn->nentries > 0)
855                         nr_txns++;
856         }
857
858         /*
859          * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
860          * need to allocate/build a heap then.
861          */
862
863         /* allocate iteration state */
864         state = (ReorderBufferIterTXNState *)
865                 MemoryContextAllocZero(rb->context,
866                                                            sizeof(ReorderBufferIterTXNState) +
867                                                            sizeof(ReorderBufferIterTXNEntry) * nr_txns);
868
869         state->nr_txns = nr_txns;
870         dlist_init(&state->old_change);
871
872         for (off = 0; off < state->nr_txns; off++)
873         {
874                 state->entries[off].fd = -1;
875                 state->entries[off].segno = 0;
876         }
877
878         /* allocate heap */
879         state->heap = binaryheap_allocate(state->nr_txns,
880                                                                           ReorderBufferIterCompare,
881                                                                           state);
882
883         /*
884          * Now insert items into the binary heap, in an unordered fashion.  (We
885          * will run a heap assembly step at the end; this is more efficient.)
886          */
887
888         off = 0;
889
890         /* add toplevel transaction if it contains changes */
891         if (txn->nentries > 0)
892         {
893                 ReorderBufferChange *cur_change;
894
895                 if (txn->serialized)
896                 {
897                         /* serialize remaining changes */
898                         ReorderBufferSerializeTXN(rb, txn);
899                         ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
900                                                                                 &state->entries[off].segno);
901                 }
902
903                 cur_change = dlist_head_element(ReorderBufferChange, node,
904                                                                                 &txn->changes);
905
906                 state->entries[off].lsn = cur_change->lsn;
907                 state->entries[off].change = cur_change;
908                 state->entries[off].txn = txn;
909
910                 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
911         }
912
913         /* add subtransactions if they contain changes */
914         dlist_foreach(cur_txn_i, &txn->subtxns)
915         {
916                 ReorderBufferTXN *cur_txn;
917
918                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
919
920                 if (cur_txn->nentries > 0)
921                 {
922                         ReorderBufferChange *cur_change;
923
924                         if (cur_txn->serialized)
925                         {
926                                 /* serialize remaining changes */
927                                 ReorderBufferSerializeTXN(rb, cur_txn);
928                                 ReorderBufferRestoreChanges(rb, cur_txn,
929                                                                                         &state->entries[off].fd,
930                                                                                         &state->entries[off].segno);
931                         }
932                         cur_change = dlist_head_element(ReorderBufferChange, node,
933                                                                                         &cur_txn->changes);
934
935                         state->entries[off].lsn = cur_change->lsn;
936                         state->entries[off].change = cur_change;
937                         state->entries[off].txn = cur_txn;
938
939                         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
940                 }
941         }
942
943         /* assemble a valid binary heap */
944         binaryheap_build(state->heap);
945
946         return state;
947 }
948
949 /*
950  * Return the next change when iterating over a transaction and its
951  * subtransactions.
952  *
953  * Returns NULL when no further changes exist.
954  */
955 static ReorderBufferChange *
956 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
957 {
958         ReorderBufferChange *change;
959         ReorderBufferIterTXNEntry *entry;
960         int32           off;
961
962         /* nothing there anymore */
963         if (state->heap->bh_size == 0)
964                 return NULL;
965
966         off = DatumGetInt32(binaryheap_first(state->heap));
967         entry = &state->entries[off];
968
969         /* free memory we might have "leaked" in the previous *Next call */
970         if (!dlist_is_empty(&state->old_change))
971         {
972                 change = dlist_container(ReorderBufferChange, node,
973                                                                  dlist_pop_head_node(&state->old_change));
974                 ReorderBufferReturnChange(rb, change);
975                 Assert(dlist_is_empty(&state->old_change));
976         }
977
978         change = entry->change;
979
980         /*
981          * update heap with information about which transaction has the next
982          * relevant change in LSN order
983          */
984
985         /* there are in-memory changes */
986         if (dlist_has_next(&entry->txn->changes, &entry->change->node))
987         {
988                 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
989                 ReorderBufferChange *next_change =
990                 dlist_container(ReorderBufferChange, node, next);
991
992                 /* txn stays the same */
993                 state->entries[off].lsn = next_change->lsn;
994                 state->entries[off].change = next_change;
995
996                 binaryheap_replace_first(state->heap, Int32GetDatum(off));
997                 return change;
998         }
999
1000         /* try to load changes from disk */
1001         if (entry->txn->nentries != entry->txn->nentries_mem)
1002         {
1003                 /*
1004                  * Ugly: restoring changes will reuse *Change records, thus delete the
1005                  * current one from the per-tx list and only free in the next call.
1006                  */
1007                 dlist_delete(&change->node);
1008                 dlist_push_tail(&state->old_change, &change->node);
1009
1010                 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1011                                                                                 &state->entries[off].segno))
1012                 {
1013                         /* successfully restored changes from disk */
1014                         ReorderBufferChange *next_change =
1015                         dlist_head_element(ReorderBufferChange, node,
1016                                                            &entry->txn->changes);
1017
1018                         elog(DEBUG2, "restored %u/%u changes from disk",
1019                                  (uint32) entry->txn->nentries_mem,
1020                                  (uint32) entry->txn->nentries);
1021
1022                         Assert(entry->txn->nentries_mem);
1023                         /* txn stays the same */
1024                         state->entries[off].lsn = next_change->lsn;
1025                         state->entries[off].change = next_change;
1026                         binaryheap_replace_first(state->heap, Int32GetDatum(off));
1027
1028                         return change;
1029                 }
1030         }
1031
1032         /* ok, no changes there anymore, remove */
1033         binaryheap_remove_first(state->heap);
1034
1035         return change;
1036 }
1037
1038 /*
1039  * Deallocate the iterator
1040  */
1041 static void
1042 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1043                                                    ReorderBufferIterTXNState *state)
1044 {
1045         int32           off;
1046
1047         for (off = 0; off < state->nr_txns; off++)
1048         {
1049                 if (state->entries[off].fd != -1)
1050                         CloseTransientFile(state->entries[off].fd);
1051         }
1052
1053         /* free memory we might have "leaked" in the last *Next call */
1054         if (!dlist_is_empty(&state->old_change))
1055         {
1056                 ReorderBufferChange *change;
1057
1058                 change = dlist_container(ReorderBufferChange, node,
1059                                                                  dlist_pop_head_node(&state->old_change));
1060                 ReorderBufferReturnChange(rb, change);
1061                 Assert(dlist_is_empty(&state->old_change));
1062         }
1063
1064         binaryheap_free(state->heap);
1065         pfree(state);
1066 }
1067
1068 /*
1069  * Cleanup the contents of a transaction, usually after the transaction
1070  * committed or aborted.
1071  */
1072 static void
1073 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1074 {
1075         bool            found;
1076         dlist_mutable_iter iter;
1077
1078         /* cleanup subtransactions & their changes */
1079         dlist_foreach_modify(iter, &txn->subtxns)
1080         {
1081                 ReorderBufferTXN *subtxn;
1082
1083                 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1084
1085                 /*
1086                  * Subtransactions are always associated to the toplevel TXN, even if
1087                  * they originally were happening inside another subtxn, so we won't
1088                  * ever recurse more than one level deep here.
1089                  */
1090                 Assert(subtxn->is_known_as_subxact);
1091                 Assert(subtxn->nsubtxns == 0);
1092
1093                 ReorderBufferCleanupTXN(rb, subtxn);
1094         }
1095
1096         /* cleanup changes in the toplevel txn */
1097         dlist_foreach_modify(iter, &txn->changes)
1098         {
1099                 ReorderBufferChange *change;
1100
1101                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1102
1103                 ReorderBufferReturnChange(rb, change);
1104         }
1105
1106         /*
1107          * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1108          * They are always stored in the toplevel transaction.
1109          */
1110         dlist_foreach_modify(iter, &txn->tuplecids)
1111         {
1112                 ReorderBufferChange *change;
1113
1114                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1115                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1116                 ReorderBufferReturnChange(rb, change);
1117         }
1118
1119         if (txn->base_snapshot != NULL)
1120         {
1121                 SnapBuildSnapDecRefcount(txn->base_snapshot);
1122                 txn->base_snapshot = NULL;
1123                 txn->base_snapshot_lsn = InvalidXLogRecPtr;
1124         }
1125
1126         /*
1127          * Remove TXN from its containing list.
1128          *
1129          * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1130          * parent's list of known subxacts; this leaves the parent's nsubxacts
1131          * count too high, but we don't care.  Otherwise, we are deleting the TXN
1132          * from the LSN-ordered list of toplevel TXNs.
1133          */
1134         dlist_delete(&txn->node);
1135
1136         /* now remove reference from buffer */
1137         hash_search(rb->by_txn,
1138                                 (void *) &txn->xid,
1139                                 HASH_REMOVE,
1140                                 &found);
1141         Assert(found);
1142
1143         /* remove entries spilled to disk */
1144         if (txn->serialized)
1145                 ReorderBufferRestoreCleanup(rb, txn);
1146
1147         /* deallocate */
1148         ReorderBufferReturnTXN(rb, txn);
1149 }
1150
1151 /*
1152  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1153  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1154  */
1155 static void
1156 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1157 {
1158         dlist_iter      iter;
1159         HASHCTL         hash_ctl;
1160
1161         if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1162                 return;
1163
1164         memset(&hash_ctl, 0, sizeof(hash_ctl));
1165
1166         hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1167         hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1168         hash_ctl.hcxt = rb->context;
1169
1170         /*
1171          * create the hash with the exact number of to-be-stored tuplecids from
1172          * the start
1173          */
1174         txn->tuplecid_hash =
1175                 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1176                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1177
1178         dlist_foreach(iter, &txn->tuplecids)
1179         {
1180                 ReorderBufferTupleCidKey key;
1181                 ReorderBufferTupleCidEnt *ent;
1182                 bool            found;
1183                 ReorderBufferChange *change;
1184
1185                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1186
1187                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1188
1189                 /* be careful about padding */
1190                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1191
1192                 key.relnode = change->data.tuplecid.node;
1193
1194                 ItemPointerCopy(&change->data.tuplecid.tid,
1195                                                 &key.tid);
1196
1197                 ent = (ReorderBufferTupleCidEnt *)
1198                         hash_search(txn->tuplecid_hash,
1199                                                 (void *) &key,
1200                                                 HASH_ENTER | HASH_FIND,
1201                                                 &found);
1202                 if (!found)
1203                 {
1204                         ent->cmin = change->data.tuplecid.cmin;
1205                         ent->cmax = change->data.tuplecid.cmax;
1206                         ent->combocid = change->data.tuplecid.combocid;
1207                 }
1208                 else
1209                 {
1210                         Assert(ent->cmin == change->data.tuplecid.cmin);
1211                         Assert(ent->cmax == InvalidCommandId ||
1212                                    ent->cmax == change->data.tuplecid.cmax);
1213
1214                         /*
1215                          * if the tuple got valid in this transaction and now got deleted
1216                          * we already have a valid cmin stored. The cmax will be
1217                          * InvalidCommandId though.
1218                          */
1219                         ent->cmax = change->data.tuplecid.cmax;
1220                 }
1221         }
1222 }
1223
1224 /*
1225  * Copy a provided snapshot so we can modify it privately. This is needed so
1226  * that catalog modifying transactions can look into intermediate catalog
1227  * states.
1228  */
1229 static Snapshot
1230 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1231                                           ReorderBufferTXN *txn, CommandId cid)
1232 {
1233         Snapshot        snap;
1234         dlist_iter      iter;
1235         int                     i = 0;
1236         Size            size;
1237
1238         size = sizeof(SnapshotData) +
1239                 sizeof(TransactionId) * orig_snap->xcnt +
1240                 sizeof(TransactionId) * (txn->nsubtxns + 1);
1241
1242         snap = MemoryContextAllocZero(rb->context, size);
1243         memcpy(snap, orig_snap, sizeof(SnapshotData));
1244
1245         snap->copied = true;
1246         snap->active_count = 1;         /* mark as active so nobody frees it */
1247         snap->regd_count = 0;
1248         snap->xip = (TransactionId *) (snap + 1);
1249
1250         memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1251
1252         /*
1253          * snap->subxip contains all txids that belong to our transaction which we
1254          * need to check via cmin/cmax. That's why we store the toplevel
1255          * transaction in there as well.
1256          */
1257         snap->subxip = snap->xip + snap->xcnt;
1258         snap->subxip[i++] = txn->xid;
1259
1260         /*
1261          * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1262          * Since it's an upper boundary it is safe to use it for the allocation
1263          * above.
1264          */
1265         snap->subxcnt = 1;
1266
1267         dlist_foreach(iter, &txn->subtxns)
1268         {
1269                 ReorderBufferTXN *sub_txn;
1270
1271                 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1272                 snap->subxip[i++] = sub_txn->xid;
1273                 snap->subxcnt++;
1274         }
1275
1276         /* sort so we can bsearch() later */
1277         qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1278
1279         /* store the specified current CommandId */
1280         snap->curcid = cid;
1281
1282         return snap;
1283 }
1284
1285 /*
1286  * Free a previously ReorderBufferCopySnap'ed snapshot
1287  */
1288 static void
1289 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1290 {
1291         if (snap->copied)
1292                 pfree(snap);
1293         else
1294                 SnapBuildSnapDecRefcount(snap);
1295 }
1296
1297 /*
1298  * Perform the replay of a transaction and it's non-aborted subtransactions.
1299  *
1300  * Subtransactions previously have to be processed by
1301  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1302  * transaction with ReorderBufferAssignChild.
1303  *
1304  * We currently can only decode a transaction's contents in when their commit
1305  * record is read because that's currently the only place where we know about
1306  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1307  * the top and subtransactions (using a k-way merge) and replay the changes in
1308  * lsn order.
1309  */
1310 void
1311 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1312                                         XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1313                                         TimestampTz commit_time,
1314                                         RepOriginId origin_id, XLogRecPtr origin_lsn)
1315 {
1316         ReorderBufferTXN *txn;
1317         volatile Snapshot snapshot_now;
1318         volatile CommandId command_id = FirstCommandId;
1319         bool            using_subtxn;
1320         ReorderBufferIterTXNState *volatile iterstate = NULL;
1321
1322         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1323                                                                 false);
1324
1325         /* unknown transaction, nothing to replay */
1326         if (txn == NULL)
1327                 return;
1328
1329         txn->final_lsn = commit_lsn;
1330         txn->end_lsn = end_lsn;
1331         txn->commit_time = commit_time;
1332         txn->origin_id = origin_id;
1333         txn->origin_lsn = origin_lsn;
1334
1335         /*
1336          * If this transaction didn't have any real changes in our database, it's
1337          * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1338          * transferred its snapshot to this transaction if it had one and the
1339          * toplevel tx didn't.
1340          */
1341         if (txn->base_snapshot == NULL)
1342         {
1343                 Assert(txn->ninvalidations == 0);
1344                 ReorderBufferCleanupTXN(rb, txn);
1345                 return;
1346         }
1347
1348         snapshot_now = txn->base_snapshot;
1349
1350         /* build data to be able to lookup the CommandIds of catalog tuples */
1351         ReorderBufferBuildTupleCidHash(rb, txn);
1352
1353         /* setup the initial snapshot */
1354         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1355
1356         /*
1357          * Decoding needs access to syscaches et al., which in turn use
1358          * heavyweight locks and such. Thus we need to have enough state around to
1359          * keep track of those.  The easiest way is to simply use a transaction
1360          * internally.  That also allows us to easily enforce that nothing writes
1361          * to the database by checking for xid assignments.
1362          *
1363          * When we're called via the SQL SRF there's already a transaction
1364          * started, so start an explicit subtransaction there.
1365          */
1366         using_subtxn = IsTransactionOrTransactionBlock();
1367
1368         PG_TRY();
1369         {
1370                 ReorderBufferChange *change;
1371                 ReorderBufferChange *specinsert = NULL;
1372
1373                 if (using_subtxn)
1374                         BeginInternalSubTransaction("replay");
1375                 else
1376                         StartTransactionCommand();
1377
1378                 rb->begin(rb, txn);
1379
1380                 iterstate = ReorderBufferIterTXNInit(rb, txn);
1381                 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1382                 {
1383                         Relation        relation = NULL;
1384                         Oid                     reloid;
1385
1386                         switch (change->action)
1387                         {
1388                                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1389
1390                                         /*
1391                                          * Confirmation for speculative insertion arrived. Simply
1392                                          * use as a normal record. It'll be cleaned up at the end
1393                                          * of INSERT processing.
1394                                          */
1395                                         Assert(specinsert->data.tp.oldtuple == NULL);
1396                                         change = specinsert;
1397                                         change->action = REORDER_BUFFER_CHANGE_INSERT;
1398
1399                                         /* intentionally fall through */
1400                                 case REORDER_BUFFER_CHANGE_INSERT:
1401                                 case REORDER_BUFFER_CHANGE_UPDATE:
1402                                 case REORDER_BUFFER_CHANGE_DELETE:
1403                                         Assert(snapshot_now);
1404
1405                                         reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1406                                                                                                 change->data.tp.relnode.relNode);
1407
1408                                         /*
1409                                          * Catalog tuple without data, emitted while catalog was
1410                                          * in the process of being rewritten.
1411                                          */
1412                                         if (reloid == InvalidOid &&
1413                                                 change->data.tp.newtuple == NULL &&
1414                                                 change->data.tp.oldtuple == NULL)
1415                                                 goto change_done;
1416                                         else if (reloid == InvalidOid)
1417                                                 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1418                                                          relpathperm(change->data.tp.relnode,
1419                                                                                  MAIN_FORKNUM));
1420
1421                                         relation = RelationIdGetRelation(reloid);
1422
1423                                         if (relation == NULL)
1424                                                 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1425                                                          reloid,
1426                                                          relpathperm(change->data.tp.relnode,
1427                                                                                  MAIN_FORKNUM));
1428
1429                                         if (!RelationIsLogicallyLogged(relation))
1430                                                 goto change_done;
1431
1432                                         /*
1433                                          * For now ignore sequence changes entirely. Most of the
1434                                          * time they don't log changes using records we
1435                                          * understand, so it doesn't make sense to handle the few
1436                                          * cases we do.
1437                                          */
1438                                         if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1439                                                 goto change_done;
1440
1441                                         /* user-triggered change */
1442                                         if (!IsToastRelation(relation))
1443                                         {
1444                                                 ReorderBufferToastReplace(rb, txn, relation, change);
1445                                                 rb->apply_change(rb, txn, relation, change);
1446
1447                                                 /*
1448                                                  * Only clear reassembled toast chunks if we're sure
1449                                                  * they're not required anymore. The creator of the
1450                                                  * tuple tells us.
1451                                                  */
1452                                                 if (change->data.tp.clear_toast_afterwards)
1453                                                         ReorderBufferToastReset(rb, txn);
1454                                         }
1455                                         /* we're not interested in toast deletions */
1456                                         else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1457                                         {
1458                                                 /*
1459                                                  * Need to reassemble the full toasted Datum in
1460                                                  * memory, to ensure the chunks don't get reused till
1461                                                  * we're done remove it from the list of this
1462                                                  * transaction's changes. Otherwise it will get
1463                                                  * freed/reused while restoring spooled data from
1464                                                  * disk.
1465                                                  */
1466                                                 dlist_delete(&change->node);
1467                                                 ReorderBufferToastAppendChunk(rb, txn, relation,
1468                                                                                                           change);
1469                                         }
1470
1471                         change_done:
1472
1473                                         /*
1474                                          * Either speculative insertion was confirmed, or it was
1475                                          * unsuccessful and the record isn't needed anymore.
1476                                          */
1477                                         if (specinsert != NULL)
1478                                         {
1479                                                 ReorderBufferReturnChange(rb, specinsert);
1480                                                 specinsert = NULL;
1481                                         }
1482
1483                                         if (relation != NULL)
1484                                         {
1485                                                 RelationClose(relation);
1486                                                 relation = NULL;
1487                                         }
1488                                         break;
1489
1490                                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1491
1492                                         /*
1493                                          * Speculative insertions are dealt with by delaying the
1494                                          * processing of the insert until the confirmation record
1495                                          * arrives. For that we simply unlink the record from the
1496                                          * chain, so it does not get freed/reused while restoring
1497                                          * spooled data from disk.
1498                                          *
1499                                          * This is safe in the face of concurrent catalog changes
1500                                          * because the relevant relation can't be changed between
1501                                          * speculative insertion and confirmation due to
1502                                          * CheckTableNotInUse() and locking.
1503                                          */
1504
1505                                         /* clear out a pending (and thus failed) speculation */
1506                                         if (specinsert != NULL)
1507                                         {
1508                                                 ReorderBufferReturnChange(rb, specinsert);
1509                                                 specinsert = NULL;
1510                                         }
1511
1512                                         /* and memorize the pending insertion */
1513                                         dlist_delete(&change->node);
1514                                         specinsert = change;
1515                                         break;
1516
1517                                 case REORDER_BUFFER_CHANGE_MESSAGE:
1518                                         rb->message(rb, txn, change->lsn, true,
1519                                                                 change->data.msg.prefix,
1520                                                                 change->data.msg.message_size,
1521                                                                 change->data.msg.message);
1522                                         break;
1523
1524                                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1525                                         /* get rid of the old */
1526                                         TeardownHistoricSnapshot(false);
1527
1528                                         if (snapshot_now->copied)
1529                                         {
1530                                                 ReorderBufferFreeSnap(rb, snapshot_now);
1531                                                 snapshot_now =
1532                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1533                                                                                                   txn, command_id);
1534                                         }
1535
1536                                         /*
1537                                          * Restored from disk, need to be careful not to double
1538                                          * free. We could introduce refcounting for that, but for
1539                                          * now this seems infrequent enough not to care.
1540                                          */
1541                                         else if (change->data.snapshot->copied)
1542                                         {
1543                                                 snapshot_now =
1544                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1545                                                                                                   txn, command_id);
1546                                         }
1547                                         else
1548                                         {
1549                                                 snapshot_now = change->data.snapshot;
1550                                         }
1551
1552
1553                                         /* and continue with the new one */
1554                                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1555                                         break;
1556
1557                                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1558                                         Assert(change->data.command_id != InvalidCommandId);
1559
1560                                         if (command_id < change->data.command_id)
1561                                         {
1562                                                 command_id = change->data.command_id;
1563
1564                                                 if (!snapshot_now->copied)
1565                                                 {
1566                                                         /* we don't use the global one anymore */
1567                                                         snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1568                                                                                                                                  txn, command_id);
1569                                                 }
1570
1571                                                 snapshot_now->curcid = command_id;
1572
1573                                                 TeardownHistoricSnapshot(false);
1574                                                 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1575
1576                                                 /*
1577                                                  * Every time the CommandId is incremented, we could
1578                                                  * see new catalog contents, so execute all
1579                                                  * invalidations.
1580                                                  */
1581                                                 ReorderBufferExecuteInvalidations(rb, txn);
1582                                         }
1583
1584                                         break;
1585
1586                                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1587                                         elog(ERROR, "tuplecid value in changequeue");
1588                                         break;
1589                         }
1590                 }
1591
1592                 /*
1593                  * There's a speculative insertion remaining, just clean in up, it
1594                  * can't have been successful, otherwise we'd gotten a confirmation
1595                  * record.
1596                  */
1597                 if (specinsert)
1598                 {
1599                         ReorderBufferReturnChange(rb, specinsert);
1600                         specinsert = NULL;
1601                 }
1602
1603                 /* clean up the iterator */
1604                 ReorderBufferIterTXNFinish(rb, iterstate);
1605                 iterstate = NULL;
1606
1607                 /* call commit callback */
1608                 rb->commit(rb, txn, commit_lsn);
1609
1610                 /* this is just a sanity check against bad output plugin behaviour */
1611                 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1612                         elog(ERROR, "output plugin used XID %u",
1613                                  GetCurrentTransactionId());
1614
1615                 /* cleanup */
1616                 TeardownHistoricSnapshot(false);
1617
1618                 /*
1619                  * Aborting the current (sub-)transaction as a whole has the right
1620                  * semantics. We want all locks acquired in here to be released, not
1621                  * reassigned to the parent and we do not want any database access
1622                  * have persistent effects.
1623                  */
1624                 AbortCurrentTransaction();
1625
1626                 /* make sure there's no cache pollution */
1627                 ReorderBufferExecuteInvalidations(rb, txn);
1628
1629                 if (using_subtxn)
1630                         RollbackAndReleaseCurrentSubTransaction();
1631
1632                 if (snapshot_now->copied)
1633                         ReorderBufferFreeSnap(rb, snapshot_now);
1634
1635                 /* remove potential on-disk data, and deallocate */
1636                 ReorderBufferCleanupTXN(rb, txn);
1637         }
1638         PG_CATCH();
1639         {
1640                 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1641                 if (iterstate)
1642                         ReorderBufferIterTXNFinish(rb, iterstate);
1643
1644                 TeardownHistoricSnapshot(true);
1645
1646                 /*
1647                  * Force cache invalidation to happen outside of a valid transaction
1648                  * to prevent catalog access as we just caught an error.
1649                  */
1650                 AbortCurrentTransaction();
1651
1652                 /* make sure there's no cache pollution */
1653                 ReorderBufferExecuteInvalidations(rb, txn);
1654
1655                 if (using_subtxn)
1656                         RollbackAndReleaseCurrentSubTransaction();
1657
1658                 if (snapshot_now->copied)
1659                         ReorderBufferFreeSnap(rb, snapshot_now);
1660
1661                 /* remove potential on-disk data, and deallocate */
1662                 ReorderBufferCleanupTXN(rb, txn);
1663
1664                 PG_RE_THROW();
1665         }
1666         PG_END_TRY();
1667 }
1668
1669 /*
1670  * Abort a transaction that possibly has previous changes. Needs to be first
1671  * called for subtransactions and then for the toplevel xid.
1672  *
1673  * NB: Transactions handled here have to have actively aborted (i.e. have
1674  * produced an abort record). Implicitly aborted transactions are handled via
1675  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1676  * which have committed are handled in ReorderBufferForget().
1677  *
1678  * This function purges this transaction and its contents from memory and
1679  * disk.
1680  */
1681 void
1682 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1683 {
1684         ReorderBufferTXN *txn;
1685
1686         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1687                                                                 false);
1688
1689         /* unknown, nothing to remove */
1690         if (txn == NULL)
1691                 return;
1692
1693         /* cosmetic... */
1694         txn->final_lsn = lsn;
1695
1696         /* remove potential on-disk data, and deallocate */
1697         ReorderBufferCleanupTXN(rb, txn);
1698 }
1699
1700 /*
1701  * Abort all transactions that aren't actually running anymore because the
1702  * server restarted.
1703  *
1704  * NB: These really have to be transactions that have aborted due to a server
1705  * crash/immediate restart, as we don't deal with invalidations here.
1706  */
1707 void
1708 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1709 {
1710         dlist_mutable_iter it;
1711
1712         /*
1713          * Iterate through all (potential) toplevel TXNs and abort all that are
1714          * older than what possibly can be running. Once we've found the first
1715          * that is alive we stop, there might be some that acquired an xid earlier
1716          * but started writing later, but it's unlikely and they will cleaned up
1717          * in a later call to ReorderBufferAbortOld().
1718          */
1719         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1720         {
1721                 ReorderBufferTXN *txn;
1722
1723                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1724
1725                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1726                 {
1727                         elog(DEBUG2, "aborting old transaction %u", txn->xid);
1728
1729                         /* remove potential on-disk data, and deallocate this tx */
1730                         ReorderBufferCleanupTXN(rb, txn);
1731                 }
1732                 else
1733                         return;
1734         }
1735 }
1736
1737 /*
1738  * Forget the contents of a transaction if we aren't interested in it's
1739  * contents. Needs to be first called for subtransactions and then for the
1740  * toplevel xid.
1741  *
1742  * This is significantly different to ReorderBufferAbort() because
1743  * transactions that have committed need to be treated differently from aborted
1744  * ones since they may have modified the catalog.
1745  *
1746  * Note that this is only allowed to be called in the moment a transaction
1747  * commit has just been read, not earlier; otherwise later records referring
1748  * to this xid might re-create the transaction incompletely.
1749  */
1750 void
1751 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1752 {
1753         ReorderBufferTXN *txn;
1754
1755         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1756                                                                 false);
1757
1758         /* unknown, nothing to forget */
1759         if (txn == NULL)
1760                 return;
1761
1762         /* cosmetic... */
1763         txn->final_lsn = lsn;
1764
1765         /*
1766          * Process cache invalidation messages if there are any. Even if we're not
1767          * interested in the transaction's contents, it could have manipulated the
1768          * catalog and we need to update the caches according to that.
1769          */
1770         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1771                 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1772                                                                                    txn->invalidations);
1773         else
1774                 Assert(txn->ninvalidations == 0);
1775
1776         /* remove potential on-disk data, and deallocate */
1777         ReorderBufferCleanupTXN(rb, txn);
1778 }
1779
1780 /*
1781  * Execute invalidations happening outside the context of a decoded
1782  * transaction. That currently happens either for xid-less commits
1783  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1784  * transactions (via ReorderBufferForget()).
1785  */
1786 void
1787 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1788                                                                    SharedInvalidationMessage *invalidations)
1789 {
1790         bool            use_subtxn = IsTransactionOrTransactionBlock();
1791         int                     i;
1792
1793         if (use_subtxn)
1794                 BeginInternalSubTransaction("replay");
1795
1796         /*
1797          * Force invalidations to happen outside of a valid transaction - that way
1798          * entries will just be marked as invalid without accessing the catalog.
1799          * That's advantageous because we don't need to setup the full state
1800          * necessary for catalog access.
1801          */
1802         if (use_subtxn)
1803                 AbortCurrentTransaction();
1804
1805         for (i = 0; i < ninvalidations; i++)
1806                 LocalExecuteInvalidationMessage(&invalidations[i]);
1807
1808         if (use_subtxn)
1809                 RollbackAndReleaseCurrentSubTransaction();
1810 }
1811
1812 /*
1813  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1814  * least once for every xid in XLogRecord->xl_xid (other places in records
1815  * may, but do not have to be passed through here).
1816  *
1817  * Reorderbuffer keeps some datastructures about transactions in LSN order,
1818  * for efficiency. To do that it has to know about when transactions are seen
1819  * first in the WAL. As many types of records are not actually interesting for
1820  * logical decoding, they do not necessarily pass though here.
1821  */
1822 void
1823 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1824 {
1825         /* many records won't have an xid assigned, centralize check here */
1826         if (xid != InvalidTransactionId)
1827                 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1828 }
1829
1830 /*
1831  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
1832  * because the previous snapshot doesn't describe the catalog correctly for
1833  * following rows.
1834  */
1835 void
1836 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1837                                                  XLogRecPtr lsn, Snapshot snap)
1838 {
1839         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1840
1841         change->data.snapshot = snap;
1842         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1843
1844         ReorderBufferQueueChange(rb, xid, lsn, change);
1845 }
1846
1847 /*
1848  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1849  * that is used to decode all changes until either this transaction modifies
1850  * the catalog or another catalog modifying transaction commits.
1851  *
1852  * Needs to be called before any changes are added with
1853  * ReorderBufferQueueChange().
1854  */
1855 void
1856 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1857                                                          XLogRecPtr lsn, Snapshot snap)
1858 {
1859         ReorderBufferTXN *txn;
1860         bool            is_new;
1861
1862         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1863         Assert(txn->base_snapshot == NULL);
1864         Assert(snap != NULL);
1865
1866         txn->base_snapshot = snap;
1867         txn->base_snapshot_lsn = lsn;
1868 }
1869
1870 /*
1871  * Access the catalog with this CommandId at this point in the changestream.
1872  *
1873  * May only be called for command ids > 1
1874  */
1875 void
1876 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1877                                                          XLogRecPtr lsn, CommandId cid)
1878 {
1879         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1880
1881         change->data.command_id = cid;
1882         change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1883
1884         ReorderBufferQueueChange(rb, xid, lsn, change);
1885 }
1886
1887
1888 /*
1889  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1890  */
1891 void
1892 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1893                                                          XLogRecPtr lsn, RelFileNode node,
1894                                                          ItemPointerData tid, CommandId cmin,
1895                                                          CommandId cmax, CommandId combocid)
1896 {
1897         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1898         ReorderBufferTXN *txn;
1899
1900         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1901
1902         change->data.tuplecid.node = node;
1903         change->data.tuplecid.tid = tid;
1904         change->data.tuplecid.cmin = cmin;
1905         change->data.tuplecid.cmax = cmax;
1906         change->data.tuplecid.combocid = combocid;
1907         change->lsn = lsn;
1908         change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1909
1910         dlist_push_tail(&txn->tuplecids, &change->node);
1911         txn->ntuplecids++;
1912 }
1913
1914 /*
1915  * Setup the invalidation of the toplevel transaction.
1916  *
1917  * This needs to be done before ReorderBufferCommit is called!
1918  */
1919 void
1920 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1921                                                           XLogRecPtr lsn, Size nmsgs,
1922                                                           SharedInvalidationMessage *msgs)
1923 {
1924         ReorderBufferTXN *txn;
1925
1926         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1927
1928         if (txn->ninvalidations != 0)
1929                 elog(ERROR, "only ever add one set of invalidations");
1930
1931         Assert(nmsgs > 0);
1932
1933         txn->ninvalidations = nmsgs;
1934         txn->invalidations = (SharedInvalidationMessage *)
1935                 MemoryContextAlloc(rb->context,
1936                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1937         memcpy(txn->invalidations, msgs,
1938                    sizeof(SharedInvalidationMessage) * nmsgs);
1939 }
1940
1941 /*
1942  * Apply all invalidations we know. Possibly we only need parts at this point
1943  * in the changestream but we don't know which those are.
1944  */
1945 static void
1946 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1947 {
1948         int                     i;
1949
1950         for (i = 0; i < txn->ninvalidations; i++)
1951                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1952 }
1953
1954 /*
1955  * Mark a transaction as containing catalog changes
1956  */
1957 void
1958 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1959                                                                   XLogRecPtr lsn)
1960 {
1961         ReorderBufferTXN *txn;
1962
1963         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1964
1965         txn->has_catalog_changes = true;
1966 }
1967
1968 /*
1969  * Query whether a transaction is already *known* to contain catalog
1970  * changes. This can be wrong until directly before the commit!
1971  */
1972 bool
1973 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1974 {
1975         ReorderBufferTXN *txn;
1976
1977         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1978                                                                 false);
1979         if (txn == NULL)
1980                 return false;
1981
1982         return txn->has_catalog_changes;
1983 }
1984
1985 /*
1986  * Have we already added the first snapshot?
1987  */
1988 bool
1989 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1990 {
1991         ReorderBufferTXN *txn;
1992
1993         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1994                                                                 false);
1995
1996         /* transaction isn't known yet, ergo no snapshot */
1997         if (txn == NULL)
1998                 return false;
1999
2000         /*
2001          * TODO: It would be a nice improvement if we would check the toplevel
2002          * transaction in subtransactions, but we'd need to keep track of a bit
2003          * more state.
2004          */
2005         return txn->base_snapshot != NULL;
2006 }
2007
2008
2009 /*
2010  * ---------------------------------------
2011  * Disk serialization support
2012  * ---------------------------------------
2013  */
2014
2015 /*
2016  * Ensure the IO buffer is >= sz.
2017  */
2018 static void
2019 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2020 {
2021         if (!rb->outbufsize)
2022         {
2023                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
2024                 rb->outbufsize = sz;
2025         }
2026         else if (rb->outbufsize < sz)
2027         {
2028                 rb->outbuf = repalloc(rb->outbuf, sz);
2029                 rb->outbufsize = sz;
2030         }
2031 }
2032
2033 /*
2034  * Check whether the transaction tx should spill its data to disk.
2035  */
2036 static void
2037 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2038 {
2039         /*
2040          * TODO: improve accounting so we cheaply can take subtransactions into
2041          * account here.
2042          */
2043         if (txn->nentries_mem >= max_changes_in_memory)
2044         {
2045                 ReorderBufferSerializeTXN(rb, txn);
2046                 Assert(txn->nentries_mem == 0);
2047         }
2048 }
2049
2050 /*
2051  * Spill data of a large transaction (and its subtransactions) to disk.
2052  */
2053 static void
2054 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2055 {
2056         dlist_iter      subtxn_i;
2057         dlist_mutable_iter change_i;
2058         int                     fd = -1;
2059         XLogSegNo       curOpenSegNo = 0;
2060         Size            spilled = 0;
2061         char            path[MAXPGPATH];
2062
2063         elog(DEBUG2, "spill %u changes in XID %u to disk",
2064                  (uint32) txn->nentries_mem, txn->xid);
2065
2066         /* do the same to all child TXs */
2067         dlist_foreach(subtxn_i, &txn->subtxns)
2068         {
2069                 ReorderBufferTXN *subtxn;
2070
2071                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2072                 ReorderBufferSerializeTXN(rb, subtxn);
2073         }
2074
2075         /* serialize changestream */
2076         dlist_foreach_modify(change_i, &txn->changes)
2077         {
2078                 ReorderBufferChange *change;
2079
2080                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
2081
2082                 /*
2083                  * store in segment in which it belongs by start lsn, don't split over
2084                  * multiple segments tho
2085                  */
2086                 if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2087                 {
2088                         XLogRecPtr      recptr;
2089
2090                         if (fd != -1)
2091                                 CloseTransientFile(fd);
2092
2093                         XLByteToSeg(change->lsn, curOpenSegNo);
2094                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2095
2096                         /*
2097                          * No need to care about TLIs here, only used during a single run,
2098                          * so each LSN only maps to a specific WAL record.
2099                          */
2100                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2101                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2102                                         (uint32) (recptr >> 32), (uint32) recptr);
2103
2104                         /* open segment, create it if necessary */
2105                         fd = OpenTransientFile(path,
2106                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2107                                                                    S_IRUSR | S_IWUSR);
2108
2109                         if (fd < 0)
2110                                 ereport(ERROR,
2111                                                 (errcode_for_file_access(),
2112                                                  errmsg("could not open file \"%s\": %m",
2113                                                                 path)));
2114                 }
2115
2116                 ReorderBufferSerializeChange(rb, txn, fd, change);
2117                 dlist_delete(&change->node);
2118                 ReorderBufferReturnChange(rb, change);
2119
2120                 spilled++;
2121         }
2122
2123         Assert(spilled == txn->nentries_mem);
2124         Assert(dlist_is_empty(&txn->changes));
2125         txn->nentries_mem = 0;
2126         txn->serialized = true;
2127
2128         if (fd != -1)
2129                 CloseTransientFile(fd);
2130 }
2131
2132 /*
2133  * Serialize individual change to disk.
2134  */
2135 static void
2136 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2137                                                          int fd, ReorderBufferChange *change)
2138 {
2139         ReorderBufferDiskChange *ondisk;
2140         Size            sz = sizeof(ReorderBufferDiskChange);
2141
2142         ReorderBufferSerializeReserve(rb, sz);
2143
2144         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146
2147         switch (change->action)
2148         {
2149                         /* fall through these, they're all similar enough */
2150                 case REORDER_BUFFER_CHANGE_INSERT:
2151                 case REORDER_BUFFER_CHANGE_UPDATE:
2152                 case REORDER_BUFFER_CHANGE_DELETE:
2153                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2154                         {
2155                                 char       *data;
2156                                 ReorderBufferTupleBuf *oldtup,
2157                                                    *newtup;
2158                                 Size            oldlen = 0;
2159                                 Size            newlen = 0;
2160
2161                                 oldtup = change->data.tp.oldtuple;
2162                                 newtup = change->data.tp.newtuple;
2163
2164                                 if (oldtup)
2165                                 {
2166                                         sz += sizeof(HeapTupleData);
2167                                         oldlen = oldtup->tuple.t_len;
2168                                         sz += oldlen;
2169                                 }
2170
2171                                 if (newtup)
2172                                 {
2173                                         sz += sizeof(HeapTupleData);
2174                                         newlen = newtup->tuple.t_len;
2175                                         sz += newlen;
2176                                 }
2177
2178                                 /* make sure we have enough space */
2179                                 ReorderBufferSerializeReserve(rb, sz);
2180
2181                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182                                 /* might have been reallocated above */
2183                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184
2185                                 if (oldlen)
2186                                 {
2187                                         memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188                                         data += sizeof(HeapTupleData);
2189
2190                                         memcpy(data, oldtup->tuple.t_data, oldlen);
2191                                         data += oldlen;
2192                                 }
2193
2194                                 if (newlen)
2195                                 {
2196                                         memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197                                         data += sizeof(HeapTupleData);
2198
2199                                         memcpy(data, newtup->tuple.t_data, newlen);
2200                                         data += newlen;
2201                                 }
2202                                 break;
2203                         }
2204                 case REORDER_BUFFER_CHANGE_MESSAGE:
2205                         {
2206                                 char       *data;
2207                                 Size            prefix_size = strlen(change->data.msg.prefix) + 1;
2208
2209                                 sz += prefix_size + change->data.msg.message_size +
2210                                         sizeof(Size) + sizeof(Size);
2211                                 ReorderBufferSerializeReserve(rb, sz);
2212
2213                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214
2215                                 /* might have been reallocated above */
2216                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217
2218                                 /* write the prefix including the size */
2219                                 memcpy(data, &prefix_size, sizeof(Size));
2220                                 data += sizeof(Size);
2221                                 memcpy(data, change->data.msg.prefix,
2222                                            prefix_size);
2223                                 data += prefix_size;
2224
2225                                 /* write the message including the size */
2226                                 memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227                                 data += sizeof(Size);
2228                                 memcpy(data, change->data.msg.message,
2229                                            change->data.msg.message_size);
2230                                 data += change->data.msg.message_size;
2231
2232                                 break;
2233                         }
2234                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2235                         {
2236                                 Snapshot        snap;
2237                                 char       *data;
2238
2239                                 snap = change->data.snapshot;
2240
2241                                 sz += sizeof(SnapshotData) +
2242                                         sizeof(TransactionId) * snap->xcnt +
2243                                         sizeof(TransactionId) * snap->subxcnt
2244                                         ;
2245
2246                                 /* make sure we have enough space */
2247                                 ReorderBufferSerializeReserve(rb, sz);
2248                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249                                 /* might have been reallocated above */
2250                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251
2252                                 memcpy(data, snap, sizeof(SnapshotData));
2253                                 data += sizeof(SnapshotData);
2254
2255                                 if (snap->xcnt)
2256                                 {
2257                                         memcpy(data, snap->xip,
2258                                                    sizeof(TransactionId) * snap->xcnt);
2259                                         data += sizeof(TransactionId) * snap->xcnt;
2260                                 }
2261
2262                                 if (snap->subxcnt)
2263                                 {
2264                                         memcpy(data, snap->subxip,
2265                                                    sizeof(TransactionId) * snap->subxcnt);
2266                                         data += sizeof(TransactionId) * snap->subxcnt;
2267                                 }
2268                                 break;
2269                         }
2270                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2271                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2272                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2273                         /* ReorderBufferChange contains everything important */
2274                         break;
2275         }
2276
2277         ondisk->size = sz;
2278
2279         pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2280         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281         {
2282                 int                     save_errno = errno;
2283
2284                 CloseTransientFile(fd);
2285                 errno = save_errno;
2286                 ereport(ERROR,
2287                                 (errcode_for_file_access(),
2288                                  errmsg("could not write to data file for XID %u: %m",
2289                                                 txn->xid)));
2290         }
2291         pgstat_report_wait_end();
2292
2293         Assert(ondisk->change.action == change->action);
2294 }
2295
2296 /*
2297  * Restore a number of changes spilled to disk back into memory.
2298  */
2299 static Size
2300 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2301                                                         int *fd, XLogSegNo *segno)
2302 {
2303         Size            restored = 0;
2304         XLogSegNo       last_segno;
2305         dlist_mutable_iter cleanup_iter;
2306
2307         Assert(txn->first_lsn != InvalidXLogRecPtr);
2308         Assert(txn->final_lsn != InvalidXLogRecPtr);
2309
2310         /* free current entries, so we have memory for more */
2311         dlist_foreach_modify(cleanup_iter, &txn->changes)
2312         {
2313                 ReorderBufferChange *cleanup =
2314                 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315
2316                 dlist_delete(&cleanup->node);
2317                 ReorderBufferReturnChange(rb, cleanup);
2318         }
2319         txn->nentries_mem = 0;
2320         Assert(dlist_is_empty(&txn->changes));
2321
2322         XLByteToSeg(txn->final_lsn, last_segno);
2323
2324         while (restored < max_changes_in_memory && *segno <= last_segno)
2325         {
2326                 int                     readBytes;
2327                 ReorderBufferDiskChange *ondisk;
2328
2329                 if (*fd == -1)
2330                 {
2331                         XLogRecPtr      recptr;
2332                         char            path[MAXPGPATH];
2333
2334                         /* first time in */
2335                         if (*segno == 0)
2336                         {
2337                                 XLByteToSeg(txn->first_lsn, *segno);
2338                         }
2339
2340                         Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341                         XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342
2343                         /*
2344                          * No need to care about TLIs here, only used during a single run,
2345                          * so each LSN only maps to a specific WAL record.
2346                          */
2347                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2348                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2349                                         (uint32) (recptr >> 32), (uint32) recptr);
2350
2351                         *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352                         if (*fd < 0 && errno == ENOENT)
2353                         {
2354                                 *fd = -1;
2355                                 (*segno)++;
2356                                 continue;
2357                         }
2358                         else if (*fd < 0)
2359                                 ereport(ERROR,
2360                                                 (errcode_for_file_access(),
2361                                                  errmsg("could not open file \"%s\": %m",
2362                                                                 path)));
2363
2364                 }
2365
2366                 /*
2367                  * Read the statically sized part of a change which has information
2368                  * about the total size. If we couldn't read a record, we're at the
2369                  * end of this file.
2370                  */
2371                 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2372                 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2373                 readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2374                 pgstat_report_wait_end();
2375
2376                 /* eof */
2377                 if (readBytes == 0)
2378                 {
2379                         CloseTransientFile(*fd);
2380                         *fd = -1;
2381                         (*segno)++;
2382                         continue;
2383                 }
2384                 else if (readBytes < 0)
2385                         ereport(ERROR,
2386                                         (errcode_for_file_access(),
2387                                          errmsg("could not read from reorderbuffer spill file: %m")));
2388                 else if (readBytes != sizeof(ReorderBufferDiskChange))
2389                         ereport(ERROR,
2390                                         (errcode_for_file_access(),
2391                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392                                                         readBytes,
2393                                                         (uint32) sizeof(ReorderBufferDiskChange))));
2394
2395                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396
2397                 ReorderBufferSerializeReserve(rb,
2398                                                                           sizeof(ReorderBufferDiskChange) + ondisk->size);
2399                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400
2401                 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2402                 readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403                                                  ondisk->size - sizeof(ReorderBufferDiskChange));
2404                 pgstat_report_wait_end();
2405
2406                 if (readBytes < 0)
2407                         ereport(ERROR,
2408                                         (errcode_for_file_access(),
2409                                          errmsg("could not read from reorderbuffer spill file: %m")));
2410                 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411                         ereport(ERROR,
2412                                         (errcode_for_file_access(),
2413                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414                                                         readBytes,
2415                                                         (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416
2417                 /*
2418                  * ok, read a full change from disk, now restore it into proper
2419                  * in-memory format
2420                  */
2421                 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422                 restored++;
2423         }
2424
2425         return restored;
2426 }
2427
2428 /*
2429  * Convert change from its on-disk format to in-memory format and queue it onto
2430  * the TXN's ->changes list.
2431  *
2432  * Note: although "data" is declared char*, at entry it points to a
2433  * maxalign'd buffer, making it safe in most of this function to assume
2434  * that the pointed-to data is suitably aligned for direct access.
2435  */
2436 static void
2437 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2438                                                    char *data)
2439 {
2440         ReorderBufferDiskChange *ondisk;
2441         ReorderBufferChange *change;
2442
2443         ondisk = (ReorderBufferDiskChange *) data;
2444
2445         change = ReorderBufferGetChange(rb);
2446
2447         /* copy static part */
2448         memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449
2450         data += sizeof(ReorderBufferDiskChange);
2451
2452         /* restore individual stuff */
2453         switch (change->action)
2454         {
2455                         /* fall through these, they're all similar enough */
2456                 case REORDER_BUFFER_CHANGE_INSERT:
2457                 case REORDER_BUFFER_CHANGE_UPDATE:
2458                 case REORDER_BUFFER_CHANGE_DELETE:
2459                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2460                         if (change->data.tp.oldtuple)
2461                         {
2462                                 uint32          tuplelen = ((HeapTuple) data)->t_len;
2463
2464                                 change->data.tp.oldtuple =
2465                                         ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2466
2467                                 /* restore ->tuple */
2468                                 memcpy(&change->data.tp.oldtuple->tuple, data,
2469                                            sizeof(HeapTupleData));
2470                                 data += sizeof(HeapTupleData);
2471
2472                                 /* reset t_data pointer into the new tuplebuf */
2473                                 change->data.tp.oldtuple->tuple.t_data =
2474                                         ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475
2476                                 /* restore tuple data itself */
2477                                 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478                                 data += tuplelen;
2479                         }
2480
2481                         if (change->data.tp.newtuple)
2482                         {
2483                                 /* here, data might not be suitably aligned! */
2484                                 uint32          tuplelen;
2485
2486                                 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487                                            sizeof(uint32));
2488
2489                                 change->data.tp.newtuple =
2490                                         ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2491
2492                                 /* restore ->tuple */
2493                                 memcpy(&change->data.tp.newtuple->tuple, data,
2494                                            sizeof(HeapTupleData));
2495                                 data += sizeof(HeapTupleData);
2496
2497                                 /* reset t_data pointer into the new tuplebuf */
2498                                 change->data.tp.newtuple->tuple.t_data =
2499                                         ReorderBufferTupleBufData(change->data.tp.newtuple);
2500
2501                                 /* restore tuple data itself */
2502                                 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503                                 data += tuplelen;
2504                         }
2505
2506                         break;
2507                 case REORDER_BUFFER_CHANGE_MESSAGE:
2508                         {
2509                                 Size            prefix_size;
2510
2511                                 /* read prefix */
2512                                 memcpy(&prefix_size, data, sizeof(Size));
2513                                 data += sizeof(Size);
2514                                 change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515                                                                                                                          prefix_size);
2516                                 memcpy(change->data.msg.prefix, data, prefix_size);
2517                                 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518                                 data += prefix_size;
2519
2520                                 /* read the message */
2521                                 memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522                                 data += sizeof(Size);
2523                                 change->data.msg.message = MemoryContextAlloc(rb->context,
2524                                                                                                                           change->data.msg.message_size);
2525                                 memcpy(change->data.msg.message, data,
2526                                            change->data.msg.message_size);
2527                                 data += change->data.msg.message_size;
2528
2529                                 break;
2530                         }
2531                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2532                         {
2533                                 Snapshot        oldsnap;
2534                                 Snapshot        newsnap;
2535                                 Size            size;
2536
2537                                 oldsnap = (Snapshot) data;
2538
2539                                 size = sizeof(SnapshotData) +
2540                                         sizeof(TransactionId) * oldsnap->xcnt +
2541                                         sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542
2543                                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544
2545                                 newsnap = change->data.snapshot;
2546
2547                                 memcpy(newsnap, data, size);
2548                                 newsnap->xip = (TransactionId *)
2549                                         (((char *) newsnap) + sizeof(SnapshotData));
2550                                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551                                 newsnap->copied = true;
2552                                 break;
2553                         }
2554                         /* the base struct contains all the data, easy peasy */
2555                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2556                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2557                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2558                         break;
2559         }
2560
2561         dlist_push_tail(&txn->changes, &change->node);
2562         txn->nentries_mem++;
2563 }
2564
2565 /*
2566  * Remove all on-disk stored for the passed in transaction.
2567  */
2568 static void
2569 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2570 {
2571         XLogSegNo       first;
2572         XLogSegNo       cur;
2573         XLogSegNo       last;
2574
2575         Assert(txn->first_lsn != InvalidXLogRecPtr);
2576         Assert(txn->final_lsn != InvalidXLogRecPtr);
2577
2578         XLByteToSeg(txn->first_lsn, first);
2579         XLByteToSeg(txn->final_lsn, last);
2580
2581         /* iterate over all possible filenames, and delete them */
2582         for (cur = first; cur <= last; cur++)
2583         {
2584                 char            path[MAXPGPATH];
2585                 XLogRecPtr      recptr;
2586
2587                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588
2589                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2590                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2591                                 (uint32) (recptr >> 32), (uint32) recptr);
2592                 if (unlink(path) != 0 && errno != ENOENT)
2593                         ereport(ERROR,
2594                                         (errcode_for_file_access(),
2595                                          errmsg("could not remove file \"%s\": %m", path)));
2596         }
2597 }
2598
2599 /*
2600  * Delete all data spilled to disk after we've restarted/crashed. It will be
2601  * recreated when the respective slots are reused.
2602  */
2603 void
2604 StartupReorderBuffer(void)
2605 {
2606         DIR                *logical_dir;
2607         struct dirent *logical_de;
2608
2609         DIR                *spill_dir;
2610         struct dirent *spill_de;
2611
2612         logical_dir = AllocateDir("pg_replslot");
2613         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614         {
2615                 struct stat statbuf;
2616                 char            path[MAXPGPATH * 2 + 12];
2617
2618                 if (strcmp(logical_de->d_name, ".") == 0 ||
2619                         strcmp(logical_de->d_name, "..") == 0)
2620                         continue;
2621
2622                 /* if it cannot be a slot, skip the directory */
2623                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624                         continue;
2625
2626                 /*
2627                  * ok, has to be a surviving logical slot, iterate and delete
2628                  * everything starting with xid-*
2629                  */
2630                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631
2632                 /* we're only creating directories here, skip if it's not our's */
2633                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634                         continue;
2635
2636                 spill_dir = AllocateDir(path);
2637                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638                 {
2639                         if (strcmp(spill_de->d_name, ".") == 0 ||
2640                                 strcmp(spill_de->d_name, "..") == 0)
2641                                 continue;
2642
2643                         /* only look at names that can be ours */
2644                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645                         {
2646                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647                                                 spill_de->d_name);
2648
2649                                 if (unlink(path) != 0)
2650                                         ereport(PANIC,
2651                                                         (errcode_for_file_access(),
2652                                                          errmsg("could not remove file \"%s\": %m",
2653                                                                         path)));
2654                         }
2655                 }
2656                 FreeDir(spill_dir);
2657         }
2658         FreeDir(logical_dir);
2659 }
2660
2661 /* ---------------------------------------
2662  * toast reassembly support
2663  * ---------------------------------------
2664  */
2665
2666 /*
2667  * Initialize per tuple toast reconstruction support.
2668  */
2669 static void
2670 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2671 {
2672         HASHCTL         hash_ctl;
2673
2674         Assert(txn->toast_hash == NULL);
2675
2676         memset(&hash_ctl, 0, sizeof(hash_ctl));
2677         hash_ctl.keysize = sizeof(Oid);
2678         hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679         hash_ctl.hcxt = rb->context;
2680         txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2681                                                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2682 }
2683
2684 /*
2685  * Per toast-chunk handling for toast reconstruction
2686  *
2687  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2688  * toasted Datum comes along.
2689  */
2690 static void
2691 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2692                                                           Relation relation, ReorderBufferChange *change)
2693 {
2694         ReorderBufferToastEnt *ent;
2695         ReorderBufferTupleBuf *newtup;
2696         bool            found;
2697         int32           chunksize;
2698         bool            isnull;
2699         Pointer         chunk;
2700         TupleDesc       desc = RelationGetDescr(relation);
2701         Oid                     chunk_id;
2702         int32           chunk_seq;
2703
2704         if (txn->toast_hash == NULL)
2705                 ReorderBufferToastInitHash(rb, txn);
2706
2707         Assert(IsToastRelation(relation));
2708
2709         newtup = change->data.tp.newtuple;
2710         chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711         Assert(!isnull);
2712         chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713         Assert(!isnull);
2714
2715         ent = (ReorderBufferToastEnt *)
2716                 hash_search(txn->toast_hash,
2717                                         (void *) &chunk_id,
2718                                         HASH_ENTER,
2719                                         &found);
2720
2721         if (!found)
2722         {
2723                 Assert(ent->chunk_id == chunk_id);
2724                 ent->num_chunks = 0;
2725                 ent->last_chunk_seq = 0;
2726                 ent->size = 0;
2727                 ent->reconstructed = NULL;
2728                 dlist_init(&ent->chunks);
2729
2730                 if (chunk_seq != 0)
2731                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732                                  chunk_seq, chunk_id);
2733         }
2734         else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737
2738         chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739         Assert(!isnull);
2740
2741         /* calculate size so we can allocate the right size at once later */
2742         if (!VARATT_IS_EXTENDED(chunk))
2743                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2744         else if (VARATT_IS_SHORT(chunk))
2745                 /* could happen due to heap_form_tuple doing its thing */
2746                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747         else
2748                 elog(ERROR, "unexpected type of toast chunk");
2749
2750         ent->size += chunksize;
2751         ent->last_chunk_seq = chunk_seq;
2752         ent->num_chunks++;
2753         dlist_push_tail(&ent->chunks, &change->node);
2754 }
2755
2756 /*
2757  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2758  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2759  *
2760  * We cannot replace unchanged toast tuples though, so those will still point
2761  * to on-disk toast data.
2762  */
2763 static void
2764 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2765                                                   Relation relation, ReorderBufferChange *change)
2766 {
2767         TupleDesc       desc;
2768         int                     natt;
2769         Datum      *attrs;
2770         bool       *isnull;
2771         bool       *free;
2772         HeapTuple       tmphtup;
2773         Relation        toast_rel;
2774         TupleDesc       toast_desc;
2775         MemoryContext oldcontext;
2776         ReorderBufferTupleBuf *newtup;
2777
2778         /* no toast tuples changed */
2779         if (txn->toast_hash == NULL)
2780                 return;
2781
2782         oldcontext = MemoryContextSwitchTo(rb->context);
2783
2784         /* we should only have toast tuples in an INSERT or UPDATE */
2785         Assert(change->data.tp.newtuple);
2786
2787         desc = RelationGetDescr(relation);
2788
2789         toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790         toast_desc = RelationGetDescr(toast_rel);
2791
2792         /* should we allocate from stack instead? */
2793         attrs = palloc0(sizeof(Datum) * desc->natts);
2794         isnull = palloc0(sizeof(bool) * desc->natts);
2795         free = palloc0(sizeof(bool) * desc->natts);
2796
2797         newtup = change->data.tp.newtuple;
2798
2799         heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800
2801         for (natt = 0; natt < desc->natts; natt++)
2802         {
2803                 Form_pg_attribute attr = desc->attrs[natt];
2804                 ReorderBufferToastEnt *ent;
2805                 struct varlena *varlena;
2806
2807                 /* va_rawsize is the size of the original datum -- including header */
2808                 struct varatt_external toast_pointer;
2809                 struct varatt_indirect redirect_pointer;
2810                 struct varlena *new_datum = NULL;
2811                 struct varlena *reconstructed;
2812                 dlist_iter      it;
2813                 Size            data_done = 0;
2814
2815                 /* system columns aren't toasted */
2816                 if (attr->attnum < 0)
2817                         continue;
2818
2819                 if (attr->attisdropped)
2820                         continue;
2821
2822                 /* not a varlena datatype */
2823                 if (attr->attlen != -1)
2824                         continue;
2825
2826                 /* no data */
2827                 if (isnull[natt])
2828                         continue;
2829
2830                 /* ok, we know we have a toast datum */
2831                 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832
2833                 /* no need to do anything if the tuple isn't external */
2834                 if (!VARATT_IS_EXTERNAL(varlena))
2835                         continue;
2836
2837                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838
2839                 /*
2840                  * Check whether the toast tuple changed, replace if so.
2841                  */
2842                 ent = (ReorderBufferToastEnt *)
2843                         hash_search(txn->toast_hash,
2844                                                 (void *) &toast_pointer.va_valueid,
2845                                                 HASH_FIND,
2846                                                 NULL);
2847                 if (ent == NULL)
2848                         continue;
2849
2850                 new_datum =
2851                         (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852
2853                 free[natt] = true;
2854
2855                 reconstructed = palloc0(toast_pointer.va_rawsize);
2856
2857                 ent->reconstructed = reconstructed;
2858
2859                 /* stitch toast tuple back together from its parts */
2860                 dlist_foreach(it, &ent->chunks)
2861                 {
2862                         bool            isnull;
2863                         ReorderBufferChange *cchange;
2864                         ReorderBufferTupleBuf *ctup;
2865                         Pointer         chunk;
2866
2867                         cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868                         ctup = cchange->data.tp.newtuple;
2869                         chunk = DatumGetPointer(
2870                                                                         fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871
2872                         Assert(!isnull);
2873                         Assert(!VARATT_IS_EXTERNAL(chunk));
2874                         Assert(!VARATT_IS_SHORT(chunk));
2875
2876                         memcpy(VARDATA(reconstructed) + data_done,
2877                                    VARDATA(chunk),
2878                                    VARSIZE(chunk) - VARHDRSZ);
2879                         data_done += VARSIZE(chunk) - VARHDRSZ;
2880                 }
2881                 Assert(data_done == toast_pointer.va_extsize);
2882
2883                 /* make sure its marked as compressed or not */
2884                 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885                         SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886                 else
2887                         SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888
2889                 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890                 redirect_pointer.pointer = reconstructed;
2891
2892                 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2893                 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894                            sizeof(redirect_pointer));
2895
2896                 attrs[natt] = PointerGetDatum(new_datum);
2897         }
2898
2899         /*
2900          * Build tuple in separate memory & copy tuple back into the tuplebuf
2901          * passed to the output plugin. We can't directly heap_fill_tuple() into
2902          * the tuplebuf because attrs[] will point back into the current content.
2903          */
2904         tmphtup = heap_form_tuple(desc, attrs, isnull);
2905         Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906         Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907
2908         memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909         newtup->tuple.t_len = tmphtup->t_len;
2910
2911         /*
2912          * free resources we won't further need, more persistent stuff will be
2913          * free'd in ReorderBufferToastReset().
2914          */
2915         RelationClose(toast_rel);
2916         pfree(tmphtup);
2917         for (natt = 0; natt < desc->natts; natt++)
2918         {
2919                 if (free[natt])
2920                         pfree(DatumGetPointer(attrs[natt]));
2921         }
2922         pfree(attrs);
2923         pfree(free);
2924         pfree(isnull);
2925
2926         MemoryContextSwitchTo(oldcontext);
2927 }
2928
2929 /*
2930  * Free all resources allocated for toast reconstruction.
2931  */
2932 static void
2933 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2934 {
2935         HASH_SEQ_STATUS hstat;
2936         ReorderBufferToastEnt *ent;
2937
2938         if (txn->toast_hash == NULL)
2939                 return;
2940
2941         /* sequentially walk over the hash and free everything */
2942         hash_seq_init(&hstat, txn->toast_hash);
2943         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944         {
2945                 dlist_mutable_iter it;
2946
2947                 if (ent->reconstructed != NULL)
2948                         pfree(ent->reconstructed);
2949
2950                 dlist_foreach_modify(it, &ent->chunks)
2951                 {
2952                         ReorderBufferChange *change =
2953                         dlist_container(ReorderBufferChange, node, it.cur);
2954
2955                         dlist_delete(&change->node);
2956                         ReorderBufferReturnChange(rb, change);
2957                 }
2958         }
2959
2960         hash_destroy(txn->toast_hash);
2961         txn->toast_hash = NULL;
2962 }
2963
2964
/* ---------------------------------------
 * Visibility support for logical decoding
 *
 *
 * Lookup actual cmin/cmax values when using a decoding snapshot. We can't
 * always rely on stored cmin/cmax values because of two scenarios:
 *
 * * A tuple got changed multiple times during a single transaction and thus
 *	 has got a combocid. Combocids are only valid for the duration of a
 *	 single transaction.
 * * A tuple with a cmin but no cmax (and thus no combocid) got
 *	 deleted/updated in another transaction than the one which created it
 *	 which we are looking at right now. As only one of cmin, cmax or combocid
 *	 is actually stored in the heap we don't have access to the value we
 *	 need anymore.
 *
 * To resolve those problems we have a per-transaction hash of (cmin,
 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
 * (cmin, cmax) values. That also takes care of combocids by simply
 * not caring about them at all. As we have the real cmin/cmax values
 * combocids aren't interesting.
 *
 * As we only care about catalog tuples here the overhead of this
 * hashtable should be acceptable.
 *
 * Heap rewrites complicate this a bit, check rewriteheap.c for
 * details.
 * -------------------------------------------------------------------------
 */
2994
2995 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
2996 typedef struct RewriteMappingFile
2997 {
2998         XLogRecPtr      lsn;
2999         char            fname[MAXPGPATH];
3000 } RewriteMappingFile;
3001
#if NOT_USED
/*
 * Debugging helper: write one DEBUG3 log line for every entry currently in
 * tuplecid_data, showing its key (relfilenode, tid) and its cmin/cmax.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS scan;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&scan, tuplecid_data);

	for (entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
		 entry != NULL;
		 entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan))
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&entry->key.tid),
			 ItemPointerGetOffsetNumber(&entry->key.tid),
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
3024
3025 /*
3026  * Apply a single mapping file to tuplecid_data.
3027  *
3028  * The mapping file has to have been verified to be a) committed b) for our
3029  * transaction c) applied in LSN order.
3030  */
3031 static void
3032 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3033 {
3034         char            path[MAXPGPATH];
3035         int                     fd;
3036         int                     readBytes;
3037         LogicalRewriteMappingData map;
3038
3039         sprintf(path, "pg_logical/mappings/%s", fname);
3040         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041         if (fd < 0)
3042                 ereport(ERROR,
3043                                 (errcode_for_file_access(),
3044                                  errmsg("could not open file \"%s\": %m", path)));
3045
3046         while (true)
3047         {
3048                 ReorderBufferTupleCidKey key;
3049                 ReorderBufferTupleCidEnt *ent;
3050                 ReorderBufferTupleCidEnt *new_ent;
3051                 bool            found;
3052
3053                 /* be careful about padding */
3054                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055
3056                 /* read all mappings till the end of the file */
3057                 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3058                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3059                 pgstat_report_wait_end();
3060
3061                 if (readBytes < 0)
3062                         ereport(ERROR,
3063                                         (errcode_for_file_access(),
3064                                          errmsg("could not read file \"%s\": %m",
3065                                                         path)));
3066                 else if (readBytes == 0)        /* EOF */
3067                         break;
3068                 else if (readBytes != sizeof(LogicalRewriteMappingData))
3069                         ereport(ERROR,
3070                                         (errcode_for_file_access(),
3071                                          errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072                                                         path, readBytes,
3073                                                         (int32) sizeof(LogicalRewriteMappingData))));
3074
3075                 key.relnode = map.old_node;
3076                 ItemPointerCopy(&map.old_tid,
3077                                                 &key.tid);
3078
3079
3080                 ent = (ReorderBufferTupleCidEnt *)
3081                         hash_search(tuplecid_data,
3082                                                 (void *) &key,
3083                                                 HASH_FIND,
3084                                                 NULL);
3085
3086                 /* no existing mapping, no need to update */
3087                 if (!ent)
3088                         continue;
3089
3090                 key.relnode = map.new_node;
3091                 ItemPointerCopy(&map.new_tid,
3092                                                 &key.tid);
3093
3094                 new_ent = (ReorderBufferTupleCidEnt *)
3095                         hash_search(tuplecid_data,
3096                                                 (void *) &key,
3097                                                 HASH_ENTER,
3098                                                 &found);
3099
3100                 if (found)
3101                 {
3102                         /*
3103                          * Make sure the existing mapping makes sense. We sometime update
3104                          * old records that did not yet have a cmax (e.g. pg_class' own
3105                          * entry while rewriting it) during rewrites, so allow that.
3106                          */
3107                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109                 }
3110                 else
3111                 {
3112                         /* update mapping */
3113                         new_ent->cmin = ent->cmin;
3114                         new_ent->cmax = ent->cmax;
3115                         new_ent->combocid = ent->combocid;
3116                 }
3117         }
3118 }
3119
3120
3121 /*
3122  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
3123  */
3124 static bool
3125 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3126 {
3127         return bsearch(&xid, xip, num,
3128                                    sizeof(TransactionId), xidComparator) != NULL;
3129 }
3130
3131 /*
3132  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3133  */
3134 static int
3135 file_sort_by_lsn(const void *a_p, const void *b_p)
3136 {
3137         RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138         RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139
3140         if (a->lsn < b->lsn)
3141                 return -1;
3142         else if (a->lsn > b->lsn)
3143                 return 1;
3144         return 0;
3145 }
3146
3147 /*
3148  * Apply any existing logical remapping files if there are any targeted at our
3149  * transaction for relid.
3150  */
3151 static void
3152 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3153 {
3154         DIR                *mapping_dir;
3155         struct dirent *mapping_de;
3156         List       *files = NIL;
3157         ListCell   *file;
3158         RewriteMappingFile **files_a;
3159         size_t          off;
3160         Oid                     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161
3162         mapping_dir = AllocateDir("pg_logical/mappings");
3163         while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164         {
3165                 Oid                     f_dboid;
3166                 Oid                     f_relid;
3167                 TransactionId f_mapped_xid;
3168                 TransactionId f_create_xid;
3169                 XLogRecPtr      f_lsn;
3170                 uint32          f_hi,
3171                                         f_lo;
3172                 RewriteMappingFile *f;
3173
3174                 if (strcmp(mapping_de->d_name, ".") == 0 ||
3175                         strcmp(mapping_de->d_name, "..") == 0)
3176                         continue;
3177
3178                 /* Ignore files that aren't ours */
3179                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3180                         continue;
3181
3182                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3183                                    &f_dboid, &f_relid, &f_hi, &f_lo,
3184                                    &f_mapped_xid, &f_create_xid) != 6)
3185                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3186
3187                 f_lsn = ((uint64) f_hi) << 32 | f_lo;
3188
3189                 /* mapping for another database */
3190                 if (f_dboid != dboid)
3191                         continue;
3192
3193                 /* mapping for another relation */
3194                 if (f_relid != relid)
3195                         continue;
3196
3197                 /* did the creating transaction abort? */
3198                 if (!TransactionIdDidCommit(f_create_xid))
3199                         continue;
3200
3201                 /* not for our transaction */
3202                 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3203                         continue;
3204
3205                 /* ok, relevant, queue for apply */
3206                 f = palloc(sizeof(RewriteMappingFile));
3207                 f->lsn = f_lsn;
3208                 strcpy(f->fname, mapping_de->d_name);
3209                 files = lappend(files, f);
3210         }
3211         FreeDir(mapping_dir);
3212
3213         /* build array we can easily sort */
3214         files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3215         off = 0;
3216         foreach(file, files)
3217         {
3218                 files_a[off++] = lfirst(file);
3219         }
3220
3221         /* sort files so we apply them in LSN order */
3222         qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3223                   file_sort_by_lsn);
3224
3225         for (off = 0; off < list_length(files); off++)
3226         {
3227                 RewriteMappingFile *f = files_a[off];
3228
3229                 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3230                          snapshot->subxip[0]);
3231                 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3232                 pfree(f);
3233         }
3234 }
3235
3236 /*
3237  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3238  * combocids.
3239  */
3240 bool
3241 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3242                                                           Snapshot snapshot,
3243                                                           HeapTuple htup, Buffer buffer,
3244                                                           CommandId *cmin, CommandId *cmax)
3245 {
3246         ReorderBufferTupleCidKey key;
3247         ReorderBufferTupleCidEnt *ent;
3248         ForkNumber      forkno;
3249         BlockNumber blockno;
3250         bool            updated_mapping = false;
3251
3252         /* be careful about padding */
3253         memset(&key, 0, sizeof(key));
3254
3255         Assert(!BufferIsLocal(buffer));
3256
3257         /*
3258          * get relfilenode from the buffer, no convenient way to access it other
3259          * than that.
3260          */
3261         BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262
3263         /* tuples can only be in the main fork */
3264         Assert(forkno == MAIN_FORKNUM);
3265         Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266
3267         ItemPointerCopy(&htup->t_self,
3268                                         &key.tid);
3269
3270 restart:
3271         ent = (ReorderBufferTupleCidEnt *)
3272                 hash_search(tuplecid_data,
3273                                         (void *) &key,
3274                                         HASH_FIND,
3275                                         NULL);
3276
3277         /*
3278          * failed to find a mapping, check whether the table was rewritten and
3279          * apply mapping if so, but only do that once - there can be no new
3280          * mappings while we are in here since we have to hold a lock on the
3281          * relation.
3282          */
3283         if (ent == NULL && !updated_mapping)
3284         {
3285                 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286                 /* now check but don't update for a mapping again */
3287                 updated_mapping = true;
3288                 goto restart;
3289         }
3290         else if (ent == NULL)
3291                 return false;
3292
3293         if (cmin)
3294                 *cmin = ent->cmin;
3295         if (cmax)
3296                 *cmax = ent->cmax;
3297         return true;
3298 }