granicus.if.org Git - postgresql/commitdiff
Introduce replication slots.
author: Robert Haas <rhaas@postgresql.org>
Sat, 1 Feb 2014 03:45:17 +0000 (22:45 -0500)
committer: Robert Haas <rhaas@postgresql.org>
Sat, 1 Feb 2014 03:45:36 +0000 (22:45 -0500)
Replication slots are a crash-safe data structure which can be created
on either a master or a standby to prevent premature removal of
write-ahead log segments needed by a standby, as well as (with
hot_standby_feedback=on) pruning of tuples whose removal would cause
replication conflicts.  Slots have some advantages over existing
techniques, as explained in the documentation.

In a few places, we refer to the type of replication slots introduced
by this patch as "physical" slots, because forthcoming patches for
logical decoding will also have slots, but with somewhat different
properties.

Andres Freund and Robert Haas

42 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/config.sgml
doc/src/sgml/func.sgml
doc/src/sgml/high-availability.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/recovery-config.sgml
doc/src/sgml/ref/pg_receivexlog.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/Makefile
src/backend/replication/README
src/backend/replication/basebackup.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/slot.c [new file with mode: 0644]
src/backend/replication/slotfuncs.c [new file with mode: 0644]
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/backend/storage/ipc/ipci.c
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/proc.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h
src/include/access/xlog.h
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/nodes/nodes.h
src/include/nodes/replnodes.h
src/include/replication/slot.h [new file with mode: 0644]
src/include/replication/walreceiver.h
src/include/storage/lwlock.h
src/include/storage/procarray.h
src/test/regress/expected/rules.out
src/tools/pgindent/typedefs.list

index 3f8d9bfafbb1734cb73e3e8042b3eb105cc61b90..dca24fc0705790bd760ef6b298f232df1316d85c 100644 (file)
       <entry>query rewrite rules</entry>
      </row>
 
+     <row>
+      <entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry>
+      <entry>replication slot information</entry>
+     </row>
+
      <row>
       <entry><link linkend="catalog-pg-seclabel"><structname>pg_seclabel</structname></link></entry>
       <entry>security labels on database objects</entry>
 
  </sect1>
 
+ <sect1 id="catalog-pg-replication-slots">
+  <title><structname>pg_replication_slots</structname></title>
+
+  <indexterm zone="catalog-pg-replication-slots">
+   <primary>pg_replication_slots</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_replication_slots</structname> view provides a listing
+   of all replication slots that currently exist on the database cluster,
+   along with their current state.
+  </para>
+
+  <para>
+   For more on replication slots,
+   see <xref linkend="streaming-replication-slots">.
+  </para>
+
+  <table>
+
+   <title><structname>pg_replication_slots</structname> Columns</title>
+
+   <tgroup cols="4">
+    <thead>
+     <row>
+      <entry>Name</entry>
+      <entry>Type</entry>
+      <entry>References</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry><structfield>slot_name</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>A unique, cluster-wide identifier for the replication slot</entry>
+     </row>
+
+     <row>
+      <entry><structfield>slot_type</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>The slot type - <literal>physical</> or <literal>logical</></entry>
+     </row>
+
+     <row>
+      <entry><structfield>datoid</structfield></entry>
+      <entry><type>oid</type></entry>
+      <entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.oid</literal></entry>
+      <entry>The oid of the database this slot is associated with, or
+      null. Only logical slots have an associated database.</entry>
+     </row>
+
+     <row>
+      <entry><structfield>database</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.datname</literal></entry>
+      <entry>The name of the database this slot is associated with, or
+      null. Only logical slots have an associated database.</entry>
+     </row>
+
+     <row>
+      <entry><structfield>active</structfield></entry>
+      <entry><type>boolean</type></entry>
+      <entry></entry>
+      <entry>True if this slot is currently actively being used</entry>
+     </row>
+
+     <row>
+      <entry><structfield>xmin</structfield></entry>
+      <entry><type>xid</type></entry>
+      <entry></entry>
+      <entry>The oldest transaction that this slot needs the database to
+      retain.  <literal>VACUUM</literal> cannot remove tuples deleted
+      by any later transaction.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>restart_lsn</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>The address (<literal>LSN</literal>) of oldest WAL which still
+      might be required by the consumer of this slot and thus won't be
+      automatically removed during checkpoints.
+      </entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
 
  <sect1 id="catalog-pg-seclabel">
   <title><structname>pg_seclabel</structname></title>
index 1b5f831d6556a7e522173ea59ca7558f3d0c34a7..000a46fabb04ca21d5ebb853348e36f4cc0472b3 100644 (file)
@@ -2348,6 +2348,25 @@ include 'filename'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
+       <term><varname>max_replication_slots</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>max_replication_slots</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Specifies the maximum number of replication slots
+         (see <xref linkend="streaming-replication-slots">) that the server
+         can support. The default is zero.  This parameter can only be set at
+         server start.
+         <varname>wal_level</varname> must be set
+         to <literal>archive</literal> or higher to allow replication slots to
+         be used. Setting it to a lower value than the number of currently
+         existing replication slots will prevent the server from starting.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-wal-keep-segments" xreflabel="wal_keep_segments">
        <term><varname>wal_keep_segments</varname> (<type>integer</type>)</term>
        <indexterm>
index 252539f93bef5a026829f461f9b03500efd9ac8f..8cc65b94d10d4108bcacffee9370860f5fd0e299 100644 (file)
@@ -16290,6 +16290,76 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
    </para>
   </sect2>
 
+  <sect2 id="functions-replication">
+   <title>Replication Functions</title>
+
+   <para>
+    PostgreSQL exposes a number of functions for controlling and interacting
+    with replication features. See <xref linkend="streaming-replication">
+    and <xref linkend="streaming-replication-slots">.
+   </para>
+
+   <para>
+    Many of these functions have equivalent commands in the replication
+    protocol; see <xref linkend="protocol-replication">.
+   </para>
+
+   <para>
+    The sections <xref linkend="functions-snapshot-synchronization">, <xref
+    linkend="functions-recovery-control"> and <xref
+    linkend="functions-admin-backup"> are also relevant for replication.
+   </para>
+
+   <table id="functions-replication-table">
+    <title>Replication <acronym>SQL</acronym> Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Function</entry>
+       <entry>Return Type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+     <tbody>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_create_physical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slotname</parameter> <type>text</type>, <parameter>xlog_position</parameter> <type>text</type>)
+       </entry>
+       <entry>
+        Creates a new physical replication slot named
+        <parameter>slotname</parameter>. Streaming changes from a physical slot
+        is only possible with the walsender protocol - see <xref
+        linkend="protocol-replication">. Corresponds to the walsender protocol
+        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+       </entry>
+      </row>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_drop_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_drop_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slotname</parameter> <type>text</type>)
+       </entry>
+       <entry>
+        Drops the physical or logical replication slot
+        named <parameter>slotname</parameter>. Same as walsender protocol
+        command <literal>DROP_REPLICATION_SLOT</>.
+       </entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+  </sect2>
+
   <sect2 id="functions-admin-dbobject">
    <title>Database Object Management Functions</title>
 
index e2e5ac93ab98e6fd1635e6f88e3bad6c5782e255..9d43586fe2f99dc66059799f745465d9c21d2e4b 100644 (file)
@@ -643,7 +643,9 @@ protocol to make nodes agree on a serializable transactional order.
     entries in <filename>pg_hba.conf</> with the database field set to
     <literal>replication</>.  Also ensure <varname>max_wal_senders</> is set
     to a sufficiently large value in the configuration file of the primary
-    server.
+    server. If replication slots will be used,
+    ensure that <varname>max_replication_slots</varname> is set sufficiently
+    high as well.
    </para>
 
    <para>
@@ -750,13 +752,14 @@ archive_cleanup_command = 'pg_archivecleanup /path/to/archive %r'
 
    <para>
     If you use streaming replication without file-based continuous
-    archiving, you have to set <varname>wal_keep_segments</> in the master
-    to a value high enough to ensure that old WAL segments are not recycled
-    too early, while the standby might still need them to catch up. If the
-    standby falls behind too much, it needs to be reinitialized from a new
-    base backup. If you set up a WAL archive that's accessible from the
-    standby, <varname>wal_keep_segments</> is not required as the standby can always
-    use the archive to catch up.
+    archiving, the server might recycle old WAL segments before the standby
+    has received them.  If this occurs, the standby will need to be
+    reinitialized from a new base backup.  You can avoid this by setting
+    <varname>wal_keep_segments</> to a value large enough to ensure that
+    WAL segments are not recycled too early, or by configuration a replication
+    slot for the standby.  If you set up a WAL archive that's accessible from
+    the standby, these solutions are not required, since the standby can
+    always use the archive to catch up provided it retains enough segments.
    </para>
 
    <para>
@@ -871,6 +874,81 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
    </sect3>
   </sect2>
 
+  <sect2 id="streaming-replication-slots">
+   <title>Replication Slots</title>
+   <indexterm zone="high-availability">
+    <primary>Replication Slots</primary>
+   </indexterm>
+   <para>
+    Replication slots provide an automated way to ensure that the master does
+    not remove WAL segments until they have been received by all standbys,
+    and that the master does not remove rows which could cause a
+    <link linkend="hot-standby-conflict">recovery conflict</> even when the
+    standby is disconnected.
+   </para>
+   <para>
+    In lieu of using replication slots, it is possible to prevent the removal
+    of old WAL segments using <xref linkend="guc-wal-keep-segments">, or by
+    storing the segments in an archive using <xref linkend="restore-command">.
+    However, these methods often result in retaining more WAL segments than
+    required, whereas replication slots retain only the number of segments
+    known to be needed.  An advantage of these methods is that they bound
+    the space requirement for <literal>pg_xlog</>; there is currently no way
+    to do this using replication slots.
+   </para>
+   <para>
+    Similarly, <varname>hot_standby_feedback</varname>
+    and <varname>vacuum_defer_cleanup_age</varname> provide protection against
+    relevant rows being removed by vacuum, but the former provides no
+    protection during any time period when the standby is not connected,
+    and the latter often needs to be set to a high value to provide adequate
+    protection.  Replication slots overcome these disadvantages.
+   </para>
+   <sect3 id="streaming-replication-slots-manipulation">
+    <title>Querying and manipulating replication slots</title>
+    <para>
+     Each replication slot has a name, which can contain lower-case letters,
+     numbers, and the underscore character.
+    </para>
+    <para>
+     Existing replication slots and their state can be seen in the
+     <link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link>
+     view.
+    </para>
+    <para>
+     Slots can be created and dropped either via the streaming replication
+     protocol (see <xref linkend="protocol-replication">) or via SQL
+     functions (see <xref linkend="functions-replication">).
+    </para>
+   </sect3>
+   <sect3 id="streaming-replication-slots-config">
+    <title>Configuration Example</title>
+    <para>
+     You can create a replication slot like this:
+<programlisting>
+postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
+  slotname   | xlog_position
+-------------+---------------
+ node_a_slot |
+
+postgres=# SELECT * FROM pg_replication_slots;
+  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn
+-------------+-----------+--------+----------+--------+------+-------------
+ node_a_slot | physical  |      0 |          | f      |      |
+(1 row)
+</programlisting>
+     To configure the standby to use this slot, <varname>primary_slotname</>
+     should be configured in the standby's <filename>recovery.conf</>.
+     Here is a simple example:
+<programlisting>
+standby_mode = 'on'
+primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
+primary_slotname = 'node_a_slot'
+</programlisting>
+    </para>
+   </sect3>
+  </sect2>
+
   <sect2 id="cascading-replication">
    <title>Cascading Replication</title>
 
