]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xloginsert.c
Phase 2 of pgindent updates.
[postgresql] / src / backend / access / transam / xloginsert.c
index 88209c3d163211c81fca255f8d18c567b4b44e10..c9cc6636d3fd039754d15727587483f0d6530597 100644 (file)
@@ -9,7 +9,7 @@
  * of XLogRecData structs by a call to XLogRecordAssemble(). See
  * access/transam/README for details.
  *
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/xloginsert.c
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "miscadmin.h"
+#include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "pg_trace.h"
 
 /* Buffer size required to store a compressed version of backup block image */
-#define PGLZ_MAX_BLCKSZ        PGLZ_MAX_OUTPUT(BLCKSZ)
+#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
 
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
@@ -57,12 +58,12 @@ typedef struct
 
        /* buffer to store a compressed version of backup block image */
        char            compressed_page[PGLZ_MAX_BLCKSZ];
-}      registered_buffer;
+} registered_buffer;
 
 static registered_buffer *registered_buffers;
-static int     max_registered_buffers;         /* allocated size */
-static int     max_registered_block_id = 0;            /* highest block_id + 1
-                                                                                                * currently registered */
+static int     max_registered_buffers; /* allocated size */
+static int     max_registered_block_id = 0;    /* highest block_id + 1 currently
+                                                                                        * registered */
 
 /*
  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
@@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head;
 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
 static uint32 mainrdata_len;   /* total # of bytes in chain */
 
+/* flags for the in-progress insertion */
+static uint8 curinsert_flags = 0;
+
 /*
  * These are used to hold the record header while constructing a record.
  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@@ -83,10 +87,12 @@ static uint32 mainrdata_len;        /* total # of bytes in chain */
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+#define SizeOfXlogOrigin       (sizeof(RepOriginId) + sizeof(char))
+
 #define HEADER_SCRATCH_SIZE \
        (SizeOfXLogRecord + \
         MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-        SizeOfXLogRecordDataHeaderLong)
+        SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -104,7 +110,7 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
                                   XLogRecPtr RedoRecPtr, bool doPageWrites,
                                   XLogRecPtr *fpw_lsn);
 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
-                                                                       uint16 hole_length, char *dest, uint16 *dlen);
+                                               uint16 hole_length, char *dest, uint16 *dlen);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -116,12 +122,14 @@ XLogBeginInsert(void)
        Assert(max_registered_block_id == 0);
        Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
        Assert(mainrdata_len == 0);
-       Assert(!begininsert_called);
 
        /* cross-check on whether we should be here or not */
        if (!XLogInsertAllowed())
                elog(ERROR, "cannot make new WAL entries during recovery");
 
+       if (begininsert_called)
+               elog(ERROR, "XLogBeginInsert was already called");
+
        begininsert_called = true;
 }
 
@@ -193,6 +201,7 @@ XLogResetInsertion(void)
        max_registered_block_id = 0;
        mainrdata_len = 0;
        mainrdata_last = (XLogRecData *) &mainrdata_head;
+       curinsert_flags = 0;
        begininsert_called = false;
 }
 
@@ -374,6 +383,23 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
        regbuf->rdata_len += len;
 }
 
+/*
+ * Set insert status flags for the upcoming WAL record.
+ *
+ * The flags that can be used here are:
+ * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
+ *      included in the record.
+ * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
+ *      durability, which allows to avoid triggering WAL archiving and other
+ *      background activity.
+ */
+void
+XLogSetRecordFlags(uint8 flags)
+{
+       Assert(begininsert_called);
+       curinsert_flags = flags;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -395,13 +421,15 @@ XLogInsert(RmgrId rmid, uint8 info)
                elog(ERROR, "XLogBeginInsert was not called");
 
        /*
-        * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
-        * reserved for use by me.
+        * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+        * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
         */
-       if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
+       if ((info & ~(XLR_RMGR_INFO_MASK |
+                                 XLR_SPECIAL_REL_UPDATE |
+                                 XLR_CHECK_CONSISTENCY)) != 0)
                elog(PANIC, "invalid xlog info mask %02X", info);
 
-       TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
+       TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
 
        /*
         * In bootstrap mode, we don't actually log anything but XLOG resources;
@@ -410,7 +438,7 @@ XLogInsert(RmgrId rmid, uint8 info)
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
                XLogResetInsertion();
-               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
                return EndPos;
        }
 
@@ -424,14 +452,14 @@ XLogInsert(RmgrId rmid, uint8 info)
                /*
                 * Get values needed to decide whether to do full-page writes. Since
                 * we don't yet have an insertion lock, these could change under us,
-                * but XLogInsertRecData will recheck them once it has a lock.
+                * but XLogInsertRecord will recheck them once it has a lock.
                 */
                GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
                rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
                                                                 &fpw_lsn);
 
-               EndPos = XLogInsertRecord(rdt, fpw_lsn);
+               EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
        } while (EndPos == InvalidXLogRecPtr);
 
        XLogResetInsertion();
