]> granicus.if.org Git - postgresql/commitdiff
Cascading replication feature for streaming log-based replication.
authorSimon Riggs <simon@2ndQuadrant.com>
Tue, 19 Jul 2011 02:40:03 +0000 (03:40 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Tue, 19 Jul 2011 02:40:03 +0000 (03:40 +0100)
Standby servers can now have WALSender processes, which can work with
either WALReceiver or archive_commands to pass data. Fully updated
docs, including new conceptual terms of sending server, upstream and
downstream servers. WALSenders terminated when promote to master.

Fujii Masao, review, rework and doc rewrite by Simon Riggs

doc/src/sgml/config.sgml
doc/src/sgml/high-availability.sgml
src/backend/access/transam/xlog.c
src/backend/postmaster/postmaster.c
src/backend/replication/basebackup.c
src/backend/replication/syncrep.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/access/xlog.h
src/include/replication/walsender.h

index afa087a3465287f4a0dc77af3e149a7e25ec6b94..6091105a860765328f6f015123300985d18bd8bd 100644 (file)
@@ -1962,24 +1962,26 @@ SET ENABLE_SEQSCAN TO OFF;
     <para>
      These settings control the behavior of the built-in
      <firstterm>streaming replication</> feature (see
-     <xref linkend="streaming-replication">).
-     Some parameters must be set on the master server, while others must be
-     set on the standby server(s) that will receive replication data.
+     <xref linkend="streaming-replication">).  Servers will be either a
+     Master or a Standby server.  Masters can send data, while Standby(s)
+     are always receivers of replicated data.  When cascading replication
+     (see <xref linkend="cascading-replication">) is used, Standby server(s)
+     can also be senders, as well as receivers.
+     Parameters are mainly for Sending and Standby servers, though some
+     parameters have meaning only on the Master server.  Settings may vary
+     across the cluster without problems if that is required.
     </para>
 
-    <sect2 id="runtime-config-replication-master">
-     <title>Master Server</title>
+    <sect2 id="runtime-config-replication-sender">
+     <title>Sending Server(s)</title>
 
      <para>
-      These parameters can be set on the primary server that is
+      These parameters can be set on any server that is
       to send replication data to one or more standby servers.
-      Note that in addition to these parameters,
-      <xref linkend="guc-wal-level"> must be set appropriately on the master
-      server, and you will typically want to enable WAL archiving as
-      well (see <xref linkend="runtime-config-wal-archiving">).
-      The values of these parameters on standby servers are irrelevant,
-      although you may wish to set them there in preparation for the
-      possibility of a standby becoming the master.
+      The master is always a sending server, so these parameters must
+      always be set on the master.
+      The role and meaning of these parameters does not change after a
+      standby becomes the master.
      </para>
 
      <variablelist>
@@ -2034,10 +2036,11 @@ SET ENABLE_SEQSCAN TO OFF;
         <filename>pg_xlog</>
         directory, in case a standby server needs to fetch them for streaming
         replication. Each segment is normally 16 megabytes. If a standby
-        server connected to the primary falls behind by more than
-        <varname>wal_keep_segments</> segments, the primary might remove
+        server connected to the sending server falls behind by more than
+        <varname>wal_keep_segments</> segments, the sending server might remove
         a WAL segment still needed by the standby, in which case the
-        replication connection will be terminated.  (However, the standby
+        replication connection will be terminated.  Downstream connections
+        will also eventually fail as a result.  (However, the standby
         server can recover by fetching the segment from archive, if WAL
         archiving is in use.)
        </para>
@@ -2050,42 +2053,13 @@ SET ENABLE_SEQSCAN TO OFF;
         doesn't keep any extra segments for standby purposes, so the number
         of old WAL segments available to standby servers is a function of
         the location of the previous checkpoint and status of WAL
-        archiving.  This parameter has no effect on restartpoints.
+        archiving.
         This parameter can only be set in the
         <filename>postgresql.conf</> file or on the server command line.
        </para>
        </listitem>
       </varlistentry>
 
-     <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
-      <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)</term>
-      <indexterm>
-       <primary><varname>vacuum_defer_cleanup_age</> configuration parameter</primary>
-      </indexterm>
-      <listitem>
-       <para>
-        Specifies the number of transactions by which <command>VACUUM</> and
-        <acronym>HOT</> updates will defer cleanup of dead row versions. The
-        default is zero transactions, meaning that dead row versions can be
-        removed as soon as possible, that is, as soon as they are no longer
-        visible to any open transaction.  You may wish to set this to a
-        non-zero value on a primary server that is supporting hot standby
-        servers, as described in <xref linkend="hot-standby">.  This allows
-        more time for queries on the standby to complete without incurring
-        conflicts due to early cleanup of rows.  However, since the value
-        is measured in terms of number of write transactions occurring on the
-        primary server, it is difficult to predict just how much additional
-        grace time will be made available to standby queries.
-        This parameter can only be set in the <filename>postgresql.conf</>
-        file or on the server command line.
-       </para>
-       <para>
-        You should also consider setting <varname>hot_standby_feedback</>
-        as an alternative to using this parameter.
-       </para>
-      </listitem>
-     </varlistentry>
-
      <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
       <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
       <indexterm>
