]> granicus.if.org Git - postgresql/commitdiff
Make standby server continuously retry restoring the next WAL segment with
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 27 Jan 2010 15:27:51 +0000 (15:27 +0000)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 27 Jan 2010 15:27:51 +0000 (15:27 +0000)
restore_command, if the connection to the primary server is lost. This
ensures that the standby can recover automatically, if the connection is
lost for a long time and standby falls behind so much that the required
WAL segments have been archived and deleted in the master.

This also makes standby_mode useful without streaming replication; the
server will keep retrying restore_command every few seconds until the
trigger file is found. That's the same basic functionality pg_standby
offers, but without the bells and whistles.

To implement that, refactor the ReadRecord/FetchRecord functions. The
FetchRecord() function introduced in the original streaming replication
patch is removed, and all the retry logic is now in a new function called
XLogReadPage(). XLogReadPage() is now responsible for executing
restore_command, launching walreceiver, and waiting for new WAL to arrive
from primary, as required.

This also changes the life cycle of walreceiver. When launched, it now only
tries to connect to the master once, and exits if the connection fails, or
is lost during streaming for any reason. The startup process detects the
death, and re-launches walreceiver if necessary.

src/backend/access/transam/xlog.c
src/backend/postmaster/postmaster.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/include/replication/walreceiver.h
src/include/storage/pmsignal.h

index 1eb877e5fcbdd4b2776453409067f78453493e47..60d40d4505b677af4976b03df60b9ffb405b88f6 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -143,16 +143,6 @@ HotStandbyState            standbyState = STANDBY_DISABLED;
 
 static         XLogRecPtr      LastRec;
 
-/*
- * Are we doing recovery from XLOG stream? If so, we recover without using
- * offline XLOG archives even though InArchiveRecovery==true. This flag is
- * used only in standby mode.
- */
-static bool InStreamingRecovery = false;
-
-/* The current log page is partially-filled, and so needs to be read again? */
-static bool needReread = false;
-
 /*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
  * known, need to check the shared state".
@@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
  * These variables are used similarly to the ones above, but for reading
  * the XLOG.  Note, however, that readOff generally represents the offset
  * of the page just read, not the seek position of the FD itself, which
- * will be just past that page.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf.
  */
 static int     readFile = -1;
 static uint32 readId = 0;
 static uint32 readSeg = 0;
 static uint32 readOff = 0;
+static uint32 readLen = 0;
+/* Is the currently open segment being streamed from primary? */
+static bool readStreamed = false;
 
 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 static char *readBuf = NULL;
@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;  /* start of last record read */
 static XLogRecPtr EndRecPtr;   /* end+1 of last record read */
-static XLogRecord *nextRecord = NULL;
 static TimeLineID lastPageTLI = 0;
 
 static XLogRecPtr minRecoveryPoint;            /* local copy of
@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                                           bool find_free, int *max_advance,
                                           bool use_lock);
-static int     XLogFileRead(uint32 log, uint32 seg, int emode);
+static int     XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+                        bool fromArchive, bool notexistOk);
+static int     XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
+                                  bool fromArchive);
+static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+                        bool randAccess);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
                                        const char *recovername, off_t expectedSize);
@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 static void WriteControlFile(void);
 static void ReadControlFile(void);
 static char *str_time(pg_time_t tnow);
+static bool CheckForStandbyTrigger(void);
 
 #ifdef WAL_DEBUG
 static void xlog_outrec(StringInfo buf, XLogRecord *record);
@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg)
 
 /*
  * Open a logfile segment for reading (during recovery).
+ *
+ * If fromArchive is true, the segment is retrieved from archive, otherwise
+ * it's read from pg_xlog.
  */
 static int
-XLogFileRead(uint32 log, uint32 seg, int emode)
+XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+                        bool fromArchive, bool notfoundOk)
 {
-       char            path[MAXPGPATH];
        char            xlogfname[MAXFNAMELEN];
        char            activitymsg[MAXFNAMELEN + 16];
-       ListCell   *cell;
+       char            path[MAXPGPATH];
        int                     fd;
 
-       /*
-        * Loop looking for a suitable timeline ID: we might need to read any of
-        * the timelines listed in expectedTLIs.
-        *
-        * We expect curFileTLI on entry to be the TLI of the preceding file in
-        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
-        * to go backwards; this prevents us from picking up the wrong file when a
-        * parent timeline extends to higher segment numbers than the child we
-        * want to read.
-        */
-       foreach(cell, expectedTLIs)
-       {
-               TimeLineID      tli = (TimeLineID) lfirst_int(cell);
-
-               if (tli < curFileTLI)
-                       break;                          /* don't bother looking at too-old TLIs */
-
                XLogFileName(xlogfname, tli, log, seg);
 
-               if (InArchiveRecovery && !InStreamingRecovery)
+               if (fromArchive)
                {
                        /* Report recovery progress in PS display */
                        snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
                        restoredFromArchive = RestoreArchivedFile(path, xlogfname,
                                                                                                          "RECOVERYXLOG",
                                                                                                          XLogSegSize);
+                       if (!restoredFromArchive)
+                               return -1;
                }
                else
+               {
                        XLogFilePath(path, tli, log, seg);
+                       restoredFromArchive = false;
+               }
 
                fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                if (fd >= 0)
@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
 
                        return fd;
                }
-               if (errno != ENOENT)    /* unexpected failure? */
+               if (errno != ENOENT || !notfoundOk)     /* unexpected failure? */
                        ereport(PANIC,
                                        (errcode_for_file_access(),
                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
                                   path, log, seg)));
