]> granicus.if.org Git - postgresql/commitdiff
Allow I/O reliability checks using 16-bit checksums
authorSimon Riggs <simon@2ndQuadrant.com>
Fri, 22 Mar 2013 13:54:07 +0000 (13:54 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Fri, 22 Mar 2013 13:54:07 +0000 (13:54 +0000)
Checksums are set immediately prior to flush out of shared buffers
and checked when pages are read in again. Hint bit setting will
require full page write when block is dirtied, which causes various
infrastructure changes. Extensive comments, docs and README.

WARNING message thrown if checksum fails on non-all zeroes page;
ERROR thrown but can be disabled with ignore_checksum_failure = on.

Feature enabled by an initdb option, since transition from option off
to option on is long and complex and has not yet been implemented.
Default is not to use checksums.

Checksum used is WAL CRC-32 truncated to 16-bits.

Simon Riggs, Jeff Davis, Greg Smith
Wide input and assistance from many community members. Thank you.

40 files changed:
contrib/pg_upgrade/controldata.c
contrib/pg_upgrade/pg_upgrade.h
doc/src/sgml/config.sgml
doc/src/sgml/ref/initdb.sgml
src/backend/access/gist/gistget.c
src/backend/access/hash/hash.c
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/heap/rewriteheap.c
src/backend/access/heap/visibilitymap.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtsort.c
src/backend/access/nbtree/nbtutils.c
src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/spgist/spginsert.c
src/backend/access/transam/README
src/backend/access/transam/xlog.c
src/backend/bootstrap/bootstrap.c
src/backend/commands/matview.c
src/backend/commands/sequence.c
src/backend/commands/tablecmds.c
src/backend/commands/vacuumlazy.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/backend/storage/freespace/README
src/backend/storage/freespace/freespace.c
src/backend/storage/freespace/fsmpage.c
src/backend/storage/page/bufpage.c
src/backend/utils/misc/guc.c
src/backend/utils/time/tqual.c
src/bin/initdb/initdb.c
src/bin/pg_controldata/pg_controldata.c
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/heapam_xlog.h
src/include/access/visibilitymap.h
src/include/access/xlog.h
src/include/catalog/pg_control.h
src/include/storage/bufmgr.h
src/include/storage/bufpage.h

index c33c20c7c015b7ec1bc83693a02f7828924f2311..41a8c694e7a77076cf5ac669d9d08179457b964a 100644 (file)
@@ -56,6 +56,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        bool            got_toast = false;
        bool            got_date_is_int = false;
        bool            got_float8_pass_by_value = false;
+       bool            got_data_checksums = false;
        char       *lc_collate = NULL;
        char       *lc_ctype = NULL;
        char       *lc_monetary = NULL;
@@ -131,6 +132,13 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                got_float8_pass_by_value = true;
        }
 
+       /* Only in <= 9.2 */
+       if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
+       {
+               cluster->controldata.data_checksums = false;
+               got_data_checksums = true;
+       }
+
        /* we have the result of cmd in "output". so parse it line by line now */
        while (fgets(bufin, sizeof(bufin), output))
        {
@@ -393,6 +401,18 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                        cluster->controldata.float8_pass_by_value = strstr(p, "by value") != NULL;
                        got_float8_pass_by_value = true;
                }
+               else if ((p = strstr(bufin, "checksums")) != NULL)
+               {
+                       p = strchr(p, ':');
+
+                       if (p == NULL || strlen(p) <= 1)
+                               pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
+
+                       p++;                            /* removing ':' char */
+                       /* used later for contrib check */
+                       cluster->controldata.data_checksums = strstr(p, "enabled") != NULL;
+                       got_data_checksums = true;
+               }
                /* In pre-8.4 only */
                else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
                {
@@ -476,7 +496,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                !got_tli ||
                !got_align || !got_blocksz || !got_largesz || !got_walsz ||
                !got_walseg || !got_ident || !got_index || !got_toast ||
-               !got_date_is_int || !got_float8_pass_by_value)
+               !got_date_is_int || !got_float8_pass_by_value || !got_data_checksums)
        {
                pg_log(PG_REPORT,
                        "The %s cluster lacks some required control information:\n",
@@ -535,6 +555,10 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                if (!got_float8_pass_by_value)
                        pg_log(PG_REPORT, "  float8 argument passing method\n");
 
+               /* value added in Postgres 9.3 */
+               if (!got_data_checksums)
+                       pg_log(PG_REPORT, "  data checksums\n");
+
                pg_log(PG_FATAL,
                           "Cannot continue without required control information, terminating\n");
        }
@@ -596,6 +620,12 @@ check_control_data(ControlData *oldctrl,
                           "--disable-integer-datetimes or get server binaries built with those\n"
                           "options.\n");
        }
+
+       if (oldctrl->data_checksums != newctrl->data_checksums)
+       {
+               pg_log(PG_FATAL,
+                          "old and new pg_controldata checksums settings are invalid or do not match\n");
+       }
 }
 
 
index 497098199f6de0fe2a469262c51d501deaac79f0..370315f0cb34bc5824dc7974aac1cfc3aa136302 100644 (file)
@@ -202,6 +202,7 @@ typedef struct
        uint32          toast;
        bool            date_is_int;
        bool            float8_pass_by_value;
+       bool            data_checksums;
        char       *lc_collate;
        char       *lc_ctype;
        char       *encoding;
index 8c520e1267ba158c18228dbb1899ac16aa20fa28..95de86441e4254a98f8c2265d33dcd02faff2dca 100644 (file)
@@ -6629,6 +6629,30 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+    <varlistentry id="guc-ignore-checksum-failure" xreflabel="ignore_checksum_failure">
+      <term><varname>ignore_checksum_failure</varname> (<type>boolean</type>)</term>
+      <indexterm>
+       <primary><varname>ignore_checksum_failure</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Only has effect if <xref linkend="app-initdb-data-checksums"> are enabled.
+       </para>
+       <para>
+        Detection of a checksum failure during a read normally causes
+        <productname>PostgreSQL</> to report an error, aborting the current
+        transaction.  Setting <varname>ignore_checksum_failure</> to on causes
+        the system to ignore the failure (but still report a warning), and
+        continue processing.  This behavior may <emphasis>cause crashes, propagate
+        or hide corruption, or other serious problems</>.  However, it may allow
+        you to get past the error and retrieve undamaged tuples that might still be
+        present in the table if the block header is still sane. If the header is
+        corrupt an error will be reported even if this option is enabled. The
+        default setting is <literal>off</>, and it can only be changed by a superuser.
+       </para>
+      </listitem>
+     </varlistentry>
+
     <varlistentry id="guc-zero-damaged-pages" xreflabel="zero_damaged_pages">
       <term><varname>zero_damaged_pages</varname> (<type>boolean</type>)</term>
       <indexterm>
index a1e46eb4c6766df85f9e9d1b9743cdab9caf3183..b1067e235058bae655249b1bad20c791494ce3d1 100644 (file)
@@ -182,6 +182,20 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry id="app-initdb-data-checksums" xreflabel="data checksums">
+      <term><option>-k</option></term>
+      <term><option>--data-checksums</option></term>
+      <listitem>
+       <para>
+        Use checksums on data pages to help detect corruption by the
+        I/O system that would otherwise be silent. Enabling checksums
+        may incur a noticeable performance penalty. This option can only
+        be set during initialization, and cannot be changed later. If
+        set, checksums are calculated for all objects, in all databases.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--locale=<replaceable>locale</replaceable></option></term>
       <listitem>
index 3300fec644ece33f43532a842583f80e5bd8c10b..cef31ce66e99be832f7f3d00e6b9abc1a40bc43a 100644 (file)
@@ -362,8 +362,12 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
                        {
                                /* Creating index-page GISTSearchItem */
                                item->blkno = ItemPointerGetBlockNumber(&it->t_tid);
-                               /* lsn of current page is lsn of parent page for child */
-                               item->data.parentlsn = PageGetLSN(page);
+
+                               /*
+                                * LSN of current page is lsn of parent page for child. We only
+                                * have a shared lock, so we need to get the LSN atomically.
+                                */
+                               item->data.parentlsn = BufferGetLSNAtomic(buffer);
                        }
 
                        /* Insert it into the queue using new distance data */
