]> granicus.if.org Git - postgresql/commitdiff
Implement streaming xlog for backup tools
authorMagnus Hagander <magnus@hagander.net>
Wed, 26 Oct 2011 18:13:33 +0000 (20:13 +0200)
committerMagnus Hagander <magnus@hagander.net>
Wed, 26 Oct 2011 18:13:33 +0000 (20:13 +0200)
Add option for parallel streaming of the transaction log while a
base backup is running, to get the logfiles before the server has
removed them.

Also add a tool called pg_receivexlog, which streams the transaction
log into files, creating a log archive without having to wait for
segments to complete, thus decreasing the window of data loss without
having to waste space using archive_timeout. This works best in
combination with archive_command - suggested usage docs etc coming later.

13 files changed:
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/ref/pg_basebackup.sgml
doc/src/sgml/ref/pg_receivexlog.sgml [new file with mode: 0644]
doc/src/sgml/reference.sgml
src/bin/pg_basebackup/.gitignore
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c [new file with mode: 0644]
src/bin/pg_basebackup/receivelog.c [new file with mode: 0644]
src/bin/pg_basebackup/receivelog.h [new file with mode: 0644]
src/bin/pg_basebackup/streamutil.c [new file with mode: 0644]
src/bin/pg_basebackup/streamutil.h [new file with mode: 0644]
src/tools/msvc/Mkvcbuild.pm

index 8a8616b000840ac520e49dbd6ddbaa8084a6d761..382d297bdb2ae8865a6d2794c5d8a8c135418f69 100644 (file)
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgCtl              SYSTEM "pg_ctl-ref.sgml">
 <!ENTITY pgDump             SYSTEM "pg_dump.sgml">
 <!ENTITY pgDumpall          SYSTEM "pg_dumpall.sgml">
+<!ENTITY pgReceivexlog      SYSTEM "pg_receivexlog.sgml">
 <!ENTITY pgResetxlog        SYSTEM "pg_resetxlog.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY postgres           SYSTEM "postgres-ref.sgml">
index 25280045412e0112586427e29edd9250a198c470..8c8c78f0d15030475e80141075291ec582fd4f9a 100644 (file)
@@ -143,8 +143,8 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
-      <term><option>-x</option></term>
-      <term><option>--xlog</option></term>
+      <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
+      <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
       <listitem>
        <para>
         Includes the required transaction log files (WAL files) in the
@@ -154,16 +154,43 @@ PostgreSQL documentation
         to consult the log archive, thus making this a completely standalone
         backup.
        </para>
-       <note>
-        <para>
-         The transaction log files are collected at the end of the backup.
-         Therefore, it is necessary for the
-         <xref linkend="guc-wal-keep-segments"> parameter to be set high
-         enough that the log is not removed before the end of the backup.
-         If the log has been rotated when it's time to transfer it, the
-         backup will fail and be unusable.
-        </para>
-       </note>
+       <para>
+        The following methods for collecting the transaction logs are
+        supported:
+
+        <variablelist>
+         <varlistentry>
+          <term><literal>f</literal></term>
+          <term><literal>fetch</literal></term>
+          <listitem>
+           <para>
+            The transaction log files are collected at the end of the backup.
+            Therefore, it is necessary for the
+            <xref linkend="guc-wal-keep-segments"> parameter to be set high
+             enough that the log is not removed before the end of the backup.
+             If the log has been rotated when it's time to transfer it, the
+             backup will fail and be unusable.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry>
+          <term><literal>s</literal></term>
+          <term><literal>stream</literal></term>
+          <listitem>
+           <para>
+            Stream the transaction log while the backup is created. This will
+            open a second connection to the server and start streaming the
+            transaction log in parallel while running the backup. Therefore,
+            it will use up two slots configured by the
+            <xref linkend="guc-max-wal-senders"> parameter. As long as the
+             client can keep up with transaction log received, using this mode
+             requires no extra transaction logs to be saved on the master.
+           </para>
+          </listitem>
+         </varlistentry>
+        </variablelist>
+       </para>
       </listitem>
      </varlistentry>
 
@@ -260,6 +287,20 @@ PostgreSQL documentation
     The following command-line options control the database connection parameters.
 
     <variablelist>
+     <varlistentry>
+      <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+      <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the number of seconds between status packets sent back to the
+        server. This is required when streaming the transaction log (using
+        <literal>--xlog=stream</literal>) if replication timeout is configured
+        on the server, and allows for easier monitoring. The default value is
+        10 seconds.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
       <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
