]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
3ef40a5cb6c96603bf957cc7e5d1ff7061dbcebf
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.128 2005/11/06 19:29:00 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21
22
/*
 * State carried across successive _bt_checksplitloc() calls while
 * _bt_findsplitloc() scans candidate split points for a page.
 */
typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;
37
38
39 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
40
41 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
42                                  Relation heapRel, Buffer buf,
43                                  ScanKey itup_scankey);
44 static void _bt_insertonpg(Relation rel, Buffer buf,
45                            BTStack stack,
46                            int keysz, ScanKey scankey,
47                            BTItem btitem,
48                            OffsetNumber afteritem,
49                            bool split_only_page);
50 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
51                   OffsetNumber newitemoff, Size newitemsz,
52                   BTItem newitem, bool newitemonleft);
53 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
54                                  OffsetNumber newitemoff,
55                                  Size newitemsz,
56                                  bool *newitemonleft);
57 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
58                                   int leftfree, int rightfree,
59                                   bool newitemonleft, Size firstrightitemsz);
60 static void _bt_pgaddtup(Relation rel, Page page,
61                          Size itemsize, BTItem btitem,
62                          OffsetNumber itup_off, const char *where);
63 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
64                         int keysz, ScanKey scankey);
65
66
67 /*
68  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
69  *
70  *              This routine is called by the public interface routines, btbuild
71  *              and btinsert.  By here, btitem is filled in, including the TID.
72  */
73 void
74 _bt_doinsert(Relation rel, BTItem btitem,
75                          bool index_is_unique, Relation heapRel)
76 {
77         IndexTuple      itup = &(btitem->bti_itup);
78         int                     natts = rel->rd_rel->relnatts;
79         ScanKey         itup_scankey;
80         BTStack         stack;
81         Buffer          buf;
82
83         /* we need a scan key to do our search, so build one */
84         itup_scankey = _bt_mkscankey(rel, itup);
85
86 top:
87         /* find the first page containing this key */
88         stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
89
90         /* trade in our read lock for a write lock */
91         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
92         LockBuffer(buf, BT_WRITE);
93
94         /*
95          * If the page was split between the time that we surrendered our read
96          * lock and acquired our write lock, then this page may no longer be the
97          * right place for the key we want to insert.  In this case, we need to
98          * move right in the tree.      See Lehman and Yao for an excruciatingly
99          * precise description.
100          */
101         buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
102
103         /*
104          * If we're not allowing duplicates, make sure the key isn't already in
105          * the index.
106          *
107          * NOTE: obviously, _bt_check_unique can only detect keys that are already in
108          * the index; so it cannot defend against concurrent insertions of the
109          * same key.  We protect against that by means of holding a write lock on
110          * the target page.  Any other would-be inserter of the same key must
111          * acquire a write lock on the same target page, so only one would-be
112          * inserter can be making the check at one time.  Furthermore, once we are
113          * past the check we hold write locks continuously until we have performed
114          * our insertion, so no later inserter can fail to see our insertion.
115          * (This requires some care in _bt_insertonpg.)
116          *
117          * If we must wait for another xact, we release the lock while waiting, and
118          * then must start over completely.
119          */
120         if (index_is_unique)
121         {
122                 TransactionId xwait;
123
124                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
125
126                 if (TransactionIdIsValid(xwait))
127                 {
128                         /* Have to wait for the other guy ... */
129                         _bt_relbuf(rel, buf);
130                         XactLockTableWait(xwait);
131                         /* start over... */
132                         _bt_freestack(stack);
133                         goto top;
134                 }
135         }
136
137         /* do the insertion */
138         _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0, false);
139
140         /* be tidy */
141         _bt_freestack(stack);
142         _bt_freeskey(itup_scankey);
143 }
144
/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.   If an actual
 * conflict is detected, no return --- just ereport().
 *
 * The caller holds a write lock on buf (see _bt_doinsert); that lock is
 * what serializes competing uniqueness checks for the same key.
 */
