]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/reorderbuffer.c
Improve error messages in reorderbuffer.c.
[postgresql] / src / backend / replication / logical / reorderbuffer.c
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *        PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2014, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
 *	  src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  *        This module gets handed individual pieces of transactions in the order
15  *        they are written to the WAL and is responsible to reassemble them into
16  *        toplevel transaction sized pieces. When a transaction is completely
17  *        reassembled - signalled by reading the transaction commit record - it
18  *        will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  *        individual changes. The output plugins rely on snapshots built by
20  *        snapbuild.c which hands them to us.
21  *
22  *        Transactions and subtransactions/savepoints in postgres are not
23  *        immediately linked to each other from outside the performing
24  *        backend. Only at commit/abort (or special xact_assignment records) they
25  *        are linked together. Which means that we will have to splice together a
26  *        toplevel transaction from its subtransactions. To do that efficiently we
27  *        build a binary heap indexed by the smallest current lsn of the individual
28  *        subtransactions' changestreams. As the individual streams are inherently
29  *        ordered by LSN - since that is where we build them from - the transaction
30  *        can easily be reassembled by always using the subtransaction with the
31  *        smallest current LSN from the heap.
32  *
33  *        In order to cope with large transactions - which can be several times as
34  *        big as the available memory - this module supports spooling the contents
 *	  of large transactions to disk. When the transaction is replayed the
36  *        contents of individual (sub-)transactions will be read from disk in
37  *        chunks.
38  *
39  *        This module also has to deal with reassembling toast records from the
40  *        individual chunks stored in WAL. When a new (or initial) version of a
41  *        tuple is stored in WAL it will always be preceded by the toast chunks
42  *        emitted for the columns stored out of line. Within a single toplevel
43  *        transaction there will be no other data carrying records between a row's
44  *        toast chunks and the row data itself. See ReorderBufferToast* for
45  *        details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "catalog/catalog.h"
58 #include "lib/binaryheap.h"
59 #include "miscadmin.h"
60 #include "replication/logical.h"
61 #include "replication/reorderbuffer.h"
62 #include "replication/slot.h"
63 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
64 #include "storage/bufmgr.h"
65 #include "storage/fd.h"
66 #include "storage/sinval.h"
67 #include "utils/builtins.h"
68 #include "utils/combocid.h"
69 #include "utils/memdebug.h"
70 #include "utils/memutils.h"
71 #include "utils/relcache.h"
72 #include "utils/relfilenodemap.h"
73 #include "utils/tqual.h"
74
75
/*
 * Entry for a hash table we use to map from xid to our transaction state.
 * The xid is the hash key (see ReorderBufferAllocate's hash_create call).
 */
typedef struct ReorderBufferTXNByIdEnt
{
	TransactionId xid;			/* hash key */
	ReorderBufferTXN *txn;		/* transaction state for that xid */
} ReorderBufferTXNByIdEnt;
82
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileNode relnode;		/* physical relation the tuple lives in */
	ItemPointerData tid;		/* tuple's location within that relation */
} ReorderBufferTupleCidKey;
89
/* hash entry: maps a (relfilenode, ctid) key to the tuple's command ids */
typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;	/* hash key */
	CommandId	cmin;			/* command id of the inserting command */
	CommandId	cmax;			/* command id of the deleting command, if any */
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;
97
/* k-way in-order change iteration support structures */

