<entry>query rewrite rules</entry>
</row>
+ <row>
+ <entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry>
+ <entry>replication slot information</entry>
+ </row>
+
<row>
<entry><link linkend="catalog-pg-seclabel"><structname>pg_seclabel</structname></link></entry>
<entry>security labels on database objects</entry>
</sect1>
+ <sect1 id="catalog-pg-replication-slots">
+ <title><structname>pg_replication_slots</structname></title>
+
+ <indexterm zone="catalog-pg-replication-slots">
+ <primary>pg_replication_slots</primary>
+ </indexterm>
+
+ <para>
+ The <structname>pg_replication_slots</structname> view provides a listing
+ of all replication slots that currently exist on the database cluster,
+ along with their current state.
+ </para>
+
+ <para>
+ For more on replication slots,
+ see <xref linkend="streaming-replication-slots">.
+ </para>
+
+ <table>
+
+ <title><structname>pg_replication_slots</structname> Columns</title>
+
+ <tgroup cols="4">
+ <thead>
+ <row>
+ <entry>Name</entry>
+ <entry>Type</entry>
+ <entry>References</entry>
+ <entry>Description</entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry><structfield>slot_name</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry></entry>
+ <entry>A unique, cluster-wide identifier for the replication slot</entry>
+ </row>
+
+ <row>
+ <entry><structfield>slot_type</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry></entry>
+ <entry>The slot type - <literal>physical</> or <literal>logical</></entry>
+ </row>
+
+ <row>
+ <entry><structfield>datoid</structfield></entry>
+ <entry><type>oid</type></entry>
+ <entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.oid</literal></entry>
+ <entry>The oid of the database this slot is associated with, or
+ null. Only logical slots have an associated database.</entry>
+ </row>
+
+ <row>
+ <entry><structfield>database</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.datname</literal></entry>
+ <entry>The name of the database this slot is associated with, or
+ null. Only logical slots have an associated database.</entry>
+ </row>
+
+ <row>
+ <entry><structfield>active</structfield></entry>
+ <entry><type>boolean</type></entry>
+ <entry></entry>
+ <entry>True if this slot is currently actively being used</entry>
+ </row>
+
+ <row>
+ <entry><structfield>xmin</structfield></entry>
+ <entry><type>xid</type></entry>
+ <entry></entry>
+ <entry>The oldest transaction that this slot needs the database to
+ retain. <literal>VACUUM</literal> cannot remove tuples deleted
+ by any later transaction.
+ </entry>
+ </row>
+
+ <row>
+ <entry><structfield>restart_lsn</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry></entry>
+      <entry>The address (<literal>LSN</literal>) of the oldest WAL which still
+ might be required by the consumer of this slot and thus won't be
+ automatically removed during checkpoints.
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
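+
+  <para>
+   For example, the following query (illustrative) lists the slots that are
+   not currently active but may still be forcing the retention of WAL:
+<programlisting>
+SELECT slot_name, slot_type, restart_lsn
+FROM pg_replication_slots
+WHERE NOT active;
+</programlisting>
+  </para>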
+ </sect1>
+
<sect1 id="catalog-pg-seclabel">
<title><structname>pg_seclabel</structname></title>
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
+ <term><varname>max_replication_slots</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>max_replication_slots</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the maximum number of replication slots
+        (see <xref linkend="streaming-replication-slots">) that the server
+ can support. The default is zero. This parameter can only be set at
+ server start.
+ <varname>wal_level</varname> must be set
+ to <literal>archive</literal> or higher to allow replication slots to
+ be used. Setting it to a lower value than the number of currently
+ existing replication slots will prevent the server from starting.
+ </para>
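+      <para>
+       A minimal sketch of the relevant <filename>postgresql.conf</filename>
+       settings (the values shown are illustrative):
+<programlisting>
+wal_level = archive
+max_replication_slots = 5
+</programlisting>
+      </para>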
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-wal-keep-segments" xreflabel="wal_keep_segments">
<term><varname>wal_keep_segments</varname> (<type>integer</type>)</term>
<indexterm>
</para>
</sect2>
+ <sect2 id="functions-replication">
+ <title>Replication Functions</title>
+
+ <para>
+ PostgreSQL exposes a number of functions for controlling and interacting
+ with replication features. See <xref linkend="streaming-replication">
+ and <xref linkend="streaming-replication-slots">.
+ </para>
+
+ <para>
+ Many of these functions have equivalent commands in the replication
+ protocol; see <xref linkend="protocol-replication">.
+ </para>
+
+ <para>
+ The sections <xref linkend="functions-snapshot-synchronization">, <xref
+ linkend="functions-recovery-control"> and <xref
+ linkend="functions-admin-backup"> are also relevant for replication.
+ </para>
+
+ <table id="functions-replication-table">
+ <title>Replication <acronym>SQL</acronym> Functions</title>
+ <tgroup cols="3">
+ <thead>
+ <row>
+ <entry>Function</entry>
+ <entry>Return Type</entry>
+ <entry>Description</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_create_physical_replication_slot</primary>
+ </indexterm>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
+ </entry>
+ <entry>
+ (<parameter>slotname</parameter> <type>text</type>, <parameter>xlog_position</parameter> <type>text</type>)
+ </entry>
+ <entry>
+ Creates a new physical replication slot named
+ <parameter>slotname</parameter>. Streaming changes from a physical slot
+ is only possible with the walsender protocol - see <xref
+ linkend="protocol-replication">. Corresponds to the walsender protocol
+ command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_drop_replication_slot</primary>
+ </indexterm>
+ <literal><function>pg_drop_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
+ </entry>
+ <entry>
+        <type>void</type>
+ </entry>
+ <entry>
+ Drops the physical or logical replication slot
+ named <parameter>slotname</parameter>. Same as walsender protocol
+ command <literal>DROP_REPLICATION_SLOT</>.
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
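+
+  <para>
+   For example, a slot could be created and later dropped like this (the
+   slot name is illustrative):
+<programlisting>
+SELECT * FROM pg_create_physical_replication_slot('my_slot');
+SELECT pg_drop_replication_slot('my_slot');
+</programlisting>
+  </para>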
+ </sect2>
+
<sect2 id="functions-admin-dbobject">
<title>Database Object Management Functions</title>
entries in <filename>pg_hba.conf</> with the database field set to
<literal>replication</>. Also ensure <varname>max_wal_senders</> is set
to a sufficiently large value in the configuration file of the primary
- server.
+ server. If replication slots will be used,
+ ensure that <varname>max_replication_slots</varname> is set sufficiently
+ high as well.
</para>
<para>
<para>
If you use streaming replication without file-based continuous
- archiving, you have to set <varname>wal_keep_segments</> in the master
- to a value high enough to ensure that old WAL segments are not recycled
- too early, while the standby might still need them to catch up. If the
- standby falls behind too much, it needs to be reinitialized from a new
- base backup. If you set up a WAL archive that's accessible from the
- standby, <varname>wal_keep_segments</> is not required as the standby can always
- use the archive to catch up.
+ archiving, the server might recycle old WAL segments before the standby
+ has received them. If this occurs, the standby will need to be
+ reinitialized from a new base backup. You can avoid this by setting
+ <varname>wal_keep_segments</> to a value large enough to ensure that
+    WAL segments are not recycled too early, or by configuring a replication
+ slot for the standby. If you set up a WAL archive that's accessible from
+ the standby, these solutions are not required, since the standby can
+ always use the archive to catch up provided it retains enough segments.
</para>
<para>
</sect3>
</sect2>
+ <sect2 id="streaming-replication-slots">
+ <title>Replication Slots</title>
+ <indexterm zone="high-availability">
+ <primary>Replication Slots</primary>
+ </indexterm>
+ <para>
+ Replication slots provide an automated way to ensure that the master does
+ not remove WAL segments until they have been received by all standbys,
+ and that the master does not remove rows which could cause a
+ <link linkend="hot-standby-conflict">recovery conflict</> even when the
+ standby is disconnected.
+ </para>
+ <para>
+ In lieu of using replication slots, it is possible to prevent the removal
+ of old WAL segments using <xref linkend="guc-wal-keep-segments">, or by
+    storing the segments in an archive using <xref linkend="guc-archive-command">.
+ However, these methods often result in retaining more WAL segments than
+ required, whereas replication slots retain only the number of segments
+ known to be needed. An advantage of these methods is that they bound
+ the space requirement for <literal>pg_xlog</>; there is currently no way
+ to do this using replication slots.
+ </para>
+ <para>
+ Similarly, <varname>hot_standby_feedback</varname>
+ and <varname>vacuum_defer_cleanup_age</varname> provide protection against
+ relevant rows being removed by vacuum, but the former provides no
+ protection during any time period when the standby is not connected,
+ and the latter often needs to be set to a high value to provide adequate
+ protection. Replication slots overcome these disadvantages.
+ </para>
+ <sect3 id="streaming-replication-slots-manipulation">
+ <title>Querying and manipulating replication slots</title>
+ <para>
+ Each replication slot has a name, which can contain lower-case letters,
+ numbers, and the underscore character.
+ </para>
+ <para>
+ Existing replication slots and their state can be seen in the
+ <link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link>
+ view.
+ </para>
+ <para>
+ Slots can be created and dropped either via the streaming replication
+ protocol (see <xref linkend="protocol-replication">) or via SQL
+ functions (see <xref linkend="functions-replication">).
+ </para>
+ </sect3>
+ <sect3 id="streaming-replication-slots-config">
+ <title>Configuration Example</title>
+ <para>
+ You can create a replication slot like this:
+<programlisting>
+postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
+  slotname   | xlog_position
+-------------+---------------
+ node_a_slot |
+(1 row)
+
+postgres=# SELECT * FROM pg_replication_slots;
+  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn
+-------------+-----------+--------+----------+--------+------+-------------
+ node_a_slot | physical  |      0 |          | f      |      |
+(1 row)
+</programlisting>
+ To configure the standby to use this slot, <varname>primary_slotname</>
+ should be configured in the standby's <filename>recovery.conf</>.
+ Here is a simple example:
+<programlisting>
+standby_mode = 'on'
+primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
+primary_slotname = 'node_a_slot'
+</programlisting>
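+   Once the standby has connected using the slot, it will show as active
+   (output illustrative):
+<programlisting>
+postgres=# SELECT slot_name, active FROM pg_replication_slots;
+  slot_name  | active
+-------------+--------
+ node_a_slot | t
+(1 row)
+</programlisting>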
+ </para>
+ </sect3>
+ </sect2>
+
<sect2 id="cascading-replication">
<title>Cascading Replication</title>
</varlistentry>
<varlistentry>
- <term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
+ <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</> <literal>PHYSICAL</literal></term>
+ <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
+ <listitem>
+ <para>
+ Create a physical replication
+ slot. See <xref linkend="streaming-replication-slots"> for more about
+ replication slots.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">slotname</></term>
+ <listitem>
+ <para>
+ The name of the slot to create. Must be a valid replication slot
+ name (see <xref linkend="streaming-replication-slots-manipulation">).
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>START_REPLICATION</literal> [<literal>SLOT</literal> <replaceable class="parameter">slotname</>] [<literal>PHYSICAL</literal>] <replaceable class="parameter">XXX/XXX</> <literal>TIMELINE</literal> <replaceable class="parameter">tli</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
- WAL position <replaceable class="parameter">XXX/XXX</> on timeline
- <replaceable class="parameter">tli</>.
- The server can reply with an error, e.g. if the requested section of WAL
- has already been recycled. On success, server responds with a
- CopyBothResponse message, and then starts to stream WAL to the frontend.
+ WAL position <replaceable class="parameter">XXX/XXX</>. If specified,
+ streaming starts on timeline <replaceable class="parameter">tli</>;
+ otherwise, the server's current timeline is selected. The server can
+ reply with an error, e.g. if the requested section of WAL has already
+ been recycled. On success, server responds with a CopyBothResponse
+ message, and then starts to stream WAL to the frontend.
</para>
<para>
client contains a message of one of the following formats:
</para>
+ <para>
+ If a slot's name is provided
+ via <replaceable class="parameter">slotname</>, it will be updated
+ as replication progresses so that the server knows which WAL segments -
+     and, if <varname>hot_standby_feedback</> is on, which transactions -
+ are still needed by the standby.
+ </para>
+
<para>
<variablelist>
<varlistentry>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>DROP_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</></term>
+ <listitem>
+ <para>
+ Drops a replication slot, freeing any reserved server-side resources. If
+ the slot is currently in use by an active connection, this command fails.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">slotname</></term>
+ <listitem>
+ <para>
+ The name of the slot to drop.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
<listitem>
</para>
</listitem>
</varlistentry>
+ <varlistentry id="primary-slotname" xreflabel="primary_slotname">
+ <term><varname>primary_slotname</varname> (<type>string</type>)</term>
+ <indexterm>
+ <primary><varname>primary_slotname</> recovery parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Optionally specifies an existing replication slot to be used when
+ connecting to the primary via streaming replication to control
+ resource removal on the upstream node
+ (see <xref linkend="streaming-replication-slots">).
+ This setting has no effect if <varname>primary_conninfo</> is not
+ set.
+ </para>
+ </listitem>
+ </varlistentry>
<varlistentry id="trigger-file" xreflabel="trigger_file">
<term><varname>trigger_file</varname> (<type>string</type>)</term>
<indexterm>
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><option>--slot</option></term>
+ <listitem>
+ <para>
+ Require <application>pg_receivexlog</application> to use an existing
+ replication slot (see <xref linkend="streaming-replication-slots">).
+ When this option is used, <application>pg_receivexlog</> will report
+ a flush position to the server, indicating when each segment has been
+ synchronized to disk so that the server can remove that segment if it
+     is not otherwise needed.  When using this option, it is important
+ to make sure that <application>pg_receivexlog</> cannot become the
+ synchronous standby through an incautious setting of
+ <xref linkend="guc-synchronous-standby-names">; it does not flush
+ data frequently enough for this to work correctly.
+ </para>
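+    <para>
+     A sketch of a typical invocation (host name, directory, and slot name
+     are illustrative; the slot must already exist on the server):
+<programlisting>
+pg_receivexlog -h primary.example.com -D /archive/xlog --slot=my_slot
+</programlisting>
+    </para>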
+ </listitem>
+ </varlistentry>
</variablelist>
</para>
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
+static char *PrimarySlotName = NULL;
static char *TriggerFile = NULL;
/* are we currently in standby mode? */
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
+ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
+
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */
static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
SetLatch(ProcGlobal->walwriterLatch);
}
+/*
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
+ */
+void
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->replicationSlotMinLSN = lsn;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+
+/*
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
+ */
+static XLogRecPtr
+XLogGetReplicationSlotMinimumLSN(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+	XLogRecPtr	retval;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ retval = xlogctl->replicationSlotMinLSN;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return retval;
+}
+
/*
* Advance minRecoveryPoint in control file.
*
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
+ else if (strcmp(item->name, "primary_slotname") == 0)
+ {
+ ReplicationSlotValidateName(item->value, ERROR);
+ PrimarySlotName = pstrdup(item->value);
+ ereport(DEBUG2,
+ (errmsg_internal("primary_slotname = '%s'",
+ PrimarySlotName)));
+ }
else if (strcmp(item->name, "trigger_file") == 0)
{
TriggerFile = pstrdup(item->value);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
+ /*
+ * Initialize replication slots, before there's a chance to remove
+ * required resources.
+ */
+ StartupReplicationSlots(checkPoint.redo);
+
/*
* Startup MultiXact. We need to do this early for two reasons: one
* is that we might try to access multixacts when we do tuple freezing,
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
+ CheckPointReplicationSlots();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
/*
* Retreat *logSegNo to the last segment that we need to retain because of
- * wal_keep_segments. This is calculated by subtracting wal_keep_segments
- * from the given xlog location, recptr.
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr, and then making sure that the result is no newer than
+ * the oldest segment still required by any replication slot.
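+ *
+ * For example (illustrative numbers only): if recptr falls in segment 100
+ * and wal_keep_segments is 10, the cutoff starts out at segment 90; if the
+ * minimum restart_lsn across all slots falls in segment 85, the cutoff is
+ * moved back to segment 85.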
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
{
XLogSegNo segno;
-
- if (wal_keep_segments == 0)
- return;
+ XLogRecPtr keep;
XLByteToSeg(recptr, segno);
+ keep = XLogGetReplicationSlotMinimumLSN();
- /* avoid underflow, don't go below 1 */
- if (segno <= wal_keep_segments)
- segno = 1;
- else
- segno = segno - wal_keep_segments;
+ /* compute limit for wal_keep_segments first */
+ if (wal_keep_segments > 0)
+ {
+ /* avoid underflow, don't go below 1 */
+ if (segno <= wal_keep_segments)
+ segno = 1;
+ else
+ segno = segno - wal_keep_segments;
+ }
+
+ /* then check whether slots limit removal further */
+ if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+ {
+		XLogSegNo	slotSegNo;
+
+ XLByteToSeg(keep, slotSegNo);
+
+ if (slotSegNo <= 0)
+ segno = 1;
+ else if (slotSegNo < segno)
+ segno = slotSegNo;
+ }
/* don't delete WAL segments newer than the calculated segment */
if (segno < *logSegNo)
tli, curFileTLI);
}
curFileTLI = tli;
- RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
+ PrimarySlotName);
receivedUpto = 0;
}
WHERE S.usesysid = U.oid AND
S.pid = W.pid;
+CREATE VIEW pg_replication_slots AS
+ SELECT
+ L.slot_name,
+ L.slot_type,
+ L.datoid,
+ D.datname AS database,
+ L.active,
+ L.xmin,
+ L.restart_lsn
+ FROM pg_get_replication_slots() AS L
+ LEFT JOIN pg_database D ON (L.datoid = D.oid);
+
CREATE VIEW pg_stat_database AS
SELECT
D.oid AS datid,
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
- repl_gram.o syncrep.o
+ repl_gram.o slot.o slotfuncs.o syncrep.o
include $(top_srcdir)/src/backend/common.mk
Walreceiver is a postmaster subprocess, so the startup process can't fork it
directly. Instead, it sends a signal to postmaster, asking postmaster to launch
-it. Before that, however, startup process fills in WalRcvData->conninfo,
-and initializes the starting point in WalRcvData->receiveStart.
+it. Before that, however, startup process fills in WalRcvData->conninfo
+and WalRcvData->slotname, and initializes the starting point in
+WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes
it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals
if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
continue;
+ /* Skip pg_replslot, not useful to copy */
+ if (strcmp(de->d_name, "pg_replslot") == 0)
+ continue;
+
/*
* Check if the postmaster has signaled us to exit, and abort with an
* error in that case. The error handler further up will call
static void libpqrcv_connect(char *conninfo);
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
-static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
+ char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
* throws an ERROR.
*/
static bool
-libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
{
-	char		cmd[64];
+	char		cmd[256];
PGresult *res;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
- (uint32) (startpoint >> 32), (uint32) startpoint,
- tli);
+ if (slotname != NULL)
+ snprintf(cmd, sizeof(cmd),
+ "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
+ (uint32) (startpoint >> 32), (uint32) startpoint, tli);
+ else
+ snprintf(cmd, sizeof(cmd),
+ "START_REPLICATION %X/%X TIMELINE %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint, tli);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
}
/* Non-keyword tokens */
-%token <str> SCONST
+%token <str> SCONST IDENT
%token <uintval> UCONST
%token <recptr> RECPTR
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
%token K_START_REPLICATION
+%token K_CREATE_REPLICATION_SLOT
+%token K_DROP_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_LABEL
%token K_PROGRESS
%token K_NOWAIT
%token K_WAL
%token K_TIMELINE
+%token K_PHYSICAL
+%token K_SLOT
%type <node> command
-%type <node> base_backup start_replication identify_system timeline_history
+%type <node> base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
%type <uintval> opt_timeline
+%type <str> opt_slot
%%
firstcmd: command opt_semicolon
identify_system
| base_backup
| start_replication
+ | create_replication_slot
+ | drop_replication_slot
| timeline_history
;
}
;
+/* CREATE_REPLICATION_SLOT slot PHYSICAL */
+create_replication_slot:
+ K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
+ {
+ CreateReplicationSlotCmd *cmd;
+ cmd = makeNode(CreateReplicationSlotCmd);
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+ K_DROP_REPLICATION_SLOT IDENT
+ {
+ DropReplicationSlotCmd *cmd;
+ cmd = makeNode(DropReplicationSlotCmd);
+ cmd->slotname = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
/*
- * START_REPLICATION %X/%X [TIMELINE %d]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
start_replication:
- K_START_REPLICATION RECPTR opt_timeline
+ K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
{
StartReplicationCmd *cmd;
cmd = makeNode(StartReplicationCmd);
- cmd->startpoint = $2;
- cmd->timeline = $3;
-
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ cmd->startpoint = $4;
+ cmd->timeline = $5;
$$ = (Node *) cmd;
}
;
$$ = (Node *) cmd;
}
;
+
+opt_physical : K_PHYSICAL | /* EMPTY */;
+
+
+opt_slot : K_SLOT IDENT
+ {
+ $$ = $2;
+ }
+			| /* nothing */ { $$ = NULL; }
+			;
%%
#include "repl_scanner.c"
#include "postgres.h"
#include "utils/builtins.h"
+#include "parser/scansup.h"
/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
#undef fprintf
%option warn
%option prefix="replication_yy"
-%x xq
+%x xq xd
/* Extended quote
* xqdouble implements embedded quote, ''''
xqdouble {quote}{quote}
xqinside [^']+
+/* Double quote
+ * Allows embedded spaces and other special characters into identifiers.
+ */
+dquote \"
+xdstart {dquote}
+xdstop {dquote}
+xddouble {dquote}{dquote}
+xdinside [^"]+
+
digit [0-9]+
hexdigit		[0-9A-Fa-f]+
quote '
quotestop {quote}
+ident_start [A-Za-z\200-\377_]
+ident_cont [A-Za-z\200-\377_0-9\$]
+
+identifier {ident_start}{ident_cont}*
+
%%
BASE_BACKUP { return K_BASE_BACKUP; }
WAL { return K_WAL; }
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
+CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
+DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
+PHYSICAL { return K_PHYSICAL; }
+SLOT { return K_SLOT; }
+
"," { return ','; }
";" { return ';'; }
+"(" { return '('; }
+")" { return ')'; }
[\n] ;
[\t] ;
BEGIN(xq);
startlit();
}
+
<xq>{quotestop} {
yyless(1);
BEGIN(INITIAL);
yylval.str = litbufdup();
return SCONST;
}
-<xq>{xqdouble} {
+
+<xq>{xqdouble} {
addlitchar('\'');
}
+
<xq>{xqinside} {
addlit(yytext, yyleng);
}
-<xq><<EOF>> { yyerror("unterminated quoted string"); }
+{xdstart} {
+ BEGIN(xd);
+ startlit();
+ }
+
+<xd>{xdstop} {
+ int len;
+ yyless(1);
+ BEGIN(INITIAL);
+ yylval.str = litbufdup();
+ len = strlen(yylval.str);
+ truncate_identifier(yylval.str, len, true);
+ return IDENT;
+ }
+
+<xd>{xdinside} {
+ addlit(yytext, yyleng);
+ }
+
+{identifier} {
+ int len = strlen(yytext);
+
+ yylval.str = downcase_truncate_identifier(yytext, len, true);
+ return IDENT;
+ }
+
+<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
<<EOF>> {
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * slot.c
+ * Replication slot management.
+ *
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slot.c
+ *
+ * NOTES
+ *
+ * Replication slots are used to keep state about replication streams
+ * originating from this cluster. Their primary purpose is to prevent the
+ * premature removal of WAL or of old tuple versions in a manner that would
+ * interfere with replication; they are also useful for monitoring purposes.
+ * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
+ * on standbys (to support cascading setups). The requirement that slots be
+ * usable on standbys precludes storing them in the system catalogs.
+ *
+ * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
+ * directory. Inside that directory the state file will contain the slot's
+ * own data. Additional data can be stored alongside that file if required.
+ * While the server is running, the state data is also cached in memory for
+ * efficiency.
+ *
+ * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
+ * or free a slot. ReplicationSlotControlLock must be taken in shared mode
+ * to iterate over the slots, and in exclusive mode to change the in_use flag
+ * of a slot. The remaining data in each slot is protected by its mutex.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "storage/fd.h"
+#include "storage/procarray.h"
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+ /* first part of this struct needs to be version independent */
+
+ /* data not covered by checksum */
+ uint32 magic;
+ pg_crc32 checksum;
+
+ /* data covered by checksum */
+ uint32 version;
+ uint32 length;
+
+ ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of the part of the slot that is version independent */
+#define ReplicationSlotOnDiskConstantSize \
+ offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot that is not version independent */
+#define ReplicationSlotOnDiskDynamicSize \
+	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
+
+#define SLOT_MAGIC 0x1051CA1 /* format identifier */
+#define SLOT_VERSION 1 /* version for new files */
+
+/* Control array for replication slot management */
+ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
+
+/* My backend's replication slot in the shared memory array */
+ReplicationSlot *MyReplicationSlot = NULL;
+
+/* GUCs */
+int max_replication_slots = 0; /* the maximum number of replication slots */
+
+/* internal persistency functions */
+static void RestoreSlotFromDisk(const char *name);
+static void CreateSlotOnDisk(ReplicationSlot *slot);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
+
+/*
+ * Report shared-memory space needed by ReplicationSlotsShmemInit.
+ */
+Size
+ReplicationSlotsShmemSize(void)
+{
+ Size size = 0;
+
+ if (max_replication_slots == 0)
+ return size;
+
+ size = offsetof(ReplicationSlotCtlData, replication_slots);
+ size = add_size(size,
+ mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+
+ return size;
+}
+
+/*
+ * Allocate and initialize shared memory for replication slots.
+ */
+void
+ReplicationSlotsShmemInit(void)
+{
+ bool found;
+
+ if (max_replication_slots == 0)
+ return;
+
+ ReplicationSlotCtl = (ReplicationSlotCtlData *)
+ ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
+ &found);
+
+ if (!found)
+ {
+ int i;
+
+ /* First time through, so initialize */
+ MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
+
+ /* everything else is zeroed by the memset above */
+ SpinLockInit(&slot->mutex);
+ slot->io_in_progress_lock = LWLockAssign();
+ }
+ }
+}
+
+/*
+ * Check whether the passed slot name is valid and report errors at elevel.
+ *
+ * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
+ * the name to be used as a directory name on every supported OS.
+ *
+ * If elevel < ERROR, returns whether the name is valid; otherwise, errors out.
+ */
+bool
+ReplicationSlotValidateName(const char *name, int elevel)
+{
+ const char *cp;
+
+ if (strlen(name) == 0)
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("replication slot name \"%s\" is too short",
+ name)));
+ return false;
+ }
+
+ if (strlen(name) >= NAMEDATALEN)
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("replication slot name \"%s\" is too long",
+ name)));
+ return false;
+ }
+
+ for (cp = name; *cp; cp++)
+ {
+ if (!((*cp >= 'a' && *cp <= 'z')
+ || (*cp >= '0' && *cp <= '9')
+ || (*cp == '_')))
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("replication slot name \"%s\" contains invalid character",
+ name),
+ errhint("Replication slot names may only contain letters, numbers and the underscore character.")));
+ return false;
+ }
+ }
+ return true;
+}
+
+/*
+ * Create a new replication slot and mark it as used by this backend.
+ *
+ * name: Name of the slot
+ * db_specific: changeset extraction is database specific; if the slot is
+ *		going to be used for that purpose, pass true, otherwise false.
+ */
+void
+ReplicationSlotCreate(const char *name, bool db_specific)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /*
+	 * If some other backend ran this code concurrently with us, we'd likely
+ * both allocate the same slot, and that would be bad. We'd also be
+ * at risk of missing a name collision. Also, we don't want to try to
+ * create a new slot while somebody's busy cleaning up an old one, because
+ * we might both be monkeying with the same directory.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+ /*
+ * Check for name collision, and identify an allocatable slot. We need
+ * to hold ReplicationSlotControlLock in shared mode for this, so that
+ * nobody else can change the in_use flags while we're looking at them.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("replication slot \"%s\" already exists", name)));
+ if (!s->in_use && slot == NULL)
+ slot = s;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If all slots are in use, we're out of luck. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("all replication slots are in use"),
+ errhint("Free one or increase max_replication_slots.")));
+
+ /*
+ * Since this slot is not in use, nobody should be looking at any
+ * part of it other than the in_use field unless they're trying to allocate
+ * it. And since we hold ReplicationSlotAllocationLock, nobody except us
+ * can be doing that. So it's safe to initialize the slot.
+ */
+ Assert(!slot->in_use);
+ Assert(!slot->active);
+ slot->data.xmin = InvalidTransactionId;
+ slot->effective_xmin = InvalidTransactionId;
+ strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
+ NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
+ slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
+ slot->data.restart_lsn = InvalidXLogRecPtr;
+
+ /*
+ * Create the slot on disk. We haven't actually marked the slot allocated
+ * yet, so no special cleanup is required if this errors out.
+ */
+ CreateSlotOnDisk(slot);
+
+ /*
+ * We need to briefly prevent any other backend from iterating over the
+ * slots while we flip the in_use flag. We also need to set the active
+ * flag while holding the ControlLock as otherwise a concurrent
+	 * ReplicationSlotAcquire() could acquire the slot as well.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+ slot->in_use = true;
+
+ /* We can now mark the slot active, and that makes it our slot. */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&slot->mutex);
+ Assert(!vslot->active);
+ vslot->active = true;
+ SpinLockRelease(&slot->mutex);
+ MyReplicationSlot = slot;
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+	 * Now that the slot has been marked as in_use and active, it's safe to
+ * let somebody else try to allocate a slot.
+ */
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Find a previously created slot and mark it as used by this backend.
+ */
+void
+ReplicationSlotAcquire(const char *name)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+ bool active = false;
+
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If we did not find the slot or it was already active, error out. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", name)));
+ if (active)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is already active", name)));
+
+ /* We made this slot active, so it's ours now. */
+ MyReplicationSlot = slot;
+}
+
+/*
+ * Release a replication slot, so that this or another backend can reacquire
+ * it later. Resources this slot requires will be preserved.
+ */
+void
+ReplicationSlotRelease(void)
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(slot != NULL && slot->active);
+
+ /* Mark slot inactive. We're not freeing it, just disconnecting. */
+ {
+ volatile ReplicationSlot *vslot = slot;
+ SpinLockAcquire(&slot->mutex);
+ vslot->active = false;
+ SpinLockRelease(&slot->mutex);
+ MyReplicationSlot = NULL;
+ }
+}
+
+/*
+ * Permanently drop replication slot identified by the passed in name.
+ */
+void
+ReplicationSlotDrop(const char *name)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+ bool active;
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /*
+	 * If some other backend ran this code concurrently with us, we might both
+ * try to free the same slot at the same time. Or we might try to delete
+ * a slot with a certain name while someone else was trying to create a
+ * slot with the same name.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If we did not find the slot or it was already active, error out. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", name)));
+ if (active)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is already active", name)));
+
+ /* Generate pathnames. */
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+ /*
+ * Rename the slot directory on disk, so that we'll no longer recognize
+ * this as a valid slot. Note that if this fails, we've got to mark the
+ * slot inactive again before bailing out.
+ */
+ if (rename(path, tmppath) != 0)
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&slot->mutex);
+ vslot->active = false;
+ SpinLockRelease(&slot->mutex);
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename \"%s\" to \"%s\": %m",
+ path, tmppath)));
+ }
+
+ /*
+ * We need to fsync() the directory we just renamed and its parent to make
+ * sure that our changes are on disk in a crash-safe fashion. If fsync()
+ * fails, we can't be sure whether the changes are on disk or not. For
+ * now, we handle that by panicking; StartupReplicationSlots() will
+ * try to straighten it out after restart.
+ */
+ START_CRIT_SECTION();
+ fsync_fname(tmppath, true);
+ fsync_fname("pg_replslot", true);
+ END_CRIT_SECTION();
+
+ /*
+ * The slot is definitely gone. Lock out concurrent scans of the array
+ * long enough to kill it. It's OK to clear the active flag here without
+ * grabbing the mutex because nobody else can be scanning the array here,
+ * and nobody can be attached to this slot and thus access it without
+ * scanning the array.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+ slot->active = false;
+ slot->in_use = false;
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Slot is dead and doesn't prevent resource removal anymore, recompute
+ * limits.
+ */
+ ReplicationSlotsComputeRequiredXmin();
+ ReplicationSlotsComputeRequiredLSN();
+
+ /*
+ * If removing the directory fails, the worst thing that will happen is
+ * that the user won't be able to create a new slot with the same name
+ * until the next server restart. We warn about it, but that's all.
+ */
+ if (!rmtree(tmppath, true))
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove directory \"%s\"", tmppath)));
+
+ /*
+ * We release this at the very end, so that nobody starts trying to create
+ * a slot while we're still cleaning up the detritus of the old one.
+ */
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Serialize the currently acquired slot's state from memory to disk, thereby
+ * guaranteeing the current state will survive a crash.
+ */
+void
+ReplicationSlotSave(void)
+{
+ char path[MAXPGPATH];
+
+ Assert(MyReplicationSlot != NULL);
+
+ sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
+ SaveSlotToPath(MyReplicationSlot, path, ERROR);
+}
+
+/*
+ * Signal that it would be useful if the currently acquired slot would be
+ * flushed out to disk.
+ *
+ * Note that the actual flush to disk can be delayed for a long time; if
+ * required for correctness, explicitly do a ReplicationSlotSave().
+ */
+void
+ReplicationSlotMarkDirty(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ {
+ volatile ReplicationSlot *vslot = MyReplicationSlot;
+
+ SpinLockAcquire(&vslot->mutex);
+ MyReplicationSlot->just_dirtied = true;
+ MyReplicationSlot->dirty = true;
+ SpinLockRelease(&vslot->mutex);
+ }
+}
+
+/*
+ * Compute the oldest xmin across all slots and store it in the ProcArray.
+ */
+void
+ReplicationSlotsComputeRequiredXmin(void)
+{
+ int i;
+ TransactionId agg_xmin = InvalidTransactionId;
+
+ Assert(ReplicationSlotCtl != NULL);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ TransactionId effective_xmin;
+
+ if (!s->in_use)
+ continue;
+
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ effective_xmin = vslot->effective_xmin;
+ SpinLockRelease(&s->mutex);
+ }
+
+ /* check the data xmin */
+ if (TransactionIdIsValid(effective_xmin) &&
+ (!TransactionIdIsValid(agg_xmin) ||
+ TransactionIdPrecedes(effective_xmin, agg_xmin)))
+ agg_xmin = effective_xmin;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ ProcArraySetReplicationSlotXmin(agg_xmin);
+}
+
+/*
+ * Compute the oldest restart LSN across all slots and inform xlog module.
+ */
+void
+ReplicationSlotsComputeRequiredLSN(void)
+{
+ int i;
+ XLogRecPtr min_required = InvalidXLogRecPtr;
+
+ Assert(ReplicationSlotCtl != NULL);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ XLogRecPtr restart_lsn;
+
+ if (!s->in_use)
+ continue;
+
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ restart_lsn = vslot->data.restart_lsn;
+ SpinLockRelease(&s->mutex);
+ }
+
+ if (restart_lsn != InvalidXLogRecPtr &&
+ (min_required == InvalidXLogRecPtr ||
+ restart_lsn < min_required))
+ min_required = restart_lsn;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ XLogSetReplicationSlotMinimumLSN(min_required);
+}
+
+/*
+ * Check whether the server's configuration supports using replication
+ * slots.
+ */
+void
+CheckSlotRequirements(void)
+{
+ if (max_replication_slots == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ (errmsg("replication slots can only be used if max_replication_slots > 0"))));
+
+ if (wal_level < WAL_LEVEL_ARCHIVE)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slots can only be used if wal_level >= archive")));
+}
+
+/*
+ * Returns whether the string `str' has the postfix `end'.
+ */
+static bool
+string_endswith(const char *str, const char *end)
+{
+ size_t slen = strlen(str);
+ size_t elen = strlen(end);
+
+ /* can't be a postfix if longer */
+ if (elen > slen)
+ return false;
+
+ /* compare the end of the strings */
+ str += slen - elen;
+ return strcmp(str, end) == 0;
+}
+
+/*
+ * Flush all replication slots to disk.
+ *
+ * This needn't actually be part of a checkpoint, but it's a convenient
+ * location.
+ */
+void
+CheckPointReplicationSlots(void)
+{
+ int i;
+
+ ereport(DEBUG1,
+ (errmsg("performing replication slot checkpoint")));
+
+ /*
+ * Prevent any slot from being created/dropped while we're active. As we
+ * explicitly do *not* want to block iterating over replication_slots or
+	 * acquiring a slot, we cannot take the control lock - but that's OK,
+ * because holding ReplicationSlotAllocationLock is strictly stronger,
+ * and enough to guarantee that nobody can change the in_use bits on us.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ char path[MAXPGPATH];
+
+ if (!s->in_use)
+ continue;
+
+ /* save the slot to disk, locking is handled in SaveSlotToPath() */
+ sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+ SaveSlotToPath(s, path, LOG);
+ }
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Load all replication slots from disk into memory at server startup. This
+ * needs to be run before we start crash recovery.
+ */
+void
+StartupReplicationSlots(XLogRecPtr checkPointRedo)
+{
+ DIR *replication_dir;
+ struct dirent *replication_de;
+
+ ereport(DEBUG1,
+ (errmsg("starting up replication slots")));
+
+ /* restore all slots by iterating over all on-disk entries */
+ replication_dir = AllocateDir("pg_replslot");
+ while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
+ {
+ struct stat statbuf;
+ char path[MAXPGPATH];
+
+ if (strcmp(replication_de->d_name, ".") == 0 ||
+ strcmp(replication_de->d_name, "..") == 0)
+ continue;
+
+ snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
+
+		/* we're only creating directories here, skip if it's not ours */
+ if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+ continue;
+
+		/* we crashed while a slot was being set up or deleted; clean up */
+ if (string_endswith(replication_de->d_name, ".tmp"))
+ {
+ if (!rmtree(path, true))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove directory \"%s\"", path)));
+ continue;
+ }
+ fsync_fname("pg_replslot", true);
+ continue;
+ }
+
+ /* looks like a slot in a normal state, restore */
+ RestoreSlotFromDisk(replication_de->d_name);
+ }
+ FreeDir(replication_dir);
+
+	/* if replication slots are disabled, no slots can exist: we're done */
+ if (max_replication_slots <= 0)
+ return;
+
+ /* Now that we have recovered all the data, compute replication xmin */
+ ReplicationSlotsComputeRequiredXmin();
+ ReplicationSlotsComputeRequiredLSN();
+}
+
+/* ----
+ * Manipulation of ondisk state of replication slots
+ *
+ * NB: none of the routines below should take any notice of whether a slot
+ * is the current one or not; that's all handled a layer above.
+ * ----
+ */
+static void
+CreateSlotOnDisk(ReplicationSlot *slot)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ struct stat st;
+
+ /*
+	 * No need to take out the io_in_progress_lock: nobody else can see this
+	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
+	 * takes out the lock; if we took the lock here, we'd deadlock.
+ */
+
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+ /*
+ * It's just barely possible that some previous effort to create or
+ * drop a slot with this name left a temp directory lying around.
+ * If that seems to be the case, try to remove it. If the rmtree()
+ * fails, we'll error out at the mkdir() below, so we don't bother
+ * checking success.
+ */
+ if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
+ rmtree(tmppath, true);
+
+ /* Create and fsync the temporary slot directory. */
+ if (mkdir(tmppath, S_IRWXU) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create directory \"%s\": %m",
+ tmppath)));
+ fsync_fname(tmppath, true);
+
+ /* Write the actual state file. */
+ slot->dirty = true; /* signal that we really need to write */
+ SaveSlotToPath(slot, tmppath, ERROR);
+
+ /* Rename the directory into place. */
+ if (rename(tmppath, path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+
+ /*
+	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
+	 * would persist after an OS crash or not - so, force a restart. The
+	 * restart would try to fsync this again till it works.
+ */
+ START_CRIT_SECTION();
+
+ fsync_fname(path, true);
+ fsync_fname("pg_replslot", true);
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Shared functionality between saving and creating a replication slot.
+ */
+static void
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ int fd;
+ ReplicationSlotOnDisk cp;
+ bool was_dirty;
+
+ /* first check whether there's something to write out */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&vslot->mutex);
+ was_dirty = vslot->dirty;
+ vslot->just_dirtied = false;
+ SpinLockRelease(&vslot->mutex);
+ }
+
+ /* and don't do anything if there's nothing to write */
+ if (!was_dirty)
+ return;
+
+ LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
+
+ /* silence valgrind :( */
+ memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
+
+ sprintf(tmppath, "%s/state.tmp", dir);
+ sprintf(path, "%s/state", dir);
+
+ fd = OpenTransientFile(tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ cp.magic = SLOT_MAGIC;
+ INIT_CRC32(cp.checksum);
+	cp.version = SLOT_VERSION;
+ cp.length = ReplicationSlotOnDiskDynamicSize;
+
+ SpinLockAcquire(&slot->mutex);
+
+ memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
+
+ SpinLockRelease(&slot->mutex);
+
+ COMP_CRC32(cp.checksum,
+ (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
+ ReplicationSlotOnDiskDynamicSize);
+
+ if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
+ {
+ int save_errno = errno;
+ CloseTransientFile(fd);
+ errno = save_errno;
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ /* fsync the temporary file */
+ if (pg_fsync(fd) != 0)
+ {
+ int save_errno = errno;
+ CloseTransientFile(fd);
+ errno = save_errno;
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ CloseTransientFile(fd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not rename \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ return;
+ }
+
+	/* See CreateSlotOnDisk() for the reasoning behind using a critical section. */
+ START_CRIT_SECTION();
+
+ fsync_fname(path, false);
+ fsync_fname((char *) dir, true);
+ fsync_fname("pg_replslot", true);
+
+ END_CRIT_SECTION();
+
+ /*
+	 * Successfully wrote; unset the dirty bit, unless somebody dirtied the
+	 * slot again already.
+ */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&vslot->mutex);
+ if (!vslot->just_dirtied)
+ vslot->dirty = false;
+ SpinLockRelease(&vslot->mutex);
+ }
+
+ LWLockRelease(slot->io_in_progress_lock);
+}
+
+/*
+ * Load a single slot from disk into memory.
+ */
+static void
+RestoreSlotFromDisk(const char *name)
+{
+ ReplicationSlotOnDisk cp;
+ int i;
+ char path[MAXPGPATH];
+ int fd;
+ bool restored = false;
+ int readBytes;
+ pg_crc32 checksum;
+
+ /* no need to lock here, no concurrent access allowed yet */
+
+ /* delete temp file if it exists */
+ sprintf(path, "pg_replslot/%s/state.tmp", name);
+ if (unlink(path) < 0 && errno != ENOENT)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not unlink file \"%s\": %m", path)));
+
+ sprintf(path, "pg_replslot/%s/state", name);
+
+ elog(DEBUG1, "restoring replication slot from \"%s\"", path);
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+	 * An open failure should not happen here, as we rename() the directory
+	 * into place only after we have fsync()ed the state file.
+ */
+ if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ /*
+	 * Sync the state file before we read from it. We might have crashed while
+	 * it wasn't synced yet, and we shouldn't continue on that basis.
+ */
+ if (pg_fsync(fd) != 0)
+ {
+ CloseTransientFile(fd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ path)));
+ }
+
+ /* Also sync the parent directory */
+ START_CRIT_SECTION();
+ fsync_fname(path, true);
+ END_CRIT_SECTION();
+
+ /* read part of statefile that's guaranteed to be version independent */
+ readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+ if (readBytes != ReplicationSlotOnDiskConstantSize)
+ {
+ int saved_errno = errno;
+
+ CloseTransientFile(fd);
+ errno = saved_errno;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\", read %d of %u: %m",
+ path, readBytes,
+ (uint32) ReplicationSlotOnDiskConstantSize)));
+ }
+
+ /* verify magic */
+ if (cp.magic != SLOT_MAGIC)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
+ path, cp.magic, SLOT_MAGIC)));
+
+ /* verify version */
+ if (cp.version != SLOT_VERSION)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has unsupported version %u",
+ path, cp.version)));
+
+ /* boundary check on length */
+ if (cp.length != ReplicationSlotOnDiskDynamicSize)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has corrupted length %u",
+ path, cp.length)));
+
+ /* Now that we know the size, read the entire file */
+ readBytes = read(fd,
+ (char *)&cp + ReplicationSlotOnDiskConstantSize,
+ cp.length);
+ if (readBytes != cp.length)
+ {
+ int saved_errno = errno;
+
+ CloseTransientFile(fd);
+ errno = saved_errno;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\", read %d of %u: %m",
+ path, readBytes, cp.length)));
+ }
+
+ CloseTransientFile(fd);
+
+ /* now verify the CRC32 */
+ INIT_CRC32(checksum);
+ COMP_CRC32(checksum,
+ (char *)&cp + ReplicationSlotOnDiskConstantSize,
+ ReplicationSlotOnDiskDynamicSize);
+
+ if (!EQ_CRC32(checksum, cp.checksum))
+ ereport(PANIC,
+ (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
+ path, checksum, cp.checksum)));
+
+ /* nothing can be active yet, don't lock anything */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *slot;
+
+ slot = &ReplicationSlotCtl->replication_slots[i];
+
+ if (slot->in_use)
+ continue;
+
+ /* restore the entire set of persistent data */
+ memcpy(&slot->data, &cp.slotdata,
+ sizeof(ReplicationSlotPersistentData));
+
+ /* initialize in memory state */
+ slot->effective_xmin = cp.slotdata.xmin;
+ slot->in_use = true;
+ slot->active = false;
+
+ restored = true;
+ break;
+ }
+
+ if (!restored)
+ ereport(PANIC,
+ (errmsg("too many replication slots active before shutdown"),
+ errhint("Increase max_replication_slots and try again.")));
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * slotfuncs.c
+ * Support functions for replication slots
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slotfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "access/htup_details.h"
+#include "utils/builtins.h"
+#include "replication/slot.h"
+
+Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
+Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
+
+static void
+check_permissions(void)
+{
+ if (!superuser() && !has_rolreplication(GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser or replication role to use replication slots"))));
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ */
+Datum
+pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Datum values[2];
+ bool nulls[2];
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum result;
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /* acquire the replication slot; this will check for conflicting names */
+ ReplicationSlotCreate(NameStr(*name), false);
+
+ values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
+
+ nulls[0] = false;
+ nulls[1] = true;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+/*
+ * SQL function for dropping a replication slot.
+ */
+Datum
+pg_drop_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ ReplicationSlotDrop(NameStr(*name));
+
+ PG_RETURN_VOID();
+}
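
At the SQL level these two functions are called like any others; a minimal libpq sketch follows. The conninfo string and slot name are placeholders, and per check_permissions() the connecting role must be a superuser or have the replication attribute:

    #include <stdio.h>
    #include "libpq-fe.h"

    int
    main(void)
    {
        /* placeholder conninfo; the server needs max_replication_slots > 0 */
        PGconn     *conn = PQconnectdb("dbname=postgres");
        PGresult   *res;

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            return 1;
        }

        /* returns (slotname, xlog_position); xlog_position is NULL for now */
        res = PQexec(conn,
                     "SELECT * FROM pg_create_physical_replication_slot('my_slot')");
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
            printf("created slot: %s\n", PQgetvalue(res, 0, 0));
        PQclear(res);

        /* ... stream using the slot, then drop it when no longer needed */
        res = PQexec(conn, "SELECT pg_drop_replication_slot('my_slot')");
        PQclear(res);
        PQfinish(conn);
        return 0;
    }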
+
+/*
+ * pg_get_replication_slots - SQL SRF showing all replication slots that currently exist.
+ */
+Datum
+pg_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int slotno;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not " \
+ "allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /*
+ * We don't require any special permission to see this function's data
+ * because nothing in it should be sensitive; the most critical item is
+ * the slot name, which shouldn't contain anything security-relevant.
+ */
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ for (slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+
+ TransactionId xmin;
+ XLogRecPtr restart_lsn;
+ bool active;
+ Oid database;
+ NameData slot_name;
+
+ char restart_lsn_s[MAXFNAMELEN];
+ int i;
+
+ SpinLockAcquire(&slot->mutex);
+ if (!slot->in_use)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ else
+ {
+ xmin = slot->data.xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ /* copy rather than pstrdup(): no palloc while holding a spinlock */
+ memcpy(&slot_name, &slot->data.name, sizeof(NameData));
+
+ active = slot->active;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ memset(nulls, 0, sizeof(nulls));
+
+ snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+ (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
+
+ i = 0;
+ values[i++] = CStringGetTextDatum(NameStr(slot_name));
+ if (database == InvalidOid)
+ values[i++] = CStringGetTextDatum("physical");
+ else
+ values[i++] = CStringGetTextDatum("logical");
+ if (database == InvalidOid)
+ nulls[i++] = true; /* physical slots have no associated database */
+ else
+ values[i++] = ObjectIdGetDatum(database);
+ values[i++] = BoolGetDatum(active);
+ if (xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(xmin);
+ else
+ nulls[i++] = true;
+ if (restart_lsn != InvalidXLogRecPtr)
+ values[i++] = CStringGetTextDatum(restart_lsn_s);
+ else
+ nulls[i++] = true;
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
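
The restart_lsn value is rendered in the usual two-part hex notation by splitting the 64-bit position into its high and low 32 bits. A self-contained illustration of the formatting used above, with a stand-in typedef for XLogRecPtr:

    #include <stdio.h>
    #include <stdint.h>

    typedef uint64_t XLogRecPtr;    /* stand-in for the real typedef */

    int
    main(void)
    {
        XLogRecPtr  restart_lsn = ((XLogRecPtr) 0x3 << 32) | 0x16CDE458;
        char        buf[32];

        /* same split as pg_get_replication_slots(): high, then low 32 bits */
        snprintf(buf, sizeof(buf), "%X/%X",
                 (uint32_t) (restart_lsn >> 32), (uint32_t) restart_lsn);
        printf("%s\n", buf);        /* prints 3/16CDE458 */
        return 0;
    }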
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
+ char slotname[NAMEDATALEN];
XLogRecPtr startpoint;
TimeLineID startpointTLI;
TimeLineID primaryTLI;
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
startpoint = walrcv->receiveStart;
startpointTLI = walrcv->receiveStartTLI;
* on the new timeline.
*/
ThisTimeLineID = startpointTLI;
- if (walrcv_startstreaming(startpointTLI, startpoint))
+ if (walrcv_startstreaming(startpointTLI, startpoint,
+ slotname[0] != '\0' ? slotname : NULL))
{
bool endofwal = false;
/*
* Request postmaster to start walreceiver.
*
- * recptr indicates the position where streaming should begin, and conninfo
- * is a libpq connection string to use.
+ * recptr indicates the position where streaming should begin, conninfo
+ * is a libpq connection string to use, and slotname is, optionally, the name
+ * of a replication slot to acquire.
*/
void
-RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
+ const char *slotname)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
else
walrcv->conninfo[0] = '\0';
+ if (slotname != NULL)
+ strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
+ else
+ walrcv->slotname[0] = '\0';
+
if (walrcv->walRcvState == WALRCV_STOPPED)
{
launch = true;
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
void
WalSndErrorCleanup()
{
+ LWLockReleaseAll();
+
if (sendFile >= 0)
{
close(sendFile);
sendFile = -1;
}
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
* written at wal_level='minimal'.
*/
+ if (cmd->slotname)
+ {
+ ReplicationSlotAcquire(cmd->slotname);
+ if (MyReplicationSlot->data.database != InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ (errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
+ }
+
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
Assert(streamingDoneSending && streamingDoneReceiving);
}
+ if (cmd->slotname)
+ ReplicationSlotRelease();
+
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
pq_puttextmessage('C', "START_STREAMING");
}
+/*
+ * Create a new replication slot.
+ */
+static void
+CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
+{
+ const char *slot_name;
+ StringInfoData buf;
+
+ Assert(!MyReplicationSlot);
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
+
+ initStringInfo(&output_message);
+
+ slot_name = NameStr(MyReplicationSlot->data.name);
+
+ /*
+ * It may seem somewhat pointless to send back the same slot name the
+ * client just requested and nothing else, but logical replication
+ * will add more fields here. (We could consider removing the slot
+ * name from what's sent back, though, since the client already
+ * knows it.)
+ */
+
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 1, 2); /* 1 field */
+
+ /* first field: slot name */
+ pq_sendstring(&buf, "slot_name"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 1, 2); /* # of columns */
+
+ /* slot_name */
+ pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+ pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+ pq_endmessage(&buf);
+
+ /*
+ * Release the slot's active status again; START_REPLICATION will reacquire it.
+ */
+ ReplicationSlotRelease();
+}
+
+/*
+ * Get rid of a replication slot that is no longer wanted.
+ */
+static void
+DropReplicationSlot(DropReplicationSlotCmd *cmd)
+{
+ ReplicationSlotDrop(cmd->slotname);
+ EndCommand("DROP_REPLICATION_SLOT", DestRemote);
+}
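
These two handlers are reached over a walsender connection rather than through SQL. A hedged libpq sketch of driving them is below; the replication=true connection option is standard libpq, but the exact command spelling (slot name quoting, the PHYSICAL keyword) is an assumption based on the parse nodes this patch adds, since the grammar changes are not part of this hunk:

    #include <stdio.h>
    #include "libpq-fe.h"

    int
    main(void)
    {
        /* requires a replication entry in pg_hba.conf for this role */
        PGconn     *conn = PQconnectdb("replication=true dbname=postgres");
        PGresult   *res;

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            return 1;
        }

        /* assumed spelling of the new walsender command */
        res = PQexec(conn, "CREATE_REPLICATION_SLOT \"my_slot\" PHYSICAL");
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
            printf("slot_name = %s\n", PQgetvalue(res, 0, 0));
        PQclear(res);

        res = PQexec(conn, "DROP_REPLICATION_SLOT \"my_slot\"");
        PQclear(res);
        PQfinish(conn);
        return 0;
    }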
+
/*
* Execute an incoming replication command.
*/
IdentifySystem();
break;
- case T_StartReplicationCmd:
- StartReplication((StartReplicationCmd *) cmd_node);
- break;
-
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
+ case T_CreateReplicationSlotCmd:
+ CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_DropReplicationSlotCmd:
+ DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_StartReplicationCmd:
+ {
+ StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+ if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+ StartReplication(cmd);
+ else
+ elog(ERROR, "cannot handle changeset extraction yet");
+ break;
+ }
+
case T_TimeLineHistoryCmd:
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
}
}
+/*
+ * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
+ */
+static void
+PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
+{
+ bool changed = false;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(lsn != InvalidXLogRecPtr);
+ SpinLockAcquire(&slot->mutex);
+ if (slot->data.restart_lsn != lsn)
+ {
+ changed = true;
+ slot->data.restart_lsn = lsn;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredLSN();
+ }
+
+ /*
+ * One could argue that the slot should be saved to disk now, but that'd
+ * be energy wasted: the worst that losing this information can do is
+ * show stale values in a statistics view, making us slightly more
+ * conservative about removing WAL files.
+ */
+}
+
/*
* Regular reply from standby advising of WAL positions on standby server.
*/
if (!am_cascading_walsender)
SyncRepReleaseWaiters();
+
+ /*
+ * Advance our local xmin horizon when the client confirmed a flush.
+ */
+ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+ {
+ if (MyReplicationSlot->data.database != InvalidOid)
+ elog(ERROR, "cannot handle changeset extraction yet");
+ else
+ PhysicalConfirmReceivedLocation(flushPtr);
+ }
+}
+
+/* compute new replication slot xmin horizon if needed */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+ bool changed = false;
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+
+ SpinLockAcquire(&slot->mutex);
+ MyPgXact->xmin = InvalidTransactionId;
+ /*
+ * For physical replication we don't need the interlock provided by
+ * xmin and effective_xmin, since the consequences of a missed increase
+ * are limited to query cancellations, so set both at once.
+ */
+ if (!TransactionIdIsNormal(slot->data.xmin) ||
+ !TransactionIdIsNormal(feedbackXmin) ||
+ TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
+ {
+ changed = true;
+ slot->data.xmin = feedbackXmin;
+ slot->effective_xmin = feedbackXmin;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin();
+ }
}
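
The guard above only ever moves the slot's xmin forward in transaction-ID order, which is circular modulo 2^32. A small standalone illustration of that comparison rule; this is simplified, since the real TransactionIdPrecedes() also special-cases the permanent XIDs below FirstNormalTransactionId:

    #include <stdio.h>
    #include <stdint.h>

    typedef uint32_t TransactionId;

    /* circular comparison: a precedes b if (a - b) is negative mod 2^32 */
    static int
    xid_precedes(TransactionId a, TransactionId b)
    {
        return (int32_t) (a - b) < 0;
    }

    int
    main(void)
    {
        printf("%d\n", xid_precedes(100, 200));        /* 1: ordinary case */
        printf("%d\n", xid_precedes(4294967000u, 10)); /* 1: across wraparound */
        printf("%d\n", xid_precedes(200, 100));        /* 0 */
        return 0;
    }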
/*
if (!TransactionIdIsNormal(feedbackXmin))
{
MyPgXact->xmin = InvalidTransactionId;
+ if (MyReplicationSlot != NULL)
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
return;
}
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
+ *
+ * If we're using a replication slot we reserve the xmin via that,
+ * otherwise via the walsender's PGXACT entry.
+ *
+ * XXX: It might make sense to introduce ephemeral slots and always use
+ * the slot mechanism.
*/
- MyPgXact->xmin = feedbackXmin;
+ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
+ else
+ MyPgXact->xmin = feedbackXmin;
}
/* Main loop of walsender process that streams the WAL over Copy messages. */
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
+ size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
ProcSignalShmemInit();
CheckpointerShmemInit();
AutoVacuumShmemInit();
+ ReplicationSlotsShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
*/
TransactionId lastOverflowedXid;
+ /* oldest xmin of any replication slot */
+ TransactionId replication_slot_xmin;
+
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
* but actually it is maxProcs entries long.
*/
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
+ procArray->replication_slot_xmin = InvalidTransactionId;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
}
}
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (RecoveryInProgress())
{
/*
result = FirstNormalTransactionId;
}
+ /*
+ * Check whether there are replication slots requiring an older xmin.
+ */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, result))
+ result = replication_slot_xmin;
+
return result;
}
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
suboverflowed = true;
}
+
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
+
LWLockRelease(ProcArrayLock);
/*
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
+
+ /* Check whether there's a replication slot requiring an older xmin. */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = replication_slot_xmin;
+
RecentXmin = xmin;
snapshot->xmin = xmin;
return true; /* timed out, still conflicts */
}
+/*
+ * ProcArraySetReplicationSlotXmin
+ *
+ * Install a lower bound on future computations of the xmin horizon, to
+ * prevent vacuum and HOT pruning from removing rows that are still needed
+ * by clients with replication slots.
+ */
+void
+ProcArraySetReplicationSlotXmin(TransactionId xmin)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ procArray->replication_slot_xmin = xmin;
+ LWLockRelease(ProcArrayLock);
+}
+
#define XidCacheRemove(i) \
do { \
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
/* predicate.c needs one per old serializable xid buffer */
numLocks += NUM_OLDSERXID_BUFFERS;
+ /* slot.c needs one for each slot */
+ numLocks += max_replication_slots;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
#ifdef USE_ASSERT_CHECKING
if (assert_enabled)
{
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
NULL, NULL, NULL
},
+ {
+ /* see max_connections */
+ {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
+ gettext_noop("Sets the maximum number of simultaneously defined replication slots."),
+ NULL
+ },
+ &max_replication_slots,
+ 0, 0, MAX_BACKENDS /* XXX? */,
+ NULL, NULL, NULL
+ },
+
{
{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
+#max_replication_slots = 0 # max number of replication slots
+ # (change requires restart)
+
# - Master Server -
# These settings are ignored on a standby server.
"pg_multixact/offsets",
"base",
"base/1",
+ "pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp"
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_(" --slot=SLOTNAME replication slot to use\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
exit(1);
}
break;
+ case 'S':
+ replication_slot = pg_strdup(optarg);
+ break;
case 'n':
noloop = 1;
break;
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
+static bool reportFlushPosition = false;
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ lastFlushPosition = pos;
return true;
}
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ if (reportFlushPosition)
+ sendint64(lastFlushPosition, &replybuf[len]); /* flush */
+ else
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
int standby_message_timeout, char *partial_suffix)
{
char query[128];
+ char slotcmd[128];
PGresult *res;
XLogRecPtr stoppos;
if (!CheckServerVersionForStreaming(conn))
return false;
+ if (replication_slot != NULL)
+ {
+ /*
+ * Report the flush position, so the primary can know what WAL we'll
+ * possibly re-request, and remove older WAL safely.
+ *
+ * We only report it when a slot has explicitly been used, because
+ * reporting the flush position makes one eligible as a synchronous
+ * replica. People shouldn't include generic names in
+ * synchronous_standby_names, but we've protected them against it so
+ * far, so let's continue to do so where we can. If they've got a
+ * slot, though, we need to report the flush position so that the
+ * master can remove WAL.
+ */
+ reportFlushPosition = true;
+ snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", replication_slot);
+ }
+ else
+ {
+ reportFlushPosition = false;
+ slotcmd[0] = 0;
+ }
+
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
PQclear(res);
}
+ /*
+ * Initialize the flush position to the starting point; it's the
+ * caller's responsibility to ensure that it is sane.
+ */
+ lastFlushPosition = startpos;
+
while (1)
{
/*
return true;
/* Initiate the replication stream at specified location */
- snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+ slotcmd,
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
goto error;
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
+char *replication_slot = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;
extern char *dbuser;
extern char *dbport;
extern int dbgetpassword;
+extern char *replication_slot;
/* Connection kept global so we can disconnect easily */
extern PGconn *conn;
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
+extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
int block_index,
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201401301
+#define CATALOG_VERSION_NO 201401311
#endif
DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_ spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
DESCR("SP-GiST support for quad tree over range");
+/* replication slots */
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DESCR("create a physical replication slot");
+DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
+DESCR("drop a replication slot");
+DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DESCR("information about replication slots currently in use");
+
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
DESCR("list objects dropped by the current command");
*/
T_IdentifySystemCmd,
T_BaseBackupCmd,
+ T_CreateReplicationSlotCmd,
+ T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
#include "access/xlogdefs.h"
#include "nodes/pg_list.h"
+typedef enum ReplicationKind
+{
+ REPLICATION_KIND_PHYSICAL,
+ REPLICATION_KIND_LOGICAL
+} ReplicationKind;
+
/* ----------------------
* IDENTIFY_SYSTEM command
} BaseBackupCmd;
+/* ----------------------
+ * CREATE_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct CreateReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+ ReplicationKind kind;
+ char *plugin;
+} CreateReplicationSlotCmd;
+
+
+/* ----------------------
+ * DROP_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct DropReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+} DropReplicationSlotCmd;
+
+
/* ----------------------
* START_REPLICATION command
* ----------------------
typedef struct StartReplicationCmd
{
NodeTag type;
+ ReplicationKind kind;
+ char *slotname;
TimeLineID timeline;
XLogRecPtr startpoint;
+ List *options;
} StartReplicationCmd;
--- /dev/null
+/*-------------------------------------------------------------------------
+ * slot.h
+ * Replication slot management.
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_H
+#define SLOT_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+typedef struct ReplicationSlotPersistentData
+{
+ /* The slot's identifier */
+ NameData name;
+
+ /* database the slot is active on */
+ Oid database;
+
+ /*
+ * xmin horizon for data
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId xmin;
+
+ /* oldest LSN that might be required by this replication slot */
+ XLogRecPtr restart_lsn;
+
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ */
+typedef struct ReplicationSlot
+{
+ /* lock, on same cacheline as effective_xmin */
+ slock_t mutex;
+
+ /* is this slot defined */
+ bool in_use;
+
+ /* is somebody streaming out changes for this slot */
+ bool active;
+
+ /* any outstanding modifications? */
+ bool just_dirtied;
+ bool dirty;
+
+ /*
+ * For logical decoding, it's extremely important that we never remove any
+ * data that's still needed for decoding purposes, even after a crash;
+ * otherwise, decoding will produce wrong answers. Ordinary streaming
+ * replication also needs to prevent old row versions from being removed
+ * too soon, but the worst consequence we might encounter there is unwanted
+ * query cancellations on the standby. Thus, for logical decoding,
+ * this value represents the latest xmin that has actually been
+ * written to disk, whereas for streaming replication, it's just the
+ * same as the persistent value (data.xmin).
+ */
+ TransactionId effective_xmin;
+
+ /* data surviving shutdowns and crashes */
+ ReplicationSlotPersistentData data;
+
+ /* is somebody performing I/O on this slot? */
+ LWLock *io_in_progress_lock;
+} ReplicationSlot;
+
+/*
+ * Shared memory control area for all replication slots.
+ */
+typedef struct ReplicationSlotCtlData
+{
+ ReplicationSlot replication_slots[1];
+} ReplicationSlotCtlData;
+
+/*
+ * Pointers to shared memory
+ */
+extern ReplicationSlotCtlData *ReplicationSlotCtl;
+extern ReplicationSlot *MyReplicationSlot;
+
+/* GUCs */
+extern PGDLLIMPORT int max_replication_slots;
+
+/* shmem initialization functions */
+extern Size ReplicationSlotsShmemSize(void);
+extern void ReplicationSlotsShmemInit(void);
+
+/* management of individual slots */
+extern void ReplicationSlotCreate(const char *name, bool db_specific);
+extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotSave(void);
+extern void ReplicationSlotMarkDirty(void);
+
+/* misc stuff */
+extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotsComputeRequiredXmin(void);
+extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckPointReplicationSlots(void);
+
+extern void CheckSlotRequirements(void);
+extern void ReplicationSlotAtProcExit(void);
+
+/* SQL callable functions */
+extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
+
+#endif /* SLOT_H */
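
ReplicationSlotCtlData uses the pre-C99 one-element-array idiom, so the shared-memory allocation has to be sized explicitly. A sketch of how ReplicationSlotsShmemSize() would compute that, under the assumption that the slot.c implementation (not part of this hunk) follows the usual shmem sizing pattern:

    Size
    ReplicationSlotsShmemSize(void)
    {
        Size        size = 0;

        if (max_replication_slots == 0)
            return size;

        /* header up to the array, plus one entry per configured slot */
        size = offsetof(ReplicationSlotCtlData, replication_slots);
        size = add_size(size,
                        mul_size(max_replication_slots, sizeof(ReplicationSlot)));

        return size;
    }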
*/
char conninfo[MAXCONNINFO];
+ /*
+ * replication slot name; the walreceiver uses it when connecting to
+ * the primary
+ */
+ char slotname[NAMEDATALEN];
+
slock_t mutex; /* locks shared variables shown above */
/*
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
-typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
-extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
+ const char *conninfo, const char *slotname);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
#define BackgroundWorkerLock (&MainLWLockArray[33].lock)
#define DynamicSharedMemoryControlLock (&MainLWLockArray[34].lock)
#define AutoFileLock (&MainLWLockArray[35].lock)
-#define NUM_INDIVIDUAL_LWLOCKS 36
+#define ReplicationSlotAllocationLock (&MainLWLockArray[36].lock)
+#define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
+#define NUM_INDIVIDUAL_LWLOCKS 38
/*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
int nxids, const TransactionId *xids,
TransactionId latestXid);
+extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
+
#endif /* PROCARRAY_H */
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_slots| SELECT l.slot_name,
+ l.slot_type,
+ l.datoid,
+ d.datname AS database,
+ l.active,
+ l.xmin,
+ l.restart_lsn
+ FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, xmin, restart_lsn)
+ LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
pg_authid.rolinherit,
CreateOpClassStmt
CreateOpFamilyStmt
CreatePLangStmt
+CreateReplicationSlotCmd
CreateRangeStmt
CreateRoleStmt
CreateSchemaStmt
DomainIOData
DropBehavior
DropOwnedStmt
+DropReplicationSlotCmd
DropRoleStmt
DropStmt
DropTableSpaceStmt