static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
+static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
+ char *blk, bool get_cleanup_lock, bool keep_buffer);
static bool AdvanceXLInsertBuffer(bool new_segment);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
- bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
uint8 info_orig = info;
static XLogRecord *rechdr;
goto begin;
}
- /*
- * If this is a hint record and we don't need a backup block then
- * we have no more work to do and can exit quickly without inserting
- * a WAL record at all. In that case return InvalidXLogRecPtr.
- */
- if (isHint && !(info & XLR_BKP_BLOCK_MASK))
- {
- LWLockRelease(WALInsertLock);
- END_CRIT_SECTION();
- return InvalidXLogRecPtr;
- }
-
/*
* If the current page is completely full, the record goes to the next
* page, right after the page header.
RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
bool get_cleanup_lock, bool keep_buffer)
{
- Buffer buffer;
- Page page;
BkpBlock bkpb;
char *blk;
int i;
if (i == block_index)
{
/* Found it, apply the update */
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- RBM_ZERO);
- Assert(BufferIsValid(buffer));
- if (get_cleanup_lock)
- LockBufferForCleanup(buffer);
- else
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- page = (Page) BufferGetPage(buffer);
-
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
-
- /*
- * Any checksum set on this page will be invalid. We don't need
- * to reset it here since it will be set before being written.
- */
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
-
- if (!keep_buffer)
- UnlockReleaseBuffer(buffer);
-
- return buffer;
+ return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
+ keep_buffer);
}
blk += BLCKSZ - bkpb.hole_length;
return InvalidBuffer; /* keep compiler quiet */
}
+/*
+ * Workhorse for RestoreBackupBlock usable without an xlog record
+ *
+ * Restores a full-page image from BkpBlock and a data pointer.
+ */
+static Buffer
+RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
+ bool get_cleanup_lock, bool keep_buffer)
+{
+ Buffer buffer;
+ Page page;
+
+ buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+ RBM_ZERO);
+ Assert(BufferIsValid(buffer));
+ if (get_cleanup_lock)
+ LockBufferForCleanup(buffer);
+ else
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ page = (Page) BufferGetPage(buffer);
+
+ if (bkpb.hole_length == 0)
+ {
+ memcpy((char *) page, blk, BLCKSZ);
+ }
+ else
+ {
+ memcpy((char *) page, blk, bkpb.hole_offset);
+ /* must zero-fill the hole */
+ MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+ memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+ blk + bkpb.hole_offset,
+ BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
+ }
+
+ /*
+ * The checksum value on this page is currently invalid. We don't
+ * need to reset it here since it will be set before being written.
+ */
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+
+ if (!keep_buffer)
+ UnlockReleaseBuffer(buffer);
+
+ return buffer;
+}
+
/*
* Attempt to read an XLOG record.
*
* Write a backup block if needed when we are setting a hint. Note that
* this may be called for a variety of page types, not just heaps.
*
- * Deciding the "if needed" part is delicate and requires us to either
- * grab WALInsertLock or check the info_lck spinlock. If we check the
- * spinlock and it says Yes then we will need to get WALInsertLock as well,
- * so the design choice here is to just go straight for the WALInsertLock
- * and trust that calls to this function are minimised elsewhere.
- *
* Callable while holding just share lock on the buffer content.
*
- * Possible that multiple concurrent backends could attempt to write
- * WAL records. In that case, more than one backup block may be recorded
- * though that isn't important to the outcome and the backup blocks are
- * likely to be identical anyway.
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
+ * failures. So instead we copy the page and insert the copied data as normal
+ * record data.
+ *
+ * We only need to do something if page has not yet been full page written in
+ * this checkpoint round. The LSN of the inserted wal record is returned if we
+ * had to write, InvalidXLogRecPtr otherwise.
+ *
+ * It is possible that multiple concurrent backends could attempt to write WAL
+ * records. In that case, multiple copies of the same block would be recorded
+ * in separate WAL records by different backends, though that is still OK from
+ * a correctness perspective.
+ *
+ * Note that this only works for buffers that fit the standard page model,
+ * i.e. those for which buffer_std == true
*/
-#define XLOG_HINT_WATERMARK 13579
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer)
{
+ XLogRecPtr recptr = InvalidXLogRecPtr;
+ XLogRecPtr lsn;
+ XLogRecData rdata[2];
+ BkpBlock bkpb;
+
/*
- * Make an XLOG entry reporting the hint
+ * Ensure no checkpoint can change our view of RedoRecPtr.
*/
- XLogRecData rdata[2];
- int watermark = XLOG_HINT_WATERMARK;
+ Assert(MyPgXact->delayChkpt);
+
+ /*
+ * Update RedoRecPtr so XLogCheckBuffer can make the right decision
+ */
+ GetRedoRecPtr();
/*
- * Not allowed to have zero-length records, so use a small watermark
+ * Setup phony rdata element for use within XLogCheckBuffer only.
+ * We reuse and reset rdata for any actual WAL record insert.
*/
- rdata[0].data = (char *) (&watermark);
- rdata[0].len = sizeof(int);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].buffer_std = false;
- rdata[0].next = &(rdata[1]);
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = true;
+
+ if (XLogCheckBuffer(rdata, true, &lsn, &bkpb))
+ {
+ char copied_buffer[BLCKSZ];
+ char *origdata = (char *) BufferGetBlock(buffer);
+
+ /*
+ * Copy buffer so we don't have to worry about concurrent hint bit or
+ * lsn updates. We assume pd_lower/upper cannot be changed without an
+ * exclusive lock, so the contents bkp are not racy.
+ */
+ memcpy(copied_buffer, origdata, bkpb.hole_offset);
+ memcpy(copied_buffer + bkpb.hole_offset,
+ origdata + bkpb.hole_offset + bkpb.hole_length,
+ BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+
+ /*
+ * Header for backup block.
+ */
+ rdata[0].data = (char *) &bkpb;
+ rdata[0].len = sizeof(BkpBlock);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ /*
+ * Save copy of the buffer.
+ */
+ rdata[1].data = copied_buffer;
+ rdata[1].len = BLCKSZ - bkpb.hole_length;
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].next = NULL;
- return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+ }
+
+ return recptr;
}
/*
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- /* Backup blocks are not used in most xlog records */
- Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
+ /* Backup blocks are not used by XLOG rmgr */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
if (info == XLOG_NEXTOID)
{
}
else if (info == XLOG_HINT)
{
-#ifdef USE_ASSERT_CHECKING
- int *watermark = (int *) XLogRecGetData(record);
-#endif
-
- /* Check the watermark is correct for the hint record */
- Assert(*watermark == XLOG_HINT_WATERMARK);
-
- /* Backup blocks must be present for smgr hint records */
- Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+ char *data;
+ BkpBlock bkpb;
/*
- * Hint records have no information that needs to be replayed.
- * The sole purpose of them is to ensure that a hint bit does
- * not cause a checksum invalidation if a hint bit write should
- * cause a torn page. So the body of the record is empty but
- * there must be one backup block.
+ * Hint bit records contain a backup block stored "inline" in the normal
+ * data since the locking when writing hint records isn't sufficient to
+ * use the normal backup block mechanism, which assumes exclusive lock
+ * on the buffer supplied.
*
- * Since the only change in the backup block is a hint bit,
- * there is no confict with Hot Standby.
+ * Since the only change in these backup block are hint bits, there are
+ * no recovery conflicts generated.
*
* This also means there is no corresponding API call for this,
* so an smgr implementation has no need to implement anything.
* Which means nothing is needed in md.c etc
*/
- RestoreBackupBlock(lsn, record, 0, false, false);
+ data = XLogRecGetData(record);
+ memcpy(&bkpb, data, sizeof(BkpBlock));
+ data += sizeof(BkpBlock);
+
+ RestoreBackupBlockContents(lsn, bkpb, data, false, false);
}
else if (info == XLOG_BACKUP_END)
{