+               return -1;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ *
+ * This version searches for the segment with any TLI listed in expectedTLIs.
+ * If not in StandbyMode and fromArchive is true, the segment is also
+ * searched in pg_xlog if not found in archive.
+ */
+static int
+XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
+{
+       char            path[MAXPGPATH];
+       ListCell   *cell;
+       int                     fd;
+
+       /*
+        * Loop looking for a suitable timeline ID: we might need to read any of
+        * the timelines listed in expectedTLIs.
+        *
+        * We expect curFileTLI on entry to be the TLI of the preceding file in
+        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
+        * to go backwards; this prevents us from picking up the wrong file when a
+        * parent timeline extends to higher segment numbers than the child we
+        * want to read.
+        */
+       foreach(cell, expectedTLIs)
+       {
+               TimeLineID      tli = (TimeLineID) lfirst_int(cell);
+
+               if (tli < curFileTLI)
+                       break;                          /* don't bother looking at too-old TLIs */
+
+               fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
+               if (fd != -1)
+                       return fd;
+
+               /*
+                * If not in StandbyMode, fall back to searching pg_xlog. In
+                * StandbyMode we're streaming segments from the primary to pg_xlog,
+                * and we mustn't confuse the (possibly partial) segments in pg_xlog
+                * with complete segments ready to be applied. We rather wait for
+                * the records to arrive through streaming.
+                */
+               if (!StandbyMode && fromArchive)
+               {
+                       fd = XLogFileRead(log, seg, emode, tli, false, true);
+                       if (fd != -1)
+                               return fd;
+               }
        }
 
        /* Couldn't find it.  For simplicity, complain about front timeline */
@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
                         * different filename that can't be confused with regular XLOG
                         * files.
                         */
-                       if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
+                       if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
                        {
                                snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
 
@@ -3473,79 +3513,6 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
        return true;
 }
 
-/*
- * Attempt to fetch an XLOG record.
- *
- * If RecPtr is not NULL, try to fetch a record at that position.  Otherwise
- * try to fetch a record just after the last one previously read.
- *
- * In standby mode, if we failed in reading a valid record and are not doing
- * recovery from XLOG stream yet, we ignore the failure and start walreceiver
- * process to fetch the record from the primary. Otherwise, returns NULL,
- * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
- *
- * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
- * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
- * streaming start position instead of RecPtr.
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
-       if (StandbyMode && !InStreamingRecovery)
-       {
-               XLogRecord *record;
-               XLogRecPtr      startlsn;
-               bool            haveNextRecord = (nextRecord != NULL);
-
-               /* An invalid record is OK here, so we set emode to DEBUG2 */
-               record = ReadRecord(RecPtr, DEBUG2);
-               if (record != NULL)
-                       return record;
-
-               /*
-                * Start XLOG streaming if there is no more valid records available
-                * in the archive.
-                *
-                * We need to calculate the start position of XLOG streaming. If we
-                * read a record in the middle of a segment which doesn't exist in
-                * pg_xlog, we use the start of the segment as the start position.
-                * That prevents a broken segment (i.e., with no records in the
-                * first half of a segment) from being created by XLOG streaming,
-                * which might cause trouble later on if the segment is e.g
-                * archived.
-                */
-               startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
-               if (startlsn.xrecoff % XLogSegSize != 0)
-               {
-                       char            xlogpath[MAXPGPATH];
-                       struct stat     stat_buf;
-                       uint32          log;
-                       uint32          seg;
-
-                       XLByteToSeg(startlsn, log, seg);
-                       XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
-
-                       if (stat(xlogpath, &stat_buf) != 0)
-                               startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
-               }
-               RequestXLogStreaming(startlsn, PrimaryConnInfo);
-
-               /* Needs to read the current page again if the next record is in it */
-               needReread = haveNextRecord;
-               nextRecord = NULL;
-
-               InStreamingRecovery = true;
-               ereport(LOG,
-                               (errmsg("starting streaming recovery at %X/%X",
-                                               startlsn.xlogid, startlsn.xrecoff)));
-       }
-
-       return ReadRecord(RecPtr, emode);
-}
-
 /*
  * Attempt to read an XLOG record.
  *
@@ -3553,13 +3520,13 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
  * try to read a record just after the last one previously read.
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG or DEBUG2.)
+ * (emode must be either PANIC, LOG)
  *
  * The record is copied into readRecordBuf, so that on successful return,
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
 {
        XLogRecord *record;
        char       *buffer;
@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
        bool            randAccess = false;
        uint32          len,
                                total_len;
-       uint32          targetPageOff;
        uint32          targetRecOff;
        uint32          pageHeaderSize;
-       XLogRecPtr      receivedUpto = {0,0};
-       bool            finished;
        int                     emode;
 
        /*
@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
         * should never hit the end of WAL because we wait for it to be streamed.
         * Therefore treat any broken WAL as PANIC, instead of failing over.
         */
-       if (InStreamingRecovery)
+       if (StandbyMode)
                emode = PANIC;
        else
                emode = emode_arg;
@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
        if (RecPtr == NULL)
        {
                RecPtr = &tmpRecPtr;
-               /* fast case if next record is on same page */
-               if (nextRecord != NULL)
-               {
-                       record = nextRecord;
-                       goto got_record;
-               }
 
                /*
-                * Align old recptr to next page if the current page is filled and
-                * doesn't need to be read again.
+                * Align recptr to next page if no more records can fit on the
+                * current page.
                 */
-               if (!needReread)
+               if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
+               {
                        NextLogPage(tmpRecPtr);
-               /* We will account for page header size below */
+                       /* We will account for page header size below */
+               }
        }
        else
        {
@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
                randAccess = true;              /* allow curFileTLI to go backwards too */
        }
 
