granicus.if.org Git - postgresql/commitdiff
WAL
author Vadim B. Mikheev <vadim4o@yahoo.com>
Fri, 13 Oct 2000 02:03:02 +0000 (02:03 +0000)
committer Vadim B. Mikheev <vadim4o@yahoo.com>
Fri, 13 Oct 2000 02:03:02 +0000 (02:03 +0000)
src/backend/access/heap/heapam.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c
src/include/access/nbtree.h

index dbcefbf273376e12b61f22b3a070d426c2ad3dde..3e1de33bfe40e4649904386f2fa51a050c86f1a9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.86 2000/10/04 00:04:41 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -2016,6 +2016,22 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record)
                elog(STOP, "heap_redo: unknown op code %u", info);
 }
 
+/*
+ * heap_undo
+ *             Undo entry point for heap WAL records.
+ *
+ * Extracts the operation code from record->xl_info (with the XLR_INFO_MASK
+ * bits cleared) and hands the record to the matching heap_xlog_* routine
+ * with redo = false.  An unrecognized code is reported via elog(STOP).
+ */
+void heap_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+       switch (opcode)
+       {
+               case XLOG_HEAP_INSERT:
+                       heap_xlog_insert(false, lsn, record);
+                       break;
+               case XLOG_HEAP_DELETE:
+                       heap_xlog_delete(false, lsn, record);
+                       break;
+               case XLOG_HEAP_UPDATE:
+                       heap_xlog_update(false, lsn, record);
+                       break;
+               case XLOG_HEAP_MOVE:
+                       heap_xlog_move(false, lsn, record);
+                       break;
+               default:
+                       elog(STOP, "heap_undo: unknown op code %u", opcode);
+                       break;
+       }
+}
+
 void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record);
@@ -2199,7 +2215,7 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
        else    /* we can't delete tuple right now */
        {
                lp->lp_flags |= LP_DELETE;      /* mark for deletion */
-               MarkBufferForCleanup(buffer, PageCleanup);
+               MarkBufferForCleanup(buffer, HeapPageCleanup);
        }
 
 }
index e454a989ee4d33c6e2d4bb69e45111a6881cd297..c72b8ca3df613289213f466596e65f81fe8773ee 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.64 2000/10/05 20:10:20 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,6 +61,10 @@ static void _bt_pgaddtup(Relation rel, Page page,
 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
                                                int keysz, ScanKey scankey);
 
+#ifdef XLOG
+static Relation                _xlheapRel;     /* temporary hack */
+#endif
+
 /*
  *     _bt_doinsert() -- Handle insertion of a single btitem in the tree.
  *
@@ -119,6 +123,10 @@ top:
                }
        }
 
+#ifdef XLOG
+       _xlheapRel = heapRel;   /* temporary hack */
+#endif
+
        /* do the insertion */
        res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
 
@@ -517,21 +525,38 @@ _bt_insertonpg(Relation rel,
 #ifdef XLOG
                /* XLOG stuff */
                {
-                       char                            xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)];
+                       char                            xlbuf[sizeof(xl_btree_insert) + 
+                                       sizeof(CommandId) + sizeof(RelFileNode)];
                        xl_btree_insert    *xlrec = xlbuf;
                        int                                     hsize = SizeOfBtreeInsert;
+                       BTItemData                      truncitem;
+                       BTItem                          xlitem = btitem;
+                       Size                            xlsize = IndexTupleDSize(btitem->bti_itup) + 
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
 
                        xlrec->target.node = rel->rd_node;
                        ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
                        if (P_ISLEAF(lpageop))
-                       {
+                       {
                                CommandId       cid = GetCurrentCommandId();
-                               memcpy(xlbuf + SizeOfBtreeInsert, &(char*)cid, sizeof(CommandId));
+                               memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
                                hsize += sizeof(CommandId);
+                               memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+                               hsize += sizeof(RelFileNode);
+                       }
+                       /*
+                        * Read comments in _bt_pgaddtup
+                        */
+                       else if (newitemoff == P_FIRSTDATAKEY(lpageop))
+                       {
+                               truncitem = *btitem;
+                               truncitem.bti_itup.t_info = sizeof(BTItemData);
+                               xlitem = &truncitem;
+                               xlsize = sizeof(BTItemData);
                        }
 
                        XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
-                               xlbuf, hsize, (char*) btitem, itemsz);
+                               xlbuf, hsize, (char*) xlitem, xlsize);
 
                        PageSetLSN(page, recptr);
                        PageSetSUI(page, ThisStartUpID);
