1 /*-------------------------------------------------------------------------
2  *
3  * rewriteheap.c
4  *        Support functions to rewrite tables.
5  *
6  * These functions provide a facility to completely rewrite a heap, while
7  * preserving visibility information and update chains.
8  *
9  * INTERFACE
10  *
11  * The caller is responsible for creating the new heap, all catalog
12  * changes, supplying the tuples to be written to the new heap, and
13  * rebuilding indexes.  The caller must hold AccessExclusiveLock on the
14  * target table, because we assume no one else is writing into it.
15  *
16  * To use the facility:
17  *
18  * begin_heap_rewrite
19  * while (fetch next tuple)
20  * {
21  *         if (tuple is dead)
22  *                 rewrite_heap_dead_tuple
23  *         else
24  *         {
25  *                 // do any transformations here if required
26  *                 rewrite_heap_tuple
27  *         }
28  * }
29  * end_heap_rewrite
30  *
31  * The contents of the new relation shouldn't be relied on until after
32  * end_heap_rewrite is called.
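 *
 * A slightly more concrete sketch of a caller (schematic only, not lifted
 * from cluster.c; "is_dead" stands for the caller's visibility test, e.g.
 * a HeapTupleSatisfiesVacuum() check against oldest_xmin, and the other
 * variable names are placeholders).  Note that rewrite_heap_tuple()
 * scribbles on its last argument, hence the copy:
 *
 *         rwstate = begin_heap_rewrite(old_heap, new_heap, oldest_xmin,
 *                                      freeze_xid, cutoff_multi, use_wal);
 *         scan = heap_beginscan(old_heap, SnapshotAny, 0, NULL);
 *         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *         {
 *                 if (is_dead)
 *                         rewrite_heap_dead_tuple(rwstate, tuple);
 *                 else
 *                 {
 *                         copy = heap_copytuple(tuple);
 *                         rewrite_heap_tuple(rwstate, tuple, copy);
 *                         heap_freetuple(copy);
 *                 }
 *         }
 *         heap_endscan(scan);
 *         end_heap_rewrite(rwstate);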
33  *
34  *
35  * IMPLEMENTATION
36  *
37  * This would be a fairly trivial affair, except that we need to maintain
38  * the ctid chains that link versions of an updated tuple together.
39  * Since the newly stored tuples will have tids different from the original
40  * ones, if we just copied t_ctid fields to the new table the links would
41  * be wrong.  When we are required to copy a (presumably recently-dead or
42  * delete-in-progress) tuple whose ctid doesn't point to itself, we have
43  * to substitute the correct ctid instead.
44  *
45  * For each ctid reference from A -> B, we might encounter either A first
46  * or B first.  (Note that a tuple in the middle of a chain is both A and B
47  * of different pairs.)
48  *
49  * If we encounter A first, we'll store the tuple in the unresolved_tups
50  * hash table. When we later encounter B, we remove A from the hash table,
51  * fix the ctid to point to the new location of B, and insert both A and B
52  * to the new heap.
53  *
54  * If we encounter B first, we can insert B to the new heap right away.
55  * We then add an entry to the old_new_tid_map hash table showing B's
56  * original tid (in the old heap) and new tid (in the new heap).
57  * When we later encounter A, we get the new location of B from the table,
58  * and can write A immediately with the correct ctid.
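 *
 * For example, suppose version A at TID (5,1) was updated to version B at
 * TID (9,3), so A's t_ctid is (9,3) and A's xmax equals B's xmin.  If the
 * scan delivers A first, A is stashed in unresolved_tups under the key
 * (B's xmin, (9,3)); once B has been written and its new TID is known,
 * A's ctid is fixed up and A is written too.  If B comes first, B is
 * written immediately and (B's xmin, (9,3)) -> B's new TID is remembered
 * in old_new_tid_map, so A can be written with the correct ctid as soon
 * as it is seen.  (The block/offset numbers above are made up, purely for
 * illustration.)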
59  *
60  * Entries in the hash tables can be removed as soon as the later tuple
61  * is encountered.  That helps to keep the memory usage down.  At the end,
62  * both tables are usually empty; we should have encountered both A and B
63  * of each pair.  However, it's possible for A to be RECENTLY_DEAD and B
64  * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
65  * for deadness using OldestXmin is not exact.  In such a case we might
66  * encounter B first, and skip it, and find A later.  Then A would be added
67  * to unresolved_tups, and stay there until end of the rewrite.  Since
68  * this case is very unusual, we don't worry about the memory usage.
69  *
70  * Using in-memory hash tables means that we use some memory for each live
71  * update chain in the table, from the time we find one end of the
72  * reference until we find the other end.  That shouldn't be a problem in
73  * practice, but if you do something like an UPDATE without a where-clause
74  * on a large table, and then run CLUSTER in the same transaction, you
75  * could run out of memory.  It doesn't seem worthwhile to add support for
76  * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
77  * table under normal circumstances.  Furthermore, in the typical scenario
78  * of CLUSTERing on an unchanging key column, we'll see all the versions
79  * of a given tuple together anyway, and so the peak memory usage is only
80  * proportional to the number of RECENTLY_DEAD versions of a single row, not
81  * in the whole table.  Note that if we do fail halfway through a CLUSTER,
82  * the old table is still valid, so failure is not catastrophic.
83  *
84  * We can't use the normal heap_insert function to insert into the new
85  * heap, because heap_insert overwrites the visibility information.
86  * We use a special-purpose raw_heap_insert function instead, which
87  * is optimized for bulk inserting a lot of tuples, knowing that we have
88  * exclusive access to the heap.  raw_heap_insert builds new pages in
89  * local storage.  When a page is full, or at the end of the process,
90  * we insert it to WAL as a single record and then write it to disk
91  * directly through smgr.  Note, however, that any data sent to the new
92  * heap's TOAST table will go through the normal bufmgr.
93  *
94  *
95  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
96  * Portions Copyright (c) 1994-5, Regents of the University of California
97  *
98  * IDENTIFICATION
99  *        src/backend/access/heap/rewriteheap.c
100  *
101  *-------------------------------------------------------------------------
102  */
103 #include "postgres.h"
104
105 #include <sys/stat.h>
106 #include <unistd.h>
107
108 #include "miscadmin.h"
109
110 #include "access/heapam.h"
111 #include "access/heapam_xlog.h"
112 #include "access/rewriteheap.h"
113 #include "access/transam.h"
114 #include "access/tuptoaster.h"
115 #include "access/xact.h"
116 #include "access/xloginsert.h"
117
118 #include "catalog/catalog.h"
119
120 #include "lib/ilist.h"
121
122 #include "replication/logical.h"
123 #include "replication/slot.h"
124
125 #include "storage/bufmgr.h"
126 #include "storage/fd.h"
127 #include "storage/smgr.h"
128
129 #include "utils/memutils.h"
130 #include "utils/rel.h"
131 #include "utils/tqual.h"
132
133 #include "storage/procarray.h"
134
135 /*
136  * State associated with a rewrite operation. This is opaque to the user
137  * of the rewrite facility.
138  */
139 typedef struct RewriteStateData
140 {
141         Relation        rs_old_rel;             /* source heap */
142         Relation        rs_new_rel;             /* destination heap */
143         Page            rs_buffer;              /* page currently being built */
144         BlockNumber rs_blockno;         /* block where page will go */
145         bool            rs_buffer_valid;        /* T if any tuples in buffer */
146         bool            rs_use_wal;             /* must we WAL-log inserts? */
147         bool            rs_logical_rewrite;             /* do we need to do logical rewriting */
148         TransactionId rs_oldest_xmin;           /* oldest xmin used by caller to
149                                                                                  * determine tuple visibility */
150         TransactionId rs_freeze_xid;/* Xid that will be used as freeze cutoff
151                                                                  * point */
152         TransactionId rs_logical_xmin;          /* Xid that will be used as cutoff
153                                                                                  * point for logical rewrites */
154         MultiXactId rs_cutoff_multi;/* MultiXactId that will be used as cutoff
155                                                                  * point for multixacts */
156         MemoryContext rs_cxt;           /* for hash tables and entries and tuples in
157                                                                  * them */
158         XLogRecPtr      rs_begin_lsn;   /* XLogInsertLsn when starting the rewrite */
159         HTAB       *rs_unresolved_tups;         /* unmatched A tuples */
160         HTAB       *rs_old_new_tid_map;         /* unmatched B tuples */
161         HTAB       *rs_logical_mappings;        /* logical remapping files */
162         uint32          rs_num_rewrite_mappings;                /* # in memory mappings */
163 }       RewriteStateData;
164
165 /*
166  * The lookup keys for the hash tables are tuple TID and xmin (we must check
167  * both to avoid false matches from dead tuples).  Beware that there is
168  * probably some padding space in this struct; it must be zeroed out for
169  * correct hashtable operation.
170  */
171 typedef struct
172 {
173         TransactionId xmin;                     /* tuple xmin */
174         ItemPointerData tid;            /* tuple location in old heap */
175 } TidHashKey;
176
177 /*
178  * Entry structures for the hash tables
179  */
180 typedef struct
181 {
182         TidHashKey      key;                    /* expected xmin/old location of B tuple */
183         ItemPointerData old_tid;        /* A's location in the old heap */
184         HeapTuple       tuple;                  /* A's tuple contents */
185 } UnresolvedTupData;
186
187 typedef UnresolvedTupData *UnresolvedTup;
188
189 typedef struct
190 {
191         TidHashKey      key;                    /* actual xmin/old location of B tuple */
192         ItemPointerData new_tid;        /* where we put it in the new heap */
193 } OldToNewMappingData;
194
195 typedef OldToNewMappingData *OldToNewMapping;
196
197 /*
198  * In-Memory data for an xid that might need logical remapping entries
199  * to be logged.
200  */
201 typedef struct RewriteMappingFile
202 {
203         TransactionId xid;                      /* xid that might need to see the row */
204         int                     vfd;                    /* fd of mappings file */
205         off_t           off;                    /* how far we have written so far */
206         uint32          num_mappings;   /* number of in-memory mappings */
207         dlist_head      mappings;               /* list of in-memory mappings */
208         char            path[MAXPGPATH];        /* path, for error messages */
209 } RewriteMappingFile;
210
211 /*
212  * A single In-Memory logical rewrite mapping, hanging off
213  * RewriteMappingFile->mappings.
214  */
215 typedef struct RewriteMappingDataEntry
216 {
217         LogicalRewriteMappingData map;          /* map between old and new location of
218                                                                                  * the tuple */
219         dlist_node      node;
220 } RewriteMappingDataEntry;
221
222
223 /* prototypes for internal functions */
224 static void raw_heap_insert(RewriteState state, HeapTuple tup);
225
226 /* internal logical remapping prototypes */
227 static void logical_begin_heap_rewrite(RewriteState state);
228 static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
229 static void logical_end_heap_rewrite(RewriteState state);
230
231
232 /*
233  * Begin a rewrite of a table
234  *
235  * old_heap             old, locked heap relation tuples will be read from
236  * new_heap             new, locked heap relation to insert tuples to
237  * oldest_xmin  xid used by the caller to determine which tuples are dead
238  * freeze_xid   xid before which tuples will be frozen
239  * min_multi    multixact before which multis will be removed
240  * use_wal              should the inserts to the new heap be WAL-logged?
241  *
242  * Returns an opaque RewriteState, allocated in current memory context,
243  * to be used in subsequent calls to the other functions.
244  */
245 RewriteState
246 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
247                                    TransactionId freeze_xid, MultiXactId cutoff_multi,
248                                    bool use_wal)
249 {
250         RewriteState state;
251         MemoryContext rw_cxt;
252         MemoryContext old_cxt;
253         HASHCTL         hash_ctl;
254
255         /*
256          * To ease cleanup, make a separate context that will contain the
257          * RewriteState struct itself plus all subsidiary data.
258          */
259         rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
260                                                                    "Table rewrite",
261                                                                    ALLOCSET_DEFAULT_MINSIZE,
262                                                                    ALLOCSET_DEFAULT_INITSIZE,
263                                                                    ALLOCSET_DEFAULT_MAXSIZE);
264         old_cxt = MemoryContextSwitchTo(rw_cxt);
265
266         /* Create and fill in the state struct */
267         state = palloc0(sizeof(RewriteStateData));
268
269         state->rs_old_rel = old_heap;
270         state->rs_new_rel = new_heap;
271         state->rs_buffer = (Page) palloc(BLCKSZ);
272         /* new_heap needn't be empty, just locked */
273         state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274         state->rs_buffer_valid = false;
275         state->rs_use_wal = use_wal;
276         state->rs_oldest_xmin = oldest_xmin;
277         state->rs_freeze_xid = freeze_xid;
278         state->rs_cutoff_multi = cutoff_multi;
279         state->rs_cxt = rw_cxt;
280
281         /* Initialize hash tables used to track update chains */
282         memset(&hash_ctl, 0, sizeof(hash_ctl));
283         hash_ctl.keysize = sizeof(TidHashKey);
284         hash_ctl.entrysize = sizeof(UnresolvedTupData);
285         hash_ctl.hcxt = state->rs_cxt;
286
287         state->rs_unresolved_tups =
288                 hash_create("Rewrite / Unresolved ctids",
289                                         128,            /* arbitrary initial size */
290                                         &hash_ctl,
291                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
292
293         hash_ctl.entrysize = sizeof(OldToNewMappingData);
294
295         state->rs_old_new_tid_map =
296                 hash_create("Rewrite / Old to new tid map",
297                                         128,            /* arbitrary initial size */
298                                         &hash_ctl,
299                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
300
301         MemoryContextSwitchTo(old_cxt);
302
303         logical_begin_heap_rewrite(state);
304
305         return state;
306 }
307
308 /*
309  * End a rewrite.
310  *
311  * state and any other resources are freed.
312  */
313 void
314 end_heap_rewrite(RewriteState state)
315 {
316         HASH_SEQ_STATUS seq_status;
317         UnresolvedTup unresolved;
318
319         /*
320          * Write any remaining tuples in the UnresolvedTups table. If we have any
321          * left, they should in fact be dead, but let's err on the safe side.
322          */
323         hash_seq_init(&seq_status, state->rs_unresolved_tups);
324
325         while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326         {
327                 ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328                 raw_heap_insert(state, unresolved->tuple);
329         }
330
331         /* Write the last page, if any */
332         if (state->rs_buffer_valid)
333         {
334                 if (state->rs_use_wal)
335                         log_newpage(&state->rs_new_rel->rd_node,
336                                                 MAIN_FORKNUM,
337                                                 state->rs_blockno,
338                                                 state->rs_buffer,
339                                                 true);
340                 RelationOpenSmgr(state->rs_new_rel);
341
342                 PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
343
344                 smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
345                                    (char *) state->rs_buffer, true);
346         }
347
348         /*
349          * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
350          * to ensure that the toast table gets fsync'd too.
351          *
352          * It's obvious that we must do this when not WAL-logging. It's less
353          * obvious that we have to do it even if we did WAL-log the pages. The
354          * reason is the same as in tablecmds.c's copy_relation_data(): we're
355          * writing data that's not in shared buffers, and so a CHECKPOINT
356          * occurring during the rewriteheap operation won't have fsync'd data we
357          * wrote before the checkpoint.
358          */
359         if (RelationNeedsWAL(state->rs_new_rel))
360                 heap_sync(state->rs_new_rel);
361
362         logical_end_heap_rewrite(state);
363
364         /* Deleting the context frees everything */
365         MemoryContextDelete(state->rs_cxt);
366 }
367
368 /*
369  * Add a tuple to the new heap.
370  *
371  * Visibility information is copied from the original tuple, except that
372  * we "freeze" very-old tuples.  Note that since we scribble on new_tuple,
373  * it had better be temp storage not a pointer to the original tuple.
374  *
375  * state                opaque state as returned by begin_heap_rewrite
376  * old_tuple    original tuple in the old heap
377  * new_tuple    new, rewritten tuple to be inserted to new heap
378  */
379 void
380 rewrite_heap_tuple(RewriteState state,
381                                    HeapTuple old_tuple, HeapTuple new_tuple)
382 {
383         MemoryContext old_cxt;
384         ItemPointerData old_tid;
385         TidHashKey      hashkey;
386         bool            found;
387         bool            free_new;
388
389         old_cxt = MemoryContextSwitchTo(state->rs_cxt);
390
391         /*
392          * Copy the original tuple's visibility information into new_tuple.
393          *
394          * XXX we might later need to copy some t_infomask2 bits, too? Right now,
395          * we intentionally clear the HOT status bits.
396          */
397         memcpy(&new_tuple->t_data->t_choice.t_heap,
398                    &old_tuple->t_data->t_choice.t_heap,
399                    sizeof(HeapTupleFields));
400
401         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
402         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
403         new_tuple->t_data->t_infomask |=
404                 old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
405
406         /*
407          * While we have our hands on the tuple, we may as well freeze any
408          * eligible xmin or xmax, so that future VACUUM effort can be saved.
409          */
410         heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
411                                           state->rs_cutoff_multi);
412
413         /*
414          * Invalid ctid means that ctid should point to the tuple itself. We'll
415          * override it later if the tuple is part of an update chain.
416          */
417         ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
418
419         /*
420          * If the tuple has been updated, check the old-to-new mapping hash table.
421          */
422         if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
423                   HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
424                 !(ItemPointerEquals(&(old_tuple->t_self),
425                                                         &(old_tuple->t_data->t_ctid))))
426         {
427                 OldToNewMapping mapping;
428
429                 memset(&hashkey, 0, sizeof(hashkey));
430                 hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
431                 hashkey.tid = old_tuple->t_data->t_ctid;
432
433                 mapping = (OldToNewMapping)
434                         hash_search(state->rs_old_new_tid_map, &hashkey,
435                                                 HASH_FIND, NULL);
436
437                 if (mapping != NULL)
438                 {
439                         /*
440                          * We've already copied the tuple that t_ctid points to, so we can
441                          * set the ctid of this tuple to point to the new location, and
442                          * insert it right away.
443                          */
444                         new_tuple->t_data->t_ctid = mapping->new_tid;
445
446                         /* We don't need the mapping entry anymore */
447                         hash_search(state->rs_old_new_tid_map, &hashkey,
448                                                 HASH_REMOVE, &found);
449                         Assert(found);
450                 }
451                 else
452                 {
453                         /*
454                          * We haven't seen the tuple t_ctid points to yet. Stash this
455                          * tuple into unresolved_tups to be written later.
456                          */
457                         UnresolvedTup unresolved;
458
459                         unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
460                                                                          HASH_ENTER, &found);
461                         Assert(!found);
462
463                         unresolved->old_tid = old_tuple->t_self;
464                         unresolved->tuple = heap_copytuple(new_tuple);
465
466                         /*
467                          * We can't do anything more now, since we don't know where the
468                          * tuple will be written.
469                          */
470                         MemoryContextSwitchTo(old_cxt);
471                         return;
472                 }
473         }
474
475         /*
476          * Now we will write the tuple, and then check to see if it is the B tuple
477          * in any new or known pair.  When we resolve a known pair, we will be
478          * able to write that pair's A tuple, and then we have to check if it
479          * resolves some other pair.  Hence, we need a loop here.
480          */
481         old_tid = old_tuple->t_self;
482         free_new = false;
483
484         for (;;)
485         {
486                 ItemPointerData new_tid;
487
488                 /* Insert the tuple and find out where it's put in new_heap */
489                 raw_heap_insert(state, new_tuple);
490                 new_tid = new_tuple->t_self;
491
492                 logical_rewrite_heap_tuple(state, old_tid, new_tuple);
493
494                 /*
495                  * If the tuple is the updated version of a row, and the prior version
496                  * wouldn't be DEAD yet, then we need to either resolve the prior
497                  * version (if it's waiting in rs_unresolved_tups), or make an entry
498                  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
499                  * previous tuple's xmax would equal this one's xmin, so it's
500                  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
501                  */
502                 if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
503                         !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
504                                                                    state->rs_oldest_xmin))
505                 {
506                         /*
507                          * Okay, this is B in an update pair.  See if we've seen A.
508                          */
509                         UnresolvedTup unresolved;
510
511                         memset(&hashkey, 0, sizeof(hashkey));
512                         hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
513                         hashkey.tid = old_tid;
514
515                         unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
516                                                                          HASH_FIND, NULL);
517
518                         if (unresolved != NULL)
519                         {
520                                 /*
521                                  * We have seen and memorized the previous tuple already. Now
522                                  * that we know where we inserted the tuple its t_ctid points
523                                  * to, fix its t_ctid and insert it to the new heap.
524                                  */
525                                 if (free_new)
526                                         heap_freetuple(new_tuple);
527                                 new_tuple = unresolved->tuple;
528                                 free_new = true;
529                                 old_tid = unresolved->old_tid;
530                                 new_tuple->t_data->t_ctid = new_tid;
531
532                                 /*
533                                  * We don't need the hash entry anymore, but don't free its
534                                  * tuple just yet.
535                                  */
536                                 hash_search(state->rs_unresolved_tups, &hashkey,
537                                                         HASH_REMOVE, &found);
538                                 Assert(found);
539
540                                 /* loop back to insert the previous tuple in the chain */
541                                 continue;
542                         }
543                         else
544                         {
545                                 /*
546                                  * Remember the new tid of this tuple. We'll use it to set the
547                                  * ctid when we find the previous tuple in the chain.
548                                  */
549                                 OldToNewMapping mapping;
550
551                                 mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
552                                                                           HASH_ENTER, &found);
553                                 Assert(!found);
554
555                                 mapping->new_tid = new_tid;
556                         }
557                 }
558
559                 /* Done with this (chain of) tuples, for now */
560                 if (free_new)
561                         heap_freetuple(new_tuple);
562                 break;
563         }
564
565         MemoryContextSwitchTo(old_cxt);
566 }
567
568 /*
569  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
570  * copied to the new table, but we still make note of them so that we
571  * can release some resources earlier.
572  *
573  * Returns true if a tuple was removed from the unresolved_tups table.
574  * This indicates that that tuple, previously thought to be "recently dead",
575  * is now known really dead and won't be written to the output.
576  */
577 bool
578 rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
579 {
580         /*
581          * If we have already seen an earlier tuple in the update chain that
582          * points to this tuple, let's forget about that earlier tuple. It's in
583          * fact dead as well, our simple xmax < OldestXmin test in
584          * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
585          * when xmin of a tuple is greater than xmax, which sounds
586          * counter-intuitive but is perfectly valid.
587          *
588          * We don't bother to try to detect the situation the other way round,
589          * when we encounter the dead tuple first and then the recently dead one
590          * that points to it. If that happens, we'll have some unmatched entries
591          * in the UnresolvedTups hash table at the end. That can happen anyway,
592          * because a vacuum might have removed the dead tuple in the chain before
593          * us.
594          */
595         UnresolvedTup unresolved;
596         TidHashKey      hashkey;
597         bool            found;
598
599         memset(&hashkey, 0, sizeof(hashkey));
600         hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
601         hashkey.tid = old_tuple->t_self;
602
603         unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
604                                                          HASH_FIND, NULL);
605
606         if (unresolved != NULL)
607         {
608                 /* Need to free the contained tuple as well as the hashtable entry */
609                 heap_freetuple(unresolved->tuple);
610                 hash_search(state->rs_unresolved_tups, &hashkey,
611                                         HASH_REMOVE, &found);
612                 Assert(found);
613                 return true;
614         }
615
616         return false;
617 }
618
619 /*
620  * Insert a tuple to the new relation.  This has to track heap_insert
621  * and its subsidiary functions!
622  *
623  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
624  * tuple is invalid on entry, it's replaced with the new TID as well (in
625  * the inserted data only, not in the caller's copy).
626  */
627 static void
628 raw_heap_insert(RewriteState state, HeapTuple tup)
629 {
630         Page            page = state->rs_buffer;
631         Size            pageFreeSpace,
632                                 saveFreeSpace;
633         Size            len;
634         OffsetNumber newoff;
635         HeapTuple       heaptup;
636
637         /*
638          * If the new tuple is too big for storage or contains already toasted
639          * out-of-line attributes from some other relation, invoke the toaster.
640          *
641          * Note: below this point, heaptup is the data we actually intend to store
642          * into the relation; tup is the caller's original untoasted data.
643          */
644         if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
645         {
646                 /* toast table entries should never be recursively toasted */
647                 Assert(!HeapTupleHasExternal(tup));
648                 heaptup = tup;
649         }
650         else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
651                 heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
652                                                                                  HEAP_INSERT_SKIP_FSM |
653                                                                                  (state->rs_use_wal ?
654                                                                                   0 : HEAP_INSERT_SKIP_WAL));
655         else
656                 heaptup = tup;
657
658         len = MAXALIGN(heaptup->t_len);         /* be conservative */
659
660         /*
661          * If we're gonna fail for oversize tuple, do it right away
662          */
663         if (len > MaxHeapTupleSize)
664                 ereport(ERROR,
665                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
666                                  errmsg("row is too big: size %zu, maximum size %zu",
667                                                 len, MaxHeapTupleSize)));
668
669         /* Compute desired extra freespace due to fillfactor option */
670         saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
671                                                                                                    HEAP_DEFAULT_FILLFACTOR);
672
673         /* Now we can check to see if there's enough free space already. */
674         if (state->rs_buffer_valid)
675         {
676                 pageFreeSpace = PageGetHeapFreeSpace(page);
677
678                 if (len + saveFreeSpace > pageFreeSpace)
679                 {
680                         /* Doesn't fit, so write out the existing page */
681
682                         /* XLOG stuff */
683                         if (state->rs_use_wal)
684                                 log_newpage(&state->rs_new_rel->rd_node,
685                                                         MAIN_FORKNUM,
686                                                         state->rs_blockno,
687                                                         page,
688                                                         true);
689
690                         /*
691                          * Now write the page.  We pass skipFsync = true to smgrextend,
692                          * because there's no need for smgr to schedule an fsync for this
693                          * write; we'll do it ourselves in
694                          * end_heap_rewrite.
695                          */
696                         RelationOpenSmgr(state->rs_new_rel);
697
698                         PageSetChecksumInplace(page, state->rs_blockno);
699
700                         smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
701                                            state->rs_blockno, (char *) page, true);
702
703                         state->rs_blockno++;
704                         state->rs_buffer_valid = false;
705                 }
706         }
707
708         if (!state->rs_buffer_valid)
709         {
710                 /* Initialize a new empty page */
711                 PageInit(page, BLCKSZ, 0);
712                 state->rs_buffer_valid = true;
713         }
714
715         /* And now we can insert the tuple into the page */
716         newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
717                                                  InvalidOffsetNumber, false, true);
718         if (newoff == InvalidOffsetNumber)
719                 elog(ERROR, "failed to add tuple");
720
721         /* Update caller's t_self to the actual position where it was stored */
722         ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
723
724         /*
725          * Insert the correct position into CTID of the stored tuple, too, if the
726          * caller didn't supply a valid CTID.
727          */
728         if (!ItemPointerIsValid(&tup->t_data->t_ctid))
729         {
730                 ItemId          newitemid;
731                 HeapTupleHeader onpage_tup;
732
733                 newitemid = PageGetItemId(page, newoff);
734                 onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
735
736                 onpage_tup->t_ctid = tup->t_self;
737         }
738
739         /* If heaptup is a private copy, release it. */
740         if (heaptup != tup)
741                 heap_freetuple(heaptup);
742 }
743
744 /* ------------------------------------------------------------------------
745  * Logical rewrite support
746  *
747  * When doing logical decoding - which relies on using cmin/cmax of catalog
748  * tuples, via xl_heap_new_cid records - heap rewrites have to log enough
749  * information to allow the decoding backend to update its internal mapping
750  * of (relfilenode,ctid) => (cmin, cmax) to be correct for the rewritten heap.
751  *
752  * For that, every time we find a tuple that's been modified in a catalog
753  * relation within the xmin horizon of any decoding slot, we log a mapping
754  * from the old to the new location.
755  *
756  * To deal with rewrites that abort, the filename of a mapping file contains
757  * the xid of the transaction performing the rewrite, which then can be
758  * checked before being read in.
759  *
760  * For efficiency we don't immediately spill every single mapping for a
761  * row to disk but only do so in batches when we've collected several of them
762  * in memory or when end_heap_rewrite() has been called.
763  *
764  * Crash-Safety: This module diverts from the usual patterns of doing WAL
765  * since it cannot rely on checkpoint flushing out all buffers and thus
766  * waiting for exclusive locks on buffers. Usually the XLogInsert() covering
767  * buffer modifications is performed while the buffer(s) that are being
768  * modified are exclusively locked, guaranteeing that both the WAL record and
769  * the modified heap are on either side of the checkpoint. But since the
770  * mapping files we log aren't in shared_buffers that interlock doesn't work.
771  *
772  * Instead we simply write the mapping files out to disk, *before* the
773  * XLogInsert() is performed. That guarantees that either the XLogInsert() is
774  * inserted after the checkpoint's redo pointer or that the checkpoint (via
775  * LogicalRewriteHeapCheckpoint()) has flushed the (partial) mapping file to
776  * disk. That leaves the tail end that has not yet been flushed open to
777  * corruption, which is solved by including the current offset in the
778  * xl_heap_rewrite_mapping records and truncating the mapping file to it
779  * during replay. Every time a rewrite is finished all generated mapping files
780  * are synced to disk.
781  *
782  * Note that if we were only concerned about crash safety we wouldn't have to
783  * deal with WAL logging at all - an fsync() at the end of a rewrite would be
784  * sufficient for crash safety. Any mapping that hasn't been safely flushed to
785  * disk has to be by an aborted (explicitly or via a crash) transaction and is
786  * ignored by virtue of the xid in its name being subject to a
787  * TransactionDidCommit() check. But we want to support having standbys via
788  * physical replication, both for availability and to do logical decoding
789  * there.
790  * ------------------------------------------------------------------------
791  */
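
/*
 * For orientation: each mapping file is a flat sequence of
 * LogicalRewriteMappingData structs, and its name (built with
 * LOGICAL_REWRITE_FORMAT, as in logical_rewrite_log_mapping() below)
 * encodes the database, relation, the rewrite's start LSN, the xid the
 * mappings are for, and the xid performing the rewrite.  A schematic
 * consumer -- the real reader lives on the decoding side -- would look
 * roughly like this, with error handling and the TransactionIdDidCommit()
 * check on the rewriting xid omitted, and with "path", "node" and "tid"
 * standing for the caller's inputs:
 *
 *         LogicalRewriteMappingData map;
 *         int         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
 *
 *         while (read(fd, &map, sizeof(map)) == sizeof(map))
 *         {
 *                 if (RelFileNodeEquals(map.old_node, node) &&
 *                     ItemPointerEquals(&map.old_tid, &tid))
 *                         tid = map.new_tid;
 *         }
 *         CloseTransientFile(fd);
 */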
792
793 /*
794  * Do preparations for logging logical mappings during a rewrite if
795  * necessary. If we detect that we don't need to log anything we'll prevent
796  * any further action by the various logical rewrite functions.
797  */
798 static void
799 logical_begin_heap_rewrite(RewriteState state)
800 {
801         HASHCTL         hash_ctl;
802         TransactionId logical_xmin;
803
804         /*
805          * We only need to persist these mappings if the rewritten table can be
806          * accessed during logical decoding; if not, we can skip doing any
807          * additional work.
808          */
809         state->rs_logical_rewrite =
810                 RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
811
812         if (!state->rs_logical_rewrite)
813                 return;
814
815         ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
816
817         /*
818          * If there are no logical slots in progress we don't need to do anything;
819          * there cannot be any remappings for relevant rows yet. The relation's
820          * lock protects us against races.
821          */
822         if (logical_xmin == InvalidTransactionId)
823         {
824                 state->rs_logical_rewrite = false;
825                 return;
826         }
827
828         state->rs_logical_xmin = logical_xmin;
829         state->rs_begin_lsn = GetXLogInsertRecPtr();
830         state->rs_num_rewrite_mappings = 0;
831
832         memset(&hash_ctl, 0, sizeof(hash_ctl));
833         hash_ctl.keysize = sizeof(TransactionId);
834         hash_ctl.entrysize = sizeof(RewriteMappingFile);
835         hash_ctl.hcxt = state->rs_cxt;
836
837         state->rs_logical_mappings =
838                 hash_create("Logical rewrite mapping",
839                                         128,            /* arbitrary initial size */
840                                         &hash_ctl,
841                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
842 }
843
844 /*
845  * Flush all logical in-memory mappings to disk, but don't fsync them yet.
846  */
847 static void
848 logical_heap_rewrite_flush_mappings(RewriteState state)
849 {
850         HASH_SEQ_STATUS seq_status;
851         RewriteMappingFile *src;
852         dlist_mutable_iter iter;
853
854         Assert(state->rs_logical_rewrite);
855
856         /* no logical rewrite in progress, no need to iterate over mappings */
857         if (state->rs_num_rewrite_mappings == 0)
858                 return;
859
860         elog(DEBUG1, "flushing %u logical rewrite mapping entries",
861                  state->rs_num_rewrite_mappings);
862
863         hash_seq_init(&seq_status, state->rs_logical_mappings);
864         while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
865         {
866                 char       *waldata;
867                 char       *waldata_start;
868                 xl_heap_rewrite_mapping xlrec;
869                 Oid                     dboid;
870                 uint32          len;
871                 int                     written;
872
873                 /* this file hasn't got any new mappings */
874                 if (src->num_mappings == 0)
875                         continue;
876
877                 if (state->rs_old_rel->rd_rel->relisshared)
878                         dboid = InvalidOid;
879                 else
880                         dboid = MyDatabaseId;
881
882                 xlrec.num_mappings = src->num_mappings;
883                 xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
884                 xlrec.mapped_xid = src->xid;
885                 xlrec.mapped_db = dboid;
886                 xlrec.offset = src->off;
887                 xlrec.start_lsn = state->rs_begin_lsn;
888
889                 /* write all mappings consecutively */
890                 len = src->num_mappings * sizeof(LogicalRewriteMappingData);
891                 waldata_start = waldata = palloc(len);
892
893                 /*
894                  * collect data we need to write out, but don't modify ondisk data yet
895                  */
896                 dlist_foreach_modify(iter, &src->mappings)
897                 {
898                         RewriteMappingDataEntry *pmap;
899
900                         pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
901
902                         memcpy(waldata, &pmap->map, sizeof(pmap->map));
903                         waldata += sizeof(pmap->map);
904
905                         /* remove from the list and free */
906                         dlist_delete(&pmap->node);
907                         pfree(pmap);
908
909                         /* update bookkeeping */
910                         state->rs_num_rewrite_mappings--;
911                         src->num_mappings--;
912                 }
913
914                 Assert(src->num_mappings == 0);
915                 Assert(waldata == waldata_start + len);
916
917                 /*
918                  * Note that we deviate from the usual WAL coding practices here,
919                  * check the above "Logical rewrite support" comment for reasoning.
920                  */
921                 written = FileWrite(src->vfd, waldata_start, len);
922                 if (written != len)
923                         ereport(ERROR,
924                                         (errcode_for_file_access(),
925                                          errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
926                                                         written, len)));
927                 src->off += len;
928
929                 XLogBeginInsert();
930                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
931                 XLogRegisterData(waldata_start, len);
932
933                 /* write xlog record */
934                 XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
935
936                 pfree(waldata_start);
937         }
938         Assert(state->rs_num_rewrite_mappings == 0);
939 }
940
941 /*
942  * Logical remapping part of end_heap_rewrite().
943  */
944 static void
945 logical_end_heap_rewrite(RewriteState state)
946 {
947         HASH_SEQ_STATUS seq_status;
948         RewriteMappingFile *src;
949
950         /* done, no logical rewrite in progress */
951         if (!state->rs_logical_rewrite)
952                 return;
953
954         /* writeout remaining in-memory entries */
955         if (state->rs_num_rewrite_mappings > 0)
956                 logical_heap_rewrite_flush_mappings(state);
957
958         /* Iterate over all mappings we have written and fsync the files. */
959         hash_seq_init(&seq_status, state->rs_logical_mappings);
960         while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
961         {
962                 if (FileSync(src->vfd) != 0)
963                         ereport(ERROR,
964                                         (errcode_for_file_access(),
965                                          errmsg("could not fsync file \"%s\": %m", src->path)));
966                 FileClose(src->vfd);
967         }
968         /* memory context cleanup will deal with the rest */
969 }
970
971 /*
972  * Log a single (old->new) mapping for 'xid'.
973  */
974 static void
975 logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
976                                                         LogicalRewriteMappingData *map)
977 {
978         RewriteMappingFile *src;
979         RewriteMappingDataEntry *pmap;
980         Oid                     relid;
981         bool            found;
982
983         relid = RelationGetRelid(state->rs_old_rel);
984
985         /* look for existing mappings for this 'mapped' xid */
986         src = hash_search(state->rs_logical_mappings, &xid,
987                                           HASH_ENTER, &found);
988
989         /*
990          * We haven't yet had the need to map anything for this xid, create
991          * per-xid data structures.
992          */
993         if (!found)
994         {
995                 char            path[MAXPGPATH];
996                 Oid                     dboid;
997
998                 if (state->rs_old_rel->rd_rel->relisshared)
999                         dboid = InvalidOid;
1000                 else
1001                         dboid = MyDatabaseId;
1002
1003                 snprintf(path, MAXPGPATH,
1004                                  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1005                                  dboid, relid,
1006                                  (uint32) (state->rs_begin_lsn >> 32),
1007                                  (uint32) state->rs_begin_lsn,
1008                                  xid, GetCurrentTransactionId());
1009
1010                 dlist_init(&src->mappings);
1011                 src->num_mappings = 0;
1012                 src->off = 0;
1013                 memcpy(src->path, path, sizeof(path));
1014                 src->vfd = PathNameOpenFile(path,
1015                                                                         O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1016                                                                         S_IRUSR | S_IWUSR);
1017                 if (src->vfd < 0)
1018                         ereport(ERROR,
1019                                         (errcode_for_file_access(),
1020                                          errmsg("could not create file \"%s\": %m", path)));
1021         }
1022
1023         pmap = MemoryContextAlloc(state->rs_cxt,
1024                                                           sizeof(RewriteMappingDataEntry));
1025         memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1026         dlist_push_tail(&src->mappings, &pmap->node);
1027         src->num_mappings++;
1028         state->rs_num_rewrite_mappings++;
1029
1030         /*
1031          * Write out the accumulated entries whenever we have too many in-memory
1032          * entries across all mapping files.
1033          */
1034         if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1035                 logical_heap_rewrite_flush_mappings(state);
1036 }
1037
1038 /*
1039  * Perform logical remapping for a tuple that's mapped from old_tid to
1040  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
1041  */
1042 static void
1043 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
1044                                                    HeapTuple new_tuple)
1045 {
1046         ItemPointerData new_tid = new_tuple->t_self;
1047         TransactionId cutoff = state->rs_logical_xmin;
1048         TransactionId xmin;
1049         TransactionId xmax;
1050         bool            do_log_xmin = false;
1051         bool            do_log_xmax = false;
1052         LogicalRewriteMappingData map;
1053
1054         /* no logical rewrite in progress, we don't need to log anything */
1055         if (!state->rs_logical_rewrite)
1056                 return;
1057
1058         xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1059         /* use *GetUpdateXid to correctly deal with multixacts */
1060         xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1061
1062         /*
1063          * Log the mapping iff the tuple has been created recently.
1064          */
1065         if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1066                 do_log_xmin = true;
1067
1068         if (!TransactionIdIsNormal(xmax))
1069         {
1070                 /*
1071                  * no xmax is set, can't have any permanent ones, so this check is
1072                  * sufficient
1073                  */
1074         }
1075         else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1076         {
1077                 /* only locked, we don't care */
1078         }
1079         else if (!TransactionIdPrecedes(xmax, cutoff))
1080         {
1081                 /* tuple has been deleted recently, log */
1082                 do_log_xmax = true;
1083         }
1084
1085         /* if neither needs to be logged, we're done */
1086         if (!do_log_xmin && !do_log_xmax)
1087                 return;
1088
1089         /* fill out mapping information */
1090         map.old_node = state->rs_old_rel->rd_node;
1091         map.old_tid = old_tid;
1092         map.new_node = state->rs_new_rel->rd_node;
1093         map.new_tid = new_tid;
1094
1095         /* ---
1096          * Now persist the mapping for the individual xids that are affected. We
1097          * need to log for both xmin and xmax if they aren't the same transaction
1098          * since the mapping files are per "affected" xid.
1099          * We don't muster all that much effort detecting whether xmin and xmax
1100          * are actually the same transaction; we just check whether the xid is the
1101          * same disregarding subtransactions. Logging too much is relatively
1102          * harmless and we could never do the check fully since subtransaction
1103          * data is thrown away during restarts.
1104          * ---
1105          */
1106         if (do_log_xmin)
1107                 logical_rewrite_log_mapping(state, xmin, &map);
1108         /* separately log mapping for xmax unless it'd be redundant */
1109         if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1110                 logical_rewrite_log_mapping(state, xmax, &map);
1111 }
1112
1113 /*
1114  * Replay XLOG_HEAP2_REWRITE records
1115  */
1116 void
1117 heap_xlog_logical_rewrite(XLogReaderState *r)
1118 {
1119         char            path[MAXPGPATH];
1120         int                     fd;
1121         xl_heap_rewrite_mapping *xlrec;
1122         uint32          len;
1123         char       *data;
1124
1125         xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1126
1127         snprintf(path, MAXPGPATH,
1128                          "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1129                          xlrec->mapped_db, xlrec->mapped_rel,
1130                          (uint32) (xlrec->start_lsn >> 32),
1131                          (uint32) xlrec->start_lsn,
1132                          xlrec->mapped_xid, XLogRecGetXid(r));
1133
1134         fd = OpenTransientFile(path,
1135                                                    O_CREAT | O_WRONLY | PG_BINARY,
1136                                                    S_IRUSR | S_IWUSR);
1137         if (fd < 0)
1138                 ereport(ERROR,
1139                                 (errcode_for_file_access(),
1140                                  errmsg("could not create file \"%s\": %m", path)));
1141
1142         /*
1143          * Truncate all data that's not guaranteed to have been safely fsynced (by
1144          * previous record or by the last checkpoint).
1145          */
1146         if (ftruncate(fd, xlrec->offset) != 0)
1147                 ereport(ERROR,
1148                                 (errcode_for_file_access(),
1149                                  errmsg("could not truncate file \"%s\" to %u: %m",
1150                                                 path, (uint32) xlrec->offset)));
1151
1152         /* now seek to the position we want to write our data to */
1153         if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1154                 ereport(ERROR,
1155                                 (errcode_for_file_access(),
1156                                  errmsg("could not seek to end of file \"%s\": %m",
1157                                                 path)));
1158
1159         data = XLogRecGetData(r) + sizeof(*xlrec);
1160
1161         len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1162
1163         /* write out tail end of mapping file (again) */
1164         if (write(fd, data, len) != len)
1165                 ereport(ERROR,
1166                                 (errcode_for_file_access(),
1167                                  errmsg("could not write to file \"%s\": %m", path)));
1168
1169         /*
1170          * Now fsync all previously written data. We could improve things and only
1171          * do this for the last write to a file, but the required bookkeeping
1172          * doesn't seem worth the trouble.
1173          */
1174         if (pg_fsync(fd) != 0)
1175                 ereport(ERROR,
1176                                 (errcode_for_file_access(),
1177                                  errmsg("could not fsync file \"%s\": %m", path)));
1178
1179         CloseTransientFile(fd);
1180 }
1181
1182 /* ---
1183  * Perform a checkpoint for logical rewrite mappings
1184  *
1185  * This serves two tasks:
1186  * 1) Remove all mappings not needed anymore based on the logical restart LSN
1187  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
1188  *        only has to deal with the parts of a mapping that have been written out
1189  *        after the checkpoint started.
1190  * ---
1191  */
1192 void
1193 CheckPointLogicalRewriteHeap(void)
1194 {
1195         XLogRecPtr      cutoff;
1196         XLogRecPtr      redo;
1197         DIR                *mappings_dir;
1198         struct dirent *mapping_de;
1199         char            path[MAXPGPATH];
1200
1201         /*
1202          * We start off with a minimum of the last redo pointer. No new decoding
1203          * slot will start before that, so that's a safe upper bound for removal.
1204          */
1205         redo = GetRedoRecPtr();
1206
1207         /* now check for the restart ptrs from existing slots */
1208         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1209
1210         /* don't start earlier than the restart lsn */
1211         if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1212                 cutoff = redo;
1213
1214         mappings_dir = AllocateDir("pg_logical/mappings");
1215         while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1216         {
1217                 struct stat statbuf;
1218                 Oid                     dboid;
1219                 Oid                     relid;
1220                 XLogRecPtr      lsn;
1221                 TransactionId rewrite_xid;
1222                 TransactionId create_xid;
1223                 uint32          hi,
1224                                         lo;
1225
1226                 if (strcmp(mapping_de->d_name, ".") == 0 ||
1227                         strcmp(mapping_de->d_name, "..") == 0)
1228                         continue;
1229
1230                 snprintf(path, MAXPGPATH, "pg_logical/mappings/%s", mapping_de->d_name);
1231                 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1232                         continue;
1233
1234                 /* Skip over files that cannot be ours. */
1235                 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1236                         continue;
1237
1238                 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1239                                    &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1240                         elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1241
1242                 lsn = ((uint64) hi) << 32 | lo;
1243
1244                 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1245                 {
1246                         elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1247                         if (unlink(path) < 0)
1248                                 ereport(ERROR,
1249                                                 (errcode_for_file_access(),
1250                                                  errmsg("could not remove file \"%s\": %m", path)));
1251                 }
1252                 else
1253                 {
1254                         int                     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1255
1256                         /*
1257                          * The file cannot vanish due to concurrency since this function
1258                          * is the only one removing logical mappings and it's run while
1259                          * CheckpointLock is held exclusively.
1260                          */
1261                         if (fd < 0)
1262                                 ereport(ERROR,
1263                                                 (errcode_for_file_access(),
1264                                                  errmsg("could not open file \"%s\": %m", path)));
1265
1266                         /*
1267                          * We could try to avoid fsyncing files that either haven't
1268                          * changed or have only been created since the checkpoint's start,
1269                          * but it's currently not deemed worth the effort.
1270                          */
1271                         else if (pg_fsync(fd) != 0)
1272                                 ereport(ERROR,
1273                                                 (errcode_for_file_access(),
1274                                                  errmsg("could not fsync file \"%s\": %m", path)));
1275                         CloseTransientFile(fd);
1276                 }
1277         }
1278         FreeDir(mappings_dir);
1279 }