]> granicus.if.org Git - postgresql/commitdiff
Add pg_recvlogical, a tool to receive data logical decoding data.
authorRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 16:19:57 +0000 (12:19 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 16:25:14 +0000 (12:25 -0400)
This is fairly basic at the moment, but it's at least useful for
testing and debugging, and possibly more.

Andres Freund

src/bin/pg_basebackup/.gitignore
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/nls.mk
src/bin/pg_basebackup/pg_recvlogical.c [new file with mode: 0644]
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h

index 1334a1f77b1a606df4f7d0a781d27602207372e6..7abea15a3f9a08908c7308dc5a6fa7aaed18272a 100644 (file)
@@ -1,2 +1,3 @@
 /pg_basebackup
 /pg_receivexlog
+/pg_recvlogical
index 17c91af124036b1d0ebab1453c1867825bf05909..346560eeab1cca5f2e66b2921e952013393c6d5f 100644 (file)
@@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
 OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup pg_receivexlog
+all: pg_basebackup pg_receivexlog pg_recvlogical
 
 pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
        $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
 pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
        $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport
+       $(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 install: all installdirs
        $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
        $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+       $(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
 installdirs:
        $(MKDIR_P) '$(DESTDIR)$(bindir)'
@@ -38,6 +42,9 @@ installdirs:
 uninstall:
        rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
        rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+       rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
 clean distclean maintainer-clean:
-       rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
+       rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) \
+               pg_basebackup.o pg_receivexlog.o pg_recvlogical.o \
+               $(OBJS)
index e1c96dd4c49937e36c65a742e2ed78666fec2b87..29df4bcdb39698773aa9db750cdf13eab564fe4c 100644 (file)
@@ -1,4 +1,4 @@
 # src/bin/pg_basebackup/nls.mk
 CATALOG_NAME     = pg_basebackup
 AVAIL_LANGUAGES  = cs de es fr it ja pl pt_BR ru zh_CN
-GETTEXT_FILES    = pg_basebackup.c pg_receivexlog.c receivelog.c streamutil.c ../../common/fe_memutils.c
+GETTEXT_FILES    = pg_basebackup.c pg_receivexlog.c pg_recvlogical.c receivelog.c streamutil.c ../../common/fe_memutils.c
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
new file mode 100644 (file)
index 0000000..a631cee
--- /dev/null
@@ -0,0 +1,978 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_recvlogical.c - receive data from a logical decoding slot in a streaming fashion
+ *                                       and write it to to a local file.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/pg_recvlogical.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+/* local includes */
+#include "streamutil.h"
+
+#include "access/xlog_internal.h"
+#include "common/fe_memutils.h"
+#include "getopt_long.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "pqexpbuffer.h"
+
+
+/* Time to sleep between reconnection attempts */
+#define RECONNECT_SLEEP_TIME 5
+
+/* Global Options */
+static char    *outfile = NULL;
+static int             verbose = 0;
+static int             noloop = 0;
+static int             standby_message_timeout = 10 * 1000;            /* 10 sec = default */
+static int             fsync_interval = 10 * 1000;             /* 10 sec = default */
+static XLogRecPtr startpos = InvalidXLogRecPtr;
+static bool            do_create_slot = false;
+static bool            do_start_slot = false;
+static bool            do_drop_slot = false;
+
+/* filled pairwise with option, value. value may be NULL */
+static char      **options;
+static size_t  noptions = 0;
+static const char *plugin = "test_decoding";
+
+/* Global State */
+static int             outfd = -1;
+static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t output_reopen = false;
+static int64   output_last_fsync = -1;
+static bool            output_unsynced = false;
+static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
+static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
+
+static void usage(void);
+static void StreamLog();
+static void disconnect_and_exit(int code);
+
+static void
+usage(void)
+{
+       printf(_("%s receives PostgreSQL logical change stream.\n\n"),
+                  progname);
+       printf(_("Usage:\n"));
+       printf(_("  %s [OPTION]...\n"), progname);
+       printf(_("\nOptions:\n"));
+       printf(_("  -f, --file=FILE        receive log into this file. - for stdout\n"));
+       printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+       printf(_("  -v, --verbose          output verbose messages\n"));
+       printf(_("  -V, --version          output version information, then exit\n"));
+       printf(_("  -?, --help             show this help, then exit\n"));
+       printf(_("\nConnection options:\n"));
+       printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
+       printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
+       printf(_("  -p, --port=PORT        database server port number\n"));
+       printf(_("  -U, --username=NAME    connect as specified database user\n"));
+       printf(_("  -w, --no-password      never prompt for password\n"));
+       printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
+       printf(_("\nReplication options:\n"));
+       printf(_("  -F  --fsync-interval=INTERVAL\n"
+                        "                         frequency of syncs to the output file (in seconds, defaults to 10)\n"));
+       printf(_("  -o, --option=NAME[=VALUE]\n"
+                        "                         Specify option NAME with optional value VAL, to be passed\n"
+                        "                         to the output plugin\n"));
+       printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to test_decoding)\n"));
+       printf(_("  -s, --status-interval=INTERVAL\n"
+                        "                         time between status packets sent to server (in seconds, defaults to 10)\n"));
+       printf(_("  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one\n"));
+       printf(_("  -I, --startpos=PTR     Where in an existing slot should the streaming start\n"));
+       printf(_("\nAction to be performed:\n"));
+       printf(_("      --create           create a new replication slot (for the slotname see --slot)\n"));
+       printf(_("      --start            start streaming in a replication slot (for the slotname see --slot)\n"));
+       printf(_("      --drop             drop the replication slot (for the slotname see --slot)\n"));
+       printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ */
+static bool
+sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
+{
+       static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
+       static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
+
+       char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
+       int                     len = 0;
+
+       /*
+        * we normally don't want to send superflous feedbacks, but if it's
+        * because of a timeout we need to, otherwise wal_sender_timeout will
+        * kill us.
+        */
+       if (!force &&
+               last_written_lsn == output_written_lsn &&
+               last_fsync_lsn != output_fsync_lsn)
+               return true;
+
+       if (verbose)
+               fprintf(stderr,
+                               _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
+                               progname,
+                               (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+                               (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+                               replication_slot);
+
+       replybuf[len] = 'r';
+       len += 1;
+       fe_sendint64(output_written_lsn, &replybuf[len]);               /* write */
+       len += 8;
+       fe_sendint64(output_fsync_lsn, &replybuf[len]);         /* flush */
+       len += 8;
+       fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);                /* apply */
+       len += 8;
+       fe_sendint64(now, &replybuf[len]);                      /* sendTime */
+       len += 8;
+       replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
+       len += 1;
+
+       startpos = output_written_lsn;
+       last_written_lsn = output_written_lsn;
+       last_fsync_lsn = output_fsync_lsn;
+
+       if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
+       {
+               fprintf(stderr, _("%s: could not send feedback packet: %s"),
+                               progname, PQerrorMessage(conn));
+               return false;
+       }
+
+       return true;
+}
+
+static void
+disconnect_and_exit(int code)
+{
+       if (conn != NULL)
+               PQfinish(conn);
+
+       exit(code);
+}
+
+static bool
+OutputFsync(int64 now)
+{
+       output_last_fsync = now;
+
+       output_fsync_lsn = output_written_lsn;
+
+       if (fsync_interval <= 0)
+               return true;
+
+       if (!output_unsynced)
+               return true;
+
+       output_unsynced = false;
+
+       /* Accept EINVAL, in case output is writing to a pipe or similar. */
+       if (fsync(outfd) != 0 && errno != EINVAL)
+       {
+               fprintf(stderr,
+                               _("%s: could not fsync log file \"%s\": %s\n"),
+                               progname, outfile, strerror(errno));
+               return false;
+       }
+
+       return true;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+       PGresult   *res;
+       char       *copybuf = NULL;
+       int64           last_status = -1;
+       int                     i;
+       PQExpBuffer query;
+
+       output_written_lsn = InvalidXLogRecPtr;
+       output_fsync_lsn = InvalidXLogRecPtr;
+
+       query = createPQExpBuffer();
+
+       /*
+        * Connect in replication mode to the server
+        */
+       if (!conn)
+               conn = GetConnection();
+       if (!conn)
+               /* Error message already written in GetConnection() */
+               return;
+
+       /*
+        * Start the replication
+        */
+       if (verbose)
+               fprintf(stderr,
+                               _("%s: starting log streaming at %X/%X (slot %s)\n"),
+                               progname, (uint32) (startpos >> 32), (uint32) startpos,
+                               replication_slot);
+
+       /* Initiate the replication stream at specified location */
+       appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+                                         replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
+
+       /* print options if there are any */
+       if (noptions)
+               appendPQExpBufferStr(query, " (");
+
+       for (i = 0; i < noptions; i++)
+       {
+               /* separator */
+               if (i > 0)
+                       appendPQExpBufferStr(query, ", ");
+
+               /* write option name */
+               appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
+
+               /* write option value if specified */
+               if (options[(i * 2) + 1] != NULL)
+                       appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
+       }
+
+       if (noptions)
+               appendPQExpBufferChar(query, ')');
+
+       res = PQexec(conn, query->data);
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
+       {
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
+                               progname, query->data, PQresultErrorMessage(res));
+               PQclear(res);
+               goto error;
+       }
+       PQclear(res);
+       resetPQExpBuffer(query);
+
+       if (verbose)
+               fprintf(stderr,
+                               _("%s: initiated streaming\n"),
+                               progname);
+
+       while (!time_to_abort)
+       {
+               int                     r;
+               int                     bytes_left;
+               int                     bytes_written;
+               int64           now;
+               int                     hdr_len;
+
+               if (copybuf != NULL)
+               {
+                       PQfreemem(copybuf);
+                       copybuf = NULL;
+               }
+
+               /*
+                * Potentially send a status message to the master
+                */
+               now = feGetCurrentTimestamp();
+
+               if (outfd != -1 &&
+                       feTimestampDifferenceExceeds(output_last_fsync, now,
+                                                                                fsync_interval))
+               {
+                       if (!OutputFsync(now))
+                               goto error;
+               }
+
+               if (standby_message_timeout > 0 &&
+                       feTimestampDifferenceExceeds(last_status, now,
+                                                                                standby_message_timeout))
+               {
+                       /* Time to send feedback! */
+                       if (!sendFeedback(conn, now, true, false))
+                               goto error;
+
+                       last_status = now;
+               }
+
+               r = PQgetCopyData(conn, &copybuf, 1);
+               if (r == 0)
+               {
+                       /*
+                        * In async mode, and no data available. We block on reading but
+                        * not more than the specified timeout, so that we can send a
+                        * response back to the client.
+                        */
+                       fd_set          input_mask;
+                       int64           message_target = 0;
+                       int64           fsync_target = 0;
+                       struct timeval timeout;
+                       struct timeval *timeoutptr;
+
+                       FD_ZERO(&input_mask);
+                       FD_SET(PQsocket(conn), &input_mask);
+
+                       /* Compute when we need to wakeup to send a keepalive message. */
+                       if (standby_message_timeout)
+                               message_target = last_status + (standby_message_timeout - 1) *
+                                       ((int64) 1000);
+
+                       /* Compute when we need to wakeup to fsync the output file. */
+                       if (fsync_interval > 0 && output_unsynced)
+                               fsync_target = output_last_fsync + (fsync_interval - 1) *
+                                       ((int64) 1000);
+
+                       /* Now compute when to wakeup. */
+                       if (message_target > 0 || fsync_target > 0)
+                       {
+                               int64           targettime;
+                               long            secs;
+                               int                     usecs;
+
+                               targettime = message_target;
+
+                               if (fsync_target > 0 && fsync_target < targettime)
+                                       targettime = fsync_target;
+
+                               feTimestampDifference(now,
+                                                                         targettime,
+                                                                         &secs,
+                                                                         &usecs);
+                               if (secs <= 0)
+                                       timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+                               else
+                                       timeout.tv_sec = secs;
+                               timeout.tv_usec = usecs;
+                               timeoutptr = &timeout;
+                       }
+
+                       r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+                       if (r == 0 || (r < 0 && errno == EINTR))
+                       {
+                               /*
+                                * Got a timeout or signal. Continue the loop and either
+                                * deliver a status packet to the server or just go back into
+                                * blocking.
+                                */
+                               continue;
+                       }
+                       else if (r < 0)
+                       {
+                               fprintf(stderr, _("%s: select() failed: %s\n"),
+                                               progname, strerror(errno));
+                               goto error;
+                       }
+
+                       /* Else there is actually data on the socket */
+                       if (PQconsumeInput(conn) == 0)
+                       {
+                               fprintf(stderr,
+                                               _("%s: could not receive data from WAL stream: %s"),
+                                               progname, PQerrorMessage(conn));
+                               goto error;
+                       }
+                       continue;
+               }
+
+               /* End of copy stream */
+               if (r == -1)
+                       break;
+
+               /* Failure while reading the copy stream */
+               if (r == -2)
+               {
+                       fprintf(stderr, _("%s: could not read COPY data: %s"),
+                                       progname, PQerrorMessage(conn));
+                       goto error;
+               }
+
+               /* Check the message type. */
+               if (copybuf[0] == 'k')
+               {
+                       int                     pos;
+                       bool            replyRequested;
+                       XLogRecPtr      walEnd;
+
+                       /*
+                        * Parse the keepalive message, enclosed in the CopyData message.
+                        * We just check if the server requested a reply, and ignore the
+                        * rest.
+                        */
+                       pos = 1;                        /* skip msgtype 'k' */
+                       walEnd = fe_recvint64(&copybuf[pos]);
+                       output_written_lsn = Max(walEnd, output_written_lsn);
+
+                       pos += 8;                       /* read walEnd */
+
+                       pos += 8;                       /* skip sendTime */
+
+                       if (r < pos + 1)
+                       {
+                               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                                               progname, r);
+                               goto error;
+                       }
+                       replyRequested = copybuf[pos];
+
+                       /* If the server requested an immediate reply, send one. */
+                       if (replyRequested)
+                       {
+                               /* fsync data, so we send a recent flush pointer */
+                               if (!OutputFsync(now))
+                                       goto error;
+
+                               now = feGetCurrentTimestamp();
+                               if (!sendFeedback(conn, now, true, false))
+                                       goto error;
+                               last_status = now;
+                       }
+                       continue;
+               }
+               else if (copybuf[0] != 'w')
+               {
+                       fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                                       progname, copybuf[0]);
+                       goto error;
+               }
+
+
+               /*
+                * Read the header of the XLogData message, enclosed in the CopyData
+                * message. We only need the WAL location field (dataStart), the rest
+                * of the header is ignored.
+                */
+               hdr_len = 1;                    /* msgtype 'w' */
+               hdr_len += 8;                   /* dataStart */
+               hdr_len += 8;                   /* walEnd */
+               hdr_len += 8;                   /* sendTime */
+               if (r < hdr_len + 1)
+               {
+                       fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                                       progname, r);
+                       goto error;
+               }
+
+               /* Extract WAL location for this block */
+               {
+                       XLogRecPtr      temp = fe_recvint64(&copybuf[1]);
+
+                       output_written_lsn = Max(temp, output_written_lsn);
+               }
+
+               /* redirect output to stdout */
+               if (outfd == -1 && strcmp(outfile, "-") == 0)
+               {
+                       outfd = fileno(stdout);
+               }
+
+               /* got SIGHUP, close output file */
+               if (outfd != -1 && output_reopen)
+               {
+                       now = feGetCurrentTimestamp();
+                       if (!OutputFsync(now))
+                               goto error;
+                       close(outfd);
+                       outfd = -1;
+                       output_reopen = false;
+               }
+
+               if (outfd == -1)
+               {
+
+                       outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
+                                                S_IRUSR | S_IWUSR);
+                       if (outfd == -1)
+                       {
+                               fprintf(stderr,
+                                               _("%s: could not open log file \"%s\": %s\n"),
+                                               progname, outfile, strerror(errno));
+                               goto error;
+                       }
+               }
+
+               bytes_left = r - hdr_len;
+               bytes_written = 0;
+
+               /* signal that a fsync is needed */
+               output_unsynced = true;
+
+               while (bytes_left)
+               {
+                       int                     ret;
+
+                       ret = write(outfd,
+                                               copybuf + hdr_len + bytes_written,
+                                               bytes_left);
+
+                       if (ret < 0)
+                       {
+                               fprintf(stderr,
+                                 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+                                               progname, bytes_left, outfile,
+                                               strerror(errno));
+                               goto error;
+                       }
+
+                       /* Write was successful, advance our position */
+                       bytes_written += ret;
+                       bytes_left -= ret;
+               }
+
+               if (write(outfd, "\n", 1) != 1)
+               {
+                       fprintf(stderr,
+                                 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+                                       progname, 1, outfile,
+                                       strerror(errno));
+                       goto error;
+               }
+       }
+
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               fprintf(stderr,
+                               _("%s: unexpected termination of replication stream: %s"),
+                               progname, PQresultErrorMessage(res));
+               goto error;
+       }
+       PQclear(res);
+
+       if (copybuf != NULL)
+               PQfreemem(copybuf);
+
+       if (outfd != -1 && strcmp(outfile, "-") != 0)
+       {
+               int64 t = feGetCurrentTimestamp();
+
+               /* no need to jump to error on failure here, we're finishing anyway */
+               OutputFsync(t);
+
+               if (close(outfd) != 0)
+                       fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+                                       progname, outfile, strerror(errno));
+       }
+       outfd = -1;
+error:
+       destroyPQExpBuffer(query);
+       PQfinish(conn);
+       conn = NULL;
+}
+
+/*
+ * Unfortunately we can't do sensible signal handling on windows...
+ */
+#ifndef WIN32
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+static void
+sigint_handler(int signum)
+{
+       time_to_abort = true;
+}
+
+/*
+ * Trigger the output file to be reopened.
+ */
+static void
+sighup_handler(int signum)
+{
+       output_reopen = true;
+}
+#endif
+
+
+int
+main(int argc, char **argv)
+{
+       PGresult   *res;
+       static struct option long_options[] = {
+/* general options */
+               {"file", required_argument, NULL, 'f'},
+               {"no-loop", no_argument, NULL, 'n'},
+               {"verbose", no_argument, NULL, 'v'},
+               {"version", no_argument, NULL, 'V'},
+               {"help", no_argument, NULL, '?'},
+/* connnection options */
+               {"dbname", required_argument, NULL, 'd'},
+               {"host", required_argument, NULL, 'h'},
+               {"port", required_argument, NULL, 'p'},
+               {"username", required_argument, NULL, 'U'},
+               {"no-password", no_argument, NULL, 'w'},
+               {"password", no_argument, NULL, 'W'},
+/* replication options */
+               {"option", required_argument, NULL, 'o'},
+               {"plugin", required_argument, NULL, 'P'},
+               {"status-interval", required_argument, NULL, 's'},
+               {"fsync-interval", required_argument, NULL, 'F'},
+               {"slot", required_argument, NULL, 'S'},
+               {"startpos", required_argument, NULL, 'I'},
+/* action */
+               {"create", no_argument, NULL, 1},
+               {"start", no_argument, NULL, 2},
+               {"drop", no_argument, NULL, 3},
+               {NULL, 0, NULL, 0}
+       };
+       int                     c;
+       int                     option_index;
+       uint32          hi,
+                               lo;
+
+       progname = get_progname(argv[0]);
+       set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
+
+       if (argc > 1)
+       {
+               if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+               {
+                       usage();
+                       exit(0);
+               }
+               else if (strcmp(argv[1], "-V") == 0 ||
+                                strcmp(argv[1], "--version") == 0)
+               {
+                       puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
+                       exit(0);
+               }
+       }
+
+       while ((c = getopt_long(argc, argv, "f:F:nvd:h:o:p:U:wWP:s:S:",
+                                                       long_options, &option_index)) != -1)
+       {
+               switch (c)
+               {
+/* general options */
+                       case 'f':
+                               outfile = pg_strdup(optarg);
+                               break;
+                       case 'n':
+                               noloop = 1;
+                               break;
+                       case 'v':
+                               verbose++;
+                               break;
+/* connnection options */
+                       case 'd':
+                               dbname = pg_strdup(optarg);
+                               break;
+                       case 'h':
+                               dbhost = pg_strdup(optarg);
+                               break;
+                       case 'p':
+                               if (atoi(optarg) <= 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               dbport = pg_strdup(optarg);
+                               break;
+                       case 'U':
+                               dbuser = pg_strdup(optarg);
+                               break;
+                       case 'w':
+                               dbgetpassword = -1;
+                               break;
+                       case 'W':
+                               dbgetpassword = 1;
+                               break;
+/* replication options */
+                       case 'o':
+                               {
+                                       char *data = pg_strdup(optarg);
+                                       char *val = strchr(data, '=');
+
+                                       if (val != NULL)
+                                       {
+                                               /* remove =; separate data from val */
+                                               *val = '\0';
+                                               val++;
+                                       }
+
+                                       noptions += 1;
+                                       options = pg_realloc(options, sizeof(char*) * noptions * 2);
+
+                                       options[(noptions - 1) * 2] = data;
+                                       options[(noptions - 1) * 2 + 1] = val;
+                               }
+
+                               break;
+                       case 'P':
+                               plugin = pg_strdup(optarg);
+                               break;
+                       case 's':
+                               standby_message_timeout = atoi(optarg) * 1000;
+                               if (standby_message_timeout < 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               break;
+                       case 'F':
+                               fsync_interval = atoi(optarg) * 1000;
+                               if (fsync_interval < 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               break;
+                       case 'S':
+                               replication_slot = pg_strdup(optarg);
+                               break;
+                       case 'I':
+                               if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: could not parse start position \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               startpos = ((uint64) hi) << 32 | lo;
+                               break;
+/* action */
+                       case 1:
+                               do_create_slot = true;
+                               break;
+                       case 2:
+                               do_start_slot = true;
+                               break;
+                       case 3:
+                               do_drop_slot = true;
+                               break;
+
+                       default:
+
+                               /*
+                                * getopt_long already emitted a complaint
+                                */
+                               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                                               progname);
+                               exit(1);
+               }
+       }
+
+       /*
+        * Any non-option arguments?
+        */
+       if (optind < argc)
+       {
+               fprintf(stderr,
+                               _("%s: too many command-line arguments (first is \"%s\")\n"),
+                               progname, argv[optind]);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       /*
+        * Required arguments
+        */
+       if (replication_slot == NULL)
+       {
+               fprintf(stderr, _("%s: no slot specified\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (do_start_slot && outfile == NULL)
+       {
+               fprintf(stderr, _("%s: no target file specified\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (!do_drop_slot && dbname == NULL)
+       {
+               fprintf(stderr, _("%s: no database specified\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (!do_drop_slot && !do_create_slot && !do_start_slot)
+       {
+               fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (do_drop_slot && (do_create_slot || do_start_slot))
+       {
+               fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (startpos && (do_create_slot || do_drop_slot))
+       {
+               fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+#ifndef WIN32
+       pqsignal(SIGINT, sigint_handler);
+       pqsignal(SIGHUP, sighup_handler);
+#endif
+
+       /*
+        * don't really need this but it actually helps to get more precise error
+        * messages about authentication, required GUCs and such without starting
+        * to loop around connection attempts lateron.
+        */
+       {
+               conn = GetConnection();
+               if (!conn)
+                       /* Error message already written in GetConnection() */
+                       exit(1);
+
+               /*
+                * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+                * position.
+                */
+               res = PQexec(conn, "IDENTIFY_SYSTEM");
+               if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               {
+                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                       progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+                       disconnect_and_exit(1);
+               }
+
+               if (PQntuples(res) != 1 || PQnfields(res) < 4)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                                       progname, PQntuples(res), PQnfields(res), 1, 4);
+                       disconnect_and_exit(1);
+               }
+               PQclear(res);
+       }
+
+
+       /*
+        * stop a replication slot
+        */
+       if (do_drop_slot)
+       {
+               char            query[256];
+
+               if (verbose)
+                       fprintf(stderr,
+                                       _("%s: freeing replication slot \"%s\"\n"),
+                                       progname, replication_slot);
+
+               snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
+                                replication_slot);
+               res = PQexec(conn, query);
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               {
+                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                       progname, query, PQerrorMessage(conn));
+                       disconnect_and_exit(1);
+               }
+
+               if (PQntuples(res) != 0 || PQnfields(res) != 0)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                                       progname, PQntuples(res), PQnfields(res), 0, 0);
+                       disconnect_and_exit(1);
+               }
+
+               PQclear(res);
+               disconnect_and_exit(0);
+       }
+
+       /*
+        * init a replication slot
+        */
+       if (do_create_slot)
+       {
+               char            query[256];
+
+               if (verbose)
+                       fprintf(stderr,
+                                       _("%s: initializing replication slot \"%s\"\n"),
+                                       progname, replication_slot);
+
+               snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+                                replication_slot, plugin);
+
+               res = PQexec(conn, query);
+               if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               {
+                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                       progname, query, PQerrorMessage(conn));
+                       disconnect_and_exit(1);
+               }
+
+               if (PQntuples(res) != 1 || PQnfields(res) != 4)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                                       progname, PQntuples(res), PQnfields(res), 1, 4);
+                       disconnect_and_exit(1);
+               }
+
+               if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not parse log location \"%s\"\n"),
+                                       progname, PQgetvalue(res, 0, 1));
+                       disconnect_and_exit(1);
+               }
+               startpos = ((uint64) hi) << 32 | lo;
+
+               replication_slot = strdup(PQgetvalue(res, 0, 0));
+               PQclear(res);
+       }
+
+
+       if (!do_start_slot)
+               disconnect_and_exit(0);
+
+       while (true)
+       {
+               StreamLog();
+               if (time_to_abort)
+               {
+                       /*
+                        * We've been Ctrl-C'ed. That's not an error, so exit without an
+                        * errorcode.
+                        */
+                       disconnect_and_exit(0);
+               }
+               else if (noloop)
+               {
+                       fprintf(stderr, _("%s: disconnected.\n"), progname);
+                       exit(1);
+               }
+               else
+               {
+                       fprintf(stderr,
+                       /* translator: check source for value for %d */
+                                       _("%s: disconnected. Waiting %d seconds to try again.\n"),
+                                       progname, RECONNECT_SLEEP_TIME);
+                       pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
+               }
+       }
+}
index febe3d1a2b7063917332d9008b9d27f442e55e52..55f3d7f367ec8f81f09bc34f5d5e4911797ac95c 100644 (file)
  *               src/bin/pg_basebackup/receivelog.c
  *-------------------------------------------------------------------------
  */
+
 #include "postgres_fe.h"
 
 #include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
 #include <unistd.h>
-/* for ntohl/htonl */
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#include "libpq-fe.h"
-#include "access/xlog_internal.h"
 
+/* local includes */
 #include "receivelog.h"
 #include "streamutil.h"
 
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+
 
 /* fd and filename for currently open WAL file */
 static int     walfile = -1;
@@ -194,63 +191,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 }
 
 
-/*
- * Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
- */
-static int64
-localGetCurrentTimestamp(void)
-{
-       int64           result;
-       struct timeval tp;
-
-       gettimeofday(&tp, NULL);
-
-       result = (int64) tp.tv_sec -
-               ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
-       result = (result * USECS_PER_SEC) + tp.tv_usec;
-
-       return result;
-}
-
-/*
- * Local version of TimestampDifference(), since we are not linked with
- * backend code.
- */
-static void
-localTimestampDifference(int64 start_time, int64 stop_time,
-                                                long *secs, int *microsecs)
-{
-       int64           diff = stop_time - start_time;
-
-       if (diff <= 0)
-       {
-               *secs = 0;
-               *microsecs = 0;
-       }
-       else
-       {
-               *secs = (long) (diff / USECS_PER_SEC);
-               *microsecs = (int) (diff % USECS_PER_SEC);
-       }
-}
-
-/*
- * Local version of TimestampDifferenceExceeds(), since we are not
- * linked with backend code.
- */
-static bool
-localTimestampDifferenceExceeds(int64 start_time,
-                                                               int64 stop_time,
-                                                               int msec)
-{
-       int64           diff = stop_time - start_time;
-
-       return (diff >= msec * INT64CONST(1000));
-}
-
 /*
  * Check if a timeline history file exists.
  */
@@ -370,47 +310,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
        return true;
 }
 
