]> granicus.if.org Git - postgresql/blob - src/backend/storage/freespace/freespace.c
724e87fa2044714b9f4f9a5a6c63d2e2478d9877
[postgresql] / src / backend / storage / freespace / freespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * freespace.c
4  *        POSTGRES free space map for quickly finding free space in relations
5  *
6  *
7  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/freespace/freespace.c,v 1.65 2008/10/31 15:05:00 heikki Exp $
12  *
13  *
14  * NOTES:
15  *
16  *  Free Space Map keeps track of the amount of free space on pages, and
17  *  allows quickly searching for a page with enough free space. The FSM is
18  *  stored in a dedicated relation fork of all heap relations, and those
19  *  index access methods that need it (see also indexfsm.c). See README for
20  *  more information.
21  *
22  *-------------------------------------------------------------------------
23  */
24 #include "postgres.h"
25
26 #include "access/htup.h"
27 #include "access/xlogutils.h"
28 #include "storage/bufpage.h"
29 #include "storage/bufmgr.h"
30 #include "storage/freespace.h"
31 #include "storage/fsm_internals.h"
32 #include "storage/lmgr.h"
33 #include "storage/lwlock.h"
34 #include "storage/smgr.h"
35 #include "utils/rel.h"
36 #include "utils/inval.h"
37 #include "miscadmin.h"
38
39 /*
40  * We use just one byte to store the amount of free space on a page, so we
41  * divide the amount of free space a page can have into 256 different
42  * categories. The highest category, 255, represents a page with at least
43  * MaxFSMRequestSize bytes of free space, and the second highest category
44  * represents the range from 254 * FSM_CAT_STEP, inclusive, to
45  * MaxFSMRequestSize, exclusive.
46  *
47  * MaxFSMRequestSize depends on the architecture and BLCKSZ, but assuming
48  * default 8k BLCKSZ, and that MaxFSMRequestSize is 24 bytes, the categories
49  * look like this
50  * 
51  *
52  * Range     Category
53  * 0    - 31   0
54  * 32   - 63   1
55  * ...    ...  ...
56  * 8096 - 8127 253
57  * 8128 - 8163 254
58  * 8164 - 8192 255
59  *
60  * The reason that MaxFSMRequestSize is special is that if MaxFSMRequestSize
61  * isn't equal to a range boundary, a page with exactly MaxFSMRequestSize
62  * bytes of free space wouldn't satisfy a request for MaxFSMRequestSize
63  * bytes. If there isn't more than MaxFSMRequestSize bytes of free space on a
64  * completely empty page, that would mean that we could never satisfy a
65  * request of exactly MaxFSMRequestSize bytes.
66  */
67 #define FSM_CATEGORIES  256
68 #define FSM_CAT_STEP    (BLCKSZ / FSM_CATEGORIES)
69 #define MaxFSMRequestSize       MaxHeapTupleSize
70
71 /*
72  * Depth of the on-disk tree. We need to be able to address 2^32-1 blocks,
73  * and 1626 is the smallest number that satisfies X^3 >= 2^32-1. Likewise,
74  * 216 is the smallest number that satisfies X^4 >= 2^32-1. In practice,
75  * this means that 4096 bytes is the smallest BLCKSZ that we can get away
76  * with a 3-level tree, and 512 is the smallest we support.
77  */
78 #define FSM_TREE_DEPTH  ((SlotsPerFSMPage >= 1626) ? 3 : 4)
79
80 #define FSM_ROOT_LEVEL  (FSM_TREE_DEPTH - 1)
81 #define FSM_BOTTOM_LEVEL 0
82
83 /*
84  * The internal FSM routines work on a logical addressing scheme. Each
85  * level of the tree can be thought of as a separately addressable file.
86  */
87 typedef struct
88 {
89         int level;              /* level */
90         int logpageno;  /* page number within the level */
91 } FSMAddress;
92
93 /* Address of the root page. */
94 static const FSMAddress FSM_ROOT_ADDRESS = { FSM_ROOT_LEVEL, 0 };
95
96 /* XLOG record types */
97 #define XLOG_FSM_TRUNCATE     0x00    /* truncate */
98
99 typedef struct
100 {
101         RelFileNode node;                       /* truncated relation */
102         BlockNumber nheapblocks;        /* new number of blocks in the heap */
103 } xl_fsm_truncate;
104
105 /* functions to navigate the tree */
106 static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
107 static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
108 static FSMAddress fsm_get_location(BlockNumber heapblk, uint16 *slot);
109 static BlockNumber fsm_get_heap_blk(FSMAddress addr, uint16 slot);
110 static BlockNumber fsm_logical_to_physical(FSMAddress addr);
111
112 static Buffer fsm_readbuf(Relation rel, FSMAddress addr, bool extend);
113 static void fsm_extend(Relation rel, BlockNumber nfsmblocks);
114
115 /* functions to convert amount of free space to a FSM category */
116 static uint8 fsm_space_avail_to_cat(Size avail);
117 static uint8 fsm_space_needed_to_cat(Size needed);
118 static Size  fsm_space_cat_to_avail(uint8 cat);
119
120 /* workhorse functions for various operations */
121 static int fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
122                                                           uint8 newValue, uint8 minValue);
123 static BlockNumber fsm_search(Relation rel, uint8 min_cat);
124 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof);
125
126 static void fsm_redo_truncate(xl_fsm_truncate *xlrec);
127
128
129 /******** Public API ********/
130
131 /*
132  * GetPageWithFreeSpace - try to find a page in the given relation with
133  *              at least the specified amount of free space.
134  *
135  * If successful, return the block number; if not, return InvalidBlockNumber.
136  *
137  * The caller must be prepared for the possibility that the returned page
138  * will turn out to have too little space available by the time the caller
139  * gets a lock on it.  In that case, the caller should report the actual
140  * amount of free space available on that page and then try again (see
141  * RecordAndGetPageWithFreeSpace).      If InvalidBlockNumber is returned,
142  * extend the relation.
143  */
144 BlockNumber
145 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
146 {
147         uint8 min_cat = fsm_space_needed_to_cat(spaceNeeded);
148         return fsm_search(rel, min_cat);
149 }
150
151 /*
152  * RecordAndGetPageWithFreeSpace - update info about a page and try again.
153  *
154  * We provide this combo form to save some locking overhead, compared to
155  * separate RecordPageWithFreeSpace + GetPageWithFreeSpace calls. There's
156  * also some effort to return a page close to the old page; if there's a
157  * page with enough free space on the same FSM page where the old one page
158  * is located, it is preferred.
159  */
160 BlockNumber
161 RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
162                                                           Size oldSpaceAvail, Size spaceNeeded)
163 {
164         int                     old_cat = fsm_space_avail_to_cat(oldSpaceAvail);
165         int                     search_cat = fsm_space_needed_to_cat(spaceNeeded);
166         FSMAddress      addr;
167         uint16          slot;
168         int                     search_slot;
169
170         /* Get the location of the FSM byte representing the heap block */
171         addr = fsm_get_location(oldPage, &slot);
172
173         search_slot = fsm_set_and_search(rel, addr, slot, old_cat, search_cat);
174
175         /*
176          * If fsm_set_and_search found a suitable new block, return that.
177          * Otherwise, search as usual.
178          */
179         if (search_slot != -1)
180                 return fsm_get_heap_blk(addr, search_slot);
181         else
182                 return fsm_search(rel, search_cat);
183 }
184
185 /*
186  * RecordPageWithFreeSpace - update info about a page.
187  *
188  * Note that if the new spaceAvail value is higher than the old value stored
189  * in the FSM, the space might not become visible to searchers until the next
190  * FreeSpaceMapVacuum call, which updates the upper level pages.
191  */
192 void
193 RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
194 {
195         int                     new_cat = fsm_space_avail_to_cat(spaceAvail);
196         FSMAddress      addr;
197         uint16          slot;
198
199         /* Get the location of the FSM byte representing the heap block */
200         addr = fsm_get_location(heapBlk, &slot);
201
202         fsm_set_and_search(rel, addr, slot, new_cat, 0);
203 }
204
205 /*
206  * GetRecordedFreePage - return the amount of free space on a particular page,
207  *              according to the FSM.
208  */
209 Size
210 GetRecordedFreeSpace(Relation rel, BlockNumber heapBlk)
211 {
212         FSMAddress      addr;
213         uint16          slot;
214         Buffer          buf;
215         uint8           cat;
216
217         /* Get the location of the FSM byte representing the heap block */
218         addr = fsm_get_location(heapBlk, &slot);
219
220         buf = fsm_readbuf(rel, addr, false);
221         if (!BufferIsValid(buf))
222                 return 0;
223         cat = fsm_get_avail(BufferGetPage(buf), slot);
224         ReleaseBuffer(buf);
225
226         return fsm_space_cat_to_avail(cat);
227 }
228
229 /*
230  * FreeSpaceMapTruncateRel - adjust for truncation of a relation.
231  *
232  * The caller must hold AccessExclusiveLock on the relation, to ensure
233  * that other backends receive the relcache invalidation event that this
234  * function sends, before accessing the FSM again.
235  *
236  * nblocks is the new size of the heap.
237  */
238 void
239 FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
240 {
241         BlockNumber     new_nfsmblocks;
242         FSMAddress      first_removed_address;
243         uint16          first_removed_slot;
244         Buffer          buf;
245
246         RelationOpenSmgr(rel);
247
248         /* Get the location in the FSM of the first removed heap block */
249         first_removed_address = fsm_get_location(nblocks, &first_removed_slot);
250
251         /*
252          * Zero out the tail of the last remaining FSM page. If the slot
253          * representing the first removed heap block is at a page boundary, as
254          * the first slot on the FSM page that first_removed_address points to,
255          * we can just truncate that page altogether.
256          */
257         if (first_removed_slot > 0)
258         {
259                 buf = fsm_readbuf(rel, first_removed_address, false);
260                 if (!BufferIsValid(buf))
261                         return; /* nothing to do; the FSM was already smaller */
262                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
263                 fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
264                 MarkBufferDirty(buf);
265                 UnlockReleaseBuffer(buf);
266
267                 new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
268         }
269         else
270         {
271                 new_nfsmblocks = fsm_logical_to_physical(first_removed_address);
272                 if (smgrnblocks(rel->rd_smgr, FSM_FORKNUM) <= new_nfsmblocks)
273                         return; /* nothing to do; the FSM was already smaller */
274         }
275
276         /* Truncate the unused FSM pages */
277         smgrtruncate(rel->rd_smgr, FSM_FORKNUM, new_nfsmblocks, rel->rd_istemp);
278
279         /*
280          * FSM truncations are WAL-logged, because we must never return a block
281          * that doesn't exist in the heap, not even if we crash before the FSM
282          * truncation has made it to disk. smgrtruncate() writes its own WAL
283          * record, but that's not enough to zero out the last remaining FSM page.
284          * (if we didn't need to zero out anything above, we can skip this)
285          */
286         if (!rel->rd_istemp && first_removed_slot != 0)
287         {
288                 xl_fsm_truncate xlrec;
289                 XLogRecData             rdata;
290                 XLogRecPtr              recptr;
291
292                 xlrec.node = rel->rd_node;
293                 xlrec.nheapblocks = nblocks;
294
295                 rdata.data = (char *) &xlrec;
296                 rdata.len = sizeof(xl_fsm_truncate);
297                 rdata.buffer = InvalidBuffer;
298                 rdata.next = NULL;
299
300                 recptr = XLogInsert(RM_FREESPACE_ID, XLOG_FSM_TRUNCATE, &rdata);
301
302                 /*
303                  * Flush, because otherwise the truncation of the main relation
304                  * might hit the disk before the WAL record of truncating the
305                  * FSM is flushed. If we crashed during that window, we'd be
306                  * left with a truncated heap, without a truncated FSM.
307                  */
308                 XLogFlush(recptr);
309         }
310
311         /*
312          * Need to invalidate the relcache entry, because rd_fsm_nblocks_cache
313          * seen by other backends is no longer valid.
314          */
315         CacheInvalidateRelcache(rel);
316
317         rel->rd_fsm_nblocks_cache = new_nfsmblocks;
318 }
319
320 /*
321  * FreeSpaceMapVacuum - scan and fix any inconsistencies in the FSM
322  */
323 void
324 FreeSpaceMapVacuum(Relation rel)
325 {
326         bool dummy;
327
328         /*
329          * Traverse the tree in depth-first order. The tree is stored physically
330          * in depth-first order, so this should be pretty I/O efficient.
331          */
332         fsm_vacuum_page(rel, FSM_ROOT_ADDRESS, &dummy);
333 }
334
335 /******** Internal routines ********/
336
337 /*
338  * Return category corresponding x bytes of free space
339  */
340 static uint8
341 fsm_space_avail_to_cat(Size avail)
342 {
343         int cat;
344
345         Assert(avail < BLCKSZ);
346
347         if (avail >= MaxFSMRequestSize)
348                 return 255;
349
350         cat = avail / FSM_CAT_STEP;
351
352         /*
353          * The highest category, 255, is reserved for MaxFSMRequestSize bytes or
354          * more.
355          */
356         if (cat > 254)
357                 cat = 254;
358
359         return (uint8) cat;
360 }
361
362 /*
363  * Return the lower bound of the range of free space represented by given
364  * category.
365  */
366 static Size
367 fsm_space_cat_to_avail(uint8 cat)
368 {
369         /* The highest category represents exactly MaxFSMRequestSize bytes. */
370         if (cat == 255)
371                 return MaxFSMRequestSize;
372         else
373                 return cat * FSM_CAT_STEP;
374 }
375
376 /*
377  * Which category does a page need to have, to accommodate x bytes of data?
378  * While fsm_size_to_avail_cat() rounds down, this needs to round up.
379  */
380 static uint8
381 fsm_space_needed_to_cat(Size needed)
382 {
383         int cat;
384
385         /* Can't ask for more space than the highest category represents */
386         if (needed > MaxFSMRequestSize)
387                         elog(ERROR, "invalid FSM request size %lu",
388                                  (unsigned long) needed);
389
390         if (needed == 0)
391                 return 1;
392
393         cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;
394
395         if (cat > 255)
396                 cat = 255;
397
398         return (uint8) cat;
399 }
400
401 /*
402  * Returns the physical block number an FSM page
403  */
404 static BlockNumber
405 fsm_logical_to_physical(FSMAddress addr)
406 {
407         BlockNumber pages;
408         int leafno;
409         int l;
410
411         /*
412          * Calculate the logical page number of the first leaf page below the
413          * given page.
414          */
415         leafno = addr.logpageno;
416         for (l = 0; l < addr.level; l++)
417                 leafno *= SlotsPerFSMPage;
418
419         /* Count upper level nodes required to address the leaf page */
420         pages = 0;
421         for (l = 0; l < FSM_TREE_DEPTH; l++)
422         {
423                 pages += leafno + 1;
424                 leafno /= SlotsPerFSMPage;
425         }
426
427         /*
428          * If the page we were asked for wasn't at the bottom level, subtract
429          * the additional lower level pages we counted above.
430          */
431         pages -= addr.level;
432
433         /* Turn the page count into 0-based block number */
434         return pages - 1;
435 }
436
437 /*
438  * Return the FSM location corresponding to given heap block.
439  */
440 static FSMAddress
441 fsm_get_location(BlockNumber heapblk, uint16 *slot)
442 {
443         FSMAddress addr;
444
445         addr.level = FSM_BOTTOM_LEVEL;
446         addr.logpageno = heapblk / SlotsPerFSMPage;
447         *slot = heapblk % SlotsPerFSMPage;
448
449         return addr;
450 }
451
452 /*
453  * Return the heap block number corresponding to given location in the FSM.
454  */
455 static BlockNumber
456 fsm_get_heap_blk(FSMAddress addr, uint16 slot)
457 {
458         Assert(addr.level == FSM_BOTTOM_LEVEL);
459         return ((unsigned int) addr.logpageno) * SlotsPerFSMPage + slot;
460 }
461
462 /*
463  * Given a logical address of a child page, get the logical page number of
464  * the parent, and the slot within the parent corresponding to the child.
465  */
466 static FSMAddress
467 fsm_get_parent(FSMAddress child, uint16 *slot)
468 {
469         FSMAddress parent;
470
471         Assert(child.level < FSM_ROOT_LEVEL);
472
473         parent.level = child.level + 1;
474         parent.logpageno = child.logpageno / SlotsPerFSMPage;
475         *slot = child.logpageno % SlotsPerFSMPage;
476
477         return parent;
478 }
479
480 /*
481  * Given a logical address of a parent page, and a slot number get the
482  * logical address of the corresponding child page.
483  */
484 static FSMAddress
485 fsm_get_child(FSMAddress parent, uint16 slot)
486 {
487         FSMAddress child;
488
489         Assert(parent.level > FSM_BOTTOM_LEVEL);
490
491         child.level = parent.level - 1;
492         child.logpageno = parent.logpageno * SlotsPerFSMPage + slot;
493
494         return child;
495 }
496
497 /*
498  * Read a FSM page.
499  *
500  * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
501  * true, the FSM file is extended.
502  */
503 static Buffer
504 fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
505 {
506         BlockNumber blkno = fsm_logical_to_physical(addr);
507         Buffer buf;
508
509         RelationOpenSmgr(rel);
510
511         if (rel->rd_fsm_nblocks_cache == InvalidBlockNumber || 
512                 rel->rd_fsm_nblocks_cache <= blkno)
513                 rel->rd_fsm_nblocks_cache = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
514
515         if (blkno >= rel->rd_fsm_nblocks_cache)
516         {
517                 if (extend)
518                         fsm_extend(rel, blkno + 1);
519                 else
520                         return InvalidBuffer;
521         }
522
523         /*
524          * Use ZERO_ON_ERROR mode, and initialize the page if necessary. The FSM
525          * information is not accurate anyway, so it's better to clear corrupt
526          * pages than error out. Since the FSM changes are not WAL-logged, the
527          * so-called torn page problem on crash can lead to pages with corrupt
528          * headers, for example.
529          */
530         buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, NULL);
531         if (PageIsNew(BufferGetPage(buf)))
532                 PageInit(BufferGetPage(buf), BLCKSZ, 0);
533         return buf;
534 }
535
536 /*
537  * Ensure that the FSM fork is at least n_fsmblocks long, extending
538  * it if necessary with empty pages. And by empty, I mean pages filled
539  * with zeros, meaning there's no free space.
540  */
541 static void
542 fsm_extend(Relation rel, BlockNumber n_fsmblocks)
543 {
544         BlockNumber n_fsmblocks_now;
545         Page pg;
546
547         pg = (Page) palloc(BLCKSZ);
548         PageInit(pg, BLCKSZ, 0);
549
550         /*
551          * We use the relation extension lock to lock out other backends
552          * trying to extend the FSM at the same time. It also locks out
553          * extension of the main fork, unnecessarily, but extending the
554          * FSM happens seldom enough that it doesn't seem worthwhile to
555          * have a separate lock tag type for it.
556          *
557          * Note that another backend might have extended the relation
558          * before we get the lock.
559          */
560         LockRelationForExtension(rel, ExclusiveLock);
561
562         n_fsmblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
563         while (n_fsmblocks_now < n_fsmblocks)
564         {
565                 smgrextend(rel->rd_smgr, FSM_FORKNUM, n_fsmblocks_now,
566                                    (char *) pg, rel->rd_istemp);
567                 n_fsmblocks_now++;
568         }
569
570         UnlockRelationForExtension(rel, ExclusiveLock);
571
572         pfree(pg);
573
574         /* update the cache with the up-to-date size */
575         rel->rd_fsm_nblocks_cache = n_fsmblocks_now;
576 }
577
578 /*
579  * Set value in given FSM page and slot.
580  *
581  * If minValue > 0, the updated page is also searched for a page with at
582  * least minValue of free space. If one is found, its slot number is
583  * returned, -1 otherwise.
584  */
585 static int
586 fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
587                                    uint8 newValue, uint8 minValue)
588 {
589         Buffer          buf;
590         Page            page;
591         int                     newslot = -1;
592
593         buf = fsm_readbuf(rel, addr, true);
594         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
595
596         page = BufferGetPage(buf);
597
598         if (fsm_set_avail(page, slot, newValue))
599                 MarkBufferDirty(buf);
600
601         if (minValue != 0)
602         {
603                 /* Search while we still hold the lock */
604                 newslot = fsm_search_avail(buf, minValue,
605                                                                    addr.level == FSM_BOTTOM_LEVEL,
606                                                                    true);
607         }
608
609         UnlockReleaseBuffer(buf);
610
611         return newslot;
612 }
613
614 /*
615  * Search the tree for a heap page with at least min_cat of free space
616  */
617 static BlockNumber
618 fsm_search(Relation rel, uint8 min_cat)
619 {
620         int restarts = 0;
621         FSMAddress addr = FSM_ROOT_ADDRESS;
622
623         for (;;)
624         {
625                 int slot;
626                 Buffer buf;
627                 uint8 max_avail = 0;
628
629                 /*
630                  * Read the FSM page. The root page is created if it doesn't exist
631                  * yet, to save future searchers the effort of having to call
632                  * smgrnblocks() in fsm_readbuf(), only to see that the FSM is
633                  * completely empty.
634                  */
635                 buf = fsm_readbuf(rel, addr, (addr.level != FSM_ROOT_LEVEL));
636
637                 /* Search within the page */
638                 if (BufferIsValid(buf))
639                 {
640                         LockBuffer(buf, BUFFER_LOCK_SHARE);
641                         slot = fsm_search_avail(buf, min_cat,
642                                                                         (addr.level == FSM_BOTTOM_LEVEL),
643                                                                         false);
644                         if (slot == -1)
645                                 max_avail = fsm_get_max_avail(BufferGetPage(buf));
646                         UnlockReleaseBuffer(buf);
647                 }
648                 else
649                         slot = -1;
650
651                 if (slot != -1)
652                 {
653                         /*
654                          * Descend the tree, or return the found block if we're at the
655                          * bottom.
656                          */
657                         if (addr.level == FSM_BOTTOM_LEVEL)
658                                 return fsm_get_heap_blk(addr, slot);
659
660                         addr = fsm_get_child(addr, slot);
661                 }
662                 else if (addr.level == FSM_ROOT_LEVEL)
663                 {
664                         /*
665                          * At the root, failure means there's no page with enough free
666                          * space in the FSM. Give up.
667                          */
668                         return InvalidBlockNumber;
669                 }
670                 else
671                 {
672                         uint16 parentslot;
673                         FSMAddress parent;
674
675                         /*
676                          * At lower level, failure can happen if the value in the upper-
677                          * level node didn't reflect the value on the lower page. Update
678                          * the upper node, to avoid falling into the same trap again, and
679                          * start over.
680                          *
681                          * There's a race condition here, if another backend updates this
682                          * page right after we release it, and gets the lock on the parent
683                          * page before us. We'll then update the parent page with the now
684                          * stale information we had. It's OK, because it should happen
685                          * rarely, and will be fixed by the next vacuum.
686                          */
687                         parent = fsm_get_parent(addr, &parentslot);
688                         fsm_set_and_search(rel, parent, parentslot, max_avail, 0);
689
690                         /*
691                          * If the upper pages are badly out of date, we might need to
692                          * loop quite a few times, updating them as we go. Any
693                          * inconsistencies should eventually be corrected and the loop
694                          * should end. Looping indefinitely is nevertheless scary, so
695                          * provide an emergency valve.
696                          */
697                         if (restarts++ > 10000)
698                                 return InvalidBlockNumber;
699
700                         /* Start search all over from the root */
701                         addr = FSM_ROOT_ADDRESS;
702                 }
703         }
704 }
705
706
707 /*
708  * Recursive guts of FreeSpaceMapVacuum
709  */
710 static uint8
711 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
712 {
713         Buffer buf;
714         Page page;
715         uint8 max_avail;
716
717         /* Read the page if it exists, or return EOF */
718         buf = fsm_readbuf(rel, addr, false);
719         if (!BufferIsValid(buf))
720         {
721                 *eof_p = true;
722                 return 0;
723         }
724         else
725                 *eof_p = false;
726
727         page = BufferGetPage(buf);
728
729         /*
730          * Recurse into children, and fix the information stored about them
731          * at this level.
732          */
733         if (addr.level > FSM_BOTTOM_LEVEL)
734         {
735                 int slot;
736                 bool eof = false;
737
738                 for (slot = 0; slot < SlotsPerFSMPage; slot++)
739                 {
740                         int child_avail;
741
742                         /* After we hit end-of-file, just clear the rest of the slots */
743                         if (!eof)
744                                 child_avail = fsm_vacuum_page(rel, fsm_get_child(addr, slot), &eof);
745                         else
746                                 child_avail = 0;
747
748                         /* Update information about the child */
749                         if (fsm_get_avail(page, slot) != child_avail)
750                         {
751                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
752                                 fsm_set_avail(BufferGetPage(buf), slot, child_avail);
753                                 MarkBufferDirty(buf);
754                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
755                         }
756                 }
757         }
758
759         max_avail = fsm_get_max_avail(BufferGetPage(buf));
760
761         /*
762          * Reset the next slot pointer. This encourages the use of low-numbered
763          * pages, increasing the chances that a later vacuum can truncate the
764          * relation.
765          */
766         ((FSMPage) PageGetContents(page))->fp_next_slot = 0;
767
768         ReleaseBuffer(buf);
769
770         return max_avail;
771 }
772
773
774 /****** WAL-logging ******/
775
776 static void
777 fsm_redo_truncate(xl_fsm_truncate *xlrec)
778 {
779         FSMAddress      first_removed_address;
780         uint16          first_removed_slot;
781         BlockNumber fsmblk;
782         Buffer          buf;
783
784         /* Get the location in the FSM of the first removed heap block */
785         first_removed_address = fsm_get_location(xlrec->nheapblocks,
786                                                                                          &first_removed_slot);
787         fsmblk = fsm_logical_to_physical(first_removed_address);
788
789         /*
790          * Zero out the tail of the last remaining FSM page. We rely on the
791          * replay of the smgr truncation record to remove completely unused
792          * pages.
793          */
794         buf = XLogReadBufferExtended(xlrec->node, FSM_FORKNUM, fsmblk,
795                                                                  RBM_ZERO_ON_ERROR);
796         if (BufferIsValid(buf))
797         {
798                 Page page = BufferGetPage(buf);
799
800                 if (PageIsNew(page))
801                         PageInit(page, BLCKSZ, 0);
802                 fsm_truncate_avail(page, first_removed_slot);
803                 MarkBufferDirty(buf);
804                 UnlockReleaseBuffer(buf);
805         }
806 }
807
808 void
809 fsm_redo(XLogRecPtr lsn, XLogRecord *record)
810 {
811         uint8           info = record->xl_info & ~XLR_INFO_MASK;
812
813         switch (info)
814         {
815                 case XLOG_FSM_TRUNCATE:
816                         fsm_redo_truncate((xl_fsm_truncate *) XLogRecGetData(record));
817                         break;
818                 default:
819                         elog(PANIC, "fsm_redo: unknown op code %u", info);
820         }
821 }
822
823 void
824 fsm_desc(StringInfo buf, uint8 xl_info, char *rec)
825 {
826         uint8           info = xl_info & ~XLR_INFO_MASK;
827
828         switch (info)
829         {
830                 case XLOG_FSM_TRUNCATE:
831                 {
832                         xl_fsm_truncate *xlrec = (xl_fsm_truncate *) rec;
833
834                         appendStringInfo(buf, "truncate: rel %u/%u/%u; nheapblocks %u;",
835                                                          xlrec->node.spcNode, xlrec->node.dbNode,
836                                                          xlrec->node.relNode, xlrec->nheapblocks);
837                         break;
838                 }
839                 default:
840                         appendStringInfo(buf, "UNKNOWN");
841                         break;
842         }
843 }