index 2813dd1e7498ce4ff4e77238245f55576cfa3931..5ca27a231e2798d895077d5f0d214955fe77c1cb 100644 (file)
@@ -285,11 +285,9 @@ hashgettuple(PG_FUNCTION_ARGS)
                        ItemIdMarkDead(PageGetItemId(page, offnum));
 
                        /*
-                        * Since this can be redone later if needed, it's treated the same
-                        * as a commit-hint-bit status update for heap tuples: we mark the
-                        * buffer dirty but don't make a WAL log entry.
+                        * Since this can be redone later if needed, mark as a hint.
                         */
-                       SetBufferCommitInfoNeedsSave(buf);
+                       MarkBufferDirtyHint(buf);
                }
 
                /*
index 151422a1b0620c25baba5bb64db1d0ad16e6700e..fe563188876b01ffcbd63b2fd9a546071c8372cc 100644 (file)
@@ -5754,17 +5754,23 @@ log_heap_freeze(Relation reln, Buffer buffer,
  * being marked all-visible, and vm_buffer is the buffer containing the
  * corresponding visibility map block. Both should have already been modified
  * and dirtied.
+ *
+ * If checksums are enabled, we also add the heap_buffer to the chain to
+ * protect it from being torn.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
+log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
                                 TransactionId cutoff_xid)
 {
        xl_heap_visible xlrec;
        XLogRecPtr      recptr;
-       XLogRecData rdata[2];
+       XLogRecData rdata[3];
+
+       Assert(BufferIsValid(heap_buffer));
+       Assert(BufferIsValid(vm_buffer));
 
        xlrec.node = rnode;
-       xlrec.block = block;
+       xlrec.block = BufferGetBlockNumber(heap_buffer);
        xlrec.cutoff_xid = cutoff_xid;
 
        rdata[0].data = (char *) &xlrec;
@@ -5778,6 +5784,17 @@ log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
        rdata[1].buffer_std = false;
        rdata[1].next = NULL;
 
+       if (DataChecksumsEnabled())
+       {
+               rdata[1].next = &(rdata[2]);
+
+               rdata[2].data = NULL;
+               rdata[2].len = 0;
+               rdata[2].buffer = heap_buffer;
+               rdata[2].buffer_std = true;
+               rdata[2].next = NULL;
+       }
+
        recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
 
        return recptr;
@@ -6139,8 +6156,6 @@ static void
 heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
 
        /*
         * If there are any Hot Standby transactions running that have an xmin
@@ -6155,39 +6170,56 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
                ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
 
        /*
-        * Read the heap page, if it still exists.      If the heap file has been
-        * dropped or truncated later in recovery, we don't need to update the
-        * page, but we'd better still update the visibility map.
+        * If heap block was backed up, restore it. This can only happen with
+        * checksums enabled.
         */
-       buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
-                                                                       RBM_NORMAL);
-       if (BufferIsValid(buffer))
+       if (record->xl_info & XLR_BKP_BLOCK(1))
        {
-               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-               page = (Page) BufferGetPage(buffer);
+               Assert(DataChecksumsEnabled());
+               (void) RestoreBackupBlock(lsn, record, 1, false, false);
+       }
+       else
+       {
+               Buffer          buffer;
+               Page            page;
 
                /*
-                * We don't bump the LSN of the heap page when setting the visibility
-                * map bit, because that would generate an unworkable volume of
-                * full-page writes.  This exposes us to torn page hazards, but since
-                * we're not inspecting the existing page contents in any way, we
-                * don't care.
-                *
-                * However, all operations that clear the visibility map bit *do* bump
-                * the LSN, and those operations will only be replayed if the XLOG LSN
-                * follows the page LSN.  Thus, if the page LSN has advanced past our
-                * XLOG record's LSN, we mustn't mark the page all-visible, because
-                * the subsequent update won't be replayed to clear the flag.
+                * Read the heap page, if it still exists. If the heap file has been
+                * dropped or truncated later in recovery, we don't need to update the
+                * page, but we'd better still update the visibility map.
                 */
-               if (lsn > PageGetLSN(page))
+               buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
+                                                                               xlrec->block, RBM_NORMAL);
+               if (BufferIsValid(buffer))
                {
-                       PageSetAllVisible(page);
-                       MarkBufferDirty(buffer);
-               }
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-               /* Done with heap page. */
-               UnlockReleaseBuffer(buffer);
+                       page = (Page) BufferGetPage(buffer);
+
+                       /*
+                        * We don't bump the LSN of the heap page when setting the
+                        * visibility map bit (unless checksums are enabled, in which case
+                        * we must), because that would generate an unworkable volume of
+                        * full-page writes.  This exposes us to torn page hazards, but
+                        * since we're not inspecting the existing page contents in any
+                        * way, we don't care.
+                        *
+                        * However, all operations that clear the visibility map bit *do*
+                        * bump the LSN, and those operations will only be replayed if the
+                        * XLOG LSN follows the page LSN.  Thus, if the page LSN has
+                        * advanced past our XLOG record's LSN, we mustn't mark the page
+                        * all-visible, because the subsequent update won't be replayed to
+                        * clear the flag.
+                        */
+                       if (lsn > PageGetLSN(page))
+                       {
+                               PageSetAllVisible(page);
+                               MarkBufferDirty(buffer);
+                       }
+
+                       /* Done with heap page. */
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 
        /*
@@ -6218,7 +6250,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
                 * real harm is done; and the next VACUUM will fix it.
                 */
                if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
-                       visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
+                       visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
                                                          xlrec->cutoff_xid);
 
                ReleaseBuffer(vmbuffer);
index 0fc032e128b88fafdc057e0d550e328311c71af3..2ab723ddf196c6d6925e10b19df647a0cdfd80bb 100644 (file)
@@ -262,7 +262,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                {
                        ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
                        PageClearFull(page);
-                       SetBufferCommitInfoNeedsSave(buffer);
+                       MarkBufferDirtyHint(buffer);
                }
        }
 
index 84472f80cd9dd1c439ff80d538013e3aa2da205c..8a22eccf87aa9fbae8b7126907d1d67873c6c9c1 100644 (file)
@@ -273,6 +273,8 @@ end_heap_rewrite(RewriteState state)
        /* Write the last page, if any */
        if (state->rs_buffer_valid)
        {
+               PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
+
                if (state->rs_use_wal)
                        log_newpage(&state->rs_new_rel->rd_node,
                                                MAIN_FORKNUM,
@@ -614,6 +616,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
                {
                        /* Doesn't fit, so write out the existing page */
 
+                       PageSetChecksumInplace(page, state->rs_blockno);
+
                        /* XLOG stuff */
                        if (state->rs_use_wal)
                                log_newpage(&state->rs_new_rel->rd_node,
index 3209c87bb86f0c296c97a30b955537ba704a6aa4..af64fe97e89f92068d0bdd145a0120de332e4d5f 100644 (file)
@@ -233,13 +233,18 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
  * marked all-visible; it is needed for Hot Standby, and can be
  * InvalidTransactionId if the page contains no tuples.
  *
+ * Caller is expected to set the heap page's PD_ALL_VISIBLE bit before calling
+ * this function. Except in recovery, caller should also pass the heap
+ * buffer. When checksums are enabled and we're not in recovery, we must add
+ * the heap buffer to the WAL chain to protect it from being torn.
+ *
  * You must pass a buffer containing the correct map page to this function.
  * Call visibilitymap_pin first to pin the right one. This function doesn't do
  * any I/O.
  */
 void
-visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
-                                 Buffer buf, TransactionId cutoff_xid)
+visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
+                                 XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
 {
        BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
        uint32          mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
@@ -252,34 +257,55 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
 #endif
 
        Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
+       Assert(InRecovery || BufferIsValid(heapBuf));
 
-       /* Check that we have the right page pinned */
-       if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
-               elog(ERROR, "wrong buffer passed to visibilitymap_set");
+       /* Check that we have the right heap page pinned, if present */
+       if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
+               elog(ERROR, "wrong heap buffer passed to visibilitymap_set");
 
-       page = BufferGetPage(buf);
+       /* Check that we have the right VM page pinned */
+       if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock)
+               elog(ERROR, "wrong VM buffer passed to visibilitymap_set");
+
+       page = BufferGetPage(vmBuf);
        map = PageGetContents(page);
-       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+       LockBuffer(vmBuf, BUFFER_LOCK_EXCLUSIVE);
 
        if (!(map[mapByte] & (1 << mapBit)))
        {
                START_CRIT_SECTION();
 
                map[mapByte] |= (1 << mapBit);
-               MarkBufferDirty(buf);
+               MarkBufferDirty(vmBuf);
 
                if (RelationNeedsWAL(rel))
                {
                        if (XLogRecPtrIsInvalid(recptr))
-                               recptr = log_heap_visible(rel->rd_node, heapBlk, buf,
+                       {
+                               Assert(!InRecovery);
+                               recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
                                                                                  cutoff_xid);
+
+                               /*
+                                * If data checksums are enabled, we need to protect the heap
+                                * page from being torn.
+                                */
+                               if (DataChecksumsEnabled())
+                               {
+                                       Page heapPage = BufferGetPage(heapBuf);
+
+                                       /* caller is expected to set PD_ALL_VISIBLE first */
+                                       Assert(PageIsAllVisible(heapPage));
+                                       PageSetLSN(heapPage, recptr);
+                               }
+                       }
                        PageSetLSN(page, recptr);
                }
 
                END_CRIT_SECTION();
        }
 
-       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+       LockBuffer(vmBuf, BUFFER_LOCK_UNLOCK);
 }
 
 /*
@@ -579,6 +605,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
        /* Now extend the file */
        while (vm_nblocks_now < vm_nblocks)
        {
+               PageSetChecksumInplace(pg, vm_nblocks_now);
+
                smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
                                   (char *) pg, false);
                vm_nblocks_now++;
index 63e099b0fd982316d9b873244e34cd671fae6e5a..6ad4f765f5bb335d91bd5c15e62d3670364c86e5 100644 (file)
@@ -407,11 +407,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                         */
                                        ItemIdMarkDead(curitemid);
                                        opaque->btpo_flags |= BTP_HAS_GARBAGE;
-                                       /* be sure to mark the proper buffer dirty... */
+
+                                       /*
+                                        * Mark buffer with a dirty hint, since state is not
+                                        * crucial. Be sure to mark the proper buffer dirty.
+                                        */
                                        if (nbuf != InvalidBuffer)
-                                               SetBufferCommitInfoNeedsSave(nbuf);
+                                               MarkBufferDirtyHint(nbuf);
                                        else
-                                               SetBufferCommitInfoNeedsSave(buf);
+                                               MarkBufferDirtyHint(buf);
                                }
                        }
                }
