uint32 freespace;
int curridx;
XLogRecData *rdt;
+ XLogRecData *rdt_lastnormal;
Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
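+ /* backup-block bits get ORed into 'info' below; save the original so a retry can restore it */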
+ uint8 info_orig = info;
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
}
/*
- * Here we scan the rdata chain, determine which buffers must be backed
- * up, and compute the CRC values for the data. Note that the record
- * header isn't added into the CRC initially since we don't know the final
- * length or info bits quite yet. Thus, the CRC will represent the CRC of
- * the whole record in the order "rdata, then backup blocks, then record
- * header".
+ * Here we scan the rdata chain to determine which buffers must be backed
+ * up.
*
* We may have to loop back to here if a race condition is detected below.
* We could prevent the race by doing all this work while holding the
* insert lock, but it seems better to avoid doing CRC calculations while
- * holding the lock. This means we have to be careful about modifying the
- * rdata chain until we know we aren't going to loop back again. The only
- * change we allow ourselves to make earlier is to set rdt->data = NULL in
- * chain items we have decided we will have to back up the whole buffer
- * for. This is OK because we will certainly decide the same thing again
- * for those items if we do it over; doing it here saves an extra pass
- * over the chain later.
+ * holding the lock.
+ *
+ * We add entries for backup blocks to the chain, so that they don't
+ * need any special treatment in the critical section where the chunks are
+ * copied into the WAL buffers. Those entries have to be unlinked from the
+ * chain if we have to loop back here.
*/
begin:;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
*/
doPageWrites = fullPageWrites || Insert->forcePageWrites;
- INIT_CRC32(rdata_crc);
len = 0;
for (rdt = rdata;;)
{
{
/* Simple data, just include it */
len += rdt->len;
- COMP_CRC32(rdata_crc, rdt->data, rdt->len);
}
else
{
{
/* Buffer already referenced by earlier chain item */
if (dtbuf_bkp[i])
+ {
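+ /* the full-page image supersedes this data; zero len so the CRC and copy loops below skip it */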
rdt->data = NULL;
+ rdt->len = 0;
+ }
else if (rdt->data)
- {
len += rdt->len;
- COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- }
break;
}
if (dtbuf[i] == InvalidBuffer)
{
dtbuf_bkp[i] = true;
rdt->data = NULL;
+ rdt->len = 0;
}
else if (rdt->data)
- {
len += rdt->len;
- COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- }
break;
}
}
rdt = rdt->next;
}
- /*
- * Now add the backup block headers and data into the CRC
- */
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (dtbuf_bkp[i])
- {
- BkpBlock *bkpb = &(dtbuf_xlg[i]);
- char *page;
-
- COMP_CRC32(rdata_crc,
- (char *) bkpb,
- sizeof(BkpBlock));
- page = (char *) BufferGetBlock(dtbuf[i]);
- if (bkpb->hole_length == 0)
- {
- COMP_CRC32(rdata_crc,
- page,
- BLCKSZ);
- }
- else
- {
- /* must skip the hole */
- COMP_CRC32(rdata_crc,
- page,
- bkpb->hole_offset);
- COMP_CRC32(rdata_crc,
- page + (bkpb->hole_offset + bkpb->hole_length),
- BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
- }
- }
- }
-
/*
* NOTE: We disallow len == 0 because it provides a useful bit of extra
* error checking in ReadRecord. This means that all callers of
if (len == 0 && !isLogSwitch)
elog(PANIC, "invalid xlog record length %u", len);
- START_CRIT_SECTION();
-
- /* Now wait to get insert lock */
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-
- /*
- * Check to see if my RedoRecPtr is out of date. If so, may have to go
- * back and recompute everything. This can only happen just after a
- * checkpoint, so it's better to be slow in this case and fast otherwise.
- *
- * If we aren't doing full-page writes then RedoRecPtr doesn't actually
- * affect the contents of the XLOG record, so we'll update our local copy
- * but not force a recomputation.
- */
- if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
- {
- Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
- RedoRecPtr = Insert->RedoRecPtr;
-
- if (doPageWrites)
- {
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (dtbuf[i] == InvalidBuffer)
- continue;
- if (dtbuf_bkp[i] == false &&
- XLByteLE(dtbuf_lsn[i], RedoRecPtr))
- {
- /*
- * Oops, this buffer now needs to be backed up, but we
- * didn't think so above. Start over.
- */
- LWLockRelease(WALInsertLock);
- END_CRIT_SECTION();
- goto begin;
- }
- }
- }
- }
-
- /*
- * Also check to see if forcePageWrites was just turned on; if we weren't
- * already doing full-page writes then go back and recompute. (If it was
- * just turned off, we could recompute the record without full pages, but
- * we choose not to bother.)
- */
- if (Insert->forcePageWrites && !doPageWrites)
- {
- /* Oops, must redo it with full-page data */
- LWLockRelease(WALInsertLock);
- END_CRIT_SECTION();
- goto begin;
- }
-
/*
* Make additional rdata chain entries for the backup blocks, so that we
- * don't need to special-case them in the write loop. Note that we have
- * now irrevocably changed the input rdata chain. At the exit of this
- * loop, write_len includes the backup block data.
+ * don't need to special-case them in the write loop. This modifies the
+ * original rdata chain, but we keep a pointer to the last regular entry,
+ * rdt_lastnormal, so that we can undo this if we have to loop back to the
+ * beginning.
+ *
+ * At the exit of this loop, write_len includes the backup block data.
*
* Also set the appropriate info bits to show which buffers were backed
* up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
* buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
*/
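+ /* the scan above left rdt pointing at the last entry of the original chain */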
+ rdt_lastnormal = rdt;
write_len = len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
}
}
+ /*
+ * Calculate CRC of the data, including all the backup blocks
+ *
+ * Note that the record header isn't added into the CRC initially since
+ * we don't know the prev-link yet. Thus, the CRC will represent the CRC
+ * of the whole record in the order: rdata, then backup blocks, then
+ * record header.
+ */
+ INIT_CRC32(rdata_crc);
+ for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32(rdata_crc, rdt->data, rdt->len);
+
+ START_CRIT_SECTION();
+
+ /* Now wait to get insert lock */
+ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+
+ /*
+ * Check to see if my RedoRecPtr is out of date. If so, we may have to go
+ * back and recompute everything. This can only happen just after a
+ * checkpoint, so it's better to be slow in this case and fast otherwise.
+ *
+ * If we aren't doing full-page writes then RedoRecPtr doesn't actually
+ * affect the contents of the XLOG record, so we'll update our local copy
+ * but not force a recomputation.
+ */
+ if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+ {
+ Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+ RedoRecPtr = Insert->RedoRecPtr;
+
+ if (doPageWrites)
+ {
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ {
+ if (dtbuf[i] == InvalidBuffer)
+ continue;
+ if (dtbuf_bkp[i] == false &&
+ XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+ {
+ /*
+ * Oops, this buffer now needs to be backed up, but we
+ * didn't think so above. Start over.
+ */
+ LWLockRelease(WALInsertLock);
+ END_CRIT_SECTION();
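+ /* unlink the backup-block entries added above and restore the original info bits */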
+ rdt_lastnormal->next = NULL;
+ info = info_orig;
+ goto begin;
+ }
+ }
+ }
+ }
+
+ /*
+ * Also check to see if forcePageWrites was just turned on; if we weren't
+ * already doing full-page writes then go back and recompute. (If it was
+ * just turned off, we could recompute the record without full pages, but
+ * we choose not to bother.)
+ */
+ if (Insert->forcePageWrites && !doPageWrites)
+ {
+ /* Oops, must redo it with full-page data. */
+ LWLockRelease(WALInsertLock);
+ END_CRIT_SECTION();
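+ /* as above, undo the rdata chain and info changes before retrying */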
+ rdt_lastnormal->next = NULL;
+ info = info_orig;
+ goto begin;
+ }
+
/*
* If there isn't enough space on the current XLOG page for a record
* header, advance to the next page (leaving the unused space as zeroes).