* of XLogRecData structs by a call to XLogRecordAssemble(). See
* access/transam/README for details.
*
- * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/transam/xloginsert.c
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
#include "miscadmin.h"
+#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "pg_trace.h"
+/* Buffer size required to store a compressed version of a backup block image */
+#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
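+/* (pglz can enlarge incompressible data slightly, hence the extra margin) */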
+
/*
* For each block reference registered with XLogRegisterBuffer, we fill in
* a registered_buffer struct.
XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
* backup block data in XLogRecordAssemble() */
-} registered_buffer;
+
+ /* buffer to store a compressed version of the backup block image */
+ char compressed_page[PGLZ_MAX_BLCKSZ];
+} registered_buffer;
static registered_buffer *registered_buffers;
-static int max_registered_buffers; /* allocated size */
-static int max_registered_block_id = 0; /* highest block_id + 1
- * currently registered */
+static int max_registered_buffers; /* allocated size */
+static int max_registered_block_id = 0; /* highest block_id + 1 currently
+ * registered */
/*
* A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len; /* total # of bytes in chain */
+/* flags for the in-progress insertion */
+static uint8 curinsert_flags = 0;
+
/*
* These are used to hold the record header while constructing a record.
* 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;
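+/*
+ * The replication origin is stored in the record header as one
+ * XLR_BLOCK_ID_ORIGIN byte followed by the origin id itself.
+ */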
+#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+
#define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
- SizeOfXLogRecordDataHeaderLong)
+ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
/*
* An array of XLogRecData structs, to hold registered data.
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
XLogRecPtr *fpw_lsn);
+static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
+ uint16 hole_length, char *dest, uint16 *dlen);
/*
* Begin constructing a WAL record. This must be called before the
Assert(max_registered_block_id == 0);
Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
Assert(mainrdata_len == 0);
- Assert(!begininsert_called);
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
+ if (begininsert_called)
+ elog(ERROR, "XLogBeginInsert was already called");
+
begininsert_called = true;
}
max_registered_block_id = 0;
mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head;
+ curinsert_flags = 0;
begininsert_called = false;
}
* Add data to the WAL record that's being constructed.
*
* The data is appended to the "main chunk", available at replay with
- * XLogGetRecData().
+ * XLogRecGetData().
*/
void
XLogRegisterData(char *data, int len)
regbuf->rdata_len += len;
}
+/*
+ * Set insert status flags for the upcoming WAL record.
+ *
+ * The flags that can be used here are:
+ * - XLOG_INCLUDE_ORIGIN, to determine whether the replication origin should
+ *   be included in the record.
+ * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
+ *   durability, which avoids triggering WAL archiving and other background
+ *   activity.
+ */
+void
+XLogSetRecordFlags(uint8 flags)
+{
+ Assert(begininsert_called);
+ curinsert_flags = flags;
+}
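+
+/*
+ * Illustrative usage sketch (not part of this patch); rec_data, rec_len,
+ * rmid and info stand in for a real caller's values:
+ *
+ *		XLogBeginInsert();
+ *		XLogRegisterData((char *) &rec_data, rec_len);
+ *		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ *		recptr = XLogInsert(rmid, info);
+ */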
+
/*
* Insert an XLOG record having the specified RMID and info bytes, with the
* body of the record being the data and buffer references registered earlier
elog(ERROR, "XLogBeginInsert was not called");
/*
- * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
- * reserved for use by me.
+ * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+ * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
*/
- if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
+ if ((info & ~(XLR_RMGR_INFO_MASK |
+ XLR_SPECIAL_REL_UPDATE |
+ XLR_CHECK_CONSISTENCY)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
- TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
+ TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
/*
* In bootstrap mode, we don't actually log anything but XLOG resources;
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
XLogResetInsertion();
- EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
+ EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
/*
* Get values needed to decide whether to do full-page writes. Since
* we don't yet have an insertion lock, these could change under us,
- * but XLogInsertRecData will recheck them once it has a lock.
+ * but XLogInsertRecord will recheck them once it has a lock.
*/
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
&fpw_lsn);
- EndPos = XLogInsertRecord(rdt, fpw_lsn);
+ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();
XLogRecData *rdt;
uint32 total_len = 0;
int block_id;
- pg_crc32 rdata_crc;
+ pg_crc32c rdata_crc;
registered_buffer *prev_regbuf = NULL;
XLogRecData *rdt_datas_last;
XLogRecord *rechdr;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch;
+ /*
+ * Enforce consistency checks for this record if the user is asking for
+ * them. Do this at the beginning of this routine, so that callers of
+ * XLogInsert() may also pass XLR_CHECK_CONSISTENCY directly for a record.
+ */
+ if (wal_consistency_checking[rmid])
+ info |= XLR_CHECK_CONSISTENCY;
+
/*
* Make an rdata chain containing all the data portions of all block
* references. This includes the data for full-page images. Also append
bool needs_data;
XLogRecordBlockHeader bkpb;
XLogRecordBlockImageHeader bimg;
+ XLogRecordBlockCompressHeader cbimg = {0};
bool samerel;
+ bool is_compressed = false;
+ bool include_image;
if (!regbuf->in_use)
continue;
if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
- if (needs_backup)
+ /*
+ * If needs_backup is true or WAL checking is enabled for the current
+ * resource manager, log a full-page write for the current block.
+ */
+ include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+ if (include_image)
{
Page page = regbuf->page;
+ uint16 compressed_len = 0;
/*
- * The page needs to be backed up, so set up *bimg
+ * The page needs to be backed up, so calculate its hole length
+ * and offset.
*/
if (regbuf->flags & REGBUF_STANDARD)
{
upper <= BLCKSZ)
{
bimg.hole_offset = lower;
- bimg.hole_length = upper - lower;
+ cbimg.hole_length = upper - lower;
}
else
{
/* No "hole" to compress out */
bimg.hole_offset = 0;
- bimg.hole_length = 0;
+ cbimg.hole_length = 0;
}
}
else
{
/* Not a standard page header, don't try to eliminate "hole" */
bimg.hole_offset = 0;
- bimg.hole_length = 0;
+ cbimg.hole_length = 0;
}
- /* Fill in the remaining fields in the XLogRecordBlockData struct */
- bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+ /*
+ * Try to compress a block image if wal_compression is enabled
+ */
+ if (wal_compression)
+ {
+ is_compressed =
+ XLogCompressBackupBlock(page, bimg.hole_offset,
+ cbimg.hole_length,
+ regbuf->compressed_page,
+ &compressed_len);
+ }
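+
+ /*
+ * If compression was not attempted or did not pay off, is_compressed
+ * stays false and the uncompressed image (minus any hole) is logged
+ * below instead.
+ */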
- total_len += BLCKSZ - bimg.hole_length;
+ /*
+ * Fill in the remaining fields in the XLogRecordBlockHeader
+ * struct
+ */
+ bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
/*
* Construct XLogRecData entries for the page content.
*/
rdt_datas_last->next = ®buf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next;
- if (bimg.hole_length == 0)
+
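+ /* BKPIMAGE_HAS_HOLE tells replay that the "hole" was left out of the image */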
+ bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+
+ /*
+ * If WAL consistency checking is enabled for the resource manager
+ * of this WAL record, a full-page image is included in the record
+ * for the block modified. During redo, the full-page image is replayed
+ * only if BKPIMAGE_APPLY is set.
+ */
+ if (needs_backup)
+ bimg.bimg_info |= BKPIMAGE_APPLY;
+
+ if (is_compressed)
{
- rdt_datas_last->data = page;
- rdt_datas_last->len = BLCKSZ;
+ bimg.length = compressed_len;
+ bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
+
+ rdt_datas_last->data = regbuf->compressed_page;
+ rdt_datas_last->len = compressed_len;
}
else
{
- /* must skip the hole */
- rdt_datas_last->data = page;
- rdt_datas_last->len = bimg.hole_offset;
+ bimg.length = BLCKSZ - cbimg.hole_length;
+
+ if (cbimg.hole_length == 0)
+ {
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = BLCKSZ;
+ }
+ else
+ {
+ /* must skip the hole */
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = bimg.hole_offset;
- rdt_datas_last->next = ®buf->bkp_rdatas[1];
- rdt_datas_last = rdt_datas_last->next;
+ rdt_datas_last->next = ®buf->bkp_rdatas[1];
+ rdt_datas_last = rdt_datas_last->next;
- rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
- rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
+ rdt_datas_last->data =
+ page + (bimg.hole_offset + cbimg.hole_length);
+ rdt_datas_last->len =
+ BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+ }
}
+
+ total_len += bimg.length;
}
if (needs_data)
{
samerel = true;
bkpb.fork_flags |= BKPBLOCK_SAME_REL;
- prev_regbuf = regbuf;
}
else
samerel = false;
+ prev_regbuf = regbuf;
/* Ok, copy the header to the scratch buffer */
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
- if (needs_backup)
+ if (include_image)
{
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
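+
+ /*
+ * The hole length must be stored explicitly only for a compressed
+ * image; at replay it can otherwise be re-derived as
+ * BLCKSZ - bimg.length.
+ */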
+ if (cbimg.hole_length != 0 && is_compressed)
+ {
+ memcpy(scratch, &cbimg,
+ SizeOfXLogRecordBlockCompressHeader);
+ scratch += SizeOfXLogRecordBlockCompressHeader;
+ }
}
if (!samerel)
{
scratch += sizeof(BlockNumber);
}
+ /* followed by the record's origin, if any */
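+ /* (readable at replay via XLogRecGetOrigin()) */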
+ if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+ replorigin_session_origin != InvalidRepOriginId)
+ {
+ *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+ memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
+ scratch += sizeof(replorigin_session_origin);
+ }
+
/* followed by main data, if any */
if (mainrdata_len > 0)
{
if (mainrdata_len > 255)
{
- *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+ *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
- *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+ *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
*(scratch++) = (uint8) mainrdata_len;
}
rdt_datas_last->next = mainrdata_head;
return &hdr_rdt;
}
+/*
+ * Create a compressed version of a backup block image.
+ *
+ * Returns FALSE if compression fails (i.e., the compressed result is
+ * actually bigger than the original). Otherwise, returns TRUE and sets
+ * 'dlen' to the length of the compressed block image.
+ */
+static bool
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
+ char *dest, uint16 *dlen)
+{
+ int32 orig_len = BLCKSZ - hole_length;
+ int32 len;
+ int32 extra_bytes = 0;
+ char *source;
+ char tmp[BLCKSZ];
+
+ if (hole_length != 0)
+ {
+ /* must skip the hole */
+ source = tmp;
+ memcpy(source, page, hole_offset);
+ memcpy(source + hole_offset,
+ page + (hole_offset + hole_length),
+ BLCKSZ - (hole_length + hole_offset));
+
+ /*
+ * Extra data needs to be stored in the WAL record for the compressed
+ * version of the block image if a hole exists.
+ */
+ extra_bytes = SizeOfXLogRecordBlockCompressHeader;
+ }
+ else
+ source = page;
+
+ /*
+ * We recheck the actual size even if pglz_compress() reports success, and
+ * see whether the number of bytes saved by compression is larger than the
+ * length of the extra data needed for the compressed version of the block
+ * image.
+ */
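+	/* pglz_compress() returns -1 if the data could not be usefully compressed */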
+ len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+ if (len >= 0 &&
+ len + extra_bytes < orig_len)
+ {
+ *dlen = (uint16) len; /* successful compression */
+ return true;
+ }
+ return false;
+}
+
/*
* Determine whether the buffer referenced has to be backed up.
*
BufferGetTag(buffer, &rnode, &forkno, &blkno);
XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
}
return recptr;
{
xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
"WAL record construction",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
}
if (registered_buffers == NULL)
* Allocate a buffer to hold the header information for a WAL record.
*/
if (hdr_scratch == NULL)
- hdr_scratch = palloc0(HEADER_SCRATCH_SIZE);
+ hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
+ HEADER_SCRATCH_SIZE);
}