]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xlogutils.c
pgindent run for 9.4
[postgresql] / src / backend / access / transam / xlogutils.c
index 9abcce65483133a01b52eed0aabf71d980bb112f..b7829ff4c6ddce25e2ffcbde9858acf8dbfc5377 100644 (file)
@@ -8,18 +8,20 @@
  * None of this code is used during normal system operation.
  *
  *
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.59 2008/09/30 10:52:11 heikki Exp $
+ * src/backend/access/transam/xlogutils.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "access/xlogutils.h"
-#include "storage/bufmgr.h"
+#include "catalog/catalog.h"
 #include "storage/smgr.h"
+#include "utils/guc.h"
 #include "utils/hsearch.h"
 #include "utils/rel.h"
 
@@ -50,6 +52,22 @@ typedef struct xl_invalid_page
 static HTAB *invalid_page_tab = NULL;
 
 
+/* Report a reference to an invalid page */
+static void
+report_invalid_page(int elevel, RelFileNode node, ForkNumber forkno,
+                                       BlockNumber blkno, bool present)
+{
+       char       *path = relpathperm(node, forkno);
+
+       if (present)
+               elog(elevel, "page %u of relation %s is uninitialized",
+                        blkno, path);
+       else
+               elog(elevel, "page %u of relation %s does not exist",
+                        blkno, path);
+       pfree(path);
+}
+
 /* Log a reference to an invalid page */
 static void
 log_invalid_page(RelFileNode node, ForkNumber forkno, BlockNumber blkno,
@@ -59,17 +77,27 @@ log_invalid_page(RelFileNode node, ForkNumber forkno, BlockNumber blkno,
        xl_invalid_page *hentry;
        bool            found;
 
+       /*
+        * Once recovery has reached a consistent state, the invalid-page table
+        * should be empty and remain so. If a reference to an invalid page is
+        * found after consistency is reached, PANIC immediately. This might seem
+        * aggressive, but it's better than letting the invalid reference linger
+        * in the hash table until the end of recovery and PANIC there, which
+        * might come only much later if this is a standby server.
+        */
+       if (reachedConsistency)
+       {
+               report_invalid_page(WARNING, node, forkno, blkno, present);
+               elog(PANIC, "WAL contains references to invalid pages");
+       }
+
        /*
         * Log references to invalid pages at DEBUG1 level.  This allows some
         * tracing of the cause (note the elog context mechanism will tell us
         * something about the XLOG record that generated the reference).
         */
-       if (present)
-               elog(DEBUG1, "page %u of relation %u/%u/%u/%u is uninitialized",
-                        blkno, node.spcNode, node.dbNode, node.relNode, forkno);
-       else
-               elog(DEBUG1, "page %u of relation %u/%u/%u/%u does not exist",
-                        blkno, node.spcNode, node.dbNode, node.relNode, forkno);
+       if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1)
+               report_invalid_page(DEBUG1, node, forkno, blkno, present);
 
        if (invalid_page_tab == NULL)
        {
@@ -123,9 +151,14 @@ forget_invalid_pages(RelFileNode node, ForkNumber forkno, BlockNumber minblkno)
                        hentry->key.forkno == forkno &&
                        hentry->key.blkno >= minblkno)
                {
-                       elog(DEBUG2, "page %u of relation %u/%u/%u/%u has been dropped",
-                                hentry->key.blkno, hentry->key.node.spcNode,
-                                hentry->key.node.dbNode, hentry->key.node.relNode, forkno);
+                       if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
+                       {
+                               char       *path = relpathperm(hentry->key.node, forkno);
+
+                               elog(DEBUG2, "page %u of relation %s has been dropped",
+                                        hentry->key.blkno, path);
+                               pfree(path);
+                       }
 
                        if (hash_search(invalid_page_tab,
                                                        (void *) &hentry->key,
@@ -151,9 +184,14 @@ forget_invalid_pages_db(Oid dbid)
        {
                if (hentry->key.node.dbNode == dbid)
                {
-                       elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
-                                hentry->key.blkno, hentry->key.node.spcNode,
-                                hentry->key.node.dbNode, hentry->key.node.relNode);
+                       if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
+                       {
+                               char       *path = relpathperm(hentry->key.node, hentry->key.forkno);
+
+                               elog(DEBUG2, "page %u of relation %s has been dropped",
+                                        hentry->key.blkno, path);
+                               pfree(path);
+                       }
 
                        if (hash_search(invalid_page_tab,
                                                        (void *) &hentry->key,
@@ -163,6 +201,16 @@ forget_invalid_pages_db(Oid dbid)
        }
 }
 
+/* Are there any unresolved references to invalid pages? */
+bool
+XLogHaveInvalidPages(void)
+{
+       if (invalid_page_tab != NULL &&
+               hash_get_num_entries(invalid_page_tab) > 0)
+               return true;
+       return false;
+}
+
 /* Complain about any remaining invalid-page entries */
 void
 XLogCheckInvalidPages(void)
@@ -182,14 +230,8 @@ XLogCheckInvalidPages(void)
         */
        while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
        {
-               if (hentry->present)
-                       elog(WARNING, "page %u of relation %u/%u/%u was uninitialized",
-                                hentry->key.blkno, hentry->key.node.spcNode,
-                                hentry->key.node.dbNode, hentry->key.node.relNode);
-               else
-                       elog(WARNING, "page %u of relation %u/%u/%u did not exist",
-                                hentry->key.blkno, hentry->key.node.spcNode,
-                                hentry->key.node.dbNode, hentry->key.node.relNode);
+               report_invalid_page(WARNING, hentry->key.node, hentry->key.forkno,
+                                                       hentry->key.blkno, hentry->present);
                foundone = true;
        }
 
@@ -200,45 +242,59 @@ XLogCheckInvalidPages(void)
        invalid_page_tab = NULL;
 }
 
-
 /*
  * XLogReadBuffer
- *             Read a page during XLOG replay
+ *             Read a page during XLOG replay.
+ *
+ * This is a shorthand of XLogReadBufferExtended() followed by
+ * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), for reading from the main
+ * fork.
  *
- * This is functionally comparable to ReadBuffer followed by
- * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
- * and locked buffer.  (Getting the lock is not really necessary, since we
- * expect that this is only used during single-process XLOG replay, but
- * some subroutines such as MarkBufferDirty will complain if we don't.)
+ * (Getting the buffer lock is not really necessary during single-process
+ * crash recovery, but some subroutines such as MarkBufferDirty will complain
+ * if we don't have the lock.  In hot standby mode it's definitely necessary.)
  *
- * If "init" is true then the caller intends to rewrite the page fully
- * using the info in the XLOG record.  In this case we will extend the
- * relation if needed to make the page exist, and we will not complain about
- * the page being "new" (all zeroes); in fact, we usually will supply a
- * zeroed buffer without reading the page at all, so as to avoid unnecessary
- * failure if the page is present on disk but has corrupt headers.
+ * The returned buffer is exclusively-locked.
  *
- * If "init" is false then the caller needs the page to be valid already.
- * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
- * In this case the caller should silently skip the update on this page.
- * (In this situation, we expect that the page was later dropped or truncated.
- * If we don't see evidence of that later in the WAL sequence, we'll complain
- * at the end of WAL replay.)
+ * For historical reasons, instead of a ReadBufferMode argument, this only
+ * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
  */
 Buffer
 XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
 {
-       return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
+       Buffer          buf;
+
+       buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+                                                                init ? RBM_ZERO : RBM_NORMAL);
+       if (BufferIsValid(buf))
+               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+       return buf;
 }
 
 /*
- * XLogReadBufferWithFork
- *             Like XLogReadBuffer, but for reading other relation forks than
- *             the main one.
+ * XLogReadBufferExtended
+ *             Read a page during XLOG replay
+ *
+ * This is functionally comparable to ReadBufferExtended. There's some
+ * differences in the behavior wrt. the "mode" argument:
+ *
+ * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
+ * return InvalidBuffer. In this case the caller should silently skip the
+ * update on this page. (In this situation, we expect that the page was later
+ * dropped or truncated. If we don't see evidence of that later in the WAL
+ * sequence, we'll complain at the end of WAL replay.)
+ *
+ * In RBM_ZERO and RBM_ZERO_ON_ERROR modes, if the page doesn't exist, the
+ * relation is extended with all-zeroes pages up to the given block number.
+ *
+ * In RBM_NORMAL_NO_LOG mode, we return InvalidBuffer if the page doesn't
+ * exist, and we don't check for all-zeroes.  Thus, no log entry is made
+ * to imply that the page should be dropped or truncated later.
  */
 Buffer
-XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
-                                          BlockNumber blkno, bool init)
+XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
+                                          BlockNumber blkno, ReadBufferMode mode)
 {
        BlockNumber lastblock;
        Buffer          buffer;
@@ -247,7 +303,7 @@ XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
        Assert(blkno != P_NEW);
 
        /* Open the relation at smgr level */
-       smgr = smgropen(rnode);
+       smgr = smgropen(rnode, InvalidBackendId);
 
        /*
         * Create the target file if it doesn't already exist.  This lets us cope
@@ -257,48 +313,60 @@ XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
         * filesystem loses an inode during a crash.  Better to write the data
         * until we are actually told to delete the file.)
         */
-       smgrcreate(smgr, forknum, false, true);
+       smgrcreate(smgr, forknum, true);
 
        lastblock = smgrnblocks(smgr, forknum);
 
        if (blkno < lastblock)
        {
                /* page exists in file */
-               buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno, init);
+               buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
+                                                                                  mode, NULL);
        }
        else
        {
                /* hm, page doesn't exist in file */
-               if (!init)
+               if (mode == RBM_NORMAL)
                {
                        log_invalid_page(rnode, forknum, blkno, false);
                        return InvalidBuffer;
                }
+               if (mode == RBM_NORMAL_NO_LOG)
+                       return InvalidBuffer;
                /* OK to extend the file */
                /* we do this in recovery only - no rel-extension lock needed */
                Assert(InRecovery);
                buffer = InvalidBuffer;
-               while (blkno >= lastblock)
+               do
                {
                        if (buffer != InvalidBuffer)
                                ReleaseBuffer(buffer);
-                       buffer = ReadBufferWithoutRelcache(rnode, false, forknum,
-                                                                                          P_NEW, false);
-                       lastblock++;
+                       buffer = ReadBufferWithoutRelcache(rnode, forknum,
+                                                                                          P_NEW, mode, NULL);
+               }
+               while (BufferGetBlockNumber(buffer) < blkno);
+               /* Handle the corner case that P_NEW returns non-consecutive pages */
+               if (BufferGetBlockNumber(buffer) != blkno)
+               {
+                       ReleaseBuffer(buffer);
+                       buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
+                                                                                          mode, NULL);
                }
