target directory, the tar contents will be written to
standard output, suitable for piping to for example
<productname>gzip</productname>. This is only possible if
- the cluster has no additional tablespaces.
+ the cluster has no additional tablespaces and transaction
+ log streaming is not used.
</para>
</listitem>
</varlistentry>
If the log has been rotated when it's time to transfer it, the
backup will fail and be unusable.
</para>
+ <para>
+ The transaction log files will be written to
+ the <filename>base.tar</filename> file.
+ </para>
</listitem>
</varlistentry>
client can keep up with transaction log received, using this mode
requires no extra transaction logs to be saved on the master.
</para>
+ <para>
+ The transaction log files are written to a separate file
+ named <filename>pg_wal.tar</filename> (if the server is a version
+ earlier than 10, the file will be named <filename>pg_xlog.tar</filename>).
+ </para>
</listitem>
</varlistentry>
</variablelist>
<para>
Enables gzip compression of tar file output, with the default
compression level. Compression is only available when using
- the tar format.
+ the tar format, and the suffix <filename>.gz</filename> will
+ automatically be added to all tar filenames.
</para>
</listitem>
</varlistentry>
Enables gzip compression of tar file output, and specifies the
compression level (0 through 9, 0 being no compression and 9 being best
compression). Compression is only available when using the tar
- format.
+ format, and the suffix <filename>.gz</filename> will
+ automatically be added to all tar filenames.
</para>
</listitem>
</varlistentry>
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
-OBJS=receivelog.o streamutil.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
all: pg_basebackup pg_receivexlog pg_recvlogical
{
PGconn *bgconn;
XLogRecPtr startptr;
- char xlogdir[MAXPGPATH];
+ char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */
char *sysidentifier;
int timeline;
} logstreamer_param;
stream.synchronous = false;
stream.do_sync = do_sync;
stream.mark_done = true;
- stream.basedir = param->xlogdir;
stream.partial_suffix = NULL;
+ if (format == 'p')
+ stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+ else
+ stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
+
if (!ReceiveXlogStream(param->bgconn, &stream))
/*
*/
return 1;
+ if (!stream.walmethod->finish())
+ {
+ fprintf(stderr,
+ _("%s: could not finish writing WAL files: %s\n"),
+ progname, strerror(errno));
+ return 1;
+ }
+
PQfinish(param->bgconn);
return 0;
}
exit(1);
/* In post-10 cluster, pg_xlog has been renamed to pg_wal */
- snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/%s",
+ snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
basedir,
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
"pg_xlog" : "pg_wal");
- /*
- * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
- * pg_wal or pg_xlog) depending on the target server so we can write to
- * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
- * file may arrive later.
- */
- snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
- basedir,
- PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
- "pg_xlog" : "pg_wal");
- if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+ if (format == 'p')
{
- fprintf(stderr,
- _("%s: could not create directory \"%s\": %s\n"),
- progname, statusdir, strerror(errno));
- disconnect_and_exit(1);
+ /*
+ * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
+ * pg_wal or pg_xlog) depending on the target server so we can write to
+ * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
+ * file may arrive later.
+ */
+ snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
+ basedir,
+ PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
+ "pg_xlog" : "pg_wal");
+
+ if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+ {
+ fprintf(stderr,
+ _("%s: could not create directory \"%s\": %s\n"),
+ progname, statusdir, strerror(errno));
+ disconnect_and_exit(1);
+ }
}
/*
exit(1);
}
- if (format != 'p' && streamwal)
- {
- fprintf(stderr,
- _("%s: WAL streaming can only be used in plain mode\n"),
- progname);
- fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
- progname);
- exit(1);
- }
-
if (replication_slot && !streamwal)
{
fprintf(stderr,
stream.synchronous = synchronous;
stream.do_sync = true;
stream.mark_done = false;
- stream.basedir = basedir;
+ stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
stream.partial_suffix = ".partial";
ReceiveXlogStream(conn, &stream);
+ if (!stream.walmethod->finish())
+ {
+ fprintf(stderr,
+ _("%s: could not finish writing WAL files: %s\n"),
+ progname, strerror(errno));
+ return;
+ }
+
PQfinish(conn);
conn = NULL;
}
/* fd and filename for currently open WAL file */
-static int walfile = -1;
+static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
uint32 *timeline);
static bool
-mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
{
- int fd;
+ Walfile *f;
static char tmppath[MAXPGPATH];
- snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
- basedir, fname);
+ snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+ fname);
- fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (fd < 0)
+ f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+ if (f == NULL)
{
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
+ progname, tmppath, stream->walmethod->getlasterror());
return false;
}
- close(fd);
-
- if (do_sync && fsync_fname(tmppath, false, progname) != 0)
- return false;
-
- if (do_sync && fsync_parent_path(tmppath, progname) != 0)
- return false;
+ stream->walmethod->close(f, CLOSE_NORMAL);
return true;
}
static bool
open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{
- int f;
+ Walfile *f;
char fn[MAXPGPATH];
- struct stat statbuf;
- char *zerobuf;
- int bytes;
+ ssize_t size;
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
XLogFileName(current_walfile_name, stream->timeline, segno);
- snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+ snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
stream->partial_suffix ? stream->partial_suffix : "");
- f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (f == -1)
- {
- fprintf(stderr,
- _("%s: could not open transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- return false;
- }
/*
- * Verify that the file is either empty (just created), or a complete
- * XLogSegSize segment. Anything in between indicates a corrupt file.
+ * When streaming to files, if an existing file exists we verify that it's
+ * either empty (just created), or a complete XLogSegSize segment (in
+ * which case it has been created and padded). Anything else indicates a
+ * corrupt file.
+ *
+ * When streaming to tar, no file with this name will exist before, so we
+ * never have to verify a size.
*/
- if (fstat(f, &statbuf) != 0)
+ if (stream->walmethod->existsfile(fn))
{
- fprintf(stderr,
- _("%s: could not stat transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- close(f);
- return false;
- }
- if (statbuf.st_size == XLogSegSize)
- {
- /*
- * fsync, in case of a previous crash between padding and fsyncing the
- * file.
- */
- if (stream->do_sync)
+ size = stream->walmethod->get_file_size(fn);
+ if (size < 0)
{
- if (fsync_fname(fn, false, progname) != 0 ||
- fsync_parent_path(fn, progname) != 0)
+ fprintf(stderr,
+ _("%s: could not get size of transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
+ return false;
+ }
+ if (size == XLogSegSize)
+ {
+ /* Already padded file. Open it for use */
+ f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+ if (f == NULL)
{
- /* error already printed */
- close(f);
+ fprintf(stderr,
+ _("%s: could not open existing transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
return false;
}
- }
- /* File is open and ready to use */
- walfile = f;
- return true;
- }
- if (statbuf.st_size != 0)
- {
- fprintf(stderr,
- _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
- progname, fn, (int) statbuf.st_size, XLogSegSize);
- close(f);
- return false;
- }
+ /* fsync file in case of a previous crash */
+ if (!stream->walmethod->fsync(f))
+ {
+ stream->walmethod->close(f, CLOSE_UNLINK);
+ return false;
+ }
- /*
- * New, empty, file. So pad it to 16Mb with zeroes. If we fail partway
- * through padding, we should attempt to unlink the file on failure, so as
- * not to leave behind a partially-filled file.
- */
- zerobuf = pg_malloc0(XLOG_BLCKSZ);
- for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
- {
- errno = 0;
- if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ walfile = f;
+ return true;
+ }
+ if (size != 0)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
fprintf(stderr,
- _("%s: could not pad transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- free(zerobuf);
- close(f);
- unlink(fn);
+ _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+ progname, fn, (int) size, XLogSegSize);
return false;
}
+ /* File existed and was empty, so fall through and open */
}
- free(zerobuf);
- /*
- * fsync WAL file and containing directory, to ensure the file is
- * persistently created and zeroed. That's particularly important when
- * using synchronous mode, where the file is modified and fsynced
- * in-place, without a directory fsync.
- */
- if (stream->do_sync)
- {
- if (fsync_fname(fn, false, progname) != 0 ||
- fsync_parent_path(fn, progname) != 0)
- {
- /* error already printed */
- close(f);
- return false;
- }
- }
+ /* No file existed, so create one */
- if (lseek(f, SEEK_SET, 0) != 0)
+ f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+ if (f == NULL)
{
fprintf(stderr,
- _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
- progname, fn, strerror(errno));
- close(f);
+ _("%s: could not open transaction log file \"%s\": %s\n"),
+ progname, fn, stream->walmethod->getlasterror());
return false;
}
- /* File is open and ready to use */
walfile = f;
return true;
}
close_walfile(StreamCtl *stream, XLogRecPtr pos)
{
off_t currpos;
+ int r;
- if (walfile == -1)
+ if (walfile == NULL)
return true;
- currpos = lseek(walfile, 0, SEEK_CUR);
+ currpos = stream->walmethod->get_current_pos(walfile);
if (currpos == -1)
{
fprintf(stderr,
_("%s: could not determine seek position in file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- close(walfile);
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
+ stream->walmethod->close(walfile, CLOSE_UNLINK);
+ walfile = NULL;
+
return false;
}
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->partial_suffix)
{
- fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- close(walfile);
- walfile = -1;
- return false;
+ if (currpos == XLOG_SEG_SIZE)
+ r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ else
+ {
+ fprintf(stderr,
+ _("%s: not renaming \"%s%s\", segment is not complete\n"),
+ progname, current_walfile_name, stream->partial_suffix);
+ r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+ }
}
+ else
+ r = stream->walmethod->close(walfile, CLOSE_NORMAL);
- if (close(walfile) != 0)
+ walfile = NULL;
+
+ if (r != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
return false;
}
- walfile = -1;
-
- /*
- * If we finished writing a .partial file, rename it into place.
- */
- if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
- {
- char oldfn[MAXPGPATH];
- char newfn[MAXPGPATH];
-
- snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
- snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
- if (durable_rename(oldfn, newfn, progname) != 0)
- {
- /* durable_rename produced a log entry */
- return false;
- }
- }
- else if (stream->partial_suffix)
- fprintf(stderr,
- _("%s: not renaming \"%s%s\", segment is not complete\n"),
- progname, current_walfile_name, stream->partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(stream->basedir, current_walfile_name,
- stream->do_sync))
+ if (!mark_file_as_archived(stream, current_walfile_name))
return false;
}
static bool
existsTimeLineHistoryFile(StreamCtl *stream)
{
- char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
- int fd;
/*
* Timeline 1 never has a history file. We treat that as if it existed,
TLHistoryFileName(histfname, stream->timeline);
- snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
- fd = open(path, O_RDONLY | PG_BINARY, 0);
- if (fd < 0)
- {
- if (errno != ENOENT)
- fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
- progname, path, strerror(errno));
- return false;
- }
- else
- {
- close(fd);
- return true;
- }
+ return stream->walmethod->existsfile(histfname);
}
static bool
writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{
int size = strlen(content);
- char path[MAXPGPATH];
- char tmppath[MAXPGPATH];
char histfname[MAXFNAMELEN];
- int fd;
+ Walfile *f;
/*
* Check that the server's idea of how timeline history files should be
return false;
}
- snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
- /*
- * Write into a temp file name.
- */
- snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
- unlink(tmppath);
-
- fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
- if (fd < 0)
+ f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+ if (f == NULL)
{
fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
+ progname, histfname, stream->walmethod->getlasterror());
return false;
}
- errno = 0;
- if ((int) write(fd, content, size) != size)
+ if ((int) stream->walmethod->write(f, content, size) != size)
{
- int save_errno = errno;
+ fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+ progname, histfname, stream->walmethod->getlasterror());
/*
* If we fail to make the file, delete it to release disk space
*/
- close(fd);
- unlink(tmppath);
- errno = save_errno;
+ stream->walmethod->close(f, CLOSE_UNLINK);
- fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
return false;
}
- if (close(fd) != 0)
+ if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, tmppath, strerror(errno));
- return false;
- }
-
- /*
- * Now move the completed history file into place with its final name.
- */
- if (durable_rename(tmppath, path, progname) < 0)
- {
- /* durable_rename produced a log entry */
+ progname, histfname, stream->walmethod->getlasterror());
return false;
}
if (stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(stream->basedir, histfname,
- stream->do_sync))
+ if (!mark_file_as_archived(stream, histfname))
return false;
}
{
/*
* Fetch the timeline history file for this timeline, if we don't have
- * it already.
+ * it already. When streaming log to tar, this will always return
+ * false, as we are never streaming into an existing file and
+ * therefore there can be no pre-existing timeline history file.
*/
if (!existsTimeLineHistoryFile(stream))
{
}
error:
- if (walfile != -1 && close(walfile) != 0)
+ if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
- walfile = -1;
+ progname, current_walfile_name, stream->walmethod->getlasterror());
+ walfile = NULL;
return false;
}
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
- if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+ if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
{
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->walmethod->fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
+ progname, current_walfile_name, stream->walmethod->getlasterror());
goto error;
}
lastFlushPosition = blockpos;
if (replyRequested && still_sending)
{
if (reportFlushPosition && lastFlushPosition < blockpos &&
- walfile != -1)
+ walfile != NULL)
{
/*
* If a valid flush location needs to be reported, flush the
* data has been successfully replicated or not, at the normal
* shutdown of the server.
*/
- if (stream->do_sync && fsync(walfile) != 0)
+ if (stream->walmethod->fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
- progname, current_walfile_name, strerror(errno));
+ progname, current_walfile_name, stream->walmethod->getlasterror());
return false;
}
lastFlushPosition = blockpos;
* Verify that the initial location in the stream matches where we think
* we are.
*/
- if (walfile == -1)
+ if (walfile == NULL)
{
/* No file open yet */
if (xlogoff != 0)
else
{
/* More data in existing segment */
- /* XXX: store seek value don't reseek all the time */
- if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ if (stream->walmethod->get_current_pos(walfile) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
- progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
return false;
}
}
else
bytes_to_write = bytes_left;
- if (walfile == -1)
+ if (walfile == NULL)
{
if (!open_walfile(stream, *blockpos))
{
}
}
- if (write(walfile,
- copybuf + hdr_len + bytes_written,
- bytes_to_write) != bytes_to_write)
+ if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+ bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
- strerror(errno));
+ stream->walmethod->getlasterror());
return false;
}
#define RECEIVELOG_H
#include "libpq-fe.h"
+#include "walmethods.h"
#include "access/xlogdefs.h"
stream_stop_callback stream_stop; /* Stop streaming when returns true */
- char *basedir; /* Received segments written to this dir */
+ WalWriteMethod *walmethod; /* How to write the WAL */
char *partial_suffix; /* Suffix appended to partially received files */
} StreamCtl;
use Config;
use PostgresNode;
use TestLib;
-use Test::More tests => 67;
+use Test::More tests => 69;
program_help_ok('pg_basebackup');
program_version_ok('pg_basebackup');
'pg_basebackup -X stream runs');
ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
'WAL files copied');
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
+ 'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
$node->command_fails(
[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ * any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "pgtar.h"
+#include "common/file_utils.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_xlog
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+ char *basedir;
+ bool sync;
+} DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+ int fd;
+ off_t currpos;
+ char *pathname;
+ char *fullpath;
+ char *temp_suffix;
+} DirectoryMethodFile;
+
+static char *
+dir_getlasterror(void)
+{
+ /* Directory method always sets errno, so just use strerror */
+ return strerror(errno);
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+ static char tmppath[MAXPGPATH];
+ int fd;
+ DirectoryMethodFile *f;
+
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+ dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+
+ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ return NULL;
+
+ if (pad_to_size)
+ {
+ /* Always pre-pad on regular files */
+ char *zerobuf;
+ int bytes;
+
+ zerobuf = pg_malloc0(XLOG_BLCKSZ);
+ for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+ {
+ if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ int save_errno = errno;
+
+ pg_free(zerobuf);
+ close(fd);
+ errno = save_errno;
+ return NULL;
+ }
+ }
+ pg_free(zerobuf);
+
+ if (lseek(fd, 0, SEEK_SET) != 0)
+ {
+ int save_errno = errno;
+
+ close(fd);
+ errno = save_errno;
+ return NULL;
+ }
+ }
+
+ /*
+ * fsync WAL file and containing directory, to ensure the file is
+ * persistently created and zeroed (if padded). That's particularly
+ * important when using synchronous mode, where the file is modified and
+ * fsynced in-place, without a directory fsync.
+ */
+ if (dir_data->sync)
+ {
+ if (fsync_fname(tmppath, false, progname) != 0 ||
+ fsync_parent_path(tmppath, progname) != 0)
+ {
+ close(fd);
+ return NULL;
+ }
+ }
+
+ f = pg_malloc0(sizeof(DirectoryMethodFile));
+ f->fd = fd;
+ f->currpos = 0;
+ f->pathname = pg_strdup(pathname);
+ f->fullpath = pg_strdup(tmppath);
+ if (temp_suffix)
+ f->temp_suffix = pg_strdup(temp_suffix);
+
+ return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+ ssize_t r;
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+ Assert(f != NULL);
+
+ r = write(df->fd, buf, count);
+ if (r > 0)
+ df->currpos += r;
+ return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+ Assert(f != NULL);
+
+ /* Use a cached value to prevent lots of reseeks */
+ return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+ int r;
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ static char tmppath[MAXPGPATH];
+ static char tmppath2[MAXPGPATH];
+
+ Assert(f != NULL);
+
+ r = close(df->fd);
+
+ if (r == 0)
+ {
+ /* Build path to the current version of the file */
+ if (method == CLOSE_NORMAL && df->temp_suffix)
+ {
+ /*
+ * If we have a temp prefix, normal operation is to rename the
+ * file.
+ */
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+ dir_data->basedir, df->pathname, df->temp_suffix);
+ snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+ dir_data->basedir, df->pathname);
+ r = durable_rename(tmppath, tmppath2, progname);
+ }
+ else if (method == CLOSE_UNLINK)
+ {
+ /* Unlink the file once it's closed */
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+ dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+ r = unlink(tmppath);
+ }
+ else
+ {
+ /*
+ * Else either CLOSE_NORMAL and no temp suffix, or
+ * CLOSE_NO_RENAME. In this case, fsync the file and containing
+ * directory if sync mode is requested.
+ */
+ if (dir_data->sync)
+ {
+ r = fsync_fname(df->fullpath, false, progname);
+ if (r == 0)
+ r = fsync_parent_path(df->fullpath, progname);
+ }
+ }
+ }
+
+ pg_free(df->pathname);
+ pg_free(df->fullpath);
+ if (df->temp_suffix)
+ pg_free(df->temp_suffix);
+ pg_free(df);
+
+ return r;
+}
+
+static int
+dir_fsync(Walfile f)
+{
+ Assert(f != NULL);
+
+ if (!dir_data->sync)
+ return 0;
+
+ return fsync(((DirectoryMethodFile *) f)->fd);
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+ struct stat statbuf;
+ static char tmppath[MAXPGPATH];
+
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, pathname);
+
+ if (stat(tmppath, &statbuf) != 0)
+ return -1;
+
+ return statbuf.st_size;
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+ static char tmppath[MAXPGPATH];
+ int fd;
+
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, pathname);
+
+ fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+ if (fd < 0)
+ return false;
+ close(fd);
+ return true;
+}
+
+static bool
+dir_finish(void)
+{
+ if (dir_data->sync)
+ {
+ /*
+ * Files are fsynced when they are closed, but we need to fsync the
+ * directory entry here as well.
+ */
+ if (fsync_fname(dir_data->basedir, true, progname) != 0)
+ return false;
+ }
+ return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir, bool sync)
+{
+ WalWriteMethod *method;
+
+ method = pg_malloc0(sizeof(WalWriteMethod));
+ method->open_for_write = dir_open_for_write;
+ method->write = dir_write;
+ method->get_current_pos = dir_get_current_pos;
+ method->get_file_size = dir_get_file_size;
+ method->close = dir_close;
+ method->fsync = dir_fsync;
+ method->existsfile = dir_existsfile;
+ method->finish = dir_finish;
+ method->getlasterror = dir_getlasterror;
+
+ dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+ dir_data->basedir = pg_strdup(basedir);
+ dir_data->sync = sync;
+
+ return method;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_xlog contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+ off_t ofs_start; /* Where does the *header* for this file start */
+ off_t currpos;
+ char header[512];
+ char *pathname;
+ size_t pad_to_size;
+} TarMethodFile;
+
+typedef struct TarMethodData
+{
+ char *tarfilename;
+ int fd;
+ int compression;
+ bool sync;
+ TarMethodFile *currentfile;
+ char lasterror[1024];
+#ifdef HAVE_LIBZ
+ z_streamp zp;
+ void *zlibOut;
+#endif
+} TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() tar_data->lasterror[0] = '\0'
+#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
+
+static char *
+tar_getlasterror(void)
+{
+ /*
+ * If a custom error is set, return that one. Otherwise, assume errno is
+ * set and return that one.
+ */
+ if (tar_data->lasterror[0])
+ return tar_data->lasterror;
+ return strerror(errno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+ tar_data->zp->next_in = buf;
+ tar_data->zp->avail_in = count;
+
+ while (tar_data->zp->avail_in || flush)
+ {
+ int r;
+
+ r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+ if (r == Z_STREAM_ERROR)
+ {
+ tar_set_error("deflate failed");
+ return false;
+ }
+
+ if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+ {
+ size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+ if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+ return false;
+
+ tar_data->zp->next_out = tar_data->zlibOut;
+ tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+ }
+
+ if (r == Z_STREAM_END)
+ break;
+ }
+
+ if (flush)
+ {
+ /* Reset the stream for writing */
+ if (deflateReset(tar_data->zp) != Z_OK)
+ {
+ tar_set_error("deflateReset failed");
+ return false;
+ }
+ }
+
+ return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+ ssize_t r;
+
+ Assert(f != NULL);
+ tar_clear_error();
+
+ /* Tarfile will always be positioned at the end */
+ if (!tar_data->compression)
+ {
+ r = write(tar_data->fd, buf, count);
+ if (r > 0)
+ ((TarMethodFile *) f)->currpos += r;
+ return r;
+ }
+#ifdef HAVE_LIBZ
+ else
+ {
+ if (!tar_write_compressed_data((void *) buf, count, false))
+ return -1;
+ ((TarMethodFile *) f)->currpos += count;
+ return count;
+ }
+#endif
+}
+
+static bool
+tar_write_padding_data(TarMethodFile * f, size_t bytes)
+{
+ char *zerobuf = pg_malloc0(XLOG_BLCKSZ);
+ size_t bytesleft = bytes;
+
+ while (bytesleft)
+ {
+ size_t bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
+
+ size_t r = tar_write(f, zerobuf, bytestowrite);
+
+ if (r < 0)
+ return false;
+ bytesleft -= r;
+ }
+ return true;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+ int save_errno;
+ static char tmppath[MAXPGPATH];
+
+ tar_clear_error();
+
+ if (tar_data->fd < 0)
+ {
+ /*
+ * We open the tar file only when we first try to write to it.
+ */
+ tar_data->fd = open(tar_data->tarfilename,
+ O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (tar_data->fd < 0)
+ return NULL;
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression)
+ {
+ tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+ tar_data->zp->zalloc = Z_NULL;
+ tar_data->zp->zfree = Z_NULL;
+ tar_data->zp->opaque = Z_NULL;
+ tar_data->zp->next_out = tar_data->zlibOut;
+ tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+ /*
+ * Initialize deflation library. Adding the magic value 16 to the
+ * default 15 for the windowBits parameter makes the output be
+ * gzip instead of zlib.
+ */
+ if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ pg_free(tar_data->zp);
+ tar_data->zp = NULL;
+ tar_set_error("deflateInit2 failed");
+ return NULL;
+ }
+ }
+#endif
+
+ /* There's no tar header itself, the file starts with regular files */
+ }
+
+ Assert(tar_data->currentfile == NULL);
+ if (tar_data->currentfile != NULL)
+ {
+ tar_set_error("implementation error: tar files can't have more than one open file\n");
+ return NULL;
+ }
+
+ tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+ snprintf(tmppath, sizeof(tmppath), "%s%s",
+ pathname, temp_suffix ? temp_suffix : "");
+
+ /* Create a header with size set to 0 - we will fill out the size on close */
+ if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+ {
+ pg_free(tar_data->currentfile);
+ tar_data->currentfile = NULL;
+ tar_set_error("could not create tar header");
+ return NULL;
+ }
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression)
+ {
+ /* Flush existing data */
+ if (!tar_write_compressed_data(NULL, 0, true))
+ return NULL;
+
+ /* Turn off compression for header */
+ if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+ {
+ tar_set_error("deflateParams failed");
+ return NULL;
+ }
+ }
+#endif
+
+ tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+ if (tar_data->currentfile->ofs_start == -1)
+ {
+ save_errno = errno;
+ pg_free(tar_data->currentfile);
+ tar_data->currentfile = NULL;
+ errno = save_errno;
+ return NULL;
+ }
+ tar_data->currentfile->currpos = 0;
+
+ if (!tar_data->compression)
+ {
+ if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
+ {
+ save_errno = errno;
+ pg_free(tar_data->currentfile);
+ tar_data->currentfile = NULL;
+ errno = save_errno;
+ return NULL;
+ }
+ }
+#ifdef HAVE_LIBZ
+ else
+ {
+ /* Write header through the zlib APIs but with no compression */
+ if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+ return NULL;
+
+ /* Re-enable compression for the rest of the file */
+ if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+ {
+ tar_set_error("deflateParams failed");
+ return NULL;
+ }
+ }
+#endif
+
+ tar_data->currentfile->pathname = pg_strdup(pathname);
+
+ /*
+ * Uncompressed files are padded on creation, but for compression we can't
+ * do that
+ */
+ if (pad_to_size)
+ {
+ tar_data->currentfile->pad_to_size = pad_to_size;
+ if (!tar_data->compression)
+ {
+ /* Uncompressed, so pad now */
+ tar_write_padding_data(tar_data->currentfile, pad_to_size);
+ /* Seek back to start */
+ if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
+ return NULL;
+
+ tar_data->currentfile->currpos = 0;
+ }
+ }
+
+ return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+ tar_clear_error();
+
+ /* Currently not used, so not supported */
+ errno = ENOSYS;
+ return -1;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+ Assert(f != NULL);
+ tar_clear_error();
+
+ return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_fsync(Walfile f)
+{
+ Assert(f != NULL);
+ tar_clear_error();
+
+ /*
+ * Always sync the whole tarfile, because that's all we can do. This makes
+ * no sense on compressed files, so just ignore those.
+ */
+ if (tar_data->compression)
+ return 0;
+
+ return fsync(tar_data->fd);
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+ ssize_t filesize;
+ int padding;
+ TarMethodFile *tf = (TarMethodFile *) f;
+
+ Assert(f != NULL);
+ tar_clear_error();
+
+ if (method == CLOSE_UNLINK)
+ {
+ if (tar_data->compression)
+ {
+ tar_set_error("unlink not supported with compression");
+ return -1;
+ }
+
+ /*
+ * Unlink the file that we just wrote to the tar. We do this by
+ * truncating it to the start of the header. This is safe as we only
+ * allow writing of the very last file.
+ */
+ if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+ return -1;
+
+ pg_free(tf->pathname);
+ pg_free(tf);
+ tar_data->currentfile = NULL;
+
+ return 0;
+ }
+
+ /*
+ * Pad the file itself with zeroes if necessary. Note that this is
+ * different from the tar format padding -- this is the padding we asked
+ * for when the file was opened.
+ */
+ if (tf->pad_to_size)
+ {
+ if (tar_data->compression)
+ {
+ /*
+ * A compressed tarfile is padded on close since we cannot know
+ * the size of the compressed output until the end.
+ */
+ size_t sizeleft = tf->pad_to_size - tf->currpos;
+
+ if (sizeleft)
+ {
+ if (!tar_write_padding_data(tf, sizeleft))
+ return -1;
+ }
+ }
+ else
+ {
+ /*
+ * An uncompressed tarfile was padded on creation, so just adjust
+ * the current position as if we seeked to the end.
+ */
+ tf->currpos = tf->pad_to_size;
+ }
+ }
+
+ /*
+ * Get the size of the file, and pad the current data up to the nearest
+ * 512 byte boundary.
+ */
+ filesize = tar_get_current_pos(f);
+ padding = ((filesize + 511) & ~511) - filesize;
+ if (padding)
+ {
+ char zerobuf[512];
+
+ MemSet(zerobuf, 0, padding);
+ if (tar_write(f, zerobuf, padding) != padding)
+ return -1;
+ }
+
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression)
+ {
+ /* Flush the current buffer */
+ if (!tar_write_compressed_data(NULL, 0, true))
+ {
+ errno = EINVAL;
+ return -1;
+ }
+ }
+#endif
+
+ /*
+ * Now go back and update the header with the correct filesize and
+ * possibly also renaming the file. We overwrite the entire current header
+ * when done, including the checksum.
+ */
+ print_tar_number(&(tf->header[124]), 12, filesize);
+
+ if (method == CLOSE_NORMAL)
+
+ /*
+ * We overwrite it with what it was before if we have no tempname,
+ * since we're going to write the buffer anyway.
+ */
+ strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+ print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
+ if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
+ return -1;
+ if (!tar_data->compression)
+ {
+ if (write(tar_data->fd, tf->header, 512) != 512)
+ return -1;
+ }
+#ifdef HAVE_LIBZ
+ else
+ {
+ /* Turn off compression */
+ if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+ {
+ tar_set_error("deflateParams failed");
+ return -1;
+ }
+
+ /* Overwrite the header, assuming the size will be the same */
+ if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+ return -1;
+
+ /* Turn compression back on */
+ if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+ {
+ tar_set_error("deflateParams failed");
+ return -1;
+ }
+ }
+#endif
+
+ /* Move file pointer back down to end, so we can write the next file */
+ if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+ return -1;
+
+ /* Always fsync on close, so the padding gets fsynced */
+ tar_fsync(f);
+
+ /* Clean up and done */
+ pg_free(tf->pathname);
+ pg_free(tf);
+ tar_data->currentfile = NULL;
+
+ return 0;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+ tar_clear_error();
+ /* We only deal with new tarfiles, so nothing externally created exists */
+ return false;
+}
+
+static bool
+tar_finish(void)
+{
+ char zerobuf[1024];
+
+ tar_clear_error();
+
+ if (tar_data->currentfile)
+ {
+ if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+ return false;
+ }
+
+ /* A tarfile always ends with two empty blocks */
+ MemSet(zerobuf, 0, sizeof(zerobuf));
+ if (!tar_data->compression)
+ {
+ if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+ return false;
+ }
+#ifdef HAVE_LIBZ
+ else
+ {
+ if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ return false;
+
+ /* Also flush all data to make sure the gzip stream is finished */
+ tar_data->zp->next_in = NULL;
+ tar_data->zp->avail_in = 0;
+ while (true)
+ {
+ int r;
+
+ r = deflate(tar_data->zp, Z_FINISH);
+
+ if (r == Z_STREAM_ERROR)
+ {
+ tar_set_error("deflate failed");
+ return false;
+ }
+ if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+ {
+ size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+ if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+ return false;
+ }
+ if (r == Z_STREAM_END)
+ break;
+ }
+
+ if (deflateEnd(tar_data->zp) != Z_OK)
+ {
+ tar_set_error("deflateEnd failed");
+ return false;
+ }
+ }
+#endif
+
+ /* sync the empty blocks as well, since they're after the last file */
+ fsync(tar_data->fd);
+
+ if (close(tar_data->fd) != 0)
+ return false;
+
+ tar_data->fd = -1;
+
+ if (tar_data->sync)
+ {
+ if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
+ return false;
+ if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
+ return false;
+ }
+
+ return true;
+}
+
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase, int compression, bool sync)
+{
+ WalWriteMethod *method;
+ const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
+
+ method = pg_malloc0(sizeof(WalWriteMethod));
+ method->open_for_write = tar_open_for_write;
+ method->write = tar_write;
+ method->get_current_pos = tar_get_current_pos;
+ method->get_file_size = tar_get_file_size;
+ method->close = tar_close;
+ method->fsync = tar_fsync;
+ method->existsfile = tar_existsfile;
+ method->finish = tar_finish;
+ method->getlasterror = tar_getlasterror;
+
+ tar_data = pg_malloc0(sizeof(TarMethodData));
+ tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
+ tar_data->fd = -1;
+ tar_data->compression = compression;
+ tar_data->sync = sync;
+ if (compression)
+ tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+
+ return method;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.h
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/walmethods.h
+ *-------------------------------------------------------------------------
+ */
+
+
+typedef void *Walfile;
+
+typedef enum
+{
+ CLOSE_NORMAL,
+ CLOSE_UNLINK,
+ CLOSE_NO_RENAME,
+} WalCloseMethod;
+
+typedef struct WalWriteMethod WalWriteMethod;
+struct WalWriteMethod
+{
+ Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+ int (*close) (Walfile f, WalCloseMethod method);
+ bool (*existsfile) (const char *pathname);
+ ssize_t (*get_file_size) (const char *pathname);
+
+ ssize_t (*write) (Walfile f, const void *buf, size_t count);
+ off_t (*get_current_pos) (Walfile f);
+ int (*fsync) (Walfile f);
+ bool (*finish) (void);
+ char *(*getlasterror) (void);
+};
+
+/*
+ * Available WAL methods:
+ * - WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
+ * - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog
+ * (only implements the methods required for pg_basebackup,
+ * not all those required for pg_receivexlog)
+ */
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
extern uint64 read_tar_number(const char *s, int len);
+extern void print_tar_number(char *s, int len, uint64 val);
extern int tarChecksum(char *header);
* support only non-negative numbers, so we don't worry about the GNU rules
* for handling negative numbers.)
*/
-static void
+void
print_tar_number(char *s, int len, uint64 val)
{
if (val < (((uint64) 1) << ((len - 1) * 3)))