@@ -2095,7 +2069,7 @@ SET ENABLE_SEQSCAN TO OFF;
        <para>
         Terminate replication connections that are inactive longer
         than the specified number of milliseconds. This is useful for
-        the primary server to detect a standby crash or network outage.
+        the sending server to detect a standby crash or network outage.
         A value of zero disables the timeout mechanism.  This parameter
         can only be set in
         the <filename>postgresql.conf</> file or on the server command line.
@@ -2110,6 +2084,26 @@ SET ENABLE_SEQSCAN TO OFF;
       </listitem>
      </varlistentry>
 
+     </variablelist>
+    </sect2>
+
+    <sect2 id="runtime-config-replication-master">
+     <title>Master Server</title>
+
+     <para>
+      These parameters can be set on the master/primary server that is
+      to send replication data to one or more standby servers.
+      Note that in addition to these parameters,
+      <xref linkend="guc-wal-level"> must be set appropriately on the master
+      server, and may also want to enable WAL archiving as
+      well (see <xref linkend="runtime-config-wal-archiving">).
+      The values of these parameters on standby servers are irrelevant,
+      although you may wish to set them there in preparation for the
+      possibility of a standby becoming the master.
+     </para>
+
+    <variablelist>
+
      <varlistentry id="guc-synchronous-standby-names" xreflabel="synchronous_standby_names">
       <term><varname>synchronous_standby_names</varname> (<type>string</type>)</term>
       <indexterm>
@@ -2161,6 +2155,35 @@ SET ENABLE_SEQSCAN TO OFF;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
+      <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>vacuum_defer_cleanup_age</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the number of transactions by which <command>VACUUM</> and
+        <acronym>HOT</> updates will defer cleanup of dead row versions. The
+        default is zero transactions, meaning that dead row versions can be
+        removed as soon as possible, that is, as soon as they are no longer
+        visible to any open transaction.  You may wish to set this to a
+        non-zero value on a primary server that is supporting hot standby
+        servers, as described in <xref linkend="hot-standby">.  This allows
+        more time for queries on the standby to complete without incurring
+        conflicts due to early cleanup of rows.  However, since the value
+        is measured in terms of number of write transactions occurring on the
+        primary server, it is difficult to predict just how much additional
+        grace time will be made available to standby queries.
+        This parameter can only be set in the <filename>postgresql.conf</>
+        file or on the server command line.
+       </para>
+       <para>
+        You should also consider setting <varname>hot_standby_feedback</>
+        on standby server(s) as an alternative to using this parameter.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -2261,7 +2284,7 @@ SET ENABLE_SEQSCAN TO OFF;
       <para>
        Specifies the minimum frequency for the WAL receiver
        process on the standby to send information about replication progress
-       to the primary, where it can be seen using the
+       to the primary or upstream standby, where it can be seen using the
        <link linkend="monitoring-stats-views-table">
        <literal>pg_stat_replication</></link> view.  The standby will report
        the last transaction log position it has written, the last position it
@@ -2276,7 +2299,7 @@ SET ENABLE_SEQSCAN TO OFF;
        The default value is 10 seconds.
       </para>
       <para>
-       When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+       When <xref linkend="guc-replication-timeout"> is enabled on a sending server,
        <varname>wal_receiver_status_interval</> must be enabled, and its value
        must be less than the value of <varname>replication_timeout</>.
       </para>
