]> granicus.if.org Git - postgresql/commitdiff
Documentation for logical decoding.
authorRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 17:20:01 +0000 (13:20 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 17:20:01 +0000 (13:20 -0400)
Craig Ringer, Andres Freund, Christian Kruse, with edits by me.

doc/src/sgml/catalogs.sgml
doc/src/sgml/filelist.sgml
doc/src/sgml/func.sgml
doc/src/sgml/logicaldecoding.sgml [new file with mode: 0644]
doc/src/sgml/postgres.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/ref/alter_table.sgml
doc/src/sgml/ref/create_table.sgml
doc/src/sgml/ref/pg_recvlogical.sgml [new file with mode: 0644]
doc/src/sgml/reference.sgml

index 908f947f81ab1e3da884921b726369cfe1195751..a12ee56ad02af2772d7e34c094437774a0cee19f 100644 (file)
 
   <para>
    For more on replication slots,
-   see <xref linkend="streaming-replication-slots">.
+   see <xref linkend="streaming-replication-slots"> and <xref linkend="logicaldecoding">.
   </para>
 
   <table>
       <entry>The slot type - <literal>physical</> or <literal>logical</></entry>
      </row>
 
+     <row>
+      <entry><structfield>plugin</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>The basename of the shared object containing the output plugin this logical slot is using, or null for physical slots.</entry>
+     </row>
+
      <row>
       <entry><structfield>datoid</structfield></entry>
       <entry><type>oid</type></entry>
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>xmin</structfield></entry>
+      <entry><type>xid</type></entry>
+      <entry></entry>
+      <entry>The oldest transaction that this slot needs the database to
+      retain.  <literal>VACUUM</literal> cannot remove catalog tuples deleted
+      by any later transaction.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>catalog_xmin</structfield></entry>
+      <entry><type>xid</type></entry>
+      <entry></entry>
+      <entry>The <literal>xmin</literal>, or oldest transaction ID, that this
+      slot forces to be retained in the system catalogs. </entry>
+     </row>
+
      <row>
       <entry><structfield>restart_lsn</structfield></entry>
       <entry><type>pg_lsn</type></entry>
index 0e863ee064e11f7fb40494b4576506e68068cab8..6c8e254a584460dbdb5fb26248635105c0190c56 100644 (file)
@@ -91,6 +91,7 @@
 <!ENTITY nls        SYSTEM "nls.sgml">
 <!ENTITY plhandler  SYSTEM "plhandler.sgml">
 <!ENTITY fdwhandler SYSTEM "fdwhandler.sgml">
+<!ENTITY logicaldecoding SYSTEM "logicaldecoding.sgml">
 <!ENTITY protocol   SYSTEM "protocol.sgml">
 <!ENTITY sources    SYSTEM "sources.sgml">
 <!ENTITY storage    SYSTEM "storage.sgml">
index 080da4345919ace2747a76fb0cbd1e5936a71ecf..71b9829d852b83bfb9557e4bdc3bc29aa9aadd3d 100644 (file)
@@ -16437,9 +16437,108 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         command <literal>DROP_REPLICATION_SLOT</>.
        </entry>
       </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_create_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_create_logical_replication_slot(<parameter>slotname</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slotname</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Creates a new logical (decoding) replication slot named
+        <parameter>slotname</parameter> using the output plugin
+        <parameter>plugin</parameter>.  A call to this function has the same
+        effect as the replication protocol command
+        <literal>CREATE REPLICATION SLOT ... LOGICAL</literal>.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_logical_slot_get_changes</primary>
+        </indexterm>
+        <literal><function>pg_logical_slot_get_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>)
+       </entry>
+       <entry>
+        Returns changes in the slot <parameter>slotname</parameter>, starting
+        from the point at which since changes have been consumed last.  If
+        <parameter>upto_lsn</> and <parameter>upto_nchanges</> are NULL,
+        logical decoding will continue until end of WAL.  If 
+        <parameter>upto_lsn</> is non-NULL, decoding will include only
+        those transactions which commit prior to the specified LSN.  If
+        <parameter>upto_nchanges</parameter> is non-NULL, decoding will
+        stop when the number of rows produced by decoding exceeds
+        the specified value.  Note, however, that the actual number of
+        rows returned may be larger, since this limit is only checked after
+        adding the rows produced when decoding each new transaction commit.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_logical_slot_peek_changes</primary>
+        </indexterm>
+        <literal><function>pg_logical_slot_peek_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>location</parameter> <type>text</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>)
+       </entry>
+       <entry>
+        Behaves just like
+        the <function>pg_logical_slot_get_changes()</function> function,
+        except that changes are not consumed; that is, they will be returned
+        again on future calls.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_logical_slot_get_binary_changes</primary>
+        </indexterm>
+        <literal><function>pg_logical_slot_get_binary_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>)
+       </entry>
+       <entry>
+        Behaves just like
+        the <function>pg_logical_slot_get_changes()</function> function,
+        except that changes are returned as <type>bytea</type>.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_logical_slot_peek_binary_changes</primary>
+        </indexterm>
+        <literal><function>pg_logical_slot_peek_binary_changes(<parameter>slotname</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>)
+       </entry>
+       <entry>
+        Behaves just like
+        the <function>pg_logical_slot_get_changes()</function> function,
+        except that changes are returned as <type>bytea</type> and that
+        changes are not consumed; that is, they will be returned again
+        on future calls.
+       </entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
+
   </sect2>
 
   <sect2 id="functions-admin-dbobject">
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
new file mode 100644 (file)
index 0000000..eabdd5f
--- /dev/null
@@ -0,0 +1,560 @@
+<!-- doc/src/sgml/logicaldecoding.sgml -->
+ <chapter id="logicaldecoding">
+  <title>Logical Decoding</title>
+  <indexterm zone="logicaldecoding">
+   <primary>Logical Decoding</primary>
+  </indexterm>
+  <para>
+   PostgreSQL provides infrastructure to stream the modifications performed
+   via SQL to external consumers.  This functionality can be used to for a
+   variety of purposes, including replication solutions and auditing.
+  </para>
+
+  <para>
+   Changes are sent out in streams identified by logical replication slots.
+   Each stream outputs each change exactly once.
+  </para>
+
+  <para>
+   The format in which those changes are streamed is determined by the output
+   plugin used.  An example plugin is provided, and additional plugins can be
+   written to extend the choice of available formats without modifying any
+   core code.
+   Every output plugin has access to each individual new row produced
+   by <command>INSERT</command> and the new row version created
+   by <command>UPDATE</command>.  Availability of old row versions for
+   <command>UPDATE</command> and delete <command>DELETE</command> depends on
+   the configured
+   <link linkend="SQL-CREATETABLE-REPLICA-IDENTITY"><literal>REPLICA
+   IDENTITY</literal></link>.
+  </para>
+
+  <para>
+   Changes can be consumed either using the streaming replication protocol
+   (see <xref linkend="protocol-replication"> and
+   <xref linkend="logicaldecoding-walsender">), or by calling functions
+   via SQL (see <xref linkend="logicaldecoding-sql">). It is also possible
+   to write additional methods of consuming the output of a replication slot
+   without modifying core code
+   (see <xref linkend="logicaldecoding-writer">).
+  </para>
+
+  <sect1 id="logicaldecoding-example">
+   <title>Logical Decoding Example</title>
+   <para>
+    The following example demonstartes the SQL interface.
+   </para>
+   <para>
+    Before you can use logical decoding, you must set
+    <xref linkend="guc-wal-level"> to logical and
+    <xref linkend="guc-max-replication-slots"> ot at least 1.
+    Then, you should connect to the target database (in the example
+    below, <literal>postgres</literal>) as a superuser.
+   </para>
+   <programlisting>
+postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+    slotname     | xlog_position
+-----------------+---------------
+ regression_slot | 0/16B1970
+(1 row)
+
+postgres=# SELECT * FROM pg_replication_slots;
+    slot_name    |    plugin     | slot_type | datoid | database | active |  xmin  | catalog_xmin | restart_lsn
+-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
+ regression_slot | test_decoding | logical   |  12052 | postgres | f      |        |          684 | 0/16A4408
+(1 row)
+
+postgres=# -- There are no changes to see yet
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
+postgres=# CREATE TABLE data(id serial primary key, data text);
+CREATE TABLE
+
+postgres=# -- DDL isn't replicated, so all you'll see is the transaction
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location  | xid |    data
+-----------+-----+------------
+ 0/16D5D48 | 688 | BEGIN 688
+ 0/16E0380 | 688 | COMMIT 688
+(2 rows)
+
+postgres=# -- Once changes are read, they're consumed and not emitted
+postgres=# -- in a subsequent call:
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
+postgres=# BEGIN;
+postgres=# INSERT INTO data(data) VALUES('1');
+postgres=# INSERT INTO data(data) VALUES('2');
+postgres=# COMMIT;
+
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location  | xid |                     data
+-----------+-----+-----------------------------------------------
+ 0/16E0478 | 689 | BEGIN 689
+ 0/16E0478 | 689 | table public.data: INSERT: id[int4]:1 data[text]:'1'
+ 0/16E0580 | 689 | table public.data: INSERT: id[int4]:2 data[text]:'2'
+ 0/16E0650 | 689 | COMMIT 689
+(4 rows)
+
+postgres=# INSERT INTO data(data) VALUES('3');
+
+postgres=# -- You can also peek ahead in the change stream without consuming changes
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ location  | xid |                     data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690
+(3 rows)
+
+postgres=# -- You can also peek ahead in the change stream without consuming changes
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ location  | xid |                     data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690
+(3 rows)
+
+postgres=# -- options can be passed to output plugin, to influence the formatting
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
+ location  | xid |                     data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01)
+(3 rows)
+
+postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
+postgres=# -- server resources:
+postgres=# SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+-----------------------
+
+(1 row)
+    </programlisting>
+   <para>
+    The following example shows usage of the walsender interface using
+    the <link linkend="app-pgrecvlogical"><command>pg_recvlogical</command></link>
+    shell command. It requires the replication configurations to be allowed
+    (see <xref linkend="streaming-replication-authentication">)
+    and <varname>max_wal_senders</varname> to be set sufficiently high for
+    another connection.
+   </para>
+   <programlisting>
+# pg_recvlogical -d testdb --slot test --create
+# pg_recvlogical -d testdb --slot test --start -f -
+CTRL-Z
+# psql -c "INSERT INTO data(data) VALUES('4');"
+# fg
+BEGIN 693
+table public.data: INSERT: id[int4]:4 data[text]:'4'
+COMMIT 693
+CTRL-C
+# pg_recvlogical -d testdb --slot test --drop
+   </programlisting>
+  </sect1>
+  <sect1 id="logicaldecoding-explanation">
+   <title>Logical Decoding Concepts</title>
+   <sect2>
+    <indexterm>
+     <primary>Logical Decoding</primary>
+    </indexterm>
+    <title>Logical Decoding</title>
+    <para>
+     Logical decoding is the the process of extracting all persistent changes
+     to a database's tables into a coherent, easy to understand format which
+     can be interpreted without detailed knowledge of the database's internal
+     state.
+    </para>
+    <para>
+     In <productname>PostgreSQL</productname>, logical decoding is implemented
+     by decoding the contents of the <link linkend="wal">write-ahead
+     log</link>, which describe changes on a storage level, into an
+     application-specific form such as a stream of tuples or SQL statements.
+    </para>
+   </sect2>
+
+   <sect2>
+    <indexterm>
+     <primary>Logical Replication Slot</primary>
+    </indexterm>
+    <indexterm>
+     <primary>Replication Slot</primary>
+    </indexterm>
+    <title>Replication Slots</title>
+    <para>
+     In the context of logical replication, a slot represents a stream of
+     changes which can be replayed to a client in the order they were made on
+     the origin server. Each slot streams a sequence of changes from a single
+     database, sending each change exactly once (except when peeking forward
+     in the stream).
+    </para>
+    <note>
+     <para>PostgreSQL also has streaming replication slots
+     (see <xref linkend="streaming-replication">), but they are used somewhat
+     differently there.
+     </para>
+    </note>
+    <para>
+     Replication slots have an identifier which is unique across all databases
+     in a <productname>PostgreSQL</productname> cluster. Slots persist
+     independently of the connection using them and are crash-safe.
+    </para>
+    <para>
+     Multiple independent slots may exist for a single database. Each slot has
+     its own state, allowing different consumers to receive changes from
+     different points in the database change stream. For most applications, a
+     separate slot will be required for each consumer.
+    </para>
+    <para>
+     A logical replication slot knows nothing about the state of the
+     receiver(s).  It's even possible to have multiple different receivers using
+     the same slot at different times; they'll just get the changes following
+     on from when the last receiver stopped consuming them. Only one receiver
+     may consume changes from a slot at any given time.
+    </para>
+    <note>
+     <para>
+      Replication slots persist across crashes and know nothing about the state
+      of their consumer(s). They will prevent removal of required resources
+      even when there is no connection using them. This consumes storage
+      because neither required WAL nor required rows from the system catalogs
+      can be removed by VACUUM as long as they are required by a replication
+      slot, so if a slot is no longer required it should be dropped.
+     </para>
+    </note>
+   </sect2>
+   <sect2>
+    <title>Output Plugins</title>
+    <para>
+     Output plugins transform the data from the write-ahead log's internal
+     representation into the format the consumer of a replication slot desires.
+    </para>
+   </sect2>
+   <sect2>
+    <title>Exported Snapshots</title>
+    <para>
+     When a new replication slot is created using the walsender interface a
+     snapshot is exported
+     (see <xref linkend="functions-snapshot-synchronization">) which will show
+     exactly the state of the database after which all changes will be
+     included in the change stream. This can be used to create a new replica by
+     using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
+     SNAPSHOT</literal></link> to read the state of the database at the moment
+     the slot was created. This transaction can then be used to dump the
+     database's state at that point in time which afterwards can be updated
+     using the slot's contents without loosing any changes.
+    </para>
+   </sect2>
+  </sect1>
+  <sect1 id="logicaldecoding-walsender">
+   <title>Streaming Replication Protocol Interface</title>
+   <para>
+    The <literal>CREATE_REPLICATION_SLOT SLOT slotname LOGICAL
+    options</literal>, <literal>DROP_REPLICATION_SLOT SLOT slotname</literal>
+    and <literal>START_REPLICATION SLOT slotname LOGICAL options</literal>
+    commands can be used to create, drop and stream changes from a replication
+    slot respectively. These commands are only available over a replication
+    connection; they cannot be used via SQL.
+    See <xref linkend="protocol-replication">.
+   </para>
+   <para>
+    The <command>pg_recvlogical</command> command
+    (see <xref linkend="app-pgrecvlogical">) can be used to control logical
+    decoding over a walsender connection.
+   </para>
+  </sect1>
+  <sect1 id="logicaldecoding-sql">
+   <title>Logical Decoding <acronym>SQL</acronym> Interface</title>
+   <para>
+     See <xref linkend="functions-replication"> for detailed documentation on
+     the SQL-level API for interacting with logical decoding.
+   </para>
+   <para>
+    Synchronous replication (see <xref linkend="synchronous-replication">) is
+    only supported on replication slots used over the walsender interface. The
+    function interface and additional, non-core interfaces do not support
+    synchronous replication.
+   </para>
+  </sect1>
+  <sect1 id="logicaldecoding-catalogs">
+   <title>System catalogs related to logical decoding</title>
+   <para>
+    The <link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link>
+    view and the
+    <link linkend="monitoring-stats-views-table"><structname>pg_stat_replication</structname></link>
+    view provide information about the current state of replication slots and
+    walsender connections respectively. These views apply to both physical and
+    logical replication.
+   </para>
+  </sect1>
+  <sect1 id="logicaldecoding-output-plugin">
+   <title>Logical Decoding Output Plugins</title>
+   <para>
+    An example output plugin can be found in the
+    <link linkend="test-decoding">
+     <filename>contrib/test_decoding</filename>
+    </link>
+    subdirectory of the PostgreSQL source tree.
+   </para>
+   <sect2 id="logicaldecoding-output-init">
+    <title>Initialization Function</title>
+    <indexterm zone="logicaldecoding">
+     <primary>_PG_output_plugin_init</primary>
+    </indexterm>
+    <para>
+     An output plugin is loaded by dynamically loading a shared library with
+     the output plugin's name as the library basename. The normal library
+     search path is used to locate the library. To provide the required output
+     plugin callbacks and to indicate that the library is actually an output
+     plugin it needs to provide a function named
+     <function>_PG_output_plugin_init</function>. This function is passed a
+     struct that needs to be filled with the callback function pointers for
+     individual actions.
+     <programlisting>
+typedef struct OutputPluginCallbacks
+{
+    LogicalDecodeStartupCB startup_cb;
+    LogicalDecodeBeginCB begin_cb;
+    LogicalDecodeChangeCB change_cb;
+    LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeShutdownCB shutdown_cb;
+} OutputPluginCallbacks;
+typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
+     </programlisting>
+     The <function>begin_cb</function>, <function>change_cb</function>
+     and <function>commit_cb</function> callbacks are required,
+     while <function>startup_cb</function>
+     and <function>shutdown_cb</function> are optional.
+    </para>
+   </sect2>
+
+   <sect2 id="logicaldecoding-capabilities">
+    <title>Capabilities</title>
+    <para>
+     To decode, format and output changes, output plugins can use most of the
+     backend's normal infrastructure, including calling output functions. Read
+     only access to relations is permitted as long as only relations are
+     accessed that either have been created by <command>initdb</command> in
+     the <literal>pg_catalog</literal> schema, or have are marked as user
+     provided catalog tables using
+     <programlisting>
+ALTER TABLE user_catalog_table SET (user_catalog_table = true);
+CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
+     </programlisting>
+     Any actions leading to xid assignment are prohibited. That, among others,
+     includes writing to tables, performing DDL changes and
+     calling <literal>txid_current()</literal>.
+    </para>
+   </sect2>
+
+   <sect2 id="logicaldecoding-output-plugin-callbacks">
+    <title>Output Plugin Callbacks</title>
+    <para>
+     An output plugin gets notified about changes that are happening via
+     various callbacks it needs to provide.
+    </para>
+    <para>
+     Concurrent transactions are decoded in commit order and only changes
+     belonging to a specific transaction are decoded inbetween
+     the <literal>begin</literal> and <literal>commit</literal>
+     callbacks. Transactions that were rolled back explicitly or implicitly
+     never get
+     decoded. Successfull <link linkend="SQL-SAVEPOINT">SAVEPOINTs</link> are
+     folded into the transaction containing them in the order they were
+     exectuded within that transaction.
+    </para>
+    <note>
+     <para>
+      Only transactions that have already safely been flushed to disk will be
+      decoded. That can lead to a COMMIT not immediately being decoded in a
+      directly following <literal>pg_logical_slot_get_changes()</literal>
+      when <varname>synchronous_commit</varname> is set
+      to <literal>off</literal>.
+     </para>
+    </note>
+    <sect3 id="logicaldecoding-output-plugin-startup">
+     <title>Startup Callback</title>
+     <para>
+      The optional <function>startup_cb</function> callback is called whenever
+      an replication slot is created or asked to stream changes, independent
+      of the number of changes that are ready to be put out.
+      <programlisting>
+typedef void (*LogicalDecodeStartupCB) (
+    struct LogicalDecodingContext *ctx,
+    OutputPluginOptions *options,
+    bool is_init
+);
+      </programlisting>
+      The <literal>is_init</literal> paramter will be true when the
+      replication slot is being created and false
+      otherwise. <parameter>options</parameter> points to a struct of options
+      that output plugins can set:
+      <programlisting>
+typedef struct OutputPluginOptions
+{
+    OutputPluginOutputType output_type;
+} OutputPluginOptions;
+      </programlisting>
+      <literal>output_type</literal> has to either be set to
+      <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
+      or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>.
+     </para>
+     <para>
+      The startup callback should validate the options present in
+      <literal>ctx-&gt;output_plugin_options</literal>. If the output plugin
+      needs to have a state, it can
+      use <literal>ctx-&gt;output_plugin_private</literal> to store it.
+     </para>
+    </sect3>
+    <sect3 id="logicaldecoding-output-plugin-shutdown">
+     <title>Shutdown Callback</title>
+     <para>
+      The optional <function>shutdown_cb</function> callback is called
+      whenever a formerly active replication slot is not used anymore and can
+      be used to deallocate resources private to the output plugin. The slot
+      isn't necessarily being dropped, streaming is just being stopped.
+      <programlisting>
+typedef void (*LogicalDecodeShutdownCB) (
+    struct LogicalDecodingContext *ctx
+);
+      </programlisting>
+     </para>
+   </sect3>
+    <sect3 id="logicaldecoding-output-plugin-begin">
+     <title>Transaction Begin Callback</title>
+     <para>
+      The required <function>begin_cb</function> callback is called whenever a
+      start of a commited transaction has been decoded. Aborted transactions
+      and their contents never get decoded.
+      <programlisting>
+typedef void (*LogicalDecodeBeginCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn
+);
+      </programlisting>
+      The <parameter>txn</parameter> parameter contains meta information about
+      the transaction, like the timestamp at which it has been committed and
+      its XID.
+     </para>
+   </sect3>
+    <sect3 id="logicaldecoding-output-plugin-commit">
+     <title>Transaction End Callback</title>
+     <para>
+      The required <function>commit_cb</function> callback is called whenever
+      a transaction commit has been
+      decoded. The <function>change_cb</function> callbacks for all modified
+      rows will have been called before this, if there have been any modified
+      rows.
+      <programlisting>
+typedef void (*LogicalDecodeCommitCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn
+);
+      </programlisting>
+     </para>
+    </sect3>
+    <sect3 id="logicaldecoding-output-plugin-change">
+     <title>Callback called for each individual change in a
+     transaction</title>
+     <para>
+      The required <function>change_cb</function> callback is called for every
+      individual row modification inside a transaction, may it be
+      an <command>INSERT</command>, <command>UPDATE</command>
+      or <command>DELETE</command>. Even if the original command modified
+      several rows at once the callback will be called indvidually for each
+      row.
+      <programlisting>
+typedef void (*LogicalDecodeChangeCB) (
+    struct LogicalDecodingContext *ctx,
+    ReorderBufferTXN *txn,
+    Relation relation,
+    ReorderBufferChange *change
+);
+      </programlisting>
+      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+      have the same contents as for the <function>begin_cb</function>
+      and <function>commit_cb</function> callbacks, but additionally the
+      relation descriptor <parameter>relation</parameter> points to the
+      relation the row belongs to and a struct
+      <parameter>change</parameter> describing the row modification are passed
+      in.
+     </para>
+     <note>
+      <para>
+       Only changes in user defined tables that are not unlogged
+       (see <xref linkend="SQL-CREATETABLE-UNLOGGED">) and not temporary
+       (see <xref linkend="SQL-CREATETABLE-TEMPORARY">) can be extracted using
+       logical decoding.
+      </para>
+     </note>
+    </sect3>
+   </sect2>
+   <sect2 id="logicaldecoding-output-plugin-output">
+    <title>Functions for producing output from an output plugin</title>
+    <para>
+     To actually produce output, output plugins can write data to
+     the <literal>StringInfo</literal> output buffer
+     in <literal>ctx-&gt;out</literal> when inside
+     the <function>begin_cb</function>, <function>commit_cb</function>
+     or <function>change_cb</function> callbacks. Before writing to the output
+     buffer <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
+     to be called, and after finishing writing to the
+     buffer <function>OutputPluginWrite(ctx, last_write)</function> has to be
+     called to perform the write. The <parameter>last_write</parameter>
+     indicates whether a particular write was the callback's last write.
+    </para>
+    <para>
+     The following example shows how to output data to the consumer of an
+     output plugin:
+     <programlisting>
+OutputPluginPrepareWrite(ctx, true);
+appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
+OutputPluginWrite(ctx, true);
+     </programlisting>
+    </para>
+   </sect2>
+  </sect1>
+  <sect1 id="logicaldecoding-writer">
+   <title>Logical Decoding Output Writers</title>
+   <para>
+    It is possible to add more output methods for logical decoding.
+    For details, see
+    <filename>src/backend/replication/logical/logicalfuncs.c</filename>.
+    Essentially, three functions need to be provided: one to read WAL, one to
+    prepare writing output, and one to write the output
+    (see <xref linkend="logicaldecoding-output-plugin-output">).
+   </para>
+  </sect1>
+  <sect1 id="logicaldecoding-synchronous">
+   <title>Synchronous replication support for Logical Decoding</title>
+   <para>
+    Logical decoding may be used to to build
+    <link linkend="synchronous-replication">synchronous
+    replication</link> solutions with the same user interface as synchronous
+    replication for <link linkend="streaming-replication">streaming
+    replication</link>.  To do this, the walsender interface
+    (see <xref linkend="logicaldecoding-walsender">) must be used to stream out
+    data. Clients have to send <literal>Standby status update (F)</literal>
+    (see <xref linkend="protocol-replication">) messages, just like streaming
+    replication clients do.
+   </para>
+   <note>
+    <para>
+     A synchronous replica receiving changes via logical decoding will work in
+     the scope of a single database. Since, in contrast to
+     that, <parameter>synchronous_standby_names</parameter> currently is
+     server wide, this means this technique will not work properly if more
+     than one database is actively used.
+     </para>
+   </note>
+  </sect1>
+ </chapter>
index b47bf529a2a4eeee415944251d6cb052f52a7f7d..9bde1085e9b1221d76881367108ef85a82e1d511 100644 (file)
 
   &spi;
   &bgworker;
