]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
fe3e98b982c322c4fdc059a543b8dc0a9bef5832
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.91 2002/05/24 18:57:55 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21
22
23 typedef struct
24 {
25         /* context data for _bt_checksplitloc */
26         Size            newitemsz;              /* size of new item to be inserted */
27         bool            is_leaf;                /* T if splitting a leaf page */
28         bool            is_rightmost;   /* T if splitting a rightmost page */
29
30         bool            have_split;             /* found a valid split? */
31
32         /* these fields valid only if have_split is true */
33         bool            newitemonleft;  /* new item on left or right of best split */
34         OffsetNumber firstright;        /* best split point */
35         int                     best_delta;             /* best size delta so far */
36 } FindSplitData;
37
38 extern bool FixBTree;
39
40 Buffer          _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
41 static void _bt_fixtree(Relation rel, BlockNumber blkno);
42 static void _bt_fixbranch(Relation rel, BlockNumber lblkno,
43                           BlockNumber rblkno, BTStack true_stack);
44 static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
45 static void _bt_fixup(Relation rel, Buffer buf);
46 static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
47
48 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
49
50 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
51                                  Relation heapRel, Buffer buf,
52                                  ScanKey itup_scankey);
53 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
54                            BTStack stack,
55                            int keysz, ScanKey scankey,
56                            BTItem btitem,
57                            OffsetNumber afteritem);
58 static void _bt_insertuple(Relation rel, Buffer buf,
59                            Size itemsz, BTItem btitem, OffsetNumber newitemoff);
60 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
61                   OffsetNumber newitemoff, Size newitemsz,
62                   BTItem newitem, bool newitemonleft,
63                   OffsetNumber *itup_off, BlockNumber *itup_blkno);
64 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
65                                  OffsetNumber newitemoff,
66                                  Size newitemsz,
67                                  bool *newitemonleft);
68 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
69                                   int leftfree, int rightfree,
70                                   bool newitemonleft, Size firstrightitemsz);
71 static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
72 static void _bt_pgaddtup(Relation rel, Page page,
73                          Size itemsize, BTItem btitem,
74                          OffsetNumber itup_off, const char *where);
75 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
76                         int keysz, ScanKey scankey);
77
78
79 /*
80  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
81  *
82  *              This routine is called by the public interface routines, btbuild
83  *              and btinsert.  By here, btitem is filled in, including the TID.
84  */
85 InsertIndexResult
86 _bt_doinsert(Relation rel, BTItem btitem,
87                          bool index_is_unique, Relation heapRel)
88 {
89         IndexTuple      itup = &(btitem->bti_itup);
90         int                     natts = rel->rd_rel->relnatts;
91         ScanKey         itup_scankey;
92         BTStack         stack;
93         Buffer          buf;
94         InsertIndexResult res;
95
96         /* we need a scan key to do our search, so build one */
97         itup_scankey = _bt_mkscankey(rel, itup);
98
99 top:
100         /* find the page containing this key */
101         stack = _bt_search(rel, natts, itup_scankey, &buf, BT_WRITE);
102
103         /* trade in our read lock for a write lock */
104         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
105         LockBuffer(buf, BT_WRITE);
106
107         /*
108          * If the page was split between the time that we surrendered our read
109          * lock and acquired our write lock, then this page may no longer be
110          * the right place for the key we want to insert.  In this case, we
111          * need to move right in the tree.      See Lehman and Yao for an
112          * excruciatingly precise description.
113          */
114         buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
115
116         /*
117          * If we're not allowing duplicates, make sure the key isn't already
118          * in the index.
119          *
120          * NOTE: obviously, _bt_check_unique can only detect keys that are
121          * already in the index; so it cannot defend against concurrent
122          * insertions of the same key.  We protect against that by means
123          * of holding a write lock on the target page.  Any other would-be
124          * inserter of the same key must acquire a write lock on the same
125          * target page, so only one would-be inserter can be making the check
126          * at one time.  Furthermore, once we are past the check we hold
127          * write locks continuously until we have performed our insertion,
128          * so no later inserter can fail to see our insertion.  (This
129          * requires some care in _bt_insertonpg.)
130          *
131          * If we must wait for another xact, we release the lock while waiting,
132          * and then must start over completely.
133          */
134         if (index_is_unique)
135         {
136                 TransactionId xwait;
137
138                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
139
140                 if (TransactionIdIsValid(xwait))
141                 {
142                         /* Have to wait for the other guy ... */
143                         _bt_relbuf(rel, buf);
144                         XactLockTableWait(xwait);
145                         /* start over... */
146                         _bt_freestack(stack);
147                         goto top;
148                 }
149         }
150
151         /* do the insertion */
152         res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
153
154         /* be tidy */
155         _bt_freestack(stack);
156         _bt_freeskey(itup_scankey);
157
158         return res;
159 }
160
161 /*
162  *      _bt_check_unique() -- Check for violation of unique index constraint
163  *
164  * Returns InvalidTransactionId if there is no conflict, else an xact ID
165  * we must wait for to see if it commits a conflicting tuple.   If an actual
166  * conflict is detected, no return --- just elog().
167  */
168 static TransactionId
169 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
170                                  Buffer buf, ScanKey itup_scankey)
171 {
172         TupleDesc       itupdesc = RelationGetDescr(rel);
173         int                     natts = rel->rd_rel->relnatts;
174         OffsetNumber offset,
175                                 maxoff;
176         Page            page;
177         BTPageOpaque opaque;
178         Buffer          nbuf = InvalidBuffer;
179
180         page = BufferGetPage(buf);
181         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
182         maxoff = PageGetMaxOffsetNumber(page);
183
184         /*
185          * Find first item >= proposed new item.  Note we could also get a
186          * pointer to end-of-page here.
187          */
188         offset = _bt_binsrch(rel, buf, natts, itup_scankey);
189
190         /*
191          * Scan over all equal tuples, looking for live conflicts.
192          */
193         for (;;)
194         {
195                 HeapTupleData htup;
196                 Buffer          hbuffer;
197                 ItemId          curitemid;
198                 BTItem          cbti;
199                 BlockNumber nblkno;
200
201                 /*
202                  * make sure the offset points to an actual key before trying to
203                  * compare it...
204                  */
205                 if (offset <= maxoff)
206                 {
207                         /*
208                          * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's how we
209                          * handling NULLs - and so we must not use _bt_compare in real
210                          * comparison, but only for ordering/finding items on pages. -
211                          * vadim 03/24/97
212                          */
213                         if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
214                                 break;                  /* we're past all the equal tuples */
215
216                         curitemid = PageGetItemId(page, offset);
217                         /*
218                          * We can skip the heap fetch if the item is marked killed.
219                          */
220                         if (!ItemIdDeleted(curitemid))
221                         {
222                                 cbti = (BTItem) PageGetItem(page, curitemid);
223                                 htup.t_self = cbti->bti_itup.t_tid;
224                                 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
225                                                            true, NULL))
226                                 {
227                                         /* it is a duplicate */
228                                         TransactionId xwait =
229                                                 (TransactionIdIsValid(SnapshotDirty->xmin)) ?
230                                                 SnapshotDirty->xmin : SnapshotDirty->xmax;
231
232                                         ReleaseBuffer(hbuffer);
233                                         /*
234                                          * If this tuple is being updated by other transaction
235                                          * then we have to wait for its commit/abort.
236                                          */
237                                         if (TransactionIdIsValid(xwait))
238                                         {
239                                                 if (nbuf != InvalidBuffer)
240                                                         _bt_relbuf(rel, nbuf);
241                                                 /* Tell _bt_doinsert to wait... */
242                                                 return xwait;
243                                         }
244
245                                         /*
246                                          * Otherwise we have a definite conflict.
247                                          */
248                                         elog(ERROR, "Cannot insert a duplicate key into unique index %s",
249                                                  RelationGetRelationName(rel));
250                                 }
251                                 else
252                                 {
253                                         /*
254                                          * Hmm, if we can't see the tuple, maybe it can be
255                                          * marked killed.  This logic should match index_getnext
256                                          * and btgettuple.
257                                          */
258                                         uint16          sv_infomask;
259
260                                         LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
261                                         sv_infomask = htup.t_data->t_infomask;
262                                         if (HeapTupleSatisfiesVacuum(htup.t_data,
263                                                                                                  RecentGlobalXmin) ==
264                                                 HEAPTUPLE_DEAD)
265                                         {
266                                                 curitemid->lp_flags |= LP_DELETE;
267                                                 SetBufferCommitInfoNeedsSave(buf);
268                                         }
269                                         if (sv_infomask != htup.t_data->t_infomask)
270                                                 SetBufferCommitInfoNeedsSave(hbuffer);
271                                         LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
272                                         ReleaseBuffer(hbuffer);
273                                 }
274                         }
275                 }
276
277                 /*
278                  * Advance to next tuple to continue checking.
279                  */
280                 if (offset < maxoff)
281                         offset = OffsetNumberNext(offset);
282                 else
283                 {
284                         /* If scankey == hikey we gotta check the next page too */
285                         if (P_RIGHTMOST(opaque))
286                                 break;
287                         if (!_bt_isequal(itupdesc, page, P_HIKEY,
288                                                          natts, itup_scankey))
289                                 break;
290                         nblkno = opaque->btpo_next;
291                         if (nbuf != InvalidBuffer)
292                                 _bt_relbuf(rel, nbuf);
293                         nbuf = _bt_getbuf(rel, nblkno, BT_READ);
294                         page = BufferGetPage(nbuf);
295                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
296                         maxoff = PageGetMaxOffsetNumber(page);
297                         offset = P_FIRSTDATAKEY(opaque);
298                 }
299         }
300
301         if (nbuf != InvalidBuffer)
302                 _bt_relbuf(rel, nbuf);
303
304         return InvalidTransactionId;
305 }
306
307 /*----------
308  *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
309  *
310  *              This recursive procedure does the following things:
311  *
312  *                      +  finds the right place to insert the tuple.
313  *                      +  if necessary, splits the target page (making sure that the
314  *                         split is equitable as far as post-insert free space goes).
315  *                      +  inserts the tuple.
316  *                      +  if the page was split, pops the parent stack, and finds the
317  *                         right place to insert the new child pointer (by walking
318  *                         right using information stored in the parent stack).
319  *                      +  invokes itself with the appropriate tuple for the right
320  *                         child page on the parent.
321  *
322  *              On entry, we must have the right buffer on which to do the
323  *              insertion, and the buffer must be pinned and locked.  On return,
324  *              we will have dropped both the pin and the write lock on the buffer.
325  *
326  *              If 'afteritem' is >0 then the new tuple must be inserted after the
327  *              existing item of that number, noplace else.  If 'afteritem' is 0
328  *              then the procedure finds the exact spot to insert it by searching.
329  *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
330  *
331  *              NOTE: if the new key is equal to one or more existing keys, we can
332  *              legitimately place it anywhere in the series of equal keys --- in fact,
333  *              if the new key is equal to the page's "high key" we can place it on
334  *              the next page.  If it is equal to the high key, and there's not room
335  *              to insert the new tuple on the current page without splitting, then
336  *              we can move right hoping to find more free space and avoid a split.
337  *              (We should not move right indefinitely, however, since that leads to
338  *              O(N^2) insertion behavior in the presence of many equal keys.)
339  *              Once we have chosen the page to put the key on, we'll insert it before
340  *              any existing equal keys because of the way _bt_binsrch() works.
341  *
342  *              The locking interactions in this code are critical.  You should
343  *              grok Lehman and Yao's paper before making any changes.  In addition,
344  *              you need to understand how we disambiguate duplicate keys in this
345  *              implementation, in order to be able to find our location using
346  *              L&Y "move right" operations.  Since we may insert duplicate user
347  *              keys, and since these dups may propagate up the tree, we use the
348  *              'afteritem' parameter to position ourselves correctly for the
349  *              insertion on internal pages.
350  *----------
351  */
352 static InsertIndexResult
353 _bt_insertonpg(Relation rel,
354                            Buffer buf,
355                            BTStack stack,
356                            int keysz,
357                            ScanKey scankey,
358                            BTItem btitem,
359                            OffsetNumber afteritem)
360 {
361         InsertIndexResult res;
362         Page            page;
363         BTPageOpaque lpageop;
364         OffsetNumber itup_off;
365         BlockNumber itup_blkno;
366         OffsetNumber newitemoff;
367         OffsetNumber firstright = InvalidOffsetNumber;
368         Size            itemsz;
369
370         page = BufferGetPage(buf);
371         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
372
373         itemsz = IndexTupleDSize(btitem->bti_itup)
374                 + (sizeof(BTItemData) - sizeof(IndexTupleData));
375
376         itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
377                                                                  * we need to be consistent */
378
379         /*
380          * Check whether the item can fit on a btree page at all. (Eventually,
381          * we ought to try to apply TOAST methods if not.) We actually need to
382          * be able to fit three items on every page, so restrict any one item
383          * to 1/3 the per-page available space. Note that at this point,
384          * itemsz doesn't include the ItemId.
385          */
386         if (itemsz > (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData))
387                 elog(ERROR, "btree: index item size %lu exceeds maximum %lu",
388                          (unsigned long) itemsz,
389                          (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData));
390
391         /*
392          * Determine exactly where new item will go.
393          */
394         if (afteritem > 0)
395                 newitemoff = afteritem + 1;
396         else
397         {
398                 /*----------
399                  * If we will need to split the page to put the item here,
400                  * check whether we can put the tuple somewhere to the right,
401                  * instead.  Keep scanning right until we
402                  *              (a) find a page with enough free space,
403                  *              (b) reach the last page where the tuple can legally go, or
404                  *              (c) get tired of searching.
405                  * (c) is not flippant; it is important because if there are many
406                  * pages' worth of equal keys, it's better to split one of the early
407                  * pages than to scan all the way to the end of the run of equal keys
408                  * on every insert.  We implement "get tired" as a random choice,
409                  * since stopping after scanning a fixed number of pages wouldn't work
410                  * well (we'd never reach the right-hand side of previously split
411                  * pages).      Currently the probability of moving right is set at 0.99,
412                  * which may seem too high to change the behavior much, but it does an
413                  * excellent job of preventing O(N^2) behavior with many equal keys.
414                  *----------
415                  */
416                 bool            movedright = false;
417
418                 while (PageGetFreeSpace(page) < itemsz &&
419                            !P_RIGHTMOST(lpageop) &&
420                            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
421                            random() > (MAX_RANDOM_VALUE / 100))
422                 {
423                         /* step right one page */
424                         BlockNumber rblkno = lpageop->btpo_next;
425                         Buffer  rbuf;
426
427                         /*
428                          * must write-lock next page before releasing write lock on
429                          * current page; else someone else's _bt_check_unique scan
430                          * could fail to see our insertion.
431                          */
432                         rbuf = _bt_getbuf(rel, rblkno, BT_WRITE);
433                         _bt_relbuf(rel, buf);
434                         buf = rbuf;
435                         page = BufferGetPage(buf);
436                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
437                         movedright = true;
438                 }
439
440                 /*
441                  * Now we are on the right page, so find the insert position. If
442                  * we moved right at all, we know we should insert at the start of
443                  * the page, else must find the position by searching.
444                  */
445                 if (movedright)
446                         newitemoff = P_FIRSTDATAKEY(lpageop);
447                 else
448                         newitemoff = _bt_binsrch(rel, buf, keysz, scankey);
449         }
450
451         /*
452          * Do we need to split the page to fit the item on it?
453          *
454          * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
455          * so this comparison is correct even though we appear to be
456          * accounting only for the item and not for its line pointer.
457          */
458         if (PageGetFreeSpace(page) < itemsz)
459         {
460                 Buffer          rbuf;
461                 BlockNumber bknum = BufferGetBlockNumber(buf);
462                 BlockNumber rbknum;
463                 bool            is_root = P_ISROOT(lpageop);
464                 bool            newitemonleft;
465
466                 /* Choose the split point */
467                 firstright = _bt_findsplitloc(rel, page,
468                                                                           newitemoff, itemsz,
469                                                                           &newitemonleft);
470
471                 /* split the buffer into left and right halves */
472                 rbuf = _bt_split(rel, buf, firstright,
473                                                  newitemoff, itemsz, btitem, newitemonleft,
474                                                  &itup_off, &itup_blkno);
475
476                 /*----------
477                  * By here,
478                  *
479                  *              +  our target page has been split;
480                  *              +  the original tuple has been inserted;
481                  *              +  we have write locks on both the old (left half)
482                  *                 and new (right half) buffers, after the split; and
483                  *              +  we know the key we want to insert into the parent
484                  *                 (it's the "high key" on the left child page).
485                  *
486                  * We're ready to do the parent insertion.  We need to hold onto the
487                  * locks for the child pages until we locate the parent, but we can
488                  * release them before doing the actual insertion (see Lehman and Yao
489                  * for the reasoning).
490                  *
491                  * Here we have to do something Lehman and Yao don't talk about:
492                  * deal with a root split and construction of a new root.  If our
493                  * stack is empty then we have just split a node on what had been
494                  * the root level when we descended the tree.  If it is still the
495                  * root then we perform a new-root construction.  If it *wasn't*
496                  * the root anymore, use the parent pointer to get up to the root
497                  * level that someone constructed meanwhile, and find the right
498                  * place to insert as for the normal case.
499                  *----------
500                  */
501
502                 if (is_root)
503                 {
504                         Buffer          rootbuf;
505
506                         Assert(stack == (BTStack) NULL);
507                         /* create a new root node and release the split buffers */
508                         rootbuf = _bt_newroot(rel, buf, rbuf);
509                         _bt_wrtbuf(rel, rootbuf);
510                         _bt_wrtbuf(rel, rbuf);
511                         _bt_wrtbuf(rel, buf);
512                 }
513                 else
514                 {
515                         InsertIndexResult newres;
516                         BTItem          new_item;
517                         BTStackData fakestack;
518                         BTItem          ritem;
519                         Buffer          pbuf;
520
521                         /* If root page was splitted */
522                         if (stack == (BTStack) NULL)
523                         {
524                                 elog(LOG, "btree: concurrent ROOT page split");
525
526                                 /*
527                                  * If root page splitter failed to create new root page
528                                  * then old root' btpo_parent still points to metapage. We
529                                  * have to fix root page in this case.
530                                  */
531                                 if (BTreeInvalidParent(lpageop))
532                                 {
533                                         if (!FixBTree)
534                                                 elog(ERROR, "bt_insertonpg[%s]: no root page found", RelationGetRelationName(rel));
535                                         _bt_wrtbuf(rel, rbuf);
536                                         _bt_wrtnorelbuf(rel, buf);
537                                         elog(WARNING, "bt_insertonpg[%s]: root page unfound - fixing upper levels", RelationGetRelationName(rel));
538                                         _bt_fixup(rel, buf);
539                                         goto formres;
540                                 }
541
542                                 /*
543                                  * Set up a phony stack entry if we haven't got a real one
544                                  */
545                                 stack = &fakestack;
546                                 stack->bts_blkno = lpageop->btpo_parent;
547                                 stack->bts_offset = InvalidOffsetNumber;
548                                 /* bts_btitem will be initialized below */
549                                 stack->bts_parent = NULL;
550                         }
551
552                         /* get high key from left page == lowest key on new right page */
553                         ritem = (BTItem) PageGetItem(page,
554                                                                                  PageGetItemId(page, P_HIKEY));
555
556                         /* form an index tuple that points at the new right page */
557                         new_item = _bt_formitem(&(ritem->bti_itup));
558                         rbknum = BufferGetBlockNumber(rbuf);
559                         ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
560
561                         /*
562                          * Find the parent buffer and get the parent page.
563                          *
564                          * Oops - if we were moved right then we need to change stack
565                          * item! We want to find parent pointing to where we are,
566                          * right ?        - vadim 05/27/97
567                          *
568                          * Interestingly, this means we didn't *really* need to stack the
569                          * parent key at all; all we really care about is the saved
570                          * block and offset as a starting point for our search...
571                          */
572                         ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
573                                                    bknum, P_HIKEY);
574
575                         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
576
577                         /* Now we can write and unlock the children */
578                         _bt_wrtbuf(rel, rbuf);
579                         _bt_wrtbuf(rel, buf);
580
581                         if (pbuf == InvalidBuffer)
582                         {
583                                 if (!FixBTree)
584                                         elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
585                                                  "\n\tRecreate index %s.", RelationGetRelationName(rel));
586                                 pfree(new_item);
587                                 elog(WARNING, "bt_insertonpg[%s]: parent page unfound - fixing branch", RelationGetRelationName(rel));
588                                 _bt_fixbranch(rel, bknum, rbknum, stack);
589                                 goto formres;
590                         }
591                         /* Recursively update the parent */
592                         newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
593                                                                         0, NULL, new_item, stack->bts_offset);
594
595                         /* be tidy */
596                         pfree(newres);
597                         pfree(new_item);
598                 }
599         }
600         else
601         {
602                 itup_off = newitemoff;
603                 itup_blkno = BufferGetBlockNumber(buf);
604
605                 _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
606
607                 /* Write out the updated page and release pin/lock */
608                 _bt_wrtbuf(rel, buf);
609         }
610
611 formres:;
612         /* by here, the new tuple is inserted at itup_blkno/itup_off */
613         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
614         ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
615
616         return res;
617 }
618
619 static void
620 _bt_insertuple(Relation rel, Buffer buf,
621                            Size itemsz, BTItem btitem, OffsetNumber newitemoff)
622 {
623         Page            page = BufferGetPage(buf);
624         BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
625
626         START_CRIT_SECTION();
627         _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
628         /* XLOG stuff */
629         {
630                 xl_btree_insert xlrec;
631                 uint8           flag = XLOG_BTREE_INSERT;
632                 XLogRecPtr      recptr;
633                 XLogRecData rdata[2];
634                 BTItemData      truncitem;
635
636                 xlrec.target.node = rel->rd_node;
637                 ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
638                 rdata[0].buffer = InvalidBuffer;
639                 rdata[0].data = (char *) &xlrec;
640                 rdata[0].len = SizeOfBtreeInsert;
641                 rdata[0].next = &(rdata[1]);
642
643                 /* Read comments in _bt_pgaddtup */
644                 if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop))
645                 {
646                         truncitem = *btitem;
647                         truncitem.bti_itup.t_info = sizeof(BTItemData);
648                         rdata[1].data = (char *) &truncitem;
649                         rdata[1].len = sizeof(BTItemData);
650                 }
651                 else
652                 {
653                         rdata[1].data = (char *) btitem;
654                         rdata[1].len = IndexTupleDSize(btitem->bti_itup) +
655                                 (sizeof(BTItemData) - sizeof(IndexTupleData));
656                 }
657                 rdata[1].buffer = buf;
658                 rdata[1].next = NULL;
659                 if (P_ISLEAF(pageop))
660                         flag |= XLOG_BTREE_LEAF;
661
662                 recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
663
664                 PageSetLSN(page, recptr);
665                 PageSetSUI(page, ThisStartUpID);
666         }
667
668         END_CRIT_SECTION();
669 }
670
671 /*
672  *      _bt_split() -- split a page in the btree.
673  *
674  *              On entry, buf is the page to split, and is write-locked and pinned.
675  *              firstright is the item index of the first item to be moved to the
676  *              new right page.  newitemoff etc. tell us about the new item that
677  *              must be inserted along with the data from the old page.
678  *
679  *              Returns the new right sibling of buf, pinned and write-locked.
680  *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
681  *              are set to the exact location where newitem was inserted.
682  */
683 static Buffer
684 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
685                   OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
686                   bool newitemonleft,
687                   OffsetNumber *itup_off, BlockNumber *itup_blkno)
688 {
689         Buffer          rbuf;
690         Page            origpage;
691         Page            leftpage,
692                                 rightpage;
693         BTPageOpaque ropaque,
694                                 lopaque,
695                                 oopaque;
696         Buffer          sbuf = 0;
697         Page            spage = 0;
698         Size            itemsz;
699         ItemId          itemid;
700         BTItem          item;
701         OffsetNumber leftoff,
702                                 rightoff;
703         OffsetNumber maxoff;
704         OffsetNumber i;
705         BTItem          lhikey;
706
707         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
708         origpage = BufferGetPage(buf);
709         leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
710         rightpage = BufferGetPage(rbuf);
711
712         _bt_pageinit(leftpage, BufferGetPageSize(buf));
713         _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
714
715         /* init btree private data */
716         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
717         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
718         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
719
720         /* if we're splitting this page, it won't be the root when we're done */
721         lopaque->btpo_flags = oopaque->btpo_flags;
722         lopaque->btpo_flags &= ~BTP_ROOT;
723         ropaque->btpo_flags = lopaque->btpo_flags;
724         lopaque->btpo_prev = oopaque->btpo_prev;
725         lopaque->btpo_next = BufferGetBlockNumber(rbuf);
726         ropaque->btpo_prev = BufferGetBlockNumber(buf);
727         ropaque->btpo_next = oopaque->btpo_next;
728
729         /*
730          * Must copy the original parent link into both new pages, even though
731          * it might be quite obsolete by now.  We might need it if this level
732          * is or recently was the root (see README).
733          */
734         lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
735
736         /*
737          * If the page we're splitting is not the rightmost page at its level
738          * in the tree, then the first entry on the page is the high key for
739          * the page.  We need to copy that to the right half.  Otherwise
740          * (meaning the rightmost page case), all the items on the right half
741          * will be user data.
742          */
743         rightoff = P_HIKEY;
744
745         if (!P_RIGHTMOST(oopaque))
746         {
747                 itemid = PageGetItemId(origpage, P_HIKEY);
748                 itemsz = ItemIdGetLength(itemid);
749                 item = (BTItem) PageGetItem(origpage, itemid);
750                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
751                                                 LP_USED) == InvalidOffsetNumber)
752                         elog(PANIC, "btree: failed to add hikey to the right sibling");
753                 rightoff = OffsetNumberNext(rightoff);
754         }
755
756         /*
757          * The "high key" for the new left page will be the first key that's
758          * going to go into the new right page.  This might be either the
759          * existing data item at position firstright, or the incoming tuple.
760          */
761         leftoff = P_HIKEY;
762         if (!newitemonleft && newitemoff == firstright)
763         {
764                 /* incoming tuple will become first on right page */
765                 itemsz = newitemsz;
766                 item = newitem;
767         }
768         else
769         {
770                 /* existing item at firstright will become first on right page */
771                 itemid = PageGetItemId(origpage, firstright);
772                 itemsz = ItemIdGetLength(itemid);
773                 item = (BTItem) PageGetItem(origpage, itemid);
774         }
775         lhikey = item;
776         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
777                                         LP_USED) == InvalidOffsetNumber)
778                 elog(PANIC, "btree: failed to add hikey to the left sibling");
779         leftoff = OffsetNumberNext(leftoff);
780
781         /*
782          * Now transfer all the data items to the appropriate page
783          */
784         maxoff = PageGetMaxOffsetNumber(origpage);
785
786         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
787         {
788                 itemid = PageGetItemId(origpage, i);
789                 itemsz = ItemIdGetLength(itemid);
790                 item = (BTItem) PageGetItem(origpage, itemid);
791
792                 /* does new item belong before this one? */
793                 if (i == newitemoff)
794                 {
795                         if (newitemonleft)
796                         {
797                                 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
798                                                          "left sibling");
799                                 *itup_off = leftoff;
800                                 *itup_blkno = BufferGetBlockNumber(buf);
801                                 leftoff = OffsetNumberNext(leftoff);
802                         }
803                         else
804                         {
805                                 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
806                                                          "right sibling");
807                                 *itup_off = rightoff;
808                                 *itup_blkno = BufferGetBlockNumber(rbuf);
809                                 rightoff = OffsetNumberNext(rightoff);
810                         }
811                 }
812
813                 /* decide which page to put it on */
814                 if (i < firstright)
815                 {
816                         _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
817                                                  "left sibling");
818                         leftoff = OffsetNumberNext(leftoff);
819                 }
820                 else
821                 {
822                         _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
823                                                  "right sibling");
824                         rightoff = OffsetNumberNext(rightoff);
825                 }
826         }
827
828         /* cope with possibility that newitem goes at the end */
829         if (i <= newitemoff)
830         {
831                 if (newitemonleft)
832                 {
833                         _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
834                                                  "left sibling");
835                         *itup_off = leftoff;
836                         *itup_blkno = BufferGetBlockNumber(buf);
837                         leftoff = OffsetNumberNext(leftoff);
838                 }
839                 else
840                 {
841                         _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
842                                                  "right sibling");
843                         *itup_off = rightoff;
844                         *itup_blkno = BufferGetBlockNumber(rbuf);
845                         rightoff = OffsetNumberNext(rightoff);
846                 }
847         }
848
849         /*
850          * We have to grab the right sibling (if any) and fix the prev pointer
851          * there. We are guaranteed that this is deadlock-free since no other
852          * writer will be holding a lock on that page and trying to move left,
853          * and all readers release locks on a page before trying to fetch its
854          * neighbors.
855          */
856
857         if (!P_RIGHTMOST(ropaque))
858         {
859                 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
860                 spage = BufferGetPage(sbuf);
861         }
862
863         /*
864          * Right sibling is locked, new siblings are prepared, but original
865          * page is not updated yet. Log changes before continuing.
866          *
867          * NO ELOG(ERROR) till right sibling is updated.
868          */
869         START_CRIT_SECTION();
870         {
871                 xl_btree_split xlrec;
872                 int                     flag = (newitemonleft) ?
873                 XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
874                 BlockNumber blkno;
875                 XLogRecPtr      recptr;
876                 XLogRecData rdata[4];
877
878                 xlrec.target.node = rel->rd_node;
879                 ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
880                 if (newitemonleft)
881                 {
882                         blkno = BufferGetBlockNumber(rbuf);
883                         BlockIdSet(&(xlrec.otherblk), blkno);
884                 }
885                 else
886                 {
887                         blkno = BufferGetBlockNumber(buf);
888                         BlockIdSet(&(xlrec.otherblk), blkno);
889                 }
890                 BlockIdSet(&(xlrec.parentblk), lopaque->btpo_parent);
891                 BlockIdSet(&(xlrec.leftblk), lopaque->btpo_prev);
892                 BlockIdSet(&(xlrec.rightblk), ropaque->btpo_next);
893
894                 /*
895                  * Dirrect access to page is not good but faster - we should
896                  * implement some new func in page API.
897                  */
898                 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
899                         ((PageHeader) leftpage)->pd_upper;
900                 rdata[0].buffer = InvalidBuffer;
901                 rdata[0].data = (char *) &xlrec;
902                 rdata[0].len = SizeOfBtreeSplit;
903                 rdata[0].next = &(rdata[1]);
904
905                 rdata[1].buffer = InvalidBuffer;
906                 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
907                 rdata[1].len = xlrec.leftlen;
908                 rdata[1].next = &(rdata[2]);
909
910                 rdata[2].buffer = InvalidBuffer;
911                 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
912                 rdata[2].len = ((PageHeader) rightpage)->pd_special -
913                         ((PageHeader) rightpage)->pd_upper;
914                 rdata[2].next = NULL;
915
916                 if (!P_RIGHTMOST(ropaque))
917                 {
918                         BTPageOpaque sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
919
920                         sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
921
922                         rdata[2].next = &(rdata[3]);
923                         rdata[3].buffer = sbuf;
924                         rdata[3].data = NULL;
925                         rdata[3].len = 0;
926                         rdata[3].next = NULL;
927                 }
928
929                 if (P_ISLEAF(lopaque))
930                         flag |= XLOG_BTREE_LEAF;
931
932                 recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
933
934                 PageSetLSN(leftpage, recptr);
935                 PageSetSUI(leftpage, ThisStartUpID);
936                 PageSetLSN(rightpage, recptr);
937                 PageSetSUI(rightpage, ThisStartUpID);
938                 if (!P_RIGHTMOST(ropaque))
939                 {
940                         PageSetLSN(spage, recptr);
941                         PageSetSUI(spage, ThisStartUpID);
942                 }
943         }
944
945         /*
946          * By here, the original data page has been split into two new halves,
947          * and these are correct.  The algorithm requires that the left page
948          * never move during a split, so we copy the new left page back on top
949          * of the original.  Note that this is not a waste of time, since we
950          * also require (in the page management code) that the center of a
951          * page always be clean, and the most efficient way to guarantee this
952          * is just to compact the data by reinserting it into a new left page.
953          */
954
955         PageRestoreTempPage(leftpage, origpage);
956
957         END_CRIT_SECTION();
958
959         /* write and release the old right sibling */
960         if (!P_RIGHTMOST(ropaque))
961                 _bt_wrtbuf(rel, sbuf);
962
963         /* split's done */
964         return rbuf;
965 }
966
967 /*
968  *      _bt_findsplitloc() -- find an appropriate place to split a page.
969  *
970  * The idea here is to equalize the free space that will be on each split
971  * page, *after accounting for the inserted tuple*.  (If we fail to account
972  * for it, we might find ourselves with too little room on the page that
973  * it needs to go into!)
974  *
975  * If the page is the rightmost page on its level, we instead try to arrange
976  * for twice as much free space on the right as on the left.  In this way,
977  * when we are inserting successively increasing keys (consider sequences,
978  * timestamps, etc) we will end up with a tree whose pages are about 67% full,
979  * instead of the 50% full result that we'd get without this special case.
980  * (We could bias it even further to make the initially-loaded tree more full.
981  * But since the steady-state load for a btree is about 70%, we'd likely just
982  * be making more page-splitting work for ourselves later on, when we start
983  * seeing updates to existing tuples.)
984  *
985  * We are passed the intended insert position of the new tuple, expressed as
986  * the offsetnumber of the tuple it must go in front of.  (This could be
987  * maxoff+1 if the tuple is to go at the end.)
988  *
989  * We return the index of the first existing tuple that should go on the
990  * righthand page, plus a boolean indicating whether the new tuple goes on
991  * the left or right page.      The bool is necessary to disambiguate the case
992  * where firstright == newitemoff.
993  */
994 static OffsetNumber
995 _bt_findsplitloc(Relation rel,
996                                  Page page,
997                                  OffsetNumber newitemoff,
998                                  Size newitemsz,
999                                  bool *newitemonleft)
1000 {
1001         BTPageOpaque opaque;
1002         OffsetNumber offnum;
1003         OffsetNumber maxoff;
1004         ItemId          itemid;
1005         FindSplitData state;
1006         int                     leftspace,
1007                                 rightspace,
1008                                 goodenough,
1009                                 dataitemtotal,
1010                                 dataitemstoleft;
1011
1012         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1013
1014         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1015         newitemsz += sizeof(ItemIdData);
1016         state.newitemsz = newitemsz;
1017         state.is_leaf = P_ISLEAF(opaque);
1018         state.is_rightmost = P_RIGHTMOST(opaque);
1019         state.have_split = false;
1020
1021         /* Total free space available on a btree page, after fixed overhead */
1022         leftspace = rightspace =
1023                 PageGetPageSize(page) - sizeof(PageHeaderData) -
1024                 MAXALIGN(sizeof(BTPageOpaqueData))
1025                 +sizeof(ItemIdData);
1026
1027         /*
1028          * Finding the best possible split would require checking all the
1029          * possible split points, because of the high-key and left-key special
1030          * cases. That's probably more work than it's worth; instead, stop as
1031          * soon as we find a "good-enough" split, where good-enough is defined
1032          * as an imbalance in free space of no more than pagesize/16
1033          * (arbitrary...) This should let us stop near the middle on most
1034          * pages, instead of plowing to the end.
1035          */
1036         goodenough = leftspace / 16;
1037
1038         /* The right page will have the same high key as the old page */
1039         if (!P_RIGHTMOST(opaque))
1040         {
1041                 itemid = PageGetItemId(page, P_HIKEY);
1042                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1043                                                          sizeof(ItemIdData));
1044         }
1045
1046         /* Count up total space in data items without actually scanning 'em */
1047         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1048
1049         /*
1050          * Scan through the data items and calculate space usage for a split
1051          * at each possible position.
1052          */
1053         dataitemstoleft = 0;
1054         maxoff = PageGetMaxOffsetNumber(page);
1055
1056         for (offnum = P_FIRSTDATAKEY(opaque);
1057                  offnum <= maxoff;
1058                  offnum = OffsetNumberNext(offnum))
1059         {
1060                 Size            itemsz;
1061                 int                     leftfree,
1062                                         rightfree;
1063
1064                 itemid = PageGetItemId(page, offnum);
1065                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1066
1067                 /*
1068                  * We have to allow for the current item becoming the high key of
1069                  * the left page; therefore it counts against left space as well
1070                  * as right space.
1071                  */
1072                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1073                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1074
1075                 /*
1076                  * Will the new item go to left or right of split?
1077                  */
1078                 if (offnum > newitemoff)
1079                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1080                                                           true, itemsz);
1081                 else if (offnum < newitemoff)
1082                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1083                                                           false, itemsz);
1084                 else
1085                 {
1086                         /* need to try it both ways! */
1087                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1088                                                           true, itemsz);
1089                         /* here we are contemplating newitem as first on right */
1090                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1091                                                           false, newitemsz);
1092                 }
1093
1094                 /* Abort scan once we find a good-enough choice */
1095                 if (state.have_split && state.best_delta <= goodenough)
1096                         break;
1097
1098                 dataitemstoleft += itemsz;
1099         }
1100
1101         /*
1102          * I believe it is not possible to fail to find a feasible split, but
1103          * just in case ...
1104          */
1105         if (!state.have_split)
1106                 elog(FATAL, "_bt_findsplitloc: can't find a feasible split point for %s",
1107                          RelationGetRelationName(rel));
1108
1109         *newitemonleft = state.newitemonleft;
1110         return state.firstright;
1111 }
1112
1113 /*
1114  * Subroutine to analyze a particular possible split choice (ie, firstright
1115  * and newitemonleft settings), and record the best split so far in *state.
1116  */
1117 static void
1118 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1119                                   int leftfree, int rightfree,
1120                                   bool newitemonleft, Size firstrightitemsz)
1121 {
1122         /*
1123          * Account for the new item on whichever side it is to be put.
1124          */
1125         if (newitemonleft)
1126                 leftfree -= (int) state->newitemsz;
1127         else
1128                 rightfree -= (int) state->newitemsz;
1129
1130         /*
1131          * If we are not on the leaf level, we will be able to discard the key
1132          * data from the first item that winds up on the right page.
1133          */
1134         if (!state->is_leaf)
1135                 rightfree += (int) firstrightitemsz -
1136                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1137
1138         /*
1139          * If feasible split point, remember best delta.
1140          */
1141         if (leftfree >= 0 && rightfree >= 0)
1142         {
1143                 int                     delta;
1144
1145                 if (state->is_rightmost)
1146                 {
1147                         /*
1148                          * On a rightmost page, try to equalize right free space with
1149                          * twice the left free space.  See comments for
1150                          * _bt_findsplitloc.
1151                          */
1152                         delta = (2 * leftfree) - rightfree;
1153                 }
1154                 else
1155                 {
1156                         /* Otherwise, aim for equal free space on both sides */
1157                         delta = leftfree - rightfree;
1158                 }
1159
1160                 if (delta < 0)
1161                         delta = -delta;
1162                 if (!state->have_split || delta < state->best_delta)
1163                 {
1164                         state->have_split = true;
1165                         state->newitemonleft = newitemonleft;
1166                         state->firstright = firstright;
1167                         state->best_delta = delta;
1168                 }
1169         }
1170 }
1171
1172 /*
1173  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1174  *                                               we last looked at in the parent.
1175  *
1176  *              This is possible because we save a bit image of the last item
1177  *              we looked at in the parent, and the update algorithm guarantees
1178  *              that if items above us in the tree move, they only move right.
1179  *
1180  *              Also, re-set bts_blkno & bts_offset if changed.
1181  */
1182 static Buffer
1183 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1184 {
1185         BlockNumber blkno;
1186         Buffer          buf;
1187         OffsetNumber start,
1188                                 offnum,
1189                                 maxoff;
1190         Page            page;
1191         ItemId          itemid;
1192         BTItem          item;
1193         BTPageOpaque opaque;
1194
1195         blkno = stack->bts_blkno;
1196         buf = _bt_getbuf(rel, blkno, access);
1197         page = BufferGetPage(buf);
1198         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1199         maxoff = PageGetMaxOffsetNumber(page);
1200
1201         start = stack->bts_offset;
1202
1203         /*
1204          * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the case of
1205          * concurrent ROOT page split.  Also, watch out for possibility that
1206          * page has a high key now when it didn't before.
1207          */
1208         if (start < P_FIRSTDATAKEY(opaque))
1209                 start = P_FIRSTDATAKEY(opaque);
1210
1211         for (;;)
1212         {
1213                 /* see if it's on this page */
1214                 for (offnum = start;
1215                          offnum <= maxoff;
1216                          offnum = OffsetNumberNext(offnum))
1217                 {
1218                         itemid = PageGetItemId(page, offnum);
1219                         item = (BTItem) PageGetItem(page, itemid);
1220                         if (BTItemSame(item, &stack->bts_btitem))
1221                         {
1222                                 /* Return accurate pointer to where link is now */
1223                                 stack->bts_blkno = blkno;
1224                                 stack->bts_offset = offnum;
1225                                 return buf;
1226                         }
1227                 }
1228
1229                 /*
1230                  * by here, the item we're looking for moved right at least one
1231                  * page
1232                  */
1233                 if (P_RIGHTMOST(opaque))
1234                 {
1235                         _bt_relbuf(rel, buf);
1236                         return (InvalidBuffer);
1237                 }
1238
1239                 blkno = opaque->btpo_next;
1240                 _bt_relbuf(rel, buf);
1241                 buf = _bt_getbuf(rel, blkno, access);
1242                 page = BufferGetPage(buf);
1243                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1244                 maxoff = PageGetMaxOffsetNumber(page);
1245                 start = P_FIRSTDATAKEY(opaque);
1246         }
1247 }
1248
1249 /*
1250  *      _bt_newroot() -- Create a new root page for the index.
1251  *
1252  *              We've just split the old root page and need to create a new one.
1253  *              In order to do this, we add a new root page to the file, then lock
1254  *              the metadata page and update it.  This is guaranteed to be deadlock-
1255  *              free, because all readers release their locks on the metadata page
1256  *              before trying to lock the root, and all writers lock the root before
1257  *              trying to lock the metadata page.  We have a write lock on the old
1258  *              root page, so we have not introduced any cycles into the waits-for
1259  *              graph.
1260  *
1261  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1262  *              locked. On exit, a new root page exists with entries for the
1263  *              two new children, metapage is updated and unlocked/unpinned.
1264  *              The new root buffer is returned to caller which has to unlock/unpin
1265  *              lbuf, rbuf & rootbuf.
1266  */
1267 static Buffer
1268 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1269 {
1270         Buffer          rootbuf;
1271         Page            lpage,
1272                                 rpage,
1273                                 rootpage;
1274         BlockNumber lbkno,
1275                                 rbkno;
1276         BlockNumber rootblknum;
1277         BTPageOpaque rootopaque;
1278         ItemId          itemid;
1279         BTItem          item;
1280         Size            itemsz;
1281         BTItem          new_item;
1282         Buffer          metabuf;
1283         Page            metapg;
1284         BTMetaPageData *metad;
1285
1286         /* get a new root page */
1287         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1288         rootpage = BufferGetPage(rootbuf);
1289         rootblknum = BufferGetBlockNumber(rootbuf);
1290         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1291         metapg = BufferGetPage(metabuf);
1292         metad = BTPageGetMeta(metapg);
1293
1294         /* NO ELOG(ERROR) from here till newroot op is logged */
1295         START_CRIT_SECTION();
1296
1297         /* set btree special data */
1298         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1299         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1300         rootopaque->btpo_flags |= BTP_ROOT;
1301         rootopaque->btpo_parent = BTREE_METAPAGE;
1302
1303         lbkno = BufferGetBlockNumber(lbuf);
1304         rbkno = BufferGetBlockNumber(rbuf);
1305         lpage = BufferGetPage(lbuf);
1306         rpage = BufferGetPage(rbuf);
1307
1308         /*
1309          * Make sure pages in old root level have valid parent links --- we
1310          * will need this in _bt_insertonpg() if a concurrent root split
1311          * happens (see README).
1312          */
1313         ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
1314                 ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
1315                 rootblknum;
1316
1317         /*
1318          * Create downlink item for left page (old root).  Since this will be
1319          * the first item in a non-leaf page, it implicitly has minus-infinity
1320          * key value, so we need not store any actual key in it.
1321          */
1322         itemsz = sizeof(BTItemData);
1323         new_item = (BTItem) palloc(itemsz);
1324         new_item->bti_itup.t_info = itemsz;
1325         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1326
1327         /*
1328          * Insert the left page pointer into the new root page.  The root page
1329          * is the rightmost page on its level so there is no "high key" in it;
1330          * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1331          */
1332         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1333                 elog(PANIC, "btree: failed to add leftkey to new root page");
1334         pfree(new_item);
1335
1336         /*
1337          * Create downlink item for right page.  The key for it is obtained
1338          * from the "high key" position in the left page.
1339          */
1340         itemid = PageGetItemId(lpage, P_HIKEY);
1341         itemsz = ItemIdGetLength(itemid);
1342         item = (BTItem) PageGetItem(lpage, itemid);
1343         new_item = _bt_formitem(&(item->bti_itup));
1344         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1345
1346         /*
1347          * insert the right page pointer into the new root page.
1348          */
1349         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1350                 elog(PANIC, "btree: failed to add rightkey to new root page");
1351         pfree(new_item);
1352
1353         metad->btm_root = rootblknum;
1354         (metad->btm_level)++;
1355
1356         /* XLOG stuff */
1357         {
1358                 xl_btree_newroot xlrec;
1359                 XLogRecPtr      recptr;
1360                 XLogRecData rdata[2];
1361
1362                 xlrec.node = rel->rd_node;
1363                 xlrec.level = metad->btm_level;
1364                 BlockIdSet(&(xlrec.rootblk), rootblknum);
1365                 rdata[0].buffer = InvalidBuffer;
1366                 rdata[0].data = (char *) &xlrec;
1367                 rdata[0].len = SizeOfBtreeNewroot;
1368                 rdata[0].next = &(rdata[1]);
1369
1370                 /*
1371                  * Direct access to the page is not good, but it is faster - we
1372                  * should implement some new function in the page API.
1373                  */
1374                 rdata[1].buffer = InvalidBuffer;
1375                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1376                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1377                         ((PageHeader) rootpage)->pd_upper;
1378                 rdata[1].next = NULL;
1379
1380                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1381
1382                 PageSetLSN(rootpage, recptr);
1383                 PageSetSUI(rootpage, ThisStartUpID);
1384                 PageSetLSN(metapg, recptr);
1385                 PageSetSUI(metapg, ThisStartUpID);
1386
1387                 /* we changed their btpo_parent */
1388                 PageSetLSN(lpage, recptr);
1389                 PageSetSUI(lpage, ThisStartUpID);
1390                 PageSetLSN(rpage, recptr);
1391                 PageSetSUI(rpage, ThisStartUpID);
1392         }
1393         END_CRIT_SECTION();
1394
1395         /* write and let go of metapage buffer */
1396         _bt_wrtbuf(rel, metabuf);
1397
1398         return (rootbuf);
1399 }
1400
1401 /*
1402  * _bt_fixroot() -- build the parent level(s) that are missing above an old
1403  * root page which was split without a new root page being created.
1404  *
1405  * oldrootbuf is the old root page, write locked by the caller.  It must be
1406  * the leftmost page of its level and must have a right sibling; otherwise
1407  * we elog(ERROR).  It is assumed that its btpo_parent still points to the
     * meta page, i.e. not to a real parent page.
     *
     * A new root is created over the old root and its right neighbor with
     * _bt_newroot(), then a downlink is added for every further page found by
     * following btpo_next.  If the new root fills up and has to be split, we
     * call ourselves again, with the new root as the old root, to build one
     * more level.
     *
     * If "release" is true, oldrootbuf is written and released once the upper
     * level has been built; otherwise it is left untouched for the caller.
     *
     * Returns the buffer of the new (topmost) root page, still write locked.
1408  */
1409 Buffer
1410 _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
1411 {
1412         Buffer          rootbuf;
1413         BlockNumber rootblk;
1414         Page            rootpage;
1415         XLogRecPtr      rootLSN;
1416         Page            oldrootpage = BufferGetPage(oldrootbuf);
1417         BTPageOpaque oldrootopaque = (BTPageOpaque)
1418         PageGetSpecialPointer(oldrootpage);
1419         Buffer          buf,
1420                                 leftbuf,
1421                                 rightbuf;
1422         Page            page,
1423                                 leftpage,
1424                                 rightpage;
1425         BTPageOpaque opaque,
1426                                 leftopaque,
1427                                 rightopaque;
1428         OffsetNumber newitemoff;
1429         BTItem          btitem,
1430                                 ritem;
1431         Size            itemsz;
1432
             /* the old root must be leftmost and must have a right sibling */
1433         if (!P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque))
1434                 elog(ERROR, "bt_fixroot: not valid old root page");
1435
1436         /* Read right neighbor and create new root page over the two pages */
1437         leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE);
1438         leftpage = BufferGetPage(leftbuf);
1439         leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1440         rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf);
1441         rootpage = BufferGetPage(rootbuf);
             /* remember the new root's LSN: sibling pages get stamped with it below */