index 0e04168365220e1cb13691e03076fe01976d9e1c..621b0556390f1c27d3b268f01033e6c393d6fdcc 100644 (file)
@@ -217,6 +217,7 @@ btbuildempty(PG_FUNCTION_ARGS)
        _bt_initmetapage(metapage, P_NONE, 0);
 
        /* Write the page.      If archiving/streaming, XLOG it. */
+       PageSetChecksumInplace(metapage, BTREE_METAPAGE);
        smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
                          (char *) metapage, true);
        if (XLogIsNeeded())
@@ -1051,7 +1052,7 @@ restart:
                                opaque->btpo_cycleid == vstate->cycleid)
                        {
                                opaque->btpo_cycleid = 0;
-                               SetBufferCommitInfoNeedsSave(buf);
+                               MarkBufferDirtyHint(buf);
                        }
                }
 
index abd99954e93fb62104e4f56b64f5878435d13949..52c5a2676ef477ccf026e0491fd9dac9f5f6dd23 100644 (file)
@@ -288,12 +288,15 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
        {
                if (!wstate->btws_zeropage)
                        wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
+               /* don't set checksum for all-zero page */
                smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
                                   wstate->btws_pages_written++,
                                   (char *) wstate->btws_zeropage,
                                   true);
        }
 
+       PageSetChecksumInplace(page, blkno);
+
        /*
         * Now write the page.  There's no need for smgr to schedule an fsync for
         * this write; we'll do it ourselves before ending the build.
index 04e1ac4a9bd4ae558e375fd147cf538cf86f2ed3..fe53ec1fe0a1eb8887daeea4846840445c3555c4 100644 (file)
@@ -1781,9 +1781,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
        }
 
        /*
-        * Since this can be redone later if needed, it's treated the same as a
-        * commit-hint-bit status update for heap tuples: we mark the buffer dirty
-        * but don't make a WAL log entry.
+        * Since this can be redone later if needed, mark as dirty hint.
         *
         * Whenever we mark anything LP_DEAD, we also set the page's
         * BTP_HAS_GARBAGE flag, which is likewise just a hint.
@@ -1791,7 +1789,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
        if (killedsomething)
        {
                opaque->btpo_flags |= BTP_HAS_GARBAGE;
-               SetBufferCommitInfoNeedsSave(so->currPos.buf);
+               MarkBufferDirtyHint(so->currPos.buf);
        }
 
        if (!haveLock)
index b22e66e57953f4991e35e9c426147ad922e25e62..52cf75973573f08d75bec227aa1eb30cadccc262 100644 (file)
@@ -81,6 +81,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
                appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
 
        }
+       else if (info == XLOG_HINT)
+       {
+               appendStringInfo(buf, "page hint");
+       }
        else if (info == XLOG_BACKUP_END)
        {
                XLogRecPtr      startpoint;
index ac01fd292c1dc0fd7cb723f4631089009c846f84..94384acc485d6b9886c68741e23ade267b1bb0c7 100644 (file)
@@ -154,6 +154,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
        SpGistInitMetapage(page);
 
        /* Write the page.      If archiving/streaming, XLOG it. */
+       PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
        smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
                          (char *) page, true);
        if (XLogIsNeeded())
@@ -163,6 +164,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
        /* Likewise for the root page. */
        SpGistInitPage(page, SPGIST_LEAF);
 
+       PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
        smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
                          (char *) page, true);
        if (XLogIsNeeded())
@@ -172,6 +174,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
        /* Likewise for the null-tuples root page. */
        SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
+       PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
        smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
                          (char *) page, true);
        if (XLogIsNeeded())
index c77f9003441480faddea1982485e115ce82d8491..3a32471e9518c04796914cafe5cbf870a159d784 100644 (file)
@@ -437,6 +437,8 @@ critical section.)
 
 4. Mark the shared buffer(s) as dirty with MarkBufferDirty().  (This must
 happen before the WAL record is inserted; see notes in SyncOneBuffer().)
+Note that marking a buffer dirty with MarkBufferDirty() should only
+happen iff you write a WAL record; see Writing Hints below.
 
 5. If the relation requires WAL-logging, build a WAL log record and pass it
 to XLogInsert(); then update the page's LSN using the returned XLOG
@@ -584,6 +586,26 @@ replay code has to do the insertion on its own to restore the index to
 consistency.  Such insertions occur after WAL is operational, so they can
 and should write WAL records for the additional generated actions.
 
+Writing Hints
+-------------
+
+In some cases, we write additional information to data blocks without
+writing a preceding WAL record. This should only happen iff the data can
+be reconstructed later following a crash and the action is simply a way
+of optimising for performance. When a hint is written we use
+MarkBufferDirtyHint() to mark the block dirty.
+
+If the buffer is clean and checksums are in use then
+MarkBufferDirtyHint() inserts an XLOG_HINT record to ensure that we
+take a full page image that includes the hint. We do this to avoid
+a partial page write, when we write the dirtied page. WAL is not
+written during recovery, so we simply skip dirtying blocks because
+of hints when in recovery.
+
+If you do decide to optimise away a WAL record, then any calls to
+MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
+otherwise you will expose the risk of partial page writes.
+
 
 Write-Ahead Logging for Filesystem Actions
 ------------------------------------------
index 7f9edef435cda99b38c4f783857fdf811411e8b7..07c68adf0bcda74ddd6ae102e6da281733d792e7 100644 (file)
@@ -60,6 +60,7 @@
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+extern bool bootstrap_data_checksums;
 
 /* File path names (all relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE  "recovery.conf"
@@ -730,6 +731,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+       bool            isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
        uint8           info_orig = info;
        static XLogRecord *rechdr;
 
@@ -999,6 +1001,18 @@ begin:;
                goto begin;
        }
 
+       /*
+        * If this is a hint record and we don't need a backup block then
+        * we have no more work to do and can exit quickly without inserting
+        * a WAL record at all. In that case return InvalidXLogRecPtr.
+        */
+       if (isHint && !(info & XLR_BKP_BLOCK_MASK))
+       {
+               LWLockRelease(WALInsertLock);
+               END_CRIT_SECTION();
+               return InvalidXLogRecPtr;
+       }
+
        /*
         * If the current page is completely full, the record goes to the next
         * page, right after the page header.
@@ -1253,10 +1267,10 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
         * not. We don't need the buffer header lock for PageGetLSN because we
         * have exclusive lock on the page and/or the relation.
         */
