]> granicus.if.org Git - postgresql/commitdiff
Allow pg_basebackup to stream transaction log in tar mode
authorMagnus Hagander <magnus@hagander.net>
Sun, 23 Oct 2016 13:16:31 +0000 (15:16 +0200)
committerMagnus Hagander <magnus@hagander.net>
Sun, 23 Oct 2016 13:23:11 +0000 (15:23 +0200)
This will write the received transaction log into a file called
pg_wal.tar(.gz) next to the other tarfiles instead of writing it to
base.tar. When using fetch mode, the transaction log is still written to
base.tar like before, and when used against a pre-10 server, the file
is named pg_xlog.tar.

To do this, implement a new concept of a "walmethod", which is
responsible for writing the WAL. Two implementations exist, one that
writes to a plain directory (which is also used by pg_receivexlog) and
one that writes to a tar file with optional compression.

Reviewed by Michael Paquier

doc/src/sgml/ref/pg_basebackup.sgml
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/bin/pg_basebackup/t/010_pg_basebackup.pl
src/bin/pg_basebackup/walmethods.c [new file with mode: 0644]
src/bin/pg_basebackup/walmethods.h [new file with mode: 0644]
src/include/pgtar.h
src/port/tar.c

index 7cb690dded77cb8eddc2f91db816ab61df5fa2f0..e66a7ae8eedc6c8719e1d21be2cb15dd7199fdad 100644 (file)
@@ -180,7 +180,8 @@ PostgreSQL documentation
             target directory, the tar contents will be written to
             standard output, suitable for piping to for example
             <productname>gzip</productname>. This is only possible if
-            the cluster has no additional tablespaces.
+            the cluster has no additional tablespaces and transaction
+            log streaming is not used.
            </para>
            </listitem>
          </varlistentry>
@@ -323,6 +324,10 @@ PostgreSQL documentation
              If the log has been rotated when it's time to transfer it, the
              backup will fail and be unusable.
            </para>
+           <para>
+            The transaction log files will be written to
+            the <filename>base.tar</filename> file.
+           </para>
           </listitem>
          </varlistentry>
 
@@ -339,6 +344,11 @@ PostgreSQL documentation
              client can keep up with transaction log received, using this mode
              requires no extra transaction logs to be saved on the master.
            </para>
+           <para>
+            The transaction log files are written to a separate file
+            named <filename>pg_wal.tar</filename> (if the server is a version
+            earlier than 10, the file will be named <filename>pg_xlog.tar</filename>).
+           </para>
           </listitem>
          </varlistentry>
         </variablelist>
@@ -353,7 +363,8 @@ PostgreSQL documentation
        <para>
         Enables gzip compression of tar file output, with the default
         compression level. Compression is only available when using
-        the tar format.
+        the tar format, and the suffix <filename>.gz</filename> will
+        automatically be added to all tar filenames.
        </para>
       </listitem>
      </varlistentry>
@@ -366,7 +377,8 @@ PostgreSQL documentation
         Enables gzip compression of tar file output, and specifies the
         compression level (0 through 9, 0 being no compression and 9 being best
         compression). Compression is only available when using the tar
-        format.
+        format, and the suffix <filename>.gz</filename> will
+        automatically be added to all tar filenames.
        </para>
       </listitem>
      </varlistentry>
index fa1ce8b24d1358bf511adf2f76b218bdb110b8b0..52ac9e9fb8912afe5e0ba67ae310f0e8887b49a1 100644 (file)
@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
 
-OBJS=receivelog.o streamutil.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
 
 all: pg_basebackup pg_receivexlog pg_recvlogical
 
index b82b8e1b263564ecd5507f1a0344dc5761918dc1..16cab978d06601e7bb02543badc16185c64feb84 100644 (file)
@@ -449,7 +449,7 @@ typedef struct
 {
        PGconn     *bgconn;
        XLogRecPtr      startptr;
-       char            xlogdir[MAXPGPATH];
+       char            xlog[MAXPGPATH];        /* directory or tarfile depending on mode */
        char       *sysidentifier;
        int                     timeline;
 } logstreamer_param;
@@ -470,9 +470,13 @@ LogStreamerMain(logstreamer_param *param)
        stream.synchronous = false;
        stream.do_sync = do_sync;
        stream.mark_done = true;
-       stream.basedir = param->xlogdir;
        stream.partial_suffix = NULL;
 
+       if (format == 'p')
+               stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+       else
+               stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
+
        if (!ReceiveXlogStream(param->bgconn, &stream))
 
                /*
@@ -482,6 +486,14 @@ LogStreamerMain(logstreamer_param *param)
                 */
                return 1;
 
+       if (!stream.walmethod->finish())
+       {
+               fprintf(stderr,
+                               _("%s: could not finish writing WAL files: %s\n"),
+                               progname, strerror(errno));
+               return 1;
+       }
+
        PQfinish(param->bgconn);
        return 0;
 }
@@ -533,28 +545,32 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
                exit(1);
 
        /* In post-10 cluster, pg_xlog has been renamed to pg_wal */
-       snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/%s",
+       snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
                         basedir,
                         PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
                                "pg_xlog" : "pg_wal");
 
-       /*
-        * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
-        * pg_wal or pg_xlog) depending on the target server so we can write to
-        * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
-        * file may arrive later.
-        */
-       snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
-                        basedir,
-                        PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
-                               "pg_xlog" : "pg_wal");
 