@@ -2291,6 +2314,7 @@ SET ENABLE_SEQSCAN TO OFF;
       <listitem>
        <para>
         Specifies whether or not a hot standby will send feedback to the primary
+        or upstream standby
         about queries currently executing on the standby. This parameter can
         be used to eliminate query cancels caused by cleanup records, but
         can cause database bloat on the primary for some workloads.
@@ -2299,6 +2323,11 @@ SET ENABLE_SEQSCAN TO OFF;
         <literal>off</literal>. This parameter can only be set in the
         <filename>postgresql.conf</> file or on the server command line.
        </para>
+       <para>
+        If cascaded replication is in use the feedback is passed upstream
+        until it eventually reaches the primary.  Standbys make no other use
+        of feedback they receive other than to pass upstream.
+       </para>
       </listitem>
      </varlistentry>
 
index 0d3baa04bcd441b731787e9f18deab57b8ec38fc..674bfb80102eedb32cb44f811691633b466f7742 100644 (file)
@@ -877,8 +877,66 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
      network delay, or that the standby is under heavy load.
     </para>
    </sect3>
+  </sect2>
+
+  <sect2 id="cascading-replication">
+   <title>Cascading Replication</title>
+
+   <indexterm zone="high-availability">
+    <primary>Cascading Replication</primary>
+   </indexterm>
+
+   <para>
+    The cascading replication feature allows a standby server to accept replication
+    connections and stream WAL records to other standbys, acting as a relay.
+    This can be used to reduce the number of direct connections to the master
+    and also to minimise inter-site bandwidth overheads.
+   </para>
 
+   <para>
+    A standby acting as both a receiver and a sender is known as a cascading
+    standby.  Standbys that are more directly connected to the master are known
+    as upstream servers, while those standby servers further away are downstream
+    servers.  Cascading replication does not place limits on the number or
+    arrangement of downstream servers, though each standby connects to only
+    one upstream server which eventually links to a single master/primary
+    server.
+   </para>
+
+   <para>
+    A cascading standby sends not only WAL records received from the
+    master but also those restored from the archive. So even if the replication
+    connection in some upstream connection is terminated, streaming replication
+    continues downstream for as long as new WAL records are available.
+   </para>
+
+   <para>
+    Cascading replication is currently asynchronous. Synchronous replication
+    (see <xref linkend="synchronous-replication">) settings have no effect on
+    cascading replication at present.
+   </para>
+
+   <para>
+    Hot Standby feedback propagates upstream, whatever the cascaded arrangement.
+   </para>
+
+   <para>
+    Promoting a cascading standby terminates the immediate downstream replication
+    connections which it serves. This is because the timeline becomes different
+    between standbys, and they can no longer continue replication.  The
+    effected standby(s) may reconnect to reestablish streaming replication.
+   </para>
+
+   <para>
+    To use cascading replication, set up the cascading standby so that it can
+    accept replication connections, i.e., set <varname>max_wal_senders</>,
+    <varname>hot_standby</> and authentication option (see
+    <xref linkend="streaming-replication"> and <xref linkend="hot-standby">).
+    Also set <varname>primary_conninfo</> in the downstream standby to point
+    to the cascading standby.
+   </para>
   </sect2>
+
   <sect2 id="synchronous-replication">
    <title>Synchronous Replication</title>
 
@@ -955,7 +1013,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     confirmation that the commit record has been received. These parameters
     allow the administrator to specify which standby servers should be
     synchronous standbys. Note that the configuration of synchronous
-    replication is mainly on the master.
+    replication is mainly on the master. Named standbys must be directly
+    connected to the master; the master knows nothing about downstream
+    standby servers using cascaded replication.
    </para>
 
    <para>
index 662b26bc27d4ea8d0c26eeaf9b6d5fe593bafbc3..6a6959f728cfc00caa83c9e1b61b40a240118a37 100644 (file)
@@ -446,6 +446,8 @@ typedef struct XLogCtlData
        XLogRecPtr      recoveryLastRecPtr;
        /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
        TimestampTz recoveryLastXTime;
+       /* end of the last record restored from the archive */
+       XLogRecPtr      restoreLastRecPtr;
        /* Are we requested to pause recovery? */
        bool            recoveryPause;
 