-       *lsn = PageGetLSN(page);
+       *lsn = BufferGetLSNAtomic(rdata->buffer);
 
        if (doPageWrites &&
-               PageGetLSN(page) <= RedoRecPtr)
+               *lsn <= RedoRecPtr)
        {
                /*
                 * The page needs to be backed up, so set up *bkpb
@@ -3187,6 +3201,11 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                                           BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
                        }
 
+                       /*
+                        * Any checksum set on this page will be invalid. We don't need
+                        * to reset it here since it will be set before being written.
+                        */
+
                        PageSetLSN(page, lsn);
                        MarkBufferDirty(buffer);
 
@@ -3766,6 +3785,16 @@ GetSystemIdentifier(void)
        return ControlFile->system_identifier;
 }
 
+/*
+ * Are checksums enabled for data pages?
+ */
+bool
+DataChecksumsEnabled(void)
+{
+       Assert(ControlFile != NULL);
+       return ControlFile->data_checksums;
+}
+
 /*
  * Returns a fake LSN for unlogged relations.
  *
@@ -4092,6 +4121,7 @@ BootStrapXLOG(void)
        ControlFile->max_prepared_xacts = max_prepared_xacts;
        ControlFile->max_locks_per_xact = max_locks_per_xact;
        ControlFile->wal_level = wal_level;
+       ControlFile->data_checksums = bootstrap_data_checksums;
 
        /* some additional ControlFile fields are set in WriteControlFile() */
 
@@ -7601,6 +7631,51 @@ XLogRestorePoint(const char *rpName)
        return RecPtr;
 }
 
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Deciding the "if needed" part is delicate and requires us to either
+ * grab WALInsertLock or check the info_lck spinlock. If we check the
+ * spinlock and it says Yes then we will need to get WALInsertLock as well,
+ * so the design choice here is to just go straight for the WALInsertLock
+ * and trust that calls to this function are minimised elsewhere.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * Possible that multiple concurrent backends could attempt to write
+ * WAL records. In that case, more than one backup block may be recorded
+ * though that isn't important to the outcome and the backup blocks are
+ * likely to be identical anyway.
+ */
+#define        XLOG_HINT_WATERMARK             13579
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer)
+{
+       /*
+        * Make an XLOG entry reporting the hint
+        */
+       XLogRecData rdata[2];
+       int                     watermark = XLOG_HINT_WATERMARK;
+
+       /*
+        * Not allowed to have zero-length records, so use a small watermark
+        */
+       rdata[0].data = (char *) (&watermark);
+       rdata[0].len = sizeof(int);
+       rdata[0].buffer = InvalidBuffer;
+       rdata[0].buffer_std = false;
+       rdata[0].next = &(rdata[1]);
+
+       rdata[1].data = NULL;
+       rdata[1].len = 0;
+       rdata[1].buffer = buffer;
+       rdata[1].buffer_std = true;
+       rdata[1].next = NULL;
+
+       return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+}
+
 /*
  * Check if any of the GUC parameters that are critical for hot standby
  * have changed, and update the value in pg_control file if necessary.
@@ -7767,8 +7842,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
-       /* Backup blocks are not used in xlog records */
-       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+       /* Backup blocks are not used in most xlog records */
+       Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
 
        if (info == XLOG_NEXTOID)
        {
@@ -7961,6 +8036,34 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
        {
                /* nothing to do here */
        }
+       else if (info == XLOG_HINT)
+       {
+#ifdef USE_ASSERT_CHECKING
+               int     *watermark = (int *) XLogRecGetData(record);
+#endif
+
+               /* Check the watermark is correct for the hint record */
+               Assert(*watermark == XLOG_HINT_WATERMARK);
+
+               /* Backup blocks must be present for smgr hint records */
+               Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+
+               /*
+                * Hint records have no information that needs to be replayed.
+                * The sole purpose of them is to ensure that a hint bit does
+                * not cause a checksum invalidation if a hint bit write should
+                * cause a torn page. So the body of the record is empty but
+                * there must be one backup block.
+                *
+                * Since the only change in the backup block is a hint bit,
+                * there is no confict with Hot Standby.
+                *
+                * This also means there is no corresponding API call for this,
+                * so an smgr implementation has no need to implement anything.
+                * Which means nothing is needed in md.c etc
+                */
+               RestoreBackupBlock(lsn, record, 0, false, false);
+       }
        else if (info == XLOG_BACKUP_END)
        {
                XLogRecPtr      startpoint;
index 82ef726574e7f62b8a5247f79a31c84074c96c10..287f19b6eceefdeaa596f9679417fc3bd459349a 100644 (file)
@@ -48,6 +48,8 @@
 extern int     optind;
 extern char *optarg;
 
+bool bootstrap_data_checksums = false;
+
 
 #define ALLOC(t, c)            ((t *) calloc((unsigned)(c), sizeof(t)))
 
@@ -233,7 +235,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
        /* If no -x argument, we are a CheckerProcess */
        MyAuxProcType = CheckerProcess;
 
-       while ((flag = getopt(argc, argv, "B:c:d:D:Fr:x:-:")) != -1)
+       while ((flag = getopt(argc, argv, "B:c:d:D:Fkr:x:-:")) != -1)
        {
                switch (flag)
                {
@@ -259,6 +261,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
                        case 'F':
                                SetConfigOption("fsync", "false", PGC_POSTMASTER, PGC_S_ARGV);
                                break;
+                       case 'k':
+                               bootstrap_data_checksums = true;
+                               break;
                        case 'r':
                                strlcpy(OutputFileName, optarg, MAXPGPATH);
                                break;
index e20fedaeaf77772a47b34f423ddeb062b429bf1f..1d2b34782894b3249b46654239145e89ff2769a2 100644 (file)
@@ -76,6 +76,8 @@ SetRelationIsScannable(Relation relation)
                log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
 
        RelationOpenSmgr(relation);
+
+       PageSetChecksumInplace(page, 0);
        smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
 
        pfree(page);
index a360be4daafde763abb259cf8958d1fc1e88c8b5..c6add68b9f2d56714767122727c8e86a452dae6c 100644 (file)
@@ -1118,7 +1118,7 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
                HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
                seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
                seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
-               SetBufferCommitInfoNeedsSave(*buf);
+               MarkBufferDirtyHint(*buf);
        }
 
        seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
index 51bf13944b2a6a54d65c7f58a7b521b9287f3e57..57cf0a07b5179e58f509ad6c4c1ca42db171c579 100644 (file)
@@ -8902,6 +8902,8 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst,
 
                smgrread(src, forkNum, blkno, buf);
 
+               PageSetChecksumInplace(page, blkno);
+
                /* XLOG stuff */
                if (use_wal)
                        log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
index 4d3364aeff09a92d55601a33cd4f851327ccbc33..d39269897a5c10c6150ba6fb8cae246e0f25a982 100644 (file)
@@ -672,8 +672,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        {
                                PageSetAllVisible(page);
                                MarkBufferDirty(buf);
-                               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                                                 InvalidTransactionId);
+                               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                                 vmbuffer, InvalidTransactionId);
                        }
 
                        UnlockReleaseBuffer(buf);
@@ -907,8 +907,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        {
                                PageSetAllVisible(page);
                                MarkBufferDirty(buf);
-                               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                                                 visibility_cutoff_xid);
+                               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                                 vmbuffer, visibility_cutoff_xid);
                        }
                        else if (!all_visible_according_to_vm)
                        {
@@ -918,8 +918,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                 * allowed.  Set the visibility map bit as well so that we get
                                 * back in sync.
                                 */
-                               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
-                                                                 visibility_cutoff_xid);
+                               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                                 vmbuffer, visibility_cutoff_xid);
                        }
                }
 
@@ -1154,7 +1154,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
        {
                Assert(BufferIsValid(*vmbuffer));
                PageSetAllVisible(page);
-               visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
+               visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
                                visibility_cutoff_xid);
        }
 
index 0b4c2ed0a018c62d8bc8b9b7134f5cecbce0e356..1cd0ac806a7b086989a71e21029c7d3314832725 100644 (file)
@@ -34,6 +34,7 @@
 #include <unistd.h>
 
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "common/relpath.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -431,6 +432,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
        {
                /* new buffers are zero-filled */
                MemSet((char *) bufBlock, 0, BLCKSZ);
+               /* don't set checksum for all-zero page */
                smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
        }
        else
@@ -460,13 +462,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                        }
 
                        /* check for garbage data */