1442         rootLSN = PageGetLSN(rootpage);
1443         rootblk = BufferGetBlockNumber(rootbuf);
1444
1445         /*
              * Parent page where to insert pointers.  (opaque is kept in step with
              * page but is not read anywhere else in this function.)
              */
1446         buf = rootbuf;
1447         page = BufferGetPage(buf);
1448         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1449
1450         /*
1451          * Now read the other pages (if any) on this level and add them to the
1452          * new root.  Here we break one of our locking rules - never hold a
1453          * lock on a parent page while acquiring a lock on its child - but we
1454          * are safe from deadlock:
1455          *
1456          * If a concurrent process splits one of the pages on this level, it
1457          * will see either btpo_parent == metablock or btpo_parent == rootblk.
1458          * In the first case it gives up its locks and walks to the leftmost
1459          * page (oldrootbuf) in _bt_fixup() - i.e. it waits for us and lets us
1460          * continue.  In the second case it tries to lock rootbuf while keeping
1461          * its locks on buffers we have already passed, so it also waits for
1462          * us.  If we have to unlock rootbuf (split it) and that process has to
1463          * split a page of the new level we created (the level of rootbuf),
              * then it waits while we create the upper level.  Etc.
1464          */
1465         while (!P_RIGHTMOST(leftopaque))
1466         {
1467                 rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE);
1468                 rightpage = BufferGetPage(rightbuf);
1469                 rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1470
1471                 /*
1472                  * Update LSN & StartUpID of the child page buffer to ensure that
1473                  * it is written to disk only after the log record for the new
1474                  * root creation has been flushed.  The LSN is only ever moved
1475                  * forward.  Unfortunately, for the moment (?) we do not log this
1476                  * operation and so possibly break our rule to log the entire
                      * page content on the first modification after a checkpoint.
1477                  */
1478                 HOLD_INTERRUPTS();
1479                 rightopaque->btpo_parent = rootblk;
1480                 if (XLByteLT(PageGetLSN(rightpage), rootLSN))
1481                         PageSetLSN(rightpage, rootLSN);
1482                 PageSetSUI(rightpage, ThisStartUpID);
1483                 RESUME_INTERRUPTS();
1484
                     /*
                      * Form the downlink for the right page: its key is copied from
                      * the high key of the left page and it points at the right
                      * page's block.
                      */