@@ -612,6 +614,7 @@ static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
 static void LocalSetXLogInsertAllowed(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2729,6 +2732,61 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
                        elog(ERROR, "invalid XLogFileRead source %d", source);
        }
 
+       /*
+        * If the segment was fetched from archival storage, replace
+        * the existing xlog segment (if any) with the archival version.
+        */
+       if (source == XLOG_FROM_ARCHIVE)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr              endptr;
+               char                    xlogfpath[MAXPGPATH];
+               bool                    reload = false;
+               struct stat             statbuf;
+
+               XLogFilePath(xlogfpath, tli, log, seg);
+               if (stat(xlogfpath, &statbuf) == 0)
+               {
+                       if (unlink(xlogfpath) != 0)
+                               ereport(FATAL,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not remove file \"%s\": %m",
+                                                               xlogfpath)));
+                       reload = true;
+               }
+
+               if (rename(path, xlogfpath) < 0)
+                       ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                               path, xlogfpath)));
+
+               /*
+                * If the existing segment was replaced, since walsenders might have
+                * it open, request them to reload a currently-open segment.
+                */
+               if (reload)
+                       WalSndRqstFileReload();
+
+               /*
+                * Calculate the end location of the restored WAL file and save it in
+                * shmem. It's used as current standby flush position, and cascading
+                * walsenders try to send WAL records up to this location.
+                */
+               endptr.xlogid = log;
+               endptr.xrecoff = seg * XLogSegSize;
+               XLByteAdvance(endptr, XLogSegSize);
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               xlogctl->restoreLastRecPtr = endptr;
+               SpinLockRelease(&xlogctl->info_lck);
+
+               /* Signal walsender that new WAL has arrived */
+               if (AllowCascadeReplication())
+                       WalSndWakeup();
+       }
+
        fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
        if (fd >= 0)
        {
@@ -3361,18 +3419,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
                        strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
                        strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
                {
-                       /*
-                        * Normally we don't delete old XLOG files during recovery to
-                        * avoid accidentally deleting a file that looks stale due to a
-                        * bug or hardware issue, but in fact contains important data.
-                        * During streaming recovery, however, we will eventually fill the
-                        * disk if we never clean up, so we have to. That's not an issue
-                        * with file-based archive recovery because in that case we
-                        * restore one XLOG file at a time, on-demand, and with a
-                        * different filename that can't be confused with regular XLOG
-                        * files.
-                        */
-                       if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
+                       if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
                        {
                                snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
 
@@ -5484,62 +5531,23 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
        }
 
        /*
-        * If the segment was fetched from archival storage, we want to replace
-        * the existing xlog segment (if any) with the archival version.  This is
-        * because whatever is in XLOGDIR is very possibly older than what we have
-        * from the archives, since it could have come from restoring a PGDATA
-        * backup.      In any case, the archival version certainly is more
-        * descriptive of what our current database state is, because that is what
-        * we replayed from.
+        * If we are establishing a new timeline, we have to copy data from
+        * the last WAL segment of the old timeline to create a starting WAL
+        * segment for the new timeline.
         *
-        * Note that if we are establishing a new timeline, ThisTimeLineID is
-        * already set to the new value, and so we will create a new file instead
-        * of overwriting any existing file.  (This is, in fact, always the case
-        * at present.)
+        * Notify the archiver that the last WAL segment of the old timeline
+        * is ready to copy to archival storage. Otherwise, it is not archived
+        * for a while.
         */
-       snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
-       XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
-
-       if (restoredFromArchive)
+       if (endTLI != ThisTimeLineID)
        {
-               ereport(DEBUG3,
-                               (errmsg_internal("moving last restored xlog to \"%s\"",
-                                                                xlogpath)));
-               unlink(xlogpath);               /* might or might not exist */
-               if (rename(recoveryPath, xlogpath) != 0)
-                       ereport(FATAL,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not rename file \"%s\" to \"%s\": %m",
-                                                       recoveryPath, xlogpath)));
-               /* XXX might we need to fix permissions on the file? */
-       }
-       else
-       {
-               /*
-                * If the latest segment is not archival, but there's still a
-                * RECOVERYXLOG laying about, get rid of it.
-                */
-               unlink(recoveryPath);   /* ignore any error */
+               XLogFileCopy(endLogId, endLogSeg,
+                                        endTLI, endLogId, endLogSeg);
 
-               /*
-                * If we are establishing a new timeline, we have to copy data from
-                * the last WAL segment of the old timeline to create a starting WAL
-                * segment for the new timeline.
-                *
-                * Notify the archiver that the last WAL segment of the old timeline
-                * is ready to copy to archival storage. Otherwise, it is not archived
-                * for a while.
-                */
-               if (endTLI != ThisTimeLineID)
+               if (XLogArchivingActive())
                {
-                       XLogFileCopy(endLogId, endLogSeg,
-                                                endTLI, endLogId, endLogSeg);
-
-                       if (XLogArchivingActive())
-                       {
-                               XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
-                               XLogArchiveNotify(xlogpath);
-                       }
+                       XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
+                       XLogArchiveNotify(xlogpath);
                }
        }
 
