]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
4b92f2f56112e9fa79865a490b757b665c2e60e9
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20
21
22 typedef struct
23 {
24         /* context data for _bt_checksplitloc */
25         Size    newitemsz;                      /* size of new item to be inserted */
26         bool    non_leaf;                       /* T if splitting an internal node */
27
28         bool    have_split;                     /* found a valid split? */
29
30         /* these fields valid only if have_split is true */
31         bool    newitemonleft;          /* new item on left or right of best split */
32         OffsetNumber firstright;        /* best split point */
33         int             best_delta;                     /* best size delta so far */
34 } FindSplitData;
35
36 void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
37
38 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
39                                                                           Relation heapRel, Buffer buf,
40                                                                           ScanKey itup_scankey);
41 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
42                                                                                 BTStack stack,
43                                                                                 int keysz, ScanKey scankey,
44                                                                                 BTItem btitem,
45                                                                                 OffsetNumber afteritem);
46 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
47                                                 OffsetNumber newitemoff, Size newitemsz,
48                                                 BTItem newitem, bool newitemonleft,
49                                                 OffsetNumber *itup_off, BlockNumber *itup_blkno);
50 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
51                                                                          OffsetNumber newitemoff,
52                                                                          Size newitemsz,
53                                                                          bool *newitemonleft);
54 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
55                                                           int leftfree, int rightfree,
56                                                           bool newitemonleft, Size firstrightitemsz);
57 static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
58 static void _bt_pgaddtup(Relation rel, Page page,
59                                                  Size itemsize, BTItem btitem,
60                                                  OffsetNumber itup_off, const char *where);
61 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
62                                                 int keysz, ScanKey scankey);
63
64 #ifdef XLOG
65 static Relation         _xlheapRel;     /* temporary hack */
66 #endif
67
68 /*
69  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
70  *
71  *              This routine is called by the public interface routines, btbuild
72  *              and btinsert.  By here, btitem is filled in, including the TID.
73  */
74 InsertIndexResult
75 _bt_doinsert(Relation rel, BTItem btitem,
76                          bool index_is_unique, Relation heapRel)
77 {
78         IndexTuple      itup = &(btitem->bti_itup);
79         int                     natts = rel->rd_rel->relnatts;
80         ScanKey         itup_scankey;
81         BTStack         stack;
82         Buffer          buf;
83         InsertIndexResult res;
84
85         /* we need a scan key to do our search, so build one */
86         itup_scankey = _bt_mkscankey(rel, itup);
87
88 top:
89         /* find the page containing this key */
90         stack = _bt_search(rel, natts, itup_scankey, &buf, BT_WRITE);
91
92         /* trade in our read lock for a write lock */
93         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
94         LockBuffer(buf, BT_WRITE);
95
96         /*
97          * If the page was split between the time that we surrendered our read
98          * lock and acquired our write lock, then this page may no longer be
99          * the right place for the key we want to insert.  In this case, we
100          * need to move right in the tree.      See Lehman and Yao for an
101          * excruciatingly precise description.
102          */
103         buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
104
105         /*
106          * If we're not allowing duplicates, make sure the key isn't
107          * already in the index.  XXX this belongs somewhere else, likely
108          */
109         if (index_is_unique)
110         {
111                 TransactionId xwait;
112
113                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
114
115                 if (TransactionIdIsValid(xwait))
116                 {
117                         /* Have to wait for the other guy ... */
118                         _bt_relbuf(rel, buf, BT_WRITE);
119                         XactLockTableWait(xwait);
120                         /* start over... */
121                         _bt_freestack(stack);
122                         goto top;
123                 }
124         }
125
126 #ifdef XLOG
127         _xlheapRel = heapRel;   /* temporary hack */
128 #endif
129
130         /* do the insertion */
131         res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
132
133         /* be tidy */
134         _bt_freestack(stack);
135         _bt_freeskey(itup_scankey);
136
137         return res;
138 }
139
140 /*
141  *      _bt_check_unique() -- Check for violation of unique index constraint
142  *
143  * Returns NullTransactionId if there is no conflict, else an xact ID we
144  * must wait for to see if it commits a conflicting tuple.  If an actual
145  * conflict is detected, no return --- just elog().
146  */
147 static TransactionId
148 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
149                                  Buffer buf, ScanKey itup_scankey)
150 {
151         TupleDesc       itupdesc = RelationGetDescr(rel);
152         int                     natts = rel->rd_rel->relnatts;
153         OffsetNumber offset,
154                                 maxoff;
155         Page            page;
156         BTPageOpaque opaque;
157         Buffer          nbuf = InvalidBuffer;
158         bool            chtup = true;
159
160         page = BufferGetPage(buf);
161         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
162         maxoff = PageGetMaxOffsetNumber(page);
163
164         /*
165          * Find first item >= proposed new item.  Note we could also get
166          * a pointer to end-of-page here.
167          */
168         offset = _bt_binsrch(rel, buf, natts, itup_scankey);
169
170         /*
171          * Scan over all equal tuples, looking for live conflicts.
172          */
173         for (;;)
174         {
175                 HeapTupleData htup;
176                 Buffer          buffer;
177                 BTItem          cbti;
178                 BlockNumber nblkno;
179
180                 /*
181                  * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
182                  * how we handling NULLs - and so we must not use _bt_compare
183                  * in real comparison, but only for ordering/finding items on
184                  * pages. - vadim 03/24/97
185                  *
186                  * make sure the offset points to an actual key
187                  * before trying to compare it...
188                  */
189                 if (offset <= maxoff)
190                 {
191                         if (! _bt_isequal(itupdesc, page, offset, natts, itup_scankey))
192                                 break;                  /* we're past all the equal tuples */
193
194                         /*
195                          * Have to check is inserted heap tuple deleted one (i.e.
196                          * just moved to another place by vacuum)!  We only need to
197                          * do this once, but don't want to do it at all unless
198                          * we see equal tuples, so as not to slow down unequal case.
199                          */
200                         if (chtup)
201                         {
202                                 htup.t_self = btitem->bti_itup.t_tid;
203                                 heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
204                                 if (htup.t_data == NULL)                /* YES! */
205                                         break;
206                                 /* Live tuple is being inserted, so continue checking */
207                                 ReleaseBuffer(buffer);
208                                 chtup = false;
209                         }
210
211                         cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
212                         htup.t_self = cbti->bti_itup.t_tid;
213                         heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
214                         if (htup.t_data != NULL)                /* it is a duplicate */
215                         {
216                                 TransactionId xwait =
217                                         (TransactionIdIsValid(SnapshotDirty->xmin)) ?
218                                         SnapshotDirty->xmin : SnapshotDirty->xmax;
219
220                                 /*
221                                  * If this tuple is being updated by other transaction
222                                  * then we have to wait for its commit/abort.
223                                  */
224                                 ReleaseBuffer(buffer);
225                                 if (TransactionIdIsValid(xwait))
226                                 {
227                                         if (nbuf != InvalidBuffer)
228                                                 _bt_relbuf(rel, nbuf, BT_READ);
229                                         /* Tell _bt_doinsert to wait... */
230                                         return xwait;
231                                 }
232                                 /*
233                                  * Otherwise we have a definite conflict.
234                                  */
235                                 elog(ERROR, "Cannot insert a duplicate key into unique index %s",
236                                          RelationGetRelationName(rel));
237                         }
238                         /* htup null so no buffer to release */
239                 }
240
241                 /*
242                  * Advance to next tuple to continue checking.
243                  */
244                 if (offset < maxoff)
245                         offset = OffsetNumberNext(offset);
246                 else
247                 {
248                         /* If scankey == hikey we gotta check the next page too */
249                         if (P_RIGHTMOST(opaque))
250                                 break;
251                         if (!_bt_isequal(itupdesc, page, P_HIKEY,
252                                                          natts, itup_scankey))
253                                 break;
254                         nblkno = opaque->btpo_next;
255                         if (nbuf != InvalidBuffer)
256                                 _bt_relbuf(rel, nbuf, BT_READ);
257                         nbuf = _bt_getbuf(rel, nblkno, BT_READ);
258                         page = BufferGetPage(nbuf);
259                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
260                         maxoff = PageGetMaxOffsetNumber(page);
261                         offset = P_FIRSTDATAKEY(opaque);
262                 }
263         }
264
265         if (nbuf != InvalidBuffer)
266                 _bt_relbuf(rel, nbuf, BT_READ);
267
268         return NullTransactionId;
269 }
270
271 /*----------
272  *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
273  *
274  *              This recursive procedure does the following things:
275  *
276  *                      +  finds the right place to insert the tuple.
277  *                      +  if necessary, splits the target page (making sure that the
278  *                         split is equitable as far as post-insert free space goes).
279  *                      +  inserts the tuple.
280  *                      +  if the page was split, pops the parent stack, and finds the
281  *                         right place to insert the new child pointer (by walking
282  *                         right using information stored in the parent stack).
283  *                      +  invokes itself with the appropriate tuple for the right
284  *                         child page on the parent.
285  *
286  *              On entry, we must have the right buffer on which to do the
287  *              insertion, and the buffer must be pinned and locked.  On return,
288  *              we will have dropped both the pin and the write lock on the buffer.
289  *
290  *              If 'afteritem' is >0 then the new tuple must be inserted after the
291  *              existing item of that number, noplace else.  If 'afteritem' is 0
292  *              then the procedure finds the exact spot to insert it by searching.
293  *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
294  *
295  *              NOTE: if the new key is equal to one or more existing keys, we can
296  *              legitimately place it anywhere in the series of equal keys --- in fact,
297  *              if the new key is equal to the page's "high key" we can place it on
298  *              the next page.  If it is equal to the high key, and there's not room
299  *              to insert the new tuple on the current page without splitting, then
300  *              we can move right hoping to find more free space and avoid a split.
301  *              (We should not move right indefinitely, however, since that leads to
302  *              O(N^2) insertion behavior in the presence of many equal keys.)
303  *              Once we have chosen the page to put the key on, we'll insert it before
304  *              any existing equal keys because of the way _bt_binsrch() works.
305  *
306  *              The locking interactions in this code are critical.  You should
307  *              grok Lehman and Yao's paper before making any changes.  In addition,
308  *              you need to understand how we disambiguate duplicate keys in this
309  *              implementation, in order to be able to find our location using
310  *              L&Y "move right" operations.  Since we may insert duplicate user
311  *              keys, and since these dups may propagate up the tree, we use the
312  *              'afteritem' parameter to position ourselves correctly for the
313  *              insertion on internal pages.
314  *----------
315  */
316 static InsertIndexResult
317 _bt_insertonpg(Relation rel,
318                            Buffer buf,
319                            BTStack stack,
320                            int keysz,
321                            ScanKey scankey,
322                            BTItem btitem,
323                            OffsetNumber afteritem)
324 {
325         InsertIndexResult res;
326         Page            page;
327         BTPageOpaque lpageop;
328         OffsetNumber itup_off;
329         BlockNumber itup_blkno;
330         OffsetNumber newitemoff;
331         OffsetNumber firstright = InvalidOffsetNumber;
332         Size            itemsz;
333
334         page = BufferGetPage(buf);
335         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
336
337         itemsz = IndexTupleDSize(btitem->bti_itup)
338                 + (sizeof(BTItemData) - sizeof(IndexTupleData));
339
340         itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
341                                                                  * we need to be consistent */
342
343         /*
344          * Check whether the item can fit on a btree page at all. (Eventually,
345          * we ought to try to apply TOAST methods if not.) We actually need to
346          * be able to fit three items on every page, so restrict any one item
347          * to 1/3 the per-page available space. Note that at this point,
348          * itemsz doesn't include the ItemId.
349          */
350         if (itemsz > (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData))
351                 elog(ERROR, "btree: index item size %u exceeds maximum %lu",
352                          itemsz,
353                          (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) /3 - sizeof(ItemIdData));
354
355         /*
356          * Determine exactly where new item will go.
357          */
358         if (afteritem > 0)
359         {
360                 newitemoff = afteritem + 1;
361         }
362         else
363         {
364                 /*----------
365                  * If we will need to split the page to put the item here,
366                  * check whether we can put the tuple somewhere to the right,
367                  * instead.  Keep scanning right until we
368                  *              (a) find a page with enough free space,
369                  *              (b) reach the last page where the tuple can legally go, or
370                  *              (c) get tired of searching.
371                  * (c) is not flippant; it is important because if there are many
372                  * pages' worth of equal keys, it's better to split one of the early
373                  * pages than to scan all the way to the end of the run of equal keys
374                  * on every insert.  We implement "get tired" as a random choice,
375                  * since stopping after scanning a fixed number of pages wouldn't work
376                  * well (we'd never reach the right-hand side of previously split
377                  * pages).  Currently the probability of moving right is set at 0.99,
378                  * which may seem too high to change the behavior much, but it does an
379                  * excellent job of preventing O(N^2) behavior with many equal keys.
380                  *----------
381                  */
382                 bool    movedright = false;
383
384                 while (PageGetFreeSpace(page) < itemsz &&
385                            !P_RIGHTMOST(lpageop) &&
386                            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
387                            random() > (MAX_RANDOM_VALUE / 100))
388                 {
389                         /* step right one page */
390                         BlockNumber             rblkno = lpageop->btpo_next;
391
392                         _bt_relbuf(rel, buf, BT_WRITE);
393                         buf = _bt_getbuf(rel, rblkno, BT_WRITE);
394                         page = BufferGetPage(buf);
395                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
396                         movedright = true;
397                 }
398                 /*
399                  * Now we are on the right page, so find the insert position.
400                  * If we moved right at all, we know we should insert at the
401                  * start of the page, else must find the position by searching.
402                  */
403                 if (movedright)
404                         newitemoff = P_FIRSTDATAKEY(lpageop);
405                 else
406                         newitemoff = _bt_binsrch(rel, buf, keysz, scankey);
407         }
408
409         /*
410          * Do we need to split the page to fit the item on it?
411          *
412          * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its
413          * result, so this comparison is correct even though we appear to
414          * be accounting only for the item and not for its line pointer.
415          */
416         if (PageGetFreeSpace(page) < itemsz)
417         {
418                 Buffer          rbuf;
419                 BlockNumber bknum = BufferGetBlockNumber(buf);
420                 BlockNumber rbknum;
421                 bool            is_root = P_ISROOT(lpageop);
422                 bool            newitemonleft;
423
424                 /* Choose the split point */
425                 firstright = _bt_findsplitloc(rel, page,
426                                                                           newitemoff, itemsz,
427                                                                           &newitemonleft);
428
429                 /* split the buffer into left and right halves */
430                 rbuf = _bt_split(rel, buf, firstright,
431                                                  newitemoff, itemsz, btitem, newitemonleft,
432                                                  &itup_off, &itup_blkno);
433
434                 /*----------
435                  * By here,
436                  *
437                  *              +  our target page has been split;
438                  *              +  the original tuple has been inserted;
439                  *              +  we have write locks on both the old (left half)
440                  *                 and new (right half) buffers, after the split; and
441                  *              +  we know the key we want to insert into the parent
442                  *                 (it's the "high key" on the left child page).
443                  *
444                  * We're ready to do the parent insertion.  We need to hold onto the
445                  * locks for the child pages until we locate the parent, but we can
446                  * release them before doing the actual insertion (see Lehman and Yao
447                  * for the reasoning).
448                  *
449                  * Here we have to do something Lehman and Yao don't talk about:
450                  * deal with a root split and construction of a new root.  If our
451                  * stack is empty then we have just split a node on what had been
452                  * the root level when we descended the tree.  If it is still the
453                  * root then we perform a new-root construction.  If it *wasn't*
454                  * the root anymore, use the parent pointer to get up to the root
455                  * level that someone constructed meanwhile, and find the right
456                  * place to insert as for the normal case.
457                  *----------
458                  */
459
460                 if (is_root)
461                 {
462                         Assert(stack == (BTStack) NULL);
463                         /* create a new root node and release the split buffers */
464                         _bt_newroot(rel, buf, rbuf);
465                 }
466                 else
467                 {
468                         InsertIndexResult newres;
469                         BTItem          new_item;
470                         BTStackData     fakestack;
471                         BTItem          ritem;
472                         Buffer          pbuf;
473
474                         /* Set up a phony stack entry if we haven't got a real one */
475                         if (stack == (BTStack) NULL)
476                         {
477                                 elog(DEBUG, "btree: concurrent ROOT page split");
478                                 stack = &fakestack;
479                                 stack->bts_blkno = lpageop->btpo_parent;
480                                 stack->bts_offset = InvalidOffsetNumber;
481                                 /* bts_btitem will be initialized below */
482                                 stack->bts_parent = NULL;
483                         }
484
485                         /* get high key from left page == lowest key on new right page */
486                         ritem = (BTItem) PageGetItem(page,
487                                                                                  PageGetItemId(page, P_HIKEY));
488
489                         /* form an index tuple that points at the new right page */
490                         new_item = _bt_formitem(&(ritem->bti_itup));
491                         rbknum = BufferGetBlockNumber(rbuf);
492                         ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
493
494                         /*
495                          * Find the parent buffer and get the parent page.
496                          *
497                          * Oops - if we were moved right then we need to change stack
498                          * item! We want to find parent pointing to where we are,
499                          * right ?        - vadim 05/27/97
500                          *
501                          * Interestingly, this means we didn't *really* need to stack
502                          * the parent key at all; all we really care about is the
503                          * saved block and offset as a starting point for our search...
504                          */
505                         ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
506                                                    bknum, P_HIKEY);
507
508                         pbuf = _bt_getstackbuf(rel, stack);
509
510                         /* Now we can write and unlock the children */
511                         _bt_wrtbuf(rel, rbuf);
512                         _bt_wrtbuf(rel, buf);
513
514                         /* Recursively update the parent */
515                         newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
516                                                                         0, NULL, new_item, stack->bts_offset);
517
518                         /* be tidy */
519                         pfree(newres);
520                         pfree(new_item);
521                 }
522         }
523         else
524         {
525 #ifdef XLOG
526                 /* XLOG stuff */
527                 {
528                         char                            xlbuf[sizeof(xl_btree_insert) + 
529                                         sizeof(CommandId) + sizeof(RelFileNode)];
530                         xl_btree_insert    *xlrec = (xl_btree_insert*)xlbuf;
531                         int                                     hsize = SizeOfBtreeInsert;
532                         BTItemData                      truncitem;
533                         BTItem                          xlitem = btitem;
534                         Size                            xlsize = IndexTupleDSize(btitem->bti_itup) + 
535                                                         (sizeof(BTItemData) - sizeof(IndexTupleData));
536                         XLogRecPtr                      recptr;
537
538                         xlrec->target.node = rel->rd_node;
539                         ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
540                         if (P_ISLEAF(lpageop))
541                         {
542                                 CommandId       cid = GetCurrentCommandId();
543                                 memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
544                                 hsize += sizeof(CommandId);
545                                 memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
546                                 hsize += sizeof(RelFileNode);
547                         }
548                         /*
549                          * Read comments in _bt_pgaddtup
550                          */
551                         else if (newitemoff == P_FIRSTDATAKEY(lpageop))
552                         {
553                                 truncitem = *btitem;
554                                 truncitem.bti_itup.t_info = sizeof(BTItemData);
555                                 xlitem = &truncitem;
556                                 xlsize = sizeof(BTItemData);
557                         }
558
559                         recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
560                                 xlbuf, hsize, (char*) xlitem, xlsize);
561
562                         PageSetLSN(page, recptr);
563                         PageSetSUI(page, ThisStartUpID);
564                 }
565 #endif
566                 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
567                 itup_off = newitemoff;
568                 itup_blkno = BufferGetBlockNumber(buf);
569                 /* Write out the updated page and release pin/lock */
570                 _bt_wrtbuf(rel, buf);
571         }
572
573         /* by here, the new tuple is inserted at itup_blkno/itup_off */
574         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
575         ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
576
577         return res;
578 }
579
580 /*
581  *      _bt_split() -- split a page in the btree.
582  *
583  *              On entry, buf is the page to split, and is write-locked and pinned.
584  *              firstright is the item index of the first item to be moved to the
585  *              new right page.  newitemoff etc. tell us about the new item that
586  *              must be inserted along with the data from the old page.
587  *
588  *              Returns the new right sibling of buf, pinned and write-locked.
589  *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
590  *              are set to the exact location where newitem was inserted.
     *
     *              newitemoff is the offset, on the original page, of the existing
     *              item that newitem must be placed in front of; a value past the
     *              last existing item means newitem goes at the end.  newitemsz is
     *              the size handed to the page-add routine for newitem, and
     *              newitemonleft says which of the two halves receives newitem.
     *
     *              Both halves are assembled off to the side: the left half in a
     *              temporary page, the right half in a newly obtained buffer.  The
     *              left half is copied back over the original page only at the
     *              very end.  If the page being split had a right sibling, that
     *              sibling's prev-link is repointed at the new right page and the
     *              sibling buffer is written and released before returning.
591  */
592 static Buffer
593 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
594                   OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
595                   bool newitemonleft,
596                   OffsetNumber *itup_off, BlockNumber *itup_blkno)
597 {
598         Buffer          rbuf;
599         Page            origpage;
600         Page            leftpage,
601                                 rightpage;
602         BTPageOpaque ropaque,
603                                 lopaque,
604                                 oopaque;
            /* sbuf/spage are only set when the page being split has a right sibling */
605         Buffer          sbuf = 0;
606         Page            spage = 0;
607         BTPageOpaque sopaque;
608         Size            itemsz;
609         ItemId          itemid;
610         BTItem          item;
611         OffsetNumber leftoff,
612                                 rightoff;
613         OffsetNumber maxoff;
614         OffsetNumber i;
615
616 #ifdef XLOG
            /* remembers the item chosen as the new left page's high key, for logging */
617         BTItem          lhikey;
618 #endif
619
            /* get a new page for the right half; the left half is built in a temp page */
620         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
621         origpage = BufferGetPage(buf);
622         leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
623         rightpage = BufferGetPage(rbuf);
624
625         _bt_pageinit(leftpage, BufferGetPageSize(buf));
626         _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
627
628         /* init btree private data */
629         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
630         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
631         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
632
633         /* if we're splitting this page, it won't be the root when we're done */
634         lopaque->btpo_flags = oopaque->btpo_flags;
635         lopaque->btpo_flags &= ~BTP_ROOT;
636         ropaque->btpo_flags = lopaque->btpo_flags;
            /* chain the new right page in between the left page and the old right sibling */
637         lopaque->btpo_prev = oopaque->btpo_prev;
638         lopaque->btpo_next = BufferGetBlockNumber(rbuf);
639         ropaque->btpo_prev = BufferGetBlockNumber(buf);
640         ropaque->btpo_next = oopaque->btpo_next;
641
642         /*
643          * Must copy the original parent link into both new pages, even though
644          * it might be quite obsolete by now.  We might need it if this level
645          * is or recently was the root (see README).
646          */
647         lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
648
649         /*
650          * If the page we're splitting is not the rightmost page at its level
651          * in the tree, then the first entry on the page is the high key
652          * for the page.  We need to copy that to the right half.  Otherwise
653          * (meaning the rightmost page case), all the items on the right half
654          * will be user data.
655          */
656         rightoff = P_HIKEY;
657
658         if (!P_RIGHTMOST(oopaque))
659         {
660                 itemid = PageGetItemId(origpage, P_HIKEY);
661                 itemsz = ItemIdGetLength(itemid);
662                 item = (BTItem) PageGetItem(origpage, itemid);
663                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
664                                                 LP_USED) == InvalidOffsetNumber)
665                         elog(STOP, "btree: failed to add hikey to the right sibling");
666                 rightoff = OffsetNumberNext(rightoff);
667         }
668
669         /*
670          * The "high key" for the new left page will be the first key that's
671          * going to go into the new right page.  This might be either the
672          * existing data item at position firstright, or the incoming tuple.
673          */
674         leftoff = P_HIKEY;
675         if (!newitemonleft && newitemoff == firstright)
676         {
677                 /* incoming tuple will become first on right page */
678                 itemsz = newitemsz;
679                 item = newitem;
680         }
681         else
682         {
683                 /* existing item at firstright will become first on right page */
684                 itemid = PageGetItemId(origpage, firstright);
685                 itemsz = ItemIdGetLength(itemid);
686                 item = (BTItem) PageGetItem(origpage, itemid);
687         }
688 #ifdef XLOG
689         lhikey = item;
690 #endif
691         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
692                                         LP_USED) == InvalidOffsetNumber)
693                 elog(STOP, "btree: failed to add hikey to the left sibling");
694         leftoff = OffsetNumberNext(leftoff);
695
696         /*
697          * Now transfer all the data items to the appropriate page.  leftoff
             * and rightoff always name the next free slot on each half, so items
             * are appended in their original order.
698          */
699         maxoff = PageGetMaxOffsetNumber(origpage);
700
701         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
702         {
703                 itemid = PageGetItemId(origpage, i);
704                 itemsz = ItemIdGetLength(itemid);
705                 item = (BTItem) PageGetItem(origpage, itemid);
706
707                 /* does new item belong before this one? */
708                 if (i == newitemoff)
709                 {
710                         if (newitemonleft)
711                         {
712                                 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
713                                                          "left sibling");
714                                 *itup_off = leftoff;
715                                 *itup_blkno = BufferGetBlockNumber(buf);
716                                 leftoff = OffsetNumberNext(leftoff);
717                         }
718                         else
719                         {
720                                 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
721                                                          "right sibling");
722                                 *itup_off = rightoff;
723                                 *itup_blkno = BufferGetBlockNumber(rbuf);
724                                 rightoff = OffsetNumberNext(rightoff);
725                         }
726                 }
727
728                 /* decide which page to put it on */
729                 if (i < firstright)
730                 {
731                         _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
732                                                          "left sibling");
733                         leftoff = OffsetNumberNext(leftoff);
734                 }
735                 else
736                 {
737                         _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
738                                                          "right sibling");
739                         rightoff = OffsetNumberNext(rightoff);
740                 }
741         }
742
743         /* cope with possibility that newitem goes at the end */
744         if (i <= newitemoff)
745         {
746                 if (newitemonleft)
747                 {
748                         _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
749                                                  "left sibling");
750                         *itup_off = leftoff;
751                         *itup_blkno = BufferGetBlockNumber(buf);
752                         leftoff = OffsetNumberNext(leftoff);
753                 }
754                 else
755                 {
756                         _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
757                                                  "right sibling");
758                         *itup_off = rightoff;
759                         *itup_blkno = BufferGetBlockNumber(rbuf);
760                         rightoff = OffsetNumberNext(rightoff);
761                 }
762         }
763
764         /*
765          * We have to grab the right sibling (if any) and fix the prev
766          * pointer there. We are guaranteed that this is deadlock-free
767          * since no other writer will be holding a lock on that page
768          * and trying to move left, and all readers release locks on a page
769          * before trying to fetch its neighbors.
770          */
771
772         if (!P_RIGHTMOST(ropaque))
773         {
774                 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
775                 spage = BufferGetPage(sbuf);
776         }
777
778 #ifdef XLOG
779         /*
780          * Right sibling is locked, new siblings are prepared, but original
781          * page is not updated yet. Log changes before continuing.
782          *
783          * NO ELOG(ERROR) till right sibling is updated.
784          *
             * The record assembled in xlbuf is: the fixed split header, then
             * either (leaf split) the current command id and a relation file
             * node, or (non-leaf split) the new left page's high key, and then,
             * only when the new item went to the left page, the new item itself.
785          */
786         {
787                 char                            xlbuf[sizeof(xl_btree_split) + 
788                         sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
789                 xl_btree_split     *xlrec = (xl_btree_split*) xlbuf;
790                 int                                     hsize = SizeOfBtreeSplit;
791                 int                                     flag = (newitemonleft) ? 
792                                 XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
793                 BlockNumber                     blkno;
794                 XLogRecPtr                      recptr;
795
                    /* header: the relation and the final location of the new item */
796                 xlrec->target.node = rel->rd_node;
797                 ItemPointerSet(&(xlrec->target.tid), *itup_blkno, *itup_off);
798                 if (P_ISLEAF(lopaque))
799                 {
                            /*
                             * _xlheapRel is not declared in this function; presumably it
                             * is the heap relation being indexed -- confirm at its
                             * definition.
                             */
800                         CommandId       cid = GetCurrentCommandId();
801                         memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
802                         hsize += sizeof(CommandId);
803                         memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
804                         hsize += sizeof(RelFileNode);
805                 }
806                 else
807                 {
                            /* non-leaf split: log the new left page's high key */
808                         Size    itemsz = IndexTupleDSize(lhikey->bti_itup) + 
809                                                 (sizeof(BTItemData) - sizeof(IndexTupleData));
810                         memcpy(xlbuf + hsize, (char*) lhikey, itemsz);
811                         hsize += itemsz;
812                 }
813                 if (newitemonleft)
814                 {
815                         /*
816                          * Read comments in _bt_pgaddtup.
817                          * Actually, seems that in non-leaf splits newitem shouldn't
818                          * go to first data key position on left page.
819                          */
820                         if (! P_ISLEAF(lopaque) && *itup_off == P_FIRSTDATAKEY(lopaque))
821                         {
                                    /* log only the item header, matching what was stored */
822                                 BTItemData      truncitem = *newitem;
823                                 truncitem.bti_itup.t_info = sizeof(BTItemData);
824                                 memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData));
825                                 hsize += sizeof(BTItemData);
826                         }
827                         else
828                         {
829                                 Size    itemsz = IndexTupleDSize(newitem->bti_itup) + 
830                                                         (sizeof(BTItemData) - sizeof(IndexTupleData));
831                                 memcpy(xlbuf + hsize, (char*) newitem, itemsz);
832                                 hsize += itemsz;
833                         }
                            /* new item is on the left page, so the "other" block is the right one */
834                         blkno = BufferGetBlockNumber(rbuf);
835                         BlockIdSet(&(xlrec->otherblk), blkno);
836                 }
837                 else
838                 {
                            /* new item is on the right page, so the "other" block is the left one */
839                         blkno = BufferGetBlockNumber(buf);
840                         BlockIdSet(&(xlrec->otherblk), blkno);
841                 }
842
                    /* block number of the old right sibling, as linked from the new right page */
