]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/reorderbuffer.c
Improve hash_create's API for selecting simple-binary-key hash functions.
[postgresql] / src / backend / replication / logical / reorderbuffer.c
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *        PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2014, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  *        This module gets handed individual pieces of transactions in the order
15  *        they are written to the WAL and is responsible to reassemble them into
16  *        toplevel transaction sized pieces. When a transaction is completely
17  *        reassembled - signalled by reading the transaction commit record - it
18  *        will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  *        individual changes. The output plugins rely on snapshots built by
20  *        snapbuild.c which hands them to us.
21  *
22  *        Transactions and subtransactions/savepoints in postgres are not
23  *        immediately linked to each other from outside the performing
24  *        backend. Only at commit/abort (or special xact_assignment records) they
25  *        are linked together. Which means that we will have to splice together a
26  *        toplevel transaction from its subtransactions. To do that efficiently we
27  *        build a binary heap indexed by the smallest current lsn of the individual
28  *        subtransactions' changestreams. As the individual streams are inherently
29  *        ordered by LSN - since that is where we build them from - the transaction
30  *        can easily be reassembled by always using the subtransaction with the
31  *        smallest current LSN from the heap.
32  *
33  *        In order to cope with large transactions - which can be several times as
34  *        big as the available memory - this module supports spooling the contents
35  *        of large transactions to disk. When the transaction is replayed the
36  *        contents of individual (sub-)transactions will be read from disk in
37  *        chunks.
38  *
39  *        This module also has to deal with reassembling toast records from the
40  *        individual chunks stored in WAL. When a new (or initial) version of a
41  *        tuple is stored in WAL it will always be preceded by the toast chunks
42  *        emitted for the columns stored out of line. Within a single toplevel
43  *        transaction there will be no other data carrying records between a row's
44  *        toast chunks and the row data itself. See ReorderBufferToast* for
45  *        details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "replication/logical.h"
62 #include "replication/reorderbuffer.h"
63 #include "replication/slot.h"
64 #include "replication/snapbuild.h"              /* just for SnapBuildSnapDecRefcount */
65 #include "storage/bufmgr.h"
66 #include "storage/fd.h"
67 #include "storage/sinval.h"
68 #include "utils/builtins.h"
69 #include "utils/combocid.h"
70 #include "utils/memdebug.h"
71 #include "utils/memutils.h"
72 #include "utils/relcache.h"
73 #include "utils/relfilenodemap.h"
74 #include "utils/tqual.h"
75
76
77 /* entry for a hash table we use to map from xid to our transaction state */
78 typedef struct ReorderBufferTXNByIdEnt
79 {
80         TransactionId xid;
81         ReorderBufferTXN *txn;
82 } ReorderBufferTXNByIdEnt;
83
84 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
85 typedef struct ReorderBufferTupleCidKey
86 {
87         RelFileNode relnode;
88         ItemPointerData tid;
89 } ReorderBufferTupleCidKey;
90
91 typedef struct ReorderBufferTupleCidEnt
92 {
93         ReorderBufferTupleCidKey key;
94         CommandId       cmin;
95         CommandId       cmax;
96         CommandId       combocid;               /* just for debugging */
97 } ReorderBufferTupleCidEnt;
98
99 /* k-way in-order change iteration support structures */
100 typedef struct ReorderBufferIterTXNEntry
101 {
102         XLogRecPtr      lsn;
103         ReorderBufferChange *change;
104         ReorderBufferTXN *txn;
105         int                     fd;
106         XLogSegNo       segno;
107 } ReorderBufferIterTXNEntry;
108
109 typedef struct ReorderBufferIterTXNState
110 {
111         binaryheap *heap;
112         Size            nr_txns;
113         dlist_head      old_change;
114         ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
115 } ReorderBufferIterTXNState;
116
117 /* toast datastructures */
118 typedef struct ReorderBufferToastEnt
119 {
120         Oid                     chunk_id;               /* toast_table.chunk_id */
121         int32           last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
122                                                                  * have seen */
123         Size            num_chunks;             /* number of chunks we've already seen */
124         Size            size;                   /* combined size of chunks seen */
125         dlist_head      chunks;                 /* linked list of chunks */
126         struct varlena *reconstructed;          /* reconstructed varlena now pointed
127                                                                                  * to in main tup */
128 } ReorderBufferToastEnt;
129
130 /* Disk serialization support datastructures */
131 typedef struct ReorderBufferDiskChange
132 {
133         Size            size;
134         ReorderBufferChange change;
135         /* data follows */
136 } ReorderBufferDiskChange;
137
138 /*
139  * Maximum number of changes kept in memory, per transaction. After that,
140  * changes are spooled to disk.
141  *
142  * The current value should be sufficient to decode the entire transaction
143  * without hitting disk in OLTP workloads, while starting to spool to disk in
144  * other workloads reasonably fast.
145  *
146  * At some point in the future it probaly makes sense to have a more elaborate
147  * resource management here, but it's not entirely clear what that would look
148  * like.
149  */
150 static const Size max_changes_in_memory = 4096;
151
152 /*
153  * We use a very simple form of a slab allocator for frequently allocated
154  * objects, simply keeping a fixed number in a linked list when unused,
155  * instead pfree()ing them. Without that in many workloads aset.c becomes a
156  * major bottleneck, especially when spilling to disk while decoding batch
157  * workloads.
158  */
159 static const Size max_cached_changes = 4096 * 2;
160 static const Size max_cached_tuplebufs = 4096 * 2;              /* ~8MB */
161 static const Size max_cached_transactions = 512;
162
163
164 /* ---------------------------------------
165  * primary reorderbuffer support routines
166  * ---------------------------------------
167  */
168 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
169 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
170 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
171                                           TransactionId xid, bool create, bool *is_new,
172                                           XLogRecPtr lsn, bool create_as_top);
173
174 static void AssertTXNLsnOrder(ReorderBuffer *rb);
175
176 /* ---------------------------------------
177  * support functions for lsn-order iterating over the ->changes of a
178  * transaction and its subtransactions
179  *
180  * used for iteration over the k-way heap merge of a transaction and its
181  * subtransactions
182  * ---------------------------------------
183  */
184 static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
185 static ReorderBufferChange *
186                         ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
187 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
188                                                    ReorderBufferIterTXNState *state);
189 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
190
191 /*
192  * ---------------------------------------
193  * Disk serialization support functions
194  * ---------------------------------------
195  */
196 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
197 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
198 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
199                                                          int fd, ReorderBufferChange *change);
200 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
201                                                         int *fd, XLogSegNo *segno);
202 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
203                                                    char *change);
204 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
205
206 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
207 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
208                                           ReorderBufferTXN *txn, CommandId cid);
209
210 /* ---------------------------------------
211  * toast reassembly support
212  * ---------------------------------------
213  */
214 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
215 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
216 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
217                                                   Relation relation, ReorderBufferChange *change);
218 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
219                                                           Relation relation, ReorderBufferChange *change);
220
221
222 /*
223  * Allocate a new ReorderBuffer
224  */
225 ReorderBuffer *
226 ReorderBufferAllocate(void)
227 {
228         ReorderBuffer *buffer;
229         HASHCTL         hash_ctl;
230         MemoryContext new_ctx;
231
232         /* allocate memory in own context, to have better accountability */
233         new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234                                                                         "ReorderBuffer",
235                                                                         ALLOCSET_DEFAULT_MINSIZE,
236                                                                         ALLOCSET_DEFAULT_INITSIZE,
237                                                                         ALLOCSET_DEFAULT_MAXSIZE);
238
239         buffer =
240                 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
241
242         memset(&hash_ctl, 0, sizeof(hash_ctl));
243
244         buffer->context = new_ctx;
245
246         hash_ctl.keysize = sizeof(TransactionId);
247         hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
248         hash_ctl.hcxt = buffer->context;
249
250         buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
251                                                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
252
253         buffer->by_txn_last_xid = InvalidTransactionId;
254         buffer->by_txn_last_txn = NULL;
255
256         buffer->nr_cached_transactions = 0;
257         buffer->nr_cached_changes = 0;
258         buffer->nr_cached_tuplebufs = 0;
259
260         buffer->outbuf = NULL;
261         buffer->outbufsize = 0;
262
263         buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
264
265         dlist_init(&buffer->toplevel_by_lsn);
266         dlist_init(&buffer->cached_transactions);
267         dlist_init(&buffer->cached_changes);
268         slist_init(&buffer->cached_tuplebufs);
269
270         return buffer;
271 }
272
273 /*
274  * Free a ReorderBuffer
275  */
276 void
277 ReorderBufferFree(ReorderBuffer *rb)
278 {
279         MemoryContext context = rb->context;
280
281         /*
282          * We free separately allocated data by entirely scrapping reorderbuffer's
283          * memory context.
284          */
285         MemoryContextDelete(context);
286 }
287
288 /*
289  * Get a unused, possibly preallocated, ReorderBufferTXN.
290  */
291 static ReorderBufferTXN *
292 ReorderBufferGetTXN(ReorderBuffer *rb)
293 {
294         ReorderBufferTXN *txn;
295
296         /* check the slab cache */
297         if (rb->nr_cached_transactions > 0)
298         {
299                 rb->nr_cached_transactions--;
300                 txn = (ReorderBufferTXN *)
301                         dlist_container(ReorderBufferTXN, node,
302                                                         dlist_pop_head_node(&rb->cached_transactions));
303         }
304         else
305         {
306                 txn = (ReorderBufferTXN *)
307                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
308         }
309
310         memset(txn, 0, sizeof(ReorderBufferTXN));
311
312         dlist_init(&txn->changes);
313         dlist_init(&txn->tuplecids);
314         dlist_init(&txn->subtxns);
315
316         return txn;
317 }
318
319 /*
320  * Free a ReorderBufferTXN.
321  *
322  * Deallocation might be delayed for efficiency purposes, for details check
323  * the comments above max_cached_changes's definition.
324  */
325 static void
326 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
327 {
328         /* clean the lookup cache if we were cached (quite likely) */
329         if (rb->by_txn_last_xid == txn->xid)
330         {
331                 rb->by_txn_last_xid = InvalidTransactionId;
332                 rb->by_txn_last_txn = NULL;
333         }
334
335         /* free data that's contained */
336
337         if (txn->tuplecid_hash != NULL)
338         {
339                 hash_destroy(txn->tuplecid_hash);
340                 txn->tuplecid_hash = NULL;
341         }
342
343         if (txn->invalidations)
344         {
345                 pfree(txn->invalidations);
346                 txn->invalidations = NULL;
347         }
348
349         /* check whether to put into the slab cache */
350         if (rb->nr_cached_transactions < max_cached_transactions)
351         {
352                 rb->nr_cached_transactions++;
353                 dlist_push_head(&rb->cached_transactions, &txn->node);
354                 VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
355                 VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
356         }
357         else
358         {
359                 pfree(txn);
360         }
361 }
362
363 /*
364  * Get a unused, possibly preallocated, ReorderBufferChange.
365  */
366 ReorderBufferChange *
367 ReorderBufferGetChange(ReorderBuffer *rb)
368 {
369         ReorderBufferChange *change;
370
371         /* check the slab cache */
372         if (rb->nr_cached_changes)
373         {
374                 rb->nr_cached_changes--;
375                 change = (ReorderBufferChange *)
376                         dlist_container(ReorderBufferChange, node,
377                                                         dlist_pop_head_node(&rb->cached_changes));
378         }
379         else
380         {
381                 change = (ReorderBufferChange *)
382                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
383         }
384
385         memset(change, 0, sizeof(ReorderBufferChange));
386         return change;
387 }
388
389 /*
390  * Free an ReorderBufferChange.
391  *
392  * Deallocation might be delayed for efficiency purposes, for details check
393  * the comments above max_cached_changes's definition.
394  */
395 void
396 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
397 {
398         /* free contained data */
399         switch (change->action)
400         {
401                 case REORDER_BUFFER_CHANGE_INSERT:
402                 case REORDER_BUFFER_CHANGE_UPDATE:
403                 case REORDER_BUFFER_CHANGE_DELETE:
404                         if (change->data.tp.newtuple)
405                         {
406                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
407                                 change->data.tp.newtuple = NULL;
408                         }
409
410                         if (change->data.tp.oldtuple)
411                         {
412                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
413                                 change->data.tp.oldtuple = NULL;
414                         }
415                         break;
416                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
417                         if (change->data.snapshot)
418                         {
419                                 ReorderBufferFreeSnap(rb, change->data.snapshot);
420                                 change->data.snapshot = NULL;
421                         }
422                         break;
423                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
424                         break;
425                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
426                         break;
427         }
428
429         /* check whether to put into the slab cache */
430         if (rb->nr_cached_changes < max_cached_changes)
431         {
432                 rb->nr_cached_changes++;
433                 dlist_push_head(&rb->cached_changes, &change->node);
434                 VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
435                 VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
436         }
437         else
438         {
439                 pfree(change);
440         }
441 }
442
443
444 /*
445  * Get a unused, possibly preallocated, ReorderBufferTupleBuf
446  */
447 ReorderBufferTupleBuf *
448 ReorderBufferGetTupleBuf(ReorderBuffer *rb)
449 {
450         ReorderBufferTupleBuf *tuple;
451
452         /* check the slab cache */
453         if (rb->nr_cached_tuplebufs)
454         {
455                 rb->nr_cached_tuplebufs--;
456                 tuple = slist_container(ReorderBufferTupleBuf, node,
457                                                                 slist_pop_head_node(&rb->cached_tuplebufs));
458 #ifdef USE_ASSERT_CHECKING
459                 memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
460 #endif
461         }
462         else
463         {
464                 tuple = (ReorderBufferTupleBuf *)
465                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
466         }
467
468         return tuple;
469 }
470
471 /*
472  * Free an ReorderBufferTupleBuf.
473  *
474  * Deallocation might be delayed for efficiency purposes, for details check
475  * the comments above max_cached_changes's definition.
476  */
477 void
478 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
479 {
480         /* check whether to put into the slab cache */
481         if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
482         {
483                 rb->nr_cached_tuplebufs++;
484                 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
485                 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
486                 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
487         }
488         else
489         {
490                 pfree(tuple);
491         }
492 }
493
494 /*
495  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
496  * If create is true, and a transaction doesn't already exist, create it
497  * (with the given LSN, and as top transaction if that's specified);
498  * when this happens, is_new is set to true.
499  */
500 static ReorderBufferTXN *
501 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
502                                           bool *is_new, XLogRecPtr lsn, bool create_as_top)
503 {
504         ReorderBufferTXN *txn;
505         ReorderBufferTXNByIdEnt *ent;
506         bool            found;
507
508         Assert(TransactionIdIsValid(xid));
509         Assert(!create || lsn != InvalidXLogRecPtr);
510
511         /*
512          * Check the one-entry lookup cache first
513          */
514         if (TransactionIdIsValid(rb->by_txn_last_xid) &&
515                 rb->by_txn_last_xid == xid)
516         {
517                 txn = rb->by_txn_last_txn;
518
519                 if (txn != NULL)
520                 {
521                         /* found it, and it's valid */
522                         if (is_new)
523                                 *is_new = false;
524                         return txn;
525                 }
526
527                 /*
528                  * cached as non-existant, and asked not to create? Then nothing else
529                  * to do.
530                  */
531                 if (!create)
532                         return NULL;
533                 /* otherwise fall through to create it */
534         }
535
536         /*
537          * If the cache wasn't hit or it yielded an "does-not-exist" and we want
538          * to create an entry.
539          */
540
541         /* search the lookup table */
542         ent = (ReorderBufferTXNByIdEnt *)
543                 hash_search(rb->by_txn,
544                                         (void *) &xid,
545                                         create ? HASH_ENTER : HASH_FIND,
546                                         &found);
547         if (found)
548                 txn = ent->txn;
549         else if (create)
550         {
551                 /* initialize the new entry, if creation was requested */
552                 Assert(ent != NULL);
553
554                 ent->txn = ReorderBufferGetTXN(rb);
555                 ent->txn->xid = xid;
556                 txn = ent->txn;
557                 txn->first_lsn = lsn;
558                 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
559
560                 if (create_as_top)
561                 {
562                         dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
563                         AssertTXNLsnOrder(rb);
564                 }
565         }
566         else
567                 txn = NULL;                             /* not found and not asked to create */
568
569         /* update cache */
570         rb->by_txn_last_xid = xid;
571         rb->by_txn_last_txn = txn;
572
573         if (is_new)
574                 *is_new = !found;
575
576         Assert(!create || !!txn);
577         return txn;
578 }
579
580 /*
581  * Queue a change into a transaction so it can be replayed upon commit.
582  */
583 void
584 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
585                                                  ReorderBufferChange *change)
586 {
587         ReorderBufferTXN *txn;
588
589         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
590
591         change->lsn = lsn;
592         Assert(InvalidXLogRecPtr != lsn);
593         dlist_push_tail(&txn->changes, &change->node);
594         txn->nentries++;
595         txn->nentries_mem++;
596
597         ReorderBufferCheckSerializeTXN(rb, txn);
598 }
599
600 static void
601 AssertTXNLsnOrder(ReorderBuffer *rb)
602 {
603 #ifdef USE_ASSERT_CHECKING
604         dlist_iter      iter;
605         XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
606
607         dlist_foreach(iter, &rb->toplevel_by_lsn)
608         {
609                 ReorderBufferTXN *cur_txn;
610
611                 cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
612                 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
613
614                 if (cur_txn->end_lsn != InvalidXLogRecPtr)
615                         Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
616
617                 if (prev_first_lsn != InvalidXLogRecPtr)
618                         Assert(prev_first_lsn < cur_txn->first_lsn);
619
620                 Assert(!cur_txn->is_known_as_subxact);
621                 prev_first_lsn = cur_txn->first_lsn;
622         }
623 #endif
624 }
625
626 ReorderBufferTXN *
627 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
628 {
629         ReorderBufferTXN *txn;
630
631         if (dlist_is_empty(&rb->toplevel_by_lsn))
632                 return NULL;
633
634         AssertTXNLsnOrder(rb);
635
636         txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
637
638         Assert(!txn->is_known_as_subxact);
639         Assert(txn->first_lsn != InvalidXLogRecPtr);
640         return txn;
641 }
642
643 void
644 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
645 {
646         rb->current_restart_decoding_lsn = ptr;
647 }
648
649 void
650 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
651                                                  TransactionId subxid, XLogRecPtr lsn)
652 {
653         ReorderBufferTXN *txn;
654         ReorderBufferTXN *subtxn;
655         bool            new_top;
656         bool            new_sub;
657
658         txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
659         subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
660
661         if (new_sub)
662         {
663                 /*
664                  * we assign subtransactions to top level transaction even if we don't
665                  * have data for it yet, assignment records frequently reference xids
666                  * that have not yet produced any records. Knowing those aren't top
667                  * level xids allows us to make processing cheaper in some places.
668                  */
669                 dlist_push_tail(&txn->subtxns, &subtxn->node);
670                 txn->nsubtxns++;
671         }
672         else if (!subtxn->is_known_as_subxact)
673         {
674                 subtxn->is_known_as_subxact = true;
675                 Assert(subtxn->nsubtxns == 0);
676
677                 /* remove from lsn order list of top-level transactions */
678                 dlist_delete(&subtxn->node);
679
680                 /* add to toplevel transaction */
681                 dlist_push_tail(&txn->subtxns, &subtxn->node);
682                 txn->nsubtxns++;
683         }
684         else if (new_top)
685         {
686                 elog(ERROR, "existing subxact assigned to unknown toplevel xact");
687         }
688 }
689
690 /*
691  * Associate a subtransaction with its toplevel transaction at commit
692  * time. There may be no further changes added after this.
693  */
694 void
695 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
696                                                  TransactionId subxid, XLogRecPtr commit_lsn,
697                                                  XLogRecPtr end_lsn)
698 {
699         ReorderBufferTXN *txn;
700         ReorderBufferTXN *subtxn;
701
702         subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
703                                                                    InvalidXLogRecPtr, false);
704
705         /*
706          * No need to do anything if that subtxn didn't contain any changes
707          */
708         if (!subtxn)
709                 return;
710
711         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
712
713         if (txn == NULL)
714                 elog(ERROR, "subxact logged without previous toplevel record");
715
716         /*
717          * Pass the our base snapshot to the parent transaction if it doesn't have
718          * one, or ours is older. That can happen if there are no changes in the
719          * toplevel transaction but in one of the child transactions. This allows
720          * the parent to simply use it's base snapshot initially.
721          */
722         if (txn->base_snapshot == NULL ||
723                 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)
724         {
725                 txn->base_snapshot = subtxn->base_snapshot;
726                 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
727                 subtxn->base_snapshot = NULL;
728                 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
729         }
730
731         subtxn->final_lsn = commit_lsn;
732         subtxn->end_lsn = end_lsn;
733
734         if (!subtxn->is_known_as_subxact)
735         {
736                 subtxn->is_known_as_subxact = true;
737                 Assert(subtxn->nsubtxns == 0);
738
739                 /* remove from lsn order list of top-level transactions */
740                 dlist_delete(&subtxn->node);
741
742                 /* add to subtransaction list */
743                 dlist_push_tail(&txn->subtxns, &subtxn->node);
744                 txn->nsubtxns++;
745         }
746 }
747
748
749 /*
750  * Support for efficiently iterating over a transaction's and its
751  * subtransactions' changes.
752  *
753  * We do this by doing a k-way merge between transactions/subtransactions. For that
754  * we model the current heads of the different transactions as a binary heap
755  * so we easily know which (sub-)transaction has the change with the smallest
756  * lsn next.
757  *
758  * We assume the changes in individual transactions are already sorted by LSN.
759  */
760
761 /*
762  * Binary heap comparison function.
763  */
764 static int
765 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
766 {
767         ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
768         XLogRecPtr      pos_a = state->entries[DatumGetInt32(a)].lsn;
769         XLogRecPtr      pos_b = state->entries[DatumGetInt32(b)].lsn;
770
771         if (pos_a < pos_b)
772                 return 1;
773         else if (pos_a == pos_b)
774                 return 0;
775         return -1;
776 }
777
778 /*
779  * Allocate & initialize an iterator which iterates in lsn order over a
780  * transaction and all its subtransactions.
781  */
782 static ReorderBufferIterTXNState *
783 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
784 {
785         Size            nr_txns = 0;
786         ReorderBufferIterTXNState *state;
787         dlist_iter      cur_txn_i;
788         int32           off;
789
790         /*
791          * Calculate the size of our heap: one element for every transaction that
792          * contains changes.  (Besides the transactions already in the reorder
793          * buffer, we count the one we were directly passed.)
794          */
795         if (txn->nentries > 0)
796                 nr_txns++;
797
798         dlist_foreach(cur_txn_i, &txn->subtxns)
799         {
800                 ReorderBufferTXN *cur_txn;
801
802                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
803
804                 if (cur_txn->nentries > 0)
805                         nr_txns++;
806         }
807
808         /*
809          * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
810          * need to allocate/build a heap then.
811          */
812
813         /* allocate iteration state */
814         state = (ReorderBufferIterTXNState *)
815                 MemoryContextAllocZero(rb->context,
816                                                            sizeof(ReorderBufferIterTXNState) +
817                                                            sizeof(ReorderBufferIterTXNEntry) * nr_txns);
818
819         state->nr_txns = nr_txns;
820         dlist_init(&state->old_change);
821
822         for (off = 0; off < state->nr_txns; off++)
823         {
824                 state->entries[off].fd = -1;
825                 state->entries[off].segno = 0;
826         }
827
828         /* allocate heap */
829         state->heap = binaryheap_allocate(state->nr_txns,
830                                                                           ReorderBufferIterCompare,
831                                                                           state);
832
833         /*
834          * Now insert items into the binary heap, in an unordered fashion.  (We
835          * will run a heap assembly step at the end; this is more efficient.)
836          */
837
838         off = 0;
839
840         /* add toplevel transaction if it contains changes */
841         if (txn->nentries > 0)
842         {
843                 ReorderBufferChange *cur_change;
844
845                 if (txn->nentries != txn->nentries_mem)
846                         ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
847                                                                                 &state->entries[off].segno);
848
849                 cur_change = dlist_head_element(ReorderBufferChange, node,
850                                                                                 &txn->changes);
851
852                 state->entries[off].lsn = cur_change->lsn;
853                 state->entries[off].change = cur_change;
854                 state->entries[off].txn = txn;
855
856                 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
857         }
858
859         /* add subtransactions if they contain changes */
860         dlist_foreach(cur_txn_i, &txn->subtxns)
861         {
862                 ReorderBufferTXN *cur_txn;
863
864                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
865
866                 if (cur_txn->nentries > 0)
867                 {
868                         ReorderBufferChange *cur_change;
869
870                         if (txn->nentries != txn->nentries_mem)
871                                 ReorderBufferRestoreChanges(rb, cur_txn,
872                                                                                         &state->entries[off].fd,
873                                                                                         &state->entries[off].segno);
874
875                         cur_change = dlist_head_element(ReorderBufferChange, node,
876                                                                                         &cur_txn->changes);
877
878                         state->entries[off].lsn = cur_change->lsn;
879                         state->entries[off].change = cur_change;
880                         state->entries[off].txn = cur_txn;
881
882                         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
883                 }
884         }
885
886         /* assemble a valid binary heap */
887         binaryheap_build(state->heap);
888
889         return state;
890 }
891
892 /*
893  * Return the next change when iterating over a transaction and its
894  * subtransactions.
895  *
896  * Returns NULL when no further changes exist.
897  */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
	ReorderBufferChange *change;
	ReorderBufferIterTXNEntry *entry;
	int32		off;

	/* nothing there anymore */
	if (state->heap->bh_size == 0)
		return NULL;

	/* the heap's top entry belongs to the txn with the smallest next LSN */
	off = DatumGetInt32(binaryheap_first(state->heap));
	entry = &state->entries[off];

	/* free memory we might have "leaked" in the previous *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change);
		Assert(dlist_is_empty(&state->old_change));
	}

	/* this is the change we will return; first advance the iterator state */
	change = entry->change;

	/*
	 * update heap with information about which transaction has the next
	 * relevant change in LSN order
	 */

	/* there are in-memory changes */
	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
	{
		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
		ReorderBufferChange *next_change =
		dlist_container(ReorderBufferChange, node, next);

		/* txn stays the same */
		state->entries[off].lsn = next_change->lsn;
		state->entries[off].change = next_change;

		binaryheap_replace_first(state->heap, Int32GetDatum(off));
		return change;
	}

	/* try to load changes from disk */
	if (entry->txn->nentries != entry->txn->nentries_mem)
	{
		/*
		 * Ugly: restoring changes will reuse *Change records, thus delete the
		 * current one from the per-tx list and only free in the next call.
		 */
		dlist_delete(&change->node);
		dlist_push_tail(&state->old_change, &change->node);

		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
										&state->entries[off].segno))
		{
			/* successfully restored changes from disk */
			ReorderBufferChange *next_change =
			dlist_head_element(ReorderBufferChange, node,
							   &entry->txn->changes);

			elog(DEBUG2, "restored %u/%u changes from disk",
				 (uint32) entry->txn->nentries_mem,
				 (uint32) entry->txn->nentries);

			Assert(entry->txn->nentries_mem);
			/* txn stays the same */
			state->entries[off].lsn = next_change->lsn;
			state->entries[off].change = next_change;
			binaryheap_replace_first(state->heap, Int32GetDatum(off));

			return change;
		}
	}

	/* ok, no changes there anymore, remove */
	binaryheap_remove_first(state->heap);

	return change;
}
980
981 /*
982  * Deallocate the iterator
983  */
984 static void
985 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
986                                                    ReorderBufferIterTXNState *state)
987 {
988         int32           off;
989
990         for (off = 0; off < state->nr_txns; off++)
991         {
992                 if (state->entries[off].fd != -1)
993                         CloseTransientFile(state->entries[off].fd);
994         }
995
996         /* free memory we might have "leaked" in the last *Next call */
997         if (!dlist_is_empty(&state->old_change))
998         {
999                 ReorderBufferChange *change;
1000
1001                 change = dlist_container(ReorderBufferChange, node,
1002                                                                  dlist_pop_head_node(&state->old_change));
1003                 ReorderBufferReturnChange(rb, change);
1004                 Assert(dlist_is_empty(&state->old_change));
1005         }
1006
1007         binaryheap_free(state->heap);
1008         pfree(state);
1009 }
1010
1011 /*
1012  * Cleanup the contents of a transaction, usually after the transaction
1013  * committed or aborted.
1014  */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	bool		found;
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(subtxn->is_known_as_subxact);
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferCleanupTXN(rb, subtxn);
	}

	/* cleanup changes in the toplevel txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* return the change (and any tuple buffers) to the buffer's caches */
		ReorderBufferReturnChange(rb, change);
	}

	/*
	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
	 * They are always stored in the toplevel transaction.
	 */
	dlist_foreach_modify(iter, &txn->tuplecids)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);
		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
		ReorderBufferReturnChange(rb, change);
	}

	/* release our reference to the txn's base snapshot, if any */
	if (txn->base_snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(txn->base_snapshot);
		txn->base_snapshot = NULL;
		txn->base_snapshot_lsn = InvalidXLogRecPtr;
	}

	/* delete from list of known subxacts */
	if (txn->is_known_as_subxact)
	{
		/* NB: nsubxacts count of parent will be too high now */
		dlist_delete(&txn->node);
	}
	/* delete from LSN ordered list of toplevel TXNs */
	else
	{
		/* same physical unlink as above; txn->node just lives in a different list */
		dlist_delete(&txn->node);
	}

	/* now remove reference from buffer */
	hash_search(rb->by_txn,
				(void *) &txn->xid,
				HASH_REMOVE,
				&found);
	/* the txn must have been registered, or bookkeeping is broken */
	Assert(found);

	/* remove entries spilled to disk */
	if (txn->nentries != txn->nentries_mem)
		ReorderBufferRestoreCleanup(rb, txn);

	/* deallocate */
	ReorderBufferReturnTXN(rb, txn);
}
1095
1096 /*
1097  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1098  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1099  */
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	iter;
	HASHCTL		hash_ctl;

	/* nothing to do unless the txn touched catalogs and recorded tuplecids */
	if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
		return;

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	/* HASH_BLOBS: the key is a plain binary struct, hashed bytewise */
	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
	hash_ctl.hcxt = rb->context;

	/*
	 * create the hash with the exact number of to-be-stored tuplecids from
	 * the start
	 */
	txn->tuplecid_hash =
		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	dlist_foreach(iter, &txn->tuplecids)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		bool		found;
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		key.relnode = change->data.tuplecid.node;

		ItemPointerCopy(&change->data.tuplecid.tid,
						&key.tid);

		/*
		 * NOTE(review): HASH_ENTER | HASH_FIND looks like a flag OR but
		 * HASHACTION is an enum, not a bitmask — presumably this relies on
		 * HASH_FIND being zero so it degenerates to plain HASH_ENTER;
		 * confirm against hsearch.h.
		 */
		ent = (ReorderBufferTupleCidEnt *)
			hash_search(txn->tuplecid_hash,
						(void *) &key,
						HASH_ENTER | HASH_FIND,
						&found);
		if (!found)
		{
			/* first sighting of this tuple: record its full visibility info */
			ent->cmin = change->data.tuplecid.cmin;
			ent->cmax = change->data.tuplecid.cmax;
			ent->combocid = change->data.tuplecid.combocid;
		}
		else
		{
			Assert(ent->cmin == change->data.tuplecid.cmin);
			Assert(ent->cmax == InvalidCommandId ||
				   ent->cmax == change->data.tuplecid.cmax);

			/*
			 * if the tuple got valid in this transaction and now got deleted
			 * we already have a valid cmin stored. The cmax will be
			 * InvalidCommandId though.
			 */
			ent->cmax = change->data.tuplecid.cmax;
		}
	}
}
1168
1169 /*
1170  * Copy a provided snapshot so we can modify it privately. This is needed so
1171  * that catalog modifying transactions can look into intermediate catalog
1172  * states.
1173  */
1174 static Snapshot
1175 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1176                                           ReorderBufferTXN *txn, CommandId cid)
1177 {
1178         Snapshot        snap;
1179         dlist_iter      iter;
1180         int                     i = 0;
1181         Size            size;
1182
1183         size = sizeof(SnapshotData) +
1184                 sizeof(TransactionId) * orig_snap->xcnt +
1185                 sizeof(TransactionId) * (txn->nsubtxns + 1);
1186
1187         snap = MemoryContextAllocZero(rb->context, size);
1188         memcpy(snap, orig_snap, sizeof(SnapshotData));
1189
1190         snap->copied = true;
1191         snap->active_count = 0;
1192         snap->regd_count = 1;
1193         snap->xip = (TransactionId *) (snap + 1);
1194
1195         memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1196
1197         /*
1198          * snap->subxip contains all txids that belong to our transaction which we
1199          * need to check via cmin/cmax. Thats why we store the toplevel
1200          * transaction in there as well.
1201          */
1202         snap->subxip = snap->xip + snap->xcnt;
1203         snap->subxip[i++] = txn->xid;
1204
1205         /*
1206          * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1207          * Since it's an upper boundary it is safe to use it for the allocation
1208          * above.
1209          */
1210         snap->subxcnt = 1;
1211
1212         dlist_foreach(iter, &txn->subtxns)
1213         {
1214                 ReorderBufferTXN *sub_txn;
1215
1216                 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1217                 snap->subxip[i++] = sub_txn->xid;
1218                 snap->subxcnt++;
1219         }
1220
1221         /* sort so we can bsearch() later */
1222         qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1223
1224         /* store the specified current CommandId */
1225         snap->curcid = cid;
1226
1227         return snap;
1228 }
1229
1230 /*
1231  * Free a previously ReorderBufferCopySnap'ed snapshot
1232  */
1233 static void
1234 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1235 {
1236         if (snap->copied)
1237                 pfree(snap);
1238         else
1239                 SnapBuildSnapDecRefcount(snap);
1240 }
1241
/*
 * Perform the replay of a transaction and its non-aborted subtransactions.
 *
 * Subtransactions have to be processed beforehand by
 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
 * transaction with ReorderBufferAssignChild.
 *
 * We currently can only decode a transaction's contents when its commit
 * record is read, because that's currently the only place where we learn
 * about cache invalidations. Thus, once a toplevel commit is read, we iterate
 * over the top and subtransactions (using a k-way merge) and replay the
 * changes in lsn order.
 */
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
					TimestampTz commit_time)
{
	ReorderBufferTXN *txn;
	ReorderBufferIterTXNState *iterstate = NULL;
	ReorderBufferChange *change;

	/* modified inside PG_TRY and read again in PG_CATCH, hence volatile */
	volatile CommandId command_id = FirstCommandId;
	volatile Snapshot snapshot_now = NULL;
	volatile bool using_subtxn = false;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
								false);

	/* unknown transaction, nothing to replay */
	if (txn == NULL)
		return;

	/* remember commit metadata; used by callbacks and later cleanup */
	txn->final_lsn = commit_lsn;
	txn->end_lsn = end_lsn;
	txn->commit_time = commit_time;

	/* serialize the last bunch of changes if we need start earlier anyway */
	if (txn->nentries_mem != txn->nentries)
		ReorderBufferSerializeTXN(rb, txn);

	/*
	 * If this transaction didn't have any real changes in our database, it's
	 * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
	 * transferred its snapshot to this transaction if it had one and the
	 * toplevel tx didn't.
	 */
	if (txn->base_snapshot == NULL)
	{
		Assert(txn->ninvalidations == 0);
		ReorderBufferCleanupTXN(rb, txn);
		return;
	}

	snapshot_now = txn->base_snapshot;

	/* build data to be able to lookup the CommandIds of catalog tuples */
	ReorderBufferBuildTupleCidHash(rb, txn);

	/* setup the initial snapshot */
	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

	PG_TRY();
	{

		/*
		 * Decoding needs access to syscaches et al., which in turn use
		 * heavyweight locks and such. Thus we need to have enough state
		 * around to keep track of those. The easiest way is to simply use a
		 * transaction internally. That also allows us to easily enforce that
		 * nothing writes to the database by checking for xid assignments.
		 *
		 * When we're called via the SQL SRF there's already a transaction
		 * started, so start an explicit subtransaction there.
		 */
		using_subtxn = IsTransactionOrTransactionBlock();

		if (using_subtxn)
			BeginInternalSubTransaction("replay");
		else
			StartTransactionCommand();

		/* let the output plugin know the transaction is starting */
		rb->begin(rb, txn);

		/* iterate over top- and subtransaction changes in LSN order */
		iterstate = ReorderBufferIterTXNInit(rb, txn);
		while ((change = ReorderBufferIterTXNNext(rb, iterstate)))
		{
			Relation	relation = NULL;
			Oid			reloid;

			switch (change->action)
			{
				case REORDER_BUFFER_CHANGE_INSERT:
				case REORDER_BUFFER_CHANGE_UPDATE:
				case REORDER_BUFFER_CHANGE_DELETE:
					Assert(snapshot_now);

					/* resolve the change's relfilenode to a relation OID */
					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
												change->data.tp.relnode.relNode);

					/*
					 * Catalog tuple without data, emitted while catalog was
					 * in the process of being rewritten.
					 */
					if (reloid == InvalidOid &&
						change->data.tp.newtuple == NULL &&
						change->data.tp.oldtuple == NULL)
						continue;
					else if (reloid == InvalidOid)
						elog(ERROR, "could not map filenode \"%s\" to relation OID",
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					relation = RelationIdGetRelation(reloid);

					if (relation == NULL)
						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
							 reloid,
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					if (RelationIsLogicallyLogged(relation))
					{
						/*
						 * For now ignore sequence changes entirely. Most of
						 * the time they don't log changes using records we
						 * understand, so it doesn't make sense to handle the
						 * few cases we do.
						 */
						if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
						{
						}
						/* user-triggered change */
						else if (!IsToastRelation(relation))
						{
							ReorderBufferToastReplace(rb, txn, relation, change);
							rb->apply_change(rb, txn, relation, change);

							/*
							 * Only clear reassembled toast chunks if we're
							 * sure they're not required anymore. The creator
							 * of the tuple tells us.
							 */
							if (change->data.tp.clear_toast_afterwards)
								ReorderBufferToastReset(rb, txn);
						}
						/* we're not interested in toast deletions */
						else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
						{
							/*
							 * Need to reassemble the full toasted Datum in
							 * memory, to ensure the chunks don't get reused
							 * till we're done remove it from the list of this
							 * transaction's changes. Otherwise it will get
							 * freed/reused while restoring spooled data from
							 * disk.
							 */
							dlist_delete(&change->node);
							ReorderBufferToastAppendChunk(rb, txn, relation,
														  change);
						}

					}
					RelationClose(relation);
					break;
				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
					/* get rid of the old */
					TeardownHistoricSnapshot(false);

					if (snapshot_now->copied)
					{
						/* we own the old copy, so free it before switching */
						ReorderBufferFreeSnap(rb, snapshot_now);
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}

					/*
					 * Restored from disk, need to be careful not to double
					 * free. We could introduce refcounting for that, but for
					 * now this seems infrequent enough not to care.
					 */
					else if (change->data.snapshot->copied)
					{
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}
					else
					{
						snapshot_now = change->data.snapshot;
					}


					/* and continue with the new one */
					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
					Assert(change->data.command_id != InvalidCommandId);

					/* only advance; CommandIds never go backwards */
					if (command_id < change->data.command_id)
					{
						command_id = change->data.command_id;

						if (!snapshot_now->copied)
						{
							/* we don't use the global one anymore */
							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
															txn, command_id);
						}

						snapshot_now->curcid = command_id;

						TeardownHistoricSnapshot(false);
						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

						/*
						 * Every time the CommandId is incremented, we could
						 * see new catalog contents, so execute all
						 * invalidations.
						 */
						ReorderBufferExecuteInvalidations(rb, txn);
					}

					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
					/* tuplecids live in txn->tuplecids, never in the change queue */
					elog(ERROR, "tuplecid value in changequeue");
					break;
			}
		}

		ReorderBufferIterTXNFinish(rb, iterstate);

		/* call commit callback */
		rb->commit(rb, txn, commit_lsn);

		/* this is just a sanity check against bad output plugin behaviour */
		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
			elog(ERROR, "output plugin used XID %u",
				 GetCurrentTransactionId());

		/* cleanup */
		TeardownHistoricSnapshot(false);

		/*
		 * Aborting the current (sub-)transaction as a whole has the right
		 * semantics. We want all locks acquired in here to be released, not
		 * reassigned to the parent and we do not want any database access
		 * have persistent effects.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(rb, txn);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		if (snapshot_now->copied)
			ReorderBufferFreeSnap(rb, snapshot_now);

		/* remove potential on-disk data, and deallocate */
		ReorderBufferCleanupTXN(rb, txn);
	}
	PG_CATCH();
	{
		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
		if (iterstate)
			ReorderBufferIterTXNFinish(rb, iterstate);

		TeardownHistoricSnapshot(true);

		/*
		 * Force cache invalidation to happen outside of a valid transaction
		 * to prevent catalog access as we just caught an error.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(rb, txn);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		if (snapshot_now->copied)
			ReorderBufferFreeSnap(rb, snapshot_now);

		/* remove potential on-disk data, and deallocate */
		ReorderBufferCleanupTXN(rb, txn);

		PG_RE_THROW();
	}
	PG_END_TRY();
}
1538
1539 /*
1540  * Abort a transaction that possibly has previous changes. Needs to be first
1541  * called for subtransactions and then for the toplevel xid.
1542  *
1543  * NB: Transactions handled here have to have actively aborted (i.e. have
1544  * produced an abort record). Implicitly aborted transactions are handled via
1545  * ReorderBufferAbortOld(); transactions we're just not interesteded in, but
1546  * which have committed are handled in ReorderBufferForget().
1547  *
1548  * This function purges this transaction and its contents from memory and
1549  * disk.
1550  */
1551 void
1552 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1553 {
1554         ReorderBufferTXN *txn;
1555
1556         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1557                                                                 false);
1558
1559         /* unknown, nothing to remove */
1560         if (txn == NULL)
1561                 return;
1562
1563         /* cosmetic... */
1564         txn->final_lsn = lsn;
1565
1566         /* remove potential on-disk data, and deallocate */
1567         ReorderBufferCleanupTXN(rb, txn);
1568 }
1569
1570 /*
1571  * Abort all transactions that aren't actually running anymore because the
1572  * server restarted.
1573  *
1574  * NB: These really have to be transactions that have aborted due to a server
1575  * crash/immediate restart, as we don't deal with invalidations here.
1576  */
1577 void
1578 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1579 {
1580         dlist_mutable_iter it;
1581
1582         /*
1583          * Iterate through all (potential) toplevel TXNs and abort all that are
1584          * older than what possibly can be running. Once we've found the first
1585          * that is alive we stop, there might be some that acquired an xid earlier
1586          * but started writing later, but it's unlikely and they will cleaned up
1587          * in a later call to ReorderBufferAbortOld().
1588          */
1589         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1590         {
1591                 ReorderBufferTXN *txn;
1592
1593                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1594
1595                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1596                 {
1597                         elog(DEBUG1, "aborting old transaction %u", txn->xid);
1598
1599                         /* remove potential on-disk data, and deallocate this tx */
1600                         ReorderBufferCleanupTXN(rb, txn);
1601                 }
1602                 else
1603                         return;
1604         }
1605 }
1606
1607 /*
1608  * Forget the contents of a transaction if we aren't interested in it's
1609  * contents. Needs to be first called for subtransactions and then for the
1610  * toplevel xid.
1611  *
1612  * This is significantly different to ReorderBufferAbort() because
1613  * transactions that have committed need to be treated differenly from aborted
1614  * ones since they may have modified the catalog.
1615  *
1616  * Note that this is only allowed to be called in the moment a transaction
1617  * commit has just been read, not earlier; otherwise later records referring
1618  * to this xid might re-create the transaction incompletely.
1619  */
1620 void
1621 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1622 {
1623         ReorderBufferTXN *txn;
1624
1625         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1626                                                                 false);
1627
1628         /* unknown, nothing to forget */
1629         if (txn == NULL)
1630                 return;
1631
1632         /* cosmetic... */
1633         txn->final_lsn = lsn;
1634
1635         /*
1636          * Proccess cache invalidation messages if there are any. Even if we're
1637          * not interested in the transaction's contents, it could have manipulated
1638          * the catalog and we need to update the caches according to that.
1639          */
1640         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1641         {
1642                 bool use_subtxn = IsTransactionOrTransactionBlock();
1643
1644                 if (use_subtxn)
1645                         BeginInternalSubTransaction("replay");
1646
1647                 /*
1648                  * Force invalidations to happen outside of a valid transaction - that
1649                  * way entries will just be marked as invalid without accessing the
1650                  * catalog. That's advantageous because we don't need to setup the
1651                  * full state necessary for catalog access.
1652                  */
1653                 if (use_subtxn)
1654                         AbortCurrentTransaction();
1655
1656                 ReorderBufferExecuteInvalidations(rb, txn);
1657
1658                 if (use_subtxn)
1659                         RollbackAndReleaseCurrentSubTransaction();
1660         }
1661         else
1662                 Assert(txn->ninvalidations == 0);
1663
1664         /* remove potential on-disk data, and deallocate */
1665         ReorderBufferCleanupTXN(rb, txn);
1666 }
1667
1668
1669 /*
1670  * Check whether a transaction is already known in this module.xs
1671  */
1672 bool
1673 ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
1674 {
1675         ReorderBufferTXN *txn;
1676
1677         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1678                                                                 false);
1679         return txn != NULL;
1680 }
1681
1682 /*
1683  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
1684  * because the previous snapshot doesn't describe the catalog correctly for
1685  * following rows.
1686  */
1687 void
1688 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1689                                                  XLogRecPtr lsn, Snapshot snap)
1690 {
1691         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1692
1693         change->data.snapshot = snap;
1694         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1695
1696         ReorderBufferQueueChange(rb, xid, lsn, change);
1697 }
1698
1699 /*
1700  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1701  * that is used to decode all changes until either this transaction modifies
1702  * the catalog or another catalog modifying transaction commits.
1703  *
1704  * Needs to be called before any changes are added with
1705  * ReorderBufferQueueChange().
1706  */
1707 void
1708 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1709                                                          XLogRecPtr lsn, Snapshot snap)
1710 {
1711         ReorderBufferTXN *txn;
1712         bool            is_new;
1713
1714         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1715         Assert(txn->base_snapshot == NULL);
1716         Assert(snap != NULL);
1717
1718         txn->base_snapshot = snap;
1719         txn->base_snapshot_lsn = lsn;
1720 }
1721
1722 /*
1723  * Access the catalog with this CommandId at this point in the changestream.
1724  *
1725  * May only be called for command ids > 1
1726  */
1727 void
1728 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1729                                                          XLogRecPtr lsn, CommandId cid)
1730 {
1731         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1732
1733         change->data.command_id = cid;
1734         change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1735
1736         ReorderBufferQueueChange(rb, xid, lsn, change);
1737 }
1738
1739
1740 /*
1741  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1742  */
1743 void
1744 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1745                                                          XLogRecPtr lsn, RelFileNode node,
1746                                                          ItemPointerData tid, CommandId cmin,
1747                                                          CommandId cmax, CommandId combocid)
1748 {
1749         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1750         ReorderBufferTXN *txn;
1751
1752         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1753
1754         change->data.tuplecid.node = node;
1755         change->data.tuplecid.tid = tid;
1756         change->data.tuplecid.cmin = cmin;
1757         change->data.tuplecid.cmax = cmax;
1758         change->data.tuplecid.combocid = combocid;
1759         change->lsn = lsn;
1760         change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1761
1762         dlist_push_tail(&txn->tuplecids, &change->node);
1763         txn->ntuplecids++;
1764 }
1765
1766 /*
1767  * Setup the invalidation of the toplevel transaction.
1768  *
1769  * This needs to be done before ReorderBufferCommit is called!
1770  */
1771 void
1772 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1773                                                           XLogRecPtr lsn, Size nmsgs,
1774                                                           SharedInvalidationMessage *msgs)
1775 {
1776         ReorderBufferTXN *txn;
1777
1778         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1779
1780         if (txn->ninvalidations != 0)
1781                 elog(ERROR, "only ever add one set of invalidations");
1782
1783         Assert(nmsgs > 0);
1784
1785         txn->ninvalidations = nmsgs;
1786         txn->invalidations = (SharedInvalidationMessage *)
1787                 MemoryContextAlloc(rb->context,
1788                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1789         memcpy(txn->invalidations, msgs,
1790                    sizeof(SharedInvalidationMessage) * nmsgs);
1791 }
1792
1793 /*
1794  * Apply all invalidations we know. Possibly we only need parts at this point
1795  * in the changestream but we don't know which those are.
1796  */
1797 static void
1798 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1799 {
1800         int                     i;
1801
1802         for (i = 0; i < txn->ninvalidations; i++)
1803                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1804 }
1805
1806 /*
1807  * Mark a transaction as containing catalog changes
1808  */
1809 void
1810 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1811                                                                   XLogRecPtr lsn)
1812 {
1813         ReorderBufferTXN *txn;
1814
1815         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1816
1817         txn->has_catalog_changes = true;
1818 }
1819
1820 /*
1821  * Query whether a transaction is already *known* to contain catalog
1822  * changes. This can be wrong until directly before the commit!
1823  */
1824 bool
1825 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1826 {
1827         ReorderBufferTXN *txn;
1828
1829         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1830                                                                 false);
1831         if (txn == NULL)
1832                 return false;
1833
1834         return txn->has_catalog_changes;
1835 }
1836
1837 /*
1838  * Have we already added the first snapshot?
1839  */
1840 bool
1841 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1842 {
1843         ReorderBufferTXN *txn;
1844
1845         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1846                                                                 false);
1847
1848         /* transaction isn't known yet, ergo no snapshot */
1849         if (txn == NULL)
1850                 return false;
1851
1852         /*
1853          * TODO: It would be a nice improvement if we would check the toplevel
1854          * transaction in subtransactions, but we'd need to keep track of a bit
1855          * more state.
1856          */
1857         return txn->base_snapshot != NULL;
1858 }
1859
1860
1861 /*
1862  * ---------------------------------------
1863  * Disk serialization support
1864  * ---------------------------------------
1865  */
1866
1867 /*
1868  * Ensure the IO buffer is >= sz.
1869  */
1870 static void
1871 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
1872 {
1873         if (!rb->outbufsize)
1874         {
1875                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
1876                 rb->outbufsize = sz;
1877         }
1878         else if (rb->outbufsize < sz)
1879         {
1880                 rb->outbuf = repalloc(rb->outbuf, sz);
1881                 rb->outbufsize = sz;
1882         }
1883 }
1884
1885 /*
1886  * Check whether the transaction tx should spill its data to disk.
1887  */
1888 static void
1889 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1890 {
1891         /*
1892          * TODO: improve accounting so we cheaply can take subtransactions into
1893          * account here.
1894          */
1895         if (txn->nentries_mem >= max_changes_in_memory)
1896         {
1897                 ReorderBufferSerializeTXN(rb, txn);
1898                 Assert(txn->nentries_mem == 0);
1899         }
1900 }
1901
1902 /*
1903  * Spill data of a large transaction (and its subtransactions) to disk.
1904  */
1905 static void
1906 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1907 {
1908         dlist_iter      subtxn_i;
1909         dlist_mutable_iter change_i;
1910         int                     fd = -1;
1911         XLogSegNo       curOpenSegNo = 0;
1912         Size            spilled = 0;
1913         char            path[MAXPGPATH];
1914
1915         elog(DEBUG2, "spill %u changes in XID %u to disk",
1916                  (uint32) txn->nentries_mem, txn->xid);
1917
1918         /* do the same to all child TXs */
1919         dlist_foreach(subtxn_i, &txn->subtxns)
1920         {
1921                 ReorderBufferTXN *subtxn;
1922
1923                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
1924                 ReorderBufferSerializeTXN(rb, subtxn);
1925         }
1926
1927         /* serialize changestream */
1928         dlist_foreach_modify(change_i, &txn->changes)
1929         {
1930                 ReorderBufferChange *change;
1931
1932                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
1933
1934                 /*
1935                  * store in segment in which it belongs by start lsn, don't split over
1936                  * multiple segments tho
1937                  */
1938                 if (fd == -1 || XLByteInSeg(change->lsn, curOpenSegNo))
1939                 {
1940                         XLogRecPtr      recptr;
1941
1942                         if (fd != -1)
1943                                 CloseTransientFile(fd);
1944
1945                         XLByteToSeg(change->lsn, curOpenSegNo);
1946                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
1947
1948                         /*
1949                          * No need to care about TLIs here, only used during a single run,
1950                          * so each LSN only maps to a specific WAL record.
1951                          */
1952                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
1953                                         NameStr(MyReplicationSlot->data.name), txn->xid,
1954                                         (uint32) (recptr >> 32), (uint32) recptr);
1955
1956                         /* open segment, create it if necessary */
1957                         fd = OpenTransientFile(path,
1958                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
1959                                                                    S_IRUSR | S_IWUSR);
1960
1961                         if (fd < 0)
1962                                 ereport(ERROR,
1963                                                 (errcode_for_file_access(),
1964                                                  errmsg("could not open file \"%s\": %m",
1965                                                                 path)));
1966                 }
1967
1968                 ReorderBufferSerializeChange(rb, txn, fd, change);
1969                 dlist_delete(&change->node);
1970                 ReorderBufferReturnChange(rb, change);
1971
1972                 spilled++;
1973         }
1974
1975         Assert(spilled == txn->nentries_mem);
1976         Assert(dlist_is_empty(&txn->changes));
1977         txn->nentries_mem = 0;
1978
1979         if (fd != -1)
1980                 CloseTransientFile(fd);
1981 }
1982
1983 /*
1984  * Serialize individual change to disk.
1985  */
1986 static void
1987 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1988                                                          int fd, ReorderBufferChange *change)
1989 {
1990         ReorderBufferDiskChange *ondisk;
1991         Size            sz = sizeof(ReorderBufferDiskChange);
1992
1993         ReorderBufferSerializeReserve(rb, sz);
1994
1995         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
1996         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
1997
1998         switch (change->action)
1999         {
2000                 case REORDER_BUFFER_CHANGE_INSERT:
2001                         /* fall through */
2002                 case REORDER_BUFFER_CHANGE_UPDATE:
2003                         /* fall through */
2004                 case REORDER_BUFFER_CHANGE_DELETE:
2005                         {
2006                                 char       *data;
2007                                 ReorderBufferTupleBuf *oldtup,
2008                                                    *newtup;
2009                                 Size            oldlen = 0;
2010                                 Size            newlen = 0;
2011
2012                                 oldtup = change->data.tp.oldtuple;
2013                                 newtup = change->data.tp.newtuple;
2014
2015                                 if (oldtup)
2016                                         oldlen = offsetof(ReorderBufferTupleBuf, data)
2017                                                 +oldtup->tuple.t_len
2018                                                 - offsetof(HeapTupleHeaderData, t_bits);
2019
2020                                 if (newtup)
2021                                         newlen = offsetof(ReorderBufferTupleBuf, data)
2022                                                 +newtup->tuple.t_len
2023                                                 - offsetof(HeapTupleHeaderData, t_bits);
2024
2025                                 sz += oldlen;
2026                                 sz += newlen;
2027
2028                                 /* make sure we have enough space */
2029                                 ReorderBufferSerializeReserve(rb, sz);
2030
2031                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2032                                 /* might have been reallocated above */
2033                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2034
2035                                 if (oldlen)
2036                                 {
2037                                         memcpy(data, oldtup, oldlen);
2038                                         data += oldlen;
2039                                 }
2040
2041                                 if (newlen)
2042                                 {
2043                                         memcpy(data, newtup, newlen);
2044                                         data += newlen;
2045                                 }
2046                                 break;
2047                         }
2048                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2049                         {
2050                                 Snapshot        snap;
2051                                 char       *data;
2052
2053                                 snap = change->data.snapshot;
2054
2055                                 sz += sizeof(SnapshotData) +
2056                                         sizeof(TransactionId) * snap->xcnt +
2057                                         sizeof(TransactionId) * snap->subxcnt
2058                                         ;
2059
2060                                 /* make sure we have enough space */
2061                                 ReorderBufferSerializeReserve(rb, sz);
2062                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2063                                 /* might have been reallocated above */
2064                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2065
2066                                 memcpy(data, snap, sizeof(SnapshotData));
2067                                 data += sizeof(SnapshotData);
2068
2069                                 if (snap->xcnt)
2070                                 {
2071                                         memcpy(data, snap->xip,
2072                                                    sizeof(TransactionId) * snap->xcnt);
2073                                         data += sizeof(TransactionId) * snap->xcnt;
2074                                 }
2075
2076                                 if (snap->subxcnt)
2077                                 {
2078                                         memcpy(data, snap->subxip,
2079                                                    sizeof(TransactionId) * snap->subxcnt);
2080                                         data += sizeof(TransactionId) * snap->subxcnt;
2081                                 }
2082                                 break;
2083                         }
2084                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2085                         /* ReorderBufferChange contains everything important */
2086                         break;
2087                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2088                         /* ReorderBufferChange contains everything important */
2089                         break;
2090         }
2091
2092         ondisk->size = sz;
2093
2094         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2095         {
2096                 CloseTransientFile(fd);
2097                 ereport(ERROR,
2098                                 (errcode_for_file_access(),
2099                                  errmsg("could not write to data file for XID %u: %m",
2100                                                 txn->xid)));
2101         }
2102
2103         Assert(ondisk->change.action == change->action);
2104 }
2105
/*
 * Restore a number of changes spilled to disk back into memory.
 *
 * Reads up to max_changes_in_memory changes for 'txn' from its spill files
 * and queues them onto txn->changes (via ReorderBufferRestoreChange()).
 * '*fd' and '*segno' carry the restore position between calls: *fd is the
 * currently open spill file (-1 if none) and *segno the WAL segment being
 * read (0 means "not started yet").  Returns the number of changes restored;
 * 0 once all segments up to final_lsn's segment are exhausted.
 */
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							int *fd, XLogSegNo *segno)
{
	Size		restored = 0;
	XLogSegNo	last_segno;
	dlist_mutable_iter cleanup_iter;

	/* caller must have established the transaction's LSN bounds */
	Assert(txn->first_lsn != InvalidXLogRecPtr);
	Assert(txn->final_lsn != InvalidXLogRecPtr);

	/* free current entries, so we have memory for more */
	dlist_foreach_modify(cleanup_iter, &txn->changes)
	{
		ReorderBufferChange *cleanup =
		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);

		dlist_delete(&cleanup->node);
		ReorderBufferReturnChange(rb, cleanup);
	}
	txn->nentries_mem = 0;
	Assert(dlist_is_empty(&txn->changes));

	/* the segment containing final_lsn is the last we might have to read */
	XLByteToSeg(txn->final_lsn, last_segno);

	while (restored < max_changes_in_memory && *segno <= last_segno)
	{
		int			readBytes;
		ReorderBufferDiskChange *ondisk;

		if (*fd == -1)
		{
			XLogRecPtr	recptr;
			char		path[MAXPGPATH];

			/* first time in */
			if (*segno == 0)
			{
				XLByteToSeg(txn->first_lsn, *segno);
			}

			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
			XLogSegNoOffsetToRecPtr(*segno, 0, recptr);

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
					NameStr(MyReplicationSlot->data.name), txn->xid,
					(uint32) (recptr >> 32), (uint32) recptr);

			*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
			/* a segment with no spilled changes has no file; skip to next */
			if (*fd < 0 && errno == ENOENT)
			{
				*fd = -1;
				(*segno)++;
				continue;
			}
			else if (*fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m",
								path)));

		}

		/*
		 * Read the statically sized part of a change which has information
		 * about the total size. If we couldn't read a record, we're at the
		 * end of this file.
		 */
		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
		readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));

		/* eof */
		if (readBytes == 0)
		{
			CloseTransientFile(*fd);
			*fd = -1;
			(*segno)++;
			continue;
		}
		else if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
				errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) sizeof(ReorderBufferDiskChange))));

		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		/* grow the buffer to hold the full record the header announced */
		ReorderBufferSerializeReserve(rb,
								 sizeof(ReorderBufferDiskChange) + ondisk->size);
		/* the reserve may have repalloc'd outbuf; re-point into it */
		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		/* read the variable-length remainder of the record */
		readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
						 ondisk->size - sizeof(ReorderBufferDiskChange));

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
				errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
				(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));

		/*
		 * ok, read a full change from disk, now restore it into proper
		 * in-memory format
		 */
		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
		restored++;
	}

	return restored;
}
2233
2234 /*
2235  * Convert change from its on-disk format to in-memory format and queue it onto
2236  * the TXN's ->changes list.
2237  */
2238 static void
2239 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2240                                                    char *data)
2241 {
2242         ReorderBufferDiskChange *ondisk;
2243         ReorderBufferChange *change;
2244
2245         ondisk = (ReorderBufferDiskChange *) data;
2246
2247         change = ReorderBufferGetChange(rb);
2248
2249         /* copy static part */
2250         memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2251
2252         data += sizeof(ReorderBufferDiskChange);
2253
2254         /* restore individual stuff */
2255         switch (change->action)
2256         {
2257                 case REORDER_BUFFER_CHANGE_INSERT:
2258                         /* fall through */
2259                 case REORDER_BUFFER_CHANGE_UPDATE:
2260                         /* fall through */
2261                 case REORDER_BUFFER_CHANGE_DELETE:
2262                         if (change->data.tp.newtuple)
2263                         {
2264                                 Size            len = offsetof(ReorderBufferTupleBuf, data)
2265                                 +((ReorderBufferTupleBuf *) data)->tuple.t_len
2266                                 - offsetof(HeapTupleHeaderData, t_bits);
2267
2268                                 change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
2269                                 memcpy(change->data.tp.newtuple, data, len);
2270                                 change->data.tp.newtuple->tuple.t_data =
2271                                         &change->data.tp.newtuple->header;
2272                                 data += len;
2273                         }
2274
2275                         if (change->data.tp.oldtuple)
2276                         {
2277                                 Size            len = offsetof(ReorderBufferTupleBuf, data)
2278                                 +((ReorderBufferTupleBuf *) data)->tuple.t_len
2279                                 - offsetof(HeapTupleHeaderData, t_bits);
2280
2281                                 change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
2282                                 memcpy(change->data.tp.oldtuple, data, len);
2283                                 change->data.tp.oldtuple->tuple.t_data =
2284                                         &change->data.tp.oldtuple->header;
2285                                 data += len;
2286                         }
2287                         break;
2288                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2289                         {
2290                                 Snapshot        oldsnap;
2291                                 Snapshot        newsnap;
2292                                 Size            size;
2293
2294                                 oldsnap = (Snapshot) data;
2295
2296                                 size = sizeof(SnapshotData) +
2297                                         sizeof(TransactionId) * oldsnap->xcnt +
2298                                         sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2299
2300                                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2301
2302                                 newsnap = change->data.snapshot;
2303
2304                                 memcpy(newsnap, data, size);
2305                                 newsnap->xip = (TransactionId *)
2306                                         (((char *) newsnap) + sizeof(SnapshotData));
2307                                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2308                                 newsnap->copied = true;
2309                                 break;
2310                         }
2311                         /* the base struct contains all the data, easy peasy */
2312                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2313                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2314                         break;
2315         }
2316
2317         dlist_push_tail(&txn->changes, &change->node);
2318         txn->nentries_mem++;
2319 }
2320
2321 /*
2322  * Remove all on-disk stored for the passed in transaction.
2323  */
2324 static void
2325 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2326 {
2327         XLogSegNo       first;
2328         XLogSegNo       cur;
2329         XLogSegNo       last;
2330
2331         Assert(txn->first_lsn != InvalidXLogRecPtr);
2332         Assert(txn->final_lsn != InvalidXLogRecPtr);
2333
2334         XLByteToSeg(txn->first_lsn, first);
2335         XLByteToSeg(txn->final_lsn, last);
2336
2337         /* iterate over all possible filenames, and delete them */
2338         for (cur = first; cur <= last; cur++)
2339         {
2340                 char            path[MAXPGPATH];
2341                 XLogRecPtr      recptr;
2342
2343                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2344
2345                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2346                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2347                                 (uint32) (recptr >> 32), (uint32) recptr);
2348                 if (unlink(path) != 0 && errno != ENOENT)
2349                         ereport(ERROR,
2350                                         (errcode_for_file_access(),
2351                                          errmsg("could not remove file \"%s\": %m", path)));
2352         }
2353 }
2354
2355 /*
2356  * Delete all data spilled to disk after we've restarted/crashed. It will be
2357  * recreated when the respective slots are reused.
2358  */
2359 void
2360 StartupReorderBuffer(void)
2361 {
2362         DIR                *logical_dir;
2363         struct dirent *logical_de;
2364
2365         DIR                *spill_dir;
2366         struct dirent *spill_de;
2367
2368         logical_dir = AllocateDir("pg_replslot");
2369         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2370         {
2371                 struct stat statbuf;
2372                 char            path[MAXPGPATH];
2373
2374                 if (strcmp(logical_de->d_name, ".") == 0 ||
2375                         strcmp(logical_de->d_name, "..") == 0)
2376                         continue;
2377
2378                 /* if it cannot be a slot, skip the directory */
2379                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2380                         continue;
2381
2382                 /*
2383                  * ok, has to be a surviving logical slot, iterate and delete
2384                  * everythign starting with xid-*
2385                  */
2386                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2387
2388                 /* we're only creating directories here, skip if it's not our's */
2389                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2390                         continue;
2391
2392                 spill_dir = AllocateDir(path);
2393                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2394                 {
2395                         if (strcmp(spill_de->d_name, ".") == 0 ||
2396                                 strcmp(spill_de->d_name, "..") == 0)
2397                                 continue;
2398
2399                         /* only look at names that can be ours */
2400                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2401                         {
2402                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2403                                                 spill_de->d_name);
2404
2405                                 if (unlink(path) != 0)
2406                                         ereport(PANIC,
2407                                                         (errcode_for_file_access(),
2408                                                          errmsg("could not remove file \"%s\": %m",
2409                                                                         path)));
2410                         }
2411                 }
2412                 FreeDir(spill_dir);
2413         }
2414         FreeDir(logical_dir);
2415 }
2416
2417 /* ---------------------------------------
2418  * toast reassembly support
2419  * ---------------------------------------
2420  */
2421
/*
 * Initialize per tuple toast reconstruction support.
 *
 * Creates txn->toast_hash, mapping a toast value's chunk_id (an Oid) to the
 * chunks collected for it so far.  The hash lives in rb->context (via
 * HASH_CONTEXT); HASH_BLOBS selects byte-wise hashing of the Oid key.
 * Created lazily by ReorderBufferToastAppendChunk on the first chunk.
 */