@@ -5550,6 +5558,13 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
        XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
        XLogArchiveCleanup(xlogpath);
 
+       /*
+        * Since there might be a partial WAL segment named RECOVERYXLOG,
+        * get rid of it.
+        */
+       snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
+       unlink(recoveryPath);           /* ignore any error */
+
        /* Get rid of any remaining recovered timeline-history file, too */
        snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
        unlink(recoveryPath);           /* ignore any error */
@@ -7871,46 +7886,7 @@ CreateCheckPoint(int flags)
         */
        if (_logId || _logSeg)
        {
-               /*
-                * Calculate the last segment that we need to retain because of
-                * wal_keep_segments, by subtracting wal_keep_segments from the new
-                * checkpoint location.
-                */
-               if (wal_keep_segments > 0)
-               {
-                       uint32          log;
-                       uint32          seg;
-                       int                     d_log;
-                       int                     d_seg;
-
-                       XLByteToSeg(recptr, log, seg);
-
-                       d_seg = wal_keep_segments % XLogSegsPerFile;
-                       d_log = wal_keep_segments / XLogSegsPerFile;
-                       if (seg < d_seg)
-                       {
-                               d_log += 1;
-                               seg = seg - d_seg + XLogSegsPerFile;
-                       }
-                       else
-                               seg = seg - d_seg;
-                       /* avoid underflow, don't go below (0,1) */
-                       if (log < d_log || (log == d_log && seg == 0))
-                       {
-                               log = 0;
-                               seg = 1;
-                       }
-                       else
-                               log = log - d_log;
-
-                       /* don't delete WAL segments newer than the calculated segment */
-                       if (log < _logId || (log == _logId && seg < _logSeg))
-                       {
-                               _logId = log;
-                               _logSeg = seg;
-                       }
-               }
-
+               KeepLogSeg(recptr, &_logId, &_logSeg);
                PrevLogSeg(_logId, _logSeg);
                RemoveOldXlogFiles(_logId, _logSeg, recptr);
        }
@@ -8151,17 +8127,16 @@ CreateRestartPoint(int flags)
        /*
         * Delete old log files (those no longer needed even for previous
         * checkpoint/restartpoint) to prevent the disk holding the xlog from
-        * growing full. We don't need do this during normal recovery, but during
-        * streaming recovery we have to or the disk will eventually fill up from
-        * old log files streamed from master.
+        * growing full.
         */
-       if (WalRcvInProgress() && (_logId || _logSeg))
+       if (_logId || _logSeg)
        {
                XLogRecPtr      endptr;
 
                /* Get the current (or recent) end of xlog */
-               endptr = GetWalRcvWriteRecPtr(NULL);
+               endptr = GetStandbyFlushRecPtr();
 
+               KeepLogSeg(endptr, &_logId, &_logSeg);
                PrevLogSeg(_logId, _logSeg);
                RemoveOldXlogFiles(_logId, _logSeg, endptr);
 
@@ -8206,6 +8181,50 @@ CreateRestartPoint(int flags)
        return true;
 }
 