-                       if (!PageHeaderIsValid((PageHeader) bufBlock))
+                       if (!PageIsVerified((Page) bufBlock, blockNum))
                        {
                                if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
                                {
                                        ereport(WARNING,
                                                        (errcode(ERRCODE_DATA_CORRUPTED),
-                                                        errmsg("invalid page header in block %u of relation %s; zeroing out page",
+                                                        errmsg("invalid page in block %u of relation %s; zeroing out page",
                                                                        blockNum,
                                                                        relpath(smgr->smgr_rnode, forkNum))));
                                        MemSet((char *) bufBlock, 0, BLCKSZ);
@@ -474,7 +476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                                else
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_DATA_CORRUPTED),
-                                        errmsg("invalid page header in block %u of relation %s",
+                                        errmsg("invalid page in block %u of relation %s",
                                                        blockNum,
                                                        relpath(smgr->smgr_rnode, forkNum))));
                        }
@@ -655,14 +657,23 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                                 * victim.      We need lock to inspect the page LSN, so this
                                 * can't be done inside StrategyGetBuffer.
                                 */
-                               if (strategy != NULL &&
-                                       XLogNeedsFlush(BufferGetLSN(buf)) &&
-                                       StrategyRejectBuffer(strategy, buf))
+                               if (strategy != NULL)
                                {
-                                       /* Drop lock/pin and loop around for another buffer */
-                                       LWLockRelease(buf->content_lock);
-                                       UnpinBuffer(buf, true);
-                                       continue;
+                                       XLogRecPtr      lsn;
+
+                                       /* Read the LSN while holding buffer header lock */
+                                       LockBufHdr(buf);
+                                       lsn = BufferGetLSN(buf);
+                                       UnlockBufHdr(buf);
+
+                                       if (XLogNeedsFlush(lsn) &&
+                                               StrategyRejectBuffer(strategy, buf))
+                                       {
+                                               /* Drop lock/pin and loop around for another buffer */
+                                               LWLockRelease(buf->content_lock);
+                                               UnpinBuffer(buf, true);
+                                               continue;
+                                       }
                                }
 
                                /* OK, do the I/O */
@@ -1906,6 +1917,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
        ErrorContextCallback errcallback;
        instr_time      io_start,
                                io_time;
+       Block           bufBlock;
+       char            *bufToWrite;
 
        /*
         * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
@@ -1931,6 +1944,18 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
                                                                                reln->smgr_rnode.node.dbNode,
                                                                                reln->smgr_rnode.node.relNode);
 
+       LockBufHdr(buf);
+
+       /*
+        * Run PageGetLSN while holding header lock, since we don't have the
+        * buffer locked exclusively in all cases.
+        */
+       recptr = BufferGetLSN(buf);
+
+       /* To check if block content changes while flushing. - vadim 01/17/97 */
+       buf->flags &= ~BM_JUST_DIRTIED;
+       UnlockBufHdr(buf);
+
        /*
         * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
         * rule that log updates must hit disk before any of the data-file changes
@@ -1949,10 +1974,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
         * buffer isn't permanent.
         */
        if (buf->flags & BM_PERMANENT)
-       {
-               recptr = BufferGetLSN(buf);
                XLogFlush(recptr);
-       }
 
        /*
         * Now it's safe to write buffer to disk. Note that no one else should
@@ -1960,18 +1982,20 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
         * we have the io_in_progress lock.
         */
 
-       /* To check if block content changes while flushing. - vadim 01/17/97 */
-       LockBufHdr(buf);
-       buf->flags &= ~BM_JUST_DIRTIED;
-       UnlockBufHdr(buf);
+       bufBlock = BufHdrGetBlock(buf);
+
+       bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
 
        if (track_io_timing)
                INSTR_TIME_SET_CURRENT(io_start);
 
+       /*
+        * bufToWrite is either the shared buffer or a copy, as appropriate.
+        */
        smgrwrite(reln,
                          buf->tag.forkNum,
                          buf->tag.blockNum,
-                         (char *) BufHdrGetBlock(buf),
+                         bufToWrite,
                          false);
 
        if (track_io_timing)
@@ -2042,6 +2066,34 @@ BufferIsPermanent(Buffer buffer)
        return (bufHdr->flags & BM_PERMANENT) != 0;
 }
 
+/*
+ * BufferGetLSNAtomic
+ *             Retrieves the LSN of the buffer atomically using a buffer header lock.
+ *             This is necessary for some callers who may not have an exclusive lock
+ *             on the buffer.
+ */
+XLogRecPtr
+BufferGetLSNAtomic(Buffer buffer)
+{
+       volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
+       char                            *page = BufferGetPage(buffer);
+       XLogRecPtr                       lsn;
+
+       /* Local buffers don't need a lock. */
+       if (BufferIsLocal(buffer))
+               return PageGetLSN(page);
+
+       /* Make sure we've got a real buffer, and that we hold a pin on it. */
+       Assert(BufferIsValid(buffer));
+       Assert(BufferIsPinned(buffer));
+
+       LockBufHdr(bufHdr);
+       lsn = PageGetLSN(page);
+       UnlockBufHdr(bufHdr);
+
+       return lsn;
+}
+
 /* ---------------------------------------------------------------------
  *             DropRelFileNodeBuffers
  *
@@ -2343,7 +2395,10 @@ FlushRelationBuffers(Relation rel)
                        if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
                                (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
                        {
-                               ErrorContextCallback errcallback;
+                               ErrorContextCallback    errcallback;
+                               Page                                    localpage;
+
+                               localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
                                /* Setup error traceback support for ereport() */
                                errcallback.callback = local_buffer_write_error_callback;
@@ -2351,10 +2406,12 @@ FlushRelationBuffers(Relation rel)
                                errcallback.previous = error_context_stack;
                                error_context_stack = &errcallback;
 
+                               PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+
                                smgrwrite(rel->rd_smgr,
                                                  bufHdr->tag.forkNum,
                                                  bufHdr->tag.blockNum,
-                                                 (char *) LocalBufHdrGetBlock(bufHdr),
+                                                 localpage,
                                                  false);
 
                                bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
@@ -2509,22 +2566,24 @@ IncrBufferRefCount(Buffer buffer)
 }
 
 /*
- * SetBufferCommitInfoNeedsSave
+ * MarkBufferDirtyHint
  *
- *     Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ *     Mark a buffer dirty for non-critical changes.
  *
- * This is essentially the same as MarkBufferDirty, except that the caller
- * might have only share-lock instead of exclusive-lock on the buffer's
- * content lock.  We preserve the distinction mainly as a way of documenting
- * that the caller has not made a critical data change --- the status-bit
- * update could be redone by someone else just as easily.  Therefore, no WAL
- * log record need be generated, whereas calls to MarkBufferDirty really ought
- * to be associated with a WAL-entry-creating action.
+ * This is essentially the same as MarkBufferDirty, except:
+ *
+ * 1. The caller does not write WAL; so if checksums are enabled, we may need
+ *    to write an XLOG_HINT WAL record to protect against torn pages.
+ * 2. The caller might have only share-lock instead of exclusive-lock on the
+ *    buffer's content lock.
+ * 3. This function does not guarantee that the buffer is always marked dirty
+ *    (due to a race condition), so it cannot be used for important changes.
  */
 void