1485                 ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY));
1486                 btitem = _bt_formitem(&(ritem->bti_itup));
1487                 ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY);
1488                 itemsz = IndexTupleDSize(btitem->bti_itup)
1489                         + (sizeof(BTItemData) - sizeof(IndexTupleData));
1490                 itemsz = MAXALIGN(itemsz);
1491
                     /* the new downlink goes after the last item on the parent page */
1492                 newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page));
1493
1494                 if (PageGetFreeSpace(page) < itemsz)
1495                 {
                             /*
                              * No room on the parent page: split it and carry on with
                              * the buffer returned by _bt_split() (presumably the new
                              * right sibling - TODO confirm against _bt_split).
                              */
1496                         Buffer          newbuf;
1497                         OffsetNumber firstright;
1498                         OffsetNumber itup_off;
1499                         BlockNumber itup_blkno;
1500                         bool            newitemonleft;
1501
1502                         firstright = _bt_findsplitloc(rel, page,
1503                                                                          newitemoff, itemsz, &newitemonleft);
1504                         newbuf = _bt_split(rel, buf, firstright,
1505                                                            newitemoff, itemsz, btitem, newitemonleft,
1506                                                            &itup_off, &itup_blkno);
1507                         /* Keep lock on new "root" buffer ! */
1508                         if (buf != rootbuf)
1509                                 _bt_relbuf(rel, buf);
1510                         buf = newbuf;
1511                         page = BufferGetPage(buf);
1512                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1513                 }
1514                 else
1515                         _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
1516
1517                 /* give up left buffer and step one page to the right */
1518                 _bt_wrtbuf(rel, leftbuf);
1519                 pfree(btitem);
1520                 leftbuf = rightbuf;
1521                 leftpage = rightpage;
1522                 leftopaque = rightopaque;
1523         }
1524
1525         /* give up rightmost page buffer */
1526         _bt_wrtbuf(rel, leftbuf);
1527
1528         /*
1529          * Here we hold locks on the old root buffer, on the new root buffer
1530          * created by _bt_newroot() - rootbuf - and on the buffer used for the
1531          * last insert ops - buf.  If rootbuf != buf then rootbuf was split and
1532          * we have to create at least one more level.  And if "release" is
1533          * TRUE then we give up oldrootbuf.
1534          */
1535         if (release)
1536                 _bt_wrtbuf(rel, oldrootbuf);
1537
1538         if (rootbuf != buf)
1539         {
                     /* rootbuf now plays the old root; it is released by the callee */
1540                 _bt_wrtbuf(rel, buf);
1541                 return (_bt_fixroot(rel, rootbuf, true));
1542         }
1543
1544         return (rootbuf);
1545 }
1546
1547 /*
1548  * _bt_fixtree() -- given blkno of the leftmost page of a non-leaf level,
1549  * check/fix the tree from that level up to the root page.
     *
     * The downlinks of each level are checked with _bt_fixlevel(), then we
     * climb via btpo_parent.  When that link is the meta page or invalid, the
     * start page is re-read under write lock: if it is the root we are done;
     * if it still has no valid parent, _bt_fixroot() builds the missing upper
     * level(s); otherwise we continue with its updated btpo_parent.
     *
     * ERRORs out if the start page of a level is not leftmost or is a leaf.
     * No buffer pins/locks are held on exit.
1550  */
1551 static void
1552 _bt_fixtree(Relation rel, BlockNumber blkno)
1553 {
1554         Buffer          buf;
1555         Page            page;
1556         BTPageOpaque opaque;
1557         BlockNumber pblkno;
1558
1559         for (;;)
1560         {
1561                 buf = _bt_getbuf(rel, blkno, BT_READ);
1562                 page = BufferGetPage(buf);
1563                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1564                 if (!P_LEFTMOST(opaque) || P_ISLEAF(opaque))
1565                         elog(ERROR, "bt_fixtree[%s]: invalid start page (need to recreate index)", RelationGetRelationName(rel));
                     /* save the parent link now: _bt_fixlevel() releases buf */
1566                 pblkno = opaque->btpo_parent;
1567
1568                 /* check/fix entire level */
1569                 _bt_fixlevel(rel, buf, InvalidBlockNumber);
1570
1571                 /*
1572                  * No pins/locks are held here. Re-read start page if its
1573                  * btpo_parent pointed to meta page else go up one level.
1574                  *
1575                  * XXX have to catch InvalidBlockNumber at the moment -:(
1576                  */
1577                 if (pblkno == BTREE_METAPAGE || pblkno == InvalidBlockNumber)
1578                 {
1579                         buf = _bt_getbuf(rel, blkno, BT_WRITE);
1580                         page = BufferGetPage(buf);
1581                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1582                         if (P_ISROOT(opaque))
1583                         {
1584                                 /* Tree is Ok now */
1585                                 _bt_relbuf(rel, buf);
1586                                 return;
1587                         }
1588                         /* Call _bt_fixroot() if there is no upper level */
1589                         if (BTreeInvalidParent(opaque))
1590                         {
1591                                 elog(WARNING, "bt_fixtree[%s]: fixing root page", RelationGetRelationName(rel));
                                     /* old root is released by _bt_fixroot(); drop the new root too */
1592                                 buf = _bt_fixroot(rel, buf, true);
1593                                 _bt_relbuf(rel, buf);
1594                                 return;
1595                         }
1596                         /* Parent link was set meanwhile: go up one level */
1597                         pblkno = opaque->btpo_parent;
1598                         _bt_relbuf(rel, buf);
1599                 }
1600                 blkno = pblkno;
1601         }
1602
1603 }
1604
1605 /*
1606  * _bt_fixlevel() -- check/fix the downlinks of one level, starting from
1607  * the page in buffer buf and going right up to block "limit" on the
1608  * *child* level (or till the rightmost child page if limit is
1609  * InvalidBlockNumber).
     *
     * The start buffer must be read locked; it is copied to local storage and
     * released at once.  Child pages are then walked via btpo_next, up to
     * three at a time (slot 0 is the current child, slots 1 and 2 its next
     * right siblings), and each one is looked up in the saved parent page
     * with _bt_getoff().  If a pointer is missing, the parent is re-located
     * and write locked with _bt_getstackbuf() and a downlink, keyed by the
     * high key of the child's left sibling, is inserted (splitting the
     * parent page when it has no room).
     *
     * Structural inconsistencies are reported with elog(ERROR).
     * No pins/locks are held on exit.
1610  */
1611 static void
1612 _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
1613 {
1614         BlockNumber blkno = BufferGetBlockNumber(buf);
1615         Page            page;
1616         BTPageOpaque opaque;
1617         BlockNumber cblkno[3];
1618         OffsetNumber coff[3];
1619         Buffer          cbuf[3];
1620         Page            cpage[3];
1621         BTPageOpaque copaque[3];
1622         BTItem          btitem;
1623         int                     cidx,
1624                                 i;
1625         bool            goodbye = false;
             /*
              * Private copy of the parent page.  NOTE(review): this is a plain
              * char array accessed through Page casts; its declaration does not
              * by itself guarantee suitable alignment.
              */
1626         char            tbuf[BLCKSZ];
1627
1628         page = BufferGetPage(buf);
1629         /* copy page to temp storage */
1630         memmove(tbuf, page, PageGetPageSize(page));
1631         _bt_relbuf(rel, buf);
1632
             /* from here on "page" is the saved copy, not a shared buffer */
1633         page = (Page) tbuf;
1634         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1635
1636         /* Initialize first child data */
1637         coff[0] = P_FIRSTDATAKEY(opaque);
1638         if (coff[0] > PageGetMaxOffsetNumber(page))
1639                 elog(ERROR, "bt_fixlevel[%s]: invalid maxoff on start page (need to recreate index)", RelationGetRelationName(rel));
1640         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, coff[0]));
1641         cblkno[0] = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid));
1642         cbuf[0] = _bt_getbuf(rel, cblkno[0], BT_READ);
1643         cpage[0] = BufferGetPage(cbuf[0]);
1644         copaque[0] = (BTPageOpaque) PageGetSpecialPointer(cpage[0]);
1645         if (P_LEFTMOST(opaque) && !P_LEFTMOST(copaque[0]))
1646                 elog(ERROR, "bt_fixlevel[%s]: non-leftmost child page of leftmost parent (need to recreate index)", RelationGetRelationName(rel));
1647         /* caller should take care and avoid this */
1648         if (P_RIGHTMOST(copaque[0]))
1649                 elog(ERROR, "bt_fixlevel[%s]: invalid start child (need to recreate index)", RelationGetRelationName(rel));
1650
1651         for (;;)
1652         {
1653                 /*
1654                  * Read up to 2 more child pages and look for pointers to them in
1655                  * *saved* parent page.  On leaving the inner loop cidx is the
                      * index of the last child read; it is 1 only when child 1 is
                      * the rightmost page of the child level.
1656                  */
1657                 coff[1] = coff[2] = InvalidOffsetNumber;
1658                 for (cidx = 0; cidx < 2;)
1659                 {
1660                         cidx++;
1661                         cblkno[cidx] = (copaque[cidx - 1])->btpo_next;
1662                         cbuf[cidx] = _bt_getbuf(rel, cblkno[cidx], BT_READ);
1663                         cpage[cidx] = BufferGetPage(cbuf[cidx]);
1664                         copaque[cidx] = (BTPageOpaque) PageGetSpecialPointer(cpage[cidx]);
1665                         coff[cidx] = _bt_getoff(page, cblkno[cidx]);
1666
1667                         /*
                              * sanity check: a pointer that was found must directly
                              * follow the nearest previously found pointer
                              */
1668                         if (coff[cidx] != InvalidOffsetNumber)
1669                         {
1670                                 for (i = cidx - 1; i >= 0; i--)
1671                                 {
1672                                         if (coff[i] == InvalidOffsetNumber)
1673                                                 continue;
1674                                         if (coff[cidx] != coff[i] + 1)
1675                                                 elog(ERROR, "bt_fixlevel[%s]: invalid item order(1) (need to recreate index)", RelationGetRelationName(rel));
1676                                         break;
1677                                 }
1678                         }
1679
1680                         if (P_RIGHTMOST(copaque[cidx]))
1681                                 break;
1682                 }
1683
1684                 /*
1685                  * Read parent page and insert missed pointers.
1686                  */
1687                 if (coff[1] == InvalidOffsetNumber ||
1688                         (cidx == 2 && coff[2] == InvalidOffsetNumber))
1689                 {
1690                         Buffer          newbuf;
1691                         Page            newpage;
1692                         BTPageOpaque newopaque;
1693                         BTItem          ritem;
1694                         Size            itemsz;
1695                         OffsetNumber newitemoff;
1696                         BlockNumber parblk[3];
1697                         BTStackData stack;
1698
                             /*
                              * Build a one-entry stack naming child 0 and let
                              * _bt_getstackbuf() find and write lock the parent page
                              * that currently holds its pointer.
                              */
1699                         stack.bts_parent = NULL;
1700                         stack.bts_blkno = blkno;
1701                         stack.bts_offset = InvalidOffsetNumber;
1702                         ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
1703                                                    cblkno[0], P_HIKEY);
1704
1705                         buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
1706                         if (buf == InvalidBuffer)
1707                                 elog(ERROR, "bt_fixlevel[%s]: pointer disappeared (need to recreate index)", RelationGetRelationName(rel));
1708
1709                         page = BufferGetPage(buf);
1710                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1711                         coff[0] = stack.bts_offset;
1712                         blkno = BufferGetBlockNumber(buf);
1713                         parblk[0] = blkno;
1714
1715                         /*
                              * Check/insert missed pointers.  parblk[i] records the
                              * parent block on which the pointer to child i lives.
                              */
1716                         for (i = 1; i <= cidx; i++)
1717                         {
1718                                 coff[i] = _bt_getoff(page, cblkno[i]);
1719
1720                                 /* sanity check */
1721                                 parblk[i] = BufferGetBlockNumber(buf);
1722                                 if (coff[i] != InvalidOffsetNumber)
1723                                 {
1724                                         if (parblk[i] == parblk[i - 1] &&
1725                                                 coff[i] != coff[i - 1] + 1)
1726                                                 elog(ERROR, "bt_fixlevel[%s]: invalid item order(2) (need to recreate index)", RelationGetRelationName(rel));
1727                                         continue;
1728                                 }
1729                                 /*
                                      * Have to check next page ?  Only if the previous
                                      * pointer is the last item on the current page.
                                      */
1730                                 if ((!P_RIGHTMOST(opaque)) &&
1731                                         coff[i - 1] == PageGetMaxOffsetNumber(page))            /* yes */
1732                                 {
1733                                         newbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE);
1734                                         newpage = BufferGetPage(newbuf);
1735                                         newopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
1736                                         coff[i] = _bt_getoff(newpage, cblkno[i]);
1737                                         if (coff[i] != InvalidOffsetNumber) /* found ! */
1738                                         {
1739                                                 if (coff[i] != P_FIRSTDATAKEY(newopaque))
1740                                                         elog(ERROR, "bt_fixlevel[%s]: invalid item order(3) (need to recreate index)", RelationGetRelationName(rel));
                                                     /* move over to the right parent page */
1741                                                 _bt_relbuf(rel, buf);
1742                                                 buf = newbuf;
1743                                                 page = newpage;
1744                                                 opaque = newopaque;
1745                                                 blkno = BufferGetBlockNumber(buf);
1746                                                 parblk[i] = blkno;
1747                                                 continue;
1748                                         }
1749                                         /* not found - need to insert on current page */
1750                                         _bt_relbuf(rel, newbuf);
1751                                 }
1752                                 /*
                                      * insert pointer: key comes from the high key of
                                      * the left sibling (child i - 1), target is child i
                                      */
1753                                 ritem = (BTItem) PageGetItem(cpage[i - 1],
1754                                                                    PageGetItemId(cpage[i - 1], P_HIKEY));
1755                                 btitem = _bt_formitem(&(ritem->bti_itup));
1756                                 ItemPointerSet(&(btitem->bti_itup.t_tid), cblkno[i], P_HIKEY);
1757                                 itemsz = IndexTupleDSize(btitem->bti_itup)
1758                                         + (sizeof(BTItemData) - sizeof(IndexTupleData));
1759                                 itemsz = MAXALIGN(itemsz);
1760
                                     /* it belongs right after the left sibling's pointer */
1761                                 newitemoff = coff[i - 1] + 1;
1762
1763                                 if (PageGetFreeSpace(page) < itemsz)
1764                                 {
1765                                         OffsetNumber firstright;
1766                                         OffsetNumber itup_off;
1767                                         BlockNumber itup_blkno;
1768                                         bool            newitemonleft;
1769
1770                                         firstright = _bt_findsplitloc(rel, page,
1771                                                                          newitemoff, itemsz, &newitemonleft);
1772                                         newbuf = _bt_split(rel, buf, firstright,
1773                                                            newitemoff, itemsz, btitem, newitemonleft,
1774                                                                            &itup_off, &itup_blkno);
1775                                         /* keep whichever half received the new item */
1776                                         if (newitemonleft)
1777                                                 _bt_relbuf(rel, newbuf);
1778                                         else
1779                                         {
1780                                                 _bt_relbuf(rel, buf);
1781                                                 buf = newbuf;
1782                                                 page = BufferGetPage(buf);
1783                                                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1784                                         }
1785                                         blkno = BufferGetBlockNumber(buf);
1786                                         coff[i] = itup_off;
1787                                 }
1788                                 else
1789                                 {
1790                                         _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
1791                                         coff[i] = newitemoff;
1792                                 }
1793
1794                                 pfree(btitem);
1795                                 parblk[i] = blkno;
1796                         }
1797
1798                         /* copy page with pointer to cblkno[cidx] to temp storage */
1799                         memmove(tbuf, page, PageGetPageSize(page));
1800                         _bt_relbuf(rel, buf);
1801                         page = (Page) tbuf;
1802                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1803                 }
1804
1805                 /*
                      * Continue if current check/fix level page is rightmost: this
                      * cancels a stop request made when "limit" was passed.
                      */