+  &logicaldecoding;
 
  </part>
 
index cb2dfb2ebc00ab6486b6570b4ad2a273d6d5b173..ea48c270852c02c448affb30d06db3187f13320d 100644 (file)
@@ -1309,9 +1309,22 @@ the simple query protocol can be used in walsender mode.
 Passing <literal>database</> as the value instructs walsender to connect to
 the database specified in the <literal>dbname</> parameter, which will allow
 the connection to be used for logical replication from that database.
+</para>
+<para>
+ For the purpose of testing replication commands, you can make a replication
+ connection via <application>psql</application> or any other <literal>libpq</literal>-using
+ tool with a connection string including the <literal>replication</literal> option,
+ e.g.:
+ <programlisting>
+  psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
+ </programlisting>
+ However it is often more useful to use
+ <application>pg_receivexlog</application> (for physical replication) or
+ <application>pg_recvlogical</application> (for logical replication).
+</para>
 
+<para>
 The commands accepted in walsender mode are:
-
 <variablelist>
   <varlistentry>
     <term>IDENTIFY_SYSTEM</term>
@@ -1764,6 +1777,43 @@ The commands accepted in walsender mode are:
      </para>
     </listitem>
   </varlistentry>
+  <varlistentry>
+    <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slotname</> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</></term>
+    <listitem>
+     <para>
+      Instructs server to start streaming WAL for logical replication, starting
+      at WAL position <replaceable class="parameter">XXX/XXX</>. The server can
+      reply with an error, e.g. if the requested section of WAL has already
+      been recycled. On success, server responds with a CopyBothResponse
+      message, and then starts to stream WAL to the frontend.
+     </para>
+     <para>
+      The output plugin associated with the selected slot is used
+      to process the output for streaming.
+     </para>
+     <variablelist>
+      <varlistentry>
+       <term><literal>SLOT</literal> <replaceable class="parameter">slotname</></term>
+       <listitem>
+         <para>
+          The name of the slot to stream changes from. This parameter is required,
+          and must correspond to an existing logical replication slot created
+          with <literal>CREATE_REPLICATION_SLOT</literal> in
+          <literal>LOGICAL</literal> mode.
+         </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><replaceable class="parameter">XXX/XXX</></term>
+       <listitem>
+        <para>
+         The WAL position to begin streaming at.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+  </varlistentry>
 
   <varlistentry>
     <term><literal>DROP_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</></term>