-               Assert(BufferGetBlockNumber(buffer) == blkno);
        }
 
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-       if (!init)
+       if (mode == RBM_NORMAL)
        {
                /* check that page has been initialized */
                Page            page = (Page) BufferGetPage(buffer);
 
+               /*
+                * We assume that PageIsNew is safe without a lock. During recovery,
+                * there should be no other backends that could modify the buffer at
+                * the same time.
+                */
                if (PageIsNew(page))
                {
-                       UnlockReleaseBuffer(buffer);
+                       ReleaseBuffer(buffer);
                        log_invalid_page(rnode, forknum, blkno, true);
                        return InvalidBuffer;
                }
@@ -314,8 +382,8 @@ XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
  */
 typedef struct
 {
-       RelationData            reldata;        /* Note: this must be first */
-       FormData_pg_class       pgc;
+       RelationData reldata;           /* Note: this must be first */
+       FormData_pg_class pgc;
 } FakeRelCacheEntryData;
 
 typedef FakeRelCacheEntryData *FakeRelCacheEntry;
@@ -324,10 +392,10 @@ typedef FakeRelCacheEntryData *FakeRelCacheEntry;
  * Create a fake relation cache entry for a physical relation
  *
  * It's often convenient to use the same functions in XLOG replay as in the
- * main codepath, but those functions typically work with a relcache entry. 
- * We don't have a working relation cache during XLOG replay, but this 
- * function can be used to create a fake relcache entry instead. Only the 
- * fields related to physical storage, like rd_rel, are initialized, so the 
+ * main codepath, but those functions typically work with a relcache entry.
+ * We don't have a working relation cache during XLOG replay, but this
+ * function can be used to create a fake relcache entry instead. Only the
+ * fields related to physical storage, like rd_rel, are initialized, so the
  * fake entry is only usable in low-level operations like ReadBuffer().
  *
  * Caller must free the returned entry with FreeFakeRelcacheEntry().
@@ -336,7 +404,9 @@ Relation
 CreateFakeRelcacheEntry(RelFileNode rnode)
 {
        FakeRelCacheEntry fakeentry;
-       Relation rel;
+       Relation        rel;
+
+       Assert(InRecovery);
 
        /* Allocate the Relation struct and all related space in one block. */
        fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
@@ -344,6 +414,11 @@ CreateFakeRelcacheEntry(RelFileNode rnode)
 
        rel->rd_rel = &fakeentry->pgc;
        rel->rd_node = rnode;
+       /* We will never be working with temp rels during recovery */
+       rel->rd_backend = InvalidBackendId;
+
+       /* It must be a permanent table if we're in recovery. */
+       rel->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
 
        /* We don't know the name of the relation; use relfilenode instead */
        sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
@@ -351,15 +426,13 @@ CreateFakeRelcacheEntry(RelFileNode rnode)
        /*
         * We set up the lockRelId in case anything tries to lock the dummy
         * relation.  Note that this is fairly bogus since relNode may be
-        * different from the relation's OID.  It shouldn't really matter
-        * though, since we are presumably running by ourselves and can't have
-        * any lock conflicts ...
+        * different from the relation's OID.  It shouldn't really matter though,
+        * since we are presumably running by ourselves and can't have any lock
+        * conflicts ...
         */
        rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
        rel->rd_lockInfo.lockRelId.relId = rnode.relNode;
 
-       rel->rd_targblock = InvalidBlockNumber;
-       rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
        rel->rd_smgr = NULL;
 
        return rel;
@@ -371,6 +444,9 @@ CreateFakeRelcacheEntry(RelFileNode rnode)
 void
 FreeFakeRelcacheEntry(Relation fakerel)
 {
+       /* make sure the fakerel is not referenced by the SmgrRelation anymore */
+       if (fakerel->rd_smgr != NULL)
+               smgrclearowner(&fakerel->rd_smgr, fakerel->rd_smgr);
        pfree(fakerel);
 }
 
@@ -396,10 +472,9 @@ XLogDropDatabase(Oid dbid)
 {
        /*
         * This is unnecessarily heavy-handed, as it will close SMgrRelation
-        * objects for other databases as well. DROP DATABASE occurs seldom
-        * enough that it's not worth introducing a variant of smgrclose for
-        * just this purpose. XXX: Or should we rather leave the smgr entries
-        * dangling?
+        * objects for other databases as well. DROP DATABASE occurs seldom enough
+        * that it's not worth introducing a variant of smgrclose for just this
+        * purpose. XXX: Or should we rather leave the smgr entries dangling?
         */
        smgrcloseall();