/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam_xlog.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/procarray.h"

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
    IndexTupleData itupdata;
    Size        itemsz;
    char       *end = from + len;
    Item        items[MaxIndexTuplesPerPage];
    uint16      itemsizes[MaxIndexTuplesPerPage];
    int         i;
    int         nitems;

    /*
     * To get the items back in the original order, we add them to the page
     * in reverse.  To figure out where one tuple ends and another begins, we
     * have to scan them in forward order first.
     */
    i = 0;
    while (from < end)
    {
        /* Need to copy tuple header due to alignment considerations */
        memcpy(&itupdata, from, sizeof(IndexTupleData));
        itemsz = IndexTupleDSize(itupdata);
        itemsz = MAXALIGN(itemsz);

        items[i] = (Item) from;
        itemsizes[i] = itemsz;
        i++;

        from += itemsz;
    }
    nitems = i;

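    /*
     * items[0] is the tuple that was physically first in the copied data,
     * i.e. the one with the highest item number.  Walking the array
     * backwards therefore re-adds the tuples at offsets 1..nitems in their
     * original item-number order.
     */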
    for (i = nitems - 1; i >= 0; i--)
    {
        if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "_bt_restore_page: cannot add item to page");
    }
}

static void
_bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
                 BlockNumber root, uint32 level,
                 BlockNumber fastroot, uint32 fastlevel)
{
    Buffer      metabuf;
    Page        metapg;
    BTMetaPageData *md;
    BTPageOpaque pageop;

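    /*
     * Pass init = true: the metapage is rebuilt from scratch below, so there
     * is no need to read its old contents or check its LSN.
     */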
    metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
    Assert(BufferIsValid(metabuf));
    metapg = BufferGetPage(metabuf);

    _bt_pageinit(metapg, BufferGetPageSize(metabuf));

    md = BTPageGetMeta(metapg);
    md->btm_magic = BTREE_MAGIC;
    md->btm_version = BTREE_VERSION;
    md->btm_root = root;
    md->btm_level = level;
    md->btm_fastroot = fastroot;
    md->btm_fastlevel = fastlevel;

    pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
    pageop->btpo_flags = BTP_META;

    /*
     * Set pd_lower just past the end of the metadata.  This is not essential
     * but it makes the page look compressible to xlog.c.
     */
    ((PageHeader) metapg)->pd_lower =
        ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

    PageSetLSN(metapg, lsn);
    MarkBufferDirty(metabuf);
    UnlockReleaseBuffer(metabuf);
}

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
                           RelFileNode rnode, BlockNumber cblock)
{
    Buffer      buf;

    buf = XLogReadBuffer(rnode, cblock, false);
    if (BufferIsValid(buf))
    {
        Page        page = (Page) BufferGetPage(buf);

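        /*
         * Standard replay interlock: only apply the change if the page is
         * older than this WAL record, i.e. the change isn't already applied.
         */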
        if (lsn > PageGetLSN(page))
        {
            BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

            Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
            pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buf);
        }
        UnlockReleaseBuffer(buf);
    }
}

static void
btree_xlog_insert(bool isleaf, bool ismeta,
                  XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    char       *datapos;
    int         datalen;
    xl_btree_metadata md;
    BlockNumber cblkno = 0;
    int         main_blk_index;

    datapos = (char *) xlrec + SizeOfBtreeInsert;
    datalen = record->xl_len - SizeOfBtreeInsert;
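
    /*
     * The record body, in order, holds: the left child's block number (only
     * for an internal-page insert whose backup block 0 is absent), an
     * xl_btree_metadata struct (only for meta-updating inserts), and finally
     * the new index tuple itself.
     */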

    /*
     * If this insert finishes a split at a lower level, extract the block
     * number of the (left) child.
     */
    if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0)
    {
        memcpy(&cblkno, datapos, sizeof(BlockNumber));
        Assert(cblkno != 0);
        datapos += sizeof(BlockNumber);
        datalen -= sizeof(BlockNumber);
    }
    if (ismeta)
    {
        memcpy(&md, datapos, sizeof(xl_btree_metadata));
        datapos += sizeof(xl_btree_metadata);
        datalen -= sizeof(xl_btree_metadata);
    }

    /*
     * Insertion to an internal page finishes an incomplete split at the
     * child level.  Clear the incomplete-split flag in the child.  Note:
     * during normal operation, the child and parent pages are locked at the
     * same time, so that clearing the flag and inserting the downlink appear
     * atomic to other backends.  We don't bother with that during replay,
     * because readers don't care about the incomplete-split flag and there
     * cannot be updates happening.
     */
    if (!isleaf)
    {
        if (record->xl_info & XLR_BKP_BLOCK(0))
            (void) RestoreBackupBlock(lsn, record, 0, false, false);
        else
            _bt_clear_incomplete_split(lsn, record, xlrec->target.node, cblkno);
        main_blk_index = 1;
    }
    else
        main_blk_index = 0;

    if (record->xl_info & XLR_BKP_BLOCK(main_blk_index))
        (void) RestoreBackupBlock(lsn, record, main_blk_index, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->target.node,
                                ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);

            if (lsn > PageGetLSN(page))
            {
                if (PageAddItem(page, (Item) datapos, datalen,
                                ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
                                false, false) == InvalidOffsetNumber)
                    elog(PANIC, "btree_insert_redo: failed to add item");

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
            }
            UnlockReleaseBuffer(buffer);
        }
    }

    /*
     * Note: in normal operation, we'd update the metapage while still holding
     * lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (ismeta)
        _bt_restore_meta(xlrec->target.node, lsn,
                         md.root, md.level,
                         md.fastroot, md.fastlevel);
}

static void
btree_xlog_split(bool onleft, bool isroot,
                 XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
    bool        isleaf = (xlrec->level == 0);
    Buffer      lbuf;
    Buffer      rbuf;
    Page        rpage;
    BTPageOpaque ropaque;
    char       *datapos;
    int         datalen;
    OffsetNumber newitemoff = 0;
    Item        newitem = NULL;
    Size        newitemsz = 0;
    Item        left_hikey = NULL;
    Size        left_hikeysz = 0;
    BlockNumber cblkno = InvalidBlockNumber;

    datapos = (char *) xlrec + SizeOfBtreeSplit;
    datalen = record->xl_len - SizeOfBtreeSplit;
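
    /*
     * The record body, in order, holds: newitemoff (if the new item went to
     * the left half), the new item itself (if it went left and backup block
     * 0 is absent), the left page's high key (non-leaf splits without a
     * backup block only), the finished child's block number (non-leaf splits
     * only), and finally the items for the new right page.
     */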

    /* Extract newitemoff and newitem, if present */
    if (onleft)
    {
        memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
        datapos += sizeof(OffsetNumber);
        datalen -= sizeof(OffsetNumber);
    }
    if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
    {
        /*
         * We assume that 16-bit alignment is enough to apply IndexTupleSize
         * (since it's fetching from a uint16 field) and also enough for
         * PageAddItem to insert the tuple.
         */
        newitem = (Item) datapos;
        newitemsz = MAXALIGN(IndexTupleSize(newitem));
        datapos += newitemsz;
        datalen -= newitemsz;
    }

    /* Extract left hikey and its size (still assuming 16-bit alignment) */
    if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0)))
    {
        left_hikey = (Item) datapos;
        left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
        datapos += left_hikeysz;
        datalen -= left_hikeysz;
    }

    /*
     * If this insertion finishes an incomplete split, get the block number
     * of the child.
     */
    if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(1)))
    {
        memcpy(&cblkno, datapos, sizeof(BlockNumber));
        datapos += sizeof(BlockNumber);
        datalen -= sizeof(BlockNumber);
    }

    /*
     * Clear the incomplete-split flag on the left sibling of the child page
     * this is a downlink for.  (As in btree_xlog_insert, this can be done
     * before locking the other pages.)
     */
    if (!isleaf)
    {
        if (record->xl_info & XLR_BKP_BLOCK(1))
            (void) RestoreBackupBlock(lsn, record, 1, false, false);
        else
            _bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
    }

    /* Reconstruct right (new) sibling page from scratch */
    rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
    Assert(BufferIsValid(rbuf));
    rpage = (Page) BufferGetPage(rbuf);

    _bt_pageinit(rpage, BufferGetPageSize(rbuf));
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = xlrec->leftsib;
    ropaque->btpo_next = xlrec->rnext;
    ropaque->btpo.level = xlrec->level;
    ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    _bt_restore_page(rpage, datapos, datalen);

    /*
     * On leaf level, the high key of the left page is equal to the first key
     * on the right page.
     */
    if (isleaf)
    {
        ItemId      hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

        left_hikey = PageGetItem(rpage, hiItemId);
        left_hikeysz = ItemIdGetLength(hiItemId);
    }

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf);

    /* don't release the buffer yet; we touch right page's first item below */

    /* Now reconstruct left (original) sibling page */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        lbuf = RestoreBackupBlock(lsn, record, 0, false, true);
    else
    {
        lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);

        if (BufferIsValid(lbuf))
        {
            /*
             * To retain the same physical order the tuples had, we
             * initialize a temporary empty page for the left page and add
             * all the items to that in item number order.  This mirrors how
             * _bt_split() works.  It's not strictly required to retain the
             * same physical order, as long as the items are in the correct
             * item number order, but it helps debugging.  See also
             * _bt_restore_page(), which does the same for the right page.
             */
            Page        lpage = (Page) BufferGetPage(lbuf);
            BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

            if (lsn > PageGetLSN(lpage))
            {
                OffsetNumber off;
                Page        newlpage;
                OffsetNumber leftoff;

                newlpage = PageGetTempPageCopySpecial(lpage);

                /* Set high key */
                leftoff = P_HIKEY;
                if (PageAddItem(newlpage, left_hikey, left_hikeysz,
                                P_HIKEY, false, false) == InvalidOffsetNumber)
                    elog(PANIC, "failed to add high key to left page after split");
                leftoff = OffsetNumberNext(leftoff);

                for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
                {
                    ItemId      itemid;
                    Size        itemsz;
                    Item        item;

                    /* add the new item if it was inserted on the left page */
                    if (onleft && off == newitemoff)
                    {
                        if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
                                        false, false) == InvalidOffsetNumber)
                            elog(ERROR, "failed to add new item to left page after split");
                        leftoff = OffsetNumberNext(leftoff);
                    }

                    itemid = PageGetItemId(lpage, off);
                    itemsz = ItemIdGetLength(itemid);
                    item = PageGetItem(lpage, itemid);
                    if (PageAddItem(newlpage, item, itemsz, leftoff,
                                    false, false) == InvalidOffsetNumber)
                        elog(ERROR, "failed to add old item to left page after split");
                    leftoff = OffsetNumberNext(leftoff);
                }

                /* cope with possibility that newitem goes at the end */
                if (onleft && off == newitemoff)
                {
                    if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
                                    false, false) == InvalidOffsetNumber)
                        elog(ERROR, "failed to add new item to left page after split");
                    leftoff = OffsetNumberNext(leftoff);
                }

                PageRestoreTempPage(newlpage, lpage);

                /* Fix opaque fields */
                lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
                if (isleaf)
                    lopaque->btpo_flags |= BTP_LEAF;
                lopaque->btpo_next = xlrec->rightsib;
                lopaque->btpo_cycleid = 0;

                PageSetLSN(lpage, lsn);
                MarkBufferDirty(lbuf);
            }
        }
    }

    /* We no longer need the buffers */
    if (BufferIsValid(lbuf))
        UnlockReleaseBuffer(lbuf);
    UnlockReleaseBuffer(rbuf);

    /*
     * Fix left-link of the page to the right of the new right sibling.
     *
     * Note: in normal operation, we do this while still holding lock on the
     * two split pages.  However, that's not necessary for correctness in WAL
     * replay, because no other index update can be in progress, and readers
     * will cope properly when following an obsolete left-link.
     */
    if (xlrec->rnext != P_NONE)
    {
        /*
         * The backup block containing the right sibling is 1 or 2, depending
         * on whether this was a leaf or internal page.
         */
        int         rnext_index = isleaf ? 1 : 2;

        if (record->xl_info & XLR_BKP_BLOCK(rnext_index))
            (void) RestoreBackupBlock(lsn, record, rnext_index, false, false);
        else
        {
            Buffer      buffer;

            buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);

            if (BufferIsValid(buffer))
            {
                Page        page = (Page) BufferGetPage(buffer);

                if (lsn > PageGetLSN(page))
                {
                    BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

                    pageop->btpo_prev = xlrec->rightsib;

                    PageSetLSN(page, lsn);
                    MarkBufferDirty(buffer);
                }
                UnlockReleaseBuffer(buffer);
            }
        }
    }
}

static void
btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If queries might be active then we need to ensure every leaf page is
     * unpinned between the lastBlockVacuumed and the current block, if there
     * are any.  This prevents replay of the VACUUM from reaching the stage of
     * removing heap tuples while there could still be indexscans "in flight"
     * to those particular tuples (see nbtree/README).
     *
     * It might be worth checking if there are actually any backends running;
     * if not, we could just skip this.
     *
     * Since VACUUM can visit leaf pages out-of-order, it might issue records
     * with lastBlockVacuumed >= block; that's not an error, it just means
     * nothing to do now.
     *
     * Note: since we touch all pages in the range, we will lock non-leaf
     * pages, and also any empty (all-zero) pages that may be in the index. It
     * doesn't seem worth the complexity to avoid that.  But it's important
     * that HotStandbyActiveInReplay() will not return true if the database
     * isn't yet consistent; so we need not fear reading still-corrupt blocks
     * here during crash recovery.
     */
    if (HotStandbyActiveInReplay())
    {
        BlockNumber blkno;

        for (blkno = xlrec->lastBlockVacuumed + 1; blkno < xlrec->block; blkno++)
        {
            /*
             * We use RBM_NORMAL_NO_LOG mode because it's not an error
             * condition to see all-zero pages.  The original btvacuumpage
             * scan would have skipped over all-zero pages, noting them in FSM
             * but not bothering to initialize them just yet; so we mustn't
             * throw an error here.  (We could skip acquiring the cleanup lock
             * if PageIsNew, but it's probably not worth the cycles to test.)
             *
             * XXX we don't actually need to read the block, we just need to
             * confirm it is unpinned. If we had a special call into the
             * buffer manager we could optimise this so that if the block is
             * not in shared_buffers we confirm it as unpinned.
             */
            buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno,
                                            RBM_NORMAL_NO_LOG);
            if (BufferIsValid(buffer))
            {
                LockBufferForCleanup(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /*
     * If we have a full-page image, restore it (using a cleanup lock) and
     * we're done.
     */
    if (record->xl_info & XLR_BKP_BLOCK(0))
    {
        (void) RestoreBackupBlock(lsn, record, 0, true, false);
        return;
    }

    /*
     * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
     * page. See nbtree/README for details.
     */
    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
                                    RBM_NORMAL);
    if (!BufferIsValid(buffer))
        return;
    LockBufferForCleanup(buffer);
    page = (Page) BufferGetPage(buffer);

    if (lsn <= PageGetLSN(page))
    {
        UnlockReleaseBuffer(buffer);
        return;
    }

    if (record->xl_len > SizeOfBtreeVacuum)
    {
        OffsetNumber *unused;
        OffsetNumber *unend;

        unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
        unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);

        if ((unend - unused) > 0)
            PageIndexMultiDelete(page, unused, unend - unused);
    }

    /*
     * Mark the page as not containing any LP_DEAD items --- see comments in
     * _bt_delitems_vacuum().
     */
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}

/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted. This puts the work for calculating latestRemovedXid
 * into the recovery path rather than the primary path.
 *
 * It's possible that this generates a fair amount of I/O, since an index
 * block may have hundreds of tuples being deleted. Repeat accesses to the
 * same heap blocks are common, though are not yet optimised.
 *
 * XXX optimise later with something like XLogPrefetchBuffer()
 */
static TransactionId
btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
{
    OffsetNumber *unused;
    Buffer      ibuffer,
                hbuffer;
    Page        ipage,
                hpage;
    ItemId      iitemid,
                hitemid;
    IndexTuple  itup;
    HeapTupleHeader htuphdr;
    BlockNumber hblkno;
    OffsetNumber hoffnum;
    TransactionId latestRemovedXid = InvalidTransactionId;
    int         i;

    /*
     * If there's nothing running on the standby we don't need to derive a
     * full latestRemovedXid value, so use a fast path out of here.  This
     * returns InvalidTransactionId, and so will conflict with all HS
     * transactions; but since we just determined that no backends are
     * running, that's OK.
     *
     * XXX There is a race condition here, which is that a new backend might
     * start just after we look.  If so, it cannot need to conflict, but this
     * coding will result in throwing a conflict anyway.
     */
    if (CountDBBackends(InvalidOid) == 0)
        return latestRemovedXid;

    /*
     * In what follows, we have to examine the previous state of the index
     * page, as well as the heap page(s) it points to.  This is only valid if
     * WAL replay has reached a consistent database state; which means that
     * the preceding check is not just an optimization, but is *necessary*. We
     * won't have let in any user sessions before we reach consistency.
     */
    if (!reachedConsistency)
        elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");

    /*
     * Get index page.  If the DB is consistent, this should not fail, nor
     * should any of the heap page fetches below.  If one does, we return
     * InvalidTransactionId to cancel all HS transactions.  That's probably
     * overkill, but it's safe, and certainly better than panicking here.
     */
    ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
    if (!BufferIsValid(ibuffer))
        return InvalidTransactionId;
    ipage = (Page) BufferGetPage(ibuffer);

    /*
     * Loop through the deleted index items to obtain the TransactionId from
     * the heap items they point to.
     */
    unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

    for (i = 0; i < xlrec->nitems; i++)
    {
        /*
         * Identify the index tuple about to be deleted
         */
        iitemid = PageGetItemId(ipage, unused[i]);
        itup = (IndexTuple) PageGetItem(ipage, iitemid);

        /*
         * Locate the heap page that the index tuple points at
         */
        hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
        hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false);
        if (!BufferIsValid(hbuffer))
        {
            UnlockReleaseBuffer(ibuffer);
            return InvalidTransactionId;
        }
        hpage = (Page) BufferGetPage(hbuffer);

        /*
         * Look up the heap tuple header that the index tuple points at by
         * using the heap node supplied with the xlrec. We can't use
         * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
         * Note that we are not looking at tuple data here, just headers.
         */
        hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
        hitemid = PageGetItemId(hpage, hoffnum);

        /*
         * Follow any redirections until we find something useful.
         */
        while (ItemIdIsRedirected(hitemid))
        {
            hoffnum = ItemIdGetRedirect(hitemid);
            hitemid = PageGetItemId(hpage, hoffnum);
            CHECK_FOR_INTERRUPTS();
        }

        /*
         * If the heap item has storage, then read the header and use that to
         * set latestRemovedXid.
         *
         * Some LP_DEAD items may not be accessible, so we ignore them.
         */
        if (ItemIdHasStorage(hitemid))
        {
            htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);

            HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
        }
        else if (ItemIdIsDead(hitemid))
        {
            /*
             * Conjecture: if hitemid is dead then it had xids before the xids
             * marked on LP_NORMAL items. So we just ignore this item and move
             * on to the next, for the purposes of calculating
             * latestRemovedXid.
             */
        }
        else
            Assert(!ItemIdIsUsed(hitemid));

        UnlockReleaseBuffer(hbuffer);
    }

    UnlockReleaseBuffer(ibuffer);

    /*
     * If all heap tuples were LP_DEAD then we will be returning
     * InvalidTransactionId here, which avoids conflicts. This matches
     * existing logic which assumes that LP_DEAD tuples must already be older
     * than the latestRemovedXid on the cleanup record that set them as
     * LP_DEAD, hence must already have generated a conflict.
     */
    return latestRemovedXid;
}

static void
btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Btree delete records can conflict with standby queries.  You might
     * think that vacuum records would conflict as well, but we've handled
     * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
     * cleaned by the vacuum of the heap and so we can resolve any conflicts
     * just once when that arrives.  After that we know that no conflicts
     * exist from individual btree vacuum records on that index.
     */
    if (InHotStandby)
    {
        TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);

        ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
    }

    /* If we have a full-page image, restore it and we're done */
    if (record->xl_info & XLR_BKP_BLOCK(0))
    {
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
        return;
    }

    /*
     * We don't need to take a cleanup lock to apply these changes. See
     * nbtree/README for details.
     */
    buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
    if (!BufferIsValid(buffer))
        return;
    page = (Page) BufferGetPage(buffer);

    if (lsn <= PageGetLSN(page))
    {
        UnlockReleaseBuffer(buffer);
        return;
    }

    if (record->xl_len > SizeOfBtreeDelete)
    {
        OffsetNumber *unused;

        unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

        PageIndexMultiDelete(page, unused, xlrec->nitems);
    }

    /*
     * Mark the page as not containing any LP_DEAD items --- see comments in
     * _bt_delitems_delete().
     */
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
    BlockNumber parent;
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    IndexTupleData trunctuple;

    parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* parent page */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->target.node, parent, false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);
            pageop = (BTPageOpaque) PageGetSpecialPointer(page);
            if (lsn > PageGetLSN(page))
            {
                OffsetNumber poffset;
                ItemId      itemid;
                IndexTuple  itup;
                OffsetNumber nextoffset;
                BlockNumber rightsib;

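                /*
                 * Remove the target's downlink from the parent: repoint the
                 * downlink at poffset to the target's right sibling (found
                 * via the next downlink), then delete that now-redundant
                 * next downlink.
                 */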
                poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));

                nextoffset = OffsetNumberNext(poffset);
                itemid = PageGetItemId(page, nextoffset);
                itup = (IndexTuple) PageGetItem(page, itemid);
                rightsib = ItemPointerGetBlockNumber(&itup->t_tid);

                itemid = PageGetItemId(page, poffset);
                itup = (IndexTuple) PageGetItem(page, itemid);
                ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);

                PageIndexTupleDelete(page, nextoffset);

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
            }
            UnlockReleaseBuffer(buffer);
        }
    }

    /* Rewrite the leaf page as a halfdead page */
    buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = xlrec->leftblk;
    pageop->btpo_next = xlrec->rightblk;
    pageop->btpo.level = 0;
    pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
    pageop->btpo_cycleid = 0;

    /*
     * Construct a dummy hikey item that points to the next parent to be
     * deleted (if any).
     */
    MemSet(&trunctuple, 0, sizeof(IndexTupleData));
    trunctuple.t_info = sizeof(IndexTupleData);
    if (xlrec->topparent != InvalidBlockNumber)
        ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
    else
        ItemPointerSetInvalid(&trunctuple.t_tid);
    if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                    false, false) == InvalidOffsetNumber)
        elog(ERROR, "could not add dummy high key to half-dead page");

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
    BlockNumber target;
    BlockNumber leftsib;
    BlockNumber rightsib;
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;

    target = xlrec->deadblk;
    leftsib = xlrec->leftsib;
    rightsib = xlrec->rightsib;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* Fix left-link of right sibling */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->node, rightsib, false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);
            if (lsn <= PageGetLSN(page))
            {
                UnlockReleaseBuffer(buffer);
            }
            else
            {
                pageop = (BTPageOpaque) PageGetSpecialPointer(page);
                pageop->btpo_prev = leftsib;

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /* Fix right-link of left sibling, if any */
    if (record->xl_info & XLR_BKP_BLOCK(1))
        (void) RestoreBackupBlock(lsn, record, 1, false, false);
    else
    {
        if (leftsib != P_NONE)
        {
            buffer = XLogReadBuffer(xlrec->node, leftsib, false);
            if (BufferIsValid(buffer))
            {
                page = (Page) BufferGetPage(buffer);
                if (lsn <= PageGetLSN(page))
                {
                    UnlockReleaseBuffer(buffer);
                }
                else
                {
                    pageop = (BTPageOpaque) PageGetSpecialPointer(page);
                    pageop->btpo_next = rightsib;

                    PageSetLSN(page, lsn);
                    MarkBufferDirty(buffer);
                    UnlockReleaseBuffer(buffer);
                }
            }
        }
    }

    /* Rewrite target page as empty deleted page */
    buffer = XLogReadBuffer(xlrec->node, target, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
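    /*
     * btpo.xact records the deletion XID; _bt_page_recyclable() compares it
     * against RecentGlobalXmin before the page may be reused.
     */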
    pageop->btpo.xact = xlrec->btpo_xact;
    pageop->btpo_flags = BTP_DELETED;
    pageop->btpo_cycleid = 0;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    /*
     * If we deleted a parent of the targeted leaf page, instead of the leaf
     * itself, update the leaf to point to the next remaining child in the
     * branch.
     */
    if (target != xlrec->leafblk)
    {
        /*
         * There is no real data on the page, so we just re-create it from
         * scratch using the information from the WAL record.
         */
        IndexTupleData trunctuple;

        buffer = XLogReadBuffer(xlrec->node, xlrec->leafblk, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);

        /* Must init the page before fetching the special-space pointer */
        _bt_pageinit(page, BufferGetPageSize(buffer));
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
        pageop->btpo_prev = xlrec->leafleftsib;
        pageop->btpo_next = xlrec->leafrightsib;
        pageop->btpo.level = 0;
        pageop->btpo_cycleid = 0;

        /* Add a dummy hikey item */
        MemSet(&trunctuple, 0, sizeof(IndexTupleData));
        trunctuple.t_info = sizeof(IndexTupleData);
        if (xlrec->topparent != InvalidBlockNumber)
            ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
        else
            ItemPointerSetInvalid(&trunctuple.t_tid);
        if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "could not add dummy high key to half-dead page");

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
    }

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META)
    {
        xl_btree_metadata md;

        memcpy(&md, (char *) xlrec + SizeOfBtreeUnlinkPage,
               sizeof(xl_btree_metadata));
        _bt_restore_meta(xlrec->node, lsn,
                         md.root, md.level,
                         md.fastroot, md.fastlevel);
    }
}

static void
btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;

    buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_flags = BTP_ROOT;
    pageop->btpo_prev = pageop->btpo_next = P_NONE;
    pageop->btpo.level = xlrec->level;
    if (xlrec->level == 0)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

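    /*
     * A non-empty payload means the new root was created by a root split:
     * the record carries the downlink tuples for the two halves, and the
     * left child's incomplete-split flag must be cleared.
     */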
    if (record->xl_len > SizeOfBtreeNewroot)
    {
        IndexTuple  itup;
        BlockNumber cblkno;

        _bt_restore_page(page,
                         (char *) xlrec + SizeOfBtreeNewroot,
                         record->xl_len - SizeOfBtreeNewroot);
        /* extract block number of the left-hand split page */
        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY));
        cblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
        Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

        /* Clear the incomplete-split flag in left child */
        if (record->xl_info & XLR_BKP_BLOCK(0))
            (void) RestoreBackupBlock(lsn, record, 0, false, false);
        else
            _bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
    }

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    _bt_restore_meta(xlrec->node, lsn,
                     xlrec->rootblk, xlrec->level,
                     xlrec->rootblk, xlrec->level);
}

static void
btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

    /*
     * Btree reuse_page records exist to provide a conflict point when we
     * reuse pages in the index via the FSM.  That's all they do though.
     *
     * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
     * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
     * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
     * Consequently, one XID value achieves the same exclusion effect on
     * master and standby.
     */
    if (InHotStandby)
    {
        ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
                                            xlrec->node);
    }

    /* Backup blocks are not used in reuse_page records */
    Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
}

void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
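    /* Mask out the status bits (XLR_INFO_MASK) to leave just the btree opcode */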
    uint8       info = record->xl_info & ~XLR_INFO_MASK;

    switch (info)
    {
        case XLOG_BTREE_INSERT_LEAF:
            btree_xlog_insert(true, false, lsn, record);
            break;
        case XLOG_BTREE_INSERT_UPPER:
            btree_xlog_insert(false, false, lsn, record);
            break;
        case XLOG_BTREE_INSERT_META:
            btree_xlog_insert(false, true, lsn, record);
            break;
        case XLOG_BTREE_SPLIT_L:
            btree_xlog_split(true, false, lsn, record);
            break;
        case XLOG_BTREE_SPLIT_R:
            btree_xlog_split(false, false, lsn, record);
            break;
        case XLOG_BTREE_SPLIT_L_ROOT:
            btree_xlog_split(true, true, lsn, record);
            break;
        case XLOG_BTREE_SPLIT_R_ROOT:
            btree_xlog_split(false, true, lsn, record);
            break;
        case XLOG_BTREE_VACUUM:
            btree_xlog_vacuum(lsn, record);
            break;
        case XLOG_BTREE_DELETE:
            btree_xlog_delete(lsn, record);
            break;
        case XLOG_BTREE_MARK_PAGE_HALFDEAD:
            btree_xlog_mark_page_halfdead(info, lsn, record);
            break;
        case XLOG_BTREE_UNLINK_PAGE:
        case XLOG_BTREE_UNLINK_PAGE_META:
            btree_xlog_unlink_page(info, lsn, record);
            break;
        case XLOG_BTREE_NEWROOT:
            btree_xlog_newroot(lsn, record);
            break;
        case XLOG_BTREE_REUSE_PAGE:
            btree_xlog_reuse_page(lsn, record);
            break;
        default:
            elog(PANIC, "btree_redo: unknown op code %u", info);
    }
}