/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *    Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/nbtree/nbtinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/tqual.h"


typedef struct
{
    /* context data for _bt_checksplitloc */
    Size        newitemsz;      /* size of new item to be inserted */
    int         fillfactor;     /* needed when splitting rightmost page */
    bool        is_leaf;        /* T if splitting a leaf page */
    bool        is_rightmost;   /* T if splitting a rightmost page */
    OffsetNumber newitemoff;    /* where the new item is to be inserted */
    int         leftspace;      /* space available for items on left page */
    int         rightspace;     /* space available for items on right page */
    int         olddataitemstotal;  /* space taken by old items */

    bool        have_split;     /* found a valid split? */

    /* these fields valid only if have_split is true */
    bool        newitemonleft;  /* new item on left or right of best split */
    OffsetNumber firstright;    /* best split point */
    int         best_delta;     /* best size delta so far */
} FindSplitData;
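
/*
 * Illustrative sketch (not part of the original file): roughly how
 * _bt_checksplitloc uses the context above when it evaluates one candidate
 * split point.  This is a simplified paraphrase -- it ignores the
 * rightmost-page fillfactor case and internal-page key truncation -- so see
 * _bt_checksplitloc itself for the authoritative logic.
 *
 *	leftfree = state->leftspace - dataitemstoleft;
 *	rightfree = state->rightspace -
 *		(state->olddataitemstotal - dataitemstoleft);
 *	leftfree -= firstoldonrightsz;	// becomes the left page's high key
 *	if (newitemonleft)
 *		leftfree -= (int) state->newitemsz;
 *	else
 *		rightfree -= (int) state->newitemsz;
 *	if (leftfree >= 0 && rightfree >= 0)
 *	{
 *		int		delta = abs(leftfree - rightfree);
 *
 *		if (!state->have_split || delta < state->best_delta)
 *		{
 *			state->have_split = true;
 *			state->newitemonleft = newitemonleft;
 *			state->firstright = firstoldonright;
 *			state->best_delta = delta;
 *		}
 *	}
 */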


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                 Relation heapRel, Buffer buf, OffsetNumber offset,
                 ScanKey itup_scankey,
                 IndexUniqueCheck checkUnique, bool *is_unique);
static void _bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup,
                  BTStack stack,
                  Relation heapRel);
static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
          OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
          IndexTuple newitem, bool newitemonleft);
static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
                  BTStack stack, bool is_root, bool is_only);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                 OffsetNumber newitemoff,
                 Size newitemsz,
                 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
                  OffsetNumber firstoldonright, bool newitemonleft,
                  int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
             OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
            int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);


/*
 *    _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *        This routine is called by the public interface routine, btinsert.
 *        By here, itup is filled in, including the TID.
 *
 *        If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *        will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *        UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
 *        For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *        don't actually insert.
 *
 *        The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *        it must be TRUE if the entry is known unique, else FALSE.
 *        (In the current implementation we'll also return TRUE after a
 *        successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *        that's just a coding artifact.)
 */
bool
_bt_doinsert(Relation rel, IndexTuple itup,
             IndexUniqueCheck checkUnique, Relation heapRel)
{
    bool        is_unique = false;
    int         natts = rel->rd_rel->relnatts;
    ScanKey     itup_scankey;
    BTStack     stack;
    Buffer      buf;
    OffsetNumber offset;

    /* we need an insertion scan key to do our search, so build one */
    itup_scankey = _bt_mkscankey(rel, itup);

top:
    /* find the first page containing this key */
    stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

    offset = InvalidOffsetNumber;

    /* trade in our read lock for a write lock */
    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    LockBuffer(buf, BT_WRITE);

    /*
     * If the page was split between the time that we surrendered our read
     * lock and acquired our write lock, then this page may no longer be the
     * right place for the key we want to insert.  In this case, we need to
     * move right in the tree.  See Lehman and Yao for an excruciatingly
     * precise description.
     */
    buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
                        true, stack, BT_WRITE);

    /*
     * If we're not allowing duplicates, make sure the key isn't already in
     * the index.
     *
     * NOTE: obviously, _bt_check_unique can only detect keys that are already
     * in the index; so it cannot defend against concurrent insertions of the
     * same key.  We protect against that by means of holding a write lock on
     * the target page.  Any other would-be inserter of the same key must
     * acquire a write lock on the same target page, so only one would-be
     * inserter can be making the check at one time.  Furthermore, once we are
     * past the check we hold write locks continuously until we have performed
     * our insertion, so no later inserter can fail to see our insertion.
     * (This requires some care in _bt_insertonpg.)
     *
     * If we must wait for another xact, we release the lock while waiting,
     * and then must start over completely.
     *
     * For a partial uniqueness check, we don't wait for the other xact. Just
     * let the tuple in and return false for possibly non-unique, or true for
     * definitely unique.
     */
    if (checkUnique != UNIQUE_CHECK_NO)
    {
        TransactionId xwait;

        offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
        xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
                                 checkUnique, &is_unique);

        if (TransactionIdIsValid(xwait))
        {
            /* Have to wait for the other guy ... */
            _bt_relbuf(rel, buf);
            XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
            /* start over... */
            _bt_freestack(stack);
            goto top;
        }
    }

    if (checkUnique != UNIQUE_CHECK_EXISTING)
    {
        /*
         * The only conflict predicate locking cares about for indexes is when
         * an index tuple insert conflicts with an existing lock.  Since the
         * actual location of the insert is hard to predict because of the
         * random search used to prevent O(N^2) performance when there are
         * many duplicate entries, we can just use the "first valid" page.
         */
        CheckForSerializableConflictIn(rel, NULL, buf);
        /* do the insertion */
        _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
                          stack, heapRel);
        _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
    }
    else
    {
        /* just release the buffer */
        _bt_relbuf(rel, buf);
    }

    /* be tidy */
    _bt_freestack(stack);
    _bt_freeskey(itup_scankey);

    return is_unique;
}
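
/*
 * Illustrative sketch (not part of the original file): roughly how the
 * public entry point btinsert(), over in nbtree.c, drives this routine.
 * Paraphrased from memory rather than quoted, so treat it as a sketch:
 *
 *	IndexTuple	itup;
 *	bool		result;
 *
 *	// form an index tuple from the datum/isnull arrays, point its TID
 *	// at the heap tuple, then hand it to _bt_doinsert
 *	itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 *	itup->t_tid = *ht_ctid;
 *	result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 *	pfree(itup);
 *
 * Per the header comment above, the caller only cares about result for
 * UNIQUE_CHECK_PARTIAL.
 */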

/*
 *    _bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
                 IndexUniqueCheck checkUnique, bool *is_unique)
{
    TupleDesc   itupdesc = RelationGetDescr(rel);
    int         natts = rel->rd_rel->relnatts;
    SnapshotData SnapshotDirty;
    OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
    Buffer      nbuf = InvalidBuffer;
    bool        found = false;

    /* Assume unique until we find a duplicate */
    *is_unique = true;

    InitDirtySnapshot(SnapshotDirty);

    page = BufferGetPage(buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);

    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
    for (;;)
    {
        ItemId      curitemid;
        IndexTuple  curitup;
        BlockNumber nblkno;

        /*
         * make sure the offset points to an actual item before trying to
         * examine it...
         */
        if (offset <= maxoff)
        {
            curitemid = PageGetItemId(page, offset);

            /*
             * We can skip items that are marked killed.
             *
             * Formerly, we applied _bt_isequal() before checking the kill
             * flag, so as to fall out of the item loop as soon as possible.
             * However, in the presence of heavy update activity an index may
             * contain many killed items with the same key; running
             * _bt_isequal() on each killed item gets expensive. Furthermore
             * it is likely that the non-killed version of each key appears
             * first, so that we didn't actually get to exit any sooner
             * anyway. So now we just advance over killed items as quickly as
             * we can. We only apply _bt_isequal() when we get to a non-killed
             * item or the end of the page.
             */
            if (!ItemIdIsDead(curitemid))
            {
                ItemPointerData htid;
                bool        all_dead;

                /*
                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
                 * how we handle NULLs - so we must not use _bt_compare for
                 * real equality comparison, but only for ordering/finding
                 * items on pages. - vadim 03/24/97
                 */
                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
                    break;      /* we're past all the equal tuples */

                /* okay, we gotta fetch the heap tuple ... */
                curitup = (IndexTuple) PageGetItem(page, curitemid);
                htid = curitup->t_tid;

                /*
                 * If we are doing a recheck, we expect to find the tuple we
                 * are rechecking.  It's not a duplicate, but we have to keep
                 * scanning.
                 */
                if (checkUnique == UNIQUE_CHECK_EXISTING &&
                    ItemPointerCompare(&htid, &itup->t_tid) == 0)
                {
                    found = true;
                }

                /*
                 * We check the whole HOT-chain to see if there is any tuple
                 * that satisfies SnapshotDirty.  This is necessary because we
                 * have just a single index entry for the entire chain.
                 */
                else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
                                         &all_dead))
                {
                    TransactionId xwait;

                    /*
                     * It is a duplicate. If we are only doing a partial
                     * check, then don't bother checking if the tuple is being
                     * updated in another transaction. Just return the fact
                     * that it is a potential conflict and leave the full
                     * check till later.
                     */
                    if (checkUnique == UNIQUE_CHECK_PARTIAL)
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        *is_unique = false;
                        return InvalidTransactionId;
                    }

                    /*
                     * If this tuple is being updated by some other
                     * transaction, we have to wait for its commit/abort.
                     */
                    xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
                        SnapshotDirty.xmin : SnapshotDirty.xmax;

                    if (TransactionIdIsValid(xwait))
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        /* Tell _bt_doinsert to wait... */
                        return xwait;
                    }

                    /*
                     * Otherwise we have a definite conflict.  But before
                     * complaining, look to see if the tuple we want to insert
                     * is itself now committed dead --- if so, don't complain.
                     * This is a waste of time in normal scenarios but we must
                     * do it to support CREATE INDEX CONCURRENTLY.
                     *
                     * We must follow HOT-chains here because during
                     * concurrent index build, we insert the root TID though
                     * the actual tuple may be somewhere in the HOT-chain.
                     * While following the chain we might not stop at the
                     * exact tuple which triggered the insert, but that's OK
                     * because if we find a live tuple anywhere in this chain,
                     * we have a unique key conflict.  The other live tuple is
                     * not part of this chain because it had a different index
                     * entry.
                     */
                    htid = itup->t_tid;
                    if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
                    {
                        /* Normal case --- it's still live */
                    }
                    else
                    {
                        /*
                         * It's been deleted, so no error, and no need to
                         * continue searching
                         */
                        break;
                    }

                    /*
                     * This is a definite conflict.  Break the tuple down into
                     * datums and report the error.  But first, make sure we
                     * release the buffer locks we're holding ---
                     * BuildIndexValueDescription could make catalog accesses,
                     * which in the worst case might touch this same index and
                     * cause deadlocks.
                     */
                    if (nbuf != InvalidBuffer)
                        _bt_relbuf(rel, nbuf);
                    _bt_relbuf(rel, buf);

                    {
                        Datum       values[INDEX_MAX_KEYS];
                        bool        isnull[INDEX_MAX_KEYS];
                        char       *key_desc;

                        index_deform_tuple(itup, RelationGetDescr(rel),
                                           values, isnull);

                        key_desc = BuildIndexValueDescription(rel, values,
                                                              isnull);

                        ereport(ERROR,
                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                 errmsg("duplicate key value violates unique constraint \"%s\"",
                                        RelationGetRelationName(rel)),
                                 key_desc ? errdetail("Key %s already exists.",
                                                      key_desc) : 0,
                                 errtableconstraint(heapRel,
                                                    RelationGetRelationName(rel))));
                    }
                }
                else if (all_dead)
                {
                    /*
                     * The conflicting tuple (or whole HOT chain) is dead to
                     * everyone, so we may as well mark the index entry
                     * killed.
                     */
                    ItemIdMarkDead(curitemid);
                    opaque->btpo_flags |= BTP_HAS_GARBAGE;

                    /*
                     * Mark buffer with a dirty hint, since state is not
                     * crucial. Be sure to mark the proper buffer dirty.
                     */
                    if (nbuf != InvalidBuffer)
                        MarkBufferDirtyHint(nbuf, true);
                    else
                        MarkBufferDirtyHint(buf, true);
                }
            }
        }

        /*
         * Advance to next tuple to continue checking.
         */
        if (offset < maxoff)
            offset = OffsetNumberNext(offset);
        else
        {
            /* If scankey == hikey we gotta check the next page too */
            if (P_RIGHTMOST(opaque))
                break;
            if (!_bt_isequal(itupdesc, page, P_HIKEY,
                             natts, itup_scankey))
                break;
            /* Advance to next non-dead page --- there must be one */
            for (;;)
            {
                nblkno = opaque->btpo_next;
                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
                page = BufferGetPage(nbuf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                if (!P_IGNORE(opaque))
                    break;
                if (P_RIGHTMOST(opaque))
                    elog(ERROR, "fell off the end of index \"%s\"",
                         RelationGetRelationName(rel));
            }
            maxoff = PageGetMaxOffsetNumber(page);
            offset = P_FIRSTDATAKEY(opaque);
        }
    }

    /*
     * If we are doing a recheck then we should have found the tuple we are
     * checking.  Otherwise there's something very wrong --- probably, the
     * index is on a non-immutable expression.
     */
    if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
        ereport(ERROR,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("failed to re-find tuple within index \"%s\"",
                        RelationGetRelationName(rel)),
                 errhint("This may be because of a non-immutable index expression."),
                 errtableconstraint(heapRel,
                                    RelationGetRelationName(rel))));

    if (nbuf != InvalidBuffer)
        _bt_relbuf(rel, nbuf);

    return InvalidTransactionId;
}
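
/*
 * Illustrative note (not part of the original file): the xwait logic above
 * relies on the side outputs of a dirty snapshot.  When heap_hot_search()
 * run with SnapshotDirty returns true, roughly:
 *
 *	if (TransactionIdIsValid(SnapshotDirty.xmin))
 *		// the inserting transaction is still in progress
 *	if (TransactionIdIsValid(SnapshotDirty.xmax))
 *		// a deleting transaction is still in progress
 *
 * Whichever XID is valid gets returned, so that _bt_doinsert can block on
 * it with XactLockTableWait() and then retry from the top.
 */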


/*
 *    _bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *        If the new key is equal to one or more existing keys, we can
 *        legitimately place it anywhere in the series of equal keys --- in
 *        fact, if the new key is equal to the page's "high key" we can place
 *        it on the next page.  If it is equal to the high key, and there's
 *        not room to insert the new tuple on the current page without
 *        splitting, then we can move right hoping to find more free space
 *        and avoid a split.
 *        (We should not move right indefinitely, however, since that leads to
 *        O(N^2) insertion behavior in the presence of many equal keys.)
 *        Once we have chosen the page to put the key on, we'll insert it
 *        before any existing equal keys because of the way _bt_binsrch()
 *        works.
 *
 *        If there's not enough room on the page, we try to make room by
 *        removing any LP_DEAD tuples.
 *
 *        On entry, *bufptr and *offsetptr point to the first legal position
 *        where the new tuple could be inserted.  The caller should hold an
 *        exclusive lock on *bufptr.  *offsetptr can also be set to
 *        InvalidOffsetNumber, in which case the function will search for the
 *        right location within the page if needed.  On exit, they point to
 *        the chosen insert location.  If _bt_findinsertloc decides to move
 *        right, the lock and pin on the original page will be released and
 *        the new page returned to the caller is exclusively locked instead.
 *
 *        newtup is the new tuple we're inserting, and scankey is an insertion
 *        type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup,
                  BTStack stack,
                  Relation heapRel)
{
    Buffer      buf = *bufptr;
    Page        page = BufferGetPage(buf);
    Size        itemsz;
    BTPageOpaque lpageop;
    bool        movedright,
                vacuumed;
    OffsetNumber newitemoff;
    OffsetNumber firstlegaloff = *offsetptr;

    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*newtup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Check whether the item can fit on a btree page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.) We actually need to be
     * able to fit three items on every page, so restrict any one item to 1/3
     * the per-page available space. Note that at this point, itemsz doesn't
     * include the ItemId.
     *
     * NOTE: if you change this, see also the similar code in _bt_buildadd().
     */
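    /*
     * Illustrative arithmetic (not part of the original file): with the
     * default 8192-byte page and 8-byte MAXALIGN, BTMaxItemSize works out to
     *
     *	(8192 - MAXALIGN(SizeOfPageHeaderData + 3 * sizeof(ItemIdData))
     *		  - MAXALIGN(sizeof(BTPageOpaqueData))) / 3
     *	= (8192 - 40 - 16) / 3 = 2712 bytes
     *
     * which is the limit the following ereport enforces.
     */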
    if (itemsz > BTMaxItemSize(page))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
                        itemsz, BTMaxItemSize(page),
                        RelationGetRelationName(rel)),
                 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                         "Consider a function index of an MD5 hash of the value, "
                         "or use full text indexing."),
                 errtableconstraint(heapRel,
                                    RelationGetRelationName(rel))));

    /*----------
     * If we will need to split the page to put the item on this page,
     * check whether we can put the tuple somewhere to the right,
     * instead.  Keep scanning right until we
     *      (a) find a page with enough free space,
     *      (b) reach the last page where the tuple can legally go, or
     *      (c) get tired of searching.
     * (c) is not flippant; it is important because if there are many
     * pages' worth of equal keys, it's better to split one of the early
     * pages than to scan all the way to the end of the run of equal keys
     * on every insert.  We implement "get tired" as a random choice,
     * since stopping after scanning a fixed number of pages wouldn't work
     * well (we'd never reach the right-hand side of previously split
     * pages).  Currently the probability of moving right is set at 0.99,
     * which may seem too high to change the behavior much, but it does an
     * excellent job of preventing O(N^2) behavior with many equal keys.
     *----------
     */
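    /*
     * Illustrative arithmetic (not part of the original file): random()
     * yields a value in [0, MAX_RANDOM_VALUE], so the test in the loop
     * below, random() <= MAX_RANDOM_VALUE / 100, fires with probability
     * about 0.01 per page.  The number of pages visited in a long run of
     * equal keys is therefore geometrically distributed with a mean of
     * about 100, which bounds the work per insert no matter how many
     * equal keys exist.
     */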
    movedright = false;
    vacuumed = false;
    while (PageGetFreeSpace(page) < itemsz)
    {
        Buffer      rbuf;
        BlockNumber rblkno;

        /*
         * before considering moving right, see if we can obtain enough space
         * by erasing LP_DEAD items
         */
        if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
        {
            _bt_vacuum_one_page(rel, buf, heapRel);

            /*
             * remember that we vacuumed this page, because that makes the
             * hint supplied by the caller invalid
             */
            vacuumed = true;

            if (PageGetFreeSpace(page) >= itemsz)
                break;          /* OK, now we have enough space */
        }

        /*
         * nope, so check conditions (b) and (c) enumerated above
         */
        if (P_RIGHTMOST(lpageop) ||
            _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
            random() <= (MAX_RANDOM_VALUE / 100))
            break;

        /*
         * step right to next non-dead page
         *
         * must write-lock that page before releasing write lock on current
         * page; else someone else's _bt_check_unique scan could fail to see
         * our insertion.  write locks on intermediate dead pages won't do
         * because we don't know when they will get de-linked from the tree.
         */
        rbuf = InvalidBuffer;

        rblkno = lpageop->btpo_next;
        for (;;)
        {
            rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
            page = BufferGetPage(rbuf);
            lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

            /*
             * If this page was incompletely split, finish the split now. We
             * do this while holding a lock on the left sibling, which is not
             * good because finishing the split could be a fairly lengthy
             * operation.  But this should happen very seldom.
             */
            if (P_INCOMPLETE_SPLIT(lpageop))
            {
                _bt_finish_split(rel, rbuf, stack);
                rbuf = InvalidBuffer;
                continue;
            }

            if (!P_IGNORE(lpageop))
                break;
            if (P_RIGHTMOST(lpageop))
                elog(ERROR, "fell off the end of index \"%s\"",
                     RelationGetRelationName(rel));

            rblkno = lpageop->btpo_next;
        }
        _bt_relbuf(rel, buf);
        buf = rbuf;
        movedright = true;
        vacuumed = false;
    }

    /*
     * Now we are on the right page, so find the insert position. If we moved
     * right at all, we know we should insert at the start of the page. If we
     * didn't move right, we can use the firstlegaloff hint if the caller
     * supplied one, unless we vacuumed the page which might have moved tuples
     * around making the hint invalid. If we didn't move right or can't use
     * the hint, find the position by searching.
     */
    if (movedright)
        newitemoff = P_FIRSTDATAKEY(lpageop);
    else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
        newitemoff = firstlegaloff;
    else
        newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

    *bufptr = buf;
    *offsetptr = newitemoff;
}

/*----------
 *    _bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *        This recursive procedure does the following things:
 *
 *            +  if necessary, splits the target page (making sure that the
 *               split is equitable as far as post-insert free space goes).
 *            +  inserts the tuple.
 *            +  if the page was split, pops the parent stack, and finds the
 *               right place to insert the new child pointer (by walking
 *               right using information stored in the parent stack).
 *            +  invokes itself with the appropriate tuple for the right
 *               child page on the parent.
 *            +  updates the metapage if a true root or fast root is split.
 *
 *        On entry, we must have the correct buffer in which to do the
 *        insertion, and the buffer must be pinned and write-locked.  On
 *        return, we will have dropped both the pin and the lock on the
 *        buffer.
 *
 *        When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
 *        page we're inserting the downlink for.  This function will clear the
 *        INCOMPLETE_SPLIT flag on it, and release the buffer.
 *
 *        The locking interactions in this code are critical.  You should
 *        grok Lehman and Yao's paper before making any changes.  In addition,
 *        you need to understand how we disambiguate duplicate keys in this
 *        implementation, in order to be able to find our location using
 *        L&Y "move right" operations.  Since we may insert duplicate user
 *        keys, and since these dups may propagate up the tree, we use the
 *        'afteritem' parameter to position ourselves correctly for the
 *        insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
               Buffer buf,
               Buffer cbuf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page)
{
    Page        page;
    BTPageOpaque lpageop;
    OffsetNumber firstright = InvalidOffsetNumber;
    Size        itemsz;

    page = BufferGetPage(buf);
    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    /* child buffer must be given iff inserting on an internal page */
    Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));

    /* The caller should've finished any incomplete splits already. */
    if (P_INCOMPLETE_SPLIT(lpageop))
        elog(ERROR, "cannot insert to incompletely split page %u",
             BufferGetBlockNumber(buf));

    itemsz = IndexTupleDSize(*itup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Do we need to split the page to fit the item on it?
     *
     * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
     * so this comparison is correct even though we appear to be accounting
     * only for the item and not for its line pointer.
     */
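    /*
     * Worked example (not part of the original file): if the incoming tuple
     * needs, say, 16 bytes, then itemsz is 16 after MAXALIGN, and the true
     * space required is 16 + sizeof(ItemIdData) = 20 bytes.  Because
     * PageGetFreeSpace() has already deducted sizeof(ItemIdData) from
     * (pd_upper - pd_lower), comparing its result against plain itemsz asks
     * exactly that question.
     */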
    if (PageGetFreeSpace(page) < itemsz)
    {
        bool        is_root = P_ISROOT(lpageop);
        bool        is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
        bool        newitemonleft;
        Buffer      rbuf;

        /* Choose the split point */
        firstright = _bt_findsplitloc(rel, page,
                                      newitemoff, itemsz,
                                      &newitemonleft);

        /* split the buffer into left and right halves */
        rbuf = _bt_split(rel, buf, cbuf, firstright,
                         newitemoff, itemsz, itup, newitemonleft);
        PredicateLockPageSplit(rel,
                               BufferGetBlockNumber(buf),
                               BufferGetBlockNumber(rbuf));

        /*----------
         * By here,
         *
         *      +  our target page has been split;
         *      +  the original tuple has been inserted;
         *      +  we have write locks on both the old (left half)
         *         and new (right half) buffers, after the split; and
         *      +  we know the key we want to insert into the parent
         *         (it's the "high key" on the left child page).
         *
         * We're ready to do the parent insertion.  We need to hold onto the
         * locks for the child pages until we locate the parent, but we can
         * release them before doing the actual insertion (see Lehman and Yao
         * for the reasoning).
         *----------
         */
        _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
    }
    else
    {
        Buffer      metabuf = InvalidBuffer;
        Page        metapg = NULL;
        BTMetaPageData *metad = NULL;
        OffsetNumber itup_off;
        BlockNumber itup_blkno;

        itup_off = newitemoff;
        itup_blkno = BufferGetBlockNumber(buf);

        /*
         * If we are doing this insert because we split a page that was the
         * only one on its tree level, but was not the root, it may have been
         * the "fast root".  We need to ensure that the fast root link points
         * at or above the current page.  We can safely acquire a lock on the
         * metapage here --- see comments for _bt_newroot().
         */
        if (split_only_page)
        {
            Assert(!P_ISLEAF(lpageop));

            metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
            metapg = BufferGetPage(metabuf);
            metad = BTPageGetMeta(metapg);

            if (metad->btm_fastlevel >= lpageop->btpo.level)
            {
                /* no update wanted */
                _bt_relbuf(rel, metabuf);
                metabuf = InvalidBuffer;
            }
        }

        /* Do the update.  No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
            elog(PANIC, "failed to add new item to block %u in index \"%s\"",
                 itup_blkno, RelationGetRelationName(rel));

        MarkBufferDirty(buf);

        if (BufferIsValid(metabuf))
        {
            metad->btm_fastroot = itup_blkno;
            metad->btm_fastlevel = lpageop->btpo.level;
            MarkBufferDirty(metabuf);
        }

        /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
        if (BufferIsValid(cbuf))
        {
            Page        cpage = BufferGetPage(cbuf);
            BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);

            Assert(P_INCOMPLETE_SPLIT(cpageop));
            cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
            MarkBufferDirty(cbuf);
        }

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            xl_btree_insert xlrec;
            xl_btree_metadata xlmeta;
            uint8       xlinfo;
            XLogRecPtr  recptr;
            IndexTupleData trunctuple;

            xlrec.offnum = itup_off;

            XLogBeginInsert();
            XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);

            if (P_ISLEAF(lpageop))
                xlinfo = XLOG_BTREE_INSERT_LEAF;
            else
            {
                /*
                 * Register the left child whose INCOMPLETE_SPLIT flag was
                 * cleared.
                 */
                XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);

                xlinfo = XLOG_BTREE_INSERT_UPPER;
            }

            if (BufferIsValid(metabuf))
            {
                xlmeta.root = metad->btm_root;
                xlmeta.level = metad->btm_level;
                xlmeta.fastroot = metad->btm_fastroot;
                xlmeta.fastlevel = metad->btm_fastlevel;

                XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
                XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));

                xlinfo = XLOG_BTREE_INSERT_META;
            }

            /* Read comments in _bt_pgaddtup */
            XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
            if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
            {
                trunctuple = *itup;
                trunctuple.t_info = sizeof(IndexTupleData);
                XLogRegisterBufData(0, (char *) &trunctuple,
                                    sizeof(IndexTupleData));
            }
            else
                XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));

            recptr = XLogInsert(RM_BTREE_ID, xlinfo);

            if (BufferIsValid(metabuf))
            {
                PageSetLSN(metapg, recptr);
            }
            if (BufferIsValid(cbuf))
            {
                PageSetLSN(BufferGetPage(cbuf), recptr);
            }

            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();

        /* release buffers */
        if (BufferIsValid(metabuf))
            _bt_relbuf(rel, metabuf);
        if (BufferIsValid(cbuf))
            _bt_relbuf(rel, cbuf);
        _bt_relbuf(rel, buf);
    }
}
916
917 /*
918  *      _bt_split() -- split a page in the btree.
919  *
920  *              On entry, buf is the page to split, and is pinned and write-locked.
921  *              firstright is the item index of the first item to be moved to the
922  *              new right page.  newitemoff etc. tell us about the new item that
923  *              must be inserted along with the data from the old page.
924  *
925  *              When splitting a non-leaf page, 'cbuf' is the left-sibling of the
926  *              page we're inserting the downlink for.  This function will clear the
927  *              INCOMPLETE_SPLIT flag on it, and release the buffer.
928  *
929  *              Returns the new right sibling of buf, pinned and write-locked.
930  *              The pin and lock on buf are maintained.
931  */
932 static Buffer
933 _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
934                   OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
935                   bool newitemonleft)
936 {
937         Buffer          rbuf;
938         Page            origpage;
939         Page            leftpage,
940                                 rightpage;
941         BlockNumber origpagenumber,
942                                 rightpagenumber;
943         BTPageOpaque ropaque,
944                                 lopaque,
945                                 oopaque;
946         Buffer          sbuf = InvalidBuffer;
947         Page            spage = NULL;
948         BTPageOpaque sopaque = NULL;
949         Size            itemsz;
950         ItemId          itemid;
951         IndexTuple      item;
952         OffsetNumber leftoff,
953                                 rightoff;
954         OffsetNumber maxoff;
955         OffsetNumber i;
956         bool            isroot;
957         bool            isleaf;
958
959         /* Acquire a new page to split into */
960         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
961
962         /*
963          * origpage is the original page to be split.  leftpage is a temporary
964          * buffer that receives the left-sibling data, which will be copied back
965          * into origpage on success.  rightpage is the new page that receives the
966          * right-sibling data.  If we fail before reaching the critical section,
967          * origpage hasn't been modified and leftpage is only workspace. In
968          * principle we shouldn't need to worry about rightpage either, because it
969          * hasn't been linked into the btree page structure; but to avoid leaving
970          * possibly-confusing junk behind, we are careful to rewrite rightpage as
971          * zeroes before throwing any error.
972          */
973         origpage = BufferGetPage(buf);
974         leftpage = PageGetTempPage(origpage);
975         rightpage = BufferGetPage(rbuf);
976
977         origpagenumber = BufferGetBlockNumber(buf);
978         rightpagenumber = BufferGetBlockNumber(rbuf);
979
980         _bt_pageinit(leftpage, BufferGetPageSize(buf));
981         /* rightpage was already initialized by _bt_getbuf */
982
983         /*
984          * Copy the original page's LSN into leftpage, which will become the
985          * updated version of the page.  We need this because XLogInsert will
986          * examine the LSN and possibly dump it in a page image.
987          */
988         PageSetLSN(leftpage, PageGetLSN(origpage));
989
990         /* init btree private data */
991         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
992         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
993         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
994
995         isroot = P_ISROOT(oopaque);
996         isleaf = P_ISLEAF(oopaque);
997
998         /* if we're splitting this page, it won't be the root when we're done */
999         /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
1000         lopaque->btpo_flags = oopaque->btpo_flags;
1001         lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1002         ropaque->btpo_flags = lopaque->btpo_flags;
1003         /* set flag in left page indicating that the right page has no downlink */
1004         lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1005         lopaque->btpo_prev = oopaque->btpo_prev;
1006         lopaque->btpo_next = rightpagenumber;
1007         ropaque->btpo_prev = origpagenumber;
1008         ropaque->btpo_next = oopaque->btpo_next;
1009         lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
1010         /* Since we already have write-lock on both pages, ok to read cycleid */
1011         lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1012         ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1013
1014         /*
1015          * If the page we're splitting is not the rightmost page at its level in
1016          * the tree, then the first entry on the page is the high key for the
1017          * page.  We need to copy that to the right half.  Otherwise (meaning the
1018          * rightmost page case), all the items on the right half will be user
1019          * data.
1020          */
1021         rightoff = P_HIKEY;
1022
1023         if (!P_RIGHTMOST(oopaque))
1024         {
1025                 itemid = PageGetItemId(origpage, P_HIKEY);
1026                 itemsz = ItemIdGetLength(itemid);
1027                 item = (IndexTuple) PageGetItem(origpage, itemid);
1028                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1029                                                 false, false) == InvalidOffsetNumber)
1030                 {
1031                         memset(rightpage, 0, BufferGetPageSize(rbuf));
1032                         elog(ERROR, "failed to add hikey to the right sibling"
1033                                  " while splitting block %u of index \"%s\"",
1034                                  origpagenumber, RelationGetRelationName(rel));
1035                 }
1036                 rightoff = OffsetNumberNext(rightoff);
1037         }
1038
1039         /*
1040          * The "high key" for the new left page will be the first key that's going
1041          * to go into the new right page.  This might be either the existing data
1042          * item at position firstright, or the incoming tuple.
1043          */
1044         leftoff = P_HIKEY;
1045         if (!newitemonleft && newitemoff == firstright)
1046         {
1047                 /* incoming tuple will become first on right page */
1048                 itemsz = newitemsz;
1049                 item = newitem;
1050         }
1051         else
1052         {
1053                 /* existing item at firstright will become first on right page */
1054                 itemid = PageGetItemId(origpage, firstright);
1055                 itemsz = ItemIdGetLength(itemid);
1056                 item = (IndexTuple) PageGetItem(origpage, itemid);
1057         }
1058         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
1059                                         false, false) == InvalidOffsetNumber)
1060         {
1061                 memset(rightpage, 0, BufferGetPageSize(rbuf));
1062                 elog(ERROR, "failed to add hikey to the left sibling"
1063                          " while splitting block %u of index \"%s\"",
1064                          origpagenumber, RelationGetRelationName(rel));
1065         }
1066         leftoff = OffsetNumberNext(leftoff);
1067
1068         /*
1069          * Now transfer all the data items to the appropriate page.
1070          *
1071          * Note: we *must* insert at least the right page's items in item-number
1072          * order, for the benefit of _bt_restore_page().
1073          */
1074         maxoff = PageGetMaxOffsetNumber(origpage);
1075
1076         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1077         {
1078                 itemid = PageGetItemId(origpage, i);
1079                 itemsz = ItemIdGetLength(itemid);
1080                 item = (IndexTuple) PageGetItem(origpage, itemid);
1081
1082                 /* does new item belong before this one? */
1083                 if (i == newitemoff)
1084                 {
1085                         if (newitemonleft)
1086                         {
1087                                 if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1088                                 {
1089                                         memset(rightpage, 0, BufferGetPageSize(rbuf));
1090                                         elog(ERROR, "failed to add new item to the left sibling"
1091                                                  " while splitting block %u of index \"%s\"",
1092                                                  origpagenumber, RelationGetRelationName(rel));
1093                                 }
1094                                 leftoff = OffsetNumberNext(leftoff);
1095                         }
1096                         else
1097                         {
1098                                 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1099                                 {
1100                                         memset(rightpage, 0, BufferGetPageSize(rbuf));
1101                                         elog(ERROR, "failed to add new item to the right sibling"
1102                                                  " while splitting block %u of index \"%s\"",
1103                                                  origpagenumber, RelationGetRelationName(rel));
1104                                 }
1105                                 rightoff = OffsetNumberNext(rightoff);
1106                         }
1107                 }
1108
1109                 /* decide which page to put it on */
1110                 if (i < firstright)
1111                 {
1112                         if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1113                         {
1114                                 memset(rightpage, 0, BufferGetPageSize(rbuf));
1115                                 elog(ERROR, "failed to add old item to the left sibling"
1116                                          " while splitting block %u of index \"%s\"",
1117                                          origpagenumber, RelationGetRelationName(rel));
1118                         }
1119                         leftoff = OffsetNumberNext(leftoff);
1120                 }
1121                 else
1122                 {
1123                         if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1124                         {
1125                                 memset(rightpage, 0, BufferGetPageSize(rbuf));
1126                                 elog(ERROR, "failed to add old item to the right sibling"
1127                                          " while splitting block %u of index \"%s\"",
1128                                          origpagenumber, RelationGetRelationName(rel));
1129                         }
1130                         rightoff = OffsetNumberNext(rightoff);
1131                 }
1132         }
1133
1134         /* cope with the possibility that newitem goes at the end */
1135         if (i <= newitemoff)
1136         {
1137                 /*
1138                  * Can't have newitemonleft here; that would imply we were told to put
1139                  * *everything* on the left page, which cannot fit (if it could, we'd
1140                  * not be splitting the page).
1141                  */
1142                 Assert(!newitemonleft);
1143                 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1144                 {
1145                         memset(rightpage, 0, BufferGetPageSize(rbuf));
1146                         elog(ERROR, "failed to add new item to the right sibling"
1147                                  " while splitting block %u of index \"%s\"",
1148                                  origpagenumber, RelationGetRelationName(rel));
1149                 }
1150                 rightoff = OffsetNumberNext(rightoff);
1151         }
1152
1153         /*
1154          * We have to grab the right sibling (if any) and fix the prev pointer
1155          * there. We are guaranteed that this is deadlock-free since no other
1156          * writer will be holding a lock on that page and trying to move left, and
1157          * all readers release locks on a page before trying to fetch its
1158          * neighbors.
1159          */
1160
1161         if (!P_RIGHTMOST(oopaque))
1162         {
1163                 sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1164                 spage = BufferGetPage(sbuf);
1165                 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1166                 if (sopaque->btpo_prev != origpagenumber)
1167                 {
1168                         memset(rightpage, 0, BufferGetPageSize(rbuf));
1169                         elog(ERROR, "right sibling's left-link doesn't match: "
1170                            "block %u links to %u instead of expected %u in index \"%s\"",
1171                                  oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1172                                  RelationGetRelationName(rel));
1173                 }
1174
1175                 /*
1176                  * Check to see if we can set the SPLIT_END flag in the right-hand
1177                  * split page; this can save some I/O for vacuum since it need not
1178                  * proceed to the right sibling.  We can set the flag if the right
1179                  * sibling has a different cycleid: that means it could not be part of
1180                  * a group of pages that were all split off from the same ancestor
1181                  * page.  If you're confused, imagine that page A splits to A B and
1182                  * then again, yielding A C B, while vacuum is in progress.  Tuples
1183                  * originally in A could now be in either B or C, hence vacuum must
1184                  * examine both pages.  But if D, our right sibling, has a different
1185          * cycleid, then it could not contain any tuples that were in A when
1186                  * the vacuum started.
1187                  */
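                /*
                 * A worked instance of the above: say vacuum cycle 7 is in
                 * progress.  Splitting A stamps cycleid 7 on both halves, so
                 * after A splits to A B and then to A C B, pages A, C and B
                 * all carry cycleid 7 and vacuum must walk right across all
                 * three.  If the pre-existing right sibling D carries some
                 * other cycleid (say 0), no tuple that was in A can have
                 * moved into D, so the new right half can be stamped
                 * BTP_SPLIT_END and vacuum may stop before reaching D.
                 */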
1188                 if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1189                         ropaque->btpo_flags |= BTP_SPLIT_END;
1190         }
1191
1192         /*
1193          * Right sibling is locked, new siblings are prepared, but original page
1194          * is not updated yet.
1195          *
1196          * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
1197          * not starting the critical section till here because we haven't been
1198          * scribbling on the original page yet; see comments above.
1199          */
1200         START_CRIT_SECTION();
1201
1202         /*
1203          * By here, the original data page has been split into two new halves, and
1204          * these are correct.  The algorithm requires that the left page never
1205          * move during a split, so we copy the new left page back on top of the
1206          * original.  Note that this is not a waste of time, since we also require
1207          * (in the page management code) that the center of a page always be
1208          * clean, and the most efficient way to guarantee this is just to compact
1209          * the data by reinserting it into a new left page.  (XXX the latter
1210          * comment is probably obsolete; but in any case it's good to not scribble
1211          * on the original page until we enter the critical section.)
1212          *
1213          * We need to do this before writing the WAL record, so that XLogInsert
1214          * can WAL log an image of the page if necessary.
1215          */
1216         PageRestoreTempPage(leftpage, origpage);
1217         /* leftpage, lopaque must not be used below here */
1218
1219         MarkBufferDirty(buf);
1220         MarkBufferDirty(rbuf);
1221
1222         if (!P_RIGHTMOST(ropaque))
1223         {
1224                 sopaque->btpo_prev = rightpagenumber;
1225                 MarkBufferDirty(sbuf);
1226         }
1227
1228         /*
1229          * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1230          * a split.
1231          */
1232         if (!isleaf)
1233         {
1234                 Page            cpage = BufferGetPage(cbuf);
1235                 BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1236
1237                 cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1238                 MarkBufferDirty(cbuf);
1239         }
1240
1241         /* XLOG stuff */
1242         if (RelationNeedsWAL(rel))
1243         {
1244                 xl_btree_split xlrec;
1245                 uint8           xlinfo;
1246                 XLogRecPtr      recptr;
1247
1248                 xlrec.level = ropaque->btpo.level;
1249                 xlrec.firstright = firstright;
1250                 xlrec.newitemoff = newitemoff;
1251
1252                 XLogBeginInsert();
1253                 XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1254
1255                 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1256                 XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1257                 /* Log the right sibling, because we've changed its prev-pointer. */
1258                 if (!P_RIGHTMOST(ropaque))
1259                         XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1260                 if (BufferIsValid(cbuf))
1261                         XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1262
1263                 /*
1264                  * Log the new item, if it was inserted on the left page. (If it was
1265                  * put on the right page, we don't need to explicitly WAL log it
1266                  * because it's included with all the other items on the right page.)
1267                  * Show the new item as belonging to the left page buffer, so that it
1268                  * is not stored if XLogInsert decides it needs a full-page image of
1269                  * the left page.  We store the offset anyway, though, to support
1270                  * archive compression of these records.
1271                  */
1272                 if (newitemonleft)
1273                         XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1274
1275                 /* Log left page */
1276                 if (!isleaf)
1277                 {
1278                         /*
1279                          * We must also log the left page's high key, because the right
1280                          * page's leftmost key is suppressed on non-leaf levels.  Show it
1281                          * as belonging to the left page buffer, so that it is not stored
1282                          * if XLogInsert decides it needs a full-page image of the left
1283                          * page.
1284                          */
1285                         itemid = PageGetItemId(origpage, P_HIKEY);
1286                         item = (IndexTuple) PageGetItem(origpage, itemid);
1287                         XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1288                 }
1289
1290                 /*
1291                  * Log the contents of the right page in the format understood by
1292                  * _bt_restore_page().  The whole right page will be recreated, so
1293                  * nothing from it needs to be stored separately by XLogInsert: the
1294                  * buffer was registered with REGBUF_WILL_INIT above.
1295                  *
1296                  * Direct access to the page is ugly but fast; ideally the page API
1297                  * would provide a function for this.  Note we only store the tuples
1298                  * themselves, knowing that they were inserted in item-number order
1299                  * and so the item pointers can be reconstructed.  See comments for
1300                  * _bt_restore_page().
1301                  */
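                /*
                 * (The byte range pd_upper .. pd_special is exactly the tuple
                 * data: the line pointer array, which lives between the page
                 * header and pd_lower, is deliberately not logged because
                 * _bt_restore_page() rebuilds it.)
                 */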
1302                 XLogRegisterBufData(1,
1303                                          (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1304                                                         ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1305
1306                 if (isroot)
1307                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1308                 else
1309                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1310
1311                 recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1312
1313                 PageSetLSN(origpage, recptr);
1314                 PageSetLSN(rightpage, recptr);
1315                 if (!P_RIGHTMOST(ropaque))
1316                 {
1317                         PageSetLSN(spage, recptr);
1318                 }
1319                 if (!isleaf)
1320                 {
1321                         PageSetLSN(BufferGetPage(cbuf), recptr);
1322                 }
1323         }
1324
1325         END_CRIT_SECTION();
1326
1327         /* release the old right sibling */
1328         if (!P_RIGHTMOST(ropaque))
1329                 _bt_relbuf(rel, sbuf);
1330
1331         /* release the child */
1332         if (!isleaf)
1333                 _bt_relbuf(rel, cbuf);
1334
1335         /* split's done */
1336         return rbuf;
1337 }
1338
1339 /*
1340  *      _bt_findsplitloc() -- find an appropriate place to split a page.
1341  *
1342  * The idea here is to equalize the free space that will be on each split
1343  * page, *after accounting for the inserted tuple*.  (If we fail to account
1344  * for it, we might find ourselves with too little room on the page that
1345  * it needs to go into!)
1346  *
1347  * If the page is the rightmost page on its level, we instead try to arrange
1348  * to leave the left split page fillfactor% full.  In this way, when we are
1349  * inserting successively increasing keys (consider sequences, timestamps,
1350  * etc) we will end up with a tree whose pages are about fillfactor% full,
1351  * instead of the 50% full result that we'd get without this special case.
1352  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1353  * that leaf and nonleaf pages use different fillfactors.
1354  *
1355  * We are passed the intended insert position of the new tuple, expressed as
1356  * the offsetnumber of the tuple it must go in front of.  (This could be
1357  * maxoff+1 if the tuple is to go at the end.)
1358  *
1359  * We return the index of the first existing tuple that should go on the
1360  * righthand page, plus a boolean indicating whether the new tuple goes on
1361  * the left or right page.  The bool is necessary to disambiguate the case
1362  * where firstright == newitemoff.
1363  */
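/*
 * A sketch of the call pattern, mirroring its use in _bt_insertonpg()
 * above (variable names as in that caller):
 *
 *              bool            newitemonleft;
 *              OffsetNumber firstright;
 *
 *              firstright = _bt_findsplitloc(rel, page, newitemoff,
 *                                                                        itemsz, &newitemonleft);
 *              rbuf = _bt_split(rel, buf, cbuf, firstright, newitemoff,
 *                                               itemsz, itup, newitemonleft);
 *
 * Everything at offsets >= firstright, plus the new tuple when
 * !newitemonleft, ends up on the right page.
 */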
1364 static OffsetNumber
1365 _bt_findsplitloc(Relation rel,
1366                                  Page page,
1367                                  OffsetNumber newitemoff,
1368                                  Size newitemsz,
1369                                  bool *newitemonleft)
1370 {
1371         BTPageOpaque opaque;
1372         OffsetNumber offnum;
1373         OffsetNumber maxoff;
1374         ItemId          itemid;
1375         FindSplitData state;
1376         int                     leftspace,
1377                                 rightspace,
1378                                 goodenough,
1379                                 olddataitemstotal,
1380                                 olddataitemstoleft;
1381         bool            goodenoughfound;
1382
1383         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1384
1385         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1386         newitemsz += sizeof(ItemIdData);
1387
1388         /* Total free space available on a btree page, after fixed overhead */
1389         leftspace = rightspace =
1390                 PageGetPageSize(page) - SizeOfPageHeaderData -
1391                 MAXALIGN(sizeof(BTPageOpaqueData));
1392
1393         /* The right page will have the same high key as the old page */
1394         if (!P_RIGHTMOST(opaque))
1395         {
1396                 itemid = PageGetItemId(page, P_HIKEY);
1397                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1398                                                          sizeof(ItemIdData));
1399         }
1400
1401         /* Count up total space in data items without actually scanning 'em */
1402         olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1403
1404         state.newitemsz = newitemsz;
1405         state.is_leaf = P_ISLEAF(opaque);
1406         state.is_rightmost = P_RIGHTMOST(opaque);
1407         state.have_split = false;
1408         if (state.is_leaf)
1409                 state.fillfactor = RelationGetFillFactor(rel,
1410                                                                                                  BTREE_DEFAULT_FILLFACTOR);
1411         else
1412                 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1413         state.newitemonleft = false;    /* these just to keep compiler quiet */
1414         state.firstright = 0;
1415         state.best_delta = 0;
1416         state.leftspace = leftspace;
1417         state.rightspace = rightspace;
1418         state.olddataitemstotal = olddataitemstotal;
1419         state.newitemoff = newitemoff;
1420
1421         /*
1422          * Finding the best possible split would require checking all the possible
1423          * split points, because of the high-key and left-key special cases.
1424          * That's probably more work than it's worth; instead, stop as soon as we
1425          * find a "good-enough" split, where good-enough is defined as an
1426          * imbalance in free space of no more than pagesize/16 (arbitrary).  This
1427          * should let us stop near the middle on most pages, instead of plowing to
1428          * the end.
1429          */
1430         goodenough = leftspace / 16;
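        /*
         * With the default 8 kB block size, for example, leftspace comes to
         * 8192 - 24 (page header) - 16 (btree special space) = 8152 bytes,
         * so goodenough is roughly 509 bytes of tolerated imbalance.
         */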
1431
1432         /*
1433          * Scan through the data items and calculate space usage for a split at
1434          * each possible position.
1435          */
1436         olddataitemstoleft = 0;
1437         goodenoughfound = false;
1438         maxoff = PageGetMaxOffsetNumber(page);
1439
1440         for (offnum = P_FIRSTDATAKEY(opaque);
1441                  offnum <= maxoff;
1442                  offnum = OffsetNumberNext(offnum))
1443         {
1444                 Size            itemsz;
1445
1446                 itemid = PageGetItemId(page, offnum);
1447                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1448
1449                 /*
1450                  * Will the new item go to left or right of split?
1451                  */
1452                 if (offnum > newitemoff)
1453                         _bt_checksplitloc(&state, offnum, true,
1454                                                           olddataitemstoleft, itemsz);
1455
1456                 else if (offnum < newitemoff)
1457                         _bt_checksplitloc(&state, offnum, false,
1458                                                           olddataitemstoleft, itemsz);
1459                 else
1460                 {
1461                         /* need to try it both ways! */
1462                         _bt_checksplitloc(&state, offnum, true,
1463                                                           olddataitemstoleft, itemsz);
1464
1465                         _bt_checksplitloc(&state, offnum, false,
1466                                                           olddataitemstoleft, itemsz);
1467                 }
1468
1469                 /* Abort scan once we find a good-enough choice */
1470                 if (state.have_split && state.best_delta <= goodenough)
1471                 {
1472                         goodenoughfound = true;
1473                         break;
1474                 }
1475
1476                 olddataitemstoleft += itemsz;
1477         }
1478
1479         /*
1480          * If the new item goes as the last item, check for splitting so that all
1481          * the old items go to the left page and the new item goes to the right
1482          * page.
1483          */
1484         if (newitemoff > maxoff && !goodenoughfound)
1485                 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1486
1487         /*
1488          * I believe it is not possible to fail to find a feasible split, but just
1489          * in case ...
1490          */
1491         if (!state.have_split)
1492                 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1493                          RelationGetRelationName(rel));
1494
1495         *newitemonleft = state.newitemonleft;
1496         return state.firstright;
1497 }
1498
1499 /*
1500  * Subroutine to analyze a particular possible split choice (ie, firstright
1501  * and newitemonleft settings), and record the best split so far in *state.
1502  *
1503  * firstoldonright is the offset of the first item on the original page
1504  * that goes to the right page, and firstoldonrightsz is the size of that
1505  * tuple. firstoldonright can be > max offset, which means that all the old
1506  * items go to the left page and only the new item goes to the right page.
1507  * In that case, firstoldonrightsz is not used.
1508  *
1509  * olddataitemstoleft is the total size of all old items to the left of
1510  * firstoldonright.
1511  */
1512 static void
1513 _bt_checksplitloc(FindSplitData *state,
1514                                   OffsetNumber firstoldonright,
1515                                   bool newitemonleft,
1516                                   int olddataitemstoleft,
1517                                   Size firstoldonrightsz)
1518 {
1519         int                     leftfree,
1520                                 rightfree;
1521         Size            firstrightitemsz;
1522         bool            newitemisfirstonright;
1523
1524         /* Is the new item going to be the first item on the right page? */
1525         newitemisfirstonright = (firstoldonright == state->newitemoff
1526                                                          && !newitemonleft);
1527
1528         if (newitemisfirstonright)
1529                 firstrightitemsz = state->newitemsz;
1530         else
1531                 firstrightitemsz = firstoldonrightsz;
1532
1533         /* Account for all the old tuples */
1534         leftfree = state->leftspace - olddataitemstoleft;
1535         rightfree = state->rightspace -
1536                 (state->olddataitemstotal - olddataitemstoleft);
1537
1538         /*
1539          * The first item on the right page becomes the high key of the left page;
1540          * therefore it counts against left space as well as right space.
1541          */
1542         leftfree -= firstrightitemsz;
1543
1544         /* account for the new item */
1545         if (newitemonleft)
1546                 leftfree -= (int) state->newitemsz;
1547         else
1548                 rightfree -= (int) state->newitemsz;
1549
1550         /*
1551          * If we are not on the leaf level, we will be able to discard the key
1552          * data from the first item that winds up on the right page.
1553          */
1554         if (!state->is_leaf)
1555                 rightfree += (int) firstrightitemsz -
1556                         (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1557
1558         /*
1559          * If feasible split point, remember best delta.
1560          */
1561         if (leftfree >= 0 && rightfree >= 0)
1562         {
1563                 int                     delta;
1564
1565                 if (state->is_rightmost)
1566                 {
1567                         /*
1568                          * If splitting a rightmost page, try to put (100-fillfactor)% of
1569                          * free space on left page. See comments for _bt_findsplitloc.
1570                          */
1571                         delta = (state->fillfactor * leftfree)
1572                                 - ((100 - state->fillfactor) * rightfree);
1573                 }
1574                 else
1575                 {
1576                         /* Otherwise, aim for equal free space on both sides */
1577                         delta = leftfree - rightfree;
1578                 }
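                /*
                 * Worked example with fillfactor = 90: leftfree = 800 and
                 * rightfree = 7200 give delta = 90*800 - 10*7200 = 0, the
                 * best possible score, so the preferred split leaves the
                 * left page about 90% full and pushes nearly all free space
                 * to the new rightmost page.
                 */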
1579
1580                 if (delta < 0)
1581                         delta = -delta;
1582                 if (!state->have_split || delta < state->best_delta)
1583                 {
1584                         state->have_split = true;
1585                         state->newitemonleft = newitemonleft;
1586                         state->firstright = firstoldonright;
1587                         state->best_delta = delta;
1588                 }
1589         }
1590 }
1591
1592 /*
1593  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1594  *
1595  * On entry, buf and rbuf are the left and right split pages, which we
1596  * still hold write locks on per the L&Y algorithm.  We release the
1597  * write locks once we have write lock on the parent page.  (Any sooner,
1598  * and it'd be possible for some other process to try to split or delete
1599  * one of these pages, and get confused because it cannot find the downlink.)
1600  *
1601  * stack - stack showing how we got here.  May be NULL in cases that don't
1602  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1603  * is_root - we split the true root
1604  * is_only - we split a page alone on its level (might have been fast root)
1605  */
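/*
 * Sketch of the expected call, as issued from _bt_insertonpg() after a
 * split (variable names as in that caller):
 *
 *              rbuf = _bt_split(rel, buf, cbuf, firstright, newitemoff,
 *                                               itemsz, itup, newitemonleft);
 *              ...
 *              _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
 */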
1606 static void
1607 _bt_insert_parent(Relation rel,
1608                                   Buffer buf,
1609                                   Buffer rbuf,
1610                                   BTStack stack,
1611                                   bool is_root,
1612                                   bool is_only)
1613 {
1614         /*
1615          * Here we have to do something Lehman and Yao don't talk about: deal with
1616          * a root split and construction of a new root.  If our stack is empty
1617          * then we have just split a node on what had been the root level when we
1618          * descended the tree.  If it was still the root then we perform a
1619          * new-root construction.  If it *wasn't* the root anymore, search to find
1620          * the next higher level that someone constructed meanwhile, and find the
1621          * right place to insert as for the normal case.
1622          *
1623          * If we have to search for the parent level, we do so by re-descending
1624          * from the root.  This is not super-efficient, but it's rare enough not
1625          * to matter.
1626          */
1627         if (is_root)
1628         {
1629                 Buffer          rootbuf;
1630
1631                 Assert(stack == NULL);
1632                 Assert(is_only);
1633                 /* create a new root node and update the metapage */
1634                 rootbuf = _bt_newroot(rel, buf, rbuf);
1635                 /* release the split buffers */
1636                 _bt_relbuf(rel, rootbuf);
1637                 _bt_relbuf(rel, rbuf);
1638                 _bt_relbuf(rel, buf);
1639         }
1640         else
1641         {
1642                 BlockNumber bknum = BufferGetBlockNumber(buf);
1643                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1644                 Page            page = BufferGetPage(buf);
1645                 IndexTuple      new_item;
1646                 BTStackData fakestack;
1647                 IndexTuple      ritem;
1648                 Buffer          pbuf;
1649
1650                 if (stack == NULL)
1651                 {
1652                         BTPageOpaque lpageop;
1653
1654                         elog(DEBUG2, "concurrent ROOT page split");
1655                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1656                         /* Find the leftmost page at the next level up */
1657                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1658                         /* Set up a phony stack entry pointing there */
1659                         stack = &fakestack;
1660                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1661                         stack->bts_offset = InvalidOffsetNumber;
1662                         /* bts_btentry will be initialized below */
1663                         stack->bts_parent = NULL;
1664                         _bt_relbuf(rel, pbuf);
1665                 }
1666
1667                 /* get high key from left page == lowest key on new right page */
1668                 ritem = (IndexTuple) PageGetItem(page,
1669                                                                                  PageGetItemId(page, P_HIKEY));
1670
1671                 /* form an index tuple that points at the new right page */
1672                 new_item = CopyIndexTuple(ritem);
1673                 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1674
1675                 /*
1676                  * Find the parent buffer and get the parent page.
1677                  *
1678                  * If we were moved right, we need to update the stack item: we want
1679                  * to find the parent entry that points to where we are now.  - vadim
1680                  * 05/27/97
1681                  */
1682                 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1683                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1684
1685                 /*
1686                  * Now we can unlock the right child. The left child will be unlocked
1687                  * by _bt_insertonpg().
1688                  */
1689                 _bt_relbuf(rel, rbuf);
1690
1691                 /* Check for error only after writing children */
1692                 if (pbuf == InvalidBuffer)
1693                         elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1694                                  RelationGetRelationName(rel), bknum, rbknum);
1695
1696                 /* Recursively update the parent */
1697                 _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
1698                                            new_item, stack->bts_offset + 1,
1699                                            is_only);
1700
1701                 /* be tidy */
1702                 pfree(new_item);
1703         }
1704 }
1705
1706 /*
1707  * _bt_finish_split() -- Finish an incomplete split
1708  *
1709  * A crash or other failure can leave a split incomplete.  The insertion
1710  * routines won't allow insertion on a page that is incompletely split.
1711  * Before inserting on such a page, call _bt_finish_split().
1712  *
1713  * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
1714  * and unpinned.
1715  */
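/*
 * A typical caller follows the pattern used in _bt_getstackbuf() below:
 *
 *              buf = _bt_getbuf(rel, blkno, BT_WRITE);
 *              opaque = (BTPageOpaque) PageGetSpecialPointer(BufferGetPage(buf));
 *              if (P_INCOMPLETE_SPLIT(opaque))
 *              {
 *                      _bt_finish_split(rel, buf, stack->bts_parent);
 *                      continue;               /* buf was released; retry the page */
 *              }
 */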
1716 void
1717 _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1718 {
1719         Page            lpage = BufferGetPage(lbuf);
1720         BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1721         Buffer          rbuf;
1722         Page            rpage;
1723         BTPageOpaque rpageop;
1724         bool            was_root;
1725         bool            was_only;
1726
1727         Assert(P_INCOMPLETE_SPLIT(lpageop));
1728
1729         /* Lock right sibling, the one missing the downlink */
1730         rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1731         rpage = BufferGetPage(rbuf);
1732         rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1733
1734         /* Could this be a root split? */
1735         if (!stack)
1736         {
1737                 Buffer          metabuf;
1738                 Page            metapg;
1739                 BTMetaPageData *metad;
1740
1741                 /* acquire lock on the metapage */
1742                 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1743                 metapg = BufferGetPage(metabuf);
1744                 metad = BTPageGetMeta(metapg);
1745
1746                 was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1747
1748                 _bt_relbuf(rel, metabuf);
1749         }
1750         else
1751                 was_root = false;
1752
1753         /* Was this the only page on the level before split? */
1754         was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1755
1756         elog(DEBUG1, "finishing incomplete split of %u/%u",
1757                  BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1758
1759         _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1760 }
1761
1762 /*
1763  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1764  *                                               we last looked at in the parent.
1765  *
1766  *              This is possible because we save the downlink from the parent item,
1767  *              which is enough to uniquely identify it.  Insertions into the parent
1768  *              level could cause the item to move right; deletions could cause it
1769  *              to move left, but not left of the page we previously found it in.
1770  *
1771  *              Adjusts bts_blkno & bts_offset if changed.
1772  *
1773  *              Returns InvalidBuffer if item not found (should not happen).
1774  */
1775 Buffer
1776 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1777 {
1778         BlockNumber blkno;
1779         OffsetNumber start;
1780
1781         blkno = stack->bts_blkno;
1782         start = stack->bts_offset;
1783
1784         for (;;)
1785         {
1786                 Buffer          buf;
1787                 Page            page;
1788                 BTPageOpaque opaque;
1789
1790                 buf = _bt_getbuf(rel, blkno, access);
1791                 page = BufferGetPage(buf);
1792                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1793
1794                 if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
1795                 {
1796                         _bt_finish_split(rel, buf, stack->bts_parent);
1797                         continue;
1798                 }
1799
1800                 if (!P_IGNORE(opaque))
1801                 {
1802                         OffsetNumber offnum,
1803                                                 minoff,
1804                                                 maxoff;
1805                         ItemId          itemid;
1806                         IndexTuple      item;
1807
1808                         minoff = P_FIRSTDATAKEY(opaque);
1809                         maxoff = PageGetMaxOffsetNumber(page);
1810
1811                         /*
1812                          * start = InvalidOffsetNumber means "search the whole page". We
1813                          * need this test anyway due to the possibility that the page
1814                          * has a high key now when it didn't before.
1815                          */
1816                         if (start < minoff)
1817                                 start = minoff;
1818
1819                         /*
1820                          * Need this check too, to guard against the possibility that
1821                          * the page has split since we originally visited it.
1822                          */
1823                         if (start > maxoff)
1824                                 start = OffsetNumberNext(maxoff);
1825
1826                         /*
1827                          * These loops will check every item on the page --- but in an
1828                          * order that's attuned to the probability of where it actually
1829                          * is.  Scan to the right first, then to the left.
1830                          */
1831                         for (offnum = start;
1832                                  offnum <= maxoff;
1833                                  offnum = OffsetNumberNext(offnum))
1834                         {
1835                                 itemid = PageGetItemId(page, offnum);
1836                                 item = (IndexTuple) PageGetItem(page, itemid);
1837                                 if (BTEntrySame(item, &stack->bts_btentry))
1838                                 {
1839                                         /* Return accurate pointer to where link is now */
1840                                         stack->bts_blkno = blkno;
1841                                         stack->bts_offset = offnum;
1842                                         return buf;
1843                                 }
1844                         }
1845
1846                         for (offnum = OffsetNumberPrev(start);
1847                                  offnum >= minoff;
1848                                  offnum = OffsetNumberPrev(offnum))
1849                         {
1850                                 itemid = PageGetItemId(page, offnum);
1851                                 item = (IndexTuple) PageGetItem(page, itemid);
1852                                 if (BTEntrySame(item, &stack->bts_btentry))
1853                                 {
1854                                         /* Return accurate pointer to where link is now */
1855                                         stack->bts_blkno = blkno;
1856                                         stack->bts_offset = offnum;
1857                                         return buf;
1858                                 }
1859                         }
1860                 }
1861
1862                 /*
1863                  * The item we're looking for has moved right by at least one page.
1864                  */
1865                 if (P_RIGHTMOST(opaque))
1866                 {
1867                         _bt_relbuf(rel, buf);
1868                         return InvalidBuffer;
1869                 }
1870                 blkno = opaque->btpo_next;
1871                 start = InvalidOffsetNumber;
1872                 _bt_relbuf(rel, buf);
1873         }
1874 }
1875
1876 /*
1877  *      _bt_newroot() -- Create a new root page for the index.
1878  *
1879  *              We've just split the old root page and need to create a new one.
1880  *              In order to do this, we add a new root page to the file, then lock
1881  *              the metadata page and update it.  This is guaranteed to be deadlock-
1882  *              free, because all readers release their locks on the metadata page
1883  *              before trying to lock the root, and all writers lock the root before
1884  *              trying to lock the metadata page.  We have a write lock on the old
1885  *              root page, so we have not introduced any cycles into the waits-for
1886  *              graph.
1887  *
1888  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1889  *              locked.  On exit, a new root page exists with entries for the
1890  *              two new children, and the metapage is updated and unlocked/unpinned.
1891  *              The new root buffer is returned to the caller, which must
1892  *              unlock/unpin lbuf, rbuf, and rootbuf.
1893  */
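/*
 *              The ordering argument in miniature:
 *
 *                      readers:  lock meta; read root location; unlock meta; lock root
 *                      writers:  lock root (already held here); then lock meta
 *
 *              Readers never wait for the root while still holding the metapage
 *              lock, so the waits-for graph stays acyclic.
 */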
1894 static Buffer
1895 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1896 {
1897         Buffer          rootbuf;
1898         Page            lpage,
1899                                 rootpage;
1900         BlockNumber lbkno,
1901                                 rbkno;
1902         BlockNumber rootblknum;
1903         BTPageOpaque rootopaque;
1904         BTPageOpaque lopaque;
1905         ItemId          itemid;
1906         IndexTuple      item;
1907         IndexTuple      left_item;
1908         Size            left_item_sz;
1909         IndexTuple      right_item;
1910         Size            right_item_sz;
1911         Buffer          metabuf;
1912         Page            metapg;
1913         BTMetaPageData *metad;
1914
1915         lbkno = BufferGetBlockNumber(lbuf);
1916         rbkno = BufferGetBlockNumber(rbuf);
1917         lpage = BufferGetPage(lbuf);
1918         lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
1919
1920         /* get a new root page */
1921         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1922         rootpage = BufferGetPage(rootbuf);
1923         rootblknum = BufferGetBlockNumber(rootbuf);
1924
1925         /* acquire lock on the metapage */
1926         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1927         metapg = BufferGetPage(metabuf);
1928         metad = BTPageGetMeta(metapg);
1929
1930         /*
1931          * Create downlink item for left page (old root).  Since this will be the
1932          * first item in a non-leaf page, it implicitly has minus-infinity key
1933          * value, so we need not store any actual key in it.
1934          */
1935         left_item_sz = sizeof(IndexTupleData);
1936         left_item = (IndexTuple) palloc(left_item_sz);
1937         left_item->t_info = left_item_sz;
1938         ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
1939
1940         /*
1941          * Create downlink item for right page.  The key for it is obtained from
1942          * the "high key" position in the left page.
1943          */
1944         itemid = PageGetItemId(lpage, P_HIKEY);
1945         right_item_sz = ItemIdGetLength(itemid);
1946         item = (IndexTuple) PageGetItem(lpage, itemid);
1947         right_item = CopyIndexTuple(item);
1948         ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
1949
1950         /* NO EREPORT(ERROR) from here till newroot op is logged */
1951         START_CRIT_SECTION();
1952
1953         /* set btree special data */
1954         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1955         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1956         rootopaque->btpo_flags = BTP_ROOT;
1957         rootopaque->btpo.level =
1958                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1959         rootopaque->btpo_cycleid = 0;
1960
1961         /* update metapage data */
1962         metad->btm_root = rootblknum;
1963         metad->btm_level = rootopaque->btpo.level;
1964         metad->btm_fastroot = rootblknum;
1965         metad->btm_fastlevel = rootopaque->btpo.level;
1966
1967         /*
1968          * Insert the left page pointer into the new root page.  The root page is
1969          * the rightmost page on its level so there is no "high key" in it; the
1970          * two items will go into positions P_HIKEY and P_FIRSTKEY.
1971          *
1972          * Note: we *must* insert the two items in item-number order, for the
1973          * benefit of _bt_restore_page().
1974          */
1975         if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
1976                                         false, false) == InvalidOffsetNumber)
1977                 elog(PANIC, "failed to add leftkey to new root page"
1978                          " while splitting block %u of index \"%s\"",
1979                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1980
1981         /*
1982          * insert the right page pointer into the new root page.
1983          */
1984         if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
1985                                         false, false) == InvalidOffsetNumber)
1986                 elog(PANIC, "failed to add rightkey to new root page"
1987                          " while splitting block %u of index \"%s\"",
1988                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1989
1990         /* Clear the incomplete-split flag in the left child */
1991         Assert(P_INCOMPLETE_SPLIT(lopaque));
1992         lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1993         MarkBufferDirty(lbuf);
1994
1995         MarkBufferDirty(rootbuf);
1996         MarkBufferDirty(metabuf);
1997
1998         /* XLOG stuff */
1999         if (RelationNeedsWAL(rel))
2000         {
2001                 xl_btree_newroot xlrec;
2002                 XLogRecPtr      recptr;
2003                 xl_btree_metadata md;
2004
2005                 xlrec.rootblk = rootblknum;
2006                 xlrec.level = metad->btm_level;
2007
2008                 XLogBeginInsert();
2009                 XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
2010
2011                 XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
2012                 XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
2013                 XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
2014
2015                 md.root = rootblknum;
2016                 md.level = metad->btm_level;
2017                 md.fastroot = rootblknum;
2018                 md.fastlevel = metad->btm_level;
2019
2020                 XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
2021
2022                 /*
2023                  * Direct access to the page is ugly but fast; ideally the page API
2024                  * would provide a function for this.
2025                  */
2026                 XLogRegisterBufData(0,
2027                                            (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
2028                                                         ((PageHeader) rootpage)->pd_special -
2029                                                         ((PageHeader) rootpage)->pd_upper);
2030
2031                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
2032
2033                 PageSetLSN(lpage, recptr);
2034                 PageSetLSN(rootpage, recptr);
2035                 PageSetLSN(metapg, recptr);
2036         }
2037
2038         END_CRIT_SECTION();
2039
2040         /* done with metapage */
2041         _bt_relbuf(rel, metabuf);
2042
2043         pfree(left_item);
2044         pfree(right_item);
2045
2046         return rootbuf;
2047 }
2048
2049 /*
2050  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
2051  *
2052  *              This routine adds the tuple to the page as requested.  It does
2053  *              not affect pin/lock status, but you'd better have a write lock
2054  *              and pin on the target buffer!  Don't forget to write and release
2055  *              the buffer afterwards, either.
2056  *
2057  *              The main difference between this routine and a bare PageAddItem call
2058  *              is that this code knows that the leftmost index tuple on a non-leaf
2059  *              btree page doesn't need to have a key.  Therefore, it strips such
2060  *              tuples down to just the tuple header.  CAUTION: this works ONLY if
2061  *              we insert the tuples in order, so that the given itup_off does
2062  *              represent the final position of the tuple!
2063  */
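/*
 *              Concretely: sizeof(IndexTupleData) is 8 bytes on typical builds
 *              (a 6-byte ItemPointerData plus a 2-byte t_info), so the stripped
 *              tuple keeps only its downlink t_tid and t_info; any key bytes
 *              that followed are dropped, giving the leftmost internal-page
 *              item its implicit "minus infinity" key.
 */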
2064 static bool
2065 _bt_pgaddtup(Page page,
2066                          Size itemsize,
2067                          IndexTuple itup,
2068                          OffsetNumber itup_off)
2069 {
2070         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2071         IndexTupleData trunctuple;
2072
2073         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2074         {
2075                 trunctuple = *itup;
2076                 trunctuple.t_info = sizeof(IndexTupleData);
2077                 itup = &trunctuple;
2078                 itemsize = sizeof(IndexTupleData);
2079         }
2080
2081         if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2082                                         false, false) == InvalidOffsetNumber)
2083                 return false;
2084
2085         return true;
2086 }
2087
2088 /*
2089  * _bt_isequal - used in _bt_doinsert in check for duplicates.
2090  *
2091  * This is very similar to _bt_compare, except for NULL handling.
2092  * The rule is simple: NOT_NULL never equals NULL, and NULL never equals NULL either.
2093  */
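/*
 * In effect, per attribute:
 *
 *              scankey 5,    tuple 5    -> equal
 *              scankey 5,    tuple NULL -> not equal
 *              scankey NULL, tuple 5    -> not equal
 *              scankey NULL, tuple NULL -> not equal
 */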
2094 static bool
2095 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2096                         int keysz, ScanKey scankey)
2097 {
2098         IndexTuple      itup;
2099         int                     i;
2100
2101         /* Better be comparing to a leaf item */
2102         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2103
2104         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2105
2106         for (i = 1; i <= keysz; i++)
2107         {
2108                 AttrNumber      attno;
2109                 Datum           datum;
2110                 bool            isNull;
2111                 int32           result;
2112
2113                 attno = scankey->sk_attno;
2114                 Assert(attno == i);
2115                 datum = index_getattr(itup, attno, itupdesc, &isNull);
2116
2117                 /* NULLs are never equal to anything */
2118                 if (isNull || (scankey->sk_flags & SK_ISNULL))
2119                         return false;
2120
2121                 result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
2122                                                                                                  scankey->sk_collation,
2123                                                                                                  datum,
2124                                                                                                  scankey->sk_argument));
2125
2126                 if (result != 0)
2127                         return false;
2128
2129                 scankey++;
2130         }
2131
2132         /* if we get here, the keys are equal */
2133         return true;
2134 }
2135
2136 /*
2137  * _bt_vacuum_one_page - vacuum just one index page.
2138  *
2139  * Try to remove LP_DEAD items from the given page.  The passed buffer
2140  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2141  * super-exclusive "cleanup" lock (see nbtree/README).
2142  */
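/*
 * This is called from the insertion path (see _bt_findinsertloc above) when
 * a target page looks too full to take the new tuple but carries the
 * BTP_HAS_GARBAGE hint, in the hope of making room without splitting.
 */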
2143 static void
2144 _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2145 {
2146         OffsetNumber deletable[MaxOffsetNumber];
2147         int                     ndeletable = 0;
2148         OffsetNumber offnum,
2149                                 minoff,
2150                                 maxoff;
2151         Page            page = BufferGetPage(buffer);
2152         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2153
2154         /*
2155          * Scan over all items to see which ones need to be deleted according to
2156          * LP_DEAD flags.
2157          */
2158         minoff = P_FIRSTDATAKEY(opaque);
2159         maxoff = PageGetMaxOffsetNumber(page);
2160         for (offnum = minoff;
2161                  offnum <= maxoff;
2162                  offnum = OffsetNumberNext(offnum))
2163         {
2164                 ItemId          itemId = PageGetItemId(page, offnum);
2165
2166                 if (ItemIdIsDead(itemId))
2167                         deletable[ndeletable++] = offnum;
2168         }
2169
2170         if (ndeletable > 0)
2171                 _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2172
2173         /*
2174          * Note: if we didn't find any LP_DEAD items, then the page's
2175          * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
2176          * separate write to clear it, however.  We will clear it when we split
2177          * the page.
2178          */
2179 }