]> granicus.if.org Git - postgresql/commitdiff
Add pg_recvlogical —-endpos=LSN
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 4 Jan 2017 19:02:07 +0000 (19:02 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 4 Jan 2017 19:02:07 +0000 (19:02 +0000)
Allow pg_recvlogical to specify an ending LSN, complementing
the existing -—startpos=LSN option.

Craig Ringer, reviewed by Euler Taveira and Naoki Okano

doc/src/sgml/ref/pg_recvlogical.sgml
src/bin/pg_basebackup/pg_recvlogical.c

index b35881f2b9eca9e614a1ad4f940609b54f1727d1..d066ce87016e379ef935d137bd149aed71d74469 100644 (file)
@@ -38,6 +38,14 @@ PostgreSQL documentation
    constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
    replication (see <xref linkend="logicaldecoding">).
   </para>
+
+  <para>
+   <command>pg_recvlogical</> has no equivalent to the logical decoding
+   SQL interface's peek and get modes. It sends replay confirmations for
+   data lazily as it receives it and on clean exit. To examine pending data on
+    a slot without consuming it, use
+   <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -154,6 +162,32 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-E <replaceable>lsn</replaceable></option></term>
+      <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, automatically stop replication
+        and exit with normal exit status 0 when receiving reaches the
+        specified LSN.  If specified when not in <option>--start</option>
+        mode, an error is raised.
+       </para>
+
+       <para>
+        If there's a record with LSN exactly equal to <replaceable>lsn</>,
+        the record will be output.
+       </para>
+
+       <para>
+        The <option>--endpos</option> option is not aware of transaction
+        boundaries and may truncate output partway through a transaction.
+        Any partially output transaction will not be consumed and will be
+        replayed again when the slot is next read from. Individual messages
+        are never truncated.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--if-not-exists</option></term>
       <listitem>
index 49ed2abe55822d7b1e8484cbc572ed9ef5484ac7..658e2ba91f56f29d46841335199f003adb4e122b 100644 (file)
@@ -40,6 +40,7 @@ static int    noloop = 0;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
 static int     fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
 static void usage(void);
 static void StreamLogicalLog(void);
 static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+                                  bool keepalive, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -81,6 +85,7 @@ usage(void)
                         "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
        printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
        printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
+       printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
        printf(_("  -n, --no-loop          do not loop on connection lost\n"));
        printf(_("  -o, --option=NAME[=VALUE]\n"
                         "                         pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
                int                     bytes_written;
                int64           now;
                int                     hdr_len;
+               XLogRecPtr      cur_record_lsn = InvalidXLogRecPtr;
 
                if (copybuf != NULL)
                {
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
                        int                     pos;
                        bool            replyRequested;
                        XLogRecPtr      walEnd;
+                       bool            endposReached = false;
 
                        /*
                         * Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
                        }
                        replyRequested = copybuf[pos];
 
-                       /* If the server requested an immediate reply, send one. */
-                       if (replyRequested)
+                       if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
                        {
-                               /* fsync data, so we send a recent flush pointer */
-                               if (!OutputFsync(now))
-                                       goto error;
+                               /*
+                                * If there's nothing to read on the socket until a keepalive
+                                * we know that the server has nothing to send us; and if
+                                * walEnd has passed endpos, we know nothing else can have
+                                * committed before endpos.  So we can bail out now.
+                                */
+                               endposReached = true;
+                       }
 
-                               now = feGetCurrentTimestamp();
-                               if (!sendFeedback(conn, now, true, false))
+                       /* Send a reply, if necessary */
+                       if (replyRequested || endposReached)
+                       {
+                               if (!flushAndSendFeedback(conn, &now))
                                        goto error;
                                last_status = now;
                        }
+
+                       if (endposReached)
+                       {
+                               prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+                               time_to_abort = true;
+                               break;
+                       }
+
                        continue;
                }
                else if (copybuf[0] != 'w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
                        goto error;
                }
 
-
                /*
                 * Read the header of the XLogData message, enclosed in the CopyData
                 * message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
                }
 
                /* Extract WAL location for this block */
-               {
-                       XLogRecPtr      temp = fe_recvint64(&copybuf[1]);
+               cur_record_lsn = fe_recvint64(&copybuf[1]);
 
-                       output_written_lsn = Max(temp, output_written_lsn);
+               if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+               {
+                       /*
+                        * We've read past our endpoint, so prepare to go away being
+                        * cautious about what happens to our output data.
+                        */
+                       if (!flushAndSendFeedback(conn, &now))
+                               goto error;
+                       prepareToTerminate(conn, endpos, false, cur_record_lsn);
+                       time_to_abort = true;
+                       break;
                }
 
+               output_written_lsn = Max(cur_record_lsn, output_written_lsn);
+
                bytes_left = r - hdr_len;
                bytes_written = 0;
 
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
                                        strerror(errno));
                        goto error;
                }
+
+               if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
+               {
+                       /* endpos was exactly the record we just processed, we're done */
+                       if (!flushAndSendFeedback(conn, &now))
+                               goto error;
+                       prepareToTerminate(conn, endpos, false, cur_record_lsn);
+                       time_to_abort = true;
+                       break;
+               }
        }
 
        res = PQgetResult(conn);
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       if (PQresultStatus(res) == PGRES_COPY_OUT)
+       {
+               /*
+                * We're doing a client-initiated clean exit and have sent CopyDone to
+                * the server. We've already sent replay confirmation and fsync'd so
+                * we can just clean up the connection now.
+                */
+               goto error;
+       }
+       else if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr,
                                _("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
                {"password", no_argument, NULL, 'W'},
 /* replication options */
                {"startpos", required_argument, NULL, 'I'},
+               {"endpos", required_argument, NULL, 'E'},
                {"option", required_argument, NULL, 'o'},
                {"plugin", required_argument, NULL, 'P'},
                {"status-interval", required_argument, NULL, 's'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+       while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
                                }
                                startpos = ((uint64) hi) << 32 | lo;
                                break;
+                       case 'E':
+                               if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: could not parse end position \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               endpos = ((uint64) hi) << 32 | lo;
+                               break;
                        case 'o':
                                {
                                        char       *data = pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
                exit(1);
        }
 
+       if (endpos != InvalidXLogRecPtr && !do_start_slot)
+       {
+               fprintf(stderr,
+                               _("%s: --endpos may only be specified with --start\n"),
+                               progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
 #ifndef WIN32
        pqsignal(SIGINT, sigint_handler);
        pqsignal(SIGHUP, sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
                if (time_to_abort)
                {
                        /*
-                        * We've been Ctrl-C'ed. That's not an error, so exit without an
-                        * errorcode.
+                        * We've been Ctrl-C'ed or reached an exit limit condition. That's
+                        * not an error, so exit without an errorcode.
                         */
                        disconnect_and_exit(0);
                }
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
                }
        }
 }
+
+/*
+ * Fsync our output data, and send a feedback message to the server.  Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+       /* flush data to disk, so that we send a recent flush pointer */
+       if (!OutputFsync(*now))
+               return false;
+       *now = feGetCurrentTimestamp();
+       if (!sendFeedback(conn, *now, true, false))
+               return false;
+
+       return true;
+}
+
+/*
+ * Try to inform the server about of upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+       (void) PQputCopyEnd(conn, NULL);
+       (void) PQflush(conn);
+
+       if (verbose)
+       {
+               if (keepalive)
+                       fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+                                       progname,
+                                       (uint32) (endpos >> 32), (uint32) endpos);
+               else
+                       fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+                                       progname, (uint32) (endpos >> 32), (uint32) (endpos),
+                                       (uint32) (lsn >> 32), (uint32) lsn);
+
+       }
+}