-/*
- * Converts an int64 to network byte order.
- */
-static void
-sendint64(int64 i, char *buf)
-{
-       uint32          n32;
-
-       /* High order half first, since we're doing MSB-first */
-       n32 = (uint32) (i >> 32);
-       n32 = htonl(n32);
-       memcpy(&buf[0], &n32, 4);
-
-       /* Now the low order half */
-       n32 = (uint32) i;
-       n32 = htonl(n32);
-       memcpy(&buf[4], &n32, 4);
-}
-
-/*
- * Converts an int64 from network byte order to native format.
- */
-static int64
-recvint64(char *buf)
-{
-       int64           result;
-       uint32          h32;
-       uint32          l32;
-
-       memcpy(&h32, buf, 4);
-       memcpy(&l32, buf + 4, 4);
-       h32 = ntohl(h32);
-       l32 = ntohl(l32);
-
-       result = h32;
-       result <<= 32;
-       result |= l32;
-
-       return result;
-}
-
 /*
  * Send a Standby Status Update message to server.
  */
@@ -422,16 +321,16 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 
        replybuf[len] = 'r';
        len += 1;
-       sendint64(blockpos, &replybuf[len]);            /* write */
+       fe_sendint64(blockpos, &replybuf[len]);         /* write */
        len += 8;
        if (reportFlushPosition)