@@ -752,7 +777,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
         */
        {
                char                            xlbuf[sizeof(xl_btree_split) + 
-                       2 * sizeof(CommandId) + BLCKSZ];
+                       sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
                xl_btree_split     *xlrec = xlbuf;
                int                                     hsize = SizeOfBtreeSplit;
                int                                     flag = (newitemonleft) ? 
@@ -765,11 +790,30 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                        CommandId       cid = GetCurrentCommandId();
                        memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId));
                        hsize += sizeof(CommandId);
+                       memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+                       hsize += sizeof(RelFileNode);
                }
                if (newitemonleft)
                {
-                       memcpy(xlbuf + hsize, (char*) newitem, newitemsz);
-                       hsize += newitemsz;
+                       /*
+                        * Read comments in _bt_pgaddtup.
+                        * Actually, seems that in non-leaf splits newitem shouldn't
+                        * go to first data key position.
+                        */
+                       if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque))
+                       {
+                               BTItemData      truncitem = *newitem;
+                               truncitem.bti_itup.t_info = sizeof(BTItemData);
+                               memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData));
+                               hsize += sizeof(BTItemData);
+                       }
+                       else
+                       {
+                               Size    itemsz = IndexTupleDSize(newitem->bti_itup) + 
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+                               memcpy(xlbuf + hsize, (char*) newitem, itemsz);
+                               hsize += itemsz;
+                       }
                        xlrec->otherblk = BufferGetBlockNumber(rbuf);
                }
                else
@@ -1012,7 +1056,7 @@ static Buffer
 _bt_getstackbuf(Relation rel, BTStack stack)
 {
        BlockNumber blkno;
-       Buffer          buf;
+       Buffer          buf, newbuf;
        OffsetNumber start,
                                offnum,
                                maxoff;
@@ -1101,11 +1145,18 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        Size            itemsz;
        BTItem          new_item;
 
+#ifdef XLOG
+       Buffer          metabuf;
+#endif
+
        /* get a new root page */
        rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
        rootpage = BufferGetPage(rootbuf);
        rootblknum = BufferGetBlockNumber(rootbuf);
 
+#ifdef XLOG
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE);
+#endif
 
        /* NO ELOG(ERROR) from here till newroot op is logged */
 
@@ -1168,9 +1219,12 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 #ifdef XLOG
        /* XLOG stuff */
        {
-               xl_btree_newroot           xlrec;
+               xl_btree_newroot        xlrec;
+               Page                            metapg = BufferGetPage(metabuf);
+               BTMetaPageData     *metad = BTPageGetMeta(metapg);
+
                xlrec.node = rel->rd_node;
-               xlrec.rootblk = rootblknum;
+               BlockIdSet(&(xlrec.rootblk), rootblknum);
 
                /* 
                 * Dirrect access to page is not good but faster - we should 
@@ -1181,16 +1235,25 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
                        (char*)rootpage + (PageHeader) rootpage)->pd_upper,
                        ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper);
 
+               metad->btm_root = rootblknum;
+               (metad->btm_level)++;
+
                PageSetLSN(rootpage, recptr);
                PageSetSUI(rootpage, ThisStartUpID);
+               PageSetLSN(metapg, recptr);
+               PageSetSUI(metapg, ThisStartUpID);
+
+               _bt_wrtbuf(rel, metabuf);
        }
 #endif
 
        /* write and let go of the new root buffer */
        _bt_wrtbuf(rel, rootbuf);
 
+#ifndef XLOG
        /* update metadata page with new root block number */
        _bt_metaproot(rel, rootblknum, 0);
+#endif
 
        /* update and release new sibling, and finally the old root */
        _bt_wrtbuf(rel, rbuf);
index 2da74219010543275f7bed0ca84c6f3b7d7889b1..41acd11659c13496bc83bfaa6563a3708ec9bf2e 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $
  *
  *     NOTES
  *        Postgres btree pages look like ordinary relation pages.      The opaque
 #include "access/nbtree.h"
 #include "miscadmin.h"
 
-#define BTREE_METAPAGE 0
-#define BTREE_MAGIC            0x053162
-
-#define BTREE_VERSION  1
-
-typedef struct BTMetaPageData
-{
-       uint32          btm_magic;
-       uint32          btm_version;
-       BlockNumber btm_root;
-       int32           btm_level;
-} BTMetaPageData;
-
-#define BTPageGetMeta(p) \
-       ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0])
-
-
 /*
  *     We use high-concurrency locking on btrees.      There are two cases in
  *     which we don't do locking.  One is when we're building the btree.
@@ -188,14 +171,18 @@ _bt_getroot(Relation rel, int access)
 #ifdef XLOG
                        /* XLOG stuff */
                        {
-                               xl_btree_insert    xlrec;
+                               xl_btree_newroot           xlrec;
+
                                xlrec.node = rel->rd_node;
+                               BlockIdSet(&(xlrec.rootblk), rootblkno);
 
                                XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
                                        &xlrec, SizeOfBtreeNewroot, NULL, 0);
 
                                PageSetLSN(rootpage, recptr);
                                PageSetSUI(rootpage, ThisStartUpID);
