Phase 2 of pgindent updates.
index b83343bf5bdd26f94845de25c69a775e41f66bca..c9cc6636d3fd039754d15727587483f0d6530597 100644
@@ -3,7 +3,13 @@
  * xloginsert.c
  *             Functions for constructing WAL records
  *
- * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Constructing a WAL record begins with a call to XLogBeginInsert,
+ * followed by a number of XLogRegister* calls. The registered data is
+ * collected in private working memory, and finally assembled into a chain
+ * of XLogRecData structs by a call to XLogRecordAssemble(). See
+ * access/transam/README for details.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/xloginsert.c
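As a rough illustration of the flow described in the header comment above, a caller builds a record along these lines. This is a minimal sketch, not part of the patch: RM_FOO_ID, XLOG_FOO_DO_STUFF and xl_foo are hypothetical placeholders, while the XLog* calls are the API declared in xloginsert.h.

	/*
	 * Hypothetical caller: log a change to one buffer plus some "main data".
	 * XLogBeginInsert() starts the record, the XLogRegister* calls collect
	 * the pieces in working memory, and XLogInsert() assembles and inserts.
	 */
	static XLogRecPtr
	foo_log_change(Buffer buffer, xl_foo *xlrec)
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();
		XLogRegisterData((char *) xlrec, sizeof(xl_foo));	/* main data */
		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);		/* block reference 0 */

		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_DO_STUFF);

		/* stamp the modified page with the record's LSN */
		PageSetLSN(BufferGetPage(buffer), recptr);
		return recptr;
	}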
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
 #include "miscadmin.h"
+#include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "pg_trace.h"
 
+/* Buffer size required to store a compressed version of backup block image */
+#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
+
+/*
+ * For each block reference registered with XLogRegisterBuffer, we fill in
+ * a registered_buffer struct.
+ */
+typedef struct
+{
+       bool            in_use;                 /* is this slot in use? */
+       uint8           flags;                  /* REGBUF_* flags */
+       RelFileNode rnode;                      /* identifies the relation and block */
+       ForkNumber      forkno;
+       BlockNumber block;
+       Page            page;                   /* page content */
+       uint32          rdata_len;              /* total length of data in rdata chain */
+       XLogRecData *rdata_head;        /* head of the chain of data registered with
+                                                                * this block */
+       XLogRecData *rdata_tail;        /* last entry in the chain, or &rdata_head if
+                                                                * empty */
+
+       XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
+                                                                * backup block data in XLogRecordAssemble() */
+
+       /* buffer to store a compressed version of backup block image */
+       char            compressed_page[PGLZ_MAX_BLCKSZ];
+} registered_buffer;
+
+static registered_buffer *registered_buffers;
+static int     max_registered_buffers; /* allocated size */
+static int     max_registered_block_id = 0;    /* highest block_id + 1 currently
+                                                                                        * registered */
+
+/*
+ * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
+ * with XLogRegisterData(...).
+ */
+static XLogRecData *mainrdata_head;
+static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
+static uint32 mainrdata_len;   /* total # of bytes in chain */
+
+/* flags for the in-progress insertion */
+static uint8 curinsert_flags = 0;
+
+/*
+ * These are used to hold the record header while constructing a record.
+ * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
+ * because we want it to be MAXALIGNed and padding bytes zeroed.
+ *
+ * For simplicity, it's allocated large enough to hold the headers for any
+ * WAL record.
+ */
+static XLogRecData hdr_rdt;
+static char *hdr_scratch = NULL;
+
+#define SizeOfXlogOrigin       (sizeof(RepOriginId) + sizeof(char))
+
+#define HEADER_SCRATCH_SIZE \
+       (SizeOfXLogRecord + \
+        MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
+        SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+
+/*
+ * An array of XLogRecData structs, to hold registered data.
+ */
+static XLogRecData *rdatas;
+static int     num_rdatas;                     /* entries currently used */
+static int     max_rdatas;                     /* allocated size */
+
+static bool begininsert_called = false;
+
+/* Memory context to hold the registered buffer and data references. */
+static MemoryContext xloginsert_cxt;
+
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
-                                  XLogRecData *rdata,
                                   XLogRecPtr RedoRecPtr, bool doPageWrites,
-                                  XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal);
-static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb);
+                                  XLogRecPtr *fpw_lsn);
+static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
+                                               uint16 hole_length, char *dest, uint16 *dlen);
 
 /*
- * Insert an XLOG record having the specified RMID and info bytes,
- * with the body of the record being the data chunk(s) described by
- * the rdata chain (see xloginsert.h for notes about rdata).
+ * Begin constructing a WAL record. This must be called before the
+ * XLogRegister* functions and XLogInsert().
+ */
+void
+XLogBeginInsert(void)
+{
+       Assert(max_registered_block_id == 0);
+       Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
+       Assert(mainrdata_len == 0);
+
+       /* cross-check on whether we should be here or not */
+       if (!XLogInsertAllowed())
+               elog(ERROR, "cannot make new WAL entries during recovery");
+
+       if (begininsert_called)
+               elog(ERROR, "XLogBeginInsert was already called");
+
+       begininsert_called = true;
+}
+
+/*
+ * Ensure that there are enough buffer and data slots in the working area,
+ * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
+ * calls.
+ *
+ * There is always space for a small number of buffers and data chunks, enough
+ * for most record types. This function is for the exceptional cases that need
+ * more.
+ */
+void
+XLogEnsureRecordSpace(int max_block_id, int ndatas)
+{
+       int                     nbuffers;
+
+       /*
+        * This must be called before entering a critical section, because
+        * allocating memory inside a critical section can fail. repalloc() will
+        * check the same, but better to check it here too so that we fail
+        * consistently even if the arrays happen to be large enough already.
+        */
+       Assert(CritSectionCount == 0);
+
+       /* the minimum values can't be decreased */
+       if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
+               max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
+       if (ndatas < XLR_NORMAL_RDATAS)
+               ndatas = XLR_NORMAL_RDATAS;
+
+       if (max_block_id > XLR_MAX_BLOCK_ID)
+               elog(ERROR, "maximum number of WAL record block references exceeded");
+       nbuffers = max_block_id + 1;
+
+       if (nbuffers > max_registered_buffers)
+       {
+               registered_buffers = (registered_buffer *)
+                       repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
+
+               /*
+                * At least the padding bytes in the structs must be zeroed, because
+                * they are included in WAL data, but initialize it all for tidiness.
+                */
+               MemSet(&registered_buffers[max_registered_buffers], 0,
+                       (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
+               max_registered_buffers = nbuffers;
+       }
+
+       if (ndatas > max_rdatas)
+       {
+               rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
+               max_rdatas = ndatas;
+       }
+}
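As a hedged sketch of the intended call pattern, a record that touches an unusually large number of blocks reserves slots before entering its critical section; nblocks, buffers, RM_FOO_ID and XLOG_FOO_MULTI below are hypothetical, only the XLog* calls come from this file.

	/* grow the working arrays while allocation is still allowed */
	XLogEnsureRecordSpace(nblocks - 1, 0);	/* 0 keeps the XLR_NORMAL_RDATAS default */

	START_CRIT_SECTION();
	XLogBeginInsert();
	for (i = 0; i < nblocks; i++)
		XLogRegisterBuffer(i, buffers[i], REGBUF_STANDARD);
	recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_MULTI);
	END_CRIT_SECTION();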
+
+/*
+ * Reset WAL record construction buffers.
+ */
+void
+XLogResetInsertion(void)
+{
+       int                     i;
+
+       for (i = 0; i < max_registered_block_id; i++)
+               registered_buffers[i].in_use = false;
+
+       num_rdatas = 0;
+       max_registered_block_id = 0;
+       mainrdata_len = 0;
+       mainrdata_last = (XLogRecData *) &mainrdata_head;
+       curinsert_flags = 0;
+       begininsert_called = false;
+}
+
+/*
+ * Register a reference to a buffer with the WAL record being constructed.
+ * This must be called for every page that the WAL-logged operation modifies.
+ */
+void
+XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
+{
+       registered_buffer *regbuf;
+
+       /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
+       Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
+       Assert(begininsert_called);
+
+       if (block_id >= max_registered_block_id)
+       {
+               if (block_id >= max_registered_buffers)
+                       elog(ERROR, "too many registered buffers");
+               max_registered_block_id = block_id + 1;
+       }
+
+       regbuf = &registered_buffers[block_id];
+
+       BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
+       regbuf->page = BufferGetPage(buffer);
+       regbuf->flags = flags;
+       regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+       regbuf->rdata_len = 0;
+
+       /*
+        * Check that this page hasn't already been registered with some other
+        * block_id.
+        */
+#ifdef USE_ASSERT_CHECKING
+       {
+               int                     i;
+
+               for (i = 0; i < max_registered_block_id; i++)
+               {
+                       registered_buffer *regbuf_old = &registered_buffers[i];
+
+                       if (i == block_id || !regbuf_old->in_use)
+                               continue;
+
+                       Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+                                  regbuf_old->forkno != regbuf->forkno ||
+                                  regbuf_old->block != regbuf->block);
+               }
+       }
+#endif
+
+       regbuf->in_use = true;
+}
+
+/*
+ * Like XLogRegisterBuffer, but for registering a block that's not in the
+ * shared buffer pool (i.e. when you don't have a Buffer for it).
+ */
+void
+XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
+                                 BlockNumber blknum, Page page, uint8 flags)
+{
+       registered_buffer *regbuf;
+
+       /* This is currently only used to WAL-log a full-page image of a page */
+       Assert(flags & REGBUF_FORCE_IMAGE);
+       Assert(begininsert_called);
+
+       if (block_id >= max_registered_block_id)
+               max_registered_block_id = block_id + 1;
+
+       if (block_id >= max_registered_buffers)
+               elog(ERROR, "too many registered buffers");
+
+       regbuf = &registered_buffers[block_id];
+
+       regbuf->rnode = *rnode;
+       regbuf->forkno = forknum;
+       regbuf->block = blknum;
+       regbuf->page = page;
+       regbuf->flags = flags;
+       regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+       regbuf->rdata_len = 0;
+
+       /*
+        * Check that this page hasn't already been registered with some other
+        * block_id.
+        */
+#ifdef USE_ASSERT_CHECKING
+       {
+               int                     i;
+
+               for (i = 0; i < max_registered_block_id; i++)
+               {
+                       registered_buffer *regbuf_old = &registered_buffers[i];
+
+                       if (i == block_id || !regbuf_old->in_use)
+                               continue;
+
+                       Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+                                  regbuf_old->forkno != regbuf->forkno ||
+                                  regbuf_old->block != regbuf->block);
+               }
+       }
+#endif
+
+       regbuf->in_use = true;
+}
+
+/*
+ * Add data to the WAL record that's being constructed.
+ *
+ * The data is appended to the "main chunk", available at replay with
+ * XLogRecGetData().
+ */
+void
+XLogRegisterData(char *data, int len)
+{
+       XLogRecData *rdata;
+
+       Assert(begininsert_called);
+
+       if (num_rdatas >= max_rdatas)
+               elog(ERROR, "too much WAL data");
+       rdata = &rdatas[num_rdatas++];
+
+       rdata->data = data;
+       rdata->len = len;
+
+       /*
+        * we use the mainrdata_last pointer to track the end of the chain, so no
+        * need to clear 'next' here.
+        */
+
+       mainrdata_last->next = rdata;
+       mainrdata_last = rdata;
+
+       mainrdata_len += len;
+}
+
+/*
+ * Add buffer-specific data to the WAL record that's being constructed.
+ *
+ * Block_id must reference a block previously registered with
+ * XLogRegisterBuffer(). If this is called more than once for the same
+ * block_id, the data is appended.
+ *
+ * The maximum amount of data that can be registered per block is 65535
+ * bytes. That should be plenty; if you need more than BLCKSZ bytes to
+ * reconstruct the changes to the page, you might as well just log a full
+ * copy of it. (the "main data" that's not associated with a block is not
+ * limited)
+ */
+void
+XLogRegisterBufData(uint8 block_id, char *data, int len)
+{
+       registered_buffer *regbuf;
+       XLogRecData *rdata;
+
+       Assert(begininsert_called);
+
+       /* find the registered buffer struct */
+       regbuf = &registered_buffers[block_id];
+       if (!regbuf->in_use)
+               elog(ERROR, "no block with id %d registered with WAL insertion",
+                        block_id);
+
+       if (num_rdatas >= max_rdatas)
+               elog(ERROR, "too much WAL data");
+       rdata = &rdatas[num_rdatas++];
+
+       rdata->data = data;
+       rdata->len = len;
+
+       regbuf->rdata_tail->next = rdata;
+       regbuf->rdata_tail = rdata;
+       regbuf->rdata_len += len;
+}
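To illustrate the append behavior described above, a caller may register per-block data in several pieces; in this sketch 'hdr', 'payload' and 'payload_len' are hypothetical caller data.

	XLogBeginInsert();
	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
	XLogRegisterBufData(0, (char *) &hdr, sizeof(hdr));	/* first chunk for block 0 */
	XLogRegisterBufData(0, payload, payload_len);			/* appended after 'hdr' */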
+
+/*
+ * Set insert status flags for the upcoming WAL record.
+ *
+ * The flags that can be used here are:
+ * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
+ *      included in the record.
+ * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
+ *      durability, which makes it possible to avoid triggering WAL archiving
+ *      and other background activity.
+ */
+void
+XLogSetRecordFlags(uint8 flags)
+{
+       Assert(begininsert_called);
+       curinsert_flags = flags;
+}
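As a rough usage sketch (the rmgr id, info byte and xlrec struct are hypothetical), a record whose loss would be acceptable can be marked so that it does not by itself trigger archiving or other background activity:

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
	(void) XLogInsert(RM_FOO_ID, XLOG_FOO_PERIODIC);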
+
+/*
+ * Insert an XLOG record having the specified RMID and info bytes, with the
+ * body of the record being the data and buffer references registered earlier
+ * with XLogRegister* calls.
  *
  * Returns XLOG pointer to end of record (beginning of next record).
  * This can be used as LSN for data pages affected by the logged action.
  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
  * before the data page can be written out.  This implements the basic
  * WAL rule "write the log before the data".)
- *
- * NB: this routine feels free to scribble on the XLogRecData structs,
- * though not on the data they reference.  This is OK since the XLogRecData
- * structs are always just temporaries in the calling code.
  */
 XLogRecPtr
-XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
+XLogInsert(RmgrId rmid, uint8 info)
 {
-       XLogRecPtr      RedoRecPtr;
-       bool            doPageWrites;
        XLogRecPtr      EndPos;
-       XLogRecPtr      fpw_lsn;
-       XLogRecData *rdt;
-       XLogRecData *rdt_lastnormal;
 
-       /* info's high bits are reserved for use by me */
-       if (info & XLR_INFO_MASK)
+       /* XLogBeginInsert() must have been called. */
+       if (!begininsert_called)
+               elog(ERROR, "XLogBeginInsert was not called");
+
+       /*
+        * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+        * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
+        */
+       if ((info & ~(XLR_RMGR_INFO_MASK |
+                                 XLR_SPECIAL_REL_UPDATE |
+                                 XLR_CHECK_CONSISTENCY)) != 0)
                elog(PANIC, "invalid xlog info mask %02X", info);
 
-       TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
+       TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
 
        /*
         * In bootstrap mode, we don't actually log anything but XLOG resources;
@@ -67,296 +437,413 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         */
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
-               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               XLogResetInsertion();
+               EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
                return EndPos;
        }
 
-       /*
-        * Get values needed to decide whether to do full-page writes. Since we
-        * don't yet have an insertion lock, these could change under us, but
-        * XLogInsertRecord will recheck them once it has a lock.
-        */
-       GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
-
-       /*
-        * Assemble an XLogRecData chain representing the WAL record, including
-        * any backup blocks needed.
-        *
-        * We may have to loop back to here if a race condition is detected in
-        * XLogInsertRecord.  We could prevent the race by doing all this work
-        * while holding an insertion lock, but it seems better to avoid doing CRC
-        * calculations while holding one.
-        */
-retry:
-       rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites,
-                                                        &fpw_lsn, &rdt_lastnormal);
-
-       EndPos = XLogInsertRecord(rdt, fpw_lsn);
-
-       if (EndPos == InvalidXLogRecPtr)
+       do
        {
+               XLogRecPtr      RedoRecPtr;
+               bool            doPageWrites;
+               XLogRecPtr      fpw_lsn;
+               XLogRecData *rdt;
+
                /*
-                * Undo the changes we made to the rdata chain, and retry.
-                *
-                * XXX: This doesn't undo *all* the changes; the XLogRecData
-                * entries for buffers that we had already decided to back up have
-                * had their data-pointers cleared. That's OK, as long as we
-                * decide to back them up on the next iteration as well. Hence,
-                * don't allow "doPageWrites" value to go from true to false after
-                * we've modified the rdata chain.
+                * Get values needed to decide whether to do full-page writes. Since
+                * we don't yet have an insertion lock, these could change under us,
+                * but XLogInsertRecord will recheck them once it has a lock.
                 */
-               bool            newDoPageWrites;
+               GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
-               GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites);
-               doPageWrites = doPageWrites || newDoPageWrites;
-               rdt_lastnormal->next = NULL;
+               rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
+                                                                &fpw_lsn);
 
-               goto retry;
-       }
+               EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
+       } while (EndPos == InvalidXLogRecPtr);
+
+       XLogResetInsertion();
 
        return EndPos;
 }
 
 /*
- * Assemble a full WAL record, including backup blocks, from an XLogRecData
- * chain, ready for insertion with XLogInsertRecord(). The record header
- * fields are filled in, except for the xl_prev field and CRC.
+ * Assemble a WAL record from the registered data and buffers into an
+ * XLogRecData chain, ready for insertion with XLogInsertRecord().
  *
- * The rdata chain is modified, adding entries for full-page images.
- * *rdt_lastnormal is set to point to the last normal (ie. not added by
- * this function) entry. It can be used to reset the chain to its original
- * state.
+ * The record header fields are filled in, except for the xl_prev field. The
+ * calculated CRC does not include the record header yet.
  *
- * If the rdata chain contains any buffer references, and a full-page image
- * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among
- * such pages. This signals that the assembled record is only good for
- * insertion on the assumption that the RedoRecPtr and doPageWrites values
- * were up-to-date.
+ * If there are any registered buffers, and a full-page image was not taken
+ * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
+ * signals that the assembled record is only good for insertion on the
+ * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
  */
 static XLogRecData *
-XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata,
+XLogRecordAssemble(RmgrId rmid, uint8 info,
                                   XLogRecPtr RedoRecPtr, bool doPageWrites,
-                                  XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal)
+                                  XLogRecPtr *fpw_lsn)
 {
-       bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
        XLogRecData *rdt;
-       Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
-       bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
-       uint32          len,
-                               total_len;
-       unsigned        i;
+       uint32          total_len = 0;
+       int                     block_id;
+       pg_crc32c       rdata_crc;
+       registered_buffer *prev_regbuf = NULL;
+       XLogRecData *rdt_datas_last;
+       XLogRecord *rechdr;
+       char       *scratch = hdr_scratch;
 
        /*
-        * These need to be static because they are returned to the caller as part
-        * of the XLogRecData chain.
+        * Note: this function can be called multiple times for the same record.
+        * All the modifications we do to the rdata chains below must handle that.
         */
-       static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
-       static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
-       static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
-       static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
-       static XLogRecData hdr_rdt;
-       static XLogRecord *rechdr;
-
-       if (rechdr == NULL)
-       {
-               static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
 
-               rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
-               MemSet(rechdr, 0, SizeOfXLogRecord);
-       }
+       /* The record begins with the fixed-size header */
+       rechdr = (XLogRecord *) scratch;
+       scratch += SizeOfXLogRecord;
 
-       /* The record begins with the header */
-       hdr_rdt.data = (char *) rechdr;
-       hdr_rdt.len = SizeOfXLogRecord;
-       hdr_rdt.next = rdata;
-       total_len = SizeOfXLogRecord;
+       hdr_rdt.next = NULL;
+       rdt_datas_last = &hdr_rdt;
+       hdr_rdt.data = hdr_scratch;
 
        /*
-        * Here we scan the rdata chain, to determine which buffers must be backed
-        * up.
-        *
-        * We add entries for backup blocks to the chain, so that they don't need
-        * any special treatment in the critical section where the chunks are
-        * copied into the WAL buffers. Those entries have to be unlinked from the
-        * chain if we have to loop back here.
+        * Enforce consistency checks for this record if the user has requested
+        * them. Do this at the beginning of this routine so that callers of
+        * XLogInsert() can also pass XLR_CHECK_CONSISTENCY directly for a
+        * record.
         */
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               dtbuf[i] = InvalidBuffer;
-               dtbuf_bkp[i] = false;
-       }
+       if (wal_consistency_checking[rmid])
+               info |= XLR_CHECK_CONSISTENCY;
 
+       /*
+        * Make an rdata chain containing all the data portions of all block
+        * references. This includes the data for full-page images. Also append
+        * the headers for the block references in the scratch buffer.
+        */
        *fpw_lsn = InvalidXLogRecPtr;
-       len = 0;
-       for (rdt = rdata;;)
+       for (block_id = 0; block_id < max_registered_block_id; block_id++)
        {
-               if (rdt->buffer == InvalidBuffer)
+               registered_buffer *regbuf = &registered_buffers[block_id];
+               bool            needs_backup;
+               bool            needs_data;
+               XLogRecordBlockHeader bkpb;
+               XLogRecordBlockImageHeader bimg;
+               XLogRecordBlockCompressHeader cbimg = {0};
+               bool            samerel;
+               bool            is_compressed = false;
+               bool            include_image;
+
+               if (!regbuf->in_use)
+                       continue;
+
+               /* Determine if this block needs to be backed up */
+               if (regbuf->flags & REGBUF_FORCE_IMAGE)
+                       needs_backup = true;
+               else if (regbuf->flags & REGBUF_NO_IMAGE)
+                       needs_backup = false;
+               else if (!doPageWrites)
+                       needs_backup = false;
+               else
                {
-                       /* Simple data, just include it */
-                       len += rdt->len;
+                       /*
+                        * We assume page LSN is first data on *every* page that can be
+                        * passed to XLogInsert, whether it has the standard page layout
+                        * or not.
+                        */
+                       XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
+
+                       needs_backup = (page_lsn <= RedoRecPtr);
+                       if (!needs_backup)
+                       {
+                               if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
+                                       *fpw_lsn = page_lsn;
+                       }
                }
+
+               /* Determine if the buffer data needs to be included */
+               if (regbuf->rdata_len == 0)
+                       needs_data = false;
+               else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
+                       needs_data = true;
                else
+                       needs_data = !needs_backup;
+
+               bkpb.id = block_id;
+               bkpb.fork_flags = regbuf->forkno;
+               bkpb.data_length = 0;
+
+               if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
+                       bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
+
+               /*
+                * If needs_backup is true or WAL checking is enabled for current
+                * resource manager, log a full-page write for the current block.
+                */
+               include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+               if (include_image)
                {
-                       /* Find info for buffer */
-                       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+                       Page            page = regbuf->page;
+                       uint16          compressed_len;
+
+                       /*
+                        * The page needs to be backed up, so calculate its hole length
+                        * and offset.
+                        */
+                       if (regbuf->flags & REGBUF_STANDARD)
                        {
-                               if (rdt->buffer == dtbuf[i])
+                               /* Assume we can omit data between pd_lower and pd_upper */
+                               uint16          lower = ((PageHeader) page)->pd_lower;
+                               uint16          upper = ((PageHeader) page)->pd_upper;
+
+                               if (lower >= SizeOfPageHeaderData &&
+                                       upper > lower &&
+                                       upper <= BLCKSZ)
                                {
-                                       /* Buffer already referenced by earlier chain item */
-                                       if (dtbuf_bkp[i])
-                                       {
-                                               rdt->data = NULL;
-                                               rdt->len = 0;
-                                       }
-                                       else if (rdt->data)
-                                               len += rdt->len;
-                                       break;
+                                       bimg.hole_offset = lower;
+                                       cbimg.hole_length = upper - lower;
                                }
-                               if (dtbuf[i] == InvalidBuffer)
+                               else
                                {
-                                       /* OK, put it in this slot */
-                                       XLogRecPtr      page_lsn;
-                                       bool            needs_backup;
-
-                                       dtbuf[i] = rdt->buffer;
-
-                                       /*
-                                        * Determine whether the buffer has to be backed up.
-                                        *
-                                        * We assume page LSN is first data on *every* page that
-                                        * can be passed to XLogInsert, whether it has the
-                                        * standard page layout or not. We don't need to take the
-                                        * buffer header lock for PageGetLSN because we hold an
-                                        * exclusive lock on the page and/or the relation.
-                                        */
-                                       page_lsn = PageGetLSN(BufferGetPage(rdt->buffer));
-                                       if (!doPageWrites)
-                                               needs_backup = false;
-                                       else if (page_lsn <= RedoRecPtr)
-                                               needs_backup = true;
-                                       else
-                                               needs_backup = false;
-
-                                       if (needs_backup)
-                                       {
-                                               /*
-                                                * The page needs to be backed up, so set up BkpBlock
-                                                */
-                                               XLogFillBkpBlock(rdt->buffer, rdt->buffer_std,
-                                                                                &(dtbuf_xlg[i]));
-                                               dtbuf_bkp[i] = true;
-                                               rdt->data = NULL;
-                                               rdt->len = 0;
-                                       }
-                                       else
-                                       {
-                                               if (rdt->data)
-                                                       len += rdt->len;
-                                               if (*fpw_lsn == InvalidXLogRecPtr ||
-                                                       page_lsn < *fpw_lsn)
-                                               {
-                                                       *fpw_lsn = page_lsn;
-                                               }
-                                       }
-                                       break;
+                                       /* No "hole" to compress out */
+                                       bimg.hole_offset = 0;
+                                       cbimg.hole_length = 0;
                                }
                        }
-                       if (i >= XLR_MAX_BKP_BLOCKS)
-                               elog(PANIC, "can backup at most %d blocks per xlog record",
-                                        XLR_MAX_BKP_BLOCKS);
-               }
-               /* Break out of loop when rdt points to last chain item */
-               if (rdt->next == NULL)
-                       break;
-               rdt = rdt->next;
-       }
-       total_len += len;
+                       else
+                       {
+                               /* Not a standard page header, don't try to eliminate "hole" */
+                               bimg.hole_offset = 0;
+                               cbimg.hole_length = 0;
+                       }
 
-       /*
-        * Make additional rdata chain entries for the backup blocks, so that we
-        * don't need to special-case them in the write loop.  This modifies the
-        * original rdata chain, but we keep a pointer to the last regular entry,
-        * rdt_lastnormal, so that we can undo this if we have to start over.
-        *
-        * At the exit of this loop, total_len includes the backup block data.
-        *
-        * Also set the appropriate info bits to show which buffers were backed
-        * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
-        * value (ignoring InvalidBuffer) appearing in the rdata chain.
-        */
-       *rdt_lastnormal = rdt;
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               BkpBlock   *bkpb;
-               char       *page;
+                       /*
+                        * Try to compress a block image if wal_compression is enabled
+                        */
+                       if (wal_compression)
+                       {
+                               is_compressed =
+                                       XLogCompressBackupBlock(page, bimg.hole_offset,
+                                                                                       cbimg.hole_length,
+                                                                                       regbuf->compressed_page,
+                                                                                       &compressed_len);
+                       }
 
-               if (!dtbuf_bkp[i])
-                       continue;
+                       /*
+                        * Fill in the remaining fields in the XLogRecordBlockHeader
+                        * struct
+                        */
+                       bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+
+                       /*
+                        * Construct XLogRecData entries for the page content.
+                        */
+                       rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+                       rdt_datas_last = rdt_datas_last->next;
+
+                       bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+
+                       /*
+                        * If WAL consistency checking is enabled for the resource manager
+                        * of this WAL record, a full-page image is included in the record
+                        * for the block modified. During redo, the full-page image is
+                        * replayed only if BKPIMAGE_APPLY is set.
+                        */
+                       if (needs_backup)
+                               bimg.bimg_info |= BKPIMAGE_APPLY;
+
+                       if (is_compressed)
+                       {
+                               bimg.length = compressed_len;
+                               bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
+
+                               rdt_datas_last->data = regbuf->compressed_page;
+                               rdt_datas_last->len = compressed_len;
+                       }
+                       else
+                       {
+                               bimg.length = BLCKSZ - cbimg.hole_length;
 
-               info |= XLR_BKP_BLOCK(i);
+                               if (cbimg.hole_length == 0)
+                               {
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = BLCKSZ;
+                               }
+                               else
+                               {
+                                       /* must skip the hole */
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = bimg.hole_offset;
 
-               bkpb = &(dtbuf_xlg[i]);
-               page = (char *) BufferGetBlock(dtbuf[i]);
+                                       rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+                                       rdt_datas_last = rdt_datas_last->next;
 
-               rdt->next = &(dtbuf_rdt1[i]);
-               rdt = rdt->next;
+                                       rdt_datas_last->data =
+                                               page + (bimg.hole_offset + cbimg.hole_length);
+                                       rdt_datas_last->len =
+                                               BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+                               }
+                       }
 
-               rdt->data = (char *) bkpb;
-               rdt->len = sizeof(BkpBlock);
-               total_len += sizeof(BkpBlock);
+                       total_len += bimg.length;
+               }
 
-               rdt->next = &(dtbuf_rdt2[i]);
-               rdt = rdt->next;
+               if (needs_data)
+               {
+                       /*
+                        * Link the caller-supplied rdata chain for this buffer to the
+                        * overall list.
+                        */
+                       bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+                       bkpb.data_length = regbuf->rdata_len;
+                       total_len += regbuf->rdata_len;
+
+                       rdt_datas_last->next = regbuf->rdata_head;
+                       rdt_datas_last = regbuf->rdata_tail;
+               }
 
-               if (bkpb->hole_length == 0)
+               if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
                {
-                       rdt->data = page;
-                       rdt->len = BLCKSZ;
-                       total_len += BLCKSZ;
-                       rdt->next = NULL;
+                       samerel = true;
+                       bkpb.fork_flags |= BKPBLOCK_SAME_REL;
                }
                else
+                       samerel = false;
+               prev_regbuf = regbuf;
+
+               /* Ok, copy the header to the scratch buffer */
+               memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
+               scratch += SizeOfXLogRecordBlockHeader;
+               if (include_image)
                {
-                       /* must skip the hole */
-                       rdt->data = page;
-                       rdt->len = bkpb->hole_offset;
-                       total_len += bkpb->hole_offset;
-
-                       rdt->next = &(dtbuf_rdt3[i]);
-                       rdt = rdt->next;
-
-                       rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
-                       rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
-                       total_len += rdt->len;
-                       rdt->next = NULL;
+                       memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
+                       scratch += SizeOfXLogRecordBlockImageHeader;
+                       if (cbimg.hole_length != 0 && is_compressed)
+                       {
+                               memcpy(scratch, &cbimg,
+                                          SizeOfXLogRecordBlockCompressHeader);
+                               scratch += SizeOfXLogRecordBlockCompressHeader;
+                       }
                }
+               if (!samerel)
+               {
+                       memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
+                       scratch += sizeof(RelFileNode);
+               }
+               memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
+               scratch += sizeof(BlockNumber);
+       }
+
+       /* followed by the record's origin, if any */
+       if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+               replorigin_session_origin != InvalidRepOriginId)
+       {
+               *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+               memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
+               scratch += sizeof(replorigin_session_origin);
        }
 
+       /* followed by main data, if any */
+       if (mainrdata_len > 0)
+       {
+               if (mainrdata_len > 255)
+               {
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
+                       memcpy(scratch, &mainrdata_len, sizeof(uint32));
+                       scratch += sizeof(uint32);
+               }
+               else
+               {
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
+                       *(scratch++) = (uint8) mainrdata_len;
+               }
+               rdt_datas_last->next = mainrdata_head;
+               rdt_datas_last = mainrdata_last;
+               total_len += mainrdata_len;
+       }
+       rdt_datas_last->next = NULL;
+
+       hdr_rdt.len = (scratch - hdr_scratch);
+       total_len += hdr_rdt.len;
+
        /*
-        * We disallow len == 0 because it provides a useful bit of extra error
-        * checking in ReadRecord.  This means that all callers of XLogInsert
-        * must supply at least some not-in-a-buffer data.  However, we make an
-        * exception for XLOG SWITCH records because we don't want them to ever
-        * cross a segment boundary.
+        * Calculate CRC of the data
+        *
+        * Note that the record header isn't added into the CRC initially since we
+        * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+        * the whole record in the order: rdata, then backup blocks, then record
+        * header.
         */
-       if (len == 0 && !isLogSwitch)
-               elog(PANIC, "invalid xlog record length %u", rechdr->xl_len);
+       INIT_CRC32C(rdata_crc);
+       COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
+       for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
+               COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
 
        /*
         * Fill in the fields in the record header. Prev-link is filled in later,
-        * once we know where in the WAL the record will be inserted. CRC is also
-        * not calculated yet.
+        * once we know where in the WAL the record will be inserted. The CRC does
+        * not include the record header yet.
         */
        rechdr->xl_xid = GetCurrentTransactionIdIfAny();
        rechdr->xl_tot_len = total_len;
-       rechdr->xl_len = len;           /* doesn't include backup blocks */
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
        rechdr->xl_prev = InvalidXLogRecPtr;
+       rechdr->xl_crc = rdata_crc;
 
        return &hdr_rdt;
 }
 
+/*
+ * Create a compressed version of a backup block image.
+ *
+ * Returns FALSE if compression fails (i.e., compressed result is actually
+ * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
+ * the length of compressed block image.
+ */
+static bool
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
+                                               char *dest, uint16 *dlen)
+{
+       int32           orig_len = BLCKSZ - hole_length;
+       int32           len;
+       int32           extra_bytes = 0;
+       char       *source;
+       char            tmp[BLCKSZ];
+
+       if (hole_length != 0)
+       {
+               /* must skip the hole */
+               source = tmp;
+               memcpy(source, page, hole_offset);
+               memcpy(source + hole_offset,
+                          page + (hole_offset + hole_length),
+                          BLCKSZ - (hole_length + hole_offset));
+
+               /*
+                * Extra data needs to be stored in the WAL record for the
+                * compressed version of the block image if a hole exists.
+                */
+               extra_bytes = SizeOfXLogRecordBlockCompressHeader;
+       }
+       else
+               source = page;
+
+       /*
+        * We recheck the actual size even if pglz_compress() reports success, to
+        * see whether the number of bytes saved by compression is larger than the
+        * length of extra data needed for the compressed version of the block
+        * image.
+        */
+       len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+       if (len >= 0 &&
+               len + extra_bytes < orig_len)
+       {
+               *dlen = (uint16) len;   /* successful compression */
+               return true;
+       }
+       return false;
+}
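To make the acceptance test concrete, here is a worked example under assumed numbers; the block size, hole size and pglz result are invented for illustration.

	/*
	 * Assume BLCKSZ = 8192 and a hole of 3000 bytes: orig_len = 8192 - 3000
	 * = 5192, and extra_bytes is SizeOfXLogRecordBlockCompressHeader because
	 * the hole length must be stored alongside the compressed image.  If
	 * pglz_compress() returns, say, len = 4000, then len + extra_bytes is
	 * below 5192 and the compressed image is used; if the result were so
	 * close to orig_len that len + extra_bytes >= 5192, the block would be
	 * stored uncompressed even though compression technically succeeded.
	 */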
+
 /*
  * Determine whether the buffer referenced has to be backed up.
  *
@@ -429,45 +916,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
 
        if (lsn <= RedoRecPtr)
        {
-               XLogRecData rdata[2];
-               BkpBlock        bkpb;
+               int                     flags;
                char            copied_buffer[BLCKSZ];
                char       *origdata = (char *) BufferGetBlock(buffer);
-
-               /* Make a BkpBlock struct representing the buffer */
-               XLogFillBkpBlock(buffer, buffer_std, &bkpb);
+               RelFileNode rnode;
+               ForkNumber      forkno;
+               BlockNumber blkno;
 
                /*
                 * Copy buffer so we don't have to worry about concurrent hint bit or
                 * lsn updates. We assume pd_lower/upper cannot be changed without an
                 * exclusive lock, so the contents of the backup are not racy.
-                *
-                * With buffer_std set to false, XLogFillBkpBlock() sets hole_length
-                * and hole_offset to 0; so the following code is safe for either
-                * case.
                 */
-               memcpy(copied_buffer, origdata, bkpb.hole_offset);
-               memcpy(copied_buffer + bkpb.hole_offset,
-                          origdata + bkpb.hole_offset + bkpb.hole_length,
-                          BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+               if (buffer_std)
+               {
+                       /* Assume we can omit data between pd_lower and pd_upper */
+                       Page            page = BufferGetPage(buffer);
+                       uint16          lower = ((PageHeader) page)->pd_lower;
+                       uint16          upper = ((PageHeader) page)->pd_upper;
 
-               /*
-                * Header for backup block.
-                */
-               rdata[0].data = (char *) &bkpb;
-               rdata[0].len = sizeof(BkpBlock);
-               rdata[0].buffer = InvalidBuffer;
-               rdata[0].next = &(rdata[1]);
+                       memcpy(copied_buffer, origdata, lower);
+                       memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
+               }
+               else
+                       memcpy(copied_buffer, origdata, BLCKSZ);
 
-               /*
-                * Save copy of the buffer.
-                */
-               rdata[1].data = copied_buffer;
-               rdata[1].len = BLCKSZ - bkpb.hole_length;
-               rdata[1].buffer = InvalidBuffer;
-               rdata[1].next = NULL;
+               XLogBeginInsert();
+
+               flags = REGBUF_FORCE_IMAGE;
+               if (buffer_std)
+                       flags |= REGBUF_STANDARD;
 
-               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+               BufferGetTag(buffer, &rnode, &forkno, &blkno);
+               XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
+
+               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
        }
 
        return recptr;
@@ -489,71 +972,16 @@ XLogRecPtr
 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
                        Page page, bool page_std)
 {
-       BkpBlock        bkpb;
+       int                     flags;
        XLogRecPtr      recptr;
-       XLogRecData rdata[3];
-
-       /* NO ELOG(ERROR) from here till newpage op is logged */
-       START_CRIT_SECTION();
-
-       bkpb.node = *rnode;
-       bkpb.fork = forkNum;
-       bkpb.block = blkno;
 
+       flags = REGBUF_FORCE_IMAGE;
        if (page_std)
-       {
-               /* Assume we can omit data between pd_lower and pd_upper */
-               uint16          lower = ((PageHeader) page)->pd_lower;
-               uint16          upper = ((PageHeader) page)->pd_upper;
-
-               if (lower >= SizeOfPageHeaderData &&
-                       upper > lower &&
-                       upper <= BLCKSZ)
-               {
-                       bkpb.hole_offset = lower;
-                       bkpb.hole_length = upper - lower;
-               }
-               else
-               {
-                       /* No "hole" to compress out */
-                       bkpb.hole_offset = 0;
-                       bkpb.hole_length = 0;
-               }
-       }
-       else
-       {
-               /* Not a standard page header, don't try to eliminate "hole" */
-               bkpb.hole_offset = 0;
-               bkpb.hole_length = 0;
-       }
-
-       rdata[0].data = (char *) &bkpb;
-       rdata[0].len = sizeof(BkpBlock);
-       rdata[0].buffer = InvalidBuffer;
-       rdata[0].next = &(rdata[1]);
-
-       if (bkpb.hole_length == 0)
-       {
-               rdata[1].data = (char *) page;
-               rdata[1].len = BLCKSZ;
-               rdata[1].buffer = InvalidBuffer;
-               rdata[1].next = NULL;
-       }
-       else
-       {
-               /* must skip the hole */
-               rdata[1].data = (char *) page;
-               rdata[1].len = bkpb.hole_offset;
-               rdata[1].buffer = InvalidBuffer;
-               rdata[1].next = &rdata[2];
-
-               rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length);
-               rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length);
-               rdata[2].buffer = InvalidBuffer;
-               rdata[2].next = NULL;
-       }
+               flags |= REGBUF_STANDARD;
 
-       recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+       XLogBeginInsert();
+       XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
+       recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
 
        /*
         * The page may be uninitialized. If so, we can't set the LSN because that
@@ -564,8 +992,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
                PageSetLSN(page, recptr);
        }
 
-       END_CRIT_SECTION();
-
        return recptr;
 }
 
@@ -596,38 +1022,37 @@ log_newpage_buffer(Buffer buffer, bool page_std)
 }
 
 /*
- * Fill a BkpBlock for a buffer.
+ * Allocate working buffers needed for WAL record construction.
  */
-static void
-XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb)
+void
+InitXLogInsert(void)
 {
-       BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+       /* Initialize the working areas */
+       if (xloginsert_cxt == NULL)
+       {
+               xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
+                                                                                          "WAL record construction",
+                                                                                          ALLOCSET_DEFAULT_SIZES);
+       }
 
-       if (buffer_std)
+       if (registered_buffers == NULL)
        {
-               /* Assume we can omit data between pd_lower and pd_upper */
-               Page            page = BufferGetPage(buffer);
-               uint16          lower = ((PageHeader) page)->pd_lower;
-               uint16          upper = ((PageHeader) page)->pd_upper;
-
-               if (lower >= SizeOfPageHeaderData &&
-                       upper > lower &&
-                       upper <= BLCKSZ)
-               {
-                       bkpb->hole_offset = lower;
-                       bkpb->hole_length = upper - lower;
-               }
-               else
-               {
-                       /* No "hole" to compress out */
-                       bkpb->hole_offset = 0;
-                       bkpb->hole_length = 0;
-               }
+               registered_buffers = (registered_buffer *)
+                       MemoryContextAllocZero(xloginsert_cxt,
+                                 sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
+               max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
        }
-       else
+       if (rdatas == NULL)
        {
-               /* Not a standard page header, don't try to eliminate "hole" */
-               bkpb->hole_offset = 0;
-               bkpb->hole_length = 0;
+               rdatas = MemoryContextAlloc(xloginsert_cxt,
+                                                                       sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
+               max_rdatas = XLR_NORMAL_RDATAS;
        }
+
+       /*
+        * Allocate a buffer to hold the header information for a WAL record.
+        */
+       if (hdr_scratch == NULL)
+               hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
+                                                                                        HEADER_SCRATCH_SIZE);
 }