]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xloginsert.c
Phase 2 of pgindent updates.
[postgresql] / src / backend / access / transam / xloginsert.c
index fe2043130241b041b64372f38b08eb567b79b53b..c9cc6636d3fd039754d15727587483f0d6530597 100644 (file)
@@ -9,7 +9,7 @@
  * of XLogRecData structs by a call to XLogRecordAssemble(). See
  * access/transam/README for details.
  *
- * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/xloginsert.c
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
 #include "miscadmin.h"
+#include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "pg_trace.h"
 
+/* Buffer size required to store a compressed version of backup block image */
+#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
+
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -50,12 +55,15 @@ typedef struct
 
        XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
                                                                 * backup block data in XLogRecordAssemble() */
-}      registered_buffer;
+
+       /* buffer to store a compressed version of backup block image */
+       char            compressed_page[PGLZ_MAX_BLCKSZ];
+} registered_buffer;
 
 static registered_buffer *registered_buffers;
-static int     max_registered_buffers;         /* allocated size */
-static int     max_registered_block_id = 0;            /* highest block_id + 1
-                                                                                                * currently registered */
+static int     max_registered_buffers; /* allocated size */
+static int     max_registered_block_id = 0;    /* highest block_id + 1 currently
+                                                                                        * registered */
 
 /*
  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
@@ -65,6 +73,9 @@ static XLogRecData *mainrdata_head;
 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
 static uint32 mainrdata_len;   /* total # of bytes in chain */
 
+/* flags for the in-progress insertion */
+static uint8 curinsert_flags = 0;
+
 /*
  * These are used to hold the record header while constructing a record.
  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@@ -76,10 +87,12 @@ static uint32 mainrdata_len;        /* total # of bytes in chain */
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+#define SizeOfXlogOrigin       (sizeof(RepOriginId) + sizeof(char))
+
 #define HEADER_SCRATCH_SIZE \
        (SizeOfXLogRecord + \
         MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-        SizeOfXLogRecordDataHeaderLong)
+        SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -96,6 +109,8 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
                                   XLogRecPtr RedoRecPtr, bool doPageWrites,
                                   XLogRecPtr *fpw_lsn);
+static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
+                                               uint16 hole_length, char *dest, uint16 *dlen);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -107,12 +122,14 @@ XLogBeginInsert(void)
        Assert(max_registered_block_id == 0);
        Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
        Assert(mainrdata_len == 0);
-       Assert(!begininsert_called);
 
        /* cross-check on whether we should be here or not */
        if (!XLogInsertAllowed())
                elog(ERROR, "cannot make new WAL entries during recovery");
 
+       if (begininsert_called)
+               elog(ERROR, "XLogBeginInsert was already called");
+
        begininsert_called = true;
 }
 
@@ -184,6 +201,7 @@ XLogResetInsertion(void)
        max_registered_block_id = 0;
        mainrdata_len = 0;
        mainrdata_last = (XLogRecData *) &mainrdata_head;
+       curinsert_flags = 0;
        begininsert_called = false;
 }
 
@@ -299,7 +317,7 @@ XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
  * Add data to the WAL record that's being constructed.
  *
  * The data is appended to the "main chunk", available at replay with
- * XLogGetRecData().
+ * XLogRecGetData().
  */
 void
 XLogRegisterData(char *data, int len)
@@ -365,6 +383,23 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
        regbuf->rdata_len += len;
 }
 
+/*
+ * Set insert status flags for the upcoming WAL record.
+ *
+ * The flags that can be used here are:
+ * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
+ *      included in the record.
+ * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
+ *      durability, which lets us avoid triggering WAL archiving and other
+ *      background activity.
+ */
+void
+XLogSetRecordFlags(uint8 flags)
+{
+       Assert(begininsert_called);
+       curinsert_flags = flags;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -386,13 +421,15 @@ XLogInsert(RmgrId rmid, uint8 info)
                elog(ERROR, "XLogBeginInsert was not called");
 
        /*
-        * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
-        * reserved for use by me.
+        * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+        * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
         */
-       if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
+       if ((info & ~(XLR_RMGR_INFO_MASK |
+                                 XLR_SPECIAL_REL_UPDATE |
+                                 XLR_CHECK_CONSISTENCY)) != 0)
                elog(PANIC, "invalid xlog info mask %02X", info);
 
-       TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
+       TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
 
        /*
         * In bootstrap mode, we don't actually log anything but XLOG resources;
@@ -401,7 +438,7 @@ XLogInsert(RmgrId rmid, uint8 info)
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
                XLogResetInsertion();
-               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
                return EndPos;
        }
 
@@ -415,14 +452,14 @@ XLogInsert(RmgrId rmid, uint8 info)
                /*
                 * Get values needed to decide whether to do full-page writes. Since
                 * we don't yet have an insertion lock, these could change under us,
-                * but XLogInsertRecData will recheck them once it has a lock.
+                * but XLogInsertRecord will recheck them once it has a lock.
                 */
                GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
                rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
                                                                 &fpw_lsn);
 