843                 BlockIdSet(&(xlrec->rightblk), ropaque->btpo_next);
844
845                 /* 
846                  * Direct access to the page header is not good but faster - we
847                  * should implement some new func in page API.  The bytes passed
                     * along with the record are those of the new right page lying
                     * between pd_upper and pd_special.
848                  */
849                 recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, 
850                         hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper,
851                         ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
852
                    /* stamp every page modified by the split with this record's position */
853                 PageSetLSN(leftpage, recptr);
854                 PageSetSUI(leftpage, ThisStartUpID);
855                 PageSetLSN(rightpage, recptr);
856                 PageSetSUI(rightpage, ThisStartUpID);
857                 if (!P_RIGHTMOST(ropaque))
858                 {
859                         PageSetLSN(spage, recptr);
860                         PageSetSUI(spage, ThisStartUpID);
861                 }
862         }
863 #endif
864
865         /*
866          * By here, the original data page has been split into two new halves,
867          * and these are correct.  The algorithm requires that the left page
868          * never move during a split, so we copy the new left page back on top
869          * of the original.  Note that this is not a waste of time, since we
870          * also require (in the page management code) that the center of a
871          * page always be clean, and the most efficient way to guarantee this
872          * is just to compact the data by reinserting it into a new left page.
873          */
874
875         PageRestoreTempPage(leftpage, origpage);
876
877         if (!P_RIGHTMOST(ropaque))
878         {
                    /* point the old right sibling back at the new right page */
879                 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
880                 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
881
882                 /* write and release the old right sibling */
883                 _bt_wrtbuf(rel, sbuf);
884         }
885
886         /* split's done */
887         return rbuf;
888 }
889
890 /*
891  *      _bt_findsplitloc() -- find an appropriate place to split a page.
892  *
893  * The idea here is to equalize the free space that will be on each split
894  * page, *after accounting for the inserted tuple*.  (If we fail to account
895  * for it, we might find ourselves with too little room on the page that
896  * it needs to go into!)
897  *
898  * We are passed the intended insert position of the new tuple, expressed as
899  * the offsetnumber of the tuple it must go in front of.  (This could be
900  * maxoff+1 if the tuple is to go at the end.)
901  *
902  * We return the index of the first existing tuple that should go on the
903  * righthand page, plus a boolean indicating whether the new tuple goes on
904  * the left or right page.  The bool is necessary to disambiguate the case
905  * where firstright == newitemoff.
906  */
907 static OffsetNumber
908 _bt_findsplitloc(Relation rel,
909                                  Page page,
910                                  OffsetNumber newitemoff,
911                                  Size newitemsz,
912                                  bool *newitemonleft)
913 {
914         BTPageOpaque opaque;
915         OffsetNumber offnum;
916         OffsetNumber maxoff;
917         ItemId          itemid;
918         FindSplitData state;
919         int                     leftspace,
920                                 rightspace,
921                                 goodenough,
922                                 dataitemtotal,
923                                 dataitemstoleft;
924
925         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
926
927         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
928         newitemsz += sizeof(ItemIdData);
929         state.newitemsz = newitemsz;
930         state.non_leaf = ! P_ISLEAF(opaque);
931         state.have_split = false;
932
933         /* Total free space available on a btree page, after fixed overhead */
934         leftspace = rightspace =
935                 PageGetPageSize(page) - sizeof(PageHeaderData) -
936                 MAXALIGN(sizeof(BTPageOpaqueData))
937                 + sizeof(ItemIdData);
938
939         /*
940          * Finding the best possible split would require checking all the possible
941          * split points, because of the high-key and left-key special cases.
942          * That's probably more work than it's worth; instead, stop as soon as
943          * we find a "good-enough" split, where good-enough is defined as an
944          * imbalance in free space of no more than pagesize/16 (arbitrary...)
945          * This should let us stop near the middle on most pages, instead of
946          * plowing to the end.
947          */
948         goodenough = leftspace / 16;
949
950         /* The right page will have the same high key as the old page */
951         if (!P_RIGHTMOST(opaque))
952         {
953                 itemid = PageGetItemId(page, P_HIKEY);
954                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
955                                                          sizeof(ItemIdData));
956         }
957
958         /* Count up total space in data items without actually scanning 'em */
959         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
960
961         /*
962          * Scan through the data items and calculate space usage for a split
963          * at each possible position.
964          */
965         dataitemstoleft = 0;
966         maxoff = PageGetMaxOffsetNumber(page);
967
968         for (offnum = P_FIRSTDATAKEY(opaque);
969                  offnum <= maxoff;
970                  offnum = OffsetNumberNext(offnum))
971         {
972                 Size            itemsz;
973                 int                     leftfree,
974                                         rightfree;
975
976                 itemid = PageGetItemId(page, offnum);
977                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
978
979                 /*
980                  * We have to allow for the current item becoming the high key of
981                  * the left page; therefore it counts against left space as well
982                  * as right space.
983                  */
984                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
985                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
986                 /*
987                  * Will the new item go to left or right of split?
988                  */
989                 if (offnum > newitemoff)
990                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
991                                                           true, itemsz);
992                 else if (offnum < newitemoff)
993                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
994                                                           false, itemsz);
995                 else
996                 {
997                         /* need to try it both ways! */
998                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
999                                                           true, itemsz);
1000                         /* here we are contemplating newitem as first on right */
1001                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1002                                                           false, newitemsz);
1003                 }
1004
1005                 /* Abort scan once we find a good-enough choice */
1006                 if (state.have_split && state.best_delta <= goodenough)
1007                         break;
1008
1009                 dataitemstoleft += itemsz;
1010         }
1011
1012         /*
1013          * I believe it is not possible to fail to find a feasible split,
1014          * but just in case ...
1015          */
1016         if (! state.have_split)
1017                 elog(FATAL, "_bt_findsplitloc: can't find a feasible split point for %s",
1018                          RelationGetRelationName(rel));
1019
1020         *newitemonleft = state.newitemonleft;
1021         return state.firstright;
1022 }
1023
1024 /*
1025  * Subroutine to analyze a particular possible split choice (ie, firstright
1026  * and newitemonleft settings), and record the best split so far in *state.
1027  */
1028 static void
1029 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1030                                   int leftfree, int rightfree,
1031                                   bool newitemonleft, Size firstrightitemsz)
1032 {
1033         /*
1034          * Account for the new item on whichever side it is to be put.
1035          */
1036         if (newitemonleft)
1037                 leftfree -= (int) state->newitemsz;
1038         else
1039                 rightfree -= (int) state->newitemsz;
1040         /*
1041          * If we are not on the leaf level, we will be able to discard the
1042          * key data from the first item that winds up on the right page.
1043          */
1044         if (state->non_leaf)
1045                 rightfree += (int) firstrightitemsz -
1046                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1047         /*
1048          * If feasible split point, remember best delta.
1049          */
1050         if (leftfree >= 0 && rightfree >= 0)
1051         {
1052                 int             delta = leftfree - rightfree;
1053
1054                 if (delta < 0)
1055                         delta = -delta;
1056                 if (!state->have_split || delta < state->best_delta)
1057                 {
1058                         state->have_split = true;
1059                         state->newitemonleft = newitemonleft;
1060                         state->firstright = firstright;
1061                         state->best_delta = delta;
1062                 }
1063         }
1064 }
1065
1066 /*
1067  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1068  *                                               we last looked at in the parent.
1069  *
1070  *              This is possible because we save a bit image of the last item
1071  *              we looked at in the parent, and the update algorithm guarantees
1072  *              that if items above us in the tree move, they only move right.
1073  *
1074  *              Also, re-set bts_blkno & bts_offset if changed.
1075  */
1076 static Buffer
1077 _bt_getstackbuf(Relation rel, BTStack stack)
1078 {
1079         BlockNumber blkno;
1080         Buffer          buf;
1081         OffsetNumber start,
1082                                 offnum,
1083                                 maxoff;
1084         Page            page;
1085         ItemId          itemid;
1086         BTItem          item;
1087         BTPageOpaque opaque;
1088
1089         blkno = stack->bts_blkno;
1090         buf = _bt_getbuf(rel, blkno, BT_WRITE);
1091         page = BufferGetPage(buf);
1092         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1093         maxoff = PageGetMaxOffsetNumber(page);
1094
1095         start = stack->bts_offset;
1096         /*
1097          * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the
1098          * case of concurrent ROOT page split.  Also, watch out for
1099          * possibility that page has a high key now when it didn't before.
1100          */
1101         if (start < P_FIRSTDATAKEY(opaque))
1102                 start = P_FIRSTDATAKEY(opaque);
1103
1104         for (;;)
1105         {
1106                 /* see if it's on this page */
1107                 for (offnum = start;
1108                          offnum <= maxoff;
1109                          offnum = OffsetNumberNext(offnum))
1110                 {
1111                         itemid = PageGetItemId(page, offnum);
1112                         item = (BTItem) PageGetItem(page, itemid);
1113                         if (BTItemSame(item, &stack->bts_btitem))
1114                         {
1115                                 /* Return accurate pointer to where link is now */
1116                                 stack->bts_blkno = blkno;
1117                                 stack->bts_offset = offnum;
1118                                 return buf;
1119                         }
1120                 }
1121                 /* by here, the item we're looking for moved right at least one page */
1122                 if (P_RIGHTMOST(opaque))
1123                         elog(FATAL, "_bt_getstackbuf: my bits moved right off the end of the world!"
1124                                  "\n\tRecreate index %s.", RelationGetRelationName(rel));
1125
1126                 blkno = opaque->btpo_next;
1127                 _bt_relbuf(rel, buf, BT_WRITE);
1128                 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1129                 page = BufferGetPage(buf);
1130                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1131                 maxoff = PageGetMaxOffsetNumber(page);
1132                 start = P_FIRSTDATAKEY(opaque);
1133         }
1134 }
1135
1136 /*
1137  *      _bt_newroot() -- Create a new root page for the index.
1138  *
1139  *              We've just split the old root page and need to create a new one.
1140  *              In order to do this, we add a new root page to the file, then lock
1141  *              the metadata page and update it.  This is guaranteed to be deadlock-
1142  *              free, because all readers release their locks on the metadata page
1143  *              before trying to lock the root, and all writers lock the root before
1144  *              trying to lock the metadata page.  We have a write lock on the old
1145  *              root page, so we have not introduced any cycles into the waits-for
1146  *              graph.
1147  *
1148  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1149  *              locked.  On exit, a new root page exists with entries for the
1150  *              two new children.  The new root page is neither pinned nor locked, and
1151  *              we have also written out lbuf and rbuf and dropped their pins/locks.
1152  */
1153 void
1154 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1155 {
1156         Buffer          rootbuf;
1157         Page            lpage,
1158                                 rpage,
1159                                 rootpage;
1160         BlockNumber lbkno,
1161                                 rbkno;
1162         BlockNumber rootblknum;
1163         BTPageOpaque rootopaque;
1164         ItemId          itemid;
1165         BTItem          item;
1166         Size            itemsz;
1167         BTItem          new_item;
1168
1169 #ifdef XLOG
1170         Buffer          metabuf;
1171 #endif
1172
1173         /* get a new root page */
1174         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1175         rootpage = BufferGetPage(rootbuf);
1176         rootblknum = BufferGetBlockNumber(rootbuf);
1177
1178 #ifdef XLOG
1179         metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE);
1180 #endif
1181
1182         /* NO ELOG(ERROR) from here till newroot op is logged */
1183
1184         /* set btree special data */
1185         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1186         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1187         rootopaque->btpo_flags |= BTP_ROOT;
1188         rootopaque->btpo_parent = BTREE_METAPAGE;
1189
1190         lbkno = BufferGetBlockNumber(lbuf);
1191         rbkno = BufferGetBlockNumber(rbuf);
1192         lpage = BufferGetPage(lbuf);
1193         rpage = BufferGetPage(rbuf);
1194
1195         /*
1196          * Make sure pages in old root level have valid parent links --- we will
1197          * need this in _bt_insertonpg() if a concurrent root split happens (see
1198          * README).
1199          */
1200         ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
1201                 ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
1202                 rootblknum;
1203
1204         /*
1205          * Create downlink item for left page (old root).  Since this will be
1206          * the first item in a non-leaf page, it implicitly has minus-infinity
1207          * key value, so we need not store any actual key in it.
1208          */
1209         itemsz = sizeof(BTItemData);
1210         new_item = (BTItem) palloc(itemsz);
1211         new_item->bti_itup.t_info = itemsz;
1212         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1213
1214         /*
1215          * Insert the left page pointer into the new root page.  The root page
1216          * is the rightmost page on its level so there is no "high key" in it;
1217          * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1218          */
1219         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1220                 elog(STOP, "btree: failed to add leftkey to new root page");
1221         pfree(new_item);
1222
1223         /*
1224          * Create downlink item for right page.  The key for it is obtained from
1225          * the "high key" position in the left page.
1226          */
1227         itemid = PageGetItemId(lpage, P_HIKEY);
1228         itemsz = ItemIdGetLength(itemid);
1229         item = (BTItem) PageGetItem(lpage, itemid);
1230         new_item = _bt_formitem(&(item->bti_itup));
1231         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1232
1233         /*
1234          * insert the right page pointer into the new root page.
1235          */
1236         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1237                 elog(STOP, "btree: failed to add rightkey to new root page");
1238         pfree(new_item);
1239
1240 #ifdef XLOG
1241         /* XLOG stuff */
1242         {
1243                 xl_btree_newroot        xlrec;
1244                 Page                            metapg = BufferGetPage(metabuf);
1245                 BTMetaPageData     *metad = BTPageGetMeta(metapg);
1246                 XLogRecPtr                      recptr;
1247
1248                 xlrec.node = rel->rd_node;
1249                 BlockIdSet(&(xlrec.rootblk), rootblknum);
1250
1251                 /* 
1252                  * Dirrect access to page is not good but faster - we should 
1253                  * implement some new func in page API.
1254                  */
1255                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
1256                         (char*)&xlrec, SizeOfBtreeNewroot, 
1257                         (char*)rootpage + ((PageHeader) rootpage)->pd_upper,
1258                         ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper);
1259
1260                 metad->btm_root = rootblknum;
1261                 (metad->btm_level)++;
1262
1263                 PageSetLSN(rootpage, recptr);
1264                 PageSetSUI(rootpage, ThisStartUpID);
1265                 PageSetLSN(metapg, recptr);
1266                 PageSetSUI(metapg, ThisStartUpID);
1267
1268                 _bt_wrtbuf(rel, metabuf);
1269         }
1270 #endif
1271
1272         /* write and let go of the new root buffer */
1273         _bt_wrtbuf(rel, rootbuf);
1274
1275 #ifndef XLOG
1276         /* update metadata page with new root block number */
1277         _bt_metaproot(rel, rootblknum, 0);
1278 #endif
1279
1280         /* update and release new sibling, and finally the old root */
1281         _bt_wrtbuf(rel, rbuf);
1282         _bt_wrtbuf(rel, lbuf);
1283 }
1284
1285 /*
1286  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1287  *
1288  *              This routine adds the tuple to the page as requested.  It does
1289  *              not affect pin/lock status, but you'd better have a write lock
1290  *              and pin on the target buffer!  Don't forget to write and release
1291  *              the buffer afterwards, either.
1292  *
1293  *              The main difference between this routine and a bare PageAddItem call
1294  *              is that this code knows that the leftmost data item on a non-leaf
1295  *              btree page doesn't need to have a key.  Therefore, it strips such
1296  *              items down to just the item header.  CAUTION: this works ONLY if
1297  *              we insert the items in order, so that the given itup_off does
1298  *              represent the final position of the item!
1299  */
1300 static void
1301 _bt_pgaddtup(Relation rel,
1302                          Page page,
1303                          Size itemsize,
1304                          BTItem btitem,
1305                          OffsetNumber itup_off,
1306                          const char *where)
1307 {
1308         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1309         BTItemData truncitem;
1310
1311         if (! P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1312         {
1313                 memcpy(&truncitem, btitem, sizeof(BTItemData));
1314                 truncitem.bti_itup.t_info = sizeof(BTItemData);
1315                 btitem = &truncitem;
1316                 itemsize = sizeof(BTItemData);
1317         }
1318
1319         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1320                                         LP_USED) == InvalidOffsetNumber)
1321                 elog(STOP, "btree: failed to add item to the %s for %s",
1322                          where, RelationGetRelationName(rel));
1323 }
1324
1325 /*
1326  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1327  *
1328  * This is very similar to _bt_compare, except for NULL handling.
1329  * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
1330  */
1331 static bool
1332 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1333                         int keysz, ScanKey scankey)
1334 {
1335         BTItem          btitem;
1336         IndexTuple      itup;
1337         int                     i;
1338
1339         /* Better be comparing to a leaf item */
1340         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1341
1342         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1343         itup = &(btitem->bti_itup);
1344
1345         for (i = 1; i <= keysz; i++)
1346         {
1347                 ScanKey         entry = &scankey[i - 1];
1348                 AttrNumber      attno;
1349                 Datum           datum;
1350                 bool            isNull;
1351                 int32           result;
1352
1353                 attno = entry->sk_attno;
1354                 Assert(attno == i);
1355                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1356
1357                 /* NULLs are never equal to anything */
1358                 if (entry->sk_flags & SK_ISNULL || isNull)
1359                         return false;
1360
1361                 result = DatumGetInt32(FunctionCall2(&entry->sk_func,
1362                                                                                          entry->sk_argument,
1363                                                                                          datum));
1364
1365                 if (result != 0)
1366                         return false;
1367         }
1368
1369         /* if we get here, the keys are equal */
1370         return true;
1371 }