-       if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
-       {
-               close(readFile);
-               readFile = -1;
-       }
-
-       /* Is the target record ready yet? */
-       if (InStreamingRecovery)
-       {
-               receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
-               if (finished)
-               {
-                       if (emode_arg == PANIC)
-                               ereport(PANIC,
-                                               (errmsg("streaming recovery ended")));
-                       else
-                               return NULL;
-               }
-       }
-
-       XLByteToSeg(*RecPtr, readId, readSeg);
-       if (readFile < 0)
-       {
-               /* Now it's okay to reset curFileTLI if random fetch */
-               if (randAccess)
-                       curFileTLI = 0;
-
-               readFile = XLogFileRead(readId, readSeg, emode);
-               if (readFile < 0)
-                       goto next_record_is_invalid;
-
-               /*
-                * Whenever switching to a new WAL segment, we read the first page of
-                * the file and validate its header, even if that's not where the
-                * target record is.  This is so that we can check the additional
-                * identification info that is present in the first page's "long"
-                * header.
-                */
-               readOff = 0;
-               if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-               {
-                       ereport(emode,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                                                       readId, readSeg, readOff)));
-                       goto next_record_is_invalid;
-               }
-               if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-                       goto next_record_is_invalid;
-       }
+       /* Read the page containing the record */
+       if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
+               return NULL;
 
-       targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-       if (readOff != targetPageOff || needReread)
-       {
-               readOff = targetPageOff;
-               needReread = false;
-               if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
-               {
-                       ereport(emode,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                       readId, readSeg, readOff)));
-                       goto next_record_is_invalid;
-               }
-               if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-               {
-                       ereport(emode,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                                                       readId, readSeg, readOff)));
-                       goto next_record_is_invalid;
-               }
-               if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-                       goto next_record_is_invalid;
-       }
        pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
        targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
        if (targetRecOff == 0)
@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
        }
        record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
 
-got_record:;
-
        /*
         * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
         * required.
@@ -3838,58 +3725,35 @@ got_record:;
        }
 
        buffer = readRecordBuf;
-       nextRecord = NULL;
        len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
        if (total_len > len)
        {
                /* Need to reassemble record */
                XLogContRecord *contrecord;
-               XLogRecPtr      nextpagelsn = *RecPtr;
+               XLogRecPtr      pagelsn;
                uint32          gotlen = len;
 
+               /* Initialize pagelsn to the beginning of the page this record is on */
+               pagelsn = *RecPtr;
+               pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
                memcpy(buffer, record, len);
                record = (XLogRecord *) buffer;
                buffer += len;
                for (;;)
                {
-                       /* Is the next page ready yet? */
-                       if (InStreamingRecovery)
+                       /* Calculate pointer to beginning of next page */
+                       pagelsn.xrecoff += XLOG_BLCKSZ;
+                       if (pagelsn.xrecoff >= XLogFileSize)
                        {
-                               if (gotlen != len)
-                                       nextpagelsn.xrecoff += XLOG_BLCKSZ;
-                               NextLogPage(nextpagelsn);
-                               receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
-                               if (finished)
-                               {
-                                       if (emode_arg == PANIC)
-                                               ereport(PANIC,
-                                                               (errmsg("streaming recovery ended")));
-                                       else
-                                               return NULL;
-                               }
+                               (pagelsn.xlogid)++;
+                               pagelsn.xrecoff = 0;
                        }
+                       /* Wait for the next page to become available */
+                       if (!XLogPageRead(&pagelsn, emode, false, false))
+                               return NULL;
 
-                       readOff += XLOG_BLCKSZ;
-                       if (readOff >= XLogSegSize)
-                       {
-                               close(readFile);
-                               readFile = -1;
-                               NextLogSeg(readId, readSeg);
-                               readFile = XLogFileRead(readId, readSeg, emode);
-                               if (readFile < 0)
-                                       goto next_record_is_invalid;
-                               readOff = 0;
-                       }
-                       if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-                       {
-                               ereport(emode,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                                                               readId, readSeg, readOff)));
-                               goto next_record_is_invalid;
-                       }
-                       if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-                               goto next_record_is_invalid;
+                       /* Check that the continuation record looks valid */
                        if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
                        {
                                ereport(emode,
@@ -3923,31 +3787,11 @@ got_record:;
                if (!RecordIsValid(record, *RecPtr, emode))
                        goto next_record_is_invalid;
                pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-               if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
-                       MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
-               {
-                       nextRecord = (XLogRecord *) ((char *) contrecord +
-                                       MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
-               }
                EndRecPtr.xlogid = readId;
                EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
                        pageHeaderSize +
                        MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
 
-               /*
-                * Check whether the current page needs to be read again. If there is no
-                * unread record in the current page (nextRecord == NULL), obviously we
-                * don't need to reread it. If we're not in streaming recovery mode yet,
-                * partially-filled page doesn't need to be reread because it is the
-                * last valid page.
-                */
-               if (nextRecord != NULL && InStreamingRecovery &&
-                       XLByteLE(receivedUpto, EndRecPtr))
-               {
-                       nextRecord      = NULL;
-                       needReread      = true;
-               }
-
                ReadRecPtr = *RecPtr;
                /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
                return record;
@@ -3956,26 +3800,9 @@ got_record:;
        /* Record does not cross a page boundary */
        if (!RecordIsValid(record, *RecPtr, emode))
                goto next_record_is_invalid;
-       if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
-               MAXALIGN(total_len))
-               nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
        EndRecPtr.xlogid = RecPtr->xlogid;
        EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
 
-       /*
-        * Check whether the current page needs to be read again. If there is no
-        * unread record in the current page (nextRecord == NULL), obviously we
-        * don't need to reread it. If we're not in streaming recovery mode yet,
-        * partially-filled page doesn't need to be reread because it is the last
-        * valid page.
-        */
-       if (nextRecord != NULL && InStreamingRecovery &&
-               XLByteLE(receivedUpto, EndRecPtr))
-       {
-               nextRecord      = NULL;
-               needReread      = true;
-       }
-
        ReadRecPtr = *RecPtr;
        memcpy(buffer, record, total_len);
 
@@ -3987,8 +3814,6 @@ got_record:;
                /* Pretend it extends to end of segment */
                EndRecPtr.xrecoff += XLogSegSize - 1;
                EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
-               nextRecord = NULL;              /* definitely not on same page */
-               needReread = false;
 
                /*
                 * Pretend that readBuf contains the last page of the segment. This is
@@ -4005,7 +3830,6 @@ next_record_is_invalid:;
                close(readFile);
                readFile = -1;
        }
-       nextRecord = NULL;
        return NULL;
 }
 
@@ -5730,7 +5554,7 @@ StartupXLOG(void)
                                        (errmsg("checkpoint record is at %X/%X",
                                                        checkPointLoc.xlogid, checkPointLoc.xrecoff)));
                }
-               else if (InStreamingRecovery)
+               else if (StandbyMode)
                {
                        /*
                         * The last valid checkpoint record required for a streaming
@@ -5938,12 +5762,12 @@ StartupXLOG(void)
                if (XLByteLT(checkPoint.redo, RecPtr))
                {
                        /* back up to find the record */
-                       record = FetchRecord(&(checkPoint.redo), PANIC, false);
+                       record = ReadRecord(&(checkPoint.redo), PANIC, false);
                }
                else
                {
                        /* just have to read next record after CheckPoint */
-                       record = FetchRecord(NULL, LOG, false);
+                       record = ReadRecord(NULL, LOG, false);
                }
 
                if (record != NULL)
@@ -6096,7 +5920,7 @@ StartupXLOG(void)
 
                                LastRec = ReadRecPtr;
 
-                               record = FetchRecord(NULL, LOG, false);
+                               record = ReadRecord(NULL, LOG, false);
                        } while (record != NULL && recoveryContinue);
 
                        /*
@@ -6130,22 +5954,17 @@ StartupXLOG(void)
 
        /*
         * We are now done reading the xlog from stream. Turn off streaming
-        * recovery, and restart fetching the files (which would be required
-        * at end of recovery, e.g., timeline history file) from archive.
+        * recovery to force fetching the files (which would be required
+        * at end of recovery, e.g., timeline history file) from archive or
+        * pg_xlog.
         */