index 7d99976a49c2b4c0e4850b8b594686646afb0993..832524e95e45582059ef4b459aa24569c47411e5 100644 (file)
@@ -1401,15 +1401,39 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-    <term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
+    <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</> <literal>PHYSICAL</literal></term>
+    <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
+    <listitem>
+     <para>
+      Create a physical replication
+      slot. See <xref linkend="streaming-replication-slots"> for more about
+      replication slots.
+     </para>
+     <variablelist>
+      <varlistentry>
+       <term><replaceable class="parameter">slotname</></term>
+       <listitem>
+         <para>
+          The name of the slot to create. Must be a valid replication slot
+          name (see <xref linkend="streaming-replication-slots-manipulation">).
+         </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+  </varlistentry>
+
+  <varlistentry>
+    <term><literal>START_REPLICATION</literal> [<literal>SLOT</literal> <replaceable class="parameter">slotname</>] [<literal>PHYSICAL</literal>] <replaceable class="parameter">XXX/XXX</> <literal>TIMELINE</literal> <replaceable class="parameter">tli</></term>
     <listitem>
      <para>
       Instructs server to start streaming WAL, starting at
-      WAL position <replaceable class="parameter">XXX/XXX</> on timeline
-      <replaceable class="parameter">tli</>.
-      The server can reply with an error, e.g. if the requested section of WAL
-      has already been recycled. On success, server responds with a
-      CopyBothResponse message, and then starts to stream WAL to the frontend.
+      WAL position <replaceable class="parameter">XXX/XXX</>. If specified,
+      streaming starts on timeline <replaceable class="parameter">tli</>;
+      otherwise, the server's current timeline is selected. The server can
+      reply with an error, e.g. if the requested section of WAL has already
+      been recycled. On success, server responds with a CopyBothResponse
+      message, and then starts to stream WAL to the frontend.
      </para>
 
      <para>
@@ -1443,6 +1467,14 @@ The commands accepted in walsender mode are:
       client contains a message of one of the following formats:
      </para>
 
+     <para>
+      If a slot's name is provided
+      via <replaceable class="parameter">slotname</>, it will be updated
+      as replication progresses so that the server knows which WAL segments -
+      and, if <varname>hot_standby_feedback</> is on, which transactions -
+      are still needed by the standby.
+     </para>
+
      <para>
       <variablelist>
       <varlistentry>
@@ -1719,6 +1751,26 @@ The commands accepted in walsender mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>DROP_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</></term>
+    <listitem>
+     <para>
+      Drops a replication slot, freeing any reserved server-side resources. If
+      the slot is currently in use by an active connection, this command fails.
+     </para>
+     <variablelist>
+      <varlistentry>
+       <term><replaceable class="parameter">slotname</></term>
+       <listitem>
+         <para>
+          The name of the slot to drop.
+         </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
     <listitem>
index 4a97bb7a9c446f23b44ffc2aa309e499ad8b7649..b69ce287c8c8bc817e1ef594367216e9a5eb8b41 100644 (file)
@@ -418,6 +418,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
          </para>
         </listitem>
        </varlistentry>
+       <varlistentry id="primary-slotname" xreflabel="primary_slotname">
+        <term><varname>primary_slotname</varname> (<type>string</type>)</term>
+        <indexterm>
+          <primary><varname>primary_slotname</> recovery parameter</primary>
+        </indexterm>
+        <listitem>
+         <para>
+          Optionally specifies an existing replication slot to be used when
+          connecting to the primary via streaming replication to control
+          resource removal on the upstream node
+          (see <xref linkend="streaming-replication-slots">).
+          This setting has no effect if <varname>primary_conninfo</> is not
+          set.
+         </para>
+        </listitem>
+       </varlistentry>
        <varlistentry id="trigger-file" xreflabel="trigger_file">
         <term><varname>trigger_file</varname> (<type>string</type>)</term>
         <indexterm>
index 19bebb62f7ab6aa285b7d38f6ba06960abdb3f30..2a44af46c529784335192b62fc0c5a8585f25770 100644 (file)
@@ -225,6 +225,24 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>--slot</option></term>
+      <listitem>
+        <para>
+         Require <application>pg_receivexlog</application> to use an existing
+         replication slot (see <xref linkend="streaming-replication-slots">).
+         When this option is used, <application>pg_receivexlog</> will report
+         a flush position to the server, indicating when each segment has been
+         synchronized to disk so that the server can remove that segment if it
+         is not otherwise needed.  When using this parameter, it is important
+         to make sure that <application>pg_receivexlog</> cannot become the
+         synchronous standby through an incautious setting of
+         <xref linkend="guc-synchronous-standby-names">; it does not flush
+         data frequently enough for this to work correctly.
+        </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
    </para>
 
index b333d820c7236e9df0ed5472857fe94f2484fa56..7f63185b1cc10998144b0ea28cfdb1bc7ce5e1b1 100644 (file)
@@ -39,6 +39,7 @@
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
@@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime;
 /* options taken from recovery.conf for XLOG streaming */
 static bool StandbyModeRequested = false;
 static char *PrimaryConnInfo = NULL;
+static char *PrimarySlotName = NULL;
 static char *TriggerFile = NULL;
 
 /* are we currently in standby mode? */
@@ -485,6 +487,8 @@ typedef struct XLogCtlData
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
        XLogRecPtr      asyncXactLSN;   /* LSN of newest async commit/abort */
+       XLogRecPtr      replicationSlotMinLSN;  /* oldest LSN needed by any slot */
+
        XLogSegNo       lastRemovedSegNo;               /* latest removed/recycled XLOG
                                                                                 * segment */
 
@@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2908,6 +2913,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
                SetLatch(ProcGlobal->walwriterLatch);
 }
 
+/*
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
+ */
+void
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->replicationSlotMinLSN = lsn;
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+
+/*
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
+ */
+static XLogRecPtr
+XLogGetReplicationSlotMinimumLSN(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr              retval;
+       SpinLockAcquire(&xlogctl->info_lck);
+       retval = xlogctl->replicationSlotMinLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return retval;
+}
+
 /*
  * Advance minRecoveryPoint in control file.
  *
@@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("primary_conninfo = '%s'",
                                                                         PrimaryConnInfo)));
                }
+               else if (strcmp(item->name, "primary_slotname") == 0)
+               {
+                       ReplicationSlotValidateName(item->value, ERROR);
+                       PrimarySlotName = pstrdup(item->value);
+                       ereport(DEBUG2,
+                                       (errmsg_internal("primary_slotname = '%s'",
+                                                                        PrimarySlotName)));
+               }
                else if (strcmp(item->name, "trigger_file") == 0)
                {
                        TriggerFile = pstrdup(item->value);
@@ -6505,6 +6551,12 @@ StartupXLOG(void)
        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
        XLogCtl->ckptXid = checkPoint.nextXid;
 
+       /*
+        * Initialize replication slots, before there's a chance to remove
+        * required resources.
+        */
+       StartupReplicationSlots(checkPoint.redo);
+
        /*
         * Startup MultiXact.  We need to do this early for two reasons: one
         * is that we might try to access multixacts when we do tuple freezing,
@@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
        CheckPointMultiXact();
        CheckPointPredicate();
        CheckPointRelationMap();
+       CheckPointReplicationSlots();
        CheckPointBuffers(flags);       /* performs all required fsyncs */
        /* We deliberately delay 2PC checkpointing as long as possible */
        CheckPointTwoPhase(checkPointRedo);
@@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags)
 
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
- * wal_keep_segments. This is calculated by subtracting wal_keep_segments
- * from the given xlog location, recptr.
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr and by making sure that that result is below the
+ * requirement of replication slots.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
        XLogSegNo       segno;
-
-       if (wal_keep_segments == 0)
-               return;
+       XLogRecPtr      keep;
 
        XLByteToSeg(recptr, segno);
+       keep = XLogGetReplicationSlotMinimumLSN();
 
-       /* avoid underflow, don't go below 1 */
-       if (segno <= wal_keep_segments)
-               segno = 1;
-       else
-               segno = segno - wal_keep_segments;
+       /* compute limit for wal_keep_segments first */
+       if (wal_keep_segments > 0)
+       {
+               /* avoid underflow, don't go below 1 */
+               if (segno <= wal_keep_segments)
+                       segno = 1;
+               else
+                       segno = segno - wal_keep_segments;
+       }
+
+       /* then check whether slots limit removal further */
+       if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+       {
+               XLogRecPtr slotSegNo;
+
+               XLByteToSeg(keep, slotSegNo);
+
+               if (slotSegNo <= 0)
+                       segno = 1;
+               else if (slotSegNo < segno)
+                       segno = slotSegNo;
+       }
 
        /* don't delete WAL segments newer than the calculated segment */
        if (segno < *logSegNo)
@@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                                         tli, curFileTLI);
                                                }
                                                curFileTLI = tli;
-                                               RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+                                               RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
+                                                                                        PrimarySlotName);
                                                receivedUpto = 0;
                                        }
 
index 277af61f9daa33aa6d2c6d884fb25dda6c7df7d5..f02efeca974ef58b83a1b3a52aa8b3d35485cd47 100644 (file)
@@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS
     WHERE S.usesysid = U.oid AND
             S.pid = W.pid;
 
+CREATE VIEW pg_replication_slots AS
+    SELECT
+            L.slot_name,
+            L.slot_type,
+            L.datoid,
+            D.datname AS database,
+            L.active,
+            L.xmin,
+            L.restart_lsn
+    FROM pg_get_replication_slots() AS L
+            LEFT JOIN pg_database D ON (L.datoid = D.oid);
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
index 2dde0118a47185d6de954725c55abc7007afd20d..7941cb8d5e7663d4b4bb72f53b685a1f326de0cb 100644 (file)
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
-       repl_gram.o syncrep.o
+       repl_gram.o slot.o slotfuncs.o syncrep.o
 
 include $(top_srcdir)/src/backend/common.mk
 
index 60120ede29ced5559a507153eab234971db6a69c..2f5df49de67b3c93cc0dc81653e191bb31e735b4 100644 (file)
@@ -47,8 +47,9 @@ to fetch more WAL (if streaming replication is configured).
 
 Walreceiver is a postmaster subprocess, so the startup process can't fork it
 directly. Instead, it sends a signal to postmaster, asking postmaster to launch
-it. Before that, however, startup process fills in WalRcvData->conninfo,
-and initializes the starting point in WalRcvData->receiveStart.
+it. Before that, however, startup process fills in WalRcvData->conninfo
+and WalRcvData->slotname, and initializes the starting point in
+WalRcvData->receiveStart.
 
 As walreceiver receives WAL from the master server, and writes and flushes
 it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals
index 7d0ed9ce4c84fc92f3d7b7b424a1764212d46471..781f678097d4c6dc5d000703f30fe7bbffe2a43f 100644 (file)
@@ -847,6 +847,10 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
                if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
                        continue;
 
+               /* Skip pg_replslot, not useful to copy */
+               if (strcmp(de->d_name, "pg_replslot") == 0)
+                       continue;
+
                /*
                 * Check if the postmaster has signaled us to exit, and abort with an
                 * error in that case. The error handler further up will call
index 2e057b8969fa8d4a1ae15fb123746f0953764c30..ecec8b345634960d591289ba90b220aad0f5e5da 100644 (file)
@@ -49,7 +49,8 @@ static char *recvBuf = NULL;
 static void libpqrcv_connect(char *conninfo);
 static void libpqrcv_identify_system(TimeLineID *primary_tli);
 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
-static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
+                                                                       char *slotname);
 static void libpqrcv_endstreaming(TimeLineID *next_tli);
 static int     libpqrcv_receive(int timeout, char **buffer);
 static void libpqrcv_send(const char *buffer, int nbytes);
@@ -171,15 +172,20 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
  * throws an ERROR.
  */
 static bool
-libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
 {
        char            cmd[64];
        PGresult   *res;
 
        /* Start streaming from the point requested by startup process */
-       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
-                        (uint32) (startpoint >> 32), (uint32) startpoint,
-                        tli);
+       if (slotname != NULL)
+               snprintf(cmd, sizeof(cmd),
+                                "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
+                                (uint32) (startpoint >> 32), (uint32) startpoint, tli);
+       else
+               snprintf(cmd, sizeof(cmd),
+                                "START_REPLICATION %X/%X TIMELINE %u",
+                                (uint32) (startpoint >> 32), (uint32) startpoint, tli);
        res = libpqrcv_PQexec(cmd);
 
        if (PQresultStatus(res) == PGRES_COMMAND_OK)
index 015aa44d89c3be831ffb1c999fab41c65d43f65a..d4bd59bab24073565ae858804b2135e3545412b7 100644 (file)
@@ -65,7 +65,7 @@ Node *replication_parse_result;
 }
 
 /* Non-keyword tokens */
-%token <str> SCONST
+%token <str> SCONST IDENT
 %token <uintval> UCONST
 %token <recptr> RECPTR
 
@@ -73,6 +73,8 @@ Node *replication_parse_result;
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
 %token K_START_REPLICATION
+%token K_CREATE_REPLICATION_SLOT
+%token K_DROP_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
 %token K_LABEL
 %token K_PROGRESS
@@ -80,12 +82,15 @@ Node *replication_parse_result;
 %token K_NOWAIT
 %token K_WAL
 %token K_TIMELINE
+%token K_PHYSICAL
+%token K_SLOT
 
 %type <node>   command
-%type <node>   base_backup start_replication identify_system timeline_history
+%type <node>   base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
 %type <list>   base_backup_opt_list
 %type <defelt> base_backup_opt
 %type <uintval>        opt_timeline
+%type <str>            opt_slot
 %%
 
 firstcmd: command opt_semicolon
@@ -102,6 +107,8 @@ command:
                        identify_system
                        | base_backup
                        | start_replication
+                       | create_replication_slot
+                       | drop_replication_slot
                        | timeline_history
                        ;
 
@@ -158,18 +165,42 @@ base_backup_opt:
                                }
                        ;
 
+/* CREATE_REPLICATION_SLOT slot PHYSICAL */
+create_replication_slot:
+                       K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
+                               {
+                                       CreateReplicationSlotCmd *cmd;
+                                       cmd = makeNode(CreateReplicationSlotCmd);
+                                       cmd->kind = REPLICATION_KIND_PHYSICAL;
+                                       cmd->slotname = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+                       K_DROP_REPLICATION_SLOT IDENT
+                               {
+                                       DropReplicationSlotCmd *cmd;
+                                       cmd = makeNode(DropReplicationSlotCmd);
+                                       cmd->slotname = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
 /*
- * START_REPLICATION %X/%X [TIMELINE %d]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
  */
 start_replication:
-                       K_START_REPLICATION RECPTR opt_timeline
+                       K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
                                {
                                        StartReplicationCmd *cmd;
 
                                        cmd = makeNode(StartReplicationCmd);
-                                       cmd->startpoint = $2;
-                                       cmd->timeline = $3;
-
+                                       cmd->kind = REPLICATION_KIND_PHYSICAL;
+                                       cmd->slotname = $2;
+                                       cmd->startpoint = $4;
+                                       cmd->timeline = $5;
                                        $$ = (Node *) cmd;
                                }
                        ;
@@ -205,6 +236,15 @@ timeline_history:
                                        $$ = (Node *) cmd;
                                }
                        ;
+
+opt_physical : K_PHYSICAL | /* EMPTY */;
+
+
+opt_slot :     K_SLOT IDENT
+                               {
+                                       $$ = $2;
+                               }
+                               | /* nothing */                 { $$ = NULL; }
 %%
 
 #include "repl_scanner.c"
index 01e5ac6efb03219eac7702627c0e2cced1bb3880..24195a5971979420dbc5274a2eac1f509b45a0a7 100644 (file)
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "utils/builtins.h"
+#include "parser/scansup.h"
 
 /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
 #undef fprintf
@@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar);
 %option warn
 %option prefix="replication_yy"
 
