/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *	  Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.175 2009/12/19 01:32:32 sriggs Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/inval.h"
#include "utils/tqual.h"


typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	int			fillfactor;		/* needed when splitting rightmost page */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */
	OffsetNumber newitemoff;	/* where the new item is to be inserted */
	int			leftspace;		/* space available for items on left page */
	int			rightspace;		/* space available for items on right page */
	int			olddataitemstotal;	/* space taken by old items */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;
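
/*
 * Illustrative sketch (an approximation; _bt_checksplitloc's body is not
 * part of this excerpt): conceptually, each candidate split point is
 * scored by the imbalance in post-split free space, and the smallest
 * delta seen so far is remembered in the struct above, roughly:
 *
 *		delta = leftfree - rightfree;
 *		if (delta < 0)
 *			delta = -delta;
 *		if (!state->have_split || delta < state->best_delta)
 *		{
 *			state->have_split = true;
 *			state->newitemonleft = newitemonleft;
 *			state->firstright = firstoldonright;
 *			state->best_delta = delta;
 *		}
 *
 * The real logic also weights the delta by fillfactor when splitting a
 * rightmost page, so that sequential inserts leave the left page full.
 */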


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf, OffsetNumber offset,
				 ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique);
static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
			 Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);


/*
 *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *		This routine is called by the public interface routines, btbuild
 *		and btinsert.  By here, itup is filled in, including the TID.
 *
 *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
 *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *		don't actually insert.
 *
 *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *		it must be TRUE if the entry is known unique, else FALSE.
 *		(In the current implementation we'll also return TRUE after a
 *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *		that's just a coding artifact.)
 */
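/*
 * Illustrative examples of the contract above (a sketch, not code from
 * this file):
 *
 *		// plain index, duplicates allowed, result can be ignored
 *		(void) _bt_doinsert(rel, itup, UNIQUE_CHECK_NO, heapRel);
 *
 *		// unique index: ereport(ERROR) on a live duplicate
 *		(void) _bt_doinsert(rel, itup, UNIQUE_CHECK_YES, heapRel);
 *
 *		// deferred-style check: insert anyway, remember "false" results,
 *		// and recheck those entries later with UNIQUE_CHECK_EXISTING
 *		if (!_bt_doinsert(rel, itup, UNIQUE_CHECK_PARTIAL, heapRel))
 *			... schedule a later recheck ...
 */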
bool
_bt_doinsert(Relation rel, IndexTuple itup,
			 IndexUniqueCheck checkUnique, Relation heapRel)
{
	bool		is_unique = false;
	int			natts = rel->rd_rel->relnatts;
	ScanKey		itup_scankey;
	BTStack		stack;
	Buffer		buf;
	OffsetNumber offset;

	/* we need an insertion scan key to do our search, so build one */
	itup_scankey = _bt_mkscankey(rel, itup);

top:
	/* find the first page containing this key */
	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

	offset = InvalidOffsetNumber;

	/* trade in our read lock for a write lock */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buf, BT_WRITE);

	/*
	 * If the page was split between the time that we surrendered our read
	 * lock and acquired our write lock, then this page may no longer be the
	 * right place for the key we want to insert.  In this case, we need to
	 * move right in the tree.  See Lehman and Yao for an excruciatingly
	 * precise description.
	 */
	buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

	/*
	 * If we're not allowing duplicates, make sure the key isn't already in
	 * the index.
	 *
	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
	 * in the index; so it cannot defend against concurrent insertions of the
	 * same key.  We protect against that by means of holding a write lock on
	 * the target page.  Any other would-be inserter of the same key must
	 * acquire a write lock on the same target page, so only one would-be
	 * inserter can be making the check at one time.  Furthermore, once we are
	 * past the check we hold write locks continuously until we have performed
	 * our insertion, so no later inserter can fail to see our insertion.
	 * (This requires some care in _bt_insertonpg.)
	 *
	 * If we must wait for another xact, we release the lock while waiting,
	 * and then must start over completely.
	 *
	 * For a partial uniqueness check, we don't wait for the other xact.
	 * Just let the tuple in and return false for possibly non-unique,
	 * or true for definitely unique.
	 */
	if (checkUnique != UNIQUE_CHECK_NO)
	{
		TransactionId xwait;

		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
								 checkUnique, &is_unique);

		if (TransactionIdIsValid(xwait))
		{
			/* Have to wait for the other guy ... */
			_bt_relbuf(rel, buf);
			XactLockTableWait(xwait);
			/* start over... */
			_bt_freestack(stack);
			goto top;
		}
	}

	if (checkUnique != UNIQUE_CHECK_EXISTING)
	{
		/* do the insertion */
		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
		_bt_insertonpg(rel, buf, stack, itup, offset, false);
	}
	else
	{
		/* just release the buffer */
		_bt_relbuf(rel, buf);
	}

	/* be tidy */
	_bt_freestack(stack);
	_bt_freeskey(itup_scankey);

	return is_unique;
}
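
/*
 * Worked example of the interlock described above (illustrative only):
 * two backends inserting the same key both arrive at the same leaf page,
 * because a would-be duplicate must go on (or near) that page.  Only one
 * can hold the write lock, so the other runs its _bt_check_unique scan
 * strictly after the first insertion is on the page, and therefore sees
 * it.  If the first transaction is still in progress, the second gets its
 * XID back from _bt_check_unique, releases the page lock, blocks in
 * XactLockTableWait(), and retries from "top".
 */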

/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	SnapshotData SnapshotDirty;
	OffsetNumber maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;
	bool		found = false;

	/* Assume unique until we find a duplicate */
	*is_unique = true;

	InitDirtySnapshot(SnapshotDirty);

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive. Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway. So now we just advance over killed items as quickly as
			 * we can. We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdIsDead(curitemid))
			{
				ItemPointerData htid;
				bool		all_dead;
				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL); this is
				 * how we handle NULLs, and it means we must not use
				 * _bt_compare as a real equality test, but only for
				 * ordering/finding items on pages. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htid = curitup->t_tid;

				/*
				 * If we are doing a recheck, we expect to find the tuple we
				 * are rechecking.  It's not a duplicate, but we have to keep
				 * scanning.
				 */
				if (checkUnique == UNIQUE_CHECK_EXISTING &&
					ItemPointerCompare(&htid, &itup->t_tid) == 0)
				{
					found = true;
				}

				/*
				 * We check the whole HOT-chain to see if there is any tuple
				 * that satisfies SnapshotDirty.  This is necessary because we
				 * have just a single index entry for the entire chain.
				 */
				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
										 &all_dead))
				{
					TransactionId xwait;

					/*
					 * It is a duplicate. If we are only doing a partial
					 * check, then don't bother checking if the tuple is
					 * being updated in another transaction. Just return
					 * the fact that it is a potential conflict and leave
					 * the full check till later.
					 */
					if (checkUnique == UNIQUE_CHECK_PARTIAL)
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						*is_unique = false;
						return InvalidTransactionId;
					}

					/*
					 * If this tuple is being updated by another transaction,
					 * we have to wait for it to commit or abort.
					 */
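					/*
					 * (A dirty snapshot reports a still-in-progress inserter
					 * in xmin and a still-in-progress deleter in xmax, so
					 * whichever one is valid is the transaction to wait for.)
					 */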
					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
						SnapshotDirty.xmin : SnapshotDirty.xmax;

					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.  But before
					 * complaining, look to see if the tuple we want to insert
					 * is itself now committed dead --- if so, don't complain.
					 * This is a waste of time in normal scenarios but we must
					 * do it to support CREATE INDEX CONCURRENTLY.
					 *
					 * We must follow HOT-chains here because during
					 * concurrent index build, we insert the root TID though
					 * the actual tuple may be somewhere in the HOT-chain.
					 * While following the chain we might not stop at the
					 * exact tuple which triggered the insert, but that's OK
					 * because if we find a live tuple anywhere in this chain,
					 * we have a unique key conflict.  The other live tuple is
					 * not part of this chain because it had a different index
					 * entry.
					 */
					htid = itup->t_tid;
					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
					{
						/* Normal case --- it's still live */
					}
					else
					{
						/*
						 * It's been deleted, so no error, and no need to
						 * continue searching
						 */
						break;
					}

					/*
					 * This is a definite conflict.  Break the tuple down
					 * into datums and report the error.  But first, make
					 * sure we release the buffer locks we're holding ---
					 * BuildIndexValueDescription could make catalog accesses,
					 * which in the worst case might touch this same index
					 * and cause deadlocks.
					 */
					if (nbuf != InvalidBuffer)
						_bt_relbuf(rel, nbuf);
					_bt_relbuf(rel, buf);

					{
						Datum	values[INDEX_MAX_KEYS];
						bool	isnull[INDEX_MAX_KEYS];

						index_deform_tuple(itup, RelationGetDescr(rel),
										   values, isnull);
						ereport(ERROR,
								(errcode(ERRCODE_UNIQUE_VIOLATION),
								 errmsg("duplicate key value violates unique constraint \"%s\"",
										RelationGetRelationName(rel)),
								 errdetail("Key %s already exists.",
										   BuildIndexValueDescription(rel,
															values, isnull))));
					}
				}
				else if (all_dead)
				{
					/*
					 * The conflicting tuple (or whole HOT chain) is dead to
					 * everyone, so we may as well mark the index entry
					 * killed.
					 */
					ItemIdMarkDead(curitemid);
					opaque->btpo_flags |= BTP_HAS_GARBAGE;
					/* be sure to mark the proper buffer dirty... */
					if (nbuf != InvalidBuffer)
						SetBufferCommitInfoNeedsSave(nbuf);
					else
						SetBufferCommitInfoNeedsSave(buf);
				}
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of index \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/*
	 * If we are doing a recheck then we should have found the tuple we
	 * are checking.  Otherwise there's something very wrong --- probably,
	 * the index is on a non-immutable expression.
	 */
	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to re-find tuple within index \"%s\"",
						RelationGetRelationName(rel)),
				 errhint("This may be because of a non-immutable index expression.")));

	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}


/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *		If there's not enough room on the page, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *buf and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *buf.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to the
 *		chosen insert location.  If _bt_findinsertloc decides to move right,
 *		the lock and pin on the original page will be released and the new
 *		page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,
				vacuumed;
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
						(unsigned long) itemsz,
						(unsigned long) BTMaxItemSize(page),
						RelationGetRelationName(rel)),
		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
				"Consider a function index of an MD5 hash of the value, "
				"or use full text indexing.")));
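
	/*
	 * (As a rough guide - an approximation, the exact macro lives in
	 * nbtree.h - BTMaxItemSize works out to about one third of the usable
	 * page space: roughly (page size - page header - a few line pointers -
	 * btree special space) / 3, which is what enforces the "three items
	 * per page" rule above.)
	 */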

	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
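		/*
		 * (random() is uniform over [0, MAX_RANDOM_VALUE], so the last test
		 * below succeeds about 1% of the time: we "get tired" here with
		 * probability ~0.01, i.e. we keep moving right with the 0.99
		 * probability noted above.)
		 */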
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		for (;;)
		{
			BlockNumber rblkno = lpageop->btpo_next;

			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved tuples
	 * around making the hint invalid. If we didn't move right or can't use
	 * the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}

/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.  On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		_bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* XLOG stuff */
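		/*
		 * (The record below is assembled as a chain of XLogRecData entries:
		 * rdata[0] carries the fixed xl_btree_insert struct; optional
		 * entries follow for the child downlink in the non-leaf case and
		 * for the metapage contents when the fast root moves; the final
		 * entry carries the tuple itself and is attached to the page
		 * buffer, so it can be omitted if XLogInsert logs a full-page
		 * image instead.)
		 */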
		if (!rel->rd_istemp)
		{
			xl_btree_insert xlrec;
			BlockNumber xldownlink;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[4];
			XLogRecData *nextrdata;
			IndexTupleData trunctuple;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

				nextrdata->data = (char *) &xldownlink;
				nextrdata->len = sizeof(BlockNumber);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				nextrdata->data = (char *) &trunctuple;
				nextrdata->len = sizeof(IndexTupleData);
			}
			else
			{
				nextrdata->data = (char *) itup;
				nextrdata->len = IndexTupleDSize(*itup);
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
				PageSetTLI(metapg, ThisTimeLineID);
			}

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}

		END_CRIT_SECTION();

		/* release buffers; send out relcache inval if metapage changed */
		if (BufferIsValid(metabuf))
		{
			if (!InRecovery)
				CacheInvalidateRelcache(rel);
			_bt_relbuf(rel, metabuf);
		}

		_bt_relbuf(rel, buf);
	}
}
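
/*
 * Sketch of the split path's recursion (the body of _bt_insert_parent is
 * not shown in this excerpt; the details here are an approximation):
 * after _bt_split, the parent level receives a downlink whose key is the
 * left page's new high key and whose TID block number points at the new
 * right page, roughly:
 *
 *		new_item = CopyIndexTuple(left page's high key);
 *		ItemPointerSet(&(new_item->t_tid),
 *					   BufferGetBlockNumber(rbuf), P_HIKEY);
 *		... locate the parent page via the BTStack, then ...
 *		_bt_insertonpg(rel, pbuf, stack->bts_parent, new_item, ...);
 *
 * which is how a leaf split can cascade upward until a root split
 * finally calls _bt_newroot.
 */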

/*
 *	_bt_split() -- split a page in the btree.
 *
 *		On entry, buf is the page to split, and is pinned and write-locked.
 *		firstright is the item index of the first item to be moved to the
 *		new right page.  newitemoff etc. tell us about the new item that
 *		must be inserted along with the data from the old page.
 *
 *		Returns the new right sibling of buf, pinned and write-locked.
 *		The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;
	bool		isroot;

	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage);
	rightpage = BufferGetPage(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/*
	 * Copy the original page's LSN and TLI into leftpage, which will become
	 * the updated version of the page.  We need this because XLogInsert will
	 * examine these fields and possibly dump them in a page image.
	 */
	PageSetLSN(leftpage, PageGetLSN(origpage));
	PageSetTLI(leftpage, PageGetTLI(origpage));

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	isroot = P_ISROOT(oopaque);

	/* if we're splitting this page, it won't be the root when we're done */
	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
	ropaque->btpo_flags = lopaque->btpo_flags;
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
	ropaque->btpo_prev = BufferGetBlockNumber(buf);
	ropaque->btpo_next = oopaque->btpo_next;
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
	/* Since we already have write-lock on both pages, ok to read cycleid */
	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
	ropaque->btpo_cycleid = lopaque->btpo_cycleid;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add hikey to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add hikey to the left sibling"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Note: we *must* insert at least the right page's items in item-number
	 * order, for the benefit of _bt_restore_page().
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
							 "left sibling");
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
							 "right sibling");
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
						 "left sibling");
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
						 "right sibling");
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
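	/* (i is now one past maxoff, so this is really "newitemoff > maxoff") */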
	if (i <= newitemoff)
	{
		/*
		 * Can't have newitemonleft here; that would imply we were told to put
		 * *everything* on the left page, which cannot fit (if it could, we'd
		 * not be splitting the page).
		 */
		Assert(!newitemonleft);
		_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
					 "right sibling");
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there. We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(ropaque))
	{
		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		if (sopaque->btpo_prev != ropaque->btpo_prev)
			elog(PANIC, "right sibling's left-link doesn't match: "
			   "block %u links to %u instead of expected %u in index \"%s\"",
				 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
				 RelationGetRelationName(rel));

		/*
		 * Check to see if we can set the SPLIT_END flag in the right-hand
		 * split page; this can save some I/O for vacuum since it need not
		 * proceed to the right sibling.  We can set the flag if the right
		 * sibling has a different cycleid: that means it could not be part of
		 * a group of pages that were all split off from the same ancestor
		 * page.  If you're confused, imagine that page A splits to A B and
		 * then again, yielding A C B, while vacuum is in progress.  Tuples
		 * originally in A could now be in either B or C, hence vacuum must
		 * examine both pages.  But if D, our right sibling, has a different
		 * cycleid then it could not contain any tuples that were in A when
		 * the vacuum started.
		 */
		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
			ropaque->btpo_flags |= BTP_SPLIT_END;
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet, and we don't care about the new
	 * sibling until it's linked into the btree.
	 */
	START_CRIT_SECTION();

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.  (XXX the latter
	 * comment is probably obsolete.)
	 *
	 * We need to do this before writing the WAL record, so that XLogInsert
	 * can WAL log an image of the page if necessary.
	 */
	PageRestoreTempPage(leftpage, origpage);

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
		MarkBufferDirty(sbuf);
	}

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[7];
		XLogRecData *lastrdata;

		xlrec.node = rel->rd_node;
		xlrec.leftsib = BufferGetBlockNumber(buf);
		xlrec.rightsib = BufferGetBlockNumber(rbuf);
		xlrec.rnext = ropaque->btpo_next;
		xlrec.level = ropaque->btpo.level;
		xlrec.firstright = firstright;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;

		lastrdata = &rdata[0];

		if (ropaque->btpo.level > 0)
		{
			/* Log downlink on non-leaf pages */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
			lastrdata->len = sizeof(BlockIdData);
			lastrdata->buffer = InvalidBuffer;

			/*
			 * We must also log the left page's high key, because the right
			 * page's leftmost key is suppressed on non-leaf levels.  Show it
			 * as belonging to the left page buffer, so that it is not stored
			 * if XLogInsert decides it needs a full-page image of the left
			 * page.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			itemid = PageGetItemId(origpage, P_HIKEY);
			item = (IndexTuple) PageGetItem(origpage, itemid);
			lastrdata->data = (char *) item;
			lastrdata->len = MAXALIGN(IndexTupleSize(item));
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
1162                  * Log the new item and its offset, if it was inserted on the left
1163                  * page. (If it was put on the right page, we don't need to explicitly
1164                  * WAL log it because it's included with all the other items on the
1165                  * right page.) Show the new item as belonging to the left page
1166                  * buffer, so that it is not stored if XLogInsert decides it needs a
1167                  * full-page image of the left page.  We store the offset anyway,
1168                  * though, to support archive compression of these records.
1169                  */
1170                 if (newitemonleft)
1171                 {
1172                         lastrdata->next = lastrdata + 1;
1173                         lastrdata++;
1174
1175                         lastrdata->data = (char *) &newitemoff;
1176                         lastrdata->len = sizeof(OffsetNumber);
1177                         lastrdata->buffer = InvalidBuffer;
1178
1179                         lastrdata->next = lastrdata + 1;
1180                         lastrdata++;
1181
1182                         lastrdata->data = (char *) newitem;
1183                         lastrdata->len = MAXALIGN(newitemsz);
1184                         lastrdata->buffer = buf;        /* backup block 1 */
1185                         lastrdata->buffer_std = true;
1186                 }
1187                 else if (ropaque->btpo.level == 0)
1188                 {
1189                         /*
1190                          * Although we don't need to WAL-log the new item, we still need
1191                          * XLogInsert to consider storing a full-page image of the left
1192                          * page, so make an empty entry referencing that buffer. This also
1193                          * ensures that the left page is always backup block 1.
1194                          */
1195                         lastrdata->next = lastrdata + 1;
1196                         lastrdata++;
1197
1198                         lastrdata->data = NULL;
1199                         lastrdata->len = 0;
1200                         lastrdata->buffer = buf;        /* backup block 1 */
1201                         lastrdata->buffer_std = true;
1202                 }
1203
1204                 /*
1205                  * Log the contents of the right page in the format understood by
1206                  * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
1207                  * because we're going to recreate the whole page anyway, so it should
1208                  * never be stored by XLogInsert.
1209                  *
1210          * Direct access to the page is ugly but fast; we should eventually
1211          * provide a proper function in the page API.  Note we only store the tuples
1212                  * themselves, knowing that they were inserted in item-number order
1213                  * and so the item pointers can be reconstructed.  See comments for
1214                  * _bt_restore_page().
1215                  */
1216                 lastrdata->next = lastrdata + 1;
1217                 lastrdata++;
1218
1219                 lastrdata->data = (char *) rightpage +
1220                         ((PageHeader) rightpage)->pd_upper;
1221                 lastrdata->len = ((PageHeader) rightpage)->pd_special -
1222                         ((PageHeader) rightpage)->pd_upper;
1223                 lastrdata->buffer = InvalidBuffer;
1224
1225                 /* Log the right sibling, because we've changed its prev-pointer. */
1226                 if (!P_RIGHTMOST(ropaque))
1227                 {
1228                         lastrdata->next = lastrdata + 1;
1229                         lastrdata++;
1230
1231                         lastrdata->data = NULL;
1232                         lastrdata->len = 0;
1233                         lastrdata->buffer = sbuf;       /* backup block 2 */
1234                         lastrdata->buffer_std = true;
1235                 }
1236
1237                 lastrdata->next = NULL;
1238
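                /*
                 * To recap, the rdata chain built above holds: the fixed-size
                 * split record; on non-leaf levels, the new item's child block
                 * number and the left page's high key; if the new item went to
                 * the left page, its offset and the item itself (on leaf levels
                 * the alternative is an empty entry that merely makes the left
                 * page backup block 1); the right page's tuple data; and, if
                 * there is a right sibling, an empty entry making it backup
                 * block 2.
                 */
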
1239                 if (isroot)
1240                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1241                 else
1242                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1243
1244                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1245
1246                 PageSetLSN(origpage, recptr);
1247                 PageSetTLI(origpage, ThisTimeLineID);
1248                 PageSetLSN(rightpage, recptr);
1249                 PageSetTLI(rightpage, ThisTimeLineID);
1250                 if (!P_RIGHTMOST(ropaque))
1251                 {
1252                         PageSetLSN(spage, recptr);
1253                         PageSetTLI(spage, ThisTimeLineID);
1254                 }
1255         }
1256
1257         END_CRIT_SECTION();
1258
1259         /* release the old right sibling */
1260         if (!P_RIGHTMOST(ropaque))
1261                 _bt_relbuf(rel, sbuf);
1262
1263         /* split's done */
1264         return rbuf;
1265 }
1266
1267 /*
1268  *      _bt_findsplitloc() -- find an appropriate place to split a page.
1269  *
1270  * The idea here is to equalize the free space that will be on each split
1271  * page, *after accounting for the inserted tuple*.  (If we fail to account
1272  * for it, we might find ourselves with too little room on the page that
1273  * it needs to go into!)
1274  *
1275  * If the page is the rightmost page on its level, we instead try to arrange
1276  * to leave the left split page fillfactor% full.  In this way, when we are
1277  * inserting successively increasing keys (consider sequences, timestamps,
1278  * etc) we will end up with a tree whose pages are about fillfactor% full,
1279  * instead of the 50% full result that we'd get without this special case.
1280  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1281  * that leaf and nonleaf pages use different fillfactors.
1282  *
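 * (With the default leaf fillfactor of 90, for example, a long run of
 * serially increasing inserts leaves leaf pages about 90% full.)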
1283  * We are passed the intended insert position of the new tuple, expressed as
1284  * the offsetnumber of the tuple it must go in front of.  (This could be
1285  * maxoff+1 if the tuple is to go at the end.)
1286  *
1287  * We return the index of the first existing tuple that should go on the
1288  * righthand page, plus a boolean indicating whether the new tuple goes on
1289  * the left or right page.  The bool is necessary to disambiguate the case
1290  * where firstright == newitemoff.
1291  */
1292 static OffsetNumber
1293 _bt_findsplitloc(Relation rel,
1294                                  Page page,
1295                                  OffsetNumber newitemoff,
1296                                  Size newitemsz,
1297                                  bool *newitemonleft)
1298 {
1299         BTPageOpaque opaque;
1300         OffsetNumber offnum;
1301         OffsetNumber maxoff;
1302         ItemId          itemid;
1303         FindSplitData state;
1304         int                     leftspace,
1305                                 rightspace,
1306                                 goodenough,
1307                                 olddataitemstotal,
1308                                 olddataitemstoleft;
1309         bool            goodenoughfound;
1310
1311         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1312
1313         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1314         newitemsz += sizeof(ItemIdData);
1315
1316         /* Total free space available on a btree page, after fixed overhead */
1317         leftspace = rightspace =
1318                 PageGetPageSize(page) - SizeOfPageHeaderData -
1319                 MAXALIGN(sizeof(BTPageOpaqueData));
1320
1321         /* The right page will have the same high key as the old page */
1322         if (!P_RIGHTMOST(opaque))
1323         {
1324                 itemid = PageGetItemId(page, P_HIKEY);
1325                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1326                                                          sizeof(ItemIdData));
1327         }
1328
1329         /* Count up total space in data items without actually scanning 'em */
1330         olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
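        /*
         * This works because rightspace is the page's usable space net of the
         * high key (if any), so subtracting the exact free space leaves just
         * the space consumed by the existing data items and their line
         * pointers.
         */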
1331
1332         state.newitemsz = newitemsz;
1333         state.is_leaf = P_ISLEAF(opaque);
1334         state.is_rightmost = P_RIGHTMOST(opaque);
1335         state.have_split = false;
1336         if (state.is_leaf)
1337                 state.fillfactor = RelationGetFillFactor(rel,
1338                                                                                                  BTREE_DEFAULT_FILLFACTOR);
1339         else
1340                 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1341         state.newitemonleft = false;    /* these just to keep compiler quiet */
1342         state.firstright = 0;
1343         state.best_delta = 0;
1344         state.leftspace = leftspace;
1345         state.rightspace = rightspace;
1346         state.olddataitemstotal = olddataitemstotal;
1347         state.newitemoff = newitemoff;
1348
1349         /*
1350          * Finding the best possible split would require checking all the possible
1351          * split points, because of the high-key and left-key special cases.
1352          * That's probably more work than it's worth; instead, stop as soon as we
1353          * find a "good-enough" split, where good-enough is defined as an
1354          * imbalance in free space of no more than pagesize/16 (arbitrary...).  This
1355          * should let us stop near the middle on most pages, instead of plowing to
1356          * the end.
1357          */
1358         goodenough = leftspace / 16;
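        /*
         * With the default 8 KB block size, for instance, leftspace is a bit
         * under 8 KB after fixed overhead, so "good enough" tolerates an
         * imbalance of roughly 500 bytes.
         */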
1359
1360         /*
1361          * Scan through the data items and calculate space usage for a split at
1362          * each possible position.
1363          */
1364         olddataitemstoleft = 0;
1365         goodenoughfound = false;
1366         maxoff = PageGetMaxOffsetNumber(page);
1367
1368         for (offnum = P_FIRSTDATAKEY(opaque);
1369                  offnum <= maxoff;
1370                  offnum = OffsetNumberNext(offnum))
1371         {
1372                 Size            itemsz;
1373
1374                 itemid = PageGetItemId(page, offnum);
1375                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1376
1377                 /*
1378                  * Will the new item go to left or right of split?
1379                  */
1380                 if (offnum > newitemoff)
1381                         _bt_checksplitloc(&state, offnum, true,
1382                                                           olddataitemstoleft, itemsz);
1383
1384                 else if (offnum < newitemoff)
1385                         _bt_checksplitloc(&state, offnum, false,
1386                                                           olddataitemstoleft, itemsz);
1387                 else
1388                 {
1389                         /* need to try it both ways! */
1390                         _bt_checksplitloc(&state, offnum, true,
1391                                                           olddataitemstoleft, itemsz);
1392
1393                         _bt_checksplitloc(&state, offnum, false,
1394                                                           olddataitemstoleft, itemsz);
1395                 }
1396
1397                 /* Abort scan once we find a good-enough choice */
1398                 if (state.have_split && state.best_delta <= goodenough)
1399                 {
1400                         goodenoughfound = true;
1401                         break;
1402                 }
1403
1404                 olddataitemstoleft += itemsz;
1405         }
1406
1407         /*
1408          * If the new item goes as the last item, check for splitting so that all
1409          * the old items go to the left page and the new item goes to the right
1410          * page.
1411          */
1412         if (newitemoff > maxoff && !goodenoughfound)
1413                 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1414
1415         /*
1416          * I believe it is not possible to fail to find a feasible split, but just
1417          * in case ...
1418          */
1419         if (!state.have_split)
1420                 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1421                          RelationGetRelationName(rel));
1422
1423         *newitemonleft = state.newitemonleft;
1424         return state.firstright;
1425 }
1426
1427 /*
1428  * Subroutine to analyze a particular possible split choice (ie, firstright
1429  * and newitemonleft settings), and record the best split so far in *state.
1430  *
1431  * firstoldonright is the offset of the first item on the original page
1432  * that goes to the right page, and firstoldonrightsz is the size of that
1433  * tuple. firstoldonright can be > max offset, which means that all the old
1434  * items go to the left page and only the new item goes to the right page.
1435  * In that case, firstoldonrightsz is not used.
1436  *
1437  * olddataitemstoleft is the total size of all old items to the left of
1438  * firstoldonright.
1439  */
1440 static void
1441 _bt_checksplitloc(FindSplitData *state,
1442                                   OffsetNumber firstoldonright,
1443                                   bool newitemonleft,
1444                                   int olddataitemstoleft,
1445                                   Size firstoldonrightsz)
1446 {
1447         int                     leftfree,
1448                                 rightfree;
1449         Size            firstrightitemsz;
1450         bool            newitemisfirstonright;
1451
1452         /* Is the new item going to be the first item on the right page? */
1453         newitemisfirstonright = (firstoldonright == state->newitemoff
1454                                                          && !newitemonleft);
1455
1456         if (newitemisfirstonright)
1457                 firstrightitemsz = state->newitemsz;
1458         else
1459                 firstrightitemsz = firstoldonrightsz;
1460
1461         /* Account for all the old tuples */
1462         leftfree = state->leftspace - olddataitemstoleft;
1463         rightfree = state->rightspace -
1464                 (state->olddataitemstotal - olddataitemstoleft);
1465
1466         /*
1467          * The first item on the right page becomes the high key of the left page;
1468          * therefore it counts against left space as well as right space.
1469          */
1470         leftfree -= firstrightitemsz;
1471
1472         /* account for the new item */
1473         if (newitemonleft)
1474                 leftfree -= (int) state->newitemsz;
1475         else
1476                 rightfree -= (int) state->newitemsz;
1477
1478         /*
1479          * If we are not on the leaf level, we will be able to discard the key
1480          * data from the first item that winds up on the right page.
1481          */
1482         if (!state->is_leaf)
1483                 rightfree += (int) firstrightitemsz -
1484                         (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1485
1486         /*
1487          * If feasible split point, remember best delta.
1488          */
1489         if (leftfree >= 0 && rightfree >= 0)
1490         {
1491                 int                     delta;
1492
1493                 if (state->is_rightmost)
1494                 {
1495                         /*
1496                          * If splitting a rightmost page, try to put (100-fillfactor)% of
1497                          * the free space on the left page.  See comments for _bt_findsplitloc.
1498                          */
1499                         delta = (state->fillfactor * leftfree)
1500                                 - ((100 - state->fillfactor) * rightfree);
1501                 }
1502                 else
1503                 {
1504                         /* Otherwise, aim for equal free space on both sides */
1505                         delta = leftfree - rightfree;
1506                 }
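                /*
                 * Worked example with illustrative numbers: fillfactor = 90,
                 * leftfree = 100 and rightfree = 900 give delta = 90*100 -
                 * 10*900 = 0; leaving the left page 10% of the combined free
                 * space exactly meets the rightmost-split target.
                 */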
1507
1508                 if (delta < 0)
1509                         delta = -delta;
1510                 if (!state->have_split || delta < state->best_delta)
1511                 {
1512                         state->have_split = true;
1513                         state->newitemonleft = newitemonleft;
1514                         state->firstright = firstoldonright;
1515                         state->best_delta = delta;
1516                 }
1517         }
1518 }
1519
1520 /*
1521  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1522  *
1523  * On entry, buf and rbuf are the left and right split pages, which we
1524  * still hold write locks on per the L&Y algorithm.  We release the
1525  * write locks once we have a write lock on the parent page.  (Any sooner,
1526  * and it'd be possible for some other process to try to split or delete
1527  * one of these pages, and get confused because it cannot find the downlink.)
1528  *
1529  * stack - stack showing how we got here.  May be NULL in cases that don't
1530  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1531  * is_root - we split the true root
1532  * is_only - we split a page alone on its level (might have been fast root)
1533  *
1534  * This is exported so it can be called by nbtxlog.c.
1535  */
1536 void
1537 _bt_insert_parent(Relation rel,
1538                                   Buffer buf,
1539                                   Buffer rbuf,
1540                                   BTStack stack,
1541                                   bool is_root,
1542                                   bool is_only)
1543 {
1544         /*
1545          * Here we have to do something Lehman and Yao don't talk about: deal with
1546          * a root split and construction of a new root.  If our stack is empty
1547          * then we have just split a node on what had been the root level when we
1548          * descended the tree.  If it was still the root then we perform a
1549          * new-root construction.  If it *wasn't* the root anymore, search to find
1550          * the next higher level that someone constructed meanwhile, and find the
1551          * right place to insert as for the normal case.
1552          *
1553          * If we have to search for the parent level, we do so by re-descending
1554          * from the root.  This is not super-efficient, but it's rare enough not
1555          * to matter.  (This path is also taken when called from WAL recovery ---
1556          * we have no stack in that case.)
1557          */
1558         if (is_root)
1559         {
1560                 Buffer          rootbuf;
1561
1562                 Assert(stack == NULL);
1563                 Assert(is_only);
1564                 /* create a new root node and update the metapage */
1565                 rootbuf = _bt_newroot(rel, buf, rbuf);
1566                 /* release the split buffers */
1567                 _bt_relbuf(rel, rootbuf);
1568                 _bt_relbuf(rel, rbuf);
1569                 _bt_relbuf(rel, buf);
1570         }
1571         else
1572         {
1573                 BlockNumber bknum = BufferGetBlockNumber(buf);
1574                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1575                 Page            page = BufferGetPage(buf);
1576                 IndexTuple      new_item;
1577                 BTStackData fakestack;
1578                 IndexTuple      ritem;
1579                 Buffer          pbuf;
1580
1581                 if (stack == NULL)
1582                 {
1583                         BTPageOpaque lpageop;
1584
1585                         if (!InRecovery)
1586                                 elog(DEBUG2, "concurrent ROOT page split");
1587                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1588                         /* Find the leftmost page at the next level up */
1589                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1590                         /* Set up a phony stack entry pointing there */
1591                         stack = &fakestack;
1592                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1593                         stack->bts_offset = InvalidOffsetNumber;
1594                         /* bts_btentry will be initialized below */
1595                         stack->bts_parent = NULL;
1596                         _bt_relbuf(rel, pbuf);
1597                 }
1598
1599                 /* get high key from left page == lowest key on new right page */
1600                 ritem = (IndexTuple) PageGetItem(page,
1601                                                                                  PageGetItemId(page, P_HIKEY));
1602
1603                 /* form an index tuple that points at the new right page */
1604                 new_item = CopyIndexTuple(ritem);
1605                 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1606
1607                 /*
1608                  * Find the parent buffer and get the parent page.
1609                  *
1610          * Oops - if we were moved right then we need to change the stack
1611          * item!  We want to find the parent pointing to where we are, right?  - vadim
1612                  * 05/27/97
1613                  */
1614                 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1615
1616                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1617
1618                 /* Now we can unlock the children */
1619                 _bt_relbuf(rel, rbuf);
1620                 _bt_relbuf(rel, buf);
1621
1622                 /* Check for error only after writing children */
1623                 if (pbuf == InvalidBuffer)
1624                         elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1625                                  RelationGetRelationName(rel), bknum, rbknum);
1626
1627                 /* Recursively update the parent */
1628                 _bt_insertonpg(rel, pbuf, stack->bts_parent,
1629                                            new_item, stack->bts_offset + 1,
1630                                            is_only);
1631
1632                 /* be tidy */
1633                 pfree(new_item);
1634         }
1635 }
1636
1637 /*
1638  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1639  *                                               we last looked at in the parent.
1640  *
1641  *              This is possible because we save the downlink from the parent item,
1642  *              which is enough to uniquely identify it.  Insertions into the parent
1643  *              level could cause the item to move right; deletions could cause it
1644  *              to move left, but not left of the page we previously found it in.
1645  *
1646  *              Adjusts bts_blkno & bts_offset if changed.
1647  *
1648  *              Returns InvalidBuffer if item not found (should not happen).
1649  */
1650 Buffer
1651 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1652 {
1653         BlockNumber blkno;
1654         OffsetNumber start;
1655
1656         blkno = stack->bts_blkno;
1657         start = stack->bts_offset;
1658
1659         for (;;)
1660         {
1661                 Buffer          buf;
1662                 Page            page;
1663                 BTPageOpaque opaque;
1664
1665                 buf = _bt_getbuf(rel, blkno, access);
1666                 page = BufferGetPage(buf);
1667                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1668
1669                 if (!P_IGNORE(opaque))
1670                 {
1671                         OffsetNumber offnum,
1672                                                 minoff,
1673                                                 maxoff;
1674                         ItemId          itemid;
1675                         IndexTuple      item;
1676
1677                         minoff = P_FIRSTDATAKEY(opaque);
1678                         maxoff = PageGetMaxOffsetNumber(page);
1679
1680                         /*
1681                          * start = InvalidOffsetNumber means "search the whole page".  We
1682                          * need this test anyway, due to the possibility that the page has
1683                          * a high key now when it didn't before.
1684                          */
1685                         if (start < minoff)
1686                                 start = minoff;
1687
1688                         /*
1689                          * Need this check too, to guard against the possibility that the
1690                          * page has split since we visited it originally.
1691                          */
1692                         if (start > maxoff)
1693                                 start = OffsetNumberNext(maxoff);
1694
1695                         /*
1696                          * These loops will check every item on the page --- but in an
1697                          * order that's attuned to the probability of where it actually
1698                          * is.  Scan to the right first, then to the left.
1699                          */
1700                         for (offnum = start;
1701                                  offnum <= maxoff;
1702                                  offnum = OffsetNumberNext(offnum))
1703                         {
1704                                 itemid = PageGetItemId(page, offnum);
1705                                 item = (IndexTuple) PageGetItem(page, itemid);
1706                                 if (BTEntrySame(item, &stack->bts_btentry))
1707                                 {
1708                                         /* Return accurate pointer to where link is now */
1709                                         stack->bts_blkno = blkno;
1710                                         stack->bts_offset = offnum;
1711                                         return buf;
1712                                 }
1713                         }
1714
1715                         for (offnum = OffsetNumberPrev(start);
1716                                  offnum >= minoff;
1717                                  offnum = OffsetNumberPrev(offnum))
1718                         {
1719                                 itemid = PageGetItemId(page, offnum);
1720                                 item = (IndexTuple) PageGetItem(page, itemid);
1721                                 if (BTEntrySame(item, &stack->bts_btentry))
1722                                 {
1723                                         /* Return accurate pointer to where link is now */
1724                                         stack->bts_blkno = blkno;
1725                                         stack->bts_offset = offnum;
1726                                         return buf;
1727                                 }
1728                         }
1729                 }
1730
1731                 /*
1732                  * The item we're looking for has moved right by at least one page.
1733                  */
1734                 if (P_RIGHTMOST(opaque))
1735                 {
1736                         _bt_relbuf(rel, buf);
1737                         return InvalidBuffer;
1738                 }
1739                 blkno = opaque->btpo_next;
1740                 start = InvalidOffsetNumber;
1741                 _bt_relbuf(rel, buf);
1742         }
1743 }
1744
1745 /*
1746  *      _bt_newroot() -- Create a new root page for the index.
1747  *
1748  *              We've just split the old root page and need to create a new one.
1749  *              In order to do this, we add a new root page to the file, then lock
1750  *              the metadata page and update it.  This is guaranteed to be deadlock-
1751  *              free, because all readers release their locks on the metadata page
1752  *              before trying to lock the root, and all writers lock the root before
1753  *              trying to lock the metadata page.  We have a write lock on the old
1754  *              root page, so we have not introduced any cycles into the waits-for
1755  *              graph.
1756  *
1757  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1758  *              locked. On exit, a new root page exists with entries for the
1759  *              two new children, and the metapage is updated and unlocked/unpinned.
1760  *              The new root buffer is returned to the caller, which must unlock/unpin
1761  *              lbuf, rbuf and rootbuf.
1762  */
1763 static Buffer
1764 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1765 {
1766         Buffer          rootbuf;
1767         Page            lpage,
1768                                 rootpage;
1769         BlockNumber lbkno,
1770                                 rbkno;
1771         BlockNumber rootblknum;
1772         BTPageOpaque rootopaque;
1773         ItemId          itemid;
1774         IndexTuple      item;
1775         Size            itemsz;
1776         IndexTuple      new_item;
1777         Buffer          metabuf;
1778         Page            metapg;
1779         BTMetaPageData *metad;
1780
1781         lbkno = BufferGetBlockNumber(lbuf);
1782         rbkno = BufferGetBlockNumber(rbuf);
1783         lpage = BufferGetPage(lbuf);
1784
1785         /* get a new root page */
1786         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1787         rootpage = BufferGetPage(rootbuf);
1788         rootblknum = BufferGetBlockNumber(rootbuf);
1789
1790         /* acquire lock on the metapage */
1791         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1792         metapg = BufferGetPage(metabuf);
1793         metad = BTPageGetMeta(metapg);
1794
1795         /* NO EREPORT(ERROR) from here till newroot op is logged */
1796         START_CRIT_SECTION();
1797
1798         /* set btree special data */
1799         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1800         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1801         rootopaque->btpo_flags = BTP_ROOT;
1802         rootopaque->btpo.level =
1803                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1804         rootopaque->btpo_cycleid = 0;
1805
1806         /* update metapage data */
1807         metad->btm_root = rootblknum;
1808         metad->btm_level = rootopaque->btpo.level;
1809         metad->btm_fastroot = rootblknum;
1810         metad->btm_fastlevel = rootopaque->btpo.level;
1811
1812         /*
1813          * Create downlink item for left page (old root).  Since this will be the
1814          * first item in a non-leaf page, it implicitly has minus-infinity key
1815          * value, so we need not store any actual key in it.
1816          */
1817         itemsz = sizeof(IndexTupleData);
1818         new_item = (IndexTuple) palloc(itemsz);
1819         new_item->t_info = itemsz;
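        /* t_info is just the size here; the null/varwidth flag bits stay zero */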
1820         ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
1821
1822         /*
1823          * Insert the left page pointer into the new root page.  The root page is
1824          * the rightmost page on its level so there is no "high key" in it; the
1825          * two items will go into positions P_HIKEY and P_FIRSTKEY.
1826          *
1827          * Note: we *must* insert the two items in item-number order, for the
1828          * benefit of _bt_restore_page().
1829          */
1830         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
1831                                         false, false) == InvalidOffsetNumber)
1832                 elog(PANIC, "failed to add leftkey to new root page"
1833                          " while splitting block %u of index \"%s\"",
1834                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1835         pfree(new_item);
1836
1837         /*
1838          * Create downlink item for right page.  The key for it is obtained from
1839          * the "high key" position in the left page.
1840          */
1841         itemid = PageGetItemId(lpage, P_HIKEY);
1842         itemsz = ItemIdGetLength(itemid);
1843         item = (IndexTuple) PageGetItem(lpage, itemid);
1844         new_item = CopyIndexTuple(item);
1845         ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
1846
1847         /*
1848          * insert the right page pointer into the new root page.
1849          */
1850         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
1851                                         false, false) == InvalidOffsetNumber)
1852                 elog(PANIC, "failed to add rightkey to new root page"
1853                          " while splitting block %u of index \"%s\"",
1854                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1855         pfree(new_item);
1856
1857         MarkBufferDirty(rootbuf);
1858         MarkBufferDirty(metabuf);
1859
1860         /* XLOG stuff */
1861         if (!rel->rd_istemp)
1862         {
1863                 xl_btree_newroot xlrec;
1864                 XLogRecPtr      recptr;
1865                 XLogRecData rdata[2];
1866
1867                 xlrec.node = rel->rd_node;
1868                 xlrec.rootblk = rootblknum;
1869                 xlrec.level = metad->btm_level;
1870
1871                 rdata[0].data = (char *) &xlrec;
1872                 rdata[0].len = SizeOfBtreeNewroot;
1873                 rdata[0].buffer = InvalidBuffer;
1874                 rdata[0].next = &(rdata[1]);
1875
1876                 /*
1877                  * Direct access to the page is ugly but fast; we should eventually
1878                  * provide a proper function in the page API.
1879                  */
1880                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1881                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1882                         ((PageHeader) rootpage)->pd_upper;
1883                 rdata[1].buffer = InvalidBuffer;
1884                 rdata[1].next = NULL;
1885
1886                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1887
1888                 PageSetLSN(rootpage, recptr);
1889                 PageSetTLI(rootpage, ThisTimeLineID);
1890                 PageSetLSN(metapg, recptr);
1891                 PageSetTLI(metapg, ThisTimeLineID);
1892         }
1893
1894         END_CRIT_SECTION();
1895
1896         /* send out relcache inval for metapage change */
1897         if (!InRecovery)
1898                 CacheInvalidateRelcache(rel);
1899
1900         /* done with metapage */
1901         _bt_relbuf(rel, metabuf);
1902
1903         return rootbuf;
1904 }
1905
1906 /*
1907  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1908  *
1909  *              This routine adds the tuple to the page as requested.  It does
1910  *              not affect pin/lock status, but you'd better have a write lock
1911  *              and pin on the target buffer!  Don't forget to write and release
1912  *              the buffer afterwards, either.
1913  *
1914  *              The main difference between this routine and a bare PageAddItem call
1915  *              is that this code knows that the leftmost index tuple on a non-leaf
1916  *              btree page doesn't need to have a key.  Therefore, it strips such
1917  *              tuples down to just the tuple header.  CAUTION: this works ONLY if
1918  *              we insert the tuples in order, so that the given itup_off does
1919  *              represent the final position of the tuple!
1920  */
1921 static void
1922 _bt_pgaddtup(Relation rel,
1923                          Page page,
1924                          Size itemsize,
1925                          IndexTuple itup,
1926                          OffsetNumber itup_off,
1927                          const char *where)
1928 {
1929         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1930         IndexTupleData trunctuple;
1931
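        /*
         * The first data item on a non-leaf page is implicitly minus infinity,
         * so only its header (which carries the downlink TID) needs storing;
         * the key data is truncated away.
         */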
1932         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1933         {
1934                 trunctuple = *itup;
1935                 trunctuple.t_info = sizeof(IndexTupleData);
1936                 itup = &trunctuple;
1937                 itemsize = sizeof(IndexTupleData);
1938         }
1939
1940         if (PageAddItem(page, (Item) itup, itemsize, itup_off,
1941                                         false, false) == InvalidOffsetNumber)
1942                 elog(PANIC, "failed to add item to the %s in index \"%s\"",
1943                          where, RelationGetRelationName(rel));
1944 }
1945
1946 /*
1947  * _bt_isequal - used in _bt_doinsert's check for duplicates.
1948  *
1949  * This is very similar to _bt_compare, except for NULL handling.
1950  * The rule is simple: NOT_NULL is not equal to NULL, and NULL is not equal to NULL either.
1951  */
1952 static bool
1953 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1954                         int keysz, ScanKey scankey)
1955 {
1956         IndexTuple      itup;
1957         int                     i;
1958
1959         /* Better be comparing to a leaf item */
1960         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1961
1962         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1963
1964         for (i = 1; i <= keysz; i++)
1965         {
1966                 AttrNumber      attno;
1967                 Datum           datum;
1968                 bool            isNull;
1969                 int32           result;
1970
1971                 attno = scankey->sk_attno;
1972                 Assert(attno == i);
1973                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1974
1975                 /* NULLs are never equal to anything */
1976                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1977                         return false;
1978
1979                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1980                                                                                          datum,
1981                                                                                          scankey->sk_argument));
1982
1983                 if (result != 0)
1984                         return false;
1985
1986                 scankey++;
1987         }
1988
1989         /* if we get here, the keys are equal */
1990         return true;
1991 }
1992
1993 /*
1994  * _bt_vacuum_one_page - vacuum just one index page.
1995  *
1996  * Try to remove LP_DEAD items from the given page.  The passed buffer
1997  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
1998  * super-exclusive "cleanup" lock (see nbtree/README).
1999  */
2000 static void
2001 _bt_vacuum_one_page(Relation rel, Buffer buffer)
2002 {
2003         OffsetNumber deletable[MaxOffsetNumber];
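        /* (sized for the worst case: every line pointer on the page is dead) */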
2004         int                     ndeletable = 0;
2005         OffsetNumber offnum,
2006                                 minoff,
2007                                 maxoff;
2008         Page            page = BufferGetPage(buffer);
2009         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2010
2011         /*
2012          * Scan over all items to see which ones need to be deleted according to
2013          * LP_DEAD flags.
2014          */
2015         minoff = P_FIRSTDATAKEY(opaque);
2016         maxoff = PageGetMaxOffsetNumber(page);
2017         for (offnum = minoff;
2018                  offnum <= maxoff;
2019                  offnum = OffsetNumberNext(offnum))
2020         {
2021                 ItemId          itemId = PageGetItemId(page, offnum);
2022
2023                 if (ItemIdIsDead(itemId))
2024                         deletable[ndeletable++] = offnum;
2025         }
2026
2027         if (ndeletable > 0)
2028                 _bt_delitems(rel, buffer, deletable, ndeletable, false, 0);
2029
2030         /*
2031          * Note: if we didn't find any LP_DEAD items, then the page's
2032          * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
2033          * separate write to clear it, however.  We will clear it when we split
2034          * the page.
2035          */
2036 }