-               sendint64(lastFlushPosition, &replybuf[len]);           /* flush */
+               fe_sendint64(lastFlushPosition, &replybuf[len]);                /* flush */
        else
-               sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* flush */
+               fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);                /* flush */
        len += 8;
-       sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* apply */
+       fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);                /* apply */
        len += 8;
-       sendint64(now, &replybuf[len]);         /* sendTime */
+       fe_sendint64(now, &replybuf[len]);              /* sendTime */
        len += 8;
        replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
        len += 1;
@@ -864,9 +763,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Potentially send a status message to the master
                 */
-               now = localGetCurrentTimestamp();
+               now = feGetCurrentTimestamp();
                if (still_sending && standby_message_timeout > 0 &&
-                       localTimestampDifferenceExceeds(last_status, now,
+                       feTimestampDifferenceExceeds(last_status, now,
                                                                                        standby_message_timeout))
                {
                        /* Time to send feedback! */
@@ -895,10 +794,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                int                     usecs;
 
                                targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-                               localTimestampDifference(now,
-                                                                                targettime,
-                                                                                &secs,
-                                                                                &usecs);
+                               feTimestampDifference(now,
+                                                                         targettime,
+                                                                         &secs,
+                                                                         &usecs);
                                if (secs <= 0)
                                        timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                                else
@@ -1002,7 +901,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        /* If the server requested an immediate reply, send one. */
                        if (replyRequested && still_sending)
                        {
-                               now = localGetCurrentTimestamp();
+                               now = feGetCurrentTimestamp();
                                if (!sendFeedback(conn, blockpos, now, false))
                                        goto error;
                                last_status = now;
@@ -1032,7 +931,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                                progname, r);
                                goto error;
                        }
-                       blockpos = recvint64(&copybuf[1]);
+                       blockpos = fe_recvint64(&copybuf[1]);
 
                        /* Extract WAL location for this block */
                        xlogoff = blockpos % XLOG_SEG_SIZE;
index 7c983cd604a42a78d8a75ca4843439e3fa9344e7..f4789a580ae75df4b97fe40be89ec3fe4817ba70 100644 (file)
@@ -1,3 +1,5 @@
+#include "libpq-fe.h"
+
 #include "access/xlogdefs.h"
 
 /*
index 041076ff1d73976b9baf1f7a40dd3c87298d4a67..e440dc4e244d58e85a7136a4b2edefcc9738684d 100644 (file)
  */
 
 #include "postgres_fe.h"
-#include "streamutil.h"
 
 #include <stdio.h>
 #include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+/* for ntohl/htonl */
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+/* local includes */
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include "common/fe_memutils.h"
+#include "datatype/timestamp.h"
 
 const char *progname;
 char      *connection_string = NULL;
@@ -23,6 +36,7 @@ char     *dbhost = NULL;
 char      *dbuser = NULL;
 char      *dbport = NULL;
 char      *replication_slot = NULL;
+char      *dbname = NULL;
 int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
 static char *dbpassword = NULL;
 PGconn    *conn = NULL;
@@ -87,10 +101,10 @@ GetConnection(void)
        }
 
        keywords[i] = "dbname";
