From deee783052892fc92711de95066f58a129906099 Mon Sep 17 00:00:00 2001 From: "Vadim B. Mikheev" Date: Fri, 13 Oct 2000 12:05:22 +0000 Subject: [PATCH] WAL --- src/backend/access/heap/heapam.c | 8 +- src/backend/access/nbtree/nbtinsert.c | 18 ++- src/backend/access/nbtree/nbtree.c | 169 ++++++++++++++++++++++++-- src/backend/access/transam/Makefile | 4 +- src/include/access/nbtree.h | 11 +- 5 files changed, 187 insertions(+), 23 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 3e1de33bfe..794ba977cc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.88 2000/10/13 12:05:20 vadim Exp $ * * * INTERFACE ROUTINES @@ -2198,7 +2198,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); /* is it our tuple ? */ - if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + if (PageGetSUI(page) != ThisStartUpID || + htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) { if (!InRecovery) elog(STOP, "heap_insert_undo: invalid target tuple in rollback"); @@ -2394,7 +2395,8 @@ newt:; htup = (HeapTupleHeader) PageGetItem(page, lp); /* is it our tuple ? */ - if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + if (PageGetSUI(page) != ThisStartUpID || + htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) { if (!InRecovery) elog(STOP, "heap_update_undo: invalid new tuple in rollback"); diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index c72b8ca3df..0105739b5c 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.66 2000/10/13 12:05:20 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -612,6 +612,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; +#ifdef XLOG + BTItem lhikey; +#endif + rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); @@ -680,6 +684,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(origpage, itemid); } +#ifdef XLOG + lhikey = item; +#endif if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) elog(STOP, "btree: failed to add hikey to the left sibling"); @@ -793,12 +800,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); hsize += sizeof(RelFileNode); } + else + { + Size itemsz = IndexTupleDSize(lhikey->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + memcpy(xlbuf + hsize, (char*) lhikey, itemsz); + hsize += itemsz; + } if (newitemonleft) { /* * Read comments in _bt_pgaddtup. * Actually, seems that in non-leaf splits newitem shouldn't - * go to first data key position. + * go to first data key position on left page. */ if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque)) { diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 1064c2bb10..a57bac1c81 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.65 2000/10/13 12:05:20 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -837,7 +837,7 @@ static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) ItemPointerGetOffsetNumber(&(xlrec->target.tid)), (char*)xlrec + hsize, record->xl_len - hsize, - &hnode)) + hnode)) elog(STOP, "btree_insert_redo: failed to add item"); PageSetLSN(page, lsn); @@ -908,7 +908,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) else { /* Delete items related to new right sibling */ - _bt_thin_left_page(page, record); + _bt_fix_left_page(page, record, onleft); if (onleft) { @@ -924,6 +924,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + sizeof(CommandId), sizeof(RelFileNode)); } + else + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); itemsz = IndexTupleDSize(btdata.bti_itup) + @@ -933,7 +940,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ItemPointerGetOffsetNumber(&(xlrec->target.tid)), (char*)xlrec + hsize, itemsz, - &hnode)) + hnode)) elog(STOP, "btree_split_redo: failed to add item"); } else @@ -994,6 +1001,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) pageop->btpo_flags |= BTP_LEAF; hsize += (sizeof(CommandId) + sizeof(RelFileNode)); } + else + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } if (onleft) /* skip target item */ { memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); @@ -1198,17 +1212,28 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, { char *xlrec = (char*) XLogRecGetData(record); Page page = (Page) BufferGetPage(buffer); - BTPageOpaque pageop; + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); BlockNumber blkno; OffsetNumber offno; ItemId lp; + BTItem item; for ( ; ; ) { - offno = _bt_find_btitem(page, btitem); - if (offno != InvalidOffsetNumber) + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + + for (offno = P_FIRSTDATAKEY(pageop); + offno <= maxoff; + offno = OffsetNumberNext(offno)) + { + lp = PageGetItemId(page, offno); + item = (BTItem) PageGetItem(page, lp); + if (BTItemSame(item, btitem)) + break; + } + if (offno <= maxoff) break; - pageop = (BTPageOpaque) PageGetSpecialPointer(page); + offno = InvalidOffsetNumber; if (P_RIGHTMOST(pageop)) break; blkno = pageop->btpo_next; @@ -1221,6 +1246,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, if (PageIsNew((PageHeader) page)) elog(STOP, "btree_%s_undo: uninitialized right sibling", (insert) ? "insert" : "split"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); if (XLByteLT(PageGetLSN(page), lsn)) break; } @@ -1250,9 +1276,9 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); - result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid), + result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid), record->xl_xid, cid); - if (result <= 0) /* no tuple or not owner */ + if (result < 0) /* not owner */ { UnlockAndReleaseBuffer(buffer); return; @@ -1278,7 +1304,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, static bool _bt_add_item(Page page, OffsetNumber offno, - char* item, Size size, RelFileNode* hnode) + char* item, Size size, RelFileNode hnode) { BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1309,4 +1335,125 @@ _bt_add_item(Page page, OffsetNumber offno, return(true); } +static bool +_bt_cleanup_page(Page page, RelFileNode hnode) +{ + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offno; + ItemId lp; + BTItem item; + bool result = false; + + for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) + { + lp = PageGetItemId(page, offno); + item = (BTItem) PageGetItem(page, lp); + if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)) + offno = OffsetNumberNext(offno); + else + { + PageIndexTupleDelete(page, offno); + maxoff = PageGetMaxOffsetNumber(page); + result = true; + } + } + + return(result); +} + +/* + * Remove from left sibling items belonging to right sibling + * and change P_HIKEY + */ +static void +_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) +{ + char *xlrec = (char*) XLogRecGetData(record); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + Size hsize = SizeOfBtreeSplit; + RelFileNode hnode; + BTItemData btdata; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offno; + char *item; + Size itemsz; + char *previtem = NULL; + char *lhikey = NULL; + Size lhisize = 0; + + if (pageop->btpo_flags & BTP_LEAF) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); + } + else + { + lhikey = (char*)xlrec + hsize; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += lhisize; + } + + if (! P_RIGHTMOST(pageop)) + PageIndexTupleDelete(page, P_HIKEY); + + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + for (item = (char*)xlrec + hsize; ; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + for (offno = P_FIRSTDATAKEY(pageop); + offno <= maxoff; + offno = OffsetNumberNext(offno)) + { + ItemId lp = PageGetItemId(page, offno); + BTItem btitem = (BTItem) PageGetItem(page, lp); + + if (BTItemSame(&btdata, btitem)) + { + PageIndexTupleDelete(page, offno); + break; + } + } + + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + + if (item + itemsz < (char*)record + record->xl_len) + { + previtem = item; + item += itemsz; + } + else + break; + } + + /* time to insert hi-key */ + if (pageop->btpo_flags & BTP_LEAF) + { + lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + } + + if (! _bt_add_item(page, + P_HIKEY, + lhikey, + lhisize, + &hnode)) + elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); + + return; +} + #endif diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 303936f455..272a883c52 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -4,7 +4,7 @@ # Makefile for access/transam # # IDENTIFICATION -# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.12 2000/08/31 16:09:46 petere Exp $ +# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.13 2000/10/13 12:05:21 vadim Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o rmgr.o +OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o xlogutils.o rmgr.o all: SUBSYS.o diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 4ca61e0c63..02a0bb6f5d 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $ + * $Id: nbtree.h,v 1.45 2000/10/13 12:05:22 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -259,8 +259,9 @@ typedef struct xl_btree_insert /* * This is what we need to know about insert with split - - * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need in - * CommandID and HeapNode (4 + 8 bytes) only for leaf page insert. + * 22 + {4 + 8 | left hi-key} + [btitem] + right sibling btitems. Note that + * we need in CommandID and HeapNode (4 + 8 bytes) for leaf pages + * and in left page hi-key for non-leaf ones. */ typedef struct xl_btree_split { @@ -271,8 +272,8 @@ typedef struct xl_btree_split /* * We log all btitems from the right sibling. If new btitem goes on * the left sibling then we log it too and it will be the first - * BTItemData at the end of this struct, but after (for the leaf - * pages) CommandId and HeapNode. + * BTItemData at the end of this struct after CommandId and HeapNode + * on the leaf pages and left page hi-key on non-leaf ones. */ } xl_btree_split; -- 2.40.0