+/*
+ * Calculate the last segment that we need to retain because of
+ * wal_keep_segments, by subtracting wal_keep_segments from
+ * the given xlog location, recptr.
+ */
+static void
+KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg)
+{
+       uint32          log;
+       uint32          seg;
+       int                     d_log;
+       int                     d_seg;
+
+       if (wal_keep_segments == 0)
+               return;
+
+       XLByteToSeg(recptr, log, seg);
+
+       d_seg = wal_keep_segments % XLogSegsPerFile;
+       d_log = wal_keep_segments / XLogSegsPerFile;
+       if (seg < d_seg)
+       {
+               d_log += 1;
+               seg = seg - d_seg + XLogSegsPerFile;
+       }
+       else
+               seg = seg - d_seg;
+       /* avoid underflow, don't go below (0,1) */
+       if (log < d_log || (log == d_log && seg == 0))
+       {
+               log = 0;
+               seg = 1;
+       }
+       else
+               log = log - d_log;
+
+       /* don't delete WAL segments newer than the calculated segment */
+       if (log < *logId || (log == *logId && seg < *logSeg))
+       {
+               *logId = log;
+               *logSeg = seg;
+       }
+}
+
 /*
  * Write a NEXTOID log record
  */
@@ -9549,10 +9568,14 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
 /*
  * Get latest redo apply position.
  *
+ * Optionally, returns the end byte position of the last restored
+ * WAL segment. Callers not interested in that value may pass
+ * NULL for restoreLastRecPtr.
+ *
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(void)
+GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9560,11 +9583,33 @@ GetXLogReplayRecPtr(void)
 
        SpinLockAcquire(&xlogctl->info_lck);
        recptr = xlogctl->recoveryLastRecPtr;
+       if (restoreLastRecPtr)
+               *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
        SpinLockRelease(&xlogctl->info_lck);
 
        return recptr;
 }
 
+/*
+ * Get current standby flush position, ie, the last WAL position
+ * known to be fsync'd to disk in standby.
+ */
+XLogRecPtr
+GetStandbyFlushRecPtr(void)
+{
+       XLogRecPtr      receivePtr;
+       XLogRecPtr      replayPtr;
+       XLogRecPtr      restorePtr;
+
+       receivePtr = GetWalRcvWriteRecPtr(NULL);
+       replayPtr = GetXLogReplayRecPtr(&restorePtr);
+
+       if (XLByteLT(receivePtr, replayPtr))
+               return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+       else
+               return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+}
+
 /*
  * Report the last WAL replay location (same format as pg_start_backup etc)
  *
@@ -9577,7 +9622,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
        XLogRecPtr      recptr;
        char            location[MAXFNAMELEN];
 
-       recptr = GetXLogReplayRecPtr();
+       recptr = GetXLogReplayRecPtr(NULL);
 
        if (recptr.xlogid == 0 && recptr.xrecoff == 0)
                PG_RETURN_NULL();
index 9bcbf212f8cbb174a83f5116780e30ea39f189ac..412bc96465cf595eba1ff9e80041e98c4c1a7c91 100644 (file)
@@ -2316,6 +2316,26 @@ reaper(SIGNAL_ARGS)
                        ReachedNormalRunning = true;
                        pmState = PM_RUN;
 
+                       /*
+                        * Kill any walsenders to force the downstream standby(s) to
+                        * reread the timeline history file, adjust their timelines and
+                        * establish replication connections again. This is required
+                        * because the timeline of cascading standby is not consistent
+                        * with that of cascaded one just after failover. We LOG this
+                        * message since we need to leave a record to explain this
+                        * disconnection.
+                        *
+                        * XXX should avoid the need for disconnection. When we do,
+                        * am_cascading_walsender should be replaced with RecoveryInProgress()
+                        */
+                       if (max_wal_senders > 0)
+                       {
+                               ereport(LOG,
+                                               (errmsg("terminating walsender all processes to force cascaded"
+                                                               "standby(s) to update timeline and reconnect")));
+                               SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
+                       }
+
                        /*
                         * Crank up the background writer, if we didn't do that already
                         * when we entered consistent recovery state.  It doesn't matter
index bcde19c71b69249a7c3902524b748a5535b0a8bb..74d28440bf4c0bbb509d1682497d08aac4857193 100644 (file)
@@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd)
        MemoryContext old_context;
        basebackup_options opt;
 
+       if (am_cascading_walsender)
+               ereport(FATAL,
+                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+                                errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+
        parse_basebackup_options(cmd->options, &opt);
 
        backup_context = AllocSetContextCreate(CurrentMemoryContext,
index b73d225a8ef3fa7fd3afae1c522a043d77900f8b..32db2bc4c524acbd901747eb74cb8e0dddad44d8 100644 (file)
@@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void)
        int                     priority = 0;
        bool            found = false;
 
+       /*
+        * Since synchronous cascade replication is not allowed, we always
+        * set the priority of cascading walsender to zero.
+        */
+       if (am_cascading_walsender)
+               return 0;
+
        /* Need a modifiable copy of string */
        rawstring = pstrdup(SyncRepStandbyNames);
 