-       if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+       if (format == 'p')
        {
-               fprintf(stderr,
-                               _("%s: could not create directory \"%s\": %s\n"),
-                               progname, statusdir, strerror(errno));
-               disconnect_and_exit(1);
+               /*
+                * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
+                * pg_wal or pg_xlog) depending on the target server so we can write to
+                * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
+                * file may arrive later.
+                */
+               snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
+                                basedir,
+                                PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
+                                "pg_xlog" : "pg_wal");
+
+               if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not create directory \"%s\": %s\n"),
+                                       progname, statusdir, strerror(errno));
+                       disconnect_and_exit(1);
+               }
        }
 
        /*
@@ -2245,16 +2261,6 @@ main(int argc, char **argv)
                exit(1);
        }
 
-       if (format != 'p' && streamwal)
-       {
-               fprintf(stderr,
-                               _("%s: WAL streaming can only be used in plain mode\n"),
-                               progname);
-               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
-                               progname);
-               exit(1);
-       }
-
        if (replication_slot && !streamwal)
        {
                fprintf(stderr,
index a58a251a59f82086d22e577965245ddf3218abb1..bbdf96edfd284b831a20a2d952b2b7245a5b12b5 100644 (file)
@@ -338,11 +338,19 @@ StreamLog(void)
        stream.synchronous = synchronous;
        stream.do_sync = true;
        stream.mark_done = false;
-       stream.basedir = basedir;
+       stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
        stream.partial_suffix = ".partial";
 
        ReceiveXlogStream(conn, &stream);
 
+       if (!stream.walmethod->finish())
+       {
+               fprintf(stderr,
+                               _("%s: could not finish writing WAL files: %s\n"),
+                               progname, strerror(errno));
+               return;
+       }
+
        PQfinish(conn);
        conn = NULL;
 }
index b0fa916b44b4251364fbdd753f2f197672cb382d..fcd026947333b87b8a97efc201f0670903e49a9a 100644 (file)
@@ -30,7 +30,7 @@
 
 
 /* fd and filename for currently open WAL file */
-static int     walfile = -1;
+static Walfile *walfile = NULL;
 static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
@@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
 
 static bool
-mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
 {
-       int                     fd;
+       Walfile    *f;
        static char tmppath[MAXPGPATH];
 
-       snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
-                        basedir, fname);
+       snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+                        fname);
 
-       fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-       if (fd < 0)
+       f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+       if (f == NULL)
        {
                fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
-                               progname, tmppath, strerror(errno));
+                               progname, tmppath, stream->walmethod->getlasterror());
                return false;
        }
 
-       close(fd);
-
-       if (do_sync && fsync_fname(tmppath, false, progname) != 0)
-               return false;
-
-       if (do_sync && fsync_parent_path(tmppath, progname) != 0)
-               return false;
+       stream->walmethod->close(f, CLOSE_NORMAL);
 
        return true;
 }
@@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
 static bool
 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 {
-       int                     f;
+       Walfile    *f;
        char            fn[MAXPGPATH];
-       struct stat statbuf;
-       char       *zerobuf;
-       int                     bytes;
+       ssize_t         size;
        XLogSegNo       segno;
 
        XLByteToSeg(startpoint, segno);
        XLogFileName(current_walfile_name, stream->timeline, segno);
 
-       snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+       snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
                         stream->partial_suffix ? stream->partial_suffix : "");
-       f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-       if (f == -1)
-       {
-               fprintf(stderr,
-                               _("%s: could not open transaction log file \"%s\": %s\n"),
-                               progname, fn, strerror(errno));
-               return false;
-       }
 
        /*
-        * Verify that the file is either empty (just created), or a complete
-        * XLogSegSize segment. Anything in between indicates a corrupt file.
+        * When streaming to files, if an existing file exists we verify that it's
+        * either empty (just created), or a complete XLogSegSize segment (in
+        * which case it has been created and padded). Anything else indicates a
+        * corrupt file.
+        *
+        * When streaming to tar, no file with this name will exist before, so we
+        * never have to verify a size.
         */
-       if (fstat(f, &statbuf) != 0)
+       if (stream->walmethod->existsfile(fn))
        {
-               fprintf(stderr,
-                               _("%s: could not stat transaction log file \"%s\": %s\n"),
-                               progname, fn, strerror(errno));
-               close(f);
-               return false;
-       }
-       if (statbuf.st_size == XLogSegSize)
-       {
-               /*
-                * fsync, in case of a previous crash between padding and fsyncing the
-                * file.
-                */
-               if (stream->do_sync)
+               size = stream->walmethod->get_file_size(fn);
+               if (size < 0)
                {
-                       if (fsync_fname(fn, false, progname) != 0 ||
-                               fsync_parent_path(fn, progname) != 0)
+                       fprintf(stderr,
+                       _("%s: could not get size of transaction log file \"%s\": %s\n"),
+                                       progname, fn, stream->walmethod->getlasterror());
+                       return false;
+               }
+               if (size == XLogSegSize)
+               {
+                       /* Already padded file. Open it for use */
+                       f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+                       if (f == NULL)
                        {
-                               /* error already printed */
-                               close(f);
+                               fprintf(stderr,
+                                               _("%s: could not open existing transaction log file \"%s\": %s\n"),
+                                               progname, fn, stream->walmethod->getlasterror());
                                return false;
                        }
-               }
 
-               /* File is open and ready to use */
-               walfile = f;
-               return true;
-       }
-       if (statbuf.st_size != 0)
-       {
-               fprintf(stderr,
-                               _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
-                               progname, fn, (int) statbuf.st_size, XLogSegSize);
-               close(f);
-               return false;
-       }
+                       /* fsync file in case of a previous crash */
+                       if (!stream->walmethod->fsync(f))
+                       {
+                               stream->walmethod->close(f, CLOSE_UNLINK);
+                               return false;
+                       }
 