index ce7a5e3cb6cc250ea7c816254afb0101bea3882a..1b0962c253d8f7be11a87c1c8324c28d84e100c6 100644 (file)
@@ -183,6 +183,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgDumpall          SYSTEM "pg_dumpall.sgml">
 <!ENTITY pgIsready          SYSTEM "pg_isready.sgml">
 <!ENTITY pgReceivexlog      SYSTEM "pg_receivexlog.sgml">
+<!ENTITY pgRecvlogical      SYSTEM "pg_recvlogical.sgml">
 <!ENTITY pgResetxlog        SYSTEM "pg_resetxlog.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY postgres           SYSTEM "postgres-ref.sgml">
index 2b02e668e089f50da16d49cffb846cb509305a47..4847d6631654aa8595c36ccacb787ba39cb5c45c 100644 (file)
@@ -580,7 +580,7 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
     </listitem>
    </varlistentry>
 
-   <varlistentry>
+   <varlistentry id="SQL-CREATETABLE-REPLICA-IDENTITY">
     <term><literal>REPLICA IDENTITY</literal></term>
     <listitem>
      <para>
index fc7ad09786f257e65850bb6db7c09bd293c13d2d..2a985b82e5d4744cbdd47edb55a28ca7c64416b7 100644 (file)
@@ -137,7 +137,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
 
   <variablelist>
 