index ea6f6cdcdaf966511452f306e25f977767de2406..c24fa87394d0bb9eb15e15762712149fa61d7ff7 100644 (file)
@@ -44,6 +44,7 @@
 #include "miscadmin.h"
 #include "replication/walprotocol.h"
 #include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
@@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying)
                }
                SpinLockRelease(&walrcv->mutex);
 
-               /* Signal the startup process that new WAL has arrived */
+               /* Signal the startup process and walsender that new WAL has arrived */
                WakeupRecovery();
+               if (AllowCascadeReplication())
+                       WalSndWakeup();
 
                /* Report XLOG streaming progress in PS display */
                if (update_process_title)
@@ -625,7 +628,7 @@ XLogWalRcvSendReply(void)
        /* Construct a new message */
        reply_message.write = LogstreamResult.Write;
        reply_message.flush = LogstreamResult.Flush;
-       reply_message.apply = GetXLogReplayRecPtr();
+       reply_message.apply = GetXLogReplayRecPtr(NULL);
        reply_message.sendTime = now;
 
        elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
index bc5b3300d23c7f0e0bf4623bb024c372a0d6c3e8..63a63048dbb90fc4002108753f98c069a1dd1c00 100644 (file)
@@ -48,6 +48,7 @@
 #include "replication/basebackup.h"
 #include "replication/replnodes.h"
 #include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -70,6 +71,7 @@ WalSnd           *MyWalSnd = NULL;
 
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
+bool           am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
@@ -135,10 +137,7 @@ WalSenderMain(void)
 {
        MemoryContext walsnd_context;
 
-       if (RecoveryInProgress())
-               ereport(FATAL,
-                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-                                errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+       am_cascading_walsender = RecoveryInProgress();
 
        /* Create a per-walsender data structure in shared memory */
        InitWalSnd();
@@ -165,6 +164,12 @@ WalSenderMain(void)
        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);
 
+       /*
+        * Use the recovery target timeline ID during recovery
+        */
+       if (am_cascading_walsender)
+               ThisTimeLineID = GetRecoveryTargetTLI();
+
        /* Tell the standby that walsender is ready for receiving commands */
        ReadyForQuery(DestRemote);
 
@@ -290,7 +295,7 @@ IdentifySystem(void)
                         GetSystemIdentifier());
        snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
-       logptr = GetInsertRecPtr();
+       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
 
        snprintf(xpos, sizeof(xpos), "%X/%X",
                         logptr.xlogid, logptr.xrecoff);
@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd)
        SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
 
        /*
-        * Check that we're logging enough information in the WAL for
-        * log-shipping.
+        * We assume here that we're logging enough information in the WAL for
+        * log-shipping, since this is checked in PostmasterMain().
         *
-        * NOTE: This only checks the current value of wal_level. Even if the
-        * current setting is not 'minimal', there can be old WAL in the pg_xlog
-        * directory that was created with 'minimal'. So this is not bulletproof,
-        * the purpose is just to give a user-friendly error message that hints
-        * how to configure the system correctly.
+        * NOTE: wal_level can only change at shutdown, so in most cases it is
+        * difficult for there to be WAL data that we can still see that was written
+        * at wal_level='minimal'.
         */