-       /*
-        * New, empty, file. So pad it to 16Mb with zeroes.  If we fail partway
-        * through padding, we should attempt to unlink the file on failure, so as
-        * not to leave behind a partially-filled file.
-        */
-       zerobuf = pg_malloc0(XLOG_BLCKSZ);
-       for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
-       {
-               errno = 0;
-               if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+                       walfile = f;
+                       return true;
+               }
+               if (size != 0)
                {
                        /* if write didn't set errno, assume problem is no disk space */
                        if (errno == 0)
                                errno = ENOSPC;
                        fprintf(stderr,
-                                       _("%s: could not pad transaction log file \"%s\": %s\n"),
-                                       progname, fn, strerror(errno));
-                       free(zerobuf);
-                       close(f);
-                       unlink(fn);
+                                       _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+                                       progname, fn, (int) size, XLogSegSize);
                        return false;
                }
+               /* File existed and was empty, so fall through and open */
        }
-       free(zerobuf);
 
-       /*
-        * fsync WAL file and containing directory, to ensure the file is
-        * persistently created and zeroed. That's particularly important when
-        * using synchronous mode, where the file is modified and fsynced
-        * in-place, without a directory fsync.
-        */
-       if (stream->do_sync)
-       {
-               if (fsync_fname(fn, false, progname) != 0 ||
-                       fsync_parent_path(fn, progname) != 0)
-               {
-                       /* error already printed */
-                       close(f);
-                       return false;
-               }
-       }
+       /* No file existed, so create one */
 
-       if (lseek(f, SEEK_SET, 0) != 0)
+       f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+       if (f == NULL)
        {
                fprintf(stderr,
-                               _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
-                               progname, fn, strerror(errno));
-               close(f);
+                               _("%s: could not open transaction log file \"%s\": %s\n"),
+                               progname, fn, stream->walmethod->getlasterror());
                return false;
        }
 
-       /* File is open and ready to use */
        walfile = f;
        return true;
 }
@@ -223,59 +178,46 @@ static bool
 close_walfile(StreamCtl *stream, XLogRecPtr pos)
 {
        off_t           currpos;
+       int                     r;
 
-       if (walfile == -1)
+       if (walfile == NULL)
                return true;
 
-       currpos = lseek(walfile, 0, SEEK_CUR);
+       currpos = stream->walmethod->get_current_pos(walfile);
        if (currpos == -1)
        {
                fprintf(stderr,
                         _("%s: could not determine seek position in file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-               close(walfile);
-               walfile = -1;
+                 progname, current_walfile_name, stream->walmethod->getlasterror());
+               stream->walmethod->close(walfile, CLOSE_UNLINK);
+               walfile = NULL;
+
                return false;
        }
 
-       if (stream->do_sync && fsync(walfile) != 0)
+       if (stream->partial_suffix)
        {
-               fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-               close(walfile);
-               walfile = -1;
-               return false;
+               if (currpos == XLOG_SEG_SIZE)
+                       r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+               else
+               {
+                       fprintf(stderr,
+                                       _("%s: not renaming \"%s%s\", segment is not complete\n"),
+                                       progname, current_walfile_name, stream->partial_suffix);
+                       r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+               }
        }
+       else
+               r = stream->walmethod->close(walfile, CLOSE_NORMAL);
 
-       if (close(walfile) != 0)
+       walfile = NULL;
+
+       if (r != 0)
        {
                fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-               walfile = -1;
+                 progname, current_walfile_name, stream->walmethod->getlasterror());
                return false;
        }
-       walfile = -1;
-
-       /*
-        * If we finished writing a .partial file, rename it into place.
-        */
-       if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
-       {
-               char            oldfn[MAXPGPATH];
-               char            newfn[MAXPGPATH];
-
-               snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
-               snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
-               if (durable_rename(oldfn, newfn, progname) != 0)
-               {
-                       /* durable_rename produced a log entry */
-                       return false;
-               }
-       }
-       else if (stream->partial_suffix)
-               fprintf(stderr,
-                               _("%s: not renaming \"%s%s\", segment is not complete\n"),
-                               progname, current_walfile_name, stream->partial_suffix);
 
        /*
         * Mark file as archived if requested by the caller - pg_basebackup needs
@@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
        if (currpos == XLOG_SEG_SIZE && stream->mark_done)
        {
                /* writes error message if failed */
-               if (!mark_file_as_archived(stream->basedir, current_walfile_name,
-                                                                  stream->do_sync))
+               if (!mark_file_as_archived(stream, current_walfile_name))
                        return false;
        }
 
@@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 static bool
 existsTimeLineHistoryFile(StreamCtl *stream)
 {
-       char            path[MAXPGPATH];
        char            histfname[MAXFNAMELEN];
-       int                     fd;
 
        /*
         * Timeline 1 never has a history file. We treat that as if it existed,
@@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream)
 
        TLHistoryFileName(histfname, stream->timeline);
 
-       snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-       fd = open(path, O_RDONLY | PG_BINARY, 0);
-       if (fd < 0)
-       {
-               if (errno != ENOENT)
-                       fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
-                                       progname, path, strerror(errno));
-               return false;
-       }
-       else
-       {
-               close(fd);
-               return true;
-       }
+       return stream->walmethod->existsfile(histfname);
 }
 
 static bool
 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 {
        int                     size = strlen(content);
-       char            path[MAXPGPATH];
-       char            tmppath[MAXPGPATH];
        char            histfname[MAXFNAMELEN];
-       int                     fd;
+       Walfile    *f;
 
        /*
         * Check that the server's idea of how timeline history files should be
@@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
                return false;
        }
 
-       snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-       /*
-        * Write into a temp file name.
-        */
-       snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
-       unlink(tmppath);
-
-       fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-       if (fd < 0)
+       f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+       if (f == NULL)
        {
                fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
-                               progname, tmppath, strerror(errno));
+                               progname, histfname, stream->walmethod->getlasterror());
                return false;
        }
 