-               EndPos = XLogInsertRecord(rdt, fpw_lsn);
+               EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
        } while (EndPos == InvalidXLogRecPtr);
 
        XLogResetInsertion();
@@ -450,7 +487,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        XLogRecData *rdt;
        uint32          total_len = 0;
        int                     block_id;
-       pg_crc32        rdata_crc;
+       pg_crc32c       rdata_crc;
        registered_buffer *prev_regbuf = NULL;
        XLogRecData *rdt_datas_last;
        XLogRecord *rechdr;
@@ -469,6 +506,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        rdt_datas_last = &hdr_rdt;
        hdr_rdt.data = hdr_scratch;
 
+       /*
+        * Enforce consistency checks for this record if the user has asked for
+        * them. Do this at the beginning of this routine, so that callers of
+        * XLogInsert() can still pass XLR_CHECK_CONSISTENCY directly for a
+        * record.
+        */
+       if (wal_consistency_checking[rmid])
+               info |= XLR_CHECK_CONSISTENCY;
+
        /*
         * Make an rdata chain containing all the data portions of all block
         * references. This includes the data for full-page images. Also append
@@ -482,7 +528,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                bool            needs_data;
                XLogRecordBlockHeader bkpb;
                XLogRecordBlockImageHeader bimg;
+               XLogRecordBlockCompressHeader cbimg = {0};
                bool            samerel;
+               bool            is_compressed = false;
+               bool            include_image;
 
                if (!regbuf->in_use)
                        continue;
@@ -526,12 +575,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
                        bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
 
-               if (needs_backup)
+               /*
+                * If needs_backup is true or WAL consistency checking is enabled for
+                * this record, log a full-page write for the current block.
+                */
+               include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+               if (include_image)
                {
                        Page            page = regbuf->page;
+                       uint16          compressed_len;
 
                        /*
-                        * The page needs to be backed up, so set up *bimg
+                        * The page needs to be backed up, so calculate its hole length
+                        * and offset.
                         */
                        if (regbuf->flags & REGBUF_STANDARD)
                        {
@@ -544,49 +601,91 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                                        upper <= BLCKSZ)
                                {
                                        bimg.hole_offset = lower;
-                                       bimg.hole_length = upper - lower;
+                                       cbimg.hole_length = upper - lower;
                                }
                                else
                                {
                                        /* No "hole" to compress out */
                                        bimg.hole_offset = 0;
-                                       bimg.hole_length = 0;
+                                       cbimg.hole_length = 0;
                                }
                        }
                        else
                        {
                                /* Not a standard page header, don't try to eliminate "hole" */
                                bimg.hole_offset = 0;
-                               bimg.hole_length = 0;
+                               cbimg.hole_length = 0;
                        }
 
-                       /* Fill in the remaining fields in the XLogRecordBlockData struct */
-                       bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+                       /*
+                        * Try to compress a block image if wal_compression is enabled
+                        */
+                       if (wal_compression)
+                       {
+                               is_compressed =
+                                       XLogCompressBackupBlock(page, bimg.hole_offset,
+                                                                                       cbimg.hole_length,
+                                                                                       regbuf->compressed_page,
+                                                                                       &compressed_len);
+                       }
 
-                       total_len += BLCKSZ - bimg.hole_length;
+                       /*
+                        * Fill in the remaining fields in the XLogRecordBlockHeader
+                        * struct
+                        */
+                       bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
 
                        /*
                         * Construct XLogRecData entries for the page content.
                         */
                        rdt_datas_last->next = &regbuf->bkp_rdatas[0];
                        rdt_datas_last = rdt_datas_last->next;
-                       if (bimg.hole_length == 0)
+
+                       bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+
+                       /*
+                        * If WAL consistency checking is enabled for the resource manager
+                        * of this WAL record, a full-page image is included in the record
+                        * for the block modified. During redo, the full-page image is
+                        * replayed only if BKPIMAGE_APPLY is set.
+                        */
+                       if (needs_backup)
+                               bimg.bimg_info |= BKPIMAGE_APPLY;
+
+                       if (is_compressed)
                        {
-                               rdt_datas_last->data = page;
-                               rdt_datas_last->len = BLCKSZ;
+                               bimg.length = compressed_len;
+                               bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
+
+                               rdt_datas_last->data = regbuf->compressed_page;
+                               rdt_datas_last->len = compressed_len;
                        }
                        else
                        {
-                               /* must skip the hole */
-                               rdt_datas_last->data = page;
-                               rdt_datas_last->len = bimg.hole_offset;
+                               bimg.length = BLCKSZ - cbimg.hole_length;
+
+                               if (cbimg.hole_length == 0)
+                               {
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = BLCKSZ;
+                               }
+                               else
+                               {
+                                       /* must skip the hole */
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = bimg.hole_offset;
 
-                               rdt_datas_last->next = &regbuf->bkp_rdatas[1];
-                               rdt_datas_last = rdt_datas_last->next;
+                                       rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+                                       rdt_datas_last = rdt_datas_last->next;
 
-                               rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
-                               rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
+                                       rdt_datas_last->data =
+                                               page + (bimg.hole_offset + cbimg.hole_length);
+                                       rdt_datas_last->len =
+                                               BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+                               }
                        }