1806                 if (P_RIGHTMOST(opaque))
1807                         goodbye = false;
1808
1809                 /* Pointers to child pages are Ok - right end of child level ? */
1810                 _bt_relbuf(rel, cbuf[0]);
1811                 _bt_relbuf(rel, cbuf[1]);
1812                 if (cidx == 1 ||
1813                         (cidx == 2 && (P_RIGHTMOST(copaque[2]) || goodbye)))
1814                 {
1815                         if (cidx == 2)
1816                                 _bt_relbuf(rel, cbuf[2]);
1817                         return;
1818                 }
                     /* limit was among the children just checked: stop after next round */
1819                 if (cblkno[0] == limit || cblkno[1] == limit)
1820                         goodbye = true;
                     /* child 2 (still locked) becomes child 0 of the next round */
1821                 cblkno[0] = cblkno[2];
1822                 cbuf[0] = cbuf[2];
1823                 cpage[0] = cpage[2];
1824                 copaque[0] = copaque[2];
1825                 coff[0] = coff[2];
1826         }
1827 }
1828
1829 /*
1830  * _bt_fixbranch() -- check/fix part of the tree - a branch - going up from
1831  * the parent of the level that holds blocks lblkno and rblkno.
1832  *
1833  * We first ensure that the parent level has pointers to both lblkno and
1834  * rblkno; if those pointers are on different parent pages, we do the same
1835  * for the parent level, etc.  No locks may be held on the target level or
1836  * above on entry, and none are held on exit.  true_stack is the stack built
1837  * while descending the tree and must point to the parent level.  rblkno
1838  * must be to the right of lblkno.  (This function is a special edition of
     * the more expensive _bt_fixtree(), but it doesn't guarantee full
     * consistency of the tree.)
     *
     * When the stack is exhausted we climb through btpo_parent instead; if a
     * page without a valid parent link is reached, it is write locked and
     * handed to _bt_fixup() to fix/build the upper levels.
1839  */
1840 static void
1841 _bt_fixbranch(Relation rel, BlockNumber lblkno,
1842                           BlockNumber rblkno, BTStack true_stack)
1843 {
1844         BlockNumber blkno = true_stack->bts_blkno;
1845         BTStackData stack;
1846         BTPageOpaque opaque;
1847         Buffer          buf,
1848                                 rbuf;
1849         Page            page;
1850         OffsetNumber offnum;
1851
1852         true_stack = true_stack->bts_parent;
1853         for (;;)
1854         {
1855                 buf = _bt_getbuf(rel, blkno, BT_READ);
1856
1857                 /* Check/fix parent level pointed by blkno (releases buf) */
1858                 _bt_fixlevel(rel, buf, rblkno);
1859
1860                 /*
1861                  * Here parent level should have pointers for both lblkno and
1862                  * rblkno and we have to find them.
1863                  */
1864                 stack.bts_parent = NULL;
1865                 stack.bts_blkno = blkno;
1866                 stack.bts_offset = InvalidOffsetNumber;
1867                 ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
1868                 buf = _bt_getstackbuf(rel, &stack, BT_READ);
1869                 if (buf == InvalidBuffer)
1870                         elog(ERROR, "bt_fixbranch[%s]: left pointer unfound (need to recreate index)", RelationGetRelationName(rel));
1871                 page = BufferGetPage(buf);
1872                 offnum = _bt_getoff(page, rblkno);
1873
1874                 if (offnum != InvalidOffsetNumber)              /* right pointer found */
1875                 {
                             /* both pointers are on one page: branch is fine, we are done */
1876                         if (offnum <= stack.bts_offset)
1877                                 elog(ERROR, "bt_fixbranch[%s]: invalid item order (need to recreate index)", RelationGetRelationName(rel));
1878                         _bt_relbuf(rel, buf);
1879                         return;
1880                 }
1881
1882                 /*
                      * Pointers are on different parent pages - find right one.
                      * The two parent pages become the lblkno/rblkno pair to be
                      * checked one level up.
                      */
1883                 lblkno = BufferGetBlockNumber(buf);
1884                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1885                 if (P_RIGHTMOST(opaque))
1886                         elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(1) (need to recreate index)", RelationGetRelationName(rel));
1887
1888                 stack.bts_parent = NULL;
1889                 stack.bts_blkno = opaque->btpo_next;
1890                 stack.bts_offset = InvalidOffsetNumber;
1891                 ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
1892                 rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
1893                 if (rbuf == InvalidBuffer)
1894                         elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(2) (need to recreate index)", RelationGetRelationName(rel));
1895                 rblkno = BufferGetBlockNumber(rbuf);
1896                 _bt_relbuf(rel, rbuf);
1897
1898                 /*
1899                  * If we have parent item in true_stack then go up one level and
1900                  * ensure that it has pointers to new lblkno & rblkno.
1901                  */
1902                 if (true_stack)
1903                 {
1904                         _bt_relbuf(rel, buf);
1905                         blkno = true_stack->bts_blkno;
1906                         true_stack = true_stack->bts_parent;
1907                         continue;
1908                 }
1909
1910                 /*
1911                  * Well, we are on the level that was root or nonexistent when we
1912                  * started traversing tree down. If btpo_parent is updated then
1913                  * we'll use it to continue, else we'll fix/restore upper levels
1914                  * entirely.
1915                  */
1916                 if (!BTreeInvalidParent(opaque))
1917                 {
1918                         blkno = opaque->btpo_parent;
1919                         _bt_relbuf(rel, buf);
1920                         continue;
1921                 }
1922
1923                 /* Have to switch to excl buf lock and re-check btpo_parent */
1924                 _bt_relbuf(rel, buf);
1925                 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1926                 page = BufferGetPage(buf);
1927                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1928                 if (!BTreeInvalidParent(opaque))
1929                 {
1930                         blkno = opaque->btpo_parent;
1931                         _bt_relbuf(rel, buf);
1932                         continue;
1933                 }
1934
1935                 /*
1936                  * We hold excl lock on some internal page with unupdated
1937                  * btpo_parent - time for _bt_fixup.
1938                  */
1939                 break;
1940         }
1941
             /* buf is still write locked here; _bt_fixup() releases it */
1942         elog(WARNING, "bt_fixbranch[%s]: fixing upper levels", RelationGetRelationName(rel));
1943         _bt_fixup(rel, buf);
1944
1945         return;
1946 }
1947
1948 /*
1949  * _bt_fixup() -- with buf exclusively locked, walk left along its level
1950  * and use either _bt_fixtree() or _bt_fixroot() to create or check/fix the
1951  * upper levels.
     *
     * The walk stops at the first page that has a valid btpo_parent (then
     * _bt_fixtree() is run from that parent) or at the leftmost page (then
     * _bt_fixroot() builds a new root above it).  Each page is released
     * before its left sibling is write locked.
     *
     * No buffer pins/locks will be held on exit.
1952  */
1953 static void
1954 _bt_fixup(Relation rel, Buffer buf)
1955 {
1956         Page            page;
1957         BTPageOpaque opaque;
1958         BlockNumber blkno;
1959
1960         for (;;)
1961         {
1962                 page = BufferGetPage(buf);
1963                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1964
1965                 /*
1966                  * If someone else already created parent pages then it's time for
1967                  * _bt_fixtree() to check upper levels and fix them, if required.
1968                  */
1969                 if (!BTreeInvalidParent(opaque))
1970                 {
1971                         blkno = opaque->btpo_parent;
1972                         _bt_relbuf(rel, buf);
1973                         elog(WARNING, "bt_fixup[%s]: checking/fixing upper levels", RelationGetRelationName(rel));
1974                         _bt_fixtree(rel, blkno);
1975                         return;
1976                 }
                     /* no parent yet: stop at the leftmost page, else step left */
1977                 if (P_LEFTMOST(opaque))
1978                         break;
1979                 blkno = opaque->btpo_prev;
1980                 _bt_relbuf(rel, buf);
1981                 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1982         }
1983
1984         /*
1985          * Ok, we are on the leftmost page, it's write locked by us and its
1986          * btpo_parent points to meta page - time for _bt_fixroot().  With
              * release = true the old root is released there; the new root
              * buffer it returns is released here.
1987          */
1988         elog(WARNING, "bt_fixup[%s]: fixing root page", RelationGetRelationName(rel));
1989         buf = _bt_fixroot(rel, buf, true);
1990         _bt_relbuf(rel, buf);
1991 }
1992
1993 static OffsetNumber
1994 _bt_getoff(Page page, BlockNumber blkno)
1995 {
1996         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1997         OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
1998         OffsetNumber offnum = P_FIRSTDATAKEY(opaque);
1999         BlockNumber curblkno;
2000         ItemId          itemid;
2001         BTItem          item;
2002
2003         for (; offnum <= maxoff; offnum++)
2004         {
2005                 itemid = PageGetItemId(page, offnum);
2006                 item = (BTItem) PageGetItem(page, itemid);
2007                 curblkno = ItemPointerGetBlockNumber(&(item->bti_itup.t_tid));
2008                 if (curblkno == blkno)
2009                         return (offnum);
2010         }
2011
2012         return (InvalidOffsetNumber);
2013 }
2014
2015 /*
2016  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
2017  *
2018  *              This routine adds the tuple to the page as requested.  It does
2019  *              not affect pin/lock status, but you'd better have a write lock
2020  *              and pin on the target buffer!  Don't forget to write and release
2021  *              the buffer afterwards, either.
2022  *
2023  *              The main difference between this routine and a bare PageAddItem call
2024  *              is that this code knows that the leftmost data item on a non-leaf
2025  *              btree page doesn't need to have a key.  Therefore, it strips such
2026  *              items down to just the item header.  CAUTION: this works ONLY if
2027  *              we insert the items in order, so that the given itup_off does
2028  *              represent the final position of the item!
2029  */
2030 static void
2031 _bt_pgaddtup(Relation rel,
2032                          Page page,
2033                          Size itemsize,
2034                          BTItem btitem,
2035                          OffsetNumber itup_off,
2036                          const char *where)
2037 {
2038         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2039         BTItemData      truncitem;
2040
2041         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2042         {
2043                 memcpy(&truncitem, btitem, sizeof(BTItemData));
2044                 truncitem.bti_itup.t_info = sizeof(BTItemData);
2045                 btitem = &truncitem;
2046                 itemsize = sizeof(BTItemData);
2047         }
2048
2049         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
2050                                         LP_USED) == InvalidOffsetNumber)
2051                 elog(PANIC, "btree: failed to add item to the %s for %s",
2052                          where, RelationGetRelationName(rel));
2053 }
2054
2055 /*
2056  * _bt_isequal - used in _bt_doinsert in check for duplicates.
2057  *
2058  * This is very similar to _bt_compare, except for NULL handling.
2059  * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
2060  */
2061 static bool
2062 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2063                         int keysz, ScanKey scankey)
2064 {
2065         BTItem          btitem;
2066         IndexTuple      itup;
2067         int                     i;
2068
2069         /* Better be comparing to a leaf item */
2070         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2071
2072         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
2073         itup = &(btitem->bti_itup);
2074
2075         for (i = 1; i <= keysz; i++)
2076         {
2077                 ScanKey         entry = &scankey[i - 1];
2078                 AttrNumber      attno;
2079                 Datum           datum;
2080                 bool            isNull;
2081                 int32           result;
2082
2083                 attno = entry->sk_attno;
2084                 Assert(attno == i);
2085                 datum = index_getattr(itup, attno, itupdesc, &isNull);
2086
2087                 /* NULLs are never equal to anything */
2088                 if (entry->sk_flags & SK_ISNULL || isNull)
2089                         return false;
2090
2091                 result = DatumGetInt32(FunctionCall2(&entry->sk_func,
2092                                                                                          entry->sk_argument,
2093                                                                                          datum));
2094
2095                 if (result != 0)
2096                         return false;
2097         }
2098
2099         /* if we get here, the keys are equal */
2100         return true;
2101 }