static TransactionId
_bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
				 Buffer buf, ScanKey itup_scankey)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	OffsetNumber offset,
				maxoff;
	Page		page;
	BTPageOpaque opaque;
	/* nbuf holds our pin/lock on a right-sibling page once we move off buf */
	Buffer		nbuf = InvalidBuffer;

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Find first item >= proposed new item.  Note we could also get a pointer
	 * to end-of-page here.
	 */
	offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		HeapTupleData htup;
		Buffer		hbuffer;
		ItemId		curitemid;
		BTItem		cbti;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill flag,
			 * so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive. Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway. So now we just advance over killed items as quickly as
			 * we can. We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdDeleted(curitemid))
			{
				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
				 * how we handling NULLs - and so we must not use _bt_compare
				 * in real comparison, but only for ordering/finding items on
				 * pages. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				cbti = (BTItem) PageGetItem(page, curitemid);
				htup.t_self = cbti->bti_itup.t_tid;
				if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
							   true, NULL))
				{
					/* it is a duplicate */
					TransactionId xwait =
					(TransactionIdIsValid(SnapshotDirty->xmin)) ?
					SnapshotDirty->xmin : SnapshotDirty->xmax;

					ReleaseBuffer(hbuffer);

					/*
					 * If this tuple is being updated by other transaction
					 * then we have to wait for its commit/abort.
					 */
					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.
					 */
					ereport(ERROR,
							(errcode(ERRCODE_UNIQUE_VIOLATION),
					errmsg("duplicate key violates unique constraint \"%s\"",
						   RelationGetRelationName(rel))));
				}
				else if (htup.t_data != NULL)
				{
					/*
					 * Hmm, if we can't see the tuple, maybe it can be marked
					 * killed.	This logic should match index_getnext and
					 * btgettuple.
					 */
					LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
					if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
												 hbuffer) == HEAPTUPLE_DEAD)
					{
						/* mark the index entry killed so later scans skip it */
						curitemid->lp_flags |= LP_DELETE;
						/* dirty whichever index page the item lives on */
						if (nbuf != InvalidBuffer)
							SetBufferCommitInfoNeedsSave(nbuf);
						else
							SetBufferCommitInfoNeedsSave(buf);
					}
					LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
				}
				ReleaseBuffer(hbuffer);
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/* drop the sibling-page pin, if we acquired one; buf stays locked */
	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}