new file mode 100644 (file)
index 0000000..9a2a24b
--- /dev/null
@@ -0,0 +1,270 @@
+<!--
+doc/src/sgml/ref/pg_receivexlog.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgreceivexlog">
+ <refmeta>
+  <refentrytitle>pg_receivexlog</refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_receivexlog</refname>
+  <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgreceivexlog">
+  <primary>pg_receivexlog</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_receivexlog</command>
+   <arg rep="repeat"><replaceable>option</></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>
+   Description
+  </title>
+  <para>
+   <application>pg_receivexlog</application> is used to stream transaction log
+   from a running <productname>PostgreSQL</productname> cluster. The transaction
+   log is streamed using the streaming replication protocol, and is written
+   to a local directory of files. This directory can be used as the archive
+   location for doing a restore using point-in-time recovery (see
+   <xref linkend="continuous-archiving">).
+  </para>
+
+  <para>
+   <application>pg_receivexlog</application> streams the transaction
+   log in real time as it's being generated on the server, and does not wait
+   for segments to complete like <xref linkend="guc-archive-command"> does.
+   For this reason, it is not necessary to set
+   <xref linkend="guc-archive-timeout"> when using
+    <application>pg_receivexlog</application>.
+  </para>
+
+  <para>
+   The transaction log is streamed over a regular
+   <productname>PostgreSQL</productname> connection, and uses the
+   replication protocol. The connection must be
+   made with a user having <literal>REPLICATION</literal> permissions (see
+   <xref linkend="role-attributes">), and the user must be granted explicit
+   permissions in <filename>pg_hba.conf</filename>. The server must also
+   be configured with <xref linkend="guc-max-wal-senders"> set high enough
+   to leave at least one session available for the stream.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    The following command-line options control the location and format of the
+    output.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        Directory to write the output to.
+       </para>
+       <para>
+        This parameter is required.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+   <para>
+    The following command-line options control the running of the program.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode.
+       </para>
+      </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+   <para>
+    The following command-line options control the database connection parameters.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+      <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the number of seconds between status packets sent back to the
+        server. This is required if replication timeout is configured on the
+        server, and allows for easier monitoring. The default value is
+        10 seconds.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+      <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the host name of the machine on which the server is
+        running.  If the value begins with a slash, it is used as the
+        directory for the Unix domain socket. The default is taken
+        from the <envar>PGHOST</envar> environment variable, if set,
+        else a Unix domain socket connection is attempted.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+      <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the TCP port or local Unix domain socket file
+        extension on which the server is listening for connections.
+        Defaults to the <envar>PGPORT</envar> environment variable, if
+        set, or a compiled-in default.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-U <replaceable>username</replaceable></option></term>
+      <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+      <listitem>
+       <para>
+        User name to connect as.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-w</></term>
+      <term><option>--no-password</></term>
+      <listitem>
+       <para>
+        Never issue a password prompt.  If the server requires
+        password authentication and a password is not available by
+        other means such as a <filename>.pgpass</filename> file, the
+        connection attempt will fail.  This option can be useful in
+        batch jobs and scripts where no user is present to enter a
+        password.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-W</option></term>
+      <term><option>--password</option></term>
+      <listitem>
+       <para>
+        Force <application>pg_receivexlog</application> to prompt for a
+        password before connecting to a database.
+       </para>
+
+       <para>
+        This option is never essential, since
+        <application>pg_receivexlog</application> will automatically prompt
+        for a password if the server demands password authentication.
+        However, <application>pg_receivexlog</application> will waste a
+        connection attempt finding out that the server wants a password.
+        In some cases it is worth typing <option>-W</> to avoid the extra
+        connection attempt.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other, less commonly used, parameters are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</></term>
+       <term><option>--version</></term>
+       <listitem>
+       <para>
+       Print the <application>pg_receivexlog</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</></term>
+       <term><option>--help</></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_receivexlog</application> command line
+       arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Environment</title>
+
+  <para>
+   This utility, like most other <productname>PostgreSQL</> utilities,
+   uses the environment variables supported by <application>libpq</>
+   (see <xref linkend="libpq-envars">).
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   When using <application>pg_receivexlog</application> instead of
+   <xref linkend="guc-archive-command">, the server will continue to
+   recycle transaction log files even if the backups are not properly
+   archived, since there is no command that fails. This can be worked
+   around by having an <xref linkend="guc-archive-command"> that fails
+   when the file has not been properly archived yet.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To stream the transaction log from the server at
+   <literal>mydbserver</literal> and store it in the local directory
+   <filename>/usr/local/pgsql/archive</filename>:
+   <screen>
+    <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /home/pgbackup/archive</userinput>
+   </screen>
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="APP-PGBASEBACKUP"></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
index 5fd6410991dc733e3a3e3baeba3519d2cefe17e2..7326519708eb41494751de8fd33a24207ff0f827 100644 (file)
    &pgConfig;
    &pgDump;
    &pgDumpall;
+   &pgReceivexlog;
    &pgRestore;
    &psqlRef;
    &reindexdb;
index a2510fbd8264d345d28d2c4d3d3ab0b31490f387..49f503a4556ba1ae53fe0c6858b2248f66483f2b 100644 (file)
@@ -1 +1,2 @@
 /pg_basebackup
+/pg_receivexlog
\ No newline at end of file
index ccb15025ef5fa5cfd371965ed9eb226eb82eeeed..464506560d66ad6eb2e6973457a31fa47e03159c 100644 (file)
@@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
-OBJS=  pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
 
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
-       $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+       $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+       $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
 install: all installdirs
        $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+       $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
 
 installdirs:
        $(MKDIR_P) '$(DESTDIR)$(bindir)'
 
 uninstall:
        rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+       rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
 
 clean distclean maintainer-clean:
-       rm -f pg_basebackup$(X) $(OBJS)
+       rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
index 5c62be576ee9e45c55d6d0f121db43af2d39a226..68e40f478ffe43f52ac15c15cee81193dcfe3ea9 100644 (file)
  *-------------------------------------------------------------------------
  */
 
-#include "postgres_fe.h"
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need.  But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
 #include "libpq-fe.h"
 
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
 
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 
 #include "getopt_long.h"
 
+#include "receivelog.h"
+#include "streamutil.h"
+
 
 /* Global options */
-static const char *progname;
 char      *basedir = NULL;
 char           format = 'p';           /* p(lain)/t(ar) */
 char      *label = "pg_basebackup base backup";
@@ -34,38 +44,38 @@ bool                showprogress = false;
 int                    verbose = 0;
 int                    compresslevel = 0;
 bool           includewal = false;
+bool           streamwal = false;
 bool           fastcheckpoint = false;
-char      *dbhost = NULL;
-char      *dbuser = NULL;
-char      *dbport = NULL;
-int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
+int                    standby_message_timeout = 10;           /* 10 sec = default */
 
 /* Progress counters */
 static uint64 totalsize;
 static uint64 totaldone;
 static int     tablespacecount;
 
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+#ifndef WIN32
+static int     bgpipe[2] = {-1, -1};
+#endif
 