-       errno = 0;
-       if ((int) write(fd, content, size) != size)
+       if ((int) stream->walmethod->write(f, content, size) != size)
        {
-               int                     save_errno = errno;
+               fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+                               progname, histfname, stream->walmethod->getlasterror());
 
                /*
                 * If we fail to make the file, delete it to release disk space
                 */
-               close(fd);
-               unlink(tmppath);
-               errno = save_errno;
+               stream->walmethod->close(f, CLOSE_UNLINK);
 
-               fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
-                               progname, tmppath, strerror(errno));
                return false;
        }
 
-       if (close(fd) != 0)
+       if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
        {
                fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, tmppath, strerror(errno));
-               return false;
-       }
-
-       /*
-        * Now move the completed history file into place with its final name.
-        */
-       if (durable_rename(tmppath, path, progname) < 0)
-       {
-               /* durable_rename produced a log entry */
+                               progname, histfname, stream->walmethod->getlasterror());
                return false;
        }
 
@@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
        if (stream->mark_done)
        {
                /* writes error message if failed */
-               if (!mark_file_as_archived(stream->basedir, histfname,
-                                                                  stream->do_sync))
+               if (!mark_file_as_archived(stream, histfname))
                        return false;
        }
 
@@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
        {
                /*
                 * Fetch the timeline history file for this timeline, if we don't have
-                * it already.
+                * it already. When streaming log to tar, this will always return
+                * false, as we are never streaming into an existing file and
+                * therefore there can be no pre-existing timeline history file.
                 */
                if (!existsTimeLineHistoryFile(stream))
                {
@@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
        }
 
 error:
-       if (walfile != -1 && close(walfile) != 0)
+       if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
                fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-       walfile = -1;
+                 progname, current_walfile_name, stream->walmethod->getlasterror());
+       walfile = NULL;
        return false;
 }
 
@@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
                 * If synchronous option is true, issue sync command as soon as there
                 * are WAL data which has not been flushed yet.
                 */
-               if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+               if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
                {
-                       if (stream->do_sync && fsync(walfile) != 0)
+                       if (stream->walmethod->fsync(walfile) != 0)
                        {
                                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-                                               progname, current_walfile_name, strerror(errno));
+                                               progname, current_walfile_name, stream->walmethod->getlasterror());
                                goto error;
                        }
                        lastFlushPosition = blockpos;
@@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
        if (replyRequested && still_sending)
        {
                if (reportFlushPosition && lastFlushPosition < blockpos &&
-                       walfile != -1)
+                       walfile != NULL)
                {
                        /*
                         * If a valid flush location needs to be reported, flush the
@@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                         * data has been successfully replicated or not, at the normal
                         * shutdown of the server.
                         */
-                       if (stream->do_sync && fsync(walfile) != 0)
+                       if (stream->walmethod->fsync(walfile) != 0)
                        {
                                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-                                               progname, current_walfile_name, strerror(errno));
+                                               progname, current_walfile_name, stream->walmethod->getlasterror());
                                return false;
                        }
                        lastFlushPosition = blockpos;
@@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
         * Verify that the initial location in the stream matches where we think
         * we are.
         */
-       if (walfile == -1)
+       if (walfile == NULL)
        {
                /* No file open yet */
                if (xlogoff != 0)
@@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
        else
        {
                /* More data in existing segment */
-               /* XXX: store seek value don't reseek all the time */
-               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+               if (stream->walmethod->get_current_pos(walfile) != xlogoff)
                {
                        fprintf(stderr,
                                        _("%s: got WAL data offset %08x, expected %08x\n"),
-                                       progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                                       progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
                        return false;
                }
        }
@@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                else
                        bytes_to_write = bytes_left;
 
-               if (walfile == -1)
+               if (walfile == NULL)
                {
                        if (!open_walfile(stream, *blockpos))
                        {
@@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                        }
                }
 
-               if (write(walfile,
-                                 copybuf + hdr_len + bytes_written,
-                                 bytes_to_write) != bytes_to_write)
+               if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+                                                                        bytes_to_write) != bytes_to_write)
                {
                        fprintf(stderr,
                                  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
                                        progname, bytes_to_write, current_walfile_name,
-                                       strerror(errno));
+                                       stream->walmethod->getlasterror());
                        return false;
                }
 
index 7a3bbc50800d1f540ed93b311c36b15b7293e92c..b5913ea995786960a2b46ba74e6cf52d3c7feee0 100644 (file)
@@ -13,6 +13,7 @@
 #define RECEIVELOG_H
 
 #include "libpq-fe.h"
+#include "walmethods.h"
 
 #include "access/xlogdefs.h"
 
@@ -41,7 +42,7 @@ typedef struct StreamCtl
 
        stream_stop_callback stream_stop;       /* Stop streaming when returns true */
 