+                               PageSetLSN(metapg, recptr);
+                               PageSetSUI(metapg, ThisStartUpID);
                        }
 #endif
 
index 7fec982fa2d9e59388ca0f9121255c75ff9fcc8e..1064c2bb1075c7ff6c9f2b71227687569d9a6c17 100644 (file)
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan)
                so->btso_curbuf = buf;
        }
 }
+
+#ifdef XLOG
+/*
+ * btree_redo
+ *             Redo entry point for btree WAL records.
+ *
+ * The operation code is record->xl_info with the XLR_INFO_MASK bits cleared;
+ * each known code is forwarded to its btree_xlog_* handler with redo = true.
+ * Both split codes share one handler and differ only in the "onleft" flag.
+ */
+void btree_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+       switch (opcode)
+       {
+               case XLOG_BTREE_DELETE:
+                       btree_xlog_delete(true, lsn, record);
+                       break;
+               case XLOG_BTREE_INSERT:
+                       btree_xlog_insert(true, lsn, record);
+                       break;
+               case XLOG_BTREE_SPLIT:          /* new item on the right */
+                       btree_xlog_split(true, false, lsn, record);
+                       break;
+               case XLOG_BTREE_SPLEFT:         /* new item on the left */
+                       btree_xlog_split(true, true, lsn, record);
+                       break;
+               case XLOG_BTREE_NEWROOT:
+                       btree_xlog_newroot(true, lsn, record);
+                       break;
+               default:
+                       elog(STOP, "btree_redo: unknown op code %u", opcode);
+                       break;
+       }
+}
+
+/*
+ * btree_undo
+ *             Undo entry point for btree WAL records.
+ *
+ * Mirrors btree_redo: the same operation codes are recognized and routed to
+ * the same btree_xlog_* handlers, but with redo = false.
+ */
+void btree_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+       switch (opcode)
+       {
+               case XLOG_BTREE_DELETE:
+                       btree_xlog_delete(false, lsn, record);
+                       break;
+               case XLOG_BTREE_INSERT:
+                       btree_xlog_insert(false, lsn, record);
+                       break;
+               case XLOG_BTREE_SPLIT:          /* new item on the right */
+                       btree_xlog_split(false, false, lsn, record);
+                       break;
+               case XLOG_BTREE_SPLEFT:         /* new item on the left */
+                       btree_xlog_split(false, true, lsn, record);
+                       break;
+               case XLOG_BTREE_NEWROOT:
+                       btree_xlog_newroot(false, lsn, record);
+                       break;
+               default:
+                       elog(STOP, "btree_undo: unknown op code %u", opcode);
+                       break;
+       }
+}
+
+/*
+ * btree_xlog_delete
+ *             Replay the deletion of one index tuple described by a
+ *             XLOG_BTREE_DELETE record.
+ *
+ * Nothing is done for undo (redo == false).  For redo the target page is
+ * read, and the tuple at xlrec->target.tid is removed unless the page LSN
+ * shows that this record has already been applied.  After applying, the
+ * page is stamped with the record's LSN and the current startup id and
+ * written out, following the same protocol as btree_xlog_insert.
+ */
+static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_btree_delete    *xlrec;
+       Relation                        reln;
+       Buffer                          buffer;
+       Page                            page;
+
+       if (!redo)
+               return;
+
+       xlrec = (xl_btree_delete*) XLogRecGetData(record);
+       reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer(false, reln,
+                               ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+       if (!BufferIsValid(buffer))
+               elog(STOP, "btree_delete_redo: block unfound");
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+               elog(STOP, "btree_delete_redo: uninitialized page");
+
+       /* Change already on the page: deleting again would remove a wrong tuple */
+       if (XLByteLE(lsn, PageGetLSN(page)))
+       {
+               UnlockAndReleaseBuffer(buffer);
+               return;
+       }
+
+       PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
+
+       /* Record that the page now reflects this WAL record, then write it */
+       PageSetLSN(page, lsn);
+       PageSetSUI(page, ThisStartUpID);
+       UnlockAndWriteBuffer(buffer);
+
+       return;
+}
+
+/*
+ * btree_xlog_insert
+ *             Redo or undo a XLOG_BTREE_INSERT record.
+ *
+ * Record layout as read here: an xl_btree_insert header of SizeOfBtreeInsert
+ * bytes; for leaf pages only, a CommandId followed by the RelFileNode of the
+ * heap relation; then the inserted btree item.
+ *
+ * redo: if the page LSN is older than the record, the item is added at
+ *             target.tid's offset via _bt_add_item and the page is stamped and
+ *             written; otherwise the buffer is just released.
+ * undo: only leaf-page insertions are undone; the logged item header is
+ *             copied out and handed to _bt_del_item, which takes over the buffer.
+ *
+ * Returns silently when the relation or (for undo) the buffer is missing.
+ */
+static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_btree_insert    *xlrec;
+       Relation                        reln;
+       Buffer                          buffer;
+       Page                            page;
+       BTPageOpaque            pageop;
+
+       xlrec = (xl_btree_insert*) XLogRecGetData(record);
+       reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer(redo, reln,
+                               ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+       if (!BufferIsValid(buffer))
+               return;
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+               elog(STOP, "btree_insert_%s: uninitialized page",
+                       (redo) ? "redo" : "undo");
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       if (redo)
+       {
+               /* Page already contains this change: nothing to replay */
+               if (XLByteLE(lsn, PageGetLSN(page)))
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       Size            hsize = SizeOfBtreeInsert;
+                       RelFileNode     hnode;
+
+                       /* Leaf records carry CommandId + heap RelFileNode before the item */
+                       if (P_ISLEAF(pageop))
+                       {
+                               hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+                               memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert +
+                                                       sizeof(CommandId), sizeof(RelFileNode));
+                       }
+
+                       /* NOTE(review): hnode is only filled in for leaf pages */
+                       if (! _bt_add_item(page,
+                                       ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+                                       (char*)xlrec + hsize,
+                                       record->xl_len - hsize,
+                                       &hnode))
+                               elog(STOP, "btree_insert_redo: failed to add item");
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+       else
+       {
+               BTItemData              btdata;
+
+               /* The change being undone must already be on the page */
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       elog(STOP, "btree_insert_undo: bad page LSN");
+
+               if (! P_ISLEAF(pageop))
+               {
+                       UnlockAndReleaseBuffer(buffer);
+                       return;
+               }
+
+               /* Copy the fixed-size item header to a properly aligned local */
+               memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert +
+                       sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+               _bt_del_item(reln, buffer, &btdata, true, lsn, record);
+
+       }
+
+       return;
+}
+
+/*
+ * btree_xlog_split
+ *             Redo or undo a btree page split record.
+ *
+ * onleft is true for XLOG_BTREE_SPLEFT (new item went to the left page) and
+ * false for XLOG_BTREE_SPLIT (new item went to the right page).  In the
+ * first case target.tid names the left page and otherblk the right one; in
+ * the second case the roles are swapped.
+ *
+ * Three pages are processed in turn:
+ *     1. the left (original) sibling,
+ *     2. the right (new) sibling,
+ *     3. the page that follows the new right sibling (rightblk), whose
+ *        back-link must point at the new page.
+ *
+ * redo: each page is changed only if its LSN is older than the record,
+ *             then stamped with lsn/ThisStartUpID and written.
+ * undo: only leaf splits are undone, by passing the target item to
+ *             _bt_del_item on the side where it was inserted.
+ *
+ * NOTE(review): BlockIdGetBlockNumber is given xlrec->otherblk/rightblk
+ * directly here but &(xlrec->rootblk) in btree_xlog_newroot; confirm against
+ * the xl_btree_split declaration which form is intended.
+ */
+static void
+btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_btree_split     *xlrec;
+       Relation                        reln;
+       BlockNumber                     blkno;
+       BlockNumber                     parent;
+       Buffer                          buffer;
+       Page                            page;
+       BTPageOpaque            pageop;
+       char                       *op = (redo) ? "redo" : "undo";
+       bool                            isleaf;
+
+       xlrec = (xl_btree_split*) XLogRecGetData(record);
+       reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+       if (!RelationIsValid(reln))
+               return;
+
+       /* Left (original) sibling */
+       blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+                                       BlockIdGetBlockNumber(xlrec->otherblk);
+       buffer = XLogReadBuffer(false, reln, blkno);
+       if (!BufferIsValid(buffer))
+               elog(STOP, "btree_split_%s: lost left sibling", op);
+
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+               elog(STOP, "btree_split_%s: uninitialized left sibling", op);
+
+       /* Remember leaf-ness and parent: the right sibling inherits both */
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+       isleaf = P_ISLEAF(pageop);
+       parent = pageop->btpo_parent;
+
+       if (redo)
+       {
+               if (XLByteLE(lsn, PageGetLSN(page)))
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       /* Delete items related to new right sibling */
+                       _bt_thin_left_page(page, record);
+
+                       if (onleft)
+                       {
+                               BTItemData      btdata;
+                               Size            hsize = SizeOfBtreeSplit;
+                               Size            itemsz;
+                               RelFileNode     hnode;
+
+                               pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
+                               if (isleaf)
+                               {
+                                       hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+                                       memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
+                                                               sizeof(CommandId), sizeof(RelFileNode));
+                               }
+
+                               /* Item size comes from the logged item's own header */
+                               memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+                               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+
+                               if (! _bt_add_item(page,
+                                               ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+                                               (char*)xlrec + hsize,
+                                               itemsz,
+                                               &hnode))
+                                       elog(STOP, "btree_split_redo: failed to add item");
+                       }
+                       else
+                               pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+       else    /* undo */
+       {
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       elog(STOP, "btree_split_undo: bad left sibling LSN");
+
+               if (! isleaf || ! onleft)
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       BTItemData              btdata;
+
+                       memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit +
+                               sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+                       _bt_del_item(reln, buffer, &btdata, false, lsn, record);
+               }
+       }
+
+       /* Right (new) sibling */
+       blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+                                       ItemPointerGetBlockNumber(&(xlrec->target.tid));
+       buffer = XLogReadBuffer(redo, reln, blkno);
+       if (!BufferIsValid(buffer))
+               elog(STOP, "btree_split_%s: lost right sibling", op);
+
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+       {
+               if (!redo)
+                       elog(STOP, "btree_split_undo: uninitialized right sibling");
+               PageInit(page, BufferGetPageSize(buffer), 0);
+       }
+
+       if (redo)
+       {
+               if (XLByteLE(lsn, PageGetLSN(page)))
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       Size            hsize = SizeOfBtreeSplit;
+                       BTItemData      btdata;
+                       Size            itemsz;
+
+                       _bt_pageinit(page, BufferGetPageSize(buffer));
+                       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+                       if (isleaf)
+                       {
+                               pageop->btpo_flags |= BTP_LEAF;
+                               hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+                       }
+                       if (onleft)             /* skip target item */
+                       {
+                               memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+                               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+                               hsize += itemsz;
+                       }
+
+                       /*
+                        * Re-add every logged item; each goes in at the first
+                        * offset, advancing by the MAXALIGNed item size.
+                        * NOTE(review): the end bound is computed from "record",
+                        * not from xlrec; confirm xl_len is measured from there.
+                        */
+                       for (char* item = (char*)xlrec + hsize;
+                                       item < (char*)record + record->xl_len; )
+                       {
+                               memcpy(&btdata, item, sizeof(BTItemData));
+                               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+                               itemsz = MAXALIGN(itemsz);
+                               if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+                                               LP_USED) == InvalidOffsetNumber)
+                                       elog(STOP, "btree_split_redo: can't add item to right sibling");
+                               item += itemsz;
+                       }
+
+                       pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+                                       BlockIdGetBlockNumber(xlrec->otherblk);
+                       pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
+                       pageop->btpo_parent = parent;
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+       else    /* undo */
+       {
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       elog(STOP, "btree_split_undo: bad right sibling LSN");
+
+               if (! isleaf || onleft)
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       char            tbuf[BLCKSZ];
+                       int                     cnt;
+                       char       *item;
+                       Size            itemsz;
+
+                       /* Collect the header of every logged item into tbuf */
+                       item = (char*)xlrec + SizeOfBtreeSplit +
+                                       sizeof(CommandId) + sizeof(RelFileNode);
+                       for (cnt = 0; item < (char*)record + record->xl_len; )
+                       {
+                               BTItem  btitem = (BTItem)
+                                       (tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
+                               memcpy(btitem, item, sizeof(BTItemData));
+                               itemsz = IndexTupleDSize(btitem->bti_itup) +
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+                               itemsz = MAXALIGN(itemsz);
+                               item += itemsz;
+                               cnt++;
+                       }
+                       /* Index of the target item within the collected headers */
+                       cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+                       if (cnt < 0)
+                               elog(STOP, "btree_split_undo: target item unfound in right sibling");
+
+                       item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData)));
+
+                       _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record);
+               }
+       }
+
+       /* Right (next) page */
+       blkno = BlockIdGetBlockNumber(xlrec->rightblk);
+
+       /*
+        * P_NONE is the "no sibling" link value (it is what a fresh root gets
+        * for btpo_prev/btpo_next).  If the split page had no right neighbour
+        * there is no back-link to fix.
+        */
+       if (blkno == P_NONE)
+               return;
+
+       buffer = XLogReadBuffer(false, reln, blkno);
+       if (!BufferIsValid(buffer))
+               elog(STOP, "btree_split_%s: lost next right page", op);
+
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+               elog(STOP, "btree_split_%s: uninitialized next right page", op);
+
+       if (redo)
+       {
+               if (XLByteLE(lsn, PageGetLSN(page)))
+                       UnlockAndReleaseBuffer(buffer);
+               else
+               {
+                       /* Point the neighbour's back-link at the new right sibling */
+                       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+                       pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+                                       ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+       else    /* undo */
+       {
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       elog(STOP, "btree_split_undo: bad next right page LSN");
+
+               UnlockAndReleaseBuffer(buffer);
+       }
+
+}
+
+/*
+ * btree_xlog_newroot
+ *             Replay a XLOG_BTREE_NEWROOT record.
+ *
+ * Nothing is done for undo (redo == false).  For redo two pages are
+ * brought up to date:
+ *
+ *     - the root page (xlrec->rootblk): if new or older than the record it
+ *       is re-initialized as a root with no siblings.  A record of exactly
+ *       SizeOfBtreeNewroot bytes carries no items, and the root is then also
+ *       a leaf; otherwise the logged items are added to the page.
+ *     - the metapage: initialized if new, then, if older than the record,
+ *       pointed at the new root with its level counter incremented.
+ *
+ * Each page is stamped with lsn/ThisStartUpID and written when changed,
+ * or simply released when it is already current.
+ */
+static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_btree_newroot   *xlrec;
+       Relation                        reln;
+       Buffer                          buffer;
+       Page                            page;
+       BTPageOpaque            pageop;
+       Buffer                          metabuf;
+       Page                            metapg;
+
+       if (!redo)
+               return;
+
+       xlrec = (xl_btree_newroot*) XLogRecGetData(record);
+       reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+       if (!RelationIsValid(reln))
+               return;
+       buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
+       if (!BufferIsValid(buffer))
+               elog(STOP, "btree_newroot_redo: no root page");
+       metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
+       if (!BufferIsValid(metabuf))
+               elog(STOP, "btree_newroot_redo: no metapage");
+       page = (Page) BufferGetPage(buffer);
+
+       if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
+       {
+               _bt_pageinit(page, BufferGetPageSize(buffer));
+               pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+               pageop->btpo_flags |= BTP_ROOT;
+               pageop->btpo_prev = pageop->btpo_next = P_NONE;
+               pageop->btpo_parent = BTREE_METAPAGE;
+
+               if (record->xl_len == SizeOfBtreeNewroot)       /* no childs */
+                       pageop->btpo_flags |= BTP_LEAF;
+               else
+               {
+                       BTItemData      btdata;
+                       Size            itemsz;
+
+                       /*
+                        * Re-add every logged item at the first offset, advancing
+                        * by the MAXALIGNed item size.
+                        * NOTE(review): the end bound is computed from "record",
+                        * not from xlrec; confirm xl_len is measured from there.
+                        */
+                       for (char* item = (char*)xlrec + SizeOfBtreeNewroot;
+                                       item < (char*)record + record->xl_len; )
+                       {
+                               memcpy(&btdata, item, sizeof(BTItemData));
+                               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
+                               itemsz = MAXALIGN(itemsz);
+                               if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+                                               LP_USED) == InvalidOffsetNumber)
+                                       elog(STOP, "btree_newroot_redo: can't add item");
+                               item += itemsz;
+                       }
+               }
+
+               PageSetLSN(page, lsn);
+               PageSetSUI(page, ThisStartUpID);
+               UnlockAndWriteBuffer(buffer);
+       }
+       else
+               UnlockAndReleaseBuffer(buffer);
+
+       metapg = BufferGetPage(metabuf);
+       if (PageIsNew((PageHeader) metapg))
+       {
+               BTMetaPageData  md;
+
+               /* Build an empty metapage; the root pointer is filled in below */
+               _bt_pageinit(metapg, BufferGetPageSize(metabuf));
+               md.btm_magic = BTREE_MAGIC;
+               md.btm_version = BTREE_VERSION;
+               md.btm_root = P_NONE;
+               md.btm_level = 0;
+               memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
+       }
+
+       if (XLByteLT(PageGetLSN(metapg), lsn))
+       {
+               BTMetaPageData     *metad = BTPageGetMeta(metapg);
+
+               metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
+               (metad->btm_level)++;
+               PageSetLSN(metapg, lsn);
+               PageSetSUI(metapg, ThisStartUpID);
+               UnlockAndWriteBuffer(metabuf);
+       }
+       else
+               UnlockAndReleaseBuffer(metabuf);
+
+       return;
+}
+
+/*
+ * UNDO insertion on *leaf* page: 
+ * - find inserted tuple;
+ * - delete it if heap tuple was inserted by the same xaction
+ *
+ * The search starts on the page held in 'buffer' and follows btpo_next
+ * links to the right until btitem is found, the rightmost page has been
+ * searched, or a right sibling is reached whose LSN compares below 'lsn'.
+ *
+ * reln   - index relation, used to read right siblings
+ * buffer - buffer holding the page the search starts on; every return
+ *          visible in this function is preceded by releasing, writing or
+ *          marking for cleanup the buffer that is current at that point
+ * btitem - the index item to look for and remove
+ * insert - true when called for an insert record, false for a split
+ *          record; selects the name used in error messages and the fixed
+ *          header size (SizeOfBtreeInsert or SizeOfBtreeSplit) to skip
+ * lsn    - LSN that right sibling page LSNs are compared against
+ *          (presumably the LSN of 'record' - confirm against callers)
+ * record - the log record; CommandId and the heap RelFileNode are copied
+ *          out of its data right behind the fixed header
+ *
+ * During recovery (InRecovery) a missing or already deleted target is
+ * tolerated; outside recovery both cases are reported with elog(STOP).
+ */
+static void
+_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, 
+                               XLogRecPtr lsn, XLogRecord *record)
+{
+       char               *xlrec = (char*) XLogRecGetData(record);
+       Page                    page = (Page) BufferGetPage(buffer);
+       BTPageOpaque    pageop;
+       BlockNumber             blkno;
+       OffsetNumber    offno;
+       ItemId                  lp;
+
+       for ( ; ; )     /* look on this page, then walk right */
+       {
+               offno = _bt_find_btitem(page, btitem);
+               if (offno != InvalidOffsetNumber)
+                       break;  /* found it */
+               pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+               if (P_RIGHTMOST(pageop))
+                       break;  /* no right sibling left to search */
+               blkno = pageop->btpo_next;
+               UnlockAndReleaseBuffer(buffer); /* done with the current page */
+               buffer = XLogReadBuffer(false, reln, blkno);
+               if (!BufferIsValid(buffer))
+                       elog(STOP, "btree_%s_undo: lost right sibling",
+                               (insert) ? "insert" : "split");
+               page = (Page) BufferGetPage(buffer);
+               if (PageIsNew((PageHeader) page))
+                       elog(STOP, "btree_%s_undo: uninitialized right sibling",
+                               (insert) ? "insert" : "split");
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       break;  /* stop here; offno is still InvalidOffsetNumber */
+       }
+
+       if (offno == InvalidOffsetNumber)       /* not found */
+       {
+               if (!InRecovery)        /* a miss is only tolerated in recovery */
+                       elog(STOP, "btree_%s_undo: lost target tuple in rollback",
+                               (insert) ? "insert" : "split");
+               UnlockAndReleaseBuffer(buffer);
+               return;
+       }
+
+       lp = PageGetItemId(page, offno);
+       if (ItemIdDeleted(lp))  /* marked for deletion */
+       {
+               if (!InRecovery)
+                       elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
+                               (insert) ? "insert" : "split");
+       }       /* in recovery: falls through to PageIndexTupleDelete below */
+       else if (InRecovery)    /* check heap tuple */
+       {
+               int                     result;
+               CommandId       cid;
+               RelFileNode     hnode;
+               Size            hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
+
+               memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));  /* CommandId sits right after the fixed header */
+               memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));  /* followed by the heap RelFileNode */
+               result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid),
+                                       record->xl_xid, cid);
+               if (result <= 0)        /* no tuple or not owner */
+               {
+                       UnlockAndReleaseBuffer(buffer); /* leave the index item in place */
+                       return;
+               }
+       }
+       else if (! BufferIsUpdatable(buffer))   /* normal rollback */
+       {
+               lp->lp_flags |= LP_DELETE;      /* only flag the item; it is not removed here */
+               MarkBufferForCleanup(buffer, IndexPageCleanup);
+               return; /* NOTE(review): no explicit unlock/release on this path - presumably MarkBufferForCleanup takes the buffer over; confirm */
+       }
+
+       PageIndexTupleDelete(page, offno);      /* physically remove the item */
+       if (InRecovery)
+       {
+               pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+               pageop->btpo_flags |= BTP_REORDER;      /* flag the page: items must be re-ordered */
+       }
+       UnlockAndWriteBuffer(buffer);
+
+       return;
+}
+
+/*
+ * _bt_add_item() -- put an item of the given size on page at offno.
+ *
+ * If offno lies beyond the first free slot, the item is appended at the
+ * end instead and BTP_REORDER is set on the page (with a NOTICE when the
+ * flag was not set already).  When the first PageAddItem() attempt fails
+ * on a leaf page, _bt_cleanup_page() is called to remove dead tuples and
+ * the insertion is tried once more; non-leaf pages are not cleaned up.
+ *
+ * Returns true if the item was added, false otherwise.
+ */
+static bool
+_bt_add_item(Page page, OffsetNumber offno, 
+       char* item, Size size, RelFileNode* hnode)
+{
+       BTPageOpaque    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       bool            cleaned;
+
+       /* requested position is past the end: append and flag the page */
+       if (offno > PageGetMaxOffsetNumber(page) + 1)
+       {
+               if ((opaque->btpo_flags & BTP_REORDER) == 0)
+               {
+                       elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
+                       opaque->btpo_flags |= BTP_REORDER;
+               }
+               offno = PageGetMaxOffsetNumber(page) + 1;
+       }
+
+       /* first attempt - done if the item goes in as is */
+       if (PageAddItem(page, (Item) item, size, offno,
+                       LP_USED) != InvalidOffsetNumber)
+               return(true);
+
+       /* not enough space - only leaf pages get dead tuples removed */
+       if (! P_ISLEAF(opaque))
+               return(false);
+
+       cleaned = _bt_cleanup_page(page, hnode);
+       if (!cleaned)
+               return(false);
+
+       /* second and last attempt after the cleanup */
+       if (PageAddItem(page, (Item) item, size, offno,
+                       LP_USED) == InvalidOffsetNumber)
+               return(false);
+
+       return(true);
+}
+
+#endif
index 437b6637b2438bbe47f798b5c2c3d4e027a11fb1..4ca61e0c630c5f6c6d725f7652ffdcaf3ab4fa92 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $
+ * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -42,11 +42,28 @@ typedef struct BTPageOpaqueData
 #define BTP_FREE               (1 << 2)        /* not currently used... */
 #define BTP_META               (1 << 3)        /* Set in the meta-page only */
 
