/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *	  Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.173 2009/08/01 20:59:17 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/inval.h"
#include "utils/tqual.h"


typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	int			fillfactor;		/* needed when splitting rightmost page */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */
	OffsetNumber newitemoff;	/* where the new item is to be inserted */
	int			leftspace;		/* space available for items on left page */
	int			rightspace;		/* space available for items on right page */
	int			olddataitemstotal;	/* space taken by old items */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;
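
/*
 * Illustrative sketch only (not part of the original file): one way a split
 * chooser can use FindSplitData.  For each candidate split point we compute
 * the post-split free space on each half and remember the candidate with
 * the smallest imbalance.  This hypothetical helper is a simplified
 * stand-in for the real _bt_checksplitloc further down, which additionally
 * handles fillfactor and rightmost-page considerations.
 */
static void
example_consider_split(FindSplitData *state,
					   OffsetNumber firstright, bool newitemonleft,
					   int leftused, int rightused)
{
	/* free space remaining on each half if we split at firstright */
	int			leftfree = state->leftspace - leftused;
	int			rightfree = state->rightspace - rightused;
	int			delta;

	/* charge the incoming item to whichever side it would land on */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/* an acceptable split must leave room for the new item */
	if (leftfree < 0 || rightfree < 0)
		return;

	/* keep the split with the most balanced post-split free space */
	delta = leftfree - rightfree;
	if (delta < 0)
		delta = -delta;
	if (!state->have_split || delta < state->best_delta)
	{
		state->have_split = true;
		state->newitemonleft = newitemonleft;
		state->firstright = firstright;
		state->best_delta = delta;
	}
}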


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf, OffsetNumber offset,
				 ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique);
static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
			 Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);


/*
 *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *		This routine is called by the public interface routines, btbuild
 *		and btinsert.  By here, itup is filled in, including the TID.
 *
 *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
 *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *		don't actually insert.
 *
 *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *		it must be TRUE if the entry is known unique, else FALSE.
 *		(In the current implementation we'll also return TRUE after a
 *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *		that's just a coding artifact.)
 */
bool
_bt_doinsert(Relation rel, IndexTuple itup,
			 IndexUniqueCheck checkUnique, Relation heapRel)
{
	bool		is_unique = false;
	int			natts = rel->rd_rel->relnatts;
	ScanKey		itup_scankey;
	BTStack		stack;
	Buffer		buf;
	OffsetNumber offset;

	/* we need an insertion scan key to do our search, so build one */
	itup_scankey = _bt_mkscankey(rel, itup);

top:
	/* find the first page containing this key */
	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

	offset = InvalidOffsetNumber;

	/* trade in our read lock for a write lock */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buf, BT_WRITE);

	/*
	 * If the page was split between the time that we surrendered our read
	 * lock and acquired our write lock, then this page may no longer be the
	 * right place for the key we want to insert.  In this case, we need to
	 * move right in the tree.  See Lehman and Yao for an excruciatingly
	 * precise description.
	 */
	buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

	/*
	 * If we're not allowing duplicates, make sure the key isn't already in
	 * the index.
	 *
	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
	 * in the index; so it cannot defend against concurrent insertions of the
	 * same key.  We protect against that by means of holding a write lock on
	 * the target page.  Any other would-be inserter of the same key must
	 * acquire a write lock on the same target page, so only one would-be
	 * inserter can be making the check at one time.  Furthermore, once we are
	 * past the check we hold write locks continuously until we have performed
	 * our insertion, so no later inserter can fail to see our insertion.
	 * (This requires some care in _bt_insertonpg.)
	 *
	 * If we must wait for another xact, we release the lock while waiting,
	 * and then must start over completely.
	 *
	 * For a partial uniqueness check, we don't wait for the other xact.
	 * Just let the tuple in and return false for possibly non-unique,
	 * or true for definitely unique.
	 */
	if (checkUnique != UNIQUE_CHECK_NO)
	{
		TransactionId xwait;

		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
								 checkUnique, &is_unique);

		if (TransactionIdIsValid(xwait))
		{
			/* Have to wait for the other guy ... */
			_bt_relbuf(rel, buf);
			XactLockTableWait(xwait);
			/* start over... */
			_bt_freestack(stack);
			goto top;
		}
	}

	if (checkUnique != UNIQUE_CHECK_EXISTING)
	{
		/* do the insertion */
		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
		_bt_insertonpg(rel, buf, stack, itup, offset, false);
	}
	else
	{
		/* just release the buffer */
		_bt_relbuf(rel, buf);
	}

	/* be tidy */
	_bt_freestack(stack);
	_bt_freeskey(itup_scankey);

	return is_unique;
}
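
/*
 * For orientation, a hypothetical caller of _bt_doinsert (illustrative
 * only; the real entry points are btinsert and the btbuild callback).
 * The caller supplies a fully-formed index tuple, TID included, and picks
 * the uniqueness behavior via IndexUniqueCheck.
 */
static bool
example_insert_index_tuple(Relation indexRel, Relation heapRel,
						   IndexTuple itup, bool enforceUnique)
{
	IndexUniqueCheck checkUnique;

	/* UNIQUE_CHECK_YES errors on duplicates; UNIQUE_CHECK_NO allows them */
	checkUnique = enforceUnique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO;

	/*
	 * The return value only matters for UNIQUE_CHECK_PARTIAL, where false
	 * means "possibly non-unique, recheck later"; for the other modes it is
	 * just a coding artifact, per the header comment above.
	 */
	return _bt_doinsert(indexRel, itup, checkUnique, heapRel);
}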

/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict.  It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	SnapshotData SnapshotDirty;
	OffsetNumber maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;
	bool		found = false;

	/* Assume unique until we find a duplicate */
	*is_unique = true;

	InitDirtySnapshot(SnapshotDirty);

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive.  Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway.  So now we just advance over killed items as quickly as
			 * we can.  We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdIsDead(curitemid))
			{
				ItemPointerData htid;
				bool		all_dead;

				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) --- this is
				 * how we handle NULLs --- so we must not use _bt_compare as a
				 * real equality test, but only for ordering/finding items on
				 * pages. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htid = curitup->t_tid;

				/*
				 * If we are doing a recheck, we expect to find the tuple we
				 * are rechecking.  It's not a duplicate, but we have to keep
				 * scanning.
				 */
				if (checkUnique == UNIQUE_CHECK_EXISTING &&
					ItemPointerCompare(&htid, &itup->t_tid) == 0)
				{
					found = true;
				}

				/*
				 * We check the whole HOT-chain to see if there is any tuple
				 * that satisfies SnapshotDirty.  This is necessary because we
				 * have just a single index entry for the entire chain.
				 */
				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
										 &all_dead))
				{
					TransactionId xwait;

					/*
					 * It is a duplicate.  If we are only doing a partial
					 * check, then don't bother checking if the tuple is
					 * being updated in another transaction.  Just return
					 * the fact that it is a potential conflict and leave
					 * the full check till later.
					 */
					if (checkUnique == UNIQUE_CHECK_PARTIAL)
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						*is_unique = false;
						return InvalidTransactionId;
					}

					/*
					 * If this tuple is being updated by another transaction
					 * then we have to wait for its commit/abort.
					 */
					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
						SnapshotDirty.xmin : SnapshotDirty.xmax;

					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.  But before
					 * complaining, look to see if the tuple we want to insert
					 * is itself now committed dead --- if so, don't complain.
					 * This is a waste of time in normal scenarios but we must
					 * do it to support CREATE INDEX CONCURRENTLY.
					 *
					 * We must follow HOT-chains here because during
					 * concurrent index build, we insert the root TID though
					 * the actual tuple may be somewhere in the HOT-chain.
					 * While following the chain we might not stop at the
					 * exact tuple which triggered the insert, but that's OK
					 * because if we find a live tuple anywhere in this chain,
					 * we have a unique key conflict.  The other live tuple is
					 * not part of this chain because it had a different index
					 * entry.
					 */
					htid = itup->t_tid;
					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
					{
						/* Normal case --- it's still live */
					}
					else
					{
						/*
						 * It's been deleted, so no error, and no need to
						 * continue searching
						 */
						break;
					}

					/*
					 * This is a definite conflict.  Break the tuple down
					 * into datums and report the error.  But first, make
					 * sure we release the buffer locks we're holding ---
					 * BuildIndexValueDescription could make catalog accesses,
					 * which in the worst case might touch this same index
					 * and cause deadlocks.
					 */
					if (nbuf != InvalidBuffer)
						_bt_relbuf(rel, nbuf);
					_bt_relbuf(rel, buf);

					{
						Datum		values[INDEX_MAX_KEYS];
						bool		isnull[INDEX_MAX_KEYS];

						index_deform_tuple(itup, RelationGetDescr(rel),
										   values, isnull);
						ereport(ERROR,
								(errcode(ERRCODE_UNIQUE_VIOLATION),
								 errmsg("duplicate key value violates unique constraint \"%s\"",
										RelationGetRelationName(rel)),
								 errdetail("Key %s already exists.",
										   BuildIndexValueDescription(rel,
													values, isnull))));
					}
				}
				else if (all_dead)
				{
					/*
					 * The conflicting tuple (or whole HOT chain) is dead to
					 * everyone, so we may as well mark the index entry
					 * killed.
					 */
					ItemIdMarkDead(curitemid);
					opaque->btpo_flags |= BTP_HAS_GARBAGE;
					/* be sure to mark the proper buffer dirty... */
					if (nbuf != InvalidBuffer)
						SetBufferCommitInfoNeedsSave(nbuf);
					else
						SetBufferCommitInfoNeedsSave(buf);
				}
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of index \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/*
	 * If we are doing a recheck then we should have found the tuple we
	 * are checking.  Otherwise there's something very wrong --- probably,
	 * the index is on a non-immutable expression.
	 */
	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to re-find tuple within index \"%s\"",
						RelationGetRelationName(rel)),
				 errhint("This may be because of a non-immutable index expression.")));

	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}
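
/*
 * A condensed sketch (hypothetical helper, for illustration) of the
 * dirty-snapshot probe used by _bt_check_unique: a dirty snapshot also
 * sees uncommitted tuples, reporting an in-progress inserter in xmin and
 * an in-progress deleter in xmax, which tells the caller which xact to
 * wait on before deciding whether a duplicate really exists.
 */
static TransactionId
example_conflicting_xact(Relation heapRel, ItemPointerData htid)
{
	SnapshotData SnapshotDirty;

	InitDirtySnapshot(SnapshotDirty);

	if (heap_hot_search(&htid, heapRel, &SnapshotDirty, NULL))
	{
		/* a matching tuple exists; is some xact still working on it? */
		if (TransactionIdIsValid(SnapshotDirty.xmin))
			return SnapshotDirty.xmin;	/* inserter not yet committed */
		if (TransactionIdIsValid(SnapshotDirty.xmax))
			return SnapshotDirty.xmax;	/* deleter not yet committed */
	}
	return InvalidTransactionId;	/* no in-progress xact to wait on */
}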


/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		If there's not enough room on the page, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *buf and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *buf.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to the
 *		chosen insert location.  If _bt_findinsertloc decides to move right,
 *		the lock and pin on the original page will be released and the new
 *		page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,
				vacuumed;
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds btree maximum, %lu",
						(unsigned long) itemsz,
						(unsigned long) BTMaxItemSize(page)),
		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
				"Consider a function index of an MD5 hash of the value, "
				"or use full text indexing.")));

	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		for (;;)
		{
			BlockNumber rblkno = lpageop->btpo_next;

			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved tuples
	 * around making the hint invalid. If we didn't move right or can't use
	 * the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}
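
/*
 * Why the "get tired" probability above is 0.99: stopping is a geometric
 * event, so for move-right probability p < 1 the expected number of
 * equal-key pages examined before giving up is 1/(1 - p), i.e. about 100
 * pages at p = 0.99.  Illustrative helper only, not used by the btree
 * code.
 */
static double
example_expected_pages_examined(double move_right_probability)
{
	/* mean of a geometric distribution with stop probability (1 - p) */
	return 1.0 / (1.0 - move_right_probability);
}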

/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.  On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		_bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* XLOG stuff */
		if (!rel->rd_istemp)
		{
			xl_btree_insert xlrec;
			BlockNumber xldownlink;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[4];
			XLogRecData *nextrdata;
			IndexTupleData trunctuple;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

				nextrdata->data = (char *) &xldownlink;
				nextrdata->len = sizeof(BlockNumber);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				nextrdata->data = (char *) &trunctuple;
				nextrdata->len = sizeof(IndexTupleData);
			}
			else
			{
				nextrdata->data = (char *) itup;
				nextrdata->len = IndexTupleDSize(*itup);
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
				PageSetTLI(metapg, ThisTimeLineID);
			}

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}

		END_CRIT_SECTION();

		/* release buffers; send out relcache inval if metapage changed */
		if (BufferIsValid(metabuf))
		{
			if (!InRecovery)
				CacheInvalidateRelcache(rel);
			_bt_relbuf(rel, metabuf);
		}

		_bt_relbuf(rel, buf);
	}
}
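
/*
 * The WAL pattern used above, in miniature (hypothetical record layout,
 * for illustration): each XLogRecData entry points at one chunk of record
 * data, the entries are chained through ->next, and a chunk tagged with a
 * buffer may be omitted by XLogInsert whenever it decides to store a
 * full-page image of that buffer instead.
 */
static void
example_log_two_chunks(char *hdr, uint32 hdrlen,
					   char *payload, uint32 payloadlen, Buffer buf)
{
	XLogRecData rdata[2];

	rdata[0].data = hdr;
	rdata[0].len = hdrlen;
	rdata[0].buffer = InvalidBuffer;	/* always stored in the record */
	rdata[0].next = &rdata[1];

	rdata[1].data = payload;
	rdata[1].len = payloadlen;
	rdata[1].buffer = buf;		/* dropped if buf gets a full-page image */
	rdata[1].buffer_std = true;
	rdata[1].next = NULL;

	(void) XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT_LEAF, rdata);
}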

/*
 *	_bt_split() -- split a page in the btree.
 *
 *		On entry, buf is the page to split, and is pinned and write-locked.
 *		firstright is the item index of the first item to be moved to the
 *		new right page.  newitemoff etc. tell us about the new item that
 *		must be inserted along with the data from the old page.
 *
 *		Returns the new right sibling of buf, pinned and write-locked.
 *		The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;
	bool		isroot;

	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage);
	rightpage = BufferGetPage(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/*
	 * Copy the original page's LSN and TLI into leftpage, which will become
	 * the updated version of the page.  We need this because XLogInsert will
	 * examine these fields and possibly dump them in a page image.
	 */
	PageSetLSN(leftpage, PageGetLSN(origpage));
	PageSetTLI(leftpage, PageGetTLI(origpage));

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	isroot = P_ISROOT(oopaque);

	/* if we're splitting this page, it won't be the root when we're done */
	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
	ropaque->btpo_flags = lopaque->btpo_flags;
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
	ropaque->btpo_prev = BufferGetBlockNumber(buf);
	ropaque->btpo_next = oopaque->btpo_next;
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
	/* Since we already have write-lock on both pages, ok to read cycleid */
	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
	ropaque->btpo_cycleid = lopaque->btpo_cycleid;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add hikey to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add hikey to the left sibling"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Note: we *must* insert at least the right page's items in item-number
	 * order, for the benefit of _bt_restore_page().
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
							 "left sibling");
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
							 "right sibling");
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
						 "left sibling");
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
						 "right sibling");
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		/*
		 * Can't have newitemonleft here; that would imply we were told to put
		 * *everything* on the left page, which cannot fit (if it could, we'd
		 * not be splitting the page).
		 */
		Assert(!newitemonleft);
		_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
					 "right sibling");
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there.  We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(ropaque))
	{
		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		if (sopaque->btpo_prev != ropaque->btpo_prev)
			elog(PANIC, "right sibling's left-link doesn't match: "
			   "block %u links to %u instead of expected %u in index \"%s\"",
				 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
				 RelationGetRelationName(rel));

		/*
		 * Check to see if we can set the SPLIT_END flag in the right-hand
		 * split page; this can save some I/O for vacuum since it need not
		 * proceed to the right sibling.  We can set the flag if the right
		 * sibling has a different cycleid: that means it could not be part of
		 * a group of pages that were all split off from the same ancestor
		 * page.  If you're confused, imagine that page A splits to A B and
		 * then again, yielding A C B, while vacuum is in progress.  Tuples
		 * originally in A could now be in either B or C, hence vacuum must
		 * examine both pages.  But if D, our right sibling, has a different
		 * cycleid then it could not contain any tuples that were in A when
		 * the vacuum started.
		 */
		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
			ropaque->btpo_flags |= BTP_SPLIT_END;
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet, and we don't care about the new
	 * sibling until it's linked into the btree.
	 */
	START_CRIT_SECTION();

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.  (XXX the latter
	 * comment is probably obsolete.)
	 *
	 * We need to do this before writing the WAL record, so that XLogInsert
	 * can WAL log an image of the page if necessary.
	 */
	PageRestoreTempPage(leftpage, origpage);

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
		MarkBufferDirty(sbuf);
	}

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[7];
		XLogRecData *lastrdata;

		xlrec.node = rel->rd_node;
		xlrec.leftsib = BufferGetBlockNumber(buf);
		xlrec.rightsib = BufferGetBlockNumber(rbuf);
		xlrec.rnext = ropaque->btpo_next;
		xlrec.level = ropaque->btpo.level;
		xlrec.firstright = firstright;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;

		lastrdata = &rdata[0];

		if (ropaque->btpo.level > 0)
		{
			/* Log downlink on non-leaf pages */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
			lastrdata->len = sizeof(BlockIdData);
			lastrdata->buffer = InvalidBuffer;

			/*
			 * We must also log the left page's high key, because the right
			 * page's leftmost key is suppressed on non-leaf levels.  Show it
			 * as belonging to the left page buffer, so that it is not stored
			 * if XLogInsert decides it needs a full-page image of the left
			 * page.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			itemid = PageGetItemId(origpage, P_HIKEY);
			item = (IndexTuple) PageGetItem(origpage, itemid);
			lastrdata->data = (char *) item;
			lastrdata->len = MAXALIGN(IndexTupleSize(item));
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
		 * Log the new item and its offset, if it was inserted on the left
		 * page. (If it was put on the right page, we don't need to explicitly
1163                  * WAL log it because it's included with all the other items on the
1164                  * right page.) Show the new item as belonging to the left page
1165                  * buffer, so that it is not stored if XLogInsert decides it needs a
1166                  * full-page image of the left page.  We store the offset anyway,
1167                  * though, to support archive compression of these records.
1168                  */
1169                 if (newitemonleft)
1170                 {
1171                         lastrdata->next = lastrdata + 1;
1172                         lastrdata++;
1173
1174                         lastrdata->data = (char *) &newitemoff;
1175                         lastrdata->len = sizeof(OffsetNumber);
1176                         lastrdata->buffer = InvalidBuffer;
1177
1178                         lastrdata->next = lastrdata + 1;
1179                         lastrdata++;
1180
1181                         lastrdata->data = (char *) newitem;
1182                         lastrdata->len = MAXALIGN(newitemsz);
1183                         lastrdata->buffer = buf;        /* backup block 1 */
1184                         lastrdata->buffer_std = true;
1185                 }
1186                 else if (ropaque->btpo.level == 0)
1187                 {
1188                         /*
1189                          * Although we don't need to WAL-log the new item, we still need
1190                          * XLogInsert to consider storing a full-page image of the left
1191                          * page, so make an empty entry referencing that buffer. This also
1192                          * ensures that the left page is always backup block 1.
1193                          */
1194                         lastrdata->next = lastrdata + 1;
1195                         lastrdata++;
1196
1197                         lastrdata->data = NULL;
1198                         lastrdata->len = 0;
1199                         lastrdata->buffer = buf;        /* backup block 1 */
1200                         lastrdata->buffer_std = true;
1201                 }
1202
1203                 /*
1204                  * Log the contents of the right page in the format understood by
1205                  * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
1206                  * because we're going to recreate the whole page anyway, so it should
1207                  * never be stored by XLogInsert.
1208                  *
1209                  * Direct access to the page is poor style, but it is faster; we should
1210                  * implement a new function in the page API.  Note we only store the tuples
1211                  * themselves, knowing that they were inserted in item-number order
1212                  * and so the item pointers can be reconstructed.  See comments for
1213                  * _bt_restore_page().
1214                  */
1215                 lastrdata->next = lastrdata + 1;
1216                 lastrdata++;
1217
1218                 lastrdata->data = (char *) rightpage +
1219                         ((PageHeader) rightpage)->pd_upper;
1220                 lastrdata->len = ((PageHeader) rightpage)->pd_special -
1221                         ((PageHeader) rightpage)->pd_upper;
1222                 lastrdata->buffer = InvalidBuffer;
1223
1224                 /* Log the right sibling, because we've changed its prev-pointer. */
1225                 if (!P_RIGHTMOST(ropaque))
1226                 {
1227                         lastrdata->next = lastrdata + 1;
1228                         lastrdata++;
1229
1230                         lastrdata->data = NULL;
1231                         lastrdata->len = 0;
1232                         lastrdata->buffer = sbuf;       /* backup block 2 */
1233                         lastrdata->buffer_std = true;
1234                 }
1235
1236                 lastrdata->next = NULL;
1237
1238                 if (isroot)
1239                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1240                 else
1241                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1242
1243                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1244
1245                 PageSetLSN(origpage, recptr);
1246                 PageSetTLI(origpage, ThisTimeLineID);
1247                 PageSetLSN(rightpage, recptr);
1248                 PageSetTLI(rightpage, ThisTimeLineID);
1249                 if (!P_RIGHTMOST(ropaque))
1250                 {
1251                         PageSetLSN(spage, recptr);
1252                         PageSetTLI(spage, ThisTimeLineID);
1253                 }
1254         }
1255
1256         END_CRIT_SECTION();
1257
1258         /* release the old right sibling */
1259         if (!P_RIGHTMOST(ropaque))
1260                 _bt_relbuf(rel, sbuf);
1261
1262         /* split's done */
1263         return rbuf;
1264 }
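
/*
 * [Editorial note, not part of the original file] For orientation, a sketch
 * of the XLOG record chain assembled above; the authoritative layout is the
 * code itself together with btree_xlog_split() in nbtxlog.c:
 *
 *		rdata[0]	xl_btree_split header (always present)
 *		next		downlink block number (non-leaf splits only)
 *		next		left page's high key (non-leaf splits only; buffer = buf)
 *		next		newitemoff + new tuple if newitemonleft (buffer = buf),
 *				or an empty buf-only entry for a leaf split to the right
 *		next		all tuples of the right page, in _bt_restore_page() format
 *		next		empty sbuf-only entry, if there is a right sibling
 */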
1265
1266 /*
1267  *      _bt_findsplitloc() -- find an appropriate place to split a page.
1268  *
1269  * The idea here is to equalize the free space that will be on each split
1270  * page, *after accounting for the inserted tuple*.  (If we fail to account
1271  * for it, we might find ourselves with too little room on the page that
1272  * it needs to go into!)
1273  *
1274  * If the page is the rightmost page on its level, we instead try to arrange
1275  * to leave the left split page fillfactor% full.  In this way, when we are
1276  * inserting successively increasing keys (consider sequences, timestamps,
1277  * etc) we will end up with a tree whose pages are about fillfactor% full,
1278  * instead of the 50% full result that we'd get without this special case.
1279  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1280  * that leaf and nonleaf pages use different fillfactors.
1281  *
1282  * We are passed the intended insert position of the new tuple, expressed as
1283  * the offsetnumber of the tuple it must go in front of.  (This could be
1284  * maxoff+1 if the tuple is to go at the end.)
1285  *
1286  * We return the index of the first existing tuple that should go on the
1287  * righthand page, plus a boolean indicating whether the new tuple goes on
1288  * the left or right page.      The bool is necessary to disambiguate the case
1289  * where firstright == newitemoff.
1290  */
1291 static OffsetNumber
1292 _bt_findsplitloc(Relation rel,
1293                                  Page page,
1294                                  OffsetNumber newitemoff,
1295                                  Size newitemsz,
1296                                  bool *newitemonleft)
1297 {
1298         BTPageOpaque opaque;
1299         OffsetNumber offnum;
1300         OffsetNumber maxoff;
1301         ItemId          itemid;
1302         FindSplitData state;
1303         int                     leftspace,
1304                                 rightspace,
1305                                 goodenough,
1306                                 olddataitemstotal,
1307                                 olddataitemstoleft;
1308         bool            goodenoughfound;
1309
1310         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1311
1312         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1313         newitemsz += sizeof(ItemIdData);
1314
1315         /* Total free space available on a btree page, after fixed overhead */
1316         leftspace = rightspace =
1317                 PageGetPageSize(page) - SizeOfPageHeaderData -
1318                 MAXALIGN(sizeof(BTPageOpaqueData));
1319
1320         /* The right page will have the same high key as the old page */
1321         if (!P_RIGHTMOST(opaque))
1322         {
1323                 itemid = PageGetItemId(page, P_HIKEY);
1324                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1325                                                          sizeof(ItemIdData));
1326         }
1327
1328         /* Count up total space in data items without actually scanning 'em */
1329         olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1330
1331         state.newitemsz = newitemsz;
1332         state.is_leaf = P_ISLEAF(opaque);
1333         state.is_rightmost = P_RIGHTMOST(opaque);
1334         state.have_split = false;
1335         if (state.is_leaf)
1336                 state.fillfactor = RelationGetFillFactor(rel,
1337                                                                                                  BTREE_DEFAULT_FILLFACTOR);
1338         else
1339                 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1340         state.newitemonleft = false;    /* these just to keep compiler quiet */
1341         state.firstright = 0;
1342         state.best_delta = 0;
1343         state.leftspace = leftspace;
1344         state.rightspace = rightspace;
1345         state.olddataitemstotal = olddataitemstotal;
1346         state.newitemoff = newitemoff;
1347
1348         /*
1349          * Finding the best possible split would require checking all the possible
1350          * split points, because of the high-key and left-key special cases.
1351          * That's probably more work than it's worth; instead, stop as soon as we
1352          * find a "good-enough" split, where good-enough is defined as an
1353          * imbalance in free space of no more than pagesize/16 (arbitrary...).  This
1354          * should let us stop near the middle on most pages, instead of plowing to
1355          * the end.
1356          */
1357         goodenough = leftspace / 16;
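        /*
         * [Editorial illustration, not in the original file] With the standard
         * 8K block size, leftspace works out to roughly 8150 bytes after the
         * fixed page overhead, so "good enough" means the free space of the
         * two halves differs by no more than about 509 bytes.
         */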
1358
1359         /*
1360          * Scan through the data items and calculate space usage for a split at
1361          * each possible position.
1362          */
1363         olddataitemstoleft = 0;
1364         goodenoughfound = false;
1365         maxoff = PageGetMaxOffsetNumber(page);
1366
1367         for (offnum = P_FIRSTDATAKEY(opaque);
1368                  offnum <= maxoff;
1369                  offnum = OffsetNumberNext(offnum))
1370         {
1371                 Size            itemsz;
1372
1373                 itemid = PageGetItemId(page, offnum);
1374                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1375
1376                 /*
1377                  * Will the new item go to left or right of split?
1378                  */
1379                 if (offnum > newitemoff)
1380                         _bt_checksplitloc(&state, offnum, true,
1381                                                           olddataitemstoleft, itemsz);
1382
1383                 else if (offnum < newitemoff)
1384                         _bt_checksplitloc(&state, offnum, false,
1385                                                           olddataitemstoleft, itemsz);
1386                 else
1387                 {
1388                         /* need to try it both ways! */
1389                         _bt_checksplitloc(&state, offnum, true,
1390                                                           olddataitemstoleft, itemsz);
1391
1392                         _bt_checksplitloc(&state, offnum, false,
1393                                                           olddataitemstoleft, itemsz);
1394                 }
1395
1396                 /* Abort scan once we find a good-enough choice */
1397                 if (state.have_split && state.best_delta <= goodenough)
1398                 {
1399                         goodenoughfound = true;
1400                         break;
1401                 }
1402
1403                 olddataitemstoleft += itemsz;
1404         }
1405
1406         /*
1407          * If the new item goes as the last item, check for splitting so that all
1408          * the old items go to the left page and the new item goes to the right
1409          * page.
1410          */
1411         if (newitemoff > maxoff && !goodenoughfound)
1412                 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1413
1414         /*
1415          * I believe it is not possible to fail to find a feasible split, but just
1416          * in case ...
1417          */
1418         if (!state.have_split)
1419                 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1420                          RelationGetRelationName(rel));
1421
1422         *newitemonleft = state.newitemonleft;
1423         return state.firstright;
1424 }
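
/*
 * [Editorial sketch, not part of the original file] Roughly how the caller
 * (_bt_insertonpg, earlier in this file) is expected to combine
 * _bt_findsplitloc with _bt_split; variable names are illustrative:
 *
 *		bool		newitemonleft;
 *		OffsetNumber firstright;
 *
 *		firstright = _bt_findsplitloc(rel, page, newitemoff, itemsz,
 *									  &newitemonleft);
 *		rbuf = _bt_split(rel, buf, firstright, newitemoff, itemsz,
 *						 itup, newitemonleft);
 */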
1425
1426 /*
1427  * Subroutine to analyze a particular possible split choice (ie, firstright
1428  * and newitemonleft settings), and record the best split so far in *state.
1429  *
1430  * firstoldonright is the offset of the first item on the original page
1431  * that goes to the right page, and firstoldonrightsz is the size of that
1432  * tuple. firstoldonright can be > max offset, which means that all the old
1433  * items go to the left page and only the new item goes to the right page.
1434  * In that case, firstoldonrightsz is not used.
1435  *
1436  * olddataitemstoleft is the total size of all old items to the left of
1437  * firstoldonright.
1438  */
1439 static void
1440 _bt_checksplitloc(FindSplitData *state,
1441                                   OffsetNumber firstoldonright,
1442                                   bool newitemonleft,
1443                                   int olddataitemstoleft,
1444                                   Size firstoldonrightsz)
1445 {
1446         int                     leftfree,
1447                                 rightfree;
1448         Size            firstrightitemsz;
1449         bool            newitemisfirstonright;
1450
1451         /* Is the new item going to be the first item on the right page? */
1452         newitemisfirstonright = (firstoldonright == state->newitemoff
1453                                                          && !newitemonleft);
1454
1455         if (newitemisfirstonright)
1456                 firstrightitemsz = state->newitemsz;
1457         else
1458                 firstrightitemsz = firstoldonrightsz;
1459
1460         /* Account for all the old tuples */
1461         leftfree = state->leftspace - olddataitemstoleft;
1462         rightfree = state->rightspace -
1463                 (state->olddataitemstotal - olddataitemstoleft);
1464
1465         /*
1466          * The first item on the right page becomes the high key of the left page;
1467          * therefore it counts against left space as well as right space.
1468          */
1469         leftfree -= firstrightitemsz;
1470
1471         /* account for the new item */
1472         if (newitemonleft)
1473                 leftfree -= (int) state->newitemsz;
1474         else
1475                 rightfree -= (int) state->newitemsz;
1476
1477         /*
1478          * If we are not on the leaf level, we will be able to discard the key
1479          * data from the first item that winds up on the right page.
1480          */
1481         if (!state->is_leaf)
1482                 rightfree += (int) firstrightitemsz -
1483                         (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1484
1485         /*
1486          * If feasible split point, remember best delta.
1487          */
1488         if (leftfree >= 0 && rightfree >= 0)
1489         {
1490                 int                     delta;
1491
1492                 if (state->is_rightmost)
1493                 {
1494                         /*
1495                          * If splitting a rightmost page, try to put (100-fillfactor)% of
1496                          * free space on left page. See comments for _bt_findsplitloc.
1497                          */
1498                         delta = (state->fillfactor * leftfree)
1499                                 - ((100 - state->fillfactor) * rightfree);
1500                 }
1501                 else
1502                 {
1503                         /* Otherwise, aim for equal free space on both sides */
1504                         delta = leftfree - rightfree;
1505                 }
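                /*
                 * [Editorial illustration, not in the original file] For a
                 * rightmost leaf page with the default leaf fillfactor of 90,
                 * a candidate split leaving leftfree = 800 and rightfree = 7200
                 * gives delta = 90 * 800 - 10 * 7200 = 0: a perfect outcome
                 * under this criterion, with the left page about 90% full.
                 */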
1506
1507                 if (delta < 0)
1508                         delta = -delta;
1509                 if (!state->have_split || delta < state->best_delta)
1510                 {
1511                         state->have_split = true;
1512                         state->newitemonleft = newitemonleft;
1513                         state->firstright = firstoldonright;
1514                         state->best_delta = delta;
1515                 }
1516         }
1517 }
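
/*
 * [Editorial sketch, not part of the original file] The balance criterion
 * above, restated as a tiny standalone function for clarity; it is an
 * illustration only and is not used anywhere in this file.
 */
#ifdef NOT_USED
static int
_bt_split_delta(bool is_rightmost, int fillfactor, int leftfree, int rightfree)
{
	int			delta;

	if (is_rightmost)
		delta = (fillfactor * leftfree) - ((100 - fillfactor) * rightfree);
	else
		delta = leftfree - rightfree;

	return (delta < 0) ? -delta : delta;
}
#endif   /* NOT_USED */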
1518
1519 /*
1520  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1521  *
1522  * On entry, buf and rbuf are the left and right split pages, which we
1523  * still hold write locks on per the L&Y algorithm.  We release the
1524  * write locks once we have write lock on the parent page.      (Any sooner,
1525  * and it'd be possible for some other process to try to split or delete
1526  * one of these pages, and get confused because it cannot find the downlink.)
1527  *
1528  * stack - stack showing how we got here.  May be NULL in cases that don't
1529  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1530  * is_root - we split the true root
1531  * is_only - we split a page alone on its level (might have been fast root)
1532  *
1533  * This is exported so it can be called by nbtxlog.c.
1534  */
1535 void
1536 _bt_insert_parent(Relation rel,
1537                                   Buffer buf,
1538                                   Buffer rbuf,
1539                                   BTStack stack,
1540                                   bool is_root,
1541                                   bool is_only)
1542 {
1543         /*
1544          * Here we have to do something Lehman and Yao don't talk about: deal with
1545          * a root split and construction of a new root.  If our stack is empty
1546          * then we have just split a node on what had been the root level when we
1547          * descended the tree.  If it was still the root then we perform a
1548          * new-root construction.  If it *wasn't* the root anymore, search to find
1549          * the next higher level that someone constructed meanwhile, and find the
1550          * right place to insert as for the normal case.
1551          *
1552          * If we have to search for the parent level, we do so by re-descending
1553          * from the root.  This is not super-efficient, but it's rare enough not
1554          * to matter.  (This path is also taken when called from WAL recovery ---
1555          * we have no stack in that case.)
1556          */
1557         if (is_root)
1558         {
1559                 Buffer          rootbuf;
1560
1561                 Assert(stack == NULL);
1562                 Assert(is_only);
1563                 /* create a new root node and update the metapage */
1564                 rootbuf = _bt_newroot(rel, buf, rbuf);
1565                 /* release the split buffers */
1566                 _bt_relbuf(rel, rootbuf);
1567                 _bt_relbuf(rel, rbuf);
1568                 _bt_relbuf(rel, buf);
1569         }
1570         else
1571         {
1572                 BlockNumber bknum = BufferGetBlockNumber(buf);
1573                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1574                 Page            page = BufferGetPage(buf);
1575                 IndexTuple      new_item;
1576                 BTStackData fakestack;
1577                 IndexTuple      ritem;
1578                 Buffer          pbuf;
1579
1580                 if (stack == NULL)
1581                 {
1582                         BTPageOpaque lpageop;
1583
1584                         if (!InRecovery)
1585                                 elog(DEBUG2, "concurrent ROOT page split");
1586                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1587                         /* Find the leftmost page at the next level up */
1588                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1589                         /* Set up a phony stack entry pointing there */
1590                         stack = &fakestack;
1591                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1592                         stack->bts_offset = InvalidOffsetNumber;
1593                         /* bts_btentry will be initialized below */
1594                         stack->bts_parent = NULL;
1595                         _bt_relbuf(rel, pbuf);
1596                 }
1597
1598                 /* get high key from left page == lowest key on new right page */
1599                 ritem = (IndexTuple) PageGetItem(page,
1600                                                                                  PageGetItemId(page, P_HIKEY));
1601
1602                 /* form an index tuple that points at the new right page */
1603                 new_item = CopyIndexTuple(ritem);
1604                 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1605
1606                 /*
1607                  * Find the parent buffer and get the parent page.
1608                  *
1609                  * Oops - if we were moved right then we need to change the stack item!
1610                  * We want to find the parent pointing to where we are, right?  - vadim
1611                  * 05/27/97
1612                  */
1613                 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1614
1615                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1616
1617                 /* Now we can unlock the children */
1618                 _bt_relbuf(rel, rbuf);
1619                 _bt_relbuf(rel, buf);
1620
1621                 /* Check for error only after writing children */
1622                 if (pbuf == InvalidBuffer)
1623                         elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1624                                  RelationGetRelationName(rel), bknum, rbknum);
1625
1626                 /* Recursively update the parent */
1627                 _bt_insertonpg(rel, pbuf, stack->bts_parent,
1628                                            new_item, stack->bts_offset + 1,
1629                                            is_only);
1630
1631                 /* be tidy */
1632                 pfree(new_item);
1633         }
1634 }
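
/*
 * [Editorial illustration, not in the original file] A concrete trace of the
 * recursion above: suppose leaf block 5 splits into blocks 5 and 9.
 * _bt_insert_parent forms a downlink tuple whose t_tid points at block 9 and
 * hands it to _bt_insertonpg for the parent page located via the stack.  If
 * that insertion forces the parent to split too, _bt_insertonpg calls back
 * into _bt_insert_parent for the next level up, until an insertion fits or
 * _bt_newroot builds a new root.
 */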
1635
1636 /*
1637  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1638  *                                               we last looked at in the parent.
1639  *
1640  *              This is possible because we save the downlink from the parent item,
1641  *              which is enough to uniquely identify it.  Insertions into the parent
1642  *              level could cause the item to move right; deletions could cause it
1643  *              to move left, but not left of the page we previously found it in.
1644  *
1645  *              Adjusts bts_blkno & bts_offset if changed.
1646  *
1647  *              Returns InvalidBuffer if item not found (should not happen).
1648  */
1649 Buffer
1650 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1651 {
1652         BlockNumber blkno;
1653         OffsetNumber start;
1654
1655         blkno = stack->bts_blkno;
1656         start = stack->bts_offset;
1657
1658         for (;;)
1659         {
1660                 Buffer          buf;
1661                 Page            page;
1662                 BTPageOpaque opaque;
1663
1664                 buf = _bt_getbuf(rel, blkno, access);
1665                 page = BufferGetPage(buf);
1666                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1667
1668                 if (!P_IGNORE(opaque))
1669                 {
1670                         OffsetNumber offnum,
1671                                                 minoff,
1672                                                 maxoff;
1673                         ItemId          itemid;
1674                         IndexTuple      item;
1675
1676                         minoff = P_FIRSTDATAKEY(opaque);
1677                         maxoff = PageGetMaxOffsetNumber(page);
1678
1679                         /*
1680                          * start = InvalidOffsetNumber means "search the whole page". We
1681                          * need this test anyway due to the possibility that the page has
1682                          * a high key now when it didn't before.
1683                          */
1684                         if (start < minoff)
1685                                 start = minoff;
1686
1687                         /*
1688                          * Need this check too, to guard against the possibility that the
1689                          * page split since we visited it originally.
1690                          */
1691                         if (start > maxoff)
1692                                 start = OffsetNumberNext(maxoff);
1693
1694                         /*
1695                          * These loops will check every item on the page --- but in an
1696                          * order that's attuned to the probability of where it actually
1697                          * is.  Scan to the right first, then to the left.
1698                          */
1699                         for (offnum = start;
1700                                  offnum <= maxoff;
1701                                  offnum = OffsetNumberNext(offnum))
1702                         {
1703                                 itemid = PageGetItemId(page, offnum);
1704                                 item = (IndexTuple) PageGetItem(page, itemid);
1705                                 if (BTEntrySame(item, &stack->bts_btentry))
1706                                 {
1707                                         /* Return accurate pointer to where link is now */
1708                                         stack->bts_blkno = blkno;
1709                                         stack->bts_offset = offnum;
1710                                         return buf;
1711                                 }
1712                         }
1713
1714                         for (offnum = OffsetNumberPrev(start);
1715                                  offnum >= minoff;
1716                                  offnum = OffsetNumberPrev(offnum))
1717                         {
1718                                 itemid = PageGetItemId(page, offnum);
1719                                 item = (IndexTuple) PageGetItem(page, itemid);
1720                                 if (BTEntrySame(item, &stack->bts_btentry))
1721                                 {
1722                                         /* Return accurate pointer to where link is now */
1723                                         stack->bts_blkno = blkno;
1724                                         stack->bts_offset = offnum;
1725                                         return buf;
1726                                 }
1727                         }
1728                 }
1729
1730                 /*
1731                  * The item we're looking for moved right by at least one page.
1732                  */
1733                 if (P_RIGHTMOST(opaque))
1734                 {
1735                         _bt_relbuf(rel, buf);
1736                         return InvalidBuffer;
1737                 }
1738                 blkno = opaque->btpo_next;
1739                 start = InvalidOffsetNumber;
1740                 _bt_relbuf(rel, buf);
1741         }
1742 }
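
/*
 * [Editorial illustration, not in the original file] Example of the search
 * order above: if the downlink was remembered at (block 10, offset 5) but
 * block 10 has since gained items or split, the loop first scans block 10
 * rightward from offset 5, then leftward from offset 4, and failing that
 * follows btpo_next to the right sibling and searches the whole page there
 * (start = InvalidOffsetNumber) until the matching downlink is found.
 */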
1743
1744 /*
1745  *      _bt_newroot() -- Create a new root page for the index.
1746  *
1747  *              We've just split the old root page and need to create a new one.
1748  *              In order to do this, we add a new root page to the file, then lock
1749  *              the metadata page and update it.  This is guaranteed to be deadlock-
1750  *              free, because all readers release their locks on the metadata page
1751  *              before trying to lock the root, and all writers lock the root before
1752  *              trying to lock the metadata page.  We have a write lock on the old
1753  *              root page, so we have not introduced any cycles into the waits-for
1754  *              graph.
1755  *
1756  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1757  *              locked. On exit, a new root page exists with entries for the
1758  *              two new children, metapage is updated and unlocked/unpinned.
1759  *              The new root buffer is returned to the caller, which must unlock/unpin
1760  *              lbuf, rbuf and rootbuf.
1761  */
1762 static Buffer
1763 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1764 {
1765         Buffer          rootbuf;
1766         Page            lpage,
1767                                 rootpage;
1768         BlockNumber lbkno,
1769                                 rbkno;
1770         BlockNumber rootblknum;
1771         BTPageOpaque rootopaque;
1772         ItemId          itemid;
1773         IndexTuple      item;
1774         Size            itemsz;
1775         IndexTuple      new_item;
1776         Buffer          metabuf;
1777         Page            metapg;
1778         BTMetaPageData *metad;
1779
1780         lbkno = BufferGetBlockNumber(lbuf);
1781         rbkno = BufferGetBlockNumber(rbuf);
1782         lpage = BufferGetPage(lbuf);
1783
1784         /* get a new root page */
1785         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1786         rootpage = BufferGetPage(rootbuf);
1787         rootblknum = BufferGetBlockNumber(rootbuf);
1788
1789         /* acquire lock on the metapage */
1790         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1791         metapg = BufferGetPage(metabuf);
1792         metad = BTPageGetMeta(metapg);
1793
1794         /* NO EREPORT(ERROR) from here till newroot op is logged */
1795         START_CRIT_SECTION();
1796
1797         /* set btree special data */
1798         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1799         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1800         rootopaque->btpo_flags = BTP_ROOT;
1801         rootopaque->btpo.level =
1802                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1803         rootopaque->btpo_cycleid = 0;
1804
1805         /* update metapage data */
1806         metad->btm_root = rootblknum;
1807         metad->btm_level = rootopaque->btpo.level;
1808         metad->btm_fastroot = rootblknum;
1809         metad->btm_fastlevel = rootopaque->btpo.level;
1810
1811         /*
1812          * Create downlink item for left page (old root).  Since this will be the
1813          * first item in a non-leaf page, it implicitly has minus-infinity key
1814          * value, so we need not store any actual key in it.
1815          */
1816         itemsz = sizeof(IndexTupleData);
1817         new_item = (IndexTuple) palloc(itemsz);
1818         new_item->t_info = itemsz;
1819         ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
1820
1821         /*
1822          * Insert the left page pointer into the new root page.  The root page is
1823          * the rightmost page on its level so there is no "high key" in it; the
1824          * two items will go into positions P_HIKEY and P_FIRSTKEY.
1825          *
1826          * Note: we *must* insert the two items in item-number order, for the
1827          * benefit of _bt_restore_page().
1828          */
1829         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
1830                                         false, false) == InvalidOffsetNumber)
1831                 elog(PANIC, "failed to add leftkey to new root page"
1832                          " while splitting block %u of index \"%s\"",
1833                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1834         pfree(new_item);
1835
1836         /*
1837          * Create downlink item for right page.  The key for it is obtained from
1838          * the "high key" position in the left page.
1839          */
1840         itemid = PageGetItemId(lpage, P_HIKEY);
1841         itemsz = ItemIdGetLength(itemid);
1842         item = (IndexTuple) PageGetItem(lpage, itemid);
1843         new_item = CopyIndexTuple(item);
1844         ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
1845
1846         /*
1847          * insert the right page pointer into the new root page.
1848          */
1849         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
1850                                         false, false) == InvalidOffsetNumber)
1851                 elog(PANIC, "failed to add rightkey to new root page"
1852                          " while splitting block %u of index \"%s\"",
1853                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1854         pfree(new_item);
1855
1856         MarkBufferDirty(rootbuf);
1857         MarkBufferDirty(metabuf);
1858
1859         /* XLOG stuff */
1860         if (!rel->rd_istemp)
1861         {
1862                 xl_btree_newroot xlrec;
1863                 XLogRecPtr      recptr;
1864                 XLogRecData rdata[2];
1865
1866                 xlrec.node = rel->rd_node;
1867                 xlrec.rootblk = rootblknum;
1868                 xlrec.level = metad->btm_level;
1869
1870                 rdata[0].data = (char *) &xlrec;
1871                 rdata[0].len = SizeOfBtreeNewroot;
1872                 rdata[0].buffer = InvalidBuffer;
1873                 rdata[0].next = &(rdata[1]);
1874
1875                 /*
1876                  * Direct access to the page is poor style, but it is faster; we should
1877                  * implement a new function in the page API.
1878                  */
1879                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1880                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1881                         ((PageHeader) rootpage)->pd_upper;
1882                 rdata[1].buffer = InvalidBuffer;
1883                 rdata[1].next = NULL;
1884
1885                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1886
1887                 PageSetLSN(rootpage, recptr);
1888                 PageSetTLI(rootpage, ThisTimeLineID);
1889                 PageSetLSN(metapg, recptr);
1890                 PageSetTLI(metapg, ThisTimeLineID);
1891         }
1892
1893         END_CRIT_SECTION();
1894
1895         /* send out relcache inval for metapage change */
1896         if (!InRecovery)
1897                 CacheInvalidateRelcache(rel);
1898
1899         /* done with metapage */
1900         _bt_relbuf(rel, metabuf);
1901
1902         return rootbuf;
1903 }
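
/*
 * [Editorial illustration, not in the original file] If the old root was
 * block 3 and its new right sibling is block 7, the new root ends up with
 * exactly two items: a keyless ("minus infinity") downlink to block 3 at
 * position P_HIKEY, and a downlink to block 7 at position P_FIRSTKEY
 * carrying the old root's high key.  The metapage's btm_root and
 * btm_fastroot now both point at the new root block.
 */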
1904
1905 /*
1906  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1907  *
1908  *              This routine adds the tuple to the page as requested.  It does
1909  *              not affect pin/lock status, but you'd better have a write lock
1910  *              and pin on the target buffer!  Don't forget to write and release
1911  *              the buffer afterwards, either.
1912  *
1913  *              The main difference between this routine and a bare PageAddItem call
1914  *              is that this code knows that the leftmost index tuple on a non-leaf
1915  *              btree page doesn't need to have a key.  Therefore, it strips such
1916  *              tuples down to just the tuple header.  CAUTION: this works ONLY if
1917  *              we insert the tuples in order, so that the given itup_off does
1918  *              represent the final position of the tuple!
1919  */
1920 static void
1921 _bt_pgaddtup(Relation rel,
1922                          Page page,
1923                          Size itemsize,
1924                          IndexTuple itup,
1925                          OffsetNumber itup_off,
1926                          const char *where)
1927 {
1928         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1929         IndexTupleData trunctuple;
1930
1931         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1932         {
1933                 trunctuple = *itup;
1934                 trunctuple.t_info = sizeof(IndexTupleData);
1935                 itup = &trunctuple;
1936                 itemsize = sizeof(IndexTupleData);
1937         }
1938
1939         if (PageAddItem(page, (Item) itup, itemsize, itup_off,
1940                                         false, false) == InvalidOffsetNumber)
1941                 elog(PANIC, "failed to add item to the %s in index \"%s\"",
1942                          where, RelationGetRelationName(rel));
1943 }
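
/*
 * [Editorial illustration, not in the original file] For example, adding a
 * 24-byte tuple at P_FIRSTDATAKEY of a non-leaf page stores only
 * sizeof(IndexTupleData) bytes (the item pointer plus t_info, 8 bytes on
 * typical platforms), because the leftmost downlink on a non-leaf page is
 * treated as having a minus-infinity key.
 */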
1944
1945 /*
1946  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1947  *
1948  * This is very similar to _bt_compare, except for NULL handling.
1949  * The rule is simple: NOT_NULL never equals NULL, and NULL never equals NULL either.
1950  */
1951 static bool
1952 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1953                         int keysz, ScanKey scankey)
1954 {
1955         IndexTuple      itup;
1956         int                     i;
1957
1958         /* Better be comparing to a leaf item */
1959         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1960
1961         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1962
1963         for (i = 1; i <= keysz; i++)
1964         {
1965                 AttrNumber      attno;
1966                 Datum           datum;
1967                 bool            isNull;
1968                 int32           result;
1969
1970                 attno = scankey->sk_attno;
1971                 Assert(attno == i);
1972                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1973
1974                 /* NULLs are never equal to anything */
1975                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1976                         return false;
1977
1978                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1979                                                                                          datum,
1980                                                                                          scankey->sk_argument));
1981
1982                 if (result != 0)
1983                         return false;
1984
1985                 scankey++;
1986         }
1987
1988         /* if we get here, the keys are equal */
1989         return true;
1990 }
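
/*
 * [Editorial illustration, not in the original file] The NULL rule above,
 * spelled out: comparing key value 5 with 5 yields true; 5 with NULL yields
 * false; and NULL with NULL also yields false.
 */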
1991
1992 /*
1993  * _bt_vacuum_one_page - vacuum just one index page.
1994  *
1995  * Try to remove LP_DEAD items from the given page.  The passed buffer
1996  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
1997  * super-exclusive "cleanup" lock (see nbtree/README).
1998  */
1999 static void
2000 _bt_vacuum_one_page(Relation rel, Buffer buffer)
2001 {
2002         OffsetNumber deletable[MaxOffsetNumber];
2003         int                     ndeletable = 0;
2004         OffsetNumber offnum,
2005                                 minoff,
2006                                 maxoff;
2007         Page            page = BufferGetPage(buffer);
2008         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2009
2010         /*
2011          * Scan over all items to see which ones need to be deleted according to
2012          * LP_DEAD flags.
2013          */
2014         minoff = P_FIRSTDATAKEY(opaque);
2015         maxoff = PageGetMaxOffsetNumber(page);
2016         for (offnum = minoff;
2017                  offnum <= maxoff;
2018                  offnum = OffsetNumberNext(offnum))
2019         {
2020                 ItemId          itemId = PageGetItemId(page, offnum);
2021
2022                 if (ItemIdIsDead(itemId))
2023                         deletable[ndeletable++] = offnum;
2024         }
2025
2026         if (ndeletable > 0)
2027                 _bt_delitems(rel, buffer, deletable, ndeletable);
2028
2029         /*
2030          * Note: if we didn't find any LP_DEAD items, then the page's
2031          * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
2032          * separate write to clear it, however.  We will clear it when we split
2033          * the page.
2034          */
2035 }
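
/*
 * [Editorial illustration, not in the original file] For instance, if the
 * scan above finds LP_DEAD line pointers at offsets 2, 5 and 9, then
 * deletable[] = {2, 5, 9}, ndeletable = 3, and the single _bt_delitems()
 * call removes all three items in one operation.
 */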