-%x xq
+%x xq xd
 
 /* Extended quote
  * xqdouble implements embedded quote, ''''
@@ -57,12 +58,26 @@ xqstart                     {quote}
 xqdouble               {quote}{quote}
 xqinside               [^']+
 
+/* Double quote
+ * Allows embedded spaces and other special characters into identifiers.
+ */
+dquote                 \"
+xdstart                        {dquote}
+xdstop                 {dquote}
+xddouble               {dquote}{dquote}
+xdinside               [^"]+
+
 digit                  [0-9]+
 hexdigit               [0-9A-Za-z]+
 
 quote                  '
 quotestop              {quote}
 
+ident_start            [A-Za-z\200-\377_]
+ident_cont             [A-Za-z\200-\377_0-9\$]
+
+identifier             {ident_start}{ident_cont}*
+
 %%
 
 BASE_BACKUP                    { return K_BASE_BACKUP; }
@@ -74,9 +89,16 @@ PROGRESS                     { return K_PROGRESS; }
 WAL                    { return K_WAL; }
 TIMELINE                       { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
+CREATE_REPLICATION_SLOT                { return K_CREATE_REPLICATION_SLOT; }
+DROP_REPLICATION_SLOT          { return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY       { return K_TIMELINE_HISTORY; }
+PHYSICAL                       { return K_PHYSICAL; }
+SLOT                           { return K_SLOT; }
+
 ","                            { return ','; }
 ";"                            { return ';'; }
+"("                            { return '('; }
+")"                            { return ')'; }
 
 [\n]                   ;
 [\t]                   ;
@@ -100,20 +122,49 @@ TIMELINE_HISTORY  { return K_TIMELINE_HISTORY; }
                                        BEGIN(xq);
                                        startlit();
                                }
+
 <xq>{quotestop}        {
                                        yyless(1);
                                        BEGIN(INITIAL);
                                        yylval.str = litbufdup();
                                        return SCONST;
                                }
-<xq>{xqdouble} {
+
+<xq>{xqdouble} {
                                        addlitchar('\'');
                                }
+
 <xq>{xqinside}  {
                                        addlit(yytext, yyleng);
                                }
 
-<xq><<EOF>>            { yyerror("unterminated quoted string"); }
+{xdstart}              {
+                                       BEGIN(xd);
+                                       startlit();
+                               }
+
+<xd>{xdstop}   {
+                                       int len;
+                                       yyless(1);
+                                       BEGIN(INITIAL);
+                                       yylval.str = litbufdup();
+                                       len = strlen(yylval.str);
+                                       truncate_identifier(yylval.str, len, true);
+                                       return IDENT;
+                               }
+
+<xd>{xdinside}  {
+                                       addlit(yytext, yyleng);
+                               }
+
+{identifier}   {
+                                       int len = strlen(yytext);
+
+                                       yylval.str = downcase_truncate_identifier(yytext, len, true);
+                                       return IDENT;
+                               }
+
+<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
 
 
 <<EOF>>                        {
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
new file mode 100644 (file)
index 0000000..30aff5f
--- /dev/null
@@ -0,0 +1,1066 @@
+/*-------------------------------------------------------------------------
+ *
+ * slot.c
+ *        Replication slot management.
+ *
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/slot.c
+ *
+ * NOTES
+ *
+ * Replication slots are used to keep state about replication streams
+ * originating from this cluster.  Their primary purpose is to prevent the
+ * premature removal of WAL or of old tuple versions in a manner that would
+ * interfere with replication; they are also useful for monitoring purposes.
+ * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
+ * on standbys (to support cascading setups).  The requirement that slots be
+ * usable on standbys precludes storing them in the system catalogs.
+ *
+ * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
+ * directory. Inside that directory the state file will contain the slot's
+ * own data. Additional data can be stored alongside that file if required.
+ * While the server is running, the state data is also cached in memory for
+ * efficiency.
+ *
+ * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
+ * or free a slot. ReplicationSlotControlLock must be taken in shared mode
+ * to iterate over the slots, and in exclusive mode to change the in_use flag
+ * of a slot.  The remaining data in each slot is protected by its mutex.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "storage/fd.h"
+#include "storage/procarray.h"
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+       /* first part of this struct needs to be version independent */
+
+       /* data not covered by checksum */
+       uint32          magic;
+       pg_crc32        checksum;
+
+       /* data covered by checksum */
+       uint32          version;
+       uint32          length;
+
+       ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of the part of the slot that is version independent */
+#define ReplicationSlotOnDiskConstantSize \
+       offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot that is not version independent */
+#define ReplicationSlotOnDiskDynamicSize \
+       sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
+
+#define SLOT_MAGIC             0x1051CA1               /* format identifier */
+#define SLOT_VERSION   1                               /* version for new files */
+
+/* Control array for replication slot management */
+ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
+
+/* My backend's replication slot in the shared memory array */
+ReplicationSlot *MyReplicationSlot = NULL;
+
+/* GUCs */
+int                    max_replication_slots = 0;      /* the maximum number of replication slots */
+
+/* internal persistency functions */
+static void RestoreSlotFromDisk(const char *name);
+static void CreateSlotOnDisk(ReplicationSlot *slot);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
+
+/*
+ * Report shared-memory space needed by ReplicationSlotShmemInit.
+ */
+Size
+ReplicationSlotsShmemSize(void)
+{
+       Size            size = 0;
+
+       if (max_replication_slots == 0)
+               return size;
+
+       size = offsetof(ReplicationSlotCtlData, replication_slots);
+       size = add_size(size,
+                                       mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+
+       return size;
+}
+
+/*
+ * Allocate and initialize walsender-related shared memory.
+ */
+void
+ReplicationSlotsShmemInit(void)
+{
+       bool            found;
+
+       if (max_replication_slots == 0)
+               return;
+
+       ReplicationSlotCtl = (ReplicationSlotCtlData *)
+               ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
+                                               &found);
+
+       if (!found)
+       {
+               int                     i;
+
+               /* First time through, so initialize */
+               MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+
+               for (i = 0; i < max_replication_slots; i++)
+               {
+                       ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
+
+                       /* everything else is zeroed by the memset above */
+                       SpinLockInit(&slot->mutex);
+                       slot->io_in_progress_lock = LWLockAssign();
+               }
+       }
+}
+
+/*
+ * Check whether the passed slot name is valid and report errors at elevel.
+ *
+ * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
+ * the name to be used as a directory name on every supported OS.
+ *
+ * Returns whether the directory name is valid or not if elevel < ERROR.
+ */
+bool
+ReplicationSlotValidateName(const char *name, int elevel)
+{
+       const char *cp;
+
+       if (strlen(name) == 0)
+       {
+               ereport(elevel,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("replication slot name \"%s\" is too short",
+                                               name)));
+               return false;
+       }
+
+       if (strlen(name) >= NAMEDATALEN)
+       {
+               ereport(elevel,
+                               (errcode(ERRCODE_NAME_TOO_LONG),
+                                errmsg("replication slot name \"%s\" is too long",
+                                               name)));
+               return false;
+       }
+
+       for (cp = name; *cp; cp++)
+       {
+               if (!((*cp >= 'a' && *cp <= 'z')
+                         || (*cp >= '0' && *cp <= '9')
+                         || (*cp == '_')))
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_NAME),
+                                        errmsg("replication slot name \"%s\" contains invalid character",
+                                                       name),
+                                        errhint("Replication slot names may only contain letters, numbers and the underscore character.")));
+                       return false;
+               }
+       }
+       return true;
+}
+
+/*
+ * Create a new replication slot and mark it as used by this backend.
+ *
+ * name: Name of the slot
+ * db_specific: changeset extraction is db specific, if the slot is going to
+ *     be used for that pass true, otherwise false.
+ */
+void
+ReplicationSlotCreate(const char *name, bool db_specific)
+{
+       ReplicationSlot *slot = NULL;
+       int                     i;
+
+       Assert(MyReplicationSlot == NULL);
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       /*
+        * If some other backend ran this code currently with us, we'd likely
+        * both allocate the same slot, and that would be bad.  We'd also be
+        * at risk of missing a name collision.  Also, we don't want to try to
+        * create a new slot while somebody's busy cleaning up an old one, because
+        * we might both be monkeying with the same directory.
+        */
+       LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+       /*
+        * Check for name collision, and identify an allocatable slot.  We need
+        * to hold ReplicationSlotControlLock in shared mode for this, so that
+        * nobody else can change the in_use flags while we're looking at them.
+        */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                        errmsg("replication slot \"%s\" already exists", name)));
+               if (!s->in_use && slot == NULL)
+                       slot = s;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /* If all slots are in use, we're out of luck. */
+       if (slot == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                                errmsg("all replication slots are in use"),
+                                errhint("Free one or increase max_replication_slots.")));
+
+       /*
+        * Since this slot is not in use, nobody should be looking at any
+        * part of it other than the in_use field unless they're trying to allocate
+        * it.  And since we hold ReplicationSlotAllocationLock, nobody except us
+        * can be doing that.  So it's safe to initialize the slot.
+        */
+       Assert(!slot->in_use);
+       Assert(!slot->active);
+       slot->data.xmin = InvalidTransactionId;
+       slot->effective_xmin = InvalidTransactionId;
+       strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
+       NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
+       slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
+       slot->data.restart_lsn = InvalidXLogRecPtr;
+
+       /*
+        * Create the slot on disk.  We haven't actually marked the slot allocated
+        * yet, so no special cleanup is required if this errors out.
+        */
+       CreateSlotOnDisk(slot);
+
+       /*
+        * We need to briefly prevent any other backend from iterating over the
+        * slots while we flip the in_use flag. We also need to set the active
+        * flag while holding the ControlLock as otherwise a concurrent
+        * SlotAcquire() could acquire the slot as well.
+        */
+       LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+       slot->in_use = true;
+
+       /* We can now mark the slot active, and that makes it our slot. */
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&slot->mutex);
+               Assert(!vslot->active);
+               vslot->active = true;
+               SpinLockRelease(&slot->mutex);
+               MyReplicationSlot = slot;
+       }
+
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /*
+        * Now that the slot has been marked as in_use and in_active, it's safe to
+        * let somebody else try to allocate a slot.
+        */
+       LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Find a previously created slot and mark it as used by this backend.
+ */
+void
+ReplicationSlotAcquire(const char *name)
+{
+       ReplicationSlot *slot = NULL;
+       int                     i;
+       bool            active = false;
+
+       Assert(MyReplicationSlot == NULL);
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       /* Search for the named slot and mark it active if we find it. */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+               {
+                       volatile ReplicationSlot *vslot = s;
+
+                       SpinLockAcquire(&s->mutex);
+                       active = vslot->active;
+                       vslot->active = true;
+                       SpinLockRelease(&s->mutex);
+                       slot = s;
+                       break;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /* If we did not find the slot or it was already active, error out. */
+       if (slot == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("replication slot \"%s\" does not exist", name)));
+       if (active)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("replication slot \"%s\" is already active", name)));
+
+       /* We made this slot active, so it's ours now. */
+       MyReplicationSlot = slot;
+}
+
+/*
+ * Release a replication slot; this or another backend can reacquire it
+ * later. Resources this slot requires will be preserved.
+ */
+void
+ReplicationSlotRelease(void)
+{
+       ReplicationSlot *slot = MyReplicationSlot;
+
+       Assert(slot != NULL && slot->active);
+
+       /* Mark slot inactive.  We're not freeing it, just disconnecting. */
+       {
+               volatile ReplicationSlot *vslot = slot;
+               SpinLockAcquire(&slot->mutex);
+               vslot->active = false;
+               SpinLockRelease(&slot->mutex);
+               MyReplicationSlot = NULL;
+       }
+}
+
+/*
+ * Permanently drop replication slot identified by the passed in name.
+ */
+void
+ReplicationSlotDrop(const char *name)
+{
+       ReplicationSlot *slot = NULL;
+       int                     i;
+       bool            active;
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       /*
+        * If some other backend ran this code currently with us, we might both
+        * try to free the same slot at the same time.  Or we might try to delete
+        * a slot with a certain name while someone else was trying to create a
+        * slot with the same name.
+        */
+       LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+       /* Search for the named slot and mark it active if we find it. */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+               {
+                       volatile ReplicationSlot *vslot = s;
+
+                       SpinLockAcquire(&s->mutex);
+                       active = vslot->active;
+                       vslot->active = true;
+                       SpinLockRelease(&s->mutex);
+                       slot = s;
+                       break;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /* If we did not find the slot or it was already active, error out. */
+       if (slot == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("replication slot \"%s\" does not exist", name)));
+       if (active)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("replication slot \"%s\" is already active", name)));
+
+       /* Generate pathnames. */
+       sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+       /*
+        * Rename the slot directory on disk, so that we'll no longer recognize
+        * this as a valid slot.  Note that if this fails, we've got to mark the
+        * slot inactive again before bailing out.
+        */
+       if (rename(path, tmppath) != 0)
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&slot->mutex);
+               vslot->active = false;
+               SpinLockRelease(&slot->mutex);
+
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename \"%s\" to \"%s\": %m",
+                                               path, tmppath)));
+       }
+
+       /*
+        * We need to fsync() the directory we just renamed and its parent to make
+        * sure that our changes are on disk in a crash-safe fashion.  If fsync()
+        * fails, we can't be sure whether the changes are on disk or not.  For
+        * now, we handle that by panicking; StartupReplicationSlots() will
+        * try to straighten it out after restart.
+        */
+       START_CRIT_SECTION();
+       fsync_fname(tmppath, true);
+       fsync_fname("pg_replslot", true);
+       END_CRIT_SECTION();
+
+       /*
+        * The slot is definitely gone.  Lock out concurrent scans of the array
+        * long enough to kill it.  It's OK to clear the active flag here without
+        * grabbing the mutex because nobody else can be scanning the array here,
+        * and nobody can be attached to this slot and thus access it without
+        * scanning the array.
+        */
+       LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+       slot->active = false;
+       slot->in_use = false;
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /*
+        * Slot is dead and doesn't prevent resource removal anymore, recompute
+        * limits.
+        */
+       ReplicationSlotsComputeRequiredXmin();
+       ReplicationSlotsComputeRequiredLSN();
+
+       /*
+        * If removing the directory fails, the worst thing that will happen is
+        * that the user won't be able to create a new slot with the same name
+        * until the next server restart.  We warn about it, but that's all.
+        */
+       if (!rmtree(tmppath, true))
+               ereport(WARNING,
+                               (errcode_for_file_access(),
+                                errmsg("could not remove directory \"%s\"", tmppath)));
+
+       /*
+        * We release this at the very end, so that nobody starts trying to create
+        * a slot while we're still cleaning up the detritus of the old one.
+        */
+       LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Serialize the currently acquired slot's state from memory to disk, thereby
+ * guaranteeing the current state will survive a crash.
+ */
+void
+ReplicationSlotSave(void)
+{
+       char            path[MAXPGPATH];
+
+       Assert(MyReplicationSlot != NULL);
+
+       sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
+       SaveSlotToPath(MyReplicationSlot, path, ERROR);
+}
+
+/*
+ * Signal that it would be useful if the currently acquired slot would be
+ * flushed out to disk.
+ *
+ * Note that the actual flush to disk can be delayed for a long time; if
+ * required for correctness, explicitly do a ReplicationSlotSave().
+ */
+void
+ReplicationSlotMarkDirty(void)
+{
+       Assert(MyReplicationSlot != NULL);
+
+       {
+               volatile ReplicationSlot *vslot = MyReplicationSlot;
+
+               SpinLockAcquire(&vslot->mutex);
+               MyReplicationSlot->just_dirtied = true;
+               MyReplicationSlot->dirty = true;
+               SpinLockRelease(&vslot->mutex);
+       }
+}
+
+/*
+ * Compute the oldest xmin across all slots and store it in the ProcArray.
+ */
+void
+ReplicationSlotsComputeRequiredXmin(void)
+{
+       int                     i;
+       TransactionId agg_xmin = InvalidTransactionId;
+
+       Assert(ReplicationSlotCtl != NULL);
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               TransactionId   effective_xmin;
+
+               if (!s->in_use)
+                       continue;
+
+               {
+                       volatile ReplicationSlot *vslot = s;
+
+                       SpinLockAcquire(&s->mutex);
+                       effective_xmin = vslot->effective_xmin;
+                       SpinLockRelease(&s->mutex);
+               }
+
+               /* check the data xmin */
+               if (TransactionIdIsValid(effective_xmin) &&
+                       (!TransactionIdIsValid(agg_xmin) ||
+                        TransactionIdPrecedes(effective_xmin, agg_xmin)))
+                       agg_xmin = effective_xmin;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       ProcArraySetReplicationSlotXmin(agg_xmin);
+}
+
+/*
+ * Compute the oldest restart LSN across all slots and inform xlog module.
+ */
+void
+ReplicationSlotsComputeRequiredLSN(void)
+{
+       int                     i;
+       XLogRecPtr  min_required = InvalidXLogRecPtr;
+
+       Assert(ReplicationSlotCtl != NULL);
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               XLogRecPtr restart_lsn;
+
+               if (!s->in_use)
+                       continue;
+
+               {
+                       volatile ReplicationSlot *vslot = s;
+
+                       SpinLockAcquire(&s->mutex);
+                       restart_lsn = vslot->data.restart_lsn;
+                       SpinLockRelease(&s->mutex);
+               }
+
+               if (restart_lsn != InvalidXLogRecPtr &&
+                       (min_required == InvalidXLogRecPtr ||
+                        restart_lsn < min_required))
+                       min_required = restart_lsn;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       XLogSetReplicationSlotMinimumLSN(min_required);
+}
+
+/*
+ * Check whether the server's configuration supports using replication
+ * slots.
+ */
+void
+CheckSlotRequirements(void)
+{
+       if (max_replication_slots == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                (errmsg("replication slots can only be used if max_replication_slots > 0"))));
+
+       if (wal_level < WAL_LEVEL_ARCHIVE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("replication slots can only be used if wal_level >= archive")));
+}
+
+/*
+ * Returns whether the string `str' has the postfix `end'.
+ */
+static bool
+string_endswith(const char *str, const char *end)
+{
+       size_t slen = strlen(str);
+       size_t elen = strlen(end);
+
+       /* can't be a postfix if longer */
+       if (elen > slen)
+               return false;
+
+       /* compare the end of the strings */
+       str += slen - elen;
+       return strcmp(str, end) == 0;
+}
+
+/*
+ * Flush all replication slots to disk.
+ *
+ * This needn't actually be part of a checkpoint, but it's a convenient
+ * location.
+ */
+void
+CheckPointReplicationSlots(void)
+{
+       int                     i;
+
+       ereport(DEBUG1,
+                       (errmsg("performing replication slot checkpoint")));
+
+       /*
+        * Prevent any slot from being created/dropped while we're active. As we
+        * explicitly do *not* want to block iterating over replication_slots or
+        * acquiring a slot we cannot take the control lock - but that's OK,
+        * because holding ReplicationSlotAllocationLock is strictly stronger,
+        * and enough to guarantee that nobody can change the in_use bits on us.
+        */
+       LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               char            path[MAXPGPATH];
+
+               if (!s->in_use)
+                       continue;
+
+               /*
+                * Save the slot to disk; locking is handled in SaveSlotToPath().
+                * Passing LOG means a failed save is logged rather than aborting
+                * the checkpoint (SaveSlotToPath() returns early on failure).
+                */
+               sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+               SaveSlotToPath(s, path, LOG);
+       }
+       LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Load all replication slots from disk into memory at server startup. This
+ * needs to be run before we start crash recovery.
+ *
+ * NOTE(review): the checkPointRedo parameter is not referenced anywhere in
+ * this function's body as written -- confirm whether it is actually needed.
+ */
+void
+StartupReplicationSlots(XLogRecPtr checkPointRedo)
+{
+       DIR                *replication_dir;
+       struct dirent *replication_de;
+
+       ereport(DEBUG1,
+                       (errmsg("starting up replication slots")));
+
+       /* restore all slots by iterating over all on-disk entries */
+       replication_dir = AllocateDir("pg_replslot");
+       while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
+       {
+               struct stat     statbuf;
+               char            path[MAXPGPATH];
+
+               if (strcmp(replication_de->d_name, ".") == 0 ||
+                       strcmp(replication_de->d_name, "..") == 0)
+                       continue;
+
+               snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
+
+               /* slots are stored as directories; skip anything that isn't one */
+               if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+                       continue;
+
+               /* we crashed while a slot was being set up or deleted, clean up */
+               if (string_endswith(replication_de->d_name, ".tmp"))
+               {
+                       if (!rmtree(path, true))
+                       {
+                               ereport(WARNING,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not remove directory \"%s\"", path)));
+                               continue;
+                       }
+                       /* make the removal of the leftover directory durable */
+                       fsync_fname("pg_replslot", true);
+                       continue;
+               }
+
+               /* looks like a slot in a normal state, restore */
+               RestoreSlotFromDisk(replication_de->d_name);
+       }
+       FreeDir(replication_dir);
+
+       /* slot functionality is disabled; nothing more to compute */
+       if (max_replication_slots <= 0)
+               return;
+
+       /* Now that we have recovered all the data, compute replication xmin */
+       ReplicationSlotsComputeRequiredXmin();
+       ReplicationSlotsComputeRequiredLSN();
+}
+
+/* ----
+ * Manipulation of on-disk state of replication slots
+ *
+ * NB: none of the routines below should take any notice whether a slot is the
+ * current one or not, that's all handled a layer above.
+ * ----
+ */
+/*
+ * Create the on-disk directory and initial state file for a new slot.
+ *
+ * The slot is first written under a ".tmp" directory name and then
+ * atomically rename()d into place, so a crash cannot leave a
+ * half-initialized slot directory behind under its final name.
+ */
+static void
+CreateSlotOnDisk(ReplicationSlot *slot)
+{
+       char            tmppath[MAXPGPATH];
+       char            path[MAXPGPATH];
+       struct stat     st;
+
+       /*
+        * No need to take out the io_in_progress_lock, nobody else can see this
+        * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
+        * takes out the lock, if we'd take the lock here, we'd deadlock.
+        */
+
+       sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+       /*
+        * It's just barely possible that some previous effort to create or
+        * drop a slot with this name left a temp directory lying around.
+        * If that seems to be the case, try to remove it.  If the rmtree()
+        * fails, we'll error out at the mkdir() below, so we don't bother
+        * checking success.
+        */
+       if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
+               rmtree(tmppath, true);
+
+       /* Create and fsync the temporary slot directory. */
+       if (mkdir(tmppath, S_IRWXU) < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create directory \"%s\": %m",
+                                               tmppath)));
+       fsync_fname(tmppath, true);
+
+       /* Write the actual state file. */
+       slot->dirty = true; /* signal that we really need to write */
+       SaveSlotToPath(slot, tmppath, ERROR);
+
+       /* Rename the directory into place. */
+       if (rename(tmppath, path) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+
+       /*
+        * If we'd now fail - really unlikely - we wouldn't know whether this slot
+        * would persist after an OS crash or not - so, force a restart. The
+        * restart would try to fsync this again till it works.
+        */
+       START_CRIT_SECTION();
+
+       fsync_fname(path, true);
+       fsync_fname("pg_replslot", true);
+
+       END_CRIT_SECTION();
+}
+
+/*
+ * Shared functionality between saving and creating a replication slot.
+ *
+ * Write failures are reported at `elevel'.  When elevel is less than ERROR
+ * the function returns without having saved the slot, so callers such as
+ * the checkpointer (which passes LOG) are not aborted by a failed save.
+ * In that case we must release io_in_progress_lock ourselves before
+ * returning; on ERROR the error recovery path releases it for us.
+ */
+static void
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+{
+       char            tmppath[MAXPGPATH];
+       char            path[MAXPGPATH];
+       int                     fd;
+       ReplicationSlotOnDisk cp;
+       bool            was_dirty;
+
+       /* first check whether there's something to write out */
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&vslot->mutex);
+               was_dirty = vslot->dirty;
+               vslot->just_dirtied = false;
+               SpinLockRelease(&vslot->mutex);
+       }
+
+       /* and don't do anything if there's nothing to write */
+       if (!was_dirty)
+               return;
+
+       LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
+
+       /* silence valgrind :( */
+       memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
+
+       sprintf(tmppath, "%s/state.tmp", dir);
+       sprintf(path, "%s/state", dir);
+
+       fd = OpenTransientFile(tmppath,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
+       if (fd < 0)
+       {
+               /*
+                * Release the lock before bailing out, since with elevel < ERROR
+                * we will return to the caller.  LWLockRelease() could clobber
+                * errno, so preserve it for the %m below.
+                */
+               int save_errno = errno;
+
+               LWLockRelease(slot->io_in_progress_lock);
+               errno = save_errno;
+               ereport(elevel,
+                               (errcode_for_file_access(),
+                                errmsg("could not create file \"%s\": %m",
+                                               tmppath)));
+               return;
+       }
+
+       cp.magic = SLOT_MAGIC;
+       INIT_CRC32(cp.checksum);
+       cp.version = 1;
+       cp.length = ReplicationSlotOnDiskDynamicSize;
+
+       /* copy the persistent state under the slot's spinlock */
+       SpinLockAcquire(&slot->mutex);
+
+       memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
+
+       SpinLockRelease(&slot->mutex);
+
+       COMP_CRC32(cp.checksum,
+                          (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
+                          ReplicationSlotOnDiskDynamicSize);
+
+       if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
+       {
+               int save_errno = errno;
+
+               CloseTransientFile(fd);
+               /* see above: don't leave the lock held when returning */
+               LWLockRelease(slot->io_in_progress_lock);
+               errno = save_errno;
+               ereport(elevel,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to file \"%s\": %m",
+                                               tmppath)));
+               return;
+       }
+
+       /* fsync the temporary file */
+       if (pg_fsync(fd) != 0)
+       {
+               int save_errno = errno;
+
+               CloseTransientFile(fd);
+               /* see above: don't leave the lock held when returning */
+               LWLockRelease(slot->io_in_progress_lock);
+               errno = save_errno;
+               ereport(elevel,
+                               (errcode_for_file_access(),
+                                errmsg("could not fsync file \"%s\": %m",
+                                               tmppath)));
+               return;
+       }
+
+       CloseTransientFile(fd);
+
+       /* rename to permanent file, fsync file and directory */
+       if (rename(tmppath, path) != 0)
+       {
+               int save_errno = errno;
+
+               /* see above: don't leave the lock held when returning */
+               LWLockRelease(slot->io_in_progress_lock);
+               errno = save_errno;
+               ereport(elevel,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+               return;
+       }
+
+       /* Check CreateSlotOnDisk() for the reasoning of using a crit. section. */
+       START_CRIT_SECTION();
+
+       fsync_fname(path, false);
+       fsync_fname((char *) dir, true);
+       fsync_fname("pg_replslot", true);
+
+       END_CRIT_SECTION();
+
+       /*
+        * Successfully wrote, unset dirty bit, unless somebody dirtied again
+        * already.
+        */
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&vslot->mutex);
+               if (!vslot->just_dirtied)
+                       vslot->dirty = false;
+               SpinLockRelease(&vslot->mutex);
+       }
+
+       LWLockRelease(slot->io_in_progress_lock);
+}
+
+/*
+ * Load a single slot from disk into memory.
+ *
+ * Any inconsistency in the on-disk state (missing or unreadable file, bad
+ * magic/version/length/checksum, or no free in-memory slot) is treated as
+ * PANIC, since we cannot start up with a slot whose state we cannot trust.
+ */
+static void
+RestoreSlotFromDisk(const char *name)
+{
+       ReplicationSlotOnDisk cp;
+       int                     i;
+       char            path[MAXPGPATH];
+       int                     fd;
+       bool            restored = false;
+       int                     readBytes;
+       pg_crc32        checksum;
+
+       /* no need to lock here, no concurrent access allowed yet */
+
+       /* delete temp file if it exists */
+       sprintf(path, "pg_replslot/%s/state.tmp", name);
+       if (unlink(path) < 0 && errno != ENOENT)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not unlink file \"%s\": %m", path)));
+
+       sprintf(path, "pg_replslot/%s/state", name);
+
+       elog(DEBUG1, "restoring replication slot from \"%s\"", path);
+
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+       /*
+        * A missing state file is unexpected: the slot directory is rename()d
+        * into place only after the state file has been fsync()ed, so failure
+        * to open it indicates corruption.
+        */
+       if (fd < 0)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m", path)));
+
+       /*
+        * Sync state file before we're reading from it. We might have crashed
+        * while it wasn't synced yet and we shouldn't continue on that basis.
+        */
+       if (pg_fsync(fd) != 0)
+       {
+               CloseTransientFile(fd);
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not fsync file \"%s\": %m",
+                                               path)));
+       }
+
+       /*
+        * NOTE(review): the original comment says "Also sync the parent
+        * directory", but the call below passes the state-file path itself
+        * with isdir = true -- confirm whether this should instead fsync the
+        * containing slot directory.
+        */
+       START_CRIT_SECTION();
+       fsync_fname(path, true);
+       END_CRIT_SECTION();
+
+       /* read part of statefile that's guaranteed to be version independent */
+       readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+       if (readBytes != ReplicationSlotOnDiskConstantSize)
+       {
+               int                     saved_errno = errno;
+
+               CloseTransientFile(fd);
+               errno = saved_errno;
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not read file \"%s\", read %d of %u: %m",
+                                               path, readBytes,
+                                               (uint32) ReplicationSlotOnDiskConstantSize)));
+       }
+
+       /* verify magic */
+       if (cp.magic != SLOT_MAGIC)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
+                                               path, cp.magic, SLOT_MAGIC)));
+
+       /* verify version */
+       if (cp.version != SLOT_VERSION)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has unsupported version %u",
+                                               path, cp.version)));
+
+       /* boundary check on length */
+       if (cp.length != ReplicationSlotOnDiskDynamicSize)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has corrupted length %u",
+                                               path, cp.length)));
+
+       /* Now that we know the size, read the entire file */
+       readBytes = read(fd,
+                                        (char *)&cp + ReplicationSlotOnDiskConstantSize,
+                                        cp.length);
+       if (readBytes != cp.length)
+       {
+               int                     saved_errno = errno;
+
+               CloseTransientFile(fd);
+               errno = saved_errno;
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not read file \"%s\", read %d of %u: %m",
+                                               path, readBytes, cp.length)));
+       }
+
+       CloseTransientFile(fd);
+
+       /* now verify the CRC32 */
+       INIT_CRC32(checksum);
+       COMP_CRC32(checksum,
+                          (char *)&cp + ReplicationSlotOnDiskConstantSize,
+                          ReplicationSlotOnDiskDynamicSize);
+
+       if (!EQ_CRC32(checksum, cp.checksum))
+               ereport(PANIC,
+                               (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
+                                               path, checksum, cp.checksum)));
+
+       /* nothing can be active yet, don't lock anything */
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *slot;
+
+               slot = &ReplicationSlotCtl->replication_slots[i];
+
+               if (slot->in_use)
+                       continue;
+
+               /* restore the entire set of persistent data */
+               memcpy(&slot->data, &cp.slotdata,
+                          sizeof(ReplicationSlotPersistentData));
+
+               /* initialize in memory state */
+               slot->effective_xmin = cp.slotdata.xmin;
+               slot->in_use = true;
+               slot->active = false;
+
+               restored = true;
+               break;
+       }
+
+       if (!restored)
+               ereport(PANIC,
+                               (errmsg("too many replication slots active before shutdown"),
+                                errhint("Increase max_replication_slots and try again.")));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
new file mode 100644 (file)
index 0000000..98a860e
--- /dev/null
@@ -0,0 +1,193 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotfuncs.c
+ *        Support functions for replication slots
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/slotfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "access/htup_details.h"
+#include "utils/builtins.h"
+#include "replication/slot.h"
+
+Datum          pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
+Datum          pg_drop_replication_slot(PG_FUNCTION_ARGS);
+
+/* Require superuser or a role with REPLICATION to operate on slots. */
+static void
+check_permissions(void)
+{
+       bool            allowed = superuser() || has_rolreplication(GetUserId());
+
+       if (!allowed)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("must be superuser or replication role to use replication slots")));
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ *
+ * Returns a single row; only the first (slot name) column is filled in
+ * here, the second column is returned as NULL.
+ */
+Datum
+pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+       Name            name = PG_GETARG_NAME(0);
+       Datum           values[2];
+       bool            nulls[2];
+       TupleDesc       tupdesc;
+       HeapTuple       tuple;
+       Datum           result;
+
+       check_permissions();
+
+       CheckSlotRequirements();
+
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type")ob;
+
+       /* acquire replication slot, this will check for conflicting names */
+       ReplicationSlotCreate(NameStr(*name), false);
+
+       values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
+
+       nulls[0] = false;
+       nulls[1] = true;
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       result = HeapTupleGetDatum(tuple);
+
+       /* release active status; the slot itself persists */
+       ReplicationSlotRelease();
+
+       PG_RETURN_DATUM(result);
+}
+
+/*
+ * SQL function for dropping a replication slot.
+ *
+ * Permission and configuration checks mirror slot creation; the actual
+ * removal is performed by ReplicationSlotDrop().
+ */
+Datum
+pg_drop_replication_slot(PG_FUNCTION_ARGS)
+{
+       Name            name = PG_GETARG_NAME(0);
+
+       check_permissions();
+
+       CheckSlotRequirements();
+
+       ReplicationSlotDrop(NameStr(*name));
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * pg_get_replication_slots - SQL SRF showing active replication slots.
+ *
+ * Returns one row per in-use slot: name, slot kind ("physical" or
+ * "logical"), database oid, active flag, xmin and restart_lsn (the latter
+ * two NULL when invalid).
+ */
+Datum
+pg_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       int                     slotno;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not " \
+                                               "allowed in this context")));
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       /*
+        * We don't require any special permission to see this function's data
+        * because nothing should be sensitive. The most critical being the slot
+        * name, which shouldn't contain anything particularly sensitive.
+        */
+
+       per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       for (slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               Datum           values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+               bool            nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+
+               TransactionId xmin;
+               XLogRecPtr      restart_lsn;
+               bool            active;
+               Oid                     database;
+               NameData        slot_name;
+
+               char            restart_lsn_s[MAXFNAMELEN];
+               int                     i;
+
+               /*
+                * Copy the slot's contents while holding its spinlock.  We must
+                * not call anything that can allocate memory (such as pstrdup)
+                * while holding a spinlock, so copy the name into a local
+                * NameData buffer instead.
+                */
+               SpinLockAcquire(&slot->mutex);
+               if (!slot->in_use)
+               {
+                       SpinLockRelease(&slot->mutex);
+                       continue;
+               }
+               xmin = slot->data.xmin;
+               database = slot->data.database;
+               restart_lsn = slot->data.restart_lsn;
+               memcpy(&slot_name, &slot->data.name, sizeof(NameData));
+               active = slot->active;
+               SpinLockRelease(&slot->mutex);
+
+               memset(nulls, 0, sizeof(nulls));
+
+               snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+                                (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
+
+               i = 0;
+               values[i++] = CStringGetTextDatum(NameStr(slot_name));
+               if (database == InvalidOid)
+                       values[i++] = CStringGetTextDatum("physical");
+               else
+                       values[i++] = CStringGetTextDatum("logical");
+               /* use the proper Datum conversion macro, not a bare assignment */
+               values[i++] = ObjectIdGetDatum(database);
+               values[i++] = BoolGetDatum(active);
+               if (xmin != InvalidTransactionId)
+                       values[i++] = TransactionIdGetDatum(xmin);
+               else
+                       nulls[i++] = true;
+               /* compare against the LSN sentinel, not the xid one */
+               if (restart_lsn != InvalidXLogRecPtr)
+                       values[i++] = CStringGetTextDatum(restart_lsn_s);
+               else
+                       nulls[i++] = true;
+
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       tuplestore_donestoring(tupstore);
+
+       return (Datum) 0;
+}
index 1fbd33ef61b004d3940a5e17f80007f051e28084..cc3d775307406e95a4d07f59866a2a74fffdd04d 100644 (file)
@@ -187,6 +187,7 @@ void
 WalReceiverMain(void)
 {
        char            conninfo[MAXCONNINFO];
+       char            slotname[NAMEDATALEN];
        XLogRecPtr      startpoint;
        TimeLineID      startpointTLI;
        TimeLineID      primaryTLI;
@@ -241,6 +242,7 @@ WalReceiverMain(void)
 
        /* Fetch information required to start streaming */
        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+       strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
        startpoint = walrcv->receiveStart;
        startpointTLI = walrcv->receiveStartTLI;
 
@@ -355,7 +357,8 @@ WalReceiverMain(void)
                 * on the new timeline.
                 */
                ThisTimeLineID = startpointTLI;
-               if (walrcv_startstreaming(startpointTLI, startpoint))
+               if (walrcv_startstreaming(startpointTLI, startpoint,
+                                                                 slotname[0] != '\0' ? slotname : NULL))
                {
                        bool            endofwal = false;
 
index cc96d7c2f8a967708238a3f9a5d6342a473316e1..acadec57f5a77c5fab39c179b5dad4f2c505af7b 100644 (file)
@@ -219,11 +219,13 @@ ShutdownWalRcv(void)
 /*
  * Request postmaster to start walreceiver.
  *
- * recptr indicates the position where streaming should begin, and conninfo
- * is a libpq connection string to use.
+ * recptr indicates the position where streaming should begin, conninfo
+ * is a libpq connection string to use, and slotname is, optionally, the name
+ * of a replication slot to acquire.
  */
 void
-RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
+                                        const char *slotname)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
@@ -250,6 +252,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
        else
                walrcv->conninfo[0] = '\0';
 
+       if (slotname != NULL)
+               strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
+       else
+               walrcv->slotname[0] = '\0';
+
        if (walrcv->walRcvState == WALRCV_STOPPED)
        {
                launch = true;
index 652487e3de776d1f0153d2da656fe1597267202b..119a920af215b1c600b2239dd534111d41b6f31b 100644 (file)
@@ -53,6 +53,7 @@
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -218,12 +219,17 @@ InitWalSender(void)
 void
 WalSndErrorCleanup()
 {
+       LWLockReleaseAll();
+
        if (sendFile >= 0)
        {
                close(sendFile);
                sendFile = -1;
        }
 
+       if (MyReplicationSlot != NULL)
+               ReplicationSlotRelease();
+
        replication_active = false;
        if (walsender_ready_to_stop)
                proc_exit(0);
@@ -421,6 +427,15 @@ StartReplication(StartReplicationCmd *cmd)
         * written at wal_level='minimal'.
         */
 
+       if (cmd->slotname)
+       {
+               ReplicationSlotAcquire(cmd->slotname);
+               if (MyReplicationSlot->data.database != InvalidOid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        (errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
+       }
+
        /*
         * Select the timeline. If it was given explicitly by the client, use
         * that. Otherwise use the timeline of the last replayed record, which is
@@ -565,6 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
                Assert(streamingDoneSending && streamingDoneReceiving);
        }
 
+       if (cmd->slotname)
+               ReplicationSlotRelease();
+
        /*
         * Copy is finished now. Send a single-row result set indicating the next
         * timeline.
@@ -622,6 +640,75 @@ StartReplication(StartReplicationCmd *cmd)
        pq_puttextmessage('C', "START_STREAMING");
 }
 
+/*
+ * Create a new replication slot.
+ */
+static void
+CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
+{
+       const char *slot_name;
+       StringInfoData buf;
+
+       Assert(!MyReplicationSlot);
+
+       /* setup state for XLogReadPage */
+       sendTimeLineIsHistoric = false;
+       sendTimeLine = ThisTimeLineID;
+
+       ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
+
+       initStringInfo(&output_message);
+
+       slot_name = NameStr(MyReplicationSlot->data.name);
+
+       /*
+        * It may seem somewhat pointless to send back the same slot name the
+        * client just requested and nothing else, but logical replication
+        * will add more fields here.  (We could consider removing the slot
+        * name from what's sent back, though, since the client has specified
+        * that.)
+        */
+
+       pq_beginmessage(&buf, 'T');
+       pq_sendint(&buf, 1, 2);         /* 1 field */
+
+       /* first field: slot name */
+       pq_sendstring(&buf, "slot_name");       /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       pq_endmessage(&buf);
+
+       /* Send a DataRow message */
+       pq_beginmessage(&buf, 'D');
+       pq_sendint(&buf, 1, 2);         /* # of columns */
+
+       /* slot_name */
+       pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+       pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+       pq_endmessage(&buf);
+
+       /*
+        * release active status again, START_REPLICATION will reacquire it
+        */
+       ReplicationSlotRelease();
+}
+
+/*
+ * Get rid of a replication slot that is no longer wanted.
+ */
+static void
+DropReplicationSlot(DropReplicationSlotCmd *cmd)
+{
+       ReplicationSlotDrop(cmd->slotname);
+       EndCommand("DROP_REPLICATION_SLOT", DestRemote);
+}
+
 /*
  * Execute an incoming replication command.
  */
@@ -660,14 +747,28 @@ exec_replication_command(const char *cmd_string)
                        IdentifySystem();
                        break;
 
-               case T_StartReplicationCmd:
-                       StartReplication((StartReplicationCmd *) cmd_node);
-                       break;
-
                case T_BaseBackupCmd:
                        SendBaseBackup((BaseBackupCmd *) cmd_node);
                        break;
 
+               case T_CreateReplicationSlotCmd:
+                       CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+                       break;
+
+               case T_DropReplicationSlotCmd:
+                       DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+                       break;
+
+               case T_StartReplicationCmd:
+                       {
+                               StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+                               if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+                                       StartReplication(cmd);
+                               else
+                                       elog(ERROR, "cannot handle changeset extraction yet");
+                               break;
+                       }
+
                case T_TimeLineHistoryCmd:
                        SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
                        break;
@@ -830,6 +931,39 @@ ProcessStandbyMessage(void)
        }
 }
 
+/*
+ * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
+ */
+static void
+PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
+{
+       bool changed = false;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile ReplicationSlot *slot = MyReplicationSlot;
+
+       Assert(lsn != InvalidXLogRecPtr);
+       SpinLockAcquire(&slot->mutex);
+       if (slot->data.restart_lsn != lsn)
+       {
+               changed = true;
+               slot->data.restart_lsn = lsn;
+       }
+       SpinLockRelease(&slot->mutex);
+
+       if (changed)
+       {
+               ReplicationSlotMarkDirty();
+               ReplicationSlotsComputeRequiredLSN();
+       }
+
+       /*
+        * One could argue that the slot should be saved to disk now, but that'd
+        * be energy wasted - the worst lost information can do here is give us
+        * information in a statistics view - we'll just potentially be more
+        * conservative in removing files.
+        */
+}
+
 /*
  * Regular reply from standby advising of WAL positions on standby server.
  */
@@ -875,6 +1009,48 @@ ProcessStandbyReplyMessage(void)
 
        if (!am_cascading_walsender)
                SyncRepReleaseWaiters();
+
+       /*
+        * Advance our local xmin horizon when the client confirmed a flush.
+        */
+       if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+       {
+               if (MyReplicationSlot->data.database != InvalidOid)
+                       elog(ERROR, "cannot handle changeset extraction yet");
+               else
+                       PhysicalConfirmReceivedLocation(flushPtr);
+       }
+}
+
+/* compute new replication slot xmin horizon if needed */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+       bool changed = false;
+       volatile ReplicationSlot *slot = MyReplicationSlot;
+
+       SpinLockAcquire(&slot->mutex);
+       MyPgXact->xmin = InvalidTransactionId;
+       /*
+        * For physical replication we don't need the interlock provided
+        * by xmin and effective_xmin since the consequences of a missed increase
+        * are limited to query cancellations, so set both at once.
+        */
+       if (!TransactionIdIsNormal(slot->data.xmin) ||
+               !TransactionIdIsNormal(feedbackXmin) ||
+               TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
+       {
+               changed = true;
+               slot->data.xmin = feedbackXmin;
+               slot->effective_xmin = feedbackXmin;
+       }
+       SpinLockRelease(&slot->mutex);
+
+       if (changed)
+       {
+               ReplicationSlotMarkDirty();
+               ReplicationSlotsComputeRequiredXmin();
+       }
 }
 
 /*
@@ -904,6 +1080,8 @@ ProcessStandbyHSFeedbackMessage(void)
        if (!TransactionIdIsNormal(feedbackXmin))
        {
                MyPgXact->xmin = InvalidTransactionId;
+               if (MyReplicationSlot != NULL)
+                       PhysicalReplicationSlotNewXmin(feedbackXmin);
                return;
        }
 
@@ -951,8 +1129,17 @@ ProcessStandbyHSFeedbackMessage(void)
         * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
         * safe, and if we're moving it backwards, well, the data is at risk
         * already since a VACUUM could have just finished calling GetOldestXmin.)
+        *
+        * If we're using a replication slot we reserve the xmin via that,
+        * otherwise via the walsender's PGXACT entry.
+        *
+        * XXX: It might make sense to introduce ephemeral slots and always use
+        * the slot mechanism.
         */
-       MyPgXact->xmin = feedbackXmin;
+       if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
+               PhysicalReplicationSlotNewXmin(feedbackXmin);
+       else
+               MyPgXact->xmin = feedbackXmin;
 }
 
 /* Main loop of walsender process that streams the WAL over Copy messages. */
index 2e717457b123f600eb60d6315ac4624f78c8dacf..c392d4fa228a3e1403ea4d340aec2b7d9d8e1f86 100644 (file)
@@ -27,6 +27,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, ProcSignalShmemSize());
                size = add_size(size, CheckpointerShmemSize());
                size = add_size(size, AutoVacuumShmemSize());
+               size = add_size(size, ReplicationSlotsShmemSize());
                size = add_size(size, WalSndShmemSize());
                size = add_size(size, WalRcvShmemSize());
                size = add_size(size, BTreeShmemSize());
@@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
        ProcSignalShmemInit();
        CheckpointerShmemInit();
        AutoVacuumShmemInit();
+       ReplicationSlotsShmemInit();
        WalSndShmemInit();
        WalRcvShmemInit();
 
index b68c95612c5397d1b12396e915d431b8032616d8..082115b4fffdf2edb363d7eee20daa030780b02a 100644 (file)
@@ -82,6 +82,9 @@ typedef struct ProcArrayStruct
         */
        TransactionId lastOverflowedXid;
 
+       /* oldest xmin of any replication slot */
+       TransactionId replication_slot_xmin;
+
        /*
         * We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
         * but actually it is maxProcs entries long.
@@ -228,6 +231,7 @@ CreateSharedProcArray(void)
                 */
                procArray->numProcs = 0;
                procArray->maxProcs = PROCARRAY_MAXPROCS;
+               procArray->replication_slot_xmin = InvalidTransactionId;
                procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
                procArray->numKnownAssignedXids = 0;
                procArray->tailKnownAssignedXids = 0;
@@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
        ProcArrayStruct *arrayP = procArray;
        TransactionId result;
        int                     index;
+       volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 
        /* Cannot look for individual databases during recovery */
        Assert(allDbs || !RecoveryInProgress());
@@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                }
        }
 