-       if (InStreamingRecovery)
-       {
-               /* We are no longer in streaming recovery state */
-               InStreamingRecovery = false;
-               ereport(LOG,
-                               (errmsg("streaming recovery complete")));
-       }
+       StandbyMode = false;
 
        /*
         * Re-fetch the last valid or last applied record, so we can identify the
         * exact endpoint of what we consider the valid portion of WAL.
         */
-       record = ReadRecord(&LastRec, PANIC);
+       record = ReadRecord(&LastRec, PANIC, false);
        EndOfLog = EndRecPtr;
        XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
 
@@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
                return NULL;
        }
 
-       record = FetchRecord(&RecPtr, LOG, true);
+       record = ReadRecord(&RecPtr, LOG, true);
 
        if (record == NULL)
        {
@@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags)
        }
        LWLockRelease(ControlFileLock);
 
-       /* Are we doing recovery from XLOG stream? */
-       if (!InStreamingRecovery)
-               InStreamingRecovery = WalRcvInProgress();
-
        /*
         * Delete old log files (those no longer needed even for previous
         * checkpoint/restartpoint) to prevent the disk holding the xlog from
@@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags)
         * streaming recovery we have to or the disk will eventually fill up from
         * old log files streamed from master.
         */
-       if (InStreamingRecovery && (_logId || _logSeg))
+       if (WalRcvInProgress() && (_logId || _logSeg))
        {
                XLogRecPtr      endptr;
 
@@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void)
         */
        if (shutdown_requested)
                proc_exit(1);
+
+       /*
+        * Emergency bailout if postmaster has died.  This is to avoid the
+        * necessity for manual cleanup of all postmaster children.
+        */
+       if (IsUnderPostmaster && !PostmasterIsAlive(true))
+               exit(1);
 }
 
 /* Main entry point for startup process */
@@ -8843,3 +8665,281 @@ StartupProcessMain(void)
         */
        proc_exit(0);
 }
