]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
Arrange to cache btree metapage data in the relcache entry for the index,
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.136 2006/04/25 22:46:05 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21 #include "utils/inval.h"
22
23
/*
 * Working state for _bt_findsplitloc() / _bt_checksplitloc(): carries the
 * invariant context of the split-point search plus the best candidate
 * split found so far.
 */
typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;
38
39
/* Prototypes for functions private to this file */

static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf,
				 ScanKey itup_scankey);
static void _bt_insertonpg(Relation rel, Buffer buf,
			   BTStack stack,
			   int keysz, ScanKey scankey,
			   IndexTuple itup,
			   OffsetNumber afteritem,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
				  int leftfree, int rightfree,
				  bool newitemonleft, Size firstrightitemsz);
static void _bt_pgaddtup(Relation rel, Page page,
			 Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
66
67
68 /*
69  *      _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
70  *
71  *              This routine is called by the public interface routines, btbuild
72  *              and btinsert.  By here, itup is filled in, including the TID.
73  */
74 void
75 _bt_doinsert(Relation rel, IndexTuple itup,
76                          bool index_is_unique, Relation heapRel)
77 {
78         int                     natts = rel->rd_rel->relnatts;
79         ScanKey         itup_scankey;
80         BTStack         stack;
81         Buffer          buf;
82
83         /* we need an insertion scan key to do our search, so build one */
84         itup_scankey = _bt_mkscankey(rel, itup);
85
86 top:
87         /* find the first page containing this key */
88         stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
89
90         /* trade in our read lock for a write lock */
91         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
92         LockBuffer(buf, BT_WRITE);
93
94         /*
95          * If the page was split between the time that we surrendered our read
96          * lock and acquired our write lock, then this page may no longer be the
97          * right place for the key we want to insert.  In this case, we need to
98          * move right in the tree.      See Lehman and Yao for an excruciatingly
99          * precise description.
100          */
101         buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
102
103         /*
104          * If we're not allowing duplicates, make sure the key isn't already in
105          * the index.
106          *
107          * NOTE: obviously, _bt_check_unique can only detect keys that are already
108          * in the index; so it cannot defend against concurrent insertions of the
109          * same key.  We protect against that by means of holding a write lock on
110          * the target page.  Any other would-be inserter of the same key must
111          * acquire a write lock on the same target page, so only one would-be
112          * inserter can be making the check at one time.  Furthermore, once we are
113          * past the check we hold write locks continuously until we have performed
114          * our insertion, so no later inserter can fail to see our insertion.
115          * (This requires some care in _bt_insertonpg.)
116          *
117          * If we must wait for another xact, we release the lock while waiting,
118          * and then must start over completely.
119          */
120         if (index_is_unique)
121         {
122                 TransactionId xwait;
123
124                 xwait = _bt_check_unique(rel, itup, heapRel, buf, itup_scankey);
125
126                 if (TransactionIdIsValid(xwait))
127                 {
128                         /* Have to wait for the other guy ... */
129                         _bt_relbuf(rel, buf);
130                         XactLockTableWait(xwait);
131                         /* start over... */
132                         _bt_freestack(stack);
133                         goto top;
134                 }
135         }
136
137         /* do the insertion */
138         _bt_insertonpg(rel, buf, stack, natts, itup_scankey, itup, 0, false);
139
140         /* be tidy */
141         _bt_freestack(stack);
142         _bt_freeskey(itup_scankey);
143 }
144
/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.	If an actual
 * conflict is detected, no return --- just ereport().
 *
 * On entry, buf is the leaf page the new tuple would go on, pinned and
 * write-locked.  We may additionally read-lock pages to the right (nbuf)
 * while chasing runs of equal keys; any such page is released before
 * returning.  buf itself is NOT released here.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, ScanKey itup_scankey)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	OffsetNumber offset,
				maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;	/* extra buffer for pages to the right */

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Find first item >= proposed new item.  Note we could also get a pointer
	 * to end-of-page here.
	 */
	offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		HeapTupleData htup;
		Buffer		hbuffer;
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive. Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway. So now we just advance over killed items as quickly as
			 * we can. We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdDeleted(curitemid))
			{
				/*
				 * _bt_compare treats two NULLs as equal (that is how NULLs
				 * are ordered within the tree), so it must only be used for
				 * positioning, never as a true equality test.  _bt_isequal
				 * applies the proper SQL semantics here. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htup.t_self = curitup->t_tid;
				if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
							   true, NULL))
				{
					/*
					 * It is a duplicate.  SnapshotDirty reports the inserting
					 * (xmin) or deleting (xmax) xact if that xact is still in
					 * progress; prefer waiting on the inserter.
					 */
					TransactionId xwait =
					(TransactionIdIsValid(SnapshotDirty->xmin)) ?
					SnapshotDirty->xmin : SnapshotDirty->xmax;

					ReleaseBuffer(hbuffer);

					/*
					 * If this tuple is being updated by other transaction
					 * then we have to wait for its commit/abort.
					 */
					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.
					 */
					ereport(ERROR,
							(errcode(ERRCODE_UNIQUE_VIOLATION),
					errmsg("duplicate key violates unique constraint \"%s\"",
						   RelationGetRelationName(rel))));
				}
				else if (htup.t_data != NULL)
				{
					/*
					 * Hmm, if we can't see the tuple, maybe it can be marked
					 * killed.	This logic should match index_getnext and
					 * btgettuple.
					 */
					LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
					if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
												 hbuffer) == HEAPTUPLE_DEAD)
					{
						/* Mark the index item dead so later scans skip it. */
						curitemid->lp_flags |= LP_DELETE;
						/* Hint-bit change only; dirty whichever page holds it. */
						if (nbuf != InvalidBuffer)
							SetBufferCommitInfoNeedsSave(nbuf);
						else
							SetBufferCommitInfoNeedsSave(buf);
					}
					LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
				}
				ReleaseBuffer(hbuffer);
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/* Release any right-hand page we walked onto; caller still holds buf. */
	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}