-       values[i] = "replication";
+       values[i] = dbname == NULL ? "replication" : dbname;
        i++;
        keywords[i] = "replication";
-       values[i] = "true";
+       values[i] = dbname == NULL ? "true" : "database";
        i++;
        keywords[i] = "fallback_application_name";
        values[i] = progname;
@@ -212,3 +226,102 @@ GetConnection(void)
 
        return tmpconn;
 }
+
+
+/*
+ * Frontend version of GetCurrentTimestamp(), since we are not linked with
+ * backend code. The protocol always uses integer timestamps, regardless of
+ * server setting.
+ */
+int64
+feGetCurrentTimestamp(void)
+{
+       int64           result;
+       struct timeval tp;
+
+       gettimeofday(&tp, NULL);
+
+       result = (int64) tp.tv_sec -
+               ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+       result = (result * USECS_PER_SEC) + tp.tv_usec;
+
+       return result;
+}
+
+/*
+ * Frontend version of TimestampDifference(), since we are not linked with
+ * backend code.
+ */
+void
+feTimestampDifference(int64 start_time, int64 stop_time,
+                                                long *secs, int *microsecs)
+{
+       int64           diff = stop_time - start_time;
+
+       if (diff <= 0)
+       {
+               *secs = 0;
+               *microsecs = 0;
+       }
+       else
+       {
+               *secs = (long) (diff / USECS_PER_SEC);
+               *microsecs = (int) (diff % USECS_PER_SEC);
+       }
+}
+
+/*
+ * Frontend version of TimestampDifferenceExceeds(), since we are not
+ * linked with backend code.
+ */
+bool
+feTimestampDifferenceExceeds(int64 start_time,
+                                                               int64 stop_time,
+                                                               int msec)
+{
+       int64           diff = stop_time - start_time;
+
+       return (diff >= msec * INT64CONST(1000));
+}
+
+/*
+ * Converts an int64 to network byte order.
+ */
+void
+fe_sendint64(int64 i, char *buf)
+{
+       uint32          n32;
+
+       /* High order half first, since we're doing MSB-first */
+       n32 = (uint32) (i >> 32);
+       n32 = htonl(n32);
+       memcpy(&buf[0], &n32, 4);
+
+       /* Now the low order half */
+       n32 = (uint32) i;
+       n32 = htonl(n32);
+       memcpy(&buf[4], &n32, 4);
+}
+
+/*
+ * Converts an int64 from network byte order to native format.
+ */
+int64
+fe_recvint64(char *buf)
+{
+       int64           result;
+       uint32          h32;
+       uint32          l32;
+
+       memcpy(&h32, buf, 4);
+       memcpy(&l32, buf + 4, 4);
+       h32 = ntohl(h32);
+       l32 = ntohl(l32);
+
+       result = h32;
+       result <<= 32;
+       result |= l32;
+
+       return result;
+}
index 7c7d0228897780dae2f12328501401e327050f52..d0f3799d1e34282e5e12de3ea725f5c6b203fba2 100644 (file)
@@ -5,6 +5,7 @@ extern char *connection_string;
 extern char *dbhost;
 extern char *dbuser;
 extern char *dbport;
+extern char *dbname;
 extern int     dbgetpassword;
 extern char *replication_slot;
 
@@ -12,3 +13,12 @@ extern char *replication_slot;
 extern PGconn *conn;
 
 extern PGconn *GetConnection(void);
+
+extern int64 feGetCurrentTimestamp(void);
+extern void feTimestampDifference(int64 start_time, int64 stop_time,
+                                                                        long *secs, int *microsecs);
+
+extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
+                                                                                       int msec);
+extern void fe_sendint64(int64 i, char *buf);
+extern int64 fe_recvint64(char *buf);