@@ -459,7 +487,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        XLogRecData *rdt;
        uint32          total_len = 0;
        int                     block_id;
-       pg_crc32        rdata_crc;
+       pg_crc32c       rdata_crc;
        registered_buffer *prev_regbuf = NULL;
        XLogRecData *rdt_datas_last;
        XLogRecord *rechdr;
@@ -478,6 +506,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        rdt_datas_last = &hdr_rdt;
        hdr_rdt.data = hdr_scratch;
 
+       /*
+        * Enforce consistency checks for this record if user is looking for it.
+        * Do this before at the beginning of this routine to give the possibility
+        * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
+        * a record.
+        */
+       if (wal_consistency_checking[rmid])
+               info |= XLR_CHECK_CONSISTENCY;
+
        /*
         * Make an rdata chain containing all the data portions of all block
         * references. This includes the data for full-page images. Also append
@@ -494,6 +531,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                XLogRecordBlockCompressHeader cbimg = {0};
                bool            samerel;
                bool            is_compressed = false;
+               bool            include_image;
 
                if (!regbuf->in_use)
                        continue;
@@ -537,7 +575,13 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
                        bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
 
-               if (needs_backup)
+               /*
+                * If needs_backup is true or WAL checking is enabled for current
+                * resource manager, log a full-page write for the current block.
+                */
+               include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+               if (include_image)
                {
                        Page            page = regbuf->page;
                        uint16          compressed_len;
@@ -585,7 +629,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                                                                                        &compressed_len);
                        }
 
-                       /* Fill in the remaining fields in the XLogRecordBlockHeader struct */
+                       /*
+                        * Fill in the remaining fields in the XLogRecordBlockHeader
+                        * struct
+                        */
                        bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
 
                        /*
@@ -596,6 +643,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
                        bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
+                       /*
+                        * If WAL consistency checking is enabled for the resource manager
+                        * of this WAL record, a full-page image is included in the record
+                        * for the block modified. During redo, the full-page is replayed
+                        * only if BKPIMAGE_APPLY is set.
+                        */
+                       if (needs_backup)
+                               bimg.bimg_info |= BKPIMAGE_APPLY;
+
                        if (is_compressed)
                        {
                                bimg.length = compressed_len;
@@ -650,15 +706,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                {
                        samerel = true;
                        bkpb.fork_flags |= BKPBLOCK_SAME_REL;
-                       prev_regbuf = regbuf;
                }
                else
                        samerel = false;
+               prev_regbuf = regbuf;
 
                /* Ok, copy the header to the scratch buffer */
                memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
                scratch += SizeOfXLogRecordBlockHeader;
-               if (needs_backup)
+               if (include_image)
                {
                        memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
                        scratch += SizeOfXLogRecordBlockImageHeader;
@@ -678,18 +734,27 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                scratch += sizeof(BlockNumber);
        }
 
+       /* followed by the record's origin, if any */
+       if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+               replorigin_session_origin != InvalidRepOriginId)
+       {
+               *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+               memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
+               scratch += sizeof(replorigin_session_origin);
+       }
+
        /* followed by main data, if any */
        if (mainrdata_len > 0)
        {
                if (mainrdata_len > 255)
                {
-                       *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
                        memcpy(scratch, &mainrdata_len, sizeof(uint32));
                        scratch += sizeof(uint32);
                }
                else
                {
-                       *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
                        *(scratch++) = (uint8) mainrdata_len;
                }
                rdt_datas_last->next = mainrdata_head;
@@ -737,7 +802,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
  * the length of compressed block image.
  */
 static bool
-XLogCompressBackupBlock(char * page, uint16 hole_offset, uint16 hole_length,
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
                                                char *dest, uint16 *dlen)
 {
        int32           orig_len = BLCKSZ - hole_length;
@@ -765,16 +830,15 @@ XLogCompressBackupBlock(char * page, uint16 hole_offset, uint16 hole_length,
                source = page;
 
        /*
-        * We recheck the actual size even if pglz_compress() reports success
-        * and see if the number of bytes saved by compression is larger than
-        * the length of extra data needed for the compressed version of block
-        * image.
+        * We recheck the actual size even if pglz_compress() reports success and
+        * see if the number of bytes saved by compression is larger than the
+        * length of extra data needed for the compressed version of block image.
         */
        len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
        if (len >= 0 &&
                len + extra_bytes < orig_len)
        {
-               *dlen = (uint16) len;           /* successful compression */
+               *dlen = (uint16) len;   /* successful compression */
                return true;
        }
        return false;
@@ -968,9 +1032,7 @@ InitXLogInsert(void)
        {
                xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
                                                                                           "WAL record construction",
-                                                                                          ALLOCSET_DEFAULT_MINSIZE,
-                                                                                          ALLOCSET_DEFAULT_INITSIZE,
-                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                          ALLOCSET_DEFAULT_SIZES);
        }
 
        if (registered_buffers == NULL)