+       /* fetch into volatile var while ProcArrayLock is held */
+       replication_slot_xmin = procArray->replication_slot_xmin;
+
        if (RecoveryInProgress())
        {
                /*
@@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                        result = FirstNormalTransactionId;
        }
 
+       /*
+        * Check whether there are replication slots requiring an older xmin.
+        */
+       if (TransactionIdIsValid(replication_slot_xmin) &&
+               NormalTransactionIdPrecedes(replication_slot_xmin, result))
+               result = replication_slot_xmin;
+
        return result;
 }
 
@@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot)
        int                     count = 0;
        int                     subcount = 0;
        bool            suboverflowed = false;
+       volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 
        Assert(snapshot != NULL);
 
@@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot)
                        suboverflowed = true;
        }
 
+
+       /* fetch into volatile var while ProcArrayLock is held */
+       replication_slot_xmin = procArray->replication_slot_xmin;
+
        if (!TransactionIdIsValid(MyPgXact->xmin))
                MyPgXact->xmin = TransactionXmin = xmin;
+
        LWLockRelease(ProcArrayLock);
 
        /*
@@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot)
        RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
        if (!TransactionIdIsNormal(RecentGlobalXmin))
                RecentGlobalXmin = FirstNormalTransactionId;
+
+       /* Check whether there's a replication slot requiring an older xmin. */
+       if (TransactionIdIsValid(replication_slot_xmin) &&
+               NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
+               RecentGlobalXmin = replication_slot_xmin;
+
        RecentXmin = xmin;
 
        snapshot->xmin = xmin;
@@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
        return true;                            /* timed out, still conflicts */
 }
 