-       if (wal_level == WAL_LEVEL_MINIMAL)
-               ereport(FATAL,
-                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-               errmsg("standby connections not allowed because wal_level=minimal")));
 
        /*
         * When we first start replication the standby will be behind the primary.
@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void)
                SpinLockRelease(&walsnd->mutex);
        }
 
-       SyncRepReleaseWaiters();
+       if (!am_cascading_walsender)
+               SyncRepReleaseWaiters();
 }
 
 /*
@@ -764,6 +764,8 @@ WalSndLoop(void)
                /*
                 * When SIGUSR2 arrives, we send any outstanding logs up to the
                 * shutdown checkpoint record (i.e., the latest record) and exit.
+                * This may be a normal termination at shutdown, or a promotion,
+                * the walsender is not sure which.
                 */
                if (walsender_ready_to_stop && !pq_is_send_pending())
                {
@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
-       XLogRecPtr      startRecPtr = recptr;
-       char            path[MAXPGPATH];
+       char               *p;
+       XLogRecPtr      recptr;
+       Size                    nbytes;
        uint32          lastRemovedLog;
        uint32          lastRemovedSeg;
        uint32          log;
        uint32          seg;
 
+retry:
+       p = buf;
+       recptr = startptr;
+       nbytes = count;
+
        while (nbytes > 0)
        {
                uint32          startoff;
@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
                if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
                {
+                       char            path[MAXPGPATH];
+
                        /* Switch to another logfile segment */
                        if (sendFile >= 0)
                                close(sendFile);
@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                else
                        segbytes = nbytes;
 
-               readbytes = read(sendFile, buf, segbytes);
+               readbytes = read(sendFile, p, segbytes);
                if (readbytes <= 0)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
                sendOff += readbytes;
                nbytes -= readbytes;
-               buf += readbytes;
+               p += readbytes;
        }
 
        /*
@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
         * already have been overwritten with new WAL records.
         */
        XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-       XLByteToSeg(startRecPtr, log, seg);
+       XLByteToSeg(startptr, log, seg);
        if (log < lastRemovedLog ||
                (log == lastRemovedLog && seg <= lastRemovedSeg))
        {
@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                                 errmsg("requested WAL segment %s has already been removed",
                                                filename)));
        }
+
+       /*
+        * During recovery, the currently-open WAL file might be replaced with
+        * the file of the same name retrieved from archive. So we always need
+        * to check what we read was valid after reading into the buffer. If it's
+        * invalid, we try to open and read the file again.
+        */
+       if (am_cascading_walsender)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+               bool            reload;
+
+               SpinLockAcquire(&walsnd->mutex);
+               reload = walsnd->needreload;
+               walsnd->needreload = false;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (reload && sendFile >= 0)
+               {
+                       close(sendFile);
+                       sendFile = -1;
+
+                       goto retry;
+               }
+       }
 }
 
 /*
@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup)
         * subsequently crashes and restarts, slaves must not have applied any WAL
         * that gets lost on the master.
         */
-       SendRqstPtr = GetFlushRecPtr();
+       SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
 
        /* Quick exit if nothing to do */
        if (XLByteLE(SendRqstPtr, sentPtr))
@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup)
        return;
 }
 
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+       int                     i;
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+               if (walsnd->pid == 0)
+                       continue;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->needreload = true;
+               SpinLockRelease(&walsnd->mutex);
+       }
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 WalSndSigHupHandler(SIGNAL_ARGS)
index 7056fd618913b281d917cb888d274200002ffb01..cdbf63fa76e0e7b154c084191d0df6138e1cbfcc 100644 (file)
@@ -221,6 +221,9 @@ extern int  wal_level;
 /* Do we need to WAL-log information required only for Hot Standby? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
 
+/* Can we allow the standby to accept replication connection from another standby? */
+#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+
 #ifdef WAL_DEBUG
 extern bool XLOG_DEBUG;
 #endif
@@ -292,7 +295,8 @@ extern bool RecoveryInProgress(void);
 extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
+extern XLogRecPtr GetStandbyFlushRecPtr(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
index 6ee8668d0a4e34b95c3bfc81d7ac0ce35f65abea..cb8e70ef38a9221456ac0dd3bbe1e8e38c744f43 100644 (file)
@@ -35,6 +35,7 @@ typedef struct WalSnd
        pid_t           pid;                    /* this walsender's process id, or 0 */
        WalSndState state;                      /* this walsender's state */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
+       bool            needreload;             /* does currently-open file need to be reloaded? */
 
        /*
         * The xlog locations that have been written, flushed, and applied by
@@ -92,6 +93,7 @@ extern WalSndCtlData *WalSndCtl;
 
 /* global state */
 extern bool am_walsender;
+extern bool am_cascading_walsender;
 extern volatile sig_atomic_t walsender_shutdown_requested;
 extern volatile sig_atomic_t walsender_ready_to_stop;
 
@@ -106,7 +108,8 @@ extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
 extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
+extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void WalSndRqstFileReload(void);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);