1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.117 2004/10/15 22:39:49 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21
22
23 typedef struct
24 {
25         /* context data for _bt_checksplitloc */
26         Size            newitemsz;              /* size of new item to be inserted */
27         bool            is_leaf;                /* T if splitting a leaf page */
28         bool            is_rightmost;   /* T if splitting a rightmost page */
29
30         bool            have_split;             /* found a valid split? */
31
32         /* these fields valid only if have_split is true */
33         bool            newitemonleft;  /* new item on left or right of best split */
34         OffsetNumber firstright;        /* best split point */
35         int                     best_delta;             /* best size delta so far */
36 } FindSplitData;
37
38
39 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
40
41 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
42                                  Relation heapRel, Buffer buf,
43                                  ScanKey itup_scankey);
44 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
45                            BTStack stack,
46                            int keysz, ScanKey scankey,
47                            BTItem btitem,
48                            OffsetNumber afteritem,
49                            bool split_only_page);
50 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
51                   OffsetNumber newitemoff, Size newitemsz,
52                   BTItem newitem, bool newitemonleft,
53                   OffsetNumber *itup_off, BlockNumber *itup_blkno);
54 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
55                                  OffsetNumber newitemoff,
56                                  Size newitemsz,
57                                  bool *newitemonleft);
58 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
59                                   int leftfree, int rightfree,
60                                   bool newitemonleft, Size firstrightitemsz);
61 static void _bt_pgaddtup(Relation rel, Page page,
62                          Size itemsize, BTItem btitem,
63                          OffsetNumber itup_off, const char *where);
64 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
65                         int keysz, ScanKey scankey);
66
67
68 /*
69  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
70  *
71  *              This routine is called by the public interface routines, btbuild
72  *              and btinsert.  By here, btitem is filled in, including the TID.
73  */
74 InsertIndexResult
75 _bt_doinsert(Relation rel, BTItem btitem,
76                          bool index_is_unique, Relation heapRel)
77 {
78         IndexTuple      itup = &(btitem->bti_itup);
79         int                     natts = rel->rd_rel->relnatts;
80         ScanKey         itup_scankey;
81         BTStack         stack;
82         Buffer          buf;
83         InsertIndexResult res;
84
85         /* we need a scan key to do our search, so build one */
86         itup_scankey = _bt_mkscankey(rel, itup);
87
88 top:
89         /* find the first page containing this key */
90         stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
91
92         /* trade in our read lock for a write lock */
93         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
94         LockBuffer(buf, BT_WRITE);
95
96         /*
97          * If the page was split between the time that we surrendered our read
98          * lock and acquired our write lock, then this page may no longer be
99          * the right place for the key we want to insert.  In this case, we
100          * need to move right in the tree.      See Lehman and Yao for an
101          * excruciatingly precise description.
102          */
103         buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
104
105         /*
106          * If we're not allowing duplicates, make sure the key isn't already
107          * in the index.
108          *
109          * NOTE: obviously, _bt_check_unique can only detect keys that are
110          * already in the index; so it cannot defend against concurrent
111          * insertions of the same key.  We protect against that by means of
112          * holding a write lock on the target page.  Any other would-be
113          * inserter of the same key must acquire a write lock on the same
114          * target page, so only one would-be inserter can be making the check
115          * at one time.  Furthermore, once we are past the check we hold write
116          * locks continuously until we have performed our insertion, so no
117          * later inserter can fail to see our insertion.  (This requires some
118          * care in _bt_insertonpg.)
119          *
120          * If we must wait for another xact, we release the lock while waiting,
121          * and then must start over completely.
122          */
123         if (index_is_unique)
124         {
125                 TransactionId xwait;
126
127                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
128
129                 if (TransactionIdIsValid(xwait))
130                 {
131                         /* Have to wait for the other guy ... */
132                         _bt_relbuf(rel, buf);
133                         XactLockTableWait(xwait);
134                         /* start over... */
135                         _bt_freestack(stack);
136                         goto top;
137                 }
138         }
139
140         /* do the insertion */
141         res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
142                                                  0, false);
143
144         /* be tidy */
145         _bt_freestack(stack);
146         _bt_freeskey(itup_scankey);
147
148         return res;
149 }
150
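/*
 * Typical call pattern at the public interface level (a sketch only; see
 * btbuild and btinsert for the real thing --- "itup" and "index_is_unique"
 * stand for whatever index tuple and uniqueness flag the caller has at hand):
 *
 *              btitem = _bt_formitem(itup);
 *              res = _bt_doinsert(rel, btitem, index_is_unique, heapRel);
 *              pfree(btitem);
 */
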
151 /*
152  *      _bt_check_unique() -- Check for violation of unique index constraint
153  *
154  * Returns InvalidTransactionId if there is no conflict, else an xact ID
155  * we must wait for to see if it commits a conflicting tuple.   If an actual
156  * conflict is detected, no return --- just ereport().
157  */
158 static TransactionId
159 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
160                                  Buffer buf, ScanKey itup_scankey)
161 {
162         TupleDesc       itupdesc = RelationGetDescr(rel);
163         int                     natts = rel->rd_rel->relnatts;
164         OffsetNumber offset,
165                                 maxoff;
166         Page            page;
167         BTPageOpaque opaque;
168         Buffer          nbuf = InvalidBuffer;
169
170         page = BufferGetPage(buf);
171         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
172         maxoff = PageGetMaxOffsetNumber(page);
173
174         /*
175          * Find first item >= proposed new item.  Note we could also get a
176          * pointer to end-of-page here.
177          */
178         offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
179
180         /*
181          * Scan over all equal tuples, looking for live conflicts.
182          */
183         for (;;)
184         {
185                 HeapTupleData htup;
186                 Buffer          hbuffer;
187                 ItemId          curitemid;
188                 BTItem          cbti;
189                 BlockNumber nblkno;
190
191                 /*
192                  * make sure the offset points to an actual item before trying to
193                  * examine it...
194                  */
195                 if (offset <= maxoff)
196                 {
197                         curitemid = PageGetItemId(page, offset);
198
199                         /*
200                          * We can skip items that are marked killed.
201                          *
202                          * Formerly, we applied _bt_isequal() before checking the kill
203                          * flag, so as to fall out of the item loop as soon as
204                          * possible. However, in the presence of heavy update activity
205                          * an index may contain many killed items with the same key;
206                          * running _bt_isequal() on each killed item gets expensive.
207                          * Furthermore it is likely that the non-killed version of
208                          * each key appears first, so that we didn't actually get to
209                          * exit any sooner anyway. So now we just advance over killed
210                          * items as quickly as we can. We only apply _bt_isequal()
211                          * when we get to a non-killed item or the end of the page.
212                          */
213                         if (!ItemIdDeleted(curitemid))
214                         {
215                                 /*
216                                  * _bt_compare returns 0 for (1,NULL) and (1,NULL) ---
217                                  * that is how we handle NULLs --- so we must not use
218                                  * _bt_compare for the real uniqueness comparison, but
219                                  * only for ordering/finding items on pages. - vadim 03/24/97
220                                  */
221                                 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
222                                         break;          /* we're past all the equal tuples */
223
224                                 /* okay, we gotta fetch the heap tuple ... */
225                                 cbti = (BTItem) PageGetItem(page, curitemid);
226                                 htup.t_self = cbti->bti_itup.t_tid;
227                                 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
228                                                            true, NULL))
229                                 {
230                                         /* it is a duplicate */
231                                         TransactionId xwait =
232                                         (TransactionIdIsValid(SnapshotDirty->xmin)) ?
233                                         SnapshotDirty->xmin : SnapshotDirty->xmax;
234
235                                         ReleaseBuffer(hbuffer);
236
237                                         /*
238                                          * If this tuple is being updated by another transaction,
239                                          * then we have to wait for its commit/abort.
240                                          */
241                                         if (TransactionIdIsValid(xwait))
242                                         {
243                                                 if (nbuf != InvalidBuffer)
244                                                         _bt_relbuf(rel, nbuf);
245                                                 /* Tell _bt_doinsert to wait... */
246                                                 return xwait;
247                                         }
248
249                                         /*
250                                          * Otherwise we have a definite conflict.
251                                          */
252                                         ereport(ERROR,
253                                                         (errcode(ERRCODE_UNIQUE_VIOLATION),
254                                                          errmsg("duplicate key violates unique constraint \"%s\"",
255                                                                         RelationGetRelationName(rel))));
256                                 }
257                                 else if (htup.t_data != NULL)
258                                 {
259                                         /*
260                                          * The tuple isn't visible to SnapshotDirty; if it is
261                                          * actually dead, mark the index item killed.  This
262                                          * logic should match index_getnext and btgettuple.
263                                          */
264                                         LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
265                                         if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
266                                                                                                  hbuffer) == HEAPTUPLE_DEAD)
267                                         {
268                                                 curitemid->lp_flags |= LP_DELETE;
269                                                 SetBufferCommitInfoNeedsSave(buf);
270                                         }
271                                         LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
272                                         ReleaseBuffer(hbuffer);
273                                 }
274                         }
275                 }
276
277                 /*
278                  * Advance to next tuple to continue checking.
279                  */
280                 if (offset < maxoff)
281                         offset = OffsetNumberNext(offset);
282                 else
283                 {
284                         /* If scankey == hikey we gotta check the next page too */
285                         if (P_RIGHTMOST(opaque))
286                                 break;
287                         if (!_bt_isequal(itupdesc, page, P_HIKEY,
288                                                          natts, itup_scankey))
289                                 break;
290                         /* Advance to next non-dead page --- there must be one */
291                         for (;;)
292                         {
293                                 nblkno = opaque->btpo_next;
294                                 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
295                                 page = BufferGetPage(nbuf);
296                                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
297                                 if (!P_IGNORE(opaque))
298                                         break;
299                                 if (P_RIGHTMOST(opaque))
300                                         elog(ERROR, "fell off the end of \"%s\"",
301                                                  RelationGetRelationName(rel));
302                         }
303                         maxoff = PageGetMaxOffsetNumber(page);
304                         offset = P_FIRSTDATAKEY(opaque);
305                 }
306         }
307
308         if (nbuf != InvalidBuffer)
309                 _bt_relbuf(rel, nbuf);
310
311         return InvalidTransactionId;
312 }
313
314 /*----------
315  *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
316  *
317  *              This recursive procedure does the following things:
318  *
319  *                      +  finds the right place to insert the tuple.
320  *                      +  if necessary, splits the target page (making sure that the
321  *                         split is equitable as far as post-insert free space goes).
322  *                      +  inserts the tuple.
323  *                      +  if the page was split, pops the parent stack, and finds the
324  *                         right place to insert the new child pointer (by walking
325  *                         right using information stored in the parent stack).
326  *                      +  invokes itself with the appropriate tuple for the right
327  *                         child page on the parent.
328  *                      +  updates the metapage if a true root or fast root is split.
329  *
330  *              On entry, we must have the right buffer on which to do the
331  *              insertion, and the buffer must be pinned and locked.  On return,
332  *              we will have dropped both the pin and the write lock on the buffer.
333  *
334  *              If 'afteritem' is >0 then the new tuple must be inserted after the
335  *              existing item of that number, nowhere else.  If 'afteritem' is 0
336  *              then the procedure finds the exact spot to insert it by searching.
337  *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
338  *
339  *              NOTE: if the new key is equal to one or more existing keys, we can
340  *              legitimately place it anywhere in the series of equal keys --- in fact,
341  *              if the new key is equal to the page's "high key" we can place it on
342  *              the next page.  If it is equal to the high key, and there's not room
343  *              to insert the new tuple on the current page without splitting, then
344  *              we can move right hoping to find more free space and avoid a split.
345  *              (We should not move right indefinitely, however, since that leads to
346  *              O(N^2) insertion behavior in the presence of many equal keys.)
347  *              Once we have chosen the page to put the key on, we'll insert it before
348  *              any existing equal keys because of the way _bt_binsrch() works.
349  *
350  *              The locking interactions in this code are critical.  You should
351  *              grok Lehman and Yao's paper before making any changes.  In addition,
352  *              you need to understand how we disambiguate duplicate keys in this
353  *              implementation, in order to be able to find our location using
354  *              L&Y "move right" operations.  Since we may insert duplicate user
355  *              keys, and since these dups may propagate up the tree, we use the
356  *              'afteritem' parameter to position ourselves correctly for the
357  *              insertion on internal pages.
358  *----------
359  */
360 static InsertIndexResult
361 _bt_insertonpg(Relation rel,
362                            Buffer buf,
363                            BTStack stack,
364                            int keysz,
365                            ScanKey scankey,
366                            BTItem btitem,
367                            OffsetNumber afteritem,
368                            bool split_only_page)
369 {
370         InsertIndexResult res;
371         Page            page;
372         BTPageOpaque lpageop;
373         OffsetNumber itup_off;
374         BlockNumber itup_blkno;
375         OffsetNumber newitemoff;
376         OffsetNumber firstright = InvalidOffsetNumber;
377         Size            itemsz;
378
379         page = BufferGetPage(buf);
380         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
381
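        /*
         * Compute the new entry's on-page size: the index tuple's data size
         * plus whatever overhead the BTItemData wrapper adds beyond the
         * embedded IndexTupleData.
         */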
382         itemsz = IndexTupleDSize(btitem->bti_itup)
383                 + (sizeof(BTItemData) - sizeof(IndexTupleData));
384
385         itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
386                                                                  * we need to be consistent */
387
388         /*
389          * Check whether the item can fit on a btree page at all. (Eventually,
390          * we ought to try to apply TOAST methods if not.) We actually need to
391          * be able to fit three items on every page, so restrict any one item
392          * to 1/3 the per-page available space. Note that at this point,
393          * itemsz doesn't include the ItemId.
394          */
395         if (itemsz > BTMaxItemSize(page))
396                 ereport(ERROR,
397                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
398                                  errmsg("index row size %lu exceeds btree maximum, %lu",
399                                                 (unsigned long) itemsz,
400                                                 (unsigned long) BTMaxItemSize(page))));
401
402         /*
403          * Determine exactly where new item will go.
404          */
405         if (afteritem > 0)
406                 newitemoff = afteritem + 1;
407         else
408         {
409                 /*----------
410                  * If we will need to split the page to put the item here,
411                  * check whether we can put the tuple somewhere to the right,
412                  * instead.  Keep scanning right until we
413                  *              (a) find a page with enough free space,
414                  *              (b) reach the last page where the tuple can legally go, or
415                  *              (c) get tired of searching.
416                  * (c) is not flippant; it is important because if there are many
417                  * pages' worth of equal keys, it's better to split one of the early
418                  * pages than to scan all the way to the end of the run of equal keys
419                  * on every insert.  We implement "get tired" as a random choice,
420                  * since stopping after scanning a fixed number of pages wouldn't work
421                  * well (we'd never reach the right-hand side of previously split
422                  * pages).      Currently the probability of moving right is set at 0.99,
423                  * which may seem too high to change the behavior much, but it does an
424                  * excellent job of preventing O(N^2) behavior with many equal keys.
425                  *----------
426                  */
427                 bool            movedright = false;
428
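                /*
                 * About the "get tired" test below: random() is uniform over
                 * [0, MAX_RANDOM_VALUE], so comparing against
                 * MAX_RANDOM_VALUE / 100 keeps us moving right about 99 times
                 * in 100, matching the 0.99 probability mentioned above.
                 */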
429                 while (PageGetFreeSpace(page) < itemsz &&
430                            !P_RIGHTMOST(lpageop) &&
431                            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
432                            random() > (MAX_RANDOM_VALUE / 100))
433                 {
434                         /*
435                          * step right to next non-dead page
436                          *
437                          * must write-lock that page before releasing write lock on
438                          * current page; else someone else's _bt_check_unique scan
439                          * could fail to see our insertion.  Write locks on
440                          * intermediate dead pages won't do, because we don't know when
441                          * they will get de-linked from the tree.
442                          */
443                         Buffer          rbuf = InvalidBuffer;
444
445                         for (;;)
446                         {
447                                 BlockNumber rblkno = lpageop->btpo_next;
448
449                                 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
450                                 page = BufferGetPage(rbuf);
451                                 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
452                                 if (!P_IGNORE(lpageop))
453                                         break;
454                                 if (P_RIGHTMOST(lpageop))
455                                         elog(ERROR, "fell off the end of \"%s\"",
456                                                  RelationGetRelationName(rel));
457                         }
458                         _bt_relbuf(rel, buf);
459                         buf = rbuf;
460                         movedright = true;
461                 }
462
463                 /*
464                  * Now we are on the right page, so find the insert position. If
465                  * we moved right at all, we know we should insert at the start of
466                  * the page, else must find the position by searching.
467                  */
468                 if (movedright)
469                         newitemoff = P_FIRSTDATAKEY(lpageop);
470                 else
471                         newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
472         }
473
474         /*
475          * Do we need to split the page to fit the item on it?
476          *
477          * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
478          * so this comparison is correct even though we appear to be
479          * accounting only for the item and not for its line pointer.
480          */
481         if (PageGetFreeSpace(page) < itemsz)
482         {
483                 bool            is_root = P_ISROOT(lpageop);
484                 bool            is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
485                 bool            newitemonleft;
486                 Buffer          rbuf;
487
488                 /* Choose the split point */
489                 firstright = _bt_findsplitloc(rel, page,
490                                                                           newitemoff, itemsz,
491                                                                           &newitemonleft);
492
493                 /* split the buffer into left and right halves */
494                 rbuf = _bt_split(rel, buf, firstright,
495                                                  newitemoff, itemsz, btitem, newitemonleft,
496                                                  &itup_off, &itup_blkno);
497
498                 /*----------
499                  * By here,
500                  *
501                  *              +  our target page has been split;
502                  *              +  the original tuple has been inserted;
503                  *              +  we have write locks on both the old (left half)
504                  *                 and new (right half) buffers, after the split; and
505                  *              +  we know the key we want to insert into the parent
506                  *                 (it's the "high key" on the left child page).
507                  *
508                  * We're ready to do the parent insertion.  We need to hold onto the
509                  * locks for the child pages until we locate the parent, but we can
510                  * release them before doing the actual insertion (see Lehman and Yao
511                  * for the reasoning).
512                  *----------
513                  */
514                 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
515         }
516         else
517         {
518                 Buffer          metabuf = InvalidBuffer;
519                 Page            metapg = NULL;
520                 BTMetaPageData *metad = NULL;
521
522                 itup_off = newitemoff;
523                 itup_blkno = BufferGetBlockNumber(buf);
524
525                 /*
526                  * If we are doing this insert because we split a page that was
527                  * the only one on its tree level, but was not the root, it may
528                  * have been the "fast root".  We need to ensure that the fast
529                  * root link points at or above the current page.  We can safely
530                  * acquire a lock on the metapage here --- see comments for
531                  * _bt_newroot().
532                  */
533                 if (split_only_page)
534                 {
535                         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
536                         metapg = BufferGetPage(metabuf);
537                         metad = BTPageGetMeta(metapg);
538
539                         if (metad->btm_fastlevel >= lpageop->btpo.level)
540                         {
541                                 /* no update wanted */
542                                 _bt_relbuf(rel, metabuf);
543                                 metabuf = InvalidBuffer;
544                         }
545                 }
546
547                 /* Do the update.  No ereport(ERROR) until changes are logged */
548                 START_CRIT_SECTION();
549
550                 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
551
552                 if (BufferIsValid(metabuf))
553                 {
554                         metad->btm_fastroot = itup_blkno;
555                         metad->btm_fastlevel = lpageop->btpo.level;
556                 }
557
558                 /* XLOG stuff */
559                 if (!rel->rd_istemp)
560                 {
561                         xl_btree_insert xlrec;
562                         xl_btree_metadata xlmeta;
563                         uint8           xlinfo;
564                         XLogRecPtr      recptr;
565                         XLogRecData rdata[3];
566                         XLogRecData *nextrdata;
567                         BTItemData      truncitem;
568
569                         xlrec.target.node = rel->rd_node;
570                         ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
571
572                         rdata[0].buffer = InvalidBuffer;
573                         rdata[0].data = (char *) &xlrec;
574                         rdata[0].len = SizeOfBtreeInsert;
575                         rdata[0].next = nextrdata = &(rdata[1]);
576
577                         if (BufferIsValid(metabuf))
578                         {
579                                 xlmeta.root = metad->btm_root;
580                                 xlmeta.level = metad->btm_level;
581                                 xlmeta.fastroot = metad->btm_fastroot;
582                                 xlmeta.fastlevel = metad->btm_fastlevel;
583
584                                 nextrdata->buffer = InvalidBuffer;
585                                 nextrdata->data = (char *) &xlmeta;
586                                 nextrdata->len = sizeof(xl_btree_metadata);
587                                 nextrdata->next = nextrdata + 1;
588                                 nextrdata++;
589                                 xlinfo = XLOG_BTREE_INSERT_META;
590                         }
591                         else if (P_ISLEAF(lpageop))
592                                 xlinfo = XLOG_BTREE_INSERT_LEAF;
593                         else
594                                 xlinfo = XLOG_BTREE_INSERT_UPPER;
595
596                         /* Read comments in _bt_pgaddtup */
597                         if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
598                         {
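                                /*
                                 * The first data key on a non-leaf page is never examined
                                 * (it is treated as minus infinity), so only the BTItemData
                                 * header needs to go into the WAL record.
                                 */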
599                                 truncitem = *btitem;
600                                 truncitem.bti_itup.t_info = sizeof(BTItemData);
601                                 nextrdata->data = (char *) &truncitem;
602                                 nextrdata->len = sizeof(BTItemData);
603                         }
604                         else
605                         {
606                                 nextrdata->data = (char *) btitem;
607                                 nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
608                                         (sizeof(BTItemData) - sizeof(IndexTupleData));
609                         }
610                         nextrdata->buffer = buf;
611                         nextrdata->next = NULL;
612
613                         recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
614
615                         if (BufferIsValid(metabuf))
616                         {
617                                 PageSetLSN(metapg, recptr);
618                                 PageSetTLI(metapg, ThisTimeLineID);
619                         }
620
621                         PageSetLSN(page, recptr);
622                         PageSetTLI(page, ThisTimeLineID);
623                 }
624
625                 END_CRIT_SECTION();
626
627                 /* Write out the updated page and release pin/lock */
628                 if (BufferIsValid(metabuf))
629                         _bt_wrtbuf(rel, metabuf);
630
631                 _bt_wrtbuf(rel, buf);
632         }
633
634         /* by here, the new tuple is inserted at itup_blkno/itup_off */
635         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
636         ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
637
638         return res;
639 }
640
641 /*
642  *      _bt_split() -- split a page in the btree.
643  *
644  *              On entry, buf is the page to split, and is write-locked and pinned.
645  *              firstright is the item index of the first item to be moved to the
646  *              new right page.  newitemoff etc. tell us about the new item that
647  *              must be inserted along with the data from the old page.
648  *
649  *              Returns the new right sibling of buf, pinned and write-locked.
650  *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
651  *              are set to the exact location where newitem was inserted.
652  */
653 static Buffer
654 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
655                   OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
656                   bool newitemonleft,
657                   OffsetNumber *itup_off, BlockNumber *itup_blkno)
658 {
659         Buffer          rbuf;
660         Page            origpage;
661         Page            leftpage,
662                                 rightpage;
663         BTPageOpaque ropaque,
664                                 lopaque,
665                                 oopaque;
666         Buffer          sbuf = InvalidBuffer;
667         Page            spage = NULL;
668         BTPageOpaque sopaque = NULL;
669         Size            itemsz;
670         ItemId          itemid;
671         BTItem          item;
672         OffsetNumber leftoff,
673                                 rightoff;
674         OffsetNumber maxoff;
675         OffsetNumber i;
676
677         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
678         origpage = BufferGetPage(buf);
679         leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
680         rightpage = BufferGetPage(rbuf);
681
682         _bt_pageinit(leftpage, BufferGetPageSize(buf));
683         _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
684
685         /* init btree private data */
686         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
687         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
688         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
689
690         /* if we're splitting this page, it won't be the root when we're done */
691         lopaque->btpo_flags = oopaque->btpo_flags;
692         lopaque->btpo_flags &= ~BTP_ROOT;
693         ropaque->btpo_flags = lopaque->btpo_flags;
694         lopaque->btpo_prev = oopaque->btpo_prev;
695         lopaque->btpo_next = BufferGetBlockNumber(rbuf);
696         ropaque->btpo_prev = BufferGetBlockNumber(buf);
697         ropaque->btpo_next = oopaque->btpo_next;
698         lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
699
700         /*
701          * If the page we're splitting is not the rightmost page at its level
702          * in the tree, then the first entry on the page is the high key for
703          * the page.  We need to copy that to the right half.  Otherwise
704          * (meaning the rightmost page case), all the items on the right half
705          * will be user data.
706          */
707         rightoff = P_HIKEY;
708
709         if (!P_RIGHTMOST(oopaque))
710         {
711                 itemid = PageGetItemId(origpage, P_HIKEY);
712                 itemsz = ItemIdGetLength(itemid);
713                 item = (BTItem) PageGetItem(origpage, itemid);
714                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
715                                                 LP_USED) == InvalidOffsetNumber)
716                         elog(PANIC, "failed to add hikey to the right sibling");
717                 rightoff = OffsetNumberNext(rightoff);
718         }
719
720         /*
721          * The "high key" for the new left page will be the first key that's
722          * going to go into the new right page.  This might be either the
723          * existing data item at position firstright, or the incoming tuple.
724          */
725         leftoff = P_HIKEY;
726         if (!newitemonleft && newitemoff == firstright)
727         {
728                 /* incoming tuple will become first on right page */
729                 itemsz = newitemsz;
730                 item = newitem;
731         }
732         else
733         {
734                 /* existing item at firstright will become first on right page */
735                 itemid = PageGetItemId(origpage, firstright);
736                 itemsz = ItemIdGetLength(itemid);
737                 item = (BTItem) PageGetItem(origpage, itemid);
738         }
739         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
740                                         LP_USED) == InvalidOffsetNumber)
741                 elog(PANIC, "failed to add hikey to the left sibling");
742         leftoff = OffsetNumberNext(leftoff);
743
744         /*
745          * Now transfer all the data items to the appropriate page
746          */
747         maxoff = PageGetMaxOffsetNumber(origpage);
748
749         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
750         {
751                 itemid = PageGetItemId(origpage, i);
752                 itemsz = ItemIdGetLength(itemid);
753                 item = (BTItem) PageGetItem(origpage, itemid);
754
755                 /* does new item belong before this one? */
756                 if (i == newitemoff)
757                 {
758                         if (newitemonleft)
759                         {
760                                 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
761                                                          "left sibling");
762                                 *itup_off = leftoff;
763                                 *itup_blkno = BufferGetBlockNumber(buf);
764                                 leftoff = OffsetNumberNext(leftoff);
765                         }
766                         else
767                         {
768                                 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
769                                                          "right sibling");
770                                 *itup_off = rightoff;
771                                 *itup_blkno = BufferGetBlockNumber(rbuf);
772                                 rightoff = OffsetNumberNext(rightoff);
773                         }
774                 }
775
776                 /* decide which page to put it on */
777                 if (i < firstright)
778                 {
779                         _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
780                                                  "left sibling");
781                         leftoff = OffsetNumberNext(leftoff);
782                 }
783                 else
784                 {
785                         _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
786                                                  "right sibling");
787                         rightoff = OffsetNumberNext(rightoff);
788                 }
789         }
790
791         /* cope with possibility that newitem goes at the end */
792         if (i <= newitemoff)
793         {
794                 if (newitemonleft)
795                 {
796                         _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
797                                                  "left sibling");
798                         *itup_off = leftoff;
799                         *itup_blkno = BufferGetBlockNumber(buf);
800                         leftoff = OffsetNumberNext(leftoff);
801                 }
802                 else
803                 {
804                         _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
805                                                  "right sibling");
806                         *itup_off = rightoff;
807                         *itup_blkno = BufferGetBlockNumber(rbuf);
808                         rightoff = OffsetNumberNext(rightoff);
809                 }
810         }
811
812         /*
813          * We have to grab the right sibling (if any) and fix the prev pointer
814          * there. We are guaranteed that this is deadlock-free since no other
815          * writer will be holding a lock on that page and trying to move left,
816          * and all readers release locks on a page before trying to fetch its
817          * neighbors.
818          */
819
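        /*
         * Sibling links before and after the split, where P is the original
         * left sibling, S the original right sibling, L the left half (which
         * keeps the original block), and R the new right half:
         *
         *              before:  P <-> original <-> S
         *              after:   P <-> L <-> R <-> S
         *
         * Only S's btpo_prev needs fixing here; P's btpo_next already points
         * at the block that becomes the left half.
         */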
820         if (!P_RIGHTMOST(ropaque))
821         {
822                 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
823                 spage = BufferGetPage(sbuf);
824                 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
825                 if (sopaque->btpo_prev != ropaque->btpo_prev)
826                         elog(PANIC, "right sibling's left-link doesn't match");
827         }
828
829         /*
830          * Right sibling is locked, new siblings are prepared, but original
831          * page is not updated yet. Log changes before continuing.
832          *
833          * NO EREPORT(ERROR) till right sibling is updated.
834          */
835         START_CRIT_SECTION();
836
837         if (!P_RIGHTMOST(ropaque))
838                 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
839
840         /* XLOG stuff */
841         if (!rel->rd_istemp)
842         {
843                 xl_btree_split xlrec;
844                 uint8           xlinfo;
845                 XLogRecPtr      recptr;
846                 XLogRecData rdata[4];
847
848                 xlrec.target.node = rel->rd_node;
849                 ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
850                 if (newitemonleft)
851                         xlrec.otherblk = BufferGetBlockNumber(rbuf);
852                 else
853                         xlrec.otherblk = BufferGetBlockNumber(buf);
854                 xlrec.leftblk = lopaque->btpo_prev;
855                 xlrec.rightblk = ropaque->btpo_next;
856                 xlrec.level = lopaque->btpo.level;
857
858                 /*
859                  * Direct access to the page is not pretty, but it is faster; the
860                  * page API ought to grow a function for this.  Note we only store
861                  * the tuples themselves, knowing that the item pointers are in the
862                  * same order and can be reconstructed by scanning the tuples.
863                  */
864                 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
865                         ((PageHeader) leftpage)->pd_upper;
866
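                /*
                 * The record carries up to four chunks: the split header
                 * (rdata[0]), the left page's tuple data (rdata[1]), the right
                 * page's tuple data (rdata[2]), and, when there is a right
                 * sibling, an empty chunk referencing its buffer (rdata[3]) so
                 * that the prev-link change made there is covered by the usual
                 * backup-block mechanism.
                 */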
867                 rdata[0].buffer = InvalidBuffer;
868                 rdata[0].data = (char *) &xlrec;
869                 rdata[0].len = SizeOfBtreeSplit;
870                 rdata[0].next = &(rdata[1]);
871
872                 rdata[1].buffer = InvalidBuffer;
873                 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
874                 rdata[1].len = xlrec.leftlen;
875                 rdata[1].next = &(rdata[2]);
876
877                 rdata[2].buffer = InvalidBuffer;
878                 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
879                 rdata[2].len = ((PageHeader) rightpage)->pd_special -
880                         ((PageHeader) rightpage)->pd_upper;
881                 rdata[2].next = NULL;
882
883                 if (!P_RIGHTMOST(ropaque))
884                 {
885                         rdata[2].next = &(rdata[3]);
886                         rdata[3].buffer = sbuf;
887                         rdata[3].data = NULL;
888                         rdata[3].len = 0;
889                         rdata[3].next = NULL;
890                 }
891
892                 if (P_ISROOT(oopaque))
893                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
894                 else
895                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
896
897                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
898
899                 PageSetLSN(leftpage, recptr);
900                 PageSetTLI(leftpage, ThisTimeLineID);
901                 PageSetLSN(rightpage, recptr);
902                 PageSetTLI(rightpage, ThisTimeLineID);
903                 if (!P_RIGHTMOST(ropaque))
904                 {
905                         PageSetLSN(spage, recptr);
906                         PageSetTLI(spage, ThisTimeLineID);
907                 }
908         }
909
910         /*
911          * By here, the original data page has been split into two new halves,
912          * and these are correct.  The algorithm requires that the left page
913          * never move during a split, so we copy the new left page back on top
914          * of the original.  Note that this is not a waste of time, since we
915          * also require (in the page management code) that the center of a
916          * page always be clean, and the most efficient way to guarantee this
917          * is just to compact the data by reinserting it into a new left page.
918          */
919
920         PageRestoreTempPage(leftpage, origpage);
921
922         END_CRIT_SECTION();
923
924         /* write and release the old right sibling */
925         if (!P_RIGHTMOST(ropaque))
926                 _bt_wrtbuf(rel, sbuf);
927
928         /* split's done */
929         return rbuf;
930 }
931
932 /*
933  *      _bt_findsplitloc() -- find an appropriate place to split a page.
934  *
935  * The idea here is to equalize the free space that will be on each split
936  * page, *after accounting for the inserted tuple*.  (If we fail to account
937  * for it, we might find ourselves with too little room on the page that
938  * it needs to go into!)
939  *
940  * If the page is the rightmost page on its level, we instead try to arrange
941  * for twice as much free space on the right as on the left.  In this way,
942  * when we are inserting successively increasing keys (consider sequences,
943  * timestamps, etc) we will end up with a tree whose pages are about 67% full,
944  * instead of the 50% full result that we'd get without this special case.
945  * (We could bias it even further to make the initially-loaded tree more full.
946  * But since the steady-state load for a btree is about 70%, we'd likely just
947  * be making more page-splitting work for ourselves later on, when we start
948  * seeing updates to existing tuples.)
949  *
950  * We are passed the intended insert position of the new tuple, expressed as
951  * the offsetnumber of the tuple it must go in front of.  (This could be
952  * maxoff+1 if the tuple is to go at the end.)
953  *
954  * We return the index of the first existing tuple that should go on the
955  * righthand page, plus a boolean indicating whether the new tuple goes on
956  * the left or right page.      The bool is necessary to disambiguate the case
957  * where firstright == newitemoff.
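 *
 * For example, on a rightmost page with roughly 8K of usable space and about
 * 8K of item data to redistribute, making rightfree come out near twice
 * leftfree leaves roughly 5.3K of the data on the left page and 2.7K on the
 * right, so the left page ends up about two-thirds full.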
958  */
959 static OffsetNumber
960 _bt_findsplitloc(Relation rel,
961                                  Page page,
962                                  OffsetNumber newitemoff,
963                                  Size newitemsz,
964                                  bool *newitemonleft)
965 {
966         BTPageOpaque opaque;
967         OffsetNumber offnum;
968         OffsetNumber maxoff;
969         ItemId          itemid;
970         FindSplitData state;
971         int                     leftspace,
972                                 rightspace,
973                                 goodenough,
974                                 dataitemtotal,
975                                 dataitemstoleft;
976
977         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
978
979         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
980         newitemsz += sizeof(ItemIdData);
981         state.newitemsz = newitemsz;
982         state.is_leaf = P_ISLEAF(opaque);
983         state.is_rightmost = P_RIGHTMOST(opaque);
984         state.have_split = false;
985
986         /* Total free space available on a btree page, after fixed overhead */
987         leftspace = rightspace =
988                 PageGetPageSize(page) - SizeOfPageHeaderData -
989                 MAXALIGN(sizeof(BTPageOpaqueData));
990
991         /*
992          * Finding the best possible split would require checking all the
993          * possible split points, because of the high-key and left-key special
994          * cases. That's probably more work than it's worth; instead, stop as
995          * soon as we find a "good-enough" split, where good-enough is defined
996          * as an imbalance in free space of no more than pagesize/16
997          * (arbitrary...) This should let us stop near the middle on most
998          * pages, instead of plowing to the end.
999          */
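        /*
         * leftspace is the page size less fixed overhead, so with the default
         * 8K block size this threshold works out to roughly 500 bytes.
         */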
1000         goodenough = leftspace / 16;
1001
1002         /* The right page will have the same high key as the old page */
1003         if (!P_RIGHTMOST(opaque))
1004         {
1005                 itemid = PageGetItemId(page, P_HIKEY);
1006                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1007                                                          sizeof(ItemIdData));
1008         }
1009
1010         /* Count up total space in data items without actually scanning 'em */
1011         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1012
1013         /*
1014          * Scan through the data items and calculate space usage for a split
1015          * at each possible position.
1016          */
1017         dataitemstoleft = 0;
1018         maxoff = PageGetMaxOffsetNumber(page);
1019
1020         for (offnum = P_FIRSTDATAKEY(opaque);
1021                  offnum <= maxoff;
1022                  offnum = OffsetNumberNext(offnum))
1023         {
1024                 Size            itemsz;
1025                 int                     leftfree,
1026                                         rightfree;
1027
1028                 itemid = PageGetItemId(page, offnum);
1029                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1030
1031                 /*
1032                  * We have to allow for the current item becoming the high key of
1033                  * the left page; therefore it counts against left space as well
1034                  * as right space.
1035                  */
1036                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1037                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1038
1039                 /*
1040                  * Will the new item go to left or right of split?
1041                  */
1042                 if (offnum > newitemoff)
1043                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1044                                                           true, itemsz);
1045                 else if (offnum < newitemoff)
1046                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1047                                                           false, itemsz);
1048                 else
1049                 {
1050                         /* need to try it both ways! */
1051                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1052                                                           true, itemsz);
1053                         /* here we are contemplating newitem as first on right */
1054                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1055                                                           false, newitemsz);
1056                 }
1057
1058                 /* Abort scan once we find a good-enough choice */
1059                 if (state.have_split && state.best_delta <= goodenough)
1060                         break;
1061
1062                 dataitemstoleft += itemsz;
1063         }
1064
1065         /*
1066          * I believe it is not possible to fail to find a feasible split, but
1067          * just in case ...
1068          */
1069         if (!state.have_split)
1070                 elog(ERROR, "could not find a feasible split point for \"%s\"",
1071                          RelationGetRelationName(rel));
1072
1073         *newitemonleft = state.newitemonleft;
1074         return state.firstright;
1075 }
1076
1077 /*
1078  * Subroutine to analyze a particular possible split choice (ie, firstright
1079  * and newitemonleft settings), and record the best split so far in *state.
1080  */
1081 static void
1082 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1083                                   int leftfree, int rightfree,
1084                                   bool newitemonleft, Size firstrightitemsz)
1085 {
1086         /*
1087          * Account for the new item on whichever side it is to be put.
1088          */
1089         if (newitemonleft)
1090                 leftfree -= (int) state->newitemsz;
1091         else
1092                 rightfree -= (int) state->newitemsz;
1093
1094         /*
1095          * If we are not on the leaf level, we will be able to discard the key
1096          * data from the first item that winds up on the right page.
1097          */
1098         if (!state->is_leaf)
1099                 rightfree += (int) firstrightitemsz -
1100                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1101
1102         /*
1103          * If feasible split point, remember best delta.
1104          */
1105         if (leftfree >= 0 && rightfree >= 0)
1106         {
1107                 int                     delta;
1108
1109                 if (state->is_rightmost)
1110                 {
1111                         /*
1112                          * On a rightmost page, try to equalize right free space with
1113                          * twice the left free space.  See comments for
1114                          * _bt_findsplitloc.
1115                          */
1116                         delta = (2 * leftfree) - rightfree;
1117                 }
1118                 else
1119                 {
1120                         /* Otherwise, aim for equal free space on both sides */
1121                         delta = leftfree - rightfree;
1122                 }
1123
1124                 if (delta < 0)
1125                         delta = -delta;
1126                 if (!state->have_split || delta < state->best_delta)
1127                 {
1128                         state->have_split = true;
1129                         state->newitemonleft = newitemonleft;
1130                         state->firstright = firstright;
1131                         state->best_delta = delta;
1132                 }
1133         }
1134 }
1135
1136 /*
1137  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1138  *
1139  * On entry, buf and rbuf are the left and right split pages, which we
1140  * still hold write locks on per the L&Y algorithm.  We release the
1141  * write locks once we have a write lock on the parent page.  (Any sooner,
1142  * and it'd be possible for some other process to try to split or delete
1143  * one of these pages, and get confused because it cannot find the downlink.)
1144  *
1145  * stack - stack showing how we got here.  May be NULL in cases that don't
1146  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1147  * is_root - we split the true root
1148  * is_only - we split a page alone on its level (might have been fast root)
1149  *
1150  * This is exported so it can be called by nbtxlog.c.
1151  */
1152 void
1153 _bt_insert_parent(Relation rel,
1154                                   Buffer buf,
1155                                   Buffer rbuf,
1156                                   BTStack stack,
1157                                   bool is_root,
1158                                   bool is_only)
1159 {
1160         /*
1161          * Here we have to do something Lehman and Yao don't talk about: deal
1162          * with a root split and construction of a new root.  If our stack is
1163          * empty then we have just split a node on what had been the root
1164          * level when we descended the tree.  If it was still the root then we
1165          * perform a new-root construction.  If it *wasn't* the root anymore,
1166          * search to find the next higher level that someone constructed
1167          * meanwhile, and find the right place to insert as for the normal
1168          * case.
1169          *
1170          * If we have to search for the parent level, we do so by re-descending
1171          * from the root.  This is not super-efficient, but it's rare enough
1172          * not to matter.  (This path is also taken when called from WAL
1173          * recovery --- we have no stack in that case.)
1174          */
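	/*
	 * Illustrative scenario (not in the original comments): we descended
	 * while page X was the root and have now split X, so our stack is
	 * empty; meanwhile another backend also split X and built a new root
	 * above it.  Then is_root is false here, we take the stack == NULL
	 * branch below, locate the leftmost page on X's parent level, and
	 * insert the downlink as in the normal case.
	 */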
1175         if (is_root)
1176         {
1177                 Buffer          rootbuf;
1178
1179                 Assert(stack == NULL);
1180                 Assert(is_only);
1181                 /* create a new root node and update the metapage */
1182                 rootbuf = _bt_newroot(rel, buf, rbuf);
1183                 /* release the split buffers */
1184                 _bt_wrtbuf(rel, rootbuf);
1185                 _bt_wrtbuf(rel, rbuf);
1186                 _bt_wrtbuf(rel, buf);
1187         }
1188         else
1189         {
1190                 BlockNumber bknum = BufferGetBlockNumber(buf);
1191                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1192                 Page            page = BufferGetPage(buf);
1193                 InsertIndexResult newres;
1194                 BTItem          new_item;
1195                 BTStackData fakestack;
1196                 BTItem          ritem;
1197                 Buffer          pbuf;
1198
1199                 if (stack == NULL)
1200                 {
1201                         BTPageOpaque lpageop;
1202
1203                         if (!InRecovery)
1204                                 elog(DEBUG2, "concurrent ROOT page split");
1205                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1206                         /* Find the leftmost page at the next level up */
1207                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1208                         /* Set up a phony stack entry pointing there */
1209                         stack = &fakestack;
1210                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1211                         stack->bts_offset = InvalidOffsetNumber;
1212                         /* bts_btitem will be initialized below */
1213                         stack->bts_parent = NULL;
1214                         _bt_relbuf(rel, pbuf);
1215                 }
1216
1217                 /* get high key from left page == lowest key on new right page */
1218                 ritem = (BTItem) PageGetItem(page,
1219                                                                          PageGetItemId(page, P_HIKEY));
1220
1221                 /* form an index tuple that points at the new right page */
1222                 new_item = _bt_formitem(&(ritem->bti_itup));
1223                 ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
1224
1225                 /*
1226                  * Find the parent buffer and get the parent page.
1227                  *
1228                  * Oops - if we were moved right then we need to update the stack
1229                  * item!  We want to find the parent entry pointing to where we are
1230                  * now, right?          - vadim 05/27/97
1231                  */
1232                 ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
1233                                            bknum, P_HIKEY);
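		/*
		 * (The parent's downlink for our page was created with exactly this
		 * TID --- block bknum, offset P_HIKEY --- so it is what
		 * _bt_getstackbuf will match on via BTItemSame.)
		 */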
1234
1235                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1236
1237                 /* Now we can write and unlock the children */
1238                 _bt_wrtbuf(rel, rbuf);
1239                 _bt_wrtbuf(rel, buf);
1240
1241                 /* Check for error only after writing children */
1242                 if (pbuf == InvalidBuffer)
1243                         elog(ERROR, "failed to re-find parent key in \"%s\"",
1244                                  RelationGetRelationName(rel));
1245
1246                 /* Recursively update the parent */
1247                 newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
1248                                                                 0, NULL, new_item, stack->bts_offset,
1249                                                                 is_only);
1250
1251                 /* be tidy */
1252                 pfree(newres);
1253                 pfree(new_item);
1254         }
1255 }
1256
1257 /*
1258  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1259  *                                               we last looked at in the parent.
1260  *
1261  *              This is possible because we save the downlink from the parent item,
1262  *              which is enough to uniquely identify it.  Insertions into the parent
1263  *              level could cause the item to move right; deletions could cause it
1264  *              to move left, but not left of the page we previously found it in.
1265  *
1266  *              Adjusts bts_blkno & bts_offset if changed.
1267  *
1268  *              Returns InvalidBuffer if item not found (should not happen).
1269  */
1270 Buffer
1271 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1272 {
1273         BlockNumber blkno;
1274         OffsetNumber start;
1275
1276         blkno = stack->bts_blkno;
1277         start = stack->bts_offset;
1278
1279         for (;;)
1280         {
1281                 Buffer          buf;
1282                 Page            page;
1283                 BTPageOpaque opaque;
1284
1285                 buf = _bt_getbuf(rel, blkno, access);
1286                 page = BufferGetPage(buf);
1287                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1288
1289                 if (!P_IGNORE(opaque))
1290                 {
1291                         OffsetNumber offnum,
1292                                                 minoff,
1293                                                 maxoff;
1294                         ItemId          itemid;
1295                         BTItem          item;
1296
1297                         minoff = P_FIRSTDATAKEY(opaque);
1298                         maxoff = PageGetMaxOffsetNumber(page);
1299
1300                         /*
1301                          * start = InvalidOffsetNumber means "search the whole page".
1302                          * We need this test anyway, due to the possibility that the page
1303                          * has a high key now when it didn't have one before.
1304                          */
1305                         if (start < minoff)
1306                                 start = minoff;
1307
1308                         /*
1309                          * We need this check too, to guard against the possibility that
1310                          * the page has split since we visited it originally.
1311                          */
1312                         if (start > maxoff)
1313                                 start = OffsetNumberNext(maxoff);
1314
1315                         /*
1316                          * These loops will check every item on the page --- but in an
1317                          * order that's attuned to the probability of where it
1318                          * actually is.  Scan to the right first, then to the left.
1319                          */
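			/*
			 * Example (illustration only): if the downlink was at offset 5
			 * when we descended and two items have since been inserted
			 * before it on this page, the rightward scan re-finds it at
			 * offset 7; if earlier items were deleted instead, the leftward
			 * scan below picks it up.
			 */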
1320                         for (offnum = start;
1321                                  offnum <= maxoff;
1322                                  offnum = OffsetNumberNext(offnum))
1323                         {
1324                                 itemid = PageGetItemId(page, offnum);
1325                                 item = (BTItem) PageGetItem(page, itemid);
1326                                 if (BTItemSame(item, &stack->bts_btitem))
1327                                 {
1328                                         /* Return accurate pointer to where link is now */
1329                                         stack->bts_blkno = blkno;
1330                                         stack->bts_offset = offnum;
1331                                         return buf;
1332                                 }
1333                         }
1334
1335                         for (offnum = OffsetNumberPrev(start);
1336                                  offnum >= minoff;
1337                                  offnum = OffsetNumberPrev(offnum))
1338                         {
1339                                 itemid = PageGetItemId(page, offnum);
1340                                 item = (BTItem) PageGetItem(page, itemid);
1341                                 if (BTItemSame(item, &stack->bts_btitem))
1342                                 {
1343                                         /* Return accurate pointer to where link is now */
1344                                         stack->bts_blkno = blkno;
1345                                         stack->bts_offset = offnum;
1346                                         return buf;
1347                                 }
1348                         }
1349                 }
1350
1351                 /*
1352                  * The item we're looking for has moved right by at least one page.
1353                  */
1354                 if (P_RIGHTMOST(opaque))
1355                 {
1356                         _bt_relbuf(rel, buf);
1357                         return InvalidBuffer;
1358                 }
1359                 blkno = opaque->btpo_next;
1360                 start = InvalidOffsetNumber;
1361                 _bt_relbuf(rel, buf);
1362         }
1363 }
1364
1365 /*
1366  *      _bt_newroot() -- Create a new root page for the index.
1367  *
1368  *              We've just split the old root page and need to create a new one.
1369  *              In order to do this, we add a new root page to the file, then lock
1370  *              the metadata page and update it.  This is guaranteed to be deadlock-
1371  *              free, because all readers release their locks on the metadata page
1372  *              before trying to lock the root, and all writers lock the root before
1373  *              trying to lock the metadata page.  We have a write lock on the old
1374  *              root page, so we have not introduced any cycles into the waits-for
1375  *              graph.
1376  *
1377  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1378  *              locked. On exit, a new root page exists with entries for the
1379  *              two new children; the metapage is updated and unlocked/unpinned.
1380  *              The new root buffer is returned to the caller, who must unlock/unpin
1381  *              lbuf, rbuf and rootbuf.
1382  */
1383 static Buffer
1384 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1385 {
1386         Buffer          rootbuf;
1387         Page            lpage,
1388                                 rpage,
1389                                 rootpage;
1390         BlockNumber lbkno,
1391                                 rbkno;
1392         BlockNumber rootblknum;
1393         BTPageOpaque rootopaque;
1394         ItemId          itemid;
1395         BTItem          item;
1396         Size            itemsz;
1397         BTItem          new_item;
1398         Buffer          metabuf;
1399         Page            metapg;
1400         BTMetaPageData *metad;
1401
1402         lbkno = BufferGetBlockNumber(lbuf);
1403         rbkno = BufferGetBlockNumber(rbuf);
1404         lpage = BufferGetPage(lbuf);
1405         rpage = BufferGetPage(rbuf);
1406
1407         /* get a new root page */
1408         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1409         rootpage = BufferGetPage(rootbuf);
1410         rootblknum = BufferGetBlockNumber(rootbuf);
1411
1412         /* acquire lock on the metapage */
1413         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1414         metapg = BufferGetPage(metabuf);
1415         metad = BTPageGetMeta(metapg);
1416
1417         /* NO EREPORT(ERROR) from here till newroot op is logged */
1418         START_CRIT_SECTION();
1419
1420         /* set btree special data */
1421         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1422         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1423         rootopaque->btpo_flags = BTP_ROOT;
1424         rootopaque->btpo.level =
1425                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1426
1427         /* update metapage data */
1428         metad->btm_root = rootblknum;
1429         metad->btm_level = rootopaque->btpo.level;
1430         metad->btm_fastroot = rootblknum;
1431         metad->btm_fastlevel = rootopaque->btpo.level;
1432
1433         /*
1434          * Create downlink item for left page (old root).  Since this will be
1435          * the first item in a non-leaf page, it implicitly has a minus-infinity
1436          * key value, so we need not store any actual key in it.
1437          */
1438         itemsz = sizeof(BTItemData);
1439         new_item = (BTItem) palloc(itemsz);
1440         new_item->bti_itup.t_info = itemsz;
1441         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
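	/*
	 * (Even without a key, the item's TID still carries lbkno, which is the
	 * actual downlink to the old root page.)
	 */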
1442
1443         /*
1444          * Insert the left page pointer into the new root page.  The root page
1445          * is the rightmost page on its level so there is no "high key" in it;
1446          * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1447          */
1448         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1449                 elog(PANIC, "failed to add leftkey to new root page");
1450         pfree(new_item);
1451
1452         /*
1453          * Create downlink item for right page.  The key for it is obtained
1454          * from the "high key" position in the left page.
1455          */
1456         itemid = PageGetItemId(lpage, P_HIKEY);
1457         itemsz = ItemIdGetLength(itemid);
1458         item = (BTItem) PageGetItem(lpage, itemid);
1459         new_item = _bt_formitem(&(item->bti_itup));
1460         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1461
1462         /*
1463          * Insert the right page pointer into the new root page.
1464          */
1465         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1466                 elog(PANIC, "failed to add rightkey to new root page");
1467         pfree(new_item);
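	/*
	 * At this point the new root holds two items (summary, not in the
	 * original comments): at P_HIKEY a keyless downlink to the old root,
	 * and at P_FIRSTKEY a downlink carrying the old root's high key,
	 * pointing to the new right sibling.
	 */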
1468
1469         /* XLOG stuff */
1470         if (!rel->rd_istemp)
1471         {
1472                 xl_btree_newroot xlrec;
1473                 XLogRecPtr      recptr;
1474                 XLogRecData rdata[2];
1475
1476                 xlrec.node = rel->rd_node;
1477                 xlrec.rootblk = rootblknum;
1478                 xlrec.level = metad->btm_level;
1479
1480                 rdata[0].buffer = InvalidBuffer;
1481                 rdata[0].data = (char *) &xlrec;
1482                 rdata[0].len = SizeOfBtreeNewroot;
1483                 rdata[0].next = &(rdata[1]);
1484
1485                 /*
1486                  * Direct access to the page is not good style, but it is faster; we
1487                  * should implement a new function in the page API for this.
1488                  */
1489                 rdata[1].buffer = InvalidBuffer;
1490                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1491                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1492                         ((PageHeader) rootpage)->pd_upper;
1493                 rdata[1].next = NULL;
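		/*
		 * (That is, we log the tuple storage area of the new root page,
		 * from pd_upper up to the start of the special space, which holds
		 * exactly the two downlink items just added.)
		 */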
1494
1495                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1496
1497                 PageSetLSN(rootpage, recptr);
1498                 PageSetTLI(rootpage, ThisTimeLineID);
1499                 PageSetLSN(metapg, recptr);
1500                 PageSetTLI(metapg, ThisTimeLineID);
1501                 PageSetLSN(lpage, recptr);
1502                 PageSetTLI(lpage, ThisTimeLineID);
1503                 PageSetLSN(rpage, recptr);
1504                 PageSetTLI(rpage, ThisTimeLineID);
1505         }
1506
1507         END_CRIT_SECTION();
1508
1509         /* write and let go of metapage buffer */
1510         _bt_wrtbuf(rel, metabuf);
1511
1512         return (rootbuf);
1513 }
1514
1515 /*
1516  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1517  *
1518  *              This routine adds the tuple to the page as requested.  It does
1519  *              not affect pin/lock status, but you'd better have a write lock
1520  *              and pin on the target buffer!  Don't forget to write and release
1521  *              the buffer afterwards, either.
1522  *
1523  *              The main difference between this routine and a bare PageAddItem call
1524  *              is that this code knows that the leftmost data item on a non-leaf
1525  *              btree page doesn't need to have a key.  Therefore, it strips such
1526  *              items down to just the item header.  CAUTION: this works ONLY if
1527  *              we insert the items in order, so that the given itup_off does
1528  *              represent the final position of the item!
1529  */
1530 static void
1531 _bt_pgaddtup(Relation rel,
1532                          Page page,
1533                          Size itemsize,
1534                          BTItem btitem,
1535                          OffsetNumber itup_off,
1536                          const char *where)
1537 {
1538         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1539         BTItemData      truncitem;
1540
1541         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1542         {
1543                 memcpy(&truncitem, btitem, sizeof(BTItemData));
1544                 truncitem.bti_itup.t_info = sizeof(BTItemData);
1545                 btitem = &truncitem;
1546                 itemsize = sizeof(BTItemData);
1547         }
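	/*
	 * (The truncated copy keeps only the BTItemData header, i.e. the
	 * child-page TID; the key bytes are simply not copied, per the comments
	 * above.)
	 */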
1548
1549         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1550                                         LP_USED) == InvalidOffsetNumber)
1551                 elog(PANIC, "failed to add item to the %s for \"%s\"",
1552                          where, RelationGetRelationName(rel));
1553 }
1554
1555 /*
1556  * _bt_isequal - used in _bt_doinsert's check for duplicates.
1557  *
1558  * This is very similar to _bt_compare, except for NULL handling.
1559  * The rule is simple: NOT_NULL never equals NULL, nor does NULL equal NULL.
1560  */
1561 static bool
1562 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1563                         int keysz, ScanKey scankey)
1564 {
1565         BTItem          btitem;
1566         IndexTuple      itup;
1567         int                     i;
1568
1569         /* Better be comparing to a leaf item */
1570         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1571
1572         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1573         itup = &(btitem->bti_itup);
1574
1575         for (i = 1; i <= keysz; i++)
1576         {
1577                 AttrNumber      attno;
1578                 Datum           datum;
1579                 bool            isNull;
1580                 int32           result;
1581
1582                 attno = scankey->sk_attno;
1583                 Assert(attno == i);
1584                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1585
1586                 /* NULLs are never equal to anything */
1587                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1588                         return false;
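		/*
		 * (Hence an index entry with a NULL in any key column is never
		 * reported as a duplicate by the uniqueness check.)
		 */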
1589
1590                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1591                                                                                          datum,
1592                                                                                          scankey->sk_argument));
1593
1594                 if (result != 0)
1595                         return false;
1596
1597                 scankey++;
1598         }
1599
1600         /* if we get here, the keys are equal */
1601         return true;
1602 }