]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/reorderbuffer.c
Add missing apostrophe.
[postgresql] / src / backend / replication / logical / reorderbuffer.c
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *        PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  *        This module gets handed individual pieces of transactions in the order
15  *        they are written to the WAL and is responsible to reassemble them into
16  *        toplevel transaction sized pieces. When a transaction is completely
17  *        reassembled - signalled by reading the transaction commit record - it
18  *        will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  *        individual changes. The output plugins rely on snapshots built by
20  *        snapbuild.c which hands them to us.
21  *
22  *        Transactions and subtransactions/savepoints in postgres are not
23  *        immediately linked to each other from outside the performing
24  *        backend. Only at commit/abort (or special xact_assignment records) they
25  *        are linked together. Which means that we will have to splice together a
26  *        toplevel transaction from its subtransactions. To do that efficiently we
27  *        build a binary heap indexed by the smallest current lsn of the individual
28  *        subtransactions' changestreams. As the individual streams are inherently
29  *        ordered by LSN - since that is where we build them from - the transaction
30  *        can easily be reassembled by always using the subtransaction with the
31  *        smallest current LSN from the heap.
32  *
33  *        In order to cope with large transactions - which can be several times as
34  *        big as the available memory - this module supports spooling the contents
35  *        of a large transaction to disk. When the transaction is replayed the
36  *        contents of individual (sub-)transactions will be read from disk in
37  *        chunks.
38  *
39  *        This module also has to deal with reassembling toast records from the
40  *        individual chunks stored in WAL. When a new (or initial) version of a
41  *        tuple is stored in WAL it will always be preceded by the toast chunks
42  *        emitted for the columns stored out of line. Within a single toplevel
43  *        transaction there will be no other data carrying records between a row's
44  *        toast chunks and the row data itself. See ReorderBufferToast* for
45  *        details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "pgstat.h"
62 #include "replication/logical.h"
63 #include "replication/reorderbuffer.h"
64 #include "replication/slot.h"
65 #include "replication/snapbuild.h"              /* just for SnapBuildSnapDecRefcount */
66 #include "storage/bufmgr.h"
67 #include "storage/fd.h"
68 #include "storage/sinval.h"
69 #include "utils/builtins.h"
70 #include "utils/combocid.h"
71 #include "utils/memdebug.h"
72 #include "utils/memutils.h"
73 #include "utils/rel.h"
74 #include "utils/relfilenodemap.h"
75 #include "utils/tqual.h"
76
77
78 /* entry for a hash table we use to map from xid to our transaction state */
79 typedef struct ReorderBufferTXNByIdEnt
80 {
81         TransactionId xid;              /* hash key (by_txn is created with keysize sizeof(TransactionId)) */
82         ReorderBufferTXN *txn;          /* transaction state stored for that xid */
83 } ReorderBufferTXNByIdEnt;
84
85 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
86 typedef struct ReorderBufferTupleCidKey
87 {
88         RelFileNode relnode;            /* relfilenode half of the lookup key */
89         ItemPointerData tid;            /* ctid half of the lookup key */
90 } ReorderBufferTupleCidKey;
91
92 typedef struct ReorderBufferTupleCidEnt
93 {
94         ReorderBufferTupleCidKey key;   /* lookup key; presumably must stay the first member for the hash table - confirm */
95         CommandId       cmin;           /* cmin the key maps to */
96         CommandId       cmax;           /* cmax the key maps to */
97         CommandId       combocid;               /* just for debugging */
98 } ReorderBufferTupleCidEnt;
99
100 /* k-way in-order change iteration support structures */
101 typedef struct ReorderBufferIterTXNEntry
102 {
103         XLogRecPtr      lsn;            /* sort key read by ReorderBufferIterCompare */
104         ReorderBufferChange *change;    /* presumably the change this entry currently points at - confirm */
105         ReorderBufferTXN *txn;          /* presumably the (sub)transaction this entry iterates - confirm */
106         int                     fd;     /* presumably the spill-file descriptor handed to ReorderBufferRestoreChanges - confirm */
107         XLogSegNo       segno;          /* presumably the segment handed to ReorderBufferRestoreChanges - confirm */
108 } ReorderBufferIterTXNEntry;
109
110 typedef struct ReorderBufferIterTXNState
111 {
112         binaryheap *heap;               /* presumably ordered with ReorderBufferIterCompare - confirm */
113         Size            nr_txns;        /* presumably the number of slots used in entries[] - confirm */
114         dlist_head      old_change;     /* NOTE(review): usage is not visible in this part of the file */
115         ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];       /* indexed by the int32 Datums that ReorderBufferIterCompare receives */
116 } ReorderBufferIterTXNState;
117
118 /* toast datastructures */
119 typedef struct ReorderBufferToastEnt    /* reassembly state for one toasted value; presumably hashed by chunk_id - confirm */
120 {
121         Oid                     chunk_id;               /* toast_table.chunk_id */
122         int32           last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
123                                                                  * have seen */
124         Size            num_chunks;             /* number of chunks we've already seen */
125         Size            size;                   /* combined size of chunks seen */
126         dlist_head      chunks;                 /* linked list of chunks */
127         struct varlena *reconstructed;          /* reconstructed varlena now pointed
128                                                                                  * to in main tup */
129 } ReorderBufferToastEnt;
130
131 /* Disk serialization support datastructures */
132 typedef struct ReorderBufferDiskChange
133 {
134         Size            size;           /* presumably the total size of this on-disk record, trailing data included - confirm */
135         ReorderBufferChange change;     /* the change struct, embedded by value */
136         /* data follows */
137 } ReorderBufferDiskChange;
138
139 /*
140  * Maximum number of changes kept in memory, per transaction. After that,
141  * changes are spooled to disk.
142  *
143  * The current value should be sufficient to decode the entire transaction
144  * without hitting disk in OLTP workloads, while starting to spool to disk in
145  * other workloads reasonably fast.
146  *
147  * At some point in the future it probably makes sense to have a more elaborate
148  * resource management here, but it's not entirely clear what that would look
149  * like.
150  */
151 static const Size max_changes_in_memory = 4096; /* a count of changes per transaction, not a byte size */
152
153 /*
154  * We use a very simple form of a slab allocator for frequently allocated
155  * objects, simply keeping a fixed number in a linked list when unused,
156  * instead pfree()ing them. Without that in many workloads aset.c becomes a
157  * major bottleneck, especially when spilling to disk while decoding batch
158  * workloads.
159  */
160 static const Size max_cached_tuplebufs = 4096 * 2;              /* NOTE(review): was annotated "~8MB"; every cached buffer is MaxHeapTupleSize bytes (see ReorderBufferGetTupleBuf), so with 8kB pages this is presumably closer to 64MB - confirm */
161
162 /* ---------------------------------------
163  * primary reorderbuffer support routines
164  * ---------------------------------------
165  */
166 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
167 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
168 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
169                                           TransactionId xid, bool create, bool *is_new,
170                                           XLogRecPtr lsn, bool create_as_top);
171
172 static void AssertTXNLsnOrder(ReorderBuffer *rb);
173
174 /* ---------------------------------------
175  * support functions for lsn-order iterating over the ->changes of a
176  * transaction and its subtransactions
177  *
178  * used for iteration over the k-way heap merge of a transaction and its
179  * subtransactions
180  * ---------------------------------------
181  */
182 static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
183 static ReorderBufferChange *
184                         ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
185 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
186                                                    ReorderBufferIterTXNState *state);
187 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
188
189 /*
190  * ---------------------------------------
191  * Disk serialization support functions
192  * ---------------------------------------
193  */
194 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
196 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
197                                                          int fd, ReorderBufferChange *change);
198 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
199                                                         int *fd, XLogSegNo *segno);
200 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
201                                                    char *change);
202 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
203
204 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
205 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
206                                           ReorderBufferTXN *txn, CommandId cid);
207
208 /* ---------------------------------------
209  * toast reassembly support
210  * ---------------------------------------
211  */
212 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
213 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
214 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
215                                                   Relation relation, ReorderBufferChange *change);
216 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
217                                                           Relation relation, ReorderBufferChange *change);
218
219
220 /*
221  * Allocate a new ReorderBuffer
222  */
223 ReorderBuffer *
224 ReorderBufferAllocate(void)
225 {
226         ReorderBuffer *buffer;
227         HASHCTL         hash_ctl;
228         MemoryContext new_ctx;
229
230         /* allocate memory in own context, to have better accountability */
231         new_ctx = AllocSetContextCreate(CurrentMemoryContext,
232                                                                         "ReorderBuffer",
233                                                                         ALLOCSET_DEFAULT_SIZES);
234
235         buffer =
236                 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
237
238         memset(&hash_ctl, 0, sizeof(hash_ctl));
239
240         buffer->context = new_ctx;
241
242         buffer->change_context = SlabContextCreate(new_ctx,
243                                                                                            "Change",
244                                                                                            SLAB_DEFAULT_BLOCK_SIZE,
245                                                                                            sizeof(ReorderBufferChange));
246
247         buffer->txn_context = SlabContextCreate(new_ctx,
248                                                                                         "TXN",
249                                                                                         SLAB_DEFAULT_BLOCK_SIZE,
250                                                                                         sizeof(ReorderBufferTXN));
251
252         hash_ctl.keysize = sizeof(TransactionId);
253         hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
254         hash_ctl.hcxt = buffer->context;
255
256         buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
257                                                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
258
259         buffer->by_txn_last_xid = InvalidTransactionId;
260         buffer->by_txn_last_txn = NULL;
261
262         buffer->nr_cached_tuplebufs = 0;
263
264         buffer->outbuf = NULL;
265         buffer->outbufsize = 0;
266
267         buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
268
269         dlist_init(&buffer->toplevel_by_lsn);
270         slist_init(&buffer->cached_tuplebufs);
271
272         return buffer;
273 }
274
275 /*
276  * Free a ReorderBuffer
277  */
278 void
279 ReorderBufferFree(ReorderBuffer *rb)
280 {
281         MemoryContext context = rb->context;
282
283         /*
284          * We free separately allocated data by entirely scrapping reorderbuffer's
285          * memory context.
286          */
287         MemoryContextDelete(context);
288 }
289
290 /*
291  * Get an unused, possibly preallocated, ReorderBufferTXN.
292  */
293 static ReorderBufferTXN *
294 ReorderBufferGetTXN(ReorderBuffer *rb)
295 {
296         ReorderBufferTXN *txn;
297
298         txn = (ReorderBufferTXN *)
299                 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
300
301         memset(txn, 0, sizeof(ReorderBufferTXN));
302
303         dlist_init(&txn->changes);
304         dlist_init(&txn->tuplecids);
305         dlist_init(&txn->subtxns);
306
307         return txn;
308 }
309
310 /*
311  * Free a ReorderBufferTXN.
312  *
313  * Deallocation might be delayed for efficiency purposes, for details check
314  * the comments above max_cached_changes's definition.
315  */
316 static void
317 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
318 {
319         /* clean the lookup cache if we were cached (quite likely) */
320         if (rb->by_txn_last_xid == txn->xid)
321         {
322                 rb->by_txn_last_xid = InvalidTransactionId;
323                 rb->by_txn_last_txn = NULL;
324         }
325
326         /* free data that's contained */
327
328         if (txn->tuplecid_hash != NULL)
329         {
330                 hash_destroy(txn->tuplecid_hash);
331                 txn->tuplecid_hash = NULL;
332         }
333
334         if (txn->invalidations)
335         {
336                 pfree(txn->invalidations);
337                 txn->invalidations = NULL;
338         }
339
340         pfree(txn);
341 }
342
343 /*
344  * Get an unused, possibly preallocated, ReorderBufferChange.
345  */
346 ReorderBufferChange *
347 ReorderBufferGetChange(ReorderBuffer *rb)
348 {
349         ReorderBufferChange *change;
350
351         change = (ReorderBufferChange *)
352                 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
353
354         memset(change, 0, sizeof(ReorderBufferChange));
355         return change;
356 }
357
358 /*
359  * Free an ReorderBufferChange.
360  *
361  * Deallocation might be delayed for efficiency purposes, for details check
362  * the comments above max_cached_changes's definition.
363  */
364 void
365 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
366 {
367         /* free contained data */
368         switch (change->action)
369         {
370                 case REORDER_BUFFER_CHANGE_INSERT:
371                 case REORDER_BUFFER_CHANGE_UPDATE:
372                 case REORDER_BUFFER_CHANGE_DELETE:
373                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
374                         if (change->data.tp.newtuple)
375                         {
376                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
377                                 change->data.tp.newtuple = NULL;
378                         }
379
380                         if (change->data.tp.oldtuple)
381                         {
382                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
383                                 change->data.tp.oldtuple = NULL;
384                         }
385                         break;
386                 case REORDER_BUFFER_CHANGE_MESSAGE:
387                         if (change->data.msg.prefix != NULL)
388                                 pfree(change->data.msg.prefix);
389                         change->data.msg.prefix = NULL;
390                         if (change->data.msg.message != NULL)
391                                 pfree(change->data.msg.message);
392                         change->data.msg.message = NULL;
393                         break;
394                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
395                         if (change->data.snapshot)
396                         {
397                                 ReorderBufferFreeSnap(rb, change->data.snapshot);
398                                 change->data.snapshot = NULL;
399                         }
400                         break;
401                         /* no data in addition to the struct itself */
402                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
403                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
404                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
405                         break;
406         }
407
408         pfree(change);
409 }
410
411 /*
412  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
413  * least a tuple of size tuple_len (excluding header overhead).
414  */
415 ReorderBufferTupleBuf *
416 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
417 {
418         ReorderBufferTupleBuf *tuple;
419         Size            alloc_len;
420
421         alloc_len = tuple_len + SizeofHeapTupleHeader;
422
423         /*
424          * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
425          * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
426          * generated for oldtuples can be bigger, as they don't have out-of-line
427          * toast columns.
428          */
429         if (alloc_len < MaxHeapTupleSize)
430                 alloc_len = MaxHeapTupleSize;
431
432
433         /* if small enough, check the slab cache */
434         if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
435         {
436                 rb->nr_cached_tuplebufs--;
437                 tuple = slist_container(ReorderBufferTupleBuf, node,
438                                                                 slist_pop_head_node(&rb->cached_tuplebufs));
439                 Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
440 #ifdef USE_ASSERT_CHECKING
441                 memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
442                 VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
443 #endif
444                 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
445 #ifdef USE_ASSERT_CHECKING
446                 memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
447                 VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
448 #endif
449         }
450         else
451         {
452                 tuple = (ReorderBufferTupleBuf *)
453                         MemoryContextAlloc(rb->context,
454                                                            sizeof(ReorderBufferTupleBuf) +
455                                                            MAXIMUM_ALIGNOF + alloc_len);
456                 tuple->alloc_tuple_size = alloc_len;
457                 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
458         }
459
460         return tuple;
461 }
462
463 /*
464  * Free an ReorderBufferTupleBuf.
465  *
466  * Deallocation might be delayed for efficiency purposes, for details check
467  * the comments above max_cached_changes's definition.
468  */
469 void
470 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
471 {
472         /* check whether to put into the slab cache, oversized tuples never are */
473         if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
474                 rb->nr_cached_tuplebufs < max_cached_tuplebufs)
475         {
476                 rb->nr_cached_tuplebufs++;
477                 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
478                 VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
479                 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
480                 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
481                 VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
482         }
483         else
484         {
485                 pfree(tuple);
486         }
487 }
488
489 /*
490  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
491  * If create is true, and a transaction doesn't already exist, create it
492  * (with the given LSN, and as top transaction if that's specified);
493  * when this happens, is_new is set to true.
494  */
495 static ReorderBufferTXN *
496 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
497                                           bool *is_new, XLogRecPtr lsn, bool create_as_top)
498 {
499         ReorderBufferTXN *txn;
500         ReorderBufferTXNByIdEnt *ent;
501         bool            found;
502
503         Assert(TransactionIdIsValid(xid));
504         Assert(!create || lsn != InvalidXLogRecPtr);
505
506         /*
507          * Check the one-entry lookup cache first
508          */
509         if (TransactionIdIsValid(rb->by_txn_last_xid) &&
510                 rb->by_txn_last_xid == xid)
511         {
512                 txn = rb->by_txn_last_txn;
513
514                 if (txn != NULL)
515                 {
516                         /* found it, and it's valid */
517                         if (is_new)
518                                 *is_new = false;
519                         return txn;
520                 }
521
522                 /*
523                  * cached as non-existent, and asked not to create? Then nothing else
524                  * to do.
525                  */
526                 if (!create)
527                         return NULL;
528                 /* otherwise fall through to create it */
529         }
530
531         /*
532          * If the cache wasn't hit or it yielded an "does-not-exist" and we want
533          * to create an entry.
534          */
535
536         /* search the lookup table */
537         ent = (ReorderBufferTXNByIdEnt *)
538                 hash_search(rb->by_txn,
539                                         (void *) &xid,
540                                         create ? HASH_ENTER : HASH_FIND,
541                                         &found);
542         if (found)
543                 txn = ent->txn;
544         else if (create)
545         {
546                 /* initialize the new entry, if creation was requested */
547                 Assert(ent != NULL);
548
549                 ent->txn = ReorderBufferGetTXN(rb);
550                 ent->txn->xid = xid;
551                 txn = ent->txn;
552                 txn->first_lsn = lsn;
553                 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
554
555                 if (create_as_top)
556                 {
557                         dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
558                         AssertTXNLsnOrder(rb);
559                 }
560         }
561         else
562                 txn = NULL;                             /* not found and not asked to create */
563
564         /* update cache */
565         rb->by_txn_last_xid = xid;
566         rb->by_txn_last_txn = txn;
567
568         if (is_new)
569                 *is_new = !found;
570
571         Assert(!create || txn != NULL);
572         return txn;
573 }
574
575 /*
576  * Queue a change into a transaction so it can be replayed upon commit.
577  */
578 void
579 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
580                                                  ReorderBufferChange *change)
581 {
582         ReorderBufferTXN *txn;
583
584         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
585
586         change->lsn = lsn;
587         Assert(InvalidXLogRecPtr != lsn);
588         dlist_push_tail(&txn->changes, &change->node);
589         txn->nentries++;
590         txn->nentries_mem++;
591
592         ReorderBufferCheckSerializeTXN(rb, txn);
593 }
594
595 /*
596  * Queue message into a transaction so it can be processed upon commit.
597  */
598 void
599 ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
600                                                   Snapshot snapshot, XLogRecPtr lsn,
601                                                   bool transactional, const char *prefix,
602                                                   Size message_size, const char *message)
603 {
604         if (transactional)
605         {
606                 MemoryContext oldcontext;
607                 ReorderBufferChange *change;
608
609                 Assert(xid != InvalidTransactionId);
610
611                 oldcontext = MemoryContextSwitchTo(rb->context);
612
613                 change = ReorderBufferGetChange(rb);
614                 change->action = REORDER_BUFFER_CHANGE_MESSAGE;
615                 change->data.msg.prefix = pstrdup(prefix);
616                 change->data.msg.message_size = message_size;
617                 change->data.msg.message = palloc(message_size);
618                 memcpy(change->data.msg.message, message, message_size);
619
620                 ReorderBufferQueueChange(rb, xid, lsn, change);
621
622                 MemoryContextSwitchTo(oldcontext);
623         }
624         else
625         {
626                 ReorderBufferTXN *txn = NULL;
627                 volatile Snapshot snapshot_now = snapshot;
628
629                 if (xid != InvalidTransactionId)
630                         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
631
632                 /* setup snapshot to allow catalog access */
633                 SetupHistoricSnapshot(snapshot_now, NULL);
634                 PG_TRY();
635                 {
636                         rb->message(rb, txn, lsn, false, prefix, message_size, message);
637
638                         TeardownHistoricSnapshot(false);
639                 }
640                 PG_CATCH();
641                 {
642                         TeardownHistoricSnapshot(true);
643                         PG_RE_THROW();
644                 }
645                 PG_END_TRY();
646         }
647 }
648
649
650 static void
651 AssertTXNLsnOrder(ReorderBuffer *rb)
652 {
653 #ifdef USE_ASSERT_CHECKING
654         dlist_iter      iter;
655         XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
656
657         dlist_foreach(iter, &rb->toplevel_by_lsn)
658         {
659                 ReorderBufferTXN *cur_txn;
660
661                 cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
662                 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
663
664                 if (cur_txn->end_lsn != InvalidXLogRecPtr)
665                         Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
666
667                 if (prev_first_lsn != InvalidXLogRecPtr)
668                         Assert(prev_first_lsn < cur_txn->first_lsn);
669
670                 Assert(!cur_txn->is_known_as_subxact);
671                 prev_first_lsn = cur_txn->first_lsn;
672         }
673 #endif
674 }
675
676 ReorderBufferTXN *
677 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
678 {
679         ReorderBufferTXN *txn;
680
681         if (dlist_is_empty(&rb->toplevel_by_lsn))
682                 return NULL;
683
684         AssertTXNLsnOrder(rb);
685
686         txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
687
688         Assert(!txn->is_known_as_subxact);
689         Assert(txn->first_lsn != InvalidXLogRecPtr);
690         return txn;
691 }
692
693 void
694 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
695 {
696         rb->current_restart_decoding_lsn = ptr; /* copied into restart_decoding_lsn of transactions created afterwards by ReorderBufferTXNByXid */
697 }
698
699 void
700 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
701                                                  TransactionId subxid, XLogRecPtr lsn)
702 {
703         ReorderBufferTXN *txn;
704         ReorderBufferTXN *subtxn;
705         bool            new_top;
706         bool            new_sub;
707
708         txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
709         subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
710
711         if (new_sub)
712         {
713                 /*
714                  * we assign subtransactions to top level transaction even if we don't
715                  * have data for it yet, assignment records frequently reference xids
716                  * that have not yet produced any records. Knowing those aren't top
717                  * level xids allows us to make processing cheaper in some places.
718                  */
719                 dlist_push_tail(&txn->subtxns, &subtxn->node);
720                 txn->nsubtxns++;
721         }
722         else if (!subtxn->is_known_as_subxact)
723         {
724                 subtxn->is_known_as_subxact = true;
725                 Assert(subtxn->nsubtxns == 0);
726
727                 /* remove from lsn order list of top-level transactions */
728                 dlist_delete(&subtxn->node);
729
730                 /* add to toplevel transaction */
731                 dlist_push_tail(&txn->subtxns, &subtxn->node);
732                 txn->nsubtxns++;
733         }
734         else if (new_top)
735         {
736                 elog(ERROR, "existing subxact assigned to unknown toplevel xact");
737         }
738 }
739
740 /*
741  * Associate a subtransaction with its toplevel transaction at commit
742  * time. There may be no further changes added after this.
743  */
744 void
745 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
746                                                  TransactionId subxid, XLogRecPtr commit_lsn,
747                                                  XLogRecPtr end_lsn)
748 {
749         ReorderBufferTXN *txn;
750         ReorderBufferTXN *subtxn;
751
752         subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
753                                                                    InvalidXLogRecPtr, false);
754
755         /*
756          * No need to do anything if that subtxn didn't contain any changes
757          */
758         if (!subtxn)
759                 return;
760
761         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
762
763         if (txn == NULL)
764                 elog(ERROR, "subxact logged without previous toplevel record");
765
766         /*
767          * Pass our base snapshot to the parent transaction if it doesn't have
768          * one, or ours is older. That can happen if there are no changes in the
769          * toplevel transaction but in one of the child transactions. This allows
770          * the parent to simply use its base snapshot initially.
771          */
772         if (subtxn->base_snapshot != NULL &&
773                 (txn->base_snapshot == NULL ||
774                  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
775         {
776                 txn->base_snapshot = subtxn->base_snapshot;
777                 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
778                 subtxn->base_snapshot = NULL;
779                 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
780         }
781
782         subtxn->final_lsn = commit_lsn;
783         subtxn->end_lsn = end_lsn;
784
785         if (!subtxn->is_known_as_subxact)
786         {
787                 subtxn->is_known_as_subxact = true;
788                 Assert(subtxn->nsubtxns == 0);
789
790                 /* remove from lsn order list of top-level transactions */
791                 dlist_delete(&subtxn->node);
792
793                 /* add to subtransaction list */
794                 dlist_push_tail(&txn->subtxns, &subtxn->node);
795                 txn->nsubtxns++;
796         }
797 }
798
799
800 /*
801  * Support for efficiently iterating over a transaction's and its
802  * subtransactions' changes.
803  *
804  * We do this by doing a k-way merge between transactions/subtransactions. For that
805  * we model the current heads of the different transactions as a binary heap
806  * so we easily know which (sub-)transaction has the change with the smallest
807  * lsn next.
808  *
809  * We assume the changes in individual transactions are already sorted by LSN.
810  */
811
812 /*
813  * Binary heap comparison function.
814  */
815 static int
816 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
817 {
818         ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
819         XLogRecPtr      pos_a = state->entries[DatumGetInt32(a)].lsn;
820         XLogRecPtr      pos_b = state->entries[DatumGetInt32(b)].lsn;
821
822         if (pos_a < pos_b)
823                 return 1;
824         else if (pos_a == pos_b)
825                 return 0;
826         return -1;
827 }
828
829 /*
830  * Allocate & initialize an iterator which iterates in lsn order over a
831  * transaction and all its subtransactions.
832  */
833 static ReorderBufferIterTXNState *
834 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
835 {
836         Size            nr_txns = 0;
837         ReorderBufferIterTXNState *state;
838         dlist_iter      cur_txn_i;
839         int32           off;
840
841         /*
842          * Calculate the size of our heap: one element for every transaction that
843          * contains changes.  (Besides the transactions already in the reorder
844          * buffer, we count the one we were directly passed.)
845          */
846         if (txn->nentries > 0)
847                 nr_txns++;
848
849         dlist_foreach(cur_txn_i, &txn->subtxns)
850         {
851                 ReorderBufferTXN *cur_txn;
852
853                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
854
855                 if (cur_txn->nentries > 0)
856                         nr_txns++;
857         }
858
859         /*
860          * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
861          * need to allocate/build a heap then.
862          */
863
864         /* allocate iteration state */
865         state = (ReorderBufferIterTXNState *)
866                 MemoryContextAllocZero(rb->context,
867                                                            sizeof(ReorderBufferIterTXNState) +
868                                                            sizeof(ReorderBufferIterTXNEntry) * nr_txns);
869
870         state->nr_txns = nr_txns;
871         dlist_init(&state->old_change);
872
873         for (off = 0; off < state->nr_txns; off++)
874         {
875                 state->entries[off].fd = -1;
876                 state->entries[off].segno = 0;
877         }
878
879         /* allocate heap */
880         state->heap = binaryheap_allocate(state->nr_txns,
881                                                                           ReorderBufferIterCompare,
882                                                                           state);
883
884         /*
885          * Now insert items into the binary heap, in an unordered fashion.  (We
886          * will run a heap assembly step at the end; this is more efficient.)
887          */
888
889         off = 0;
890
891         /* add toplevel transaction if it contains changes */
892         if (txn->nentries > 0)
893         {
894                 ReorderBufferChange *cur_change;
895
896                 if (txn->nentries != txn->nentries_mem)
897                 {
898                         /* serialize remaining changes */
899                         ReorderBufferSerializeTXN(rb, txn);
900                         ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
901                                                                                 &state->entries[off].segno);
902                 }
903
904                 cur_change = dlist_head_element(ReorderBufferChange, node,
905                                                                                 &txn->changes);
906
907                 state->entries[off].lsn = cur_change->lsn;
908                 state->entries[off].change = cur_change;
909                 state->entries[off].txn = txn;
910
911                 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
912         }
913
914         /* add subtransactions if they contain changes */
915         dlist_foreach(cur_txn_i, &txn->subtxns)
916         {
917                 ReorderBufferTXN *cur_txn;
918
919                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
920
921                 if (cur_txn->nentries > 0)
922                 {
923                         ReorderBufferChange *cur_change;
924
925                         if (cur_txn->nentries != cur_txn->nentries_mem)
926                         {
927                                 /* serialize remaining changes */
928                                 ReorderBufferSerializeTXN(rb, cur_txn);
929                                 ReorderBufferRestoreChanges(rb, cur_txn,
930                                                                                         &state->entries[off].fd,
931                                                                                         &state->entries[off].segno);
932                         }
933                         cur_change = dlist_head_element(ReorderBufferChange, node,
934                                                                                         &cur_txn->changes);
935
936                         state->entries[off].lsn = cur_change->lsn;
937                         state->entries[off].change = cur_change;
938                         state->entries[off].txn = cur_txn;
939
940                         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
941                 }
942         }
943
944         /* assemble a valid binary heap */
945         binaryheap_build(state->heap);
946
947         return state;
948 }
949
950 /*
951  * Return the next change when iterating over a transaction and its
952  * subtransactions.
953  *
954  * Returns NULL when no further changes exist.
955  */
956 static ReorderBufferChange *
957 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
958 {
959         ReorderBufferChange *change;
960         ReorderBufferIterTXNEntry *entry;
961         int32           off;
962
963         /* nothing there anymore */
964         if (state->heap->bh_size == 0)
965                 return NULL;
966
967         off = DatumGetInt32(binaryheap_first(state->heap));
968         entry = &state->entries[off];
969
970         /* free memory we might have "leaked" in the previous *Next call */
971         if (!dlist_is_empty(&state->old_change))
972         {
973                 change = dlist_container(ReorderBufferChange, node,
974                                                                  dlist_pop_head_node(&state->old_change));
975                 ReorderBufferReturnChange(rb, change);
976                 Assert(dlist_is_empty(&state->old_change));
977         }
978
979         change = entry->change;
980
981         /*
982          * update heap with information about which transaction has the next
983          * relevant change in LSN order
984          */
985
986         /* there are in-memory changes */
987         if (dlist_has_next(&entry->txn->changes, &entry->change->node))
988         {
989                 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
990                 ReorderBufferChange *next_change =
991                 dlist_container(ReorderBufferChange, node, next);
992
993                 /* txn stays the same */
994                 state->entries[off].lsn = next_change->lsn;
995                 state->entries[off].change = next_change;
996
997                 binaryheap_replace_first(state->heap, Int32GetDatum(off));
998                 return change;
999         }
1000
1001         /* try to load changes from disk */
1002         if (entry->txn->nentries != entry->txn->nentries_mem)
1003         {
1004                 /*
1005                  * Ugly: restoring changes will reuse *Change records, thus delete the
1006                  * current one from the per-tx list and only free in the next call.
1007                  */
1008                 dlist_delete(&change->node);
1009                 dlist_push_tail(&state->old_change, &change->node);
1010
1011                 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1012                                                                                 &state->entries[off].segno))
1013                 {
1014                         /* successfully restored changes from disk */
1015                         ReorderBufferChange *next_change =
1016                         dlist_head_element(ReorderBufferChange, node,
1017                                                            &entry->txn->changes);
1018
1019                         elog(DEBUG2, "restored %u/%u changes from disk",
1020                                  (uint32) entry->txn->nentries_mem,
1021                                  (uint32) entry->txn->nentries);
1022
1023                         Assert(entry->txn->nentries_mem);
1024                         /* txn stays the same */
1025                         state->entries[off].lsn = next_change->lsn;
1026                         state->entries[off].change = next_change;
1027                         binaryheap_replace_first(state->heap, Int32GetDatum(off));
1028
1029                         return change;
1030                 }
1031         }
1032
1033         /* ok, no changes there anymore, remove */
1034         binaryheap_remove_first(state->heap);
1035
1036         return change;
1037 }
1038
1039 /*
1040  * Deallocate the iterator
1041  */
1042 static void
1043 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1044                                                    ReorderBufferIterTXNState *state)
1045 {
1046         int32           off;
1047
1048         for (off = 0; off < state->nr_txns; off++)
1049         {
1050                 if (state->entries[off].fd != -1)
1051                         CloseTransientFile(state->entries[off].fd);
1052         }
1053
1054         /* free memory we might have "leaked" in the last *Next call */
1055         if (!dlist_is_empty(&state->old_change))
1056         {
1057                 ReorderBufferChange *change;
1058
1059                 change = dlist_container(ReorderBufferChange, node,
1060                                                                  dlist_pop_head_node(&state->old_change));
1061                 ReorderBufferReturnChange(rb, change);
1062                 Assert(dlist_is_empty(&state->old_change));
1063         }
1064
1065         binaryheap_free(state->heap);
1066         pfree(state);
1067 }
1068
1069 /*
1070  * Cleanup the contents of a transaction, usually after the transaction
1071  * committed or aborted.
1072  */
1073 static void
1074 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1075 {
1076         bool            found;
1077         dlist_mutable_iter iter;
1078
1079         /* cleanup subtransactions & their changes */
1080         dlist_foreach_modify(iter, &txn->subtxns)
1081         {
1082                 ReorderBufferTXN *subtxn;
1083
1084                 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1085
1086                 /*
1087                  * Subtransactions are always associated to the toplevel TXN, even if
1088                  * they originally were happening inside another subtxn, so we won't
1089                  * ever recurse more than one level deep here.
1090                  */
1091                 Assert(subtxn->is_known_as_subxact);
1092                 Assert(subtxn->nsubtxns == 0);
1093
1094                 ReorderBufferCleanupTXN(rb, subtxn);
1095         }
1096
1097         /* cleanup changes in the toplevel txn */
1098         dlist_foreach_modify(iter, &txn->changes)
1099         {
1100                 ReorderBufferChange *change;
1101
1102                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1103
1104                 ReorderBufferReturnChange(rb, change);
1105         }
1106
1107         /*
1108          * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1109          * They are always stored in the toplevel transaction.
1110          */
1111         dlist_foreach_modify(iter, &txn->tuplecids)
1112         {
1113                 ReorderBufferChange *change;
1114
1115                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1116                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1117                 ReorderBufferReturnChange(rb, change);
1118         }
1119
1120         if (txn->base_snapshot != NULL)
1121         {
1122                 SnapBuildSnapDecRefcount(txn->base_snapshot);
1123                 txn->base_snapshot = NULL;
1124                 txn->base_snapshot_lsn = InvalidXLogRecPtr;
1125         }
1126
1127         /*
1128          * Remove TXN from its containing list.
1129          *
1130          * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1131          * parent's list of known subxacts; this leaves the parent's nsubxacts
1132          * count too high, but we don't care.  Otherwise, we are deleting the TXN
1133          * from the LSN-ordered list of toplevel TXNs.
1134          */
1135         dlist_delete(&txn->node);
1136
1137         /* now remove reference from buffer */
1138         hash_search(rb->by_txn,
1139                                 (void *) &txn->xid,
1140                                 HASH_REMOVE,
1141                                 &found);
1142         Assert(found);
1143
1144         /* remove entries spilled to disk */
1145         if (txn->nentries != txn->nentries_mem)
1146                 ReorderBufferRestoreCleanup(rb, txn);
1147
1148         /* deallocate */
1149         ReorderBufferReturnTXN(rb, txn);
1150 }
1151
1152 /*
1153  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1154  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1155  */
1156 static void
1157 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1158 {
1159         dlist_iter      iter;
1160         HASHCTL         hash_ctl;
1161
1162         if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1163                 return;
1164
1165         memset(&hash_ctl, 0, sizeof(hash_ctl));
1166
1167         hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1168         hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1169         hash_ctl.hcxt = rb->context;
1170
1171         /*
1172          * create the hash with the exact number of to-be-stored tuplecids from
1173          * the start
1174          */
1175         txn->tuplecid_hash =
1176                 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1177                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1178
1179         dlist_foreach(iter, &txn->tuplecids)
1180         {
1181                 ReorderBufferTupleCidKey key;
1182                 ReorderBufferTupleCidEnt *ent;
1183                 bool            found;
1184                 ReorderBufferChange *change;
1185
1186                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1187
1188                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1189
1190                 /* be careful about padding */
1191                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1192
1193                 key.relnode = change->data.tuplecid.node;
1194
1195                 ItemPointerCopy(&change->data.tuplecid.tid,
1196                                                 &key.tid);
1197
1198                 ent = (ReorderBufferTupleCidEnt *)
1199                         hash_search(txn->tuplecid_hash,
1200                                                 (void *) &key,
1201                                                 HASH_ENTER | HASH_FIND,
1202                                                 &found);
1203                 if (!found)
1204                 {
1205                         ent->cmin = change->data.tuplecid.cmin;
1206                         ent->cmax = change->data.tuplecid.cmax;
1207                         ent->combocid = change->data.tuplecid.combocid;
1208                 }
1209                 else
1210                 {
1211                         Assert(ent->cmin == change->data.tuplecid.cmin);
1212                         Assert(ent->cmax == InvalidCommandId ||
1213                                    ent->cmax == change->data.tuplecid.cmax);
1214
1215                         /*
1216                          * if the tuple got valid in this transaction and now got deleted
1217                          * we already have a valid cmin stored. The cmax will be
1218                          * InvalidCommandId though.
1219                          */
1220                         ent->cmax = change->data.tuplecid.cmax;
1221                 }
1222         }
1223 }
1224
1225 /*
1226  * Copy a provided snapshot so we can modify it privately. This is needed so
1227  * that catalog modifying transactions can look into intermediate catalog
1228  * states.
1229  */
1230 static Snapshot
1231 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1232                                           ReorderBufferTXN *txn, CommandId cid)
1233 {
1234         Snapshot        snap;
1235         dlist_iter      iter;
1236         int                     i = 0;
1237         Size            size;
1238
1239         size = sizeof(SnapshotData) +
1240                 sizeof(TransactionId) * orig_snap->xcnt +
1241                 sizeof(TransactionId) * (txn->nsubtxns + 1);
1242
1243         snap = MemoryContextAllocZero(rb->context, size);
1244         memcpy(snap, orig_snap, sizeof(SnapshotData));
1245
1246         snap->copied = true;
1247         snap->active_count = 1;         /* mark as active so nobody frees it */
1248         snap->regd_count = 0;
1249         snap->xip = (TransactionId *) (snap + 1);
1250
1251         memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1252
1253         /*
1254          * snap->subxip contains all txids that belong to our transaction which we
1255          * need to check via cmin/cmax. That's why we store the toplevel
1256          * transaction in there as well.
1257          */
1258         snap->subxip = snap->xip + snap->xcnt;
1259         snap->subxip[i++] = txn->xid;
1260
1261         /*
1262          * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1263          * Since it's an upper boundary it is safe to use it for the allocation
1264          * above.
1265          */
1266         snap->subxcnt = 1;
1267
1268         dlist_foreach(iter, &txn->subtxns)
1269         {
1270                 ReorderBufferTXN *sub_txn;
1271
1272                 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1273                 snap->subxip[i++] = sub_txn->xid;
1274                 snap->subxcnt++;
1275         }
1276
1277         /* sort so we can bsearch() later */
1278         qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1279
1280         /* store the specified current CommandId */
1281         snap->curcid = cid;
1282
1283         return snap;
1284 }
1285
1286 /*
1287  * Free a previously ReorderBufferCopySnap'ed snapshot
1288  */
1289 static void
1290 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1291 {
1292         if (snap->copied)
1293                 pfree(snap);
1294         else
1295                 SnapBuildSnapDecRefcount(snap);
1296 }
1297
1298 /*
1299  * Perform the replay of a transaction and its non-aborted subtransactions.
1300  *
1301  * Subtransactions previously have to be processed by
1302  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1303  * transaction with ReorderBufferAssignChild.
1304  *
1305  * We currently can only decode a transaction's contents when its commit
1306  * record is read because that's currently the only place where we know about
1307  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1308  * the top and subtransactions (using a k-way merge) and replay the changes in
1309  * lsn order.
1310  */
1311 void
1312 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1313                                         XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1314                                         TimestampTz commit_time,
1315                                         RepOriginId origin_id, XLogRecPtr origin_lsn)
1316 {
1317         ReorderBufferTXN *txn;
1318         volatile Snapshot snapshot_now;
1319         volatile CommandId command_id = FirstCommandId;
1320         bool            using_subtxn;
1321         ReorderBufferIterTXNState *volatile iterstate = NULL;
1322
1323         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1324                                                                 false);
1325
1326         /* unknown transaction, nothing to replay */
1327         if (txn == NULL)
1328                 return;
1329
1330         txn->final_lsn = commit_lsn;
1331         txn->end_lsn = end_lsn;
1332         txn->commit_time = commit_time;
1333         txn->origin_id = origin_id;
1334         txn->origin_lsn = origin_lsn;
1335
1336         /*
1337          * If this transaction didn't have any real changes in our database, it's
1338          * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1339          * transferred its snapshot to this transaction if it had one and the
1340          * toplevel tx didn't.
1341          */
1342         if (txn->base_snapshot == NULL)
1343         {
1344                 Assert(txn->ninvalidations == 0);
1345                 ReorderBufferCleanupTXN(rb, txn);
1346                 return;
1347         }
1348
1349         snapshot_now = txn->base_snapshot;
1350
1351         /* build data to be able to lookup the CommandIds of catalog tuples */
1352         ReorderBufferBuildTupleCidHash(rb, txn);
1353
1354         /* setup the initial snapshot */
1355         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1356
1357         /*
1358          * Decoding needs access to syscaches et al., which in turn use
1359          * heavyweight locks and such. Thus we need to have enough state around to
1360          * keep track of those.  The easiest way is to simply use a transaction
1361          * internally.  That also allows us to easily enforce that nothing writes
1362          * to the database by checking for xid assignments.
1363          *
1364          * When we're called via the SQL SRF there's already a transaction
1365          * started, so start an explicit subtransaction there.
1366          */
1367         using_subtxn = IsTransactionOrTransactionBlock();
1368
1369         PG_TRY();
1370         {
1371                 ReorderBufferChange *change;
1372                 ReorderBufferChange *specinsert = NULL;
1373
1374                 if (using_subtxn)
1375                         BeginInternalSubTransaction("replay");
1376                 else
1377                         StartTransactionCommand();
1378
1379                 rb->begin(rb, txn);
1380
1381                 iterstate = ReorderBufferIterTXNInit(rb, txn);
1382                 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1383                 {
1384                         Relation        relation = NULL;
1385                         Oid                     reloid;
1386
1387                         switch (change->action)
1388                         {
1389                                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1390
1391                                         /*
1392                                          * Confirmation for speculative insertion arrived. Simply
1393                                          * use as a normal record. It'll be cleaned up at the end
1394                                          * of INSERT processing.
1395                                          */
1396                                         Assert(specinsert->data.tp.oldtuple == NULL);
1397                                         change = specinsert;
1398                                         change->action = REORDER_BUFFER_CHANGE_INSERT;
1399
1400                                         /* intentionally fall through */
1401                                 case REORDER_BUFFER_CHANGE_INSERT:
1402                                 case REORDER_BUFFER_CHANGE_UPDATE:
1403                                 case REORDER_BUFFER_CHANGE_DELETE:
1404                                         Assert(snapshot_now);
1405
1406                                         reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1407                                                                                         change->data.tp.relnode.relNode);
1408
1409                                         /*
1410                                          * Catalog tuple without data, emitted while catalog was
1411                                          * in the process of being rewritten.
1412                                          */
1413                                         if (reloid == InvalidOid &&
1414                                                 change->data.tp.newtuple == NULL &&
1415                                                 change->data.tp.oldtuple == NULL)
1416                                                 goto change_done;
1417                                         else if (reloid == InvalidOid)
1418                                                 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1419                                                          relpathperm(change->data.tp.relnode,
1420                                                                                  MAIN_FORKNUM));
1421
1422                                         relation = RelationIdGetRelation(reloid);
1423
1424                                         if (relation == NULL)
1425                                                 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1426                                                          reloid,
1427                                                          relpathperm(change->data.tp.relnode,
1428                                                                                  MAIN_FORKNUM));
1429
1430                                         if (!RelationIsLogicallyLogged(relation))
1431                                                 goto change_done;
1432
1433                                         /*
1434                                          * For now ignore sequence changes entirely. Most of the
1435                                          * time they don't log changes using records we
1436                                          * understand, so it doesn't make sense to handle the few
1437                                          * cases we do.
1438                                          */
1439                                         if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1440                                                 goto change_done;
1441
1442                                         /* user-triggered change */
1443                                         if (!IsToastRelation(relation))
1444                                         {
1445                                                 ReorderBufferToastReplace(rb, txn, relation, change);
1446                                                 rb->apply_change(rb, txn, relation, change);
1447
1448                                                 /*
1449                                                  * Only clear reassembled toast chunks if we're sure
1450                                                  * they're not required anymore. The creator of the
1451                                                  * tuple tells us.
1452                                                  */
1453                                                 if (change->data.tp.clear_toast_afterwards)
1454                                                         ReorderBufferToastReset(rb, txn);
1455                                         }
1456                                         /* we're not interested in toast deletions */
1457                                         else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1458                                         {
1459                                                 /*
1460                                                  * Need to reassemble the full toasted Datum in
1461                                                  * memory, to ensure the chunks don't get reused till
1462                                                  * we're done remove it from the list of this
1463                                                  * transaction's changes. Otherwise it will get
1464                                                  * freed/reused while restoring spooled data from
1465                                                  * disk.
1466                                                  */
1467                                                 dlist_delete(&change->node);
1468                                                 ReorderBufferToastAppendChunk(rb, txn, relation,
1469                                                                                                           change);
1470                                         }
1471
1472                         change_done:
1473
1474                                         /*
1475                                          * Either speculative insertion was confirmed, or it was
1476                                          * unsuccessful and the record isn't needed anymore.
1477                                          */
1478                                         if (specinsert != NULL)
1479                                         {
1480                                                 ReorderBufferReturnChange(rb, specinsert);
1481                                                 specinsert = NULL;
1482                                         }
1483
1484                                         if (relation != NULL)
1485                                         {
1486                                                 RelationClose(relation);
1487                                                 relation = NULL;
1488                                         }
1489                                         break;
1490
1491                                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1492
1493                                         /*
1494                                          * Speculative insertions are dealt with by delaying the
1495                                          * processing of the insert until the confirmation record
1496                                          * arrives. For that we simply unlink the record from the
1497                                          * chain, so it does not get freed/reused while restoring
1498                                          * spooled data from disk.
1499                                          *
1500                                          * This is safe in the face of concurrent catalog changes
1501                                          * because the relevant relation can't be changed between
1502                                          * speculative insertion and confirmation due to
1503                                          * CheckTableNotInUse() and locking.
1504                                          */
1505
1506                                         /* clear out a pending (and thus failed) speculation */
1507                                         if (specinsert != NULL)
1508                                         {
1509                                                 ReorderBufferReturnChange(rb, specinsert);
1510                                                 specinsert = NULL;
1511                                         }
1512
1513                                         /* and memorize the pending insertion */
1514                                         dlist_delete(&change->node);
1515                                         specinsert = change;
1516                                         break;
1517
1518                                 case REORDER_BUFFER_CHANGE_MESSAGE:
1519                                         rb->message(rb, txn, change->lsn, true,
1520                                                                 change->data.msg.prefix,
1521                                                                 change->data.msg.message_size,
1522                                                                 change->data.msg.message);
1523                                         break;
1524
1525                                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1526                                         /* get rid of the old */
1527                                         TeardownHistoricSnapshot(false);
1528
1529                                         if (snapshot_now->copied)
1530                                         {
1531                                                 ReorderBufferFreeSnap(rb, snapshot_now);
1532                                                 snapshot_now =
1533                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1534                                                                                                   txn, command_id);
1535                                         }
1536
1537                                         /*
1538                                          * Restored from disk, need to be careful not to double
1539                                          * free. We could introduce refcounting for that, but for
1540                                          * now this seems infrequent enough not to care.
1541                                          */
1542                                         else if (change->data.snapshot->copied)
1543                                         {
1544                                                 snapshot_now =
1545                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1546                                                                                                   txn, command_id);
1547                                         }
1548                                         else
1549                                         {
1550                                                 snapshot_now = change->data.snapshot;
1551                                         }
1552
1553
1554                                         /* and continue with the new one */
1555                                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1556                                         break;
1557
1558                                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1559                                         Assert(change->data.command_id != InvalidCommandId);
1560
1561                                         if (command_id < change->data.command_id)
1562                                         {
1563                                                 command_id = change->data.command_id;
1564
1565                                                 if (!snapshot_now->copied)
1566                                                 {
1567                                                         /* we don't use the global one anymore */
1568                                                         snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1569                                                                                                                         txn, command_id);
1570                                                 }
1571
1572                                                 snapshot_now->curcid = command_id;
1573
1574                                                 TeardownHistoricSnapshot(false);
1575                                                 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1576
1577                                                 /*
1578                                                  * Every time the CommandId is incremented, we could
1579                                                  * see new catalog contents, so execute all
1580                                                  * invalidations.
1581                                                  */
1582                                                 ReorderBufferExecuteInvalidations(rb, txn);
1583                                         }
1584
1585                                         break;
1586
1587                                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1588                                         elog(ERROR, "tuplecid value in changequeue");
1589                                         break;
1590                         }
1591                 }
1592
1593                 /*
1594                  * There's a speculative insertion remaining, just clean it up, it
1595                  * can't have been successful, otherwise we'd have gotten a confirmation
1596                  * record.
1597                  */
1598                 if (specinsert)
1599                 {
1600                         ReorderBufferReturnChange(rb, specinsert);
1601                         specinsert = NULL;
1602                 }
1603
1604                 /* clean up the iterator */
1605                 ReorderBufferIterTXNFinish(rb, iterstate);
1606                 iterstate = NULL;
1607
1608                 /* call commit callback */
1609                 rb->commit(rb, txn, commit_lsn);
1610
1611                 /* this is just a sanity check against bad output plugin behaviour */
1612                 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1613                         elog(ERROR, "output plugin used XID %u",
1614                                  GetCurrentTransactionId());
1615
1616                 /* cleanup */
1617                 TeardownHistoricSnapshot(false);
1618
1619                 /*
1620                  * Aborting the current (sub-)transaction as a whole has the right
1621                  * semantics. We want all locks acquired in here to be released, not
1622                  * reassigned to the parent and we do not want any database access
1623                  * to have persistent effects.
1624                  */
1625                 AbortCurrentTransaction();
1626
1627                 /* make sure there's no cache pollution */
1628                 ReorderBufferExecuteInvalidations(rb, txn);
1629
1630                 if (using_subtxn)
1631                         RollbackAndReleaseCurrentSubTransaction();
1632
1633                 if (snapshot_now->copied)
1634                         ReorderBufferFreeSnap(rb, snapshot_now);
1635
1636                 /* remove potential on-disk data, and deallocate */
1637                 ReorderBufferCleanupTXN(rb, txn);
1638         }
1639         PG_CATCH();
1640         {
1641                 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1642                 if (iterstate)
1643                         ReorderBufferIterTXNFinish(rb, iterstate);
1644
1645                 TeardownHistoricSnapshot(true);
1646
1647                 /*
1648                  * Force cache invalidation to happen outside of a valid transaction
1649                  * to prevent catalog access as we just caught an error.
1650                  */
1651                 AbortCurrentTransaction();
1652
1653                 /* make sure there's no cache pollution */
1654                 ReorderBufferExecuteInvalidations(rb, txn);
1655
1656                 if (using_subtxn)
1657                         RollbackAndReleaseCurrentSubTransaction();
1658
1659                 if (snapshot_now->copied)
1660                         ReorderBufferFreeSnap(rb, snapshot_now);
1661
1662                 /* remove potential on-disk data, and deallocate */
1663                 ReorderBufferCleanupTXN(rb, txn);
1664
1665                 PG_RE_THROW();
1666         }
1667         PG_END_TRY();
1668 }
1669
1670 /*
1671  * Abort a transaction that possibly has previous changes. Needs to be first
1672  * called for subtransactions and then for the toplevel xid.
1673  *
1674  * NB: Transactions handled here have to have actively aborted (i.e. have
1675  * produced an abort record). Implicitly aborted transactions are handled via
1676  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1677  * which have committed are handled in ReorderBufferForget().
1678  *
1679  * This function purges this transaction and its contents from memory and
1680  * disk.
1681  */
1682 void
1683 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1684 {
1685         ReorderBufferTXN *txn;
1686
1687         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1688                                                                 false);
1689
1690         /* unknown, nothing to remove */
1691         if (txn == NULL)
1692                 return;
1693
1694         /* cosmetic... */
1695         txn->final_lsn = lsn;
1696
1697         /* remove potential on-disk data, and deallocate */
1698         ReorderBufferCleanupTXN(rb, txn);
1699 }
1700
1701 /*
1702  * Abort all transactions that aren't actually running anymore because the
1703  * server restarted.
1704  *
1705  * NB: These really have to be transactions that have aborted due to a server
1706  * crash/immediate restart, as we don't deal with invalidations here.
1707  */
1708 void
1709 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1710 {
1711         dlist_mutable_iter it;
1712
1713         /*
1714          * Iterate through all (potential) toplevel TXNs and abort all that are
1715          * older than what possibly can be running. Once we've found the first
1716          * that is alive we stop, there might be some that acquired an xid earlier
1717          * but started writing later, but it's unlikely and they will cleaned up
1718          * in a later call to ReorderBufferAbortOld().
1719          */
1720         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1721         {
1722                 ReorderBufferTXN *txn;
1723
1724                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1725
1726                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1727                 {
1728                         elog(DEBUG2, "aborting old transaction %u", txn->xid);
1729
1730                         /* remove potential on-disk data, and deallocate this tx */
1731                         ReorderBufferCleanupTXN(rb, txn);
1732                 }
1733                 else
1734                         return;
1735         }
1736 }
1737
1738 /*
1739  * Forget the contents of a transaction if we aren't interested in it's
1740  * contents. Needs to be first called for subtransactions and then for the
1741  * toplevel xid.
1742  *
1743  * This is significantly different to ReorderBufferAbort() because
1744  * transactions that have committed need to be treated differently from aborted
1745  * ones since they may have modified the catalog.
1746  *
1747  * Note that this is only allowed to be called in the moment a transaction
1748  * commit has just been read, not earlier; otherwise later records referring
1749  * to this xid might re-create the transaction incompletely.
1750  */
1751 void
1752 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1753 {
1754         ReorderBufferTXN *txn;
1755
1756         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1757                                                                 false);
1758
1759         /* unknown, nothing to forget */
1760         if (txn == NULL)
1761                 return;
1762
1763         /* cosmetic... */
1764         txn->final_lsn = lsn;
1765
1766         /*
1767          * Process cache invalidation messages if there are any. Even if we're not
1768          * interested in the transaction's contents, it could have manipulated the
1769          * catalog and we need to update the caches according to that.
1770          */
1771         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1772                 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1773                                                                                    txn->invalidations);
1774         else
1775                 Assert(txn->ninvalidations == 0);
1776
1777         /* remove potential on-disk data, and deallocate */
1778         ReorderBufferCleanupTXN(rb, txn);
1779 }
1780
1781 /*
1782  * Execute invalidations happening outside the context of a decoded
1783  * transaction. That currently happens either for xid-less commits
1784  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1785  * transactions (via ReorderBufferForget()).
1786  */
1787 void
1788 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1789                                                                    SharedInvalidationMessage *invalidations)
1790 {
1791         bool            use_subtxn = IsTransactionOrTransactionBlock();
1792         int                     i;
1793
1794         if (use_subtxn)
1795                 BeginInternalSubTransaction("replay");
1796
1797         /*
1798          * Force invalidations to happen outside of a valid transaction - that way
1799          * entries will just be marked as invalid without accessing the catalog.
1800          * That's advantageous because we don't need to setup the full state
1801          * necessary for catalog access.
1802          */
1803         if (use_subtxn)
1804                 AbortCurrentTransaction();
1805
1806         for (i = 0; i < ninvalidations; i++)
1807                 LocalExecuteInvalidationMessage(&invalidations[i]);
1808
1809         if (use_subtxn)
1810                 RollbackAndReleaseCurrentSubTransaction();
1811 }
1812
1813 /*
1814  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1815  * least once for every xid in XLogRecord->xl_xid (other places in records
1816  * may, but do not have to be passed through here).
1817  *
1818  * Reorderbuffer keeps some datastructures about transactions in LSN order,
1819  * for efficiency. To do that it has to know about when transactions are seen
1820  * first in the WAL. As many types of records are not actually interesting for
1821  * logical decoding, they do not necessarily pass though here.
1822  */
1823 void
1824 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1825 {
1826         /* many records won't have an xid assigned, centralize check here */
1827         if (xid != InvalidTransactionId)
1828                 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1829 }
1830
1831 /*
1832  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
1833  * because the previous snapshot doesn't describe the catalog correctly for
1834  * following rows.
1835  */
1836 void
1837 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1838                                                  XLogRecPtr lsn, Snapshot snap)
1839 {
1840         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1841
1842         change->data.snapshot = snap;
1843         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1844
1845         ReorderBufferQueueChange(rb, xid, lsn, change);
1846 }
1847
1848 /*
1849  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1850  * that is used to decode all changes until either this transaction modifies
1851  * the catalog or another catalog modifying transaction commits.
1852  *
1853  * Needs to be called before any changes are added with
1854  * ReorderBufferQueueChange().
1855  */
1856 void
1857 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1858                                                          XLogRecPtr lsn, Snapshot snap)
1859 {
1860         ReorderBufferTXN *txn;
1861         bool            is_new;
1862
1863         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1864         Assert(txn->base_snapshot == NULL);
1865         Assert(snap != NULL);
1866
1867         txn->base_snapshot = snap;
1868         txn->base_snapshot_lsn = lsn;
1869 }
1870
1871 /*
1872  * Access the catalog with this CommandId at this point in the changestream.
1873  *
1874  * May only be called for command ids > 1
1875  */
1876 void
1877 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1878                                                          XLogRecPtr lsn, CommandId cid)
1879 {
1880         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1881
1882         change->data.command_id = cid;
1883         change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1884
1885         ReorderBufferQueueChange(rb, xid, lsn, change);
1886 }
1887
1888
1889 /*
1890  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1891  */
1892 void
1893 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1894                                                          XLogRecPtr lsn, RelFileNode node,
1895                                                          ItemPointerData tid, CommandId cmin,
1896                                                          CommandId cmax, CommandId combocid)
1897 {
1898         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1899         ReorderBufferTXN *txn;
1900
1901         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1902
1903         change->data.tuplecid.node = node;
1904         change->data.tuplecid.tid = tid;
1905         change->data.tuplecid.cmin = cmin;
1906         change->data.tuplecid.cmax = cmax;
1907         change->data.tuplecid.combocid = combocid;
1908         change->lsn = lsn;
1909         change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1910
1911         dlist_push_tail(&txn->tuplecids, &change->node);
1912         txn->ntuplecids++;
1913 }
1914
1915 /*
1916  * Setup the invalidation of the toplevel transaction.
1917  *
1918  * This needs to be done before ReorderBufferCommit is called!
1919  */
1920 void
1921 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1922                                                           XLogRecPtr lsn, Size nmsgs,
1923                                                           SharedInvalidationMessage *msgs)
1924 {
1925         ReorderBufferTXN *txn;
1926
1927         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1928
1929         if (txn->ninvalidations != 0)
1930                 elog(ERROR, "only ever add one set of invalidations");
1931
1932         Assert(nmsgs > 0);
1933
1934         txn->ninvalidations = nmsgs;
1935         txn->invalidations = (SharedInvalidationMessage *)
1936                 MemoryContextAlloc(rb->context,
1937                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1938         memcpy(txn->invalidations, msgs,
1939                    sizeof(SharedInvalidationMessage) * nmsgs);
1940 }
1941
1942 /*
1943  * Apply all invalidations we know. Possibly we only need parts at this point
1944  * in the changestream but we don't know which those are.
1945  */
1946 static void
1947 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1948 {
1949         int                     i;
1950
1951         for (i = 0; i < txn->ninvalidations; i++)
1952                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1953 }
1954
1955 /*
1956  * Mark a transaction as containing catalog changes
1957  */
1958 void
1959 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1960                                                                   XLogRecPtr lsn)
1961 {
1962         ReorderBufferTXN *txn;
1963
1964         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1965
1966         txn->has_catalog_changes = true;
1967 }
1968
1969 /*
1970  * Query whether a transaction is already *known* to contain catalog
1971  * changes. This can be wrong until directly before the commit!
1972  */
1973 bool
1974 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1975 {
1976         ReorderBufferTXN *txn;
1977
1978         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1979                                                                 false);
1980         if (txn == NULL)
1981                 return false;
1982
1983         return txn->has_catalog_changes;
1984 }
1985
1986 /*
1987  * Have we already added the first snapshot?
1988  */
1989 bool
1990 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1991 {
1992         ReorderBufferTXN *txn;
1993
1994         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1995                                                                 false);
1996
1997         /* transaction isn't known yet, ergo no snapshot */
1998         if (txn == NULL)
1999                 return false;
2000
2001         /*
2002          * TODO: It would be a nice improvement if we would check the toplevel
2003          * transaction in subtransactions, but we'd need to keep track of a bit
2004          * more state.
2005          */
2006         return txn->base_snapshot != NULL;
2007 }
2008
2009
2010 /*
2011  * ---------------------------------------
2012  * Disk serialization support
2013  * ---------------------------------------
2014  */
2015
2016 /*
2017  * Ensure the IO buffer is >= sz.
2018  */
2019 static void
2020 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2021 {
2022         if (!rb->outbufsize)
2023         {
2024                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
2025                 rb->outbufsize = sz;
2026         }
2027         else if (rb->outbufsize < sz)
2028         {
2029                 rb->outbuf = repalloc(rb->outbuf, sz);
2030                 rb->outbufsize = sz;
2031         }
2032 }
2033
2034 /*
2035  * Check whether the transaction tx should spill its data to disk.
2036  */
2037 static void
2038 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2039 {
2040         /*
2041          * TODO: improve accounting so we cheaply can take subtransactions into
2042          * account here.
2043          */
2044         if (txn->nentries_mem >= max_changes_in_memory)
2045         {
2046                 ReorderBufferSerializeTXN(rb, txn);
2047                 Assert(txn->nentries_mem == 0);
2048         }
2049 }
2050
2051 /*
2052  * Spill data of a large transaction (and its subtransactions) to disk.
2053  */
2054 static void
2055 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2056 {
2057         dlist_iter      subtxn_i;
2058         dlist_mutable_iter change_i;
2059         int                     fd = -1;
2060         XLogSegNo       curOpenSegNo = 0;
2061         Size            spilled = 0;
2062         char            path[MAXPGPATH];
2063
2064         elog(DEBUG2, "spill %u changes in XID %u to disk",
2065                  (uint32) txn->nentries_mem, txn->xid);
2066
2067         /* do the same to all child TXs */
2068         dlist_foreach(subtxn_i, &txn->subtxns)
2069         {
2070                 ReorderBufferTXN *subtxn;
2071
2072                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2073                 ReorderBufferSerializeTXN(rb, subtxn);
2074         }
2075
2076         /* serialize changestream */
2077         dlist_foreach_modify(change_i, &txn->changes)
2078         {
2079                 ReorderBufferChange *change;
2080
2081                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
2082
2083                 /*
2084                  * store in segment in which it belongs by start lsn, don't split over
2085                  * multiple segments tho
2086                  */
2087                 if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2088                 {
2089                         XLogRecPtr      recptr;
2090
2091                         if (fd != -1)
2092                                 CloseTransientFile(fd);
2093
2094                         XLByteToSeg(change->lsn, curOpenSegNo);
2095                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2096
2097                         /*
2098                          * No need to care about TLIs here, only used during a single run,
2099                          * so each LSN only maps to a specific WAL record.
2100                          */
2101                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2102                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2103                                         (uint32) (recptr >> 32), (uint32) recptr);
2104
2105                         /* open segment, create it if necessary */
2106                         fd = OpenTransientFile(path,
2107                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2108                                                                    S_IRUSR | S_IWUSR);
2109
2110                         if (fd < 0)
2111                                 ereport(ERROR,
2112                                                 (errcode_for_file_access(),
2113                                                  errmsg("could not open file \"%s\": %m",
2114                                                                 path)));
2115                 }
2116
2117                 ReorderBufferSerializeChange(rb, txn, fd, change);
2118                 dlist_delete(&change->node);
2119                 ReorderBufferReturnChange(rb, change);
2120
2121                 spilled++;
2122         }
2123
2124         Assert(spilled == txn->nentries_mem);
2125         Assert(dlist_is_empty(&txn->changes));
2126         txn->nentries_mem = 0;
2127
2128         if (fd != -1)
2129                 CloseTransientFile(fd);
2130 }
2131
2132 /*
2133  * Serialize individual change to disk.
2134  */
2135 static void
2136 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2137                                                          int fd, ReorderBufferChange *change)
2138 {
2139         ReorderBufferDiskChange *ondisk;
2140         Size            sz = sizeof(ReorderBufferDiskChange);
2141
2142         ReorderBufferSerializeReserve(rb, sz);
2143
2144         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146
2147         switch (change->action)
2148         {
2149                         /* fall through these, they're all similar enough */
2150                 case REORDER_BUFFER_CHANGE_INSERT:
2151                 case REORDER_BUFFER_CHANGE_UPDATE:
2152                 case REORDER_BUFFER_CHANGE_DELETE:
2153                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2154                         {
2155                                 char       *data;
2156                                 ReorderBufferTupleBuf *oldtup,
2157                                                    *newtup;
2158                                 Size            oldlen = 0;
2159                                 Size            newlen = 0;
2160
2161                                 oldtup = change->data.tp.oldtuple;
2162                                 newtup = change->data.tp.newtuple;
2163
2164                                 if (oldtup)
2165                                 {
2166                                         sz += sizeof(HeapTupleData);
2167                                         oldlen = oldtup->tuple.t_len;
2168                                         sz += oldlen;
2169                                 }
2170
2171                                 if (newtup)
2172                                 {
2173                                         sz += sizeof(HeapTupleData);
2174                                         newlen = newtup->tuple.t_len;
2175                                         sz += newlen;
2176                                 }
2177
2178                                 /* make sure we have enough space */
2179                                 ReorderBufferSerializeReserve(rb, sz);
2180
2181                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182                                 /* might have been reallocated above */
2183                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184
2185                                 if (oldlen)
2186                                 {
2187                                         memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188                                         data += sizeof(HeapTupleData);
2189
2190                                         memcpy(data, oldtup->tuple.t_data, oldlen);
2191                                         data += oldlen;
2192                                 }
2193
2194                                 if (newlen)
2195                                 {
2196                                         memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197                                         data += sizeof(HeapTupleData);
2198
2199                                         memcpy(data, newtup->tuple.t_data, newlen);
2200                                         data += newlen;
2201                                 }
2202                                 break;
2203                         }
2204                 case REORDER_BUFFER_CHANGE_MESSAGE:
2205                         {
2206                                 char       *data;
2207                                 Size            prefix_size = strlen(change->data.msg.prefix) + 1;
2208
2209                                 sz += prefix_size + change->data.msg.message_size +
2210                                         sizeof(Size) + sizeof(Size);
2211                                 ReorderBufferSerializeReserve(rb, sz);
2212
2213                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214
2215                                 /* might have been reallocated above */
2216                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217
2218                                 /* write the prefix including the size */
2219                                 memcpy(data, &prefix_size, sizeof(Size));
2220                                 data += sizeof(Size);
2221                                 memcpy(data, change->data.msg.prefix,
2222                                            prefix_size);
2223                                 data += prefix_size;
2224
2225                                 /* write the message including the size */
2226                                 memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227                                 data += sizeof(Size);
2228                                 memcpy(data, change->data.msg.message,
2229                                            change->data.msg.message_size);
2230                                 data += change->data.msg.message_size;
2231
2232                                 break;
2233                         }
2234                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2235                         {
2236                                 Snapshot        snap;
2237                                 char       *data;
2238
2239                                 snap = change->data.snapshot;
2240
2241                                 sz += sizeof(SnapshotData) +
2242                                         sizeof(TransactionId) * snap->xcnt +
2243                                         sizeof(TransactionId) * snap->subxcnt
2244                                         ;
2245
2246                                 /* make sure we have enough space */
2247                                 ReorderBufferSerializeReserve(rb, sz);
2248                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249                                 /* might have been reallocated above */
2250                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251
2252                                 memcpy(data, snap, sizeof(SnapshotData));
2253                                 data += sizeof(SnapshotData);
2254
2255                                 if (snap->xcnt)
2256                                 {
2257                                         memcpy(data, snap->xip,
2258                                                    sizeof(TransactionId) * snap->xcnt);
2259                                         data += sizeof(TransactionId) * snap->xcnt;
2260                                 }
2261
2262                                 if (snap->subxcnt)
2263                                 {
2264                                         memcpy(data, snap->subxip,
2265                                                    sizeof(TransactionId) * snap->subxcnt);
2266                                         data += sizeof(TransactionId) * snap->subxcnt;
2267                                 }
2268                                 break;
2269                         }
2270                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2271                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2272                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2273                         /* ReorderBufferChange contains everything important */
2274                         break;
2275         }
2276
2277         ondisk->size = sz;
2278
2279         pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2280         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281         {
2282                 int                     save_errno = errno;
2283
2284                 CloseTransientFile(fd);
2285                 errno = save_errno;
2286                 ereport(ERROR,
2287                                 (errcode_for_file_access(),
2288                                  errmsg("could not write to data file for XID %u: %m",
2289                                                 txn->xid)));
2290         }
2291         pgstat_report_wait_end();
2292
2293         Assert(ondisk->change.action == change->action);
2294 }
2295
2296 /*
2297  * Restore a number of changes spilled to disk back into memory.
2298  */
2299 static Size
2300 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2301                                                         int *fd, XLogSegNo *segno)
2302 {
2303         Size            restored = 0;
2304         XLogSegNo       last_segno;
2305         dlist_mutable_iter cleanup_iter;
2306
2307         Assert(txn->first_lsn != InvalidXLogRecPtr);
2308         Assert(txn->final_lsn != InvalidXLogRecPtr);
2309
2310         /* free current entries, so we have memory for more */
2311         dlist_foreach_modify(cleanup_iter, &txn->changes)
2312         {
2313                 ReorderBufferChange *cleanup =
2314                 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315
2316                 dlist_delete(&cleanup->node);
2317                 ReorderBufferReturnChange(rb, cleanup);
2318         }
2319         txn->nentries_mem = 0;
2320         Assert(dlist_is_empty(&txn->changes));
2321
2322         XLByteToSeg(txn->final_lsn, last_segno);
2323
2324         while (restored < max_changes_in_memory && *segno <= last_segno)
2325         {
2326                 int                     readBytes;
2327                 ReorderBufferDiskChange *ondisk;
2328
2329                 if (*fd == -1)
2330                 {
2331                         XLogRecPtr      recptr;
2332                         char            path[MAXPGPATH];
2333
2334                         /* first time in */
2335                         if (*segno == 0)
2336                         {
2337                                 XLByteToSeg(txn->first_lsn, *segno);
2338                         }
2339
2340                         Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341                         XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342
2343                         /*
2344                          * No need to care about TLIs here, only used during a single run,
2345                          * so each LSN only maps to a specific WAL record.
2346                          */
2347                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2348                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2349                                         (uint32) (recptr >> 32), (uint32) recptr);
2350
2351                         *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352                         if (*fd < 0 && errno == ENOENT)
2353                         {
2354                                 *fd = -1;
2355                                 (*segno)++;
2356                                 continue;
2357                         }
2358                         else if (*fd < 0)
2359                                 ereport(ERROR,
2360                                                 (errcode_for_file_access(),
2361                                                  errmsg("could not open file \"%s\": %m",
2362                                                                 path)));
2363
2364                 }
2365
2366                 /*
2367                  * Read the statically sized part of a change which has information
2368                  * about the total size. If we couldn't read a record, we're at the
2369                  * end of this file.
2370                  */
2371                 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2372                 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2373                 readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2374                 pgstat_report_wait_end();
2375
2376                 /* eof */
2377                 if (readBytes == 0)
2378                 {
2379                         CloseTransientFile(*fd);
2380                         *fd = -1;
2381                         (*segno)++;
2382                         continue;
2383                 }
2384                 else if (readBytes < 0)
2385                         ereport(ERROR,
2386                                         (errcode_for_file_access(),
2387                                 errmsg("could not read from reorderbuffer spill file: %m")));
2388                 else if (readBytes != sizeof(ReorderBufferDiskChange))
2389                         ereport(ERROR,
2390                                         (errcode_for_file_access(),
2391                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392                                                         readBytes,
2393                                                         (uint32) sizeof(ReorderBufferDiskChange))));
2394
2395                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396
2397                 ReorderBufferSerializeReserve(rb,
2398                                                          sizeof(ReorderBufferDiskChange) + ondisk->size);
2399                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400
2401                 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2402                 readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403                                                  ondisk->size - sizeof(ReorderBufferDiskChange));
2404                 pgstat_report_wait_end();
2405
2406                 if (readBytes < 0)
2407                         ereport(ERROR,
2408                                         (errcode_for_file_access(),
2409                                 errmsg("could not read from reorderbuffer spill file: %m")));
2410                 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411                         ereport(ERROR,
2412                                         (errcode_for_file_access(),
2413                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414                                                         readBytes,
2415                                 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416
2417                 /*
2418                  * ok, read a full change from disk, now restore it into proper
2419                  * in-memory format
2420                  */
2421                 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422                 restored++;
2423         }
2424
2425         return restored;
2426 }
2427
2428 /*
2429  * Convert change from its on-disk format to in-memory format and queue it onto
2430  * the TXN's ->changes list.
2431  *
2432  * Note: although "data" is declared char*, at entry it points to a
2433  * maxalign'd buffer, making it safe in most of this function to assume
2434  * that the pointed-to data is suitably aligned for direct access.
2435  */
2436 static void
2437 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2438                                                    char *data)
2439 {
2440         ReorderBufferDiskChange *ondisk;
2441         ReorderBufferChange *change;
2442
2443         ondisk = (ReorderBufferDiskChange *) data;
2444
2445         change = ReorderBufferGetChange(rb);
2446
2447         /* copy static part */
2448         memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449
2450         data += sizeof(ReorderBufferDiskChange);
2451
2452         /* restore individual stuff */
2453         switch (change->action)
2454         {
2455                         /* fall through these, they're all similar enough */
2456                 case REORDER_BUFFER_CHANGE_INSERT:
2457                 case REORDER_BUFFER_CHANGE_UPDATE:
2458                 case REORDER_BUFFER_CHANGE_DELETE:
2459                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2460                         if (change->data.tp.oldtuple)
2461                         {
2462                                 uint32          tuplelen = ((HeapTuple) data)->t_len;
2463
2464                                 change->data.tp.oldtuple =
2465                                         ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2466
2467                                 /* restore ->tuple */
2468                                 memcpy(&change->data.tp.oldtuple->tuple, data,
2469                                            sizeof(HeapTupleData));
2470                                 data += sizeof(HeapTupleData);
2471
2472                                 /* reset t_data pointer into the new tuplebuf */
2473                                 change->data.tp.oldtuple->tuple.t_data =
2474                                         ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475
2476                                 /* restore tuple data itself */
2477                                 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478                                 data += tuplelen;
2479                         }
2480
2481                         if (change->data.tp.newtuple)
2482                         {
2483                                 /* here, data might not be suitably aligned! */
2484                                 uint32          tuplelen;
2485
2486                                 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487                                            sizeof(uint32));
2488
2489                                 change->data.tp.newtuple =
2490                                         ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2491
2492                                 /* restore ->tuple */
2493                                 memcpy(&change->data.tp.newtuple->tuple, data,
2494                                            sizeof(HeapTupleData));
2495                                 data += sizeof(HeapTupleData);
2496
2497                                 /* reset t_data pointer into the new tuplebuf */
2498                                 change->data.tp.newtuple->tuple.t_data =
2499                                         ReorderBufferTupleBufData(change->data.tp.newtuple);
2500
2501                                 /* restore tuple data itself */
2502                                 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503                                 data += tuplelen;
2504                         }
2505
2506                         break;
2507                 case REORDER_BUFFER_CHANGE_MESSAGE:
2508                         {
2509                                 Size            prefix_size;
2510
2511                                 /* read prefix */
2512                                 memcpy(&prefix_size, data, sizeof(Size));
2513                                 data += sizeof(Size);
2514                                 change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515                                                                                                                          prefix_size);
2516                                 memcpy(change->data.msg.prefix, data, prefix_size);
2517                                 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518                                 data += prefix_size;
2519
2520                                 /* read the message */
2521                                 memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522                                 data += sizeof(Size);
2523                                 change->data.msg.message = MemoryContextAlloc(rb->context,
2524                                                                                           change->data.msg.message_size);
2525                                 memcpy(change->data.msg.message, data,
2526                                            change->data.msg.message_size);
2527                                 data += change->data.msg.message_size;
2528
2529                                 break;
2530                         }
2531                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2532                         {
2533                                 Snapshot        oldsnap;
2534                                 Snapshot        newsnap;
2535                                 Size            size;
2536
2537                                 oldsnap = (Snapshot) data;
2538
2539                                 size = sizeof(SnapshotData) +
2540                                         sizeof(TransactionId) * oldsnap->xcnt +
2541                                         sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542
2543                                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544
2545                                 newsnap = change->data.snapshot;
2546
2547                                 memcpy(newsnap, data, size);
2548                                 newsnap->xip = (TransactionId *)
2549                                         (((char *) newsnap) + sizeof(SnapshotData));
2550                                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551                                 newsnap->copied = true;
2552                                 break;
2553                         }
2554                         /* the base struct contains all the data, easy peasy */
2555                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2556                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2557                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2558                         break;
2559         }
2560
2561         dlist_push_tail(&txn->changes, &change->node);
2562         txn->nentries_mem++;
2563 }
2564
2565 /*
2566  * Remove all on-disk stored for the passed in transaction.
2567  */
2568 static void
2569 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2570 {
2571         XLogSegNo       first;
2572         XLogSegNo       cur;
2573         XLogSegNo       last;
2574
2575         Assert(txn->first_lsn != InvalidXLogRecPtr);
2576         Assert(txn->final_lsn != InvalidXLogRecPtr);
2577
2578         XLByteToSeg(txn->first_lsn, first);
2579         XLByteToSeg(txn->final_lsn, last);
2580
2581         /* iterate over all possible filenames, and delete them */
2582         for (cur = first; cur <= last; cur++)
2583         {
2584                 char            path[MAXPGPATH];
2585                 XLogRecPtr      recptr;
2586
2587                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588
2589                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2590                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2591                                 (uint32) (recptr >> 32), (uint32) recptr);
2592                 if (unlink(path) != 0 && errno != ENOENT)
2593                         ereport(ERROR,
2594                                         (errcode_for_file_access(),
2595                                          errmsg("could not remove file \"%s\": %m", path)));
2596         }
2597 }
2598
2599 /*
2600  * Delete all data spilled to disk after we've restarted/crashed. It will be
2601  * recreated when the respective slots are reused.
2602  */
2603 void
2604 StartupReorderBuffer(void)
2605 {
2606         DIR                *logical_dir;
2607         struct dirent *logical_de;
2608
2609         DIR                *spill_dir;
2610         struct dirent *spill_de;
2611
2612         logical_dir = AllocateDir("pg_replslot");
2613         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614         {
2615                 struct stat statbuf;
2616                 char            path[MAXPGPATH * 2 + 12];
2617
2618                 if (strcmp(logical_de->d_name, ".") == 0 ||
2619                         strcmp(logical_de->d_name, "..") == 0)
2620                         continue;
2621
2622                 /* if it cannot be a slot, skip the directory */
2623                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624                         continue;
2625
2626                 /*
2627                  * ok, has to be a surviving logical slot, iterate and delete
2628                  * everything starting with xid-*
2629                  */
2630                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631
2632                 /* we're only creating directories here, skip if it's not our's */
2633                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634                         continue;
2635
2636                 spill_dir = AllocateDir(path);
2637                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638                 {
2639                         if (strcmp(spill_de->d_name, ".") == 0 ||
2640                                 strcmp(spill_de->d_name, "..") == 0)
2641                                 continue;
2642
2643                         /* only look at names that can be ours */
2644                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645                         {
2646                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647                                                 spill_de->d_name);
2648
2649                                 if (unlink(path) != 0)
2650                                         ereport(PANIC,
2651                                                         (errcode_for_file_access(),
2652                                                          errmsg("could not remove file \"%s\": %m",
2653                                                                         path)));
2654                         }
2655                 }
2656                 FreeDir(spill_dir);
2657         }
2658         FreeDir(logical_dir);
2659 }
2660
2661 /* ---------------------------------------
2662  * toast reassembly support
2663  * ---------------------------------------
2664  */
2665
2666 /*
2667  * Initialize per tuple toast reconstruction support.
2668  */
2669 static void
2670 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2671 {
2672         HASHCTL         hash_ctl;
2673
2674         Assert(txn->toast_hash == NULL);
2675
2676         memset(&hash_ctl, 0, sizeof(hash_ctl));
2677         hash_ctl.keysize = sizeof(Oid);
2678         hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679         hash_ctl.hcxt = rb->context;
2680         txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2681                                                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2682 }
2683
2684 /*
2685  * Per toast-chunk handling for toast reconstruction
2686  *
2687  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2688  * toasted Datum comes along.
2689  */
2690 static void
2691 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2692                                                           Relation relation, ReorderBufferChange *change)
2693 {
2694         ReorderBufferToastEnt *ent;
2695         ReorderBufferTupleBuf *newtup;
2696         bool            found;
2697         int32           chunksize;
2698         bool            isnull;
2699         Pointer         chunk;
2700         TupleDesc       desc = RelationGetDescr(relation);
2701         Oid                     chunk_id;
2702         int32           chunk_seq;
2703
2704         if (txn->toast_hash == NULL)
2705                 ReorderBufferToastInitHash(rb, txn);
2706
2707         Assert(IsToastRelation(relation));
2708
2709         newtup = change->data.tp.newtuple;
2710         chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711         Assert(!isnull);
2712         chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713         Assert(!isnull);
2714
2715         ent = (ReorderBufferToastEnt *)
2716                 hash_search(txn->toast_hash,
2717                                         (void *) &chunk_id,
2718                                         HASH_ENTER,
2719                                         &found);
2720
2721         if (!found)
2722         {
2723                 Assert(ent->chunk_id == chunk_id);
2724                 ent->num_chunks = 0;
2725                 ent->last_chunk_seq = 0;
2726                 ent->size = 0;
2727                 ent->reconstructed = NULL;
2728                 dlist_init(&ent->chunks);
2729
2730                 if (chunk_seq != 0)
2731                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732                                  chunk_seq, chunk_id);
2733         }
2734         else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737
2738         chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739         Assert(!isnull);
2740
2741         /* calculate size so we can allocate the right size at once later */
2742         if (!VARATT_IS_EXTENDED(chunk))
2743                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2744         else if (VARATT_IS_SHORT(chunk))
2745                 /* could happen due to heap_form_tuple doing its thing */
2746                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747         else
2748                 elog(ERROR, "unexpected type of toast chunk");
2749
2750         ent->size += chunksize;
2751         ent->last_chunk_seq = chunk_seq;
2752         ent->num_chunks++;
2753         dlist_push_tail(&ent->chunks, &change->node);
2754 }
2755
2756 /*
2757  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2758  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2759  *
2760  * We cannot replace unchanged toast tuples though, so those will still point
2761  * to on-disk toast data.
2762  */
2763 static void
2764 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2765                                                   Relation relation, ReorderBufferChange *change)
2766 {
2767         TupleDesc       desc;
2768         int                     natt;
2769         Datum      *attrs;
2770         bool       *isnull;
2771         bool       *free;
2772         HeapTuple       tmphtup;
2773         Relation        toast_rel;
2774         TupleDesc       toast_desc;
2775         MemoryContext oldcontext;
2776         ReorderBufferTupleBuf *newtup;
2777
2778         /* no toast tuples changed */
2779         if (txn->toast_hash == NULL)
2780                 return;
2781
2782         oldcontext = MemoryContextSwitchTo(rb->context);
2783
2784         /* we should only have toast tuples in an INSERT or UPDATE */
2785         Assert(change->data.tp.newtuple);
2786
2787         desc = RelationGetDescr(relation);
2788
2789         toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790         toast_desc = RelationGetDescr(toast_rel);
2791
2792         /* should we allocate from stack instead? */
2793         attrs = palloc0(sizeof(Datum) * desc->natts);
2794         isnull = palloc0(sizeof(bool) * desc->natts);
2795         free = palloc0(sizeof(bool) * desc->natts);
2796
2797         newtup = change->data.tp.newtuple;
2798
2799         heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800
2801         for (natt = 0; natt < desc->natts; natt++)
2802         {
2803                 Form_pg_attribute attr = desc->attrs[natt];
2804                 ReorderBufferToastEnt *ent;
2805                 struct varlena *varlena;
2806
2807                 /* va_rawsize is the size of the original datum -- including header */
2808                 struct varatt_external toast_pointer;
2809                 struct varatt_indirect redirect_pointer;
2810                 struct varlena *new_datum = NULL;
2811                 struct varlena *reconstructed;
2812                 dlist_iter      it;
2813                 Size            data_done = 0;
2814
2815                 /* system columns aren't toasted */
2816                 if (attr->attnum < 0)
2817                         continue;
2818
2819                 if (attr->attisdropped)
2820                         continue;
2821
2822                 /* not a varlena datatype */
2823                 if (attr->attlen != -1)
2824                         continue;
2825
2826                 /* no data */
2827                 if (isnull[natt])
2828                         continue;
2829
2830                 /* ok, we know we have a toast datum */
2831                 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832
2833                 /* no need to do anything if the tuple isn't external */
2834                 if (!VARATT_IS_EXTERNAL(varlena))
2835                         continue;
2836
2837                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838
2839                 /*
2840                  * Check whether the toast tuple changed, replace if so.
2841                  */
2842                 ent = (ReorderBufferToastEnt *)
2843                         hash_search(txn->toast_hash,
2844                                                 (void *) &toast_pointer.va_valueid,
2845                                                 HASH_FIND,
2846                                                 NULL);
2847                 if (ent == NULL)
2848                         continue;
2849
2850                 new_datum =
2851                         (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852
2853                 free[natt] = true;
2854
2855                 reconstructed = palloc0(toast_pointer.va_rawsize);
2856
2857                 ent->reconstructed = reconstructed;
2858
2859                 /* stitch toast tuple back together from its parts */
2860                 dlist_foreach(it, &ent->chunks)
2861                 {
2862                         bool            isnull;
2863                         ReorderBufferChange *cchange;
2864                         ReorderBufferTupleBuf *ctup;
2865                         Pointer         chunk;
2866
2867                         cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868                         ctup = cchange->data.tp.newtuple;
2869                         chunk = DatumGetPointer(
2870                                                   fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871
2872                         Assert(!isnull);
2873                         Assert(!VARATT_IS_EXTERNAL(chunk));
2874                         Assert(!VARATT_IS_SHORT(chunk));
2875
2876                         memcpy(VARDATA(reconstructed) + data_done,
2877                                    VARDATA(chunk),
2878                                    VARSIZE(chunk) - VARHDRSZ);
2879                         data_done += VARSIZE(chunk) - VARHDRSZ;
2880                 }
2881                 Assert(data_done == toast_pointer.va_extsize);
2882
2883                 /* make sure its marked as compressed or not */
2884                 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885                         SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886                 else
2887                         SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888
2889                 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890                 redirect_pointer.pointer = reconstructed;
2891
2892                 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2893                 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894                            sizeof(redirect_pointer));
2895
2896                 attrs[natt] = PointerGetDatum(new_datum);
2897         }
2898
2899         /*
2900          * Build tuple in separate memory & copy tuple back into the tuplebuf
2901          * passed to the output plugin. We can't directly heap_fill_tuple() into
2902          * the tuplebuf because attrs[] will point back into the current content.
2903          */
2904         tmphtup = heap_form_tuple(desc, attrs, isnull);
2905         Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906         Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907
2908         memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909         newtup->tuple.t_len = tmphtup->t_len;
2910
2911         /*
2912          * free resources we won't further need, more persistent stuff will be
2913          * free'd in ReorderBufferToastReset().
2914          */
2915         RelationClose(toast_rel);
2916         pfree(tmphtup);
2917         for (natt = 0; natt < desc->natts; natt++)
2918         {
2919                 if (free[natt])
2920                         pfree(DatumGetPointer(attrs[natt]));
2921         }
2922         pfree(attrs);
2923         pfree(free);
2924         pfree(isnull);
2925
2926         MemoryContextSwitchTo(oldcontext);
2927 }
2928
2929 /*
2930  * Free all resources allocated for toast reconstruction.
2931  */
2932 static void
2933 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2934 {
2935         HASH_SEQ_STATUS hstat;
2936         ReorderBufferToastEnt *ent;
2937
2938         if (txn->toast_hash == NULL)
2939                 return;
2940
2941         /* sequentially walk over the hash and free everything */
2942         hash_seq_init(&hstat, txn->toast_hash);
2943         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944         {
2945                 dlist_mutable_iter it;
2946
2947                 if (ent->reconstructed != NULL)
2948                         pfree(ent->reconstructed);
2949
2950                 dlist_foreach_modify(it, &ent->chunks)
2951                 {
2952                         ReorderBufferChange *change =
2953                         dlist_container(ReorderBufferChange, node, it.cur);
2954
2955                         dlist_delete(&change->node);
2956                         ReorderBufferReturnChange(rb, change);
2957                 }
2958         }
2959
2960         hash_destroy(txn->toast_hash);
2961         txn->toast_hash = NULL;
2962 }
2963
2964
2965 /* ---------------------------------------
2966  * Visibility support for logical decoding
2967  *
2968  *
2969  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2970  * always rely on stored cmin/cmax values because of two scenarios:
2971  *
2972  * * A tuple got changed multiple times during a single transaction and thus
2973  *       has got a combocid. Combocids are only valid for the duration of a
2974  *       single transaction.
2975  * * A tuple with a cmin but no cmax (and thus no combocid) got
2976  *       deleted/updated in another transaction than the one which created it
2977  *       which we are looking at right now. As only one of cmin, cmax or combocid
2978  *       is actually stored in the heap we don't have access to the value we
2979  *       need anymore.
2980  *
2981  * To resolve those problems we have a per-transaction hash of (cmin,
2982  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2983  * (cmin, cmax) values. That also takes care of combocids by simply
2984  * not caring about them at all. As we have the real cmin/cmax values
2985  * combocids aren't interesting.
2986  *
2987  * As we only care about catalog tuples here the overhead of this
2988  * hashtable should be acceptable.
2989  *
2990  * Heap rewrites complicate this a bit, check rewriteheap.c for
2991  * details.
2992  * -------------------------------------------------------------------------
2993  */
2994
2995 /* entry describing one mapping file; used for qsort()ing mapping files by lsn somewhat efficiently */
2996 typedef struct RewriteMappingFile
2997 {
2998         XLogRecPtr      lsn;            /* LSN decoded from the file name */
2999         char            fname[MAXPGPATH];       /* file name, without the directory part */
3000 } RewriteMappingFile;
3001
#if NOT_USED
/*
 * Debugging helper: log every entry of tuplecid_data at DEBUG3, one line per
 * entry, showing its key (relfilenode and tid) and the stored cmin/cmax.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
        HASH_SEQ_STATUS scan;
        ReorderBufferTupleCidEnt *entry;

        hash_seq_init(&scan, tuplecid_data);
        for (entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
             entry != NULL;
             entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan))
        {
                elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
                     entry->key.relnode.dbNode,
                     entry->key.relnode.spcNode,
                     entry->key.relnode.relNode,
                     ItemPointerGetBlockNumber(&entry->key.tid),
                     ItemPointerGetOffsetNumber(&entry->key.tid),
                     entry->cmin,
                     entry->cmax);
        }
}
#endif
3024
3025 /*
3026  * Apply a single mapping file to tuplecid_data.
3027  *
3028  * The mapping file has to have been verified to be a) committed b) for our
3029  * transaction c) applied in LSN order.
3030  */
3031 static void
3032 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3033 {
3034         char            path[MAXPGPATH];
3035         int                     fd;
3036         int                     readBytes;
3037         LogicalRewriteMappingData map;
3038
3039         sprintf(path, "pg_logical/mappings/%s", fname);
3040         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041         if (fd < 0)
3042                 ereport(ERROR,
3043                                 (errcode_for_file_access(),
3044                                  errmsg("could not open file \"%s\": %m", path)));
3045
3046         while (true)
3047         {
3048                 ReorderBufferTupleCidKey key;
3049                 ReorderBufferTupleCidEnt *ent;
3050                 ReorderBufferTupleCidEnt *new_ent;
3051                 bool            found;
3052
3053                 /* be careful about padding */
3054                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055
3056                 /* read all mappings till the end of the file */
3057                 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3058                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3059                 pgstat_report_wait_end();
3060
3061                 if (readBytes < 0)
3062                         ereport(ERROR,
3063                                         (errcode_for_file_access(),
3064                                          errmsg("could not read file \"%s\": %m",
3065                                                         path)));
3066                 else if (readBytes == 0)        /* EOF */
3067                         break;
3068                 else if (readBytes != sizeof(LogicalRewriteMappingData))
3069                         ereport(ERROR,
3070                                         (errcode_for_file_access(),
3071                                          errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072                                                         path, readBytes,
3073                                                         (int32) sizeof(LogicalRewriteMappingData))));
3074
3075                 key.relnode = map.old_node;
3076                 ItemPointerCopy(&map.old_tid,
3077                                                 &key.tid);
3078
3079
3080                 ent = (ReorderBufferTupleCidEnt *)
3081                         hash_search(tuplecid_data,
3082                                                 (void *) &key,
3083                                                 HASH_FIND,
3084                                                 NULL);
3085
3086                 /* no existing mapping, no need to update */
3087                 if (!ent)
3088                         continue;
3089
3090                 key.relnode = map.new_node;
3091                 ItemPointerCopy(&map.new_tid,
3092                                                 &key.tid);
3093
3094                 new_ent = (ReorderBufferTupleCidEnt *)
3095                         hash_search(tuplecid_data,
3096                                                 (void *) &key,
3097                                                 HASH_ENTER,
3098                                                 &found);
3099
3100                 if (found)
3101                 {
3102                         /*
3103                          * Make sure the existing mapping makes sense. We sometime update
3104                          * old records that did not yet have a cmax (e.g. pg_class' own
3105                          * entry while rewriting it) during rewrites, so allow that.
3106                          */
3107                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109                 }
3110                 else
3111                 {
3112                         /* update mapping */
3113                         new_ent->cmin = ent->cmin;
3114                         new_ent->cmax = ent->cmax;
3115                         new_ent->combocid = ent->combocid;
3116                 }
3117         }
3118 }
3119
3120
3121 /*
3122  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
3123  */
3124 static bool
3125 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3126 {
3127         return bsearch(&xid, xip, num,
3128                                    sizeof(TransactionId), xidComparator) != NULL;
3129 }
3130
3131 /*
3132  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3133  */
3134 static int
3135 file_sort_by_lsn(const void *a_p, const void *b_p)
3136 {
3137         RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138         RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139
3140         if (a->lsn < b->lsn)
3141                 return -1;
3142         else if (a->lsn > b->lsn)
3143                 return 1;
3144         return 0;
3145 }
3146
3147 /*
3148  * Apply any existing logical remapping files if there are any targeted at our
3149  * transaction for relid.
3150  */
3151 static void
3152 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3153 {
3154         DIR                *mapping_dir;
3155         struct dirent *mapping_de;
3156         List       *files = NIL;
3157         ListCell   *file;
3158         RewriteMappingFile **files_a;
3159         size_t          off;
3160         Oid                     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161
3162         mapping_dir = AllocateDir("pg_logical/mappings");
3163         while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164         {
3165                 Oid                     f_dboid;
3166                 Oid                     f_relid;
3167                 TransactionId f_mapped_xid;
3168                 TransactionId f_create_xid;
3169                 XLogRecPtr      f_lsn;
3170                 uint32          f_hi,
3171                                         f_lo;
3172                 RewriteMappingFile *f;
3173
3174                 if (strcmp(mapping_de->d_name, ".") == 0 ||
3175                         strcmp(mapping_de->d_name, "..") == 0)
3176                         continue;
3177
3178                 /* Ignore files that aren't ours */
3179                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3180                         continue;
3181
3182                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3183                                    &f_dboid, &f_relid, &f_hi, &f_lo,
3184                                    &f_mapped_xid, &f_create_xid) != 6)
3185                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3186
3187                 f_lsn = ((uint64) f_hi) << 32 | f_lo;
3188
3189                 /* mapping for another database */
3190                 if (f_dboid != dboid)
3191                         continue;
3192
3193                 /* mapping for another relation */
3194                 if (f_relid != relid)
3195                         continue;
3196
3197                 /* did the creating transaction abort? */
3198                 if (!TransactionIdDidCommit(f_create_xid))
3199                         continue;
3200
3201                 /* not for our transaction */
3202                 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3203                         continue;
3204
3205                 /* ok, relevant, queue for apply */
3206                 f = palloc(sizeof(RewriteMappingFile));
3207                 f->lsn = f_lsn;
3208                 strcpy(f->fname, mapping_de->d_name);
3209                 files = lappend(files, f);
3210         }
3211         FreeDir(mapping_dir);
3212
3213         /* build array we can easily sort */
3214         files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3215         off = 0;
3216         foreach(file, files)
3217         {
3218                 files_a[off++] = lfirst(file);
3219         }
3220
3221         /* sort files so we apply them in LSN order */
3222         qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3223                   file_sort_by_lsn);
3224
3225         for (off = 0; off < list_length(files); off++)
3226         {
3227                 RewriteMappingFile *f = files_a[off];
3228
3229                 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3230                          snapshot->subxip[0]);
3231                 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3232                 pfree(f);
3233         }
3234 }
3235
3236 /*
3237  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3238  * combocids.
3239  */
3240 bool
3241 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3242                                                           Snapshot snapshot,
3243                                                           HeapTuple htup, Buffer buffer,
3244                                                           CommandId *cmin, CommandId *cmax)
3245 {
3246         ReorderBufferTupleCidKey key;
3247         ReorderBufferTupleCidEnt *ent;
3248         ForkNumber      forkno;
3249         BlockNumber blockno;
3250         bool            updated_mapping = false;
3251
3252         /* be careful about padding */
3253         memset(&key, 0, sizeof(key));
3254
3255         Assert(!BufferIsLocal(buffer));
3256
3257         /*
3258          * get relfilenode from the buffer, no convenient way to access it other
3259          * than that.
3260          */
3261         BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262
3263         /* tuples can only be in the main fork */
3264         Assert(forkno == MAIN_FORKNUM);
3265         Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266
3267         ItemPointerCopy(&htup->t_self,
3268                                         &key.tid);
3269
3270 restart:
3271         ent = (ReorderBufferTupleCidEnt *)
3272                 hash_search(tuplecid_data,
3273                                         (void *) &key,
3274                                         HASH_FIND,
3275                                         NULL);
3276
3277         /*
3278          * failed to find a mapping, check whether the table was rewritten and
3279          * apply mapping if so, but only do that once - there can be no new
3280          * mappings while we are in here since we have to hold a lock on the
3281          * relation.
3282          */
3283         if (ent == NULL && !updated_mapping)
3284         {
3285                 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286                 /* now check but don't update for a mapping again */
3287                 updated_mapping = true;
3288                 goto restart;
3289         }
3290         else if (ent == NULL)
3291                 return false;
3292
3293         if (cmin)
3294                 *cmin = ent->cmin;
3295         if (cmax)
3296                 *cmax = ent->cmax;
3297         return true;
3298 }