310
/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  finds the right place to insert the tuple.
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.	On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		If 'afteritem' is >0 then the new tuple must be inserted after the
 *		existing item of that number, noplace else.  If 'afteritem' is 0
 *		then the procedure finds the exact spot to insert it by searching.
 *		(keysz and scankey parameters are used ONLY if afteritem == 0.
 *		The scankey must be an insertion-type scankey.)
 *
 *		NOTE: if the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.	If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   int keysz,
			   ScanKey scankey,
			   IndexTuple itup,
			   OffsetNumber afteritem,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber newitemoff;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds btree maximum, %lu",
						(unsigned long) itemsz,
						(unsigned long) BTMaxItemSize(page)),
		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
				"Consider a function index of an MD5 hash of the value, "
				"or use full text indexing.")));

	/*
	 * Determine exactly where new item will go.
	 */
	if (afteritem > 0)
		newitemoff = afteritem + 1;
	else
	{
		/*----------
		 * If we will need to split the page to put the item here,
		 * check whether we can put the tuple somewhere to the right,
		 * instead.  Keep scanning right until we
		 *		(a) find a page with enough free space,
		 *		(b) reach the last page where the tuple can legally go, or
		 *		(c) get tired of searching.
		 * (c) is not flippant; it is important because if there are many
		 * pages' worth of equal keys, it's better to split one of the early
		 * pages than to scan all the way to the end of the run of equal keys
		 * on every insert.  We implement "get tired" as a random choice,
		 * since stopping after scanning a fixed number of pages wouldn't work
		 * well (we'd never reach the right-hand side of previously split
		 * pages).	Currently the probability of moving right is set at 0.99,
		 * which may seem too high to change the behavior much, but it does an
		 * excellent job of preventing O(N^2) behavior with many equal keys.
		 *----------
		 */
		bool		movedright = false;

		while (PageGetFreeSpace(page) < itemsz &&
			   !P_RIGHTMOST(lpageop) &&
			   _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
			   random() > (MAX_RANDOM_VALUE / 100))
		{
			/*
			 * step right to next non-dead page
			 *
			 * must write-lock that page before releasing write lock on
			 * current page; else someone else's _bt_check_unique scan could
			 * fail to see our insertion.  write locks on intermediate dead
			 * pages won't do because we don't know when they will get
			 * de-linked from the tree.
			 */
			Buffer		rbuf = InvalidBuffer;

			for (;;)
			{
				BlockNumber rblkno = lpageop->btpo_next;

				rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
				page = BufferGetPage(rbuf);
				lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(lpageop))
					break;
				if (P_RIGHTMOST(lpageop))
					elog(ERROR, "fell off the end of \"%s\"",
						 RelationGetRelationName(rel));
			}
			/* release the old target page only after the new one is locked */
			_bt_relbuf(rel, buf);
			buf = rbuf;
			movedright = true;
		}

		/*
		 * Now we are on the right page, so find the insert position. If we
		 * moved right at all, we know we should insert at the start of the
		 * page, else must find the position by searching.
		 */
		if (movedright)
			newitemoff = P_FIRSTDATAKEY(lpageop);
		else
			newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
	}

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.	We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		/* No split needed: simple insert, with WAL logging. */
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		_bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			/* advance the fast-root link to this page's level */
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* XLOG stuff (skipped for temp relations, which are not WAL-logged) */
		if (!rel->rd_istemp)
		{
			xl_btree_insert xlrec;
			BlockNumber	xldownlink;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[4];
			XLogRecData *nextrdata;
			IndexTupleData trunctuple;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			/* rdata[0]: the fixed-size insert record header */
			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				/* internal-page insert: also log the child downlink */
				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

				nextrdata->data = (char *) &xldownlink;
				nextrdata->len = sizeof(BlockNumber);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				/* metapage was updated too; log its new contents */
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				/* first data key on an internal page is logged truncated */
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				nextrdata->data = (char *) &trunctuple;
				nextrdata->len = sizeof(IndexTupleData);
			}
			else
			{
				nextrdata->data = (char *) itup;
				nextrdata->len = IndexTupleDSize(*itup);
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
				PageSetTLI(metapg, ThisTimeLineID);
			}

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}

		END_CRIT_SECTION();

		/* release buffers; send out relcache inval if metapage changed */
		if (BufferIsValid(metabuf))
		{
			CacheInvalidateRelcache(rel);
			_bt_relbuf(rel, metabuf);
		}

		_bt_relbuf(rel, buf);
	}
}
652
/*
 *	_bt_split() -- split a page in the btree.
 *
 *		On entry, buf is the page to split, and is pinned and write-locked.
 *		firstright is the item index of the first item to be moved to the
 *		new right page.  newitemoff etc. tell us about the new item that
 *		must be inserted along with the data from the old page.
 *
 *		Returns the new right sibling of buf, pinned and write-locked.
 *		The pin and lock on buf are maintained.
 *
 *		The new left half is built in a temporary page and copied over the
 *		original only at the very end, inside the critical section, so a
 *		failure partway through leaves the original page untouched.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;	/* old right sibling, if any */
	Page		spage = NULL;			/* page of sbuf */
	BTPageOpaque sopaque = NULL;		/* opaque data of sbuf's page */
	OffsetNumber itup_off = 0;			/* where the new tuple was placed... */
	BlockNumber itup_blkno = 0;			/* ...and on which block (for XLOG) */
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;

	/* Allocate a new page to become the right half */
	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	origpage = BufferGetPage(buf);
	/* leftpage is a scratch copy; origpage is only overwritten at the end */
	leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
	rightpage = BufferGetPage(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	/* if we're splitting this page, it won't be the root when we're done */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~BTP_ROOT;
	ropaque->btpo_flags = lopaque->btpo_flags;
	/* link the two new halves into the sibling chain at this level */
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
	ropaque->btpo_prev = BufferGetBlockNumber(buf);
	ropaque->btpo_next = oopaque->btpo_next;
	/* both halves remain on the original page's tree level */
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						LP_USED) == InvalidOffsetNumber)
			elog(PANIC, "failed to add hikey to the right sibling");
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					LP_USED) == InvalidOffsetNumber)
		elog(PANIC, "failed to add hikey to the left sibling");
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Items are added in offset order; the WAL replay logic depends on
	 * tuple order matching item-pointer order (see the XLOG comments below).
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
							 "left sibling");
				/* remember placement of the new item for the XLOG record */
				itup_off = leftoff;
				itup_blkno = BufferGetBlockNumber(buf);
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
							 "right sibling");
				itup_off = rightoff;
				itup_blkno = BufferGetBlockNumber(rbuf);
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
						 "left sibling");
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
						 "right sibling");
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		if (newitemonleft)
		{
			_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
						 "left sibling");
			itup_off = leftoff;
			itup_blkno = BufferGetBlockNumber(buf);
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
						 "right sibling");
			itup_off = rightoff;
			itup_blkno = BufferGetBlockNumber(rbuf);
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there. We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(ropaque))
	{
		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		/* sibling's left-link must still point at the page being split */
		if (sopaque->btpo_prev != ropaque->btpo_prev)
			elog(PANIC, "right sibling's left-link doesn't match");
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet. Log changes before continuing.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet, and we don't care about the
	 * new sibling until it's linked into the btree.
	 */
	START_CRIT_SECTION();

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		/* repoint the old right sibling's left-link at the new right page */
		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
		MarkBufferDirty(sbuf);
	}

	/* XLOG stuff -- temp relations are not WAL-logged */
	if (!rel->rd_istemp)
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[4];

		xlrec.target.node = rel->rd_node;
		ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
		/* otherblk is the half that did NOT receive the new item */
		if (newitemonleft)
			xlrec.otherblk = BufferGetBlockNumber(rbuf);
		else
			xlrec.otherblk = BufferGetBlockNumber(buf);
		xlrec.leftblk = lopaque->btpo_prev;
		xlrec.rightblk = ropaque->btpo_next;
		xlrec.level = lopaque->btpo.level;

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Note we only store the tuples
		 * themselves, knowing that the item pointers are in the same order
		 * and can be reconstructed by scanning the tuples.  See comments
		 * for _bt_restore_page().
		 */
		xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
			((PageHeader) leftpage)->pd_upper;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/* left page's tuple data, straight from the page's upper region */
		rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
		rdata[1].len = xlrec.leftlen;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = &(rdata[2]);

		/* right page's tuple data, likewise */
		rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
		rdata[2].len = ((PageHeader) rightpage)->pd_special -
			((PageHeader) rightpage)->pd_upper;
		rdata[2].buffer = InvalidBuffer;
		rdata[2].next = NULL;

		if (!P_RIGHTMOST(ropaque))
		{
			/*
			 * Reference the old right sibling's buffer with no data of our
			 * own; presumably this lets WAL handle it as a backup-block
			 * reference -- NOTE(review): confirm against XLogInsert docs.
			 */
			rdata[2].next = &(rdata[3]);
			rdata[3].data = NULL;
			rdata[3].len = 0;
			rdata[3].buffer = sbuf;
			rdata[3].buffer_std = true;
			rdata[3].next = NULL;
		}

		if (P_ISROOT(oopaque))
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
		else
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;

		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

		PageSetLSN(leftpage, recptr);
		PageSetTLI(leftpage, ThisTimeLineID);
		PageSetLSN(rightpage, recptr);
		PageSetTLI(rightpage, ThisTimeLineID);
		if (!P_RIGHTMOST(ropaque))
		{
			PageSetLSN(spage, recptr);
			PageSetTLI(spage, ThisTimeLineID);
		}
	}

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.  (XXX the latter
	 * comment is probably obsolete.)
	 *
	 * It's a bit weird that we don't fill in the left page till after writing
	 * the XLOG entry, but not really worth changing.  Note that we use the
	 * origpage data (specifically its BTP_ROOT bit) while preparing the XLOG
	 * entry, so simply reshuffling the code won't do.
	 */

	PageRestoreTempPage(leftpage, origpage);

	END_CRIT_SECTION();

	/* release the old right sibling */
	if (!P_RIGHTMOST(ropaque))
		_bt_relbuf(rel, sbuf);

	/* split's done */
	return rbuf;
}
960
961 /*
962  *      _bt_findsplitloc() -- find an appropriate place to split a page.
963  *
964  * The idea here is to equalize the free space that will be on each split
965  * page, *after accounting for the inserted tuple*.  (If we fail to account
966  * for it, we might find ourselves with too little room on the page that
967  * it needs to go into!)
968  *
969  * If the page is the rightmost page on its level, we instead try to arrange
970  * for twice as much free space on the right as on the left.  In this way,
971  * when we are inserting successively increasing keys (consider sequences,
972  * timestamps, etc) we will end up with a tree whose pages are about 67% full,
973  * instead of the 50% full result that we'd get without this special case.
974  * (We could bias it even further to make the initially-loaded tree more full.
975  * But since the steady-state load for a btree is about 70%, we'd likely just
976  * be making more page-splitting work for ourselves later on, when we start
977  * seeing updates to existing tuples.)
978  *
979  * We are passed the intended insert position of the new tuple, expressed as
980  * the offsetnumber of the tuple it must go in front of.  (This could be
981  * maxoff+1 if the tuple is to go at the end.)
982  *
983  * We return the index of the first existing tuple that should go on the
984  * righthand page, plus a boolean indicating whether the new tuple goes on
985  * the left or right page.      The bool is necessary to disambiguate the case
986  * where firstright == newitemoff.
987  */
988 static OffsetNumber
989 _bt_findsplitloc(Relation rel,
990                                  Page page,
991                                  OffsetNumber newitemoff,
992                                  Size newitemsz,
993                                  bool *newitemonleft)
994 {
995         BTPageOpaque opaque;
996         OffsetNumber offnum;
997         OffsetNumber maxoff;
998         ItemId          itemid;
999         FindSplitData state;
1000         int                     leftspace,
1001                                 rightspace,
1002                                 goodenough,
1003                                 dataitemtotal,
1004                                 dataitemstoleft;
1005
1006         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1007
1008         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1009         newitemsz += sizeof(ItemIdData);
1010         state.newitemsz = newitemsz;
1011         state.is_leaf = P_ISLEAF(opaque);
1012         state.is_rightmost = P_RIGHTMOST(opaque);
1013         state.have_split = false;
1014
1015         /* Total free space available on a btree page, after fixed overhead */
1016         leftspace = rightspace =
1017                 PageGetPageSize(page) - SizeOfPageHeaderData -
1018                 MAXALIGN(sizeof(BTPageOpaqueData));
1019
1020         /*
1021          * Finding the best possible split would require checking all the possible
1022          * split points, because of the high-key and left-key special cases.
1023          * That's probably more work than it's worth; instead, stop as soon as we
1024          * find a "good-enough" split, where good-enough is defined as an
1025          * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1026          * should let us stop near the middle on most pages, instead of plowing to
1027          * the end.
1028          */
1029         goodenough = leftspace / 16;
1030
1031         /* The right page will have the same high key as the old page */
1032         if (!P_RIGHTMOST(opaque))
1033         {
1034                 itemid = PageGetItemId(page, P_HIKEY);
1035                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1036                                                          sizeof(ItemIdData));
1037         }
1038
1039         /* Count up total space in data items without actually scanning 'em */
1040         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1041
1042         /*
1043          * Scan through the data items and calculate space usage for a split at
1044          * each possible position.
1045          */
1046         dataitemstoleft = 0;
1047         maxoff = PageGetMaxOffsetNumber(page);
1048
1049         for (offnum = P_FIRSTDATAKEY(opaque);
1050                  offnum <= maxoff;
1051                  offnum = OffsetNumberNext(offnum))
1052         {
1053                 Size            itemsz;
1054                 int                     leftfree,
1055                                         rightfree;
1056
1057                 itemid = PageGetItemId(page, offnum);
1058                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1059
1060                 /*
1061                  * We have to allow for the current item becoming the high key of the
1062                  * left page; therefore it counts against left space as well as right
1063                  * space.
1064                  */
1065                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1066                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1067
1068                 /*
1069                  * Will the new item go to left or right of split?
1070                  */
1071                 if (offnum > newitemoff)
1072                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1073                                                           true, itemsz);
1074                 else if (offnum < newitemoff)
1075                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1076                                                           false, itemsz);
1077                 else
1078                 {
1079                         /* need to try it both ways! */
1080                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1081                                                           true, itemsz);
1082                         /* here we are contemplating newitem as first on right */
1083                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1084                                                           false, newitemsz);
1085                 }
1086
1087                 /* Abort scan once we find a good-enough choice */
1088                 if (state.have_split && state.best_delta <= goodenough)
1089                         break;
1090
1091                 dataitemstoleft += itemsz;
1092         }
1093
1094         /*
1095          * I believe it is not possible to fail to find a feasible split, but just
1096          * in case ...
1097          */
1098         if (!state.have_split)
1099                 elog(ERROR, "could not find a feasible split point for \"%s\"",
1100                          RelationGetRelationName(rel));
1101
1102         *newitemonleft = state.newitemonleft;
1103         return state.firstright;
1104 }
1105
1106 /*
1107  * Subroutine to analyze a particular possible split choice (ie, firstright
1108  * and newitemonleft settings), and record the best split so far in *state.
1109  */
1110 static void
1111 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1112                                   int leftfree, int rightfree,
1113                                   bool newitemonleft, Size firstrightitemsz)
1114 {
1115         /*
1116          * Account for the new item on whichever side it is to be put.
1117          */
1118         if (newitemonleft)
1119                 leftfree -= (int) state->newitemsz;
1120         else
1121                 rightfree -= (int) state->newitemsz;
1122
1123         /*
1124          * If we are not on the leaf level, we will be able to discard the key
1125          * data from the first item that winds up on the right page.
1126          */
1127         if (!state->is_leaf)
1128                 rightfree += (int) firstrightitemsz -
1129                         (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1130
1131         /*
1132          * If feasible split point, remember best delta.
1133          */
1134         if (leftfree >= 0 && rightfree >= 0)
1135         {
1136                 int                     delta;
1137
1138                 if (state->is_rightmost)
1139                 {
1140                         /*
1141                          * On a rightmost page, try to equalize right free space with
1142                          * twice the left free space.  See comments for _bt_findsplitloc.
1143                          */
1144                         delta = (2 * leftfree) - rightfree;
1145                 }
1146                 else
1147                 {
1148                         /* Otherwise, aim for equal free space on both sides */
1149                         delta = leftfree - rightfree;
1150                 }
1151
1152                 if (delta < 0)
1153                         delta = -delta;
1154                 if (!state->have_split || delta < state->best_delta)
1155                 {
1156                         state->have_split = true;
1157                         state->newitemonleft = newitemonleft;
1158                         state->firstright = firstright;
1159                         state->best_delta = delta;
1160                 }
1161         }
1162 }
1163
/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 *
 * This is exported so it can be called by nbtxlog.c.
 */
void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.  (This path is also taken when called from WAL recovery ---
	 * we have no stack in that case.)
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		/*
		 * bknum identifies the left half (used to re-find our downlink in
		 * the parent); rbknum goes into the new downlink tuple we insert.
		 */
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			if (!InRecovery)
				elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			/* only needed the block number; _bt_getstackbuf re-fetches it */
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item! We
		 * want to find parent pointing to where we are, right ?	- vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);

		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/*
		 * Now we can unlock the children: we hold write lock on the parent,
		 * so per the L&Y protocol (see header comment) it is safe to let
		 * other backends at the split pages.
		 */
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in \"%s\"",
				 RelationGetRelationName(rel));

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, stack->bts_parent,
					   0, NULL, new_item, stack->bts_offset,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}