-#define disconnect_and_exit(code)                              \
-       {                                                                                       \
-       if (conn != NULL) PQfinish(conn);                       \
-       exit(code);                                                                     \
-       }
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, empty string if unknown yet */
+static XLogRecPtr xlogendptr;
+static int     has_xlogendptr = 0;
 
 /* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
-static PGconn *GetConnection(void);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup(void);
 
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
 #ifdef HAVE_LIBZ
 static const char *
 get_gz_error(gzFile *gzf)
@@ -81,39 +91,6 @@ get_gz_error(gzFile *gzf)
 }
 #endif
 
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
-       char       *result;
-
-       result = strdup(s);
-       if (!result)
-       {
-               fprintf(stderr, _("%s: out of memory\n"), progname);
-               exit(1);
-       }
-       return result;
-}
-
-static void *
-xmalloc0(int size)
-{
-       void       *result;
-
-       result = malloc(size);
-       if (!result)
-       {
-               fprintf(stderr, _("%s: out of memory\n"), progname);
-               exit(1);
-       }
-       MemSet(result, 0, size);
-       return result;
-}
-
 
 static void
 usage(void)
@@ -125,7 +102,7 @@ usage(void)
        printf(_("\nOptions controlling the output:\n"));
        printf(_("  -D, --pgdata=DIRECTORY   receive base backup into directory\n"));
        printf(_("  -F, --format=p|t         output format (plain, tar)\n"));
-       printf(_("  -x, --xlog               include required WAL files in backup\n"));
+       printf(_("  -x, --xlog=fetch|stream  include required WAL files in backup\n"));
        printf(_("  -z, --gzip               compress tar output\n"));
        printf(_("  -Z, --compress=0-9       compress tar output with given compression level\n"));
        printf(_("\nGeneral options:\n"));
@@ -137,6 +114,7 @@ usage(void)
        printf(_("  --help                   show this help, then exit\n"));
        printf(_("  --version                output version information, then exit\n"));
        printf(_("\nConnection options:\n"));
+       printf(_("  -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
        printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
        printf(_("  -p, --port=PORT          database server port number\n"));
        printf(_("  -U, --username=NAME      connect as specified database user\n"));
@@ -146,6 +124,199 @@ usage(void)
 }
 
 
+/*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+       if (!has_xlogendptr)
+       {
+#ifndef WIN32
+               fd_set          fds;
+               struct timeval tv;
+               int                     r;
+
+               /*
+                * Don't have the end pointer yet - check our pipe to see if it has
+                * been sent yet.
+                */
+               FD_ZERO(&fds);
+               FD_SET(bgpipe[0], &fds);
+
+               MemSet(&tv, 0, sizeof(tv));
+
+               r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+               if (r == 1)
+               {
+                       char            xlogend[64];
+
+                       MemSet(xlogend, 0, sizeof(xlogend));
+                       r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+                       if (r < 0)
+                       {
+                               fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+                                               progname, strerror(errno));
+                               exit(1);
+                       }
+
+                       if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+                       {
+                               fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+                                               progname, xlogend);
+                               exit(1);
+                       }
+                       has_xlogendptr = 1;
+
+                       /*
+                        * Fall through to check if we've reached the point further
+                        * already.
+                        */
+               }
+               else
+               {
+                       /*
+                        * No data received on the pipe means we don't know the end
+                        * position yet - so just say it's not time to stop yet.
+                        */
+                       return false;
+               }
+#else
+
+               /*
+                * On win32, has_xlogendptr is set by the main thread, so if it's not
+                * set here, we just go back and wait until it shows up.
+                */
+               return false;
+#endif
+       }
+
+       /*
+        * At this point we have an end pointer, so compare it to the current
+        * position to figure out if it's time to stop.
+        */
+       if (segendpos.xlogid > xlogendptr.xlogid ||
+               (segendpos.xlogid == xlogendptr.xlogid &&
+                segendpos.xrecoff >= xlogendptr.xrecoff))
+               return true;
+
+       /*
+        * Have end pointer, but haven't reached it yet - so tell the caller to
+        * keep streaming.
+        */
+       return false;
+}
+
+typedef struct
+{
+       PGconn     *bgconn;
+       XLogRecPtr      startptr;
+       char            xlogdir[MAXPGPATH];
+       char       *sysidentifier;
+       int                     timeline;
+}      logstreamer_param;
+
+static int
+LogStreamerMain(logstreamer_param * param)
+{
+       if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
+                                                  param->sysidentifier, param->xlogdir,
+                                                  segment_callback, NULL, standby_message_timeout))
+
+               /*
+                * Any errors will already have been reported in the function process,
+                * but we need to tell the parent that we didn't shutdown in a nice
+                * way.
+                */
+               return 1;
+
+       PQfinish(param->bgconn);
+       return 0;
+}
+
+/*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
+{
+       logstreamer_param *param;
+
+       param = xmalloc0(sizeof(logstreamer_param));
+       param->timeline = timeline;
+       param->sysidentifier = sysidentifier;
+
+       /* Convert the starting position */
+       if (sscanf(startpos, "%X/%X", &param->startptr.xlogid, &param->startptr.xrecoff) != 2)
+       {
+               fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+                               progname, startpos);
+               disconnect_and_exit(1);
+       }
+       /* Round off to even segment position */
+       param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+#ifndef WIN32
+       /* Create our background pipe */
+       if (pgpipe(bgpipe) < 0)
+       {
+               fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+                               progname, strerror(errno));
+               disconnect_and_exit(1);
+       }
+#endif
+
+       /* Get a second connection */
+       param->bgconn = GetConnection();
+
+       /*
+        * Always in plain format, so we can write to basedir/pg_xlog. But the
+        * directory entry in the tar file may arrive later, so make sure it's
+        * created before we start.
+        */
+       snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+       verify_dir_is_empty_or_create(param->xlogdir);
+
+       /*
+        * Start a child process and tell it to start streaming. On Unix, this is
+        * a fork(). On Windows, we create a thread.
+        */
+#ifndef WIN32
+       bgchild = fork();
+       if (bgchild == 0)
+       {
+               /* in child process */
+               exit(LogStreamerMain(param));
+       }
+       else if (bgchild < 0)
+       {
+               fprintf(stderr, _("%s: could not create background process: %s\n"),
+                               progname, strerror(errno));
+               disconnect_and_exit(1);
+       }
+
+       /*
+        * Else we are in the parent process and all is well.
+        */
+#else                                                  /* WIN32 */
+       bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+       if (bgchild == 0)
+       {
+               fprintf(stderr, _("%s: could not create background thread: %s\n"),
+                               progname, strerror(errno));
+               disconnect_and_exit(1);
+       }
+#endif
+}
+
 /*
  * Verify that the given directory exists and is empty. If it does not
  * exist, it is created. If it exists but is not empty, an error will
@@ -502,11 +673,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
        else
                strcpy(current_path, PQgetvalue(res, rownum, 1));
 
-       /*
-        * Make sure we're unpacking into an empty directory
-        */
-       verify_dir_is_empty_or_create(current_path);
-
        /*
         * Get the COPY data
         */
@@ -597,13 +763,21 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
                                        /*
                                         * Directory
                                         */
-                                       filename[strlen(filename) - 1] = '\0';  /* Remove trailing slash */
+                                       filename[strlen(filename) - 1] = '\0';          /* Remove trailing slash */
                                        if (mkdir(filename, S_IRWXU) != 0)
                                        {
-                                               fprintf(stderr,
+                                               /*
+                                                * When streaming WAL, pg_xlog will have been created
+                                                * by the wal receiver process, so just ignore failure
+                                                * on that.
+                                                */
+                                               if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+                                               {
+                                                       fprintf(stderr,
                                                        _("%s: could not create directory \"%s\": %s\n"),
-                                                               progname, filename, strerror(errno));
-                                               disconnect_and_exit(1);
+                                                                       progname, filename, strerror(errno));
+                                                       disconnect_and_exit(1);
+                                               }
                                        }
 #ifndef WIN32
                                        if (chmod(filename, (mode_t) filemode))
@@ -616,12 +790,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
                                        /*
                                         * Symbolic link
                                         */