-   <varlistentry>
+   <varlistentry id="SQL-CREATETABLE-TEMPORARY">
     <term><literal>TEMPORARY</> or <literal>TEMP</></term>
     <listitem>
      <para>
@@ -171,7 +171,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
     </listitem>
    </varlistentry>
 
-   <varlistentry>
+   <varlistentry id="SQL-CREATETABLE-UNLOGGED">
     <term><literal>UNLOGGED</></term>
     <listitem>
      <para>
@@ -1051,6 +1051,17 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>user_catalog_table</literal> (<type>boolean</type>)</term>
+    <listitem>
+     <para>
+      Declare a table as an additional catalog table, e.g. for the purpose of
+      logical replication. See
+      <xref linkend="logicaldecoding-capabilities"> for details.
+     </para>
+    </listitem>
+   </varlistentry>
+
    </variablelist>
 
   </refsect2>
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
new file mode 100644 (file)
index 0000000..41e5e83
--- /dev/null
@@ -0,0 +1,331 @@
+<!--
+doc/src/sgml/ref/pg_recvlogical.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgrecvlogical">
+ <refmeta>
+  <refentrytitle><application>pg_recvlogical</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_recvlogical</refname>
+  <refpurpose>Control logical decoding (see <xref linkend="logicaldecoding">)
+   streams over a walsender connection.</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgrecvlogical">
+  <primary>pg_recvlogical</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_recvlogical</command>
+   <arg rep="repeat" choice="opt"><option>option</option></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1 id="R1-APP-PGRECVLOGICAL-1">
+  <title>Description</title>
+  <para>
+   <command>pg_recvlogical</command> controls logical decoding replication
+   slots and streams data from such replication slots.
+  </para>
+  <para>
+   It creates a replication-mode connection, so it is subject to the same
+   constraints as <link
+   linkend="app-pgreceivexlog"><application>pg_receivexlog</application></link>,
+   plus those for logical replication (see <xref
+   linkend="logicaldecoding">).
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_recvlogical</application> runs in one of three modes, which
+    control its primary action:
+
+    <variablelist>
+
+     <varlistentry>
+      <term><option>--create</option></term>
+      <listitem>
+       <para>
+        Create a new logical replication slot with the name specified in
+        <option>--slot</option>, using the output plugin
+        <option>--plugin</option>, then exit. The slot is created for the
+        database given in <option>--dbname</option>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--start</option></term>
+      <listitem>
+       <para>
+        Begin streaming changes from the logical replication slot with the name
+        specified in <option>--slot</option>, continuing until terminated with a
+        signal. If the server side change stream ends with a server
+        shutdown or disconnect, retry in a loop unless
+        <option>--no-loop</option> is specified. The stream format is
+        determined by the output plugin specified when the slot was created.
+       </para>
+       <para>
+        You must connect to the same database used to create the slot.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--drop</option></term>
+      <listitem>
+       <para>
+        Drop the replication slot with the name specified
+        in <option>--slot</option>, then exit.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+
+   </para>
+
+   <para>
+    <application>pg_recvlogical</application> supports all the usual
+    <literal>libpq</literal>-based options. These are explained in detail in
+    the documentation for
+    <link linkend="APP-PSQL"><application>psql</application></link> and for
+    <link linkend="libpq"><literal>libpq</literal></link>.
+
+    <variablelist>
+
+      <varlistentry>
+       <term><option>-U <replaceable>user</replaceable></option></term>
+       <term><option>--username <replaceable>user</replaceable></option></term>
+       <listitem>
+        <para>
+         Username to connect as. Must have a suitable <literal>pg_hba.conf</literal>
+         entry allowing <literal>replication</literal> connections. Defaults to
+         current operating system user name.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><option>-d <replaceable>database</replaceable></option></term>
+       <term><option>--dbname <replaceable>database</replaceable></option></term>
+       <listitem>
+        <para>
+         The database to connect to in <literal>replication</literal> mode; see
+         mode descriptions for details. May be
+         a <link linkend="LIBPQ-CONNSTRING">libpq connstring</link>
+         instead. Defaults to user name.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><option>-h <replaceable>hostname-or-ip</replaceable></option></term>
+       <term><option>--host <replaceable>hostname-or-ip</replaceable></option></term>
+       <listitem>
+        <para>
+         Host or socket to connect
+         to. See <link linkend="APP-PSQL"><application>psql</application></link>
+         and <link linkend="libpq"><literal>libpq</literal></link>
+         documentation.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><option>-p <replaceable>port</replaceable></option></term>
+       <term><option>--port <replaceable>port</replaceable></option></term>
+       <listitem>
+        <para>
+         Port number to connect to. See
+         <link linkend="R1-APP-PSQL-3"><application>psql</application></link>
+         for an explanation of default port choices when this is not
+         specified.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><option>-w</option></term>
+       <term><option>--no-password</option></term>
+       <listitem>
+        <para>
+         Prevent prompting for a password. Will exit with an error code if a
+         password is required but not available.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term><option>-W</option></term>
+       <term><option>--password</option></term>
+       <listitem>
+        <para>
+         Provide a password for this connection. Please use the pgservice file
+         (see <xref linkend="libpq-pgservice">) or an environment variable
+         instead of this option.
+        </para>
+       </listitem>
+      </varlistentry>
+
+     </variablelist>
+
+   </para>
+
+   <para>
+    The following command-line options control the location and format of the
+    output and other replication behaviour:
+
+    <variablelist>
+
+     <varlistentry>
+      <term><option>-f <replaceable>filename</replaceable></option></term>
+      <term><option>--file=<replaceable>filename</replaceable></option></term>
+      <listitem>
+       <para>
+        Write received and decoded transaction data into this
+        file. Use <literal>-</> for stdout.
+       </para>
+      </listitem>
+     </varlistentry>
+
+
+     <varlistentry>
+      <term><option>-n</option></term>
+      <term><option>--no-loop</option></term>
+      <listitem>
+       <para>
+        When the connection to the server is lost, do not retry in a loop, just exit.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-o <replaceable>NAME</replaceable>[=<replaceable>VALUE</replaceable>]</option></term>
+      <term><option>--option=<replaceable>NAME</replaceable>[=<replaceable>VALUE</replaceable>]</option></term>
+      <listitem>
+       <para>
+        Pass the option <parameter>NAME</parameter> to the output plugin with,
+        if specified, the option value <parameter>NAME</parameter>. Which
+        options exist and their effects depends on the used output plugin.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+      <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+      <listitem>
+       <para>
+        How often should
+        <link linkend="app-pgreceivexlog"><application>pg_receivexlog</application></link>
+        issue sync commands to ensure the <parameter>--outputfile</parameter>
+        is safely flushed to disk without being asked by the server to do
+        so. Specifying an interval of <literal>0</literal> disables issuing
+        fsyncs altogether, while still reporting progress the server.
+        In this case, data may be lost in the event of a crash.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-P <replaceable>plugin</replaceable></option></term>
+      <term><option>--plugin=<replaceable>plugin</replaceable></option></term>
+      <listitem>
+       <para>
+        When creating a slot, use the logical decoding output
+        plugin. See <xref linkend="logicaldecoding">. This option has no
+        effect if the slot already exists.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-s <replaceable>interval_seconds</replaceable></option></term>
+      <term><option>--status-interval=<replaceable>interval_seconds</replaceable></option></term>
+      <listitem>
+       <para>
+        This option has the same effect as the option of the same name in <link
+        linkend="app-pgreceivexlog"><application>pg_receivexlog</application></link>.
+        See the description there.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-S <replaceable>slot_name</replaceable></option></term>
+      <term><option>--slot=<replaceable>slot_name</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, use the existing logical replication slot named
+        <replaceable>slot_name</replaceable>. In <option>--create</option> mode, create the
+        slot with this name. In <option>--drop</option> mode, delete the slot with this name.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-I <replaceable>lsn</replaceable></option></term>
+      <term><option>--startpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, start replication from the given
+        LSN.  For details on the effect of this, see the documentation
+        in <xref linkend="logicaldecoding">
+        and <xref linkend="protocol-replication">. Ignored in other modes.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+
+   </para>
+
+   <para>
+    The following additional options are available:
+
+    <variablelist>
+
+     <varlistentry>
+       <term><option>-v</></term>
+       <term><option>--verbose</></term>
+       <listitem>
+       <para>
+        Enables verbose mode.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-V</></term>
+       <term><option>--version</></term>
+       <listitem>
+       <para>
+        Print the <application>pg_recvlogical</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-?</></term>
+      <term><option>--help</></term>
+       <listitem>
+        <para>
+         Show help about <application>pg_recvlogical</application> command line
+         arguments, and exit.
+        </para>
+       </listitem>
+      </varlistentry>
+
+    </variablelist>
+   </para>
+ </refsect1>
+</refentry>
index 87e8e9ee8ffdf0a4ac3406479a290ecfc4b374cf..a6575f52ac08ef6db43fc4d73c5eb61db0f2992d 100644 (file)
    &pgDumpall;
    &pgIsready;
    &pgReceivexlog;
+   &pgRecvlogical;
    &pgRestore;
    &psqlRef;
    &reindexdb;