1280
/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		"access" is the buffer lock mode to acquire (BT_READ or BT_WRITE);
 *		the returned buffer is pinned and locked in that mode.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	/* Start searching from where we last saw the downlink. */
	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	/*
	 * Outer loop: follow right-links until we find the downlink or hit the
	 * rightmost page of the level.  Concurrent splits can only move the item
	 * rightward, so a rightward scan is guaranteed to catch up with it.
	 */
	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		/* Skip half-dead/deleted pages entirely; just chain to the right. */
		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page". We
			 * need this test anyway due to possibility that page has a high
			 * key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against possibility that page
			 * split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				/* Match on the saved downlink (child block number). */
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			/* Leftward scan covers items that moved left of 'start'. */
			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			/* End of the level: downlink is gone (should not happen). */
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		/* Note the next block before releasing our lock on this page. */
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}
1388
/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked. On exit, a new root page exists with entries for the
 *		two new children, metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to caller which has to unlock/unpin
 *		lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	ItemId		itemid;
	IndexTuple	item;
	Size		itemsz;
	IndexTuple	new_item;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	/* new root sits one level above the old root */
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	/* the true root is trivially the best "fast root" too */
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	itemsz = sizeof(IndexTupleData);
	new_item = (IndexTuple) palloc(itemsz);
	new_item->t_info = itemsz;
	ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	/* PANIC (not ERROR) because we're inside the critical section */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page");
	pfree(new_item);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	itemsz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	new_item = CopyIndexTuple(item);
	ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page");
	pfree(new_item);

	/* dirty both modified buffers before logging the change */
	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = rel->rd_node;
		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeNewroot;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.
		 */
		/* log the root page's line pointerless item data (pd_upper..pd_special) */
		rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
		rdata[1].len = ((PageHeader) rootpage)->pd_special -
			((PageHeader) rootpage)->pd_upper;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);

		/* stamp both pages with the WAL record's position */
		PageSetLSN(rootpage, recptr);
		PageSetTLI(rootpage, ThisTimeLineID);
		PageSetLSN(metapg, recptr);
		PageSetTLI(metapg, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	/* send out relcache inval for metapage change */
	CacheInvalidateRelcache(rel);

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	return rootbuf;
}
1541
1542 /*
1543  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1544  *
1545  *              This routine adds the tuple to the page as requested.  It does
1546  *              not affect pin/lock status, but you'd better have a write lock
1547  *              and pin on the target buffer!  Don't forget to write and release
1548  *              the buffer afterwards, either.
1549  *
1550  *              The main difference between this routine and a bare PageAddItem call
1551  *              is that this code knows that the leftmost index tuple on a non-leaf
1552  *              btree page doesn't need to have a key.  Therefore, it strips such
1553  *              tuples down to just the tuple header.  CAUTION: this works ONLY if
1554  *              we insert the tuples in order, so that the given itup_off does
1555  *              represent the final position of the tuple!
1556  */
1557 static void
1558 _bt_pgaddtup(Relation rel,
1559                          Page page,
1560                          Size itemsize,
1561                          IndexTuple itup,
1562                          OffsetNumber itup_off,
1563                          const char *where)
1564 {
1565         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1566         IndexTupleData trunctuple;
1567
1568         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1569         {
1570                 trunctuple = *itup;
1571                 trunctuple.t_info = sizeof(IndexTupleData);
1572                 itup = &trunctuple;
1573                 itemsize = sizeof(IndexTupleData);
1574         }
1575
1576         if (PageAddItem(page, (Item) itup, itemsize, itup_off,
1577                                         LP_USED) == InvalidOffsetNumber)
1578                 elog(PANIC, "failed to add item to the %s for \"%s\"",
1579                          where, RelationGetRelationName(rel));
1580 }
1581
1582 /*
1583  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1584  *
1585  * This is very similar to _bt_compare, except for NULL handling.
1586  * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
1587  */
1588 static bool
1589 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1590                         int keysz, ScanKey scankey)
1591 {
1592         IndexTuple      itup;
1593         int                     i;
1594
1595         /* Better be comparing to a leaf item */
1596         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1597
1598         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1599
1600         for (i = 1; i <= keysz; i++)
1601         {
1602                 AttrNumber      attno;
1603                 Datum           datum;
1604                 bool            isNull;
1605                 int32           result;
1606
1607                 attno = scankey->sk_attno;
1608                 Assert(attno == i);
1609                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1610
1611                 /* NULLs are never equal to anything */
1612                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1613                         return false;
1614
1615                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1616                                                                                          datum,
1617                                                                                          scankey->sk_argument));
1618
1619                 if (result != 0)
1620                         return false;
1621
1622                 scankey++;
1623         }
1624
1625         /* if we get here, the keys are equal */
1626         return true;
1627 }