310
/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  finds the right place to insert the tuple.
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer on which to do the
 *		insertion, and the buffer must be pinned and locked.  On return,
 *		we will have dropped both the pin and the write lock on the buffer.
 *
 *		If 'afteritem' is >0 then the new tuple must be inserted after the
 *		existing item of that number, noplace else.  If 'afteritem' is 0
 *		then the procedure finds the exact spot to insert it by searching.
 *		(keysz and scankey parameters are used ONLY if afteritem == 0.)
 *
 *		NOTE: if the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   int keysz,
			   ScanKey scankey,
			   BTItem btitem,
			   OffsetNumber afteritem,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber newitemoff;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	/* on-page size of the new item: index tuple plus BTItem overhead */
	itemsz = IndexTupleDSize(btitem->bti_itup)
		+ (sizeof(BTItemData) - sizeof(IndexTupleData));

	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds btree maximum, %lu",
						(unsigned long) itemsz,
						(unsigned long) BTMaxItemSize(page)),
		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
				"Consider a function index of an MD5 hash of the value, "
				"or use full text indexing.")));

	/*
	 * Determine exactly where new item will go.
	 */
	if (afteritem > 0)
		newitemoff = afteritem + 1;
	else
	{
		/*----------
		 * If we will need to split the page to put the item here,
		 * check whether we can put the tuple somewhere to the right,
		 * instead.  Keep scanning right until we
		 *		(a) find a page with enough free space,
		 *		(b) reach the last page where the tuple can legally go, or
		 *		(c) get tired of searching.
		 * (c) is not flippant; it is important because if there are many
		 * pages' worth of equal keys, it's better to split one of the early
		 * pages than to scan all the way to the end of the run of equal keys
		 * on every insert.  We implement "get tired" as a random choice,
		 * since stopping after scanning a fixed number of pages wouldn't work
		 * well (we'd never reach the right-hand side of previously split
		 * pages).	Currently the probability of moving right is set at 0.99,
		 * which may seem too high to change the behavior much, but it does an
		 * excellent job of preventing O(N^2) behavior with many equal keys.
		 *----------
		 */
		bool		movedright = false;

		while (PageGetFreeSpace(page) < itemsz &&
			   !P_RIGHTMOST(lpageop) &&
			   _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
			   random() > (MAX_RANDOM_VALUE / 100))
		{
			/*
			 * step right to next non-dead page
			 *
			 * must write-lock that page before releasing write lock on current
			 * page; else someone else's _bt_check_unique scan could fail to
			 * see our insertion.  write locks on intermediate dead pages
			 * won't do because we don't know when they will get de-linked
			 * from the tree.
			 */
			Buffer		rbuf = InvalidBuffer;

			for (;;)
			{
				BlockNumber rblkno = lpageop->btpo_next;

				rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
				page = BufferGetPage(rbuf);
				lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(lpageop))
					break;
				if (P_RIGHTMOST(lpageop))
					elog(ERROR, "fell off the end of \"%s\"",
						 RelationGetRelationName(rel));
			}
			/* release the old page only after the new one is locked */
			_bt_relbuf(rel, buf);
			buf = rbuf;
			movedright = true;
		}

		/*
		 * Now we are on the right page, so find the insert position. If we
		 * moved right at all, we know we should insert at the start of the
		 * page, else must find the position by searching.
		 */
		if (movedright)
			newitemoff = P_FIRSTDATAKEY(lpageop);
		else
			newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
	}

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result, so
	 * this comparison is correct even though we appear to be accounting only
	 * for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, btitem, newitemonleft);

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		/* remember where the tuple goes, for the WAL record below */
		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		_bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");

		/* metabuf is valid only if the fast root must be advanced */
		if (BufferIsValid(metabuf))
		{
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
		}

		/* XLOG stuff */
		if (!rel->rd_istemp)
		{
			xl_btree_insert xlrec;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[3];
			XLogRecData *nextrdata;
			BTItemData	truncitem;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (BufferIsValid(metabuf))
			{
				/* metapage update rides along in the same WAL record */
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;
				xlinfo = XLOG_BTREE_INSERT_META;
			}
			else if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
				xlinfo = XLOG_BTREE_INSERT_UPPER;

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				/* first data key on a non-leaf page is logged truncated */
				truncitem = *btitem;
				truncitem.bti_itup.t_info = sizeof(BTItemData);
				nextrdata->data = (char *) &truncitem;
				nextrdata->len = sizeof(BTItemData);
			}
			else
			{
				nextrdata->data = (char *) btitem;
				nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
					(sizeof(BTItemData) - sizeof(IndexTupleData));
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
				PageSetTLI(metapg, ThisTimeLineID);
			}

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}

		END_CRIT_SECTION();

		/* Write out the updated page and release pin/lock */
		if (BufferIsValid(metabuf))
			_bt_wrtbuf(rel, metabuf);

		_bt_wrtbuf(rel, buf);
	}
}
632
633 /*
634  *      _bt_split() -- split a page in the btree.
635  *
636  *              On entry, buf is the page to split, and is write-locked and pinned.
637  *              firstright is the item index of the first item to be moved to the
638  *              new right page.  newitemoff etc. tell us about the new item that
639  *              must be inserted along with the data from the old page.
640  *
641  *              Returns the new right sibling of buf, pinned and write-locked.
642  *              The pin and lock on buf are maintained.
643  */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;	/* right sibling of the original page */
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	OffsetNumber itup_off = 0;	/* offset where newitem was placed (for WAL) */
	BlockNumber itup_blkno = 0; /* block where newitem was placed (for WAL) */
	Size		itemsz;
	ItemId		itemid;
	BTItem		item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;

	/*
	 * Allocate the new right page, and build the replacement left page in a
	 * temporary copy, so the original page remains valid until we atomically
	 * overwrite it inside the critical section below.
	 */
	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
	rightpage = BufferGetPage(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	/* if we're splitting this page, it won't be the root when we're done */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~BTP_ROOT;
	ropaque->btpo_flags = lopaque->btpo_flags;
	/* chain the two halves into the sibling links of the original page */
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
	ropaque->btpo_prev = BufferGetBlockNumber(buf);
	ropaque->btpo_next = oopaque->btpo_next;
	/* both halves stay on the same tree level as the original */
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (BTItem) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						LP_USED) == InvalidOffsetNumber)
			elog(PANIC, "failed to add hikey to the right sibling");
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (BTItem) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					LP_USED) == InvalidOffsetNumber)
		elog(PANIC, "failed to add hikey to the left sibling");
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Items before firstright go left, the rest go right; the new item is
	 * inserted in front of position newitemoff on whichever side
	 * newitemonleft dictates, recording its final location for the WAL
	 * record.
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (BTItem) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
							 "left sibling");
				itup_off = leftoff;
				itup_blkno = BufferGetBlockNumber(buf);
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
							 "right sibling");
				itup_off = rightoff;
				itup_blkno = BufferGetBlockNumber(rbuf);
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
						 "left sibling");
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
						 "right sibling");
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		if (newitemonleft)
		{
			_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
						 "left sibling");
			itup_off = leftoff;
			itup_blkno = BufferGetBlockNumber(buf);
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
						 "right sibling");
			itup_off = rightoff;
			itup_blkno = BufferGetBlockNumber(rbuf);
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there. We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(ropaque))
	{
		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		/* the sibling must still point back at the page we are splitting */
		if (sopaque->btpo_prev != ropaque->btpo_prev)
			elog(PANIC, "right sibling's left-link doesn't match");
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet. Log changes before continuing.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.
	 */
	START_CRIT_SECTION();

	/* redirect the old right sibling's left-link to the new right page */
	if (!P_RIGHTMOST(ropaque))
		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[4];

		xlrec.target.node = rel->rd_node;
		ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
		/* otherblk is whichever half did NOT receive the new item */
		if (newitemonleft)
			xlrec.otherblk = BufferGetBlockNumber(rbuf);
		else
			xlrec.otherblk = BufferGetBlockNumber(buf);
		xlrec.leftblk = lopaque->btpo_prev;
		xlrec.rightblk = ropaque->btpo_next;
		xlrec.level = lopaque->btpo.level;

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Note we only store the tuples
		 * themselves, knowing that the item pointers are in the same order
		 * and can be reconstructed by scanning the tuples.
		 */
		xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
			((PageHeader) leftpage)->pd_upper;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/* tuple data of the new left page (pd_upper..pd_special) */
		rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
		rdata[1].len = xlrec.leftlen;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = &(rdata[2]);

		/* tuple data of the new right page */
		rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
		rdata[2].len = ((PageHeader) rightpage)->pd_special -
			((PageHeader) rightpage)->pd_upper;
		rdata[2].buffer = InvalidBuffer;
		rdata[2].next = NULL;

		/* the old right sibling's left-link change rides along as a
		 * backup-block reference */
		if (!P_RIGHTMOST(ropaque))
		{
			rdata[2].next = &(rdata[3]);
			rdata[3].data = NULL;
			rdata[3].len = 0;
			rdata[3].buffer = sbuf;
			rdata[3].buffer_std = true;
			rdata[3].next = NULL;
		}

		if (P_ISROOT(oopaque))
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
		else
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;

		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

		PageSetLSN(leftpage, recptr);
		PageSetTLI(leftpage, ThisTimeLineID);
		PageSetLSN(rightpage, recptr);
		PageSetTLI(rightpage, ThisTimeLineID);
		if (!P_RIGHTMOST(ropaque))
		{
			PageSetLSN(spage, recptr);
			PageSetTLI(spage, ThisTimeLineID);
		}
	}

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.
	 */

	PageRestoreTempPage(leftpage, origpage);

	END_CRIT_SECTION();

	/* write and release the old right sibling */
	if (!P_RIGHTMOST(ropaque))
		_bt_wrtbuf(rel, sbuf);

	/* split's done */
	return rbuf;
}
924
925 /*
926  *      _bt_findsplitloc() -- find an appropriate place to split a page.
927  *
928  * The idea here is to equalize the free space that will be on each split
929  * page, *after accounting for the inserted tuple*.  (If we fail to account
930  * for it, we might find ourselves with too little room on the page that
931  * it needs to go into!)
932  *
933  * If the page is the rightmost page on its level, we instead try to arrange
934  * for twice as much free space on the right as on the left.  In this way,
935  * when we are inserting successively increasing keys (consider sequences,
936  * timestamps, etc) we will end up with a tree whose pages are about 67% full,
937  * instead of the 50% full result that we'd get without this special case.
938  * (We could bias it even further to make the initially-loaded tree more full.
939  * But since the steady-state load for a btree is about 70%, we'd likely just
940  * be making more page-splitting work for ourselves later on, when we start
941  * seeing updates to existing tuples.)
942  *
943  * We are passed the intended insert position of the new tuple, expressed as
944  * the offsetnumber of the tuple it must go in front of.  (This could be
945  * maxoff+1 if the tuple is to go at the end.)
946  *
947  * We return the index of the first existing tuple that should go on the
948  * righthand page, plus a boolean indicating whether the new tuple goes on
949  * the left or right page.      The bool is necessary to disambiguate the case
950  * where firstright == newitemoff.
951  */
952 static OffsetNumber
953 _bt_findsplitloc(Relation rel,
954                                  Page page,
955                                  OffsetNumber newitemoff,
956                                  Size newitemsz,
957                                  bool *newitemonleft)
958 {
959         BTPageOpaque opaque;
960         OffsetNumber offnum;
961         OffsetNumber maxoff;
962         ItemId          itemid;
963         FindSplitData state;
964         int                     leftspace,
965                                 rightspace,
966                                 goodenough,
967                                 dataitemtotal,
968                                 dataitemstoleft;
969
970         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
971
972         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
973         newitemsz += sizeof(ItemIdData);
974         state.newitemsz = newitemsz;
975         state.is_leaf = P_ISLEAF(opaque);
976         state.is_rightmost = P_RIGHTMOST(opaque);
977         state.have_split = false;
978
979         /* Total free space available on a btree page, after fixed overhead */
980         leftspace = rightspace =
981                 PageGetPageSize(page) - SizeOfPageHeaderData -
982                 MAXALIGN(sizeof(BTPageOpaqueData));
983
984         /*
985          * Finding the best possible split would require checking all the possible
986          * split points, because of the high-key and left-key special cases.
987          * That's probably more work than it's worth; instead, stop as soon as we
988          * find a "good-enough" split, where good-enough is defined as an
989          * imbalance in free space of no more than pagesize/16 (arbitrary...) This
990          * should let us stop near the middle on most pages, instead of plowing to
991          * the end.
992          */
993         goodenough = leftspace / 16;
994
995         /* The right page will have the same high key as the old page */
996         if (!P_RIGHTMOST(opaque))
997         {
998                 itemid = PageGetItemId(page, P_HIKEY);
999                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1000                                                          sizeof(ItemIdData));
1001         }
1002
1003         /* Count up total space in data items without actually scanning 'em */
1004         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1005
1006         /*
1007          * Scan through the data items and calculate space usage for a split at
1008          * each possible position.
1009          */
1010         dataitemstoleft = 0;
1011         maxoff = PageGetMaxOffsetNumber(page);
1012
1013         for (offnum = P_FIRSTDATAKEY(opaque);
1014                  offnum <= maxoff;
1015                  offnum = OffsetNumberNext(offnum))
1016         {
1017                 Size            itemsz;
1018                 int                     leftfree,
1019                                         rightfree;
1020
1021                 itemid = PageGetItemId(page, offnum);
1022                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1023
1024                 /*
1025                  * We have to allow for the current item becoming the high key of the
1026                  * left page; therefore it counts against left space as well as right
1027                  * space.
1028                  */
1029                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1030                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1031
1032                 /*
1033                  * Will the new item go to left or right of split?
1034                  */
1035                 if (offnum > newitemoff)
1036                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1037                                                           true, itemsz);
1038                 else if (offnum < newitemoff)
1039                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1040                                                           false, itemsz);
1041                 else
1042                 {
1043                         /* need to try it both ways! */
1044                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1045                                                           true, itemsz);
1046                         /* here we are contemplating newitem as first on right */
1047                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1048                                                           false, newitemsz);
1049                 }
1050
1051                 /* Abort scan once we find a good-enough choice */
1052                 if (state.have_split && state.best_delta <= goodenough)
1053                         break;
1054
1055                 dataitemstoleft += itemsz;
1056         }
1057
1058         /*
1059          * I believe it is not possible to fail to find a feasible split, but just
1060          * in case ...
1061          */
1062         if (!state.have_split)
1063                 elog(ERROR, "could not find a feasible split point for \"%s\"",
1064                          RelationGetRelationName(rel));
1065
1066         *newitemonleft = state.newitemonleft;
1067         return state.firstright;
1068 }
1069
1070 /*
1071  * Subroutine to analyze a particular possible split choice (ie, firstright
1072  * and newitemonleft settings), and record the best split so far in *state.
1073  */
1074 static void
1075 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1076                                   int leftfree, int rightfree,
1077                                   bool newitemonleft, Size firstrightitemsz)
1078 {
1079         /*
1080          * Account for the new item on whichever side it is to be put.
1081          */
1082         if (newitemonleft)
1083                 leftfree -= (int) state->newitemsz;
1084         else
1085                 rightfree -= (int) state->newitemsz;
1086
1087         /*
1088          * If we are not on the leaf level, we will be able to discard the key
1089          * data from the first item that winds up on the right page.
1090          */
1091         if (!state->is_leaf)
1092                 rightfree += (int) firstrightitemsz -
1093                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1094
1095         /*
1096          * If feasible split point, remember best delta.
1097          */
1098         if (leftfree >= 0 && rightfree >= 0)
1099         {
1100                 int                     delta;
1101
1102                 if (state->is_rightmost)
1103                 {
1104                         /*
1105                          * On a rightmost page, try to equalize right free space with
1106                          * twice the left free space.  See comments for _bt_findsplitloc.
1107                          */
1108                         delta = (2 * leftfree) - rightfree;
1109                 }
1110                 else
1111                 {
1112                         /* Otherwise, aim for equal free space on both sides */
1113                         delta = leftfree - rightfree;
1114                 }
1115
1116                 if (delta < 0)
1117                         delta = -delta;
1118                 if (!state->have_split || delta < state->best_delta)
1119                 {
1120                         state->have_split = true;
1121                         state->newitemonleft = newitemonleft;
1122                         state->firstright = firstright;
1123                         state->best_delta = delta;
1124                 }
1125         }
1126 }
1127
1128 /*
1129  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1130  *
1131  * On entry, buf and rbuf are the left and right split pages, which we
1132  * still hold write locks on per the L&Y algorithm.  We release the
1133  * write locks once we have write lock on the parent page.      (Any sooner,
1134  * and it'd be possible for some other process to try to split or delete
1135  * one of these pages, and get confused because it cannot find the downlink.)
1136  *
1137  * stack - stack showing how we got here.  May be NULL in cases that don't
1138  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1139  * is_root - we split the true root
1140  * is_only - we split a page alone on its level (might have been fast root)
1141  *
1142  * This is exported so it can be called by nbtxlog.c.
1143  */
void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending from
	 * the root.  This is not super-efficient, but it's rare enough not to
	 * matter.  (This path is also taken when called from WAL recovery --- we
	 * have no stack in that case.)
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_wrtbuf(rel, rootbuf);
		_bt_wrtbuf(rel, rbuf);
		_bt_wrtbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		BTItem		new_item;	/* downlink tuple for the new right page */
		BTStackData fakestack;	/* stand-in stack entry when stack == NULL */
		BTItem		ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			/* during recovery a missing stack is expected, so stay quiet */
			if (!InRecovery)
				elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btitem will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (BTItem) PageGetItem(page,
									 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = _bt_formitem(&(ritem->bti_itup));
		ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item! We
		 * want to find parent pointing to where we are, right ?	- vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
					   bknum, P_HIKEY);

		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/*
		 * Now we can write and unlock the children.  Per the comments at the
		 * head of this function, the child locks must be held until the
		 * parent lock is obtained, and no sooner.
		 */
		_bt_wrtbuf(rel, rbuf);
		_bt_wrtbuf(rel, buf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in \"%s\"",
				 RelationGetRelationName(rel));

		/* Recursively update the parent (pbuf is released by the callee) */
		_bt_insertonpg(rel, pbuf, stack->bts_parent,
					   0, NULL, new_item, stack->bts_offset,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}
