/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *    Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.167 2008/06/11 08:38:56 heikki Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/inval.h"
#include "utils/tqual.h"


typedef struct
{
    /* context data for _bt_checksplitloc */
    Size        newitemsz;      /* size of new item to be inserted */
    int         fillfactor;     /* needed when splitting rightmost page */
    bool        is_leaf;        /* T if splitting a leaf page */
    bool        is_rightmost;   /* T if splitting a rightmost page */
    OffsetNumber newitemoff;    /* where the new item is to be inserted */
    int         leftspace;      /* space available for items on left page */
    int         rightspace;     /* space available for items on right page */
    int         olddataitemstotal;  /* space taken by old items */

    bool        have_split;     /* found a valid split? */

    /* these fields valid only if have_split is true */
    bool        newitemonleft;  /* new item on left or right of best split */
    OffsetNumber firstright;    /* best split point */
    int         best_delta;     /* best size delta so far */
} FindSplitData;
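
/*
 * Rough sketch of how these fields are used (the logic in _bt_checksplitloc
 * below is authoritative): for each candidate split point, compute the
 * post-split free space of each half, charging newitemsz to whichever side
 * the new item would go:
 *
 *      leftfree  = leftspace - dataitemstoleft [- newitemsz if on left]
 *      rightfree = rightspace - (olddataitemstotal - dataitemstoleft)
 *                  [- newitemsz if on right]
 *      delta     = |leftfree - rightfree|
 *
 * The candidate with the smallest delta that still fits the new item is
 * remembered in have_split/firstright/best_delta; the rightmost-page case
 * scores against a fillfactor target instead of equalizing.
 */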


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                 Relation heapRel, Buffer buf, OffsetNumber ioffset,
                 ScanKey itup_scankey);
static void _bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz,
          IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                 OffsetNumber newitemoff,
                 Size newitemsz,
                 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
                  OffsetNumber firstoldonright, bool newitemonleft,
                  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
             Size itemsize, IndexTuple itup,
             OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
            int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);


/*
 *  _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *      This routine is called by the public interface routines, btbuild
 *      and btinsert.  By here, itup is filled in, including the TID.
 */
void
_bt_doinsert(Relation rel, IndexTuple itup,
             bool index_is_unique, Relation heapRel)
{
    int         natts = rel->rd_rel->relnatts;
    ScanKey     itup_scankey;
    BTStack     stack;
    Buffer      buf;
    OffsetNumber offset;

    /* we need an insertion scan key to do our search, so build one */
    itup_scankey = _bt_mkscankey(rel, itup);

top:
    /* find the first page containing this key */
    stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

    offset = InvalidOffsetNumber;

    /* trade in our read lock for a write lock */
    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    LockBuffer(buf, BT_WRITE);

    /*
     * If the page was split between the time that we surrendered our read
     * lock and acquired our write lock, then this page may no longer be the
     * right place for the key we want to insert.  In this case, we need to
     * move right in the tree.  See Lehman and Yao for an excruciatingly
     * precise description.
     */
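    /*
     * Concretely: another backend may have split the target page while we
     * held no lock, moving the upper portion of its keys to a new right
     * sibling, so our key may now belong on that sibling.  _bt_moveright
     * compares the insertion scan key against each page's high key and
     * follows right-links until it reaches the correct page.
     */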
    buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

    /*
     * If we're not allowing duplicates, make sure the key isn't already in
     * the index.
     *
     * NOTE: obviously, _bt_check_unique can only detect keys that are already
     * in the index; so it cannot defend against concurrent insertions of the
     * same key.  We protect against that by means of holding a write lock on
     * the target page.  Any other would-be inserter of the same key must
     * acquire a write lock on the same target page, so only one would-be
     * inserter can be making the check at one time.  Furthermore, once we are
     * past the check we hold write locks continuously until we have performed
     * our insertion, so no later inserter can fail to see our insertion.
     * (This requires some care in _bt_insertonpg.)
     *
     * If we must wait for another xact, we release the lock while waiting,
     * and then must start over completely.
     */
    if (index_is_unique)
    {
        TransactionId xwait;

        offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
        xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey);

        if (TransactionIdIsValid(xwait))
        {
            /* Have to wait for the other guy ... */
            _bt_relbuf(rel, buf);
            XactLockTableWait(xwait);
            /* start over... */
            _bt_freestack(stack);
            goto top;
        }
    }

    /* do the insertion */
    _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
    _bt_insertonpg(rel, buf, stack, itup, offset, false);

    /* be tidy */
    _bt_freestack(stack);
    _bt_freeskey(itup_scankey);
}

/*
 *  _bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
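 *
 * For example: if the equal key was inserted by a transaction that is
 * still in progress, we return its XID and _bt_doinsert waits for that
 * transaction to end before retrying; if the equal key is committed and
 * live, we raise the unique-violation error right here.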
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey)
{
    TupleDesc   itupdesc = RelationGetDescr(rel);
    int         natts = rel->rd_rel->relnatts;
    SnapshotData SnapshotDirty;
    OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
    Buffer      nbuf = InvalidBuffer;

    InitDirtySnapshot(SnapshotDirty);

    page = BufferGetPage(buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);

    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
    for (;;)
    {
        ItemId      curitemid;
        IndexTuple  curitup;
        BlockNumber nblkno;

        /*
         * make sure the offset points to an actual item before trying to
         * examine it...
         */
        if (offset <= maxoff)
        {
            curitemid = PageGetItemId(page, offset);

            /*
             * We can skip items that are marked killed.
             *
             * Formerly, we applied _bt_isequal() before checking the kill
             * flag, so as to fall out of the item loop as soon as possible.
             * However, in the presence of heavy update activity an index may
             * contain many killed items with the same key; running
             * _bt_isequal() on each killed item gets expensive. Furthermore
             * it is likely that the non-killed version of each key appears
             * first, so that we didn't actually get to exit any sooner
             * anyway. So now we just advance over killed items as quickly as
             * we can. We only apply _bt_isequal() when we get to a non-killed
             * item or the end of the page.
             */
            if (!ItemIdIsDead(curitemid))
            {
                ItemPointerData htid;
                bool        all_dead;
                /*
                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) --- this is
                 * how we handle NULLs --- so we must not use _bt_compare as a
                 * real equality test, but only for ordering/finding items on
                 * pages. - vadim 03/24/97
                 */
                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
                    break;      /* we're past all the equal tuples */

                /* okay, we gotta fetch the heap tuple ... */
                curitup = (IndexTuple) PageGetItem(page, curitemid);
                htid = curitup->t_tid;

                /*
                 * We check the whole HOT-chain to see if there is any tuple
                 * that satisfies SnapshotDirty.  This is necessary because we
                 * have just a single index entry for the entire chain.
                 */
                if (heap_hot_search(&htid, heapRel, &SnapshotDirty, &all_dead))
                {
                    /* it is a duplicate */
                    TransactionId xwait =
                    (TransactionIdIsValid(SnapshotDirty.xmin)) ?
                    SnapshotDirty.xmin : SnapshotDirty.xmax;
                    /*
                     * If this tuple is being updated by another transaction,
                     * we have to wait for its commit/abort.
                     */
                    if (TransactionIdIsValid(xwait))
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        /* Tell _bt_doinsert to wait... */
                        return xwait;
                    }

                    /*
                     * Otherwise we have a definite conflict.  But before
                     * complaining, look to see if the tuple we want to insert
                     * is itself now committed dead --- if so, don't complain.
                     * This is a waste of time in normal scenarios but we must
                     * do it to support CREATE INDEX CONCURRENTLY.
                     *
                     * We must follow HOT-chains here because during
                     * concurrent index build, we insert the root TID though
                     * the actual tuple may be somewhere in the HOT-chain.
                     * While following the chain we might not stop at the
                     * exact tuple which triggered the insert, but that's OK
                     * because if we find a live tuple anywhere in this chain,
                     * we have a unique key conflict.  The other live tuple is
                     * not part of this chain because it had a different index
                     * entry.
                     */
                    htid = itup->t_tid;
                    if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
                    {
                        /* Normal case --- it's still live */
                    }
                    else
                    {
                        /*
                         * It's been deleted, so no error, and no need to
                         * continue searching
                         */
                        break;
                    }

                    ereport(ERROR,
                            (errcode(ERRCODE_UNIQUE_VIOLATION),
                             errmsg("duplicate key value violates unique constraint \"%s\"",
                                    RelationGetRelationName(rel))));
                }
                else if (all_dead)
                {
                    /*
                     * The conflicting tuple (or whole HOT chain) is dead to
                     * everyone, so we may as well mark the index entry
                     * killed.
                     */
                    ItemIdMarkDead(curitemid);
                    opaque->btpo_flags |= BTP_HAS_GARBAGE;
                    /* be sure to mark the proper buffer dirty... */
                    if (nbuf != InvalidBuffer)
                        SetBufferCommitInfoNeedsSave(nbuf);
                    else
                        SetBufferCommitInfoNeedsSave(buf);
                }
            }
        }

        /*
         * Advance to next tuple to continue checking.
         */
        if (offset < maxoff)
            offset = OffsetNumberNext(offset);
        else
        {
            /* If scankey == hikey we gotta check the next page too */
            if (P_RIGHTMOST(opaque))
                break;
            if (!_bt_isequal(itupdesc, page, P_HIKEY,
                             natts, itup_scankey))
                break;
            /* Advance to next non-dead page --- there must be one */
            for (;;)
            {
                nblkno = opaque->btpo_next;
                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
                page = BufferGetPage(nbuf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                if (!P_IGNORE(opaque))
                    break;
                if (P_RIGHTMOST(opaque))
                    elog(ERROR, "fell off the end of index \"%s\"",
                         RelationGetRelationName(rel));
            }
            maxoff = PageGetMaxOffsetNumber(page);
            offset = P_FIRSTDATAKEY(opaque);
        }
    }

    if (nbuf != InvalidBuffer)
        _bt_relbuf(rel, nbuf);

    return InvalidTransactionId;
}


/*
 *  _bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *      If the new key is equal to one or more existing keys, we can
 *      legitimately place it anywhere in the series of equal keys --- in fact,
 *      if the new key is equal to the page's "high key" we can place it on
 *      the next page.  If it is equal to the high key, and there's not room
 *      to insert the new tuple on the current page without splitting, then
 *      we can move right hoping to find more free space and avoid a split.
 *      (We should not move right indefinitely, however, since that leads to
 *      O(N^2) insertion behavior in the presence of many equal keys.)
 *      Once we have chosen the page to put the key on, we'll insert it before
 *      any existing equal keys because of the way _bt_binsrch() works.
 *
 *      If there's not enough room on the chosen page, we try to make room
 *      by removing any LP_DEAD tuples.
 *
 *      On entry, *buf and *offsetptr point to the first legal position
 *      where the new tuple could be inserted.  The caller should hold an
 *      exclusive lock on *buf.  *offsetptr can also be set to
 *      InvalidOffsetNumber, in which case the function will search for the
 *      right location within the page if needed.  On exit, they point to the
 *      chosen insert location.  If _bt_findinsertloc decides to move right,
 *      the lock and pin on the original page will be released and the new
 *      page returned to the caller is exclusively locked instead.
 *
 *      newtup is the new tuple we're inserting, and scankey is an insertion
 *      type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup)
{
    Buffer      buf = *bufptr;
    Page        page = BufferGetPage(buf);
    Size        itemsz;
    BTPageOpaque lpageop;
    bool        movedright,
                vacuumed;
    OffsetNumber newitemoff;
    OffsetNumber firstlegaloff = *offsetptr;

    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*newtup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Check whether the item can fit on a btree page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.) We actually need to be
     * able to fit three items on every page, so restrict any one item to 1/3
     * the per-page available space. Note that at this point, itemsz doesn't
     * include the ItemId.
     */
    if (itemsz > BTMaxItemSize(page))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %lu exceeds btree maximum, %lu",
                        (unsigned long) itemsz,
                        (unsigned long) BTMaxItemSize(page)),
        errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                "Consider a function index of an MD5 hash of the value, "
                "or use full text indexing.")));

    /*----------
     * If we will need to split the page to put the item on this page,
     * check whether we can put the tuple somewhere to the right,
     * instead.  Keep scanning right until we
     *      (a) find a page with enough free space,
     *      (b) reach the last page where the tuple can legally go, or
     *      (c) get tired of searching.
     * (c) is not flippant; it is important because if there are many
     * pages' worth of equal keys, it's better to split one of the early
     * pages than to scan all the way to the end of the run of equal keys
     * on every insert.  We implement "get tired" as a random choice,
     * since stopping after scanning a fixed number of pages wouldn't work
     * well (we'd never reach the right-hand side of previously split
     * pages).  Currently the probability of moving right is set at 0.99,
     * which may seem too high to change the behavior much, but it does an
     * excellent job of preventing O(N^2) behavior with many equal keys.
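     * (Arithmetic check: with stop probability 0.01 per page, the number
     * of pages visited in a long run of equal keys is geometric with mean
     * about 100, so the walk is bounded but nearly always finds free space
     * first.)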
     *----------
     */
    movedright = false;
    vacuumed = false;
    while (PageGetFreeSpace(page) < itemsz)
    {
        Buffer      rbuf;

        /*
         * before considering moving right, see if we can obtain enough space
         * by erasing LP_DEAD items
         */
        if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
        {
            _bt_vacuum_one_page(rel, buf);

            /*
             * remember that we vacuumed this page, because that makes the
             * hint supplied by the caller invalid
             */
            vacuumed = true;

            if (PageGetFreeSpace(page) >= itemsz)
                break;          /* OK, now we have enough space */
        }

        /*
         * nope, so check conditions (b) and (c) enumerated above
         */
        if (P_RIGHTMOST(lpageop) ||
            _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
            random() <= (MAX_RANDOM_VALUE / 100))
            break;

        /*
         * step right to next non-dead page
         *
         * must write-lock that page before releasing write lock on current
         * page; else someone else's _bt_check_unique scan could fail to see
         * our insertion.  write locks on intermediate dead pages won't do
         * because we don't know when they will get de-linked from the tree.
         */
        rbuf = InvalidBuffer;

        for (;;)
        {
            BlockNumber rblkno = lpageop->btpo_next;

            rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
            page = BufferGetPage(rbuf);
            lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
            if (!P_IGNORE(lpageop))
                break;
            if (P_RIGHTMOST(lpageop))
                elog(ERROR, "fell off the end of index \"%s\"",
                     RelationGetRelationName(rel));
        }
        _bt_relbuf(rel, buf);
        buf = rbuf;
        movedright = true;
        vacuumed = false;
    }

    /*
     * Now we are on the right page, so find the insert position. If we moved
     * right at all, we know we should insert at the start of the page. If we
     * didn't move right, we can use the firstlegaloff hint if the caller
     * supplied one, unless we vacuumed the page which might have moved tuples
     * around making the hint invalid. If we didn't move right or can't use
     * the hint, find the position by searching.
     */
    if (movedright)
        newitemoff = P_FIRSTDATAKEY(lpageop);
    else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
        newitemoff = firstlegaloff;
    else
        newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

    *bufptr = buf;
    *offsetptr = newitemoff;
}

/*----------
 *  _bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *      This recursive procedure does the following things:
 *
 *          +  if necessary, splits the target page (making sure that the
 *             split is equitable as far as post-insert free space goes).
 *          +  inserts the tuple.
 *          +  if the page was split, pops the parent stack, and finds the
 *             right place to insert the new child pointer (by walking
 *             right using information stored in the parent stack).
 *          +  invokes itself with the appropriate tuple for the right
 *             child page on the parent.
 *          +  updates the metapage if a true root or fast root is split.
 *
 *      On entry, we must have the right buffer in which to do the
 *      insertion, and the buffer must be pinned and write-locked.  On return,
 *      we will have dropped both the pin and the lock on the buffer.
 *
 *      The locking interactions in this code are critical.  You should
 *      grok Lehman and Yao's paper before making any changes.  In addition,
 *      you need to understand how we disambiguate duplicate keys in this
 *      implementation, in order to be able to find our location using
 *      L&Y "move right" operations.  Since we may insert duplicate user
 *      keys, and since these dups may propagate up the tree, we take care
 *      to position ourselves correctly for the insertion on internal
 *      pages, walking right from the location recorded in the parent stack.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
               Buffer buf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page)
{
    Page        page;
    BTPageOpaque lpageop;
    OffsetNumber firstright = InvalidOffsetNumber;
    Size        itemsz;

    page = BufferGetPage(buf);
    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*itup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Do we need to split the page to fit the item on it?
     *
     * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
     * so this comparison is correct even though we appear to be accounting
     * only for the item and not for its line pointer.
     */
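    /*
     * Concretely: PageGetFreeSpace() returns (pd_upper - pd_lower) minus
     * sizeof(ItemIdData), so the comparison below implicitly reserves the
     * 4 bytes that the new item's line pointer will consume.
     */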
    if (PageGetFreeSpace(page) < itemsz)
    {
        bool        is_root = P_ISROOT(lpageop);
        bool        is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
        bool        newitemonleft;
        Buffer      rbuf;

        /* Choose the split point */
        firstright = _bt_findsplitloc(rel, page,
                                      newitemoff, itemsz,
                                      &newitemonleft);

        /* split the buffer into left and right halves */
        rbuf = _bt_split(rel, buf, firstright,
                         newitemoff, itemsz, itup, newitemonleft);

        /*----------
         * By here,
         *
         *      +  our target page has been split;
         *      +  the original tuple has been inserted;
         *      +  we have write locks on both the old (left half)
         *         and new (right half) buffers, after the split; and
         *      +  we know the key we want to insert into the parent
         *         (it's the "high key" on the left child page).
         *
         * We're ready to do the parent insertion.  We need to hold onto the
         * locks for the child pages until we locate the parent, but we can
         * release them before doing the actual insertion (see Lehman and Yao
         * for the reasoning).
         *----------
         */
        _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
    }
    else
    {
        Buffer      metabuf = InvalidBuffer;
        Page        metapg = NULL;
        BTMetaPageData *metad = NULL;
        OffsetNumber itup_off;
        BlockNumber itup_blkno;

        itup_off = newitemoff;
        itup_blkno = BufferGetBlockNumber(buf);

        /*
         * If we are doing this insert because we split a page that was the
         * only one on its tree level, but was not the root, it may have been
         * the "fast root".  We need to ensure that the fast root link points
         * at or above the current page.  We can safely acquire a lock on the
         * metapage here --- see comments for _bt_newroot().
         */
        if (split_only_page)
        {
            Assert(!P_ISLEAF(lpageop));

            metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
            metapg = BufferGetPage(metabuf);
            metad = BTPageGetMeta(metapg);

            if (metad->btm_fastlevel >= lpageop->btpo.level)
            {
                /* no update wanted */
                _bt_relbuf(rel, metabuf);
                metabuf = InvalidBuffer;
            }
        }

        /* Do the update.  No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

        MarkBufferDirty(buf);

        if (BufferIsValid(metabuf))
        {
            metad->btm_fastroot = itup_blkno;
            metad->btm_fastlevel = lpageop->btpo.level;
            MarkBufferDirty(metabuf);
        }

        /* XLOG stuff */
        if (!rel->rd_istemp)
        {
            xl_btree_insert xlrec;
            BlockNumber xldownlink;
            xl_btree_metadata xlmeta;
            uint8       xlinfo;
            XLogRecPtr  recptr;
            XLogRecData rdata[4];
            XLogRecData *nextrdata;
            IndexTupleData trunctuple;

            xlrec.target.node = rel->rd_node;
            ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

            rdata[0].data = (char *) &xlrec;
            rdata[0].len = SizeOfBtreeInsert;
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = nextrdata = &(rdata[1]);

            if (P_ISLEAF(lpageop))
                xlinfo = XLOG_BTREE_INSERT_LEAF;
            else
            {
                xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
                Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

                nextrdata->data = (char *) &xldownlink;
                nextrdata->len = sizeof(BlockNumber);
                nextrdata->buffer = InvalidBuffer;
                nextrdata->next = nextrdata + 1;
                nextrdata++;

                xlinfo = XLOG_BTREE_INSERT_UPPER;
            }

            if (BufferIsValid(metabuf))
            {
                xlmeta.root = metad->btm_root;
                xlmeta.level = metad->btm_level;
                xlmeta.fastroot = metad->btm_fastroot;
                xlmeta.fastlevel = metad->btm_fastlevel;

                nextrdata->data = (char *) &xlmeta;
                nextrdata->len = sizeof(xl_btree_metadata);
                nextrdata->buffer = InvalidBuffer;
                nextrdata->next = nextrdata + 1;
                nextrdata++;

                xlinfo = XLOG_BTREE_INSERT_META;
            }

            /* Read comments in _bt_pgaddtup */
            if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
            {
                trunctuple = *itup;
                trunctuple.t_info = sizeof(IndexTupleData);
                nextrdata->data = (char *) &trunctuple;
                nextrdata->len = sizeof(IndexTupleData);
            }
            else
            {
                nextrdata->data = (char *) itup;
                nextrdata->len = IndexTupleDSize(*itup);
            }
            nextrdata->buffer = buf;
            nextrdata->buffer_std = true;
            nextrdata->next = NULL;

            recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

            if (BufferIsValid(metabuf))
            {
                PageSetLSN(metapg, recptr);
                PageSetTLI(metapg, ThisTimeLineID);
            }

            PageSetLSN(page, recptr);
            PageSetTLI(page, ThisTimeLineID);
        }

        END_CRIT_SECTION();

        /* release buffers; send out relcache inval if metapage changed */
        if (BufferIsValid(metabuf))
        {
            if (!InRecovery)
                CacheInvalidateRelcache(rel);
            _bt_relbuf(rel, metabuf);
        }

        _bt_relbuf(rel, buf);
    }
}

/*
 *  _bt_split() -- split a page in the btree.
 *
 *      On entry, buf is the page to split, and is pinned and write-locked.
 *      firstright is the item index of the first item to be moved to the
 *      new right page.  newitemoff etc. tell us about the new item that
 *      must be inserted along with the data from the old page.
 *
 *      Returns the new right sibling of buf, pinned and write-locked.
 *      The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
          bool newitemonleft)
{
    Buffer      rbuf;
    Page        origpage;
    Page        leftpage,
                rightpage;
    BTPageOpaque ropaque,
                lopaque,
                oopaque;
    Buffer      sbuf = InvalidBuffer;
    Page        spage = NULL;
    BTPageOpaque sopaque = NULL;
    Size        itemsz;
    ItemId      itemid;
    IndexTuple  item;
    OffsetNumber leftoff,
                rightoff;
    OffsetNumber maxoff;
    OffsetNumber i;
    bool        isroot;

    rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
    origpage = BufferGetPage(buf);
    leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
    rightpage = BufferGetPage(rbuf);

    _bt_pageinit(leftpage, BufferGetPageSize(buf));
    /* rightpage was already initialized by _bt_getbuf */

    /*
     * Copy the original page's LSN and TLI into leftpage, which will become
     * the updated version of the page.  We need this because XLogInsert will
     * examine these fields and possibly dump them in a page image.
     */
    PageSetLSN(leftpage, PageGetLSN(origpage));
    PageSetTLI(leftpage, PageGetTLI(origpage));

    /* init btree private data */
    oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
    lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

    isroot = P_ISROOT(oopaque);

    /* if we're splitting this page, it won't be the root when we're done */
    /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
    lopaque->btpo_flags = oopaque->btpo_flags;
    lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
    ropaque->btpo_flags = lopaque->btpo_flags;
    lopaque->btpo_prev = oopaque->btpo_prev;
    lopaque->btpo_next = BufferGetBlockNumber(rbuf);
    ropaque->btpo_prev = BufferGetBlockNumber(buf);
    ropaque->btpo_next = oopaque->btpo_next;
    lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
    /* Since we already have write-lock on both pages, ok to read cycleid */
    lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
    ropaque->btpo_cycleid = lopaque->btpo_cycleid;

    /*
     * If the page we're splitting is not the rightmost page at its level in
     * the tree, then the first entry on the page is the high key for the
     * page.  We need to copy that to the right half.  Otherwise (meaning the
     * rightmost page case), all the items on the right half will be user
     * data.
     */
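    /*
     * Layout reminder: on a non-rightmost page the item at offset P_HIKEY
     * (1) is the high key and user data starts at P_FIRSTDATAKEY (2); on
     * the rightmost page there is no high key, so P_FIRSTDATAKEY is 1.
     */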
    rightoff = P_HIKEY;

    if (!P_RIGHTMOST(oopaque))
    {
        itemid = PageGetItemId(origpage, P_HIKEY);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);
        if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "failed to add hikey to the right sibling"
                 " while splitting block %u of index \"%s\"",
                 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
        rightoff = OffsetNumberNext(rightoff);
    }

    /*
     * The "high key" for the new left page will be the first key that's going
     * to go into the new right page.  This might be either the existing data
     * item at position firstright, or the incoming tuple.
     */
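    /*
     * Worked example (illustrative): splitting a leaf holding keys
     * (10,20,30,40) with firstright pointing at 30 gives a left page
     * (10,20) whose new high key is a copy of 30, and a right page
     * (30,40).  A new key 25 with newitemonleft set would then be added
     * to the left page by the transfer loop below.
     */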
    leftoff = P_HIKEY;
    if (!newitemonleft && newitemoff == firstright)
    {
        /* incoming tuple will become first on right page */
        itemsz = newitemsz;
        item = newitem;
    }
    else
    {
        /* existing item at firstright will become first on right page */
        itemid = PageGetItemId(origpage, firstright);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);
    }
    if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                    false, false) == InvalidOffsetNumber)
        elog(PANIC, "failed to add hikey to the left sibling"
             " while splitting block %u of index \"%s\"",
             BufferGetBlockNumber(buf), RelationGetRelationName(rel));
    leftoff = OffsetNumberNext(leftoff);

    /*
     * Now transfer all the data items to the appropriate page.
     *
     * Note: we *must* insert at least the right page's items in item-number
     * order, for the benefit of _bt_restore_page().
     */
    maxoff = PageGetMaxOffsetNumber(origpage);

    for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
    {
        itemid = PageGetItemId(origpage, i);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);

        /* does new item belong before this one? */
        if (i == newitemoff)
        {
            if (newitemonleft)
            {
                _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
                             "left sibling");
                leftoff = OffsetNumberNext(leftoff);
            }
            else
            {
                _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
                             "right sibling");
                rightoff = OffsetNumberNext(rightoff);
            }
        }

        /* decide which page to put it on */
        if (i < firstright)
        {
            _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
                         "left sibling");
            leftoff = OffsetNumberNext(leftoff);
        }
        else
        {
            _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
                         "right sibling");
            rightoff = OffsetNumberNext(rightoff);
        }
    }

    /* cope with possibility that newitem goes at the end */
    if (i <= newitemoff)
    {
        /*
         * Can't have newitemonleft here; that would imply we were told to put
         * *everything* on the left page, which cannot fit (if it could, we'd
         * not be splitting the page).
         */
        Assert(!newitemonleft);
        _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
                     "right sibling");
        rightoff = OffsetNumberNext(rightoff);
    }

    /*
     * We have to grab the right sibling (if any) and fix the prev pointer
     * there. We are guaranteed that this is deadlock-free since no other
     * writer will be holding a lock on that page and trying to move left, and
     * all readers release locks on a page before trying to fetch its
     * neighbors.
     */

    if (!P_RIGHTMOST(ropaque))
    {
        sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
        spage = BufferGetPage(sbuf);
        sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
        if (sopaque->btpo_prev != ropaque->btpo_prev)
            elog(PANIC, "right sibling's left-link doesn't match: "
                 "block %u links to %u instead of expected %u in index \"%s\"",
                 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
                 RelationGetRelationName(rel));

        /*
         * Check to see if we can set the SPLIT_END flag in the right-hand
         * split page; this can save some I/O for vacuum since it need not
         * proceed to the right sibling.  We can set the flag if the right
         * sibling has a different cycleid: that means it could not be part of
         * a group of pages that were all split off from the same ancestor
         * page.  If you're confused, imagine that page A splits to A B and
         * then again, yielding A C B, while vacuum is in progress.  Tuples
         * originally in A could now be in either B or C, hence vacuum must
         * examine both pages.  But if D, our right sibling, has a different
         * cycleid then it could not contain any tuples that were in A when
         * the vacuum started.
         */
        if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
            ropaque->btpo_flags |= BTP_SPLIT_END;
    }

    /*
     * Right sibling is locked, new siblings are prepared, but original page
     * is not updated yet.
     *
     * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
     * not starting the critical section till here because we haven't been
     * scribbling on the original page yet, and we don't care about the new
     * sibling until it's linked into the btree.
     */
    START_CRIT_SECTION();

    /*
     * By here, the original data page has been split into two new halves, and
     * these are correct.  The algorithm requires that the left page never
     * move during a split, so we copy the new left page back on top of the
     * original.  Note that this is not a waste of time, since we also require
     * (in the page management code) that the center of a page always be
     * clean, and the most efficient way to guarantee this is just to compact
     * the data by reinserting it into a new left page.  (XXX the latter
     * comment is probably obsolete.)
     *
     * We need to do this before writing the WAL record, so that XLogInsert
     * can WAL log an image of the page if necessary.
     */
    PageRestoreTempPage(leftpage, origpage);

    MarkBufferDirty(buf);
    MarkBufferDirty(rbuf);

    if (!P_RIGHTMOST(ropaque))
    {
        sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
        MarkBufferDirty(sbuf);
    }

    /* XLOG stuff */
    if (!rel->rd_istemp)
    {
        xl_btree_split xlrec;
        uint8       xlinfo;
        XLogRecPtr  recptr;
        XLogRecData rdata[7];
        XLogRecData *lastrdata;

        xlrec.node = rel->rd_node;
        xlrec.leftsib = BufferGetBlockNumber(buf);
        xlrec.rightsib = BufferGetBlockNumber(rbuf);
        xlrec.rnext = ropaque->btpo_next;
        xlrec.level = ropaque->btpo.level;
        xlrec.firstright = firstright;

        rdata[0].data = (char *) &xlrec;
        rdata[0].len = SizeOfBtreeSplit;
        rdata[0].buffer = InvalidBuffer;

        lastrdata = &rdata[0];

        if (ropaque->btpo.level > 0)
        {
            /* Log downlink on non-leaf pages */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
            lastrdata->len = sizeof(BlockIdData);
            lastrdata->buffer = InvalidBuffer;

            /*
             * We must also log the left page's high key, because the right
             * page's leftmost key is suppressed on non-leaf levels.  Show it
             * as belonging to the left page buffer, so that it is not stored
             * if XLogInsert decides it needs a full-page image of the left
             * page.
             */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            itemid = PageGetItemId(origpage, P_HIKEY);
            item = (IndexTuple) PageGetItem(origpage, itemid);
            lastrdata->data = (char *) item;
            lastrdata->len = MAXALIGN(IndexTupleSize(item));
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }

        /*
         * Log the new item and its offset, if it was inserted on the left
         * page. (If it was put on the right page, we don't need to explicitly
         * WAL log it because it's included with all the other items on the
         * right page.) Show the new item as belonging to the left page
         * buffer, so that it is not stored if XLogInsert decides it needs a
         * full-page image of the left page.  We store the offset anyway,
         * though, to support archive compression of these records.
         */
        if (newitemonleft)
        {
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) &newitemoff;
            lastrdata->len = sizeof(OffsetNumber);
            lastrdata->buffer = InvalidBuffer;

            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) newitem;
            lastrdata->len = MAXALIGN(newitemsz);
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }
        else if (ropaque->btpo.level == 0)
        {
            /*
             * Although we don't need to WAL-log the new item, we still need
             * XLogInsert to consider storing a full-page image of the left
             * page, so make an empty entry referencing that buffer. This also
             * ensures that the left page is always backup block 1.
             */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = NULL;
            lastrdata->len = 0;
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }

        /*
         * Log the contents of the right page in the format understood by
         * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
         * because we're going to recreate the whole page anyway, so it should
         * never be stored by XLogInsert.
         *
         * Direct access to the page is ugly but fast --- ideally the page
         * API would provide a function for this.  Note we only store the tuples
1111                  * themselves, knowing that they were inserted in item-number order
1112                  * and so the item pointers can be reconstructed.  See comments for
1113                  * _bt_restore_page().
1114                  */
1115                 lastrdata->next = lastrdata + 1;
1116                 lastrdata++;
1117
1118                 lastrdata->data = (char *) rightpage +
1119                         ((PageHeader) rightpage)->pd_upper;
1120                 lastrdata->len = ((PageHeader) rightpage)->pd_special -
1121                         ((PageHeader) rightpage)->pd_upper;
1122                 lastrdata->buffer = InvalidBuffer;
1123
1124                 /* Log the right sibling, because we've changed its' prev-pointer. */
1125                 if (!P_RIGHTMOST(ropaque))
1126                 {
1127                         lastrdata->next = lastrdata + 1;
1128                         lastrdata++;
1129
1130                         lastrdata->data = NULL;
1131                         lastrdata->len = 0;
1132                         lastrdata->buffer = sbuf;       /* backup block 2 */
1133                         lastrdata->buffer_std = true;
1134                 }
1135
1136                 lastrdata->next = NULL;
1137
1138                 if (isroot)
1139                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1140                 else
1141                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1142
1143                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
1144
1145                 PageSetLSN(origpage, recptr);
1146                 PageSetTLI(origpage, ThisTimeLineID);
1147                 PageSetLSN(rightpage, recptr);
1148                 PageSetTLI(rightpage, ThisTimeLineID);
1149                 if (!P_RIGHTMOST(ropaque))
1150                 {
1151                         PageSetLSN(spage, recptr);
1152                         PageSetTLI(spage, ThisTimeLineID);
1153                 }
1154         }
1155
1156         END_CRIT_SECTION();
1157
1158         /* release the old right sibling */
1159         if (!P_RIGHTMOST(ropaque))
1160                 _bt_relbuf(rel, sbuf);
1161
1162         /* split's done */
1163         return rbuf;
1164 }
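
/*
 * Illustrative sketch (hypothetical, standalone; not part of nbtinsert.c):
 * the split record above captures the right page's tuple data as the one
 * contiguous byte range between pd_upper and pd_special, leaving it to
 * _bt_restore_page() to rebuild the line pointers.  A toy page header
 * makes the arithmetic concrete.
 */
#ifdef BTREE_WALSPAN_SKETCH
#include <stdio.h>

typedef struct
{
	unsigned short pd_lower;	/* end of the line-pointer array */
	unsigned short pd_upper;	/* start of tuple data */
	unsigned short pd_special;	/* start of special space (end of tuples) */
} ToyPageHeader;

int
main(void)
{
	ToyPageHeader hdr = {64, 7800, 8176};

	/* the record body would be the bytes at page + pd_upper, length: */
	printf("tuple-data bytes to log: %u\n",
		   (unsigned) (hdr.pd_special - hdr.pd_upper));
	return 0;
}
#endif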

/*
 *	_bt_findsplitloc() -- find an appropriate place to split a page.
 *
 * The idea here is to equalize the free space that will be on each split
 * page, *after accounting for the inserted tuple*.  (If we fail to account
 * for it, we might find ourselves with too little room on the page that
 * it needs to go into!)
 *
 * If the page is the rightmost page on its level, we instead try to arrange
 * to leave the left split page fillfactor% full.  In this way, when we are
 * inserting successively increasing keys (consider sequences, timestamps,
 * etc) we will end up with a tree whose pages are about fillfactor% full,
 * instead of the 50% full result that we'd get without this special case.
 * This is the same as nbtsort.c produces for a newly-created tree.  Note
 * that leaf and nonleaf pages use different fillfactors.
 *
 * We are passed the intended insert position of the new tuple, expressed as
 * the offsetnumber of the tuple it must go in front of.  (This could be
 * maxoff+1 if the tuple is to go at the end.)
 *
 * We return the index of the first existing tuple that should go on the
 * righthand page, plus a boolean indicating whether the new tuple goes on
 * the left or right page.  The bool is necessary to disambiguate the case
 * where firstright == newitemoff.
 */
static OffsetNumber
_bt_findsplitloc(Relation rel,
				 Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft)
{
	BTPageOpaque opaque;
	OffsetNumber offnum;
	OffsetNumber maxoff;
	ItemId		itemid;
	FindSplitData state;
	int			leftspace,
				rightspace,
				goodenough,
				olddataitemstotal,
				olddataitemstoleft;
	bool		goodenoughfound;

	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
	newitemsz += sizeof(ItemIdData);

	/* Total free space available on a btree page, after fixed overhead */
	leftspace = rightspace =
		PageGetPageSize(page) - SizeOfPageHeaderData -
		MAXALIGN(sizeof(BTPageOpaqueData));

	/* The right page will have the same high key as the old page */
	if (!P_RIGHTMOST(opaque))
	{
		itemid = PageGetItemId(page, P_HIKEY);
		rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
							 sizeof(ItemIdData));
	}

	/* Count up total space in data items without actually scanning 'em */
	olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);

	state.newitemsz = newitemsz;
	state.is_leaf = P_ISLEAF(opaque);
	state.is_rightmost = P_RIGHTMOST(opaque);
	state.have_split = false;
	if (state.is_leaf)
		state.fillfactor = RelationGetFillFactor(rel,
												 BTREE_DEFAULT_FILLFACTOR);
	else
		state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
	state.newitemonleft = false;	/* these just to keep compiler quiet */
	state.firstright = 0;
	state.best_delta = 0;
	state.leftspace = leftspace;
	state.rightspace = rightspace;
	state.olddataitemstotal = olddataitemstotal;
	state.newitemoff = newitemoff;

	/*
	 * Finding the best possible split would require checking all the possible
	 * split points, because of the high-key and left-key special cases.
	 * That's probably more work than it's worth; instead, stop as soon as we
	 * find a "good-enough" split, where good-enough is defined as an
	 * imbalance in free space of no more than pagesize/16 (arbitrary...).
	 * This should let us stop near the middle on most pages, instead of
	 * plowing to the end.
	 */
	goodenough = leftspace / 16;

	/*
	 * Scan through the data items and calculate space usage for a split at
	 * each possible position.
	 */
	olddataitemstoleft = 0;
	goodenoughfound = false;
	maxoff = PageGetMaxOffsetNumber(page);

	for (offnum = P_FIRSTDATAKEY(opaque);
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		Size		itemsz;

		itemid = PageGetItemId(page, offnum);
		itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);

		/*
		 * Will the new item go to left or right of split?
		 */
		if (offnum > newitemoff)
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

		else if (offnum < newitemoff)
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		else
		{
			/* need to try it both ways! */
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		}

		/* Abort scan once we find a good-enough choice */
		if (state.have_split && state.best_delta <= goodenough)
		{
			goodenoughfound = true;
			break;
		}

		olddataitemstoleft += itemsz;
	}

	/*
	 * If the new item goes as the last item, check for splitting so that all
	 * the old items go to the left page and the new item goes to the right
	 * page.
	 */
	if (newitemoff > maxoff && !goodenoughfound)
		_bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);

	/*
	 * I believe it is not possible to fail to find a feasible split, but just
	 * in case ...
	 */
	if (!state.have_split)
		elog(ERROR, "could not find a feasible split point for index \"%s\"",
			 RelationGetRelationName(rel));

	*newitemonleft = state.newitemonleft;
	return state.firstright;
}
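
/*
 * Illustrative sketch (hypothetical, standalone; not part of nbtinsert.c):
 * a toy version of the "good-enough" search above.  Item sizes are plain
 * ints, and the high-key, new-item and fillfactor refinements are ignored;
 * the point is only the early exit once the free-space imbalance drops to
 * pagesize/16 or below, which normally stops the scan near the middle.
 */
#ifdef BTREE_SPLITLOC_SKETCH
#include <stdio.h>
#include <stdlib.h>

static int
toy_findsplitloc(const int *itemsz, int nitems, int pagesize)
{
	int			total = 0;
	int			left = 0;
	int			best = -1;
	int			best_delta = 0;
	int			goodenough = pagesize / 16;
	int			i;

	for (i = 0; i < nitems; i++)
		total += itemsz[i];

	/* try splitting before each item: items 0..i-1 left, i..n-1 right */
	for (i = 1; i < nitems; i++)
	{
		int			delta;

		left += itemsz[i - 1];
		delta = abs(total - 2 * left);	/* leftfree minus rightfree */
		if (best < 0 || delta < best_delta)
		{
			best = i;
			best_delta = delta;
		}
		if (best_delta <= goodenough)
			break;				/* good enough; stop scanning */
	}
	return best;
}

int
main(void)
{
	int			sizes[8] = {40, 40, 40, 40, 40, 40, 40, 40};

	/* stops at the midpoint (prints 4) instead of plowing to the end */
	printf("first item on right page: %d\n",
		   toy_findsplitloc(sizes, 8, 1024));
	return 0;
}
#endif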

/*
 * Subroutine to analyze a particular possible split choice (ie, firstright
 * and newitemonleft settings), and record the best split so far in *state.
 *
 * firstoldonright is the offset of the first item on the original page
 * that goes to the right page, and firstoldonrightsz is the size of that
 * tuple. firstoldonright can be > max offset, which means that all the old
 * items go to the left page and only the new item goes to the right page.
 * In that case, firstoldonrightsz is not used.
 *
 * olddataitemstoleft is the total size of all old items to the left of
 * firstoldonright.
 */
static void
_bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright,
				  bool newitemonleft,
				  int olddataitemstoleft,
				  Size firstoldonrightsz)
{
	int			leftfree,
				rightfree;
	Size		firstrightitemsz;
	bool		newitemisfirstonright;

	/* Is the new item going to be the first item on the right page? */
	newitemisfirstonright = (firstoldonright == state->newitemoff
							 && !newitemonleft);

	if (newitemisfirstonright)
		firstrightitemsz = state->newitemsz;
	else
		firstrightitemsz = firstoldonrightsz;

	/* Account for all the old tuples */
	leftfree = state->leftspace - olddataitemstoleft;
	rightfree = state->rightspace -
		(state->olddataitemstotal - olddataitemstoleft);

	/*
	 * The first item on the right page becomes the high key of the left page;
	 * therefore it counts against left space as well as right space.
	 */
	leftfree -= firstrightitemsz;

	/* account for the new item */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/*
	 * If we are not on the leaf level, we will be able to discard the key
	 * data from the first item that winds up on the right page.
	 */
	if (!state->is_leaf)
		rightfree += (int) firstrightitemsz -
			(int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));

	/*
	 * If feasible split point, remember best delta.
	 */
	if (leftfree >= 0 && rightfree >= 0)
	{
		int			delta;

		if (state->is_rightmost)
		{
			/*
			 * If splitting a rightmost page, try to put (100-fillfactor)% of
			 * free space on left page. See comments for _bt_findsplitloc.
			 */
			delta = (state->fillfactor * leftfree)
				- ((100 - state->fillfactor) * rightfree);
		}
		else
		{
			/* Otherwise, aim for equal free space on both sides */
			delta = leftfree - rightfree;
		}

		if (delta < 0)
			delta = -delta;
		if (!state->have_split || delta < state->best_delta)
		{
			state->have_split = true;
			state->newitemonleft = newitemonleft;
			state->firstright = firstoldonright;
			state->best_delta = delta;
		}
	}
}
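
/*
 * Illustrative sketch (hypothetical, standalone): the rightmost-page delta
 * above is zero exactly when leftfree : rightfree = (100 - fillfactor) :
 * fillfactor, i.e. when the left page ends up about fillfactor% full.
 * With fillfactor = 90, leftfree = 800 and rightfree = 7200 give
 * 90*800 - 10*7200 = 0, while an even 4000/4000 split scores much worse.
 */
#ifdef BTREE_DELTA_SKETCH
#include <stdio.h>
#include <stdlib.h>

static int
rightmost_delta(int fillfactor, int leftfree, int rightfree)
{
	return abs(fillfactor * leftfree - (100 - fillfactor) * rightfree);
}

int
main(void)
{
	printf("%d\n", rightmost_delta(90, 800, 7200));		/* 0: ideal */
	printf("%d\n", rightmost_delta(90, 4000, 4000));	/* 320000 */
	return 0;
}
#endif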

/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have a write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 *
 * This is exported so it can be called by nbtxlog.c.
 */
void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.  (This path is also taken when called from WAL recovery ---
	 * we have no stack in that case.)
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			if (!InRecovery)
				elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change the stack
		 * item!  We want to find the parent pointing to where we are,
		 * right?  - vadim 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);

		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/* Now we can unlock the children */
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}
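
/*
 * Illustrative sketch (hypothetical, standalone): the recursion above,
 * reduced to its skeleton.  A split at one level inserts a downlink one
 * level up, which may split in turn; splitting the true root instead
 * builds a new root, which terminates the recursion.
 */
#ifdef BTREE_PARENT_SKETCH
#include <stdbool.h>
#include <stdio.h>

static void
toy_insert_parent(int level, bool is_root)
{
	if (is_root)
	{
		printf("level %d: root split -> build new root, done\n", level);
		return;
	}
	printf("level %d: insert downlink into parent\n", level);
	/* pretend the parent overflowed too and had to split */
	toy_insert_parent(level + 1, level + 1 >= 2);
}

int
main(void)
{
	toy_insert_parent(0, false);
	return 0;
}
#endif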

/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page". We
			 * need this test anyway, because of the possibility that the
			 * page has a high key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against the possibility that
			 * the page has split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}
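
/*
 * Illustrative sketch (hypothetical, standalone): the two loops above scan
 * right from the remembered offset first, then left, because insertions
 * can only have pushed the item right and deletions only left.  The same
 * shape on a plain array:
 */
#ifdef BTREE_STACKSCAN_SKETCH
#include <stdio.h>

static int
scan_from(const int *items, int n, int start, int target)
{
	int			i;

	/* scan to the right first (the more likely direction) ... */
	for (i = start; i < n; i++)
		if (items[i] == target)
			return i;
	/* ... then to the left */
	for (i = start - 1; i >= 0; i--)
		if (items[i] == target)
			return i;
	return -1;					/* item moved off this page entirely */
}

int
main(void)
{
	int			page[5] = {10, 20, 30, 40, 50};

	printf("found at offset %d\n", scan_from(page, 5, 3, 50));	/* 4 */
	return 0;
}
#endif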

/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked.  On exit, a new root page exists with entries for the
 *		two new children; the metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to the caller, which must unlock/
 *		unpin lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	ItemId		itemid;
	IndexTuple	item;
	Size		itemsz;
	IndexTuple	new_item;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	itemsz = sizeof(IndexTupleData);
	new_item = (IndexTuple) palloc(itemsz);
	new_item->t_info = itemsz;
	ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	itemsz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	new_item = CopyIndexTuple(item);
	ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = rel->rd_node;
		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeNewroot;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/*
		 * Direct access to the page is not ideal, but it is faster; we ought
		 * to add a suitable function to the page API.
		 */
		rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
		rdata[1].len = ((PageHeader) rootpage)->pd_special -
			((PageHeader) rootpage)->pd_upper;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);

		PageSetLSN(rootpage, recptr);
		PageSetTLI(rootpage, ThisTimeLineID);
		PageSetLSN(metapg, recptr);
		PageSetTLI(metapg, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	/* send out relcache inval for metapage change */
	if (!InRecovery)
		CacheInvalidateRelcache(rel);

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	return rootbuf;
}
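
/*
 * Illustrative sketch (hypothetical, standalone): the deadlock-freedom
 * argument above is consistent lock ordering.  No path ever holds the
 * metapage lock while waiting for the root lock, so the waits-for graph
 * stays acyclic.  A minimal two-mutex rendering:
 */
#ifdef BTREE_LOCKORDER_SKETCH
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t root_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t meta_lock = PTHREAD_MUTEX_INITIALIZER;

static void *
writer(void *arg)
{
	/* writers lock the root before the metapage, never the reverse */
	pthread_mutex_lock(&root_lock);
	pthread_mutex_lock(&meta_lock);
	puts("writer: updated root and metapage");
	pthread_mutex_unlock(&meta_lock);
	pthread_mutex_unlock(&root_lock);
	return arg;
}

int
main(void)
{
	pthread_t	t;

	pthread_create(&t, NULL, writer, NULL);

	/* readers release the metapage before trying to lock the root */
	pthread_mutex_lock(&meta_lock);
	puts("reader: read metapage");
	pthread_mutex_unlock(&meta_lock);
	pthread_mutex_lock(&root_lock);
	puts("reader: read root");
	pthread_mutex_unlock(&root_lock);

	pthread_join(t, NULL);
	return 0;
}
#endif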

/*
 *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost index tuple on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		tuples down to just the tuple header.  CAUTION: this works ONLY if
 *		we insert the tuples in order, so that the given itup_off does
 *		represent the final position of the tuple!
 */
static void
_bt_pgaddtup(Relation rel,
			 Page page,
			 Size itemsize,
			 IndexTuple itup,
			 OffsetNumber itup_off,
			 const char *where)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
	}

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add item to the %s in index \"%s\"",
			 where, RelationGetRelationName(rel));
}

/*
 * _bt_isequal - used in _bt_doinsert in check for duplicates.
 *
 * This is very similar to _bt_compare, except for NULL handling.
 * The rule is simple: NOT_NULL is not equal to NULL, and NULL is not
 * equal to NULL either.
 */
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey)
{
	IndexTuple	itup;
	int			i;

	/* Better be comparing to a leaf item */
	Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));

	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));

	for (i = 1; i <= keysz; i++)
	{
		AttrNumber	attno;
		Datum		datum;
		bool		isNull;
		int32		result;

		attno = scankey->sk_attno;
		Assert(attno == i);
		datum = index_getattr(itup, attno, itupdesc, &isNull);

		/* NULLs are never equal to anything */
		if (isNull || (scankey->sk_flags & SK_ISNULL))
			return false;

		result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
											 datum,
											 scankey->sk_argument));

		if (result != 0)
			return false;

		scankey++;
	}

	/* if we get here, the keys are equal */
	return true;
}
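
/*
 * Illustrative sketch (hypothetical, standalone): the NULL rule above in
 * miniature.  Unlike SQL's three-valued "=", this duplicate check treats
 * any comparison involving a NULL as "not equal", including NULL vs NULL.
 */
#ifdef BTREE_ISEQUAL_SKETCH
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
	bool		isnull;
	int			value;
} ToyDatum;

static bool
toy_isequal(ToyDatum a, ToyDatum b)
{
	if (a.isnull || b.isnull)
		return false;			/* NULLs are never equal to anything */
	return a.value == b.value;
}

int
main(void)
{
	ToyDatum	n = {true, 0};
	ToyDatum	one = {false, 1};

	printf("%d %d %d\n",
		   toy_isequal(one, one),	/* 1 */
		   toy_isequal(one, n),		/* 0 */
		   toy_isequal(n, n));		/* 0: NULL != NULL here */
	return 0;
}
#endif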

/*
 * _bt_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The passed buffer
 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
 * super-exclusive "cleanup" lock (see nbtree/README).
 */
static void
_bt_vacuum_one_page(Relation rel, Buffer buffer)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				minoff,
				maxoff;
	Page		page = BufferGetPage(buffer);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	minoff = P_FIRSTDATAKEY(opaque);
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = minoff;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
		_bt_delitems(rel, buffer, deletable, ndeletable);

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}
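
/*
 * Illustrative sketch (hypothetical, standalone): the collect-then-delete
 * pattern above -- gather matching offsets in one pass, then hand the
 * whole batch to a single deletion call -- shown with a plain flag array.
 */
#ifdef BTREE_VACPAGE_SKETCH
#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
	bool		dead[5] = {false, true, false, true, true};
	int			deletable[5];
	int			ndeletable = 0;
	int			i;

	for (i = 0; i < 5; i++)
		if (dead[i])
			deletable[ndeletable++] = i;

	if (ndeletable > 0)
		printf("would delete %d items in one batched call\n", ndeletable);
	return 0;
}
#endif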