]> granicus.if.org Git - postgresql/blob - src/backend/access/nbtree/nbtinsert.c
868a91ab3a5efa16c1294d7389693f7c31b26916
[postgresql] / src / backend / access / nbtree / nbtinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *        Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.120 2005/03/21 01:23:59 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21
22
23 typedef struct
24 {
25         /* context data for _bt_checksplitloc */
26         Size            newitemsz;              /* size of new item to be inserted */
27         bool            is_leaf;                /* T if splitting a leaf page */
28         bool            is_rightmost;   /* T if splitting a rightmost page */
29
30         bool            have_split;             /* found a valid split? */
31
32         /* these fields valid only if have_split is true */
33         bool            newitemonleft;  /* new item on left or right of best split */
34         OffsetNumber firstright;        /* best split point */
35         int                     best_delta;             /* best size delta so far */
36 } FindSplitData;
37
38
39 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
40
41 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
42                                  Relation heapRel, Buffer buf,
43                                  ScanKey itup_scankey);
44 static void _bt_insertonpg(Relation rel, Buffer buf,
45                            BTStack stack,
46                            int keysz, ScanKey scankey,
47                            BTItem btitem,
48                            OffsetNumber afteritem,
49                            bool split_only_page);
50 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
51                   OffsetNumber newitemoff, Size newitemsz,
52                   BTItem newitem, bool newitemonleft,
53                   OffsetNumber *itup_off, BlockNumber *itup_blkno);
54 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
55                                  OffsetNumber newitemoff,
56                                  Size newitemsz,
57                                  bool *newitemonleft);
58 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
59                                   int leftfree, int rightfree,
60                                   bool newitemonleft, Size firstrightitemsz);
61 static void _bt_pgaddtup(Relation rel, Page page,
62                          Size itemsize, BTItem btitem,
63                          OffsetNumber itup_off, const char *where);
64 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
65                         int keysz, ScanKey scankey);
66
67
68 /*
69  *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
70  *
71  *              This routine is called by the public interface routines, btbuild
72  *              and btinsert.  By here, btitem is filled in, including the TID.
73  */
74 void
75 _bt_doinsert(Relation rel, BTItem btitem,
76                          bool index_is_unique, Relation heapRel)
77 {
78         IndexTuple      itup = &(btitem->bti_itup);
79         int                     natts = rel->rd_rel->relnatts;
80         ScanKey         itup_scankey;
81         BTStack         stack;
82         Buffer          buf;
83
84         /* we need a scan key to do our search, so build one */
85         itup_scankey = _bt_mkscankey(rel, itup);
86
87 top:
88         /* find the first page containing this key */
89         stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
90
91         /* trade in our read lock for a write lock */
92         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
93         LockBuffer(buf, BT_WRITE);
94
95         /*
96          * If the page was split between the time that we surrendered our read
97          * lock and acquired our write lock, then this page may no longer be
98          * the right place for the key we want to insert.  In this case, we
99          * need to move right in the tree.      See Lehman and Yao for an
100          * excruciatingly precise description.
101          */
102         buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
103
104         /*
105          * If we're not allowing duplicates, make sure the key isn't already
106          * in the index.
107          *
108          * NOTE: obviously, _bt_check_unique can only detect keys that are
109          * already in the index; so it cannot defend against concurrent
110          * insertions of the same key.  We protect against that by means of
111          * holding a write lock on the target page.  Any other would-be
112          * inserter of the same key must acquire a write lock on the same
113          * target page, so only one would-be inserter can be making the check
114          * at one time.  Furthermore, once we are past the check we hold write
115          * locks continuously until we have performed our insertion, so no
116          * later inserter can fail to see our insertion.  (This requires some
117          * care in _bt_insertonpg.)
118          *
119          * If we must wait for another xact, we release the lock while waiting,
120          * and then must start over completely.
121          */
122         if (index_is_unique)
123         {
124                 TransactionId xwait;
125
126                 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
127
128                 if (TransactionIdIsValid(xwait))
129                 {
130                         /* Have to wait for the other guy ... */
131                         _bt_relbuf(rel, buf);
132                         XactLockTableWait(xwait);
133                         /* start over... */
134                         _bt_freestack(stack);
135                         goto top;
136                 }
137         }
138
139         /* do the insertion */
140         _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0, false);
141
142         /* be tidy */
143         _bt_freestack(stack);
144         _bt_freeskey(itup_scankey);
145 }
146
147 /*
148  *      _bt_check_unique() -- Check for violation of unique index constraint
149  *
150  * Returns InvalidTransactionId if there is no conflict, else an xact ID
151  * we must wait for to see if it commits a conflicting tuple.   If an actual
152  * conflict is detected, no return --- just ereport().
153  */
154 static TransactionId
155 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
156                                  Buffer buf, ScanKey itup_scankey)
157 {
158         TupleDesc       itupdesc = RelationGetDescr(rel);
159         int                     natts = rel->rd_rel->relnatts;
160         OffsetNumber offset,
161                                 maxoff;
162         Page            page;
163         BTPageOpaque opaque;
164         Buffer          nbuf = InvalidBuffer;
165
166         page = BufferGetPage(buf);
167         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
168         maxoff = PageGetMaxOffsetNumber(page);
169
170         /*
171          * Find first item >= proposed new item.  Note we could also get a
172          * pointer to end-of-page here.
173          */
174         offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
175
176         /*
177          * Scan over all equal tuples, looking for live conflicts.
178          */
179         for (;;)
180         {
181                 HeapTupleData htup;
182                 Buffer          hbuffer;
183                 ItemId          curitemid;
184                 BTItem          cbti;
185                 BlockNumber nblkno;
186
187                 /*
188                  * make sure the offset points to an actual item before trying to
189                  * examine it...
190                  */
191                 if (offset <= maxoff)
192                 {
193                         curitemid = PageGetItemId(page, offset);
194
195                         /*
196                          * We can skip items that are marked killed.
197                          *
198                          * Formerly, we applied _bt_isequal() before checking the kill
199                          * flag, so as to fall out of the item loop as soon as
200                          * possible. However, in the presence of heavy update activity
201                          * an index may contain many killed items with the same key;
202                          * running _bt_isequal() on each killed item gets expensive.
203                          * Furthermore it is likely that the non-killed version of
204                          * each key appears first, so that we didn't actually get to
205                          * exit any sooner anyway. So now we just advance over killed
206                          * items as quickly as we can. We only apply _bt_isequal()
207                          * when we get to a non-killed item or the end of the page.
208                          */
209                         if (!ItemIdDeleted(curitemid))
210                         {
211                                 /*
212                                  * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
213                                  * this's how we handling NULLs - and so we must not use
214                                  * _bt_compare in real comparison, but only for
215                                  * ordering/finding items on pages. - vadim 03/24/97
216                                  */
217                                 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
218                                         break;          /* we're past all the equal tuples */
219
220                                 /* okay, we gotta fetch the heap tuple ... */
221                                 cbti = (BTItem) PageGetItem(page, curitemid);
222                                 htup.t_self = cbti->bti_itup.t_tid;
223                                 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
224                                                            true, NULL))
225                                 {
226                                         /* it is a duplicate */
227                                         TransactionId xwait =
228                                         (TransactionIdIsValid(SnapshotDirty->xmin)) ?
229                                         SnapshotDirty->xmin : SnapshotDirty->xmax;
230
231                                         ReleaseBuffer(hbuffer);
232
233                                         /*
234                                          * If this tuple is being updated by other transaction
235                                          * then we have to wait for its commit/abort.
236                                          */
237                                         if (TransactionIdIsValid(xwait))
238                                         {
239                                                 if (nbuf != InvalidBuffer)
240                                                         _bt_relbuf(rel, nbuf);
241                                                 /* Tell _bt_doinsert to wait... */
242                                                 return xwait;
243                                         }
244
245                                         /*
246                                          * Otherwise we have a definite conflict.
247                                          */
248                                         ereport(ERROR,
249                                                         (errcode(ERRCODE_UNIQUE_VIOLATION),
250                                                          errmsg("duplicate key violates unique constraint \"%s\"",
251                                                                         RelationGetRelationName(rel))));
252                                 }
253                                 else if (htup.t_data != NULL)
254                                 {
255                                         /*
256                                          * Hmm, if we can't see the tuple, maybe it can be
257                                          * marked killed.  This logic should match
258                                          * index_getnext and btgettuple.
259                                          */
260                                         LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
261                                         if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
262                                                                                                  hbuffer) == HEAPTUPLE_DEAD)
263                                         {
264                                                 curitemid->lp_flags |= LP_DELETE;
265                                                 SetBufferCommitInfoNeedsSave(buf);
266                                         }
267                                         LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
268                                 }
269                                 ReleaseBuffer(hbuffer);
270                         }
271                 }
272
273                 /*
274                  * Advance to next tuple to continue checking.
275                  */
276                 if (offset < maxoff)
277                         offset = OffsetNumberNext(offset);
278                 else
279                 {
280                         /* If scankey == hikey we gotta check the next page too */
281                         if (P_RIGHTMOST(opaque))
282                                 break;
283                         if (!_bt_isequal(itupdesc, page, P_HIKEY,
284                                                          natts, itup_scankey))
285                                 break;
286                         /* Advance to next non-dead page --- there must be one */
287                         for (;;)
288                         {
289                                 nblkno = opaque->btpo_next;
290                                 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
291                                 page = BufferGetPage(nbuf);
292                                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
293                                 if (!P_IGNORE(opaque))
294                                         break;
295                                 if (P_RIGHTMOST(opaque))
296                                         elog(ERROR, "fell off the end of \"%s\"",
297                                                  RelationGetRelationName(rel));
298                         }
299                         maxoff = PageGetMaxOffsetNumber(page);
300                         offset = P_FIRSTDATAKEY(opaque);
301                 }
302         }
303
304         if (nbuf != InvalidBuffer)
305                 _bt_relbuf(rel, nbuf);
306
307         return InvalidTransactionId;
308 }
309
310 /*----------
311  *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
312  *
313  *              This recursive procedure does the following things:
314  *
315  *                      +  finds the right place to insert the tuple.
316  *                      +  if necessary, splits the target page (making sure that the
317  *                         split is equitable as far as post-insert free space goes).
318  *                      +  inserts the tuple.
319  *                      +  if the page was split, pops the parent stack, and finds the
320  *                         right place to insert the new child pointer (by walking
321  *                         right using information stored in the parent stack).
322  *                      +  invokes itself with the appropriate tuple for the right
323  *                         child page on the parent.
324  *                      +  updates the metapage if a true root or fast root is split.
325  *
326  *              On entry, we must have the right buffer on which to do the
327  *              insertion, and the buffer must be pinned and locked.  On return,
328  *              we will have dropped both the pin and the write lock on the buffer.
329  *
330  *              If 'afteritem' is >0 then the new tuple must be inserted after the
331  *              existing item of that number, noplace else.  If 'afteritem' is 0
332  *              then the procedure finds the exact spot to insert it by searching.
333  *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
334  *
335  *              NOTE: if the new key is equal to one or more existing keys, we can
336  *              legitimately place it anywhere in the series of equal keys --- in fact,
337  *              if the new key is equal to the page's "high key" we can place it on
338  *              the next page.  If it is equal to the high key, and there's not room
339  *              to insert the new tuple on the current page without splitting, then
340  *              we can move right hoping to find more free space and avoid a split.
341  *              (We should not move right indefinitely, however, since that leads to
342  *              O(N^2) insertion behavior in the presence of many equal keys.)
343  *              Once we have chosen the page to put the key on, we'll insert it before
344  *              any existing equal keys because of the way _bt_binsrch() works.
345  *
346  *              The locking interactions in this code are critical.  You should
347  *              grok Lehman and Yao's paper before making any changes.  In addition,
348  *              you need to understand how we disambiguate duplicate keys in this
349  *              implementation, in order to be able to find our location using
350  *              L&Y "move right" operations.  Since we may insert duplicate user
351  *              keys, and since these dups may propagate up the tree, we use the
352  *              'afteritem' parameter to position ourselves correctly for the
353  *              insertion on internal pages.
354  *----------
355  */
356 static void
357 _bt_insertonpg(Relation rel,
358                            Buffer buf,
359                            BTStack stack,
360                            int keysz,
361                            ScanKey scankey,
362                            BTItem btitem,
363                            OffsetNumber afteritem,
364                            bool split_only_page)
365 {
366         Page            page;
367         BTPageOpaque lpageop;
368         OffsetNumber itup_off;
369         BlockNumber itup_blkno;
370         OffsetNumber newitemoff;
371         OffsetNumber firstright = InvalidOffsetNumber;
372         Size            itemsz;
373
374         page = BufferGetPage(buf);
375         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
376
377         itemsz = IndexTupleDSize(btitem->bti_itup)
378                 + (sizeof(BTItemData) - sizeof(IndexTupleData));
379
380         itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
381                                                                  * we need to be consistent */
382
383         /*
384          * Check whether the item can fit on a btree page at all. (Eventually,
385          * we ought to try to apply TOAST methods if not.) We actually need to
386          * be able to fit three items on every page, so restrict any one item
387          * to 1/3 the per-page available space. Note that at this point,
388          * itemsz doesn't include the ItemId.
389          */
390         if (itemsz > BTMaxItemSize(page))
391                 ereport(ERROR,
392                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
393                                  errmsg("index row size %lu exceeds btree maximum, %lu",
394                                                 (unsigned long) itemsz,
395                                                 (unsigned long) BTMaxItemSize(page))));
396
397         /*
398          * Determine exactly where new item will go.
399          */
400         if (afteritem > 0)
401                 newitemoff = afteritem + 1;
402         else
403         {
404                 /*----------
405                  * If we will need to split the page to put the item here,
406                  * check whether we can put the tuple somewhere to the right,
407                  * instead.  Keep scanning right until we
408                  *              (a) find a page with enough free space,
409                  *              (b) reach the last page where the tuple can legally go, or
410                  *              (c) get tired of searching.
411                  * (c) is not flippant; it is important because if there are many
412                  * pages' worth of equal keys, it's better to split one of the early
413                  * pages than to scan all the way to the end of the run of equal keys
414                  * on every insert.  We implement "get tired" as a random choice,
415                  * since stopping after scanning a fixed number of pages wouldn't work
416                  * well (we'd never reach the right-hand side of previously split
417                  * pages).      Currently the probability of moving right is set at 0.99,
418                  * which may seem too high to change the behavior much, but it does an
419                  * excellent job of preventing O(N^2) behavior with many equal keys.
420                  *----------
421                  */
422                 bool            movedright = false;
423
424                 while (PageGetFreeSpace(page) < itemsz &&
425                            !P_RIGHTMOST(lpageop) &&
426                            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
427                            random() > (MAX_RANDOM_VALUE / 100))
428                 {
429                         /*
430                          * step right to next non-dead page
431                          *
432                          * must write-lock that page before releasing write lock on
433                          * current page; else someone else's _bt_check_unique scan
434                          * could fail to see our insertion.  write locks on
435                          * intermediate dead pages won't do because we don't know when
436                          * they will get de-linked from the tree.
437                          */
438                         Buffer          rbuf = InvalidBuffer;
439
440                         for (;;)
441                         {
442                                 BlockNumber rblkno = lpageop->btpo_next;
443
444                                 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
445                                 page = BufferGetPage(rbuf);
446                                 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
447                                 if (!P_IGNORE(lpageop))
448                                         break;
449                                 if (P_RIGHTMOST(lpageop))
450                                         elog(ERROR, "fell off the end of \"%s\"",
451                                                  RelationGetRelationName(rel));
452                         }
453                         _bt_relbuf(rel, buf);
454                         buf = rbuf;
455                         movedright = true;
456                 }
457
458                 /*
459                  * Now we are on the right page, so find the insert position. If
460                  * we moved right at all, we know we should insert at the start of
461                  * the page, else must find the position by searching.
462                  */
463                 if (movedright)
464                         newitemoff = P_FIRSTDATAKEY(lpageop);
465                 else
466                         newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
467         }
468
469         /*
470          * Do we need to split the page to fit the item on it?
471          *
472          * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
473          * so this comparison is correct even though we appear to be
474          * accounting only for the item and not for its line pointer.
475          */
476         if (PageGetFreeSpace(page) < itemsz)
477         {
478                 bool            is_root = P_ISROOT(lpageop);
479                 bool            is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
480                 bool            newitemonleft;
481                 Buffer          rbuf;
482
483                 /* Choose the split point */
484                 firstright = _bt_findsplitloc(rel, page,
485                                                                           newitemoff, itemsz,
486                                                                           &newitemonleft);
487
488                 /* split the buffer into left and right halves */
489                 rbuf = _bt_split(rel, buf, firstright,
490                                                  newitemoff, itemsz, btitem, newitemonleft,
491                                                  &itup_off, &itup_blkno);
492
493                 /*----------
494                  * By here,
495                  *
496                  *              +  our target page has been split;
497                  *              +  the original tuple has been inserted;
498                  *              +  we have write locks on both the old (left half)
499                  *                 and new (right half) buffers, after the split; and
500                  *              +  we know the key we want to insert into the parent
501                  *                 (it's the "high key" on the left child page).
502                  *
503                  * We're ready to do the parent insertion.  We need to hold onto the
504                  * locks for the child pages until we locate the parent, but we can
505                  * release them before doing the actual insertion (see Lehman and Yao
506                  * for the reasoning).
507                  *----------
508                  */
509                 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
510         }
511         else
512         {
513                 Buffer          metabuf = InvalidBuffer;
514                 Page            metapg = NULL;
515                 BTMetaPageData *metad = NULL;
516
517                 itup_off = newitemoff;
518                 itup_blkno = BufferGetBlockNumber(buf);
519
520                 /*
521                  * If we are doing this insert because we split a page that was
522                  * the only one on its tree level, but was not the root, it may
523                  * have been the "fast root".  We need to ensure that the fast
524                  * root link points at or above the current page.  We can safely
525                  * acquire a lock on the metapage here --- see comments for
526                  * _bt_newroot().
527                  */
528                 if (split_only_page)
529                 {
530                         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
531                         metapg = BufferGetPage(metabuf);
532                         metad = BTPageGetMeta(metapg);
533
534                         if (metad->btm_fastlevel >= lpageop->btpo.level)
535                         {
536                                 /* no update wanted */
537                                 _bt_relbuf(rel, metabuf);
538                                 metabuf = InvalidBuffer;
539                         }
540                 }
541
542                 /* Do the update.  No ereport(ERROR) until changes are logged */
543                 START_CRIT_SECTION();
544
545                 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
546
547                 if (BufferIsValid(metabuf))
548                 {
549                         metad->btm_fastroot = itup_blkno;
550                         metad->btm_fastlevel = lpageop->btpo.level;
551                 }
552
553                 /* XLOG stuff */
554                 if (!rel->rd_istemp)
555                 {
556                         xl_btree_insert xlrec;
557                         xl_btree_metadata xlmeta;
558                         uint8           xlinfo;
559                         XLogRecPtr      recptr;
560                         XLogRecData rdata[3];
561                         XLogRecData *nextrdata;
562                         BTItemData      truncitem;
563
564                         xlrec.target.node = rel->rd_node;
565                         ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
566
567                         rdata[0].buffer = InvalidBuffer;
568                         rdata[0].data = (char *) &xlrec;
569                         rdata[0].len = SizeOfBtreeInsert;
570                         rdata[0].next = nextrdata = &(rdata[1]);
571
572                         if (BufferIsValid(metabuf))
573                         {
574                                 xlmeta.root = metad->btm_root;
575                                 xlmeta.level = metad->btm_level;
576                                 xlmeta.fastroot = metad->btm_fastroot;
577                                 xlmeta.fastlevel = metad->btm_fastlevel;
578
579                                 nextrdata->buffer = InvalidBuffer;
580                                 nextrdata->data = (char *) &xlmeta;
581                                 nextrdata->len = sizeof(xl_btree_metadata);
582                                 nextrdata->next = nextrdata + 1;
583                                 nextrdata++;
584                                 xlinfo = XLOG_BTREE_INSERT_META;
585                         }
586                         else if (P_ISLEAF(lpageop))
587                                 xlinfo = XLOG_BTREE_INSERT_LEAF;
588                         else
589                                 xlinfo = XLOG_BTREE_INSERT_UPPER;
590
591                         /* Read comments in _bt_pgaddtup */
592                         if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
593                         {
594                                 truncitem = *btitem;
595                                 truncitem.bti_itup.t_info = sizeof(BTItemData);
596                                 nextrdata->data = (char *) &truncitem;
597                                 nextrdata->len = sizeof(BTItemData);
598                         }
599                         else
600                         {
601                                 nextrdata->data = (char *) btitem;
602                                 nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
603                                         (sizeof(BTItemData) - sizeof(IndexTupleData));
604                         }
605                         nextrdata->buffer = buf;
606                         nextrdata->next = NULL;
607
608                         recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
609
610                         if (BufferIsValid(metabuf))
611                         {
612                                 PageSetLSN(metapg, recptr);
613                                 PageSetTLI(metapg, ThisTimeLineID);
614                         }
615
616                         PageSetLSN(page, recptr);
617                         PageSetTLI(page, ThisTimeLineID);
618                 }
619
620                 END_CRIT_SECTION();
621
622                 /* Write out the updated page and release pin/lock */
623                 if (BufferIsValid(metabuf))
624                         _bt_wrtbuf(rel, metabuf);
625
626                 _bt_wrtbuf(rel, buf);
627         }
628 }
629
630 /*
631  *      _bt_split() -- split a page in the btree.
632  *
633  *              On entry, buf is the page to split, and is write-locked and pinned.
634  *              firstright is the item index of the first item to be moved to the
635  *              new right page.  newitemoff etc. tell us about the new item that
636  *              must be inserted along with the data from the old page.
637  *
638  *              Returns the new right sibling of buf, pinned and write-locked.
639  *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
640  *              are set to the exact location where newitem was inserted.
641  */
642 static Buffer
643 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
644                   OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
645                   bool newitemonleft,
646                   OffsetNumber *itup_off, BlockNumber *itup_blkno)
647 {
648         Buffer          rbuf;
649         Page            origpage;
650         Page            leftpage,
651                                 rightpage;
652         BTPageOpaque ropaque,
653                                 lopaque,
654                                 oopaque;
655         Buffer          sbuf = InvalidBuffer;
656         Page            spage = NULL;
657         BTPageOpaque sopaque = NULL;
658         Size            itemsz;
659         ItemId          itemid;
660         BTItem          item;
661         OffsetNumber leftoff,
662                                 rightoff;
663         OffsetNumber maxoff;
664         OffsetNumber i;
665
666         rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
667         origpage = BufferGetPage(buf);
668         leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
669         rightpage = BufferGetPage(rbuf);
670
671         _bt_pageinit(leftpage, BufferGetPageSize(buf));
672         _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
673
674         /* init btree private data */
675         oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
676         lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
677         ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
678
679         /* if we're splitting this page, it won't be the root when we're done */
680         lopaque->btpo_flags = oopaque->btpo_flags;
681         lopaque->btpo_flags &= ~BTP_ROOT;
682         ropaque->btpo_flags = lopaque->btpo_flags;
683         lopaque->btpo_prev = oopaque->btpo_prev;
684         lopaque->btpo_next = BufferGetBlockNumber(rbuf);
685         ropaque->btpo_prev = BufferGetBlockNumber(buf);
686         ropaque->btpo_next = oopaque->btpo_next;
687         lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
688
689         /*
690          * If the page we're splitting is not the rightmost page at its level
691          * in the tree, then the first entry on the page is the high key for
692          * the page.  We need to copy that to the right half.  Otherwise
693          * (meaning the rightmost page case), all the items on the right half
694          * will be user data.
695          */
696         rightoff = P_HIKEY;
697
698         if (!P_RIGHTMOST(oopaque))
699         {
700                 itemid = PageGetItemId(origpage, P_HIKEY);
701                 itemsz = ItemIdGetLength(itemid);
702                 item = (BTItem) PageGetItem(origpage, itemid);
703                 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
704                                                 LP_USED) == InvalidOffsetNumber)
705                         elog(PANIC, "failed to add hikey to the right sibling");
706                 rightoff = OffsetNumberNext(rightoff);
707         }
708
709         /*
710          * The "high key" for the new left page will be the first key that's
711          * going to go into the new right page.  This might be either the
712          * existing data item at position firstright, or the incoming tuple.
713          */
714         leftoff = P_HIKEY;
715         if (!newitemonleft && newitemoff == firstright)
716         {
717                 /* incoming tuple will become first on right page */
718                 itemsz = newitemsz;
719                 item = newitem;
720         }
721         else
722         {
723                 /* existing item at firstright will become first on right page */
724                 itemid = PageGetItemId(origpage, firstright);
725                 itemsz = ItemIdGetLength(itemid);
726                 item = (BTItem) PageGetItem(origpage, itemid);
727         }
728         if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
729                                         LP_USED) == InvalidOffsetNumber)
730                 elog(PANIC, "failed to add hikey to the left sibling");
731         leftoff = OffsetNumberNext(leftoff);
732
733         /*
734          * Now transfer all the data items to the appropriate page
735          */
736         maxoff = PageGetMaxOffsetNumber(origpage);
737
738         for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
739         {
740                 itemid = PageGetItemId(origpage, i);
741                 itemsz = ItemIdGetLength(itemid);
742                 item = (BTItem) PageGetItem(origpage, itemid);
743
744                 /* does new item belong before this one? */
745                 if (i == newitemoff)
746                 {
747                         if (newitemonleft)
748                         {
749                                 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
750                                                          "left sibling");
751                                 *itup_off = leftoff;
752                                 *itup_blkno = BufferGetBlockNumber(buf);
753                                 leftoff = OffsetNumberNext(leftoff);
754                         }
755                         else
756                         {
757                                 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
758                                                          "right sibling");
759                                 *itup_off = rightoff;
760                                 *itup_blkno = BufferGetBlockNumber(rbuf);
761                                 rightoff = OffsetNumberNext(rightoff);
762                         }
763                 }
764
765                 /* decide which page to put it on */
766                 if (i < firstright)
767                 {
768                         _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
769                                                  "left sibling");
770                         leftoff = OffsetNumberNext(leftoff);
771                 }
772                 else
773                 {
774                         _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
775                                                  "right sibling");
776                         rightoff = OffsetNumberNext(rightoff);
777                 }
778         }
779
780         /* cope with possibility that newitem goes at the end */
781         if (i <= newitemoff)
782         {
783                 if (newitemonleft)
784                 {
785                         _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
786                                                  "left sibling");
787                         *itup_off = leftoff;
788                         *itup_blkno = BufferGetBlockNumber(buf);
789                         leftoff = OffsetNumberNext(leftoff);
790                 }
791                 else
792                 {
793                         _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
794                                                  "right sibling");
795                         *itup_off = rightoff;
796                         *itup_blkno = BufferGetBlockNumber(rbuf);
797                         rightoff = OffsetNumberNext(rightoff);
798                 }
799         }
800
801         /*
802          * We have to grab the right sibling (if any) and fix the prev pointer
803          * there. We are guaranteed that this is deadlock-free since no other
804          * writer will be holding a lock on that page and trying to move left,
805          * and all readers release locks on a page before trying to fetch its
806          * neighbors.
807          */
808
809         if (!P_RIGHTMOST(ropaque))
810         {
811                 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
812                 spage = BufferGetPage(sbuf);
813                 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
814                 if (sopaque->btpo_prev != ropaque->btpo_prev)
815                         elog(PANIC, "right sibling's left-link doesn't match");
816         }
817
818         /*
819          * Right sibling is locked, new siblings are prepared, but original
820          * page is not updated yet. Log changes before continuing.
821          *
822          * NO EREPORT(ERROR) till right sibling is updated.
823          */
824         START_CRIT_SECTION();
825
826         if (!P_RIGHTMOST(ropaque))
827                 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
828
829         /* XLOG stuff */
830         if (!rel->rd_istemp)
831         {
832                 xl_btree_split xlrec;
833                 uint8           xlinfo;
834                 XLogRecPtr      recptr;
835                 XLogRecData rdata[4];
836
837                 xlrec.target.node = rel->rd_node;
838                 ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
839                 if (newitemonleft)
840                         xlrec.otherblk = BufferGetBlockNumber(rbuf);
841                 else
842                         xlrec.otherblk = BufferGetBlockNumber(buf);
843                 xlrec.leftblk = lopaque->btpo_prev;
844                 xlrec.rightblk = ropaque->btpo_next;
845                 xlrec.level = lopaque->btpo.level;
846
847                 /*
848                  * Direct access to page is not good but faster - we should
849                  * implement some new func in page API.  Note we only store the
850                  * tuples themselves, knowing that the item pointers are in the
851                  * same order and can be reconstructed by scanning the tuples.
852                  */
853                 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
854                         ((PageHeader) leftpage)->pd_upper;
855
856                 rdata[0].buffer = InvalidBuffer;
857                 rdata[0].data = (char *) &xlrec;
858                 rdata[0].len = SizeOfBtreeSplit;
859                 rdata[0].next = &(rdata[1]);
860
861                 rdata[1].buffer = InvalidBuffer;
862                 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
863                 rdata[1].len = xlrec.leftlen;
864                 rdata[1].next = &(rdata[2]);
865
866                 rdata[2].buffer = InvalidBuffer;
867                 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
868                 rdata[2].len = ((PageHeader) rightpage)->pd_special -
869                         ((PageHeader) rightpage)->pd_upper;
870                 rdata[2].next = NULL;
871
872                 if (!P_RIGHTMOST(ropaque))
873                 {
874                         rdata[2].next = &(rdata[3]);
875                         rdata[3].buffer = sbuf;
876                         rdata[3].data = NULL;
877                         rdata[3].len = 0;
878                         rdata[3].next = NULL;
879                 }
880
881                 if (P_ISROOT(oopaque))
882                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
883                 else
884                         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
885
886                 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
887
888                 PageSetLSN(leftpage, recptr);
889                 PageSetTLI(leftpage, ThisTimeLineID);
890                 PageSetLSN(rightpage, recptr);
891                 PageSetTLI(rightpage, ThisTimeLineID);
892                 if (!P_RIGHTMOST(ropaque))
893                 {
894                         PageSetLSN(spage, recptr);
895                         PageSetTLI(spage, ThisTimeLineID);
896                 }
897         }
898
899         /*
900          * By here, the original data page has been split into two new halves,
901          * and these are correct.  The algorithm requires that the left page
902          * never move during a split, so we copy the new left page back on top
903          * of the original.  Note that this is not a waste of time, since we
904          * also require (in the page management code) that the center of a
905          * page always be clean, and the most efficient way to guarantee this
906          * is just to compact the data by reinserting it into a new left page.
907          */
908
909         PageRestoreTempPage(leftpage, origpage);
910
911         END_CRIT_SECTION();
912
913         /* write and release the old right sibling */
914         if (!P_RIGHTMOST(ropaque))
915                 _bt_wrtbuf(rel, sbuf);
916
917         /* split's done */
918         return rbuf;
919 }
920
921 /*
922  *      _bt_findsplitloc() -- find an appropriate place to split a page.
923  *
924  * The idea here is to equalize the free space that will be on each split
925  * page, *after accounting for the inserted tuple*.  (If we fail to account
926  * for it, we might find ourselves with too little room on the page that
927  * it needs to go into!)
928  *
929  * If the page is the rightmost page on its level, we instead try to arrange
930  * for twice as much free space on the right as on the left.  In this way,
931  * when we are inserting successively increasing keys (consider sequences,
932  * timestamps, etc) we will end up with a tree whose pages are about 67% full,
933  * instead of the 50% full result that we'd get without this special case.
934  * (We could bias it even further to make the initially-loaded tree more full.
935  * But since the steady-state load for a btree is about 70%, we'd likely just
936  * be making more page-splitting work for ourselves later on, when we start
937  * seeing updates to existing tuples.)
938  *
939  * We are passed the intended insert position of the new tuple, expressed as
940  * the offsetnumber of the tuple it must go in front of.  (This could be
941  * maxoff+1 if the tuple is to go at the end.)
942  *
943  * We return the index of the first existing tuple that should go on the
944  * righthand page, plus a boolean indicating whether the new tuple goes on
945  * the left or right page.      The bool is necessary to disambiguate the case
946  * where firstright == newitemoff.
947  */
948 static OffsetNumber
949 _bt_findsplitloc(Relation rel,
950                                  Page page,
951                                  OffsetNumber newitemoff,
952                                  Size newitemsz,
953                                  bool *newitemonleft)
954 {
955         BTPageOpaque opaque;
956         OffsetNumber offnum;
957         OffsetNumber maxoff;
958         ItemId          itemid;
959         FindSplitData state;
960         int                     leftspace,
961                                 rightspace,
962                                 goodenough,
963                                 dataitemtotal,
964                                 dataitemstoleft;
965
966         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
967
968         /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
969         newitemsz += sizeof(ItemIdData);
970         state.newitemsz = newitemsz;
971         state.is_leaf = P_ISLEAF(opaque);
972         state.is_rightmost = P_RIGHTMOST(opaque);
973         state.have_split = false;
974
975         /* Total free space available on a btree page, after fixed overhead */
976         leftspace = rightspace =
977                 PageGetPageSize(page) - SizeOfPageHeaderData -
978                 MAXALIGN(sizeof(BTPageOpaqueData));
979
980         /*
981          * Finding the best possible split would require checking all the
982          * possible split points, because of the high-key and left-key special
983          * cases. That's probably more work than it's worth; instead, stop as
984          * soon as we find a "good-enough" split, where good-enough is defined
985          * as an imbalance in free space of no more than pagesize/16
986          * (arbitrary...) This should let us stop near the middle on most
987          * pages, instead of plowing to the end.
988          */
989         goodenough = leftspace / 16;
990
991         /* The right page will have the same high key as the old page */
992         if (!P_RIGHTMOST(opaque))
993         {
994                 itemid = PageGetItemId(page, P_HIKEY);
995                 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
996                                                          sizeof(ItemIdData));
997         }
998
999         /* Count up total space in data items without actually scanning 'em */
1000         dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1001
1002         /*
1003          * Scan through the data items and calculate space usage for a split
1004          * at each possible position.
1005          */
1006         dataitemstoleft = 0;
1007         maxoff = PageGetMaxOffsetNumber(page);
1008
1009         for (offnum = P_FIRSTDATAKEY(opaque);
1010                  offnum <= maxoff;
1011                  offnum = OffsetNumberNext(offnum))
1012         {
1013                 Size            itemsz;
1014                 int                     leftfree,
1015                                         rightfree;
1016
1017                 itemid = PageGetItemId(page, offnum);
1018                 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1019
1020                 /*
1021                  * We have to allow for the current item becoming the high key of
1022                  * the left page; therefore it counts against left space as well
1023                  * as right space.
1024                  */
1025                 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1026                 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1027
1028                 /*
1029                  * Will the new item go to left or right of split?
1030                  */
1031                 if (offnum > newitemoff)
1032                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1033                                                           true, itemsz);
1034                 else if (offnum < newitemoff)
1035                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1036                                                           false, itemsz);
1037                 else
1038                 {
1039                         /* need to try it both ways! */
1040                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1041                                                           true, itemsz);
1042                         /* here we are contemplating newitem as first on right */
1043                         _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1044                                                           false, newitemsz);
1045                 }
1046
1047                 /* Abort scan once we find a good-enough choice */
1048                 if (state.have_split && state.best_delta <= goodenough)
1049                         break;
1050
1051                 dataitemstoleft += itemsz;
1052         }
1053
1054         /*
1055          * I believe it is not possible to fail to find a feasible split, but
1056          * just in case ...
1057          */
1058         if (!state.have_split)
1059                 elog(ERROR, "could not find a feasible split point for \"%s\"",
1060                          RelationGetRelationName(rel));
1061
1062         *newitemonleft = state.newitemonleft;
1063         return state.firstright;
1064 }
1065
1066 /*
1067  * Subroutine to analyze a particular possible split choice (ie, firstright
1068  * and newitemonleft settings), and record the best split so far in *state.
1069  */
1070 static void
1071 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1072                                   int leftfree, int rightfree,
1073                                   bool newitemonleft, Size firstrightitemsz)
1074 {
1075         /*
1076          * Account for the new item on whichever side it is to be put.
1077          */
1078         if (newitemonleft)
1079                 leftfree -= (int) state->newitemsz;
1080         else
1081                 rightfree -= (int) state->newitemsz;
1082
1083         /*
1084          * If we are not on the leaf level, we will be able to discard the key
1085          * data from the first item that winds up on the right page.
1086          */
1087         if (!state->is_leaf)
1088                 rightfree += (int) firstrightitemsz -
1089                         (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1090
1091         /*
1092          * If feasible split point, remember best delta.
1093          */
1094         if (leftfree >= 0 && rightfree >= 0)
1095         {
1096                 int                     delta;
1097
1098                 if (state->is_rightmost)
1099                 {
1100                         /*
1101                          * On a rightmost page, try to equalize right free space with
1102                          * twice the left free space.  See comments for
1103                          * _bt_findsplitloc.
1104                          */
1105                         delta = (2 * leftfree) - rightfree;
1106                 }
1107                 else
1108                 {
1109                         /* Otherwise, aim for equal free space on both sides */
1110                         delta = leftfree - rightfree;
1111                 }
1112
1113                 if (delta < 0)
1114                         delta = -delta;
1115                 if (!state->have_split || delta < state->best_delta)
1116                 {
1117                         state->have_split = true;
1118                         state->newitemonleft = newitemonleft;
1119                         state->firstright = firstright;
1120                         state->best_delta = delta;
1121                 }
1122         }
1123 }
1124
1125 /*
1126  * _bt_insert_parent() -- Insert downlink into parent after a page split.
1127  *
1128  * On entry, buf and rbuf are the left and right split pages, which we
1129  * still hold write locks on per the L&Y algorithm.  We release the
1130  * write locks once we have write lock on the parent page.      (Any sooner,
1131  * and it'd be possible for some other process to try to split or delete
1132  * one of these pages, and get confused because it cannot find the downlink.)
1133  *
1134  * stack - stack showing how we got here.  May be NULL in cases that don't
1135  *                      have to be efficient (concurrent ROOT split, WAL recovery)
1136  * is_root - we split the true root
1137  * is_only - we split a page alone on its level (might have been fast root)
1138  *
1139  * This is exported so it can be called by nbtxlog.c.
1140  */
1141 void
1142 _bt_insert_parent(Relation rel,
1143                                   Buffer buf,
1144                                   Buffer rbuf,
1145                                   BTStack stack,
1146                                   bool is_root,
1147                                   bool is_only)
1148 {
1149         /*
1150          * Here we have to do something Lehman and Yao don't talk about: deal
1151          * with a root split and construction of a new root.  If our stack is
1152          * empty then we have just split a node on what had been the root
1153          * level when we descended the tree.  If it was still the root then we
1154          * perform a new-root construction.  If it *wasn't* the root anymore,
1155          * search to find the next higher level that someone constructed
1156          * meanwhile, and find the right place to insert as for the normal
1157          * case.
1158          *
1159          * If we have to search for the parent level, we do so by re-descending
1160          * from the root.  This is not super-efficient, but it's rare enough
1161          * not to matter.  (This path is also taken when called from WAL
1162          * recovery --- we have no stack in that case.)
1163          */
1164         if (is_root)
1165         {
1166                 Buffer          rootbuf;
1167
1168                 Assert(stack == NULL);
1169                 Assert(is_only);
1170                 /* create a new root node and update the metapage */
1171                 rootbuf = _bt_newroot(rel, buf, rbuf);
1172                 /* release the split buffers */
1173                 _bt_wrtbuf(rel, rootbuf);
1174                 _bt_wrtbuf(rel, rbuf);
1175                 _bt_wrtbuf(rel, buf);
1176         }
1177         else
1178         {
1179                 BlockNumber bknum = BufferGetBlockNumber(buf);
1180                 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1181                 Page            page = BufferGetPage(buf);
1182                 BTItem          new_item;
1183                 BTStackData fakestack;
1184                 BTItem          ritem;
1185                 Buffer          pbuf;
1186
1187                 if (stack == NULL)
1188                 {
1189                         BTPageOpaque lpageop;
1190
1191                         if (!InRecovery)
1192                                 elog(DEBUG2, "concurrent ROOT page split");
1193                         lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1194                         /* Find the leftmost page at the next level up */
1195                         pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1196                         /* Set up a phony stack entry pointing there */
1197                         stack = &fakestack;
1198                         stack->bts_blkno = BufferGetBlockNumber(pbuf);
1199                         stack->bts_offset = InvalidOffsetNumber;
1200                         /* bts_btitem will be initialized below */
1201                         stack->bts_parent = NULL;
1202                         _bt_relbuf(rel, pbuf);
1203                 }
1204
1205                 /* get high key from left page == lowest key on new right page */
1206                 ritem = (BTItem) PageGetItem(page,
1207                                                                          PageGetItemId(page, P_HIKEY));
1208
1209                 /* form an index tuple that points at the new right page */
1210                 new_item = _bt_formitem(&(ritem->bti_itup));
1211                 ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
1212
1213                 /*
1214                  * Find the parent buffer and get the parent page.
1215                  *
1216                  * Oops - if we were moved right then we need to change stack item!
1217                  * We want to find parent pointing to where we are, right ?    -
1218                  * vadim 05/27/97
1219                  */
1220                 ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
1221                                            bknum, P_HIKEY);
1222
1223                 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1224
1225                 /* Now we can write and unlock the children */
1226                 _bt_wrtbuf(rel, rbuf);
1227                 _bt_wrtbuf(rel, buf);
1228
1229                 /* Check for error only after writing children */
1230                 if (pbuf == InvalidBuffer)
1231                         elog(ERROR, "failed to re-find parent key in \"%s\"",
1232                                  RelationGetRelationName(rel));
1233
1234                 /* Recursively update the parent */
1235                 _bt_insertonpg(rel, pbuf, stack->bts_parent,
1236                                            0, NULL, new_item, stack->bts_offset,
1237                                            is_only);
1238
1239                 /* be tidy */
1240                 pfree(new_item);
1241         }
1242 }
1243
1244 /*
1245  *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1246  *                                               we last looked at in the parent.
1247  *
1248  *              This is possible because we save the downlink from the parent item,
1249  *              which is enough to uniquely identify it.  Insertions into the parent
1250  *              level could cause the item to move right; deletions could cause it
1251  *              to move left, but not left of the page we previously found it in.
1252  *
1253  *              Adjusts bts_blkno & bts_offset if changed.
1254  *
1255  *              Returns InvalidBuffer if item not found (should not happen).
1256  */
1257 Buffer
1258 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1259 {
1260         BlockNumber blkno;
1261         OffsetNumber start;
1262
1263         blkno = stack->bts_blkno;
1264         start = stack->bts_offset;
1265
1266         for (;;)
1267         {
1268                 Buffer          buf;
1269                 Page            page;
1270                 BTPageOpaque opaque;
1271
1272                 buf = _bt_getbuf(rel, blkno, access);
1273                 page = BufferGetPage(buf);
1274                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1275
1276                 if (!P_IGNORE(opaque))
1277                 {
1278                         OffsetNumber offnum,
1279                                                 minoff,
1280                                                 maxoff;
1281                         ItemId          itemid;
1282                         BTItem          item;
1283
1284                         minoff = P_FIRSTDATAKEY(opaque);
1285                         maxoff = PageGetMaxOffsetNumber(page);
1286
1287                         /*
1288                          * start = InvalidOffsetNumber means "search the whole page".
1289                          * We need this test anyway due to possibility that page has a
1290                          * high key now when it didn't before.
1291                          */
1292                         if (start < minoff)
1293                                 start = minoff;
1294
1295                         /*
1296                          * Need this check too, to guard against possibility that page
1297                          * split since we visited it originally.
1298                          */
1299                         if (start > maxoff)
1300                                 start = OffsetNumberNext(maxoff);
1301
1302                         /*
1303                          * These loops will check every item on the page --- but in an
1304                          * order that's attuned to the probability of where it
1305                          * actually is.  Scan to the right first, then to the left.
1306                          */
1307                         for (offnum = start;
1308                                  offnum <= maxoff;
1309                                  offnum = OffsetNumberNext(offnum))
1310                         {
1311                                 itemid = PageGetItemId(page, offnum);
1312                                 item = (BTItem) PageGetItem(page, itemid);
1313                                 if (BTItemSame(item, &stack->bts_btitem))
1314                                 {
1315                                         /* Return accurate pointer to where link is now */
1316                                         stack->bts_blkno = blkno;
1317                                         stack->bts_offset = offnum;
1318                                         return buf;
1319                                 }
1320                         }
1321
1322                         for (offnum = OffsetNumberPrev(start);
1323                                  offnum >= minoff;
1324                                  offnum = OffsetNumberPrev(offnum))
1325                         {
1326                                 itemid = PageGetItemId(page, offnum);
1327                                 item = (BTItem) PageGetItem(page, itemid);
1328                                 if (BTItemSame(item, &stack->bts_btitem))
1329                                 {
1330                                         /* Return accurate pointer to where link is now */
1331                                         stack->bts_blkno = blkno;
1332                                         stack->bts_offset = offnum;
1333                                         return buf;
1334                                 }
1335                         }
1336                 }
1337
1338                 /*
1339                  * The item we're looking for moved right at least one page.
1340                  */
1341                 if (P_RIGHTMOST(opaque))
1342                 {
1343                         _bt_relbuf(rel, buf);
1344                         return InvalidBuffer;
1345                 }
1346                 blkno = opaque->btpo_next;
1347                 start = InvalidOffsetNumber;
1348                 _bt_relbuf(rel, buf);
1349         }
1350 }
1351
1352 /*
1353  *      _bt_newroot() -- Create a new root page for the index.
1354  *
1355  *              We've just split the old root page and need to create a new one.
1356  *              In order to do this, we add a new root page to the file, then lock
1357  *              the metadata page and update it.  This is guaranteed to be deadlock-
1358  *              free, because all readers release their locks on the metadata page
1359  *              before trying to lock the root, and all writers lock the root before
1360  *              trying to lock the metadata page.  We have a write lock on the old
1361  *              root page, so we have not introduced any cycles into the waits-for
1362  *              graph.
1363  *
1364  *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
1365  *              locked. On exit, a new root page exists with entries for the
1366  *              two new children, metapage is updated and unlocked/unpinned.
1367  *              The new root buffer is returned to caller which has to unlock/unpin
1368  *              lbuf, rbuf & rootbuf.
1369  */
1370 static Buffer
1371 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1372 {
1373         Buffer          rootbuf;
1374         Page            lpage,
1375                                 rpage,
1376                                 rootpage;
1377         BlockNumber lbkno,
1378                                 rbkno;
1379         BlockNumber rootblknum;
1380         BTPageOpaque rootopaque;
1381         ItemId          itemid;
1382         BTItem          item;
1383         Size            itemsz;
1384         BTItem          new_item;
1385         Buffer          metabuf;
1386         Page            metapg;
1387         BTMetaPageData *metad;
1388
1389         lbkno = BufferGetBlockNumber(lbuf);
1390         rbkno = BufferGetBlockNumber(rbuf);
1391         lpage = BufferGetPage(lbuf);
1392         rpage = BufferGetPage(rbuf);
1393
1394         /* get a new root page */
1395         rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1396         rootpage = BufferGetPage(rootbuf);
1397         rootblknum = BufferGetBlockNumber(rootbuf);
1398
1399         /* acquire lock on the metapage */
1400         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1401         metapg = BufferGetPage(metabuf);
1402         metad = BTPageGetMeta(metapg);
1403
1404         /* NO EREPORT(ERROR) from here till newroot op is logged */
1405         START_CRIT_SECTION();
1406
1407         /* set btree special data */
1408         rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1409         rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1410         rootopaque->btpo_flags = BTP_ROOT;
1411         rootopaque->btpo.level =
1412                 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1413
1414         /* update metapage data */
1415         metad->btm_root = rootblknum;
1416         metad->btm_level = rootopaque->btpo.level;
1417         metad->btm_fastroot = rootblknum;
1418         metad->btm_fastlevel = rootopaque->btpo.level;
1419
1420         /*
1421          * Create downlink item for left page (old root).  Since this will be
1422          * the first item in a non-leaf page, it implicitly has minus-infinity
1423          * key value, so we need not store any actual key in it.
1424          */
1425         itemsz = sizeof(BTItemData);
1426         new_item = (BTItem) palloc(itemsz);
1427         new_item->bti_itup.t_info = itemsz;
1428         ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1429
1430         /*
1431          * Insert the left page pointer into the new root page.  The root page
1432          * is the rightmost page on its level so there is no "high key" in it;
1433          * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1434          */
1435         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1436                 elog(PANIC, "failed to add leftkey to new root page");
1437         pfree(new_item);
1438
1439         /*
1440          * Create downlink item for right page.  The key for it is obtained
1441          * from the "high key" position in the left page.
1442          */
1443         itemid = PageGetItemId(lpage, P_HIKEY);
1444         itemsz = ItemIdGetLength(itemid);
1445         item = (BTItem) PageGetItem(lpage, itemid);
1446         new_item = _bt_formitem(&(item->bti_itup));
1447         ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1448
1449         /*
1450          * insert the right page pointer into the new root page.
1451          */
1452         if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1453                 elog(PANIC, "failed to add rightkey to new root page");
1454         pfree(new_item);
1455
1456         /* XLOG stuff */
1457         if (!rel->rd_istemp)
1458         {
1459                 xl_btree_newroot xlrec;
1460                 XLogRecPtr      recptr;
1461                 XLogRecData rdata[2];
1462
1463                 xlrec.node = rel->rd_node;
1464                 xlrec.rootblk = rootblknum;
1465                 xlrec.level = metad->btm_level;
1466
1467                 rdata[0].buffer = InvalidBuffer;
1468                 rdata[0].data = (char *) &xlrec;
1469                 rdata[0].len = SizeOfBtreeNewroot;
1470                 rdata[0].next = &(rdata[1]);
1471
1472                 /*
1473                  * Direct access to page is not good but faster - we should
1474                  * implement some new func in page API.
1475                  */
1476                 rdata[1].buffer = InvalidBuffer;
1477                 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1478                 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1479                         ((PageHeader) rootpage)->pd_upper;
1480                 rdata[1].next = NULL;
1481
1482                 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1483
1484                 PageSetLSN(rootpage, recptr);
1485                 PageSetTLI(rootpage, ThisTimeLineID);
1486                 PageSetLSN(metapg, recptr);
1487                 PageSetTLI(metapg, ThisTimeLineID);
1488                 PageSetLSN(lpage, recptr);
1489                 PageSetTLI(lpage, ThisTimeLineID);
1490                 PageSetLSN(rpage, recptr);
1491                 PageSetTLI(rpage, ThisTimeLineID);
1492         }
1493
1494         END_CRIT_SECTION();
1495
1496         /* write and let go of metapage buffer */
1497         _bt_wrtbuf(rel, metabuf);
1498
1499         return (rootbuf);
1500 }
1501
1502 /*
1503  *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
1504  *
1505  *              This routine adds the tuple to the page as requested.  It does
1506  *              not affect pin/lock status, but you'd better have a write lock
1507  *              and pin on the target buffer!  Don't forget to write and release
1508  *              the buffer afterwards, either.
1509  *
1510  *              The main difference between this routine and a bare PageAddItem call
1511  *              is that this code knows that the leftmost data item on a non-leaf
1512  *              btree page doesn't need to have a key.  Therefore, it strips such
1513  *              items down to just the item header.  CAUTION: this works ONLY if
1514  *              we insert the items in order, so that the given itup_off does
1515  *              represent the final position of the item!
1516  */
1517 static void
1518 _bt_pgaddtup(Relation rel,
1519                          Page page,
1520                          Size itemsize,
1521                          BTItem btitem,
1522                          OffsetNumber itup_off,
1523                          const char *where)
1524 {
1525         BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1526         BTItemData      truncitem;
1527
1528         if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1529         {
1530                 memcpy(&truncitem, btitem, sizeof(BTItemData));
1531                 truncitem.bti_itup.t_info = sizeof(BTItemData);
1532                 btitem = &truncitem;
1533                 itemsize = sizeof(BTItemData);
1534         }
1535
1536         if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1537                                         LP_USED) == InvalidOffsetNumber)
1538                 elog(PANIC, "failed to add item to the %s for \"%s\"",
1539                          where, RelationGetRelationName(rel));
1540 }
1541
1542 /*
1543  * _bt_isequal - used in _bt_doinsert in check for duplicates.
1544  *
1545  * This is very similar to _bt_compare, except for NULL handling.
1546  * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
1547  */
1548 static bool
1549 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1550                         int keysz, ScanKey scankey)
1551 {
1552         BTItem          btitem;
1553         IndexTuple      itup;
1554         int                     i;
1555
1556         /* Better be comparing to a leaf item */
1557         Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1558
1559         btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1560         itup = &(btitem->bti_itup);
1561
1562         for (i = 1; i <= keysz; i++)
1563         {
1564                 AttrNumber      attno;
1565                 Datum           datum;
1566                 bool            isNull;
1567                 int32           result;
1568
1569                 attno = scankey->sk_attno;
1570                 Assert(attno == i);
1571                 datum = index_getattr(itup, attno, itupdesc, &isNull);
1572
1573                 /* NULLs are never equal to anything */
1574                 if (isNull || (scankey->sk_flags & SK_ISNULL))
1575                         return false;
1576
1577                 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1578                                                                                          datum,
1579                                                                                          scankey->sk_argument));
1580
1581                 if (result != 0)
1582                         return false;
1583
1584                 scankey++;
1585         }
1586
1587         /* if we get here, the keys are equal */
1588         return true;
1589 }