-       char       *basedir;            /* Received segments written to this dir */
+       WalWriteMethod *walmethod;      /* How to write the WAL */
        char       *partial_suffix; /* Suffix appended to partially received files */
 } StreamCtl;
 
index 579d7a15fbfde0fbf9e30551461a87e2e1329b29..91eb84e2386b16832ea41a98b827cae3a3b2069c 100644 (file)
@@ -4,7 +4,7 @@ use Cwd;
 use Config;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 67;
+use Test::More tests => 69;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -237,6 +237,10 @@ $node->command_ok(
        'pg_basebackup -X stream runs');
 ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
        'WAL files copied');
+$node->command_ok(
+       [ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
+       'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
 
 $node->command_fails(
        [ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
new file mode 100644 (file)
index 0000000..e0ec752
--- /dev/null
@@ -0,0 +1,886 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ *              any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "pgtar.h"
+#include "common/file_utils.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_xlog
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+       char       *basedir;
+       bool            sync;
+}      DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+       int                     fd;
+       off_t           currpos;
+       char       *pathname;
+       char       *fullpath;
+       char       *temp_suffix;
+}      DirectoryMethodFile;
+
+static char *
+dir_getlasterror(void)
+{
+       /* Directory method always sets errno, so just use strerror */
+       return strerror(errno);
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+       static char tmppath[MAXPGPATH];
+       int                     fd;
+       DirectoryMethodFile *f;
+
+       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+                        dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+
+       fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+       if (fd < 0)
+               return NULL;
+
+       if (pad_to_size)
+       {
+               /* Always pre-pad on regular files */
+               char       *zerobuf;
+               int                     bytes;
+
+               zerobuf = pg_malloc0(XLOG_BLCKSZ);
+               for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+               {
+                       if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+                       {
+                               int                     save_errno = errno;
+
+                               pg_free(zerobuf);
+                               close(fd);
+                               errno = save_errno;
+                               return NULL;
+                       }
+               }
+               pg_free(zerobuf);
+
+               if (lseek(fd, 0, SEEK_SET) != 0)
+               {
+                       int                     save_errno = errno;
+
+                       close(fd);
+                       errno = save_errno;
+                       return NULL;
+               }
+       }
+
+       /*
+        * fsync WAL file and containing directory, to ensure the file is
+        * persistently created and zeroed (if padded). That's particularly
+        * important when using synchronous mode, where the file is modified and
+        * fsynced in-place, without a directory fsync.
+        */
+       if (dir_data->sync)
+       {
+               if (fsync_fname(tmppath, false, progname) != 0 ||
+                       fsync_parent_path(tmppath, progname) != 0)
+               {
+                       close(fd);
+                       return NULL;
+               }
+       }
+
+       f = pg_malloc0(sizeof(DirectoryMethodFile));
+       f->fd = fd;
+       f->currpos = 0;
+       f->pathname = pg_strdup(pathname);
+       f->fullpath = pg_strdup(tmppath);
+       if (temp_suffix)
+               f->temp_suffix = pg_strdup(temp_suffix);
+
+       return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+       ssize_t         r;
+       DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+       Assert(f != NULL);
+
+       r = write(df->fd, buf, count);
+       if (r > 0)
+               df->currpos += r;
+       return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+       Assert(f != NULL);
+
+       /* Use a cached value to prevent lots of reseeks */
+       return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+       int                     r;
+       DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+       static char tmppath[MAXPGPATH];
+       static char tmppath2[MAXPGPATH];
+
+       Assert(f != NULL);
+
+       r = close(df->fd);
+
+       if (r == 0)
+       {
+               /* Build path to the current version of the file */
+               if (method == CLOSE_NORMAL && df->temp_suffix)
+               {
+                       /*
+                        * If we have a temp prefix, normal operation is to rename the
+                        * file.
+                        */
+                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+                                        dir_data->basedir, df->pathname, df->temp_suffix);
+                       snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+                                        dir_data->basedir, df->pathname);
+                       r = durable_rename(tmppath, tmppath2, progname);
+               }
+               else if (method == CLOSE_UNLINK)
+               {
+                       /* Unlink the file once it's closed */
+                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+                                        dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+                       r = unlink(tmppath);
+               }
+               else
+               {
+                       /*
+                        * Else either CLOSE_NORMAL and no temp suffix, or
+                        * CLOSE_NO_RENAME. In this case, fsync the file and containing
+                        * directory if sync mode is requested.
+                        */
+                       if (dir_data->sync)
+                       {
+                               r = fsync_fname(df->fullpath, false, progname);
+                               if (r == 0)
+                                       r = fsync_parent_path(df->fullpath, progname);
+                       }
+               }
+       }
+
+       pg_free(df->pathname);
+       pg_free(df->fullpath);
+       if (df->temp_suffix)
+               pg_free(df->temp_suffix);
+       pg_free(df);
+
+       return r;
+}
+
+static int
+dir_fsync(Walfile f)
+{
+       Assert(f != NULL);
+
+       if (!dir_data->sync)
+               return 0;
+
+       return fsync(((DirectoryMethodFile *) f)->fd);
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+       struct stat statbuf;
+       static char tmppath[MAXPGPATH];
+
+       snprintf(tmppath, sizeof(tmppath), "%s/%s",
+                        dir_data->basedir, pathname);
+
+       if (stat(tmppath, &statbuf) != 0)
+               return -1;
+
+       return statbuf.st_size;
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+       static char tmppath[MAXPGPATH];
+       int                     fd;
+
+       snprintf(tmppath, sizeof(tmppath), "%s/%s",
+                        dir_data->basedir, pathname);
+
+       fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+       if (fd < 0)
+               return false;
+       close(fd);
+       return true;
+}
+
+static bool
+dir_finish(void)
+{
+       if (dir_data->sync)
+       {
+               /*
+                * Files are fsynced when they are closed, but we need to fsync the
+                * directory entry here as well.
+                */
+               if (fsync_fname(dir_data->basedir, true, progname) != 0)
+                       return false;
+       }
+       return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir, bool sync)
+{
+       WalWriteMethod *method;
+
+       method = pg_malloc0(sizeof(WalWriteMethod));
+       method->open_for_write = dir_open_for_write;
+       method->write = dir_write;
+       method->get_current_pos = dir_get_current_pos;
+       method->get_file_size = dir_get_file_size;
+       method->close = dir_close;
+       method->fsync = dir_fsync;
+       method->existsfile = dir_existsfile;
+       method->finish = dir_finish;
+       method->getlasterror = dir_getlasterror;
+
+       dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+       dir_data->basedir = pg_strdup(basedir);
+       dir_data->sync = sync;
+
+       return method;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_xlog contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+       off_t           ofs_start;              /* Where does the *header* for this file start */
+       off_t           currpos;
+       char            header[512];
+       char       *pathname;
+       size_t          pad_to_size;
+}      TarMethodFile;
+
+typedef struct TarMethodData
+{
+       char       *tarfilename;
+       int                     fd;
+       int                     compression;
+       bool            sync;
+       TarMethodFile *currentfile;
+       char            lasterror[1024];
+#ifdef HAVE_LIBZ
+       z_streamp       zp;
+       void       *zlibOut;
+#endif
+}      TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() tar_data->lasterror[0] = '\0'
+#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
+
+static char *
+tar_getlasterror(void)
+{
+       /*
+        * If a custom error is set, return that one. Otherwise, assume errno is
+        * set and return that one.
+        */
+       if (tar_data->lasterror[0])
+               return tar_data->lasterror;
+       return strerror(errno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+       tar_data->zp->next_in = buf;
+       tar_data->zp->avail_in = count;
+
+       while (tar_data->zp->avail_in || flush)
+       {
+               int                     r;
+
+               r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+               if (r == Z_STREAM_ERROR)
+               {
+                       tar_set_error("deflate failed");
+                       return false;
+               }
+
+               if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+               {
+                       size_t          len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+                       if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+                               return false;
+
+                       tar_data->zp->next_out = tar_data->zlibOut;
+                       tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+               }
+
+               if (r == Z_STREAM_END)
+                       break;
+       }
+
+       if (flush)
+       {
+               /* Reset the stream for writing */
+               if (deflateReset(tar_data->zp) != Z_OK)
+               {
+                       tar_set_error("deflateReset failed");
+                       return false;
+               }
+       }
+
+       return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+       ssize_t         r;
+
+       Assert(f != NULL);
+       tar_clear_error();
+
+       /* Tarfile will always be positioned at the end */
+       if (!tar_data->compression)
+       {
+               r = write(tar_data->fd, buf, count);
+               if (r > 0)
+                       ((TarMethodFile *) f)->currpos += r;
+               return r;
+       }
+#ifdef HAVE_LIBZ
+       else
+       {
+               if (!tar_write_compressed_data((void *) buf, count, false))
+                       return -1;
+               ((TarMethodFile *) f)->currpos += count;
+               return count;
+       }
+#endif
+}
+
+static bool
+tar_write_padding_data(TarMethodFile * f, size_t bytes)
+{
+       char       *zerobuf = pg_malloc0(XLOG_BLCKSZ);
+       size_t          bytesleft = bytes;
+
+       while (bytesleft)
+       {
+               size_t          bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
+
+               size_t          r = tar_write(f, zerobuf, bytestowrite);
+
+               if (r < 0)
+                       return false;
+               bytesleft -= r;
+       }
+       return true;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+       int                     save_errno;
+       static char tmppath[MAXPGPATH];
+
+       tar_clear_error();
+
+       if (tar_data->fd < 0)
+       {
+               /*
+                * We open the tar file only when we first try to write to it.
+                */
+               tar_data->fd = open(tar_data->tarfilename,
+                                                 O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+               if (tar_data->fd < 0)
+                       return NULL;
+
+#ifdef HAVE_LIBZ
+               if (tar_data->compression)
+               {
+                       tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+                       tar_data->zp->zalloc = Z_NULL;
+                       tar_data->zp->zfree = Z_NULL;
+                       tar_data->zp->opaque = Z_NULL;
+                       tar_data->zp->next_out = tar_data->zlibOut;
+                       tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+                       /*
+                        * Initialize deflation library. Adding the magic value 16 to the
+                        * default 15 for the windowBits parameter makes the output be
+                        * gzip instead of zlib.
+                        */
+                       if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+                       {
+                               pg_free(tar_data->zp);
+                               tar_data->zp = NULL;
+                               tar_set_error("deflateInit2 failed");
+                               return NULL;
+                       }
+               }
+#endif
+
+               /* There's no tar header itself, the file starts with regular files */
+       }
+
+       Assert(tar_data->currentfile == NULL);
+       if (tar_data->currentfile != NULL)
+       {
+               tar_set_error("implementation error: tar files can't have more than one open file\n");
+               return NULL;
+       }
+
+       tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+       snprintf(tmppath, sizeof(tmppath), "%s%s",
+                        pathname, temp_suffix ? temp_suffix : "");
+
+       /* Create a header with size set to 0 - we will fill out the size on close */
+       if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+       {
+               pg_free(tar_data->currentfile);
+               tar_data->currentfile = NULL;
+               tar_set_error("could not create tar header");
+               return NULL;
+       }
+
+#ifdef HAVE_LIBZ
+       if (tar_data->compression)
+       {
+               /* Flush existing data */
+               if (!tar_write_compressed_data(NULL, 0, true))
+                       return NULL;
+
+               /* Turn off compression for header */
+               if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+               {
+                       tar_set_error("deflateParams failed");
+                       return NULL;
+               }
+       }
+#endif
+
+       tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+       if (tar_data->currentfile->ofs_start == -1)
+       {
+               save_errno = errno;
+               pg_free(tar_data->currentfile);
+               tar_data->currentfile = NULL;
+               errno = save_errno;
+               return NULL;
+       }
+       tar_data->currentfile->currpos = 0;
+
+       if (!tar_data->compression)
+       {
+               if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
+               {
+                       save_errno = errno;
+                       pg_free(tar_data->currentfile);
+                       tar_data->currentfile = NULL;
+                       errno = save_errno;
+                       return NULL;
+               }
+       }
+#ifdef HAVE_LIBZ
+       else
+       {
+               /* Write header through the zlib APIs but with no compression */
+               if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+                       return NULL;
+
+               /* Re-enable compression for the rest of the file */
+               if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+               {
+                       tar_set_error("deflateParams failed");
+                       return NULL;
+               }
+       }
+#endif
+
+       tar_data->currentfile->pathname = pg_strdup(pathname);
+
+       /*
+        * Uncompressed files are padded on creation, but for compression we can't
+        * do that
+        */
+       if (pad_to_size)
+       {
+               tar_data->currentfile->pad_to_size = pad_to_size;
+               if (!tar_data->compression)
+               {
+                       /* Uncompressed, so pad now */
+                       tar_write_padding_data(tar_data->currentfile, pad_to_size);
+                       /* Seek back to start */
+                       if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
+                               return NULL;
+
+                       tar_data->currentfile->currpos = 0;
+               }
+       }
+
+       return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+       tar_clear_error();
+
+       /* Currently not used, so not supported */
+       errno = ENOSYS;
+       return -1;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+       Assert(f != NULL);
+       tar_clear_error();
+
+       return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_fsync(Walfile f)
+{
+       Assert(f != NULL);
+       tar_clear_error();
+
+       /*
+        * Always sync the whole tarfile, because that's all we can do. This makes
+        * no sense on compressed files, so just ignore those.
+        */
+       if (tar_data->compression)
+               return 0;
+
+       return fsync(tar_data->fd);
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+       ssize_t         filesize;
+       int                     padding;
+       TarMethodFile *tf = (TarMethodFile *) f;
+
+       Assert(f != NULL);
+       tar_clear_error();
+
+       if (method == CLOSE_UNLINK)
+       {
+               if (tar_data->compression)
+               {
+                       tar_set_error("unlink not supported with compression");
+                       return -1;
+               }
+
+               /*
+                * Unlink the file that we just wrote to the tar. We do this by
+                * truncating it to the start of the header. This is safe as we only
+                * allow writing of the very last file.
+                */
+               if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+                       return -1;
+
+               pg_free(tf->pathname);
+               pg_free(tf);
+               tar_data->currentfile = NULL;
+
+               return 0;
+       }
+
+       /*
+        * Pad the file itself with zeroes if necessary. Note that this is
+        * different from the tar format padding -- this is the padding we asked
+        * for when the file was opened.
+        */
+       if (tf->pad_to_size)
+       {
+               if (tar_data->compression)
+               {
+                       /*
+                        * A compressed tarfile is padded on close since we cannot know
+                        * the size of the compressed output until the end.
+                        */
+                       size_t          sizeleft = tf->pad_to_size - tf->currpos;
+
+                       if (sizeleft)
+                       {
+                               if (!tar_write_padding_data(tf, sizeleft))
+                                       return -1;
+                       }
+               }
+               else
+               {
+                       /*
+                        * An uncompressed tarfile was padded on creation, so just adjust
+                        * the current position as if we seeked to the end.
+                        */
+                       tf->currpos = tf->pad_to_size;
+               }
+       }
+
+       /*
+        * Get the size of the file, and pad the current data up to the nearest
+        * 512 byte boundary.
+        */
+       filesize = tar_get_current_pos(f);
+       padding = ((filesize + 511) & ~511) - filesize;
+       if (padding)
+       {
+               char            zerobuf[512];
+
+               MemSet(zerobuf, 0, padding);
+               if (tar_write(f, zerobuf, padding) != padding)
+                       return -1;
+       }
+
+
+#ifdef HAVE_LIBZ
+       if (tar_data->compression)
+       {
+               /* Flush the current buffer */
+               if (!tar_write_compressed_data(NULL, 0, true))
+               {
+                       errno = EINVAL;
+                       return -1;
+               }
+       }
+#endif
+
+       /*
+        * Now go back and update the header with the correct filesize and
+        * possibly also renaming the file. We overwrite the entire current header
+        * when done, including the checksum.
+        */
+       print_tar_number(&(tf->header[124]), 12, filesize);
+
+       if (method == CLOSE_NORMAL)
+
+               /*
+                * We overwrite it with what it was before if we have no tempname,
+                * since we're going to write the buffer anyway.
+                */
+               strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+       print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
+       if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
+               return -1;
+       if (!tar_data->compression)
+       {
+               if (write(tar_data->fd, tf->header, 512) != 512)
+                       return -1;
+       }
+#ifdef HAVE_LIBZ
+       else
+       {
+               /* Turn off compression */
+               if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+               {
+                       tar_set_error("deflateParams failed");
+                       return -1;
+               }
+
+               /* Overwrite the header, assuming the size will be the same */
+               if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+                       return -1;
+
+               /* Turn compression back on */
+               if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+               {
+                       tar_set_error("deflateParams failed");
+                       return -1;
+               }
+       }
+#endif
+
+       /* Move file pointer back down to end, so we can write the next file */
+       if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+               return -1;
+
+       /* Always fsync on close, so the padding gets fsynced */
+       tar_fsync(f);
+
+       /* Clean up and done */
+       pg_free(tf->pathname);
+       pg_free(tf);
+       tar_data->currentfile = NULL;
+
+       return 0;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+       tar_clear_error();
+       /* We only deal with new tarfiles, so nothing externally created exists */
+       return false;
+}
+
+static bool
+tar_finish(void)
+{
+       char            zerobuf[1024];
+
+       tar_clear_error();
+
+       if (tar_data->currentfile)
+       {
+               if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+                       return false;
+       }
+
+       /* A tarfile always ends with two empty  blocks */
+       MemSet(zerobuf, 0, sizeof(zerobuf));
+       if (!tar_data->compression)
+       {
+               if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+                       return false;
+       }
+#ifdef HAVE_LIBZ
+       else
+       {
+               if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+                       return false;
+
+               /* Also flush all data to make sure the gzip stream is finished */
+               tar_data->zp->next_in = NULL;
+               tar_data->zp->avail_in = 0;
+               while (true)
+               {
+                       int                     r;
+
+                       r = deflate(tar_data->zp, Z_FINISH);
+
+                       if (r == Z_STREAM_ERROR)
+                       {
+                               tar_set_error("deflate failed");
+                               return false;
+                       }
+                       if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+                       {
+                               size_t          len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+                               if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+                                       return false;
+                       }
+                       if (r == Z_STREAM_END)
+                               break;
+               }
+
+               if (deflateEnd(tar_data->zp) != Z_OK)
+               {
+                       tar_set_error("deflateEnd failed");
+                       return false;
+               }
+       }
+#endif
+
+       /* sync the empty blocks as well, since they're after the last file */
+       fsync(tar_data->fd);
+
+       if (close(tar_data->fd) != 0)
+               return false;
+
+       tar_data->fd = -1;
+
+       if (tar_data->sync)
+       {
+               if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
+                       return false;
+               if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
+                       return false;
+       }
+
+       return true;
+}
+
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase, int compression, bool sync)
+{
+       WalWriteMethod *method;
+       const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
+
+       method = pg_malloc0(sizeof(WalWriteMethod));
+       method->open_for_write = tar_open_for_write;
+       method->write = tar_write;
+       method->get_current_pos = tar_get_current_pos;
+       method->get_file_size = tar_get_file_size;
+       method->close = tar_close;
+       method->fsync = tar_fsync;
+       method->existsfile = tar_existsfile;
+       method->finish = tar_finish;
+       method->getlasterror = tar_getlasterror;
+
+       tar_data = pg_malloc0(sizeof(TarMethodData));
+       tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+       sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
+       tar_data->fd = -1;
+       tar_data->compression = compression;
+       tar_data->sync = sync;
+       if (compression)
+               tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+
+       return method;
+}
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
new file mode 100644 (file)
index 0000000..fa58f81
--- /dev/null
@@ -0,0 +1,45 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.h
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/walmethods.h
+ *-------------------------------------------------------------------------
+ */
+
+
+typedef void *Walfile;
+
+typedef enum
+{
+       CLOSE_NORMAL,
+       CLOSE_UNLINK,
+       CLOSE_NO_RENAME,
+}      WalCloseMethod;
+
+typedef struct WalWriteMethod WalWriteMethod;
+struct WalWriteMethod
+{
+       Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+       int                     (*close) (Walfile f, WalCloseMethod method);
+       bool            (*existsfile) (const char *pathname);
+       ssize_t         (*get_file_size) (const char *pathname);
+
+       ssize_t         (*write) (Walfile f, const void *buf, size_t count);
+       off_t           (*get_current_pos) (Walfile f);
+       int                     (*fsync) (Walfile f);
+       bool            (*finish) (void);
+       char       *(*getlasterror) (void);
+};
+
+/*
+ * Available WAL methods:
+ *     - WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
+ *     - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog
+ *                                                (only implements the methods required for pg_basebackup,
+ *                                                not all those required for pg_receivexlog)
+ */
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
index 45ca400f98e6171b29c8a45f6fdc2852b6244f48..1d179f0df1ee80c68419e628f6b80ff926278e10 100644 (file)
@@ -22,4 +22,5 @@ enum tarError
 extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
                          pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
 extern uint64 read_tar_number(const char *s, int len);
+extern void print_tar_number(char *s, int len, uint64 val);
 extern int     tarChecksum(char *header);
index 52a2113a47e2592f4b19bf8dbe6fdc63e76ae9a1..f1da959dacf578727cc3b5bc8c1a15fac2c80ba1 100644 (file)
@@ -16,7 +16,7 @@
  * support only non-negative numbers, so we don't worry about the GNU rules
  * for handling negative numbers.)
  */
-static void
+void
 print_tar_number(char *s, int len, uint64 val)
 {
        if (val < (((uint64) 1) << ((len - 1) * 3)))