/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *        Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.163 2007/12/31 04:52:05 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "utils/inval.h"


typedef struct
{
        /* context data for _bt_checksplitloc */
        Size            newitemsz;              /* size of new item to be inserted */
        int                     fillfactor;             /* needed when splitting rightmost page */
        bool            is_leaf;                /* T if splitting a leaf page */
        bool            is_rightmost;   /* T if splitting a rightmost page */
        OffsetNumber newitemoff;        /* where the new item is to be inserted */
        int                     leftspace;              /* space available for items on left page */
        int                     rightspace;             /* space available for items on right page */
        int                     olddataitemstotal;      /* space taken by old items */

        bool            have_split;             /* found a valid split? */

        /* these fields valid only if have_split is true */
        bool            newitemonleft;  /* new item on left or right of best split */
        OffsetNumber firstright;        /* best split point */
        int                     best_delta;             /* best size delta so far */
} FindSplitData;


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                                 Relation heapRel, Buffer buf, OffsetNumber ioffset,
                                 ScanKey itup_scankey);
static void _bt_findinsertloc(Relation rel,
                                  Buffer *bufptr,
                                  OffsetNumber *offsetptr,
                                  int keysz,
                                  ScanKey scankey,
                                  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
                           bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                  OffsetNumber newitemoff, Size newitemsz,
                  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                                 OffsetNumber newitemoff,
                                 Size newitemsz,
                                 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
                                  OffsetNumber firstoldonright, bool newitemonleft,
                                  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
                         Size itemsize, IndexTuple itup,
                         OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
                        int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);


/*
 *      _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *              This routine is called by the public interface routines, btbuild
 *              and btinsert.  By here, itup is filled in, including the TID.
 */
void
_bt_doinsert(Relation rel, IndexTuple itup,
                         bool index_is_unique, Relation heapRel)
{
        int                     natts = rel->rd_rel->relnatts;
        ScanKey         itup_scankey;
        BTStack         stack;
        Buffer          buf;
        OffsetNumber offset;

        /* we need an insertion scan key to do our search, so build one */
        itup_scankey = _bt_mkscankey(rel, itup);

top:
        /* find the first page containing this key */
        stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

        offset = InvalidOffsetNumber;

        /* trade in our read lock for a write lock */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        LockBuffer(buf, BT_WRITE);

        /*
         * If the page was split between the time that we surrendered our read
         * lock and acquired our write lock, then this page may no longer be the
         * right place for the key we want to insert.  In this case, we need to
         * move right in the tree.  See Lehman and Yao for an excruciatingly
         * precise description.
         */
        buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

        /*
         * If we're not allowing duplicates, make sure the key isn't already in
         * the index.
         *
         * NOTE: obviously, _bt_check_unique can only detect keys that are already
         * in the index; so it cannot defend against concurrent insertions of the
         * same key.  We protect against that by means of holding a write lock on
         * the target page.  Any other would-be inserter of the same key must
         * acquire a write lock on the same target page, so only one would-be
         * inserter can be making the check at one time.  Furthermore, once we are
         * past the check we hold write locks continuously until we have performed
         * our insertion, so no later inserter can fail to see our insertion.
         * (This requires some care in _bt_insertonpg.)
         *
         * If we must wait for another xact, we release the lock while waiting,
         * and then must start over completely.
         */
        if (index_is_unique)
        {
                TransactionId xwait;

                offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
                xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey);

                if (TransactionIdIsValid(xwait))
                {
                        /* Have to wait for the other guy ... */
                        _bt_relbuf(rel, buf);
                        XactLockTableWait(xwait);
                        /* start over... */
                        _bt_freestack(stack);
                        goto top;
                }
        }

        /* do the insertion */
        _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
        _bt_insertonpg(rel, buf, stack, itup, offset, false);

        /* be tidy */
        _bt_freestack(stack);
        _bt_freeskey(itup_scankey);
}
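
#ifdef NOT_USED
/*
 * Editorial sketch, not part of the original file: roughly how a caller
 * such as btinsert() drives _bt_doinsert().  index_form_tuple() and the
 * rd_index fields are real APIs, but this wrapper (example_btinsert) is a
 * hypothetical illustration only.
 */
static void
example_btinsert(Relation rel, Relation heapRel,
                 Datum *values, bool *isnull, ItemPointer ht_ctid)
{
        IndexTuple      itup;

        /* build the index tuple for the new heap row, then stamp its TID */
        itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
        itup->t_tid = *ht_ctid;

        /* insert it, enforcing uniqueness if the index demands it */
        _bt_doinsert(rel, itup, rel->rd_index->indisunique, heapRel);

        pfree(itup);
}
#endif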

/*
 *      _bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey)
{
        TupleDesc       itupdesc = RelationGetDescr(rel);
        int                     natts = rel->rd_rel->relnatts;
        SnapshotData SnapshotDirty;
        OffsetNumber maxoff;
        Page            page;
        BTPageOpaque opaque;
        Buffer          nbuf = InvalidBuffer;

        InitDirtySnapshot(SnapshotDirty);

        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        maxoff = PageGetMaxOffsetNumber(page);

        /*
         * Scan over all equal tuples, looking for live conflicts.
         */
        for (;;)
        {
                ItemId          curitemid;
                IndexTuple      curitup;
                BlockNumber nblkno;

                /*
                 * make sure the offset points to an actual item before trying to
                 * examine it...
                 */
                if (offset <= maxoff)
                {
                        curitemid = PageGetItemId(page, offset);

                        /*
                         * We can skip items that are marked killed.
                         *
                         * Formerly, we applied _bt_isequal() before checking the kill
                         * flag, so as to fall out of the item loop as soon as possible.
                         * However, in the presence of heavy update activity an index may
                         * contain many killed items with the same key; running
                         * _bt_isequal() on each killed item gets expensive. Furthermore
                         * it is likely that the non-killed version of each key appears
                         * first, so that we didn't actually get to exit any sooner
                         * anyway. So now we just advance over killed items as quickly as
                         * we can. We only apply _bt_isequal() when we get to a non-killed
                         * item or the end of the page.
                         */
                        if (!ItemIdIsDead(curitemid))
                        {
                                ItemPointerData htid;
                                bool            all_dead;

                                /*
                                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) --- this
                                 * is how we handle NULLs --- so we must not use _bt_compare
                                 * for real equality comparisons, but only for
                                 * ordering/finding items on pages. - vadim 03/24/97
                                 */
                                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
                                        break;          /* we're past all the equal tuples */

                                /* okay, we gotta fetch the heap tuple ... */
                                curitup = (IndexTuple) PageGetItem(page, curitemid);
                                htid = curitup->t_tid;

                                /*
                                 * We check the whole HOT-chain to see if there is any tuple
                                 * that satisfies SnapshotDirty.  This is necessary because we
                                 * have just a single index entry for the entire chain.
                                 */
                                if (heap_hot_search(&htid, heapRel, &SnapshotDirty, &all_dead))
                                {
                                        /* it is a duplicate */
                                        TransactionId xwait =
                                        (TransactionIdIsValid(SnapshotDirty.xmin)) ?
                                        SnapshotDirty.xmin : SnapshotDirty.xmax;

                                        /*
                                         * If this tuple is being updated by another transaction
                                         * then we have to wait for its commit/abort.
                                         */
                                        if (TransactionIdIsValid(xwait))
                                        {
                                                if (nbuf != InvalidBuffer)
                                                        _bt_relbuf(rel, nbuf);
                                                /* Tell _bt_doinsert to wait... */
                                                return xwait;
                                        }

                                        /*
                                         * Otherwise we have a definite conflict.  But before
                                         * complaining, look to see if the tuple we want to insert
                                         * is itself now committed dead --- if so, don't complain.
                                         * This is a waste of time in normal scenarios but we must
                                         * do it to support CREATE INDEX CONCURRENTLY.
                                         *
                                         * We must follow HOT-chains here because during
                                         * concurrent index build, we insert the root TID though
                                         * the actual tuple may be somewhere in the HOT-chain.
                                         * While following the chain we might not stop at the
                                         * exact tuple which triggered the insert, but that's OK
                                         * because if we find a live tuple anywhere in this chain,
                                         * we have a unique key conflict.  The other live tuple is
                                         * not part of this chain because it had a different index
                                         * entry.
                                         */
                                        htid = itup->t_tid;
                                        if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
                                        {
                                                /* Normal case --- it's still live */
                                        }
                                        else
                                        {
                                                /*
                                                 * It's been deleted, so no error, and no need to
                                                 * continue searching
                                                 */
                                                break;
                                        }

                                        ereport(ERROR,
                                                        (errcode(ERRCODE_UNIQUE_VIOLATION),
                                                         errmsg("duplicate key value violates unique constraint \"%s\"",
                                                                        RelationGetRelationName(rel))));
                                }
                                else if (all_dead)
                                {
                                        /*
                                         * The conflicting tuple (or whole HOT chain) is dead to
                                         * everyone, so we may as well mark the index entry
                                         * killed.
                                         */
                                        ItemIdMarkDead(curitemid);
                                        opaque->btpo_flags |= BTP_HAS_GARBAGE;
                                        /* be sure to mark the proper buffer dirty... */
                                        if (nbuf != InvalidBuffer)
                                                SetBufferCommitInfoNeedsSave(nbuf);
                                        else
                                                SetBufferCommitInfoNeedsSave(buf);
                                }
                        }
                }

                /*
                 * Advance to next tuple to continue checking.
                 */
                if (offset < maxoff)
                        offset = OffsetNumberNext(offset);
                else
                {
                        /* If scankey == hikey we gotta check the next page too */
                        if (P_RIGHTMOST(opaque))
                                break;
                        if (!_bt_isequal(itupdesc, page, P_HIKEY,
                                                         natts, itup_scankey))
                                break;
                        /* Advance to next non-dead page --- there must be one */
                        for (;;)
                        {
                                nblkno = opaque->btpo_next;
                                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
                                page = BufferGetPage(nbuf);
                                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                                if (!P_IGNORE(opaque))
                                        break;
                                if (P_RIGHTMOST(opaque))
                                        elog(ERROR, "fell off the end of index \"%s\"",
                                                 RelationGetRelationName(rel));
                        }
                        maxoff = PageGetMaxOffsetNumber(page);
                        offset = P_FIRSTDATAKEY(opaque);
                }
        }

        if (nbuf != InvalidBuffer)
                _bt_relbuf(rel, nbuf);

        return InvalidTransactionId;
}
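
#ifdef NOT_USED
/*
 * Editorial sketch, not part of the original file: the xwait selection
 * above in condensed form.  After a heap_hot_search() probe with a dirty
 * snapshot, xmin identifies an in-progress inserter and xmax an
 * in-progress deleter of the matching tuple; the unique check must wait
 * for whichever one is set.
 */
static TransactionId
example_conflicting_xact(SnapshotData *snap)
{
        /* prefer the inserting xact; otherwise the deleting one, if any */
        return TransactionIdIsValid(snap->xmin) ? snap->xmin : snap->xmax;
}
#endif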


/*
 *      _bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *              If the new key is equal to one or more existing keys, we can
 *              legitimately place it anywhere in the series of equal keys --- in fact,
 *              if the new key is equal to the page's "high key" we can place it on
 *              the next page.  If it is equal to the high key, and there's not room
 *              to insert the new tuple on the current page without splitting, then
 *              we can move right hoping to find more free space and avoid a split.
 *              (We should not move right indefinitely, however, since that leads to
 *              O(N^2) insertion behavior in the presence of many equal keys.)
 *              Once we have chosen the page to put the key on, we'll insert it before
 *              any existing equal keys because of the way _bt_binsrch() works.
 *
 *              If there's not enough room on the page, we try to make room by
 *              removing any LP_DEAD tuples.
 *
 *              On entry, *buf and *offsetptr point to the first legal position
 *              where the new tuple could be inserted.  The caller should hold an
 *              exclusive lock on *buf.  *offsetptr can also be set to
 *              InvalidOffsetNumber, in which case the function will search for the
 *              right location within the page if needed.  On exit, they point to the
 *              chosen insert location.  If _bt_findinsertloc decides to move right,
 *              the lock and pin on the original page will be released and the new
 *              page returned to the caller is exclusively locked instead.
 *
 *              newtup is the new tuple we're inserting, and scankey is an insertion
 *              type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
                                  Buffer *bufptr,
                                  OffsetNumber *offsetptr,
                                  int keysz,
                                  ScanKey scankey,
                                  IndexTuple newtup)
{
        Buffer          buf = *bufptr;
        Page            page = BufferGetPage(buf);
        Size            itemsz;
        BTPageOpaque lpageop;
        bool            movedright,
                                vacuumed;
        OffsetNumber newitemoff;
        OffsetNumber firstlegaloff = *offsetptr;

        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

        itemsz = IndexTupleDSize(*newtup);
        itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but we
                                                                 * need to be consistent */

        /*
         * Check whether the item can fit on a btree page at all. (Eventually, we
         * ought to try to apply TOAST methods if not.) We actually need to be
         * able to fit three items on every page, so restrict any one item to 1/3
         * the per-page available space. Note that at this point, itemsz doesn't
         * include the ItemId.
         */
        if (itemsz > BTMaxItemSize(page))
                ereport(ERROR,
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                                 errmsg("index row size %lu exceeds btree maximum, %lu",
                                                (unsigned long) itemsz,
                                                (unsigned long) BTMaxItemSize(page)),
                errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                                "Consider a function index of an MD5 hash of the value, "
                                "or use full text indexing.")));
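
        /*
         * Editorial note, not part of the original file: BTMaxItemSize is a
         * third of the per-page space usable for items, so with the default
         * 8K block size this limit works out to roughly 2700 bytes per index
         * tuple.
         */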

        /*----------
         * If we will need to split the page to put the item on this page,
         * check whether we can put the tuple somewhere to the right,
         * instead.  Keep scanning right until we
         *              (a) find a page with enough free space,
         *              (b) reach the last page where the tuple can legally go, or
         *              (c) get tired of searching.
         * (c) is not flippant; it is important because if there are many
         * pages' worth of equal keys, it's better to split one of the early
         * pages than to scan all the way to the end of the run of equal keys
         * on every insert.  We implement "get tired" as a random choice,
         * since stopping after scanning a fixed number of pages wouldn't work
         * well (we'd never reach the right-hand side of previously split
         * pages).  Currently the probability of moving right is set at 0.99,
         * which may seem too high to change the behavior much, but it does an
         * excellent job of preventing O(N^2) behavior with many equal keys.
         *----------
         */
        movedright = false;
        vacuumed = false;
        while (PageGetFreeSpace(page) < itemsz)
        {
                Buffer          rbuf;

                /*
                 * before considering moving right, see if we can obtain enough space
                 * by erasing LP_DEAD items
                 */
                if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
                {
                        _bt_vacuum_one_page(rel, buf);

                        /*
                         * remember that we vacuumed this page, because that makes the
                         * hint supplied by the caller invalid
                         */
                        vacuumed = true;

                        if (PageGetFreeSpace(page) >= itemsz)
                                break;                  /* OK, now we have enough space */
                }

                /*
                 * nope, so check conditions (b) and (c) enumerated above
                 */
                if (P_RIGHTMOST(lpageop) ||
                        _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
                        random() <= (MAX_RANDOM_VALUE / 100))
                        break;

                /*
                 * step right to next non-dead page
                 *
                 * must write-lock that page before releasing write lock on current
                 * page; else someone else's _bt_check_unique scan could fail to see
                 * our insertion.  write locks on intermediate dead pages won't do
                 * because we don't know when they will get de-linked from the tree.
                 */
                rbuf = InvalidBuffer;

                for (;;)
                {
                        BlockNumber rblkno = lpageop->btpo_next;

                        rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
                        page = BufferGetPage(rbuf);
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
                        if (!P_IGNORE(lpageop))
                                break;
                        if (P_RIGHTMOST(lpageop))
                                elog(ERROR, "fell off the end of index \"%s\"",
                                         RelationGetRelationName(rel));
                }
                _bt_relbuf(rel, buf);
                buf = rbuf;
                movedright = true;
                vacuumed = false;
        }

        /*
         * Now we are on the right page, so find the insert position. If we moved
         * right at all, we know we should insert at the start of the page. If we
         * didn't move right, we can use the firstlegaloff hint if the caller
         * supplied one, unless we vacuumed the page which might have moved tuples
         * around making the hint invalid. If we didn't move right or can't use
         * the hint, find the position by searching.
         */
        if (movedright)
                newitemoff = P_FIRSTDATAKEY(lpageop);
        else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
                newitemoff = firstlegaloff;
        else
                newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

        *bufptr = buf;
        *offsetptr = newitemoff;
}
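
#ifdef NOT_USED
/*
 * Editorial sketch, not part of the original file: the "get tired" coin
 * flip from the loop above, isolated.  Each step right continues with
 * probability ~0.99, so the number of pages visited in a long run of
 * equal keys is geometric with mean about 100 --- bounded in expectation
 * no matter how long the run is, which is what defeats the O(N^2) case.
 */
static bool
example_get_tired(void)
{
        /* random() is uniform over [0, MAX_RANDOM_VALUE]; true ~1% of the time */
        return random() <= (MAX_RANDOM_VALUE / 100);
}
#endif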

/*----------
 *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *              This recursive procedure does the following things:
 *
 *                      +  if necessary, splits the target page (making sure that the
 *                         split is equitable as far as post-insert free space goes).
 *                      +  inserts the tuple.
 *                      +  if the page was split, pops the parent stack, and finds the
 *                         right place to insert the new child pointer (by walking
 *                         right using information stored in the parent stack).
 *                      +  invokes itself with the appropriate tuple for the right
 *                         child page on the parent.
 *                      +  updates the metapage if a true root or fast root is split.
 *
 *              On entry, we must have the right buffer in which to do the
 *              insertion, and the buffer must be pinned and write-locked.  On return,
 *              we will have dropped both the pin and the lock on the buffer.
 *
 *              The locking interactions in this code are critical.  You should
 *              grok Lehman and Yao's paper before making any changes.  In addition,
 *              you need to understand how we disambiguate duplicate keys in this
 *              implementation, in order to be able to find our location using
 *              L&Y "move right" operations.  Since we may insert duplicate user
 *              keys, and since these dups may propagate up the tree, we position
 *              ourselves for insertions on internal pages by searching for the
 *              left child's downlink (see _bt_getstackbuf), not by key comparison
 *              alone.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
                           Buffer buf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
                           bool split_only_page)
{
        Page            page;
        BTPageOpaque lpageop;
        OffsetNumber firstright = InvalidOffsetNumber;
        Size            itemsz;

        page = BufferGetPage(buf);
        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

        itemsz = IndexTupleDSize(*itup);
        itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but we
                                                                 * need to be consistent */

        /*
         * Do we need to split the page to fit the item on it?
         *
         * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
         * so this comparison is correct even though we appear to be accounting
         * only for the item and not for its line pointer.
         */
        if (PageGetFreeSpace(page) < itemsz)
        {
                bool            is_root = P_ISROOT(lpageop);
                bool            is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
                bool            newitemonleft;
                Buffer          rbuf;

                /* Choose the split point */
                firstright = _bt_findsplitloc(rel, page,
                                                                          newitemoff, itemsz,
                                                                          &newitemonleft);

                /* split the buffer into left and right halves */
                rbuf = _bt_split(rel, buf, firstright,
                                                 newitemoff, itemsz, itup, newitemonleft);

                /*----------
                 * By here,
                 *
                 *              +  our target page has been split;
                 *              +  the original tuple has been inserted;
                 *              +  we have write locks on both the old (left half)
                 *                 and new (right half) buffers, after the split; and
                 *              +  we know the key we want to insert into the parent
                 *                 (it's the "high key" on the left child page).
                 *
                 * We're ready to do the parent insertion.  We need to hold onto the
                 * locks for the child pages until we locate the parent, but we can
                 * release them before doing the actual insertion (see Lehman and Yao
                 * for the reasoning).
                 *----------
                 */
                _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
        }
        else
        {
                Buffer          metabuf = InvalidBuffer;
                Page            metapg = NULL;
                BTMetaPageData *metad = NULL;
                OffsetNumber itup_off;
                BlockNumber itup_blkno;

                itup_off = newitemoff;
                itup_blkno = BufferGetBlockNumber(buf);

                /*
                 * If we are doing this insert because we split a page that was the
                 * only one on its tree level, but was not the root, it may have been
                 * the "fast root".  We need to ensure that the fast root link points
                 * at or above the current page.  We can safely acquire a lock on the
                 * metapage here --- see comments for _bt_newroot().
                 */
                if (split_only_page)
                {
                        Assert(!P_ISLEAF(lpageop));

                        metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
                        metapg = BufferGetPage(metabuf);
                        metad = BTPageGetMeta(metapg);

                        if (metad->btm_fastlevel >= lpageop->btpo.level)
                        {
                                /* no update wanted */
                                _bt_relbuf(rel, metabuf);
                                metabuf = InvalidBuffer;
                        }
                }

                /* Do the update.  No ereport(ERROR) until changes are logged */
                START_CRIT_SECTION();

                _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

                MarkBufferDirty(buf);

                if (BufferIsValid(metabuf))
                {
                        metad->btm_fastroot = itup_blkno;
                        metad->btm_fastlevel = lpageop->btpo.level;
                        MarkBufferDirty(metabuf);
                }

                /* XLOG stuff */
                if (!rel->rd_istemp)
                {
                        xl_btree_insert xlrec;
                        BlockNumber xldownlink;
                        xl_btree_metadata xlmeta;
                        uint8           xlinfo;
                        XLogRecPtr      recptr;
                        XLogRecData rdata[4];
                        XLogRecData *nextrdata;
                        IndexTupleData trunctuple;

                        xlrec.target.node = rel->rd_node;
                        ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

                        rdata[0].data = (char *) &xlrec;
                        rdata[0].len = SizeOfBtreeInsert;
                        rdata[0].buffer = InvalidBuffer;
                        rdata[0].next = nextrdata = &(rdata[1]);

                        if (P_ISLEAF(lpageop))
                                xlinfo = XLOG_BTREE_INSERT_LEAF;
                        else
                        {
                                xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
                                Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

                                nextrdata->data = (char *) &xldownlink;
                                nextrdata->len = sizeof(BlockNumber);
                                nextrdata->buffer = InvalidBuffer;
                                nextrdata->next = nextrdata + 1;
                                nextrdata++;

                                xlinfo = XLOG_BTREE_INSERT_UPPER;
                        }

                        if (BufferIsValid(metabuf))
                        {
                                xlmeta.root = metad->btm_root;
                                xlmeta.level = metad->btm_level;
                                xlmeta.fastroot = metad->btm_fastroot;
                                xlmeta.fastlevel = metad->btm_fastlevel;

                                nextrdata->data = (char *) &xlmeta;
                                nextrdata->len = sizeof(xl_btree_metadata);
                                nextrdata->buffer = InvalidBuffer;
                                nextrdata->next = nextrdata + 1;
                                nextrdata++;

                                xlinfo = XLOG_BTREE_INSERT_META;
                        }

                        /* Read comments in _bt_pgaddtup */
                        if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
                        {
                                trunctuple = *itup;
                                trunctuple.t_info = sizeof(IndexTupleData);
                                nextrdata->data = (char *) &trunctuple;
                                nextrdata->len = sizeof(IndexTupleData);
                        }
                        else
                        {
                                nextrdata->data = (char *) itup;
                                nextrdata->len = IndexTupleDSize(*itup);
                        }
                        nextrdata->buffer = buf;
                        nextrdata->buffer_std = true;
                        nextrdata->next = NULL;

                        recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

                        if (BufferIsValid(metabuf))
                        {
                                PageSetLSN(metapg, recptr);
                                PageSetTLI(metapg, ThisTimeLineID);
                        }

                        PageSetLSN(page, recptr);
                        PageSetTLI(page, ThisTimeLineID);
                }

                END_CRIT_SECTION();

                /* release buffers; send out relcache inval if metapage changed */
                if (BufferIsValid(metabuf))
                {
                        CacheInvalidateRelcache(rel);
                        _bt_relbuf(rel, metabuf);
                }

                _bt_relbuf(rel, buf);
        }
}
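
#ifdef NOT_USED
/*
 * Editorial sketch, not part of the original file: the XLogRecData
 * chaining pattern used throughout this file, reduced to a minimal
 * two-element form.  Entries with buffer == InvalidBuffer are always
 * stored in the WAL record; entries tied to a buffer may be elided when
 * XLogInsert decides to take a full-page image of that buffer instead.
 */
static XLogRecPtr
example_log_two_pieces(char *hdr, uint32 hdrlen,
                       Buffer databuf, char *data, uint32 datalen,
                       uint8 xlinfo)
{
        XLogRecData rdata[2];

        rdata[0].data = hdr;                    /* fixed-size record header */
        rdata[0].len = hdrlen;
        rdata[0].buffer = InvalidBuffer;        /* always stored in the record */
        rdata[0].next = &rdata[1];

        rdata[1].data = data;                   /* per-buffer payload */
        rdata[1].len = datalen;
        rdata[1].buffer = databuf;              /* candidate for a backup block */
        rdata[1].buffer_std = true;
        rdata[1].next = NULL;                   /* terminate the chain */

        return XLogInsert(RM_BTREE_ID, xlinfo, rdata);
}
#endif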

/*
 *      _bt_split() -- split a page in the btree.
 *
 *              On entry, buf is the page to split, and is pinned and write-locked.
 *              firstright is the item index of the first item to be moved to the
 *              new right page.  newitemoff etc. tell us about the new item that
 *              must be inserted along with the data from the old page.
 *
 *              Returns the new right sibling of buf, pinned and write-locked.
 *              The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
                  bool newitemonleft)
{
        Buffer          rbuf;
        Page            origpage;
        Page            leftpage,
                                rightpage;
        BTPageOpaque ropaque,
                                lopaque,
                                oopaque;
        Buffer          sbuf = InvalidBuffer;
        Page            spage = NULL;
        BTPageOpaque sopaque = NULL;
        Size            itemsz;
        ItemId          itemid;
        IndexTuple      item;
        OffsetNumber leftoff,
                                rightoff;
        OffsetNumber maxoff;
        OffsetNumber i;
        bool            isroot;

        rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
        origpage = BufferGetPage(buf);
        leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
        rightpage = BufferGetPage(rbuf);

        _bt_pageinit(leftpage, BufferGetPageSize(buf));
        /* rightpage was already initialized by _bt_getbuf */

        /*
         * Copy the original page's LSN and TLI into leftpage, which will become
         * the updated version of the page.  We need this because XLogInsert will
         * examine these fields and possibly dump them in a page image.
         */
        PageSetLSN(leftpage, PageGetLSN(origpage));
        PageSetTLI(leftpage, PageGetTLI(origpage));

        /* init btree private data */
        oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
        lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
        ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

        isroot = P_ISROOT(oopaque);

        /* if we're splitting this page, it won't be the root when we're done */
        /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
        lopaque->btpo_flags = oopaque->btpo_flags;
        lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
        ropaque->btpo_flags = lopaque->btpo_flags;
        lopaque->btpo_prev = oopaque->btpo_prev;
        lopaque->btpo_next = BufferGetBlockNumber(rbuf);
        ropaque->btpo_prev = BufferGetBlockNumber(buf);
        ropaque->btpo_next = oopaque->btpo_next;
        lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
        /* Since we already have write-lock on both pages, ok to read cycleid */
        lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
        ropaque->btpo_cycleid = lopaque->btpo_cycleid;

        /*
         * If the page we're splitting is not the rightmost page at its level in
         * the tree, then the first entry on the page is the high key for the
         * page.  We need to copy that to the right half.  Otherwise (meaning the
         * rightmost page case), all the items on the right half will be user
         * data.
         */
        rightoff = P_HIKEY;

        if (!P_RIGHTMOST(oopaque))
        {
                itemid = PageGetItemId(origpage, P_HIKEY);
                itemsz = ItemIdGetLength(itemid);
                item = (IndexTuple) PageGetItem(origpage, itemid);
                if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
                                                false, false) == InvalidOffsetNumber)
                        elog(PANIC, "failed to add hikey to the right sibling"
                                 " while splitting block %u of index \"%s\"",
                                 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
                rightoff = OffsetNumberNext(rightoff);
        }

        /*
         * The "high key" for the new left page will be the first key that's going
         * to go into the new right page.  This might be either the existing data
         * item at position firstright, or the incoming tuple.
         */
        leftoff = P_HIKEY;
        if (!newitemonleft && newitemoff == firstright)
        {
                /* incoming tuple will become first on right page */
                itemsz = newitemsz;
                item = newitem;
        }
        else
        {
                /* existing item at firstright will become first on right page */
                itemid = PageGetItemId(origpage, firstright);
                itemsz = ItemIdGetLength(itemid);
                item = (IndexTuple) PageGetItem(origpage, itemid);
        }
        if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                                        false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add hikey to the left sibling"
                         " while splitting block %u of index \"%s\"",
                         BufferGetBlockNumber(buf), RelationGetRelationName(rel));
        leftoff = OffsetNumberNext(leftoff);

        /*
         * Now transfer all the data items to the appropriate page.
         *
         * Note: we *must* insert at least the right page's items in item-number
         * order, for the benefit of _bt_restore_page().
         */
        maxoff = PageGetMaxOffsetNumber(origpage);

        for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
        {
                itemid = PageGetItemId(origpage, i);
                itemsz = ItemIdGetLength(itemid);
                item = (IndexTuple) PageGetItem(origpage, itemid);

                /* does new item belong before this one? */
                if (i == newitemoff)
                {
                        if (newitemonleft)
                        {
                                _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
                                                         "left sibling");
                                leftoff = OffsetNumberNext(leftoff);
                        }
                        else
                        {
                                _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
                                                         "right sibling");
                                rightoff = OffsetNumberNext(rightoff);
                        }
                }

                /* decide which page to put it on */
                if (i < firstright)
                {
                        _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
                                                 "left sibling");
                        leftoff = OffsetNumberNext(leftoff);
                }
                else
                {
                        _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
                                                 "right sibling");
                        rightoff = OffsetNumberNext(rightoff);
                }
        }

        /* cope with possibility that newitem goes at the end */
        if (i <= newitemoff)
        {
                /*
                 * Can't have newitemonleft here; that would imply we were told to put
                 * *everything* on the left page, which cannot fit (if it could, we'd
                 * not be splitting the page).
                 */
                Assert(!newitemonleft);
                _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
                                         "right sibling");
                rightoff = OffsetNumberNext(rightoff);
        }

        /*
         * We have to grab the right sibling (if any) and fix the prev pointer
         * there. We are guaranteed that this is deadlock-free since no other
         * writer will be holding a lock on that page and trying to move left, and
         * all readers release locks on a page before trying to fetch its
         * neighbors.
         */

        if (!P_RIGHTMOST(ropaque))
        {
                sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
                spage = BufferGetPage(sbuf);
                sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
                if (sopaque->btpo_prev != ropaque->btpo_prev)
                        elog(PANIC, "right sibling's left-link doesn't match: "
                                 "block %u links to %u instead of expected %u in index \"%s\"",
                                 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
                                 RelationGetRelationName(rel));

                /*
                 * Check to see if we can set the SPLIT_END flag in the right-hand
                 * split page; this can save some I/O for vacuum since it need not
                 * proceed to the right sibling.  We can set the flag if the right
                 * sibling has a different cycleid: that means it could not be part of
                 * a group of pages that were all split off from the same ancestor
                 * page.  If you're confused, imagine that page A splits to A B and
                 * then again, yielding A C B, while vacuum is in progress.  Tuples
                 * originally in A could now be in either B or C, hence vacuum must
                 * examine both pages.  But if D, our right sibling, has a different
                 * cycleid then it could not contain any tuples that were in A when
                 * the vacuum started.
                 */
                if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
                        ropaque->btpo_flags |= BTP_SPLIT_END;
        }

        /*
         * Right sibling is locked, new siblings are prepared, but original page
         * is not updated yet.
         *
         * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
         * not starting the critical section till here because we haven't been
         * scribbling on the original page yet, and we don't care about the new
         * sibling until it's linked into the btree.
         */
        START_CRIT_SECTION();

        /*
         * By here, the original data page has been split into two new halves, and
         * these are correct.  The algorithm requires that the left page never
         * move during a split, so we copy the new left page back on top of the
         * original.  Note that this is not a waste of time, since we also require
         * (in the page management code) that the center of a page always be
         * clean, and the most efficient way to guarantee this is just to compact
         * the data by reinserting it into a new left page.  (XXX the latter
         * comment is probably obsolete.)
         *
         * We need to do this before writing the WAL record, so that XLogInsert
         * can WAL log an image of the page if necessary.
         */
        PageRestoreTempPage(leftpage, origpage);

        MarkBufferDirty(buf);
        MarkBufferDirty(rbuf);

        if (!P_RIGHTMOST(ropaque))
        {
                sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
                MarkBufferDirty(sbuf);
        }

        /* XLOG stuff */
        if (!rel->rd_istemp)
        {
                xl_btree_split xlrec;
                uint8           xlinfo;
                XLogRecPtr      recptr;
                XLogRecData rdata[7];
                XLogRecData *lastrdata;

                xlrec.node = rel->rd_node;
                xlrec.leftsib = BufferGetBlockNumber(buf);
                xlrec.rightsib = BufferGetBlockNumber(rbuf);
                xlrec.rnext = ropaque->btpo_next;
                xlrec.level = ropaque->btpo.level;
                xlrec.firstright = firstright;

                rdata[0].data = (char *) &xlrec;
                rdata[0].len = SizeOfBtreeSplit;
                rdata[0].buffer = InvalidBuffer;

                lastrdata = &rdata[0];

                if (ropaque->btpo.level > 0)
                {
                        /* Log downlink on non-leaf pages */
                        lastrdata->next = lastrdata + 1;
                        lastrdata++;

                        lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
                        lastrdata->len = sizeof(BlockIdData);
                        lastrdata->buffer = InvalidBuffer;

                        /*
                         * We must also log the left page's high key, because the right
                         * page's leftmost key is suppressed on non-leaf levels.  Show it
                         * as belonging to the left page buffer, so that it is not stored
                         * if XLogInsert decides it needs a full-page image of the left
                         * page.
                         */
                        lastrdata->next = lastrdata + 1;
                        lastrdata++;

                        itemid = PageGetItemId(origpage, P_HIKEY);
                        item = (IndexTuple) PageGetItem(origpage, itemid);
                        lastrdata->data = (char *) item;
                        lastrdata->len = MAXALIGN(IndexTupleSize(item));
                        lastrdata->buffer = buf;        /* backup block 1 */
                        lastrdata->buffer_std = true;
                }

                /*
                 * Log the new item and its offset, if it was inserted on the left
                 * page. (If it was put on the right page, we don't need to explicitly
                 * WAL log it because it's included with all the other items on the
                 * right page.) Show the new item as belonging to the left page
                 * buffer, so that it is not stored if XLogInsert decides it needs a
                 * full-page image of the left page.  We store the offset anyway,
                 * though, to support archive compression of these records.
                 */
                if (newitemonleft)
                {
                        lastrdata->next = lastrdata + 1;
                        lastrdata++;

                        lastrdata->data = (char *) &newitemoff;
                        lastrdata->len = sizeof(OffsetNumber);
                        lastrdata->buffer = InvalidBuffer;

                        lastrdata->next = lastrdata + 1;
                        lastrdata++;

                        lastrdata->data = (char *) newitem;
                        lastrdata->len = MAXALIGN(newitemsz);
                        lastrdata->buffer = buf;        /* backup block 1 */
                        lastrdata->buffer_std = true;
                }
                else if (ropaque->btpo.level == 0)
                {
                        /*
                         * Although we don't need to WAL-log the new item, we still need
                         * XLogInsert to consider storing a full-page image of the left
                         * page, so make an empty entry referencing that buffer. This also
                         * ensures that the left page is always backup block 1.
                         */
                        lastrdata->next = lastrdata + 1;
                        lastrdata++;

                        lastrdata->data = NULL;
                        lastrdata->len = 0;
                        lastrdata->buffer = buf;        /* backup block 1 */
                        lastrdata->buffer_std = true;
                }

                /*
                 * Log the contents of the right page in the format understood by
                 * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
                 * because we're going to recreate the whole page anyway, so it should
                 * never be stored by XLogInsert.
                 *
                 * Direct access to the page is not pretty, but it is faster; we
                 * ought to add a function for this to the page API.  Note we only
                 * store the tuples
                 * themselves, knowing that they were inserted in item-number order
                 * and so the item pointers can be reconstructed.  See comments for
                 * _bt_restore_page().
                 */
                lastrdata->next = lastrdata + 1;
                lastrdata++;

                lastrdata->data = (char *) rightpage +
                        ((PageHeader) rightpage)->pd_upper;
                lastrdata->len = ((PageHeader) rightpage)->pd_special -
                        ((PageHeader) rightpage)->pd_upper;
                lastrdata->buffer = InvalidBuffer;

1120                 /* Log the right sibling, because we've changed its prev-pointer. */
1121                 if (!P_RIGHTMOST(ropaque))
1122                 {
1123                         lastrdata->next = lastrdata + 1;
1124                         lastrdata++;
1125
1126                         lastrdata->data = NULL;
1127                         lastrdata->len = 0;
1128                         lastrdata->buffer = sbuf;       /* backup block 2 */
1129                         lastrdata->buffer_std = true;
1130                 }
1131
1132                 lastrdata->next = NULL;
1133
1134                 if (isroot)
1135                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1136                 else
1137                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1138
1139                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1140
1141                 PageSetLSN(origpage, recptr);
1142                 PageSetTLI(origpage, ThisTimeLineID);
1143                 PageSetLSN(rightpage, recptr);
1144                 PageSetTLI(rightpage, ThisTimeLineID);
1145                 if (!P_RIGHTMOST(ropaque))
1146                 {
1147                         PageSetLSN(spage, recptr);
1148                         PageSetTLI(spage, ThisTimeLineID);
1149                 }
1150         }
1151
1152         END_CRIT_SECTION();
1153
1154         /* release the old right sibling */
1155         if (!P_RIGHTMOST(ropaque))
1156                 _bt_relbuf(rel, sbuf);
1157
1158         /* split's done */
1159         return rbuf;
1160 }
1161
1162 /*
1163  *      _bt_findsplitloc() -- find an appropriate place to split a page.
1164  *
1165  * The idea here is to equalize the free space that will be on each split
1166  * page, *after accounting for the inserted tuple*.  (If we fail to account
1167  * for it, we might find ourselves with too little room on the page that
1168  * it needs to go into!)
1169  *
1170  * If the page is the rightmost page on its level, we instead try to arrange
1171  * to leave the left split page fillfactor% full.  In this way, when we are
1172  * inserting successively increasing keys (consider sequences, timestamps,
1173  * etc) we will end up with a tree whose pages are about fillfactor% full,
1174  * instead of the 50% full result that we'd get without this special case.
1175  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1176  * that leaf and nonleaf pages use different fillfactors.
1177  *
1178  * We are passed the intended insert position of the new tuple, expressed as
1179  * the offsetnumber of the tuple it must go in front of.  (This could be
1180  * maxoff+1 if the tuple is to go at the end.)
1181  *
1182  * We return the index of the first existing tuple that should go on the
1183  * righthand page, plus a boolean indicating whether the new tuple goes on
1184  * the left or right page.      The bool is necessary to disambiguate the case
1185  * the left or right page.  The bool is necessary to disambiguate the case
1186  */
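/*
 * For illustration (rough numbers, assuming the default 8K block size):
 * a non-rightmost split aims for roughly equal free space, i.e. about
 * half of the ~8K of usable space worth of data on each side; a
 * rightmost leaf split with the default leaf fillfactor of 90 instead
 * aims to leave the left page about 90% full, which is what packs a
 * stream of ever-increasing keys to ~90% rather than 50%.
 */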
1187 static OffsetNumber
1188 _bt_findsplitloc(Relation rel,
1189                                  Page page,
1190                                  OffsetNumber newitemoff,
1191                                  Size newitemsz,
1192                                  bool *newitemonleft)
1193 {
1194         BTPageOpaque opaque;
1195         OffsetNumber offnum;
1196         OffsetNumber maxoff;
1197         ItemId          itemid;
1198         FindSplitData state;
1199         int                     leftspace,
1200                                 rightspace,
1201                                 goodenough,
1202                                 olddataitemstotal,
1203                                 olddataitemstoleft;
1204         bool            goodenoughfound;
1205
1206         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1207
1208         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1209         newitemsz += sizeof(ItemIdData);
1210
1211         /* Total free space available on a btree page, after fixed overhead */
1212         leftspace = rightspace =
1213                 PageGetPageSize(page) - SizeOfPageHeaderData -
1214                 MAXALIGN(sizeof(BTPageOpaqueData));
1215
1216         /* The right page will have the same high key as the old page */
1217         if (!P_RIGHTMOST(opaque))
1218         {
1219                 itemid = PageGetItemId(page, P_HIKEY);
1220                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1221                                                          sizeof(ItemIdData));
1222         }
1223
1224         /* Count up total space in data items without actually scanning 'em */
1225         olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1226
1227         state.newitemsz = newitemsz;
1228         state.is_leaf = P_ISLEAF(opaque);
1229         state.is_rightmost = P_RIGHTMOST(opaque);
1230         state.have_split = false;
1231         if (state.is_leaf)
1232                 state.fillfactor = RelationGetFillFactor(rel,
1233                                                                                                  BTREE_DEFAULT_FILLFACTOR);
1234         else
1235                 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1236         state.newitemonleft = false;    /* these just to keep compiler quiet */
1237         state.firstright = 0;
1238         state.best_delta = 0;
1239         state.leftspace = leftspace;
1240         state.rightspace = rightspace;
1241         state.olddataitemstotal = olddataitemstotal;
1242         state.newitemoff = newitemoff;
1243
1244         /*
1245          * Finding the best possible split would require checking all the possible
1246          * split points, because of the high-key and left-key special cases.
1247          * That's probably more work than it's worth; instead, stop as soon as we
1248          * find a "good-enough" split, where good-enough is defined as an
1249          * imbalance in free space of no more than pagesize/16 (an arbitrary
1250          * threshold).  This should let us stop near the middle on most pages,
1251          * instead of plowing to the end.
1252          */
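        /*
         * For example (rough numbers): with the default 8K block size,
         * leftspace is a bit over 8100 bytes, so goodenough works out to
         * roughly 500 bytes of tolerated free-space imbalance.
         */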
1253         goodenough = leftspace / 16;
1254
1255         /*
1256          * Scan through the data items and calculate space usage for a split at
1257          * each possible position.
1258          */
1259         olddataitemstoleft = 0;
1260         goodenoughfound = false;
1261         maxoff = PageGetMaxOffsetNumber(page);
1262
1263         for (offnum = P_FIRSTDATAKEY(opaque);
1264                  offnum <= maxoff;
1265                  offnum = OffsetNumberNext(offnum))
1266         {
1267                 Size            itemsz;
1268
1269                 itemid = PageGetItemId(page, offnum);
1270                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1271
1272                 /*
1273                  * Will the new item go to left or right of split?
1274                  */
1275                 if (offnum > newitemoff)
1276                         _bt_checksplitloc(&state, offnum, true,
1277                                                           olddataitemstoleft, itemsz);
1278
1279                 else if (offnum < newitemoff)
1280                         _bt_checksplitloc(&state, offnum, false,
1281                                                           olddataitemstoleft, itemsz);
1282                 else
1283                 {
1284                         /* need to try it both ways! */
1285                         _bt_checksplitloc(&state, offnum, true,
1286                                                           olddataitemstoleft, itemsz);
1287
1288                         _bt_checksplitloc(&state, offnum, false,
1289                                                           olddataitemstoleft, itemsz);
1290                 }
1291
1292                 /* Abort scan once we find a good-enough choice */
1293                 if (state.have_split && state.best_delta <= goodenough)
1294                 {
1295                         goodenoughfound = true;
1296                         break;
1297                 }
1298
1299                 olddataitemstoleft += itemsz;
1300         }
1301
1302         /*
1303          * If the new item goes as the last item, check for splitting so that all
1304          * the old items go to the left page and the new item goes to the right
1305          * page.
1306          */
1307         if (newitemoff > maxoff && !goodenoughfound)
1308                 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1309
1310         /*
1311          * I believe it is not possible to fail to find a feasible split, but just
1312          * in case ...
1313          */
1314         if (!state.have_split)
1315                 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1316                          RelationGetRelationName(rel));
1317
1318         *newitemonleft = state.newitemonleft;
1319         return state.firstright;
1320 }
1321
1322 /*
1323  * Subroutine to analyze a particular possible split choice (ie, firstright
1324  * and newitemonleft settings), and record the best split so far in *state.
1325  *
1326  * firstoldonright is the offset of the first item on the original page
1327  * that goes to the right page, and firstoldonrightsz is the size of that
1328  * tuple. firstoldonright can be > max offset, which means that all the old
1329  * items go to the left page and only the new item goes to the right page.
1330  * In that case, firstoldonrightsz is not used.
1331  *
1332  * olddataitemstoleft is the total size of all old items to the left of
1333  * firstoldonright.
1334  */
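/*
 * For example (hypothetical offsets): firstoldonright = 5 with
 * newitemonleft = true describes the split in which the old data items
 * before offset 5, plus the new item, go to the left page, while the
 * old items at offset 5 and beyond go to the right page.
 */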
1335 static void
1336 _bt_checksplitloc(FindSplitData *state,
1337                                   OffsetNumber firstoldonright,
1338                                   bool newitemonleft,
1339                                   int olddataitemstoleft,
1340                                   Size firstoldonrightsz)
1341 {
1342         int                     leftfree,
1343                                 rightfree;
1344         Size            firstrightitemsz;
1345         bool            newitemisfirstonright;
1346
1347         /* Is the new item going to be the first item on the right page? */
1348         newitemisfirstonright = (firstoldonright == state->newitemoff
1349                                                          && !newitemonleft);
1350
1351         if (newitemisfirstonright)
1352                 firstrightitemsz = state->newitemsz;
1353         else
1354                 firstrightitemsz = firstoldonrightsz;
1355
1356         /* Account for all the old tuples */
1357         leftfree = state->leftspace - olddataitemstoleft;
1358         rightfree = state->rightspace -
1359                 (state->olddataitemstotal - olddataitemstoleft);
1360
1361         /*
1362          * The first item on the right page becomes the high key of the left page;
1363          * therefore it counts against left space as well as right space.
1364          */
1365         leftfree -= firstrightitemsz;
1366
1367         /* account for the new item */
1368         if (newitemonleft)
1369                 leftfree -= (int) state->newitemsz;
1370         else
1371                 rightfree -= (int) state->newitemsz;
1372
1373         /*
1374          * If we are not on the leaf level, we will be able to discard the key
1375          * data from the first item that winds up on the right page.
1376          */
1377         if (!state->is_leaf)
1378                 rightfree += (int) firstrightitemsz -
1379                         (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1380
1381         /*
1382          * If feasible split point, remember best delta.
1383          */
1384         if (leftfree >= 0 && rightfree >= 0)
1385         {
1386                 int                     delta;
1387
1388                 if (state->is_rightmost)
1389                 {
1390                         /*
1391                          * If splitting a rightmost page, try to put (100-fillfactor)% of
1392                          * free space on left page. See comments for _bt_findsplitloc.
1393                          */
1394                         delta = (state->fillfactor * leftfree)
1395                                 - ((100 - state->fillfactor) * rightfree);
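                        /*
                         * E.g. with fillfactor = 90, this delta is zero when
                         * 90 * leftfree == 10 * rightfree, i.e. when the left
                         * page retains only a ninth of the free space that the
                         * right page does -- roughly the desired 90%-full left
                         * page.
                         */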
1396                 }
1397                 else
1398                 {
1399                         /* Otherwise, aim for equal free space on both sides */
1400                         delta = leftfree - rightfree;
1401                 }
1402
1403                 if (delta < 0)
1404                         delta = -delta;
1405                 if (!state->have_split || delta < state->best_delta)
1406                 {
1407                         state->have_split = true;
1408                         state->newitemonleft = newitemonleft;
1409                         state->firstright = firstoldonright;
1410                         state->best_delta = delta;
1411                 }
1412         }
1413 }
1414
1415 /*
1416  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1417  *
1418  * On entry, buf and rbuf are the left and right split pages, which we
1419  * still hold write locks on per the L&Y algorithm.  We release the
 1420  * write locks once we have a write lock on the parent page.  (Any sooner,
1421  * and it'd be possible for some other process to try to split or delete
1422  * one of these pages, and get confused because it cannot find the downlink.)
1423  *
1424  * stack - stack showing how we got here.  May be NULL in cases that don't
1425  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1426  * is_root - we split the true root
1427  * is_only - we split a page alone on its level (might have been fast root)
1428  *
1429  * This is exported so it can be called by nbtxlog.c.
1430  */
1431 void
1432 _bt_insert_parent(Relation rel,
1433                                   Buffer buf,
1434                                   Buffer rbuf,
1435                                   BTStack stack,
1436                                   bool is_root,
1437                                   bool is_only)
1438 {
1439         /*
1440          * Here we have to do something Lehman and Yao don't talk about: deal with
1441          * a root split and construction of a new root.  If our stack is empty
1442          * then we have just split a node on what had been the root level when we
1443          * descended the tree.  If it was still the root then we perform a
1444          * new-root construction.  If it *wasn't* the root anymore, search to find
1445          * the next higher level that someone constructed meanwhile, and find the
1446          * right place to insert as for the normal case.
1447          *
1448          * If we have to search for the parent level, we do so by re-descending
1449          * from the root.  This is not super-efficient, but it's rare enough not
1450          * to matter.  (This path is also taken when called from WAL recovery ---
1451          * we have no stack in that case.)
1452          */
1453         if (is_root)
1454         {
1455                 Buffer          rootbuf;
1456
1457                 Assert(stack == NULL);
1458                 Assert(is_only);
1459                 /* create a new root node and update the metapage */
1460                 rootbuf = _bt_newroot(rel, buf, rbuf);
1461                 /* release the split buffers */
1462                 _bt_relbuf(rel, rootbuf);
1463                 _bt_relbuf(rel, rbuf);
1464                 _bt_relbuf(rel, buf);
1465         }
1466         else
1467         {
1468                 BlockNumber bknum = BufferGetBlockNumber(buf);
1469                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1470                 Page            page = BufferGetPage(buf);
1471                 IndexTuple      new_item;
1472                 BTStackData fakestack;
1473                 IndexTuple      ritem;
1474                 Buffer          pbuf;
1475
1476                 if (stack == NULL)
1477                 {
1478                         BTPageOpaque lpageop;
1479
1480                         if (!InRecovery)
1481                                 elog(DEBUG2, "concurrent ROOT page split");
1482                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1483                         /* Find the leftmost page at the next level up */
1484                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1485                         /* Set up a phony stack entry pointing there */
1486                         stack = &fakestack;
1487                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1488                         stack->bts_offset = InvalidOffsetNumber;
1489                         /* bts_btentry will be initialized below */
1490                         stack->bts_parent = NULL;
1491                         _bt_relbuf(rel, pbuf);
1492                 }
1493
1494                 /* get high key from left page == lowest key on new right page */
1495                 ritem = (IndexTuple) PageGetItem(page,
1496                                                                                  PageGetItemId(page, P_HIKEY));
1497
1498                 /* form an index tuple that points at the new right page */
1499                 new_item = CopyIndexTuple(ritem);
1500                 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1501
1502                 /*
1503                  * Find the parent buffer and get the parent page.
1504                  *
1505                  * Oops - if we were moved right then we need to change the stack item!
1506                  * We want to find the parent pointing to where we are, right?  - vadim
1507                  * 05/27/97
1508                  */
1509                 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1510
1511                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1512
1513                 /* Now we can unlock the children */
1514                 _bt_relbuf(rel, rbuf);
1515                 _bt_relbuf(rel, buf);
1516
1517                 /* Check for error only after writing children */
1518                 if (pbuf == InvalidBuffer)
1519                         elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1520                                  RelationGetRelationName(rel), bknum, rbknum);
1521
1522                 /* Recursively update the parent */
1523                 _bt_insertonpg(rel, pbuf, stack->bts_parent,
1524                                            new_item, stack->bts_offset + 1,
1525                                            is_only);
1526
1527                 /* be tidy */
1528                 pfree(new_item);
1529         }
1530 }
1531
1532 /*
1533  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1534  *                                               we last looked at in the parent.
1535  *
1536  *              This is possible because we save the downlink from the parent item,
1537  *              which is enough to uniquely identify it.  Insertions into the parent
1538  *              level could cause the item to move right; deletions could cause it
1539  *              to move left, but not left of the page we previously found it in.
1540  *
1541  *              Adjusts bts_blkno & bts_offset if changed.
1542  *
1543  *              Returns InvalidBuffer if item not found (should not happen).
1544  */
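/*
 * A hypothetical illustration: if the downlink sat at offset 42 when we
 * descended, and two tuples have since been inserted in front of it on
 * the same page, the rightward scan below re-finds it at offset 44
 * without visiting any other page.
 */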
1545 Buffer
1546 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1547 {
1548         BlockNumber blkno;
1549         OffsetNumber start;
1550
1551         blkno = stack->bts_blkno;
1552         start = stack->bts_offset;
1553
1554         for (;;)
1555         {
1556                 Buffer          buf;
1557                 Page            page;
1558                 BTPageOpaque opaque;
1559
1560                 buf = _bt_getbuf(rel, blkno, access);
1561                 page = BufferGetPage(buf);
1562                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1563
1564                 if (!P_IGNORE(opaque))
1565                 {
1566                         OffsetNumber offnum,
1567                                                 minoff,
1568                                                 maxoff;
1569                         ItemId          itemid;
1570                         IndexTuple      item;
1571
1572                         minoff = P_FIRSTDATAKEY(opaque);
1573                         maxoff = PageGetMaxOffsetNumber(page);
1574
1575                         /*
1576                          * start = InvalidOffsetNumber means "search the whole page".  We
1577                          * need this test anyway, due to the possibility that the page has
1578                          * a high key now when it didn't before.
1579                          */
1580                         if (start < minoff)
1581                                 start = minoff;
1582
1583                         /*
1584                          * Need this check too, to guard against the possibility that the
1585                          * page has split since we visited it originally.
1586                          */
1587                         if (start > maxoff)
1588                                 start = OffsetNumberNext(maxoff);
1589
1590                         /*
1591                          * These loops will check every item on the page --- but in an
1592                          * order that's attuned to the probability of where it actually
1593                          * is.  Scan to the right first, then to the left.
1594                          */
1595                         for (offnum = start;
1596                                  offnum <= maxoff;
1597                                  offnum = OffsetNumberNext(offnum))
1598                         {
1599                                 itemid = PageGetItemId(page, offnum);
1600                                 item = (IndexTuple) PageGetItem(page, itemid);
1601                                 if (BTEntrySame(item, &stack->bts_btentry))
1602                                 {
1603                                         /* Return accurate pointer to where link is now */
1604                                         stack->bts_blkno = blkno;
1605                                         stack->bts_offset = offnum;
1606                                         return buf;
1607                                 }
1608                         }
1609
1610                         for (offnum = OffsetNumberPrev(start);
1611                                  offnum >= minoff;
1612                                  offnum = OffsetNumberPrev(offnum))
1613                         {
1614                                 itemid = PageGetItemId(page, offnum);
1615                                 item = (IndexTuple) PageGetItem(page, itemid);
1616                                 if (BTEntrySame(item, &stack->bts_btentry))
1617                                 {
1618                                         /* Return accurate pointer to where link is now */
1619                                         stack->bts_blkno = blkno;
1620                                         stack->bts_offset = offnum;
1621                                         return buf;
1622                                 }
1623                         }
1624                 }
1625
1626                 /*
1627                  * The item we're looking for moved right at least one page.
1628                  */
1629                 if (P_RIGHTMOST(opaque))
1630                 {
1631                         _bt_relbuf(rel, buf);
1632                         return InvalidBuffer;
1633                 }
1634                 blkno = opaque->btpo_next;
1635                 start = InvalidOffsetNumber;
1636                 _bt_relbuf(rel, buf);
1637         }
1638 }
1639
1640 /*
1641  *      _bt_newroot() -- Create a new root page for the index.
1642  *
1643  *              We've just split the old root page and need to create a new one.
1644  *              In order to do this, we add a new root page to the file, then lock
1645  *              the metadata page and update it.  This is guaranteed to be deadlock-
1646  *              free, because all readers release their locks on the metadata page
1647  *              before trying to lock the root, and all writers lock the root before
1648  *              trying to lock the metadata page.  We have a write lock on the old
1649  *              root page, so we have not introduced any cycles into the waits-for
1650  *              graph.
1651  *
1652  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
 1653  *              locked.  On exit, a new root page exists with entries for the
 1654  *              two new children, and the metapage is updated and unlocked/unpinned.
 1655  *              The new root buffer is returned to the caller, who must unlock/unpin
 1656  *              lbuf, rbuf, and rootbuf.
1657  */
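/*
 * Sketch of the ordering argument above: a reader does lock(meta),
 * reads the root pointer, unlock(meta), then lock(root); a writer that
 * reaches here does lock(root), then lock(meta).  No backend ever waits
 * for the root while holding the metapage lock, so no cycle can enter
 * the waits-for graph.
 */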
1658 static Buffer
1659 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1660 {
1661         Buffer          rootbuf;
1662         Page            lpage,
1663                                 rootpage;
1664         BlockNumber lbkno,
1665                                 rbkno;
1666         BlockNumber rootblknum;
1667         BTPageOpaque rootopaque;
1668         ItemId          itemid;
1669         IndexTuple      item;
1670         Size            itemsz;
1671         IndexTuple      new_item;
1672         Buffer          metabuf;
1673         Page            metapg;
1674         BTMetaPageData *metad;
1675
1676         lbkno = BufferGetBlockNumber(lbuf);
1677         rbkno = BufferGetBlockNumber(rbuf);
1678         lpage = BufferGetPage(lbuf);
1679
1680         /* get a new root page */
1681         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1682         rootpage = BufferGetPage(rootbuf);
1683         rootblknum = BufferGetBlockNumber(rootbuf);
1684
1685         /* acquire lock on the metapage */
1686         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1687         metapg = BufferGetPage(metabuf);
1688         metad = BTPageGetMeta(metapg);
1689
1690         /* NO EREPORT(ERROR) from here till newroot op is logged */
1691         START_CRIT_SECTION();
1692
1693         /* set btree special data */
1694         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1695         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1696         rootopaque->btpo_flags = BTP_ROOT;
1697         rootopaque->btpo.level =
1698                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1699         rootopaque->btpo_cycleid = 0;
1700
1701         /* update metapage data */
1702         metad->btm_root = rootblknum;
1703         metad->btm_level = rootopaque->btpo.level;
1704         metad->btm_fastroot = rootblknum;
1705         metad->btm_fastlevel = rootopaque->btpo.level;
1706
1707         /*
1708          * Create downlink item for left page (old root).  Since this will be the
1709          * first item in a non-leaf page, it implicitly has minus-infinity key
1710          * value, so we need not store any actual key in it.
1711          */
1712         itemsz = sizeof(IndexTupleData);
1713         new_item = (IndexTuple) palloc(itemsz);
1714         new_item->t_info = itemsz;
1715         ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
1716
1717         /*
1718          * Insert the left page pointer into the new root page.  The root page is
1719          * the rightmost page on its level so there is no "high key" in it; the
1720          * two items will go into positions P_HIKEY and P_FIRSTKEY.
1721          *
1722          * Note: we *must* insert the two items in item-number order, for the
1723          * benefit of _bt_restore_page().
1724          */
1725         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
1726                                         false, false) == InvalidOffsetNumber)
1727                 elog(PANIC, "failed to add leftkey to new root page"
1728                          " while splitting block %u of index \"%s\"",
1729                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1730         pfree(new_item);
1731
1732         /*
1733          * Create downlink item for right page.  The key for it is obtained from
1734          * the "high key" position in the left page.
1735          */
1736         itemid = PageGetItemId(lpage, P_HIKEY);
1737         itemsz = ItemIdGetLength(itemid);
1738         item = (IndexTuple) PageGetItem(lpage, itemid);
1739         new_item = CopyIndexTuple(item);
1740         ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
1741
1742         /*
1743          * insert the right page pointer into the new root page.
1744          */
1745         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
1746                                         false, false) == InvalidOffsetNumber)
1747                 elog(PANIC, "failed to add rightkey to new root page"
1748                          " while splitting block %u of index \"%s\"",
1749                          BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
1750         pfree(new_item);
1751
1752         MarkBufferDirty(rootbuf);
1753         MarkBufferDirty(metabuf);
1754
1755         /* XLOG stuff */
1756         if (!rel->rd_istemp)
1757         {
1758                 xl_btree_newroot xlrec;
1759                 XLogRecPtr      recptr;
1760                 XLogRecData rdata[2];
1761
1762                 xlrec.node = rel->rd_node;
1763                 xlrec.rootblk = rootblknum;
1764                 xlrec.level = metad->btm_level;
1765
1766                 rdata[0].data = (char *) &xlrec;
1767                 rdata[0].len = SizeOfBtreeNewroot;
1768                 rdata[0].buffer = InvalidBuffer;
1769                 rdata[0].next = &(rdata[1]);
1770
1771                 /*
1772                  * Direct access to the page is ugly but fast; ideally the page API
1773                  * would provide a function for this.
1774                  */
1775                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1776                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1777                         ((PageHeader) rootpage)->pd_upper;
1778                 rdata[1].buffer = InvalidBuffer;
1779                 rdata[1].next = NULL;
1780
1781                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1782
1783                 PageSetLSN(rootpage, recptr);
1784                 PageSetTLI(rootpage, ThisTimeLineID);
1785                 PageSetLSN(metapg, recptr);
1786                 PageSetTLI(metapg, ThisTimeLineID);
1787         }
1788
1789         END_CRIT_SECTION();
1790
1791         /* send out relcache inval for metapage change */
1792         CacheInvalidateRelcache(rel);
1793
1794         /* done with metapage */
1795         _bt_relbuf(rel, metabuf);
1796
1797         return rootbuf;
1798 }
1799
1800 /*
1801  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1802  *
1803  *              This routine adds the tuple to the page as requested.  It does
1804  *              not affect pin/lock status, but you'd better have a write lock
1805  *              and pin on the target buffer!  Don't forget to write and release
1806  *              the buffer afterwards, either.
1807  *
1808  *              The main difference between this routine and a bare PageAddItem call
1809  *              is that this code knows that the leftmost index tuple on a non-leaf
1810  *              btree page doesn't need to have a key.  Therefore, it strips such
1811  *              tuples down to just the tuple header.  CAUTION: this works ONLY if
1812  *              we insert the tuples in order, so that the given itup_off does
1813  *              represent the final position of the tuple!
1814  */
1815 static void
1816 _bt_pgaddtup(Relation rel,
1817                          Page page,
1818                          Size itemsize,
1819                          IndexTuple itup,
1820                          OffsetNumber itup_off,
1821                          const char *where)
1822 {
1823         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1824         IndexTupleData trunctuple;
1825
1826         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1827         {
1828                 trunctuple = *itup;
1829                 trunctuple.t_info = sizeof(IndexTupleData);
1830                 itup = &trunctuple;
1831                 itemsize = sizeof(IndexTupleData);
1832         }
1833
1834         if (PageAddItem(page, (Item) itup, itemsize, itup_off,
1835                                         false, false) == InvalidOffsetNumber)
1836                 elog(PANIC, "failed to add item to the %s in index \"%s\"",
1837                          where, RelationGetRelationName(rel));
1838 }
1839
1840 /*
1841  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1842  *
1843  * This is very similar to _bt_compare, except for NULL handling.
 1844  * The rule is simple: NOT_NULL never equals NULL, and NULL never equals NULL either.
1845  */
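/*
 * For example, under this rule an existing entry (1, NULL) is not
 * "equal" to a new tuple (1, NULL); this is what lets a unique index
 * accept multiple NULLs, per SQL semantics.
 */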
1846 static bool
1847 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1848                         int keysz, ScanKey scankey)
1849 {
1850         IndexTuple      itup;
1851         int                     i;
1852
1853         /* Better be comparing to a leaf item */
1854         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1855
1856         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1857
1858         for (i = 1; i <= keysz; i++)
1859         {
1860                 AttrNumber      attno;
1861                 Datum           datum;
1862                 bool            isNull;
1863                 int32           result;
1864
1865                 attno = scankey->sk_attno;
1866                 Assert(attno == i);
1867                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1868
1869                 /* NULLs are never equal to anything */
1870                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1871                         return false;
1872
1873                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1874                                                                                          datum,
1875                                                                                          scankey->sk_argument));
1876
1877                 if (result != 0)
1878                         return false;
1879
1880                 scankey++;
1881         }
1882
1883         /* if we get here, the keys are equal */
1884         return true;
1885 }
1886
1887 /*
1888  * _bt_vacuum_one_page - vacuum just one index page.
1889  *
1890  * Try to remove LP_DEAD items from the given page.  The passed buffer
1891  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
1892  * super-exclusive "cleanup" lock (see nbtree/README).
1893  */
1894 static void
1895 _bt_vacuum_one_page(Relation rel, Buffer buffer)
1896 {
1897         OffsetNumber deletable[MaxOffsetNumber];
1898         int                     ndeletable = 0;
1899         OffsetNumber offnum,
1900                                 minoff,
1901                                 maxoff;
1902         Page            page = BufferGetPage(buffer);
1903         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1904
1905         /*
1906          * Scan over all items to see which ones need to be deleted according to
1907          * LP_DEAD flags.
1908          */
1909         minoff = P_FIRSTDATAKEY(opaque);
1910         maxoff = PageGetMaxOffsetNumber(page);
1911         for (offnum = minoff;
1912                  offnum <= maxoff;
1913                  offnum = OffsetNumberNext(offnum))
1914         {
1915                 ItemId          itemId = PageGetItemId(page, offnum);
1916
1917                 if (ItemIdIsDead(itemId))
1918                         deletable[ndeletable++] = offnum;
1919         }
1920
1921         if (ndeletable > 0)
1922                 _bt_delitems(rel, buffer, deletable, ndeletable);
1923
1924         /*
1925          * Note: if we didn't find any LP_DEAD items, then the page's
1926          * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
1927          * separate write to clear it, however.  We will clear it when we split
1928          * the page.
1929          */
1930 }