+
+/*
+ * Read the XLOG page containing RecPtr into readBuf (if not read already).
+ * Returns true if successful, false otherwise or fails if emode is PANIC.
+ *
+ * This is responsible for restoring files from archive as needed, as well
+ * as for waiting for the requested WAL record to arrive in standby mode.
+ */
+static bool
+XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+                        bool randAccess)
+{
+       static XLogRecPtr receivedUpto = {0, 0};
+       bool            switched_segment = false;
+       uint32          targetPageOff;
+       uint32          targetRecOff;
+       uint32          targetId;
+       uint32          targetSeg;
+
+       XLByteToSeg(*RecPtr, targetId, targetSeg);
+       targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+       targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
+
+       /* Fast exit if we have read the record in the current buffer already */
+       if (targetId == readId && targetSeg == readSeg &&
+               targetPageOff == readOff && targetRecOff < readLen)
+               return true;
+
+       /*
+        * See if we need to switch to a new segment because the requested record
+        * is not in the currently open one.
+        */
+       if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
+       {
+               close(readFile);
+               readFile = -1;
+       }
+
+       XLByteToSeg(*RecPtr, readId, readSeg);
+
+       /* See if we need to retrieve more data */
+       if (readFile < 0 ||
+               (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
+       {
+               if (StandbyMode)
+               {
+                       bool last_restore_failed = false;
+
+                       /*
+                        * In standby mode, wait for the requested record to become
+                        * available, either via restore_command succeeding to restore
+                        * the segment, or via walreceiver having streamed the record.
+                        */
+                       for (;;)
+                       {
+                               if (WalRcvInProgress())
+                               {
+                                       /*
+                                        * While walreceiver is active, wait for new WAL to
+                                        * arrive from primary.
+                                        */
+                                       receivedUpto = GetWalRcvWriteRecPtr();
+                                       if (XLByteLT(*RecPtr, receivedUpto))
+                                       {
+                                               /*
+                                                * Great, streamed far enough. Open the file if it's
+                                                * not open already.
+                                                */
+                                               if (readFile < 0)
+                                               {
+                                                       readFile =
+                                                               XLogFileRead(readId, readSeg, PANIC,
+                                                                                        recoveryTargetTLI, false, false);
+                                                       switched_segment = true;
+                                                       readStreamed = true;
+                                               }
+                                               break;
+                                       }
+
+                                       if (CheckForStandbyTrigger())
+                                               goto next_record_is_invalid;
+
+                                       /*
+                                        * When streaming is active, we want to react quickly when
+                                        * the next WAL record arrives, so sleep only a bit.
+                                        */
+                                       pg_usleep(100000L); /* 100ms */
+                               }
+                               else
+                               {
+                                       /*
+                                        * Until walreceiver manages to reconnect, poll the
+                                        * archive.
+                                        */
+                                       if (readFile >= 0)
+                                       {
+                                               close(readFile);
+                                               readFile = -1;
+                                       }
+                                       /* Reset curFileTLI if random fetch. */
+                                       if (randAccess)
+                                               curFileTLI = 0;
+                                       readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
+                                       switched_segment = true;
+                                       readStreamed = false;
+                                       if (readFile != -1)
+                                       {
+                                               elog(DEBUG1, "got WAL segment from archive");
+                                               break;
+                                       }
+
+                                       /*
+                                        * If we succeeded restoring some segments from archive
+                                        * since the last connection attempt (or we haven't
+                                        * tried streaming yet, retry immediately. But if we
+                                        * haven't, assume the problem is persistent, so be
+                                        * less aggressive.
+                                        */
+                                       if (last_restore_failed)
+                                       {
+                                               /*
+                                                * Check to see if the trigger file exists. Note that
+                                                * we do this only after failure, so when you create
+                                                * the trigger file, we still finish replaying as much
+                                                * as we can before failover.
+                                                */
+                                               if (CheckForStandbyTrigger())
+                                                       goto next_record_is_invalid;
+                                               pg_usleep(5000000L); /* 5 seconds */
+                                       }
+                                       last_restore_failed = true;
+
+                                       /*
+                                        * Nope, not found in archive. Try to stream it.
+                                        *
+                                        * If fetching_ckpt is TRUE, RecPtr points to the initial
+                                        * checkpoint location. In that case, we use RedoStartLSN
+                                        * as the streaming start position instead of RecPtr, so
+                                        * that when we later jump backwards to start redo at
+                                        * RedoStartLSN, we will have the logs streamed already.
+                                        */
+                                       RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
+                                                                                PrimaryConnInfo);
+                               }
+
+                               /*
+                                * This possibly-long loop needs to handle interrupts of startup
+                                * process.
+                                */
+                               HandleStartupProcInterrupts();
+                       }
+               }
+               else
+               {
+                       /* In archive or crash recovery. */
+                       if (readFile < 0)
+                       {
+                               /* Reset curFileTLI if random fetch. */
+                               if (randAccess)
+                                       curFileTLI = 0;
+                               readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
+                                                                                         InArchiveRecovery);
+                               switched_segment = true;
+                               readStreamed = false;
+                               if (readFile < 0)
+                                       return false;
+                       }
+               }
+       }
+
+       /*
+        * At this point, we have the right segment open and we know the
+        * requested record is in it.
+        */
+       Assert(readFile != -1);
+
+       /*
+        * If the current segment is being streamed from master, calculate
+        * how much of the current page we have received already. We know the
+        * requested record has been received, but this is for the benefit
+        * of future calls, to allow quick exit at the top of this function.
+        */
+       if (readStreamed)
+       {
+               if (RecPtr->xlogid != receivedUpto.xlogid ||
+                       (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
+               {
+                       readLen = XLOG_BLCKSZ;
+               }
+               else
+                       readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
+       }
+       else
+               readLen = XLOG_BLCKSZ;
+
+       if (switched_segment && targetPageOff != 0)
+       {
+               /*
+                * Whenever switching to a new WAL segment, we read the first page of
+                * the file and validate its header, even if that's not where the
+                * target record is.  This is so that we can check the additional
+                * identification info that is present in the first page's "long"
+                * header.
+                */
+               readOff = 0;
+               if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+               {
+                       ereport(emode,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                                                       readId, readSeg, readOff)));
+                       goto next_record_is_invalid;
+               }
+               if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+                       goto next_record_is_invalid;
+       }
+
+       /* Read the requested page */
+       readOff = targetPageOff;
+       if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+       {
+               ereport(emode,
+                               (errcode_for_file_access(),
+                                errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+                                               readId, readSeg, readOff)));
+               goto next_record_is_invalid;
+       }
+       if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+       {
+               ereport(emode,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                                               readId, readSeg, readOff)));
+               goto next_record_is_invalid;
+       }
+       if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+               goto next_record_is_invalid;
+
+       Assert(targetId == readId);
+       Assert(targetSeg == readSeg);
+       Assert(targetPageOff == readOff);
+       Assert(targetRecOff < readLen);
+
+       return true;
+
+next_record_is_invalid:
+       if (readFile >= 0)
+               close(readFile);
+       readFile = -1;
+       readStreamed = false;
+       readLen = 0;
+
+       return false;
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver, wait for it to exit, remove the trigger
+ * file, and return true.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+       struct stat stat_buf;
+
+       if (TriggerFile == NULL)
+               return false;
+
+       if (stat(TriggerFile, &stat_buf) == 0)
+       {
+               ereport(LOG,
+                               (errmsg("trigger file found: %s", TriggerFile)));
+               ShutdownWalRcv();
+               unlink(TriggerFile);
+               return true;
+       }
+       return false;
+}
index 59f994bd16e594a7931807ed1055da3a4346c300..6df11b8a7402703ff03788d8950942c3fb7c7508 100644 (file)
@@ -37,7 +37,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.601 2010/01/15 09:19:02 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.602 2010/01/27 15:27:50 heikki Exp $
  *
  * NOTES
  *
