]> granicus.if.org Git - postgresql/commitdiff
Add pg_rewind, for re-synchronizing a master server after failback.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 23 Mar 2015 17:47:52 +0000 (19:47 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 23 Mar 2015 17:47:52 +0000 (19:47 +0200)
Earlier versions of this tool were available (and still are) on github.

Thanks to Michael Paquier, Alvaro Herrera, Peter Eisentraut, Amit Kapila,
and Satoshi Nagayasu for review.

29 files changed:
doc/src/sgml/high-availability.sgml
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/ref/pg_rewind.sgml [new file with mode: 0644]
doc/src/sgml/reference.sgml
src/bin/Makefile
src/bin/pg_rewind/.gitignore [new file with mode: 0644]
src/bin/pg_rewind/Makefile [new file with mode: 0644]
src/bin/pg_rewind/RewindTest.pm [new file with mode: 0644]
src/bin/pg_rewind/copy_fetch.c [new file with mode: 0644]
src/bin/pg_rewind/datapagemap.c [new file with mode: 0644]
src/bin/pg_rewind/datapagemap.h [new file with mode: 0644]
src/bin/pg_rewind/fetch.c [new file with mode: 0644]
src/bin/pg_rewind/fetch.h [new file with mode: 0644]
src/bin/pg_rewind/file_ops.c [new file with mode: 0644]
src/bin/pg_rewind/file_ops.h [new file with mode: 0644]
src/bin/pg_rewind/filemap.c [new file with mode: 0644]
src/bin/pg_rewind/filemap.h [new file with mode: 0644]
src/bin/pg_rewind/libpq_fetch.c [new file with mode: 0644]
src/bin/pg_rewind/logging.c [new file with mode: 0644]
src/bin/pg_rewind/logging.h [new file with mode: 0644]
src/bin/pg_rewind/nls.mk [new file with mode: 0644]
src/bin/pg_rewind/parsexlog.c [new file with mode: 0644]
src/bin/pg_rewind/pg_rewind.c [new file with mode: 0644]
src/bin/pg_rewind/pg_rewind.h [new file with mode: 0644]
src/bin/pg_rewind/t/001_basic.pl [new file with mode: 0644]
src/bin/pg_rewind/t/002_databases.pl [new file with mode: 0644]
src/bin/pg_rewind/t/003_extrafiles.pl [new file with mode: 0644]
src/bin/pg_rewind/timeline.c [new file with mode: 0644]
src/tools/msvc/Mkvcbuild.pm

index d249959f205970c1b287ec5a9bdee59ffa4dfd8c..a17f55545c92842440bfe906e8149bab322c43b4 100644 (file)
@@ -1272,7 +1272,9 @@ primary_slot_name = 'node_a_slot'
     and might stay down.  To return to normal operation, a standby server
     must be recreated,
     either on the former primary system when it comes up, or on a third,
-    possibly new, system. Once complete, the primary and standby can be
+    possibly new, system. The <xref linkend="app-pgrewind"> utility can be
+    used to speed up this process on large clusters.
+    Once complete, the primary and standby can be
     considered to have switched roles. Some people choose to use a third
     server to provide backup for the new primary until the new standby
     server is recreated,
index 7aa3128090d801168432f37d1871188c3de9ad0e..5b4692fb6d8dacd923a4a1ab66edbff05ab0a6fe 100644 (file)
@@ -190,6 +190,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgRecvlogical      SYSTEM "pg_recvlogical.sgml">
 <!ENTITY pgResetxlog        SYSTEM "pg_resetxlog.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
+<!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
 <!ENTITY postgres           SYSTEM "postgres-ref.sgml">
 <!ENTITY postmaster         SYSTEM "postmaster.sgml">
 <!ENTITY psqlRef            SYSTEM "psql-ref.sgml">
diff --git a/doc/src/sgml/ref/pg_rewind.sgml b/doc/src/sgml/ref/pg_rewind.sgml
new file mode 100644 (file)
index 0000000..37b5d67
--- /dev/null
@@ -0,0 +1,237 @@
+<!--
+doc/src/sgml/ref/pg_rewind.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgrewind">
+ <indexterm zone="app-pgrewind">
+  <primary>pg_rewind</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_rewind</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_rewind</refname>
+  <refpurpose>synchronize a <productname>PostgreSQL</productname> data directory with another data directory that was forked from the first one</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_rewind</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+   <group choice="plain">
+    <group choice="req">
+     <arg choice="plain"><option>-D </option></arg>
+     <arg choice="plain"><option>--target-pgdata</option></arg>
+    </group>
+    <replaceable> directory</replaceable>
+    <group choice="req">
+     <arg choice="plain"><option>--source-pgdata=<replaceable>directory</replaceable></option></arg>
+     <arg choice="plain"><option>--source-server=<replaceable>connstr</replaceable></option></arg>
+    </group>
+   </group>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <application>pg_rewind</> is a tool for synchronizing a PostgreSQL cluster
+   with another copy of the same cluster, after the clusters' timelines have
+   diverged. A typical scenario is to bring an old master server back online
+   after failover, as a standby that follows the new master.
+  </para>
+
+  <para>
+   The result is equivalent to replacing the target data directory with the
+   source one. All files are copied, including configuration files. The
+   advantage of <application>pg_rewind</> over taking a new base backup, or
+   tools like <application>rsync</>, is that <application>pg_rewind</> does
+   not require reading through all unchanged files in the cluster. That makes
+   it a lot faster when the database is large and only a small portion of it
+   differs between the clusters.
+  </para>
+
+  <para>
+   <application>pg_rewind</> examines the timeline histories of the source
+   and target clusters to determine the point where they diverged, and
+   expects to find WAL in the target cluster's <filename>pg_xlog</> directory
+   reaching all the way back to the point of divergence. In the typical
+   failover scenario where the target cluster was shut down soon after the
+   divergence, that is not a problem, but if the target cluster had run for a
+   long time after the divergence, the old WAL files might not be present
+   anymore. In that case, they can be manually copied from the WAL archive to
+   the <filename>pg_xlog</> directory. Fetching missing files from a WAL
+   archive automatically is currently not supported.
+  </para>
+
+  <para>
+   When the target server is started up for the first time after running
+   <application>pg_rewind</>, it will go into recovery mode and replay all
+   WAL generated in the source server after the point of divergence.
+   If some of the WAL was no longer available in the source server when
+   <application>pg_rewind</> was run, and therefore could not be copied by
+   the <application>pg_rewind</> session, it needs to be made available when the
+   target server is started up. That can be done by creating a
+   <filename>recovery.conf</> file in the target data directory with a
+   suitable <varname>restore_command</>.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_rewind</application> accepts the following command-line
+    arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D</option></term>
+      <term><option>--target-pgdata</option></term>
+      <listitem>
+       <para>
+        This option specifies the target data directory that is synchronized
+        with the source. The target server must shut down cleanly before
+        running <application>pg_rewind</application>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--source-pgdata</option></term>
+      <listitem>
+       <para>
+        Specifies path to the data directory of the source server, to
+        synchronize the target with. When <option>--source-pgdata</> is
+        used, the source server must be cleanly shut down.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--source-server</option></term>
+      <listitem>
+       <para>
+        Specifies a libpq connection string to connect to the source
+        <productname>PostgreSQL</> server to synchronize the target with.
+        The server must be up and running, and must not be in recovery mode.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-n</option></term>
+      <term><option>--dry-run</option></term>
+      <listitem>
+       <para>
+        Do everything except actually modifying the target directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-P</option></term>
+      <term><option>--progress</option></term>
+      <listitem>
+       <para>
+        Enables progress reporting. Turning this on will deliver an approximate
+        progress report while copying data over from the source cluster.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--debug</option></term>
+      <listitem>
+       <para>
+        Print verbose debugging output that is mostly useful for developers
+        debugging <application>pg_rewind</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-V</option></term>
+      <term><option>--version</option></term>
+      <listitem><para>Display version information, then exit</para></listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-?</option></term>
+      <term><option>--help</option></term>
+      <listitem><para>Show help, then exit</para></listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Environment</title>
+
+  <para>
+   When <option>--source-server</> option is used,
+   <application>pg_rewind</application> also uses the environment variables
+   supported by <application>libpq</> (see <xref linkend="libpq-envars">).
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   <application>pg_rewind</> requires that the <varname>wal_log_hints</>
+   option is enabled in <filename>postgresql.conf</>, or that data checksums
+   were enabled when the cluster was initialized with <application>initdb</>.
+   <varname>full_page_writes</> must also be enabled.
+  </para>
+
+  <refsect2>
+   <title>How it works</title>
+
+   <para>
+    The basic idea is to copy everything from the new cluster to the old
+    cluster, except for the blocks that we know to be the same.
+   </para>
+
+   <procedure>
+    <step>
+     <para>
+      Scan the WAL log of the old cluster, starting from the last checkpoint
+      before the point where the new cluster's timeline history forked off
+      from the old cluster. For each WAL record, make a note of the data
+      blocks that were touched. This yields a list of all the data blocks
+      that were changed in the old cluster, after the new cluster forked off.
+     </para>
+    </step>
+    <step>
+     <para>
+      Copy all those changed blocks from the new cluster to the old cluster.
+     </para>
+    </step>
+    <step>
+     <para>
+      Copy all other files like clog, conf files etc. from the new cluster
+      to the old cluster. Everything except the relation files.
+     </para>
+    </step>
+    <step>
+     <para>
+      Apply the WAL from the new cluster, starting from the checkpoint
+      created at failover. (Strictly speaking, <application>pg_rewind</>
+      doesn't apply the WAL, it just creates a backup label file indicating
+      that when <productname>PostgreSQL</> is started, it will start replay
+      from that checkpoint and apply all the required WAL.)
+     </para>
+    </step>
+   </procedure>
+  </refsect2>
+ </refsect1>
+
+</refentry>
index 10c9a6d4030606296c5533f9b7e9e26689a42aac..65ad795afe97b8ce0bd6496b8d7654e9a3401ea9 100644 (file)
    &pgControldata;
    &pgCtl;
    &pgResetxlog;
+   &pgRewind;
    &postgres;
    &postmaster;
 
index 90ca1a8cce498f1185dea240d6f51743fef0dc6c..7832deca0db3d574bfaec064afb53428529e9823 100644 (file)
@@ -21,6 +21,7 @@ SUBDIRS = \
        pg_ctl \
        pg_dump \
        pg_resetxlog \
+       pg_rewind \
        psql \
        scripts
 
diff --git a/src/bin/pg_rewind/.gitignore b/src/bin/pg_rewind/.gitignore
new file mode 100644 (file)
index 0000000..9ade7ef
--- /dev/null
@@ -0,0 +1,7 @@
+# Files generated during build
+/xlogreader.c
+/pg_rewind
+
+# Generated by test suite
+/tmp_check/
+/regress_log/
diff --git a/src/bin/pg_rewind/Makefile b/src/bin/pg_rewind/Makefile
new file mode 100644 (file)
index 0000000..5b9e620
--- /dev/null
@@ -0,0 +1,52 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/pg_rewind
+#
+# Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
+#
+# src/bin/pg_rewind/Makefile
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "pg_rewind - repurpose an old master server as standby"
+PGAPPICON = win32
+
+subdir = src/bin/pg_rewind
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+override CPPFLAGS := -I$(libpq_srcdir) -DFRONTEND $(CPPFLAGS)
+
+OBJS   = pg_rewind.o parsexlog.o xlogreader.o datapagemap.o timeline.o \
+       fetch.o file_ops.o copy_fetch.o libpq_fetch.o filemap.o logging.o \
+       $(WIN32RES)
+
+EXTRA_CLEAN = $(RMGRDESCSOURCES) xlogreader.c
+
+all: pg_rewind
+
+pg_rewind: $(OBJS) | submake-libpq submake-libpgport
+       $(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+xlogreader.c: % : $(top_srcdir)/src/backend/access/transam/%
+       rm -f $@ && $(LN_S) $< .
+
+install: all installdirs
+       $(INSTALL_PROGRAM) pg_rewind$(X) '$(DESTDIR)$(bindir)/pg_rewind$(X)'
+
+installdirs:
+       $(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+       rm -f '$(DESTDIR)$(bindir)/pg_rewind$(X)'
+
+clean distclean maintainer-clean:
+       rm -f pg_rewind$(X) $(OBJS) xlogreader.c
+       rm -rf tmp_check
+
+check: all
+       $(prove_check) :: local
+       $(prove_check) :: remote
diff --git a/src/bin/pg_rewind/RewindTest.pm b/src/bin/pg_rewind/RewindTest.pm
new file mode 100644 (file)
index 0000000..0f8f4ca
--- /dev/null
@@ -0,0 +1,271 @@
+package RewindTest;
+
+# Test driver for pg_rewind. Each test consists of a cycle where a new cluster
+# is first created with initdb, and a streaming replication standby is set up
+# to follow the master. Then the master is shut down and the standby is
+# promoted, and finally pg_rewind is used to rewind the old master, using the
+# standby as the source.
+#
+# To run a test, the test script (in t/ subdirectory) calls the functions
+# in this module. These functions should be called in this sequence:
+#
+# 1. init_rewind_test - sets up log file etc.
+#
+# 2. setup_cluster - creates a PostgreSQL cluster that runs as the master
+#
+# 3. create_standby - runs pg_basebackup to initialize a standby server, and
+#    sets it up to follow the master.
+#
+# 4. promote_standby - runs "pg_ctl promote" to promote the standby server.
+# The old master keeps running.
+#
+# 5. run_pg_rewind - stops the old master (if it's still running) and runs
+# pg_rewind to synchronize it with the now-promoted standby server.
+#
+# The test script can use the helper functions master_psql and standby_psql
+# to run psql against the master and standby servers, respectively. The
+# test script can also use the $connstr_master and $connstr_standby global
+# variables, which contain libpq connection strings for connecting to the
+# master and standby servers. The data directories are also available
+# in paths $test_master_datadir and $test_standby_datadir
+
+use TestLib;
+use Test::More;
+
+use File::Copy;
+use File::Path qw(remove_tree);
+use IPC::Run qw(run start);
+
+use Exporter 'import';
+our @EXPORT = qw(
+  $connstr_master
+  $connstr_standby
+  $test_master_datadir
+  $test_standby_datadir
+
+  append_to_file
+  master_psql
+  standby_psql
+  check_query
+
+  init_rewind_test
+  setup_cluster
+  create_standby
+  promote_standby
+  run_pg_rewind
+);
+
+
+# Adjust these paths for your environment
+my $testroot = "./tmp_check";
+$test_master_datadir="$testroot/data_master";
+$test_standby_datadir="$testroot/data_standby";
+
+mkdir $testroot;
+
+# Log files are created here
+mkdir "regress_log";
+
+# Define non-conflicting ports for both nodes.
+my $port_master=$ENV{PGPORT};
+my $port_standby=$port_master + 1;
+
+my $log_path;
+my $tempdir_short;
+
+$connstr_master="port=$port_master";
+$connstr_standby="port=$port_standby";
+
+$ENV{PGDATABASE} = "postgres";
+
+sub master_psql
+{
+       my $cmd = shift;
+
+       system_or_bail("psql -q --no-psqlrc -d $connstr_master -c \"$cmd\"");
+}
+
+sub standby_psql
+{
+       my $cmd = shift;
+
+       system_or_bail("psql -q --no-psqlrc -d $connstr_standby -c \"$cmd\"");
+}
+
+# Run a query against the master, and check that the output matches what's
+# expected
+sub check_query
+{
+       my ($query, $expected_stdout, $test_name) = @_;
+       my ($stdout, $stderr);
+
+       # we want just the output, no formatting
+       my $result = run ['psql', '-q', '-A', '-t', '--no-psqlrc',
+                                         '-d', $connstr_master,
+                                         '-c' , $query],
+                                        '>', \$stdout, '2>', \$stderr;
+       # We don't use ok() for the exit code and stderr, because we want this
+       # check to be just a single test.
+       if (!$result) {
+               fail ("$test_name: psql exit code");
+       } elsif ($stderr ne '') {
+               diag $stderr;
+               fail ("$test_name: psql no stderr");
+       } else {
+               is ($stdout, $expected_stdout, "$test_name: query result matches");
+       }
+}
+
+sub append_to_file
+{
+       my($filename, $str) = @_;
+
+       open my $fh, ">>", $filename or die "could not open file $filename";
+       print $fh $str;
+       close $fh;
+}
+
+sub init_rewind_test
+{
+       ($testname, $test_mode) = @_;
+
+       $log_path="regress_log/pg_rewind_log_${testname}_${test_mode}";
+
+       remove_tree $log_path;
+}
+
+sub setup_cluster
+{
+       $tempdir_short = tempdir_short;
+
+       # Initialize master, data checksums are mandatory
+       remove_tree($test_master_datadir);
+       standard_initdb($test_master_datadir);
+
+       # Custom parameters for master's postgresql.conf
+       append_to_file("$test_master_datadir/postgresql.conf", qq(
+wal_level = hot_standby
+max_wal_senders = 2
+wal_keep_segments = 20
+max_wal_size = 200MB
+shared_buffers = 1MB
+wal_log_hints = on
+hot_standby = on
+autovacuum = off
+max_connections = 10
+));
+
+       # Accept replication connections on master
+       append_to_file("$test_master_datadir/pg_hba.conf", qq(
+local replication all trust
+));
+
+       system_or_bail("pg_ctl -w -D $test_master_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_master\" start >>$log_path 2>&1");
+
+       #### Now run the test-specific parts to initialize the master before setting
+       # up standby
+       $ENV{PGHOST}         = $tempdir_short;
+}
+
+sub create_standby
+{
+       # Set up standby with necessary parameter
+       remove_tree $test_standby_datadir;
+
+       # Base backup is taken with xlog files included
+       system_or_bail("pg_basebackup -D $test_standby_datadir -p $port_master -x >>$log_path 2>&1");
+       append_to_file("$test_standby_datadir/recovery.conf", qq(
+primary_conninfo='$connstr_master'
+standby_mode=on
+recovery_target_timeline='latest'
+));
+
+       # Start standby
+       system_or_bail("pg_ctl -w -D $test_standby_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_standby\" start >>$log_path 2>&1");
+
+       # sleep a bit to make sure the standby has caught up.
+       sleep 1;
+}
+
+sub promote_standby
+{
+       #### Now run the test-specific parts to run after standby has been started
+       # up standby
+
+       # Now promote slave and insert some new data on master, this will put
+       # the master out-of-sync with the standby.
+       system_or_bail("pg_ctl -w -D $test_standby_datadir promote >>$log_path 2>&1");
+       sleep 1;
+}
+
+sub run_pg_rewind
+{
+	# Stop the master and be ready to perform the rewind
+	system_or_bail("pg_ctl -w -D $test_master_datadir stop -m fast >>$log_path 2>&1");
+
+	# At this point, the rewind processing is ready to run.
+	# We now have a very simple scenario with a few diverged WAL records.
+	# The real testing begins really now with a bifurcation of the possible
+	# scenarios that pg_rewind supports.
+
+	# Keep a temporary postgresql.conf for master node or it would be
+	# overwritten during the rewind.
+	copy("$test_master_datadir/postgresql.conf", "$testroot/master-postgresql.conf.tmp");
+	# Now run pg_rewind.  NOTE: $test_mode is a string, so it must be
+	# compared with 'eq', not '=='; numeric comparison coerces both sides
+	# to 0 and would make the first branch match unconditionally, so the
+	# "remote" mode would never actually be exercised.
+	if ($test_mode eq "local")
+	{
+		# Do rewind using a local pgdata as source
+		# Stop the master and be ready to perform the rewind
+		system_or_bail("pg_ctl -w -D $test_standby_datadir stop -m fast >>$log_path 2>&1");
+		my $result =
+			run(['./pg_rewind',
+				 "--debug",
+				 "--source-pgdata=$test_standby_datadir",
+				 "--target-pgdata=$test_master_datadir"],
+				'>>', $log_path, '2>&1');
+		ok ($result, 'pg_rewind local');
+	}
+	elsif ($test_mode eq "remote")
+	{
+		# Do rewind using a remote connection as source
+		my $result =
+			run(['./pg_rewind',
+				 "--source-server=\"port=$port_standby dbname=postgres\"",
+				 "--target-pgdata=$test_master_datadir"],
+				'>>', $log_path, '2>&1');
+		ok ($result, 'pg_rewind remote');
+	} else {
+		# Cannot come here normally
+		die("Incorrect test mode specified");
+	}
+
+	# Now move back postgresql.conf with old settings
+	move("$testroot/master-postgresql.conf.tmp", "$test_master_datadir/postgresql.conf");
+
+	# Plug-in rewound node to the now-promoted standby node
+	append_to_file("$test_master_datadir/recovery.conf", qq(
+primary_conninfo='port=$port_standby'
+standby_mode=on
+recovery_target_timeline='latest'
+));
+
+	# Restart the master to check that rewind went correctly
+	system_or_bail("pg_ctl -w -D $test_master_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_master\" start >>$log_path 2>&1");
+
+	#### Now run the test-specific parts to check the result
+}
+
+# Clean up after the test. Stop both servers, if they're still running.
+END
+{
+       my $save_rc = $?;
+       if ($test_master_datadir)
+       {
+               system "pg_ctl -D $test_master_datadir -s -m immediate stop 2> /dev/null";
+       }
+       if ($test_standby_datadir)
+       {
+               system "pg_ctl -D $test_standby_datadir -s -m immediate stop 2> /dev/null";
+       }
+       $? = $save_rc;
+}
diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
new file mode 100644 (file)
index 0000000..887fec9
--- /dev/null
@@ -0,0 +1,261 @@
+/*-------------------------------------------------------------------------
+ *
+ * copy_fetch.c
+ *       Functions for using a data directory as the source.
+ *
+ * Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "datapagemap.h"
+#include "fetch.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "logging.h"
+#include "pg_rewind.h"
+
+#include "catalog/catalog.h"
+
+static void recurse_dir(const char *datadir, const char *path,
+                       process_file_callback_t callback);
+
+static void execute_pagemap(datapagemap_t *pagemap, const char *path);
+
+/*
+ * Traverse through all files in a data directory, calling 'callback'
+ * for each file.
+ */
+void
+traverse_datadir(const char *datadir, process_file_callback_t callback)
+{
+       recurse_dir(datadir, NULL, callback);
+}
+
+/*
+ * recursive part of traverse_datadir
+ */
+static void
+recurse_dir(const char *datadir, const char *parentpath,
+                       process_file_callback_t callback)
+{
+       DIR                *xldir;
+       struct dirent *xlde;
+       char            fullparentpath[MAXPGPATH];
+
+       if (parentpath)
+               snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
+       else
+               snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
+
+       xldir = opendir(fullparentpath);
+       if (xldir == NULL)
+               pg_fatal("could not open directory \"%s\": %s\n",
+                                fullparentpath, strerror(errno));
+
+       while (errno = 0, (xlde = readdir(xldir)) != NULL)
+       {
+               struct stat fst;
+               char            fullpath[MAXPGPATH];
+               char            path[MAXPGPATH];
+
+               if (strcmp(xlde->d_name, ".") == 0 ||
+                       strcmp(xlde->d_name, "..") == 0)
+                       continue;
+
+               snprintf(fullpath, MAXPGPATH, "%s/%s", fullparentpath, xlde->d_name);
+
+               if (lstat(fullpath, &fst) < 0)
+               {
+                       pg_log(PG_WARNING, "could not stat file \"%s\": %s",
+                                  fullpath, strerror(errno));
+
+                       /*
+                        * This is ok, if the new master is running and the file was just
+                        * removed. If it was a data file, there should be a WAL record of
+                        * the removal. If it was something else, it couldn't have been
+                        * critical anyway.
+                        *
+                        * TODO: But complain if we're processing the target dir!
+                        */
+               }
+
+               if (parentpath)
+                       snprintf(path, MAXPGPATH, "%s/%s", parentpath, xlde->d_name);
+               else
+                       snprintf(path, MAXPGPATH, "%s", xlde->d_name);
+
+               if (S_ISREG(fst.st_mode))
+                       callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
+               else if (S_ISDIR(fst.st_mode))
+               {
+                       callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
+                       /* recurse to handle subdirectories */
+                       recurse_dir(datadir, path, callback);
+               }
+#ifndef WIN32
+               else if (S_ISLNK(fst.st_mode))
+#else
+               else if (pgwin32_is_junction(fullpath))
+#endif
+               {
+#if defined(HAVE_READLINK) || defined(WIN32)
+                       char            link_target[MAXPGPATH];
+                       ssize_t         len;
+
+                       len = readlink(fullpath, link_target, sizeof(link_target) - 1);
+                       if (len == -1)
+                               pg_fatal("readlink() failed on \"%s\": %s\n",
+                                                fullpath, strerror(errno));
+
+                       if (len == sizeof(link_target) - 1)
+                       {
+                               /* path was truncated */
+                               pg_fatal("symbolic link \"%s\" target path too long\n",
+                                                fullpath);
+                       }
+
+                       callback(path, FILE_TYPE_SYMLINK, 0, link_target);
+
+                       /*
+                        * If it's a symlink within pg_tblspc, we need to recurse into it,
+                        * to process all the tablespaces.
+                        */
+                       if (strcmp(parentpath, "pg_tblspc") == 0)
+                               recurse_dir(datadir, path, callback);
+#else
+                       pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform\n",
+                                        fullpath);
+#endif   /* HAVE_READLINK */
+               }
+       }
+
+       if (errno)
+               pg_fatal("could not read directory \"%s\": %s\n",
+                                fullparentpath, strerror(errno));
+
+       if (closedir(xldir))
+               pg_fatal("could not close archive location \"%s\": %s\n",
+                                fullparentpath, strerror(errno));
+}
+
+/*
+ * Copy a file from source to target, between 'begin' and 'end' offsets.
+ *
+ * If 'trunc' is true, any existing file with the same name is truncated.
+ */
+static void
+copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
+{
+       char            buf[BLCKSZ];
+       char            srcpath[MAXPGPATH];
+       int                     srcfd;
+
+       snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);
+
+       srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+       if (srcfd < 0)
+               pg_fatal("could not open source file \"%s\": %s\n",
+                                srcpath, strerror(errno));
+
+       if (lseek(srcfd, begin, SEEK_SET) == -1)
+               pg_fatal("could not seek in source file: %s\n", strerror(errno));
+
+       open_target_file(path, trunc);
+
+       while (end - begin > 0)
+       {
+               int                     readlen;
+               int                     len;
+
+               if (end - begin > sizeof(buf))
+                       len = sizeof(buf);
+               else
+                       len = end - begin;
+
+               readlen = read(srcfd, buf, len);
+
+               if (readlen < 0)
+                       pg_fatal("could not read file \"%s\": %s\n",
+                                        srcpath, strerror(errno));
+               else if (readlen == 0)
+                       pg_fatal("unexpected EOF while reading file \"%s\"\n", srcpath);
+
+               write_target_range(buf, begin, readlen);
+               begin += readlen;
+       }
+
+       if (close(srcfd) != 0)
+               pg_fatal("error closing file \"%s\": %s\n", srcpath, strerror(errno));
+}
+
+/*
+ * Copy all relation data files from datadir_source to datadir_target, which
+ * are marked in the given data page map.
+ */
+void
+copy_executeFileMap(filemap_t *map)
+{
+       file_entry_t *entry;
+       int                     i;
+
+       for (i = 0; i < map->narray; i++)
+       {
+               entry = map->array[i];
+               execute_pagemap(&entry->pagemap, entry->path);
+
+               switch (entry->action)
+               {
+                       case FILE_ACTION_NONE:
+                               /* ok, do nothing.. */
+                               break;
+
+                       case FILE_ACTION_COPY:
+                               copy_file_range(entry->path, 0, entry->newsize, true);
+                               break;
+
+                       case FILE_ACTION_TRUNCATE:
+                               truncate_target_file(entry->path, entry->newsize);
+                               break;
+
+                       case FILE_ACTION_COPY_TAIL:
+                               copy_file_range(entry->path, entry->oldsize, entry->newsize, false);
+                               break;
+
+                       case FILE_ACTION_CREATE:
+                               create_target(entry);
+                               break;
+
+                       case FILE_ACTION_REMOVE:
+                               remove_target(entry);
+                               break;
+               }
+       }
+
+       close_target_file();
+}
+
+static void
+execute_pagemap(datapagemap_t *pagemap, const char *path)
+{
+       datapagemap_iterator_t *iter;
+       BlockNumber blkno;
+       off_t           offset;
+
+       iter = datapagemap_iterate(pagemap);
+       while (datapagemap_next(iter, &blkno))
+       {
+               offset = blkno * BLCKSZ;
+               copy_file_range(path, offset, offset + BLCKSZ, false);
+               /* Ok, this block has now been copied from new data dir to old */
+       }
+       free(iter);
+}
diff --git a/src/bin/pg_rewind/datapagemap.c b/src/bin/pg_rewind/datapagemap.c
new file mode 100644 (file)
index 0000000..3477366
--- /dev/null
@@ -0,0 +1,126 @@
+/*-------------------------------------------------------------------------
+ *
+ * datapagemap.c
+ *       A data structure for keeping track of data pages that have changed.
+ *
+ * This is a fairly simple bitmap.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "datapagemap.h"
+
+/* Iteration state for scanning a page map; see datapagemap_iterate(). */
+struct datapagemap_iterator
+{
+       datapagemap_t *map;                     /* map being iterated over */
+       BlockNumber nextblkno;          /* next block number to examine */
+};
+
+/*****
+ * Public functions
+ */
+
+/*
+ * Add a block to the bitmap.
+ *
+ * The bitmap is grown on demand; one bit per block, eight blocks per byte.
+ * Adding the same block twice is harmless.
+ */
+void
+datapagemap_add(datapagemap_t *map, BlockNumber blkno)
+{
+       int                     offset;         /* byte index within the bitmap */
+       int                     bitno;          /* bit index within that byte */
+
+       offset = blkno / 8;
+       bitno = blkno % 8;
+
+       /* enlarge or create bitmap if needed */
+       if (map->bitmapsize <= offset)
+       {
+               int                     oldsize = map->bitmapsize;
+               int                     newsize;
+
+               /*
+                * The minimum to hold the new bit is offset + 1. But add some
+                * headroom, so that we don't need to repeatedly enlarge the bitmap in
+                * the common case that blocks are modified in order, from beginning
+                * of a relation to the end.
+                */
+               newsize = offset + 1;
+               newsize += 10;
+
+               /* pg_realloc handles the initial NULL bitmap like malloc */
+               map->bitmap = pg_realloc(map->bitmap, newsize);
+
+               /* zero out the newly allocated region */
+               memset(&map->bitmap[oldsize], 0, newsize - oldsize);
+
+               map->bitmapsize = newsize;
+       }
+
+       /* Set the bit */
+       map->bitmap[offset] |= (1 << bitno);
+}
+
+/*
+ * Begin a scan over every block recorded in the page map.
+ *
+ * Retrieve the blocks one at a time with datapagemap_next(); when the scan
+ * is finished, release the iterator with free().
+ */
+datapagemap_iterator_t *
+datapagemap_iterate(datapagemap_t *map)
+{
+       datapagemap_iterator_t *it = pg_malloc(sizeof(datapagemap_iterator_t));
+
+       it->map = map;
+       it->nextblkno = 0;
+       return it;
+}
+
+/*
+ * Return the next block recorded in the map, in ascending order.
+ *
+ * On success, stores the block number in *blkno and returns true. Returns
+ * false once all set bits have been visited.
+ */
+bool
+datapagemap_next(datapagemap_iterator_t *iter, BlockNumber *blkno)
+{
+       datapagemap_t *map = iter->map;
+       BlockNumber cur;
+
+       /* Scan forward from the saved position until a set bit is found. */
+       for (cur = iter->nextblkno; (int) (cur / 8) < map->bitmapsize; cur++)
+       {
+               if (map->bitmap[cur / 8] & (1 << (cur % 8)))
+               {
+                       iter->nextblkno = cur + 1;
+                       *blkno = cur;
+                       return true;
+               }
+       }
+
+       /* no more set bits in this bitmap; remember where we stopped */
+       iter->nextblkno = cur;
+       return false;
+}
+
+/*
+ * Debugging aid: write every block number recorded in the map to stdout,
+ * one per line.
+ */
+void
+datapagemap_print(datapagemap_t *map)
+{
+       BlockNumber blkno;
+       datapagemap_iterator_t *it = datapagemap_iterate(map);
+
+       while (datapagemap_next(it, &blkno))
+               printf("  blk %u\n", blkno);
+
+       free(it);
+}
diff --git a/src/bin/pg_rewind/datapagemap.h b/src/bin/pg_rewind/datapagemap.h
new file mode 100644 (file)
index 0000000..32bbec8
--- /dev/null
@@ -0,0 +1,32 @@
+/*-------------------------------------------------------------------------
+ *
+ * datapagemap.h
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DATAPAGEMAP_H
+#define DATAPAGEMAP_H
+
+#include "storage/relfilenode.h"
+#include "storage/block.h"
+
+
+/*
+ * A simple bitmap of data pages: one bit per block, grown on demand as
+ * blocks are added (see datapagemap.c).
+ */
+struct datapagemap
+{
+       char       *bitmap;                     /* the bit array itself */
+       int                     bitmapsize;             /* allocated size of 'bitmap', in bytes */
+};
+
+typedef struct datapagemap datapagemap_t;
+typedef struct datapagemap_iterator datapagemap_iterator_t;
+
+/*
+ * NOTE(review): datapagemap_create/datapagemap_destroy have no visible
+ * definitions in datapagemap.c — presumably leftover declarations; confirm
+ * against the rest of the tree and drop them if truly unused.
+ */
+extern datapagemap_t *datapagemap_create(void);
+extern void datapagemap_destroy(datapagemap_t *map);
+extern void datapagemap_add(datapagemap_t *map, BlockNumber blkno);
+extern datapagemap_iterator_t *datapagemap_iterate(datapagemap_t *map);
+extern bool datapagemap_next(datapagemap_iterator_t *iter, BlockNumber *blkno);
+extern void datapagemap_print(datapagemap_t *map);
+
+#endif   /* DATAPAGEMAP_H */
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
new file mode 100644 (file)
index 0000000..eb2dd24
--- /dev/null
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * fetch.c
+ *       Functions for fetching files from a local or remote data dir
+ *
+ * This file forms an abstraction of getting files from the "source".
+ * There are two implementations of this interface: one for copying files
+ * from a data directory via normal filesystem operations (copy_fetch.c),
+ * and another for fetching files from a remote server via a libpq
+ * connection (libpq_fetch.c)
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "pg_rewind.h"
+#include "fetch.h"
+#include "file_ops.h"
+#include "filemap.h"
+
+/*
+ * Obtain the list of files in the source, feeding each one to
+ * process_remote_file(). With a local source directory we walk it
+ * directly; otherwise the list is fetched over the libpq connection.
+ */
+void
+fetchRemoteFileList(void)
+{
+       if (!datadir_source)
+               libpqProcessFileList();
+       else
+               traverse_datadir(datadir_source, &process_remote_file);
+}
+
+/*
+ * Fetch all relation data files that are marked in the given data page map,
+ * using whichever fetch method (local copy or libpq) is configured.
+ */
+void
+executeFileMap(void)
+{
+       if (!datadir_source)
+               libpq_executeFileMap(filemap);
+       else
+               copy_executeFileMap(filemap);
+}
+
+/*
+ * Fetch a single file into a malloc'd buffer. The file size is returned
+ * in *filesize. The returned buffer is always zero-terminated, which is
+ * handy for text files.
+ */
+char *
+fetchFile(char *filename, size_t *filesize)
+{
+       return datadir_source
+               ? slurpFile(datadir_source, filename, filesize)
+               : libpqGetFile(filename, filesize);
+}
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
new file mode 100644 (file)
index 0000000..d0e7dd3
--- /dev/null
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * fetch.h
+ *       Fetching data from a local or remote data directory.
+ *
+ * This file includes the prototypes for functions used to copy files from
+ * one data directory to another. The source to copy from can be a local
+ * directory (copy method), or a remote PostgreSQL server (libpq fetch
+ * method).
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef FETCH_H
+#define FETCH_H
+
+#include "c.h"
+
+#include "access/xlogdefs.h"
+
+#include "filemap.h"
+
+/*
+ * Common interface. Calls the copy or libpq method depending on global
+ * config options.
+ *
+ * NOTE(review): fetchFile takes a plain "char *filename" while the
+ * underlying slurpFile/libpqGetFile take "const char *" — consider making
+ * the wrapper const-correct too (requires changing fetch.c in step).
+ */
+extern void fetchRemoteFileList(void);
+extern char *fetchFile(char *filename, size_t *filesize);
+extern void executeFileMap(void);
+
+/* in libpq_fetch.c */
+extern void libpqProcessFileList(void);
+extern char *libpqGetFile(const char *filename, size_t *filesize);
+extern void libpq_executeFileMap(filemap_t *map);
+
+extern void libpqConnect(const char *connstr);
+extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
+
+/* in copy_fetch.c */
+extern void copy_executeFileMap(filemap_t *map);
+
+/* callback invoked for every file found by traverse_datadir() */
+typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
+extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+
+#endif   /* FETCH_H */
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
new file mode 100644 (file)
index 0000000..589a01a
--- /dev/null
@@ -0,0 +1,305 @@
+/*-------------------------------------------------------------------------
+ *
+ * file_ops.c
+ *       Helper functions for operating on files.
+ *
+ * Most of the functions in this file are helper functions for writing to
+ * the target data directory. The functions check the --dry-run flag, and
+ * do nothing if it's enabled. You should avoid accessing the target files
+ * directly but if you do, make sure you honor the --dry-run mode!
+ *
+ * Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "file_ops.h"
+#include "filemap.h"
+#include "logging.h"
+#include "pg_rewind.h"
+
+/*
+ * Currently open destination file.
+ */
+static int     dstfd = -1;
+static char dstpath[MAXPGPATH] = "";
+
+static void remove_target_file(const char *path);
+static void create_target_dir(const char *path);
+static void remove_target_dir(const char *path);
+static void create_target_symlink(const char *path, const char *link);
+static void remove_target_symlink(const char *path);
+
+/*
+ * Open a target file for writing. If 'trunc' is true and the file already
+ * exists, it will be truncated.
+ *
+ * The opened file (and its full path) is kept in the static dstfd/dstpath
+ * state, so consecutive writes to the same file reuse the descriptor.
+ * A no-op in --dry-run mode.
+ */
+void
+open_target_file(const char *path, bool trunc)
+{
+       int                     mode;
+
+       if (dry_run)
+               return;
+
+       /*
+        * Already open? dstpath holds "<datadir_target>/<path>", so compare
+        * 'path' against the portion after the directory prefix and slash.
+        */
+       if (dstfd != -1 && !trunc &&
+               strcmp(path, &dstpath[strlen(datadir_target) + 1]) == 0)
+               return;                                 /* already open */
+
+       close_target_file();
+
+       snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
+
+       /* create if missing; 0600 matches the server's own data-file mode */
+       mode = O_WRONLY | O_CREAT | PG_BINARY;
+       if (trunc)
+               mode |= O_TRUNC;
+       dstfd = open(dstpath, mode, 0600);
+       if (dstfd < 0)
+               pg_fatal("could not open destination file \"%s\": %s\n",
+                                dstpath, strerror(errno));
+}
+
+/*
+ * Close target file, if it's open.
+ *
+ * Safe to call when no file is open; resets the cached descriptor so that
+ * the next open_target_file() starts fresh.
+ */
+void
+close_target_file(void)
+{
+       if (dstfd == -1)
+               return;
+
+       if (close(dstfd) != 0)
+               pg_fatal("error closing destination file \"%s\": %s\n",
+                                dstpath, strerror(errno));
+
+       dstfd = -1;
+       /* fsync? */
+       /* NOTE(review): the file is never fsync'd — durability of the rewound
+        * data directory relies on something else syncing it; confirm. */
+}
+
+/*
+ * Write 'size' bytes from 'buf' into the currently open target file,
+ * starting at file offset 'begin'. Handles short writes by looping.
+ *
+ * Progress accounting happens even in --dry-run mode, so the report
+ * reflects the amount of data that *would* be transferred.
+ */
+void
+write_target_range(char *buf, off_t begin, size_t size)
+{
+       int                     writeleft;
+       char       *p;
+
+       /* update progress report */
+       fetch_done += size;
+       progress_report(false);
+
+       if (dry_run)
+               return;
+
+       if (lseek(dstfd, begin, SEEK_SET) == -1)
+               pg_fatal("could not seek in destination file \"%s\": %s\n",
+                                dstpath, strerror(errno));
+
+       writeleft = size;
+       p = buf;
+       while (writeleft > 0)
+       {
+               int                     writelen;
+
+               /* NOTE(review): a 0-byte write() would loop forever here — in
+                * practice write() to a regular file with writeleft > 0 doesn't
+                * return 0, but confirm that assumption holds. */
+               writelen = write(dstfd, p, writeleft);
+               if (writelen < 0)
+                       pg_fatal("could not write file \"%s\": %s\n",
+                                        dstpath, strerror(errno));
+
+               p += writelen;
+               writeleft -= writelen;
+       }
+
+       /* keep the file open, in case we need to copy more blocks in it */
+}
+
+
+/*
+ * Remove a file, directory or symlink from the target data directory,
+ * as recorded in the file map entry. Each helper honors --dry-run itself.
+ */
+void
+remove_target(file_entry_t *entry)
+{
+       Assert(entry->action == FILE_ACTION_REMOVE);
+
+       switch (entry->type)
+       {
+               case FILE_TYPE_REGULAR:
+                       remove_target_file(entry->path);
+                       break;
+
+               case FILE_TYPE_DIRECTORY:
+                       remove_target_dir(entry->path);
+                       break;
+
+               case FILE_TYPE_SYMLINK:
+                       remove_target_symlink(entry->path);
+                       break;
+       }
+}
+
+/*
+ * Create a directory or symlink in the target data directory, as recorded
+ * in the file map entry. Regular files are never created through this
+ * path: they are materialized by open_target_file()/write_target_range().
+ */
+void
+create_target(file_entry_t *entry)
+{
+       Assert(entry->action == FILE_ACTION_CREATE);
+
+       switch (entry->type)
+       {
+               case FILE_TYPE_SYMLINK:
+                       create_target_symlink(entry->path, entry->link_target);
+                       break;
+
+               case FILE_TYPE_DIRECTORY:
+                       create_target_dir(entry->path);
+                       break;
+
+               case FILE_TYPE_REGULAR:
+                       /* can't happen. Regular files are created with open_target_file. */
+                       pg_fatal("invalid action (CREATE) for regular file\n");
+                       break;
+       }
+}
+
+/*
+ * Delete a regular file from the target data directory.
+ * A no-op in --dry-run mode.
+ */
+static void
+remove_target_file(const char *path)
+{
+       char            fullpath[MAXPGPATH];
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+       if (unlink(fullpath) != 0)
+               pg_fatal("could not remove file \"%s\": %s\n",
+                                fullpath, strerror(errno));
+}
+
+/*
+ * Truncate a file in the target data directory down to 'newsize' bytes.
+ * A no-op in --dry-run mode.
+ */
+void
+truncate_target_file(const char *path, off_t newsize)
+{
+       char            fullpath[MAXPGPATH];
+       int                     fd;
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+
+       fd = open(fullpath, O_WRONLY, 0);
+       if (fd < 0)
+               pg_fatal("could not open file \"%s\" for truncation: %s\n",
+                                fullpath, strerror(errno));
+
+       if (ftruncate(fd, newsize) != 0)
+               pg_fatal("could not truncate file \"%s\" to %u bytes: %s\n",
+                                fullpath, (unsigned int) newsize, strerror(errno));
+
+       close(fd);
+}
+
+/*
+ * Create a directory in the target data directory (owner-only permissions).
+ * A no-op in --dry-run mode.
+ */
+static void
+create_target_dir(const char *path)
+{
+       char            fullpath[MAXPGPATH];
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+       if (mkdir(fullpath, S_IRWXU) != 0)
+               pg_fatal("could not create directory \"%s\": %s\n",
+                                fullpath, strerror(errno));
+}
+
+/*
+ * Remove an (empty) directory from the target data directory.
+ * A no-op in --dry-run mode.
+ */
+static void
+remove_target_dir(const char *path)
+{
+       char            fullpath[MAXPGPATH];
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+       if (rmdir(fullpath) != 0)
+               pg_fatal("could not remove directory \"%s\": %s\n",
+                                fullpath, strerror(errno));
+}
+
+/*
+ * Create a symbolic link in the target data directory, pointing at 'link'.
+ * A no-op in --dry-run mode.
+ */
+static void
+create_target_symlink(const char *path, const char *link)
+{
+       char            fullpath[MAXPGPATH];
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+       if (symlink(link, fullpath) != 0)
+               pg_fatal("could not create symbolic link at \"%s\": %s\n",
+                                fullpath, strerror(errno));
+}
+
+/*
+ * Remove a symbolic link from the target data directory.
+ * A no-op in --dry-run mode.
+ */
+static void
+remove_target_symlink(const char *path)
+{
+       char            fullpath[MAXPGPATH];
+
+       if (dry_run)
+               return;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir_target, path);
+       if (unlink(fullpath) != 0)
+               pg_fatal("could not remove symbolic link \"%s\": %s\n",
+                                fullpath, strerror(errno));
+}
+
+
+/*
+ * Read a file into memory. The file to be read is <datadir>/<path>.
+ * The file contents are returned in a malloc'd buffer, and *filesize
+ * is set to the length of the file.
+ *
+ * The returned buffer is always zero-terminated; the size of the returned
+ * buffer is actually *filesize + 1. That's handy when reading a text file.
+ * This function can be used to read binary files as well, you can just
+ * ignore the zero-terminator in that case.
+ *
+ * This function is used to implement the fetchFile function in the "fetch"
+ * interface (see fetch.c), but is also called directly.
+ */
+char *
+slurpFile(const char *datadir, const char *path, size_t *filesize)
+{
+       int                     fd;
+       char       *buffer;
+       struct stat statbuf;
+       char            fullpath[MAXPGPATH];
+       int                     len;
+
+       snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir, path);
+
+       if ((fd = open(fullpath, O_RDONLY | PG_BINARY, 0)) == -1)
+               pg_fatal("could not open file \"%s\" for reading: %s\n",
+                                fullpath, strerror(errno));
+
+       /* was a copy-paste of the open() message; report the stat failure */
+       if (fstat(fd, &statbuf) < 0)
+               pg_fatal("could not stat file \"%s\": %s\n",
+                                fullpath, strerror(errno));
+
+       /* NOTE(review): 'len' is an int, so files >= 2 GB would be truncated;
+        * the files read through here are presumably small — confirm. */
+       len = statbuf.st_size;
+
+       buffer = pg_malloc(len + 1);
+
+       /* a short read (e.g. concurrent truncation) is treated as an error */
+       if (read(fd, buffer, len) != len)
+               pg_fatal("could not read file \"%s\": %s\n",
+                                fullpath, strerror(errno));
+       close(fd);
+
+       /* Zero-terminate the buffer. */
+       buffer[len] = '\0';
+
+       if (filesize)
+               *filesize = len;
+       return buffer;
+}
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
new file mode 100644 (file)
index 0000000..f68c71d
--- /dev/null
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ *
+ * file_ops.h
+ *       Helper functions for operating on files
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef FILE_OPS_H
+#define FILE_OPS_H
+
+#include "filemap.h"
+
+extern void open_target_file(const char *path, bool trunc);
+extern void write_target_range(char *buf, off_t begin, size_t size);
+extern void close_target_file(void);
+extern void truncate_target_file(const char *path, off_t newsize);
+extern void create_target(file_entry_t *t);
+extern void remove_target(file_entry_t *t);
+
+extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
+
+#endif   /* FILE_OPS_H */
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
new file mode 100644 (file)
index 0000000..4e02647
--- /dev/null
@@ -0,0 +1,667 @@
+/*-------------------------------------------------------------------------
+ *
+ * filemap.c
+ *       A data structure for keeping track of files that have changed.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "datapagemap.h"
+#include "filemap.h"
+#include "logging.h"
+#include "pg_rewind.h"
+
+#include "common/string.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/fd.h"
+
+/* The singleton file map being built; created by filemap_create(). */
+filemap_t  *filemap = NULL;
+
+static bool isRelDataFile(const char *path);
+static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
+                       BlockNumber segno);
+static int     path_cmp(const void *a, const void *b);
+static int     final_filemap_cmp(const void *a, const void *b);
+static void filemap_list_to_array(void);
+
+/*
+ * Create a new, empty file map and install it as the global 'filemap'.
+ * Only one file map may exist at a time.
+ */
+filemap_t *
+filemap_create(void)
+{
+       filemap_t  *map = pg_malloc(sizeof(filemap_t));
+
+       map->first = NULL;
+       map->last = NULL;
+       map->nlist = 0;
+       map->array = NULL;
+       map->narray = 0;
+
+       Assert(filemap == NULL);
+       filemap = map;
+       return map;
+}
+
+/*
+ * Callback for processing remote file list.
+ *
+ * This is called once for every file in the source server. We decide what
+ * action needs to be taken for the file, depending on whether the file
+ * exists in the target and whether the size matches.
+ */
+void
+process_remote_file(const char *path, file_type_t type, size_t newsize,
+                                       const char *link_target)
+{
+       bool            exists;
+       char            localpath[MAXPGPATH];
+       struct stat statbuf;
+       filemap_t  *map = filemap;
+       file_action_t action = FILE_ACTION_NONE;
+       size_t          oldsize = 0;
+       file_entry_t *entry;
+
+       /* remote files must all be processed before the map is finalized */
+       Assert(map->array == NULL);
+
+       /*
+        * Completely ignore some special files in source and destination.
+        */
+       if (strcmp(path, "postmaster.pid") == 0 ||
+               strcmp(path, "postmaster.opts") == 0)
+               return;
+
+       /*
+        * Skip temporary files, .../pgsql_tmp/... and .../pgsql_tmp.* in source.
+        * This has the effect that all temporary files in the destination will be
+        * removed.
+        */
+       if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
+               return;
+       if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
+               return;
+
+       /*
+        * sanity check: a filename that looks like a data file better be a
+        * regular file
+        */
+       if (type != FILE_TYPE_REGULAR && isRelDataFile(path))
+               pg_fatal("data file in source \"%s\" is not a regular file\n", path);
+
+       snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
+
+       /* Does the corresponding local file exist? */
+       if (lstat(localpath, &statbuf) < 0)
+       {
+               if (errno != ENOENT)
+                       pg_fatal("could not stat file \"%s\": %s\n",
+                                        localpath, strerror(errno));
+
+               exists = false;
+       }
+       else
+               exists = true;
+
+       /* Decide the action (and remember the old size) based on file type. */
+       switch (type)
+       {
+               case FILE_TYPE_DIRECTORY:
+                       if (exists && !S_ISDIR(statbuf.st_mode))
+                       {
+                               /* it's a directory in target, but not in source. Strange.. */
+                               pg_fatal("\"%s\" is not a directory\n", localpath);
+                       }
+
+                       if (!exists)
+                               action = FILE_ACTION_CREATE;
+                       else
+                               action = FILE_ACTION_NONE;
+                       oldsize = 0;
+                       break;
+
+               case FILE_TYPE_SYMLINK:
+                       if (exists &&
+#ifndef WIN32
+                               !S_ISLNK(statbuf.st_mode)
+#else
+                               !pgwin32_is_junction(localpath)
+#endif
+                               )
+                       {
+                               /*
+                                * It's a symbolic link in target, but not in source.
+                                * Strange..
+                                */
+                               pg_fatal("\"%s\" is not a symbolic link\n", localpath);
+                       }
+
+                       if (!exists)
+                               action = FILE_ACTION_CREATE;
+                       else
+                               action = FILE_ACTION_NONE;
+                       oldsize = 0;
+                       break;
+
+               case FILE_TYPE_REGULAR:
+                       if (exists && !S_ISREG(statbuf.st_mode))
+                               pg_fatal("\"%s\" is not a regular file\n", localpath);
+
+                       if (!exists || !isRelDataFile(path))
+                       {
+                               /*
+                                * File exists in source, but not in target. Or it's a
+                                * non-data file that we have no special processing for. Copy
+                                * it in toto.
+                                *
+                                * An exception: PG_VERSIONs should be identical, but avoid
+                                * overwriting it for paranoia.
+                                */
+                               if (pg_str_endswith(path, "PG_VERSION"))
+                               {
+                                       action = FILE_ACTION_NONE;
+                                       oldsize = statbuf.st_size;
+                               }
+                               else
+                               {
+                                       action = FILE_ACTION_COPY;
+                                       oldsize = 0;
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * It's a data file that exists in both.
+                                *
+                                * If it's larger in target, we can truncate it. There will
+                                * also be a WAL record of the truncation in the source
+                                * system, so WAL replay would eventually truncate the target
+                                * too, but we might as well do it now.
+                                *
+                                * If it's smaller in the target, it means that it has been
+                                * truncated in the target, or enlarged in the source, or
+                                * both. If it was truncated locally, we need to copy the
+                                * missing tail from the remote system. If it was enlarged in
+                                * the remote system, there will be WAL records in the remote
+                                * system for the new blocks, so we wouldn't need to copy them
+                                * here. But we don't know which scenario we're dealing with,
+                                * and there's no harm in copying the missing blocks now, so
+                                * do it now.
+                                *
+                                * If it's the same size, do nothing here. Any locally
+                                * modified blocks will be copied based on parsing the local
+                                * WAL, and any remotely modified blocks will be updated after
+                                * rewinding, when the remote WAL is replayed.
+                                */
+                               oldsize = statbuf.st_size;
+                               if (oldsize < newsize)
+                                       action = FILE_ACTION_COPY_TAIL;
+                               else if (oldsize > newsize)
+                                       action = FILE_ACTION_TRUNCATE;
+                               else
+                                       action = FILE_ACTION_NONE;
+                       }
+                       break;
+       }
+
+       /* Create a new entry for this file */
+       entry = pg_malloc(sizeof(file_entry_t));
+       entry->path = pg_strdup(path);
+       entry->type = type;
+       entry->action = action;
+       entry->oldsize = oldsize;
+       entry->newsize = newsize;
+       entry->link_target = link_target ? pg_strdup(link_target) : NULL;
+       entry->next = NULL;
+       entry->pagemap.bitmap = NULL;
+       entry->pagemap.bitmapsize = 0;
+       entry->isrelfile = isRelDataFile(path);
+
+       /* Append the entry to the linked list (converted to an array later). */
+       if (map->last)
+       {
+               map->last->next = entry;
+               map->last = entry;
+       }
+       else
+               map->first = map->last = entry;
+       map->nlist++;
+}
+
+/*
+ * Callback for processing local file list.
+ *
+ * All remote files must be already processed before calling this. This only
+ * marks local files that didn't exist in the remote system for deletion.
+ */
+void
+process_local_file(const char *path, file_type_t type, size_t oldsize,
+                                  const char *link_target)
+{
+       bool            exists;
+       char            localpath[MAXPGPATH];
+       struct stat statbuf;
+       file_entry_t key;
+       file_entry_t *key_ptr;
+       filemap_t  *map = filemap;
+       file_entry_t *entry;
+
+       /*
+        * 'exists' is determined by the bsearch() below; the lstat() here only
+        * serves to bail out early on stat failures other than ENOENT.
+        * (Message now ends with \n, like every other pg_fatal call; the dead
+        * "exists = false" store that was unconditionally overwritten by the
+        * bsearch result has been removed.)
+        */
+       snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
+       if (lstat(localpath, &statbuf) < 0)
+       {
+               if (errno != ENOENT)
+                       pg_fatal("could not stat file \"%s\": %s\n",
+                                        localpath, strerror(errno));
+       }
+
+       if (map->array == NULL)
+       {
+               /* on first call, initialize lookup array */
+               if (map->nlist == 0)
+               {
+                       /* should not happen */
+                       pg_fatal("remote file list is empty\n");
+               }
+
+               filemap_list_to_array();
+               qsort(map->array, map->narray, sizeof(file_entry_t *), path_cmp);
+       }
+
+       /*
+        * Completely ignore some special files
+        */
+       if (strcmp(path, "postmaster.pid") == 0 ||
+               strcmp(path, "postmaster.opts") == 0)
+               return;
+
+       /* Was this path seen in the source (remote) system? */
+       key.path = (char *) path;
+       key_ptr = &key;
+       exists = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
+                                        path_cmp) != NULL;
+
+       /* Remove any file or folder that doesn't exist in the remote system. */
+       if (!exists)
+       {
+               entry = pg_malloc(sizeof(file_entry_t));
+               entry->path = pg_strdup(path);
+               entry->type = type;
+               entry->action = FILE_ACTION_REMOVE;
+               entry->oldsize = oldsize;
+               entry->newsize = 0;
+               entry->link_target = link_target ? pg_strdup(link_target) : NULL;
+               entry->next = NULL;
+               entry->pagemap.bitmap = NULL;
+               entry->pagemap.bitmapsize = 0;
+               entry->isrelfile = isRelDataFile(path);
+
+               if (map->last == NULL)
+                       map->first = entry;
+               else
+                       map->last->next = entry;
+               map->last = entry;
+               map->nlist++;
+       }
+       else
+       {
+               /*
+                * We already handled all files that exist in the remote system in
+                * process_remote_file().
+                */
+       }
+}
+
+/*
+ * This callback gets called while we read the old WAL, for every block that
+ * have changed in the local system. It makes note of all the changed blocks
+ * in the pagemap of the file.
+ */
+void
+process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
+{
+       char       *path;
+       file_entry_t key;
+       file_entry_t *key_ptr;
+       file_entry_t *entry;
+       BlockNumber blkno_inseg;
+       int                     segno;
+       filemap_t  *map = filemap;
+       file_entry_t **e;
+
+       /* the map must already be finalized into the sorted array */
+       Assert(filemap->array);
+
+       /* translate the relation-wide block number into segment + offset */
+       segno = blkno / RELSEG_SIZE;
+       blkno_inseg = blkno % RELSEG_SIZE;
+
+       path = datasegpath(rnode, forknum, segno);
+
+       /* look up the file entry for this segment by path */
+       key.path = (char *) path;
+       key_ptr = &key;
+
+       e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
+                               path_cmp);
+       if (e)
+               entry = *e;
+       else
+               entry = NULL;
+       free(path);
+
+       if (entry)
+       {
+               Assert(entry->isrelfile);
+
+               switch (entry->action)
+               {
+                       case FILE_ACTION_NONE:
+                       case FILE_ACTION_TRUNCATE:
+                               /* skip if we're truncating away the modified block anyway */
+                               if ((blkno_inseg + 1) * BLCKSZ <= entry->newsize)
+                                       datapagemap_add(&entry->pagemap, blkno_inseg);
+                               break;
+
+                       case FILE_ACTION_COPY_TAIL:
+                               /*
+                                * skip the modified block if it is part of the "tail" that
+                                * we're copying anyway.
+                                */
+                               if ((blkno_inseg + 1) * BLCKSZ <= entry->oldsize)
+                                       datapagemap_add(&entry->pagemap, blkno_inseg);
+                               break;
+
+                       case FILE_ACTION_COPY:
+                       case FILE_ACTION_REMOVE:
+                               /* whole file is copied or removed; block copy is pointless */
+                               break;
+
+                       case FILE_ACTION_CREATE:
+                               /* pg_fatal does not return, so no break is needed */
+                               pg_fatal("unexpected page modification for directory or symbolic link \"%s\"\n", entry->path);
+               }
+       }
+       else
+       {
+               /*
+                * If we don't have any record of this file in the file map, it means
+                * that it's a relation that doesn't exist in the remote system, and
+                * it was subsequently removed in the local system, too. We can safely
+                * ignore it.
+                */
+       }
+}
+
+/*
+ * Convert the linked list of entries in filemap->first/last to the array,
+ * filemap->array.  Afterwards all entries live in the array and the linked
+ * list is empty.
+ */
+static void
+filemap_list_to_array(void)
+{
+       int                     narray;
+       file_entry_t *entry,
+                          *next;
+
+       /*
+        * The array holds pointers to the entries, not the entries themselves,
+        * so size the allocation by the pointer type.  (Using
+        * sizeof(file_entry_t) here would over-allocate; the qsort in
+        * filemap_finalize and the bsearch in process_block_change both operate
+        * on file_entry_t * elements.)
+        */
+       filemap->array =
+               pg_realloc(filemap->array,
+                                  (filemap->nlist + filemap->narray) * sizeof(file_entry_t *));
+
+       /* Append the list entries after any entries already in the array */
+       narray = filemap->narray;
+       for (entry = filemap->first; entry != NULL; entry = next)
+       {
+               filemap->array[narray++] = entry;
+               next = entry->next;
+               entry->next = NULL;
+       }
+       Assert(narray == filemap->nlist + filemap->narray);
+
+       /* The linked list is now empty */
+       filemap->narray = narray;
+       filemap->nlist = 0;
+       filemap->first = filemap->last = NULL;
+}
+
+/*
+ * Finish building the filemap: move any remaining linked-list entries into
+ * the array and sort it into final processing order (see final_filemap_cmp
+ * for the ordering rules).
+ */
+void
+filemap_finalize(void)
+{
+       filemap_list_to_array();
+       qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+                 final_filemap_cmp);
+}
+
+/*
+ * Return a human-readable name for a file action, for display purposes.
+ */
+static const char *
+action_to_str(file_action_t action)
+{
+       if (action == FILE_ACTION_NONE)
+               return "NONE";
+       if (action == FILE_ACTION_COPY)
+               return "COPY";
+       if (action == FILE_ACTION_TRUNCATE)
+               return "TRUNCATE";
+       if (action == FILE_ACTION_COPY_TAIL)
+               return "COPY_TAIL";
+       if (action == FILE_ACTION_CREATE)
+               return "CREATE";
+       if (action == FILE_ACTION_REMOVE)
+               return "REMOVE";
+
+       /* unrecognized enum value */
+       return "unknown";
+}
+
+/*
+ * Calculate the totals needed for progress reports.
+ *
+ * Fills in filemap->total_size (total size of the source cluster) and
+ * filemap->fetch_size (bytes that will actually be fetched).
+ */
+void
+calculate_totals(void)
+{
+       file_entry_t *entry;
+       int                     i;
+       filemap_t  *map = filemap;
+
+       map->total_size = 0;
+       map->fetch_size = 0;
+
+       for (i = 0; i < filemap->narray; i++)
+       {
+               entry = filemap->array[i];
+
+               /* only regular files contribute to the size totals */
+               if (entry->type != FILE_TYPE_REGULAR)
+                       continue;
+
+               map->total_size += entry->newsize;
+
+               /* a whole-file copy fetches the entire new file */
+               if (entry->action == FILE_ACTION_COPY)
+               {
+                       map->fetch_size += entry->newsize;
+                       continue;
+               }
+
+               /* a tail copy fetches only the part beyond the old size */
+               if (entry->action == FILE_ACTION_COPY_TAIL)
+                       map->fetch_size += (entry->newsize - entry->oldsize);
+
+               /* each locally-modified block in the pagemap is fetched, too */
+               if (entry->pagemap.bitmapsize > 0)
+               {
+                       datapagemap_iterator_t *iter;
+                       BlockNumber blk;
+
+                       iter = datapagemap_iterate(&entry->pagemap);
+                       while (datapagemap_next(iter, &blk))
+                               map->fetch_size += BLCKSZ;
+
+                       pg_free(iter);
+               }
+       }
+}
+
+/*
+ * Print the contents of the filemap to standard output, for debugging.
+ * Entries with no action and no modified blocks are skipped.
+ */
+void
+print_filemap(void)
+{
+       int                     i;
+
+       for (i = 0; i < filemap->narray; i++)
+       {
+               file_entry_t *entry = filemap->array[i];
+
+               /* nothing to report for this entry */
+               if (entry->action == FILE_ACTION_NONE &&
+                       entry->pagemap.bitmapsize <= 0)
+                       continue;
+
+               printf("%s (%s)\n", entry->path, action_to_str(entry->action));
+
+               if (entry->pagemap.bitmapsize > 0)
+                       datapagemap_print(&entry->pagemap);
+       }
+       fflush(stdout);
+}
+
+/*
+ * Does it look like a relation data file?
+ *
+ * For our purposes, only files belonging to the main fork are considered
+ * relation files. Other forks are always copied in toto, because we cannot
+ * reliably track changes to them, because WAL only contains block references
+ * for the main fork.
+ */
+static bool
+isRelDataFile(const char *path)
+{
+       char            buf[20 + 1];
+       RelFileNode rnode;
+       unsigned int segNo;
+       int                     nmatch;
+       bool            matched;
+
+       /*----
+        * Relation data files can be in one of the following directories:
+        *
+        * global/
+        *              shared relations
+        *
+        * base/<db oid>/
+        *              regular relations, default tablespace
+        *
+        * pg_tblspc/<tblspc oid>/PG_9.4_201403261/
+        *              within a non-default tablespace (the name of the directory
+        *              depends on version)
+        *
+        * And the relation data files themselves have a filename like:
+        *
+        * <oid>.<segment number>
+        *
+        *----
+        */
+       rnode.spcNode = InvalidOid;
+       rnode.dbNode = InvalidOid;
+       rnode.relNode = InvalidOid;
+       segNo = 0;
+       matched = false;
+
+       /* the ".<segment number>" suffix is optional, hence nmatch may be 1 */
+       nmatch = sscanf(path, "global/%u.%u", &rnode.relNode, &segNo);
+       if (nmatch == 1 || nmatch == 2)
+       {
+               rnode.spcNode = GLOBALTABLESPACE_OID;
+               rnode.dbNode = 0;
+               matched = true;
+       }
+       else
+       {
+               nmatch = sscanf(path, "base/%u/%u.%u",
+                                               &rnode.dbNode, &rnode.relNode, &segNo);
+               if (nmatch == 2 || nmatch == 3)
+               {
+                       rnode.spcNode = DEFAULTTABLESPACE_OID;
+                       matched = true;
+               }
+               else
+               {
+                       nmatch = sscanf(path, "pg_tblspc/%u/PG_%20s/%u/%u.%u",
+                                                 &rnode.spcNode, buf, &rnode.dbNode, &rnode.relNode,
+                                                       &segNo);
+                       if (nmatch == 4 || nmatch == 5)
+                               matched = true;
+               }
+       }
+
+       /*
+        * The sscanf tests above can match files that have extra characters at
+        * the end, and the last check can also match a path belonging to a
+        * different version (different TABLESPACE_VERSION_DIRECTORY). To
+        * eliminate such cases, cross-check that GetRelationPath creates the
+        * exact same filename, when passed the RelFileNode information we
+        * extracted from the filename.
+        */
+       if (matched)
+       {
+               char       *check_path = datasegpath(rnode, MAIN_FORKNUM, segNo);
+
+               if (strcmp(check_path, path) != 0)
+                       matched = false;
+
+               pfree(check_path);
+       }
+
+       return matched;
+}
+
+/*
+ * A helper function to create the path of a relation file and segment.
+ *
+ * The returned path is palloc'd
+ */
+static char *
+datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+       char       *path = relpathperm(rnode, forknum);
+       char       *segpath;
+
+       /* segment 0 uses the bare relation path, with no ".<segno>" suffix */
+       if (segno == 0)
+               return path;
+
+       segpath = psprintf("%s.%u", path, segno);
+       pfree(path);
+       return segpath;
+}
+
+/*
+ * qsort/bsearch comparator: order file entries by path.
+ */
+static int
+path_cmp(const void *a, const void *b)
+{
+       const file_entry_t *entry_a = *(file_entry_t *const *) a;
+       const file_entry_t *entry_b = *(file_entry_t *const *) b;
+
+       return strcmp(entry_a->path, entry_b->path);
+}
+
+/*
+ * In the final stage, the filemap is sorted so that removals come last.
+ * From disk space usage point of view, it would be better to do removals
+ * first, but for now, safety first. If a whole directory is deleted, all
+ * files and subdirectories inside it need to be removed first. On creation,
+ * parent directory needs to be created before files and directories inside
+ * it. To achieve that, the file_action_t enum is ordered so that we can
+ * just sort on that first. Furthermore, sort REMOVE entries in reverse
+ * path order, so that "foo/bar" subdirectory is removed before "foo".
+ */
+static int
+final_filemap_cmp(const void *a, const void *b)
+{
+       file_entry_t *fa = *((file_entry_t **) a);
+       file_entry_t *fb = *((file_entry_t **) b);
+
+       /* primary key: the action, in enum declaration order */
+       if (fa->action > fb->action)
+               return 1;
+       if (fa->action < fb->action)
+               return -1;
+
+       /* secondary key: path; reversed for removals (children before parent) */
+       if (fa->action == FILE_ACTION_REMOVE)
+               return -strcmp(fa->path, fb->path);
+       else
+               return strcmp(fa->path, fb->path);
+}
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
new file mode 100644 (file)
index 0000000..57f0f92
--- /dev/null
@@ -0,0 +1,108 @@
+/*-------------------------------------------------------------------------
+ *
+ * filemap.h
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *-------------------------------------------------------------------------
+ */
+#ifndef FILEMAP_H
+#define FILEMAP_H
+
+#include "storage/relfilenode.h"
+#include "storage/block.h"
+
+#include "datapagemap.h"
+
+/*
+ * For every file found in the local or remote system, we have a file entry
+ * which says what we are going to do with the file. For relation files,
+ * there is also a page map, marking pages in the file that were changed
+ * locally.
+ *
+ * The enum values are sorted in the order we want actions to be processed.
+ */
+typedef enum
+{
+       FILE_ACTION_CREATE,             /* create local directory or symbolic link */
+       FILE_ACTION_COPY,               /* copy whole file, overwriting if exists */
+       FILE_ACTION_COPY_TAIL,  /* copy tail from 'oldsize' to 'newsize' */
+       FILE_ACTION_NONE,               /* no action (we might still copy modified blocks
+                                                        * based on the parsed WAL) */
+       FILE_ACTION_TRUNCATE,   /* truncate local file to 'newsize' bytes */
+       FILE_ACTION_REMOVE,             /* remove local file / directory / symlink */
+
+} file_action_t;
+
+/* Kind of filesystem object a file entry represents */
+typedef enum
+{
+       FILE_TYPE_REGULAR,              /* regular file */
+       FILE_TYPE_DIRECTORY,    /* directory */
+       FILE_TYPE_SYMLINK               /* symbolic link */
+} file_type_t;
+
+/*
+ * One entry in the file map: a single file, directory or symlink, together
+ * with the action to take on it and, for relation files, the map of blocks
+ * that were modified locally.
+ */
+struct file_entry_t
+{
+       char       *path;                       /* path relative to the data directory root */
+       file_type_t type;
+
+       file_action_t action;           /* what to do with this file */
+
+       /* for a regular file */
+       size_t          oldsize;
+       size_t          newsize;
+       bool            isrelfile;              /* is it a relation data file? */
+
+       datapagemap_t   pagemap;        /* locally-modified blocks (relation files) */
+
+       /* for a symlink */
+       char            *link_target;
+
+       struct file_entry_t *next;      /* next entry in the filemap linked list */
+};
+
+typedef struct file_entry_t file_entry_t;
+
+/*
+ * The file map itself: entries are first accumulated in a linked list,
+ * then moved into a sorted array by filemap_finalize().
+ */
+struct filemap_t
+{
+       /*
+        * New entries are accumulated to a linked list, in process_remote_file
+        * and process_local_file.
+        */
+       file_entry_t *first;
+       file_entry_t *last;
+       int                     nlist;                  /* number of entries currently in the list */
+
+       /*
+        * After processing all the remote files, the entries in the linked list
+        * are moved to this array. After processing local files, too, all the
+        * local entries are added to the array by filemap_finalize, and sorted
+        * in the final order. After filemap_finalize, all the entries are in
+        * the array, and the linked list is empty.
+        */
+       file_entry_t **array;
+       int                     narray;                 /* number of entries in the array */
+
+       /*
+        * Summary information. total_size is the total size of the source cluster,
+        * and fetch_size is the number of bytes that needs to be copied.
+        */
+       uint64          total_size;
+       uint64          fetch_size;
+};
+
+typedef struct filemap_t filemap_t;
+
+extern filemap_t * filemap;
+
+extern filemap_t *filemap_create(void);
+
+extern void calculate_totals(void);
+extern void print_filemap(void);
+
+/* Functions for populating the filemap */
+extern void process_remote_file(const char *path, file_type_t type, size_t newsize, const char *link_target);
+extern void process_local_file(const char *path, file_type_t type, size_t newsize, const char *link_target);
+extern void process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno);
+extern void filemap_finalize(void);
+
+#endif   /* FILEMAP_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
new file mode 100644 (file)
index 0000000..0c9d46d
--- /dev/null
@@ -0,0 +1,464 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq_fetch.c
+ *       Functions for fetching files from a remote server.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+/* for ntohl/htonl */
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "pg_rewind.h"
+#include "datapagemap.h"
+#include "fetch.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "logging.h"
+
+#include "libpq-fe.h"
+#include "catalog/catalog.h"
+#include "catalog/pg_type.h"
+
+static PGconn *conn = NULL;
+
+/*
+ * Files are fetched max CHUNKSIZE bytes at a time.
+ *
+ * (This only applies to files that are copied in whole, or for truncated
+ * files where we copy the tail. Relation files, where we know the individual
+ * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ */
+#define CHUNKSIZE 1000000
+
+static void receiveFileChunks(const char *sql);
+static void execute_pagemap(datapagemap_t *pagemap, const char *path);
+static char *run_simple_query(const char *sql);
+
+/*
+ * Connect to the source server, and verify that it is usable as a source:
+ * it must not be in recovery, and must have full_page_writes enabled.
+ */
+void
+libpqConnect(const char *connstr)
+{
+       char       *str;
+
+       conn = PQconnectdb(connstr);
+       if (PQstatus(conn) == CONNECTION_BAD)
+               pg_fatal("could not connect to remote server: %s\n",
+                                PQerrorMessage(conn));
+
+       pg_log(PG_PROGRESS, "connected to remote server\n");
+
+       /*
+        * Check that the server is not in hot standby mode. There is no
+        * fundamental reason that couldn't be made to work, but it doesn't
+        * currently because we use a temporary table. Better to check for it
+        * explicitly than error out, for a better error message.
+        */
+       str = run_simple_query("SELECT pg_is_in_recovery()");
+       if (strcmp(str, "f") != 0)
+               pg_fatal("source server must not be in recovery mode\n");
+       pg_free(str);
+
+       /*
+        * Also check that full_page_writes is enabled. We can get torn pages if
+        * a page is modified while we read it with pg_read_binary_file(), and we
+        * rely on full page images to fix them.
+        */
+       str = run_simple_query("SHOW full_page_writes");
+       if (strcmp(str, "on") != 0)
+               pg_fatal("full_page_writes must be enabled in the source server\n");
+       pg_free(str);
+}
+
+/*
+ * Runs a query that returns a single value.
+ *
+ * The result set must consist of exactly one row with one non-NULL column;
+ * anything else is fatal.  The returned string is pg_strdup'd and must be
+ * freed by the caller (with pg_free).
+ */
+static char *
+run_simple_query(const char *sql)
+{
+       PGresult   *res;
+       char       *result;
+
+       res = PQexec(conn, sql);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("error running query (%s) in source server: %s\n",
+                                sql, PQresultErrorMessage(res));
+
+       /* sanity check the result set */
+       if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
+               pg_fatal("unexpected result set while running query\n");
+
+       /* copy the value out of the PGresult before clearing it */
+       result = pg_strdup(PQgetvalue(res, 0, 0));
+
+       PQclear(res);
+
+       return result;
+}
+
+/*
+ * Calls pg_current_xlog_insert_location() function, and returns the result
+ * as an XLogRecPtr.
+ */
+XLogRecPtr
+libpqGetCurrentXlogInsertLocation(void)
+{
+       XLogRecPtr      result;
+       uint32          hi;
+       uint32          lo;
+       char       *val;
+
+       val = run_simple_query("SELECT pg_current_xlog_insert_location()");
+
+       /* the server reports the location in the standard "hi/lo" hex format */
+       if (sscanf(val, "%X/%X", &hi, &lo) != 2)
+               pg_fatal("unexpected result \"%s\" while fetching current XLOG insert location\n", val);
+
+       result = ((uint64) hi) << 32 | lo;
+
+       /* run_simple_query returns a strdup'd string; don't leak it */
+       pg_free(val);
+
+       return result;
+}
+
+/*
+ * Get a list of all files in the data directory.
+ *
+ * Each file found is handed to process_remote_file() to populate the filemap.
+ */
+void
+libpqProcessFileList(void)
+{
+       PGresult   *res;
+       const char *sql;
+       int                     i;
+
+       /*
+        * Create a recursive directory listing of the whole data directory.
+        *
+        * The WITH RECURSIVE part does most of the work. The second part gets the
+        * targets of the symlinks in pg_tblspc directory.
+        *
+        * XXX: There is no backend function to get a symbolic link's target in
+        * general, so if the admin has put any custom symbolic links in the data
+        * directory, they won't be copied correctly.
+        */
+       sql =
+               "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
+               "  SELECT '' AS path, filename, size, isdir FROM\n"
+               "  (SELECT pg_ls_dir('.') AS filename) AS fn,\n"
+               "        pg_stat_file(fn.filename) AS this\n"
+               "  UNION ALL\n"
+               "  SELECT parent.path || parent.filename || '/' AS path,\n"
+               "         fn, this.size, this.isdir\n"
+               "  FROM files AS parent,\n"
+               "       pg_ls_dir(parent.path || parent.filename) AS fn,\n"
+               "       pg_stat_file(parent.path || parent.filename || '/' || fn) AS this\n"
+               "       WHERE parent.isdir = 't'\n"
+               ")\n"
+               "SELECT path || filename, size, isdir,\n"
+               "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
+               "FROM files\n"
+               "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
+               "                             AND oid::text = files.filename\n";
+       res = PQexec(conn, sql);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("unexpected result while fetching file list: %s\n",
+                                PQresultErrorMessage(res));
+
+       /* sanity check the result set */
+       if (PQnfields(res) != 4)
+               pg_fatal("unexpected result set while fetching file list\n");
+
+       /* Read result to local variables */
+       for (i = 0; i < PQntuples(res); i++)
+       {
+               char       *path = PQgetvalue(res, i, 0);
+               /* XXX: atoi() truncates sizes over 2 GB -- TODO: parse as 64-bit */
+               int                     filesize = atoi(PQgetvalue(res, i, 1));
+               bool            isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
+               char       *link_target = PQgetvalue(res, i, 3);
+               file_type_t type;
+
+               /* a non-empty tablespace link target means the entry is a symlink */
+               if (link_target[0])
+                       type = FILE_TYPE_SYMLINK;
+               else if (isdir)
+                       type = FILE_TYPE_DIRECTORY;
+               else
+                       type = FILE_TYPE_REGULAR;
+
+               process_remote_file(path, type, filesize, link_target);
+       }
+}
+
+/*----
+ * Runs a query, which returns pieces of files from the remote source data
+ * directory, and overwrites the corresponding parts of target files with
+ * the received parts. The result set is expected to be of format:
+ *
+ * path                text    -- path in the data directory, e.g "base/1/123"
+ * begin       int4    -- offset within the file
+ * chunk       bytea   -- file content
+ *----
+ */
+static void
+receiveFileChunks(const char *sql)
+{
+       PGresult   *res;
+
+       if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+               pg_fatal("could not send query: %s\n", PQerrorMessage(conn));
+
+       pg_log(PG_DEBUG, "getting file chunks\n");
+
+       /* single-row mode avoids buffering the whole result set in memory */
+       if (PQsetSingleRowMode(conn) != 1)
+               pg_fatal("could not set libpq connection to single row mode\n");
+
+       while ((res = PQgetResult(conn)) != NULL)
+       {
+               char       *filename;
+               int                     filenamelen;
+               int                     chunkoff;
+               int                     chunksize;
+               char       *chunk;
+
+               switch (PQresultStatus(res))
+               {
+                       case PGRES_SINGLE_TUPLE:
+                               break;
+
+                       case PGRES_TUPLES_OK:
+                               PQclear(res);
+                               continue;               /* final zero-row result */
+
+                       default:
+                               pg_fatal("unexpected result while fetching remote files: %s\n",
+                                                PQresultErrorMessage(res));
+               }
+
+               /* sanity check the result set */
+               if (PQnfields(res) != 3 || PQntuples(res) != 1)
+                       pg_fatal("unexpected result set size while fetching remote files\n");
+
+               /*
+                * Any single unexpected column type or format is an error.  (The
+                * previous code joined these conditions with &&, which only
+                * complained when *all* of them were wrong.)
+                */
+               if (PQftype(res, 0) != TEXTOID ||
+                       PQftype(res, 1) != INT4OID ||
+                       PQftype(res, 2) != BYTEAOID)
+               {
+                       pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u\n",
+                                        PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
+               }
+
+               if (PQfformat(res, 0) != 1 ||
+                       PQfformat(res, 1) != 1 ||
+                       PQfformat(res, 2) != 1)
+               {
+                       pg_fatal("unexpected result format while fetching remote files\n");
+               }
+
+               if (PQgetisnull(res, 0, 0) ||
+                       PQgetisnull(res, 0, 1) ||
+                       PQgetisnull(res, 0, 2))
+               {
+                       pg_fatal("unexpected NULL result while fetching remote files\n");
+               }
+
+               if (PQgetlength(res, 0, 1) != sizeof(int32))
+                       pg_fatal("unexpected result length while fetching remote files\n");
+
+               /* Read result set to local variables */
+               memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int32));
+               chunkoff = ntohl(chunkoff);                     /* int4 arrives in network order */
+               chunksize = PQgetlength(res, 0, 2);
+
+               filenamelen = PQgetlength(res, 0, 0);
+               filename = pg_malloc(filenamelen + 1);
+               memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
+               filename[filenamelen] = '\0';
+
+               chunk = PQgetvalue(res, 0, 2);
+
+               pg_log(PG_DEBUG, "received chunk for file \"%s\", off %d, len %d\n",
+                          filename, chunkoff, chunksize);
+
+               open_target_file(filename, false);
+
+               write_target_range(chunk, chunkoff, chunksize);
+
+               /* release the per-row allocations before the next iteration */
+               pg_free(filename);
+               PQclear(res);
+       }
+}
+
+/*
+ * Receive a single file as a malloc'd buffer.
+ *
+ * The buffer is NUL-terminated (the terminator is not counted in *filesize).
+ * If 'filesize' is not NULL, the file's length in bytes is stored there.
+ */
+char *
+libpqGetFile(const char *filename, size_t *filesize)
+{
+       PGresult   *res;
+       char       *result;
+       int                     len;
+       const char *paramValues[1];
+
+       paramValues[0] = filename;
+       res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
+                                          1, NULL, paramValues, NULL, NULL, 1);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("unexpected result while fetching remote file \"%s\": %s\n",
+                                filename, PQresultErrorMessage(res));
+
+       /* sanity check the result set */
+       if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
+               pg_fatal("unexpected result set while fetching remote file \"%s\"\n",
+                                filename);
+
+       /* Read result to local variables */
+       len = PQgetlength(res, 0, 0);
+       result = pg_malloc(len + 1);
+       memcpy(result, PQgetvalue(res, 0, 0), len);
+       result[len] = '\0';
+
+       /* done with the PGresult; don't leak it */
+       PQclear(res);
+
+       pg_log(PG_DEBUG, "fetched file \"%s\", length %d\n", filename, len);
+
+       if (filesize)
+               *filesize = len;
+       return result;
+}
+
+/*
+ * Write a file range to a temporary table in the server.
+ *
+ * The range is sent to the server as a COPY formatted line, to be inserted
+ * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
+ * function to actually fetch the data.
+ *
+ * Must be called while a "COPY fetchchunks FROM STDIN" is in progress
+ * (started by libpq_executeFileMap).
+ */
+static void
+fetch_file_range(const char *path, unsigned int begin, unsigned int end)
+{
+       /* room for the path, two 10-digit numbers, two tabs, newline and NUL */
+       char            linebuf[MAXPGPATH + 23];
+
+       /* Split the range into CHUNKSIZE chunks */
+       while (end - begin > 0)
+       {
+               unsigned int len;
+
+               if (end - begin > CHUNKSIZE)
+                       len = CHUNKSIZE;
+               else
+                       len = end - begin;
+
+               snprintf(linebuf, sizeof(linebuf), "%s\t%u\t%u\n", path, begin, len);
+
+               if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
+                       pg_fatal("error sending COPY data: %s\n",
+                                        PQerrorMessage(conn));
+
+               begin += len;
+       }
+}
+
+/*
+ * Fetch all changed blocks from remote source data directory.
+ *
+ * Loads the list of file ranges to fetch into a temporary table on the
+ * source server via COPY, then pulls all of them back with a single query
+ * (see receiveFileChunks).  Local-only actions (truncate, remove, create)
+ * are performed directly while the list is being built.
+ */
+void
+libpq_executeFileMap(filemap_t *map)
+{
+       file_entry_t *entry;
+       const char *sql;
+       PGresult   *res;
+       int                     i;
+
+       /*
+        * First create a temporary table, and load it with the blocks that we
+        * need to fetch.
+        */
+       sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int4, len int4);";
+       res = PQexec(conn, sql);
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("error creating temporary table: %s\n",
+                                PQresultErrorMessage(res));
+       PQclear(res);
+
+       sql = "COPY fetchchunks FROM STDIN";
+       res = PQexec(conn, sql);
+
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pg_fatal("unexpected result while sending file list: %s\n",
+                                PQresultErrorMessage(res));
+       PQclear(res);
+
+       for (i = 0; i < map->narray; i++)
+       {
+               entry = map->array[i];
+
+               /* If this is a relation file, copy the modified blocks */
+               execute_pagemap(&entry->pagemap, entry->path);
+
+               switch (entry->action)
+               {
+                       case FILE_ACTION_NONE:
+                               /* nothing else to do */
+                               break;
+
+                       case FILE_ACTION_COPY:
+                               /* Truncate the old file out of the way, if any */
+                               open_target_file(entry->path, true);
+                               fetch_file_range(entry->path, 0, entry->newsize);
+                               break;
+
+                       case FILE_ACTION_TRUNCATE:
+                               truncate_target_file(entry->path, entry->newsize);
+                               break;
+
+                       case FILE_ACTION_COPY_TAIL:
+                               fetch_file_range(entry->path, entry->oldsize, entry->newsize);
+                               break;
+
+                       case FILE_ACTION_REMOVE:
+                               remove_target(entry);
+                               break;
+
+                       case FILE_ACTION_CREATE:
+                               create_target(entry);
+                               break;
+               }
+       }
+
+       if (PQputCopyEnd(conn, NULL) != 1)
+               pg_fatal("error sending end-of-COPY: %s\n",
+                                PQerrorMessage(conn));
+
+       /* drain (and free) all results of the COPY command */
+       while ((res = PQgetResult(conn)) != NULL)
+       {
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       pg_fatal("unexpected result while sending file list: %s\n",
+                                        PQresultErrorMessage(res));
+               PQclear(res);
+       }
+
+       /*
+        * We've now copied the list of file ranges that we need to fetch to the
+        * temporary table. Now, actually fetch all of those ranges.
+        */
+       sql =
+               "SELECT path, begin, \n"
+               "  pg_read_binary_file(path, begin, len) AS chunk\n"
+               "FROM fetchchunks\n";
+
+       receiveFileChunks(sql);
+}
+
+/*
+ * Queue a fetch of every block recorded in 'pagemap' for the file 'path',
+ * one BLCKSZ range per modified block.
+ */
+static void
+execute_pagemap(datapagemap_t *pagemap, const char *path)
+{
+       datapagemap_iterator_t *iter;
+       BlockNumber blkno;
+       off_t           offset;
+
+       iter = datapagemap_iterate(pagemap);
+       while (datapagemap_next(iter, &blkno))
+       {
+               /*
+                * Widen before multiplying: BlockNumber is 32-bit, so computing
+                * blkno * BLCKSZ in 32-bit arithmetic could overflow for offsets
+                * past 4 GB.
+                */
+               offset = (off_t) blkno * BLCKSZ;
+
+               fetch_file_range(path, offset, offset + BLCKSZ);
+       }
+       /* use pg_free for consistency with the rest of this file */
+       pg_free(iter);
+}
diff --git a/src/bin/pg_rewind/logging.c b/src/bin/pg_rewind/logging.c
new file mode 100644 (file)
index 0000000..aba12d8
--- /dev/null
@@ -0,0 +1,140 @@
+/*-------------------------------------------------------------------------
+ *
+ * logging.c
+ *      logging functions
+ *
+ *     Copyright (c) 2010-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <unistd.h>
+#include <time.h>
+
+#include "pg_rewind.h"
+#include "logging.h"
+
+#include "pgtime.h"
+
+/* Progress counters */
+uint64         fetch_size;
+uint64         fetch_done;
+
+static pg_time_t last_progress_report = 0;
+
+#define QUERY_ALLOC                    8192
+
+#define MESSAGE_WIDTH          60
+
+/*
+ * Format and print a log message, filtered by message type.
+ *
+ * PG_DEBUG messages are shown only when 'debug' is set, PG_PROGRESS only
+ * when 'showprogress' is set, and PG_WARNING always.  PG_FATAL prints the
+ * message followed by "Failure, exiting" and terminates the program with
+ * exit status 1.
+ *
+ * NOTE: the message is passed through _() *after* formatting, so the
+ * formatted string acts as the translation key.
+ */
+static
+pg_attribute_printf(2, 0)
+void
+pg_log_v(eLogType type, const char *fmt, va_list ap)
+{
+       char            message[QUERY_ALLOC];
+
+       /* messages longer than QUERY_ALLOC bytes are silently truncated */
+       vsnprintf(message, sizeof(message), fmt, ap);
+
+       switch (type)
+       {
+               case PG_DEBUG:
+                       if (debug)
+                               printf("%s", _(message));
+                       break;
+
+               case PG_PROGRESS:
+                       if (showprogress)
+                               printf("%s", _(message));
+                       break;
+
+               case PG_WARNING:
+                       printf("%s", _(message));
+                       break;
+
+               case PG_FATAL:
+                       printf("\n%s", _(message));
+                       printf("%s", _("Failure, exiting\n"));
+                       exit(1);
+                       break;
+
+               default:
+                       break;
+       }
+       fflush(stdout);
+}
+
+
+/*
+ * Public logging entry point: printf-style formatting, dispatched to
+ * pg_log_v() according to the message type.
+ */
+void
+pg_log(eLogType type, const char *fmt,...)
+{
+       va_list         args;
+
+       va_start(args, fmt);
+       pg_log_v(type, fmt, args);
+       va_end(args);
+}
+
+
+/*
+ * Print an error message and terminate the program.  Never returns.
+ */
+void
+pg_fatal(const char *fmt,...)
+{
+       va_list         args;
+
+       va_start(args, fmt);
+       pg_log_v(PG_FATAL, fmt, args);
+       va_end(args);
+       /* should not get here, pg_log_v() exited already */
+       exit(1);
+}
+
+
+/*
+ * Print a progress report based on the global variables.
+ *
+ * Progress report is written at maximum once per second, unless the
+ * force parameter is set to true.
+ */
+void
+progress_report(bool force)
+{
+       int                     percent;
+       char            fetch_done_str[32];
+       char            fetch_size_str[32];
+       pg_time_t       now;
+
+       if (!showprogress)
+               return;
+
+       now = time(NULL);
+       if (now == last_progress_report && !force)
+               return;                                 /* Max once per second */
+
+       last_progress_report = now;
+       percent = fetch_size ? (int) ((fetch_done) * 100 / fetch_size) : 0;
+
+       /*
+        * Avoid overflowing past 100% or the full size. This may make the total
+        * size number change as we approach the end of the backup (the estimate
+        * will always be wrong if WAL is included), but that's better than having
+        * the done column be bigger than the total.
+        */
+       if (percent > 100)
+               percent = 100;
+       if (fetch_done > fetch_size)
+               fetch_size = fetch_done;
+
+       /*
+        * Separate step to keep platform-dependent format code out of
+        * translatable strings.  And we only test for INT64_FORMAT availability
+        * in snprintf, not fprintf.
+        */
+       snprintf(fetch_done_str, sizeof(fetch_done_str), INT64_FORMAT,
+                        fetch_done / 1024);
+       snprintf(fetch_size_str, sizeof(fetch_size_str), INT64_FORMAT,
+                        fetch_size / 1024);
+
+       pg_log(PG_PROGRESS, "%*s/%s kB (%d%%) copied\r",
+                  (int) strlen(fetch_size_str), fetch_done_str, fetch_size_str,
+                  percent);
+}
diff --git a/src/bin/pg_rewind/logging.h b/src/bin/pg_rewind/logging.h
new file mode 100644 (file)
index 0000000..e089ac7
--- /dev/null
@@ -0,0 +1,37 @@
+/*-------------------------------------------------------------------------
+ *
+ * logging.h
+ *      prototypes for logging functions
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REWIND_LOGGING_H
+#define PG_REWIND_LOGGING_H
+
+/* progress counters */
+extern uint64 fetch_size;              /* estimated total bytes to fetch */
+extern uint64 fetch_done;              /* bytes fetched so far */
+
+/*
+ * Enumeration to denote pg_log modes
+ */
+typedef enum
+{
+       PG_DEBUG,                               /* debugging detail; presumably only shown with --debug -- TODO confirm */
+       PG_PROGRESS,                    /* progress messages, shown with --progress */
+       PG_WARNING,                             /* non-fatal problem */
+       PG_FATAL                                /* unrecoverable error; pg_log() exits the program */
+} eLogType;
+
+extern void pg_log(eLogType type, const char *fmt,...)
+pg_attribute_printf(2, 3);
+extern void pg_fatal(const char *fmt,...)
+pg_attribute_printf(1, 2) pg_attribute_noreturn;
+
+extern void progress_report(bool force);
+
+#endif
diff --git a/src/bin/pg_rewind/nls.mk b/src/bin/pg_rewind/nls.mk
new file mode 100644 (file)
index 0000000..e43f3b9
--- /dev/null
@@ -0,0 +1,9 @@
+# src/bin/pg_rewind/nls.mk
+CATALOG_NAME     = pg_rewind
+AVAIL_LANGUAGES  =
+GETTEXT_FILES    = copy_fetch.c datapagemap.c fetch.c filemap.c libpq_fetch.c logging.c parsexlog.c pg_rewind.c timeline.c ../../common/fe_memutils.c ../../../src/backend/access/transam/xlogreader.c
+
+GETTEXT_TRIGGERS = pg_log pg_fatal report_invalid_record:2
+GETTEXT_FLAGS    = pg_log:2:c-format \
+    pg_fatal:1:c-format \
+    report_invalid_record:2:c-format
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
new file mode 100644 (file)
index 0000000..0787ca1
--- /dev/null
@@ -0,0 +1,374 @@
+/*-------------------------------------------------------------------------
+ *
+ * parsexlog.c
+ *       Functions for reading Write-Ahead-Log
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#include "pg_rewind.h"
+#include "filemap.h"
+#include "logging.h"
+
+#include "access/rmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
+#include "commands/dbcommands_xlog.h"
+
+
+/*
+ * RmgrNames is an array of resource manager names, to make error messages
+ * a bit nicer.
+ */
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+  name,
+
+static const char *RmgrNames[RM_MAX_ID + 1] = {
+#include "access/rmgrlist.h"
+};
+
+static void extractPageInfo(XLogReaderState *record);
+
+static int     xlogreadfd = -1;
+static XLogSegNo xlogreadsegno = -1;
+static char xlogfpath[MAXPGPATH];
+
+typedef struct XLogPageReadPrivate
+{
+       const char *datadir;
+       TimeLineID      tli;
+} XLogPageReadPrivate;
+
+static int SimpleXLogPageRead(XLogReaderState *xlogreader,
+                                  XLogRecPtr targetPagePtr,
+                                  int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
+                                  TimeLineID *pageTLI);
+
+/*
+ * Read WAL from the datadir/pg_xlog, starting from 'startpoint' on timeline
+ * 'tli', until 'endpoint'. Make note of the data blocks touched by the WAL
+ * records, and return them in a page map.
+ */
+void
+extractPageMap(const char *datadir, XLogRecPtr startpoint, TimeLineID tli,
+                          XLogRecPtr endpoint)
+{
+       XLogRecord *record;
+       XLogReaderState *xlogreader;
+       char       *errormsg;
+       XLogPageReadPrivate private;
+
+       private.datadir = datadir;
+       private.tli = tli;
+       xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
+
+       do
+       {
+               record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+
+               if (record == NULL)
+               {
+                       XLogRecPtr      errptr;
+
+                       /* after the first iteration startpoint is invalid; report EndRecPtr then */
+                       errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+
+                       if (errormsg)
+                               pg_fatal("error reading WAL at %X/%X: %s\n",
+                                                (uint32) (errptr >> 32), (uint32) (errptr),
+                                                errormsg);
+                       else
+                               pg_fatal("error reading WAL at %X/%X\n",
+                                                (uint32) (errptr >> 32),
+                                                (uint32) (errptr));
+               }
+
+               extractPageInfo(xlogreader);
+
+               startpoint = InvalidXLogRecPtr; /* continue reading at next record */
+
+       } while (xlogreader->ReadRecPtr != endpoint);
+
+       XLogReaderFree(xlogreader);
+       if (xlogreadfd != -1)
+       {
+               close(xlogreadfd);
+               xlogreadfd = -1;
+       }
+}
+
+/*
+ * Reads one WAL record. Returns the end position of the record, without
+ * doing anything with the record itself.
+ */
+XLogRecPtr
+readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
+{
+       XLogRecord *record;
+       XLogReaderState *xlogreader;
+       char       *errormsg;
+       XLogPageReadPrivate private;
+       XLogRecPtr      endptr;
+
+       private.datadir = datadir;
+       private.tli = tli;
+       xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
+
+       record = XLogReadRecord(xlogreader, ptr, &errormsg);
+       if (record == NULL)
+       {
+               if (errormsg)
+                       pg_fatal("could not read WAL record at %X/%X: %s\n",
+                                        (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
+               else
+                       pg_fatal("could not read WAL record at %X/%X\n",
+                                        (uint32) (ptr >> 32), (uint32) (ptr));
+       }
+       endptr = xlogreader->EndRecPtr;         /* first byte after the record just read */
+
+       XLogReaderFree(xlogreader);
+       if (xlogreadfd != -1)                           /* release the segment file kept open by the read callback */
+       {
+               close(xlogreadfd);
+               xlogreadfd = -1;
+       }
+
+       return endptr;
+}
+
+/*
+ * Find the last checkpoint record preceding the given WAL position.
+ */
+void
+findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, TimeLineID tli,
+                                  XLogRecPtr *lastchkptrec, TimeLineID *lastchkpttli,
+                                  XLogRecPtr *lastchkptredo)
+{
+       /* Walk backwards, starting from the given record */
+       XLogRecord *record;
+       XLogRecPtr      searchptr;
+       XLogReaderState *xlogreader;
+       char       *errormsg;
+       XLogPageReadPrivate private;
+
+       /*
+        * The given fork pointer points to the end of the last common record,
+        * which is not necessarily the beginning of the next record, if the
+        * previous record happens to end at a page boundary. Skip over the page
+        * header in that case to find the next record.
+        */
+       if (forkptr % XLOG_BLCKSZ == 0)
+               forkptr += (forkptr % XLogSegSize == 0) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD;
+
+       private.datadir = datadir;
+       private.tli = tli;
+       xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
+
+       searchptr = forkptr;
+       for (;;)
+       {
+               uint8           info;
+
+               record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+
+               if (record == NULL)
+               {
+                       if (errormsg)
+                               pg_fatal("could not find previous WAL record at %X/%X: %s\n",
+                                                (uint32) (searchptr >> 32), (uint32) (searchptr),
+                                                errormsg);
+                       else
+                               pg_fatal("could not find previous WAL record at %X/%X\n",
+                                                (uint32) (searchptr >> 32), (uint32) (searchptr));
+               }
+
+               /*
+                * Check if it is a checkpoint record. The record we want is the
+                * latest checkpoint before the WAL forked, not the shutdown
+                * checkpoint written when the old master was stopped for rewinding.
+                */
+               info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+               if (searchptr < forkptr &&
+                       XLogRecGetRmid(xlogreader) == RM_XLOG_ID &&
+                       (info == XLOG_CHECKPOINT_SHUTDOWN ||
+                        info == XLOG_CHECKPOINT_ONLINE))
+               {
+                       CheckPoint      checkPoint;
+
+                       memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+                       *lastchkptrec = searchptr;
+                       *lastchkpttli = checkPoint.ThisTimeLineID;
+                       *lastchkptredo = checkPoint.redo;
+                       break;
+               }
+
+               /* Walk backwards to previous record. */
+               searchptr = record->xl_prev;    /* follow the record's back-link */
+       }
+
+       XLogReaderFree(xlogreader);
+       if (xlogreadfd != -1)
+       {
+               close(xlogreadfd);
+               xlogreadfd = -1;
+       }
+}
+
+/* XLogreader callback function, to read a WAL page */
+int
+SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+                                  int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
+                                  TimeLineID *pageTLI)
+{
+       XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+       uint32          targetPageOff;
+       XLogSegNo       targetSegNo PG_USED_FOR_ASSERTS_ONLY;
+
+       XLByteToSeg(targetPagePtr, targetSegNo);
+       targetPageOff = targetPagePtr % XLogSegSize;    /* page offset within the segment */
+
+       /*
+        * See if we need to switch to a new segment because the requested record
+        * is not in the currently open one.
+        */
+       if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno))
+       {
+               close(xlogreadfd);
+               xlogreadfd = -1;
+       }
+
+       XLByteToSeg(targetPagePtr, xlogreadsegno);
+
+       if (xlogreadfd < 0)
+       {
+               char            xlogfname[MAXFNAMELEN];
+
+               XLogFileName(xlogfname, private->tli, xlogreadsegno);
+
+               snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+
+               xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
+
+               if (xlogreadfd < 0)
+               {
+                       printf(_("could not open file \"%s\": %s\n"), xlogfpath,
+                                  strerror(errno));
+                       return -1;              /* -1 tells xlogreader the read failed */
+               }
+       }
+
+       /*
+        * At this point, we have the right segment open.
+        */
+       Assert(xlogreadfd != -1);
+
+       /* Read the requested page */
+       if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
+       {
+               printf(_("could not seek in file \"%s\": %s\n"), xlogfpath,
+                          strerror(errno));
+               return -1;
+       }
+
+       if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)      /* short reads count as errors */
+       {
+               printf(_("could not read from file \"%s\": %s\n"), xlogfpath,
+                          strerror(errno));
+               return -1;
+       }
+
+       Assert(targetSegNo == xlogreadsegno);
+
+       *pageTLI = private->tli;
+       return XLOG_BLCKSZ;             /* always return a full page on success */
+}
+
+/*
+ * Extract information on which blocks the current record modifies.
+ */
+static void
+extractPageInfo(XLogReaderState *record)
+{
+       int                     block_id;
+       RmgrId          rmid = XLogRecGetRmid(record);
+       uint8           info = XLogRecGetInfo(record);
+       uint8           rminfo = info & ~XLR_INFO_MASK;
+
+       /* Is this a special record type that I recognize? */
+
+       if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE)
+       {
+               /*
+                * New databases can be safely ignored. It won't be present in the
+                * remote system, so it will be copied in toto. There's one
+                * corner-case, though: if a new, different, database is also created
+                * in the remote system, we'll see that the files already exist and
+                * not copy them. That's OK, though; WAL replay of creating the new
+                * database, from the remote WAL, will re-copy the new database,
+                * overwriting the database created in the local system.
+                */
+       }
+       else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
+       {
+               /*
+                * An existing database was dropped. We'll see that the files don't
+                * exist in local system, and copy them in toto from the remote
+                * system. No need to do anything special here.
+                */
+       }
+       else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_CREATE)
+       {
+               /*
+                * We can safely ignore these. The local file will be removed, if it
+                * doesn't exist in remote system. If a file with same name is created
+                * in remote system, too, there will be WAL records for all the blocks
+                * in it.
+                */
+       }
+       else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_TRUNCATE)
+       {
+               /*
+                * We can safely ignore these. If a file is truncated locally, we'll
+                * notice that when we compare the sizes, and will copy the missing
+                * tail from remote system.
+                *
+                * TODO: But it would be nice to do some sanity cross-checking here..
+                */
+       }
+       else if (info & XLR_SPECIAL_REL_UPDATE)
+       {
+               /*
+                * This record type modifies a relation file in some special way, but
+                * we don't recognize the type. That's bad - we don't know how to
+                * track that change.
+                */
+               pg_fatal("WAL record modifies a relation, but record type is not recognized\n"
+                                "lsn: %X/%X, rmgr: %s, info: %02X\n",
+                 (uint32) (record->ReadRecPtr >> 32), (uint32) (record->ReadRecPtr),
+                                RmgrNames[rmid], info);
+       }
+
+       for (block_id = 0; block_id <= record->max_block_id; block_id++)
+       {
+               RelFileNode rnode;
+               ForkNumber      forknum;
+               BlockNumber blkno;
+
+               if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+                       continue;               /* block reference not in use */
+
+               /* We only care about the main fork; others are copied in toto */
+               if (forknum != MAIN_FORKNUM)
+                       continue;
+
+               process_block_change(forknum, rnode, blkno);    /* record block in the page map */
+       }
+}
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
new file mode 100644 (file)
index 0000000..6d458b0
--- /dev/null
@@ -0,0 +1,550 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_rewind.c
+ *       Synchronizes an old master server to a new timeline
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "pg_rewind.h"
+#include "fetch.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "logging.h"
+
+#include "access/timeline.h"
+#include "access/xlog_internal.h"
+#include "catalog/catversion.h"
+#include "catalog/pg_control.h"
+#include "getopt_long.h"
+#include "storage/bufpage.h"
+
+static void usage(const char *progname);
+
+static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
+                                 XLogRecPtr checkpointloc);
+
+static void digestControlFile(ControlFileData *ControlFile, char *source,
+                                 size_t size);
+static void updateControlFile(ControlFileData *ControlFile);
+static void sanityChecks(void);
+static void findCommonAncestorTimeline(XLogRecPtr *recptr, TimeLineID *tli);
+
+static ControlFileData ControlFile_target;
+static ControlFileData ControlFile_source;
+
+const char *progname;
+
+/* Configuration options */
+char      *datadir_target = NULL;
+char      *datadir_source = NULL;
+char      *connstr_source = NULL;
+
+bool           debug = false;
+bool           showprogress = false;
+bool           dry_run = false;
+
+static void
+usage(const char *progname)            /* print --help text to stdout */
+{
+       printf(_("%s resynchronizes a cluster with another copy of the cluster.\n\n"), progname);
+       printf(_("Usage:\n  %s [OPTION]...\n\n"), progname);
+       printf(_("Options:\n"));
+       printf(_("  -D, --target-pgdata=DIRECTORY\n"));
+       printf(_("                 existing data directory to modify\n"));
+       printf(_("  --source-pgdata=DIRECTORY\n"));
+       printf(_("                 source data directory to sync with\n"));
+       printf(_("  --source-server=CONNSTR\n"));
+       printf(_("                 source server to sync with\n"));
+       printf(_("  -P, --progress write progress messages\n"));
+       printf(_("  -n, --dry-run  stop before modifying anything\n"));
+       printf(_("  --debug        write a lot of debug messages\n"));
+       printf(_("  -V, --version  output version information, then exit\n"));
+       printf(_("  -?, --help     show this help, then exit\n"));
+       printf(_("\n"));
+       printf(_("Report bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+
+int
+main(int argc, char **argv)
+{
+       static struct option long_options[] = {
+               {"help", no_argument, NULL, '?'},
+               {"target-pgdata", required_argument, NULL, 'D'},
+               {"source-pgdata", required_argument, NULL, 1},
+               {"source-server", required_argument, NULL, 2},
+               {"version", no_argument, NULL, 'V'},
+               {"dry-run", no_argument, NULL, 'n'},
+               {"progress", no_argument, NULL, 'P'},
+               {"debug", no_argument, NULL, 3},
+               {NULL, 0, NULL, 0}
+       };
+       int                     option_index;
+       int                     c;
+       XLogRecPtr      divergerec;
+       TimeLineID      lastcommontli;
+       XLogRecPtr      chkptrec;
+       TimeLineID      chkpttli;
+       XLogRecPtr      chkptredo;
+       size_t          size;
+       char       *buffer;
+       bool            rewind_needed;
+       XLogRecPtr      endrec;
+       TimeLineID      endtli;
+       ControlFileData ControlFile_new;
+
+       progname = get_progname(argv[0]);
+
+       /* Process command-line arguments */
+       if (argc > 1)
+       {
+               if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+               {
+                       usage(progname);
+                       exit(0);
+               }
+               if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+               {
+                       puts("pg_rewind (PostgreSQL) " PG_VERSION);
+                       exit(0);
+               }
+       }
+
+       while ((c = getopt_long(argc, argv, "D:NnP", long_options, &option_index)) != -1)       /* NOTE(review): 'N' is accepted but has no case below */
+       {
+               switch (c)
+               {
+                       case '?':
+                               fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+                               exit(1);
+
+                       case 'P':
+                               showprogress = true;
+                               break;
+
+                       case 'n':
+                               dry_run = true;
+                               break;
+
+                       case 3:
+                               debug = true;
+                               break;
+
+                       case 'D':                       /* -D or --target-pgdata */
+                               datadir_target = pg_strdup(optarg);
+                               break;
+
+                       case 1:                         /* --source-pgdata */
+                               datadir_source = pg_strdup(optarg);
+                               break;
+                       case 2:                         /* --source-server */
+                               connstr_source = pg_strdup(optarg);
+                               break;
+               }
+       }
+
+       /* No source given? Show usage */
+       if (datadir_source == NULL && connstr_source == NULL)
+       {
+               pg_fatal("no source specified (--source-pgdata or --source-server)\n");
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+               exit(1);
+       }
+
+       if (datadir_target == NULL)
+       {
+               pg_fatal("no target data directory specified (--target-pgdata)\n");
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+               exit(1);
+       }
+
+       if (argc != optind)
+       {
+               pg_fatal("%s: invalid arguments\n", progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+               exit(1);
+       }
+
+       /* Connect to remote server */
+       if (connstr_source)
+               libpqConnect(connstr_source);
+
+       /*
+        * Ok, we have all the options and we're ready to start. Read in all the
+        * information we need from both clusters.
+        */
+       buffer = slurpFile(datadir_target, "global/pg_control", &size);
+       digestControlFile(&ControlFile_target, buffer, size);
+       pg_free(buffer);
+
+       buffer = fetchFile("global/pg_control", &size);
+       digestControlFile(&ControlFile_source, buffer, size);
+       pg_free(buffer);
+
+       sanityChecks();
+
+       /*
+        * If both clusters are already on the same timeline, there's nothing to
+        * do.
+        */
+       if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
+               pg_fatal("source and target cluster are on the same timeline\n");
+
+       findCommonAncestorTimeline(&divergerec, &lastcommontli);
+       printf(_("The servers diverged at WAL position %X/%X on timeline %u.\n"),
+                  (uint32) (divergerec >> 32), (uint32) divergerec, lastcommontli);
+
+       /*
+        * Check for the possibility that the target is in fact a direct ancestor
+        * of the source. In that case, there is no divergent history in the
+        * target that needs rewinding.
+        */
+       if (ControlFile_target.checkPoint >= divergerec)
+       {
+               rewind_needed = true;
+       }
+       else
+       {
+               XLogRecPtr      chkptendrec;
+
+               /* Read the checkpoint record on the target to see where it ends. */
+               chkptendrec = readOneRecord(datadir_target,
+                                                                       ControlFile_target.checkPoint,
+                                                  ControlFile_target.checkPointCopy.ThisTimeLineID);
+
+               /*
+                * If the histories diverged exactly at the end of the shutdown
+                * checkpoint record on the target, there are no WAL records in the
+                * target that don't belong in the source's history, and no rewind is
+                * needed.
+                */
+               if (chkptendrec == divergerec)
+                       rewind_needed = false;
+               else
+                       rewind_needed = true;
+       }
+
+       if (!rewind_needed)
+       {
+               printf(_("No rewind required.\n"));
+               exit(0);
+       }
+
+       findLastCheckpoint(datadir_target, divergerec, lastcommontli,
+                                          &chkptrec, &chkpttli, &chkptredo);
+       printf(_("Rewinding from last common checkpoint at %X/%X on timeline %u\n"),
+                  (uint32) (chkptrec >> 32), (uint32) chkptrec,
+                  chkpttli);
+
+       /*
+        * Build the filemap, by comparing the remote and local data directories.
+        */
+       (void) filemap_create();
+       pg_log(PG_PROGRESS, "reading source file list\n");
+       fetchRemoteFileList();
+       pg_log(PG_PROGRESS, "reading target file list\n");
+       traverse_datadir(datadir_target, &process_local_file);
+
+       /*
+        * Read the target WAL from last checkpoint before the point of fork, to
+        * extract all the pages that were modified on the target cluster after
+        * the fork. We can stop reading after reaching the final shutdown record.
+        * XXX: If we supported rewinding a server that was not shut down cleanly,
+        * we would need to replay until the end of WAL here.
+        */
+       pg_log(PG_PROGRESS, "reading WAL in target\n");
+       extractPageMap(datadir_target, chkptrec, lastcommontli,
+                                  ControlFile_target.checkPoint);
+       filemap_finalize();
+
+       if (showprogress)
+               calculate_totals();
+
+       /* this is too verbose even for verbose mode */
+       if (debug)
+               print_filemap();
+
+       /*
+        * Ok, we're ready to start copying things over.
+        */
+       if (showprogress)
+       {
+               pg_log(PG_PROGRESS, "Need to copy %lu MB (total source directory size is %lu MB)\n",
+                          (unsigned long) (filemap->fetch_size / (1024 * 1024)),
+                          (unsigned long) (filemap->total_size / (1024 * 1024)));
+
+               fetch_size = filemap->fetch_size;
+               fetch_done = 0;
+       }
+
+       /*
+        * This is the point of no return. Once we start copying things, we have
+        * modified the target directory and there is no turning back!
+        */
+
+       executeFileMap();
+
+       progress_report(true);          /* force a final 100% progress line */
+
+       pg_log(PG_PROGRESS, "\ncreating backup label and updating control file\n");
+       createBackupLabel(chkptredo, chkpttli, chkptrec);
+
+       /*
+        * Update control file of target. Make it ready to perform archive
+        * recovery when restarting.
+        *
+        * minRecoveryPoint is set to the current WAL insert location in the
+        * source server. Like in an online backup, it's important that we recover
+        * all the WAL that was generated while we copied the files over.
+        */
+       memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
+
+       if (connstr_source)
+       {
+               endrec = libpqGetCurrentXlogInsertLocation();
+               endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+       }
+       else
+       {
+               endrec = ControlFile_source.checkPoint;
+               endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+       }
+       ControlFile_new.minRecoveryPoint = endrec;
+       ControlFile_new.minRecoveryPointTLI = endtli;
+       ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
+       updateControlFile(&ControlFile_new);
+
+       printf(_("Done!\n"));
+
+       return 0;
+}
+
+static void
+sanityChecks(void)                     /* verify the two clusters can be rewound against each other */
+{
+       /* TODO Check that there's no backup_label in either cluster */
+
+       /* Check system_id match */
+       if (ControlFile_target.system_identifier != ControlFile_source.system_identifier)
+               pg_fatal("source and target clusters are from different systems\n");
+
+       /* check version */
+       if (ControlFile_target.pg_control_version != PG_CONTROL_VERSION ||
+               ControlFile_source.pg_control_version != PG_CONTROL_VERSION ||
+               ControlFile_target.catalog_version_no != CATALOG_VERSION_NO ||
+               ControlFile_source.catalog_version_no != CATALOG_VERSION_NO)
+       {
+               pg_fatal("clusters are not compatible with this version of pg_rewind\n");
+       }
+
+       /*
+        * The target cluster must use data checksums or WAL-logged hint bits;
+        * otherwise hint-bit-only page changes would not be WAL-logged and
+        * could be missed, corrupting data.
+        */
+       if (ControlFile_target.data_checksum_version != PG_DATA_CHECKSUM_VERSION &&
+               !ControlFile_target.wal_log_hints)
+       {
+               pg_fatal("target server need to use either data checksums or \"wal_log_hints = on\"\n");
+       }
+
+       /*
+        * Target cluster better not be running. This doesn't guard against
+        * someone starting the cluster concurrently. Also, this is probably more
+        * strict than necessary; it's OK if the master was not shut down cleanly,
+        * as long as it isn't running at the moment.
+        */
+       if (ControlFile_target.state != DB_SHUTDOWNED)
+               pg_fatal("target server must be shut down cleanly\n");
+
+       /*
+        * When the source is a data directory, also require that the source
+        * server is shut down. There isn't any very strong reason for this
+        * limitation, but better safe than sorry.
+        */
+       if (datadir_source && ControlFile_source.state != DB_SHUTDOWNED)
+               pg_fatal("source data directory must be shut down cleanly\n");
+}
+
+/*
+ * Determine the TLI of the last common timeline in the histories of the two
+ * clusters. *tli is set to the last common timeline, and *recptr is set to
+ * the position where the histories diverged (ie. the first WAL record that's
+ * not the same in both clusters).
+ *
+ * Control files of both clusters must be read into ControlFile_target/source
+ * before calling this.
+ */
+static void
+findCommonAncestorTimeline(XLogRecPtr *recptr, TimeLineID *tli)
+{
+       TimeLineID      targettli;
+       TimeLineHistoryEntry *sourceHistory;
+       int                     nentries;
+       int                     i;
+       TimeLineID      sourcetli;
+
+       targettli = ControlFile_target.checkPointCopy.ThisTimeLineID;
+       sourcetli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+
+       /* Timeline 1 does not have a history file, so no need to check */
+       if (sourcetli == 1)
+       {
+               sourceHistory = (TimeLineHistoryEntry *) pg_malloc(sizeof(TimeLineHistoryEntry));
+               sourceHistory->tli = sourcetli;
+               sourceHistory->begin = sourceHistory->end = InvalidXLogRecPtr;
+               nentries = 1;           /* synthesize a one-entry history */
+       }
+       else
+       {
+               char            path[MAXPGPATH];
+               char       *histfile;
+
+               TLHistoryFilePath(path, sourcetli);
+               histfile = fetchFile(path, NULL);       /* fetch the source's timeline history file */
+
+               sourceHistory = rewind_parseTimeLineHistory(histfile,
+                                                       ControlFile_source.checkPointCopy.ThisTimeLineID,
+                                                                                                       &nentries);
+               pg_free(histfile);
+       }
+
+       /*
+        * Trace the history backwards, until we hit the target timeline.
+        *
+        * TODO: This assumes that there are no timeline switches on the target
+        * cluster after the fork.
+        */
+       for (i = nentries - 1; i >= 0; i--)
+       {
+               TimeLineHistoryEntry *entry = &sourceHistory[i];
+
+               if (entry->tli == targettli)
+               {
+                       /* found it */
+                       *recptr = entry->end;   /* source left this timeline here: divergence point */
+                       *tli = entry->tli;
+
+                       free(sourceHistory);    /* NOTE(review): pg_malloc'd; pg_free() would be more consistent */
+                       return;
+               }
+       }
+
+       pg_fatal("could not find common ancestor of the source and target cluster's timelines\n");
+}
+
+
+/*
+ * Create a backup_label file that forces recovery to begin at the last common
+ * checkpoint.
+ */
+static void
+createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli, XLogRecPtr checkpointloc)
+{
+       XLogSegNo       startsegno;
+       time_t          stamp_time;
+       char            strfbuf[128];
+       char            xlogfilename[MAXFNAMELEN];
+       struct tm  *tmp;
+       char            buf[1000];
+       int                     len;
+
+       XLByteToSeg(startpoint, startsegno);
+       XLogFileName(xlogfilename, starttli, startsegno);
+
+       /*
+        * Construct backup label file
+        */
+       stamp_time = time(NULL);
+       tmp = localtime(&stamp_time);
+       strftime(strfbuf, sizeof(strfbuf), "%Y-%m-%d %H:%M:%S %Z", tmp);
+
+       len = snprintf(buf, sizeof(buf),
+                                  "START WAL LOCATION: %X/%X (file %s)\n"
+                                  "CHECKPOINT LOCATION: %X/%X\n"
+                                  "BACKUP METHOD: pg_rewind\n"
+                                  "BACKUP FROM: standby\n"
+                                  "START TIME: %s\n",
+                                  /* omit LABEL: line */
+                                  (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename,
+                                  (uint32) (checkpointloc >> 32), (uint32) checkpointloc,
+                                  strfbuf);
+       if (len >= sizeof(buf))
+               pg_fatal("backup label buffer too small\n"); /* shouldn't happen */
+
+       /* TODO: move old file out of the way, if any. */
+       open_target_file("backup_label", true); /* BACKUP_LABEL_FILE */
+       write_target_range(buf, 0, len);
+}
+
+/*
+ * Check CRC of control file
+ */
+static void
+checkControlFile(ControlFileData *ControlFile)
+{
+       pg_crc32        crc;
+
+       /* Calculate CRC over everything preceding the stored crc field */
+       INIT_CRC32C(crc);
+       COMP_CRC32C(crc, (char *) ControlFile, offsetof(ControlFileData, crc));
+       FIN_CRC32C(crc);
+
+       /* And simply compare it */
+       if (!EQ_CRC32C(crc, ControlFile->crc))
+               pg_fatal("unexpected control file CRC\n");
+}
+
+/*
+ * Verify control file contents in the buffer src, and copy it to *ControlFile.
+ */
+static void
+digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
+{
+       if (size != PG_CONTROL_SIZE)
+               pg_fatal("unexpected control file size %d, expected %d\n",
+                                (int) size, PG_CONTROL_SIZE);
+
+       memcpy(ControlFile, src, sizeof(ControlFileData));      /* only the struct part; rest is padding */
+
+       /* Additional checks on control file */
+       checkControlFile(ControlFile);
+}
+
+/*
+ * Update the target's control file.
+ */
+static void
+updateControlFile(ControlFileData *ControlFile)
+{
+       char            buffer[PG_CONTROL_SIZE];
+
+       /* Recalculate CRC of control file */
+       INIT_CRC32C(ControlFile->crc);
+       COMP_CRC32C(ControlFile->crc,
+                               (char *) ControlFile,
+                               offsetof(ControlFileData, crc));
+       FIN_CRC32C(ControlFile->crc);
+
+       /*
+        * Write out PG_CONTROL_SIZE bytes into pg_control by zero-padding the
+        * excess over sizeof(ControlFileData) to avoid premature EOF related
+        * errors when reading it.
+        */
+       memset(buffer, 0, PG_CONTROL_SIZE);
+       memcpy(buffer, ControlFile, sizeof(ControlFileData));
+
+       open_target_file("global/pg_control", false);
+
+       write_target_range(buffer, 0, PG_CONTROL_SIZE);
+
+       close_target_file();
+}
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
new file mode 100644 (file)
index 0000000..e281369
--- /dev/null
@@ -0,0 +1,44 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_rewind.h
+ *       Shared declarations for the pg_rewind utility.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REWIND_H
+#define PG_REWIND_H
+
+#include "c.h"
+
+#include "datapagemap.h"
+
+#include "access/timeline.h"
+#include "storage/block.h"
+#include "storage/relfilenode.h"
+
+/* Configuration options (globals; definitions live outside this header) */
+extern char *datadir_target;
+extern char *datadir_source;
+extern char *connstr_source;
+extern bool debug;
+extern bool showprogress;
+extern bool dry_run;
+
+/* in parsexlog.c: WAL-scanning routines used to find changed pages */
+extern void extractPageMap(const char *datadir, XLogRecPtr startpoint,
+                          TimeLineID tli, XLogRecPtr endpoint);
+extern void findLastCheckpoint(const char *datadir, XLogRecPtr searchptr,
+                                  TimeLineID tli,
+                                  XLogRecPtr *lastchkptrec, TimeLineID *lastchkpttli,
+                                  XLogRecPtr *lastchkptredo);
+extern XLogRecPtr readOneRecord(const char *datadir, XLogRecPtr ptr,
+                         TimeLineID tli);
+
+/* in timeline.c: frontend variant of the backend history-file parser */
+extern TimeLineHistoryEntry *rewind_parseTimeLineHistory(char *buffer,
+                                                       TimeLineID targetTLI, int *nentries);
+
+#endif   /* PG_REWIND_H */
diff --git a/src/bin/pg_rewind/t/001_basic.pl b/src/bin/pg_rewind/t/001_basic.pl
new file mode 100644 (file)
index 0000000..7198a3a
--- /dev/null
@@ -0,0 +1,80 @@
+# Basic pg_rewind test.  Exercises three kinds of divergence between the
+# old master and the promoted standby: plain row inserts (tbl1), a table
+# that grows in the old master after promotion (trunc_tbl, which pg_rewind
+# must truncate back), and a table that shrinks in the old master after
+# promotion (tail_tbl, whose tail pg_rewind must copy back).
+use strict;
+use warnings;
+use TestLib;
+use Test::More tests => 4;
+
+use RewindTest;
+
+# Test mode ('local' or 'remote') is passed as the first command-line argument.
+my $testmode = shift;
+
+RewindTest::init_rewind_test('basic', $testmode);
+RewindTest::setup_cluster();
+
+# Create a test table and insert a row in master.
+master_psql("CREATE TABLE tbl1 (d text)");
+master_psql("INSERT INTO tbl1 VALUES ('in master')");
+
+# This test table will be used to test truncation, i.e. the table
+# is extended in the old master after promotion
+master_psql("CREATE TABLE trunc_tbl (d text)");
+master_psql("INSERT INTO trunc_tbl VALUES ('in master')");
+
+# This test table will be used to test the "copy-tail" case, i.e. the
+# table is truncated in the old master after promotion
+master_psql("CREATE TABLE tail_tbl (id integer, d text)");
+master_psql("INSERT INTO tail_tbl VALUES (0, 'in master')");
+
+
+master_psql("CHECKPOINT");
+
+RewindTest::create_standby();
+
+# Insert additional data on master that will be replicated to standby
+master_psql("INSERT INTO tbl1 values ('in master, before promotion')");
+master_psql("INSERT INTO trunc_tbl values ('in master, before promotion')");
+master_psql("INSERT INTO tail_tbl SELECT g, 'in master, before promotion: ' || g FROM generate_series(1, 10000) g");
+
+master_psql('CHECKPOINT');
+
+RewindTest::promote_standby();
+
+# Insert a row in the old master. This causes the master and standby
+# to have "diverged", it's no longer possible to just apply the
+# standby's logs over master directory - you need to rewind.
+master_psql("INSERT INTO tbl1 VALUES ('in master, after promotion')");
+
+# Also insert a new row in the standby, which won't be present in the
+# old master.
+standby_psql("INSERT INTO tbl1 VALUES ('in standby, after promotion')");
+
+# Insert enough rows to trunc_tbl to extend the file. pg_rewind should
+# truncate it back to the old size.
+master_psql("INSERT INTO trunc_tbl SELECT 'in master, after promotion: ' || g FROM generate_series(1, 10000) g");
+
+# Truncate tail_tbl. pg_rewind should copy back the truncated part
+# (We cannot use an actual TRUNCATE command here, as that creates a
+# whole new relfilenode)
+master_psql("DELETE FROM tail_tbl WHERE id > 10");
+master_psql("VACUUM tail_tbl");
+
+RewindTest::run_pg_rewind();
+
+# After the rewind, the old master should contain only the rows that were
+# present on the promoted standby: master-only post-promotion changes are gone.
+check_query('SELECT * FROM tbl1',
+       qq(in master
+in master, before promotion
+in standby, after promotion
+),
+       'table content');
+
+check_query('SELECT * FROM trunc_tbl',
+       qq(in master
+in master, before promotion
+),
+       'truncation');
+
+# 10001 = 1 initial row + 10000 pre-promotion rows; the post-promotion
+# DELETE on the old master must have been undone by the rewind.
+check_query('SELECT count(*) FROM tail_tbl',
+       qq(10001
+),
+       'tail-copy');
+
+exit(0);
diff --git a/src/bin/pg_rewind/t/002_databases.pl b/src/bin/pg_rewind/t/002_databases.pl
new file mode 100644 (file)
index 0000000..709c81e
--- /dev/null
@@ -0,0 +1,41 @@
+# Test pg_rewind's handling of database-level changes: databases created
+# before and after the divergence point, on both the old master and the
+# promoted standby.
+use strict;
+use warnings;
+use TestLib;
+use Test::More tests => 2;
+
+use RewindTest;
+
+# Test mode ('local' or 'remote') is passed as the first command-line argument.
+my $testmode = shift;
+
+RewindTest::init_rewind_test('databases', $testmode);
+RewindTest::setup_cluster();
+
+# Create a database in master.
+master_psql('CREATE DATABASE inmaster');
+
+RewindTest::create_standby();
+
+# Create another database, the creation is replicated to the standby
+master_psql('CREATE DATABASE beforepromotion');
+
+RewindTest::promote_standby();
+
+# Create databases in the old master and the new promoted standby.
+master_psql('CREATE DATABASE master_afterpromotion');
+standby_psql('CREATE DATABASE standby_afterpromotion');
+# The clusters are now diverged.
+
+RewindTest::run_pg_rewind();
+
+# Check that the correct databases are present after pg_rewind:
+# master_afterpromotion (created on the old master after divergence)
+# must be gone, while standby_afterpromotion must be present.
+check_query('SELECT datname FROM pg_database',
+                  qq(template1
+template0
+postgres
+inmaster
+beforepromotion
+standby_afterpromotion
+),
+                  'database names');
+
+exit(0);
diff --git a/src/bin/pg_rewind/t/003_extrafiles.pl b/src/bin/pg_rewind/t/003_extrafiles.pl
new file mode 100644 (file)
index 0000000..dd76fb8
--- /dev/null
@@ -0,0 +1,61 @@
+# Test how pg_rewind reacts to extra files and directories in the data dirs.
+
+use strict;
+use warnings;
+use TestLib;
+use Test::More tests => 2;
+
+use File::Find;
+
+use RewindTest;
+
+# Test mode ('local' or 'remote') is passed as the first command-line argument.
+my $testmode = shift;
+
+RewindTest::init_rewind_test('extrafiles', $testmode);
+RewindTest::setup_cluster();
+
+# Create a subdir and files that will be present in both
+mkdir "$test_master_datadir/tst_both_dir";
+append_to_file "$test_master_datadir/tst_both_dir/both_file1", "in both1";
+append_to_file "$test_master_datadir/tst_both_dir/both_file2", "in both2";
+mkdir "$test_master_datadir/tst_both_dir/both_subdir/";
+append_to_file "$test_master_datadir/tst_both_dir/both_subdir/both_file3", "in both3";
+
+RewindTest::create_standby();
+
+# Create different subdirs and files in master and standby
+
+mkdir "$test_standby_datadir/tst_standby_dir";
+append_to_file "$test_standby_datadir/tst_standby_dir/standby_file1", "in standby1";
+append_to_file "$test_standby_datadir/tst_standby_dir/standby_file2", "in standby2";
+mkdir "$test_standby_datadir/tst_standby_dir/standby_subdir/";
+append_to_file "$test_standby_datadir/tst_standby_dir/standby_subdir/standby_file3", "in standby3";
+
+mkdir "$test_master_datadir/tst_master_dir";
+append_to_file "$test_master_datadir/tst_master_dir/master_file1", "in master1";
+append_to_file "$test_master_datadir/tst_master_dir/master_file2", "in master2";
+mkdir "$test_master_datadir/tst_master_dir/master_subdir/";
+append_to_file "$test_master_datadir/tst_master_dir/master_subdir/master_file3", "in master3";
+
+RewindTest::promote_standby();
+RewindTest::run_pg_rewind();
+
+# List files in the data directory after rewind.
+my @paths;
+find(sub {push @paths, $File::Find::name if $File::Find::name =~ m/.*tst_.*/},
+        $test_master_datadir);
+@paths = sort @paths;
+# The rewound data directory must contain the shared files and the
+# standby-only files; note that tst_master_dir and its contents are
+# expected to be gone (removed by the rewind).
+is_deeply(\@paths,
+                 ["$test_master_datadir/tst_both_dir",
+                  "$test_master_datadir/tst_both_dir/both_file1",
+                  "$test_master_datadir/tst_both_dir/both_file2",
+                  "$test_master_datadir/tst_both_dir/both_subdir",
+                  "$test_master_datadir/tst_both_dir/both_subdir/both_file3",
+                  "$test_master_datadir/tst_standby_dir",
+                  "$test_master_datadir/tst_standby_dir/standby_file1",
+                  "$test_master_datadir/tst_standby_dir/standby_file2",
+                  "$test_master_datadir/tst_standby_dir/standby_subdir",
+                  "$test_master_datadir/tst_standby_dir/standby_subdir/standby_file3"],
+                 "file lists match");
+
+exit(0);
diff --git a/src/bin/pg_rewind/timeline.c b/src/bin/pg_rewind/timeline.c
new file mode 100644 (file)
index 0000000..07ca370
--- /dev/null
@@ -0,0 +1,131 @@
+/*-------------------------------------------------------------------------
+ *
+ * timeline.c
+ *       timeline-related functions.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include "pg_rewind.h"
+
+#include "access/timeline.h"
+#include "access/xlog_internal.h"
+
+/*
+ * This is copy-pasted from the backend readTimeLineHistory, modified to
+ * return a malloc'd array and to work without backend functions.
+ */
+/*
+ * Parse the contents of a timeline history file, passed in 'buffer'.
+ * The buffer is modified in place (lines are NUL-terminated as they are
+ * scanned).
+ *
+ * Returns a malloc'd array of TimeLineHistoryEntry structs: the entries
+ * listed in the file (ancestor timelines, in increasing TLI order),
+ * followed by one final entry for targetTLI itself whose 'end' is
+ * InvalidXLogRecPtr (the open-ended tip of the timeline).  The number of
+ * entries is returned in *nentries.  Exits with an error message on
+ * malformed input.
+ */
+TimeLineHistoryEntry *
+rewind_parseTimeLineHistory(char *buffer, TimeLineID targetTLI, int *nentries)
+{
+       char       *fline;
+       TimeLineHistoryEntry *entry;
+       TimeLineHistoryEntry *entries = NULL;
+       int                     nlines = 0;
+       TimeLineID      lasttli = 0;
+       XLogRecPtr      prevend;
+       char       *bufptr;
+       bool            lastline = false;
+
+       /*
+        * Parse the file...
+        */
+       prevend = InvalidXLogRecPtr;
+       bufptr = buffer;
+       while (!lastline)
+       {
+               char       *ptr;
+               TimeLineID      tli;
+               uint32          switchpoint_hi;
+               uint32          switchpoint_lo;
+               int                     nfields;
+
+               /* Isolate the next line, NUL-terminating it in the buffer */
+               fline = bufptr;
+               while (*bufptr && *bufptr != '\n')
+                       bufptr++;
+               if (!(*bufptr))
+                       lastline = true;
+               else
+                       *bufptr++ = '\0';
+
+               /* skip leading whitespace and check for # comment */
+               for (ptr = fline; *ptr; ptr++)
+               {
+                       if (!isspace((unsigned char) *ptr))
+                               break;
+               }
+               if (*ptr == '\0' || *ptr == '#')
+                       continue;
+
+               /* Expected line format: "<tli>\t<hi32>/<lo32>" */
+               nfields = sscanf(fline, "%u\t%X/%X", &tli, &switchpoint_hi, &switchpoint_lo);
+
+               if (nfields < 1)
+               {
+                       /* expect a numeric timeline ID as first field of line */
+                       printf(_("syntax error in history file: %s\n"), fline);
+                       printf(_("Expected a numeric timeline ID.\n"));
+                       exit(1);
+               }
+               if (nfields != 3)
+               {
+                       printf(_("syntax error in history file: %s\n"), fline);
+                       printf(_("Expected an XLOG switchpoint location.\n"));
+                       exit(1);
+               }
+               if (entries && tli <= lasttli)
+               {
+                       printf(_("invalid data in history file: %s\n"), fline);
+                       printf(_("Timeline IDs must be in increasing sequence.\n"));
+                       exit(1);
+               }
+
+               lasttli = tli;
+
+               /* Grow the array by one and fill in the new entry */
+               nlines++;
+               entries = pg_realloc(entries, nlines * sizeof(TimeLineHistoryEntry));
+
+               entry = &entries[nlines - 1];
+               entry->tli = tli;
+               entry->begin = prevend;
+               entry->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
+               prevend = entry->end;
+
+               /* we ignore the remainder of each line */
+       }
+
+       if (entries && targetTLI <= lasttli)
+       {
+               printf(_("invalid data in history file\n"));
+               printf(_("Timeline IDs must be less than child timeline's ID.\n"));
+               exit(1);
+       }
+
+       /*
+        * Create one more entry for the "tip" of the timeline, which has no entry
+        * in the history file.
+        */
+       nlines++;
+       if (entries)
+               entries = pg_realloc(entries, nlines * sizeof(TimeLineHistoryEntry));
+       else
+               entries = pg_malloc(1 * sizeof(TimeLineHistoryEntry));
+
+       entry = &entries[nlines - 1];
+       entry->tli = targetTLI;
+       entry->begin = prevend;
+       entry->end = InvalidXLogRecPtr;
+
+       *nentries = nlines;
+       return entries;
+}
index 473a310624768553360bca2e43fa53851fca0b13..8099a61ee0d1d7361ab2530395bfb81e06b6797c 100644 (file)
@@ -65,7 +65,8 @@ my $frontend_extraincludes = {
        'initdb' => ['src\timezone'],
        'psql'   => [ 'src\bin\pg_dump', 'src\backend' ] };
 my $frontend_extrasource = { 'psql' => ['src\bin\psql\psqlscan.l'] };
-my @frontend_excludes = ('pgevent', 'pg_basebackup', 'pg_dump', 'scripts');
+my @frontend_excludes =
+  ('pgevent', 'pg_basebackup', 'pg_rewind', 'pg_dump', 'scripts');
 
 sub mkvcbuild
 {
@@ -422,6 +423,11 @@ sub mkvcbuild
        $pgrecvlogical->AddFile('src\bin\pg_basebackup\pg_recvlogical.c');
        $pgrecvlogical->AddLibrary('ws2_32.lib');
 
+       my $pgrewind = AddSimpleFrontend('pg_rewind', 1);
+       $pgrewind->{name} = 'pg_rewind';
+       $pgrewind->AddFile('src\backend\access\transam\xlogreader.c');
+       $pgrewind->AddLibrary('ws2_32.lib');
+
        my $pgevent = $solution->AddProject('pgevent', 'dll', 'bin');
        $pgevent->AddFiles('src\bin\pgevent', 'pgevent.c', 'pgmsgevent.rc');
        $pgevent->AddResourceFile('src\bin\pgevent', 'Eventlog message formatter',