-SetBufferCommitInfoNeedsSave(Buffer buffer)
+MarkBufferDirtyHint(Buffer buffer)
 {
        volatile BufferDesc *bufHdr;
+       Page    page = BufferGetPage(buffer);
 
        if (!BufferIsValid(buffer))
                elog(ERROR, "bad buffer ID: %d", buffer);
@@ -2544,28 +2603,105 @@ SetBufferCommitInfoNeedsSave(Buffer buffer)
        /*
         * This routine might get called many times on the same page, if we are
         * making the first scan after commit of an xact that added/deleted many
-        * tuples.      So, be as quick as we can if the buffer is already dirty.  We
-        * do this by not acquiring spinlock if it looks like the status bits are
-        * already.  Since we make this test unlocked, there's a chance we might
-        * fail to notice that the flags have just been cleared, and failed to
-        * reset them, due to memory-ordering issues.  But since this function is
-        * only intended to be used in cases where failing to write out the data
+        * tuples. So, be as quick as we can if the buffer is already dirty.  We do
+        * this by not acquiring spinlock if it looks like the status bits are
+        * already set.  Since we make this test unlocked, there's a chance we
+        * might fail to notice that the flags have just been cleared, and failed
+        * to reset them, due to memory-ordering issues.  But since this function
+        * is only intended to be used in cases where failing to write out the data
         * would be harmless anyway, it doesn't really matter.
         */
        if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
                (BM_DIRTY | BM_JUST_DIRTIED))
        {
+               XLogRecPtr      lsn = InvalidXLogRecPtr;
+               bool            dirtied = false;
+               bool            delayChkpt = false;
+
+               /*
+                * If checksums are enabled, and the buffer is permanent, then a full
+                * page image may be required even for some hint bit updates to protect
+                * against torn pages. This full page image is only necessary if the
+                * hint bit update is the first change to the page since the last
+                * checkpoint.
+                *
+                * We don't check full_page_writes here because that logic is
+                * included when we call XLogInsert() since the value changes
+                * dynamically.
+                */
+               if (DataChecksumsEnabled() && (bufHdr->flags & BM_PERMANENT))
+               {
+                       /*
+                        * If we're in recovery we cannot dirty a page because of a hint.
+                        * We can set the hint, just not dirty the page as a result so
+                        * the hint is lost when we evict the page or shutdown.
+                        *
+                        * See src/backend/storage/page/README for longer discussion.
+                        */
+                       if (RecoveryInProgress())
+                               return;
+
+                       /*
+                        * If the block is already dirty because we either made a change
+                        * or set a hint already, then we don't need to write a full page
+                        * image.  Note that aggressive cleaning of blocks
+                        * dirtied by hint bit setting would increase the call rate.
+                        * Bulk setting of hint bits would reduce the call rate...
+                        *
+                        * We must issue the WAL record before we mark the buffer dirty.
+                        * Otherwise we might write the page before we write the WAL.
+                        * That causes a race condition, since a checkpoint might occur
+                        * between writing the WAL record and marking the buffer dirty.
+                        * We solve that with a kluge, but one that is already in use
+                        * during transaction commit to prevent race conditions.
+                        * Basically, we simply prevent the checkpoint WAL record from
+                        * being written until we have marked the buffer dirty. We don't
+                        * start the checkpoint flush until we have marked dirty, so our
+                        * checkpoint must flush the change to disk successfully or the
+                        * checkpoint never gets written, so crash recovery will fix.
+                        *
+                        * It's possible we may enter here without an xid, so it is
+                        * essential that CreateCheckpoint waits for virtual transactions
+                        * rather than full transactionids.
+                        */
+                       MyPgXact->delayChkpt = delayChkpt = true;
+                       lsn = XLogSaveBufferForHint(buffer);
+               }
+
                LockBufHdr(bufHdr);
                Assert(bufHdr->refcount > 0);
                if (!(bufHdr->flags & BM_DIRTY))
                {
-                       /* Do vacuum cost accounting */
+                       dirtied = true;         /* Means "will be dirtied by this action" */
+
+                       /*
+                        * Set the page LSN if we wrote a backup block. We aren't
+                        * supposed to set this when only holding a share lock but
+                        * as long as we serialise it somehow we're OK. We choose to
+                        * set LSN while holding the buffer header lock, which causes
+                        * any reader of an LSN who holds only a share lock to also
+                        * obtain a buffer header lock before using PageGetLSN().
+                        * Fortunately, thats not too many places.
+                        *
+                        * If checksums are enabled, you might think we should reset the
+                        * checksum here. That will happen when the page is written
+                        * sometime later in this checkpoint cycle.
+                        */
+                       if (!XLogRecPtrIsInvalid(lsn))
+                               PageSetLSN(page, lsn);
+               }
+               bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+               UnlockBufHdr(bufHdr);
+
+               if (delayChkpt)
+                       MyPgXact->delayChkpt = false;
+
+               if (dirtied)
+               {
                        VacuumPageDirty++;
                        if (VacuumCostActive)
                                VacuumCostBalance += VacuumCostPageDirty;
                }
-               bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-               UnlockBufHdr(bufHdr);
        }
 }
 
index 03055c96d8538c70654b521db9b1ed12f8b1e8df..c67271a4bdf99c0605a947b115cce115f65424b0 100644 (file)
@@ -196,16 +196,19 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
         */
        if (bufHdr->flags & BM_DIRTY)
        {
-               SMgrRelation oreln;
+               SMgrRelation    oreln;
+               Page                    localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
                /* Find smgr relation for buffer */
                oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
 
+               PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+
                /* And write... */
                smgrwrite(oreln,
                                  bufHdr->tag.forkNum,
                                  bufHdr->tag.blockNum,
-                                 (char *) LocalBufHdrGetBlock(bufHdr),
+                                 localpage,
                                  false);
 
                /* Mark not-dirty now in case we error out below */
index 9732ad54d3ef237fe076791866bb04c2bd013c5a..bbd1b93fac6ad7022c897d879af216f0a6ac92c1 100644 (file)
@@ -169,7 +169,9 @@ Recovery
 --------
 
 The FSM is not explicitly WAL-logged. Instead, we rely on a bunch of
-self-correcting measures to repair possible corruption.
+self-correcting measures to repair possible corruption. As a result when
+we write to the FSM we treat that as a hint and thus use MarkBufferDirtyHint()
+rather than MarkBufferDirty().
 
 First of all, whenever a value is set on an FSM page, the root node of the
 page is compared against the new value after bubbling up the change is
index 9c2afc5b0e874767b464ccf82aeb1c167089afc1..b76bf9be6b416117f69db651e0f23cf4f08c4019 100644 (file)
@@ -216,7 +216,7 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
                PageInit(page, BLCKSZ, 0);
 
        if (fsm_set_avail(page, slot, new_cat))
-               MarkBufferDirty(buf);
+               MarkBufferDirtyHint(buf);
        UnlockReleaseBuffer(buf);
 }
 
@@ -286,7 +286,7 @@ FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
                        return;                         /* nothing to do; the FSM was already smaller */
                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
-               MarkBufferDirty(buf);
+               MarkBufferDirtyHint(buf);
                UnlockReleaseBuffer(buf);
 
                new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
@@ -583,6 +583,8 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 
        while (fsm_nblocks_now < fsm_nblocks)
        {
+               PageSetChecksumInplace(pg, fsm_nblocks_now);
+
                smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
                                   (char *) pg, false);
                fsm_nblocks_now++;
@@ -617,7 +619,7 @@ fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
        page = BufferGetPage(buf);
 
        if (fsm_set_avail(page, slot, newValue))
-               MarkBufferDirty(buf);
+               MarkBufferDirtyHint(buf);
 
        if (minValue != 0)
        {
@@ -768,7 +770,7 @@ fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
                        {
                                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                                fsm_set_avail(BufferGetPage(buf), slot, child_avail);
-                               MarkBufferDirty(buf);
+                               MarkBufferDirtyHint(buf);
                                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                        }
                }
index acb8038a870af23e291ade52396b5cf753645c38..19c8e09148b03b1fd85f080b59314fd6b3ed8969 100644 (file)
@@ -284,7 +284,7 @@ restart:
                                exclusive_lock_held = true;
                        }
                        fsm_rebuild_page(page);
-                       MarkBufferDirty(buf);
+                       MarkBufferDirtyHint(buf);
                        goto restart;
                }
        }
index 95f3e16bfb176a375ccb48516d193f72492612cb..81cdc6547a3ab90ff48eb570e1429f8c33256695 100644 (file)
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/xlog.h"
 
+bool ignore_checksum_failure = false;
+
+static char pageCopyData[BLCKSZ];      /* for checksum calculation */
+static Page pageCopy = pageCopyData;
+
+static uint16 PageCalcChecksum16(Page page, BlockNumber blkno);
 
 /* ----------------------------------------------------------------
  *                                             Page support functions
@@ -25,6 +32,8 @@
 /*
  * PageInit
  *             Initializes the contents of a page.
+ *             Note that we don't calculate an initial checksum here; that's not done
+ *             until it's time to write.
  */
 void
 PageInit(Page page, Size pageSize, Size specialSize)
@@ -39,7 +48,7 @@ PageInit(Page page, Size pageSize, Size specialSize)
        /* Make sure all fields of page are zero, as well as unused space */
        MemSet(p, 0, pageSize);
 
-       /* p->pd_flags = 0;                                                             done by above MemSet */
+       p->pd_flags = 0;
        p->pd_lower = SizeOfPageHeaderData;
        p->pd_upper = pageSize - specialSize;
        p->pd_special = pageSize - specialSize;
@@ -49,8 +58,8 @@ PageInit(Page page, Size pageSize, Size specialSize)
 
 
 /*
- * PageHeaderIsValid
- *             Check that the header fields of a page appear valid.
+ * PageIsVerified
+ *             Check that the page header and checksum (if any) appear valid.
  *
  * This is called when a page has just been read in from disk. The idea is
  * to cheaply detect trashed pages before we go nuts following bogus item
@@ -67,30 +76,77 @@ PageInit(Page page, Size pageSize, Size specialSize)
  * will clean up such a page and make it usable.
  */
 bool