-                                       filename[strlen(filename) - 1] = '\0';  /* Remove trailing slash */
+                                       filename[strlen(filename) - 1] = '\0';          /* Remove trailing slash */
                                        if (symlink(&copybuf[157], filename) != 0)
                                        {
                                                fprintf(stderr,
                                                                _("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
-                                                               progname, filename, &copybuf[157], strerror(errno));
+                                                progname, filename, &copybuf[157], strerror(errno));
                                                disconnect_and_exit(1);
                                        }
                                }
@@ -714,94 +888,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 }
 
 
-static PGconn *
-GetConnection(void)
-{
-       PGconn     *tmpconn;
-       int                     argcount = 4;   /* dbname, replication, fallback_app_name,
-                                                                * password */
-       int                     i;
-       const char **keywords;
-       const char **values;
-       char       *password = NULL;
-
-       if (dbhost)
-               argcount++;
-       if (dbuser)
-               argcount++;
-       if (dbport)
-               argcount++;
-
-       keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
-       values = xmalloc0((argcount + 1) * sizeof(*values));
-
-       keywords[0] = "dbname";
-       values[0] = "replication";
-       keywords[1] = "replication";
-       values[1] = "true";
-       keywords[2] = "fallback_application_name";
-       values[2] = progname;
-       i = 3;
-       if (dbhost)
-       {
-               keywords[i] = "host";
-               values[i] = dbhost;
-               i++;
-       }
-       if (dbuser)
-       {
-               keywords[i] = "user";
-               values[i] = dbuser;
-               i++;
-       }
-       if (dbport)
-       {
-               keywords[i] = "port";
-               values[i] = dbport;
-               i++;
-       }
-
-       while (true)
-       {
-               if (dbgetpassword == 1)
-               {
-                       /* Prompt for a password */
-                       password = simple_prompt(_("Password: "), 100, false);
-                       keywords[argcount - 1] = "password";
-                       values[argcount - 1] = password;
-               }
-
-               tmpconn = PQconnectdbParams(keywords, values, true);
-               if (password)
-                       free(password);
-
-               if (PQstatus(tmpconn) == CONNECTION_BAD &&
-                       PQconnectionNeedsPassword(tmpconn) &&
-                       dbgetpassword != -1)
-               {
-                       dbgetpassword = 1;      /* ask for password next time */
-                       PQfinish(tmpconn);
-                       continue;
-               }
-
-               if (PQstatus(tmpconn) != CONNECTION_OK)
-               {
-                       fprintf(stderr, _("%s: could not connect to server: %s"),
-                                       progname, PQerrorMessage(tmpconn));
-                       exit(1);
-               }
-
-               /* Connection ok! */
-               free(values);
-               free(keywords);
-               return tmpconn;
-       }
-}
-
 static void
 BaseBackup(void)
 {
        PGresult   *res;
+       char       *sysidentifier;
+       uint32          timeline;
        char            current_path[MAXPGPATH];
        char            escaped_label[MAXPGPATH];
        int                     i;
@@ -813,6 +905,26 @@ BaseBackup(void)
         */
        conn = GetConnection();
 
+       /*
+        * Run IDENTIFY_SYSTEM so we can get the timeline
+        */
+       res = PQexec(conn, "IDENTIFY_SYSTEM");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               fprintf(stderr, _("%s: could not identify system: %s\n"),
+                               progname, PQerrorMessage(conn));
+               disconnect_and_exit(1);
+       }
+       if (PQntuples(res) != 1)
+       {
+               fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+                               progname, PQntuples(res));
+               disconnect_and_exit(1);
+       }
+       sysidentifier = strdup(PQgetvalue(res, 0, 0));
+       timeline = atoi(PQgetvalue(res, 0, 1));
+       PQclear(res);
+
        /*
         * Start the actual backup
         */
@@ -820,7 +932,7 @@ BaseBackup(void)
        snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
                         escaped_label,
                         showprogress ? "PROGRESS" : "",
-                        includewal ? "WAL" : "",
+                        includewal && !streamwal ? "WAL" : "",
                         fastcheckpoint ? "FAST" : "",
                         includewal ? "NOWAIT" : "");
 
@@ -898,6 +1010,18 @@ BaseBackup(void)
                disconnect_and_exit(1);
        }
 
+       /*
+        * If we're streaming WAL, start the streaming session before we start
+        * receiving the actual data chunks.
+        */
+       if (streamwal)
+       {
+               if (verbose)
+                       fprintf(stderr, _("%s: starting background WAL receiver\n"),
+                                       progname);
+               StartLogStreamer(xlogstart, timeline, sysidentifier);
+       }
+
        /*
         * Start receiving chunks
         */
@@ -945,6 +1069,92 @@ BaseBackup(void)
                disconnect_and_exit(1);
        }
 
+       if (bgchild > 0)
+       {
+               int                     status;
+
+#ifndef WIN32
+               int                     r;
+#endif
+
+               if (verbose)
+                       fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+#ifndef WIN32
+               if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+               {
+                       fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+                                       progname, strerror(errno));
+                       disconnect_and_exit(1);
+               }
+
+               /* Just wait for the background process to exit */
+               r = waitpid(bgchild, &status, 0);
+               if (r == -1)
+               {
+                       fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+                                       progname, strerror(errno));
+                       disconnect_and_exit(1);
+               }
+               if (r != bgchild)
+               {
+                       fprintf(stderr, _("%s: child %i died, expected %i\n"),
+                                       progname, r, bgchild);
+                       disconnect_and_exit(1);
+               }
+               if (!WIFEXITED(status))
+               {
+                       fprintf(stderr, _("%s: child process did not exit normally\n"),
+                                       progname);
+                       disconnect_and_exit(1);
+               }
+               if (WEXITSTATUS(status) != 0)
+               {
+                       fprintf(stderr, _("%s: child process exited with error %i\n"),
+                                       progname, WEXITSTATUS(status));
+                       disconnect_and_exit(1);
+               }
+               /* Exited normally, we're happy! */
+#else                                                  /* WIN32 */
+
+               /*
+                * On Windows, since we are in the same process, we can just store the
+                * value directly in the variable, and then set the flag that says
+                * it's there.
+                */
+               if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+               {
+                       fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+                                       progname, xlogend);
+                       exit(1);
+               }
+               InterlockedIncrement(&has_xlogendptr);
+
+               /* First wait for the thread to exit */
+               if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+               {
+                       _dosmaperr(GetLastError());
+                       fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+                                       progname, strerror(errno));
+                       disconnect_and_exit(1);
+               }
+               if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+               {
+                       _dosmaperr(GetLastError());
+                       fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+                                       progname, strerror(errno));
+                       disconnect_and_exit(1);
+               }
+               if (status != 0)
+               {
+                       fprintf(stderr, _("%s: child thread exited with error %u\n"),
+                                       progname, status);
+                       disconnect_and_exit(1);
+               }
+               /* Exited normally, we're happy */
+#endif
+       }
+
        /*
         * End of copy data. Final result is already checked inside the loop.
         */