@@ -224,9 +224,6 @@ static int  Shutdown = NoShutdown;
 static bool FatalError = false; /* T if recovering from backend crash */
 static bool RecoveryError = false;             /* T if WAL recovery failed */
 
-/* If WalReceiverActive is true, restart walreceiver if it dies */
-static bool WalReceiverActive = false;
-
 /*
  * We use a simple state machine to control startup, shutdown, and
  * crash recovery (which is rather like shutdown followed by startup).
@@ -1469,11 +1466,6 @@ ServerLoop(void)
                if (PgStatPID == 0 && pmState == PM_RUN)
                        PgStatPID = pgstat_start();
 
-               /* If we have lost walreceiver, try to start a new one */
-               if (WalReceiverPID == 0 && WalReceiverActive &&
-                       (pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT))
-                       WalReceiverPID = StartWalReceiver();
-
                /* If we need to signal the autovacuum launcher, do so now */
                if (avlauncher_needs_signal)
                {
@@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS)
                WalReceiverPID == 0)
        {
                /* Startup Process wants us to start the walreceiver process. */
-               WalReceiverActive = true;
                WalReceiverPID = StartWalReceiver();
        }
 
-       if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER))
-       {
-               /* The walreceiver process doesn't want to be restarted anymore */
-               WalReceiverActive = false;
-       }
-
        PG_SETMASK(&UnBlockSig);
 
        errno = save_errno;
index f805e673e114b651cc16c4a6404b2b32ac3776ce..4a5ba5b4263631e08bc7586fe47ca81abee1416c 100644 (file)
@@ -29,7 +29,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void InitWalRcv(void);
-static void WalRcvKill(int code, Datum arg);
+static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
 
@@ -153,21 +152,57 @@ static struct
 void
 WalReceiverMain(void)
 {
-       sigjmp_buf      local_sigjmp_buf;
-       MemoryContext walrcv_context;
        char conninfo[MAXCONNINFO];
        XLogRecPtr startpoint;
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
 
-       /* Load the libpq-specific functions */
-       load_file("libpqwalreceiver", false);
-       if (walrcv_connect == NULL || walrcv_receive == NULL ||
-               walrcv_disconnect == NULL)
-               elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+       /*
+        * WalRcv should be set up already (if we are a backend, we inherit
+        * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+        */
+       Assert(walrcv != NULL);
+
+       /*
+        * Mark walreceiver as running in shared memory.
+        *
+        * Do this as early as possible, so that if we fail later on, we'll
+        * set state to STOPPED. If we die before this, the startup process
+        * will keep waiting for us to start up, until it times out.
+        */
+       SpinLockAcquire(&walrcv->mutex);
+       Assert(walrcv->pid == 0);
+       switch(walrcv->walRcvState)
+       {
+               case WALRCV_STOPPING:
+                       /* If we've already been requested to stop, don't start up. */
+                       walrcv->walRcvState = WALRCV_STOPPED;
+                       /* fall through */
+
+               case WALRCV_STOPPED:
+                       SpinLockRelease(&walrcv->mutex);
+                       proc_exit(1);
+                       break;
+
+               case WALRCV_STARTING:
+                       /* The usual case */
+                       break;
+
+               case WALRCV_RUNNING:
+                       /* Shouldn't happen */
+                       elog(PANIC, "walreceiver still running according to shared memory state");
+       }
+       /* Advertise our PID so that the startup process can kill us */
+       walrcv->pid = MyProcPid;
+       walrcv->walRcvState = WALRCV_RUNNING;
+
+       /* Fetch information required to start streaming */
+       strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+       startpoint = walrcv->receivedUpto;
+       SpinLockRelease(&walrcv->mutex);
 
-       /* Mark walreceiver in progress */
-       InitWalRcv();
+       /* Arrange to clean up at walreceiver exit */
+       on_shmem_exit(WalRcvDie, 0);
 
        /*
         * If possible, make this process a group leader, so that the postmaster
@@ -200,81 +235,21 @@ WalReceiverMain(void)
        /* We allow SIGQUIT (quickdie) at all times */
        sigdelset(&BlockSig, SIGQUIT);
 
+       /* Load the libpq-specific functions */
+       load_file("libpqwalreceiver", false);
+       if (walrcv_connect == NULL || walrcv_receive == NULL ||
+               walrcv_disconnect == NULL)
+               elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
        /*
         * Create a resource owner to keep track of our resources (not clear that
         * we need this, but may as well have one).
         */
        CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
 
-       /*
-        * Create a memory context that we will do all our work in.  We do this so
-        * that we can reset the context during error recovery and thereby avoid
-        * possible memory leaks.
-        */
-       walrcv_context = AllocSetContextCreate(TopMemoryContext,
-                                                                                         "Wal Receiver",
-                                                                                         ALLOCSET_DEFAULT_MINSIZE,
-                                                                                         ALLOCSET_DEFAULT_INITSIZE,
-                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
-       MemoryContextSwitchTo(walrcv_context);
-
-       /*
-        * If an exception is encountered, processing resumes here.
-        *
-        * This code is heavily based on bgwriter.c, q.v.
-        */
-       if (sigsetjmp(local_sigjmp_buf, 1) != 0)
-       {
-               /* Since not using PG_TRY, must reset error stack by hand */
-               error_context_stack = NULL;
-
-               /* Reset WalRcvImmediateInterruptOK */
-               DisableWalRcvImmediateExit();
-
-               /* Prevent interrupts while cleaning up */
-               HOLD_INTERRUPTS();
-
-               /* Report the error to the server log */
-               EmitErrorReport();
-
-               /* Disconnect any previous connection. */
-               EnableWalRcvImmediateExit();
-               walrcv_disconnect();
-               DisableWalRcvImmediateExit();
-
-               /*
-                * Now return to normal top-level context and clear ErrorContext for
-                * next time.
-                */
-               MemoryContextSwitchTo(walrcv_context);
-               FlushErrorState();
-
-               /* Flush any leaked data in the top-level context */
-               MemoryContextResetAndDeleteChildren(walrcv_context);
-
-               /* Now we can allow interrupts again */
-               RESUME_INTERRUPTS();
-
-               /*
-                * Sleep at least 1 second after any error.  A write error is likely
-                * to be repeated, and we don't want to be filling the error logs as
-                * fast as we can.
-                */
-               pg_usleep(1000000L);
-       }
-
-       /* We can now handle ereport(ERROR) */
-       PG_exception_stack = &local_sigjmp_buf;
-
        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);
 
-       /* Fetch connection information from shared memory */
-       SpinLockAcquire(&walrcv->mutex);
-       strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
-       startpoint = walrcv->receivedUpto;
-       SpinLockRelease(&walrcv->mutex);
-
        /* Establish the connection to the primary for XLOG streaming */
        EnableWalRcvImmediateExit();
        walrcv_connect(conninfo, startpoint);
@@ -330,63 +305,24 @@ WalReceiverMain(void)
        }
 }
 
