if (metadata->head == InvalidBlockNumber)
{
/*
- * Main list is empty, so just copy sublist into main list
+ * Main list is empty, so just insert sublist as main list
*/
START_CRIT_SECTION();
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
+ rdata[0].next = rdata + 1;
+
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = NULL;
+
Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
START_CRIT_SECTION();
MetaBuffer;
Page page;
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
Assert(BufferIsValid(MetaBuffer));
page = (Page) BufferGetPage(MetaBuffer);
Buffer buffer;
Page page;
+ /* Backup blocks are not used in create_ptree records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
}
}
- /* nothing else to do if page was backed up */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer))
if (data->isData)
flags |= GIN_DATA;
+ /* Backup blocks are not used in split records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer);
Buffer buffer;
Page page;
- /* nothing to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer))
ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
- Buffer buffer;
+ Buffer dbuffer;
+ Buffer pbuffer;
+ Buffer lbuffer;
Page page;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
+ else
{
- buffer = XLogReadBuffer(data->node, data->blkno, false);
- if (BufferIsValid(buffer))
+ dbuffer = XLogReadBuffer(data->node, data->blkno, false);
+ if (BufferIsValid(dbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(dbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(dbuffer);
}
- UnlockReleaseBuffer(buffer);
}
}
- if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
+ else
{
- buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
- if (BufferIsValid(buffer))
+ pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
+ if (BufferIsValid(pbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(pbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
PageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(pbuffer);
}
- UnlockReleaseBuffer(buffer);
}
}
- if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
+ if (record->xl_info & XLR_BKP_BLOCK(2))
+ (void) RestoreBackupBlock(lsn, record, 2, false, false);
+ else if (data->leftBlkno != InvalidBlockNumber)
{
- buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
- if (BufferIsValid(buffer))
+ lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
+ if (BufferIsValid(lbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(lbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(lbuffer);
}
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(lbuffer);
}
}
+
+ if (BufferIsValid(pbuffer))
+ UnlockReleaseBuffer(pbuffer);
+ if (BufferIsValid(dbuffer))
+ UnlockReleaseBuffer(dbuffer);
}
static void
/*
* insert into tail page
*/
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
if (BufferIsValid(buffer))
/*
* New tail
*/
- buffer = XLogReadBuffer(data->node, data->prevTail, false);
- if (BufferIsValid(buffer))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
- Page page = BufferGetPage(buffer);
-
- if (!XLByteLE(lsn, PageGetLSN(page)))
+ buffer = XLogReadBuffer(data->node, data->prevTail, false);
+ if (BufferIsValid(buffer))
{
- GinPageGetOpaque(page)->rightlink = data->newRightlink;
+ Page page = BufferGetPage(buffer);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ GinPageGetOpaque(page)->rightlink = data->newRightlink;
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
}
- UnlockReleaseBuffer(buffer);
}
}
tupsize;
IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
Page metapage;
int i;
+ /* Backup blocks are not used in delete_listpage records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
if (!BufferIsValid(metabuffer))
return; /* assume index was deleted, nothing to do */
MarkBufferDirty(metabuffer);
}
+ /*
+ * In normal operation, shiftList() takes exclusive lock on all the
+ * pages-to-be-deleted simultaneously. During replay, however, it should
+ * be all right to lock them one at a time. This is dependent on the fact
+ * that we are deleting pages from the head of the list, and that readers
+ * share-lock the next page before releasing the one they are on. So we
+ * cannot get past a reader that is on, or due to visit, any page we are
+ * going to delete. New incoming readers will block behind our metapage
+ * lock and then see a fully updated page list.
+ */
for (i = 0; i < data->ndeleted; i++)
{
Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
* GIN indexes do not require any conflict processing.
*/
- RestoreBkpBlocks(lsn, record, false);
-
topCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
Buffer buffer;
Page page;
- /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
+ /* we must fix incomplete_inserts list even if it's full-page image */
forgetIncompleteInsert(xldata->node, xldata->key);
if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
&(xldata->blkno), 1,
NULL);
- /* nothing else to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
decodePageUpdateRecord(&xlrec, record);
UnlockReleaseBuffer(buffer);
}
-static void
-gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
-{
- gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
-
- /* nothing else to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
- return;
-
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (!BufferIsValid(buffer))
- return;
-
- page = (Page) BufferGetPage(buffer);
- GistPageSetDeleted(page);
-
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
-}
-
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
int i;
int flags;
+ /* Backup blocks are not used in split records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
decodePageSplitRecord(&xlrec, record);
flags = xlrec.data->origleaf ? F_LEAF : 0;
Buffer buffer;
Page page;
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
*ptr;
gistxlogInsertComplete *xlrec;
+ /* Backup blocks are not used in insert_complete records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
xlrec = (gistxlogInsertComplete *) begin;
ptr = begin + sizeof(gistxlogInsertComplete);
* tuples outside VACUUM, we'll need to handle that here.
*/
- RestoreBkpBlocks(lsn, record, false);
-
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record, false);
break;
- case XLOG_GIST_PAGE_DELETE:
- gistRedoPageDeleteRecord(lsn, record);
- break;
case XLOG_GIST_NEW_ROOT:
gistRedoPageUpdateRecord(lsn, record, true);
break;
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
-static void
-out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
-{
- appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
- xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
- xlrec->blkno);
-}
-
static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_update: ");
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break;
- case XLOG_GIST_PAGE_DELETE:
- out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
- break;
case XLOG_GIST_NEW_ROOT:
appendStringInfo(buf, "new_root: ");
out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
* conflict processing to occur before we begin index vacuum actions. see
* vacuumlazy.c and also comments in btvacuumpage()
*/
+
+ /* Backup blocks are not used in cleanup_info records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
}
/*
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
xlrec->node);
- RestoreBkpBlocks(lsn, record, true);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * If we have a full-page image, restore it (using a cleanup lock) and
+ * we're done.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
+ }
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
if (!BufferIsValid(buffer))
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
- RestoreBkpBlocks(lsn, record, false);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
- buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
+ buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
- LockBufferForCleanup(buffer);
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page)))
Buffer buffer;
Page page;
+ /* Backup blocks are not used in newpage records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
/*
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
if (!BufferIsValid(buffer))
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
{
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- Buffer buffer;
bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ Buffer obuffer,
+ nbuffer;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * In normal operation, it is important to lock the two pages in
+ * page-number order, to avoid possible deadlocks against other update
+ * operations going the other way. However, during WAL replay there can
+ * be no other update happening, so we don't need to worry about that. But
+ * we *do* need to worry that we don't expose an inconsistent state to Hot
+ * Standby queries --- so the original page can't be unlocked before we've
+ * added the new tuple to the new page.
+ */
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
{
+ obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
if (samepage)
- return; /* backup block covered both changes */
+ {
+ /* backup block covered both changes, so we're done */
+ UnlockReleaseBuffer(obuffer);
+ return;
+ }
goto newt;
}
/* Deal with old tuple version */
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- false);
- if (!BufferIsValid(buffer))
+ obuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+ false);
+ if (!BufferIsValid(obuffer))
goto newt;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(obuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
if (samepage)
+ {
+ UnlockReleaseBuffer(obuffer);
return;
+ }
goto newt;
}
* is already applied
*/
if (samepage)
+ {
+ nbuffer = obuffer;
goto newsame;
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(obuffer);
/* Deal with new tuple */
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_2)
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ {
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ true);
+ Assert(BufferIsValid(nbuffer));
+ page = (Page) BufferGetPage(nbuffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
+ PageInit(page, BufferGetPageSize(nbuffer), 0);
}
else
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- false);
- if (!BufferIsValid(buffer))
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ false);
+ if (!BufferIsValid(nbuffer))
return;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(nbuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(nbuffer);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
}
}
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(nbuffer);
+ UnlockReleaseBuffer(nbuffer);
+
+ if (BufferIsValid(obuffer) && obuffer != nbuffer)
+ UnlockReleaseBuffer(obuffer);
/*
- * If the page is running low on free space, update the FSM as well.
+ * If the new page is running low on free space, update the FSM as well.
* Arbitrarily, our definition of "low" is less than 20%. We can't do much
* better than that without knowing the fill-factor for the table.
*
*/
if (!hot_update && freespace < BLCKSZ / 5)
XLogRecordPageWithFreeSpace(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ freespace);
}
static void
ItemId lp = NULL;
HeapTupleHeader htup;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
uint32 oldlen;
uint32 newlen;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
* required. The ones in heap2 rmgr do.
*/
- RestoreBkpBlocks(lsn, record, false);
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- /*
- * Note that RestoreBkpBlocks() is called after conflict processing within
- * each record type handling function.
- */
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP2_FREEZE:
datalen -= sizeof(xl_btree_metadata);
}
- if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
- return; /* nothing to do */
-
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
}
}
+ /*
+ * Note: in normal operation, we'd update the metapage while still holding
+ * lock on the page we inserted into. But during replay it's not
+ * necessary to hold that lock, since no other index updates can be
+ * happening concurrently, and readers will cope fine with following an
+ * obsolete link from the metapage.
+ */
if (ismeta)
_bt_restore_meta(xlrec->target.node, lsn,
md.root, md.level,
forget_matching_split(xlrec->node, downlink, false);
/* Extract left hikey and its size (still assuming 16-bit alignment) */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (!(record->xl_info & XLR_BKP_BLOCK(0)))
{
/* We assume 16-bit alignment is enough for IndexTupleSize */
left_hikey = (Item) datapos;
datalen -= sizeof(OffsetNumber);
}
- if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
+ if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
{
/*
* We assume that 16-bit alignment is enough to apply IndexTupleSize
datalen -= newitemsz;
}
- /* Reconstruct right (new) sibling from scratch */
+ /* Reconstruct right (new) sibling page from scratch */
rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
Assert(BufferIsValid(rbuf));
rpage = (Page) BufferGetPage(rbuf);
/* don't release the buffer yet; we touch right page's first item below */
- /*
- * Reconstruct left (original) sibling if needed. Note that this code
- * ensures that the items remaining on the left page are in the correct
- * item number order, but it does not reproduce the physical order they
- * would have had. Is this worth changing? See also _bt_restore_page().
- */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ /* Now reconstruct left (original) sibling page */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
if (BufferIsValid(lbuf))
{
+ /*
+ * Note that this code ensures that the items remaining on the
+ * left page are in the correct item number order, but it does not
+ * reproduce the physical order they would have had. Is this
+ * worth changing? See also _bt_restore_page().
+ */
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
/* We no longer need the right buffer */
UnlockReleaseBuffer(rbuf);
- /* Fix left-link of the page to the right of the new right sibling */
- if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
+ /*
+ * Fix left-link of the page to the right of the new right sibling.
+ *
+ * Note: in normal operation, we do this while still holding lock on the
+ * two split pages. However, that's not necessary for correctness in WAL
+ * replay, because no other index update can be in progress, and readers
+ * will cope properly when following an obsolete left-link.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else if (xlrec->rnext != P_NONE)
{
Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
static void
btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
{
- xl_btree_vacuum *xlrec;
+ xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque opaque;
- xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
-
/*
* If queries might be active then we need to ensure every block is
* unpinned between the lastBlockVacuumed and the current block, if there
}
/*
- * If the block was restored from a full page image, nothing more to do.
- * The RestoreBkpBlocks() call already pinned and took cleanup lock on it.
- * XXX: Perhaps we should call RestoreBkpBlocks() *after* the loop above,
- * to make the disk access more sequential.
+ * If we have a full-page image, restore it (using a cleanup lock) and
+ * we're done.
*/
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
+ }
/*
* Like in btvacuumpage(), we need to take a cleanup lock on every leaf
* XXX optimise later with something like XLogPrefetchBuffer()
*/
static TransactionId
-btree_xlog_delete_get_latestRemovedXid(XLogRecord *record)
+btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
{
- xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
OffsetNumber *unused;
Buffer ibuffer,
hbuffer;
static void
btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
- xl_btree_delete *xlrec;
+ xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque opaque;
- if (record->xl_info & XLR_BKP_BLOCK_1)
- return;
+ /*
+ * If we have any conflict processing to do, it must happen before we
+ * update the page.
+ *
+ * Btree delete records can conflict with standby queries. You might
+ * think that vacuum records would conflict as well, but we've handled
+ * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+ * cleaned by the vacuum of the heap and so we can resolve any conflicts
+ * just once when that arrives. After that we know that no conflicts
+ * exist from individual btree vacuum records on that index.
+ */
+ if (InHotStandby)
+ {
+ TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);
- xlrec = (xl_btree_delete *) XLogRecGetData(record);
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
+ }
+
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ return;
+ }
/*
* We don't need to take a cleanup lock to apply these changes. See
leftsib = xlrec->leftblk;
rightsib = xlrec->rightblk;
+ /*
+ * In normal operation, we would lock all the pages this WAL record
+ * touches before changing any of them. In WAL replay, it should be okay
+ * to lock just one page at a time, since no concurrent index updates can
+ * be happening, and readers should not care whether they arrive at the
+ * target page or not (since it's surely empty).
+ */
+
/* parent page */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xlrec->target.node, parent, false);
if (BufferIsValid(buffer))
}
/* Fix left-link of right sibling */
- if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else
{
buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
if (BufferIsValid(buffer))
}
/* Fix right-link of left sibling, if any */
- if (!(record->xl_info & XLR_BKP_BLOCK_3))
+ if (record->xl_info & XLR_BKP_BLOCK(2))
+ (void) RestoreBackupBlock(lsn, record, 2, false, false);
+ else
{
if (leftsib != P_NONE)
{
BTPageOpaque pageop;
BlockNumber downlink = 0;
+ /* Backup blocks are not used in newroot records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
forget_matching_split(xlrec->node, downlink, true);
}
-
-void
-btree_redo(XLogRecPtr lsn, XLogRecord *record)
+static void
+btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
+ /*
+ * Btree reuse_page records exist to provide a conflict point when we
+ * reuse pages in the index via the FSM. That's all they do though.
+ *
+ * latestRemovedXid was the page's btpo.xact. The btpo.xact <
+ * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
+ * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
+ * Consequently, one XID value achieves the same exclusion effect on
+ * master and standby.
+ */
if (InHotStandby)
{
- switch (info)
- {
- case XLOG_BTREE_DELETE:
-
- /*
- * Btree delete records can conflict with standby queries. You
- * might think that vacuum records would conflict as well, but
- * we've handled that already. XLOG_HEAP2_CLEANUP_INFO records
- * provide the highest xid cleaned by the vacuum of the heap
- * and so we can resolve any conflicts just once when that
- * arrives. After that any we know that no conflicts exist
- * from individual btree vacuum records on that index.
- */
- {
- TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
- xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
-
- ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
- }
- break;
-
- case XLOG_BTREE_REUSE_PAGE:
-
- /*
- * Btree reuse page records exist to provide a conflict point
- * when we reuse pages in the index via the FSM. That's all it
- * does though. latestRemovedXid was the page's btpo.xact. The
- * btpo.xact < RecentGlobalXmin test in _bt_page_recyclable()
- * conceptually mirrors the pgxact->xmin > limitXmin test in
- * GetConflictingVirtualXIDs(). Consequently, one XID value
- * achieves the same exclusion effect on master and standby.
- */
- {
- xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->node);
+ }
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
- }
- return;
+ /* Backup blocks are not used in reuse_page records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+}
- default:
- break;
- }
- }
- /*
- * Vacuum needs to pin and take cleanup lock on every leaf page, a regular
- * exclusive lock is enough for all other purposes.
- */
- RestoreBkpBlocks(lsn, record, (info == XLOG_BTREE_VACUUM));
+void
+btree_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
switch (info)
{
btree_xlog_newroot(lsn, record);
break;
case XLOG_BTREE_REUSE_PAGE:
- /* Handled above before restoring bkp block */
+ btree_xlog_reuse_page(lsn, record);
break;
default:
elog(PANIC, "btree_redo: unknown op code %u", info);
4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must
happen before the WAL record is inserted; see notes in SyncOneBuffer().)
-5. Build a WAL log record and pass it to XLogInsert(); then update the page's
-LSN and TLI using the returned XLOG location. For instance,
+5. If the relation requires WAL-logging, build a WAL log record and pass it
+to XLogInsert(); then update the page's LSN and TLI using the returned XLOG
+location. For instance,
recptr = XLogInsert(rmgr_id, info, rdata);
what the XLOG record actually contains. XLOG records that describe multi-page
changes therefore require some care to design: you must be certain that you
know what data is indicated by each "BKP" bit. An example of the trickiness
-is that in a HEAP_UPDATE record, BKP(1) normally is associated with the source
-page and BKP(2) is associated with the destination page --- but if these are
-the same page, only BKP(1) would have been set.
+is that in a HEAP_UPDATE record, BKP(0) normally is associated with the source
+page and BKP(1) is associated with the destination page --- but if these are
+the same page, only BKP(0) would have been set.
For this reason as well as the risk of deadlocking on buffer locks, it's best
to design WAL records so that they reflect small atomic actions involving just
ID at least once; otherwise there is no defense against torn-page problems.
The standard replay-routine pattern for this case is
- if (record->xl_info & XLR_BKP_BLOCK_n)
- << do nothing, page was rewritten from logged copy >>;
+ if (record->xl_info & XLR_BKP_BLOCK(N))
+ {
+ /* apply the change from the full-page image */
+ (void) RestoreBackupBlock(lsn, record, N, false, false);
+ return;
+ }
reln = XLogOpenRelation(rnode);
buffer = XLogReadBuffer(reln, blkno, false);
if (!BufferIsValid(buffer))
- << do nothing, page has been deleted >>;
+ {
+ /* page has been deleted, so we need do nothing */
+ return;
+ }
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockReleaseBuffer(buffer);
As noted above, for a multi-page update you need to be able to determine
-which XLR_BKP_BLOCK_n flag applies to each page. If a WAL record reflects
+which XLR_BKP_BLOCK(N) flag applies to each page. If a WAL record reflects
a combination of fully-rewritable and incremental updates, then the rewritable
-pages don't count for the XLR_BKP_BLOCK_n numbering. (XLR_BKP_BLOCK_n is
-associated with the n'th distinct buffer ID seen in the "rdata" array, and
+pages don't count for the XLR_BKP_BLOCK(N) numbering. (XLR_BKP_BLOCK(N) is
+associated with the N'th distinct buffer ID seen in the "rdata" array, and
per the above discussion, fully-rewritable buffers shouldn't be mentioned in
"rdata".)
+When replaying a WAL record that describes changes on multiple pages, you
+must be careful to lock the pages properly to prevent concurrent Hot Standby
+queries from seeing an inconsistent state. If this requires that two
+or more buffer locks be held concurrently, the coding pattern shown above
+is too simplistic, since it assumes the routine can exit as soon as it's
+known the current page requires no modification. Instead, you might have
+something like
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ /* apply the change from the full-page image */
+ buffer0 = RestoreBackupBlock(lsn, record, 0, false, true);
+ }
+ else
+ {
+ buffer0 = XLogReadBuffer(rnode, blkno, false);
+ if (BufferIsValid(buffer0))
+ {
+ ... apply the change if not already done ...
+ MarkBufferDirty(buffer0);
+ }
+ }
+
+ ... similarly apply the changes for remaining pages ...
+
+ /* and now we can release the lock on the first page */
+ if (BufferIsValid(buffer0))
+ UnlockReleaseBuffer(buffer0);
+
Due to all these constraints, complex changes (such as a multilevel index
insertion) normally need to be described by a series of atomic-action WAL
records. What do you do if the intermediate states are not self-consistent?
* loop, write_len includes the backup block data.
*
* Also set the appropriate info bits to show which buffers were backed
- * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
- * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
+ * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
+ * value (ignoring InvalidBuffer) appearing in the rdata chain.
*/
write_len = len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
if (!dtbuf_bkp[i])
continue;
- info |= XLR_SET_BKP_BLOCK(i);
+ info |= XLR_BKP_BLOCK(i);
bkpb = &(dtbuf_xlg[i]);
page = (char *) BufferGetBlock(dtbuf[i]);
}
/*
- * Restore the backup blocks present in an XLOG record, if any.
+ * Restore a full-page image from a backup block attached to an XLOG record.
*
- * We assume all of the record has been read into memory at *record.
+ * lsn: LSN of the XLOG record being replayed
+ * record: the complete XLOG record
+ * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
+ * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
+ * keep_buffer: TRUE to return the buffer still locked and pinned
+ *
+ * Returns the buffer number containing the page. Note this is not terribly
+ * useful unless keep_buffer is specified as TRUE.
*
* Note: when a backup block is available in XLOG, we restore it
* unconditionally, even if the page in the database appears newer.
* modifications of the page that appear in XLOG, rather than possibly
* ignoring them as already applied, but that's not a huge drawback.
*
- * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
- * Otherwise, a normal exclusive lock is used. During crash recovery, that's
- * just pro forma because there can't be any regular backends in the system,
- * but in hot standby mode the distinction is important. The 'cleanup'
- * argument applies to all backup blocks in the WAL record, that suffices for
- * now.
+ * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
+ * else a normal exclusive lock is used. During crash recovery, that's just
+ * pro forma because there can't be any regular backends in the system, but
+ * in hot standby mode the distinction is important.
+ *
+ * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
+ * then caller is responsible for doing UnlockReleaseBuffer() later. This
+ * is needed in some cases when replaying XLOG records that touch multiple
+ * pages, to prevent inconsistent states from being visible to other backends.
+ * (Again, that's only important in hot standby mode.)
*/
-void
-RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
+Buffer
+RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
+ bool get_cleanup_lock, bool keep_buffer)
{
Buffer buffer;
Page page;
char *blk;
int i;
- if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
- return;
-
+ /* Locate requested BkpBlock in the record */
blk = (char *) XLogRecGetData(record) + record->xl_len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
+ if (!(record->xl_info & XLR_BKP_BLOCK(i)))
continue;
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- RBM_ZERO);
- Assert(BufferIsValid(buffer));
- if (cleanup)
- LockBufferForCleanup(buffer);
- else
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ if (i == block_index)
+ {
+ /* Found it, apply the update */
+ buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+ RBM_ZERO);
+ Assert(BufferIsValid(buffer));
+ if (get_cleanup_lock)
+ LockBufferForCleanup(buffer);
+ else
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(buffer);
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
+ if (bkpb.hole_length == 0)
+ {
+ memcpy((char *) page, blk, BLCKSZ);
+ }
+ else
+ {
+ memcpy((char *) page, blk, bkpb.hole_offset);
+ /* must zero-fill the hole */
+ MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+ memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+ blk + bkpb.hole_offset,
+ BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ if (!keep_buffer)
+ UnlockReleaseBuffer(buffer);
+
+ return buffer;
+ }
blk += BLCKSZ - bkpb.hole_length;
}
+
+ /* Caller specified a bogus block_index */
+ elog(ERROR, "failed to restore block_index %d", block_index);
+ return InvalidBuffer; /* keep compiler quiet */
}
/*
{
uint32 blen;
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
+ if (!(record->xl_info & XLR_BKP_BLOCK(i)))
continue;
memcpy(&bkpb, blk, sizeof(BkpBlock));
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- if (record->xl_info & XLR_SET_BKP_BLOCK(i))
- appendStringInfo(buf, "; bkpb%d", i + 1);
+ if (record->xl_info & XLR_BKP_BLOCK(i))
+ appendStringInfo(buf, "; bkpb%d", i);
}
appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
*/
#define XLR_BKP_BLOCK_MASK 0x0E /* all info bits used for bkp blocks */
#define XLR_MAX_BKP_BLOCKS 3
+#define XLR_BKP_BLOCK(iblk) (0x08 >> (iblk)) /* iblk in 0..2 */
+
+/* These macros are deprecated and will be removed in 9.3; use XLR_BKP_BLOCK */
#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> (iblk))
#define XLR_BKP_BLOCK_1 XLR_SET_BKP_BLOCK(0) /* 0x08 */
#define XLR_BKP_BLOCK_2 XLR_SET_BKP_BLOCK(1) /* 0x04 */
* If buffer is valid then XLOG will check if buffer must be backed up
* (ie, whether this is first change of that page since last checkpoint).
* If so, the whole page contents are attached to the XLOG record, and XLOG
- * sets XLR_BKP_BLOCK_X bit in xl_info. Note that the buffer must be pinned
+ * sets XLR_BKP_BLOCK(N) bit in xl_info. Note that the buffer must be pinned
* and exclusive-locked by the caller, so that it won't change under us.
* NB: when the buffer is backed up, we DO NOT insert the data pointed to by
* this XLogRecData struct into the XLOG record, since we assume it's present
* in the buffer. Therefore, rmgr redo routines MUST pay attention to
- * XLR_BKP_BLOCK_X to know what is actually stored in the XLOG record.
- * The i'th XLR_BKP_BLOCK bit corresponds to the i'th distinct buffer
+ * XLR_BKP_BLOCK(N) to know what is actually stored in the XLOG record.
+ * The N'th XLR_BKP_BLOCK bit corresponds to the N'th distinct buffer
* value (ignoring InvalidBuffer) appearing in the rdata chain.
*
* When buffer is valid, caller must set buffer_std to indicate whether the
extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
-extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
+extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
+ int block_index,
+ bool get_cleanup_lock, bool keep_buffer);
extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);