+#ifdef XLOG
+#define        BTP_REORDER             (1 << 4)        /* items must be re-ordered */
+#endif
 } BTPageOpaqueData;
 
 typedef BTPageOpaqueData *BTPageOpaque;
 
 #define BTREE_METAPAGE 0       /* first page is meta */
+#define BTREE_MAGIC            0x053162        /* value stored in btm_magic of the metapage */
+
+#define BTREE_VERSION  1       /* value stored in btm_version of the metapage */
+
+typedef struct BTMetaPageData  /* metapage contents, reached through BTPageGetMeta() */
+{
+       uint32          btm_magic;      /* set to BTREE_MAGIC */
+       uint32          btm_version;    /* set to BTREE_VERSION */
+       BlockNumber btm_root;   /* block number of the root page; P_NONE in a freshly built metapage */
+       int32           btm_level;      /* 0 in a freshly built metapage; incremented when a new root is recorded */
+} BTMetaPageData;
+
+/*
+ * BTPageGetMeta
+ *             Return a pointer to the BTMetaPageData kept on page p.  The
+ *             data is located where the line pointer array (pd_linp[0]) of
+ *             the page header begins.
+ *
+ * The argument is parenthesized so that any pointer expression, not only
+ * a plain identifier, is cast to PageHeader as a whole.
+ */
+#define BTPageGetMeta(p) \
+       ((BTMetaPageData *) &((PageHeader) (p))->pd_linp[0])
 
 /*
  *     BTScanOpaqueData is used to remember which buffers we're currently
@@ -228,13 +245,13 @@ typedef struct xl_btree_delete
 
 /* 
  * This is what we need to know about pure (without split) insert - 
- * 14 + [4] + btitem with key data. Note that we need in CommandID
- * (4 bytes) only for leaf page insert.
+ * 14 + [4+8] + btitem with key data. Note that we need CommandID
+ * and HeapNode (4 + 8 bytes) only for leaf page insert.
  */
 typedef struct xl_btree_insert
 {
        xl_btreetid                     target;         /* inserted tuple id */
-       /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */
+       /* [CommandID, HeapNode and ] BTITEM FOLLOWS AT END OF STRUCT */
 } xl_btree_insert;
 
 #define SizeOfBtreeInsert      (offsetof(xl_btreetid, tid) + SizeOfIptrData)
@@ -242,8 +259,8 @@ typedef struct xl_btree_insert
 
 /* 
  * This is what we need to know about insert with split - 
- * 22 + [4] + [btitem] + right sibling btitems. Note that we need in
- * CommandID (4 bytes) only for leaf page insert.
+ * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need
+ * CommandID and HeapNode (4 + 8 bytes) only for leaf page insert.
  */
 typedef struct xl_btree_split
 {
@@ -255,7 +272,7 @@ typedef struct xl_btree_split
         * We log all btitems from the right sibling. If new btitem goes on
         * the left sibling then we log it too and it will be the first
         * BTItemData at the end of this struct, but after (for the leaf
-        * pages) CommandId.
+        * pages) CommandId and HeapNode.
         */
 } xl_btree_split;