/* one heap element: the current position within one (sub-)transaction */
typedef struct ReorderBufferIterTXNEntry
{
	XLogRecPtr	lsn;			/* lsn of the current change of this txn */
	ReorderBufferChange *change;	/* the current change itself */
	ReorderBufferTXN *txn;		/* (sub-)transaction this entry belongs to */
	int			fd;				/* fd of the txn's spill file, or -1 if none
								 * is open (see ReorderBufferIterTXNInit) */
	XLogSegNo	segno;			/* spill-file segment currently being read */
} ReorderBufferIterTXNEntry;
107
/* state for one in-progress k-way merge iteration over a txn's changes */
typedef struct ReorderBufferIterTXNState
{
	binaryheap *heap;			/* heap of entry indexes, smallest lsn on top */
	Size		nr_txns;		/* number of transactions with changes */
	dlist_head	old_change;		/* NOTE(review): presumably holds changes kept
								 * alive across iteration steps — confirm in
								 * ReorderBufferIterTXNNext */
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;
115
/* toast datastructures */

/* hash entry describing one toasted datum being reassembled from chunks */
typedef struct ReorderBufferToastEnt
{
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;		/* reconstructed varlena now pointed
										 * to in main tup */
} ReorderBufferToastEnt;
128
/* Disk serialization support datastructures */

/* on-disk header written in front of each spilled change */
typedef struct ReorderBufferDiskChange
{
	Size		size;			/* record size — presumably includes the
								 * trailing variable-length data; confirm in
								 * ReorderBufferSerializeChange */
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;
136
/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk in
 * other workloads reasonably fast.
 *
 * At some point in the future it probably makes sense to have a more
 * elaborate resource management here, but it's not entirely clear what that
 * would look like.
 */
static const Size max_changes_in_memory = 4096;

/*
 * We use a very simple form of a slab allocator for frequently allocated
 * objects, simply keeping a fixed number in a linked list when unused,
 * instead pfree()ing them. Without that in many workloads aset.c becomes a
 * major bottleneck, especially when spilling to disk while decoding batch
 * workloads.
 */
static const Size max_cached_changes = 4096 * 2;
static const Size max_cached_tuplebufs = 4096 * 2;		/* ~8MB */
static const Size max_cached_transactions = 512;
161
162
163 /* ---------------------------------------
164  * primary reorderbuffer support routines
165  * ---------------------------------------
166  */
167 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
168 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
169 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
170                                           TransactionId xid, bool create, bool *is_new,
171                                           XLogRecPtr lsn, bool create_as_top);
172
173 static void AssertTXNLsnOrder(ReorderBuffer *rb);
174
175 /* ---------------------------------------
176  * support functions for lsn-order iterating over the ->changes of a
177  * transaction and its subtransactions
178  *
179  * used for iteration over the k-way heap merge of a transaction and its
180  * subtransactions
181  * ---------------------------------------
182  */
183 static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
184 static ReorderBufferChange *
185                         ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
186 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
187                                                    ReorderBufferIterTXNState *state);
188 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
189
190 /*
191  * ---------------------------------------
192  * Disk serialization support functions
193  * ---------------------------------------
194  */
195 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
196 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
197 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
198                                                          int fd, ReorderBufferChange *change);
199 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
200                                                         int *fd, XLogSegNo *segno);
201 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
202                                                    char *change);
203 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
204
205 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
206 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
207                                           ReorderBufferTXN *txn, CommandId cid);
208
209 /* ---------------------------------------
210  * toast reassembly support
211  * ---------------------------------------
212  */
213 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
214 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
215 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
216                                                   Relation relation, ReorderBufferChange *change);
217 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
218                                                           Relation relation, ReorderBufferChange *change);
219
220
221 /*
222  * Allocate a new ReorderBuffer
223  */
224 ReorderBuffer *
225 ReorderBufferAllocate(void)
226 {
227         ReorderBuffer *buffer;
228         HASHCTL         hash_ctl;
229         MemoryContext new_ctx;
230
231         /* allocate memory in own context, to have better accountability */
232         new_ctx = AllocSetContextCreate(CurrentMemoryContext,
233                                                                         "ReorderBuffer",
234                                                                         ALLOCSET_DEFAULT_MINSIZE,
235                                                                         ALLOCSET_DEFAULT_INITSIZE,
236                                                                         ALLOCSET_DEFAULT_MAXSIZE);
237
238         buffer =
239                 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
240
241         memset(&hash_ctl, 0, sizeof(hash_ctl));
242
243         buffer->context = new_ctx;
244
245         hash_ctl.keysize = sizeof(TransactionId);
246         hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
247         hash_ctl.hash = tag_hash;
248         hash_ctl.hcxt = buffer->context;
249
250         buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
251                                                                  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
252
253         buffer->by_txn_last_xid = InvalidTransactionId;
254         buffer->by_txn_last_txn = NULL;
255
256         buffer->nr_cached_transactions = 0;
257         buffer->nr_cached_changes = 0;
258         buffer->nr_cached_tuplebufs = 0;
259
260         buffer->outbuf = NULL;
261         buffer->outbufsize = 0;
262
263         buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
264
265         dlist_init(&buffer->toplevel_by_lsn);
266         dlist_init(&buffer->cached_transactions);
267         dlist_init(&buffer->cached_changes);
268         slist_init(&buffer->cached_tuplebufs);
269
270         return buffer;
271 }
272
273 /*
274  * Free a ReorderBuffer
275  */
276 void
277 ReorderBufferFree(ReorderBuffer *rb)
278 {
279         MemoryContext context = rb->context;
280
281         /*
282          * We free separately allocated data by entirely scrapping reorderbuffer's
283          * memory context.
284          */
285         MemoryContextDelete(context);
286 }
287
288 /*
289  * Get a unused, possibly preallocated, ReorderBufferTXN.
290  */
291 static ReorderBufferTXN *
292 ReorderBufferGetTXN(ReorderBuffer *rb)
293 {
294         ReorderBufferTXN *txn;
295
296         /* check the slab cache */
297         if (rb->nr_cached_transactions > 0)
298         {
299                 rb->nr_cached_transactions--;
300                 txn = (ReorderBufferTXN *)
301                         dlist_container(ReorderBufferTXN, node,
302                                                         dlist_pop_head_node(&rb->cached_transactions));
303         }
304         else
305         {
306                 txn = (ReorderBufferTXN *)
307                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
308         }
309
310         memset(txn, 0, sizeof(ReorderBufferTXN));
311
312         dlist_init(&txn->changes);
313         dlist_init(&txn->tuplecids);
314         dlist_init(&txn->subtxns);
315
316         return txn;
317 }
318
319 /*
320  * Free a ReorderBufferTXN.
321  *
322  * Deallocation might be delayed for efficiency purposes, for details check
323  * the comments above max_cached_changes's definition.
324  */
325 void
326 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
327 {
328         /* clean the lookup cache if we were cached (quite likely) */
329         if (rb->by_txn_last_xid == txn->xid)
330         {
331                 rb->by_txn_last_xid = InvalidTransactionId;
332                 rb->by_txn_last_txn = NULL;
333         }
334
335         /* free data that's contained */
336
337         if (txn->tuplecid_hash != NULL)
338         {
339                 hash_destroy(txn->tuplecid_hash);
340                 txn->tuplecid_hash = NULL;
341         }
342
343         if (txn->invalidations)
344         {
345                 pfree(txn->invalidations);
346                 txn->invalidations = NULL;
347         }
348
349         /* check whether to put into the slab cache */
350         if (rb->nr_cached_transactions < max_cached_transactions)
351         {
352                 rb->nr_cached_transactions++;
353                 dlist_push_head(&rb->cached_transactions, &txn->node);
354                 VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
355                 VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
356         }
357         else
358         {
359                 pfree(txn);
360         }
361 }
362
363 /*
364  * Get a unused, possibly preallocated, ReorderBufferChange.
365  */
366 ReorderBufferChange *
367 ReorderBufferGetChange(ReorderBuffer *rb)
368 {
369         ReorderBufferChange *change;
370
371         /* check the slab cache */
372         if (rb->nr_cached_changes)
373         {
374                 rb->nr_cached_changes--;
375                 change = (ReorderBufferChange *)
376                         dlist_container(ReorderBufferChange, node,
377                                                         dlist_pop_head_node(&rb->cached_changes));
378         }
379         else
380         {
381                 change = (ReorderBufferChange *)
382                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
383         }
384
385         memset(change, 0, sizeof(ReorderBufferChange));
386         return change;
387 }
388
/*
 * Free an ReorderBufferChange.
 *
 * Releases data owned by the change (tuples or a snapshot) before putting
 * the struct itself back into the slab cache or pfree()ing it.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
	/* free contained data */
	switch (change->action)
	{
			/* DML changes may carry old and/or new tuple versions */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
			if (change->data.tp.newtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
				change->data.tp.newtuple = NULL;
			}

			if (change->data.tp.oldtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
				change->data.tp.oldtuple = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			if (change->data.snapshot)
			{
				ReorderBufferFreeSnap(rb, change->data.snapshot);
				change->data.snapshot = NULL;
			}
			break;
			/* these carry no separately allocated payload */
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	/* check whether to put into the slab cache */
	if (rb->nr_cached_changes < max_cached_changes)
	{
		rb->nr_cached_changes++;
		dlist_push_head(&rb->cached_changes, &change->node);
		VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
		VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
	}
	else
	{
		pfree(change);
	}
}
442
443
444 /*
445  * Get a unused, possibly preallocated, ReorderBufferTupleBuf
446  */
447 ReorderBufferTupleBuf *
448 ReorderBufferGetTupleBuf(ReorderBuffer *rb)
449 {
450         ReorderBufferTupleBuf *tuple;
451
452         /* check the slab cache */
453         if (rb->nr_cached_tuplebufs)
454         {
455                 rb->nr_cached_tuplebufs--;
456                 tuple = slist_container(ReorderBufferTupleBuf, node,
457                                                                 slist_pop_head_node(&rb->cached_tuplebufs));
458 #ifdef USE_ASSERT_CHECKING
459                 memset(tuple, 0xdeadbeef, sizeof(ReorderBufferTupleBuf));
460 #endif
461         }
462         else
463         {
464                 tuple = (ReorderBufferTupleBuf *)
465                         MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
466         }
467
468         return tuple;
469 }
470
471 /*
472  * Free an ReorderBufferTupleBuf.
473  *
474  * Deallocation might be delayed for efficiency purposes, for details check
475  * the comments above max_cached_changes's definition.
476  */
477 void
478 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
479 {
480         /* check whether to put into the slab cache */
481         if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
482         {
483                 rb->nr_cached_tuplebufs++;
484                 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
485                 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
486                 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
487         }
488         else
489         {
490                 pfree(tuple);
491         }
492 }
493
/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 *
 * Returns NULL only when create is false and no entry exists.  The result
 * (including a "does not exist") is remembered in a one-entry cache so
 * repeated lookups of the same xid avoid the hash search.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));
	/* creation requires a valid lsn to record as the txn's first_lsn */
	Assert(!create || lsn != InvalidXLogRecPtr);

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * If the cache wasn't hit or it yielded a "does-not-exist" and we want
	 * to create an entry.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					(void *) &xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);

		ent->txn = ReorderBufferGetTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		if (create_as_top)
		{
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache; this also caches "does not exist" as txn == NULL */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || !!txn);
	return txn;
}
579
580 /*
581  * Queue a change into a transaction so it can be replayed upon commit.
582  */
583 void
584 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
585                                            ReorderBufferChange *change)
586 {
587         ReorderBufferTXN *txn;
588
589         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
590
591         change->lsn = lsn;
592         Assert(InvalidXLogRecPtr != lsn);
593         dlist_push_tail(&txn->changes, &change->node);
594         txn->nentries++;
595         txn->nentries_mem++;
596
597         ReorderBufferCheckSerializeTXN(rb, txn);
598 }
599
600 static void
601 AssertTXNLsnOrder(ReorderBuffer *rb)
602 {
603 #ifdef USE_ASSERT_CHECKING
604         dlist_iter      iter;
605         XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
606
607         dlist_foreach(iter, &rb->toplevel_by_lsn)
608         {
609                 ReorderBufferTXN *cur_txn;
610
611                 cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
612                 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
613
614                 if (cur_txn->end_lsn != InvalidXLogRecPtr)
615                         Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
616
617                 if (prev_first_lsn != InvalidXLogRecPtr)
618                         Assert(prev_first_lsn < cur_txn->first_lsn);
619
620                 Assert(!cur_txn->is_known_as_subxact);
621                 prev_first_lsn = cur_txn->first_lsn;
622         }
623 #endif
624 }
625
626 ReorderBufferTXN *
627 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
628 {
629         ReorderBufferTXN *txn;
630
631         if (dlist_is_empty(&rb->toplevel_by_lsn))
632                 return NULL;
633
634         AssertTXNLsnOrder(rb);
635
636         txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
637
638         Assert(!txn->is_known_as_subxact);
639         Assert(txn->first_lsn != InvalidXLogRecPtr);
640         return txn;
641 }
642
/*
 * Remember the current restart-decoding LSN; transactions created from here
 * on record it as their restart_decoding_lsn (see ReorderBufferTXNByXid).
 */
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	rb->current_restart_decoding_lsn = ptr;
}
648
/*
 * Associate subxid with its toplevel transaction xid, as reported by an
 * xact_assignment record (c.f. the file header comment).  Both transactions
 * are created at lsn if not known yet.
 */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr lsn)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXN *subtxn;
	bool		new_top;
	bool		new_sub;

	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

	if (new_sub)
	{
		/*
		 * we assign subtransactions to top level transaction even if we don't
		 * have data for it yet, assignment records frequently reference xids
		 * that have not yet produced any records. Knowing those aren't top
		 * level xids allows us to make processing cheaper in some places.
		 */
		dlist_push_tail(&txn->subtxns, &subtxn->node);
		txn->nsubtxns++;
	}
	else if (!subtxn->is_known_as_subxact)
	{
		/* known txn, but seen as toplevel so far: reclassify it */
		subtxn->is_known_as_subxact = true;
		Assert(subtxn->nsubtxns == 0);

		/* remove from lsn order list of top-level transactions */
		dlist_delete(&subtxn->node);

		/* add to toplevel transaction */
		dlist_push_tail(&txn->subtxns, &subtxn->node);
		txn->nsubtxns++;
	}
	else if (new_top)
	{
		/* subxid already assigned, yet its parent was unknown — corrupt WAL */
		elog(ERROR, "existing subxact assigned to unknown toplevel xact");
	}
}
689
690 /*
691  * Associate a subtransaction with its toplevel transaction at commit
692  * time. There may be no further changes added after this.
693  */
694 void
695 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
696                                                  TransactionId subxid, XLogRecPtr commit_lsn,
697                                                  XLogRecPtr end_lsn)
698 {
699         ReorderBufferTXN *txn;
700         ReorderBufferTXN *subtxn;
701
702         subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
703                                                                    InvalidXLogRecPtr, false);
704
705         /*
706          * No need to do anything if that subtxn didn't contain any changes
707          */
708         if (!subtxn)
709                 return;
710
711         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
712
713         if (txn == NULL)
714                 elog(ERROR, "subxact logged without previous toplevel record");
715
716         /*
717          * Pass the our base snapshot to the parent transaction if it doesn't have
718          * one, or ours is older. That can happen if there are no changes in the
719          * toplevel transaction but in one of the child transactions. This allows
720          * the parent to simply use it's base snapshot initially.
721          */
722         if (txn->base_snapshot == NULL ||
723                 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)
724         {
725                 txn->base_snapshot = subtxn->base_snapshot;
726                 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
727                 subtxn->base_snapshot = NULL;
728                 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
729         }
730
731         subtxn->final_lsn = commit_lsn;
732         subtxn->end_lsn = end_lsn;
733
734         if (!subtxn->is_known_as_subxact)
735         {
736                 subtxn->is_known_as_subxact = true;
737                 Assert(subtxn->nsubtxns == 0);
738
739                 /* remove from lsn order list of top-level transactions */
740                 dlist_delete(&subtxn->node);
741
742                 /* add to subtransaction list */
743                 dlist_push_tail(&txn->subtxns, &subtxn->node);
744                 txn->nsubtxns++;
745         }
746 }
747
748
749 /*
750  * Support for efficiently iterating over a transaction's and its
751  * subtransactions' changes.
752  *
 * We do this by doing a k-way merge between transactions/subtransactions.
754  * we model the current heads of the different transactions as a binary heap
755  * so we easily know which (sub-)transaction has the change with the smallest
756  * lsn next.
757  *
758  * We assume the changes in individual transactions are already sorted by LSN.
759  */
760
/*
 * Binary heap comparison function.
 *
 * a and b are indexes into state->entries; entries are compared by the lsn
 * of their current change.  The result is deliberately inverted (a smaller
 * lsn compares as "greater") so the entry with the smallest lsn sits at the
 * top of the heap, turning lib/binaryheap into a min-heap for our purposes.
 */
