*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.149 2007/02/06 14:55:11 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.150 2007/02/08 05:05:53 momjian Exp $
*
*-------------------------------------------------------------------------
*/
rightoff;
OffsetNumber maxoff;
OffsetNumber i;
+ bool isroot;
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
origpage = BufferGetPage(buf);
lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
+ isroot = P_ISROOT(oopaque);
+
/* if we're splitting this page, it won't be the root when we're done */
/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
lopaque->btpo_flags = oopaque->btpo_flags;
MarkBufferDirty(sbuf);
}
+ /*
+ * By here, the original data page has been split into two new halves, and
+ * these are correct. The algorithm requires that the left page never
+ * move during a split, so we copy the new left page back on top of the
+ * original. Note that this is not a waste of time, since we also require
+ * (in the page management code) that the center of a page always be
+ * clean, and the most efficient way to guarantee this is just to compact
+ * the data by reinserting it into a new left page. (XXX the latter
+ * comment is probably obsolete.)
+ *
+ * We need to do this before writing the WAL record, so that XLogInsert can
+ * WAL log an image of the page if necessary.
+ */
+ PageRestoreTempPage(leftpage, origpage);
+
/* XLOG stuff */
if (!rel->rd_istemp)
{
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
+ XLogRecData rdata[6];
+ XLogRecData *lastrdata;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.node = rel->rd_node;
+ xlrec.leftsib = BufferGetBlockNumber(buf);
+ xlrec.rightsib = BufferGetBlockNumber(rbuf);
+ xlrec.firstright = firstright;
+ xlrec.rnext = ropaque->btpo_next;
+ xlrec.level = lopaque->btpo.level;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBtreeSplit;
+ rdata[0].buffer = InvalidBuffer;
+
+ lastrdata = &rdata[0];
+
+ /* Log downlink on non-leaf pages. */
+ if (lopaque->btpo.level > 0)
+ {
+ lastrdata->next = lastrdata + 1;
+ lastrdata++;
+
+ lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
+ lastrdata->len = sizeof(BlockIdData);
+ lastrdata->buffer = InvalidBuffer;
+ }
+
+ /* Log the new item, if it was inserted on the left page. If it was
+ * put on the right page, we don't need to explicitly WAL log it
+ * because it's included with all the other items on the right page.
+ */
+ lastrdata->next = lastrdata + 1;
+ lastrdata++;
if (newitemonleft)
- xlrec.otherblk = BufferGetBlockNumber(rbuf);
+ {
+ lastrdata->data = (char *) &newitemoff;
+ lastrdata->len = sizeof(OffsetNumber);
+ lastrdata->buffer = buf; /* backup block 1 */
+ lastrdata->buffer_std = true;
+
+ lastrdata->next = lastrdata + 1;
+ lastrdata++;
+ lastrdata->data = (char *)newitem;
+ lastrdata->len = newitemsz;
+ lastrdata->buffer = buf; /* backup block 1 */
+ lastrdata->buffer_std = true;
+ }
else
- xlrec.otherblk = BufferGetBlockNumber(buf);
- xlrec.leftblk = lopaque->btpo_prev;
- xlrec.rightblk = ropaque->btpo_next;
- xlrec.level = lopaque->btpo.level;
+ {
+ lastrdata->data = NULL;
+ lastrdata->len = 0;
+ lastrdata->buffer = buf; /* backup block 1 */
+ lastrdata->buffer_std = true;
+ }
- /*
+ /* Log the contents of the right page in the format understood by
+ * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
+ * because we're going to recreate the whole page anyway.
+ *
* Direct access to page is not good but faster - we should implement
* some new func in page API. Note we only store the tuples
* themselves, knowing that the item pointers are in the same order
* and can be reconstructed by scanning the tuples. See comments for
* _bt_restore_page().
*/
- xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
- ((PageHeader) leftpage)->pd_upper;
+ lastrdata->next = lastrdata + 1;
+ lastrdata++;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeSplit;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
- rdata[1].len = xlrec.leftlen;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
- rdata[2].len = ((PageHeader) rightpage)->pd_special -
+ lastrdata->data = (char *) rightpage +
((PageHeader) rightpage)->pd_upper;
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
+ lastrdata->len = ((PageHeader) rightpage)->pd_special -
+ ((PageHeader) rightpage)->pd_upper;
+ lastrdata->buffer = InvalidBuffer;
+ /* Log the right sibling, because we've changed it's prev-pointer. */
if (!P_RIGHTMOST(ropaque))
{
- rdata[2].next = &(rdata[3]);
- rdata[3].data = NULL;
- rdata[3].len = 0;
- rdata[3].buffer = sbuf;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
+ lastrdata->next = lastrdata + 1;
+ lastrdata++;
+
+ lastrdata->data = NULL;
+ lastrdata->len = 0;
+ lastrdata->buffer = sbuf; /* backup block 2 */
+ lastrdata->buffer_std = true;
}
- if (P_ISROOT(oopaque))
+ lastrdata->next = NULL;
+
+ if (isroot)
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
else
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
}
}
- /*
- * By here, the original data page has been split into two new halves, and
- * these are correct. The algorithm requires that the left page never
- * move during a split, so we copy the new left page back on top of the
- * original. Note that this is not a waste of time, since we also require
- * (in the page management code) that the center of a page always be
- * clean, and the most efficient way to guarantee this is just to compact
- * the data by reinserting it into a new left page. (XXX the latter
- * comment is probably obsolete.)
- *
- * It's a bit weird that we don't fill in the left page till after writing
- * the XLOG entry, but not really worth changing. Note that we use the
- * origpage data (specifically its BTP_ROOT bit) while preparing the XLOG
- * entry, so simply reshuffling the code won't do.
- */
-
- PageRestoreTempPage(leftpage, origpage);
-
END_CRIT_SECTION();
/* release the old right sibling */
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.41 2007/02/01 19:10:25 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.42 2007/02/08 05:05:53 momjian Exp $
*
*-------------------------------------------------------------------------
*/
{
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
- BlockNumber targetblk;
- OffsetNumber targetoff;
- BlockNumber leftsib;
- BlockNumber rightsib;
- BlockNumber downlink = 0;
- Buffer buffer;
- Page page;
- BTPageOpaque pageop;
+ Buffer lbuf, rbuf;
+ Page lpage, rpage;
+ BTPageOpaque ropaque, lopaque;
+ char *datapos;
+ int datalen;
+ bool bkp_left = record->xl_info & XLR_BKP_BLOCK_1;
+ bool bkp_nextsib = record->xl_info & XLR_BKP_BLOCK_2;
+ OffsetNumber newitemoff;
+ Item newitem = NULL;
+ Size newitemsz = 0;
- reln = XLogOpenRelation(xlrec->target.node);
- targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- leftsib = (onleft) ? targetblk : xlrec->otherblk;
- rightsib = (onleft) ? xlrec->otherblk : targetblk;
+ reln = XLogOpenRelation(xlrec->node);
- /* Left (original) sibling */
- buffer = XLogReadBuffer(reln, leftsib, true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
+ datapos = (char *) xlrec + SizeOfBtreeSplit;
+ datalen = record->xl_len - SizeOfBtreeSplit;
- _bt_pageinit(page, BufferGetPageSize(buffer));
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ /* Forget any split this insertion completes */
+ if (xlrec->level > 0)
+ {
+ BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
- pageop->btpo_prev = xlrec->leftblk;
- pageop->btpo_next = rightsib;
- pageop->btpo.level = xlrec->level;
- pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
- pageop->btpo_cycleid = 0;
+ datapos += sizeof(BlockIdData);
+ datalen -= sizeof(BlockIdData);
+
+ forget_matching_split(xlrec->node, downlink, false);
+ }
- _bt_restore_page(page,
- (char *) xlrec + SizeOfBtreeSplit,
- xlrec->leftlen);
- if (onleft && xlrec->level > 0)
+ /* Extract newitem and newitemoff */
+ if (!bkp_left && onleft)
{
- IndexTuple itup;
+ IndexTupleData itupdata;
- /* extract downlink in the target tuple */
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
- downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ /* Extract the offset of the new tuple and it's contents */
+ memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
+ datapos += sizeof(OffsetNumber);
+ datalen -= sizeof(OffsetNumber);
+
+ newitem = datapos;
+ /* Need to copy tuple header due to alignment considerations */
+ memcpy(&itupdata, datapos, sizeof(IndexTupleData));
+ newitemsz = IndexTupleDSize(itupdata);
+ newitemsz = MAXALIGN(newitemsz);
+ datapos += newitemsz;
+ datalen -= newitemsz;
}
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ /* Reconstruct right (new) sibling */
+ rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
+ Assert(BufferIsValid(rbuf));
+ rpage = (Page) BufferGetPage(rbuf);
- /* Right (new) sibling */
- buffer = XLogReadBuffer(reln, rightsib, true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
+ _bt_pageinit(rpage, BufferGetPageSize(rbuf));
+ ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
- _bt_pageinit(page, BufferGetPageSize(buffer));
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ ropaque->btpo_prev = xlrec->leftsib;
+ ropaque->btpo_next = xlrec->rnext;
+ ropaque->btpo.level = xlrec->level;
+ ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
+ ropaque->btpo_cycleid = 0;
- pageop->btpo_prev = leftsib;
- pageop->btpo_next = xlrec->rightblk;
- pageop->btpo.level = xlrec->level;
- pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
- pageop->btpo_cycleid = 0;
+ _bt_restore_page(rpage, datapos, datalen);
- _bt_restore_page(page,
- (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
- record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
+ PageSetLSN(rpage, lsn);
+ PageSetTLI(rpage, ThisTimeLineID);
+ MarkBufferDirty(rbuf);
- if (!onleft && xlrec->level > 0)
- {
- IndexTuple itup;
+ /* don't release the buffer yet, because reconstructing the left sibling
+ * needs to access the data on the right page
+ */
- /* extract downlink in the target tuple */
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
- downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
- }
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ /* Reconstruct left (original) sibling */
- /* Fix left-link of right (next) page */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if(!bkp_left)
{
- if (xlrec->rightblk != P_NONE)
+ lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);
+
+ if (BufferIsValid(lbuf))
{
- buffer = XLogReadBuffer(reln, xlrec->rightblk, false);
- if (BufferIsValid(buffer))
- {
- page = (Page) BufferGetPage(buffer);
+ lpage = (Page) BufferGetPage(lbuf);
+ lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
- if (XLByteLE(lsn, PageGetLSN(page)))
+ if (!XLByteLE(lsn, PageGetLSN(lpage)))
+ {
+ /* Remove the items from the left page that were copied to
+ * right page, and add the new item if it was inserted to
+ * left page.
+ */
+ OffsetNumber off;
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
+ ItemId hiItemId;
+ Item hiItem;
+
+ for(off = maxoff ; off >= xlrec->firstright; off--)
+ PageIndexTupleDelete(lpage, off);
+
+ if (onleft)
{
- UnlockReleaseBuffer(buffer);
+ if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
+ LP_USED) == InvalidOffsetNumber)
+ elog(PANIC, "can't add new item to left sibling after split");
}
- else
- {
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- pageop->btpo_prev = rightsib;
+ /* Set high key equal to the first key on the right page */
+ hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
+ hiItem = PageGetItem(rpage, hiItemId);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ if(!P_RIGHTMOST(lopaque))
+ {
+ /* but remove the old high key first */
+ PageIndexTupleDelete(lpage, P_HIKEY);
}
+
+ if(PageAddItem(lpage, hiItem, ItemIdGetLength(hiItemId),
+ P_HIKEY, LP_USED) == InvalidOffsetNumber)
+ elog(PANIC, "can't add high key after split to left page");
+
+ /* Fix opaque fields */
+ lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
+ lopaque->btpo_next = xlrec->rightsib;
+ lopaque->btpo_cycleid = 0;
+
+ PageSetLSN(lpage, lsn);
+ PageSetTLI(lpage, ThisTimeLineID);
+ MarkBufferDirty(lbuf);
}
+
+ UnlockReleaseBuffer(lbuf);
}
+
}
- /* Forget any split this insertion completes */
- if (xlrec->level > 0)
- forget_matching_split(xlrec->target.node, downlink, false);
+ /* we no longer need the right buffer. */
+ UnlockReleaseBuffer(rbuf);
+
+ /* Fix left-link of the page to the right of the new right sibling */
+ if (!bkp_nextsib && xlrec->rnext != P_NONE)
+ {
+ Buffer buffer = XLogReadBuffer(reln, xlrec->rnext, false);
+ if (BufferIsValid(buffer))
+ {
+ Page page = (Page) BufferGetPage(buffer);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ pageop->btpo_prev = xlrec->rightsib;
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
/* The job ain't done till the parent link is inserted... */
- log_incomplete_split(xlrec->target.node,
- leftsib, rightsib, isroot);
+ log_incomplete_split(xlrec->node,
+ xlrec->leftsib, xlrec->rightsib, isroot);
}
static void
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
- appendStringInfo(buf, "split_l: ");
- out_target(buf, &(xlrec->target));
- appendStringInfo(buf, "; oth %u; rgh %u",
- xlrec->otherblk, xlrec->rightblk);
+ appendStringInfo(buf, "split_l: rel %u/%u/%u ",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode);
+ appendStringInfo(buf, "left %u, right %u off %u level %u",
+ xlrec->leftsib, xlrec->rightsib,
+ xlrec->firstright, xlrec->level);
break;
}
case XLOG_BTREE_SPLIT_R:
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
- appendStringInfo(buf, "split_r: ");
- out_target(buf, &(xlrec->target));
- appendStringInfo(buf, "; oth %u; rgh %u",
- xlrec->otherblk, xlrec->rightblk);
+ appendStringInfo(buf, "split_r: rel %u/%u/%u ",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode);
+ appendStringInfo(buf, "left %u, right %u off %u level %u",
+ xlrec->leftsib, xlrec->rightsib,
+ xlrec->firstright, xlrec->level);
break;
}
case XLOG_BTREE_SPLIT_L_ROOT:
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
- appendStringInfo(buf, "split_l_root: ");
- out_target(buf, &(xlrec->target));
- appendStringInfo(buf, "; oth %u; rgh %u",
- xlrec->otherblk, xlrec->rightblk);
+ appendStringInfo(buf, "split_l_root: rel %u/%u/%u ",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode);
+ appendStringInfo(buf, "left %u, right %u off %u level %u",
+ xlrec->leftsib, xlrec->rightsib,
+ xlrec->firstright, xlrec->level);
break;
}
case XLOG_BTREE_SPLIT_R_ROOT:
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
- appendStringInfo(buf, "split_r_root: ");
- out_target(buf, &(xlrec->target));
- appendStringInfo(buf, "; oth %u; rgh %u",
- xlrec->otherblk, xlrec->rightblk);
+ appendStringInfo(buf, "split_r_root: rel %u/%u/%u ",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode);
+ appendStringInfo(buf, "left %u, right %u off %u level %u",
+ xlrec->leftsib, xlrec->rightsib,
+ xlrec->firstright, xlrec->level);
break;
}
case XLOG_BTREE_DELETE: