/*-------------------------------------------------------------------------
 *
 * reorderbuffer.c
 *        PostgreSQL logical replay/reorder buffer management
 *
 *
 * Copyright (c) 2012-2014, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *        src/backend/replication/logical/reorderbuffer.c
 *
 * NOTES
 *        This module gets handed individual pieces of transactions in the order
 *        they are written to the WAL and is responsible for reassembling them
 *        into toplevel transaction sized pieces. When a transaction is
 *        completely reassembled - signalled by reading the transaction commit
 *        record - it will then call the output plugin (cf. ReorderBufferCommit())
 *        with the individual changes. The output plugins rely on snapshots built
 *        by snapbuild.c which hands them to us.
 *
 *        Transactions and subtransactions/savepoints in postgres are not
 *        immediately linked to each other from outside the performing
 *        backend. Only at commit/abort (or special xact_assignment records) are
 *        they linked together. That means we have to splice together a toplevel
 *        transaction from its subtransactions. To do that efficiently we build
 *        a binary heap indexed by the smallest current LSN of the individual
 *        subtransactions' changestreams. As the individual streams are
 *        inherently ordered by LSN - since that is where we build them from -
 *        the transaction can easily be reassembled by always using the
 *        subtransaction with the smallest current LSN from the heap.
 *
 *        In order to cope with large transactions - which can be several times
 *        as big as the available memory - this module supports spooling the
 *        contents of large transactions to disk. When the transaction is
 *        replayed the contents of individual (sub-)transactions will be read
 *        from disk in chunks.
 *
 *        This module also has to deal with reassembling toast records from the
 *        individual chunks stored in WAL. When a new (or initial) version of a
 *        tuple is stored in WAL it will always be preceded by the toast chunks
 *        emitted for the columns stored out of line. Within a single toplevel
 *        transaction there will be no other data carrying records between a
 *        row's toast chunks and the row data itself. See ReorderBufferToast*
 *        for details.
 * -------------------------------------------------------------------------
 */
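
/*
 * A minimal usage sketch (illustrative only; the real drivers live in
 * decode.c and logical.c, and the callbacks are the rb->begin,
 * rb->apply_change and rb->commit members set up by the caller):
 *
 *              ReorderBuffer *rb = ReorderBufferAllocate();
 *              ... set rb->begin, rb->apply_change, rb->commit ...
 *              for each decoded WAL record:
 *                      ReorderBufferQueueChange(rb, xid, lsn, change);
 *              upon decoding a commit record:
 *                      ReorderBufferCommit(rb, xid, commit_lsn, end_lsn, commit_time);
 *              ReorderBufferFree(rb);
 */
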
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "miscadmin.h"

#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"

#include "catalog/catalog.h"

#include "common/relpath.h"

#include "lib/binaryheap.h"

#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */

#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"

#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"

/*
 * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
 * and ComboCids in the same list as the user visible INSERT/UPDATE/DELETE
 * changes. We don't want to leak those internal values to external users
 * though (they would just use switch()...default:) because that would make it
 * harder to add new user visible values.
 *
 * This needs to be synchronized with ReorderBufferChangeType! Adjust the
 * StaticAssertExpr's in ReorderBufferAllocate if you add anything!
 */
typedef enum
{
        REORDER_BUFFER_CHANGE_INTERNAL_INSERT,
        REORDER_BUFFER_CHANGE_INTERNAL_UPDATE,
        REORDER_BUFFER_CHANGE_INTERNAL_DELETE,
        REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
        REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
        REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
} ReorderBufferChangeTypeInternal;

/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
        TransactionId xid;
        ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
        RelFileNode relnode;
        ItemPointerData tid;
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
        ReorderBufferTupleCidKey key;
        CommandId       cmin;
        CommandId       cmax;
        CommandId       combocid;               /* just for debugging */
} ReorderBufferTupleCidEnt;
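
/*
 * Worked example (hypothetical values): a catalog tuple inserted at cmin 3
 * and deleted at cmax 5 within the same transaction is stored on disk with a
 * single combo CID, but the entry built here from the WAL-logged tuplecid
 * records remembers cmin = 3 and cmax = 5 separately, so visibility checks
 * during decoding don't need the backend-local combo CID state.
 */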

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
        XLogRecPtr      lsn;
        ReorderBufferChange *change;
        ReorderBufferTXN *txn;
        int                     fd;
        XLogSegNo       segno;
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
        binaryheap *heap;
        Size            nr_txns;
        dlist_head      old_change;
        ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast data structures */
typedef struct ReorderBufferToastEnt
{
        Oid                     chunk_id;               /* toast_table.chunk_id */
        int32           last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
                                                                 * have seen */
        Size            num_chunks;             /* number of chunks we've already seen */
        Size            size;                   /* combined size of chunks seen */
        dlist_head      chunks;                 /* linked list of chunks */
        struct varlena *reconstructed;          /* reconstructed varlena now pointed
                                                                                 * to in main tup */
} ReorderBufferToastEnt;

/* disk serialization support data structures */
typedef struct ReorderBufferDiskChange
{
        Size            size;
        ReorderBufferChange change;
        /* data follows */
} ReorderBufferDiskChange;

/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk
 * reasonably fast in other workloads.
 *
 * At some point in the future it probably makes sense to have a more
 * elaborate resource management here, but it's not entirely clear what that
 * would look like.
 */
static const Size max_changes_in_memory = 4096;
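
/*
 * Sketch of the mechanism (see ReorderBufferCheckSerializeTXN() and friends
 * below): once a transaction has accumulated more than max_changes_in_memory
 * changes in memory, its queued changes are written out by
 * ReorderBufferSerializeTXN() and read back in chunks during replay by
 * ReorderBufferRestoreChanges().
 */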

/*
 * We use a very simple form of a slab allocator for frequently allocated
 * objects, simply keeping a fixed number in a linked list when unused,
 * instead of pfree()ing them. Without that, in many workloads aset.c becomes
 * a major bottleneck, especially when spilling to disk while decoding batch
 * workloads.
 */
static const Size max_cached_changes = 4096 * 2;
static const Size max_cached_tuplebufs = 4096 * 2;              /* ~8MB */
static const Size max_cached_transactions = 512;


/* ---------------------------------------
 * primary reorderbuffer support routines
 * ---------------------------------------
 */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
                                          TransactionId xid, bool create, bool *is_new,
                                          XLogRecPtr lsn, bool create_as_top);

static void AssertTXNLsnOrder(ReorderBuffer *rb);

/* ---------------------------------------
 * support functions for lsn-order iterating over the ->changes of a
 * transaction and its subtransactions
 *
 * used for iteration over the k-way heap merge of a transaction and its
 * subtransactions
 * ---------------------------------------
 */
static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferChange *
                        ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                                                   ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);

/*
 * ---------------------------------------
 * Disk serialization support functions
 * ---------------------------------------
 */
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                         int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                        int *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                   char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
                                          ReorderBufferTXN *txn, CommandId cid);

/* ---------------------------------------
 * toast reassembly support
 * ---------------------------------------
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                  Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                          Relation relation, ReorderBufferChange *change);


/*
 * Allocate a new ReorderBuffer
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
        ReorderBuffer *buffer;
        HASHCTL         hash_ctl;
        MemoryContext new_ctx;

        StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int) REORDER_BUFFER_CHANGE_INSERT, "out of sync enums");
        StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int) REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums");
        StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int) REORDER_BUFFER_CHANGE_DELETE, "out of sync enums");

        /* allocate memory in own context, to have better accountability */
        new_ctx = AllocSetContextCreate(CurrentMemoryContext,
                                                                        "ReorderBuffer",
                                                                        ALLOCSET_DEFAULT_MINSIZE,
                                                                        ALLOCSET_DEFAULT_INITSIZE,
                                                                        ALLOCSET_DEFAULT_MAXSIZE);

        buffer =
                (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

        memset(&hash_ctl, 0, sizeof(hash_ctl));

        buffer->context = new_ctx;

        hash_ctl.keysize = sizeof(TransactionId);
        hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
        hash_ctl.hash = tag_hash;
        hash_ctl.hcxt = buffer->context;

        buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
                                                                 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

        buffer->by_txn_last_xid = InvalidTransactionId;
        buffer->by_txn_last_txn = NULL;

        buffer->nr_cached_transactions = 0;
        buffer->nr_cached_changes = 0;
        buffer->nr_cached_tuplebufs = 0;

        buffer->outbuf = NULL;
        buffer->outbufsize = 0;

        buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

        dlist_init(&buffer->toplevel_by_lsn);
        dlist_init(&buffer->cached_transactions);
        dlist_init(&buffer->cached_changes);
        slist_init(&buffer->cached_tuplebufs);

        return buffer;
}

/*
 * Free a ReorderBuffer
 */
void
ReorderBufferFree(ReorderBuffer *rb)
{
        MemoryContext context = rb->context;

        /*
         * We free separately allocated data by entirely scrapping
         * reorderbuffer's memory context.
         */
        MemoryContextDelete(context);
}

/*
 * Get an unused, possibly preallocated, ReorderBufferTXN.
 */
static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
{
        ReorderBufferTXN *txn;

        /* check the slab cache */
        if (rb->nr_cached_transactions > 0)
        {
                rb->nr_cached_transactions--;
                txn = (ReorderBufferTXN *)
                        dlist_container(ReorderBufferTXN, node,
                                                        dlist_pop_head_node(&rb->cached_transactions));
        }
        else
        {
                txn = (ReorderBufferTXN *)
                        MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
        }

        memset(txn, 0, sizeof(ReorderBufferTXN));

        dlist_init(&txn->changes);
        dlist_init(&txn->tuplecids);
        dlist_init(&txn->subtxns);

        return txn;
}

/*
 * Free a ReorderBufferTXN.
 *
 * Deallocation might be delayed for efficiency purposes; for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
        /* clean the lookup cache if we were cached (quite likely) */
        if (rb->by_txn_last_xid == txn->xid)
        {
                rb->by_txn_last_xid = InvalidTransactionId;
                rb->by_txn_last_txn = NULL;
        }

        /* free data that's contained */

        if (txn->tuplecid_hash != NULL)
        {
                hash_destroy(txn->tuplecid_hash);
                txn->tuplecid_hash = NULL;
        }

        if (txn->invalidations)
        {
                pfree(txn->invalidations);
                txn->invalidations = NULL;
        }

        /* check whether to put into the slab cache */
        if (rb->nr_cached_transactions < max_cached_transactions)
        {
                rb->nr_cached_transactions++;
                dlist_push_head(&rb->cached_transactions, &txn->node);
                VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
                VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
        }
        else
        {
                pfree(txn);
        }
}

/*
 * Get an unused, possibly preallocated, ReorderBufferChange.
 */
ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer *rb)
{
        ReorderBufferChange *change;

        /* check the slab cache */
        if (rb->nr_cached_changes)
        {
                rb->nr_cached_changes--;
                change = (ReorderBufferChange *)
                        dlist_container(ReorderBufferChange, node,
                                                        dlist_pop_head_node(&rb->cached_changes));
        }
        else
        {
                change = (ReorderBufferChange *)
                        MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
        }

        memset(change, 0, sizeof(ReorderBufferChange));
        return change;
}

/*
 * Free a ReorderBufferChange.
 *
 * Deallocation might be delayed for efficiency purposes; for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
        /* free contained data */
        switch ((ReorderBufferChangeTypeInternal) change->action_internal)
        {
                case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
                case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
                case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
                        if (change->tp.newtuple)
                        {
                                ReorderBufferReturnTupleBuf(rb, change->tp.newtuple);
                                change->tp.newtuple = NULL;
                        }

                        if (change->tp.oldtuple)
                        {
                                ReorderBufferReturnTupleBuf(rb, change->tp.oldtuple);
                                change->tp.oldtuple = NULL;
                        }
                        break;
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
                        if (change->snapshot)
                        {
                                ReorderBufferFreeSnap(rb, change->snapshot);
                                change->snapshot = NULL;
                        }
                        break;
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                        break;
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        break;
        }

        /* check whether to put into the slab cache */
        if (rb->nr_cached_changes < max_cached_changes)
        {
                rb->nr_cached_changes++;
                dlist_push_head(&rb->cached_changes, &change->node);
                VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
                VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
        }
        else
        {
                pfree(change);
        }
}


/*
 * Get an unused, possibly preallocated, ReorderBufferTupleBuf
 */
ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb)
{
        ReorderBufferTupleBuf *tuple;

        /* check the slab cache */
        if (rb->nr_cached_tuplebufs)
        {
                rb->nr_cached_tuplebufs--;
                tuple = slist_container(ReorderBufferTupleBuf, node,
                                                                slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING
                memset(tuple, 0xdeadbeef, sizeof(ReorderBufferTupleBuf));
#endif
        }
        else
        {
                tuple = (ReorderBufferTupleBuf *)
                        MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
        }

        return tuple;
}

/*
 * Free a ReorderBufferTupleBuf.
 *
 * Deallocation might be delayed for efficiency purposes; for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
        /* check whether to put into the slab cache */
        if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
        {
                rb->nr_cached_tuplebufs++;
                slist_push_head(&rb->cached_tuplebufs, &tuple->node);
                VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
                VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
        }
        else
        {
                pfree(tuple);
        }
}
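
/*
 * All three slab caches above follow the same get/return pattern. A caller
 * that allocates a change does, roughly (sketch only):
 *
 *              change = ReorderBufferGetChange(rb);
 *              ... fill it in, e.g. hand it to ReorderBufferQueueChange() ...
 *
 * and whoever ends up owning the change hands it back with
 * ReorderBufferReturnChange() once it has been consumed.
 */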

/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
                                          bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
        ReorderBufferTXN *txn;
        ReorderBufferTXNByIdEnt *ent;
        bool            found;

        Assert(TransactionIdIsValid(xid));
        Assert(!create || lsn != InvalidXLogRecPtr);

        /*
         * Check the one-entry lookup cache first
         */
        if (TransactionIdIsValid(rb->by_txn_last_xid) &&
                rb->by_txn_last_xid == xid)
        {
                txn = rb->by_txn_last_txn;

                if (txn != NULL)
                {
                        /* found it, and it's valid */
                        if (is_new)
                                *is_new = false;
                        return txn;
                }

                /*
                 * Cached as non-existent, and asked not to create? Then there's
                 * nothing else to do.
                 */
                if (!create)
                        return NULL;
                /* otherwise fall through to create it */
        }

        /*
         * The cache either wasn't hit, or it yielded "does not exist" and we
         * want to create an entry; search the lookup table.
         */
        ent = (ReorderBufferTXNByIdEnt *)
                hash_search(rb->by_txn,
                                        (void *) &xid,
                                        create ? HASH_ENTER : HASH_FIND,
                                        &found);
        if (found)
                txn = ent->txn;
        else if (create)
        {
                /* initialize the new entry, if creation was requested */
                Assert(ent != NULL);

                ent->txn = ReorderBufferGetTXN(rb);
                ent->txn->xid = xid;
                txn = ent->txn;
                txn->first_lsn = lsn;
                txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

                if (create_as_top)
                {
                        dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
                        AssertTXNLsnOrder(rb);
                }
        }
        else
                txn = NULL;                             /* not found and not asked to create */

        /* update cache */
        rb->by_txn_last_xid = xid;
        rb->by_txn_last_txn = txn;

        if (is_new)
                *is_new = !found;

        Assert(!create || !!txn);
        return txn;
}
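
/*
 * Illustrative call, matching how ReorderBufferQueueChange() below uses it
 * (create the txn if it doesn't exist yet, as a toplevel one):
 *
 *              txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 *
 * Repeated lookups for the same xid - the common case when decoding a run of
 * records from one backend - are then served from the one-entry cache.
 */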

/*
 * Queue a change into a transaction so it can be replayed upon commit.
 */
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                                           ReorderBufferChange *change)
{
        ReorderBufferTXN *txn;

        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

        change->lsn = lsn;
        Assert(InvalidXLogRecPtr != lsn);
        dlist_push_tail(&txn->changes, &change->node);
        txn->nentries++;
        txn->nentries_mem++;

        ReorderBufferCheckSerializeTXN(rb, txn);
}
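
/*
 * Sketch of how the WAL decoder queues a change (illustrative; the real
 * caller is decode.c, and the exact members are those of ReorderBufferChange
 * in reorderbuffer.h):
 *
 *              ReorderBufferChange *change = ReorderBufferGetChange(rb);
 *
 *              change->action = REORDER_BUFFER_CHANGE_INSERT;
 *              change->tp.relnode = ...;       relfilenode the tuple belongs to
 *              change->tp.newtuple = ...;      copied via ReorderBufferGetTupleBuf()
 *              ReorderBufferQueueChange(rb, xid, lsn, change);
 */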

static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
        dlist_iter      iter;
        XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;

        dlist_foreach(iter, &rb->toplevel_by_lsn)
        {
                ReorderBufferTXN *cur_txn;

                cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
                Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

                if (cur_txn->end_lsn != InvalidXLogRecPtr)
                        Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

                if (prev_first_lsn != InvalidXLogRecPtr)
                        Assert(prev_first_lsn < cur_txn->first_lsn);

                Assert(!cur_txn->is_known_as_subxact);
                prev_first_lsn = cur_txn->first_lsn;
        }
#endif
}

ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{
        ReorderBufferTXN *txn;

        if (dlist_is_empty(&rb->toplevel_by_lsn))
                return NULL;

        AssertTXNLsnOrder(rb);

        txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);

        Assert(!txn->is_known_as_subxact);
        Assert(txn->first_lsn != InvalidXLogRecPtr);
        return txn;
}

void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
        rb->current_restart_decoding_lsn = ptr;
}

void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
                                                 TransactionId subxid, XLogRecPtr lsn)
{
        ReorderBufferTXN *txn;
        ReorderBufferTXN *subtxn;
        bool            new_top;
        bool            new_sub;

        txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
        subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

        if (new_sub)
        {
                /*
                 * We assign subtransactions to the top-level transaction even if we
                 * don't have data for them yet; assignment records frequently
                 * reference xids that have not yet produced any records. Knowing
                 * those aren't toplevel xids allows us to make processing cheaper
                 * in some places.
                 */
                dlist_push_tail(&txn->subtxns, &subtxn->node);
                txn->nsubtxns++;
        }
        else if (!subtxn->is_known_as_subxact)
        {
                subtxn->is_known_as_subxact = true;
                Assert(subtxn->nsubtxns == 0);

                /* remove from lsn order list of top-level transactions */
                dlist_delete(&subtxn->node);

                /* add to toplevel transaction */
                dlist_push_tail(&txn->subtxns, &subtxn->node);
                txn->nsubtxns++;
        }
        else if (new_top)
        {
                elog(ERROR, "existing subxact assigned to unknown toplevel xact");
        }
}
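
/*
 * For example, an xact_assignment record mapping subxids 1001 and 1002 to
 * toplevel xid 1000 (hypothetical values) results in the decoder calling
 * ReorderBufferAssignChild(rb, 1000, 1001, lsn) and
 * ReorderBufferAssignChild(rb, 1000, 1002, lsn).
 */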

/*
 * Associate a subtransaction with its toplevel transaction at commit
 * time. No further changes may be added after this.
 */
void
ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
                                                 TransactionId subxid, XLogRecPtr commit_lsn,
                                                 XLogRecPtr end_lsn)
{
        ReorderBufferTXN *txn;
        ReorderBufferTXN *subtxn;

        subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
                                                                   InvalidXLogRecPtr, false);

        /*
         * No need to do anything if that subtxn didn't contain any changes
         */
        if (!subtxn)
                return;

        txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);

        if (txn == NULL)
                elog(ERROR, "subxact logged without previous toplevel record");

        /*
         * Pass our base snapshot to the parent transaction if it doesn't have
         * one, or if ours is older. That can happen if there are no changes in
         * the toplevel transaction but there are in one of the child
         * transactions. This allows the parent to simply use its base snapshot
         * initially. Take care not to clobber the parent's snapshot when the
         * subtransaction doesn't have one.
         */
        if (subtxn->base_snapshot != NULL &&
                (txn->base_snapshot == NULL ||
                 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
        {
                txn->base_snapshot = subtxn->base_snapshot;
                txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
                subtxn->base_snapshot = NULL;
                subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
        }

        subtxn->final_lsn = commit_lsn;
        subtxn->end_lsn = end_lsn;

        if (!subtxn->is_known_as_subxact)
        {
                subtxn->is_known_as_subxact = true;
                Assert(subtxn->nsubtxns == 0);

                /* remove from lsn order list of top-level transactions */
                dlist_delete(&subtxn->node);

                /* add to subtransaction list */
                dlist_push_tail(&txn->subtxns, &subtxn->node);
                txn->nsubtxns++;
        }
}


/*
 * Support for efficiently iterating over a transaction's and its
 * subtransactions' changes.
 *
 * We do this with a k-way merge between transactions/subtransactions. For
 * that we model the current heads of the different transactions as a binary
 * heap so we easily know which (sub-)transaction has the change with the
 * smallest LSN next.
 *
 * We assume the changes in individual transactions are already sorted by LSN.
 */

/*
 * Binary heap comparison function.
 *
 * Note the inverted return values: binaryheap.c keeps the element that
 * compares largest at the top, so returning 1 when a's LSN is smaller turns
 * the heap into a min-heap over LSNs, which is what the merge needs.
 */
static int
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
{
        ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
        XLogRecPtr      pos_a = state->entries[DatumGetInt32(a)].lsn;
        XLogRecPtr      pos_b = state->entries[DatumGetInt32(b)].lsn;

        if (pos_a < pos_b)
                return 1;
        else if (pos_a == pos_b)
                return 0;
        return -1;
}

/*
 * Allocate & initialize an iterator which iterates in lsn order over a
 * transaction and all its subtransactions.
 */
static ReorderBufferIterTXNState *
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
        Size            nr_txns = 0;
        ReorderBufferIterTXNState *state;
        dlist_iter      cur_txn_i;
        int32           off;

        /*
         * Calculate the size of our heap: one element for every transaction that
         * contains changes.  (Besides the transactions already in the reorder
         * buffer, we count the one we were directly passed.)
         */
        if (txn->nentries > 0)
                nr_txns++;

        dlist_foreach(cur_txn_i, &txn->subtxns)
        {
                ReorderBufferTXN *cur_txn;

                cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

                if (cur_txn->nentries > 0)
                        nr_txns++;
        }

        /*
         * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
         * need to allocate/build a heap then.
         */

        /* allocate iteration state */
        state = (ReorderBufferIterTXNState *)
                MemoryContextAllocZero(rb->context,
                                                           sizeof(ReorderBufferIterTXNState) +
                                                           sizeof(ReorderBufferIterTXNEntry) * nr_txns);

        state->nr_txns = nr_txns;
        dlist_init(&state->old_change);

        for (off = 0; off < state->nr_txns; off++)
        {
                state->entries[off].fd = -1;
                state->entries[off].segno = 0;
        }

        /* allocate heap */
        state->heap = binaryheap_allocate(state->nr_txns,
                                                                          ReorderBufferIterCompare,
                                                                          state);

        /*
         * Now insert items into the binary heap, in an unordered fashion.  (We
         * will run a heap assembly step at the end; this is more efficient.)
         */

        off = 0;

        /* add toplevel transaction if it contains changes */
        if (txn->nentries > 0)
        {
                ReorderBufferChange *cur_change;

                if (txn->nentries != txn->nentries_mem)
                        ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
                                                                                &state->entries[off].segno);

                cur_change = dlist_head_element(ReorderBufferChange, node,
                                                                                &txn->changes);

                state->entries[off].lsn = cur_change->lsn;
                state->entries[off].change = cur_change;
                state->entries[off].txn = txn;

                binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
        }

        /* add subtransactions if they contain changes */
        dlist_foreach(cur_txn_i, &txn->subtxns)
        {
                ReorderBufferTXN *cur_txn;

                cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

                if (cur_txn->nentries > 0)
                {
                        ReorderBufferChange *cur_change;

                        /* restore the subtransaction's spilled changes, if any */
                        if (cur_txn->nentries != cur_txn->nentries_mem)
                                ReorderBufferRestoreChanges(rb, cur_txn,
                                                                                        &state->entries[off].fd,
                                                                                        &state->entries[off].segno);

                        cur_change = dlist_head_element(ReorderBufferChange, node,
                                                                                        &cur_txn->changes);

                        state->entries[off].lsn = cur_change->lsn;
                        state->entries[off].change = cur_change;
                        state->entries[off].txn = cur_txn;

                        binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
                }
        }

        /* assemble a valid binary heap */
        binaryheap_build(state->heap);

        return state;
}

/*
 * Return the next change when iterating over a transaction and its
 * subtransactions.
 *
 * Returns NULL when no further changes exist.
 */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
        ReorderBufferChange *change;
        ReorderBufferIterTXNEntry *entry;
        int32           off;

        /* nothing there anymore */
        if (state->heap->bh_size == 0)
                return NULL;

        off = DatumGetInt32(binaryheap_first(state->heap));
        entry = &state->entries[off];

        /* free memory we might have "leaked" in the previous *Next call */
        if (!dlist_is_empty(&state->old_change))
        {
                change = dlist_container(ReorderBufferChange, node,
                                                                 dlist_pop_head_node(&state->old_change));
                ReorderBufferReturnChange(rb, change);
                Assert(dlist_is_empty(&state->old_change));
        }

        change = entry->change;

        /*
         * update heap with information about which transaction has the next
         * relevant change in LSN order
         */

        /* there are in-memory changes */
        if (dlist_has_next(&entry->txn->changes, &entry->change->node))
        {
                dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
                ReorderBufferChange *next_change =
                dlist_container(ReorderBufferChange, node, next);

                /* txn stays the same */
                state->entries[off].lsn = next_change->lsn;
                state->entries[off].change = next_change;

                binaryheap_replace_first(state->heap, Int32GetDatum(off));
                return change;
        }

        /* try to load changes from disk */
        if (entry->txn->nentries != entry->txn->nentries_mem)
        {
                /*
                 * Ugly: restoring changes will reuse *Change records, thus delete the
                 * current one from the per-tx list and only free it in the next call.
                 */
                dlist_delete(&change->node);
                dlist_push_tail(&state->old_change, &change->node);

                if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
                                                                                &state->entries[off].segno))
                {
                        /* successfully restored changes from disk */
                        ReorderBufferChange *next_change =
                        dlist_head_element(ReorderBufferChange, node,
                                                           &entry->txn->changes);

                        elog(DEBUG2, "restored %u/%u changes from disk",
                                 (uint32) entry->txn->nentries_mem,
                                 (uint32) entry->txn->nentries);

                        Assert(entry->txn->nentries_mem);
                        /* txn stays the same */
                        state->entries[off].lsn = next_change->lsn;
                        state->entries[off].change = next_change;
                        binaryheap_replace_first(state->heap, Int32GetDatum(off));

                        return change;
                }
        }

        /* ok, no changes there anymore, remove */
        binaryheap_remove_first(state->heap);

        return change;
}

/*
 * Deallocate the iterator
 */
static void
ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                                                   ReorderBufferIterTXNState *state)
{
        int32           off;

        for (off = 0; off < state->nr_txns; off++)
        {
                if (state->entries[off].fd != -1)
                        CloseTransientFile(state->entries[off].fd);
        }

        /* free memory we might have "leaked" in the last *Next call */
        if (!dlist_is_empty(&state->old_change))
        {
                ReorderBufferChange *change;

                change = dlist_container(ReorderBufferChange, node,
                                                                 dlist_pop_head_node(&state->old_change));
                ReorderBufferReturnChange(rb, change);
                Assert(dlist_is_empty(&state->old_change));
        }

        binaryheap_free(state->heap);
        pfree(state);
}

/*
 * Clean up the contents of a transaction, usually after the transaction
 * committed or aborted.
 */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
        bool            found;
        dlist_mutable_iter iter;

        /* cleanup subtransactions & their changes */
        dlist_foreach_modify(iter, &txn->subtxns)
        {
                ReorderBufferTXN *subtxn;

                subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

                /*
                 * Subtransactions are always associated with the toplevel TXN, even
                 * if they originally were happening inside another subtxn, so we
                 * won't ever recurse more than one level deep here.
                 */
                Assert(subtxn->is_known_as_subxact);
                Assert(subtxn->nsubtxns == 0);

                ReorderBufferCleanupTXN(rb, subtxn);
        }

        /* cleanup changes in the toplevel txn */
        dlist_foreach_modify(iter, &txn->changes)
        {
                ReorderBufferChange *change;

                change = dlist_container(ReorderBufferChange, node, iter.cur);

                ReorderBufferReturnChange(rb, change);
        }

        /*
         * Cleanup the tuplecids we stored for decoding catalog snapshot
         * access. They are always stored in the toplevel transaction.
         */
        dlist_foreach_modify(iter, &txn->tuplecids)
        {
                ReorderBufferChange *change;

                change = dlist_container(ReorderBufferChange, node, iter.cur);
                Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
                ReorderBufferReturnChange(rb, change);
        }

        if (txn->base_snapshot != NULL)
        {
                SnapBuildSnapDecRefcount(txn->base_snapshot);
                txn->base_snapshot = NULL;
                txn->base_snapshot_lsn = InvalidXLogRecPtr;
        }

        /* delete from list of known subxacts */
        if (txn->is_known_as_subxact)
        {
                /* NB: nsubtxns count of parent will be too high now */
                dlist_delete(&txn->node);
        }
        /* delete from LSN ordered list of toplevel TXNs */
        else
        {
                dlist_delete(&txn->node);
        }

        /* now remove reference from buffer */
        hash_search(rb->by_txn,
                                (void *) &txn->xid,
                                HASH_REMOVE,
                                &found);
        Assert(found);

        /* remove entries spilled to disk */
        if (txn->nentries != txn->nentries_mem)
                ReorderBufferRestoreCleanup(rb, txn);

        /* deallocate */
        ReorderBufferReturnTXN(rb, txn);
}

/*
 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
 * tqual.c's HeapTupleSatisfiesHistoricMVCC.
 */
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
        dlist_iter      iter;
        HASHCTL         hash_ctl;

        if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
                return;

        memset(&hash_ctl, 0, sizeof(hash_ctl));

        hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
        hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
        hash_ctl.hash = tag_hash;
        hash_ctl.hcxt = rb->context;

        /*
         * create the hash with the exact number of to-be-stored tuplecids from
         * the start
         */
        txn->tuplecid_hash =
                hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
                                        HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

        dlist_foreach(iter, &txn->tuplecids)
        {
                ReorderBufferTupleCidKey key;
                ReorderBufferTupleCidEnt *ent;
                bool            found;
                ReorderBufferChange *change;

                change = dlist_container(ReorderBufferChange, node, iter.cur);

                Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

                /* be careful about padding */
                memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

                key.relnode = change->tuplecid.node;

                ItemPointerCopy(&change->tuplecid.tid,
                                                &key.tid);

                ent = (ReorderBufferTupleCidEnt *)
                        hash_search(txn->tuplecid_hash,
                                                (void *) &key,
                                                HASH_ENTER,
                                                &found);
                if (!found)
                {
                        ent->cmin = change->tuplecid.cmin;
                        ent->cmax = change->tuplecid.cmax;
                        ent->combocid = change->tuplecid.combocid;
                }
                else
                {
                        Assert(ent->cmin == change->tuplecid.cmin);
                        Assert(ent->cmax == InvalidCommandId ||
                                   ent->cmax == change->tuplecid.cmax);

                        /*
                         * If the tuple got created in this transaction and is now being
                         * deleted, we already have a valid cmin stored; the stored cmax
                         * is still InvalidCommandId, so overwrite it here.
                         */
                        ent->cmax = change->tuplecid.cmax;
                }
        }
}

/*
 * Copy a provided snapshot so we can modify it privately. This is needed so
 * that catalog modifying transactions can look into intermediate catalog
 * states.
 */
static Snapshot
ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
                                          ReorderBufferTXN *txn, CommandId cid)
{
        Snapshot        snap;
        dlist_iter      iter;
        int                     i = 0;
        Size            size;

        size = sizeof(SnapshotData) +
                sizeof(TransactionId) * orig_snap->xcnt +
                sizeof(TransactionId) * (txn->nsubtxns + 1);

        snap = MemoryContextAllocZero(rb->context, size);
        memcpy(snap, orig_snap, sizeof(SnapshotData));

        snap->copied = true;
        snap->active_count = 0;
        snap->regd_count = 1;
        snap->xip = (TransactionId *) (snap + 1);

        memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);

        /*
         * snap->subxip contains all txids that belong to our transaction which
         * we need to check via cmin/cmax. That's why we store the toplevel
         * transaction in there as well.
         */
        snap->subxip = snap->xip + snap->xcnt;
        snap->subxip[i++] = txn->xid;

        /*
         * nsubtxns isn't decreased when subtransactions abort, so count
         * manually. Since it's an upper boundary it is safe to use it for the
         * allocation above.
         */
        snap->subxcnt = 1;

        dlist_foreach(iter, &txn->subtxns)
        {
                ReorderBufferTXN *sub_txn;

                sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
                snap->subxip[i++] = sub_txn->xid;
                snap->subxcnt++;
        }

        /* sort so we can bsearch() later */
        qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);

        /* store the specified current CommandId */
        snap->curcid = cid;

        return snap;
}
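
/*
 * For example (hypothetical numbers): with xcnt = 2 xids in the original
 * snapshot and nsubtxns = 3, the copy is a single allocation of
 * sizeof(SnapshotData) plus 2 TransactionIds for xip plus 4 (the
 * subtransactions and the toplevel xid itself) for subxip.
 */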

/*
 * Free a previously ReorderBufferCopySnap'ed snapshot
 */
static void
ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
{
        if (snap->copied)
                pfree(snap);
        else
                SnapBuildSnapDecRefcount(snap);
}

/*
 * Perform the replay of a transaction and its non-aborted subtransactions.
 *
 * Subtransactions have to be processed by ReorderBufferCommitChild() before
 * this is called, even if they were previously assigned to the toplevel
 * transaction with ReorderBufferAssignChild.
 *
 * We currently can only decode a transaction's contents when its commit
 * record is read because that's currently the only place where we know about
 * cache invalidations. Thus, once a toplevel commit is read, we iterate over
 * the top and subtransactions (using a k-way merge) and replay the changes
 * in lsn order.
 */
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
                                        TimestampTz commit_time)
{
        ReorderBufferTXN *txn;
        ReorderBufferIterTXNState *iterstate = NULL;
        ReorderBufferChange *change;

        volatile CommandId      command_id = FirstCommandId;
        volatile Snapshot       snapshot_now = NULL;
        volatile bool           txn_started = false;
        volatile bool           subtxn_started = false;

        txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
                                                                false);

        /* unknown transaction, nothing to replay */
        if (txn == NULL)
                return;

        txn->final_lsn = commit_lsn;
        txn->end_lsn = end_lsn;
        txn->commit_time = commit_time;

        /*
         * Serialize the last bunch of changes if some of this transaction's
         * changes were already spilled; iteration will have to read from disk
         * anyway.
         */
        if (txn->nentries_mem != txn->nentries)
                ReorderBufferSerializeTXN(rb, txn);

        /*
         * If this transaction didn't have any real changes in our database,
         * it's OK not to have a snapshot. Note that ReorderBufferCommitChild
         * will have transferred its snapshot to this transaction if it had one
         * and the toplevel tx didn't.
         */
        if (txn->base_snapshot == NULL)
        {
                Assert(txn->ninvalidations == 0);
                ReorderBufferCleanupTXN(rb, txn);
                return;
        }

        snapshot_now = txn->base_snapshot;

        /* build data to be able to look up the CommandIds of catalog tuples */
        ReorderBufferBuildTupleCidHash(rb, txn);

        /* set up the initial snapshot */
        SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

        PG_TRY();
        {
                txn_started = false;

                /*
                 * Decoding needs access to syscaches et al., which in turn use
                 * heavyweight locks and such. Thus we need to have enough state
                 * around to keep track of those. The easiest way is to simply use
                 * a transaction internally. That also allows us to easily enforce
                 * that nothing writes to the database by checking for xid
                 * assignments.
                 *
                 * When we're called via the SQL SRF there's already a transaction
                 * started, so start an explicit subtransaction there.
                 */
                if (IsTransactionOrTransactionBlock())
                {
                        BeginInternalSubTransaction("replay");
                        subtxn_started = true;
                }
                else
                {
                        StartTransactionCommand();
                        txn_started = true;
                }

                rb->begin(rb, txn);

                iterstate = ReorderBufferIterTXNInit(rb, txn);
1365                 while ((change = ReorderBufferIterTXNNext(rb, iterstate)))
1366                 {
1367                         Relation        relation = NULL;
1368                         Oid                     reloid;
1369
1370                         switch ((ReorderBufferChangeTypeInternal) change->action_internal)
1371                         {
1372                                 case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
1373                                 case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
1374                                 case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
1375                                         Assert(snapshot_now);
1376
1377                                         reloid = RelidByRelfilenode(change->tp.relnode.spcNode,
1378                                                                                                 change->tp.relnode.relNode);
1379
1380                                         /*
1381                                          * Catalog tuple without data, emitted while catalog was
1382                                          * in the process of being rewritten.
1383                                          */
1384                                         if (reloid == InvalidOid &&
1385                                                 change->tp.newtuple == NULL &&
1386                                                 change->tp.oldtuple == NULL)
1387                                                 continue;
1388                                         else if (reloid == InvalidOid)
1389                                                 elog(ERROR, "could not lookup relation %s",
1390                                                          relpathperm(change->tp.relnode, MAIN_FORKNUM));
1391
1392                                         relation = RelationIdGetRelation(reloid);
1393
1394                                         if (relation == NULL)
1395                                                 elog(ERROR, "could open relation descriptor %s",
1396                                                          relpathperm(change->tp.relnode, MAIN_FORKNUM));
1397
1398                                         if (RelationIsLogicallyLogged(relation))
1399                                         {
1400                                                 /*
1401                                                  * For now ignore sequence changes entirely. Most of
1402                                                  * the time they don't log changes using records we
1403                                                  * understand, so it doesn't make sense to handle the
1404                                                  * few cases we do.
1405                                                  */
1406                                                 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1407                                                 {
1408                                                 }
1409                                                 /* user-triggered change */
1410                                                 else if (!IsToastRelation(relation))
1411                                                 {
1412                                                         ReorderBufferToastReplace(rb, txn, relation, change);
1413                                                         rb->apply_change(rb, txn, relation, change);
1414                                                         ReorderBufferToastReset(rb, txn);
1415                                                 }
1416                                                 /* we're not interested in toast deletions */
1417                                                 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1418                                                 {
1419                                                         /*
1420                                                         * Need to reassemble the full toasted Datum in
1421                                                         * memory, to ensure the chunks don't get reused
1422                                                         * till we're done; remove it from the list of
1423                                                         * this transaction's changes. Otherwise it will
1424                                                         * get freed/reused while restoring spooled data
1425                                                         * from disk.
1426                                                          */
1427                                                         dlist_delete(&change->node);
1428                                                         ReorderBufferToastAppendChunk(rb, txn, relation,
1429                                                                                                                   change);
1430                                                 }
1431
1432                                         }
1433                                         RelationClose(relation);
1434                                         break;
1435                                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1436                                         /* get rid of the old */
1437                                         TeardownHistoricSnapshot(false);
1438
1439                                         if (snapshot_now->copied)
1440                                         {
1441                                                 ReorderBufferFreeSnap(rb, snapshot_now);
1442                                                 snapshot_now =
1443                                                         ReorderBufferCopySnap(rb, change->snapshot,
1444                                                                                                   txn, command_id);
1445                                         }
1446                                         /*
1447                                          * Restored from disk, need to be careful not to double
1448                                          * free. We could introduce refcounting for that, but for
1449                                          * now this seems infrequent enough not to care.
1450                                          */
1451                                         else if (change->snapshot->copied)
1452                                         {
1453                                                 snapshot_now =
1454                                                         ReorderBufferCopySnap(rb, change->snapshot,
1455                                                                                                   txn, command_id);
1456                                         }
1457                                         else
1458                                         {
1459                                                 snapshot_now = change->snapshot;
1460                                         }
1461
1463                                         /* and continue with the new one */
1464                                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1465                                         break;
1466
1467                                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1468                                         Assert(change->command_id != InvalidCommandId);
1469
1470                                         if (command_id < change->command_id)
1471                                         {
1472                                                 command_id = change->command_id;
1473
1474                                                 if (!snapshot_now->copied)
1475                                                 {
1476                                                         /* we don't use the global one anymore */
1477                                                         snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1478                                                                                                                                  txn, command_id);
1479                                                 }
1480
1481                                                 snapshot_now->curcid = command_id;
1482
1483                                                 TeardownHistoricSnapshot(false);
1484                                                 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1485
1486                                                 /*
1487                                                  * Every time the CommandId is incremented, we could
1488                                                  * see new catalog contents, so execute all
1489                                                  * invalidations.
1490                                                  */
1491                                                 ReorderBufferExecuteInvalidations(rb, txn);
1492                                         }
1493
1494                                         break;
1495
1496                                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1497                                         elog(ERROR, "unexpected tuplecid value in change queue");
1498                                         break;
1499                         }
1500                 }
1501
1502                 ReorderBufferIterTXNFinish(rb, iterstate);
1503
1504                 /* call commit callback */
1505                 rb->commit(rb, txn, commit_lsn);
1506
1507                 /* this is just a sanity check against bad output plugin behaviour */
1508                 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1509                         elog(ERROR, "output plugin used xid %u",
1510                                  GetCurrentTransactionId());
1511
1512                 /* make sure there's no cache pollution */
1513                 ReorderBufferExecuteInvalidations(rb, txn);
1514
1515                 /* cleanup */
1516                 TeardownHistoricSnapshot(false);
1517
1518                 /*
1519                  * Aborting the subtransaction or the transaction as a whole has the
1520                  * right semantics: we want all locks acquired in here to be released,
1521                  * not reassigned to the parent, and we do not want any database
1522                  * access to have persistent effects.
1523                  */
1524                 if (subtxn_started)
1525                         RollbackAndReleaseCurrentSubTransaction();
1526                 else if (txn_started)
1527                         AbortCurrentTransaction();
1528
1529                 if (snapshot_now->copied)
1530                         ReorderBufferFreeSnap(rb, snapshot_now);
1531
1532                 /* remove potential on-disk data, and deallocate */
1533                 ReorderBufferCleanupTXN(rb, txn);
1534         }
1535         PG_CATCH();
1536         {
1537                 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1538                 if (iterstate)
1539                         ReorderBufferIterTXNFinish(rb, iterstate);
1540
1541                 TeardownHistoricSnapshot(true);
1542
1543                 if (snapshot_now->copied)
1544                         ReorderBufferFreeSnap(rb, snapshot_now);
1545
1546                 if (subtxn_started)
1547                         RollbackAndReleaseCurrentSubTransaction();
1548                 else if (txn_started)
1549                         AbortCurrentTransaction();
1550
1551                 /*
1552                  * Invalidations in an aborted transaction aren't allowed to do
1553                  * catalog access, so we don't need to still have the snapshot set up.
1554                  */
1555                 ReorderBufferExecuteInvalidations(rb, txn);
1556
1557                 /* remove potential on-disk data, and deallocate */
1558                 ReorderBufferCleanupTXN(rb, txn);
1559
1560                 PG_RE_THROW();
1561         }
1562         PG_END_TRY();
1563 }
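
/*
 * Illustrative sketch, not part of this file's flow: a minimal pair of
 * output plugin callbacks as ReorderBufferCommit() above invokes them via
 * rb->begin and rb->apply_change. Signatures follow the callback typedefs
 * in replication/reorderbuffer.h; the sketch_* names are hypothetical.
 */
#ifdef REORDER_BUFFER_EXAMPLES
static void
sketch_begin_cb(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	elog(DEBUG1, "replaying transaction %u", txn->xid);
}

static void
sketch_change_cb(ReorderBuffer *rb, ReorderBufferTXN *txn,
				 Relation relation, ReorderBufferChange *change)
{
	/* only data-carrying changes reach the plugin; the rest is internal */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			/* change->tp.newtuple contains the new row version */
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			/* change->tp.oldtuple may be NULL, depending on what was logged */
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			/* change->tp.oldtuple contains the deleted row, if logged */
			break;
	}
}
#endif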
1564
1565 /*
1566  * Abort a transaction that possibly has previous changes. Needs to be called
1567  * first for subtransactions and then for the toplevel xid.
1568  *
1569  * NB: Transactions handled here have to have actively aborted (i.e. have
1570  * produced an abort record). Implicitly aborted transactions are handled via
1571  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1572  * which have committed, are handled in ReorderBufferForget().
1573  *
1574  * This function purges this transaction and its contents from memory and
1575  * disk.
1576  */
1577 void
1578 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1579 {
1580         ReorderBufferTXN *txn;
1581
1582         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1583                                                                 false);
1584
1585         /* unknown, nothing to remove */
1586         if (txn == NULL)
1587                 return;
1588
1589         /* cosmetic... */
1590         txn->final_lsn = lsn;
1591
1592         /* remove potential on-disk data, and deallocate */
1593         ReorderBufferCleanupTXN(rb, txn);
1594 }
1595
1596 /*
1597  * Abort all transactions that aren't actually running anymore because the
1598  * server restarted.
1599  *
1600  * NB: These really have to be transactions that have aborted due to a server
1601  * crash/immediate restart, as we don't deal with invalidations here.
1602  */
1603 void
1604 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1605 {
1606         dlist_mutable_iter it;
1607
1608         /*
1609          * Iterate through all (potential) toplevel TXNs and abort all that are
1610          * older than what possibly can be running. Once we've found the first
1611          * one that is alive we stop: there might be some that acquired an xid
1612          * earlier but started writing later, but that's unlikely, and they will
1613          * be cleaned up in a later call to ReorderBufferAbortOld().
1614          */
1615         dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1616         {
1617                 ReorderBufferTXN *txn;
1618
1619                 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1620
1621                 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1622                 {
1623                         elog(DEBUG1, "aborting old transaction %u", txn->xid);
1624
1625                         /* remove potential on-disk data, and deallocate this tx */
1626                         ReorderBufferCleanupTXN(rb, txn);
1627                 }
1628                 else
1629                         return;
1630         }
1631 }
1632
1633 /*
1634  * Forget the contents of a transaction we aren't interested in. Needs to
1635  * be called first for subtransactions and then for the toplevel xid.
1637  *
1638  * This is significantly different from ReorderBufferAbort() because
1639  * transactions that have committed need to be treated differently from
1640  * aborted ones since they may have modified the catalog.
1641  *
1642  * Note that this is only allowed to be called in the moment a transaction
1643  * commit has just been read, not earlier; otherwise later records referring
1644  * to this xid might re-create the transaction incompletely.
1645  */
1646 void
1647 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1648 {
1649         ReorderBufferTXN *txn;
1650
1651         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1652                                                                 false);
1653
1654         /* unknown, nothing to forget */
1655         if (txn == NULL)
1656                 return;
1657
1658         /* cosmetic... */
1659         txn->final_lsn = lsn;
1660
1661         /*
1662          * Process cache invalidation messages if there are any. Even if we're
1663          * not interested in the transaction's contents, it could have manipulated
1664          * the catalog and we need to update the caches according to that.
1665          */
1666         if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1667         {
1668                 /* setup snapshot to perform the invalidations in */
1669                 SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
1670                 PG_TRY();
1671                 {
1672                         ReorderBufferExecuteInvalidations(rb, txn);
1673                         TeardownHistoricSnapshot(false);
1674                 }
1675                 PG_CATCH();
1676                 {
1677                         /* cleanup */
1678                         TeardownHistoricSnapshot(true);
1679                         PG_RE_THROW();
1680                 }
1681                 PG_END_TRY();
1682         }
1683         else
1684                 Assert(txn->ninvalidations == 0);
1685
1686         /* remove potential on-disk data, and deallocate */
1687         ReorderBufferCleanupTXN(rb, txn);
1688 }
1689
1690
1691 /*
1692  * Check whether a transaction is already known in this module.
1693  */
1694 bool
1695 ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
1696 {
1697         ReorderBufferTXN *txn;
1698
1699         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1700                                                                 false);
1701         return txn != NULL;
1702 }
1703
1704 /*
1705  * Add a new snapshot to this transaction that may only be used after 'lsn'
1706  * because the previous snapshot doesn't describe the catalog correctly for
1707  * following rows.
1708  */
1709 void
1710 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1711                                                  XLogRecPtr lsn, Snapshot snap)
1712 {
1713         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1714
1715         change->snapshot = snap;
1716         change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1717
1718         ReorderBufferQueueChange(rb, xid, lsn, change);
1719 }
1720
1721 /*
1722  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1723  * that is used to decode all changes until either this transaction modifies
1724  * the catalog or another catalog modifying transaction commits.
1725  *
1726  * Needs to be called before any changes are added with
1727  * ReorderBufferQueueChange().
1728  */
1729 void
1730 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1731                                                          XLogRecPtr lsn, Snapshot snap)
1732 {
1733         ReorderBufferTXN *txn;
1734         bool            is_new;
1735
1736         txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1737         Assert(txn->base_snapshot == NULL);
1738         Assert(snap != NULL);
1739
1740         txn->base_snapshot = snap;
1741         txn->base_snapshot_lsn = lsn;
1742 }
1743
1744 /*
1745  * Access the catalog with this CommandId at this point in the changestream.
1746  *
1747  * May only be called for command ids > 1
1748  */
1749 void
1750 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1751                                                          XLogRecPtr lsn, CommandId cid)
1752 {
1753         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1754
1755         change->command_id = cid;
1756         change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1757
1758         ReorderBufferQueueChange(rb, xid, lsn, change);
1759 }
1760
1761
1762 /*
1763  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1764  */
1765 void
1766 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1767                                                          XLogRecPtr lsn, RelFileNode node,
1768                                                          ItemPointerData tid, CommandId cmin,
1769                                                          CommandId cmax, CommandId combocid)
1770 {
1771         ReorderBufferChange *change = ReorderBufferGetChange(rb);
1772         ReorderBufferTXN *txn;
1773
1774         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1775
1776         change->tuplecid.node = node;
1777         change->tuplecid.tid = tid;
1778         change->tuplecid.cmin = cmin;
1779         change->tuplecid.cmax = cmax;
1780         change->tuplecid.combocid = combocid;
1781         change->lsn = lsn;
1782         change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1783
1784         dlist_push_tail(&txn->tuplecids, &change->node);
1785         txn->ntuplecids++;
1786 }
1787
1788 /*
1789  * Setup the invalidation of the toplevel transaction.
1790  *
1791  * This needs to be done before ReorderBufferCommit is called!
1792  */
1793 void
1794 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1795                                                           XLogRecPtr lsn, Size nmsgs,
1796                                                           SharedInvalidationMessage *msgs)
1797 {
1798         ReorderBufferTXN *txn;
1799
1800         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1801
1802         if (txn->ninvalidations != 0)
1803                 elog(ERROR, "only ever add one set of invalidations");
1804
1805         Assert(nmsgs > 0);
1806
1807         txn->ninvalidations = nmsgs;
1808         txn->invalidations = (SharedInvalidationMessage *)
1809                 MemoryContextAlloc(rb->context,
1810                                                    sizeof(SharedInvalidationMessage) * nmsgs);
1811         memcpy(txn->invalidations, msgs,
1812                    sizeof(SharedInvalidationMessage) * nmsgs);
1813 }
1814
1815 /*
1816  * Apply all invalidations we know. Possibly we only need parts at this point
1817  * in the changestream but we don't know which those are.
1818  */
1819 static void
1820 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1821 {
1822         int                     i;
1823
1824         for (i = 0; i < txn->ninvalidations; i++)
1825                 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1826 }
1827
1828 /*
1829  * Mark a transaction as containing catalog changes
1830  */
1831 void
1832 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1833                                                                   XLogRecPtr lsn)
1834 {
1835         ReorderBufferTXN *txn;
1836
1837         txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1838
1839         txn->has_catalog_changes = true;
1840 }
1841
1842 /*
1843  * Query whether a transaction is already *known* to contain catalog
1844  * changes. This can be wrong until directly before the commit!
1845  */
1846 bool
1847 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1848 {
1849         ReorderBufferTXN *txn;
1850
1851         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1852                                                                 false);
1853         if (txn == NULL)
1854                 return false;
1855
1856         return txn->has_catalog_changes;
1857 }
1858
1859 /*
1860  * Have we already added the first snapshot?
1861  */
1862 bool
1863 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1864 {
1865         ReorderBufferTXN *txn;
1866
1867         txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1868                                                                 false);
1869
1870         /* transaction isn't known yet, ergo no snapshot */
1871         if (txn == NULL)
1872                 return false;
1873
1874         /*
1875          * TODO: It would be a nice improvement if, for subtransactions, we also
1876          * checked the toplevel transaction, but we'd need to keep track of a bit
1877          * more state.
1878          */
1879         return txn->base_snapshot != NULL;
1880 }
1881
1882
1883 /*
1884  * ---------------------------------------
1885  * Disk serialization support
1886  * ---------------------------------------
1887  */
1888
1889 /*
1890  * Ensure the IO buffer is >= sz.
1891  */
1892 static void
1893 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
1894 {
1895         if (!rb->outbufsize)
1896         {
1897                 rb->outbuf = MemoryContextAlloc(rb->context, sz);
1898                 rb->outbufsize = sz;
1899         }
1900         else if (rb->outbufsize < sz)
1901         {
1902                 rb->outbuf = repalloc(rb->outbuf, sz);
1903                 rb->outbufsize = sz;
1904         }
1905 }
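
/*
 * Illustrative note: repalloc() may move rb->outbuf, so any pointer into the
 * buffer has to be re-fetched after a reserve, as the callers below do:
 *
 *		ReorderBufferSerializeReserve(rb, sz);
 *		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 */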
1906
1907 /*
1908  * Check whether the transaction txn should spill its data to disk.
1909  */
1910 static void
1911 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1912 {
1913         /*
1914          * TODO: improve accounting so we cheaply can take subtransactions into
1915          * account here.
1916          */
1917         if (txn->nentries_mem >= max_changes_in_memory)
1918         {
1919                 ReorderBufferSerializeTXN(rb, txn);
1920                 Assert(txn->nentries_mem == 0);
1921         }
1922 }
1923
1924 /*
1925  * Spill data of a large transaction (and its subtransactions) to disk.
1926  */
1927 static void
1928 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1929 {
1930         dlist_iter      subtxn_i;
1931         dlist_mutable_iter change_i;
1932         int                     fd = -1;
1933         XLogSegNo       curOpenSegNo = 0;
1934         Size            spilled = 0;
1935         char            path[MAXPGPATH];
1936
1937         elog(DEBUG2, "spill %u changes in tx %u to disk",
1938                  (uint32) txn->nentries_mem, txn->xid);
1939
1940         /* do the same to all child TXs */
1941         dlist_foreach(subtxn_i, &txn->subtxns)
1942         {
1943                 ReorderBufferTXN *subtxn;
1944
1945                 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
1946                 ReorderBufferSerializeTXN(rb, subtxn);
1947         }
1948
1949         /* serialize changestream */
1950         dlist_foreach_modify(change_i, &txn->changes)
1951         {
1952                 ReorderBufferChange *change;
1953
1954                 change = dlist_container(ReorderBufferChange, node, change_i.cur);
1955
1956                 /*
1957                  * Store the change in the segment its start lsn belongs to; don't
1958                  * split a single change over multiple segments, though.
1959                  */
1960                 if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
1961                 {
1962                         XLogRecPtr      recptr;
1963
1964                         if (fd != -1)
1965                                 CloseTransientFile(fd);
1966
1967                         XLByteToSeg(change->lsn, curOpenSegNo);
1968                         XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
1969
1970                         /*
1971                          * No need to care about TLIs here, only used during a single run,
1972                          * so each LSN only maps to a specific WAL record.
1973                          */
1974                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
1975                                         NameStr(MyReplicationSlot->data.name), txn->xid,
1976                                         (uint32) (recptr >> 32), (uint32) recptr);
1977
1978                         /* open segment, create it if necessary */
1979                         fd = OpenTransientFile(path,
1980                                                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
1981                                                                    S_IRUSR | S_IWUSR);
1982
1983                         if (fd < 0)
1984                                 ereport(ERROR,
1985                                                 (errcode_for_file_access(),
1986                                                  errmsg("could not open file \"%s\": %m",
1987                                                                 path)));
1988                 }
1989
1990                 ReorderBufferSerializeChange(rb, txn, fd, change);
1991                 dlist_delete(&change->node);
1992                 ReorderBufferReturnChange(rb, change);
1993
1994                 spilled++;
1995         }
1996
1997         Assert(spilled == txn->nentries_mem);
1998         Assert(dlist_is_empty(&txn->changes));
1999         txn->nentries_mem = 0;
2000
2001         if (fd != -1)
2002                 CloseTransientFile(fd);
2003 }
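
/*
 * Sketch of the spill-file naming used above (the helper name is
 * hypothetical; the format string matches the sprintf() calls in this
 * file): one file per WAL segment that a transaction's changes fall into,
 * kept in the slot's directory so crash recovery can simply unlink all
 * xid-* files.
 */
#ifdef REORDER_BUFFER_EXAMPLES
static void
sketch_spill_path(char *path, size_t pathlen, const char *slotname,
				  TransactionId xid, XLogRecPtr recptr)
{
	snprintf(path, pathlen, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
			 slotname, xid, (uint32) (recptr >> 32), (uint32) recptr);
}
#endif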
2004
2005 /*
2006  * Serialize individual change to disk.
2007  */
2008 static void
2009 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2010                                                          int fd, ReorderBufferChange *change)
2011 {
2012         ReorderBufferDiskChange *ondisk;
2013         Size            sz = sizeof(ReorderBufferDiskChange);
2014
2015         ReorderBufferSerializeReserve(rb, sz);
2016
2017         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2018         memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2019
2020         switch ((ReorderBufferChangeTypeInternal) change->action_internal)
2021         {
2022                 case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
2023                         /* fall through */
2024                 case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
2025                         /* fall through */
2026                 case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
2027                         {
2028                                 char       *data;
2029                                 Size            oldlen = 0;
2030                                 Size            newlen = 0;
2031
2032                                 if (change->tp.oldtuple)
2033                                         oldlen = offsetof(ReorderBufferTupleBuf, data)
2034                                                 + change->tp.oldtuple->tuple.t_len
2035                                                 - offsetof(HeapTupleHeaderData, t_bits);
2036
2037                                 if (change->tp.newtuple)
2038                                         newlen = offsetof(ReorderBufferTupleBuf, data)
2039                                                 + change->tp.newtuple->tuple.t_len
2040                                                 - offsetof(HeapTupleHeaderData, t_bits);
2041
2042                                 sz += oldlen;
2043                                 sz += newlen;
2044
2045                                 /* make sure we have enough space */
2046                                 ReorderBufferSerializeReserve(rb, sz);
2047
2048                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2049                                 /* might have been reallocated above */
2050                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2051
2052                                 if (oldlen)
2053                                 {
2054                                         memcpy(data, change->tp.oldtuple, oldlen);
2055                                         data += oldlen;
2056                                         Assert(&change->tp.oldtuple->header == change->tp.oldtuple->tuple.t_data);
2057                                 }
2058
2059                                 if (newlen)
2060                                 {
2061                                         memcpy(data, change->tp.newtuple, newlen);
2062                                         data += newlen;
2063                                         Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
2064                                 }
2065                                 break;
2066                         }
2067                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2068                         {
2069                                 char       *data;
2070
2071                                 sz += sizeof(SnapshotData) +
2072                                         sizeof(TransactionId) * change->snapshot->xcnt +
2073                                        sizeof(TransactionId) * change->snapshot->subxcnt;
2075
2076                                 /* make sure we have enough space */
2077                                 ReorderBufferSerializeReserve(rb, sz);
2078                                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2079                                 /* might have been reallocated above */
2080                                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2081
2082                                 memcpy(data, change->snapshot, sizeof(SnapshotData));
2083                                 data += sizeof(SnapshotData);
2084
2085                                 if (change->snapshot->xcnt)
2086                                 {
2087                                        memcpy(data, change->snapshot->xip,
2088                                                   sizeof(TransactionId) * change->snapshot->xcnt);
2089                                        data += sizeof(TransactionId) * change->snapshot->xcnt;
2090                                 }
2091
2092                                 if (change->snapshot->subxcnt)
2093                                 {
2094                                        memcpy(data, change->snapshot->subxip,
2095                                                   sizeof(TransactionId) * change->snapshot->subxcnt);
2096                                        data += sizeof(TransactionId) * change->snapshot->subxcnt;
2097                                 }
2098                                 break;
2099                         }
2100                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2101                         /* ReorderBufferChange contains everything important */
2102                         break;
2103                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2104                         /* ReorderBufferChange contains everything important */
2105                         break;
2106         }
2107
2108         ondisk->size = sz;
2109
2110         if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2111         {
2112                 CloseTransientFile(fd);
2113                 ereport(ERROR,
2114                                 (errcode_for_file_access(),
2115                                  errmsg("could not write to data file for xid %u: %m",
2116                                                 txn->xid)));
2117         }
2118
2119         Assert(ondisk->change.action_internal == change->action_internal);
2120 }
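
/*
 * Resulting on-disk layout, as an illustration: a fixed-size
 * ReorderBufferDiskChange header (embedding the ReorderBufferChange and the
 * total size) followed by the variable-length payload; for
 * INSERT/UPDATE/DELETE the payload is the old tuple image (if any) followed
 * by the new tuple image (if any).
 *
 *	+--------------------------+------------------+------------------+
 *	| ReorderBufferDiskChange  | oldtuple image   | newtuple image   |
 *	| (size, embedded change)  | (optional)       | (optional)       |
 *	+--------------------------+------------------+------------------+
 */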
2121
2122 /*
2123  * Restore a number of changes spilled to disk back into memory.
2124  */
2125 static Size
2126 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2127                                                         int *fd, XLogSegNo *segno)
2128 {
2129         Size            restored = 0;
2130         XLogSegNo       last_segno;
2131         dlist_mutable_iter cleanup_iter;
2132
2133         Assert(txn->first_lsn != InvalidXLogRecPtr);
2134         Assert(txn->final_lsn != InvalidXLogRecPtr);
2135
2136         /* free current entries, so we have memory for more */
2137         dlist_foreach_modify(cleanup_iter, &txn->changes)
2138         {
2139                 ReorderBufferChange *cleanup =
2140                 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2141
2142                 dlist_delete(&cleanup->node);
2143                 ReorderBufferReturnChange(rb, cleanup);
2144         }
2145         txn->nentries_mem = 0;
2146         Assert(dlist_is_empty(&txn->changes));
2147
2148         XLByteToSeg(txn->final_lsn, last_segno);
2149
2150         while (restored < max_changes_in_memory && *segno <= last_segno)
2151         {
2152                 int                     readBytes;
2153                 ReorderBufferDiskChange *ondisk;
2154
2155                 if (*fd == -1)
2156                 {
2157                         XLogRecPtr      recptr;
2158                         char            path[MAXPGPATH];
2159
2160                         /* first time in */
2161                         if (*segno == 0)
2162                         {
2163                                 XLByteToSeg(txn->first_lsn, *segno);
2164                         }
2165
2166                         Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2167                         XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2168
2169                         /*
2170                          * No need to care about TLIs here, only used during a single run,
2171                          * so each LSN only maps to a specific WAL record.
2172                          */
2173                         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2174                                         NameStr(MyReplicationSlot->data.name), txn->xid,
2175                                         (uint32) (recptr >> 32), (uint32) recptr);
2176
2177                         *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2178                         if (*fd < 0 && errno == ENOENT)
2179                         {
2180                                 *fd = -1;
2181                                 (*segno)++;
2182                                 continue;
2183                         }
2184                         else if (*fd < 0)
2185                                 ereport(ERROR,
2186                                                 (errcode_for_file_access(),
2187                                                  errmsg("could not open file \"%s\": %m",
2188                                                                 path)));
2189
2190                 }
2191
2192                 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2193
2195                 /*
2196                  * Read the statically sized part of a change which has information
2197                  * about the total size. If we couldn't read a record, we're at the
2198                  * end of this file.
2199                  */
2200
2201                 readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2202
2203                 /* eof */
2204                 if (readBytes == 0)
2205                 {
2206                         CloseTransientFile(*fd);
2207                         *fd = -1;
2208                         (*segno)++;
2209                         continue;
2210                 }
2211                 else if (readBytes < 0)
2212                         ereport(ERROR,
2213                                         (errcode_for_file_access(),
2214                                          errmsg("could not read from reorderbuffer spill file: %m")));
2215                 else if (readBytes != sizeof(ReorderBufferDiskChange))
2216                         ereport(ERROR,
2217                                         (errcode_for_file_access(),
2218                                          errmsg("incomplete read from reorderbuffer spill file: read %d instead of %u",
2219                                                         readBytes,
2220                                                         (uint32) sizeof(ReorderBufferDiskChange))));
2221
2222                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2223
2224                 ReorderBufferSerializeReserve(rb,
2225                                                                           sizeof(ReorderBufferDiskChange) + ondisk->size);
2226                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2227
2228                 readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2229                                                  ondisk->size - sizeof(ReorderBufferDiskChange));
2230
2231                 if (readBytes < 0)
2232                         ereport(ERROR,
2233                                         (errcode_for_file_access(),
2234                                          errmsg("could not read from reorderbuffer spill file: %m")));
2235                 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2236                         ereport(ERROR,
2237                                         (errcode_for_file_access(),
2238                                          errmsg("incomplete read from reorderbuffer spill file: read %d instead of %u",
2239                                                         readBytes,
2240                                                         (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2241
2242                 /*
2243                  * ok, read a full change from disk, now restore it into proper
2244                  * in-memory format
2245                  */
2246                 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2247                 restored++;
2248         }
2249
2250         return restored;
2251 }
2252
2253 /*
2254  * Convert change from its on-disk format to in-memory format and queue it onto
2255  * the TXN's ->changes list.
2256  */
2257 static void
2258 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2259                                                    char *data)
2260 {
2261         ReorderBufferDiskChange *ondisk;
2262         ReorderBufferChange *change;
2263
2264         ondisk = (ReorderBufferDiskChange *) data;
2265
2266         change = ReorderBufferGetChange(rb);
2267
2268         /* copy static part */
2269         memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2270
2271         data += sizeof(ReorderBufferDiskChange);
2272
2273         /* restore individual stuff */
2274         switch ((ReorderBufferChangeTypeInternal) change->action_internal)
2275         {
2276                 case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
2277                         /* fall through */
2278                 case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
2279                         /* fall through */
2280                 case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
2281                         if (change->tp.newtuple)
2282                         {
2283                                Size            len = offsetof(ReorderBufferTupleBuf, data) +
2284                                        ((ReorderBufferTupleBuf *) data)->tuple.t_len -
2285                                        offsetof(HeapTupleHeaderData, t_bits);
2286
2287                                 change->tp.newtuple = ReorderBufferGetTupleBuf(rb);
2288                                 memcpy(change->tp.newtuple, data, len);
2289                                 change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header;
2290
2291                                 data += len;
2292                         }
2293
2294                         if (change->tp.oldtuple)
2295                         {
2296                                Size            len = offsetof(ReorderBufferTupleBuf, data) +
2297                                        ((ReorderBufferTupleBuf *) data)->tuple.t_len -
2298                                        offsetof(HeapTupleHeaderData, t_bits);
2299
2300                                 change->tp.oldtuple = ReorderBufferGetTupleBuf(rb);
2301                                 memcpy(change->tp.oldtuple, data, len);
2302                                 change->tp.oldtuple->tuple.t_data = &change->tp.oldtuple->header;
2303                                 data += len;
2304                         }
2305                         break;
2306                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2307                         {
2308                                 Snapshot        oldsnap = (Snapshot) data;
2309                                 Size            size = sizeof(SnapshotData) +
2310                                 sizeof(TransactionId) * oldsnap->xcnt +
2311                                sizeof(TransactionId) * oldsnap->subxcnt;
2313
2314                                 Assert(change->snapshot != NULL);
2315
2316                                 change->snapshot = MemoryContextAllocZero(rb->context, size);
2317
2318                                 memcpy(change->snapshot, data, size);
2319                                 change->snapshot->xip = (TransactionId *)
2320                                         (((char *) change->snapshot) + sizeof(SnapshotData));
2321                                change->snapshot->subxip =
2322                                        change->snapshot->xip + change->snapshot->xcnt;
2323                                 change->snapshot->copied = true;
2324                                 break;
2325                         }
2326                         /* the base struct contains all the data, easy peasy */
2327                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2328                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2329                         break;
2330         }
2331
2332         dlist_push_tail(&txn->changes, &change->node);
2333         txn->nentries_mem++;
2334 }
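
/*
 * Note on the restore above: the spilled image embeds a HeapTupleData whose
 * t_data pointer is meaningless once read back in, which is why it gets
 * re-pointed at the freshly copied header, e.g.:
 *
 *		change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header;
 */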
2335
2336 /*
2337  * Remove all on-disk data stored for the passed-in transaction.
2338  */
2339 static void
2340 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2341 {
2342         XLogSegNo       first;
2343         XLogSegNo       cur;
2344         XLogSegNo       last;
2345
2346         Assert(txn->first_lsn != InvalidXLogRecPtr);
2347         Assert(txn->final_lsn != InvalidXLogRecPtr);
2348
2349         XLByteToSeg(txn->first_lsn, first);
2350         XLByteToSeg(txn->final_lsn, last);
2351
2352         /* iterate over all possible filenames, and delete them */
2353         for (cur = first; cur <= last; cur++)
2354         {
2355                 char            path[MAXPGPATH];
2356                 XLogRecPtr      recptr;
2357
2358                 XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2359
2360                 sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2361                                 NameStr(MyReplicationSlot->data.name), txn->xid,
2362                                 (uint32) (recptr >> 32), (uint32) recptr);
2363                 if (unlink(path) != 0 && errno != ENOENT)
2364                         ereport(ERROR,
2365                                         (errcode_for_file_access(),
2366                                          errmsg("could not unlink file \"%s\": %m", path)));
2367         }
2368 }
2369
2370 /*
2371  * Delete all data spilled to disk after we've restarted/crashed. It will be
2372  * recreated when the respective slots are reused.
2373  */
2374 void
2375 StartupReorderBuffer(void)
2376 {
2377         DIR                *logical_dir;
2378         struct dirent *logical_de;
2379
2380         DIR                *spill_dir;
2381         struct dirent *spill_de;
2382
2383         logical_dir = AllocateDir("pg_replslot");
2384         while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2385         {
2386                 struct stat     statbuf;
2387                 char            path[MAXPGPATH];
2388
2389                 if (strcmp(logical_de->d_name, ".") == 0 ||
2390                         strcmp(logical_de->d_name, "..") == 0)
2391                         continue;
2392
2393                 /* if it cannot be a slot, skip the directory */
2394                 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2395                         continue;
2396
2397                 /*
2398                  * ok, has to be a surviving logical slot; iterate and delete
2399                  * everything starting with xid-*
2400                  */
2401                 sprintf(path, "pg_replslot/%s", logical_de->d_name);
2402
2403                 /* we're only creating directories here, skip if it's not ours */
2404                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2405                         continue;
2406
2407                 spill_dir = AllocateDir(path);
2408                 while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2409                 {
2410                         if (strcmp(spill_de->d_name, ".") == 0 ||
2411                                 strcmp(spill_de->d_name, "..") == 0)
2412                                 continue;
2413
2414                         /* only look at names that can be ours */
2415                         if (strncmp(spill_de->d_name, "xid", 3) == 0)
2416                         {
2417                                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2418                                                 spill_de->d_name);
2419
2420                                 if (unlink(path) != 0)
2421                                         ereport(PANIC,
2422                                                         (errcode_for_file_access(),
2423                                                          errmsg("could not unlink file \"%s\": %m",
2424                                                                         path)));
2425                         }
2426                 }
2427                 FreeDir(spill_dir);
2428         }
2429         FreeDir(logical_dir);
2430 }
2431
2432 /* ---------------------------------------
2433  * toast reassembly support
2434  * ---------------------------------------
2435  */
2436
2437 /*
2438  * Initialize per tuple toast reconstruction support.
2439  */
2440 static void
2441 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2442 {
2443         HASHCTL         hash_ctl;
2444
2445         Assert(txn->toast_hash == NULL);
2446
2447         memset(&hash_ctl, 0, sizeof(hash_ctl));
2448         hash_ctl.keysize = sizeof(Oid);
2449         hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2450         hash_ctl.hash = tag_hash;
2451         hash_ctl.hcxt = rb->context;
2452         txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2453                                                                   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2454 }
2455
2456 /*
2457  * Per toast-chunk handling for toast reconstruction
2458  *
2459  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2460  * toasted Datum comes along.
2461  */
2462 static void
2463 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2464                                                           Relation relation, ReorderBufferChange *change)
2465 {
2466         ReorderBufferToastEnt *ent;
2467         bool            found;
2468         int32           chunksize;
2469         bool            isnull;
2470         Pointer         chunk;
2471         TupleDesc       desc = RelationGetDescr(relation);
2472         Oid                     chunk_id;
2473         Oid                     chunk_seq;
2474
2475         if (txn->toast_hash == NULL)
2476                 ReorderBufferToastInitHash(rb, txn);
2477
2478         Assert(IsToastRelation(relation));
2479
2480         chunk_id = DatumGetObjectId(fastgetattr(&change->tp.newtuple->tuple, 1, desc, &isnull));
2481         Assert(!isnull);
2482         chunk_seq = DatumGetInt32(fastgetattr(&change->tp.newtuple->tuple, 2, desc, &isnull));
2483         Assert(!isnull);
2484
2485         ent = (ReorderBufferToastEnt *)
2486                 hash_search(txn->toast_hash,
2487                                         (void *) &chunk_id,
2488                                         HASH_ENTER,
2489                                         &found);
2490
2491         if (!found)
2492         {
2493                 Assert(ent->chunk_id == chunk_id);
2494                 ent->num_chunks = 0;
2495                 ent->last_chunk_seq = 0;
2496                 ent->size = 0;
2497                 ent->reconstructed = NULL;
2498                 dlist_init(&ent->chunks);
2499
2500                 if (chunk_seq != 0)
2501                         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2502                                  chunk_seq, chunk_id);
2503         }
2504         else if (chunk_seq != ent->last_chunk_seq + 1)
2505                 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2506                          chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2507
2508         chunk = DatumGetPointer(fastgetattr(&change->tp.newtuple->tuple, 3, desc, &isnull));
2509         Assert(!isnull);
2510
2511         /* calculate size so we can allocate the right size at once later */
2512         if (!VARATT_IS_EXTENDED(chunk))
2513                 chunksize = VARSIZE(chunk) - VARHDRSZ;
2514         else if (VARATT_IS_SHORT(chunk))
2515                 /* could happen due to heap_form_tuple doing its thing */
2516                 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2517         else
2518                 elog(ERROR, "unexpected type of toast chunk");
2519
2520         ent->size += chunksize;
2521         ent->last_chunk_seq = chunk_seq;
2522         ent->num_chunks++;
2523         dlist_push_tail(&ent->chunks, &change->node);
2524 }
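
/*
 * Sketch (assuming standard toast chunking with TOAST_MAX_CHUNK_SIZE from
 * access/tuptoaster.h): the number of chunks we expect to accumulate for an
 * external toast datum of the given size, matching the chunk_seq checks
 * above. The helper name is hypothetical.
 */
#ifdef REORDER_BUFFER_EXAMPLES
static int32
sketch_expected_chunks(int32 extsize)
{
	return (extsize + (int32) TOAST_MAX_CHUNK_SIZE - 1) /
		(int32) TOAST_MAX_CHUNK_SIZE;
}
#endif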
2525
2526 /*
2527  * Rejigger change->newtuple to point to in-memory toast tuples instead of
2528  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
2529  *
2530  * We cannot replace unchanged toast tuples though, so those will still point
2531  * to on-disk toast data.
2532  */
2533 static void
2534 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2535                                                   Relation relation, ReorderBufferChange *change)
2536 {
2537         TupleDesc       desc;
2538         int                     natt;
2539         Datum      *attrs;
2540         bool       *isnull;
2541         bool       *free;
2542         HeapTuple       newtup;
2543         Relation        toast_rel;
2544         TupleDesc       toast_desc;
2545         MemoryContext oldcontext;
2546
2547         /* no toast tuples changed */
2548         if (txn->toast_hash == NULL)
2549                 return;
2550
2551         oldcontext = MemoryContextSwitchTo(rb->context);
2552
2553         /* we should only have toast tuples in an INSERT or UPDATE */
2554         Assert(change->tp.newtuple);
2555
2556         desc = RelationGetDescr(relation);
2557
2558         toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2559         toast_desc = RelationGetDescr(toast_rel);
2560
2561         /* should we allocate from stack instead? */
2562         attrs = palloc0(sizeof(Datum) * desc->natts);
2563         isnull = palloc0(sizeof(bool) * desc->natts);
2564         free = palloc0(sizeof(bool) * desc->natts);
2565
2566         heap_deform_tuple(&change->tp.newtuple->tuple, desc,
2567                                           attrs, isnull);
2568
2569         for (natt = 0; natt < desc->natts; natt++)
2570         {
2571                 Form_pg_attribute attr = desc->attrs[natt];
2572                 ReorderBufferToastEnt *ent;
2573                 struct varlena *varlena;
2574
2575                 /* va_rawsize is the size of the original datum -- including header */
2576                 struct varatt_external toast_pointer;
2577                 struct varatt_indirect redirect_pointer;
2578                 struct varlena *new_datum = NULL;
2579                 struct varlena *reconstructed;
2580                 dlist_iter      it;
2581                 Size            data_done = 0;
2582
2583                 /* system columns aren't toasted */
2584                 if (attr->attnum < 0)
2585                         continue;
2586
2587                 if (attr->attisdropped)
2588                         continue;
2589
2590                 /* not a varlena datatype */
2591                 if (attr->attlen != -1)
2592                         continue;
2593
2594                 /* no data */
2595                 if (isnull[natt])
2596                         continue;
2597
2598                 /* ok, we know we have a toast datum */
2599                 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2600
2601                 /* no need to do anything if the tuple isn't external */
2602                 if (!VARATT_IS_EXTERNAL(varlena))
2603                         continue;
2604
2605                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2606
2607                 /*
2608                  * Check whether the toast tuple changed, replace if so.
2609                  */
2610                 ent = (ReorderBufferToastEnt *)
2611                         hash_search(txn->toast_hash,
2612                                                 (void *) &toast_pointer.va_valueid,
2613                                                 HASH_FIND,
2614                                                 NULL);
2615                 if (ent == NULL)
2616                         continue;
2617
2618                 new_datum =
2619                         (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2620
2621                 free[natt] = true;
2622
2623                 reconstructed = palloc0(toast_pointer.va_rawsize);
2624
2625                 ent->reconstructed = reconstructed;
2626
2627                 /* stitch toast tuple back together from its parts */
2628                 dlist_foreach(it, &ent->chunks)
2629                 {
2630                         bool            isnull;
2631                         ReorderBufferTupleBuf *tup =
2632                         dlist_container(ReorderBufferChange, node, it.cur)->tp.newtuple;
2633                         Pointer         chunk =
2634                         DatumGetPointer(fastgetattr(&tup->tuple, 3, toast_desc, &isnull));
2635
2636                         Assert(!isnull);
2637                         Assert(!VARATT_IS_EXTERNAL(chunk));
2638                         Assert(!VARATT_IS_SHORT(chunk));
2639
2640                         memcpy(VARDATA(reconstructed) + data_done,
2641                                    VARDATA(chunk),
2642                                    VARSIZE(chunk) - VARHDRSZ);
2643                         data_done += VARSIZE(chunk) - VARHDRSZ;
2644                 }
2645                 Assert(data_done == toast_pointer.va_extsize);
2646
2647                 /* make sure it's marked as compressed or not */
2648                 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2649                         SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2650                 else
2651                         SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2652
2653                 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2654                 redirect_pointer.pointer = reconstructed;
2655
2656                 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2657                 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2658                            sizeof(redirect_pointer));
2659
2660                 attrs[natt] = PointerGetDatum(new_datum);
2661         }
2662
2663         /*
2664          * Build the tuple in separate memory & copy it back into the tuplebuf
2665          * passed to the output plugin. We can't directly heap_fill_tuple() into
2666          * the tuplebuf because attrs[] still points into its current contents.
2667          */
2668         newtup = heap_form_tuple(desc, attrs, isnull);
2669         Assert(change->tp.newtuple->tuple.t_len <= MaxHeapTupleSize);
2670         Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
2671
2672         memcpy(change->tp.newtuple->tuple.t_data,
2673                    newtup->t_data,
2674                    newtup->t_len);
2675         change->tp.newtuple->tuple.t_len = newtup->t_len;
2676
2677         /*
2678          * Free resources we no longer need; more persistent state will be
2679          * freed in ReorderBufferToastReset().
2680          */
2681         RelationClose(toast_rel);
2682         pfree(newtup);
2683         for (natt = 0; natt < desc->natts; natt++)
2684         {
2685                 if (free[natt])
2686                         pfree(DatumGetPointer(attrs[natt]));
2687         }
2688         pfree(attrs);
2689         pfree(free);
2690         pfree(isnull);
2691
2692         MemoryContextSwitchTo(oldcontext);
2693 }
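/*
 * Illustrative sketch only, not part of this module: following the
 * VARTAG_INDIRECT datum created by ReorderBufferToastReplace() above.  This
 * is roughly what the generic detoasting code does when it encounters an
 * indirect pointer; the helper name is hypothetical.
 */
#if NOT_USED
static struct varlena *
follow_indirect_datum(struct varlena *datum)
{
        struct varatt_indirect redirect;

        Assert(VARATT_IS_EXTERNAL_INDIRECT(datum));

        /* copy out, since the datum may not be suitably aligned */
        memcpy(&redirect, VARDATA_EXTERNAL(datum), sizeof(redirect));

        /* points at the in-memory varlena reconstructed above */
        return redirect.pointer;
}
#endif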
2694
2695 /*
2696  * Free all resources allocated for toast reconstruction.
2697  */
2698 static void
2699 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2700 {
2701         HASH_SEQ_STATUS hstat;
2702         ReorderBufferToastEnt *ent;
2703
2704         if (txn->toast_hash == NULL)
2705                 return;
2706
2707         /* sequentially walk over the hash and free everything */
2708         hash_seq_init(&hstat, txn->toast_hash);
2709         while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2710         {
2711                 dlist_mutable_iter it;
2712
2713                 if (ent->reconstructed != NULL)
2714                         pfree(ent->reconstructed);
2715
2716                 dlist_foreach_modify(it, &ent->chunks)
2717                 {
2718                         ReorderBufferChange *change =
2719                         dlist_container(ReorderBufferChange, node, it.cur);
2720
2721                         dlist_delete(&change->node);
2722                         ReorderBufferReturnChange(rb, change);
2723                 }
2724         }
2725
2726         hash_destroy(txn->toast_hash);
2727         txn->toast_hash = NULL;
2728 }
2729
2730
2731 /* ---------------------------------------
2732  * Visibility support for logical decoding
2733  *
2734  *
2735  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2736  * always rely on stored cmin/cmax values because of two scenarios:
2737  *
2738  * * A tuple got changed multiple times during a single transaction and thus
2739  *       was assigned a combocid. Combocids are only valid for the duration of
2740  *       a single transaction.
2741  * * A tuple with a cmin but no cmax (and thus no combocid) got
2742  *       deleted/updated in a transaction other than the one that created it,
2743  *       namely the transaction we are looking at right now. As only one of
2744  *       cmin, cmax or combocid is actually stored in the heap, we don't have
2745  *       access to the value we need anymore.
2746  *
2747  * To resolve those problems we have a per-transaction hash of (cmin,
2748  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2749  * (cmin, cmax) values. That also takes care of combocids by simply
2750  * not caring about them at all. As we have the real cmin/cmax values
2751  * combocids aren't interesting.
2752  *
2753  * As we only care about catalog tuples here, the overhead of this
2754  * hashtable should be acceptable.
2755  *
2756  * Heap rewrites complicate this a bit, check rewriteheap.c for
2757  * details.
2758  * -------------------------------------------------------------------------
2759  */
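/*
 * Illustrative sketch only: roughly how decoding populates the hash
 * described above.  The real code lives in SnapBuildProcessNewCid() in
 * snapbuild.c, which calls ReorderBufferAddNewTupleCids() (earlier in this
 * file) with values taken from xl_heap_new_cid records; this wrapper and its
 * parameter list are hypothetical.
 */
#if NOT_USED
static void
example_record_tuple_cid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                                                 RelFileNode node, ItemPointerData tid,
                                                 CommandId cmin, CommandId cmax, CommandId combocid)
{
        /* key the mapping by (relfilenode, ctid), storing the real cmin/cmax */
        ReorderBufferAddNewTupleCids(rb, xid, lsn, node, tid,
                                                                 cmin, cmax, combocid);
}
#endif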
2760
2761 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
2762 typedef struct RewriteMappingFile
2763 {
2764         XLogRecPtr      lsn;
2765         char            fname[MAXPGPATH];
2766 } RewriteMappingFile;
2767
2768 #if NOT_USED
2769 static void
2770 DisplayMapping(HTAB *tuplecid_data)
2771 {
2772         HASH_SEQ_STATUS hstat;
2773         ReorderBufferTupleCidEnt *ent;
2774
2775         hash_seq_init(&hstat, tuplecid_data);
2776         while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
2777         {
2778                 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
2779                          ent->key.relnode.dbNode,
2780                          ent->key.relnode.spcNode,
2781                          ent->key.relnode.relNode,
2782                          BlockIdGetBlockNumber(&ent->key.tid.ip_blkid),
2783                          ent->key.tid.ip_posid,
2784                          ent->cmin,
2785                          ent->cmax
2786                         );
2787         }
2788 }
2789 #endif
2790
2791 /*
2792  * Apply a single mapping file to tuplecid_data.
2793  *
2794  * The mapping file has to have been verified to be a) committed, b) for our
2795  * transaction, and c) applied in LSN order.
2796  */
2797 static void
2798 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
2799 {
2800         char            path[MAXPGPATH];
2801         int                     fd;
2802         int                     readBytes;
2803         LogicalRewriteMappingData map;
2804
2805         sprintf(path, "pg_llog/mappings/%s", fname);
2806         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2807         if (fd < 0)
2808                 ereport(ERROR,
2809                                 (errcode_for_file_access(),
                                  errmsg("could not open file \"%s\": %m", path)));
2810
2811         while (true)
2812         {
2813                 ReorderBufferTupleCidKey key;
2814                 ReorderBufferTupleCidEnt *ent;
2815                 ReorderBufferTupleCidEnt *new_ent;
2816                 bool            found;
2817
2818                 /* be careful about padding */
2819                 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
2820
2821                 /* read all mappings till the end of the file */
2822                 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
2823
2824                 if (readBytes < 0)
2825                         ereport(ERROR,
2826                                         (errcode_for_file_access(),
2827                                          errmsg("could not read file \"%s\": %m",
2828                                                         path)));
2829                 else if (readBytes == 0) /* EOF */
2830                         break;
2831                 else if (readBytes != sizeof(LogicalRewriteMappingData))
2832                         ereport(ERROR,
2833                                         (errcode_for_file_access(),
2834                                          errmsg("could not read file \"%s\", read %d instead of %d",
2835                                                         path, readBytes,
2836                                                         (int32) sizeof(LogicalRewriteMappingData))));
2837
2838                 key.relnode = map.old_node;
2839                 ItemPointerCopy(&map.old_tid,
2840                                                 &key.tid);
2841
2842
2843                 ent = (ReorderBufferTupleCidEnt *)
2844                         hash_search(tuplecid_data,
2845                                                 (void *) &key,
2846                                                 HASH_FIND,
2847                                                 NULL);
2848
2849                 /* no existing mapping, no need to update */
2850                 if (!ent)
2851                         continue;
2852
2853                 key.relnode = map.new_node;
2854                 ItemPointerCopy(&map.new_tid,
2855                                                 &key.tid);
2856
2857                 new_ent = (ReorderBufferTupleCidEnt *)
2858                         hash_search(tuplecid_data,
2859                                                 (void *) &key,
2860                                                 HASH_ENTER,
2861                                                 &found);
2862
2863                 if (found)
2864                 {
2865                         /*
2866                  * Make sure the existing mapping makes sense. We sometimes update
2867                          * old records that did not yet have a cmax (e.g. pg_class' own
2868                          * entry while rewriting it) during rewrites, so allow that.
2869                          */
2870                         Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
2871                         Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
2872                 }
2873                 else
2874                 {
2875                         /* update mapping */
2876                         new_ent->cmin = ent->cmin;
2877                         new_ent->cmax = ent->cmax;
2878                         new_ent->combocid = ent->combocid;
2879                 }
2880         }

        /* we are done with the file */
        CloseTransientFile(fd);
2881 }
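/*
 * For reference: each record read above is a fixed-size
 * LogicalRewriteMappingData as defined in access/rewriteheap.h, pairing a
 * tuple's old and new physical locations across a heap rewrite (field names
 * quoted from memory):
 *
 *              typedef struct LogicalRewriteMappingData
 *              {
 *                      RelFileNode old_node;
 *                      RelFileNode new_node;
 *                      ItemPointerData old_tid;
 *                      ItemPointerData new_tid;
 *              } LogicalRewriteMappingData;
 */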
2882
2883
2884 /*
2885  * Check whether the TransactionId 'xid' is in the sorted array 'xip'.
2886  */
2887 static bool
2888 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
2889 {
2890         return bsearch(&xid, xip, num,
2891                                    sizeof(TransactionId), xidComparator) != NULL;
2892 }
2893
2894 /*
2895  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
2896  */
2897 static int
2898 file_sort_by_lsn(const void *a_p, const void *b_p)
2899 {
2900         RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
2901         RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
2902
2903         if (a->lsn < b->lsn)
2904                 return -1;
2905         else if (a->lsn > b->lsn)
2906                 return 1;
2907         return 0;
2908 }
2909
2910 /*
2911  * Apply any existing logical remapping files that are targeted at our
2912  * transaction and relid.
2913  */
2914 static void
2915 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
2916 {
2917         DIR                *mapping_dir;
2918         struct dirent *mapping_de;
2919         List       *files = NIL;
2920         ListCell   *file;
2921         RewriteMappingFile **files_a;
2922         size_t          off;
2923         Oid                     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
2924
2925         mapping_dir = AllocateDir("pg_llog/mappings");
2926         while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL)
2927         {
2928                 Oid                             f_dboid;
2929                 Oid                             f_relid;
2930                 TransactionId   f_mapped_xid;
2931                 TransactionId   f_create_xid;
2932                 XLogRecPtr              f_lsn;
2933                 uint32                  f_hi, f_lo;
2934                 RewriteMappingFile *f;
2935
2936                 if (strcmp(mapping_de->d_name, ".") == 0 ||
2937                         strcmp(mapping_de->d_name, "..") == 0)
2938                         continue;
2939
2940                 /* Ignore files that aren't ours */
2941                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
2942                         continue;
2943
2944                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
2945                                    &f_dboid, &f_relid, &f_hi, &f_lo,
2946                                    &f_mapped_xid, &f_create_xid) != 6)
2947                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
2948
2949                 f_lsn = ((uint64) f_hi) << 32 | f_lo;
2950
2951                 /* mapping for another database */
2952                 if (f_dboid != dboid)
2953                         continue;
2954
2955                 /* mapping for another relation */
2956                 if (f_relid != relid)
2957                         continue;
2958
2959                 /* did the creating transaction abort? */
2960                 if (!TransactionIdDidCommit(f_create_xid))
2961                         continue;
2962
2963                 /* not for our transaction */
2964                 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
2965                         continue;
2966
2967                 /* ok, relevant, queue for apply */
2968                 f = palloc(sizeof(RewriteMappingFile));
2969                 f->lsn = f_lsn;
2970                 strcpy(f->fname, mapping_de->d_name);
2971                 files = lappend(files, f);
2972         }
2973         FreeDir(mapping_dir);
2974
2975         /* build array we can easily sort */
2976         files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
2977         off = 0;
2978         foreach(file, files)
2979         {
2980                 files_a[off++] = lfirst(file);
2981         }
2982
2983         /* sort files so we apply them in LSN order */
2984         qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
2985                   file_sort_by_lsn);
2986
2987         for (off = 0; off < list_length(files); off++)
2988         {
2989                 RewriteMappingFile *f = files_a[off];

2990                 elog(DEBUG1, "applying mapping: %s in %u", f->fname,
2991                         snapshot->subxip[0]);
2992                 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
2993                 pfree(f);
2994         }
2995 }
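/*
 * For reference: the filenames parsed above follow LOGICAL_REWRITE_FORMAT
 * from access/rewriteheap.h, which encodes, in order, the database oid, the
 * relfilenode, the LSN of the rewrite as two 32 bit halves, the xid the
 * mapping is for, and the xid that created the file.  An example name (with
 * invented values) would look like:
 *
 *              map-4000-4eb-1_60DE8E8-5a5-5a7
 */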
2996
2997 /*
2998  * Look up the cmin/cmax of a tuple during logical decoding, where we can't
2999  * rely on combocids.
3000  */
3001 bool
3002 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3003                                                           Snapshot snapshot,
3004                                                           HeapTuple htup, Buffer buffer,
3005                                                           CommandId *cmin, CommandId *cmax)
3006 {
3007         ReorderBufferTupleCidKey key;
3008         ReorderBufferTupleCidEnt *ent;
3009         ForkNumber      forkno;
3010         BlockNumber blockno;
3011         bool            updated_mapping = false;
3012
3013         /* be careful about padding */
3014         memset(&key, 0, sizeof(key));
3015
3016         Assert(!BufferIsLocal(buffer));
3017
3018         /*
3019          * Get the relfilenode from the buffer; there is no more convenient way
3020          * to access it.
3021          */
3022         BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3023
3024         /* tuples can only be in the main fork */
3025         Assert(forkno == MAIN_FORKNUM);
3026         Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3027
3028         ItemPointerCopy(&htup->t_self,
3029                                         &key.tid);
3030
3031 restart:
3032         ent = (ReorderBufferTupleCidEnt *)
3033                 hash_search(tuplecid_data,
3034                                         (void *) &key,
3035                                         HASH_FIND,
3036                                         NULL);
3037
3038         /*
3039          * Failed to find a mapping; check whether the table was rewritten and
3040          * apply its mappings if so, but only do that once - there can be no new
3041          * mappings while we are in here, since we have to hold a lock on the
3042          * relation.
3043          */
3044         if (ent == NULL && !updated_mapping)
3045         {
3046                 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3047                 /* now check again for a mapping, but don't update */
3048                 updated_mapping = true;
3049                 goto restart;
3050         }
3051         else if (ent == NULL)
3052                 return false;
3053
3054         if (cmin)
3055                 *cmin = ent->cmin;
3056         if (cmax)
3057                 *cmax = ent->cmax;
3058         return true;
3059 }
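/*
 * Illustrative sketch only: how a historic-snapshot visibility check might
 * consult the lookup above.  The real logic lives in
 * HeapTupleSatisfiesHistoricMVCC() in tqual.c; this helper is hypothetical.
 */
#if NOT_USED
static bool
example_inserting_command_visible(HTAB *tuplecid_data, Snapshot snapshot,
                                                                  HeapTuple htup, Buffer buffer)
{
        CommandId       cmin;

        if (!ResolveCminCmaxDuringDecoding(tuplecid_data, snapshot, htup, buffer,
                                                                           &cmin, NULL))
                return false;                   /* no mapping: treat the tuple as invisible */

        /* visible iff the inserting command preceded the snapshot's command */
        return cmin < snapshot->curcid;
}
#endif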