+
+                       total_len += bimg.length;
                }
 
                if (needs_data)
@@ -607,18 +706,24 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                {
                        samerel = true;
                        bkpb.fork_flags |= BKPBLOCK_SAME_REL;
-                       prev_regbuf = regbuf;
                }
                else
                        samerel = false;
+               prev_regbuf = regbuf;
 
                /* Ok, copy the header to the scratch buffer */
                memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
                scratch += SizeOfXLogRecordBlockHeader;
-               if (needs_backup)
+               if (include_image)
                {
                        memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
                        scratch += SizeOfXLogRecordBlockImageHeader;
+                       if (cbimg.hole_length != 0 && is_compressed)
+                       {
+                               memcpy(scratch, &cbimg,
+                                          SizeOfXLogRecordBlockCompressHeader);
+                               scratch += SizeOfXLogRecordBlockCompressHeader;
+                       }
                }
                if (!samerel)
                {
@@ -629,18 +734,27 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                scratch += sizeof(BlockNumber);
        }
 
+       /* followed by the record's origin, if any */
+       if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+               replorigin_session_origin != InvalidRepOriginId)
+       {
+               *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+               memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
+               scratch += sizeof(replorigin_session_origin);
+       }
+
        /* followed by main data, if any */
        if (mainrdata_len > 0)
        {
                if (mainrdata_len > 255)
                {
-                       *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
                        memcpy(scratch, &mainrdata_len, sizeof(uint32));
                        scratch += sizeof(uint32);
                }
                else
                {
-                       *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
                        *(scratch++) = (uint8) mainrdata_len;
                }
                rdt_datas_last->next = mainrdata_head;
@@ -680,6 +794,56 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        return &hdr_rdt;
 }
 
+/*
+ * Create a compressed version of a backup block image.
+ *
+ * Returns false if compression fails (i.e., the compressed result plus any
+ * extra header bytes is not smaller than the original). Otherwise, returns
+ * true and sets 'dlen' to the length of the compressed block image.
+ */
+static bool
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
+                                               char *dest, uint16 *dlen)
+{
+       int32           orig_len = BLCKSZ - hole_length;
+       int32           len;
+       int32           extra_bytes = 0;
+       char       *source;
+       char            tmp[BLCKSZ];
+
+       if (hole_length != 0)
+       {
+               /* must skip the hole */
+               source = tmp;
+               memcpy(source, page, hole_offset);
+               memcpy(source + hole_offset,
+                          page + (hole_offset + hole_length),
+                          BLCKSZ - (hole_length + hole_offset));
+
+               /*
+                * Extra data needs to be stored in WAL record for the compressed
+                * version of block image if the hole exists.
+                */
+               extra_bytes = SizeOfXLogRecordBlockCompressHeader;
+       }
+       else
+               source = page;
+
+       /*
+        * We recheck the actual size even if pglz_compress() reports success and
+        * see if the number of bytes saved by compression is larger than the
+        * length of extra data needed for the compressed version of block image.
+        */
+       len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+       if (len >= 0 &&
+               len + extra_bytes < orig_len)
+       {
+               *dlen = (uint16) len;   /* successful compression */
+               return true;
+       }
+       return false;
+}
+
 /*
  * Determine whether the buffer referenced has to be backed up.
  *
@@ -786,7 +950,7 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
                BufferGetTag(buffer, &rnode, &forkno, &blkno);
                XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
 
-               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
+               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
        }
 
        return recptr;
@@ -868,9 +1032,7 @@ InitXLogInsert(void)
        {
                xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
                                                                                           "WAL record construction",
-                                                                                          ALLOCSET_DEFAULT_MINSIZE,
-                                                                                          ALLOCSET_DEFAULT_INITSIZE,
-                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                          ALLOCSET_DEFAULT_SIZES);
        }
 
        if (registered_buffers == NULL)
@@ -891,5 +1053,6 @@ InitXLogInsert(void)
         * Allocate a buffer to hold the header information for a WAL record.
         */
        if (hdr_scratch == NULL)
-               hdr_scratch = palloc0(HEADER_SCRATCH_SIZE);
+               hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
+                                                                                        HEADER_SCRATCH_SIZE);
 }