@@ -964,7 +1174,7 @@ main(int argc, char **argv)
                {"pgdata", required_argument, NULL, 'D'},
                {"format", required_argument, NULL, 'F'},
                {"checkpoint", required_argument, NULL, 'c'},
-               {"xlog", no_argument, NULL, 'x'},
+               {"xlog", required_argument, NULL, 'x'},
                {"gzip", no_argument, NULL, 'z'},
                {"compress", required_argument, NULL, 'Z'},
                {"label", required_argument, NULL, 'l'},
@@ -973,6 +1183,7 @@ main(int argc, char **argv)
                {"username", required_argument, NULL, 'U'},
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
+               {"statusint", required_argument, NULL, 's'},
                {"verbose", no_argument, NULL, 'v'},
                {"progress", no_argument, NULL, 'P'},
                {NULL, 0, NULL, 0}
@@ -999,7 +1210,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
+       while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -1021,6 +1232,18 @@ main(int argc, char **argv)
                                break;
                        case 'x':
                                includewal = true;
+                               if (strcmp(optarg, "f") == 0 ||
+                                       strcmp(optarg, "fetch") == 0)
+                                       streamwal = false;
+                               else if (strcmp(optarg, "s") == 0 ||
+                                                strcmp(optarg, "stream") == 0)
+                                       streamwal = true;
+                               else
+                               {
+                                       fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
                                break;
                        case 'l':
                                label = xstrdup(optarg);
@@ -1068,6 +1291,15 @@ main(int argc, char **argv)
                        case 'W':
                                dbgetpassword = 1;
                                break;
+                       case 's':
+                               standby_message_timeout = atoi(optarg);
+                               if (standby_message_timeout < 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               break;
                        case 'v':
                                verbose++;
                                break;
@@ -1122,6 +1354,16 @@ main(int argc, char **argv)
                exit(1);
        }
 
+       if (format != 'p' && streamwal)
+       {
+               fprintf(stderr,
+                               _("%s: wal streaming can only be used in plain mode\n"),
+                               progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
 #ifndef HAVE_LIBZ
        if (compresslevel != 0)
        {
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644 (file)
index 0000000..ba533d3
--- /dev/null
@@ -0,0 +1,465 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ *                                       to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need.  But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "access/xlog_internal.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+/* Global options */
+char      *basedir = NULL;
+int                    verbose = 0;
+int                    standby_message_timeout = 10;           /* 10 sec = default */
+volatile bool time_to_abort = false;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+static void
+usage(void)
+{
+       printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+                  progname);
+       printf(_("Usage:\n"));
+       printf(_("  %s [OPTION]...\n"), progname);
+       printf(_("\nOptions controlling the output:\n"));
+       printf(_("  -D, --dir=directory       receive xlog files into this directory\n"));
+       printf(_("\nGeneral options:\n"));
+       printf(_("  -v, --verbose             output verbose messages\n"));
+       printf(_("  -?, --help                show this help, then exit\n"));
+       printf(_("  -V, --version             output version information, then exit\n"));
+       printf(_("\nConnection options:\n"));
+       printf(_("  -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
+       printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
+       printf(_("  -p, --port=PORT          database server port number\n"));
+       printf(_("  -U, --username=NAME      connect as specified database user\n"));
+       printf(_("  -w, --no-password        never prompt for password\n"));
+       printf(_("  -W, --password           force password prompt (should happen automatically)\n"));
+       printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+       char            fn[MAXPGPATH];
+       struct stat statbuf;
+
+       if (verbose)
+               fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+                               progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+       /*
+        * Check if there is a partial file for the name we just finished, and if
+        * there is, remove it under the assumption that we have now got all the
+        * data we need.
+        */
+       segendpos.xrecoff /= XLOG_SEG_SIZE;
+       PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+       snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+                        basedir, timeline,
+                        segendpos.xlogid,
+                        segendpos.xrecoff);
+       if (stat(fn, &statbuf) == 0)
+       {
+               /* File existed, get rid of it */
+               if (verbose)
+                       fprintf(stderr, _("%s: removing file \"%s\"\n"),
+                                       progname, fn);
+               unlink(fn);
+       }
+
+       /*
+        * Never abort from this - we handle all aborting in continue_streaming()
+        */
+       return false;
+}
+
+static bool
+continue_streaming(void)
+{
+       if (time_to_abort)
+       {
+               fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+                               progname);
+               return true;
+       }
+       return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ *       we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ *       WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+       DIR                *dir;
+       struct dirent *dirent;
+       int                     i;
+       bool            b;
+       uint32          high_log = 0;
+       uint32          high_seg = 0;
+       bool            partial = false;
+
+       dir = opendir(basedir);
+       if (dir == NULL)
+       {
+               fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+                               progname, basedir, strerror(errno));
+               disconnect_and_exit(1);
+       }
+
+       while ((dirent = readdir(dir)) != NULL)
+       {
+               char            fullpath[MAXPGPATH];
+               struct stat statbuf;
+               uint32          tli,
+                                       log,
+                                       seg;
+
+               if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+                       continue;
+
+               /* xlog files are always 24 characters */
+               if (strlen(dirent->d_name) != 24)
+                       continue;
+
+               /* Filenames are always made out of 0-9 and A-F */
+               b = false;
+               for (i = 0; i < 24; i++)
+               {
+                       if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+                               !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+                       {
+                               b = true;
+                               break;
+                       }
+               }
+               if (b)
+                       continue;
+
+               /*
+                * Looks like an xlog file. Parse its position.
+                */
+               if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+               {
+                       fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+                                       progname, dirent->d_name);
+                       disconnect_and_exit(1);
+               }
+
+               /* Ignore any files that are for another timeline */
+               if (tli != currenttimeline)
+                       continue;
+
+               /* Check if this is a completed segment or not */
+               snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+               if (stat(fullpath, &statbuf) != 0)
+               {
+                       fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+                                       progname, fullpath, strerror(errno));
+                       disconnect_and_exit(1);
+               }
+
+               if (statbuf.st_size == 16 * 1024 * 1024)
+               {
+                       /* Completed segment */
+                       if (log > high_log ||
+                               (log == high_log && seg > high_seg))
+                       {
+                               high_log = log;
+                               high_seg = seg;
+                               continue;
+                       }
+               }
+               else
+               {
+                       /*
+                        * This is a partial file. Rename it out of the way.
+                        */
+                       char            newfn[MAXPGPATH];
+
+                       fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+                                       progname, dirent->d_name, dirent->d_name);
+
+                       snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+                                        basedir, dirent->d_name);
+
+                       if (stat(newfn, &statbuf) == 0)
+                       {
+                               /*
+                                * XXX: perhaps we should only error out if the existing file
+                                * is larger?
+                                */
+                               fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+                                               progname, newfn);
+                               disconnect_and_exit(1);
+                       }
+                       if (rename(fullpath, newfn) != 0)
+                       {
+                               fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+                                               progname, fullpath, newfn, strerror(errno));
+                               disconnect_and_exit(1);
+                       }
+
+                       /* Don't continue looking for more, we assume this is the last */
+                       partial = true;
+                       break;
+               }
+       }
+
+       closedir(dir);
+
+       if (high_log > 0 || high_seg > 0)
+       {
+               XLogRecPtr      high_ptr;
+
+               if (!partial)
+               {
+                       /*
+                        * If the segment was partial, the pointer is already at the right
+                        * location since we want to re-transmit that segment. If it was
+                        * not, we need to move it to the next segment, since we are
+                        * tracking the last one that was complete.
+                        */
+                       NextLogSeg(high_log, high_seg);
+               }
+
+               high_ptr.xlogid = high_log;
+               high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
+
+               return high_ptr;
+       }
+       else
+               return currentpos;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+       PGresult   *res;
+       uint32          timeline;
+       XLogRecPtr      startpos;
+
+       /*
+        * Connect in replication mode to the server
+        */
+       conn = GetConnection();
+
+       /*
+        * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+        * position.
+        */
+       res = PQexec(conn, "IDENTIFY_SYSTEM");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               fprintf(stderr, _("%s: could not identify system: %s\n"),
+                               progname, PQerrorMessage(conn));
+               disconnect_and_exit(1);
+       }
+       if (PQntuples(res) != 1)
+       {
+               fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+                               progname, PQntuples(res));
+               disconnect_and_exit(1);
+       }
+       timeline = atoi(PQgetvalue(res, 0, 1));
+       if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+       {
+               fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+                               progname, PQgetvalue(res, 0, 2));
+               disconnect_and_exit(1);
+       }
+       PQclear(res);
+
+       /*
+        * Figure out where to start streaming.
+        */
+       startpos = FindStreamingStart(startpos, timeline);
+
+       /*
+        * Always start streaming at the beginning of a segment
+        */
+       startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+       /*
+        * Start the replication
+        */
+       if (verbose)
+               fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+                               progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+       ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
+                                         segment_callback, continue_streaming,
+                                         standby_message_timeout);
+}
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+static void
+sigint_handler(int signum)
+{
+       time_to_abort = true;
+}
+
+int
+main(int argc, char **argv)
+{
+       static struct option long_options[] = {
+               {"help", no_argument, NULL, '?'},
+               {"version", no_argument, NULL, 'V'},
+               {"dir", required_argument, NULL, 'D'},
+               {"host", required_argument, NULL, 'h'},
+               {"port", required_argument, NULL, 'p'},
+               {"username", required_argument, NULL, 'U'},
+               {"no-password", no_argument, NULL, 'w'},
+               {"password", no_argument, NULL, 'W'},
+               {"statusint", required_argument, NULL, 's'},
+               {"verbose", no_argument, NULL, 'v'},
+               {NULL, 0, NULL, 0}
+       };
+       int                     c;
+
+       int                     option_index;
+
+       progname = get_progname(argv[0]);
+       set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+       if (argc > 1)
+       {
+               if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+               {
+                       usage();
+                       exit(0);
+               }
+               else if (strcmp(argv[1], "-V") == 0
+                                || strcmp(argv[1], "--version") == 0)
+               {
+                       puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+                       exit(0);
+               }
+       }
+
+       while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+                                                       long_options, &option_index)) != -1)
+       {
+               switch (c)
+               {
+                       case 'D':
+                               basedir = xstrdup(optarg);
+                               break;
+                       case 'h':
+                               dbhost = xstrdup(optarg);
+                               break;
+                       case 'p':
+                               if (atoi(optarg) <= 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               dbport = xstrdup(optarg);
+                               break;
+                       case 'U':
+                               dbuser = xstrdup(optarg);
+                               break;
+                       case 'w':
+                               dbgetpassword = -1;
+                               break;
+                       case 'W':
+                               dbgetpassword = 1;
+                               break;
+                       case 's':
+                               standby_message_timeout = atoi(optarg);
+                               if (standby_message_timeout < 0)
+                               {
+                                       fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               break;
+                       case 'v':
+                               verbose++;
+                               break;
+                       default:
+
+                               /*
+                                * getopt_long already emitted a complaint
+                                */
+                               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                                               progname);
+                               exit(1);
+               }
+       }
+
+       /*
+        * Any non-option arguments?
+        */
+       if (optind < argc)
+       {
+               fprintf(stderr,
+                               _("%s: too many command-line arguments (first is \"%s\")\n"),
+                               progname, argv[optind]);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       /*
+        * Required arguments
+        */
+       if (basedir == NULL)
+       {
+               fprintf(stderr, _("%s: no target directory specified\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+#ifndef WIN32
+       pqsignal(SIGINT, sigint_handler);
+#endif
+
+       StreamLog();
+
+       exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644 (file)
index 0000000..0ca30c4
--- /dev/null
@@ -0,0 +1,398 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ *                               replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need.  But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+#include "replication/walprotocol.h"
+#include "utils/datetime.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+       int                     f;
+       char            fn[MAXPGPATH];
+
+       XLogFileName(namebuf, timeline, startpoint.xlogid,
+                                startpoint.xrecoff / XLOG_SEG_SIZE);
+
+       snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+       f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+       if (f == -1)
+               fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+                               progname, namebuf, strerror(errno));
+       return f;
+}
+
+/*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ */
+static TimestampTz
+localGetCurrentTimestamp(void)
+{
+       TimestampTz result;
+       struct timeval tp;
+
+       gettimeofday(&tp, NULL);
+
+       result = (TimestampTz) tp.tv_sec -
+               ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+       result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+       result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+       return result;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * If sysidentifier is specified, validate that both the system
+ * identifier and the timeline matches the specified ones
+ * (by sending an extra IDENTIFY_SYSTEM command)
+ *
+ * All received segments will be written to the directory
+ * specified by basedir.
+ *
+ * The segment_finish callback will be called after each segment
+ * has been finished, and the stream_continue callback will be
+ * called every time data is received. If either of these callbacks
+ * return true, the streaming will stop and the function
+ * return. As long as they return false, streaming will continue
+ * indefinitely.
+ *
+ * standby_message_timeout controls how often we send a message
+ * back to the master letting it know our progress, in seconds.
+ * This message will only contain the write location, and never
+ * flush or replay.
+ *
+ * Note: The log position *must* be at a log segment start!
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+{
+       char            query[128];
+       char            current_walfile_name[MAXPGPATH];
+       PGresult   *res;
+       char       *copybuf = NULL;
+       int                     walfile = -1;
+       int64           last_status = -1;
+       XLogRecPtr      blockpos = InvalidXLogRecPtr;
+
+       if (sysidentifier != NULL)
+       {
+               /* Validate system identifier and timeline hasn't changed */
+               res = PQexec(conn, "IDENTIFY_SYSTEM");
+               if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               {
+                       fprintf(stderr, _("%s: could not identify system: %s\n"),
+                                       progname, PQerrorMessage(conn));
+                       PQclear(res);
+                       return false;
+               }
+               if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+               {
+                       fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
+                       PQclear(res);
+                       return false;
+               }
+               if (timeline != atoi(PQgetvalue(res, 0, 1)))
+               {
+                       fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
+                       PQclear(res);
+                       return false;
+               }
+               PQclear(res);
+       }
+
+       /* Initiate the replication stream at specified location */
+       snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+       res = PQexec(conn, query);
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
+       {
+               fprintf(stderr, _("%s: could not start replication: %s\n"),
+                               progname, PQresultErrorMessage(res));
+               return false;
+       }
+       PQclear(res);
+
+       /*
+        * Receive the actual xlog data
+        */
+       while (1)
+       {
+               int                     r;
+               int                     xlogoff;
+               int                     bytes_left;
+               int                     bytes_written;
+               int64           now;
+
+               if (copybuf != NULL)
+               {
+                       PQfreemem(copybuf);
+                       copybuf = NULL;
+               }
+
+               /*
+                * Check if we should continue streaming, or abort at this point.
+                */
+               if (stream_continue && stream_continue())
+               {
+                       if (walfile != -1)
+                       {
+                               fsync(walfile);
+                               close(walfile);
+                       }
+                       return true;
+               }
+
+               /*
+                * Potentially send a status message to the master
+                */
+               now = localGetCurrentTimestamp();
+               if (standby_message_timeout > 0 &&
+                       last_status < now - standby_message_timeout * 1000000)
+               {
+                       /* Time to send feedback! */
+                       char            replybuf[sizeof(StandbyReplyMessage) + 1];
+                       StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+                       replymsg->write = blockpos;
+                       replymsg->flush = InvalidXLogRecPtr;
+                       replymsg->apply = InvalidXLogRecPtr;
+                       replymsg->sendTime = now;
+                       replybuf[0] = 'r';
+
+                       if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+                               PQflush(conn))
+                       {
+                               fprintf(stderr, _("%s: could not send feedback packet: %s"),
+                                               progname, PQerrorMessage(conn));
+                               return false;
+                       }
+
+                       last_status = now;
+               }
+
+               r = PQgetCopyData(conn, &copybuf, 1);
+               if (r == 0)
+               {
+                       /*
+                        * In async mode, and no data available. We block on reading but
+                        * not more than the specified timeout, so that we can send a
+                        * response back to the client.
+                        */
+                       fd_set          input_mask;
+                       struct timeval timeout;
+                       struct timeval *timeoutptr;
+
+                       FD_ZERO(&input_mask);
+                       FD_SET(PQsocket(conn), &input_mask);
+                       if (standby_message_timeout)
+                       {
+                               timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+                               if (timeout.tv_sec <= 0)
+                                       timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+                               timeout.tv_usec = 0;
+                               timeoutptr = &timeout;
+                       }
+                       else
+                               timeoutptr = NULL;
+
+                       r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+                       if (r == 0 || (r < 0 && errno == EINTR))
+                       {
+                               /*
+                                * Got a timeout or signal. Continue the loop and either
+                                * deliver a status packet to the server or just go back into
+                                * blocking.
+                                */
+                               continue;
+                       }
+                       else if (r < 0)
+                       {
+                               fprintf(stderr, _("%s: select() failed: %m\n"), progname);
+                               return false;
+                       }
+                       /* Else there is actually data on the socket */
+                       if (PQconsumeInput(conn) == 0)
+                       {
+                               fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+                                               progname, PQerrorMessage(conn));
+                               return false;
+                       }
+                       continue;
+               }
+               if (r == -1)
+                       /* End of copy stream */
+                       break;
+               if (r == -2)
+               {
+                       fprintf(stderr, _("%s: could not read copy data: %s\n"),
+                                       progname, PQerrorMessage(conn));
+                       return false;
+               }
+               if (r < STREAMING_HEADER_SIZE + 1)
+               {
+                       fprintf(stderr, _("%s: streaming header too small: %i\n"),
+                                       progname, r);
+                       return false;
+               }
+               if (copybuf[0] != 'w')
+               {
+                       fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                                       progname, copybuf[0]);
+                       return false;
+               }
+
+               /* Extract WAL location for this block */
+               memcpy(&blockpos, copybuf + 1, 8);
+               xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+               /*
+                * Verify that the initial location in the stream matches where we
+                * think we are.
+                */
+               if (walfile == -1)
+               {
+                       /* No file open yet */
+                       if (xlogoff != 0)
+                       {
+                               fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+                                               progname, xlogoff);
+                               return false;
+                       }
+               }
+               else
+               {
+                       /* More data in existing segment */
+                       /* XXX: store seek value don't reseek all the time */
+                       if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+                       {
+                               fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+                                               progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                               return false;
+                       }
+               }
+
+               bytes_left = r - STREAMING_HEADER_SIZE;
+               bytes_written = 0;
+
+               while (bytes_left)
+               {
+                       int                     bytes_to_write;
+
+                       /*
+                        * If crossing a WAL boundary, only write up until we reach
+                        * XLOG_SEG_SIZE.
+                        */
+                       if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+                               bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+                       else
+                               bytes_to_write = bytes_left;
+
+                       if (walfile == -1)
+                       {
+                               walfile = open_walfile(blockpos, timeline,
+                                                                          basedir, current_walfile_name);
+                               if (walfile == -1)
+                                       /* Error logged by open_walfile */
+                                       return false;
+                       }
+
+                       if (write(walfile,
+                                         copybuf + STREAMING_HEADER_SIZE + bytes_written,
+                                         bytes_to_write) != bytes_to_write)
+                       {
+                               fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+                                               progname,
+                                               bytes_to_write,
+                                               current_walfile_name,
+                                               strerror(errno));
+                               return false;
+                       }
+
+                       /* Write was successful, advance our position */
+                       bytes_written += bytes_to_write;
+                       bytes_left -= bytes_to_write;
+                       XLByteAdvance(blockpos, bytes_to_write);
+                       xlogoff += bytes_to_write;
+
+                       /* Did we reach the end of a WAL segment? */
+                       if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+                       {
+                               fsync(walfile);
+                               close(walfile);
+                               walfile = -1;
+                               xlogoff = 0;
+
+                               if (segment_finish != NULL)
+                               {
+                                       /*
+                                        * Callback when the segment finished, and return if it
+                                        * told us to.
+                                        */
+                                       if (segment_finish(blockpos, timeline))
+                                               return true;
+                               }
+                       }
+               }
+               /* No more data left to write, start receiving next copy packet */
+       }
+
+       /*
+        * The only way to get out of the loop is if the server shut down the
+        * replication stream. If it's a controlled shutdown, the server will send
+        * a shutdown message, and we'll return the latest xlog location that has
+        * been streamed.
+        */
+
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+                               progname, PQresultErrorMessage(res));
+               return false;
+       }
+       PQclear(res);
+       return true;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644 (file)
index 0000000..1c61ea8
--- /dev/null
@@ -0,0 +1,22 @@
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * Called before trying to read more data. Return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*stream_continue_callback)(void);
+
+extern bool ReceiveXlogStream(PGconn *conn,
+                                                         XLogRecPtr startpos,
+                                                         uint32 timeline,
+                                                         char *sysidentifier,
+                                                         char *basedir,
+                                                         segment_finish_callback segment_finish,
+                                                         stream_continue_callback stream_continue,
+                                                         int standby_message_timeout);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644 (file)
index 0000000..8123431
--- /dev/null
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need.  But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "streamutil.h"
+
+#include <stdio.h>
+#include <string.h>
+
+const char *progname;
+char      *dbhost = NULL;
+char      *dbuser = NULL;
+char      *dbport = NULL;
+int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn    *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+       char       *result;
+
+       result = strdup(s);
+       if (!result)
+       {
+               fprintf(stderr, _("%s: out of memory\n"), progname);
+               exit(1);
+       }
+       return result;
+}
+
+void *
+xmalloc0(int size)
+{
+       void       *result;
+
+       result = malloc(size);
+       if (!result)
+       {
+               fprintf(stderr, _("%s: out of memory\n"), progname);
+               exit(1);
+       }
+       MemSet(result, 0, size);
+       return result;
+}
+
+
+PGconn *
+GetConnection(void)
+{
+       PGconn     *tmpconn;
+       int                     argcount = 4;   /* dbname, replication, fallback_app_name,
+                                                                * password */
+       int                     i;
+       const char **keywords;
+       const char **values;
+       char       *password = NULL;
+
+       if (dbhost)
+               argcount++;
+       if (dbuser)
+               argcount++;
+       if (dbport)
+               argcount++;
+
+       keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+       values = xmalloc0((argcount + 1) * sizeof(*values));
+
+       keywords[0] = "dbname";
+       values[0] = "replication";
+       keywords[1] = "replication";
+       values[1] = "true";
+       keywords[2] = "fallback_application_name";
+       values[2] = progname;
+       i = 3;
+       if (dbhost)
+       {
+               keywords[i] = "host";
+               values[i] = dbhost;
+               i++;
+       }
+       if (dbuser)
+       {
+               keywords[i] = "user";
+               values[i] = dbuser;
+               i++;
+       }
+       if (dbport)
+       {
+               keywords[i] = "port";
+               values[i] = dbport;
+               i++;
+       }
+
+       while (true)
+       {
+               if (password)
+                       free(password);
+
+               if (dbpassword)
+               {
+                       /*
+                        * We've saved a password when a previous connection succeeded,
+                        * meaning this is the call for a second session to the same
+                        * database, so just forcibly reuse that password.
+                        */
+                       keywords[argcount - 1] = "password";
+                       values[argcount - 1] = dbpassword;
+                       dbgetpassword = -1; /* Don't try again if this fails */
+               }
+               else if (dbgetpassword == 1)
+               {
+                       password = simple_prompt(_("Password: "), 100, false);
+                       keywords[argcount - 1] = "password";
+                       values[argcount - 1] = password;
+               }
+
+               tmpconn = PQconnectdbParams(keywords, values, true);
+
+               if (PQstatus(tmpconn) == CONNECTION_BAD &&
+                       PQconnectionNeedsPassword(tmpconn) &&
+                       dbgetpassword != -1)
+               {
+                       dbgetpassword = 1;      /* ask for password next time */
+                       PQfinish(tmpconn);
+                       continue;
+               }
+
+               if (PQstatus(tmpconn) != CONNECTION_OK)
+               {
+                       fprintf(stderr, _("%s: could not connect to server: %s\n"),
+                                       progname, PQerrorMessage(tmpconn));
+                       exit(1);
+               }
+
+               /* Connection ok! */
+               free(values);
+               free(keywords);
+
+               /* Store the password for next run */
+               if (password)
+                       dbpassword = password;
+               return tmpconn;
+       }
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644 (file)
index 0000000..baba5eb
--- /dev/null
@@ -0,0 +1,22 @@
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int     dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code)                              \
+       {                                                                                       \
+       if (conn != NULL) PQfinish(conn);                       \
+       exit(code);                                                                     \
+       }
+
+
+char      *xstrdup(const char *s);
+void      *xmalloc0(int size);
+
+PGconn    *GetConnection(void);
index 3d71c881cfb282bf45f3d97ad4a04703a14c32fd..e2ae0a15781f680bdd82e76c01276b56b29d353b 100644 (file)
@@ -305,6 +305,13 @@ sub mkvcbuild
     $initdb->AddLibrary('ws2_32.lib');
 
     my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+    $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+    $pgbasebackup->AddLibrary('ws2_32.lib');
+
+    my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+    $pgreceivexlog->{name} = 'pg_receivexlog';
+    $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+    $pgreceivexlog->AddLibrary('ws2_32.lib');
 
     my $pgconfig = AddSimpleFrontend('pg_config');