+/*
+ * ProcArraySetReplicationSlotXmin
+ *
+ * Install limits to future computations of the xmin horizon to prevent vacuum
+ * and HOT pruning from removing affected rows still needed by clients with
+ * replication slots.
+ */
+void
+ProcArraySetReplicationSlotXmin(TransactionId xmin)
+{
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       procArray->replication_slot_xmin = xmin;
+       LWLockRelease(ProcArrayLock);
+}
+
 
 #define XidCacheRemove(i) \
        do { \
index 55d9d7837cabdc019b7f58237ebf8967ddb406d7..82ef44094948afa741c8e40c928395ab2d4a0842 100644 (file)
@@ -27,6 +27,7 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "replication/slot.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -238,6 +239,9 @@ NumLWLocks(void)
        /* predicate.c needs one per old serializable xid buffer */
        numLocks += NUM_OLDSERXID_BUFFERS;
 
+       /* slot.c needs one for each slot */
+       numLocks += max_replication_slots;
+
        /*
         * Add any requested by loadable modules; for backwards-compatibility
         * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
index 9d32f9405d58e3759f095e37b989b764d1d154d4..fb449a8820422637abbb2c3d436cd812b2788290 100644 (file)
@@ -40,6 +40,7 @@
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -780,6 +781,10 @@ ProcKill(int code, Datum arg)
        /* Make sure we're out of the sync rep lists */
        SyncRepCleanupAtProcExit();
 
+       /* Make sure active replication slots are released */
+       if (MyReplicationSlot != NULL)
+               ReplicationSlotRelease();
+
 #ifdef USE_ASSERT_CHECKING
        if (assert_enabled)
        {
index a9b9794965b6b9961a8b831e204bdf5372026b6a..70d73d9898ee02a32db20d1806bf79538b1ffc0a 100644 (file)
@@ -57,6 +57,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -2123,6 +2124,17 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               /* see max_connections */
+               {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
+                       gettext_noop("Sets the maximum number of simultaneously defined replication slots."),
+                       NULL
+               },
+               &max_replication_slots,
+               0, 0, MAX_BACKENDS /* XXX?*/,
+               NULL, NULL, NULL
+       },
+
        {
                {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
                        gettext_noop("Sets the maximum time to wait for WAL replication."),
index c8673b382da2ae557e9c2287ea38f6b02f63d8b8..d10e8a5783a2fc210ac16563582a8d3fe27fb4ff 100644 (file)
 #wal_keep_segments = 0         # in logfile segments, 16MB each; 0 disables
 #wal_sender_timeout = 60s      # in milliseconds; 0 disables
 
+#max_replication_slots = 0     # max number of replication slots.
+                               # (change requires restart)
+
 # - Master Server -
 
 # These settings are ignored on a standby server.
index 6b5302f6fd3826314b4b38fc51c4b1db9bca7302..a71320d94581fad5d4cba46448b2b2f85e868d2c 100644 (file)
@@ -195,6 +195,7 @@ const char *subdirs[] = {
        "pg_multixact/offsets",
        "base",
        "base/1",
+       "pg_replslot",
        "pg_tblspc",
        "pg_stat",
        "pg_stat_tmp"
index 3c6ab9a90245f001837da0457833e0b7f113f307..8a702e3388015e0174f6ede850ced57b0e2caed3 100644 (file)
@@ -67,6 +67,7 @@ usage(void)
        printf(_("  -U, --username=NAME    connect as specified database user\n"));
        printf(_("  -w, --no-password      never prompt for password\n"));
        printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
+       printf(_("      --slot             replication slot to use\n"));
        printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
 }
 
@@ -343,6 +344,7 @@ main(int argc, char **argv)
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
                {"status-interval", required_argument, NULL, 's'},
+               {"slot", required_argument, NULL, 'S'},
                {"verbose", no_argument, NULL, 'v'},
                {NULL, 0, NULL, 0}
        };
@@ -409,6 +411,9 @@ main(int argc, char **argv)
                                        exit(1);
                                }
                                break;
+                       case 'S':
+                               replication_slot = pg_strdup(optarg);
+                               break;
                        case 'n':
                                noloop = 1;
                                break;
index 2555904cd06a3db79b00a85762e7b8dbf39e4f29..7d3c76c9941d19712180d2638062ccadf6b23151 100644 (file)
@@ -31,6 +31,8 @@
 /* fd and filename for currently open WAL file */
 static int     walfile = -1;
 static char current_walfile_name[MAXPGPATH] = "";
+static bool reportFlushPosition = false;
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
@@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 {
        off_t           currpos;
 
@@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix)
                                _("%s: not renaming \"%s%s\", segment is not complete\n"),
                                progname, current_walfile_name, partial_suffix);
 