1245
1246 /*
1247  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1248  *                                               we last looked at in the parent.
1249  *
1250  *              This is possible because we save the downlink from the parent item,
1251  *              which is enough to uniquely identify it.  Insertions into the parent
1252  *              level could cause the item to move right; deletions could cause it
1253  *              to move left, but not left of the page we previously found it in.
1254  *
1255  *              Adjusts bts_blkno & bts_offset if changed.
1256  *
1257  *              Returns InvalidBuffer if item not found (should not happen).
1258  */
1259 Buffer
1260 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1261 {
1262         BlockNumber blkno;
1263         OffsetNumber start;
1264
1265         blkno = stack->bts_blkno;
1266         start = stack->bts_offset;
1267
1268         for (;;)
1269         {
1270                 Buffer          buf;
1271                 Page            page;
1272                 BTPageOpaque opaque;
1273
1274                 buf = _bt_getbuf(rel, blkno, access);
1275                 page = BufferGetPage(buf);
1276                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1277
1278                 if (!P_IGNORE(opaque))
1279                 {
1280                         OffsetNumber offnum,
1281                                                 minoff,
1282                                                 maxoff;
1283                         ItemId          itemid;
1284                         BTItem          item;
1285
1286                         minoff = P_FIRSTDATAKEY(opaque);
1287                         maxoff = PageGetMaxOffsetNumber(page);
1288
1289                         /*
1290                          * start = InvalidOffsetNumber means "search the whole page". We
1291                          * need this test anyway due to possibility that page has a high
1292                          * key now when it didn't before.
1293                          */
1294                         if (start < minoff)
1295                                 start = minoff;
1296
1297                         /*
1298                          * Need this check too, to guard against possibility that page
1299                          * split since we visited it originally.
1300                          */
1301                         if (start > maxoff)
1302                                 start = OffsetNumberNext(maxoff);
1303
1304                         /*
1305                          * These loops will check every item on the page --- but in an
1306                          * order that's attuned to the probability of where it actually
1307                          * is.  Scan to the right first, then to the left.
1308                          */
1309                         for (offnum = start;
1310                                  offnum <= maxoff;
1311                                  offnum = OffsetNumberNext(offnum))
1312                         {
1313                                 itemid = PageGetItemId(page, offnum);
1314                                 item = (BTItem) PageGetItem(page, itemid);
1315                                 if (BTItemSame(item, &stack->bts_btitem))
1316                                 {
1317                                         /* Return accurate pointer to where link is now */
1318                                         stack->bts_blkno = blkno;
1319                                         stack->bts_offset = offnum;
1320                                         return buf;
1321                                 }
1322                         }
1323
1324                         for (offnum = OffsetNumberPrev(start);
1325                                  offnum >= minoff;
1326                                  offnum = OffsetNumberPrev(offnum))
1327                         {
1328                                 itemid = PageGetItemId(page, offnum);
1329                                 item = (BTItem) PageGetItem(page, itemid);
1330                                 if (BTItemSame(item, &stack->bts_btitem))
1331                                 {
1332                                         /* Return accurate pointer to where link is now */
1333                                         stack->bts_blkno = blkno;
1334                                         stack->bts_offset = offnum;
1335                                         return buf;
1336                                 }
1337                         }
1338                 }
1339
1340                 /*
1341                  * The item we're looking for moved right at least one page.
1342                  */
1343                 if (P_RIGHTMOST(opaque))
1344                 {
1345                         _bt_relbuf(rel, buf);
1346                         return InvalidBuffer;
1347                 }
1348                 blkno = opaque->btpo_next;
1349                 start = InvalidOffsetNumber;
1350                 _bt_relbuf(rel, buf);
1351         }
1352 }
1353
1354 /*
1355  *      _bt_newroot() -- Create a new root page for the index.
1356  *
1357  *              We've just split the old root page and need to create a new one.
1358  *              In order to do this, we add a new root page to the file, then lock
1359  *              the metadata page and update it.  This is guaranteed to be deadlock-
1360  *              free, because all readers release their locks on the metadata page
1361  *              before trying to lock the root, and all writers lock the root before
1362  *              trying to lock the metadata page.  We have a write lock on the old
1363  *              root page, so we have not introduced any cycles into the waits-for
1364  *              graph.
1365  *
1366  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1367  *              locked. On exit, a new root page exists with entries for the
1368  *              two new children, metapage is updated and unlocked/unpinned.
1369  *              The new root buffer is returned to caller which has to unlock/unpin
1370  *              lbuf, rbuf & rootbuf.
1371  */
1372 static Buffer
1373 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1374 {
1375         Buffer          rootbuf;
1376         Page            lpage,
1377                                 rpage,
1378                                 rootpage;
1379         BlockNumber lbkno,
1380                                 rbkno;
1381         BlockNumber rootblknum;
1382         BTPageOpaque rootopaque;
1383         ItemId          itemid;
1384         BTItem          item;
1385         Size            itemsz;
1386         BTItem          new_item;
1387         Buffer          metabuf;
1388         Page            metapg;
1389         BTMetaPageData *metad;
1390
1391         lbkno = BufferGetBlockNumber(lbuf);
1392         rbkno = BufferGetBlockNumber(rbuf);
1393         lpage = BufferGetPage(lbuf);
1394         rpage = BufferGetPage(rbuf);
1395
1396         /* get a new root page */
1397         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1398         rootpage = BufferGetPage(rootbuf);
1399         rootblknum = BufferGetBlockNumber(rootbuf);
1400
1401         /* acquire lock on the metapage */
1402         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1403         metapg = BufferGetPage(metabuf);
1404         metad = BTPageGetMeta(metapg);
1405
1406         /* NO EREPORT(ERROR) from here till newroot op is logged */
1407         START_CRIT_SECTION();
1408
1409         /* set btree special data */
1410         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1411         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1412         rootopaque->btpo_flags = BTP_ROOT;
1413         rootopaque->btpo.level =
1414                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1415
1416         /* update metapage data */
1417         metad->btm_root = rootblknum;
1418         metad->btm_level = rootopaque->btpo.level;
1419         metad->btm_fastroot = rootblknum;
1420         metad->btm_fastlevel = rootopaque->btpo.level;
1421
1422         /*
1423          * Create downlink item for left page (old root).  Since this will be the
1424          * first item in a non-leaf page, it implicitly has minus-infinity key
1425          * value, so we need not store any actual key in it.
1426          */
1427         itemsz = sizeof(BTItemData);
1428         new_item = (BTItem) palloc(itemsz);
1429         new_item->bti_itup.t_info = itemsz;
1430         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1431
1432         /*
1433          * Insert the left page pointer into the new root page.  The root page is
1434          * the rightmost page on its level so there is no "high key" in it; the
1435          * two items will go into positions P_HIKEY and P_FIRSTKEY.
1436          */
1437         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1438                 elog(PANIC, "failed to add leftkey to new root page");
1439         pfree(new_item);
1440
1441         /*
1442          * Create downlink item for right page.  The key for it is obtained from
1443          * the "high key" position in the left page.
1444          */
1445         itemid = PageGetItemId(lpage, P_HIKEY);
1446         itemsz = ItemIdGetLength(itemid);
1447         item = (BTItem) PageGetItem(lpage, itemid);
1448         new_item = _bt_formitem(&(item->bti_itup));
1449         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1450
1451         /*
1452          * insert the right page pointer into the new root page.
1453          */
1454         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1455                 elog(PANIC, "failed to add rightkey to new root page");
1456         pfree(new_item);
1457
1458         /* XLOG stuff */
1459         if (!rel->rd_istemp)
1460         {
1461                 xl_btree_newroot xlrec;
1462                 XLogRecPtr      recptr;
1463                 XLogRecData rdata[2];
1464
1465                 xlrec.node = rel->rd_node;
1466                 xlrec.rootblk = rootblknum;
1467                 xlrec.level = metad->btm_level;
1468
1469                 rdata[0].data = (char *) &xlrec;
1470                 rdata[0].len = SizeOfBtreeNewroot;
1471                 rdata[0].buffer = InvalidBuffer;
1472                 rdata[0].next = &(rdata[1]);
1473
1474                 /*
1475                  * Direct access to page is not good but faster - we should implement
1476                  * some new func in page API.
1477                  */
1478                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1479                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1480                         ((PageHeader) rootpage)->pd_upper;
1481                 rdata[1].buffer = InvalidBuffer;
1482                 rdata[1].next = NULL;
1483
1484                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1485
1486                 PageSetLSN(rootpage, recptr);
1487                 PageSetTLI(rootpage, ThisTimeLineID);
1488                 PageSetLSN(metapg, recptr);
1489                 PageSetTLI(metapg, ThisTimeLineID);
1490                 PageSetLSN(lpage, recptr);
1491                 PageSetTLI(lpage, ThisTimeLineID);
1492                 PageSetLSN(rpage, recptr);
1493                 PageSetTLI(rpage, ThisTimeLineID);
1494         }
1495
1496         END_CRIT_SECTION();
1497
1498         /* write and let go of metapage buffer */
1499         _bt_wrtbuf(rel, metabuf);
1500
1501         return (rootbuf);
1502 }
1503
1504 /*
1505  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1506  *
1507  *              This routine adds the tuple to the page as requested.  It does
1508  *              not affect pin/lock status, but you'd better have a write lock
1509  *              and pin on the target buffer!  Don't forget to write and release
1510  *              the buffer afterwards, either.
1511  *
1512  *              The main difference between this routine and a bare PageAddItem call
1513  *              is that this code knows that the leftmost data item on a non-leaf
1514  *              btree page doesn't need to have a key.  Therefore, it strips such
1515  *              items down to just the item header.  CAUTION: this works ONLY if
1516  *              we insert the items in order, so that the given itup_off does
1517  *              represent the final position of the item!
1518  */
1519 static void
1520 _bt_pgaddtup(Relation rel,
1521                          Page page,
1522                          Size itemsize,
1523                          BTItem btitem,
1524                          OffsetNumber itup_off,
1525                          const char *where)
1526 {
1527         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1528         BTItemData      truncitem;
1529
1530         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1531         {
1532                 memcpy(&truncitem, btitem, sizeof(BTItemData));
1533                 truncitem.bti_itup.t_info = sizeof(BTItemData);
1534                 btitem = &truncitem;
1535                 itemsize = sizeof(BTItemData);
1536         }
1537
1538         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1539                                         LP_USED) == InvalidOffsetNumber)
1540                 elog(PANIC, "failed to add item to the %s for \"%s\"",
1541                          where, RelationGetRelationName(rel));
1542 }
1543
1544 /*
1545  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1546  *
1547  * This is very similar to _bt_compare, except for NULL handling.
1548  * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
1549  */
1550 static bool
1551 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1552                         int keysz, ScanKey scankey)
1553 {
1554         BTItem          btitem;
1555         IndexTuple      itup;
1556         int                     i;
1557
1558         /* Better be comparing to a leaf item */
1559         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1560
1561         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1562         itup = &(btitem->bti_itup);
1563
1564         for (i = 1; i <= keysz; i++)
1565         {
1566                 AttrNumber      attno;
1567                 Datum           datum;
1568                 bool            isNull;
1569                 int32           result;
1570
1571                 attno = scankey->sk_attno;
1572                 Assert(attno == i);
1573                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1574
1575                 /* NULLs are never equal to anything */
1576                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1577                         return false;
1578
1579                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1580                                                                                          datum,
1581                                                                                          scankey->sk_argument));
1582
1583                 if (result != 0)
1584                         return false;
1585
1586                 scankey++;
1587         }
1588
1589         /* if we get here, the keys are equal */
1590         return true;
1591 }