-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalRcvData *walrcv = WalRcv;
-
-       /*
-        * WalRcv should be set up already (if we are a backend, we inherit
-        * this by fork() or EXEC_BACKEND mechanism from the postmaster).
-        */
-       if (walrcv == NULL)
-               elog(PANIC, "walreceiver control data uninitialized");
-
-       /* If we've already been requested to stop, don't start up */
-       SpinLockAcquire(&walrcv->mutex);
-       Assert(walrcv->pid == 0);
-       if (walrcv->walRcvState == WALRCV_STOPPED ||
-               walrcv->walRcvState == WALRCV_STOPPING)
-       {
-               walrcv->walRcvState = WALRCV_STOPPED;
-               SpinLockRelease(&walrcv->mutex);
-               proc_exit(1);
-       }
-       walrcv->pid = MyProcPid;
-       SpinLockRelease(&walrcv->mutex);
-
-       /* Arrange to clean up at walreceiver exit */
-       on_shmem_exit(WalRcvKill, 0);
-}
-
 /*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
  */
 static void
-WalRcvKill(int code, Datum arg)
+WalRcvDie(int code, Datum arg)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
-       bool stopped = false;
 
        SpinLockAcquire(&walrcv->mutex);
-       if (walrcv->walRcvState == WALRCV_STOPPING ||
-               walrcv->walRcvState == WALRCV_STOPPED)
-       {
-               walrcv->walRcvState = WALRCV_STOPPED;
-               stopped = true;
-               elog(LOG, "walreceiver stopped");
-       }
+       Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+                  walrcv->walRcvState == WALRCV_STOPPING);
+       walrcv->walRcvState = WALRCV_STOPPED;
        walrcv->pid = 0;
        SpinLockRelease(&walrcv->mutex);
 
+       /* Terminate the connection gracefully. */
        walrcv_disconnect();
-
-       /* If requested to stop, tell postmaster to not restart us. */
-       if (stopped)
-               SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
index c1d7b5588740808afef7f3e925b553f0123bce4e..4fb132dcd4e5b8ca5da8eca0e7419b978496d81a 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,6 +18,8 @@
 
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
 #include <unistd.h>
 #include <signal.h>
 
 
 WalRcvData *WalRcv = NULL;
 
-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10
 
 /* Report shared memory space needed by WalRcvShmemInit */
 Size
@@ -62,7 +67,7 @@ WalRcvShmemInit(void)
 
        /* Initialize the data structures */
        MemSet(WalRcv, 0, WalRcvShmemSize());
-       WalRcv->walRcvState = WALRCV_NOT_STARTED;
+       WalRcv->walRcvState = WALRCV_STOPPED;
        SpinLockInit(&WalRcv->mutex);
 }
 
@@ -73,90 +78,51 @@ WalRcvInProgress(void)
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
        WalRcvState state;
+       pg_time_t startTime;
 
        SpinLockAcquire(&walrcv->mutex);
-       state = walrcv->walRcvState;
-       SpinLockRelease(&walrcv->mutex);
 
-       if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
-               return true;
-       else
-               return false;
-}
-
-/*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
- */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
-       static XLogRecPtr receivedUpto = {0, 0};
-
-       *finished = false;
+       state = walrcv->walRcvState;
+       startTime = walrcv->startTime;
 
-       /* Quick exit if already known available */
-       if (XLByteLT(recptr, receivedUpto))
-               return receivedUpto;
+       SpinLockRelease(&walrcv->mutex);
 
-       for (;;)
+       /*
+        * If it has taken too long for walreceiver to start up, give up.
+        * Setting the state to STOPPED ensures that if walreceiver later
+        * does start up after all, it will see that it's not supposed to be
+        * running and die without doing anything.
+        */
+       if (state == WALRCV_STARTING)
        {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile WalRcvData *walrcv = WalRcv;
-
-               /* Update local status */
-               SpinLockAcquire(&walrcv->mutex);
-               receivedUpto = walrcv->receivedUpto;
-               SpinLockRelease(&walrcv->mutex);
+               pg_time_t now = (pg_time_t) time(NULL);
 
-               /* If available already, leave here */
-               if (XLByteLT(recptr, receivedUpto))
-                       return receivedUpto;
-
-               /* Check to see if the trigger file exists */
-               if (CheckForStandbyTrigger())
+               if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
                {
-                       *finished = true;
-                       return receivedUpto;
-               }
+                       SpinLockAcquire(&walrcv->mutex);
 
-               pg_usleep(100000L); /* 100ms */
-
-               /*
-                * This possibly-long loop needs to handle interrupts of startup
-                * process.
-                */
-               HandleStartupProcInterrupts();
+                       if (walrcv->walRcvState == WALRCV_STARTING)
+                               state = walrcv->walRcvState = WALRCV_STOPPED;
 
-               /*
-                * Emergency bailout if postmaster has died.  This is to avoid the
-                * necessity for manual cleanup of all postmaster children.
-                */
-               if (!PostmasterIsAlive(true))
-                       exit(1);
+                       SpinLockRelease(&walrcv->mutex);
+               }
        }