+       lastFlushPosition = pos;
        return true;
 }
 
@@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
        len += 1;
        sendint64(blockpos, &replybuf[len]);            /* write */
        len += 8;
-       sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* flush */
+       if (reportFlushPosition)
+               sendint64(lastFlushPosition, &replybuf[len]);           /* flush */
+       else
+               sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* flush */
        len += 8;
        sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* apply */
        len += 8;
@@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  int standby_message_timeout, char *partial_suffix)
 {
        char            query[128];
+       char            slotcmd[128];
        PGresult   *res;
        XLogRecPtr      stoppos;
 
@@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        if (!CheckServerVersionForStreaming(conn))
                return false;
 
+       if (replication_slot != NULL)
+       {
+               /*
+                * Report the flush position, so the primary can know what WAL we'll
+                * possibly re-request, and remove older WAL safely.
+                *
+                * We only report it when a slot has explicitly been used, because
+                * reporting the flush position makes one elegible as a synchronous
+                * replica. People shouldn't include generic names in
+                * synchronous_standby_names, but we've protected them against it so
+                * far, so let's continue to do so in the situations when possible.
+                * If they've got a slot, though, we need to report the flush position,
+                * so that the master can remove WAL.
+                */
+               reportFlushPosition = true;
+               sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
+       }
+       else
+       {
+               reportFlushPosition = false;
+               slotcmd[0] = 0;
+       }
+
        if (sysidentifier != NULL)
        {
                /* Validate system identifier hasn't changed */
@@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                PQclear(res);
        }
 
+       /*
+        * Initialize flush position to starting point; it's the caller's
+        * responsibility that that's sane.
+        */
+       lastFlushPosition = startpos;
+
        while (1)
        {
                /*
@@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        return true;
 
                /* Initiate the replication stream at specified location */
-               snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+               snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+                                slotcmd,
                                 (uint32) (startpos >> 32), (uint32) startpos,
                                 timeline);
                res = PQexec(conn, query);
@@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 */
                if (still_sending && stream_stop(blockpos, timeline, false))
                {
-                       if (!close_walfile(basedir, partial_suffix))
+                       if (!close_walfile(basedir, partial_suffix, blockpos))
                        {
                                /* Potential error message is written by close_walfile */
                                goto error;
@@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                         */
                        if (still_sending)
                        {
-                               if (!close_walfile(basedir, partial_suffix))
+                               if (!close_walfile(basedir, partial_suffix, blockpos))
                                {
                                        /* Error message written in close_walfile() */
                                        goto error;
@@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                /* Did we reach the end of a WAL segment? */
                                if (blockpos % XLOG_SEG_SIZE == 0)
                                {
-                                       if (!close_walfile(basedir, partial_suffix))
+                                       if (!close_walfile(basedir, partial_suffix, blockpos))
                                                /* Error message written in close_walfile() */
                                                goto error;
 
index 96fbed898fc3303ab09edf45b7088b95df74ae80..041076ff1d73976b9baf1f7a40dd3c87298d4a67 100644 (file)
@@ -22,6 +22,7 @@ char     *connection_string = NULL;
 char      *dbhost = NULL;
 char      *dbuser = NULL;
 char      *dbport = NULL;
+char      *replication_slot = NULL;
 int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
 static char *dbpassword = NULL;
 PGconn    *conn = NULL;
index 77d6b86ced3c4aed719f76d14654dfb12442e7d6..bb3c34db07f831a1e0b9aee6e22e3c5c53e8952b 100644 (file)
@@ -6,6 +6,7 @@ extern char *dbhost;
 extern char *dbuser;
 extern char *dbport;
 extern int     dbgetpassword;
+extern char *replication_slot;
 
 /* Connection kept global so we can disconnect easily */
 extern PGconn *conn;
index 47e302276b4e977e25ec1b473b1d330dfb4b89eb..11ab27719907f2a190aeb70f95de80811b306d9c 100644 (file)
@@ -289,6 +289,7 @@ extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
+extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
 
 extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
                                   int block_index,
index c4661a8a66b70514bf5244c10e4f0291bbb3bb79..ad4def37b964354d8ab19325ad18ebed9a114409 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201401301
+#define CATALOG_VERSION_NO     201401311
 
 #endif
index 1682fa9f9992ef0b3691c79aca4f19d3d2259a13..d7bb21ecccef8411e69066018fc5eb3fa2fc1555 100644 (file)
@@ -4782,6 +4782,14 @@ DESCR("SP-GiST support for quad tree over range");
 DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent       PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for quad tree over range");
 
+/* replication slots */
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DESCR("create a physical replication slot");
+DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
+DESCR("drop a replication slot");
+DATA(insert OID = 3781 (  pg_get_replication_slots     PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DESCR("information about replication slots currently in use");
+
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects             PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
 DESCR("list objects dropped by the current command");
index dfcc01344eaf9c7ee75154b429932c89f79f213a..5b8df59bc65d1b3b5e96f050ebe1aa8bb1ce3da2 100644 (file)
@@ -412,6 +412,8 @@ typedef enum NodeTag
         */
        T_IdentifySystemCmd,
        T_BaseBackupCmd,
+       T_CreateReplicationSlotCmd,
+       T_DropReplicationSlotCmd,
        T_StartReplicationCmd,
        T_TimeLineHistoryCmd,
 
index 40971153b0349e6ce198c0505f32fe5bdd638b01..aac75fd1024c551456750bacddc0129791279b6d 100644 (file)
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
 
+typedef enum ReplicationKind {
+       REPLICATION_KIND_PHYSICAL,
+       REPLICATION_KIND_LOGICAL
+} ReplicationKind;
+
 
 /* ----------------------
  *             IDENTIFY_SYSTEM command
@@ -39,6 +44,30 @@ typedef struct BaseBackupCmd
 } BaseBackupCmd;
 
 
+/* ----------------------
+ *             CREATE_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct CreateReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;
+       ReplicationKind kind;
+       char       *plugin;
+} CreateReplicationSlotCmd;
+
+
+/* ----------------------
+ *             DROP_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct DropReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;
+} DropReplicationSlotCmd;
+
+
 /* ----------------------
  *             START_REPLICATION command
  * ----------------------
@@ -46,8 +75,11 @@ typedef struct BaseBackupCmd
 typedef struct StartReplicationCmd
 {
        NodeTag         type;
+       ReplicationKind kind;
+       char       *slotname;
        TimeLineID      timeline;
        XLogRecPtr      startpoint;
+       List       *options;
 } StartReplicationCmd;
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
new file mode 100644 (file)
index 0000000..089b0f4
--- /dev/null
@@ -0,0 +1,120 @@
+/*-------------------------------------------------------------------------
+ * slot.h
+ *        Replication slot management.
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_H
+#define SLOT_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+typedef struct ReplicationSlotPersistentData
+{
+       /* The slot's identifier */
+       NameData        name;
+
+       /* database the slot is active on */
+       Oid                     database;
+
+       /*
+        * xmin horizon for data
+        *
+        * NB: This may represent a value that hasn't been written to disk yet;
+        * see notes for effective_xmin, below.
+        */
+       TransactionId xmin;
+
+       /* oldest LSN that might be required by this replication slot */
+       XLogRecPtr      restart_lsn;
+
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ */
+typedef struct ReplicationSlot
+{
+       /* lock, on same cacheline as effective_xmin */
+       slock_t         mutex;
+
+       /* is this slot defined */
+       bool            in_use;
+
+       /* is somebody streaming out changes for this slot */
+       bool            active;
+
+       /* any outstanding modifications? */
+       bool            just_dirtied;
+       bool            dirty;
+
+       /*
+        * For logical decoding, it's extremely important that we never remove any
+        * data that's still needed for decoding purposes, even after a crash;
+        * otherwise, decoding will produce wrong answers.  Ordinary streaming
+        * replication also needs to prevent old row versions from being removed
+        * too soon, but the worst consequence we might encounter there is unwanted
+        * query cancellations on the standby.  Thus, for logical decoding,
+        * this value represents the latest xmin that has actually been
+        * written to disk, whereas for streaming replication, it's just the
+        * same as the persistent value (data.xmin).
+        */
+       TransactionId effective_xmin;
+
+       /* data surviving shutdowns and crashes */
+       ReplicationSlotPersistentData data;
+
+       /* is somebody performing io on this slot? */
+       LWLock     *io_in_progress_lock;
+} ReplicationSlot;
+
+/*
+ * Shared memory control area for all of replication slots.
+ */
+typedef struct ReplicationSlotCtlData
+{
+       ReplicationSlot replication_slots[1];
+} ReplicationSlotCtlData;
+
+/*
+ * Pointers to shared memory
+ */
+extern ReplicationSlotCtlData *ReplicationSlotCtl;
+extern ReplicationSlot *MyReplicationSlot;
+
+/* GUCs */
+extern PGDLLIMPORT int max_replication_slots;
+
+/* shmem initialization functions */
+extern Size ReplicationSlotsShmemSize(void);
+extern void ReplicationSlotsShmemInit(void);
+
+/* management of individual slots */
+extern void ReplicationSlotCreate(const char *name, bool db_specific);
+extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotSave(void);
+extern void ReplicationSlotMarkDirty(void);
+
+/* misc stuff */
+extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotsComputeRequiredXmin(void);
+extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckPointReplicationSlots(void);
+
+extern void CheckSlotRequirements(void);
+extern void ReplicationSlotAtProcExit(void);
+
+/* SQL callable functions */
+extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
+
+#endif /* SLOT_H */
index 3c656197cc0d6ceed12fc0c58a1caad2a89c8bc4..3d9401059b769fca852027776b4c0df6285bf08a 100644 (file)
@@ -103,6 +103,12 @@ typedef struct
         */
        char            conninfo[MAXCONNINFO];
 
+       /*
+        * replication slot name; is also used for walreceiver to connect with
+        * the primary
+        */
+       char            slotname[NAMEDATALEN];
+
        slock_t         mutex;                  /* locks shared variables shown above */
 
        /*
@@ -125,7 +131,7 @@ extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
 typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
 extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
 
-typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
 extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
 
 typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
@@ -149,7 +155,8 @@ extern void WalRcvShmemInit(void);
 extern void ShutdownWalRcv(void);
 extern bool WalRcvStreaming(void);
 extern bool WalRcvRunning(void);
-extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
+                                        const char *conninfo, const char *slotname);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int     GetReplicationApplyDelay(void);
 extern int     GetReplicationTransferLatency(void);
index 45079262740f7c8d93952502ccd4b841605e929e..c8ff4ebfb8ac73cf95fd7b4e49dc44a6e28a89b5 100644 (file)
@@ -125,7 +125,9 @@ extern LWLockPadded *MainLWLockArray;
 #define BackgroundWorkerLock           (&MainLWLockArray[33].lock)
 #define DynamicSharedMemoryControlLock         (&MainLWLockArray[34].lock)
 #define AutoFileLock                           (&MainLWLockArray[35].lock)
-#define NUM_INDIVIDUAL_LWLOCKS         36
+#define ReplicationSlotAllocationLock  (&MainLWLockArray[36].lock)
+#define ReplicationSlotControlLock             (&MainLWLockArray[37].lock)
+#define NUM_INDIVIDUAL_LWLOCKS         38
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
index 2947cc4af9f334cee986fff232c5389d8f25f408..d1a58a3661b0ad6f7389722b7651cdb65b890a8b 100644 (file)
@@ -77,4 +77,6 @@ extern void XidCacheRemoveRunningXids(TransactionId xid,
                                                  int nxids, const TransactionId *xids,
                                                  TransactionId latestXid);
 
+extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
+
 #endif   /* PROCARRAY_H */
index 540373dc485575918c682eebb90226426b98ad57..220e18b0bbdccdafa5ee26729987c43dd58d824a 100644 (file)
@@ -1367,6 +1367,15 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
    LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
    LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_slots| SELECT l.slot_name,
+    l.slot_type,
+    l.datoid,
+    d.datname AS database,
+    l.active,
+    l.xmin,
+    l.restart_lsn
+   FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, xmin, restart_lsn)
+   LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
     pg_authid.rolinherit,
index ad40735333b178fff7463d9a61ec3dff5fc151a1..3b7f61ef20865bc55e734d942ba4d6d28c1f2520 100644 (file)
@@ -343,6 +343,7 @@ CreateOpClassItem
 CreateOpClassStmt
 CreateOpFamilyStmt
 CreatePLangStmt
+CreateReplicationSlotCmd
 CreateRangeStmt
 CreateRoleStmt
 CreateSchemaStmt
@@ -416,6 +417,7 @@ DomainConstraintType
 DomainIOData
 DropBehavior
 DropOwnedStmt
+DropReplicationSlotCmd
 DropRoleStmt
 DropStmt
 DropTableSpaceStmt