static void
ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	HASHCTL		hash_ctl;

	/* must not already have one for this transaction */
	Assert(txn->toast_hash == NULL);

	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
	hash_ctl.hcxt = rb->context;
	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
2439
2440 /*
2441  * Per toast-chunk handling for toast reconstruction
2442  *
2443  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2444  * toasted Datum comes along.
2445  */
2446 static void
2447 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2448                                                           Relation relation, ReorderBufferChange *change)
2449 {
2450         ReorderBufferToastEnt *ent;
2451         ReorderBufferTupleBuf *newtup;
2452         bool            found;
2453         int32           chunksize;
2454         bool            isnull;
2455         Pointer         chunk;
2456         TupleDesc       desc = RelationGetDescr(relation);
2457         Oid                     chunk_id;
2458         int32           chunk_seq;
2459
2460         if (txn->toast_hash == NULL)
2461                 ReorderBufferToastInitHash(rb, txn);
2462
2463         Assert(IsToastRelation(relation));
2464
2465         newtup = change->data.tp.newtuple;
2466         chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2467         Assert(!isnull);
2468         chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2469         Assert(!isnull);
2470
2471         ent = (ReorderBufferToastEnt *)
2472                 hash_search(txn->toast_hash,
2473                                         (void *) &chunk_id,
2474                                         HASH_ENTER,
2475                                         &found);
2476
2477         if (!found)
2478         {
2479                 Assert(ent->chunk_id == chunk_id);
2480                 ent->num_chunks = 0;
2481                 ent->last_chunk_seq = 0;
2482                 ent->size = 0;
2483                 ent->reconstructed = NULL;
2484                 dlist_init(&ent->chunks);
2485
2486                 if (chunk_seq != 0)
2487                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2488                                  chunk_seq, chunk_id);
2489         }
2490         else if (found && chunk_seq != ent->last_chunk_seq + 1)
2491                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2492                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2493
2494         chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2495         Assert(!isnull);
2496
2497         /* calculate size so we can allocate the right size at once later */
2498         if (!VARATT_IS_EXTENDED(chunk))
2499                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2500         else if (VARATT_IS_SHORT(chunk))
2501                 /* could happen due to heap_form_tuple doing its thing */
2502                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2503         else
2504                 elog(ERROR, "unexpected type of toast chunk");
2505
2506         ent->size += chunksize;
2507         ent->last_chunk_seq = chunk_seq;
2508         ent->num_chunks++;
2509         dlist_push_tail(&ent->chunks, &change->node);
2510 }
2511
2512 /*
2513  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2514  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2515  *
2516  * We cannot replace unchanged toast tuples though, so those will still point
2517  * to on-disk toast data.
2518  */
2519 static void
2520 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2521                                                   Relation relation, ReorderBufferChange *change)
2522 {
2523         TupleDesc       desc;
2524         int                     natt;
2525         Datum      *attrs;
2526         bool       *isnull;
2527         bool       *free;
2528         HeapTuple       tmphtup;
2529         Relation        toast_rel;
2530         TupleDesc       toast_desc;
2531         MemoryContext oldcontext;
2532         ReorderBufferTupleBuf *newtup;
2533
2534         /* no toast tuples changed */
2535         if (txn->toast_hash == NULL)
2536                 return;
2537
2538         oldcontext = MemoryContextSwitchTo(rb->context);
2539
2540         /* we should only have toast tuples in an INSERT or UPDATE */
2541         Assert(change->data.tp.newtuple);
2542
2543         desc = RelationGetDescr(relation);
2544
2545         toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2546         toast_desc = RelationGetDescr(toast_rel);
2547
2548         /* should we allocate from stack instead? */
2549         attrs = palloc0(sizeof(Datum) * desc->natts);
2550         isnull = palloc0(sizeof(bool) * desc->natts);
2551         free = palloc0(sizeof(bool) * desc->natts);
2552
2553         newtup = change->data.tp.newtuple;
2554
2555         heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2556
2557         for (natt = 0; natt < desc->natts; natt++)
2558         {
2559                 Form_pg_attribute attr = desc->attrs[natt];
2560                 ReorderBufferToastEnt *ent;
2561                 struct varlena *varlena;
2562
2563                 /* va_rawsize is the size of the original datum -- including header */
2564                 struct varatt_external toast_pointer;
2565                 struct varatt_indirect redirect_pointer;
2566                 struct varlena *new_datum = NULL;
2567                 struct varlena *reconstructed;
2568                 dlist_iter      it;
2569                 Size            data_done = 0;
2570
2571                 /* system columns aren't toasted */
2572                 if (attr->attnum < 0)
2573                         continue;
2574
2575                 if (attr->attisdropped)
2576                         continue;
2577
2578                 /* not a varlena datatype */
2579                 if (attr->attlen != -1)
2580                         continue;
2581
2582                 /* no data */
2583                 if (isnull[natt])
2584                         continue;
2585
2586                 /* ok, we know we have a toast datum */
2587                 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2588
2589                 /* no need to do anything if the tuple isn't external */
2590                 if (!VARATT_IS_EXTERNAL(varlena))
2591                         continue;
2592
2593                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2594
2595                 /*
2596                  * Check whether the toast tuple changed, replace if so.
2597                  */
2598                 ent = (ReorderBufferToastEnt *)
2599                         hash_search(txn->toast_hash,
2600                                                 (void *) &toast_pointer.va_valueid,
2601                                                 HASH_FIND,
2602                                                 NULL);
2603                 if (ent == NULL)
2604                         continue;
2605
2606                 new_datum =
2607                         (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2608
2609                 free[natt] = true;
2610
2611                 reconstructed = palloc0(toast_pointer.va_rawsize);
2612
2613                 ent->reconstructed = reconstructed;
2614
2615                 /* stitch toast tuple back together from its parts */
2616                 dlist_foreach(it, &ent->chunks)
2617                 {
2618                         bool            isnull;
2619                         ReorderBufferChange *cchange;
2620                         ReorderBufferTupleBuf *ctup;
2621                         Pointer         chunk;
2622
2623                         cchange = dlist_container(ReorderBufferChange, node, it.cur);
2624                         ctup = cchange->data.tp.newtuple;
2625                         chunk = DatumGetPointer(
2626                                                   fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2627
2628                         Assert(!isnull);
2629                         Assert(!VARATT_IS_EXTERNAL(chunk));
2630                         Assert(!VARATT_IS_SHORT(chunk));
2631
2632                         memcpy(VARDATA(reconstructed) + data_done,
2633                                    VARDATA(chunk),
2634                                    VARSIZE(chunk) - VARHDRSZ);
2635                         data_done += VARSIZE(chunk) - VARHDRSZ;
2636                 }
2637                 Assert(data_done == toast_pointer.va_extsize);
2638
2639                 /* make sure its marked as compressed or not */
2640                 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2641                         SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2642                 else
2643                         SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2644
2645                 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2646                 redirect_pointer.pointer = reconstructed;
2647
2648                 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2649                 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2650                            sizeof(redirect_pointer));
2651
2652                 attrs[natt] = PointerGetDatum(new_datum);
2653         }
2654
2655         /*
2656          * Build tuple in separate memory & copy tuple back into the tuplebuf
2657          * passed to the output plugin. We can't directly heap_fill_tuple() into
2658          * the tuplebuf because attrs[] will point back into the current content.
2659          */
2660         tmphtup = heap_form_tuple(desc, attrs, isnull);
2661         Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2662         Assert(&newtup->header == newtup->tuple.t_data);
2663
2664         memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2665         newtup->tuple.t_len = tmphtup->t_len;
2666
2667         /*
2668          * free resources we won't further need, more persistent stuff will be
2669          * free'd in ReorderBufferToastReset().
2670          */
2671         RelationClose(toast_rel);
2672         pfree(tmphtup);
2673         for (natt = 0; natt < desc->natts; natt++)
2674         {
2675                 if (free[natt])
2676                         pfree(DatumGetPointer(attrs[natt]));
2677         }
2678         pfree(attrs);
2679         pfree(free);
2680         pfree(isnull);
2681
2682         MemoryContextSwitchTo(oldcontext);
2683 }
2684
2685 /*
2686  * Free all resources allocated for toast reconstruction.
2687  */
2688 static void
2689 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2690 {
2691         HASH_SEQ_STATUS hstat;
2692         ReorderBufferToastEnt *ent;
2693
2694         if (txn->toast_hash == NULL)
2695                 return;
2696
2697         /* sequentially walk over the hash and free everything */
2698         hash_seq_init(&hstat, txn->toast_hash);
2699         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2700         {
2701                 dlist_mutable_iter it;
2702
2703                 if (ent->reconstructed != NULL)
2704                         pfree(ent->reconstructed);
2705
2706                 dlist_foreach_modify(it, &ent->chunks)
2707                 {
2708                         ReorderBufferChange *change =
2709                         dlist_container(ReorderBufferChange, node, it.cur);
2710
2711                         dlist_delete(&change->node);
2712                         ReorderBufferReturnChange(rb, change);
2713                 }
2714         }
2715
2716         hash_destroy(txn->toast_hash);
2717         txn->toast_hash = NULL;
2718 }
2719
2720
2721 /* ---------------------------------------
2722  * Visibility support for logical decoding
2723  *
2724  *
2725  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2726  * always rely on stored cmin/cmax values because of two scenarios:
2727  *
2728  * * A tuple got changed multiple times during a single transaction and thus
2729  *       has got a combocid. Combocids are only valid for the duration of a
2730  *       single transaction.
2731  * * A tuple with a cmin but no cmax (and thus no combocid) got
2732  *       deleted/updated in another transaction than the one which created it
2733  *       which we are looking at right now. As only one of cmin, cmax or combocid
2734  *       is actually stored in the heap we don't have access to the value we
2735  *       need anymore.
2736  *
2737  * To resolve those problems we have a per-transaction hash of (cmin,
2738  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2739  * (cmin, cmax) values. That also takes care of combocids by simply
2740  * not caring about them at all. As we have the real cmin/cmax values
2741  * combocids aren't interesting.
2742  *
2743  * As we only care about catalog tuples here the overhead of this
2744  * hashtable should be acceptable.
2745  *
2746  * Heap rewrites complicate this a bit, check rewriteheap.c for
2747  * details.
2748  * -------------------------------------------------------------------------
2749  */
2750
/*
 * Struct for qsort()ing mapping files by their start LSN somewhat
 * efficiently; fname is the file's name within pg_logical/mappings.
 */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;
	char		fname[MAXPGPATH];
} RewriteMappingFile;
2757
#if NOT_USED
/*
 * Debugging aid: dump every (relfilenode, tid) -> (cmin, cmax) mapping in
 * tuplecid_data to the server log at DEBUG3.  Compiled out by default.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 BlockIdGetBlockNumber(&ent->key.tid.ip_blkid),
			 ent->key.tid.ip_posid,
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
2780
2781 /*
2782  * Apply a single mapping file to tuplecid_data.
2783  *
2784  * The mapping file has to have been verified to be a) committed b) for our
2785  * transaction c) applied in LSN order.
2786  */
2787 static void
2788 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
2789 {
2790         char            path[MAXPGPATH];
2791         int                     fd;
2792         int                     readBytes;
2793         LogicalRewriteMappingData map;
2794
2795         sprintf(path, "pg_logical/mappings/%s", fname);
2796         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2797         if (fd < 0)
2798                 ereport(ERROR,
2799                                 (errmsg("could not open file \"%s\": %m", path)));
2800
2801         while (true)
2802         {
2803                 ReorderBufferTupleCidKey key;
2804                 ReorderBufferTupleCidEnt *ent;
2805                 ReorderBufferTupleCidEnt *new_ent;
2806                 bool            found;
2807
2808                 /* be careful about padding */
2809                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
2810
2811                 /* read all mappings till the end of the file */
2812                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
2813
2814                 if (readBytes < 0)
2815                         ereport(ERROR,
2816                                         (errcode_for_file_access(),
2817                                          errmsg("could not read file \"%s\": %m",
2818                                                         path)));
2819                 else if (readBytes == 0)        /* EOF */
2820                         break;
2821                 else if (readBytes != sizeof(LogicalRewriteMappingData))
2822                         ereport(ERROR,
2823                                         (errcode_for_file_access(),
2824                                          errmsg("could not read from file \"%s\": read %d instead of %d bytes",
2825                                                         path, readBytes,
2826                                                         (int32) sizeof(LogicalRewriteMappingData))));
2827
2828                 key.relnode = map.old_node;
2829                 ItemPointerCopy(&map.old_tid,
2830                                                 &key.tid);
2831
2832
2833                 ent = (ReorderBufferTupleCidEnt *)
2834                         hash_search(tuplecid_data,
2835                                                 (void *) &key,
2836                                                 HASH_FIND,
2837                                                 NULL);
2838
2839                 /* no existing mapping, no need to update */
2840                 if (!ent)
2841                         continue;
2842
2843                 key.relnode = map.new_node;
2844                 ItemPointerCopy(&map.new_tid,
2845                                                 &key.tid);
2846
2847                 new_ent = (ReorderBufferTupleCidEnt *)
2848                         hash_search(tuplecid_data,
2849                                                 (void *) &key,
2850                                                 HASH_ENTER,
2851                                                 &found);
2852
2853                 if (found)
2854                 {
2855                         /*
2856                          * Make sure the existing mapping makes sense. We sometime update
2857                          * old records that did not yet have a cmax (e.g. pg_class' own
2858                          * entry while rewriting it) during rewrites, so allow that.
2859                          */
2860                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
2861                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
2862                 }
2863                 else
2864                 {
2865                         /* update mapping */
2866                         new_ent->cmin = ent->cmin;
2867                         new_ent->cmax = ent->cmax;
2868                         new_ent->combocid = ent->combocid;
2869                 }
2870         }
2871 }
2872
2873
/*
 * Check whether the TransactionId 'xid' is present in the pre-sorted array
 * 'xip' ('num' entries, sorted per xidComparator), using binary search.
 */