+
+       if (state != WALRCV_STOPPED)
+               return true;
+       else
+               return false;
 }
 
 /*
- * Stop walreceiver and wait for it to die.
+ * Stop walreceiver (if running) and wait for it to die.
  */
-static void
+void
 ShutdownWalRcv(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
-       pid_t walrcvpid;
+       pid_t walrcvpid = 0;
 
        /*
         * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -164,15 +130,25 @@ ShutdownWalRcv(void)
         * restart itself.
         */
        SpinLockAcquire(&walrcv->mutex);
-       Assert(walrcv->walRcvState == WALRCV_RUNNING);
-       walrcv->walRcvState = WALRCV_STOPPING;
-       walrcvpid = walrcv->pid;
+       switch(walrcv->walRcvState)
+       {
+               case WALRCV_STOPPED:
+                       break;
+               case WALRCV_STARTING:
+                       walrcv->walRcvState = WALRCV_STOPPED;
+                       break;
+
+               case WALRCV_RUNNING:
+                       walrcv->walRcvState = WALRCV_STOPPING;
+                       /* fall through */
+               case WALRCV_STOPPING:
+                       walrcvpid = walrcv->pid;
+                       break;
+       }
        SpinLockRelease(&walrcv->mutex);
 
        /*
-        * Pid can be 0, if no walreceiver process is active right now.
-        * Postmaster should restart it, and when it does, it will see the
-        * STOPPING state.
+        * Signal walreceiver process if it was still running.
         */
        if (walrcvpid != 0)
                kill(walrcvpid, SIGTERM);
@@ -193,30 +169,6 @@ ShutdownWalRcv(void)
        }
 }
 
-/*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
-       struct stat stat_buf;
-
-       if (TriggerFile == NULL)
-               return false;
-
-       if (stat(TriggerFile, &stat_buf) == 0)
-       {
-               ereport(LOG,
-                               (errmsg("trigger file found: %s", TriggerFile)));
-               ShutdownWalRcv();
-               unlink(TriggerFile);
-               return true;
-       }
-       return false;
-}
-
 /*
  * Request postmaster to start walreceiver.
  *
@@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
+       pg_time_t now = (pg_time_t) time(NULL);
 
-       Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+       /*
+        * We always start at the beginning of the segment.
+        * That prevents a broken segment (i.e., with no records in the
+        * first half of a segment) from being created by XLOG streaming,
+        * which might cause trouble later on if the segment is e.g
+        * archived.
+        */
+       if (recptr.xrecoff % XLogSegSize != 0)
+               recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+       /* It better be stopped before we try to restart it */
+       Assert(walrcv->walRcvState == WALRCV_STOPPED);
 
-       /* locking is just pro forma here; walreceiver isn't started yet */
        SpinLockAcquire(&walrcv->mutex);
-       walrcv->receivedUpto = recptr;
        if (conninfo != NULL)
                strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
        else
                walrcv->conninfo[0] = '\0';
-       walrcv->walRcvState = WALRCV_RUNNING;
+       walrcv->walRcvState = WALRCV_STARTING;
+       walrcv->startTime = now;
+
+       walrcv->receivedUpto = recptr;
        SpinLockRelease(&walrcv->mutex);
 
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
@@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void)
 
        return recptr;
 }
+
index a645d18b5dcf757748b3b05b9816a3b21441b7c9..083eb4f07fb3f51ea930ed95b3ba1f7948024761 100644 (file)
@@ -5,7 +5,7 @@
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.4 2010/01/20 18:54:27 heikki Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
  */
 typedef enum
 {
-       WALRCV_NOT_STARTED,
-       WALRCV_RUNNING,         /* walreceiver has been started */
-       WALRCV_STOPPING,        /* requested to stop, but still running */
-       WALRCV_STOPPED          /* stopped and mustn't start up again */
+       WALRCV_STOPPED,         /* stopped and mustn't start up again */
+       WALRCV_STARTING,        /* launched, but the process hasn't initialized yet */
+       WALRCV_RUNNING,         /* walreceiver is running */
+       WALRCV_STOPPING         /* requested to stop, but still running */
 } WalRcvState;
 
 /* Shared memory area for management of walreceiver process */
@@ -47,6 +47,7 @@ typedef struct
         */
        pid_t   pid;
        WalRcvState walRcvState;
+       pg_time_t startTime;
 
        /*
         * receivedUpto-1 is the last byte position that has been already
@@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 extern void WalReceiverMain(void);
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
+extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
index 75ef17a5a0a7ec2edbef6d3c7ccb6a726cb9d3e7..c49c2f5fd2bbeb36576f6c3f3c749b7c9ed35077 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.28 2010/01/15 09:19:09 heikki Exp $
+ * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.29 2010/01/27 15:27:51 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -30,7 +30,6 @@ typedef enum
        PMSIGNAL_START_AUTOVAC_LAUNCHER,        /* start an autovacuum launcher */
        PMSIGNAL_START_AUTOVAC_WORKER,          /* start an autovacuum worker */
        PMSIGNAL_START_WALRECEIVER,                     /* start a walreceiver */
-       PMSIGNAL_SHUTDOWN_WALRECEIVER,          /* shut down a walreceiver */
 
        NUM_PMSIGNALS                           /* Must be last value of enum! */
 } PMSignalReason;