*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.62 2000/08/25 23:13:33 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.63 2000/10/04 00:04:42 vadim Exp $
*
*-------------------------------------------------------------------------
*/
int best_delta; /* best size delta so far */
} FindSplitData;
+void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
Relation heapRel, Buffer buf,
int leftfree, int rightfree,
bool newitemonleft, Size firstrightitemsz);
static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
-static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, BTItem btitem,
OffsetNumber itup_off, const char *where);
}
else
{
+#ifdef XLOG
+	/*
+	 * XLOG stuff: build and insert the log record for a plain (no split)
+	 * insert.  The record header is assembled in xlbuf as xl_btree_insert
+	 * followed, for leaf pages only, by the current CommandId; the btitem
+	 * itself is handed to XLogInsert as the separate data buffer.
+	 */
+	{
+		char		xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)];
+		xl_btree_insert *xlrec = (xl_btree_insert *) xlbuf;
+		int			hsize = SizeOfBtreeInsert;
+		XLogRecPtr	recptr;
+
+		xlrec->target.node = rel->rd_node;
+		ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
+		if (P_ISLEAF(lpageop))
+		{
+			CommandId	cid = GetCurrentCommandId();
+
+			/* copy the bytes of cid; a cast result is not addressable */
+			memcpy(xlbuf + hsize, (char *) &cid, sizeof(CommandId));
+			hsize += sizeof(CommandId);
+		}
+
+		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
+							xlbuf, hsize, (char *) btitem, itemsz);
+
+		/* stamp the page with the position of the record just written */
+		PageSetLSN(page, recptr);
+		PageSetSUI(page, ThisStartUpID);
+	}
+#endif
_bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
itup_off = newitemoff;
itup_blkno = BufferGetBlockNumber(buf);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
/* if we're splitting this page, it won't be the root when we're done */
- oopaque->btpo_flags &= ~BTP_ROOT;
- lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
+ lopaque->btpo_flags = oopaque->btpo_flags;
+ lopaque->btpo_flags &= ~BTP_ROOT;
+ ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
lopaque->btpo_next = BufferGetBlockNumber(rbuf);
ropaque->btpo_prev = BufferGetBlockNumber(buf);
item = (BTItem) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
LP_USED) == InvalidOffsetNumber)
- elog(FATAL, "btree: failed to add hikey to the right sibling");
+ elog(STOP, "btree: failed to add hikey to the right sibling");
rightoff = OffsetNumberNext(rightoff);
}
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
LP_USED) == InvalidOffsetNumber)
- elog(FATAL, "btree: failed to add hikey to the left sibling");
+ elog(STOP, "btree: failed to add hikey to the left sibling");
leftoff = OffsetNumberNext(leftoff);
/*
}
}
+ /*
+ * We have to grab the right sibling (if any) and fix the prev
+ * pointer there. We are guaranteed that this is deadlock-free
+ * since no other writer will be holding a lock on that page
+ * and trying to move left, and all readers release locks on a page
+ * before trying to fetch its neighbors.
+ */
+
+ if (!P_RIGHTMOST(ropaque))
+ {
+ sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ spage = BufferGetPage(sbuf);
+ }
+
+#ifdef XLOG
+	/*
+	 * Right sibling is locked, new siblings are prepared, but original
+	 * page is not updated yet. Log changes before continuing.
+	 *
+	 * NO ELOG(ERROR) till right sibling is updated.
+	 *
+	 * The record header is assembled in xlbuf as: xl_btree_split, then the
+	 * CommandId (leaf pages only), then the new btitem (only when it goes
+	 * to the left sibling).  The bytes of the right page between pd_upper
+	 * and pd_special are handed to XLogInsert as the separate data buffer.
+	 */
+	{
+		char		xlbuf[sizeof(xl_btree_split) +
+						  2 * sizeof(CommandId) + BLCKSZ];
+		xl_btree_split *xlrec = (xl_btree_split *) xlbuf;
+		int			hsize = SizeOfBtreeSplit;
+		int			flag = (newitemonleft) ?
+			XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
+		XLogRecPtr	recptr;
+
+		xlrec->target.node = rel->rd_node;
+		ItemPointerSet(&(xlrec->target.tid), itup_blkno, itup_off);
+		if (P_ISLEAF(lopaque))
+		{
+			CommandId	cid = GetCurrentCommandId();
+
+			/* copy the bytes of cid; a cast result is not addressable */
+			memcpy(xlbuf + hsize, (char *) &cid, sizeof(CommandId));
+			hsize += sizeof(CommandId);
+		}
+		if (newitemonleft)
+		{
+			memcpy(xlbuf + hsize, (char *) newitem, newitemsz);
+			hsize += newitemsz;
+			xlrec->otherblk = BufferGetBlockNumber(rbuf);
+		}
+		else
+			xlrec->otherblk = BufferGetBlockNumber(buf);
+
+		/*
+		 * NOTE(review): otherblk/rightblk are declared as BlockId; confirm
+		 * that plain assignment of a BlockNumber is valid for that type.
+		 */
+		xlrec->rightblk = ropaque->btpo_next;
+
+		/*
+		 * Direct access to page is not good but faster - we should
+		 * implement some new func in page API.
+		 */
+		recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, hsize,
+			(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+			((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
+
+		/* stamp every page touched by the split with the record position */
+		PageSetLSN(leftpage, recptr);
+		PageSetSUI(leftpage, ThisStartUpID);
+		PageSetLSN(rightpage, recptr);
+		PageSetSUI(rightpage, ThisStartUpID);
+		if (!P_RIGHTMOST(ropaque))
+		{
+			PageSetLSN(spage, recptr);
+			PageSetSUI(spage, ThisStartUpID);
+		}
+	}
+#endif
+
/*
* By here, the original data page has been split into two new halves,
* and these are correct. The algorithm requires that the left page
PageRestoreTempPage(leftpage, origpage);
- /*
- * Finally, we need to grab the right sibling (if any) and fix the
- * prev pointer there. We are guaranteed that this is deadlock-free
- * since no other writer will be holding a lock on that page
- * and trying to move left, and all readers release locks on a page
- * before trying to fetch its neighbors.
- */
-
if (!P_RIGHTMOST(ropaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
- spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
* two new children. The new root page is neither pinned nor locked, and
* we have also written out lbuf and rbuf and dropped their pins/locks.
*/
-static void
+void
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
Buffer rootbuf;
rootpage;
BlockNumber lbkno,
rbkno;
- BlockNumber rootbknum;
+ BlockNumber rootblknum;
BTPageOpaque rootopaque;
ItemId itemid;
BTItem item;
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
rootpage = BufferGetPage(rootbuf);
- rootbknum = BufferGetBlockNumber(rootbuf);
+ rootblknum = BufferGetBlockNumber(rootbuf);
+
+
+ /* NO ELOG(ERROR) from here till newroot op is logged */
/* set btree special data */
rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
rootopaque->btpo_flags |= BTP_ROOT;
+ rootopaque->btpo_parent = BTREE_METAPAGE;
lbkno = BufferGetBlockNumber(lbuf);
rbkno = BufferGetBlockNumber(rbuf);
*/
((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
- rootbknum;
+ rootblknum;
/*
* Create downlink item for left page (old root). Since this will be
* the two items will go into positions P_HIKEY and P_FIRSTKEY.
*/
if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
- elog(FATAL, "btree: failed to add leftkey to new root page");
+ elog(STOP, "btree: failed to add leftkey to new root page");
pfree(new_item);
/*
* insert the right page pointer into the new root page.
*/
if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
- elog(FATAL, "btree: failed to add rightkey to new root page");
+ elog(STOP, "btree: failed to add rightkey to new root page");
pfree(new_item);
+#ifdef XLOG
+	/*
+	 * XLOG stuff: log creation of the new root.  The fixed part of the
+	 * record is xl_btree_newroot; the bytes of the root page between
+	 * pd_upper and pd_special are passed as the separate data buffer.
+	 */
+	{
+		xl_btree_newroot xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.node = rel->rd_node;
+		/*
+		 * NOTE(review): rootblk is declared as BlockId; confirm that plain
+		 * assignment of a BlockNumber is valid for that type.
+		 */
+		xlrec.rootblk = rootblknum;
+
+		/*
+		 * Direct access to page is not good but faster - we should
+		 * implement some new func in page API.
+		 */
+		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
+			(char *) &xlrec, SizeOfBtreeNewroot,
+			(char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+			((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper);
+
+		/* stamp the new root page with the position of the record */
+		PageSetLSN(rootpage, recptr);
+		PageSetSUI(rootpage, ThisStartUpID);
+	}
+#endif
+
/* write and let go of the new root buffer */
_bt_wrtbuf(rel, rootbuf);
/* update metadata page with new root block number */
- _bt_metaproot(rel, rootbknum, 0);
+ _bt_metaproot(rel, rootblknum, 0);
/* update and release new sibling, and finally the old root */
_bt_wrtbuf(rel, rbuf);
if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
LP_USED) == InvalidOffsetNumber)
- elog(FATAL, "btree: failed to add item to the %s for %s",
+ elog(STOP, "btree: failed to add item to the %s for %s",
where, RelationGetRelationName(rel));
}
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.37 2000/07/21 06:42:32 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $
*
* NOTES
* Postgres btree pages look like ordinary relation pages. The opaque
Page metapg;
BTPageOpaque metaopaque;
Buffer rootbuf;
- Page rootpg;
+ Page rootpage;
BTPageOpaque rootopaque;
BlockNumber rootblkno;
BTMetaPageData *metad;
*/
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
rootblkno = BufferGetBlockNumber(rootbuf);
- rootpg = BufferGetPage(rootbuf);
+ rootpage = BufferGetPage(rootbuf);
+
+ /* NO ELOG(ERROR) till meta is updated */
+
+ _bt_pageinit(rootpage, BufferGetPageSize(rootbuf));
+ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
+ rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT);
+
+#ifdef XLOG
+	/*
+	 * XLOG stuff: log creation of the initial (empty leaf) root page.
+	 * The record is an xl_btree_newroot with no trailing btitems, which is
+	 * why no data buffer is passed to XLogInsert.
+	 */
+	{
+		xl_btree_newroot xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.node = rel->rd_node;
+		/*
+		 * rootblk lies inside SizeOfBtreeNewroot, so it must be set before
+		 * the record is logged.  NOTE(review): rootblk is declared as
+		 * BlockId; confirm that plain assignment of a BlockNumber is valid
+		 * for that type.
+		 */
+		xlrec.rootblk = rootblkno;
+
+		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
+							(char *) &xlrec, SizeOfBtreeNewroot, NULL, 0);
+
+		/* stamp the new root page with the position of the record */
+		PageSetLSN(rootpage, recptr);
+		PageSetSUI(rootpage, ThisStartUpID);
+	}
+#endif
metad->btm_root = rootblkno;
metad->btm_level = 1;
- _bt_pageinit(rootpg, BufferGetPageSize(rootbuf));
- rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg);
- rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT);
_bt_wrtnorelbuf(rel, rootbuf);
/* swap write lock for read lock */
* at the metadata page and got the root buffer, then we got the wrong
* buffer. Release it and try again.
*/
- rootpg = BufferGetPage(rootbuf);
- rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpg);
+ rootpage = BufferGetPage(rootbuf);
+ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
if (! P_ISROOT(rootopaque))
{
buf = _bt_getbuf(rel, blkno, BT_WRITE);
page = BufferGetPage(buf);
+#ifdef XLOG
+	/*
+	 * XLOG stuff: log deletion of the index tuple identified by *tid.
+	 * The record consists of the xl_btree_delete header only.
+	 */
+	{
+		xl_btree_delete xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.target.node = rel->rd_node;
+		xlrec.target.tid = *tid;
+
+		/* pass the address of the record, not the struct value itself */
+		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE,
+							(char *) &xlrec, SizeOfBtreeDelete, NULL, 0);
+
+		/* stamp the page with the position of the record just written */
+		PageSetLSN(page, recptr);
+		PageSetSUI(page, ThisStartUpID);
+	}
+#endif
+
PageIndexTupleDelete(page, offno);
/* write the buffer and release the lock */
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: nbtree.h,v 1.42 2000/09/12 06:07:52 vadim Exp $
+ * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $
*
*-------------------------------------------------------------------------
*/
typedef BTPageOpaqueData *BTPageOpaque;
+#define BTREE_METAPAGE 0 /* first page is meta */
+
/*
* BTScanOpaqueData is used to remember which buffers we're currently
* examining in the scan. We keep these buffers pinned (but not locked,
typedef BTItemData *BTItem;
+/*
+ * For XLOG: size without alignment. Sizeof works as long as
+ * IndexTupleData has exactly 8 bytes.
+ */
+#define SizeOfBTItem sizeof(BTItemData)
+
/* Test whether items are the "same" per the above notes */
#define BTItemSame(i1, i2) ( (i1)->bti_itup.t_tid.ip_blkid.bi_hi == \
(i2)->bti_itup.t_tid.ip_blkid.bi_hi && \
#define XLOG_BTREE_DELETE 0x00 /* delete btitem */
#define XLOG_BTREE_INSERT 0x10 /* add btitem without split */
#define XLOG_BTREE_SPLIT 0x20 /* add btitem with split */
-#define XLOG_BTREE_ONLEFT 0x40 /* flag for split case: new btitem */
+#define XLOG_BTREE_SPLEFT 0x30 /* as above + flag that new btitem */
/* goes to the left sibling */
+#define XLOG_BTREE_NEWROOT 0x40 /* new root page */
/*
- * All what we need to find changed index tuple (18 bytes)
+ * All we need to find the changed index tuple (14 bytes)
*/
typedef struct xl_btreetid
{
RelFileNode node;
- CommandId cid; /* this is for "better" tuple' */
- /* identification - it allows to avoid */
- /* "compensation" records for undo */
ItemPointerData tid; /* changed tuple id */
} xl_btreetid;
-/* This is what we need to know about delete - ALIGN(18) = 24 bytes */
+/*
+ * This is what we need to know about delete - ALIGN(14) = 16 bytes.
+ */
typedef struct xl_btree_delete
{
xl_btreetid target; /* deleted tuple id */
} xl_btree_delete;
-#define SizeOfBtreeDelete (offsetof(xl_btreetid, tid) + SizeOfIptrData))
+#define SizeOfBtreeDelete (offsetof(xl_btreetid, tid) + SizeOfIptrData)
-/* This is what we need to know about pure (without split) insert - 26 + key data */
+/*
+ * This is what we need to know about pure (without split) insert -
+ * 14 + [4] + btitem with key data. Note that we need the CommandId
+ * (4 bytes) only for leaf page inserts.
+ */
typedef struct xl_btree_insert
{
xl_btreetid target; /* inserted tuple id */
- BTItemData btitem;
- /* KEY DATA FOLLOWS AT END OF STRUCT */
+ /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */
} xl_btree_insert;
-#define SizeOfBtreeInsert (offsetof(xl_btree_insert, btitem) + sizeof(BTItemData))
+#define SizeOfBtreeInsert (offsetof(xl_btreetid, tid) + SizeOfIptrData)
-/* This is what we need to know about insert with split - 26 + right sibling btitems */
+/*
+ * This is what we need to know about insert with split -
+ * 22 + [4] + [btitem] + right sibling btitems. Note that we need the
+ * CommandId (4 bytes) only for leaf page inserts.
+ */
typedef struct xl_btree_split
{
xl_btreetid target; /* inserted tuple id */
- BlockNumber othblk; /* second block participated in split: */
+ BlockId otherblk; /* second block participated in split: */
/* first one is stored in target' tid */
- BlockNumber parblk; /* parent block to be updated */
+ BlockId rightblk; /* next right block */
/*
* We log all btitems from the right sibling. If new btitem goes on
- * the left sibling then we log it too and it will be first BTItemData
- * at the end of this struct.
+ * the left sibling then we log it too and it will be the first
+ * BTItemData at the end of this struct, but after (for the leaf
+ * pages) CommandId.
*/
} xl_btree_split;
-#define SizeOfBtreeSplit (offsetof(xl_btree_insert, parblk) + sizeof(BlockNumber))
+/* Size of the fixed part of xl_btree_split: everything up to and including rightblk */
+#define SizeOfBtreeSplit (offsetof(xl_btree_split, rightblk) + sizeof(BlockId))
+
+/*
+ * New root log record.
+ */
+typedef struct xl_btree_newroot
+{
+ RelFileNode node;
+ BlockId rootblk;
+ /* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */
+} xl_btree_newroot;
+
+#define SizeOfBtreeNewroot (offsetof(xl_btree_newroot, rootblk) + sizeof(BlockId))
/* end of XLOG stuff */