]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
Tweak btree insertion to avoid O(N^2) slowdown with large numbers of
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * btinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.62 2000/08/25 23:13:33 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20
21
22 typedef struct
23 {
24         /* context data for _bt_checksplitloc */
25         Size    newitemsz;                      /* size of new item to be inserted */
26         bool    non_leaf;                       /* T if splitting an internal node */
27
28         bool    have_split;                     /* found a valid split? */
29
30         /* these fields valid only if have_split is true */
31         bool    newitemonleft;          /* new item on left or right of best split */
32         OffsetNumber firstright;        /* best split point */
33         int             best_delta;                     /* best size delta so far */
34 } FindSplitData;
35
36
37 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
38                                                                           Relation heapRel, Buffer buf,
39                                                                           ScanKey itup_scankey);
40 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
41                                                                                 BTStack stack,
42                                                                                 int keysz, ScanKey scankey,
43                                                                                 BTItem btitem,
44                                                                                 OffsetNumber afteritem);
45 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
46                                                 OffsetNumber newitemoff, Size newitemsz,
47                                                 BTItem newitem, bool newitemonleft,
48                                                 OffsetNumber *itup_off, BlockNumber *itup_blkno);
49 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
50                                                                          OffsetNumber newitemoff,
51                                                                          Size newitemsz,
52                                                                          bool *newitemonleft);
53 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
54                                                           int leftfree, int rightfree,
55                                                           bool newitemonleft, Size firstrightitemsz);
56 static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
57 static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
58 static void _bt_pgaddtup(Relation rel, Page page,
59                                                  Size itemsize, BTItem btitem,
60                                                  OffsetNumber itup_off, const char *where);
61 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
62                                                 int keysz, ScanKey scankey);
63
64 /*
65  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
66  *
67  *              This routine is called by the public interface routines, btbuild
68  *              and btinsert.  By here, btitem is filled in, including the TID.
69  */
70 InsertIndexResult
71 _bt_doinsert(Relation rel, BTItem btitem,
72                          bool index_is_unique, Relation heapRel)
73 {
74         IndexTuple      itup = &(btitem->bti_itup);
75         int                     natts = rel->rd_rel->relnatts;
76         ScanKey         itup_scankey;
77         BTStack         stack;
78         Buffer          buf;
79         InsertIndexResult res;
80
81         /* we need a scan key to do our search, so build one */
82         itup_scankey = _bt_mkscankey(rel, itup);
83
84 top:
85         /* find the page containing this key */
86         stack = _bt_search(rel, natts, itup_scankey, &buf, BT_WRITE);
87
88         /* trade in our read lock for a write lock */
89         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
90         LockBuffer(buf, BT_WRITE);
91
92         /*
93          * If the page was split between the time that we surrendered our read
94          * lock and acquired our write lock, then this page may no longer be
95          * the right place for the key we want to insert.  In this case, we
96          * need to move right in the tree.      See Lehman and Yao for an
97          * excruciatingly precise description.
98          */
99         buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
100
101         /*
102          * If we're not allowing duplicates, make sure the key isn't
103          * already in the index.  XXX this belongs somewhere else, likely
104          */
105         if (index_is_unique)
106         {
107                 TransactionId xwait;
108
109                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
110
111                 if (TransactionIdIsValid(xwait))
112                 {
113                         /* Have to wait for the other guy ... */
114                         _bt_relbuf(rel, buf, BT_WRITE);
115                         XactLockTableWait(xwait);
116                         /* start over... */
117                         _bt_freestack(stack);
118                         goto top;
119                 }
120         }
121
122         /* do the insertion */
123         res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
124
125         /* be tidy */
126         _bt_freestack(stack);
127         _bt_freeskey(itup_scankey);
128
129         return res;
130 }
131
132 /*
133  *      _bt_check_unique() -- Check for violation of unique index constraint
134  *
135  * Returns NullTransactionId if there is no conflict, else an xact ID we
136  * must wait for to see if it commits a conflicting tuple.  If an actual
137  * conflict is detected, no return --- just elog().
138  */
139 static TransactionId
140 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
141                                  Buffer buf, ScanKey itup_scankey)
142 {
143         TupleDesc       itupdesc = RelationGetDescr(rel);
144         int                     natts = rel->rd_rel->relnatts;
145         OffsetNumber offset,
146                                 maxoff;
147         Page            page;
148         BTPageOpaque opaque;
149         Buffer          nbuf = InvalidBuffer;
150         bool            chtup = true;
151
152         page = BufferGetPage(buf);
153         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
154         maxoff = PageGetMaxOffsetNumber(page);
155
156         /*
157          * Find first item >= proposed new item.  Note we could also get
158          * a pointer to end-of-page here.
159          */
160         offset = _bt_binsrch(rel, buf, natts, itup_scankey);
161
162         /*
163          * Scan over all equal tuples, looking for live conflicts.
164          */
165         for (;;)
166         {
167                 HeapTupleData htup;
168                 Buffer          buffer;
169                 BTItem          cbti;
170                 BlockNumber nblkno;
171
172                 /*
173                  * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
174                  * how we handling NULLs - and so we must not use _bt_compare
175                  * in real comparison, but only for ordering/finding items on
176                  * pages. - vadim 03/24/97
177                  *
178                  * make sure the offset points to an actual key
179                  * before trying to compare it...
180                  */
181                 if (offset <= maxoff)
182                 {
183                         if (! _bt_isequal(itupdesc, page, offset, natts, itup_scankey))
184                                 break;                  /* we're past all the equal tuples */
185
186                         /*
187                          * Have to check is inserted heap tuple deleted one (i.e.
188                          * just moved to another place by vacuum)!  We only need to
189                          * do this once, but don't want to do it at all unless
190                          * we see equal tuples, so as not to slow down unequal case.
191                          */
192                         if (chtup)
193                         {
194                                 htup.t_self = btitem->bti_itup.t_tid;
195                                 heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
196                                 if (htup.t_data == NULL)                /* YES! */
197                                         break;
198                                 /* Live tuple is being inserted, so continue checking */
199                                 ReleaseBuffer(buffer);
200                                 chtup = false;
201                         }
202
203                         cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
204                         htup.t_self = cbti->bti_itup.t_tid;
205                         heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
206                         if (htup.t_data != NULL)                /* it is a duplicate */
207                         {
208                                 TransactionId xwait =
209                                         (TransactionIdIsValid(SnapshotDirty->xmin)) ?
210                                         SnapshotDirty->xmin : SnapshotDirty->xmax;
211
212                                 /*
213                                  * If this tuple is being updated by other transaction
214                                  * then we have to wait for its commit/abort.
215                                  */
216                                 ReleaseBuffer(buffer);
217                                 if (TransactionIdIsValid(xwait))
218                                 {
219                                         if (nbuf != InvalidBuffer)
220                                                 _bt_relbuf(rel, nbuf, BT_READ);
221                                         /* Tell _bt_doinsert to wait... */
222                                         return xwait;
223                                 }
224                                 /*
225                                  * Otherwise we have a definite conflict.
226                                  */
227                                 elog(ERROR, "Cannot insert a duplicate key into unique index %s",
228                                          RelationGetRelationName(rel));
229                         }
230                         /* htup null so no buffer to release */
231                 }
232
233                 /*
234                  * Advance to next tuple to continue checking.
235                  */
236                 if (offset < maxoff)
237                         offset = OffsetNumberNext(offset);
238                 else
239                 {
240                         /* If scankey == hikey we gotta check the next page too */
241                         if (P_RIGHTMOST(opaque))
242                                 break;
243                         if (!_bt_isequal(itupdesc, page, P_HIKEY,
244                                                          natts, itup_scankey))
245                                 break;
246                         nblkno = opaque->btpo_next;
247                         if (nbuf != InvalidBuffer)
248                                 _bt_relbuf(rel, nbuf, BT_READ);
249                         nbuf = _bt_getbuf(rel, nblkno, BT_READ);
250                         page = BufferGetPage(nbuf);
251                         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
252                         maxoff = PageGetMaxOffsetNumber(page);
253                         offset = P_FIRSTDATAKEY(opaque);
254                 }
255         }
256
257         if (nbuf != InvalidBuffer)
258                 _bt_relbuf(rel, nbuf, BT_READ);
259
260         return NullTransactionId;
261 }
262
263 /*----------
264  *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
265  *
266  *              This recursive procedure does the following things:
267  *
268  *                      +  finds the right place to insert the tuple.
269  *                      +  if necessary, splits the target page (making sure that the
270  *                         split is equitable as far as post-insert free space goes).
271  *                      +  inserts the tuple.
272  *                      +  if the page was split, pops the parent stack, and finds the
273  *                         right place to insert the new child pointer (by walking
274  *                         right using information stored in the parent stack).
275  *                      +  invokes itself with the appropriate tuple for the right
276  *                         child page on the parent.
277  *
278  *              On entry, we must have the right buffer on which to do the
279  *              insertion, and the buffer must be pinned and locked.  On return,
280  *              we will have dropped both the pin and the write lock on the buffer.
281  *
282  *              If 'afteritem' is >0 then the new tuple must be inserted after the
283  *              existing item of that number, noplace else.  If 'afteritem' is 0
284  *              then the procedure finds the exact spot to insert it by searching.
285  *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
286  *
287  *              NOTE: if the new key is equal to one or more existing keys, we can
288  *              legitimately place it anywhere in the series of equal keys --- in fact,
289  *              if the new key is equal to the page's "high key" we can place it on
290  *              the next page.  If it is equal to the high key, and there's not room
291  *              to insert the new tuple on the current page without splitting, then
292  *              we can move right hoping to find more free space and avoid a split.
293  *              (We should not move right indefinitely, however, since that leads to
294  *              O(N^2) insertion behavior in the presence of many equal keys.)
295  *              Once we have chosen the page to put the key on, we'll insert it before
296  *              any existing equal keys because of the way _bt_binsrch() works.
297  *
298  *              The locking interactions in this code are critical.  You should
299  *              grok Lehman and Yao's paper before making any changes.  In addition,
300  *              you need to understand how we disambiguate duplicate keys in this
301  *              implementation, in order to be able to find our location using
302  *              L&Y "move right" operations.  Since we may insert duplicate user
303  *              keys, and since these dups may propagate up the tree, we use the
304  *              'afteritem' parameter to position ourselves correctly for the
305  *              insertion on internal pages.
306  *----------
307  */
308 static InsertIndexResult
309 _bt_insertonpg(Relation rel,
310                            Buffer buf,
311                            BTStack stack,
312                            int keysz,
313                            ScanKey scankey,
314                            BTItem btitem,
315                            OffsetNumber afteritem)
316 {
317         InsertIndexResult res;
318         Page            page;
319         BTPageOpaque lpageop;
320         OffsetNumber itup_off;
321         BlockNumber itup_blkno;
322         OffsetNumber newitemoff;
323         OffsetNumber firstright = InvalidOffsetNumber;
324         Size            itemsz;
325
326         page = BufferGetPage(buf);
327         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
328
329         itemsz = IndexTupleDSize(btitem->bti_itup)
330                 + (sizeof(BTItemData) - sizeof(IndexTupleData));
331
332         itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
333                                                                  * we need to be consistent */
334
335         /*
336          * Check whether the item can fit on a btree page at all. (Eventually,
337          * we ought to try to apply TOAST methods if not.) We actually need to
338          * be able to fit three items on every page, so restrict any one item
339          * to 1/3 the per-page available space. Note that at this point,
340          * itemsz doesn't include the ItemId.
341          */
342         if (itemsz > (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData))
343                 elog(ERROR, "btree: index item size %u exceeds maximum %lu",
344                          itemsz,
345                          (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) /3 - sizeof(ItemIdData));
346
347         /*
348          * Determine exactly where new item will go.
349          */
350         if (afteritem > 0)
351         {
352                 newitemoff = afteritem + 1;
353         }
354         else
355         {
356                 /*----------
357                  * If we will need to split the page to put the item here,
358                  * check whether we can put the tuple somewhere to the right,
359                  * instead.  Keep scanning right until we
360                  *              (a) find a page with enough free space,
361                  *              (b) reach the last page where the tuple can legally go, or
362                  *              (c) get tired of searching.
363                  * (c) is not flippant; it is important because if there are many
364                  * pages' worth of equal keys, it's better to split one of the early
365                  * pages than to scan all the way to the end of the run of equal keys
366                  * on every insert.  We implement "get tired" as a random choice,
367                  * since stopping after scanning a fixed number of pages wouldn't work
368                  * well (we'd never reach the right-hand side of previously split
369                  * pages).  Currently the probability of moving right is set at 0.99,
370                  * which may seem too high to change the behavior much, but it does an
371                  * excellent job of preventing O(N^2) behavior with many equal keys.
372                  *----------
373                  */
374                 bool    movedright = false;
375
376                 while (PageGetFreeSpace(page) < itemsz &&
377                            !P_RIGHTMOST(lpageop) &&
378                            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
379                            random() > (MAX_RANDOM_VALUE / 100))
380                 {
381                         /* step right one page */
382                         BlockNumber             rblkno = lpageop->btpo_next;
383
384                         _bt_relbuf(rel, buf, BT_WRITE);
385                         buf = _bt_getbuf(rel, rblkno, BT_WRITE);
386                         page = BufferGetPage(buf);
387                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
388                         movedright = true;
389                 }
390                 /*
391                  * Now we are on the right page, so find the insert position.
392                  * If we moved right at all, we know we should insert at the
393                  * start of the page, else must find the position by searching.
394                  */
395                 if (movedright)
396                         newitemoff = P_FIRSTDATAKEY(lpageop);
397                 else
398                         newitemoff = _bt_binsrch(rel, buf, keysz, scankey);
399         }
400
401         /*
402          * Do we need to split the page to fit the item on it?
403          *
404          * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its
405          * result, so this comparison is correct even though we appear to
406          * be accounting only for the item and not for its line pointer.
407          */
408         if (PageGetFreeSpace(page) < itemsz)
409         {
410                 Buffer          rbuf;
411                 BlockNumber bknum = BufferGetBlockNumber(buf);
412                 BlockNumber rbknum;
413                 bool            is_root = P_ISROOT(lpageop);
414                 bool            newitemonleft;
415
416                 /* Choose the split point */
417                 firstright = _bt_findsplitloc(rel, page,
418                                                                           newitemoff, itemsz,
419                                                                           &newitemonleft);
420
421                 /* split the buffer into left and right halves */
422                 rbuf = _bt_split(rel, buf, firstright,
423                                                  newitemoff, itemsz, btitem, newitemonleft,
424                                                  &itup_off, &itup_blkno);
425
426                 /*----------
427                  * By here,
428                  *
429                  *              +  our target page has been split;
430                  *              +  the original tuple has been inserted;
431                  *              +  we have write locks on both the old (left half)
432                  *                 and new (right half) buffers, after the split; and
433                  *              +  we know the key we want to insert into the parent
434                  *                 (it's the "high key" on the left child page).
435                  *
436                  * We're ready to do the parent insertion.  We need to hold onto the
437                  * locks for the child pages until we locate the parent, but we can
438                  * release them before doing the actual insertion (see Lehman and Yao
439                  * for the reasoning).
440                  *
441                  * Here we have to do something Lehman and Yao don't talk about:
442                  * deal with a root split and construction of a new root.  If our
443                  * stack is empty then we have just split a node on what had been
444                  * the root level when we descended the tree.  If it is still the
445                  * root then we perform a new-root construction.  If it *wasn't*
446                  * the root anymore, use the parent pointer to get up to the root
447                  * level that someone constructed meanwhile, and find the right
448                  * place to insert as for the normal case.
449                  *----------
450                  */
451
452                 if (is_root)
453                 {
454                         Assert(stack == (BTStack) NULL);
455                         /* create a new root node and release the split buffers */
456                         _bt_newroot(rel, buf, rbuf);
457                 }
458                 else
459                 {
460                         InsertIndexResult newres;
461                         BTItem          new_item;
462                         BTStackData     fakestack;
463                         BTItem          ritem;
464                         Buffer          pbuf;
465
466                         /* Set up a phony stack entry if we haven't got a real one */
467                         if (stack == (BTStack) NULL)
468                         {
469                                 elog(DEBUG, "btree: concurrent ROOT page split");
470                                 stack = &fakestack;
471                                 stack->bts_blkno = lpageop->btpo_parent;
472                                 stack->bts_offset = InvalidOffsetNumber;
473                                 /* bts_btitem will be initialized below */
474                                 stack->bts_parent = NULL;
475                         }
476
477                         /* get high key from left page == lowest key on new right page */
478                         ritem = (BTItem) PageGetItem(page,
479                                                                                  PageGetItemId(page, P_HIKEY));
480
481                         /* form an index tuple that points at the new right page */
482                         new_item = _bt_formitem(&(ritem->bti_itup));
483                         rbknum = BufferGetBlockNumber(rbuf);
484                         ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
485
486                         /*
487                          * Find the parent buffer and get the parent page.
488                          *
489                          * Oops - if we were moved right then we need to change stack
490                          * item! We want to find parent pointing to where we are,
491                          * right ?        - vadim 05/27/97
492                          *
493                          * Interestingly, this means we didn't *really* need to stack
494                          * the parent key at all; all we really care about is the
495                          * saved block and offset as a starting point for our search...
496                          */
497                         ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
498                                                    bknum, P_HIKEY);
499
500                         pbuf = _bt_getstackbuf(rel, stack);
501
502                         /* Now we can write and unlock the children */
503                         _bt_wrtbuf(rel, rbuf);
504                         _bt_wrtbuf(rel, buf);
505
506                         /* Recursively update the parent */
507                         newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
508                                                                         0, NULL, new_item, stack->bts_offset);
509
510                         /* be tidy */
511                         pfree(newres);
512                         pfree(new_item);
513                 }
514         }
515         else
516         {
517                 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
518                 itup_off = newitemoff;
519                 itup_blkno = BufferGetBlockNumber(buf);
520                 /* Write out the updated page and release pin/lock */
521                 _bt_wrtbuf(rel, buf);
522         }
523
524         /* by here, the new tuple is inserted at itup_blkno/itup_off */
525         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
526         ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
527
528         return res;
529 }
530
531 /*
532  *      _bt_split() -- split a page in the btree.
533  *
534  *              On entry, buf is the page to split, and is write-locked and pinned.
535  *              firstright is the item index of the first item to be moved to the
536  *              new right page.  newitemoff etc. tell us about the new item that
537  *              must be inserted along with the data from the old page.
538  *
539  *              Returns the new right sibling of buf, pinned and write-locked.
540  *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
541  *              are set to the exact location where newitem was inserted.
542  */
543 static Buffer
544 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
545                   OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
546                   bool newitemonleft,
547                   OffsetNumber *itup_off, BlockNumber *itup_blkno)
548 {
549         Buffer          rbuf;
550         Page            origpage;
551         Page            leftpage,
552                                 rightpage;
553         BTPageOpaque ropaque,
554                                 lopaque,
555                                 oopaque;
556         Buffer          sbuf;
557         Page            spage;
558         BTPageOpaque sopaque;
559         Size            itemsz;
560         ItemId          itemid;
561         BTItem          item;
562         OffsetNumber leftoff,
563                                 rightoff;
564         OffsetNumber maxoff;
565         OffsetNumber i;
566
567         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
568         origpage = BufferGetPage(buf);
569         leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
570         rightpage = BufferGetPage(rbuf);
571
572         _bt_pageinit(leftpage, BufferGetPageSize(buf));
573         _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
574
575         /* init btree private data */
576         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
577         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
578         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
579
580         /* if we're splitting this page, it won't be the root when we're done */
581         oopaque->btpo_flags &= ~BTP_ROOT;
582         lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
583         lopaque->btpo_prev = oopaque->btpo_prev;
584         lopaque->btpo_next = BufferGetBlockNumber(rbuf);
585         ropaque->btpo_prev = BufferGetBlockNumber(buf);
586         ropaque->btpo_next = oopaque->btpo_next;
587
588         /*
589          * Must copy the original parent link into both new pages, even though
590          * it might be quite obsolete by now.  We might need it if this level
591          * is or recently was the root (see README).
592          */
593         lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
594
595         /*
596          * If the page we're splitting is not the rightmost page at its level
597          * in the tree, then the first entry on the page is the high key
598          * for the page.  We need to copy that to the right half.  Otherwise
599          * (meaning the rightmost page case), all the items on the right half
600          * will be user data.
601          */
602         rightoff = P_HIKEY;
603
604         if (!P_RIGHTMOST(oopaque))
605         {
606                 itemid = PageGetItemId(origpage, P_HIKEY);
607                 itemsz = ItemIdGetLength(itemid);
608                 item = (BTItem) PageGetItem(origpage, itemid);
609                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
610                                                 LP_USED) == InvalidOffsetNumber)
611                         elog(FATAL, "btree: failed to add hikey to the right sibling");
612                 rightoff = OffsetNumberNext(rightoff);
613         }
614
615         /*
616          * The "high key" for the new left page will be the first key that's
617          * going to go into the new right page.  This might be either the
618          * existing data item at position firstright, or the incoming tuple.
619          */
620         leftoff = P_HIKEY;
621         if (!newitemonleft && newitemoff == firstright)
622         {
623                 /* incoming tuple will become first on right page */
624                 itemsz = newitemsz;
625                 item = newitem;
626         }
627         else
628         {
629                 /* existing item at firstright will become first on right page */
630                 itemid = PageGetItemId(origpage, firstright);
631                 itemsz = ItemIdGetLength(itemid);
632                 item = (BTItem) PageGetItem(origpage, itemid);
633         }
634         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
635                                         LP_USED) == InvalidOffsetNumber)
636                 elog(FATAL, "btree: failed to add hikey to the left sibling");
637         leftoff = OffsetNumberNext(leftoff);
638
639         /*
640          * Now transfer all the data items to the appropriate page
641          */
642         maxoff = PageGetMaxOffsetNumber(origpage);
643
644         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
645         {
646                 itemid = PageGetItemId(origpage, i);
647                 itemsz = ItemIdGetLength(itemid);
648                 item = (BTItem) PageGetItem(origpage, itemid);
649
650                 /* does new item belong before this one? */
651                 if (i == newitemoff)
652                 {
653                         if (newitemonleft)
654                         {
655                                 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
656                                                          "left sibling");
657                                 *itup_off = leftoff;
658                                 *itup_blkno = BufferGetBlockNumber(buf);
659                                 leftoff = OffsetNumberNext(leftoff);
660                         }
661                         else
662                         {
663                                 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
664                                                          "right sibling");
665                                 *itup_off = rightoff;
666                                 *itup_blkno = BufferGetBlockNumber(rbuf);
667                                 rightoff = OffsetNumberNext(rightoff);
668                         }
669                 }
670
671                 /* decide which page to put it on */
672                 if (i < firstright)
673                 {
674                         _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
675                                                          "left sibling");
676                         leftoff = OffsetNumberNext(leftoff);
677                 }
678                 else
679                 {
680                         _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
681                                                          "right sibling");
682                         rightoff = OffsetNumberNext(rightoff);
683                 }
684         }
685
686         /* cope with possibility that newitem goes at the end */
687         if (i <= newitemoff)
688         {
689                 if (newitemonleft)
690                 {
691                         _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
692                                                  "left sibling");
693                         *itup_off = leftoff;
694                         *itup_blkno = BufferGetBlockNumber(buf);
695                         leftoff = OffsetNumberNext(leftoff);
696                 }
697                 else
698                 {
699                         _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
700                                                  "right sibling");
701                         *itup_off = rightoff;
702                         *itup_blkno = BufferGetBlockNumber(rbuf);
703                         rightoff = OffsetNumberNext(rightoff);
704                 }
705         }
706
707         /*
708          * By here, the original data page has been split into two new halves,
709          * and these are correct.  The algorithm requires that the left page
710          * never move during a split, so we copy the new left page back on top
711          * of the original.  Note that this is not a waste of time, since we
712          * also require (in the page management code) that the center of a
713          * page always be clean, and the most efficient way to guarantee this
714          * is just to compact the data by reinserting it into a new left page.
715          */
716
717         PageRestoreTempPage(leftpage, origpage);
718
719         /*
720          * Finally, we need to grab the right sibling (if any) and fix the
721          * prev pointer there.  We are guaranteed that this is deadlock-free
722          * since no other writer will be holding a lock on that page
723          * and trying to move left, and all readers release locks on a page
724          * before trying to fetch its neighbors.
725          */
726
727         if (!P_RIGHTMOST(ropaque))
728         {
729                 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
730                 spage = BufferGetPage(sbuf);
731                 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
732                 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
733
734                 /* write and release the old right sibling */
735                 _bt_wrtbuf(rel, sbuf);
736         }
737
738         /* split's done */
739         return rbuf;
740 }
741
742 /*
743  *      _bt_findsplitloc() -- find an appropriate place to split a page.
744  *
745  * The idea here is to equalize the free space that will be on each split
746  * page, *after accounting for the inserted tuple*.  (If we fail to account
747  * for it, we might find ourselves with too little room on the page that
748  * it needs to go into!)
749  *
750  * We are passed the intended insert position of the new tuple, expressed as
751  * the offsetnumber of the tuple it must go in front of.  (This could be
752  * maxoff+1 if the tuple is to go at the end.)
753  *
754  * We return the index of the first existing tuple that should go on the
755  * righthand page, plus a boolean indicating whether the new tuple goes on
756  * the left or right page.  The bool is necessary to disambiguate the case
757  * where firstright == newitemoff.
758  */
759 static OffsetNumber
760 _bt_findsplitloc(Relation rel,
761                                  Page page,
762                                  OffsetNumber newitemoff,
763                                  Size newitemsz,
764                                  bool *newitemonleft)
765 {
766         BTPageOpaque opaque;
767         OffsetNumber offnum;
768         OffsetNumber maxoff;
769         ItemId          itemid;
770         FindSplitData state;
771         int                     leftspace,
772                                 rightspace,
773                                 goodenough,
774                                 dataitemtotal,
775                                 dataitemstoleft;
776
777         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
778
779         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
780         newitemsz += sizeof(ItemIdData);
781         state.newitemsz = newitemsz;
782         state.non_leaf = ! P_ISLEAF(opaque);
783         state.have_split = false;
784
785         /* Total free space available on a btree page, after fixed overhead */
786         leftspace = rightspace =
787                 PageGetPageSize(page) - sizeof(PageHeaderData) -
788                 MAXALIGN(sizeof(BTPageOpaqueData))
789                 + sizeof(ItemIdData);
790
791         /*
792          * Finding the best possible split would require checking all the possible
793          * split points, because of the high-key and left-key special cases.
794          * That's probably more work than it's worth; instead, stop as soon as
795          * we find a "good-enough" split, where good-enough is defined as an
796          * imbalance in free space of no more than pagesize/16 (arbitrary...)
797          * This should let us stop near the middle on most pages, instead of
798          * plowing to the end.
799          */
800         goodenough = leftspace / 16;
801
802         /* The right page will have the same high key as the old page */
803         if (!P_RIGHTMOST(opaque))
804         {
805                 itemid = PageGetItemId(page, P_HIKEY);
806                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
807                                                          sizeof(ItemIdData));
808         }
809
810         /* Count up total space in data items without actually scanning 'em */
811         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
812
813         /*
814          * Scan through the data items and calculate space usage for a split
815          * at each possible position.
816          */
817         dataitemstoleft = 0;
818         maxoff = PageGetMaxOffsetNumber(page);
819
820         for (offnum = P_FIRSTDATAKEY(opaque);
821                  offnum <= maxoff;
822                  offnum = OffsetNumberNext(offnum))
823         {
824                 Size            itemsz;
825                 int                     leftfree,
826                                         rightfree;
827
828                 itemid = PageGetItemId(page, offnum);
829                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
830
831                 /*
832                  * We have to allow for the current item becoming the high key of
833                  * the left page; therefore it counts against left space as well
834                  * as right space.
835                  */
836                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
837                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
838                 /*
839                  * Will the new item go to left or right of split?
840                  */
841                 if (offnum > newitemoff)
842                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
843                                                           true, itemsz);
844                 else if (offnum < newitemoff)
845                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
846                                                           false, itemsz);
847                 else
848                 {
849                         /* need to try it both ways! */
850                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
851                                                           true, itemsz);
852                         /* here we are contemplating newitem as first on right */
853                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
854                                                           false, newitemsz);
855                 }
856
857                 /* Abort scan once we find a good-enough choice */
858                 if (state.have_split && state.best_delta <= goodenough)
859                         break;
860
861                 dataitemstoleft += itemsz;
862         }
863
864         /*
865          * I believe it is not possible to fail to find a feasible split,
866          * but just in case ...
867          */
868         if (! state.have_split)
869                 elog(FATAL, "_bt_findsplitloc: can't find a feasible split point for %s",
870                          RelationGetRelationName(rel));
871
872         *newitemonleft = state.newitemonleft;
873         return state.firstright;
874 }
875
876 /*
877  * Subroutine to analyze a particular possible split choice (ie, firstright
878  * and newitemonleft settings), and record the best split so far in *state.
879  */
880 static void
881 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
882                                   int leftfree, int rightfree,
883                                   bool newitemonleft, Size firstrightitemsz)
884 {
885         /*
886          * Account for the new item on whichever side it is to be put.
887          */
888         if (newitemonleft)
889                 leftfree -= (int) state->newitemsz;
890         else
891                 rightfree -= (int) state->newitemsz;
892         /*
893          * If we are not on the leaf level, we will be able to discard the
894          * key data from the first item that winds up on the right page.
895          */
896         if (state->non_leaf)
897                 rightfree += (int) firstrightitemsz -
898                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
899         /*
900          * If feasible split point, remember best delta.
901          */
902         if (leftfree >= 0 && rightfree >= 0)
903         {
904                 int             delta = leftfree - rightfree;
905
906                 if (delta < 0)
907                         delta = -delta;
908                 if (!state->have_split || delta < state->best_delta)
909                 {
910                         state->have_split = true;
911                         state->newitemonleft = newitemonleft;
912                         state->firstright = firstright;
913                         state->best_delta = delta;
914                 }
915         }
916 }
917
918 /*
919  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
920  *                                               we last looked at in the parent.
921  *
922  *              This is possible because we save a bit image of the last item
923  *              we looked at in the parent, and the update algorithm guarantees
924  *              that if items above us in the tree move, they only move right.
925  *
926  *              Also, re-set bts_blkno & bts_offset if changed.
927  */
928 static Buffer
929 _bt_getstackbuf(Relation rel, BTStack stack)
930 {
931         BlockNumber blkno;
932         Buffer          buf;
933         OffsetNumber start,
934                                 offnum,
935                                 maxoff;
936         Page            page;
937         ItemId          itemid;
938         BTItem          item;
939         BTPageOpaque opaque;
940
941         blkno = stack->bts_blkno;
942         buf = _bt_getbuf(rel, blkno, BT_WRITE);
943         page = BufferGetPage(buf);
944         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
945         maxoff = PageGetMaxOffsetNumber(page);
946
947         start = stack->bts_offset;
948         /*
949          * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the
950          * case of concurrent ROOT page split.  Also, watch out for
951          * possibility that page has a high key now when it didn't before.
952          */
953         if (start < P_FIRSTDATAKEY(opaque))
954                 start = P_FIRSTDATAKEY(opaque);
955
956         for (;;)
957         {
958                 /* see if it's on this page */
959                 for (offnum = start;
960                          offnum <= maxoff;
961                          offnum = OffsetNumberNext(offnum))
962                 {
963                         itemid = PageGetItemId(page, offnum);
964                         item = (BTItem) PageGetItem(page, itemid);
965                         if (BTItemSame(item, &stack->bts_btitem))
966                         {
967                                 /* Return accurate pointer to where link is now */
968                                 stack->bts_blkno = blkno;
969                                 stack->bts_offset = offnum;
970                                 return buf;
971                         }
972                 }
973                 /* by here, the item we're looking for moved right at least one page */
974                 if (P_RIGHTMOST(opaque))
975                         elog(FATAL, "_bt_getstackbuf: my bits moved right off the end of the world!"
976                                  "\n\tRecreate index %s.", RelationGetRelationName(rel));
977
978                 blkno = opaque->btpo_next;
979                 _bt_relbuf(rel, buf, BT_WRITE);
980                 buf = _bt_getbuf(rel, blkno, BT_WRITE);
981                 page = BufferGetPage(buf);
982                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
983                 maxoff = PageGetMaxOffsetNumber(page);
984                 start = P_FIRSTDATAKEY(opaque);
985         }
986 }
987
988 /*
989  *      _bt_newroot() -- Create a new root page for the index.
990  *
991  *              We've just split the old root page and need to create a new one.
992  *              In order to do this, we add a new root page to the file, then lock
993  *              the metadata page and update it.  This is guaranteed to be deadlock-
994  *              free, because all readers release their locks on the metadata page
995  *              before trying to lock the root, and all writers lock the root before
996  *              trying to lock the metadata page.  We have a write lock on the old
997  *              root page, so we have not introduced any cycles into the waits-for
998  *              graph.
999  *
1000  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1001  *              locked.  On exit, a new root page exists with entries for the
1002  *              two new children.  The new root page is neither pinned nor locked, and
1003  *              we have also written out lbuf and rbuf and dropped their pins/locks.
1004  */
1005 static void
1006 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1007 {
1008         Buffer          rootbuf;
1009         Page            lpage,
1010                                 rpage,
1011                                 rootpage;
1012         BlockNumber lbkno,
1013                                 rbkno;
1014         BlockNumber rootbknum;
1015         BTPageOpaque rootopaque;
1016         ItemId          itemid;
1017         BTItem          item;
1018         Size            itemsz;
1019         BTItem          new_item;
1020
1021         /* get a new root page */
1022         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1023         rootpage = BufferGetPage(rootbuf);
1024         rootbknum = BufferGetBlockNumber(rootbuf);
1025
1026         /* set btree special data */
1027         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1028         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1029         rootopaque->btpo_flags |= BTP_ROOT;
1030
1031         lbkno = BufferGetBlockNumber(lbuf);
1032         rbkno = BufferGetBlockNumber(rbuf);
1033         lpage = BufferGetPage(lbuf);
1034         rpage = BufferGetPage(rbuf);
1035
1036         /*
1037          * Make sure pages in old root level have valid parent links --- we will
1038          * need this in _bt_insertonpg() if a concurrent root split happens (see
1039          * README).
1040          */
1041         ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
1042                 ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
1043                 rootbknum;
1044
1045         /*
1046          * Create downlink item for left page (old root).  Since this will be
1047          * the first item in a non-leaf page, it implicitly has minus-infinity
1048          * key value, so we need not store any actual key in it.
1049          */
1050         itemsz = sizeof(BTItemData);
1051         new_item = (BTItem) palloc(itemsz);
1052         new_item->bti_itup.t_info = itemsz;
1053         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1054
1055         /*
1056          * Insert the left page pointer into the new root page.  The root page
1057          * is the rightmost page on its level so there is no "high key" in it;
1058          * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1059          */
1060         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1061                 elog(FATAL, "btree: failed to add leftkey to new root page");
1062         pfree(new_item);
1063
1064         /*
1065          * Create downlink item for right page.  The key for it is obtained from
1066          * the "high key" position in the left page.
1067          */
1068         itemid = PageGetItemId(lpage, P_HIKEY);
1069         itemsz = ItemIdGetLength(itemid);
1070         item = (BTItem) PageGetItem(lpage, itemid);
1071         new_item = _bt_formitem(&(item->bti_itup));
1072         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1073
1074         /*
1075          * insert the right page pointer into the new root page.
1076          */
1077         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1078                 elog(FATAL, "btree: failed to add rightkey to new root page");
1079         pfree(new_item);
1080
1081         /* write and let go of the new root buffer */
1082         _bt_wrtbuf(rel, rootbuf);
1083
1084         /* update metadata page with new root block number */
1085         _bt_metaproot(rel, rootbknum, 0);
1086
1087         /* update and release new sibling, and finally the old root */
1088         _bt_wrtbuf(rel, rbuf);
1089         _bt_wrtbuf(rel, lbuf);
1090 }
1091
1092 /*
1093  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1094  *
1095  *              This routine adds the tuple to the page as requested.  It does
1096  *              not affect pin/lock status, but you'd better have a write lock
1097  *              and pin on the target buffer!  Don't forget to write and release
1098  *              the buffer afterwards, either.
1099  *
1100  *              The main difference between this routine and a bare PageAddItem call
1101  *              is that this code knows that the leftmost data item on a non-leaf
1102  *              btree page doesn't need to have a key.  Therefore, it strips such
1103  *              items down to just the item header.  CAUTION: this works ONLY if
1104  *              we insert the items in order, so that the given itup_off does
1105  *              represent the final position of the item!
1106  */
1107 static void
1108 _bt_pgaddtup(Relation rel,
1109                          Page page,
1110                          Size itemsize,
1111                          BTItem btitem,
1112                          OffsetNumber itup_off,
1113                          const char *where)
1114 {
1115         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1116         BTItemData truncitem;
1117
1118         if (! P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1119         {
1120                 memcpy(&truncitem, btitem, sizeof(BTItemData));
1121                 truncitem.bti_itup.t_info = sizeof(BTItemData);
1122                 btitem = &truncitem;
1123                 itemsize = sizeof(BTItemData);
1124         }
1125
1126         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1127                                         LP_USED) == InvalidOffsetNumber)
1128                 elog(FATAL, "btree: failed to add item to the %s for %s",
1129                          where, RelationGetRelationName(rel));
1130 }
1131
1132 /*
1133  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1134  *
1135  * This is very similar to _bt_compare, except for NULL handling.
1136  * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
1137  */
1138 static bool
1139 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1140                         int keysz, ScanKey scankey)
1141 {
1142         BTItem          btitem;
1143         IndexTuple      itup;
1144         int                     i;
1145
1146         /* Better be comparing to a leaf item */
1147         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1148
1149         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1150         itup = &(btitem->bti_itup);
1151
1152         for (i = 1; i <= keysz; i++)
1153         {
1154                 ScanKey         entry = &scankey[i - 1];
1155                 AttrNumber      attno;
1156                 Datum           datum;
1157                 bool            isNull;
1158                 int32           result;
1159
1160                 attno = entry->sk_attno;
1161                 Assert(attno == i);
1162                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1163
1164                 /* NULLs are never equal to anything */
1165                 if (entry->sk_flags & SK_ISNULL || isNull)
1166                         return false;
1167
1168                 result = DatumGetInt32(FunctionCall2(&entry->sk_func,
1169                                                                                          entry->sk_argument,
1170                                                                                          datum));
1171
1172                 if (result != 0)
1173                         return false;
1174         }
1175
1176         /* if we get here, the keys are equal */
1177         return true;
1178 }