]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/reorderbuffer.c
Further cleanup of ReorderBufferCommit().
[postgresql] / src / backend / replication / logical / reorderbuffer.c
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *        PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2015, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  *        This module gets handed individual pieces of transactions in the order
15  *        they are written to the WAL and is responsible to reassemble them into
16  *        toplevel transaction sized pieces. When a transaction is completely
17  *        reassembled - signalled by reading the transaction commit record - it
18  *        will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  *        individual changes. The output plugins rely on snapshots built by
20  *        snapbuild.c which hands them to us.
21  *
22  *        Transactions and subtransactions/savepoints in postgres are not
23  *        immediately linked to each other from outside the performing
24  *        backend. Only at commit/abort (or special xact_assignment records) they
25  *        are linked together. Which means that we will have to splice together a
26  *        toplevel transaction from its subtransactions. To do that efficiently we
27  *        build a binary heap indexed by the smallest current lsn of the individual
28  *        subtransactions' changestreams. As the individual streams are inherently
29  *        ordered by LSN - since that is where we build them from - the transaction
30  *        can easily be reassembled by always using the subtransaction with the
31  *        smallest current LSN from the heap.
32  *
33  *        In order to cope with large transactions - which can be several times as
34  *        big as the available memory - this module supports spooling the contents
35  *        of a large transactions to disk. When the transaction is replayed the
36  *        contents of individual (sub-)transactions will be read from disk in
37  *        chunks.
38  *
39  *        This module also has to deal with reassembling toast records from the
40  *        individual chunks stored in WAL. When a new (or initial) version of a
41  *        tuple is stored in WAL it will always be preceded by the toast chunks
42  *        emitted for the columns stored out of line. Within a single toplevel
43  *        transaction there will be no other data carrying records between a row's
44  *        toast chunks and the row data itself. See ReorderBufferToast* for
45  *        details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "replication/logical.h"
62 #include "replication/reorderbuffer.h"
63 #include "replication/slot.h"
64 #include "replication/snapbuild.h"              /* just for SnapBuildSnapDecRefcount */
65 #include "storage/bufmgr.h"
66 #include "storage/fd.h"
67 #include "storage/sinval.h"
68 #include "utils/builtins.h"
69 #include "utils/combocid.h"
70 #include "utils/memdebug.h"
71 #include "utils/memutils.h"
72 #include "utils/relcache.h"
73 #include "utils/relfilenodemap.h"
74 #include "utils/tqual.h"
75
76
77 /* entry for a hash table we use to map from xid to our transaction state */
78 typedef struct ReorderBufferTXNByIdEnt
79 {
80         TransactionId xid;
81         ReorderBufferTXN *txn;
82 } ReorderBufferTXNByIdEnt;
83
84 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
85 typedef struct ReorderBufferTupleCidKey
86 {
87         RelFileNode relnode;
88         ItemPointerData tid;
89 } ReorderBufferTupleCidKey;
90
91 typedef struct ReorderBufferTupleCidEnt
92 {
93         ReorderBufferTupleCidKey key;
94         CommandId       cmin;
95         CommandId       cmax;
96         CommandId       combocid;               /* just for debugging */
97 } ReorderBufferTupleCidEnt;
98
99 /* k-way in-order change iteration support structures */
100 typedef struct ReorderBufferIterTXNEntry
101 {
102         XLogRecPtr      lsn;
103         ReorderBufferChange *change;
104         ReorderBufferTXN *txn;
105         int                     fd;
106         XLogSegNo       segno;
107 } ReorderBufferIterTXNEntry;
108
109 typedef struct ReorderBufferIterTXNState
110 {
111         binaryheap *heap;
112         Size            nr_txns;
113         dlist_head      old_change;
114         ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
115 } ReorderBufferIterTXNState;
116
117 /* toast datastructures */
118 typedef struct ReorderBufferToastEnt
119 {
120         Oid                     chunk_id;               /* toast_table.chunk_id */
121         int32           last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
122                                                                  * have seen */
123         Size            num_chunks;             /* number of chunks we've already seen */
124         Size            size;                   /* combined size of chunks seen */
125         dlist_head      chunks;                 /* linked list of chunks */
126         struct varlena *reconstructed;          /* reconstructed varlena now pointed
127                                                                                  * to in main tup */
128 } ReorderBufferToastEnt;
129
130 /* Disk serialization support datastructures */
131 typedef struct ReorderBufferDiskChange
132 {
133         Size            size;
134         ReorderBufferChange change;
135         /* data follows */
136 } ReorderBufferDiskChange;
137
138 /*
139  * Maximum number of changes kept in memory, per transaction. After that,
140  * changes are spooled to disk.
141  *
142  * The current value should be sufficient to decode the entire transaction
143  * without hitting disk in OLTP workloads, while starting to spool to disk in
144  * other workloads reasonably fast.
145  *
146  * At some point in the future it probaly makes sense to have a more elaborate
147  * resource management here, but it's not entirely clear what that would look
148  * like.
149  */
150 static const Size max_changes_in_memory = 4096;
151
152 /*
153  * We use a very simple form of a slab allocator for frequently allocated
154  * objects, simply keeping a fixed number in a linked list when unused,
155  * instead pfree()ing them. Without that in many workloads aset.c becomes a
156  * major bottleneck, especially when spilling to disk while decoding batch
157  * workloads.
158  */
159 static const Size max_cached_changes = 4096 * 2;
160 static const Size max_cached_tuplebufs = 4096 * 2;              /* ~8MB */
161 static const Size max_cached_transactions = 512;
162
163
164 /* ---------------------------------------
165  * primary reorderbuffer support routines
166  * ---------------------------------------
167  */
168 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
169 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
170 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
171                                           TransactionId xid, bool create, bool *is_new,
172                                           XLogRecPtr lsn, bool create_as_top);
173
174 static void AssertTXNLsnOrder(ReorderBuffer *rb);
175
176 /* ---------------------------------------
177  * support functions for lsn-order iterating over the ->changes of a
178  * transaction and its subtransactions
179  *
180  * used for iteration over the k-way heap merge of a transaction and its
181  * subtransactions
182  * ---------------------------------------
183  */
184 static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
185 static ReorderBufferChange *
186                         ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
187 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
188                                                    ReorderBufferIterTXNState *state);
189 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
190
191 /*
192  * ---------------------------------------
193  * Disk serialization support functions
194  * ---------------------------------------
195  */
196 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
197 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
198 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
199                                                          int fd, ReorderBufferChange *change);
200 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
201                                                         int *fd, XLogSegNo *segno);
202 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
203                                                    char *change);
204 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
205
206 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
207 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
208                                           ReorderBufferTXN *txn, CommandId cid);
209
210 /* ---------------------------------------
211  * toast reassembly support
212  * ---------------------------------------
213  */
214 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
215 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
216 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
217                                                   Relation relation, ReorderBufferChange *change);
218 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
219                                                           Relation relation, ReorderBufferChange *change);
220
221
222 /*
223  * Allocate a new ReorderBuffer
224  */
225 ReorderBuffer *
226 ReorderBufferAllocate(void)
227 {
228         ReorderBuffer *buffer;
229         HASHCTL         hash_ctl;
230         MemoryContext new_ctx;
231
232         /* allocate memory in own context, to have better accountability */
233         new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234                                                                         "ReorderBuffer",
235                                                                         ALLOCSET_DEFAULT_MINSIZE,
236                                                                         ALLOCSET_DEFAULT_INITSIZE,
237                                                                         ALLOCSET_DEFAULT_MAXSIZE);
238
239         buffer =
240                 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
241
242         memset(&hash_ctl, 0, sizeof(hash_ctl));
243
244         buffer->context = new_ctx;
245
246         hash_ctl.keysize = sizeof(TransactionId);
247         hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
248         hash_ctl.hcxt = buffer->context;
249
250         buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
251                                                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
252
253         buffer->by_txn_last_xid = InvalidTransactionId;
254         buffer->by_txn_last_txn = NULL;
255
256         buffer->nr_cached_transactions = 0;
257         buffer->nr_cached_changes = 0;
258         buffer->nr_cached_tuplebufs = 0;
259
260         buffer->outbuf = NULL;
261         buffer->outbufsize = 0;
262
263         buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
264
265         dlist_init(&buffer->toplevel_by_lsn);
266         dlist_init(&buffer->cached_transactions);
267         dlist_init(&buffer->cached_changes);
268         slist_init(&buffer->cached_tuplebufs);
269
270         return buffer;
271 }
272
273 /*
274  * Free a ReorderBuffer
275  */
276 void
277 ReorderBufferFree(ReorderBuffer *rb)
278 {
279         MemoryContext context = rb->context;
280
281         /*
282          * We free separately allocated data by entirely scrapping reorderbuffer's
283          * memory context.
284          */
285         MemoryContextDelete(context);
286 }
287
288 /*
289  * Get a unused, possibly preallocated, ReorderBufferTXN.
290  */
291 static ReorderBufferTXN *
292 ReorderBufferGetTXN(ReorderBuffer *rb)
293 {
294         ReorderBufferTXN *txn;
295
296         /* check the slab cache */
297         if (rb->nr_cached_transactions > 0)
298         {
299                 rb->nr_cached_transactions--;
300                 txn = (ReorderBufferTXN *)
301                         dlist_container(ReorderBufferTXN, node,
302                                                         dlist_pop_head_node(&rb->cached_transactions));
303         }
304         else
305         {
306                 txn = (ReorderBufferTXN *)
307                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
308         }
309
310         memset(txn, 0, sizeof(ReorderBufferTXN));
311
312         dlist_init(&txn->changes);
313         dlist_init(&txn->tuplecids);
314         dlist_init(&txn->subtxns);
315
316         return txn;
317 }
318
319 /*
320  * Free a ReorderBufferTXN.
321  *
322  * Deallocation might be delayed for efficiency purposes, for details check
323  * the comments above max_cached_changes's definition.
324  */
325 static void
326 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
327 {
328         /* clean the lookup cache if we were cached (quite likely) */
329         if (rb->by_txn_last_xid == txn->xid)
330         {
331                 rb->by_txn_last_xid = InvalidTransactionId;
332                 rb->by_txn_last_txn = NULL;
333         }
334
335         /* free data that's contained */
336
337         if (txn->tuplecid_hash != NULL)
338         {
339                 hash_destroy(txn->tuplecid_hash);
340                 txn->tuplecid_hash = NULL;
341         }
342
343         if (txn->invalidations)
344         {
345                 pfree(txn->invalidations);
346                 txn->invalidations = NULL;
347         }
348
349         /* check whether to put into the slab cache */
350         if (rb->nr_cached_transactions < max_cached_transactions)
351         {
352                 rb->nr_cached_transactions++;
353                 dlist_push_head(&rb->cached_transactions, &txn->node);
354                 VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
355                 VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
356         }
357         else
358         {
359                 pfree(txn);
360         }
361 }
362
363 /*
364  * Get a unused, possibly preallocated, ReorderBufferChange.
365  */
366 ReorderBufferChange *
367 ReorderBufferGetChange(ReorderBuffer *rb)
368 {
369         ReorderBufferChange *change;
370
371         /* check the slab cache */
372         if (rb->nr_cached_changes)
373         {
374                 rb->nr_cached_changes--;
375                 change = (ReorderBufferChange *)
376                         dlist_container(ReorderBufferChange, node,
377                                                         dlist_pop_head_node(&rb->cached_changes));
378         }
379         else
380         {
381                 change = (ReorderBufferChange *)
382                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
383         }
384
385         memset(change, 0, sizeof(ReorderBufferChange));
386         return change;
387 }
388
389 /*
390  * Free an ReorderBufferChange.
391  *
392  * Deallocation might be delayed for efficiency purposes, for details check
393  * the comments above max_cached_changes's definition.
394  */
395 void
396 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
397 {
398         /* free contained data */
399         switch (change->action)
400         {
401                 case REORDER_BUFFER_CHANGE_INSERT:
402                 case REORDER_BUFFER_CHANGE_UPDATE:
403                 case REORDER_BUFFER_CHANGE_DELETE:
404                         if (change->data.tp.newtuple)
405                         {
406                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
407                                 change->data.tp.newtuple = NULL;
408                         }
409
410                         if (change->data.tp.oldtuple)
411                         {
412                                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
413                                 change->data.tp.oldtuple = NULL;
414                         }
415                         break;
416                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
417                         if (change->data.snapshot)
418                         {
419                                 ReorderBufferFreeSnap(rb, change->data.snapshot);
420                                 change->data.snapshot = NULL;
421                         }
422                         break;
423                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
424                         break;
425                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
426                         break;
427         }
428
429         /* check whether to put into the slab cache */
430         if (rb->nr_cached_changes < max_cached_changes)
431         {
432                 rb->nr_cached_changes++;
433                 dlist_push_head(&rb->cached_changes, &change->node);
434                 VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
435                 VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
436         }
437         else
438         {
439                 pfree(change);
440         }
441 }
442
443
444 /*
445  * Get a unused, possibly preallocated, ReorderBufferTupleBuf
446  */
447 ReorderBufferTupleBuf *
448 ReorderBufferGetTupleBuf(ReorderBuffer *rb)
449 {
450         ReorderBufferTupleBuf *tuple;
451
452         /* check the slab cache */
453         if (rb->nr_cached_tuplebufs)
454         {
455                 rb->nr_cached_tuplebufs--;
456                 tuple = slist_container(ReorderBufferTupleBuf, node,
457                                                                 slist_pop_head_node(&rb->cached_tuplebufs));
458 #ifdef USE_ASSERT_CHECKING
459                 memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
460 #endif
461         }
462         else
463         {
464                 tuple = (ReorderBufferTupleBuf *)
465                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
466         }
467
468         return tuple;
469 }
470
471 /*
472  * Free an ReorderBufferTupleBuf.
473  *
474  * Deallocation might be delayed for efficiency purposes, for details check
475  * the comments above max_cached_changes's definition.
476  */
477 void
478 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
479 {
480         /* check whether to put into the slab cache */
481         if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
482         {
483                 rb->nr_cached_tuplebufs++;
484                 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
485                 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
486                 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
487         }
488         else
489         {
490                 pfree(tuple);
491         }
492 }
493
494 /*
495  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
496  * If create is true, and a transaction doesn't already exist, create it
497  * (with the given LSN, and as top transaction if that's specified);
498  * when this happens, is_new is set to true.
499  */
500 static ReorderBufferTXN *
501 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
502                                           bool *is_new, XLogRecPtr lsn, bool create_as_top)
503 {
504         ReorderBufferTXN *txn;
505         ReorderBufferTXNByIdEnt *ent;
506         bool            found;
507
508         Assert(TransactionIdIsValid(xid));
509         Assert(!create || lsn != InvalidXLogRecPtr);
510
511         /*
512          * Check the one-entry lookup cache first
513          */
514         if (TransactionIdIsValid(rb->by_txn_last_xid) &&
515                 rb->by_txn_last_xid == xid)
516         {
517                 txn = rb->by_txn_last_txn;
518
519                 if (txn != NULL)
520                 {
521                         /* found it, and it's valid */
522                         if (is_new)
523                                 *is_new = false;
524                         return txn;
525                 }
526
527                 /*
528                  * cached as non-existant, and asked not to create? Then nothing else
529                  * to do.
530                  */
531                 if (!create)
532                         return NULL;
533                 /* otherwise fall through to create it */
534         }
535
536         /*
537          * If the cache wasn't hit or it yielded an "does-not-exist" and we want
538          * to create an entry.
539          */
540
541         /* search the lookup table */
542         ent = (ReorderBufferTXNByIdEnt *)
543                 hash_search(rb->by_txn,
544                                         (void *) &xid,
545                                         create ? HASH_ENTER : HASH_FIND,
546                                         &found);
547         if (found)
548                 txn = ent->txn;
549         else if (create)
550         {
551                 /* initialize the new entry, if creation was requested */
552                 Assert(ent != NULL);
553
554                 ent->txn = ReorderBufferGetTXN(rb);
555                 ent->txn->xid = xid;
556                 txn = ent->txn;
557                 txn->first_lsn = lsn;
558                 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
559
560                 if (create_as_top)
561                 {
562                         dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
563                         AssertTXNLsnOrder(rb);
564                 }
565         }
566         else
567                 txn = NULL;                             /* not found and not asked to create */
568
569         /* update cache */
570         rb->by_txn_last_xid = xid;
571         rb->by_txn_last_txn = txn;
572
573         if (is_new)
574                 *is_new = !found;
575
576         Assert(!create || !!txn);
577         return txn;
578 }
579
580 /*
581  * Queue a change into a transaction so it can be replayed upon commit.
582  */
583 void
584 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
585                                                  ReorderBufferChange *change)
586 {
587         ReorderBufferTXN *txn;
588
589         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
590
591         change->lsn = lsn;
592         Assert(InvalidXLogRecPtr != lsn);
593         dlist_push_tail(&txn->changes, &change->node);
594         txn->nentries++;
595         txn->nentries_mem++;
596
597         ReorderBufferCheckSerializeTXN(rb, txn);
598 }
599
600 static void
601 AssertTXNLsnOrder(ReorderBuffer *rb)
602 {
603 #ifdef USE_ASSERT_CHECKING
604         dlist_iter      iter;
605         XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
606
607         dlist_foreach(iter, &rb->toplevel_by_lsn)
608         {
609                 ReorderBufferTXN *cur_txn;
610
611                 cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
612                 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
613
614                 if (cur_txn->end_lsn != InvalidXLogRecPtr)
615                         Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
616
617                 if (prev_first_lsn != InvalidXLogRecPtr)
618                         Assert(prev_first_lsn < cur_txn->first_lsn);
619
620                 Assert(!cur_txn->is_known_as_subxact);
621                 prev_first_lsn = cur_txn->first_lsn;
622         }
623 #endif
624 }
625
626 ReorderBufferTXN *
627 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
628 {
629         ReorderBufferTXN *txn;
630
631         if (dlist_is_empty(&rb->toplevel_by_lsn))
632                 return NULL;
633
634         AssertTXNLsnOrder(rb);
635
636         txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
637
638         Assert(!txn->is_known_as_subxact);
639         Assert(txn->first_lsn != InvalidXLogRecPtr);
640         return txn;
641 }
642
643 void
644 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
645 {
646         rb->current_restart_decoding_lsn = ptr;
647 }
648
649 void
650 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
651                                                  TransactionId subxid, XLogRecPtr lsn)
652 {
653         ReorderBufferTXN *txn;
654         ReorderBufferTXN *subtxn;
655         bool            new_top;
656         bool            new_sub;
657
658         txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
659         subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
660
661         if (new_sub)
662         {
663                 /*
664                  * we assign subtransactions to top level transaction even if we don't
665                  * have data for it yet, assignment records frequently reference xids
666                  * that have not yet produced any records. Knowing those aren't top
667                  * level xids allows us to make processing cheaper in some places.
668                  */
669                 dlist_push_tail(&txn->subtxns, &subtxn->node);
670                 txn->nsubtxns++;
671         }
672         else if (!subtxn->is_known_as_subxact)
673         {
674                 subtxn->is_known_as_subxact = true;
675                 Assert(subtxn->nsubtxns == 0);
676
677                 /* remove from lsn order list of top-level transactions */
678                 dlist_delete(&subtxn->node);
679
680                 /* add to toplevel transaction */
681                 dlist_push_tail(&txn->subtxns, &subtxn->node);
682                 txn->nsubtxns++;
683         }
684         else if (new_top)
685         {
686                 elog(ERROR, "existing subxact assigned to unknown toplevel xact");
687         }
688 }
689
690 /*
691  * Associate a subtransaction with its toplevel transaction at commit
692  * time. There may be no further changes added after this.
693  */
694 void
695 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
696                                                  TransactionId subxid, XLogRecPtr commit_lsn,
697                                                  XLogRecPtr end_lsn)
698 {
699         ReorderBufferTXN *txn;
700         ReorderBufferTXN *subtxn;
701
702         subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
703                                                                    InvalidXLogRecPtr, false);
704
705         /*
706          * No need to do anything if that subtxn didn't contain any changes
707          */
708         if (!subtxn)
709                 return;
710
711         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
712
713         if (txn == NULL)
714                 elog(ERROR, "subxact logged without previous toplevel record");
715
716         /*
717          * Pass the our base snapshot to the parent transaction if it doesn't have
718          * one, or ours is older. That can happen if there are no changes in the
719          * toplevel transaction but in one of the child transactions. This allows
720          * the parent to simply use it's base snapshot initially.
721          */
722         if (txn->base_snapshot == NULL ||
723                 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)
724         {
725                 txn->base_snapshot = subtxn->base_snapshot;
726                 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
727                 subtxn->base_snapshot = NULL;
728                 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
729         }
730
731         subtxn->final_lsn = commit_lsn;
732         subtxn->end_lsn = end_lsn;
733
734         if (!subtxn->is_known_as_subxact)
735         {
736                 subtxn->is_known_as_subxact = true;
737                 Assert(subtxn->nsubtxns == 0);
738
739                 /* remove from lsn order list of top-level transactions */
740                 dlist_delete(&subtxn->node);
741
742                 /* add to subtransaction list */
743                 dlist_push_tail(&txn->subtxns, &subtxn->node);
744                 txn->nsubtxns++;
745         }
746 }
747
748
749 /*
750  * Support for efficiently iterating over a transaction's and its
751  * subtransactions' changes.
752  *
753  * We do so by doing a k-way merge between transactions/subtransactions. For that
754  * we model the current heads of the different transactions as a binary heap
755  * so we easily know which (sub-)transaction has the change with the smallest
756  * lsn next.
757  *
758  * We assume the changes in individual transactions are already sorted by LSN.
759  */
760
761 /*
762  * Binary heap comparison function.
763  */
764 static int
765 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
766 {
767         ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
768         XLogRecPtr      pos_a = state->entries[DatumGetInt32(a)].lsn;
769         XLogRecPtr      pos_b = state->entries[DatumGetInt32(b)].lsn;
770
771         if (pos_a < pos_b)
772                 return 1;
773         else if (pos_a == pos_b)
774                 return 0;
775         return -1;
776 }
777
778 /*
779  * Allocate & initialize an iterator which iterates in lsn order over a
780  * transaction and all its subtransactions.
781  */
782 static ReorderBufferIterTXNState *
783 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
784 {
785         Size            nr_txns = 0;
786         ReorderBufferIterTXNState *state;
787         dlist_iter      cur_txn_i;
788         int32           off;
789
790         /*
791          * Calculate the size of our heap: one element for every transaction that
792          * contains changes.  (Besides the transactions already in the reorder
793          * buffer, we count the one we were directly passed.)
794          */
795         if (txn->nentries > 0)
796                 nr_txns++;
797
798         dlist_foreach(cur_txn_i, &txn->subtxns)
799         {
800                 ReorderBufferTXN *cur_txn;
801
802                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
803
804                 if (cur_txn->nentries > 0)
805                         nr_txns++;
806         }
807
808         /*
809          * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
810          * need to allocate/build a heap then.
811          */
812
813         /* allocate iteration state */
814         state = (ReorderBufferIterTXNState *)
815                 MemoryContextAllocZero(rb->context,
816                                                            sizeof(ReorderBufferIterTXNState) +
817                                                            sizeof(ReorderBufferIterTXNEntry) * nr_txns);
818
819         state->nr_txns = nr_txns;
820         dlist_init(&state->old_change);
821
822         for (off = 0; off < state->nr_txns; off++)
823         {
824                 state->entries[off].fd = -1;
825                 state->entries[off].segno = 0;
826         }
827
828         /* allocate heap */
829         state->heap = binaryheap_allocate(state->nr_txns,
830                                                                           ReorderBufferIterCompare,
831                                                                           state);
832
833         /*
834          * Now insert items into the binary heap, in an unordered fashion.  (We
835          * will run a heap assembly step at the end; this is more efficient.)
836          */
837
838         off = 0;
839
840         /* add toplevel transaction if it contains changes */
841         if (txn->nentries > 0)
842         {
843                 ReorderBufferChange *cur_change;
844
845                 if (txn->nentries != txn->nentries_mem)
846                         ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
847                                                                                 &state->entries[off].segno);
848
849                 cur_change = dlist_head_element(ReorderBufferChange, node,
850                                                                                 &txn->changes);
851
852                 state->entries[off].lsn = cur_change->lsn;
853                 state->entries[off].change = cur_change;
854                 state->entries[off].txn = txn;
855
856                 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
857         }
858
859         /* add subtransactions if they contain changes */
860         dlist_foreach(cur_txn_i, &txn->subtxns)
861         {
862                 ReorderBufferTXN *cur_txn;
863
864                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
865
866                 if (cur_txn->nentries > 0)
867                 {
868                         ReorderBufferChange *cur_change;
869
870                         if (txn->nentries != txn->nentries_mem)
871                                 ReorderBufferRestoreChanges(rb, cur_txn,
872                                                                                         &state->entries[off].fd,
873                                                                                         &state->entries[off].segno);
874
875                         cur_change = dlist_head_element(ReorderBufferChange, node,
876                                                                                         &cur_txn->changes);
877
878                         state->entries[off].lsn = cur_change->lsn;
879                         state->entries[off].change = cur_change;
880                         state->entries[off].txn = cur_txn;
881
882                         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
883                 }
884         }
885
886         /* assemble a valid binary heap */
887         binaryheap_build(state->heap);
888
889         return state;
890 }
891
892 /*
893  * Return the next change when iterating over a transaction and its
894  * subtransactions.
895  *
896  * Returns NULL when no further changes exist.
897  */
898 static ReorderBufferChange *
899 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
900 {
901         ReorderBufferChange *change;
902         ReorderBufferIterTXNEntry *entry;
903         int32           off;
904
905         /* nothing there anymore */
906         if (state->heap->bh_size == 0)
907                 return NULL;
908
909         off = DatumGetInt32(binaryheap_first(state->heap));
910         entry = &state->entries[off];
911
912         /* free memory we might have "leaked" in the previous *Next call */
913         if (!dlist_is_empty(&state->old_change))
914         {
915                 change = dlist_container(ReorderBufferChange, node,
916                                                                  dlist_pop_head_node(&state->old_change));
917                 ReorderBufferReturnChange(rb, change);
918                 Assert(dlist_is_empty(&state->old_change));
919         }
920
921         change = entry->change;
922
923         /*
924          * update heap with information about which transaction has the next
925          * relevant change in LSN order
926          */
927
928         /* there are in-memory changes */
929         if (dlist_has_next(&entry->txn->changes, &entry->change->node))
930         {
931                 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
932                 ReorderBufferChange *next_change =
933                 dlist_container(ReorderBufferChange, node, next);
934
935                 /* txn stays the same */
936                 state->entries[off].lsn = next_change->lsn;
937                 state->entries[off].change = next_change;
938
939                 binaryheap_replace_first(state->heap, Int32GetDatum(off));
940                 return change;
941         }
942
943         /* try to load changes from disk */
944         if (entry->txn->nentries != entry->txn->nentries_mem)
945         {
946                 /*
947                  * Ugly: restoring changes will reuse *Change records, thus delete the
948                  * current one from the per-tx list and only free in the next call.
949                  */
950                 dlist_delete(&change->node);
951                 dlist_push_tail(&state->old_change, &change->node);
952
953                 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
954                                                                                 &state->entries[off].segno))
955                 {
956                         /* successfully restored changes from disk */
957                         ReorderBufferChange *next_change =
958                         dlist_head_element(ReorderBufferChange, node,
959                                                            &entry->txn->changes);
960
961                         elog(DEBUG2, "restored %u/%u changes from disk",
962                                  (uint32) entry->txn->nentries_mem,
963                                  (uint32) entry->txn->nentries);
964
965                         Assert(entry->txn->nentries_mem);
966                         /* txn stays the same */
967                         state->entries[off].lsn = next_change->lsn;
968                         state->entries[off].change = next_change;
969                         binaryheap_replace_first(state->heap, Int32GetDatum(off));
970
971                         return change;
972                 }
973         }
974
975         /* ok, no changes there anymore, remove */
976         binaryheap_remove_first(state->heap);
977
978         return change;
979 }
980
981 /*
982  * Deallocate the iterator
983  */
984 static void
985 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
986                                                    ReorderBufferIterTXNState *state)
987 {
988         int32           off;
989
990         for (off = 0; off < state->nr_txns; off++)
991         {
992                 if (state->entries[off].fd != -1)
993                         CloseTransientFile(state->entries[off].fd);
994         }
995
996         /* free memory we might have "leaked" in the last *Next call */
997         if (!dlist_is_empty(&state->old_change))
998         {
999                 ReorderBufferChange *change;
1000
1001                 change = dlist_container(ReorderBufferChange, node,
1002                                                                  dlist_pop_head_node(&state->old_change));
1003                 ReorderBufferReturnChange(rb, change);
1004                 Assert(dlist_is_empty(&state->old_change));
1005         }
1006
1007         binaryheap_free(state->heap);
1008         pfree(state);
1009 }
1010
1011 /*
1012  * Cleanup the contents of a transaction, usually after the transaction
1013  * committed or aborted.
1014  */
1015 static void
1016 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1017 {
1018         bool            found;
1019         dlist_mutable_iter iter;
1020
1021         /* cleanup subtransactions & their changes */
1022         dlist_foreach_modify(iter, &txn->subtxns)
1023         {
1024                 ReorderBufferTXN *subtxn;
1025
1026                 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1027
1028                 /*
1029                  * Subtransactions are always associated to the toplevel TXN, even if
1030                  * they originally were happening inside another subtxn, so we won't
1031                  * ever recurse more than one level deep here.
1032                  */
1033                 Assert(subtxn->is_known_as_subxact);
1034                 Assert(subtxn->nsubtxns == 0);
1035
1036                 ReorderBufferCleanupTXN(rb, subtxn);
1037         }
1038
1039         /* cleanup changes in the toplevel txn */
1040         dlist_foreach_modify(iter, &txn->changes)
1041         {
1042                 ReorderBufferChange *change;
1043
1044                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1045
1046                 ReorderBufferReturnChange(rb, change);
1047         }
1048
1049         /*
1050          * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1051          * They are always stored in the toplevel transaction.
1052          */
1053         dlist_foreach_modify(iter, &txn->tuplecids)
1054         {
1055                 ReorderBufferChange *change;
1056
1057                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1058                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1059                 ReorderBufferReturnChange(rb, change);
1060         }
1061
1062         if (txn->base_snapshot != NULL)
1063         {
1064                 SnapBuildSnapDecRefcount(txn->base_snapshot);
1065                 txn->base_snapshot = NULL;
1066                 txn->base_snapshot_lsn = InvalidXLogRecPtr;
1067         }
1068
1069         /* delete from list of known subxacts */
1070         if (txn->is_known_as_subxact)
1071         {
1072                 /* NB: nsubxacts count of parent will be too high now */
1073                 dlist_delete(&txn->node);
1074         }
1075         /* delete from LSN ordered list of toplevel TXNs */
1076         else
1077         {
1078                 dlist_delete(&txn->node);
1079         }
1080
1081         /* now remove reference from buffer */
1082         hash_search(rb->by_txn,
1083                                 (void *) &txn->xid,
1084                                 HASH_REMOVE,
1085                                 &found);
1086         Assert(found);
1087
1088         /* remove entries spilled to disk */
1089         if (txn->nentries != txn->nentries_mem)
1090                 ReorderBufferRestoreCleanup(rb, txn);
1091
1092         /* deallocate */
1093         ReorderBufferReturnTXN(rb, txn);
1094 }
1095
1096 /*
1097  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1098  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1099  */
1100 static void
1101 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1102 {
1103         dlist_iter      iter;
1104         HASHCTL         hash_ctl;
1105
1106         if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1107                 return;
1108
1109         memset(&hash_ctl, 0, sizeof(hash_ctl));
1110
1111         hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1112         hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1113         hash_ctl.hcxt = rb->context;
1114
1115         /*
1116          * create the hash with the exact number of to-be-stored tuplecids from
1117          * the start
1118          */
1119         txn->tuplecid_hash =
1120                 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1121                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1122
1123         dlist_foreach(iter, &txn->tuplecids)
1124         {
1125                 ReorderBufferTupleCidKey key;
1126                 ReorderBufferTupleCidEnt *ent;
1127                 bool            found;
1128                 ReorderBufferChange *change;
1129
1130                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1131
1132                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1133
1134                 /* be careful about padding */
1135                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1136
1137                 key.relnode = change->data.tuplecid.node;
1138
1139                 ItemPointerCopy(&change->data.tuplecid.tid,
1140                                                 &key.tid);
1141
1142                 ent = (ReorderBufferTupleCidEnt *)
1143                         hash_search(txn->tuplecid_hash,
1144                                                 (void *) &key,
1145                                                 HASH_ENTER | HASH_FIND,
1146                                                 &found);
1147                 if (!found)
1148                 {
1149                         ent->cmin = change->data.tuplecid.cmin;
1150                         ent->cmax = change->data.tuplecid.cmax;
1151                         ent->combocid = change->data.tuplecid.combocid;
1152                 }
1153                 else
1154                 {
1155                         Assert(ent->cmin == change->data.tuplecid.cmin);
1156                         Assert(ent->cmax == InvalidCommandId ||
1157                                    ent->cmax == change->data.tuplecid.cmax);
1158
1159                         /*
1160                          * if the tuple got valid in this transaction and now got deleted
1161                          * we already have a valid cmin stored. The cmax will be
1162                          * InvalidCommandId though.
1163                          */
1164                         ent->cmax = change->data.tuplecid.cmax;
1165                 }
1166         }
1167 }
1168
1169 /*
1170  * Copy a provided snapshot so we can modify it privately. This is needed so
1171  * that catalog modifying transactions can look into intermediate catalog
1172  * states.
1173  */
1174 static Snapshot
1175 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1176                                           ReorderBufferTXN *txn, CommandId cid)
1177 {
1178         Snapshot        snap;
1179         dlist_iter      iter;
1180         int                     i = 0;
1181         Size            size;
1182
1183         size = sizeof(SnapshotData) +
1184                 sizeof(TransactionId) * orig_snap->xcnt +
1185                 sizeof(TransactionId) * (txn->nsubtxns + 1);
1186
1187         snap = MemoryContextAllocZero(rb->context, size);
1188         memcpy(snap, orig_snap, sizeof(SnapshotData));
1189
1190         snap->copied = true;
1191         snap->active_count = 0;
1192         snap->regd_count = 1;
1193         snap->xip = (TransactionId *) (snap + 1);
1194
1195         memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1196
1197         /*
1198          * snap->subxip contains all txids that belong to our transaction which we
1199          * need to check via cmin/cmax. Thats why we store the toplevel
1200          * transaction in there as well.
1201          */
1202         snap->subxip = snap->xip + snap->xcnt;
1203         snap->subxip[i++] = txn->xid;
1204
1205         /*
1206          * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1207          * Since it's an upper boundary it is safe to use it for the allocation
1208          * above.
1209          */
1210         snap->subxcnt = 1;
1211
1212         dlist_foreach(iter, &txn->subtxns)
1213         {
1214                 ReorderBufferTXN *sub_txn;
1215
1216                 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1217                 snap->subxip[i++] = sub_txn->xid;
1218                 snap->subxcnt++;
1219         }
1220
1221         /* sort so we can bsearch() later */
1222         qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1223
1224         /* store the specified current CommandId */
1225         snap->curcid = cid;
1226
1227         return snap;
1228 }
1229
1230 /*
1231  * Free a previously ReorderBufferCopySnap'ed snapshot
1232  */
1233 static void
1234 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1235 {
1236         if (snap->copied)
1237                 pfree(snap);
1238         else
1239                 SnapBuildSnapDecRefcount(snap);
1240 }
1241
1242 /*
1243  * Perform the replay of a transaction and its non-aborted subtransactions.
1244  *
1245  * Subtransactions previously have to be processed by
1246  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1247  * transaction with ReorderBufferAssignChild.
1248  *
1249  * We currently can only decode a transaction's contents when its commit
1250  * record is read because that's currently the only place where we know about
1251  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1252  * the top and subtransactions (using a k-way merge) and replay the changes in
1253  * lsn order.
      *
      * xid identifies the transaction to replay; if no transaction is known
      * under that xid, nothing is done.  commit_lsn, end_lsn and commit_time
      * are recorded in the transaction (as final_lsn, end_lsn and
      * commit_time); commit_lsn is additionally passed to the commit callback.
      *
      * The begin, apply_change and commit callbacks of rb are invoked from
      * here.  The transaction is released with ReorderBufferCleanupTXN() on
      * both the success and the error path; a caught error is re-thrown after
      * that cleanup.
1254  */
1255 void
1256 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1257                                         XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1258                                         TimestampTz commit_time)
1259 {
1260         ReorderBufferTXN *txn;
             /* snapshot_now and iterstate are assigned in PG_TRY and read in PG_CATCH */
1261         volatile Snapshot snapshot_now;
1262         volatile CommandId command_id = FirstCommandId;
1263         bool            using_subtxn;
1264         ReorderBufferIterTXNState *volatile iterstate = NULL;
1265
1266         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1267                                                                 false);
1268
1269         /* unknown transaction, nothing to replay */
1270         if (txn == NULL)
1271                 return;
1272
1273         txn->final_lsn = commit_lsn;
1274         txn->end_lsn = end_lsn;
1275         txn->commit_time = commit_time;
1276
1277         /* serialize the last bunch of changes if we need start earlier anyway */
1278         if (txn->nentries_mem != txn->nentries)
1279                 ReorderBufferSerializeTXN(rb, txn);
1280
1281         /*
1282          * If this transaction didn't have any real changes in our database, it's
1283          * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1284          * transferred its snapshot to this transaction if it had one and the
1285          * toplevel tx didn't.
1286          */
1287         if (txn->base_snapshot == NULL)
1288         {
1289                 Assert(txn->ninvalidations == 0);
1290                 ReorderBufferCleanupTXN(rb, txn);
1291                 return;
1292         }
1293
1294         snapshot_now = txn->base_snapshot;
1295
1296         /* build data to be able to lookup the CommandIds of catalog tuples */
1297         ReorderBufferBuildTupleCidHash(rb, txn);
1298
1299         /* setup the initial snapshot */
1300         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1301
1302         /*
1303          * Decoding needs access to syscaches et al., which in turn use
1304          * heavyweight locks and such. Thus we need to have enough state around to
1305          * keep track of those.  The easiest way is to simply use a transaction
1306          * internally.  That also allows us to easily enforce that nothing writes
1307          * to the database by checking for xid assignments.
1308          *
1309          * When we're called via the SQL SRF there's already a transaction
1310          * started, so start an explicit subtransaction there.
1311          */
1312         using_subtxn = IsTransactionOrTransactionBlock();
1313
1314         PG_TRY();
1315         {
1316                 ReorderBufferChange *change;
1317
1318                 if (using_subtxn)
1319                         BeginInternalSubTransaction("replay");
1320                 else
1321                         StartTransactionCommand();
1322
1323                 rb->begin(rb, txn);
1324
1325                 iterstate = ReorderBufferIterTXNInit(rb, txn);
1326                 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1327                 {
1328                         Relation        relation = NULL;
1329                         Oid                     reloid;
1330
1331                         switch (change->action)
1332                         {
1333                                 case REORDER_BUFFER_CHANGE_INSERT:
1334                                 case REORDER_BUFFER_CHANGE_UPDATE:
1335                                 case REORDER_BUFFER_CHANGE_DELETE:
1336                                         Assert(snapshot_now);
1337
1338                                         reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1339                                                                                         change->data.tp.relnode.relNode);
1340
1341                                         /*
1342                                          * Catalog tuple without data, emitted while catalog was
1343                                          * in the process of being rewritten.
1344                                          */
1345                                         if (reloid == InvalidOid &&
1346                                                 change->data.tp.newtuple == NULL &&
1347                                                 change->data.tp.oldtuple == NULL)
1348                                                 continue;
1349                                         else if (reloid == InvalidOid)
1350                                                 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1351                                                          relpathperm(change->data.tp.relnode,
1352                                                                                  MAIN_FORKNUM));
1353
1354                                         relation = RelationIdGetRelation(reloid);
1355
1356                                         if (relation == NULL)
1357                                                 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1358                                                          reloid,
1359                                                          relpathperm(change->data.tp.relnode,
1360                                                                                  MAIN_FORKNUM));
1361
1362                                         if (RelationIsLogicallyLogged(relation))
1363                                         {
1364                                                 /*
1365                                                  * For now ignore sequence changes entirely. Most of
1366                                                  * the time they don't log changes using records we
1367                                                  * understand, so it doesn't make sense to handle the
1368                                                  * few cases we do.
1369                                                  */
1370                                                 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1371                                                 {
1372                                                 }
1373                                                 /* user-triggered change */
1374                                                 else if (!IsToastRelation(relation))
1375                                                 {
1376                                                         ReorderBufferToastReplace(rb, txn, relation, change);
1377                                                         rb->apply_change(rb, txn, relation, change);
1378
1379                                                         /*
1380                                                          * Only clear reassembled toast chunks if we're
1381                                                          * sure they're not required anymore. The creator
1382                                                          * of the tuple tells us.
1383                                                          */
1384                                                         if (change->data.tp.clear_toast_afterwards)
1385                                                                 ReorderBufferToastReset(rb, txn);
1386                                                 }
1387                                                 /* we're not interested in toast deletions */
1388                                                 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1389                                                 {
1390                                                         /*
1391                                                          * Need to reassemble the full toasted Datum in
1392                                                          * memory. To ensure the chunks don't get reused
1393                                                          * till we're done, remove it from the list of this
1394                                                          * transaction's changes. Otherwise it will get
1395                                                          * freed/reused while restoring spooled data from
1396                                                          * disk.
1397                                                          */
1398                                                         dlist_delete(&change->node);
1399                                                         ReorderBufferToastAppendChunk(rb, txn, relation,
1400                                                                                                                   change);
1401                                                 }
1402
1403                                         }
1404                                         RelationClose(relation);
1405                                         break;
1406                                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1407                                         /* get rid of the old */
1408                                         TeardownHistoricSnapshot(false);
1409
1410                                         if (snapshot_now->copied)
1411                                         {
1412                                                 ReorderBufferFreeSnap(rb, snapshot_now);
1413                                                 snapshot_now =
1414                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1415                                                                                                   txn, command_id);
1416                                         }
1417
1418                                         /*
1419                                          * Restored from disk, need to be careful not to double
1420                                          * free. We could introduce refcounting for that, but for
1421                                          * now this seems infrequent enough not to care.
1422                                          */
1423                                         else if (change->data.snapshot->copied)
1424                                         {
1425                                                 snapshot_now =
1426                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1427                                                                                                   txn, command_id);
1428                                         }
1429                                         else
1430                                         {
1431                                                 snapshot_now = change->data.snapshot;
1432                                         }
1433
1434
1435                                         /* and continue with the new one */
1436                                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1437                                         break;
1438
1439                                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1440                                         Assert(change->data.command_id != InvalidCommandId);
1441
1442                                         if (command_id < change->data.command_id)
1443                                         {
1444                                                 command_id = change->data.command_id;
1445
1446                                                 if (!snapshot_now->copied)
1447                                                 {
1448                                                         /* we don't use the global one anymore */
1449                                                         snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1450                                                                                                                         txn, command_id);
1451                                                 }
1452
1453                                                 snapshot_now->curcid = command_id;
1454
1455                                                 TeardownHistoricSnapshot(false);
1456                                                 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1457
1458                                                 /*
1459                                                  * Every time the CommandId is incremented, we could
1460                                                  * see new catalog contents, so execute all
1461                                                  * invalidations.
1462                                                  */
1463                                                 ReorderBufferExecuteInvalidations(rb, txn);
1464                                         }
1465
1466                                         break;
1467
1468                                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1469                                         elog(ERROR, "tuplecid value in changequeue");
1470                                         break;
1471                         }
1472                 }
1473
1474                 /* clean up the iterator; reset it so PG_CATCH won't finish it twice */
1475                 ReorderBufferIterTXNFinish(rb, iterstate);
1476                 iterstate = NULL;
1477
1478                 /* call commit callback */
1479                 rb->commit(rb, txn, commit_lsn);
1480
1481                 /* this is just a sanity check against bad output plugin behaviour */
1482                 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1483                         elog(ERROR, "output plugin used XID %u",
1484                                  GetCurrentTransactionId());
1485
1486                 /* cleanup */
1487                 TeardownHistoricSnapshot(false);
1488
1489                 /*
1490                  * Aborting the current (sub-)transaction as a whole has the right
1491                  * semantics. We want all locks acquired in here to be released, not
1492                  * reassigned to the parent and we do not want any database access
1493                  * have persistent effects.
1494                  */
1495                 AbortCurrentTransaction();
1496
1497                 /* make sure there's no cache pollution */
1498                 ReorderBufferExecuteInvalidations(rb, txn);
1499
1500                 if (using_subtxn)
1501                         RollbackAndReleaseCurrentSubTransaction();
1502
1503                 if (snapshot_now->copied)
1504                         ReorderBufferFreeSnap(rb, snapshot_now);
1505
1506                 /* remove potential on-disk data, and deallocate */
1507                 ReorderBufferCleanupTXN(rb, txn);
1508         }
1509         PG_CATCH();
1510         {
1511                 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1512                 if (iterstate)
1513                         ReorderBufferIterTXNFinish(rb, iterstate);
1514
1515                 TeardownHistoricSnapshot(true);
1516
1517                 /*
1518                  * Force cache invalidation to happen outside of a valid transaction
1519                  * to prevent catalog access as we just caught an error.
1520                  */
1521                 AbortCurrentTransaction();
1522
1523                 /* make sure there's no cache pollution */
1524                 ReorderBufferExecuteInvalidations(rb, txn);
1525
1526                 if (using_subtxn)
1527                         RollbackAndReleaseCurrentSubTransaction();
1528
1529                 if (snapshot_now->copied)
1530                         ReorderBufferFreeSnap(rb, snapshot_now);
1531
1532                 /* remove potential on-disk data, and deallocate */
1533                 ReorderBufferCleanupTXN(rb, txn);
1534
1535                 PG_RE_THROW();
1536         }
1537         PG_END_TRY();
1538 }
1539
1540 /*
1541  * Abort a transaction that possibly has previous changes. Needs to be first
1542  * called for subtransactions and then for the toplevel xid.
1543  *
1544  * NB: Transactions handled here have to have actively aborted (i.e. have
1545  * produced an abort record). Implicitly aborted transactions are handled via
1546  * ReorderBufferAbortOld(); transactions we're just not interesteded in, but
1547  * which have committed are handled in ReorderBufferForget().
1548  *
1549  * This function purges this transaction and its contents from memory and
1550  * disk.
1551  */
1552 void
1553 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1554 {
1555         ReorderBufferTXN *txn;
1556
1557         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1558                                                                 false);
1559
1560         /* unknown, nothing to remove */
1561         if (txn == NULL)
1562                 return;
1563
1564         /* cosmetic... */
1565         txn->final_lsn = lsn;
1566
1567         /* remove potential on-disk data, and deallocate */
1568         ReorderBufferCleanupTXN(rb, txn);
1569 }
1570
1571 /*
1572  * Abort all transactions that aren't actually running anymore because the
1573  * server restarted.
1574  *
1575  * NB: These really have to be transactions that have aborted due to a server
1576  * crash/immediate restart, as we don't deal with invalidations here.
1577  */
1578 void
1579 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1580 {
1581         dlist_mutable_iter it;
1582
1583         /*
1584          * Iterate through all (potential) toplevel TXNs and abort all that are
1585          * older than what possibly can be running. Once we've found the first
1586          * that is alive we stop, there might be some that acquired an xid earlier
1587          * but started writing later, but it's unlikely and they will cleaned up
1588          * in a later call to ReorderBufferAbortOld().
1589          */
1590         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1591         {
1592                 ReorderBufferTXN *txn;
1593
1594                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1595
1596                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1597                 {
1598                         elog(DEBUG1, "aborting old transaction %u", txn->xid);
1599
1600                         /* remove potential on-disk data, and deallocate this tx */
1601                         ReorderBufferCleanupTXN(rb, txn);
1602                 }
1603                 else
1604                         return;
1605         }
1606 }
1607
1608 /*
1609  * Forget the contents of a transaction if we aren't interested in it's
1610  * contents. Needs to be first called for subtransactions and then for the
1611  * toplevel xid.
1612  *
1613  * This is significantly different to ReorderBufferAbort() because
1614  * transactions that have committed need to be treated differenly from aborted
1615  * ones since they may have modified the catalog.
1616  *
1617  * Note that this is only allowed to be called in the moment a transaction
1618  * commit has just been read, not earlier; otherwise later records referring
1619  * to this xid might re-create the transaction incompletely.
1620  */
1621 void
1622 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1623 {
1624         ReorderBufferTXN *txn;
1625
1626         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1627                                                                 false);
1628
1629         /* unknown, nothing to forget */
1630         if (txn == NULL)
1631                 return;
1632
1633         /* cosmetic... */
1634         txn->final_lsn = lsn;
1635
1636         /*
1637          * Proccess cache invalidation messages if there are any. Even if we're
1638          * not interested in the transaction's contents, it could have manipulated
1639          * the catalog and we need to update the caches according to that.
1640          */
1641         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1642         {
1643                 bool            use_subtxn = IsTransactionOrTransactionBlock();
1644
1645                 if (use_subtxn)
1646                         BeginInternalSubTransaction("replay");
1647
1648                 /*
1649                  * Force invalidations to happen outside of a valid transaction - that
1650                  * way entries will just be marked as invalid without accessing the
1651                  * catalog. That's advantageous because we don't need to setup the
1652                  * full state necessary for catalog access.
1653                  */
1654                 if (use_subtxn)
1655                         AbortCurrentTransaction();
1656
1657                 ReorderBufferExecuteInvalidations(rb, txn);
1658
1659                 if (use_subtxn)
1660                         RollbackAndReleaseCurrentSubTransaction();
1661         }
1662         else
1663                 Assert(txn->ninvalidations == 0);
1664
1665         /* remove potential on-disk data, and deallocate */
1666         ReorderBufferCleanupTXN(rb, txn);
1667 }
1668
1669
1670 /*
1671  * Check whether a transaction is already known in this module.xs
1672  */
1673 bool
1674 ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
1675 {
1676         ReorderBufferTXN *txn;
1677
1678         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1679                                                                 false);
1680         return txn != NULL;
1681 }
1682
1683 /*
1684  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
1685  * because the previous snapshot doesn't describe the catalog correctly for
1686  * following rows.
1687  */
1688 void
1689 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1690                                                  XLogRecPtr lsn, Snapshot snap)
1691 {
1692         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1693
1694         change->data.snapshot = snap;
1695         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1696
1697         ReorderBufferQueueChange(rb, xid, lsn, change);
1698 }
1699
1700 /*
1701  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1702  * that is used to decode all changes until either this transaction modifies
1703  * the catalog or another catalog modifying transaction commits.
1704  *
1705  * Needs to be called before any changes are added with
1706  * ReorderBufferQueueChange().
1707  */
1708 void
1709 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1710                                                          XLogRecPtr lsn, Snapshot snap)
1711 {
1712         ReorderBufferTXN *txn;
1713         bool            is_new;
1714
1715         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1716         Assert(txn->base_snapshot == NULL);
1717         Assert(snap != NULL);
1718
1719         txn->base_snapshot = snap;
1720         txn->base_snapshot_lsn = lsn;
1721 }
1722
1723 /*
1724  * Access the catalog with this CommandId at this point in the changestream.
1725  *
1726  * May only be called for command ids > 1
1727  */
1728 void
1729 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1730                                                          XLogRecPtr lsn, CommandId cid)
1731 {
1732         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1733
1734         change->data.command_id = cid;
1735         change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1736
1737         ReorderBufferQueueChange(rb, xid, lsn, change);
1738 }
1739
1740
1741 /*
1742  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1743  */
1744 void
1745 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1746                                                          XLogRecPtr lsn, RelFileNode node,
1747                                                          ItemPointerData tid, CommandId cmin,
1748                                                          CommandId cmax, CommandId combocid)
1749 {
1750         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1751         ReorderBufferTXN *txn;
1752
1753         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1754
1755         change->data.tuplecid.node = node;
1756         change->data.tuplecid.tid = tid;
1757         change->data.tuplecid.cmin = cmin;
1758         change->data.tuplecid.cmax = cmax;
1759         change->data.tuplecid.combocid = combocid;
1760         change->lsn = lsn;
1761         change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1762
1763         dlist_push_tail(&txn->tuplecids, &change->node);
1764         txn->ntuplecids++;
1765 }
1766
1767 /*
1768  * Setup the invalidation of the toplevel transaction.
1769  *
1770  * This needs to be done before ReorderBufferCommit is called!
1771  */
1772 void
1773 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1774                                                           XLogRecPtr lsn, Size nmsgs,
1775                                                           SharedInvalidationMessage *msgs)
1776 {
1777         ReorderBufferTXN *txn;
1778
1779         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1780
1781         if (txn->ninvalidations != 0)
1782                 elog(ERROR, "only ever add one set of invalidations");
1783
1784         Assert(nmsgs > 0);
1785
1786         txn->ninvalidations = nmsgs;
1787         txn->invalidations = (SharedInvalidationMessage *)
1788                 MemoryContextAlloc(rb->context,
1789                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1790         memcpy(txn->invalidations, msgs,
1791                    sizeof(SharedInvalidationMessage) * nmsgs);
1792 }
1793
1794 /*
1795  * Apply all invalidations we know. Possibly we only need parts at this point
1796  * in the changestream but we don't know which those are.
1797  */
1798 static void
1799 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1800 {
1801         int                     i;
1802
1803         for (i = 0; i < txn->ninvalidations; i++)
1804                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1805 }
1806
1807 /*
1808  * Mark a transaction as containing catalog changes
1809  */
1810 void
1811 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1812                                                                   XLogRecPtr lsn)
1813 {
1814         ReorderBufferTXN *txn;
1815
1816         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1817
1818         txn->has_catalog_changes = true;
1819 }
1820
1821 /*
1822  * Query whether a transaction is already *known* to contain catalog
1823  * changes. This can be wrong until directly before the commit!
1824  */
1825 bool
1826 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1827 {
1828         ReorderBufferTXN *txn;
1829
1830         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1831                                                                 false);
1832         if (txn == NULL)
1833                 return false;
1834
1835         return txn->has_catalog_changes;
1836 }
1837
1838 /*
1839  * Have we already added the first snapshot?
1840  */
1841 bool
1842 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1843 {
1844         ReorderBufferTXN *txn;
1845
1846         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1847                                                                 false);
1848
1849         /* transaction isn't known yet, ergo no snapshot */
1850         if (txn == NULL)
1851                 return false;
1852
1853         /*
1854          * TODO: It would be a nice improvement if we would check the toplevel
1855          * transaction in subtransactions, but we'd need to keep track of a bit
1856          * more state.
1857          */
1858         return txn->base_snapshot != NULL;
1859 }
1860
1861
1862 /*
1863  * ---------------------------------------
1864  * Disk serialization support
1865  * ---------------------------------------
1866  */
1867
1868 /*
1869  * Ensure the IO buffer is >= sz.
1870  */
1871 static void
1872 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
1873 {
1874         if (!rb->outbufsize)
1875         {
1876                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
1877                 rb->outbufsize = sz;
1878         }
1879         else if (rb->outbufsize < sz)
1880         {
1881                 rb->outbuf = repalloc(rb->outbuf, sz);
1882                 rb->outbufsize = sz;
1883         }
1884 }
1885
1886 /*
1887  * Check whether the transaction tx should spill its data to disk.
1888  */
1889 static void
1890 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1891 {
1892         /*
1893          * TODO: improve accounting so we cheaply can take subtransactions into
1894          * account here.
1895          */
1896         if (txn->nentries_mem >= max_changes_in_memory)
1897         {
1898                 ReorderBufferSerializeTXN(rb, txn);
1899                 Assert(txn->nentries_mem == 0);
1900         }
1901 }
1902
1903 /*
1904  * Spill data of a large transaction (and its subtransactions) to disk.
1905  */
1906 static void
1907 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1908 {
1909         dlist_iter      subtxn_i;
1910         dlist_mutable_iter change_i;
1911         int                     fd = -1;
1912         XLogSegNo       curOpenSegNo = 0;
1913         Size            spilled = 0;
1914         char            path[MAXPGPATH];
1915
1916         elog(DEBUG2, "spill %u changes in XID %u to disk",
1917                  (uint32) txn->nentries_mem, txn->xid);
1918
1919         /* do the same to all child TXs */
1920         dlist_foreach(subtxn_i, &txn->subtxns)
1921         {
1922                 ReorderBufferTXN *subtxn;
1923
1924                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
1925                 ReorderBufferSerializeTXN(rb, subtxn);
1926         }
1927
1928         /* serialize changestream */
1929         dlist_foreach_modify(change_i, &txn->changes)
1930         {
1931                 ReorderBufferChange *change;
1932
1933                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
1934
1935                 /*
1936                  * store in segment in which it belongs by start lsn, don't split over
1937                  * multiple segments tho
1938                  */
1939                 if (fd == -1 || XLByteInSeg(change->lsn, curOpenSegNo))
1940                 {
1941                         XLogRecPtr      recptr;
1942
1943                         if (fd != -1)
1944                                 CloseTransientFile(fd);
1945
1946                         XLByteToSeg(change->lsn, curOpenSegNo);
1947                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
1948
1949                         /*
1950                          * No need to care about TLIs here, only used during a single run,
1951                          * so each LSN only maps to a specific WAL record.
1952                          */
1953                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
1954                                         NameStr(MyReplicationSlot->data.name), txn->xid,
1955                                         (uint32) (recptr >> 32), (uint32) recptr);
1956
1957                         /* open segment, create it if necessary */
1958                         fd = OpenTransientFile(path,
1959                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
1960                                                                    S_IRUSR | S_IWUSR);
1961
1962                         if (fd < 0)
1963                                 ereport(ERROR,
1964                                                 (errcode_for_file_access(),
1965                                                  errmsg("could not open file \"%s\": %m",
1966                                                                 path)));
1967                 }
1968
1969                 ReorderBufferSerializeChange(rb, txn, fd, change);
1970                 dlist_delete(&change->node);
1971                 ReorderBufferReturnChange(rb, change);
1972
1973                 spilled++;
1974         }
1975
1976         Assert(spilled == txn->nentries_mem);
1977         Assert(dlist_is_empty(&txn->changes));
1978         txn->nentries_mem = 0;
1979
1980         if (fd != -1)
1981                 CloseTransientFile(fd);
1982 }
1983
1984 /*
1985  * Serialize individual change to disk.
1986  */
1987 static void
1988 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1989                                                          int fd, ReorderBufferChange *change)
1990 {
1991         ReorderBufferDiskChange *ondisk;
1992         Size            sz = sizeof(ReorderBufferDiskChange);
1993
1994         ReorderBufferSerializeReserve(rb, sz);
1995
1996         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
1997         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
1998
1999         switch (change->action)
2000         {
2001                 case REORDER_BUFFER_CHANGE_INSERT:
2002                         /* fall through */
2003                 case REORDER_BUFFER_CHANGE_UPDATE:
2004                         /* fall through */
2005                 case REORDER_BUFFER_CHANGE_DELETE:
2006                         {
2007                                 char       *data;
2008                                 ReorderBufferTupleBuf *oldtup,
2009                                                    *newtup;
2010                                 Size            oldlen = 0;
2011                                 Size            newlen = 0;
2012
2013                                 oldtup = change->data.tp.oldtuple;
2014                                 newtup = change->data.tp.newtuple;
2015
2016                                 if (oldtup)
2017                                         oldlen = offsetof(ReorderBufferTupleBuf, data)
2018                                                 +oldtup->tuple.t_len
2019                                                 - offsetof(HeapTupleHeaderData, t_bits);
2020
2021                                 if (newtup)
2022                                         newlen = offsetof(ReorderBufferTupleBuf, data)
2023                                                 +newtup->tuple.t_len
2024                                                 - offsetof(HeapTupleHeaderData, t_bits);
2025
2026                                 sz += oldlen;
2027                                 sz += newlen;
2028
2029                                 /* make sure we have enough space */
2030                                 ReorderBufferSerializeReserve(rb, sz);
2031
2032                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2033                                 /* might have been reallocated above */
2034                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2035
2036                                 if (oldlen)
2037                                 {
2038                                         memcpy(data, oldtup, oldlen);
2039                                         data += oldlen;
2040                                 }
2041
2042                                 if (newlen)
2043                                 {
2044                                         memcpy(data, newtup, newlen);
2045                                         data += newlen;
2046                                 }
2047                                 break;
2048                         }
2049                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2050                         {
2051                                 Snapshot        snap;
2052                                 char       *data;
2053
2054                                 snap = change->data.snapshot;
2055
2056                                 sz += sizeof(SnapshotData) +
2057                                         sizeof(TransactionId) * snap->xcnt +
2058                                         sizeof(TransactionId) * snap->subxcnt
2059                                         ;
2060
2061                                 /* make sure we have enough space */
2062                                 ReorderBufferSerializeReserve(rb, sz);
2063                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2064                                 /* might have been reallocated above */
2065                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2066
2067                                 memcpy(data, snap, sizeof(SnapshotData));
2068                                 data += sizeof(SnapshotData);
2069
2070                                 if (snap->xcnt)
2071                                 {
2072                                         memcpy(data, snap->xip,
2073                                                    sizeof(TransactionId) * snap->xcnt);
2074                                         data += sizeof(TransactionId) * snap->xcnt;
2075                                 }
2076
2077                                 if (snap->subxcnt)
2078                                 {
2079                                         memcpy(data, snap->subxip,
2080                                                    sizeof(TransactionId) * snap->subxcnt);
2081                                         data += sizeof(TransactionId) * snap->subxcnt;
2082                                 }
2083                                 break;
2084                         }
2085                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2086                         /* ReorderBufferChange contains everything important */
2087                         break;
2088                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2089                         /* ReorderBufferChange contains everything important */
2090                         break;
2091         }
2092
2093         ondisk->size = sz;
2094
2095         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2096         {
2097                 CloseTransientFile(fd);
2098                 ereport(ERROR,
2099                                 (errcode_for_file_access(),
2100                                  errmsg("could not write to data file for XID %u: %m",
2101                                                 txn->xid)));
2102         }
2103
2104         Assert(ondisk->change.action == change->action);
2105 }
2106
2107 /*
2108  * Restore a number of changes spilled to disk back into memory.
2109  */
2110 static Size
2111 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2112                                                         int *fd, XLogSegNo *segno)
2113 {
2114         Size            restored = 0;
2115         XLogSegNo       last_segno;
2116         dlist_mutable_iter cleanup_iter;
2117
2118         Assert(txn->first_lsn != InvalidXLogRecPtr);
2119         Assert(txn->final_lsn != InvalidXLogRecPtr);
2120
2121         /* free current entries, so we have memory for more */
2122         dlist_foreach_modify(cleanup_iter, &txn->changes)
2123         {
2124                 ReorderBufferChange *cleanup =
2125                 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2126
2127                 dlist_delete(&cleanup->node);
2128                 ReorderBufferReturnChange(rb, cleanup);
2129         }
2130         txn->nentries_mem = 0;
2131         Assert(dlist_is_empty(&txn->changes));
2132
2133         XLByteToSeg(txn->final_lsn, last_segno);
2134
2135         while (restored < max_changes_in_memory && *segno <= last_segno)
2136         {
2137                 int                     readBytes;
2138                 ReorderBufferDiskChange *ondisk;
2139
2140                 if (*fd == -1)
2141                 {
2142                         XLogRecPtr      recptr;
2143                         char            path[MAXPGPATH];
2144
2145                         /* first time in */
2146                         if (*segno == 0)
2147                         {
2148                                 XLByteToSeg(txn->first_lsn, *segno);
2149                         }
2150
2151                         Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2152                         XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2153
2154                         /*
2155                          * No need to care about TLIs here, only used during a single run,
2156                          * so each LSN only maps to a specific WAL record.
2157                          */
2158                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2159                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2160                                         (uint32) (recptr >> 32), (uint32) recptr);
2161
2162                         *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2163                         if (*fd < 0 && errno == ENOENT)
2164                         {
2165                                 *fd = -1;
2166                                 (*segno)++;
2167                                 continue;
2168                         }
2169                         else if (*fd < 0)
2170                                 ereport(ERROR,
2171                                                 (errcode_for_file_access(),
2172                                                  errmsg("could not open file \"%s\": %m",
2173                                                                 path)));
2174
2175                 }
2176
2177                 /*
2178                  * Read the statically sized part of a change which has information
2179                  * about the total size. If we couldn't read a record, we're at the
2180                  * end of this file.
2181                  */
2182                 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2183                 readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2184
2185                 /* eof */
2186                 if (readBytes == 0)
2187                 {
2188                         CloseTransientFile(*fd);
2189                         *fd = -1;
2190                         (*segno)++;
2191                         continue;
2192                 }
2193                 else if (readBytes < 0)
2194                         ereport(ERROR,
2195                                         (errcode_for_file_access(),
2196                                 errmsg("could not read from reorderbuffer spill file: %m")));
2197                 else if (readBytes != sizeof(ReorderBufferDiskChange))
2198                         ereport(ERROR,
2199                                         (errcode_for_file_access(),
2200                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2201                                                         readBytes,
2202                                                         (uint32) sizeof(ReorderBufferDiskChange))));
2203
2204                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2205
2206                 ReorderBufferSerializeReserve(rb,
2207                                                          sizeof(ReorderBufferDiskChange) + ondisk->size);
2208                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2209
2210                 readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2211                                                  ondisk->size - sizeof(ReorderBufferDiskChange));
2212
2213                 if (readBytes < 0)
2214                         ereport(ERROR,
2215                                         (errcode_for_file_access(),
2216                                 errmsg("could not read from reorderbuffer spill file: %m")));
2217                 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2218                         ereport(ERROR,
2219                                         (errcode_for_file_access(),
2220                                          errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2221                                                         readBytes,
2222                                 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2223
2224                 /*
2225                  * ok, read a full change from disk, now restore it into proper
2226                  * in-memory format
2227                  */
2228                 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2229                 restored++;
2230         }
2231
2232         return restored;
2233 }
2234
2235 /*
2236  * Convert change from its on-disk format to in-memory format and queue it onto
2237  * the TXN's ->changes list.
2238  */
2239 static void
2240 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2241                                                    char *data)
2242 {
2243         ReorderBufferDiskChange *ondisk;
2244         ReorderBufferChange *change;
2245
2246         ondisk = (ReorderBufferDiskChange *) data;
2247
2248         change = ReorderBufferGetChange(rb);
2249
2250         /* copy static part */
2251         memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2252
2253         data += sizeof(ReorderBufferDiskChange);
2254
2255         /* restore individual stuff */
2256         switch (change->action)
2257         {
2258                 case REORDER_BUFFER_CHANGE_INSERT:
2259                         /* fall through */
2260                 case REORDER_BUFFER_CHANGE_UPDATE:
2261                         /* fall through */
2262                 case REORDER_BUFFER_CHANGE_DELETE:
2263                         if (change->data.tp.newtuple)
2264                         {
2265                                 Size            len = offsetof(ReorderBufferTupleBuf, data)
2266                                 +((ReorderBufferTupleBuf *) data)->tuple.t_len
2267                                 - offsetof(HeapTupleHeaderData, t_bits);
2268
2269                                 change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
2270                                 memcpy(change->data.tp.newtuple, data, len);
2271                                 change->data.tp.newtuple->tuple.t_data =
2272                                         &change->data.tp.newtuple->header;
2273                                 data += len;
2274                         }
2275
2276                         if (change->data.tp.oldtuple)
2277                         {
2278                                 Size            len = offsetof(ReorderBufferTupleBuf, data)
2279                                 +((ReorderBufferTupleBuf *) data)->tuple.t_len
2280                                 - offsetof(HeapTupleHeaderData, t_bits);
2281
2282                                 change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
2283                                 memcpy(change->data.tp.oldtuple, data, len);
2284                                 change->data.tp.oldtuple->tuple.t_data =
2285                                         &change->data.tp.oldtuple->header;
2286                                 data += len;
2287                         }
2288                         break;
2289                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2290                         {
2291                                 Snapshot        oldsnap;
2292                                 Snapshot        newsnap;
2293                                 Size            size;
2294
2295                                 oldsnap = (Snapshot) data;
2296
2297                                 size = sizeof(SnapshotData) +
2298                                         sizeof(TransactionId) * oldsnap->xcnt +
2299                                         sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2300
2301                                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2302
2303                                 newsnap = change->data.snapshot;
2304
2305                                 memcpy(newsnap, data, size);
2306                                 newsnap->xip = (TransactionId *)
2307                                         (((char *) newsnap) + sizeof(SnapshotData));
2308                                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2309                                 newsnap->copied = true;
2310                                 break;
2311                         }
2312                         /* the base struct contains all the data, easy peasy */
2313                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2314                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2315                         break;
2316         }
2317
2318         dlist_push_tail(&txn->changes, &change->node);
2319         txn->nentries_mem++;
2320 }
2321
2322 /*
2323  * Remove all on-disk stored for the passed in transaction.
2324  */
2325 static void
2326 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2327 {
2328         XLogSegNo       first;
2329         XLogSegNo       cur;
2330         XLogSegNo       last;
2331
2332         Assert(txn->first_lsn != InvalidXLogRecPtr);
2333         Assert(txn->final_lsn != InvalidXLogRecPtr);
2334
2335         XLByteToSeg(txn->first_lsn, first);
2336         XLByteToSeg(txn->final_lsn, last);
2337
2338         /* iterate over all possible filenames, and delete them */
2339         for (cur = first; cur <= last; cur++)
2340         {
2341                 char            path[MAXPGPATH];
2342                 XLogRecPtr      recptr;
2343
2344                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2345
2346                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2347                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2348                                 (uint32) (recptr >> 32), (uint32) recptr);
2349                 if (unlink(path) != 0 && errno != ENOENT)
2350                         ereport(ERROR,
2351                                         (errcode_for_file_access(),
2352                                          errmsg("could not remove file \"%s\": %m", path)));
2353         }
2354 }
2355
2356 /*
2357  * Delete all data spilled to disk after we've restarted/crashed. It will be
2358  * recreated when the respective slots are reused.
2359  */
2360 void
2361 StartupReorderBuffer(void)
2362 {
2363         DIR                *logical_dir;
2364         struct dirent *logical_de;
2365
2366         DIR                *spill_dir;
2367         struct dirent *spill_de;
2368
2369         logical_dir = AllocateDir("pg_replslot");
2370         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2371         {
2372                 struct stat statbuf;
2373                 char            path[MAXPGPATH];
2374
2375                 if (strcmp(logical_de->d_name, ".") == 0 ||
2376                         strcmp(logical_de->d_name, "..") == 0)
2377                         continue;
2378
2379                 /* if it cannot be a slot, skip the directory */
2380                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2381                         continue;
2382
2383                 /*
2384                  * ok, has to be a surviving logical slot, iterate and delete
2385                  * everythign starting with xid-*
2386                  */
2387                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2388
2389                 /* we're only creating directories here, skip if it's not our's */
2390                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2391                         continue;
2392
2393                 spill_dir = AllocateDir(path);
2394                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2395                 {
2396                         if (strcmp(spill_de->d_name, ".") == 0 ||
2397                                 strcmp(spill_de->d_name, "..") == 0)
2398                                 continue;
2399
2400                         /* only look at names that can be ours */
2401                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2402                         {
2403                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2404                                                 spill_de->d_name);
2405
2406                                 if (unlink(path) != 0)
2407                                         ereport(PANIC,
2408                                                         (errcode_for_file_access(),
2409                                                          errmsg("could not remove file \"%s\": %m",
2410                                                                         path)));
2411                         }
2412                 }
2413                 FreeDir(spill_dir);
2414         }
2415         FreeDir(logical_dir);
2416 }
2417
2418 /* ---------------------------------------
2419  * toast reassembly support
2420  * ---------------------------------------
2421  */
2422
2423 /*
2424  * Initialize per tuple toast reconstruction support.
2425  */
2426 static void
2427 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2428 {
2429         HASHCTL         hash_ctl;
2430
2431         Assert(txn->toast_hash == NULL);
2432
2433         memset(&hash_ctl, 0, sizeof(hash_ctl));
2434         hash_ctl.keysize = sizeof(Oid);
2435         hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2436         hash_ctl.hcxt = rb->context;
2437         txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2438                                                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2439 }
2440
2441 /*
2442  * Per toast-chunk handling for toast reconstruction
2443  *
2444  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2445  * toasted Datum comes along.
2446  */
2447 static void
2448 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2449                                                           Relation relation, ReorderBufferChange *change)
2450 {
2451         ReorderBufferToastEnt *ent;
2452         ReorderBufferTupleBuf *newtup;
2453         bool            found;
2454         int32           chunksize;
2455         bool            isnull;
2456         Pointer         chunk;
2457         TupleDesc       desc = RelationGetDescr(relation);
2458         Oid                     chunk_id;
2459         int32           chunk_seq;
2460
2461         if (txn->toast_hash == NULL)
2462                 ReorderBufferToastInitHash(rb, txn);
2463
2464         Assert(IsToastRelation(relation));
2465
2466         newtup = change->data.tp.newtuple;
2467         chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2468         Assert(!isnull);
2469         chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2470         Assert(!isnull);
2471
2472         ent = (ReorderBufferToastEnt *)
2473                 hash_search(txn->toast_hash,
2474                                         (void *) &chunk_id,
2475                                         HASH_ENTER,
2476                                         &found);
2477
2478         if (!found)
2479         {
2480                 Assert(ent->chunk_id == chunk_id);
2481                 ent->num_chunks = 0;
2482                 ent->last_chunk_seq = 0;
2483                 ent->size = 0;
2484                 ent->reconstructed = NULL;
2485                 dlist_init(&ent->chunks);
2486
2487                 if (chunk_seq != 0)
2488                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2489                                  chunk_seq, chunk_id);
2490         }
2491         else if (found && chunk_seq != ent->last_chunk_seq + 1)
2492                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2493                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2494
2495         chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2496         Assert(!isnull);
2497
2498         /* calculate size so we can allocate the right size at once later */
2499         if (!VARATT_IS_EXTENDED(chunk))
2500                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2501         else if (VARATT_IS_SHORT(chunk))
2502                 /* could happen due to heap_form_tuple doing its thing */
2503                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2504         else
2505                 elog(ERROR, "unexpected type of toast chunk");
2506
2507         ent->size += chunksize;
2508         ent->last_chunk_seq = chunk_seq;
2509         ent->num_chunks++;
2510         dlist_push_tail(&ent->chunks, &change->node);
2511 }
2512
2513 /*
2514  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2515  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2516  *
2517  * We cannot replace unchanged toast tuples though, so those will still point
2518  * to on-disk toast data.
2519  */
2520 static void
2521 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2522                                                   Relation relation, ReorderBufferChange *change)
2523 {
2524         TupleDesc       desc;
2525         int                     natt;
2526         Datum      *attrs;
2527         bool       *isnull;
2528         bool       *free;
2529         HeapTuple       tmphtup;
2530         Relation        toast_rel;
2531         TupleDesc       toast_desc;
2532         MemoryContext oldcontext;
2533         ReorderBufferTupleBuf *newtup;
2534
2535         /* no toast tuples changed */
2536         if (txn->toast_hash == NULL)
2537                 return;
2538
2539         oldcontext = MemoryContextSwitchTo(rb->context);
2540
2541         /* we should only have toast tuples in an INSERT or UPDATE */
2542         Assert(change->data.tp.newtuple);
2543
2544         desc = RelationGetDescr(relation);
2545
2546         toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2547         toast_desc = RelationGetDescr(toast_rel);
2548
2549         /* should we allocate from stack instead? */
2550         attrs = palloc0(sizeof(Datum) * desc->natts);
2551         isnull = palloc0(sizeof(bool) * desc->natts);
2552         free = palloc0(sizeof(bool) * desc->natts);
2553
2554         newtup = change->data.tp.newtuple;
2555
2556         heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2557
2558         for (natt = 0; natt < desc->natts; natt++)
2559         {
2560                 Form_pg_attribute attr = desc->attrs[natt];
2561                 ReorderBufferToastEnt *ent;
2562                 struct varlena *varlena;
2563
2564                 /* va_rawsize is the size of the original datum -- including header */
2565                 struct varatt_external toast_pointer;
2566                 struct varatt_indirect redirect_pointer;
2567                 struct varlena *new_datum = NULL;
2568                 struct varlena *reconstructed;
2569                 dlist_iter      it;
2570                 Size            data_done = 0;
2571
2572                 /* system columns aren't toasted */
2573                 if (attr->attnum < 0)
2574                         continue;
2575
2576                 if (attr->attisdropped)
2577                         continue;
2578
2579                 /* not a varlena datatype */
2580                 if (attr->attlen != -1)
2581                         continue;
2582
2583                 /* no data */
2584                 if (isnull[natt])
2585                         continue;
2586
2587                 /* ok, we know we have a toast datum */
2588                 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2589
2590                 /* no need to do anything if the tuple isn't external */
2591                 if (!VARATT_IS_EXTERNAL(varlena))
2592                         continue;
2593
2594                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2595
2596                 /*
2597                  * Check whether the toast tuple changed, replace if so.
2598                  */
2599                 ent = (ReorderBufferToastEnt *)
2600                         hash_search(txn->toast_hash,
2601                                                 (void *) &toast_pointer.va_valueid,
2602                                                 HASH_FIND,
2603                                                 NULL);
2604                 if (ent == NULL)
2605                         continue;
2606
2607                 new_datum =
2608                         (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2609
2610                 free[natt] = true;
2611
2612                 reconstructed = palloc0(toast_pointer.va_rawsize);
2613
2614                 ent->reconstructed = reconstructed;
2615
2616                 /* stitch toast tuple back together from its parts */
2617                 dlist_foreach(it, &ent->chunks)
2618                 {
2619                         bool            isnull;
2620                         ReorderBufferChange *cchange;
2621                         ReorderBufferTupleBuf *ctup;
2622                         Pointer         chunk;
2623
2624                         cchange = dlist_container(ReorderBufferChange, node, it.cur);
2625                         ctup = cchange->data.tp.newtuple;
2626                         chunk = DatumGetPointer(
2627                                                   fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2628
2629                         Assert(!isnull);
2630                         Assert(!VARATT_IS_EXTERNAL(chunk));
2631                         Assert(!VARATT_IS_SHORT(chunk));
2632
2633                         memcpy(VARDATA(reconstructed) + data_done,
2634                                    VARDATA(chunk),
2635                                    VARSIZE(chunk) - VARHDRSZ);
2636                         data_done += VARSIZE(chunk) - VARHDRSZ;
2637                 }
2638                 Assert(data_done == toast_pointer.va_extsize);
2639
2640                 /* make sure its marked as compressed or not */
2641                 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2642                         SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2643                 else
2644                         SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2645
2646                 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2647                 redirect_pointer.pointer = reconstructed;
2648
2649                 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2650                 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2651                            sizeof(redirect_pointer));
2652
2653                 attrs[natt] = PointerGetDatum(new_datum);
2654         }
2655
2656         /*
2657          * Build tuple in separate memory & copy tuple back into the tuplebuf
2658          * passed to the output plugin. We can't directly heap_fill_tuple() into
2659          * the tuplebuf because attrs[] will point back into the current content.
2660          */
2661         tmphtup = heap_form_tuple(desc, attrs, isnull);
2662         Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2663         Assert(&newtup->header == newtup->tuple.t_data);
2664
2665         memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2666         newtup->tuple.t_len = tmphtup->t_len;
2667
2668         /*
2669          * free resources we won't further need, more persistent stuff will be
2670          * free'd in ReorderBufferToastReset().
2671          */
2672         RelationClose(toast_rel);
2673         pfree(tmphtup);
2674         for (natt = 0; natt < desc->natts; natt++)
2675         {
2676                 if (free[natt])
2677                         pfree(DatumGetPointer(attrs[natt]));
2678         }
2679         pfree(attrs);
2680         pfree(free);
2681         pfree(isnull);
2682
2683         MemoryContextSwitchTo(oldcontext);
2684 }
2685
2686 /*
2687  * Free all resources allocated for toast reconstruction.
2688  */
2689 static void
2690 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2691 {
2692         HASH_SEQ_STATUS hstat;
2693         ReorderBufferToastEnt *ent;
2694
2695         if (txn->toast_hash == NULL)
2696                 return;
2697
2698         /* sequentially walk over the hash and free everything */
2699         hash_seq_init(&hstat, txn->toast_hash);
2700         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2701         {
2702                 dlist_mutable_iter it;
2703
2704                 if (ent->reconstructed != NULL)
2705                         pfree(ent->reconstructed);
2706
2707                 dlist_foreach_modify(it, &ent->chunks)
2708                 {
2709                         ReorderBufferChange *change =
2710                         dlist_container(ReorderBufferChange, node, it.cur);
2711
2712                         dlist_delete(&change->node);
2713                         ReorderBufferReturnChange(rb, change);
2714                 }
2715         }
2716
2717         hash_destroy(txn->toast_hash);
2718         txn->toast_hash = NULL;
2719 }
2720
2721
2722 /* ---------------------------------------
2723  * Visibility support for logical decoding
2724  *
2725  *
2726  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2727  * always rely on stored cmin/cmax values because of two scenarios:
2728  *
2729  * * A tuple got changed multiple times during a single transaction and thus
2730  *       has got a combocid. Combocid's are only valid for the duration of a
2731  *       single transaction.
2732  * * A tuple with a cmin but no cmax (and thus no combocid) got
2733  *       deleted/updated in another transaction than the one which created it
2734  *       which we are looking at right now. As only one of cmin, cmax or combocid
2735  *       is actually stored in the heap we don't have access to the value we
2736  *       need anymore.
2737  *
2738  * To resolve those problems we have a per-transaction hash of (cmin,
2739  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2740  * (cmin, cmax) values. That also takes care of combocids by simply
2741  * not caring about them at all. As we have the real cmin/cmax values
2742  * combocids aren't interesting.
2743  *
2744  * As we only care about catalog tuples here the overhead of this
2745  * hashtable should be acceptable.
2746  *
2747  * Heap rewrites complicate this a bit, check rewriteheap.c for
2748  * details.
2749  * -------------------------------------------------------------------------
2750  */
2751
2752 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
2753 typedef struct RewriteMappingFile
2754 {
2755         XLogRecPtr      lsn;
2756         char            fname[MAXPGPATH];
2757 } RewriteMappingFile;
2758
#if NOT_USED
/*
 * Debugging aid: emit one DEBUG3 log line per entry of the given
 * tuplecid hash, showing its key and the cmin/cmax stored for it.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS scan;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&scan, tuplecid_data);
	for (;;)
	{
		entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
		if (entry == NULL)
			break;

		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 BlockIdGetBlockNumber(&entry->key.tid.ip_blkid),
			 entry->key.tid.ip_posid,
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
2781
2782 /*
2783  * Apply a single mapping file to tuplecid_data.
2784  *
2785  * The mapping file has to have been verified to be a) committed b) for our
2786  * transaction c) applied in LSN order.
2787  */
2788 static void
2789 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
2790 {
2791         char            path[MAXPGPATH];
2792         int                     fd;
2793         int                     readBytes;
2794         LogicalRewriteMappingData map;
2795
2796         sprintf(path, "pg_logical/mappings/%s", fname);
2797         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2798         if (fd < 0)
2799                 ereport(ERROR,
2800                                 (errmsg("could not open file \"%s\": %m", path)));
2801
2802         while (true)
2803         {
2804                 ReorderBufferTupleCidKey key;
2805                 ReorderBufferTupleCidEnt *ent;
2806                 ReorderBufferTupleCidEnt *new_ent;
2807                 bool            found;
2808
2809                 /* be careful about padding */
2810                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
2811
2812                 /* read all mappings till the end of the file */
2813                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
2814
2815                 if (readBytes < 0)
2816                         ereport(ERROR,
2817                                         (errcode_for_file_access(),
2818                                          errmsg("could not read file \"%s\": %m",
2819                                                         path)));
2820                 else if (readBytes == 0)        /* EOF */
2821                         break;
2822                 else if (readBytes != sizeof(LogicalRewriteMappingData))
2823                         ereport(ERROR,
2824                                         (errcode_for_file_access(),
2825                                          errmsg("could not read from file \"%s\": read %d instead of %d bytes",
2826                                                         path, readBytes,
2827                                                         (int32) sizeof(LogicalRewriteMappingData))));
2828
2829                 key.relnode = map.old_node;
2830                 ItemPointerCopy(&map.old_tid,
2831                                                 &key.tid);
2832
2833
2834                 ent = (ReorderBufferTupleCidEnt *)
2835                         hash_search(tuplecid_data,
2836                                                 (void *) &key,
2837                                                 HASH_FIND,
2838                                                 NULL);
2839
2840                 /* no existing mapping, no need to update */
2841                 if (!ent)
2842                         continue;
2843
2844                 key.relnode = map.new_node;
2845                 ItemPointerCopy(&map.new_tid,
2846                                                 &key.tid);
2847
2848                 new_ent = (ReorderBufferTupleCidEnt *)
2849                         hash_search(tuplecid_data,
2850                                                 (void *) &key,
2851                                                 HASH_ENTER,
2852                                                 &found);
2853
2854                 if (found)
2855                 {
2856                         /*
2857                          * Make sure the existing mapping makes sense. We sometime update
2858                          * old records that did not yet have a cmax (e.g. pg_class' own
2859                          * entry while rewriting it) during rewrites, so allow that.
2860                          */
2861                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
2862                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
2863                 }
2864                 else
2865                 {
2866                         /* update mapping */
2867                         new_ent->cmin = ent->cmin;
2868                         new_ent->cmax = ent->cmax;
2869                         new_ent->combocid = ent->combocid;
2870                 }
2871         }
2872 }
2873
2874
2875 /*
2876  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
2877  */
2878 static bool
2879 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
2880 {
2881         return bsearch(&xid, xip, num,
2882                                    sizeof(TransactionId), xidComparator) != NULL;
2883 }
2884
2885 /*
2886  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
2887  */
2888 static int
2889 file_sort_by_lsn(const void *a_p, const void *b_p)
2890 {
2891         RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
2892         RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
2893
2894         if (a->lsn < b->lsn)
2895                 return -1;
2896         else if (a->lsn > b->lsn)
2897                 return 1;
2898         return 0;
2899 }
2900
2901 /*
2902  * Apply any existing logical remapping files if there are any targeted at our
2903  * transaction for relid.
2904  */
2905 static void
2906 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
2907 {
2908         DIR                *mapping_dir;
2909         struct dirent *mapping_de;
2910         List       *files = NIL;
2911         ListCell   *file;
2912         RewriteMappingFile **files_a;
2913         size_t          off;
2914         Oid                     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
2915
2916         mapping_dir = AllocateDir("pg_logical/mappings");
2917         while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
2918         {
2919                 Oid                     f_dboid;
2920                 Oid                     f_relid;
2921                 TransactionId f_mapped_xid;
2922                 TransactionId f_create_xid;
2923                 XLogRecPtr      f_lsn;
2924                 uint32          f_hi,
2925                                         f_lo;
2926                 RewriteMappingFile *f;
2927
2928                 if (strcmp(mapping_de->d_name, ".") == 0 ||
2929                         strcmp(mapping_de->d_name, "..") == 0)
2930                         continue;
2931
2932                 /* Ignore files that aren't ours */
2933                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
2934                         continue;
2935
2936                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
2937                                    &f_dboid, &f_relid, &f_hi, &f_lo,
2938                                    &f_mapped_xid, &f_create_xid) != 6)
2939                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
2940
2941                 f_lsn = ((uint64) f_hi) << 32 | f_lo;
2942
2943                 /* mapping for another database */
2944                 if (f_dboid != dboid)
2945                         continue;
2946
2947                 /* mapping for another relation */
2948                 if (f_relid != relid)
2949                         continue;
2950
2951                 /* did the creating transaction abort? */
2952                 if (!TransactionIdDidCommit(f_create_xid))
2953                         continue;
2954
2955                 /* not for our transaction */
2956                 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
2957                         continue;
2958
2959                 /* ok, relevant, queue for apply */
2960                 f = palloc(sizeof(RewriteMappingFile));
2961                 f->lsn = f_lsn;
2962                 strcpy(f->fname, mapping_de->d_name);
2963                 files = lappend(files, f);
2964         }
2965         FreeDir(mapping_dir);
2966
2967         /* build array we can easily sort */
2968         files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
2969         off = 0;
2970         foreach(file, files)
2971         {
2972                 files_a[off++] = lfirst(file);
2973         }
2974
2975         /* sort files so we apply them in LSN order */
2976         qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
2977                   file_sort_by_lsn);
2978
2979         for (off = 0; off < list_length(files); off++)
2980         {
2981                 RewriteMappingFile *f = files_a[off];
2982
2983                 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
2984                          snapshot->subxip[0]);
2985                 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
2986                 pfree(f);
2987         }
2988 }
2989
2990 /*
2991  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
2992  * combocids.
2993  */
2994 bool
2995 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
2996                                                           Snapshot snapshot,
2997                                                           HeapTuple htup, Buffer buffer,
2998                                                           CommandId *cmin, CommandId *cmax)
2999 {
3000         ReorderBufferTupleCidKey key;
3001         ReorderBufferTupleCidEnt *ent;
3002         ForkNumber      forkno;
3003         BlockNumber blockno;
3004         bool            updated_mapping = false;
3005
3006         /* be careful about padding */
3007         memset(&key, 0, sizeof(key));
3008
3009         Assert(!BufferIsLocal(buffer));
3010
3011         /*
3012          * get relfilenode from the buffer, no convenient way to access it other
3013          * than that.
3014          */
3015         BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3016
3017         /* tuples can only be in the main fork */
3018         Assert(forkno == MAIN_FORKNUM);
3019         Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3020
3021         ItemPointerCopy(&htup->t_self,
3022                                         &key.tid);
3023
3024 restart:
3025         ent = (ReorderBufferTupleCidEnt *)
3026                 hash_search(tuplecid_data,
3027                                         (void *) &key,
3028                                         HASH_FIND,
3029                                         NULL);
3030
3031         /*
3032          * failed to find a mapping, check whether the table was rewritten and
3033          * apply mapping if so, but only do that once - there can be no new
3034          * mappings while we are in here since we have to hold a lock on the
3035          * relation.
3036          */
3037         if (ent == NULL && !updated_mapping)
3038         {
3039                 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3040                 /* now check but don't update for a mapping again */
3041                 updated_mapping = true;
3042                 goto restart;
3043         }
3044         else if (ent == NULL)
3045                 return false;
3046
3047         if (cmin)
3048                 *cmin = ent->cmin;
3049         if (cmax)
3050                 *cmax = ent->cmax;
3051         return true;
3052 }