-PageHeaderIsValid(PageHeader page)
+PageIsVerified(Page page, BlockNumber blkno)
 {
+       PageHeader      p = (PageHeader) page;
        char       *pagebytes;
        int                     i;
+       bool            checksum_failure = false;
+       bool            header_sane = false;
+       bool            all_zeroes = false;
+       uint16          checksum;
 
-       /* Check normal case */
-       if (PageGetPageSize(page) == BLCKSZ &&
-               PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
-               (page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
-               page->pd_lower >= SizeOfPageHeaderData &&
-               page->pd_lower <= page->pd_upper &&
-               page->pd_upper <= page->pd_special &&
-               page->pd_special <= BLCKSZ &&
-               page->pd_special == MAXALIGN(page->pd_special))
-               return true;
+       /*
+        * Don't verify page data unless the page passes basic non-zero test
+        */
+       if (!PageIsNew(page))
+       {
+               if (DataChecksumsEnabled())
+               {
+                       checksum = PageCalcChecksum16(page, blkno);
+
+                       if (checksum != p->pd_checksum)
+                               checksum_failure = true;
+               }
+
+               /*
+                * The following checks don't prove the header is correct,
+                * only that it looks sane enough to allow into the buffer pool.
+                * Later usage of the block can still reveal problems,
+                * which is why we offer the checksum option.
+                */
+               if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
+                        p->pd_lower <= p->pd_upper &&
+                        p->pd_upper <= p->pd_special &&
+                        p->pd_special <= BLCKSZ &&
+                        p->pd_special == MAXALIGN(p->pd_special))
+                       header_sane = true;
+
+               if (header_sane && !checksum_failure)
+                       return true;
+       }
 
        /* Check all-zeroes case */
+       all_zeroes = true;
        pagebytes = (char *) page;
        for (i = 0; i < BLCKSZ; i++)
        {
                if (pagebytes[i] != 0)
-                       return false;
+               {
+                       all_zeroes = false;
+                       break;
+               }
+       }
+
+       if (all_zeroes)
+               return true;
+
+       /*
+        * Throw a WARNING if the checksum fails, but only after we've checked for
+        * the all-zeroes case.
+        */
+       if (checksum_failure)
+       {
+               ereport(WARNING,
+                               (ERRCODE_DATA_CORRUPTED,
+                                errmsg("page verification failed, calculated checksum %u but expected %u",
+                                               checksum, p->pd_checksum)));
+
+               if (header_sane && ignore_checksum_failure)
+                       return true;
        }
-       return true;
+
+       return false;
 }
 
 
@@ -827,3 +883,98 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 
        pfree(itemidbase);
 }
+
+/*
+ * Set checksum for page in shared buffers.
+ *
+ * If checksums are disabled, or if the page is not initialized, just return
+ * the input. Otherwise, we must make a copy of the page before calculating the
+ * checksum, to prevent concurrent modifications (e.g. setting hint bits) from
+ * making the final checksum invalid.
+ *
+ * Returns a pointer to the block-sized data that needs to be written. Uses
+ * statically-allocated memory, so the caller must immediately write the
+ * returned page and not refer to it again.
+ */
+char *
+PageSetChecksumCopy(Page page, BlockNumber blkno)
+{
+       if (PageIsNew(page) || !DataChecksumsEnabled())
+               return (char *) page;
+
+       /*
+        * We make a copy iff we need to calculate a checksum because other
+        * backends may set hint bits on this page while we write, which
+        * would mean the checksum differs from the page contents. It doesn't
+        * matter if we include or exclude hints during the copy, as long
+        * as we write a valid page and associated checksum.
+        */
+       memcpy((char *) pageCopy, (char *) page, BLCKSZ);
+       PageSetChecksumInplace(pageCopy, blkno);
+       return (char *) pageCopy;
+}
+
+/*
+ * Set checksum for page in private memory.
+ *
+ * This is a simpler version of PageSetChecksumCopy(). The more explicit API
+ * allows us to more easily see if we're making the correct call and reduces
+ * the amount of additional code specific to page verification.
+ */
+void
+PageSetChecksumInplace(Page page, BlockNumber blkno)
+{
+       if (PageIsNew(page))
+               return;
+
+       if (DataChecksumsEnabled())
+       {
+               PageHeader      p = (PageHeader) page;
+               p->pd_checksum = PageCalcChecksum16(page, blkno);
+       }
+
+       return;
+}
+
+/*
+ * Calculate checksum for a PostgreSQL Page. This includes the block number (to
+ * detect the case when a page is somehow moved to a different location), the
+ * page header (excluding the checksum itself), and the page data.
+ *
+ * Note that if the checksum validation fails we cannot tell the difference
+ * between a transposed block and failure from direct on-block corruption,
+ * though that is better than just ignoring transposed blocks altogether.
+ */
+static uint16
+PageCalcChecksum16(Page page, BlockNumber blkno)
+{
+       pg_crc32                crc;
+       PageHeader      p = (PageHeader) page;
+
+       /* only calculate the checksum for properly-initialized pages */
+       Assert(!PageIsNew(page));
+
+       INIT_CRC32(crc);
+
+       /*
+        * Initialize the checksum calculation with the block number. This helps
+        * catch corruption from whole blocks being transposed with other whole
+        * blocks.
+        */
+       COMP_CRC32(crc, &blkno, sizeof(blkno));
+
+       /*
+        * Now add in the LSN, which is always the first field on the page.
+        */
+       COMP_CRC32(crc, page, sizeof(p->pd_lsn));
+
+       /*
+        * Now add the rest of the page, skipping the pd_checksum field.
+        */
+       COMP_CRC32(crc, page + sizeof(p->pd_lsn) + sizeof(p->pd_checksum),
+                                 BLCKSZ - sizeof(p->pd_lsn) - sizeof(p->pd_checksum));
+
+       FIN_CRC32(crc);
+
+       return (uint16) crc;
+}
index e1b65256a4feed3f71c89e88b24573547e939cd3..22ba35fef93fda2d13f0058cee1085ea6f3e416b 100644 (file)
@@ -122,6 +122,7 @@ extern int  CommitDelay;
 extern int     CommitSiblings;
 extern char *default_tablespace;
 extern char *temp_tablespaces;
+extern bool ignore_checksum_failure;
 extern bool synchronize_seqscans;
 extern int     ssl_renegotiation_limit;
 extern char *SSLCipherSuites;
@@ -807,6 +808,21 @@ static struct config_bool ConfigureNamesBool[] =
                true,
                NULL, NULL, NULL
        },