static int
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
{
	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;

	if (pos_a < pos_b)
		return 1;
	else if (pos_a == pos_b)
		return 0;
	return -1;
}
777
778 /*
779  * Allocate & initialize an iterator which iterates in lsn order over a
780  * transaction and all its subtransactions.
781  */
782 static ReorderBufferIterTXNState *
783 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
784 {
785         Size            nr_txns = 0;
786         ReorderBufferIterTXNState *state;
787         dlist_iter      cur_txn_i;
788         int32           off;
789
790         /*
791          * Calculate the size of our heap: one element for every transaction that
792          * contains changes.  (Besides the transactions already in the reorder
793          * buffer, we count the one we were directly passed.)
794          */
795         if (txn->nentries > 0)
796                 nr_txns++;
797
798         dlist_foreach(cur_txn_i, &txn->subtxns)
799         {
800                 ReorderBufferTXN *cur_txn;
801
802                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
803
804                 if (cur_txn->nentries > 0)
805                         nr_txns++;
806         }
807
808         /*
809          * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
810          * need to allocate/build a heap then.
811          */
812
813         /* allocate iteration state */
814         state = (ReorderBufferIterTXNState *)
815                 MemoryContextAllocZero(rb->context,
816                                                            sizeof(ReorderBufferIterTXNState) +
817                                                            sizeof(ReorderBufferIterTXNEntry) * nr_txns);
818
819         state->nr_txns = nr_txns;
820         dlist_init(&state->old_change);
821
822         for (off = 0; off < state->nr_txns; off++)
823         {
824                 state->entries[off].fd = -1;
825                 state->entries[off].segno = 0;
826         }
827
828         /* allocate heap */
829         state->heap = binaryheap_allocate(state->nr_txns,
830                                                                           ReorderBufferIterCompare,
831                                                                           state);
832
833         /*
834          * Now insert items into the binary heap, in an unordered fashion.  (We
835          * will run a heap assembly step at the end; this is more efficient.)
836          */
837
838         off = 0;
839
840         /* add toplevel transaction if it contains changes */
841         if (txn->nentries > 0)
842         {
843                 ReorderBufferChange *cur_change;
844
845                 if (txn->nentries != txn->nentries_mem)
846                         ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
847                                                                                 &state->entries[off].segno);
848
849                 cur_change = dlist_head_element(ReorderBufferChange, node,
850                                                                                 &txn->changes);
851
852                 state->entries[off].lsn = cur_change->lsn;
853                 state->entries[off].change = cur_change;
854                 state->entries[off].txn = txn;
855
856                 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
857         }
858
859         /* add subtransactions if they contain changes */
860         dlist_foreach(cur_txn_i, &txn->subtxns)
861         {
862                 ReorderBufferTXN *cur_txn;
863
864                 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
865
866                 if (cur_txn->nentries > 0)
867                 {
868                         ReorderBufferChange *cur_change;
869
870                         if (txn->nentries != txn->nentries_mem)
871                                 ReorderBufferRestoreChanges(rb, cur_txn,
872                                                                                         &state->entries[off].fd,
873                                                                                         &state->entries[off].segno);
874
875                         cur_change = dlist_head_element(ReorderBufferChange, node,
876                                                                                         &cur_txn->changes);
877
878                         state->entries[off].lsn = cur_change->lsn;
879                         state->entries[off].change = cur_change;
880                         state->entries[off].txn = cur_txn;
881
882                         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
883                 }
884         }
885
886         /* assemble a valid binary heap */
887         binaryheap_build(state->heap);
888
889         return state;
890 }
891
892 /*
893  * Return the next change when iterating over a transaction and its
894  * subtransactions.
895  *
896  * Returns NULL when no further changes exist.
897  */
898 static ReorderBufferChange *
899 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
900 {
901         ReorderBufferChange *change;
902         ReorderBufferIterTXNEntry *entry;
903         int32           off;
904
905         /* nothing there anymore */
906         if (state->heap->bh_size == 0)
907                 return NULL;
908
909         off = DatumGetInt32(binaryheap_first(state->heap));
910         entry = &state->entries[off];
911
912         /* free memory we might have "leaked" in the previous *Next call */
913         if (!dlist_is_empty(&state->old_change))
914         {
915                 change = dlist_container(ReorderBufferChange, node,
916                                                                  dlist_pop_head_node(&state->old_change));
917                 ReorderBufferReturnChange(rb, change);
918                 Assert(dlist_is_empty(&state->old_change));
919         }
920
921         change = entry->change;
922
923         /*
924          * update heap with information about which transaction has the next
925          * relevant change in LSN order
926          */
927
928         /* there are in-memory changes */
929         if (dlist_has_next(&entry->txn->changes, &entry->change->node))
930         {
931                 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
932                 ReorderBufferChange *next_change =
933                 dlist_container(ReorderBufferChange, node, next);
934
935                 /* txn stays the same */
936                 state->entries[off].lsn = next_change->lsn;
937                 state->entries[off].change = next_change;
938
939                 binaryheap_replace_first(state->heap, Int32GetDatum(off));
940                 return change;
941         }
942
943         /* try to load changes from disk */
944         if (entry->txn->nentries != entry->txn->nentries_mem)
945         {
946                 /*
947                  * Ugly: restoring changes will reuse *Change records, thus delete the
948                  * current one from the per-tx list and only free in the next call.
949                  */
950                 dlist_delete(&change->node);
951                 dlist_push_tail(&state->old_change, &change->node);
952
953                 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
954                                                                                 &state->entries[off].segno))
955                 {
956                         /* successfully restored changes from disk */
957                         ReorderBufferChange *next_change =
958                         dlist_head_element(ReorderBufferChange, node,
959                                                            &entry->txn->changes);
960
961                         elog(DEBUG2, "restored %u/%u changes from disk",
962                                  (uint32) entry->txn->nentries_mem,
963                                  (uint32) entry->txn->nentries);
964
965                         Assert(entry->txn->nentries_mem);
966                         /* txn stays the same */
967                         state->entries[off].lsn = next_change->lsn;
968                         state->entries[off].change = next_change;
969                         binaryheap_replace_first(state->heap, Int32GetDatum(off));
970
971                         return change;
972                 }
973         }
974
975         /* ok, no changes there anymore, remove */
976         binaryheap_remove_first(state->heap);
977
978         return change;
979 }
980
981 /*
982  * Deallocate the iterator
983  */
984 static void
985 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
986                                                    ReorderBufferIterTXNState *state)
987 {
988         int32           off;
989
990         for (off = 0; off < state->nr_txns; off++)
991         {
992                 if (state->entries[off].fd != -1)
993                         CloseTransientFile(state->entries[off].fd);
994         }
995
996         /* free memory we might have "leaked" in the last *Next call */
997         if (!dlist_is_empty(&state->old_change))
998         {
999                 ReorderBufferChange *change;
1000
1001                 change = dlist_container(ReorderBufferChange, node,
1002                                                                  dlist_pop_head_node(&state->old_change));
1003                 ReorderBufferReturnChange(rb, change);
1004                 Assert(dlist_is_empty(&state->old_change));
1005         }
1006
1007         binaryheap_free(state->heap);
1008         pfree(state);
1009 }
1010
1011 /*
1012  * Cleanup the contents of a transaction, usually after the transaction
1013  * committed or aborted.
1014  */
1015 static void
1016 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1017 {
1018         bool            found;
1019         dlist_mutable_iter iter;
1020
1021         /* cleanup subtransactions & their changes */
1022         dlist_foreach_modify(iter, &txn->subtxns)
1023         {
1024                 ReorderBufferTXN *subtxn;
1025
1026                 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1027
1028                 /*
1029                  * Subtransactions are always associated to the toplevel TXN, even if
1030                  * they originally were happening inside another subtxn, so we won't
1031                  * ever recurse more than one level deep here.
1032                  */
1033                 Assert(subtxn->is_known_as_subxact);
1034                 Assert(subtxn->nsubtxns == 0);
1035
1036                 ReorderBufferCleanupTXN(rb, subtxn);
1037         }
1038
1039         /* cleanup changes in the toplevel txn */
1040         dlist_foreach_modify(iter, &txn->changes)
1041         {
1042                 ReorderBufferChange *change;
1043
1044                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1045
1046                 ReorderBufferReturnChange(rb, change);
1047         }
1048
1049         /*
1050          * Cleanup the tuplecids we stored for decoding catalog snapshot
1051          * access. They are always stored in the toplevel transaction.
1052          */
1053         dlist_foreach_modify(iter, &txn->tuplecids)
1054         {
1055                 ReorderBufferChange *change;
1056
1057                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1058                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1059                 ReorderBufferReturnChange(rb, change);
1060         }
1061
1062         if (txn->base_snapshot != NULL)
1063         {
1064                 SnapBuildSnapDecRefcount(txn->base_snapshot);
1065                 txn->base_snapshot = NULL;
1066                 txn->base_snapshot_lsn = InvalidXLogRecPtr;
1067         }
1068
1069         /* delete from list of known subxacts */
1070         if (txn->is_known_as_subxact)
1071         {
1072                 /* NB: nsubxacts count of parent will be too high now */
1073                 dlist_delete(&txn->node);
1074         }
1075         /* delete from LSN ordered list of toplevel TXNs */
1076         else
1077         {
1078                 dlist_delete(&txn->node);
1079         }
1080
1081         /* now remove reference from buffer */
1082         hash_search(rb->by_txn,
1083                                 (void *) &txn->xid,
1084                                 HASH_REMOVE,
1085                                 &found);
1086         Assert(found);
1087
1088         /* remove entries spilled to disk */
1089         if (txn->nentries != txn->nentries_mem)
1090                 ReorderBufferRestoreCleanup(rb, txn);
1091
1092         /* deallocate */
1093         ReorderBufferReturnTXN(rb, txn);
1094 }
1095
1096 /*
1097  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1098  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1099  */
1100 static void
1101 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1102 {
1103         dlist_iter      iter;
1104         HASHCTL         hash_ctl;
1105
1106         if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1107                 return;
1108
1109         memset(&hash_ctl, 0, sizeof(hash_ctl));
1110
1111         hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1112         hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1113         hash_ctl.hash = tag_hash;
1114         hash_ctl.hcxt = rb->context;
1115
1116         /*
1117          * create the hash with the exact number of to-be-stored tuplecids from
1118          * the start
1119          */
1120         txn->tuplecid_hash =
1121                 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1122                                         HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
1123
1124         dlist_foreach(iter, &txn->tuplecids)
1125         {
1126                 ReorderBufferTupleCidKey key;
1127                 ReorderBufferTupleCidEnt *ent;
1128                 bool            found;
1129                 ReorderBufferChange *change;
1130
1131                 change = dlist_container(ReorderBufferChange, node, iter.cur);
1132
1133                 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1134
1135                 /* be careful about padding */
1136                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1137
1138                 key.relnode = change->data.tuplecid.node;
1139
1140                 ItemPointerCopy(&change->data.tuplecid.tid,
1141                                                 &key.tid);
1142
1143                 ent = (ReorderBufferTupleCidEnt *)
1144                         hash_search(txn->tuplecid_hash,
1145                                                 (void *) &key,
1146                                                 HASH_ENTER | HASH_FIND,
1147                                                 &found);
1148                 if (!found)
1149                 {
1150                         ent->cmin = change->data.tuplecid.cmin;
1151                         ent->cmax = change->data.tuplecid.cmax;
1152                         ent->combocid = change->data.tuplecid.combocid;
1153                 }
1154                 else
1155                 {
1156                         Assert(ent->cmin == change->data.tuplecid.cmin);
1157                         Assert(ent->cmax == InvalidCommandId ||
1158                                    ent->cmax == change->data.tuplecid.cmax);
1159
1160                         /*
1161                          * if the tuple got valid in this transaction and now got deleted
1162                          * we already have a valid cmin stored. The cmax will be
1163                          * InvalidCommandId though.
1164                          */
1165                         ent->cmax = change->data.tuplecid.cmax;
1166                 }
1167         }
1168 }
1169
1170 /*
1171  * Copy a provided snapshot so we can modify it privately. This is needed so
1172  * that catalog modifying transactions can look into intermediate catalog
1173  * states.
1174  */
1175 static Snapshot
1176 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1177                                           ReorderBufferTXN *txn, CommandId cid)
1178 {
1179         Snapshot        snap;
1180         dlist_iter      iter;
1181         int                     i = 0;
1182         Size            size;
1183
1184         size = sizeof(SnapshotData) +
1185                 sizeof(TransactionId) * orig_snap->xcnt +
1186                 sizeof(TransactionId) * (txn->nsubtxns + 1);
1187
1188         snap = MemoryContextAllocZero(rb->context, size);
1189         memcpy(snap, orig_snap, sizeof(SnapshotData));
1190
1191         snap->copied = true;
1192         snap->active_count = 0;
1193         snap->regd_count = 1;
1194         snap->xip = (TransactionId *) (snap + 1);
1195
1196         memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1197
1198         /*
1199          * snap->subxip contains all txids that belong to our transaction which we
1200          * need to check via cmin/cmax. Thats why we store the toplevel
1201          * transaction in there as well.
1202          */
1203         snap->subxip = snap->xip + snap->xcnt;
1204         snap->subxip[i++] = txn->xid;
1205
1206         /*
1207          * nsubxcnt isn't decreased when subtransactions abort, so count
1208          * manually. Since it's an upper boundary it is safe to use it for the
1209          * allocation above.
1210          */
1211         snap->subxcnt = 1;
1212
1213         dlist_foreach(iter, &txn->subtxns)
1214         {
1215                 ReorderBufferTXN *sub_txn;
1216
1217                 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1218                 snap->subxip[i++] = sub_txn->xid;
1219                 snap->subxcnt++;
1220         }
1221
1222         /* sort so we can bsearch() later */
1223         qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1224
1225         /* store the specified current CommandId */
1226         snap->curcid = cid;
1227
1228         return snap;
1229 }
1230
1231 /*
1232  * Free a previously ReorderBufferCopySnap'ed snapshot
1233  */
1234 static void
1235 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1236 {
1237         if (snap->copied)
1238                 pfree(snap);
1239         else
1240                 SnapBuildSnapDecRefcount(snap);
1241 }
1242
1243 /*
1244  * Perform the replay of a transaction and it's non-aborted subtransactions.
1245  *
1246  * Subtransactions previously have to be processed by
1247  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1248  * transaction with ReorderBufferAssignChild.
1249  *
1250  * We currently can only decode a transaction's contents in when their commit
1251  * record is read because that's currently the only place where we know about
1252  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1253  * the top and subtransactions (using a k-way merge) and replay the changes in
1254  * lsn order.
1255  */
1256 void
1257 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1258                                         XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1259                                         TimestampTz commit_time)
1260 {
1261         ReorderBufferTXN *txn;
1262         ReorderBufferIterTXNState *iterstate = NULL;
1263         ReorderBufferChange *change;
1264
1265         volatile CommandId      command_id = FirstCommandId;
1266         volatile Snapshot       snapshot_now = NULL;
1267         volatile bool           txn_started = false;
1268         volatile bool           subtxn_started = false;
1269
1270         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1271                                                                 false);
1272
1273         /* unknown transaction, nothing to replay */
1274         if (txn == NULL)
1275                 return;
1276
1277         txn->final_lsn = commit_lsn;
1278         txn->end_lsn = end_lsn;
1279         txn->commit_time = commit_time;
1280
1281         /* serialize the last bunch of changes if we need start earlier anyway */
1282         if (txn->nentries_mem != txn->nentries)
1283                 ReorderBufferSerializeTXN(rb, txn);
1284
1285         /*
1286          * If this transaction didn't have any real changes in our database, it's
1287          * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1288          * transferred its snapshot to this transaction if it had one and the
1289          * toplevel tx didn't.
1290          */
1291         if (txn->base_snapshot == NULL)
1292         {
1293                 Assert(txn->ninvalidations == 0);
1294                 ReorderBufferCleanupTXN(rb, txn);
1295                 return;
1296         }
1297
1298         snapshot_now = txn->base_snapshot;
1299
1300         /* build data to be able to lookup the CommandIds of catalog tuples */
1301         ReorderBufferBuildTupleCidHash(rb, txn);
1302
1303         /* setup the initial snapshot */
1304         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1305
1306         PG_TRY();
1307         {
1308                 txn_started = false;
1309
1310                 /*
1311                  * Decoding needs access to syscaches et al., which in turn use
1312                  * heavyweight locks and such. Thus we need to have enough state around
1313                  * to keep track of those. The easiest way is to simply use a
1314                  * transaction internally. That also allows us to easily enforce that
1315                  * nothing writes to the database by checking for xid assignments.
1316                  *
1317                  * When we're called via the SQL SRF there's already a transaction
1318                  * started, so start an explicit subtransaction there.
1319                  */
1320                 if (IsTransactionOrTransactionBlock())
1321                 {
1322                         BeginInternalSubTransaction("replay");
1323                         subtxn_started = true;
1324                 }
1325                 else
1326                 {
1327                         StartTransactionCommand();
1328                         txn_started = true;
1329                 }
1330
1331                 rb->begin(rb, txn);
1332
1333                 iterstate = ReorderBufferIterTXNInit(rb, txn);
1334                 while ((change = ReorderBufferIterTXNNext(rb, iterstate)))
1335                 {
1336                         Relation        relation = NULL;
1337                         Oid                     reloid;
1338
1339                         switch (change->action)
1340                         {
1341                                 case REORDER_BUFFER_CHANGE_INSERT:
1342                                 case REORDER_BUFFER_CHANGE_UPDATE:
1343                                 case REORDER_BUFFER_CHANGE_DELETE:
1344                                         Assert(snapshot_now);
1345
1346                                         reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1347                                                                                                 change->data.tp.relnode.relNode);
1348
1349                                         /*
1350                                          * Catalog tuple without data, emitted while catalog was
1351                                          * in the process of being rewritten.
1352                                          */
1353                                         if (reloid == InvalidOid &&
1354                                                 change->data.tp.newtuple == NULL &&
1355                                                 change->data.tp.oldtuple == NULL)
1356                                                 continue;
1357                                         else if (reloid == InvalidOid)
1358                                                 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1359                                                          relpathperm(change->data.tp.relnode,
1360                                                                                  MAIN_FORKNUM));
1361
1362                                         relation = RelationIdGetRelation(reloid);
1363
1364                                         if (relation == NULL)
1365                                                 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1366                                                          reloid,
1367                                                          relpathperm(change->data.tp.relnode,
1368                                                                                  MAIN_FORKNUM));
1369
1370                                         if (RelationIsLogicallyLogged(relation))
1371                                         {
1372                                                 /*
1373                                                  * For now ignore sequence changes entirely. Most of
1374                                                  * the time they don't log changes using records we
1375                                                  * understand, so it doesn't make sense to handle the
1376                                                  * few cases we do.
1377                                                  */
1378                                                 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1379                                                 {
1380                                                 }
1381                                                 /* user-triggered change */
1382                                                 else if (!IsToastRelation(relation))
1383                                                 {
1384                                                         ReorderBufferToastReplace(rb, txn, relation, change);
1385                                                         rb->apply_change(rb, txn, relation, change);
1386                                                         ReorderBufferToastReset(rb, txn);
1387                                                 }
1388                                                 /* we're not interested in toast deletions */
1389                                                 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1390                                                 {
1391                                                         /*
1392                                                          * Need to reassemble the full toasted Datum in
1393                                                          * memory, to ensure the chunks don't get reused
1394                                                          * till we're done remove it from the list of this
1395                                                          * transaction's changes. Otherwise it will get
1396                                                          * freed/reused while restoring spooled data from
1397                                                          * disk.
1398                                                          */
1399                                                         dlist_delete(&change->node);
1400                                                         ReorderBufferToastAppendChunk(rb, txn, relation,
1401                                                                                                                   change);
1402                                                 }
1403
1404                                         }
1405                                         RelationClose(relation);
1406                                         break;
1407                                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1408                                         /* get rid of the old */
1409                                         TeardownHistoricSnapshot(false);
1410
1411                                         if (snapshot_now->copied)
1412                                         {
1413                                                 ReorderBufferFreeSnap(rb, snapshot_now);
1414                                                 snapshot_now =
1415                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1416                                                                                                   txn, command_id);
1417                                         }
1418                                         /*
1419                                          * Restored from disk, need to be careful not to double
1420                                          * free. We could introduce refcounting for that, but for
1421                                          * now this seems infrequent enough not to care.
1422                                          */
1423                                         else if (change->data.snapshot->copied)
1424                                         {
1425                                                 snapshot_now =
1426                                                         ReorderBufferCopySnap(rb, change->data.snapshot,
1427                                                                                                   txn, command_id);
1428                                         }
1429                                         else
1430                                         {
1431                                                 snapshot_now = change->data.snapshot;
1432                                         }
1433
1434
1435                                         /* and continue with the new one */
1436                                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1437                                         break;
1438
1439                                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1440                                         Assert(change->data.command_id != InvalidCommandId);
1441
1442                                         if (command_id < change->data.command_id)
1443                                         {
1444                                                 command_id = change->data.command_id;
1445
1446                                                 if (!snapshot_now->copied)
1447                                                 {
1448                                                         /* we don't use the global one anymore */
1449                                                         snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1450                                                                                                                                  txn, command_id);
1451                                                 }
1452
1453                                                 snapshot_now->curcid = command_id;
1454
1455                                                 TeardownHistoricSnapshot(false);
1456                                                 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1457
1458                                                 /*
1459                                                  * Every time the CommandId is incremented, we could
1460                                                  * see new catalog contents, so execute all
1461                                                  * invalidations.
1462                                                  */
1463                                                 ReorderBufferExecuteInvalidations(rb, txn);
1464                                         }
1465
1466                                         break;
1467
1468                                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1469                                         elog(ERROR, "tuplecid value in changequeue");
1470                                         break;
1471                         }
1472                 }
1473
1474                 ReorderBufferIterTXNFinish(rb, iterstate);
1475
1476                 /* call commit callback */
1477                 rb->commit(rb, txn, commit_lsn);
1478
1479                 /* this is just a sanity check against bad output plugin behaviour */
1480                 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1481                         elog(ERROR, "output plugin used XID %u",
1482                                  GetCurrentTransactionId());
1483
1484                 /* make sure there's no cache pollution */
1485                 ReorderBufferExecuteInvalidations(rb, txn);
1486
1487                 /* cleanup */
1488                 TeardownHistoricSnapshot(false);
1489
1490                 /*
1491                  * Abort subtransaction or the transaction as a whole has the right
1492                  * semantics. We want all locks acquired in here to be released, not
1493                  * reassigned to the parent and we do not want any database access
1494                  * have persistent effects.
1495                  */
1496                 if (subtxn_started)
1497                         RollbackAndReleaseCurrentSubTransaction();
1498                 else if (txn_started)
1499                         AbortCurrentTransaction();
1500
1501                 if (snapshot_now->copied)
1502                         ReorderBufferFreeSnap(rb, snapshot_now);
1503
1504                 /* remove potential on-disk data, and deallocate */
1505                 ReorderBufferCleanupTXN(rb, txn);
1506         }
1507         PG_CATCH();
1508         {
1509                 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1510                 if (iterstate)
1511                         ReorderBufferIterTXNFinish(rb, iterstate);
1512
1513                 TeardownHistoricSnapshot(true);
1514
1515                 if (snapshot_now->copied)
1516                         ReorderBufferFreeSnap(rb, snapshot_now);
1517
1518                 if (subtxn_started)
1519                         RollbackAndReleaseCurrentSubTransaction();
1520                 else if (txn_started)
1521                         AbortCurrentTransaction();
1522
1523                 /*
1524                  * Invalidations in an aborted transactions aren't allowed to do
1525                  * catalog access, so we don't need to still have the snapshot setup.
1526                  */
1527                 ReorderBufferExecuteInvalidations(rb, txn);
1528
1529                 /* remove potential on-disk data, and deallocate */
1530                 ReorderBufferCleanupTXN(rb, txn);
1531
1532                 PG_RE_THROW();
1533         }
1534         PG_END_TRY();
1535 }
1536
1537 /*
1538  * Abort a transaction that possibly has previous changes. Needs to be first
1539  * called for subtransactions and then for the toplevel xid.
1540  *
1541  * NB: Transactions handled here have to have actively aborted (i.e. have
1542  * produced an abort record). Implicitly aborted transactions are handled via
 * ReorderBufferAbortOld(); transactions we're just not interested in, but
1544  * which have committed are handled in ReorderBufferForget().
1545  *
1546  * This function purges this transaction and its contents from memory and
1547  * disk.
1548  */
1549 void
1550 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1551 {
1552         ReorderBufferTXN *txn;
1553
1554         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1555                                                                 false);
1556
1557         /* unknown, nothing to remove */
1558         if (txn == NULL)
1559                 return;
1560
1561         /* cosmetic... */
1562         txn->final_lsn = lsn;
1563
1564         /* remove potential on-disk data, and deallocate */
1565         ReorderBufferCleanupTXN(rb, txn);
1566 }
1567
1568 /*
1569  * Abort all transactions that aren't actually running anymore because the
1570  * server restarted.
1571  *
1572  * NB: These really have to be transactions that have aborted due to a server
1573  * crash/immediate restart, as we don't deal with invalidations here.
1574  */
1575 void
1576 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1577 {
1578         dlist_mutable_iter it;
1579
1580         /*
1581          * Iterate through all (potential) toplevel TXNs and abort all that are
1582          * older than what possibly can be running. Once we've found the first
1583          * that is alive we stop, there might be some that acquired an xid earlier
1584          * but started writing later, but it's unlikely and they will cleaned up
1585          * in a later call to ReorderBufferAbortOld().
1586          */
1587         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1588         {
1589                 ReorderBufferTXN * txn;
1590
1591                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1592
1593                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1594                 {
1595                         elog(DEBUG1, "aborting old transaction %u", txn->xid);
1596
1597                         /* remove potential on-disk data, and deallocate this tx */
1598                         ReorderBufferCleanupTXN(rb, txn);
1599                 }
1600                 else
1601                         return;
1602         }
1603 }
1604
1605 /*
 * Forget the contents of a transaction if we aren't interested in its
1607  * contents. Needs to be first called for subtransactions and then for the
1608  * toplevel xid.
1609  *
1610  * This is significantly different to ReorderBufferAbort() because
 * transactions that have committed need to be treated differently from aborted
1612  * ones since they may have modified the catalog.
1613  *
1614  * Note that this is only allowed to be called in the moment a transaction
1615  * commit has just been read, not earlier; otherwise later records referring
1616  * to this xid might re-create the transaction incompletely.
1617  */
1618 void
1619 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1620 {
1621         ReorderBufferTXN *txn;
1622
1623         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1624                                                                 false);
1625
1626         /* unknown, nothing to forget */
1627         if (txn == NULL)
1628                 return;
1629
1630         /* cosmetic... */
1631         txn->final_lsn = lsn;
1632
1633         /*
1634          * Proccess cache invalidation messages if there are any. Even if we're
1635          * not interested in the transaction's contents, it could have manipulated
1636          * the catalog and we need to update the caches according to that.
1637          */
1638         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1639         {
1640                 /* setup snapshot to perform the invalidations in */
1641                 SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
1642                 PG_TRY();
1643                 {
1644                         ReorderBufferExecuteInvalidations(rb, txn);
1645                         TeardownHistoricSnapshot(false);
1646                 }
1647                 PG_CATCH();
1648                 {
1649                         /* cleanup */
1650                         TeardownHistoricSnapshot(true);
1651                         PG_RE_THROW();
1652                 }
1653                 PG_END_TRY();
1654         }
1655         else
1656                 Assert(txn->ninvalidations == 0);
1657
1658         /* remove potential on-disk data, and deallocate */
1659         ReorderBufferCleanupTXN(rb, txn);
1660 }
1661
1662
1663 /*
 * Check whether a transaction is already known in this module.
1665  */
1666 bool
1667 ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
1668 {
1669         ReorderBufferTXN *txn;
1670
1671         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1672                                                                 false);
1673         return txn != NULL;
1674 }
1675
1676 /*
 * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
1678  * because the previous snapshot doesn't describe the catalog correctly for
1679  * following rows.
1680  */
1681 void
1682 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1683                                                  XLogRecPtr lsn, Snapshot snap)
1684 {
1685         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1686
1687         change->data.snapshot = snap;
1688         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1689
1690         ReorderBufferQueueChange(rb, xid, lsn, change);
1691 }
1692
1693 /*
1694  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1695  * that is used to decode all changes until either this transaction modifies
1696  * the catalog or another catalog modifying transaction commits.
1697  *
1698  * Needs to be called before any changes are added with
1699  * ReorderBufferQueueChange().
1700  */
1701 void
1702 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1703                                                          XLogRecPtr lsn, Snapshot snap)
1704 {
1705         ReorderBufferTXN *txn;
1706         bool            is_new;
1707
1708         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1709         Assert(txn->base_snapshot == NULL);
1710         Assert(snap != NULL);
1711
1712         txn->base_snapshot = snap;
1713         txn->base_snapshot_lsn = lsn;
1714 }
1715
1716 /*
1717  * Access the catalog with this CommandId at this point in the changestream.
1718  *
1719  * May only be called for command ids > 1
1720  */
1721 void
1722 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1723                                                          XLogRecPtr lsn, CommandId cid)
1724 {
1725         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1726
1727         change->data.command_id = cid;
1728         change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1729
1730         ReorderBufferQueueChange(rb, xid, lsn, change);
1731 }
1732
1733
1734 /*
1735  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1736  */
1737 void
1738 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1739                                                          XLogRecPtr lsn, RelFileNode node,
1740                                                          ItemPointerData tid, CommandId cmin,
1741                                                          CommandId cmax, CommandId combocid)
1742 {
1743         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1744         ReorderBufferTXN *txn;
1745
1746         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1747
1748         change->data.tuplecid.node = node;
1749         change->data.tuplecid.tid = tid;
1750         change->data.tuplecid.cmin = cmin;
1751         change->data.tuplecid.cmax = cmax;
1752         change->data.tuplecid.combocid = combocid;
1753         change->lsn = lsn;
1754         change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1755
1756         dlist_push_tail(&txn->tuplecids, &change->node);
1757         txn->ntuplecids++;
1758 }
1759
1760 /*
1761  * Setup the invalidation of the toplevel transaction.
1762  *
1763  * This needs to be done before ReorderBufferCommit is called!
1764  */
1765 void
1766 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1767                                                           XLogRecPtr lsn, Size nmsgs,
1768                                                           SharedInvalidationMessage *msgs)
1769 {
1770         ReorderBufferTXN *txn;
1771
1772         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1773
1774         if (txn->ninvalidations != 0)
1775                 elog(ERROR, "only ever add one set of invalidations");
1776
1777         Assert(nmsgs > 0);
1778
1779         txn->ninvalidations = nmsgs;
1780         txn->invalidations = (SharedInvalidationMessage *)
1781                 MemoryContextAlloc(rb->context,
1782                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1783         memcpy(txn->invalidations, msgs,
1784                    sizeof(SharedInvalidationMessage) * nmsgs);
1785 }
1786
1787 /*
1788  * Apply all invalidations we know. Possibly we only need parts at this point
1789  * in the changestream but we don't know which those are.
1790  */
1791 static void
1792 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1793 {
1794         int                     i;
1795
1796         for (i = 0; i < txn->ninvalidations; i++)
1797                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1798 }
1799
1800 /*
1801  * Mark a transaction as containing catalog changes
1802  */
1803 void
1804 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1805                                                                   XLogRecPtr lsn)
1806 {
1807         ReorderBufferTXN *txn;
1808
1809         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1810
1811         txn->has_catalog_changes = true;
1812 }
1813
1814 /*
1815  * Query whether a transaction is already *known* to contain catalog
1816  * changes. This can be wrong until directly before the commit!
1817  */
1818 bool
1819 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1820 {
1821         ReorderBufferTXN *txn;
1822
1823         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1824                                                                 false);
1825         if (txn == NULL)
1826                 return false;
1827
1828         return txn->has_catalog_changes;
1829 }
1830
1831 /*
1832  * Have we already added the first snapshot?
1833  */
1834 bool
1835 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1836 {
1837         ReorderBufferTXN *txn;
1838
1839         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1840                                                                 false);
1841
1842         /* transaction isn't known yet, ergo no snapshot */
1843         if (txn == NULL)
1844                 return false;
1845
1846         /*
1847          * TODO: It would be a nice improvement if we would check the toplevel
1848          * transaction in subtransactions, but we'd need to keep track of a bit
1849          * more state.
1850          */
1851         return txn->base_snapshot != NULL;
1852 }
1853
1854
1855 /*
1856  * ---------------------------------------
1857  * Disk serialization support
1858  * ---------------------------------------
1859  */
1860
1861 /*
1862  * Ensure the IO buffer is >= sz.
1863  */
1864 static void
1865 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
1866 {
1867         if (!rb->outbufsize)
1868         {
1869                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
1870                 rb->outbufsize = sz;
1871         }
1872         else if (rb->outbufsize < sz)
1873         {
1874                 rb->outbuf = repalloc(rb->outbuf, sz);
1875                 rb->outbufsize = sz;
1876         }
1877 }
1878
1879 /*
1880  * Check whether the transaction tx should spill its data to disk.
1881  */
1882 static void
1883 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1884 {
1885         /*
1886          * TODO: improve accounting so we cheaply can take subtransactions into
1887          * account here.
1888          */
1889         if (txn->nentries_mem >= max_changes_in_memory)
1890         {
1891                 ReorderBufferSerializeTXN(rb, txn);
1892                 Assert(txn->nentries_mem == 0);
1893         }
1894 }
1895
1896 /*
1897  * Spill data of a large transaction (and its subtransactions) to disk.
1898  */
1899 static void
1900 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1901 {
1902         dlist_iter      subtxn_i;
1903         dlist_mutable_iter change_i;
1904         int                     fd = -1;
1905         XLogSegNo       curOpenSegNo = 0;
1906         Size            spilled = 0;
1907         char            path[MAXPGPATH];
1908
1909         elog(DEBUG2, "spill %u changes in XID %u to disk",
1910                  (uint32) txn->nentries_mem, txn->xid);
1911
1912         /* do the same to all child TXs */
1913         dlist_foreach(subtxn_i, &txn->subtxns)
1914         {
1915                 ReorderBufferTXN *subtxn;
1916
1917                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
1918                 ReorderBufferSerializeTXN(rb, subtxn);
1919         }
1920
1921         /* serialize changestream */
1922         dlist_foreach_modify(change_i, &txn->changes)
1923         {
1924                 ReorderBufferChange *change;
1925
1926                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
1927
1928                 /*
1929                  * store in segment in which it belongs by start lsn, don't split over
1930                  * multiple segments tho
1931                  */
1932                 if (fd == -1 || XLByteInSeg(change->lsn, curOpenSegNo))
1933                 {
1934                         XLogRecPtr      recptr;
1935
1936                         if (fd != -1)
1937                                 CloseTransientFile(fd);
1938
1939                         XLByteToSeg(change->lsn, curOpenSegNo);
1940                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
1941
1942                         /*
1943                          * No need to care about TLIs here, only used during a single run,
1944                          * so each LSN only maps to a specific WAL record.
1945                          */
1946                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
1947                                         NameStr(MyReplicationSlot->data.name), txn->xid,
1948                                         (uint32) (recptr >> 32), (uint32) recptr);
1949
1950                         /* open segment, create it if necessary */
1951                         fd = OpenTransientFile(path,
1952                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
1953                                                                    S_IRUSR | S_IWUSR);
1954
1955                         if (fd < 0)
1956                                 ereport(ERROR,
1957                                                 (errcode_for_file_access(),
1958                                                  errmsg("could not open file \"%s\": %m",
1959                                                                 path)));
1960                 }
1961
1962                 ReorderBufferSerializeChange(rb, txn, fd, change);
1963                 dlist_delete(&change->node);
1964                 ReorderBufferReturnChange(rb, change);
1965
1966                 spilled++;
1967         }
1968
1969         Assert(spilled == txn->nentries_mem);
1970         Assert(dlist_is_empty(&txn->changes));
1971         txn->nentries_mem = 0;
1972
1973         if (fd != -1)
1974                 CloseTransientFile(fd);
1975 }
1976
1977 /*
1978  * Serialize individual change to disk.
1979  */
1980 static void
1981 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1982                                                          int fd, ReorderBufferChange *change)
1983 {
1984         ReorderBufferDiskChange *ondisk;
1985         Size            sz = sizeof(ReorderBufferDiskChange);
1986
1987         ReorderBufferSerializeReserve(rb, sz);
1988
1989         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
1990         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
1991
1992         switch (change->action)
1993         {
1994                 case REORDER_BUFFER_CHANGE_INSERT:
1995                         /* fall through */
1996                 case REORDER_BUFFER_CHANGE_UPDATE:
1997                         /* fall through */
1998                 case REORDER_BUFFER_CHANGE_DELETE:
1999                         {
2000                                 char       *data;
2001                                 ReorderBufferTupleBuf *oldtup, *newtup;
2002                                 Size            oldlen = 0;
2003                                 Size            newlen = 0;
2004
2005                                 oldtup = change->data.tp.oldtuple;
2006                                 newtup = change->data.tp.newtuple;
2007
2008                                 if (oldtup)
2009                                         oldlen = offsetof(ReorderBufferTupleBuf, data)
2010                                                 + oldtup->tuple.t_len
2011                                                 - offsetof(HeapTupleHeaderData, t_bits);
2012
2013                                 if (newtup)
2014                                         newlen = offsetof(ReorderBufferTupleBuf, data)
2015                                                 + newtup->tuple.t_len
2016                                                 - offsetof(HeapTupleHeaderData, t_bits);
2017
2018                                 sz += oldlen;
2019                                 sz += newlen;
2020
2021                                 /* make sure we have enough space */
2022                                 ReorderBufferSerializeReserve(rb, sz);
2023
2024                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2025                                 /* might have been reallocated above */
2026                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2027
2028                                 if (oldlen)
2029                                 {
2030                                         memcpy(data, oldtup, oldlen);
2031                                         data += oldlen;
2032                                 }
2033
2034                                 if (newlen)
2035                                 {
2036                                         memcpy(data, newtup, newlen);
2037                                         data += newlen;
2038                                 }
2039                                 break;
2040                         }
2041                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2042                         {
2043                                 Snapshot        snap;
2044                                 char       *data;
2045
2046                                 snap = change->data.snapshot;
2047
2048                                 sz += sizeof(SnapshotData) +
2049                                         sizeof(TransactionId) * snap->xcnt +
2050                                         sizeof(TransactionId) * snap->subxcnt
2051                                         ;
2052
2053                                 /* make sure we have enough space */
2054                                 ReorderBufferSerializeReserve(rb, sz);
2055                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2056                                 /* might have been reallocated above */
2057                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2058
2059                                 memcpy(data, snap, sizeof(SnapshotData));
2060                                 data += sizeof(SnapshotData);
2061
2062                                 if (snap->xcnt)
2063                                 {
2064                                         memcpy(data, snap->xip,
2065                                                    sizeof(TransactionId) + snap->xcnt);
2066                                         data += sizeof(TransactionId) + snap->xcnt;
2067                                 }
2068
2069                                 if (snap->subxcnt)
2070                                 {
2071                                         memcpy(data, snap->subxip,
2072                                                    sizeof(TransactionId) + snap->subxcnt);
2073                                         data += sizeof(TransactionId) + snap->subxcnt;
2074                                 }
2075                                 break;
2076                         }
2077                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2078                         /* ReorderBufferChange contains everything important */
2079                         break;
2080                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2081                         /* ReorderBufferChange contains everything important */
2082                         break;
2083         }
2084
2085         ondisk->size = sz;
2086
2087         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2088         {
2089                 CloseTransientFile(fd);
2090                 ereport(ERROR,
2091                                 (errcode_for_file_access(),
2092                                  errmsg("could not write to data file for XID %u: %m",
2093                                                 txn->xid)));
2094         }
2095
2096         Assert(ondisk->change.action == change->action);
2097 }
2098
2099 /*
2100  * Restore a number of changes spilled to disk back into memory.
2101  */
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							int *fd, XLogSegNo *segno)
{
	Size		restored = 0;
	XLogSegNo	last_segno;
	dlist_mutable_iter cleanup_iter;

	/* the txn must have a known LSN range, or we can't locate its files */
	Assert(txn->first_lsn != InvalidXLogRecPtr);
	Assert(txn->final_lsn != InvalidXLogRecPtr);

	/* free current entries, so we have memory for more */
	dlist_foreach_modify(cleanup_iter, &txn->changes)
	{
		ReorderBufferChange *cleanup =
		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);

		dlist_delete(&cleanup->node);
		ReorderBufferReturnChange(rb, cleanup);
	}
	txn->nentries_mem = 0;
	Assert(dlist_is_empty(&txn->changes));

	/* the last spill file we could possibly need covers final_lsn */
	XLByteToSeg(txn->final_lsn, last_segno);

	/* read until the in-memory limit is hit or all segments are consumed */
	while (restored < max_changes_in_memory && *segno <= last_segno)
	{
		int			readBytes;
		ReorderBufferDiskChange *ondisk;

		/* no file open: compute and open the spill file for *segno */
		if (*fd == -1)
		{
			XLogRecPtr	recptr;
			char		path[MAXPGPATH];

			/* first time in */
			if (*segno == 0)
			{
				XLByteToSeg(txn->first_lsn, *segno);
			}

			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
			XLogSegNoOffsetToRecPtr(*segno, 0, recptr);

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
					NameStr(MyReplicationSlot->data.name), txn->xid,
					(uint32) (recptr >> 32), (uint32) recptr);

			/* a missing segment file just means nothing was spilled for it */
			*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
			if (*fd < 0 && errno == ENOENT)
			{
				*fd = -1;
				(*segno)++;
				continue;
			}
			else if (*fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m",
								path)));

		}

		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));


		/*
		 * Read the statically sized part of a change which has information
		 * about the total size. If we couldn't read a record, we're at the
		 * end of this file.
		 */

		readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));

		/* eof: close this segment and move on to the next one */
		if (readBytes == 0)
		{
			CloseTransientFile(*fd);
			*fd = -1;
			(*segno)++;
			continue;
		}
		else if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("incomplete read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) sizeof(ReorderBufferDiskChange))));

		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		/* grow the buffer for the record's variable-length remainder */
		ReorderBufferSerializeReserve(rb,
									  sizeof(ReorderBufferDiskChange) + ondisk->size);
		/* the reserve call may have reallocated rb->outbuf */
		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
						 ondisk->size - sizeof(ReorderBufferDiskChange));

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));

		/*
		 * ok, read a full change from disk, now restore it into proper
		 * in-memory format
		 */
		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
		restored++;
	}

	return restored;
}
2229
2230 /*
2231  * Convert change from its on-disk format to in-memory format and queue it onto
2232  * the TXN's ->changes list.
2233  */
static void
ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   char *data)
{
	ReorderBufferDiskChange *ondisk;
	ReorderBufferChange *change;

	ondisk = (ReorderBufferDiskChange *) data;

	change = ReorderBufferGetChange(rb);

	/* copy static part */
	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));

	/* advance past the fixed-size header to the variable-length payload */
	data += sizeof(ReorderBufferDiskChange);

	/* restore individual stuff */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			/* fall through */
		case REORDER_BUFFER_CHANGE_UPDATE:
			/* fall through */
		case REORDER_BUFFER_CHANGE_DELETE:
			if (change->data.tp.newtuple)
			{
				/*
				 * Length of the serialized tuple: the TupleBuf header up to
				 * its data member, plus the tuple data whose length excludes
				 * everything before the t_bits offset.  NOTE(review): this
				 * must mirror the length computation used when spilling the
				 * change to disk -- confirm against the serialization side.
				 */
				Size		len = offsetof(ReorderBufferTupleBuf, data)
				+((ReorderBufferTupleBuf *) data)->tuple.t_len
				- offsetof(HeapTupleHeaderData, t_bits);

				change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
				memcpy(change->data.tp.newtuple, data, len);
				/* re-point t_data at this TupleBuf's own header copy */
				change->data.tp.newtuple->tuple.t_data =
					&change->data.tp.newtuple->header;
				data += len;
			}

			if (change->data.tp.oldtuple)
			{
				/* same layout as the newtuple case above */
				Size		len = offsetof(ReorderBufferTupleBuf, data)
				+((ReorderBufferTupleBuf *) data)->tuple.t_len
				- offsetof(HeapTupleHeaderData, t_bits);

				change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
				memcpy(change->data.tp.oldtuple, data, len);
				change->data.tp.oldtuple->tuple.t_data =
					&change->data.tp.oldtuple->header;
				data += len;
			}
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	oldsnap;
				Snapshot	newsnap;
				Size		size;

				oldsnap = (Snapshot) data;

				/*
				 * The xip and subxip arrays were serialized inline right
				 * after the SnapshotData struct; account for both.
				 */
				size = sizeof(SnapshotData) +
					sizeof(TransactionId) * oldsnap->xcnt +
					sizeof(TransactionId) * (oldsnap->subxcnt + 0);

				change->data.snapshot = MemoryContextAllocZero(rb->context, size);

				newsnap = change->data.snapshot;

				memcpy(newsnap, data, size);
				/* fix up the array pointers to reference the inline copies */
				newsnap->xip = (TransactionId *)
					(((char *) newsnap) + sizeof(SnapshotData));
				newsnap->subxip = newsnap->xip + newsnap->xcnt;
				newsnap->copied = true;
				break;
			}
			/* the base struct contains all the data, easy peasy */
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	/* queue the restored change onto the transaction, in restore order */
	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries_mem++;
}
2316
2317 /*
 * Remove all on-disk data stored for the passed-in transaction.
2319  */
2320 static void
2321 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2322 {
2323         XLogSegNo       first;
2324         XLogSegNo       cur;
2325         XLogSegNo       last;
2326
2327         Assert(txn->first_lsn != InvalidXLogRecPtr);
2328         Assert(txn->final_lsn != InvalidXLogRecPtr);
2329
2330         XLByteToSeg(txn->first_lsn, first);
2331         XLByteToSeg(txn->final_lsn, last);
2332
2333         /* iterate over all possible filenames, and delete them */
2334         for (cur = first; cur <= last; cur++)
2335         {
2336                 char            path[MAXPGPATH];
2337                 XLogRecPtr      recptr;
2338
2339                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2340
2341                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2342                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2343                                 (uint32) (recptr >> 32), (uint32) recptr);
2344                 if (unlink(path) != 0 && errno != ENOENT)
2345                         ereport(ERROR,
2346                                         (errcode_for_file_access(),
2347                                          errmsg("could not unlink file \"%s\": %m", path)));
2348         }
2349 }
2350
2351 /*
2352  * Delete all data spilled to disk after we've restarted/crashed. It will be
2353  * recreated when the respective slots are reused.
2354  */
2355 void
2356 StartupReorderBuffer(void)
2357 {
2358         DIR                *logical_dir;
2359         struct dirent *logical_de;
2360
2361         DIR                *spill_dir;
2362         struct dirent *spill_de;
2363
2364         logical_dir = AllocateDir("pg_replslot");
2365         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2366         {
2367                 struct stat     statbuf;
2368                 char            path[MAXPGPATH];
2369
2370                 if (strcmp(logical_de->d_name, ".") == 0 ||
2371                         strcmp(logical_de->d_name, "..") == 0)
2372                         continue;
2373
2374                 /* if it cannot be a slot, skip the directory */
2375                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2376                         continue;
2377
2378                 /*
2379                  * ok, has to be a surviving logical slot, iterate and delete
2380                  * everythign starting with xid-*
2381                  */
2382                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2383
2384                 /* we're only creating directories here, skip if it's not our's */
2385                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2386                         continue;
2387
2388                 spill_dir = AllocateDir(path);
2389                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2390                 {
2391                         if (strcmp(spill_de->d_name, ".") == 0 ||
2392                                 strcmp(spill_de->d_name, "..") == 0)
2393                                 continue;
2394
2395                         /* only look at names that can be ours */
2396                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2397                         {
2398                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2399                                                 spill_de->d_name);
2400
2401                                 if (unlink(path) != 0)
2402                                         ereport(PANIC,
2403                                                         (errcode_for_file_access(),
2404                                                          errmsg("could not unlink file \"%s\": %m",
2405                                                                         path)));
2406                         }
2407                 }
2408                 FreeDir(spill_dir);
2409         }
2410         FreeDir(logical_dir);
2411 }
2412
2413 /* ---------------------------------------
2414  * toast reassembly support
2415  * ---------------------------------------
2416  */
2417
2418 /*
2419  * Initialize per tuple toast reconstruction support.
2420  */
2421 static void
2422 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2423 {
2424         HASHCTL         hash_ctl;
2425
2426         Assert(txn->toast_hash == NULL);
2427
2428         memset(&hash_ctl, 0, sizeof(hash_ctl));
2429         hash_ctl.keysize = sizeof(Oid);
2430         hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2431         hash_ctl.hash = tag_hash;
2432         hash_ctl.hcxt = rb->context;
2433         txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2434                                                                   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2435 }
2436
2437 /*
2438  * Per toast-chunk handling for toast reconstruction
2439  *
2440  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2441  * toasted Datum comes along.
2442  */
2443 static void
2444 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2445                                                           Relation relation, ReorderBufferChange *change)
2446 {
2447         ReorderBufferToastEnt *ent;
2448         ReorderBufferTupleBuf *newtup;
2449         bool            found;
2450         int32           chunksize;
2451         bool            isnull;
2452         Pointer         chunk;
2453         TupleDesc       desc = RelationGetDescr(relation);
2454         Oid                     chunk_id;
2455         Oid                     chunk_seq;
2456
2457         if (txn->toast_hash == NULL)
2458                 ReorderBufferToastInitHash(rb, txn);
2459
2460         Assert(IsToastRelation(relation));
2461
2462         newtup = change->data.tp.newtuple;
2463         chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2464         Assert(!isnull);
2465         chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2466         Assert(!isnull);
2467
2468         ent = (ReorderBufferToastEnt *)
2469                 hash_search(txn->toast_hash,
2470                                         (void *) &chunk_id,
2471                                         HASH_ENTER,
2472                                         &found);
2473
2474         if (!found)
2475         {
2476                 Assert(ent->chunk_id == chunk_id);
2477                 ent->num_chunks = 0;
2478                 ent->last_chunk_seq = 0;
2479                 ent->size = 0;
2480                 ent->reconstructed = NULL;
2481                 dlist_init(&ent->chunks);
2482
2483                 if (chunk_seq != 0)
2484                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2485                                  chunk_seq, chunk_id);
2486         }
2487         else if (found && chunk_seq != ent->last_chunk_seq + 1)
2488                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2489                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2490
2491         chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2492         Assert(!isnull);
2493
2494         /* calculate size so we can allocate the right size at once later */
2495         if (!VARATT_IS_EXTENDED(chunk))
2496                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2497         else if (VARATT_IS_SHORT(chunk))
2498                 /* could happen due to heap_form_tuple doing its thing */
2499                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2500         else
2501                 elog(ERROR, "unexpected type of toast chunk");
2502
2503         ent->size += chunksize;
2504         ent->last_chunk_seq = chunk_seq;
2505         ent->num_chunks++;
2506         dlist_push_tail(&ent->chunks, &change->node);
2507 }
2508
2509 /*
 * Rejigger change->newtuple to point to in-memory toast tuples instead of
 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
2512  *
2513  * We cannot replace unchanged toast tuples though, so those will still point
2514  * to on-disk toast data.
2515  */
static void
ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change)
{
	TupleDesc	desc;
	int			natt;
	Datum	   *attrs;
	bool	   *isnull;
	bool	   *free;			/* per-attribute: did we palloc a replacement? */
	HeapTuple	tmphtup;
	Relation	toast_rel;
	TupleDesc	toast_desc;
	MemoryContext oldcontext;
	ReorderBufferTupleBuf *newtup;

	/* no toast tuples changed */
	if (txn->toast_hash == NULL)
		return;

	/* allocate everything below in the reorderbuffer's own context */
	oldcontext = MemoryContextSwitchTo(rb->context);

	/* we should only have toast tuples in an INSERT or UPDATE */
	Assert(change->data.tp.newtuple);

	desc = RelationGetDescr(relation);

	/*
	 * NOTE(review): assumes the relation still has a valid toast relation
	 * that can be opened here -- no NULL check on the result; confirm.
	 */
	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
	toast_desc = RelationGetDescr(toast_rel);

	/* should we allocate from stack instead? */
	attrs = palloc0(sizeof(Datum) * desc->natts);
	isnull = palloc0(sizeof(bool) * desc->natts);
	free = palloc0(sizeof(bool) * desc->natts);

	newtup = change->data.tp.newtuple;

	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);

	/* examine each attribute, replacing changed external toast datums */
	for (natt = 0; natt < desc->natts; natt++)
	{
		Form_pg_attribute attr = desc->attrs[natt];
		ReorderBufferToastEnt *ent;
		struct varlena *varlena;

		/* va_rawsize is the size of the original datum -- including header */
		struct varatt_external toast_pointer;
		struct varatt_indirect redirect_pointer;
		struct varlena *new_datum = NULL;
		struct varlena *reconstructed;
		dlist_iter	it;
		Size		data_done = 0;

		/* system columns aren't toasted */
		if (attr->attnum < 0)
			continue;

		if (attr->attisdropped)
			continue;

		/* not a varlena datatype */
		if (attr->attlen != -1)
			continue;

		/* no data */
		if (isnull[natt])
			continue;

		/* ok, we know we have a toast datum */
		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);

		/* no need to do anything if the tuple isn't external */
		if (!VARATT_IS_EXTERNAL(varlena))
			continue;

		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);

		/*
		 * Check whether the toast tuple changed, replace if so.  An absent
		 * hash entry means the toast value was not modified in this
		 * transaction, so the on-disk data is still the right thing.
		 */
		ent = (ReorderBufferToastEnt *)
			hash_search(txn->toast_hash,
						(void *) &toast_pointer.va_valueid,
						HASH_FIND,
						NULL);
		if (ent == NULL)
			continue;

		/* replacement datum: an indirect pointer to the reassembled value */
		new_datum =
			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);

		free[natt] = true;

		reconstructed = palloc0(toast_pointer.va_rawsize);

		/* stored in the entry so ReorderBufferToastReset() can free it */
		ent->reconstructed = reconstructed;

		/* stitch toast tuple back together from its parts */
		dlist_foreach(it, &ent->chunks)
		{
			bool		isnull;
			ReorderBufferChange *cchange;
			ReorderBufferTupleBuf *ctup;
			Pointer		chunk;

			cchange = dlist_container(ReorderBufferChange, node, it.cur);
			ctup = cchange->data.tp.newtuple;
			/* attribute 3 of a toast tuple is the chunk_data bytea */
			chunk = DatumGetPointer(
				fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));

			Assert(!isnull);
			Assert(!VARATT_IS_EXTERNAL(chunk));
			Assert(!VARATT_IS_SHORT(chunk));

			memcpy(VARDATA(reconstructed) + data_done,
				   VARDATA(chunk),
				   VARSIZE(chunk) - VARHDRSZ);
			data_done += VARSIZE(chunk) - VARHDRSZ;
		}
		/* we must have reassembled exactly the stored (external) size */
		Assert(data_done == toast_pointer.va_extsize);

		/* make sure it's marked as compressed or not */
		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
		else
			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);

		/* zero first to be careful about padding bytes in the struct */
		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
		redirect_pointer.pointer = reconstructed;

		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
			   sizeof(redirect_pointer));

		attrs[natt] = PointerGetDatum(new_datum);
	}

	/*
	 * Build tuple in separate memory & copy tuple back into the tuplebuf
	 * passed to the output plugin. We can't directly heap_fill_tuple() into
	 * the tuplebuf because attrs[] will point back into the current content.
	 */
	tmphtup = heap_form_tuple(desc, attrs, isnull);
	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
	Assert(&newtup->header == newtup->tuple.t_data);

	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
	newtup->tuple.t_len = tmphtup->t_len;

	/*
	 * free resources we won't further need, more persistent stuff will be
	 * free'd in ReorderBufferToastReset().
	 */
	RelationClose(toast_rel);
	pfree(tmphtup);
	for (natt = 0; natt < desc->natts; natt++)
	{
		if (free[natt])
			pfree(DatumGetPointer(attrs[natt]));
	}
	pfree(attrs);
	pfree(free);
	pfree(isnull);

	MemoryContextSwitchTo(oldcontext);
}
2681
2682 /*
2683  * Free all resources allocated for toast reconstruction.
2684  */
2685 static void
2686 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2687 {
2688         HASH_SEQ_STATUS hstat;
2689         ReorderBufferToastEnt *ent;
2690
2691         if (txn->toast_hash == NULL)
2692                 return;
2693
2694         /* sequentially walk over the hash and free everything */
2695         hash_seq_init(&hstat, txn->toast_hash);
2696         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2697         {
2698                 dlist_mutable_iter it;
2699
2700                 if (ent->reconstructed != NULL)
2701                         pfree(ent->reconstructed);
2702
2703                 dlist_foreach_modify(it, &ent->chunks)
2704                 {
2705                         ReorderBufferChange *change =
2706                         dlist_container(ReorderBufferChange, node, it.cur);
2707
2708                         dlist_delete(&change->node);
2709                         ReorderBufferReturnChange(rb, change);
2710                 }
2711         }
2712
2713         hash_destroy(txn->toast_hash);
2714         txn->toast_hash = NULL;
2715 }
2716
2717
2718 /* ---------------------------------------
2719  * Visibility support for logical decoding
2720  *
2721  *
2722  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2723  * always rely on stored cmin/cmax values because of two scenarios:
2724  *
2725  * * A tuple got changed multiple times during a single transaction and thus
2726  *       has got a combocid. Combocid's are only valid for the duration of a
2727  *       single transaction.
2728  * * A tuple with a cmin but no cmax (and thus no combocid) got
2729  *       deleted/updated in another transaction than the one which created it
2730  *       which we are looking at right now. As only one of cmin, cmax or combocid
2731  *       is actually stored in the heap we don't have access to the value we
2732  *       need anymore.
2733  *
2734  * To resolve those problems we have a per-transaction hash of (cmin,
2735  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2736  * (cmin, cmax) values. That also takes care of combocids by simply
2737  * not caring about them at all. As we have the real cmin/cmax values
2738  * combocids aren't interesting.
2739  *
2740  * As we only care about catalog tuples here the overhead of this
2741  * hashtable should be acceptable.
2742  *
2743  * Heap rewrites complicate this a bit, check rewriteheap.c for
2744  * details.
2745  * -------------------------------------------------------------------------
2746  */
2747
2748 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;			/* LSN the mapping file was written at */
	char		fname[MAXPGPATH];	/* file name under pg_llog/mappings */
} RewriteMappingFile;
2754
#if NOT_USED
/*
 * Debugging aid: dump every (relfilenode, ctid) -> (cmin, cmax) entry of a
 * tuplecid mapping hash at DEBUG3.  Compiled out by default.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 BlockIdGetBlockNumber(&ent->key.tid.ip_blkid),
			 ent->key.tid.ip_posid,
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
2777
2778 /*
2779  * Apply a single mapping file to tuplecid_data.
2780  *
2781  * The mapping file has to have been verified to be a) committed b) for our
2782  * transaction c) applied in LSN order.
2783  */
2784 static void
2785 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
2786 {
2787         char            path[MAXPGPATH];
2788         int                     fd;
2789         int                     readBytes;
2790         LogicalRewriteMappingData map;
2791
2792         sprintf(path, "pg_llog/mappings/%s", fname);
2793         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2794         if (fd < 0)
2795                 ereport(ERROR,
2796                                 (errmsg("could not open file \"%s\": %m", path)));
2797
2798         while (true)
2799         {
2800                 ReorderBufferTupleCidKey key;
2801                 ReorderBufferTupleCidEnt *ent;
2802                 ReorderBufferTupleCidEnt *new_ent;
2803                 bool found;
2804
2805                 /* be careful about padding */
2806                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
2807
2808                 /* read all mappings till the end of the file */
2809                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
2810
2811                 if (readBytes < 0)
2812                         ereport(ERROR,
2813                                         (errcode_for_file_access(),
2814                                          errmsg("could not read file \"%s\": %m",
2815                                                         path)));
2816                 else if (readBytes == 0) /* EOF */
2817                         break;
2818                 else if (readBytes != sizeof(LogicalRewriteMappingData))
2819                         ereport(ERROR,
2820                                         (errcode_for_file_access(),
2821                                          errmsg("could not read from file \"%s\": read %d instead of %d bytes",
2822                                                         path, readBytes,
2823                                                         (int32) sizeof(LogicalRewriteMappingData))));
2824
2825                 key.relnode = map.old_node;
2826                 ItemPointerCopy(&map.old_tid,
2827                                                 &key.tid);
2828
2829
2830                 ent = (ReorderBufferTupleCidEnt *)
2831                         hash_search(tuplecid_data,
2832                                                 (void *) &key,
2833                                                 HASH_FIND,
2834                                                 NULL);
2835
2836                 /* no existing mapping, no need to update */
2837                 if (!ent)
2838                         continue;
2839
2840                 key.relnode = map.new_node;
2841                 ItemPointerCopy(&map.new_tid,
2842                                                 &key.tid);
2843
2844                 new_ent = (ReorderBufferTupleCidEnt *)
2845                         hash_search(tuplecid_data,
2846                                                 (void *) &key,
2847                                                 HASH_ENTER,
2848                                                 &found);
2849
2850                 if (found)
2851                 {
2852                         /*
2853                          * Make sure the existing mapping makes sense. We sometime update
2854                          * old records that did not yet have a cmax (e.g. pg_class' own
2855                          * entry while rewriting it) during rewrites, so allow that.
2856                          */
2857                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
2858                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
2859                 }
2860                 else
2861                 {
2862                         /* update mapping */
2863                         new_ent->cmin = ent->cmin;
2864                         new_ent->cmax = ent->cmax;
2865                         new_ent->combocid = ent->combocid;
2866                 }
2867         }
2868 }
2869
2870
2871 /*
 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
2873  */
2874 static bool
2875 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
2876 {
2877         return bsearch(&xid, xip, num,
2878                                    sizeof(TransactionId), xidComparator) != NULL;
2879 }
2880
2881 /*
2882  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
2883  */
2884 static int
2885 file_sort_by_lsn(const void *a_p, const void *b_p)
2886 {
2887         RewriteMappingFile *a = *(RewriteMappingFile **)a_p;
2888         RewriteMappingFile *b = *(RewriteMappingFile **)b_p;
2889
2890         if (a->lsn < b->lsn)
2891                 return -1;
2892         else if (a->lsn > b->lsn)
2893                 return 1;
2894         return 0;
2895 }
2896
2897 /*
2898  * Apply any existing logical remapping files if there are any targeted at our
2899  * transaction for relid.
2900  */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	RewriteMappingFile **files_a;
	size_t		off;
	/* shared catalogs live in database InvalidOid in the mapping file name */
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	mapping_dir = AllocateDir("pg_llog/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

		/* parse the components encoded in the file name */
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* build array we can easily sort */
	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
	off = 0;
	foreach(file, files)
	{
		files_a[off++] = lfirst(file);
	}

	/* sort files so we apply them in LSN order */
	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
		  file_sort_by_lsn);

	for(off = 0; off < list_length(files); off++)
	{
		RewriteMappingFile *f = files_a[off];

		/*
		 * NOTE(review): debug message assumes subxip[0] identifies the
		 * transaction being decoded -- confirm against snapshot builder.
		 */
		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}
2983
2984 /*
2985  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
2986  * combocids.
2987  */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;	/* applied rewrite mappings yet? */

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * get relfilenode from the buffer, no convenient way to access it other
	 * than that.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * failed to find a mapping, check whether the table was rewritten and
	 * apply mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* now check but don't update for a mapping again */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;

	/* found: hand back whichever of cmin/cmax the caller asked for */
	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}