static bool
TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
{
	return bsearch(&xid, xip, num,
				   sizeof(TransactionId), xidComparator) != NULL;
}
2883
2884 /*
2885  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
2886  */
2887 static int
2888 file_sort_by_lsn(const void *a_p, const void *b_p)
2889 {
2890         RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
2891         RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
2892
2893         if (a->lsn < b->lsn)
2894                 return -1;
2895         else if (a->lsn > b->lsn)
2896                 return 1;
2897         return 0;
2898 }
2899
2900 /*
2901  * Apply any existing logical remapping files if there are any targeted at our
2902  * transaction for relid.
2903  */
2904 static void
2905 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
2906 {
2907         DIR                *mapping_dir;
2908         struct dirent *mapping_de;
2909         List       *files = NIL;
2910         ListCell   *file;
2911         RewriteMappingFile **files_a;
2912         size_t          off;
2913         Oid                     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
2914
2915         mapping_dir = AllocateDir("pg_logical/mappings");
2916         while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
2917         {
2918                 Oid                     f_dboid;
2919                 Oid                     f_relid;
2920                 TransactionId f_mapped_xid;
2921                 TransactionId f_create_xid;
2922                 XLogRecPtr      f_lsn;
2923                 uint32          f_hi,
2924                                         f_lo;
2925                 RewriteMappingFile *f;
2926
2927                 if (strcmp(mapping_de->d_name, ".") == 0 ||
2928                         strcmp(mapping_de->d_name, "..") == 0)
2929                         continue;
2930
2931                 /* Ignore files that aren't ours */
2932                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
2933                         continue;
2934
2935                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
2936                                    &f_dboid, &f_relid, &f_hi, &f_lo,
2937                                    &f_mapped_xid, &f_create_xid) != 6)
2938                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
2939
2940                 f_lsn = ((uint64) f_hi) << 32 | f_lo;
2941
2942                 /* mapping for another database */
2943                 if (f_dboid != dboid)
2944                         continue;
2945
2946                 /* mapping for another relation */
2947                 if (f_relid != relid)
2948                         continue;
2949
2950                 /* did the creating transaction abort? */
2951                 if (!TransactionIdDidCommit(f_create_xid))
2952                         continue;
2953
2954                 /* not for our transaction */
2955                 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
2956                         continue;
2957
2958                 /* ok, relevant, queue for apply */
2959                 f = palloc(sizeof(RewriteMappingFile));
2960                 f->lsn = f_lsn;
2961                 strcpy(f->fname, mapping_de->d_name);
2962                 files = lappend(files, f);
2963         }
2964         FreeDir(mapping_dir);
2965
2966         /* build array we can easily sort */
2967         files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
2968         off = 0;
2969         foreach(file, files)
2970         {
2971                 files_a[off++] = lfirst(file);
2972         }
2973
2974         /* sort files so we apply them in LSN order */
2975         qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
2976                   file_sort_by_lsn);
2977
2978         for (off = 0; off < list_length(files); off++)
2979         {
2980                 RewriteMappingFile *f = files_a[off];
2981
2982                 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
2983                          snapshot->subxip[0]);
2984                 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
2985                 pfree(f);
2986         }
2987 }
2988
2989 /*
2990  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
2991  * combocids.
2992  */
2993 bool
2994 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
2995                                                           Snapshot snapshot,
2996                                                           HeapTuple htup, Buffer buffer,
2997                                                           CommandId *cmin, CommandId *cmax)
2998 {
2999         ReorderBufferTupleCidKey key;
3000         ReorderBufferTupleCidEnt *ent;
3001         ForkNumber      forkno;
3002         BlockNumber blockno;
3003         bool            updated_mapping = false;
3004
3005         /* be careful about padding */
3006         memset(&key, 0, sizeof(key));
3007
3008         Assert(!BufferIsLocal(buffer));
3009
3010         /*
3011          * get relfilenode from the buffer, no convenient way to access it other
3012          * than that.
3013          */
3014         BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3015
3016         /* tuples can only be in the main fork */
3017         Assert(forkno == MAIN_FORKNUM);
3018         Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3019
3020         ItemPointerCopy(&htup->t_self,
3021                                         &key.tid);
3022
3023 restart:
3024         ent = (ReorderBufferTupleCidEnt *)
3025                 hash_search(tuplecid_data,
3026                                         (void *) &key,
3027                                         HASH_FIND,
3028                                         NULL);
3029
3030         /*
3031          * failed to find a mapping, check whether the table was rewritten and
3032          * apply mapping if so, but only do that once - there can be no new
3033          * mappings while we are in here since we have to hold a lock on the
3034          * relation.
3035          */
3036         if (ent == NULL && !updated_mapping)
3037         {
3038                 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3039                 /* now check but don't update for a mapping again */
3040                 updated_mapping = true;
3041                 goto restart;
3042         }
3043         else if (ent == NULL)
3044                 return false;
3045
3046         if (cmin)
3047                 *cmin = ent->cmin;
3048         if (cmax)
3049                 *cmax = ent->cmax;
3050         return true;
3051 }