+       {
+               {"ignore_checksum_failure", PGC_SUSET, DEVELOPER_OPTIONS,
+                       gettext_noop("Continues processing after a checksum failure."),
+                       gettext_noop("Detection of a checksum failure normally causes PostgreSQL to "
+                               "report an error, aborting the current transaction. Setting "
+                                                "ignore_checksum_failure to true causes the system to ignore the failure "
+                                                "(but still report a warning), and continue processing. This "
+                                                "behavior could cause crashes or other serious problems. Only "
+                                                "has an effect if checksums are enabled."),
+                       GUC_NOT_IN_SAMPLE
+               },
+               &ignore_checksum_failure,
+               false,
+               NULL, NULL, NULL
+       },
        {
                {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
                        gettext_noop("Continues processing past damaged page headers."),
index f2c9ff2e1c16860d779d7538deaed758c92719ef..24384b498900727ff51d1a44c5ebcf099788b953 100644 (file)
@@ -6,7 +6,7 @@
  * NOTE: all the HeapTupleSatisfies routines will update the tuple's
  * "hint" status bits if we see that the inserting or deleting transaction
  * has now committed or aborted (and it is safe to set the hint bits).
- * If the hint bits are changed, SetBufferCommitInfoNeedsSave is called on
+ * If the hint bits are changed, MarkBufferDirtyHint is called on
  * the passed-in buffer.  The caller must hold not only a pin, but at least
  * shared buffer content lock on the buffer containing the tuple.
  *
@@ -121,7 +121,7 @@ SetHintBits(HeapTupleHeader tuple, Buffer buffer,
        }
 
        tuple->t_infomask |= infomask;
-       SetBufferCommitInfoNeedsSave(buffer);
+       MarkBufferDirtyHint(buffer);
 }
 
 /*
index e16f3e3c80a91188ee8be4048ae1e2db40b9cdf3..bb38796eb28b6ade2f74cf20556a8f69be901bcd 100644 (file)
@@ -119,6 +119,7 @@ static bool noclean = false;
 static bool do_sync = true;
 static bool sync_only = false;
 static bool show_setting = false;
+static bool data_checksums = false;
 static char *xlog_dir = "";
 
 
@@ -1441,8 +1442,10 @@ bootstrap_template1(void)
        unsetenv("PGCLIENTENCODING");
 
        snprintf(cmd, sizeof(cmd),
-                        "\"%s\" --boot -x1 %s %s",
-                        backend_exec, boot_options, talkargs);
+                        "\"%s\" --boot -x1 %s %s %s",
+                        backend_exec,
+                        data_checksums ? "-k" : "",
+                        boot_options, talkargs);
 
        PG_CMD_OPEN;
 
@@ -2748,6 +2751,7 @@ usage(const char *progname)
        printf(_("  -X, --xlogdir=XLOGDIR     location for the transaction log directory\n"));
        printf(_("\nLess commonly used options:\n"));
        printf(_("  -d, --debug               generate lots of debugging output\n"));
+       printf(_("  -k, --data-checksums      data page checksums\n"));
        printf(_("  -L DIRECTORY              where to find the input files\n"));
        printf(_("  -n, --noclean             do not clean up after errors\n"));
        printf(_("  -N, --nosync              do not wait for changes to be written safely to disk\n"));
@@ -3424,6 +3428,7 @@ main(int argc, char *argv[])
                {"nosync", no_argument, NULL, 'N'},
                {"sync-only", no_argument, NULL, 'S'},
                {"xlogdir", required_argument, NULL, 'X'},
+               {"data-checksums", no_argument, NULL, 'k'},
                {NULL, 0, NULL, 0}
        };
 
@@ -3455,7 +3460,7 @@ main(int argc, char *argv[])
 
        /* process command-line options */
 
-       while ((c = getopt_long(argc, argv, "dD:E:L:nNU:WA:sST:X:", long_options, &option_index)) != -1)
+       while ((c = getopt_long(argc, argv, "dD:E:kL:nNU:WA:sST:X:", long_options, &option_index)) != -1)
        {
                switch (c)
                {
@@ -3504,6 +3509,9 @@ main(int argc, char *argv[])
                        case 'S':
                                sync_only = true;
                                break;
+                       case 'k':
+                               data_checksums = true;
+                               break;
                        case 'L':
                                share_path = pg_strdup(optarg);
                                break;
@@ -3615,6 +3623,11 @@ main(int argc, char *argv[])
        setup_locale_encoding();
 
        setup_text_search();
+
+       if (data_checksums)
+               printf(_("Data page checksums are enabled.\n"));
+       else
+               printf(_("Data page checksums are disabled.\n"));
        
        printf("\n");
 
index 369f2fb075988198eecd0c59d7ac695bbaab0d03..ceb412505bf2a51cd15bbde23aead4476d4d613c 100644 (file)
@@ -287,5 +287,7 @@ main(int argc, char *argv[])
                   (ControlFile.float4ByVal ? _("by value") : _("by reference")));
        printf(_("Float8 argument passing:              %s\n"),
                   (ControlFile.float8ByVal ? _("by value") : _("by reference")));
+       printf(_("Data page checksums:                  %s\n"),
+                  (ControlFile.data_checksums ? _("enabled") : _("disabled")));
        return 0;
 }
index acf9f8dd3e3aa800dec87b876dcdc49a93a5d053..124dcfb7772ae4c963da1ffca68319ad7382516c 100644 (file)
@@ -624,6 +624,8 @@ PrintControlValues(bool guessed)
                   (ControlFile.float4ByVal ? _("by value") : _("by reference")));
        printf(_("Float8 argument passing:              %s\n"),
                   (ControlFile.float8ByVal ? _("by value") : _("by reference")));
+       printf(_("Data page checksums:                  %s\n"),
+                  (ControlFile.data_checksums ? _("enabled") : _("disabled")));
 }
 
 
index 270924a01ae87d380c9d99cc4b5c590863ff5ba8..e58eae5630ca9a81e9df5545e925e65ef2793f5f 100644 (file)
@@ -279,7 +279,7 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                                TransactionId cutoff_xid, MultiXactId cutoff_multi,
                                OffsetNumber *offsets, int offcnt);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block,
+extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
                                 Buffer vm_buffer, TransactionId cutoff_xid);
 extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
                        BlockNumber blk, Page page);
index 99b2dc5bb65281a96356daf01b6f7c9a9a6bfbc1..43789c230fcf1bd5013c23076088f82d3dcb78e5 100644 (file)
@@ -24,8 +24,8 @@ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk,
 extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
                                  Buffer *vmbuf);
 extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
-extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
-                                 XLogRecPtr recptr, Buffer vmbuf, TransactionId cutoff_xid);
+extern void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
+                                 XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
 extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
 extern BlockNumber visibilitymap_count(Relation rel);
 extern void visibilitymap_truncate(Relation rel, BlockNumber nheapblocks);
index 8a65492a3464440d42308c461d600305ab23715f..f8f06c1f38b26420ff974635ecb274eae262c94f 100644 (file)
@@ -267,6 +267,8 @@ extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
 extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
 extern int     XLogFileOpen(XLogSegNo segno);
 
+extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
+
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
 
@@ -294,6 +296,7 @@ extern char *XLogFileNameP(TimeLineID tli, XLogSegNo segno);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
+extern bool DataChecksumsEnabled(void);
 extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
 extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
index 306d18885400f13108302b0921077316c4a59ad8..1d003d6d7a093a16ef468fe0cf0f260cf5ae70f4 100644 (file)
@@ -21,7 +21,7 @@
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION     935
+#define PG_CONTROL_VERSION     936
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -65,8 +65,9 @@ typedef struct CheckPoint
 #define XLOG_BACKUP_END                                        0x50
 #define XLOG_PARAMETER_CHANGE                  0x60
 #define XLOG_RESTORE_POINT                             0x70
-#define XLOG_FPW_CHANGE                                0x80
+#define XLOG_FPW_CHANGE                                        0x80
 #define XLOG_END_OF_RECOVERY                   0x90
+#define XLOG_HINT                                              0xA0
 
 
 /*
@@ -212,6 +213,9 @@ typedef struct ControlFileData
        bool            float4ByVal;    /* float4 pass-by-value? */
        bool            float8ByVal;    /* float8, int8, etc pass-by-value? */
 
+       /* Are data pages protected by checksums? */
+       bool            data_checksums;
+
        /* CRC of all above ... MUST BE LAST! */
        pg_crc32        crc;
 } ControlFileData;
index 2ad536b745fb74e2189554ab8211e556ae223f75..9be18608426c0315c820b91438bc4f78f941859a 100644 (file)
@@ -195,6 +195,7 @@ extern void DropDatabaseBuffers(Oid dbid);
        RelationGetNumberOfBlocksInFork(reln, MAIN_FORKNUM)
 
 extern bool BufferIsPermanent(Buffer buffer);
+extern XLogRecPtr BufferGetLSNAtomic(Buffer buffer);
 
 #ifdef NOT_USED
 extern void PrintPinnedBufs(void);
@@ -203,7 +204,7 @@ extern Size BufferShmemSize(void);
 extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
                         ForkNumber *forknum, BlockNumber *blknum);
 
-extern void SetBufferCommitInfoNeedsSave(Buffer buffer);
+extern void MarkBufferDirtyHint(Buffer buffer);
 
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
index 42f8f2fa496176336dbee04f42b9aa3d018a6e40..b9ee7c27e10dc134c79a7addc71eb5c62523892f 100644 (file)
@@ -15,6 +15,7 @@
 #define BUFPAGE_H
 
 #include "access/xlogdefs.h"
+#include "storage/block.h"
 #include "storage/item.h"
 #include "storage/off.h"
 
@@ -386,7 +387,7 @@ do { \
  */
 
 extern void PageInit(Page page, Size pageSize, Size specialSize);
-extern bool PageHeaderIsValid(PageHeader page);
+extern bool PageIsVerified(Page page, BlockNumber blkno);
 extern OffsetNumber PageAddItem(Page page, Item item, Size size,
                        OffsetNumber offsetNumber, bool overwrite, bool is_heap);
 extern Page PageGetTempPage(Page page);
@@ -399,5 +400,7 @@ extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
 extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
+extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
+extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
 
 #endif   /* BUFPAGE_H */