From: Magnus Hagander Date: Sun, 23 Oct 2016 13:16:31 +0000 (+0200) Subject: Allow pg_basebackup to stream transaction log in tar mode X-Git-Tag: REL_10_BETA1~1525 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=56c7d8d;p=postgresql Allow pg_basebackup to stream transaction log in tar mode This will write the received transaction log into a file called pg_wal.tar(.gz) next to the other tarfiles instead of writing it to base.tar. When using fetch mode, the transaction log is still written to base.tar like before, and when used against a pre-10 server, the file is named pg_xlog.tar. To do this, implement a new concept of a "walmethod", which is responsible for writing the WAL. Two implementations exist, one that writes to a plain directory (which is also used by pg_receivexlog) and one that writes to a tar file with optional compression. Reviewed by Michael Paquier --- diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 7cb690dded..e66a7ae8ee 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -180,7 +180,8 @@ PostgreSQL documentation target directory, the tar contents will be written to standard output, suitable for piping to for example gzip. This is only possible if - the cluster has no additional tablespaces. + the cluster has no additional tablespaces and transaction + log streaming is not used. @@ -323,6 +324,10 @@ PostgreSQL documentation If the log has been rotated when it's time to transfer it, the backup will fail and be unusable. + + The transaction log files will be written to + the base.tar file. + @@ -339,6 +344,11 @@ PostgreSQL documentation client can keep up with transaction log received, using this mode requires no extra transaction logs to be saved on the master. + + The transaction log files are written to a separate file + named pg_wal.tar (if the server is a version + earlier than 10, the file will be named pg_xlog.tar). + @@ -353,7 +363,8 @@ PostgreSQL documentation Enables gzip compression of tar file output, with the default compression level. Compression is only available when using - the tar format. + the tar format, and the suffix .gz will + automatically be added to all tar filenames. @@ -366,7 +377,8 @@ PostgreSQL documentation Enables gzip compression of tar file output, and specifies the compression level (0 through 9, 0 being no compression and 9 being best compression). Compression is only available when using the tar - format. + format, and the suffix .gz will + automatically be added to all tar filenames. diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index fa1ce8b24d..52ac9e9fb8 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq -OBJS=receivelog.o streamutil.o $(WIN32RES) +OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES) all: pg_basebackup pg_receivexlog pg_recvlogical diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index b82b8e1b26..16cab978d0 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -449,7 +449,7 @@ typedef struct { PGconn *bgconn; XLogRecPtr startptr; - char xlogdir[MAXPGPATH]; + char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */ char *sysidentifier; int timeline; } logstreamer_param; @@ -470,9 +470,13 @@ LogStreamerMain(logstreamer_param *param) stream.synchronous = false; stream.do_sync = do_sync; stream.mark_done = true; - stream.basedir = param->xlogdir; stream.partial_suffix = NULL; + if (format == 'p') + stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync); + else + stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync); + if (!ReceiveXlogStream(param->bgconn, &stream)) /* @@ -482,6 +486,14 @@ LogStreamerMain(logstreamer_param *param) */ return 1; + if (!stream.walmethod->finish()) + { + fprintf(stderr, + _("%s: could not finish writing WAL files: %s\n"), + progname, strerror(errno)); + return 1; + } + PQfinish(param->bgconn); return 0; } @@ -533,28 +545,32 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) exit(1); /* In post-10 cluster, pg_xlog has been renamed to pg_wal */ - snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/%s", + snprintf(param->xlog, sizeof(param->xlog), "%s/%s", basedir, PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? "pg_xlog" : "pg_wal"); - /* - * Create pg_wal/archive_status or pg_xlog/archive_status (and thus - * pg_wal or pg_xlog) depending on the target server so we can write to - * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar - * file may arrive later. - */ - snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status", - basedir, - PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? - "pg_xlog" : "pg_wal"); - if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST) + if (format == 'p') { - fprintf(stderr, - _("%s: could not create directory \"%s\": %s\n"), - progname, statusdir, strerror(errno)); - disconnect_and_exit(1); + /* + * Create pg_wal/archive_status or pg_xlog/archive_status (and thus + * pg_wal or pg_xlog) depending on the target server so we can write to + * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar + * file may arrive later. + */ + snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status", + basedir, + PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? + "pg_xlog" : "pg_wal"); + + if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST) + { + fprintf(stderr, + _("%s: could not create directory \"%s\": %s\n"), + progname, statusdir, strerror(errno)); + disconnect_and_exit(1); + } } /* @@ -2245,16 +2261,6 @@ main(int argc, char **argv) exit(1); } - if (format != 'p' && streamwal) - { - fprintf(stderr, - _("%s: WAL streaming can only be used in plain mode\n"), - progname); - fprintf(stderr, _("Try \"%s --help\" for more information.\n"), - progname); - exit(1); - } - if (replication_slot && !streamwal) { fprintf(stderr, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index a58a251a59..bbdf96edfd 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -338,11 +338,19 @@ StreamLog(void) stream.synchronous = synchronous; stream.do_sync = true; stream.mark_done = false; - stream.basedir = basedir; + stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync); stream.partial_suffix = ".partial"; ReceiveXlogStream(conn, &stream); + if (!stream.walmethod->finish()) + { + fprintf(stderr, + _("%s: could not finish writing WAL files: %s\n"), + progname, strerror(errno)); + return; + } + PQfinish(conn); conn = NULL; } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index b0fa916b44..fcd0269473 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -30,7 +30,7 @@ /* fd and filename for currently open WAL file */ -static int walfile = -1; +static Walfile *walfile = NULL; static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; @@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); static bool -mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) +mark_file_as_archived(StreamCtl *stream, const char *fname) { - int fd; + Walfile *f; static char tmppath[MAXPGPATH]; - snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done", - basedir, fname); + snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done", + fname); - fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) + f = stream->walmethod->open_for_write(tmppath, NULL, 0); + if (f == NULL) { fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); + progname, tmppath, stream->walmethod->getlasterror()); return false; } - close(fd); - - if (do_sync && fsync_fname(tmppath, false, progname) != 0) - return false; - - if (do_sync && fsync_parent_path(tmppath, progname) != 0) - return false; + stream->walmethod->close(f, CLOSE_NORMAL); return true; } @@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint) { - int f; + Walfile *f; char fn[MAXPGPATH]; - struct stat statbuf; - char *zerobuf; - int bytes; + ssize_t size; XLogSegNo segno; XLByteToSeg(startpoint, segno); XLogFileName(current_walfile_name, stream->timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, + snprintf(fn, sizeof(fn), "%s%s", current_walfile_name, stream->partial_suffix ? stream->partial_suffix : ""); - f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (f == -1) - { - fprintf(stderr, - _("%s: could not open transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - return false; - } /* - * Verify that the file is either empty (just created), or a complete - * XLogSegSize segment. Anything in between indicates a corrupt file. + * When streaming to files, if an existing file exists we verify that it's + * either empty (just created), or a complete XLogSegSize segment (in + * which case it has been created and padded). Anything else indicates a + * corrupt file. + * + * When streaming to tar, no file with this name will exist before, so we + * never have to verify a size. */ - if (fstat(f, &statbuf) != 0) + if (stream->walmethod->existsfile(fn)) { - fprintf(stderr, - _("%s: could not stat transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - close(f); - return false; - } - if (statbuf.st_size == XLogSegSize) - { - /* - * fsync, in case of a previous crash between padding and fsyncing the - * file. - */ - if (stream->do_sync) + size = stream->walmethod->get_file_size(fn); + if (size < 0) { - if (fsync_fname(fn, false, progname) != 0 || - fsync_parent_path(fn, progname) != 0) + fprintf(stderr, + _("%s: could not get size of transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); + return false; + } + if (size == XLogSegSize) + { + /* Already padded file. Open it for use */ + f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0); + if (f == NULL) { - /* error already printed */ - close(f); + fprintf(stderr, + _("%s: could not open existing transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); return false; } - } - /* File is open and ready to use */ - walfile = f; - return true; - } - if (statbuf.st_size != 0) - { - fprintf(stderr, - _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), - progname, fn, (int) statbuf.st_size, XLogSegSize); - close(f); - return false; - } + /* fsync file in case of a previous crash */ + if (!stream->walmethod->fsync(f)) + { + stream->walmethod->close(f, CLOSE_UNLINK); + return false; + } - /* - * New, empty, file. So pad it to 16Mb with zeroes. If we fail partway - * through padding, we should attempt to unlink the file on failure, so as - * not to leave behind a partially-filled file. - */ - zerobuf = pg_malloc0(XLOG_BLCKSZ); - for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ) - { - errno = 0; - if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + walfile = f; + return true; + } + if (size != 0) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; fprintf(stderr, - _("%s: could not pad transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - free(zerobuf); - close(f); - unlink(fn); + _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), + progname, fn, (int) size, XLogSegSize); return false; } + /* File existed and was empty, so fall through and open */ } - free(zerobuf); - /* - * fsync WAL file and containing directory, to ensure the file is - * persistently created and zeroed. That's particularly important when - * using synchronous mode, where the file is modified and fsynced - * in-place, without a directory fsync. - */ - if (stream->do_sync) - { - if (fsync_fname(fn, false, progname) != 0 || - fsync_parent_path(fn, progname) != 0) - { - /* error already printed */ - close(f); - return false; - } - } + /* No file existed, so create one */ - if (lseek(f, SEEK_SET, 0) != 0) + f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize); + if (f == NULL) { fprintf(stderr, - _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"), - progname, fn, strerror(errno)); - close(f); + _("%s: could not open transaction log file \"%s\": %s\n"), + progname, fn, stream->walmethod->getlasterror()); return false; } - /* File is open and ready to use */ walfile = f; return true; } @@ -223,59 +178,46 @@ static bool close_walfile(StreamCtl *stream, XLogRecPtr pos) { off_t currpos; + int r; - if (walfile == -1) + if (walfile == NULL) return true; - currpos = lseek(walfile, 0, SEEK_CUR); + currpos = stream->walmethod->get_current_pos(walfile); if (currpos == -1) { fprintf(stderr, _("%s: could not determine seek position in file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - close(walfile); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); + stream->walmethod->close(walfile, CLOSE_UNLINK); + walfile = NULL; + return false; } - if (stream->do_sync && fsync(walfile) != 0) + if (stream->partial_suffix) { - fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - close(walfile); - walfile = -1; - return false; + if (currpos == XLOG_SEG_SIZE) + r = stream->walmethod->close(walfile, CLOSE_NORMAL); + else + { + fprintf(stderr, + _("%s: not renaming \"%s%s\", segment is not complete\n"), + progname, current_walfile_name, stream->partial_suffix); + r = stream->walmethod->close(walfile, CLOSE_NO_RENAME); + } } + else + r = stream->walmethod->close(walfile, CLOSE_NORMAL); - if (close(walfile) != 0) + walfile = NULL; + + if (r != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); return false; } - walfile = -1; - - /* - * If we finished writing a .partial file, rename it into place. - */ - if (currpos == XLOG_SEG_SIZE && stream->partial_suffix) - { - char oldfn[MAXPGPATH]; - char newfn[MAXPGPATH]; - - snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix); - snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name); - if (durable_rename(oldfn, newfn, progname) != 0) - { - /* durable_rename produced a log entry */ - return false; - } - } - else if (stream->partial_suffix) - fprintf(stderr, - _("%s: not renaming \"%s%s\", segment is not complete\n"), - progname, current_walfile_name, stream->partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs @@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) if (currpos == XLOG_SEG_SIZE && stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(stream->basedir, current_walfile_name, - stream->do_sync)) + if (!mark_file_as_archived(stream, current_walfile_name)) return false; } @@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) static bool existsTimeLineHistoryFile(StreamCtl *stream) { - char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; - int fd; /* * Timeline 1 never has a history file. We treat that as if it existed, @@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream) TLHistoryFileName(histfname, stream->timeline); - snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); - - fd = open(path, O_RDONLY | PG_BINARY, 0); - if (fd < 0) - { - if (errno != ENOENT) - fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"), - progname, path, strerror(errno)); - return false; - } - else - { - close(fd); - return true; - } + return stream->walmethod->existsfile(histfname); } static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) { int size = strlen(content); - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; char histfname[MAXFNAMELEN]; - int fd; + Walfile *f; /* * Check that the server's idea of how timeline history files should be @@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) return false; } - snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); - - /* - * Write into a temp file name. - */ - snprintf(tmppath, MAXPGPATH, "%s.tmp", path); - - unlink(tmppath); - - fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) + f = stream->walmethod->open_for_write(histfname, ".tmp", 0); + if (f == NULL) { fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); + progname, histfname, stream->walmethod->getlasterror()); return false; } - errno = 0; - if ((int) write(fd, content, size) != size) + if ((int) stream->walmethod->write(f, content, size) != size) { - int save_errno = errno; + fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"), + progname, histfname, stream->walmethod->getlasterror()); /* * If we fail to make the file, delete it to release disk space */ - close(fd); - unlink(tmppath); - errno = save_errno; + stream->walmethod->close(f, CLOSE_UNLINK); - fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); return false; } - if (close(fd) != 0) + if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, tmppath, strerror(errno)); - return false; - } - - /* - * Now move the completed history file into place with its final name. - */ - if (durable_rename(tmppath, path, progname) < 0) - { - /* durable_rename produced a log entry */ + progname, histfname, stream->walmethod->getlasterror()); return false; } @@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) if (stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(stream->basedir, histfname, - stream->do_sync)) + if (!mark_file_as_archived(stream, histfname)) return false; } @@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) { /* * Fetch the timeline history file for this timeline, if we don't have - * it already. + * it already. When streaming log to tar, this will always return + * false, as we are never streaming into an existing file and + * therefore there can be no pre-existing timeline history file. */ if (!existsTimeLineHistoryFile(stream)) { @@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } error: - if (walfile != -1 && close(walfile) != 0) + if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0) fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; + progname, current_walfile_name, stream->walmethod->getlasterror()); + walfile = NULL; return false; } @@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ - if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) + if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL) { - if (stream->do_sync && fsync(walfile) != 0) + if (stream->walmethod->fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); + progname, current_walfile_name, stream->walmethod->getlasterror()); goto error; } lastFlushPosition = blockpos; @@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, if (replyRequested && still_sending) { if (reportFlushPosition && lastFlushPosition < blockpos && - walfile != -1) + walfile != NULL) { /* * If a valid flush location needs to be reported, flush the @@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * data has been successfully replicated or not, at the normal * shutdown of the server. */ - if (stream->do_sync && fsync(walfile) != 0) + if (stream->walmethod->fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); + progname, current_walfile_name, stream->walmethod->getlasterror()); return false; } lastFlushPosition = blockpos; @@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * Verify that the initial location in the stream matches where we think * we are. */ - if (walfile == -1) + if (walfile == NULL) { /* No file open yet */ if (xlogoff != 0) @@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, else { /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + if (stream->walmethod->get_current_pos(walfile) != xlogoff) { fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile)); return false; } } @@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, else bytes_to_write = bytes_left; - if (walfile == -1) + if (walfile == NULL) { if (!open_walfile(stream, *blockpos)) { @@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } } - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) { fprintf(stderr, _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), progname, bytes_to_write, current_walfile_name, - strerror(errno)); + stream->walmethod->getlasterror()); return false; } diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 7a3bbc5080..b5913ea995 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -13,6 +13,7 @@ #define RECEIVELOG_H #include "libpq-fe.h" +#include "walmethods.h" #include "access/xlogdefs.h" @@ -41,7 +42,7 @@ typedef struct StreamCtl stream_stop_callback stream_stop; /* Stop streaming when returns true */ - char *basedir; /* Received segments written to this dir */ + WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ } StreamCtl; diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl index 579d7a15fb..91eb84e238 100644 --- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl +++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl @@ -4,7 +4,7 @@ use Cwd; use Config; use PostgresNode; use TestLib; -use Test::More tests => 67; +use Test::More tests => 69; program_help_ok('pg_basebackup'); program_version_ok('pg_basebackup'); @@ -237,6 +237,10 @@ $node->command_ok( 'pg_basebackup -X stream runs'); ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")), 'WAL files copied'); +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ], + 'pg_basebackup -X stream runs in tar mode'); +ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created"); $node->command_fails( [ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ], diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c new file mode 100644 index 0000000000..e0ec752bbd --- /dev/null +++ b/src/bin/pg_basebackup/walmethods.c @@ -0,0 +1,886 @@ +/*------------------------------------------------------------------------- + * + * walmethods.c - implementations of different ways to write received wal + * + * NOTE! The caller must ensure that only one method is instantiated in + * any given program, and that it's only instantiated once! + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/walmethods.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include +#include +#include +#ifdef HAVE_LIBZ +#include +#endif + +#include "pgtar.h" +#include "common/file_utils.h" + +#include "receivelog.h" +#include "streamutil.h" + +/* Size of zlib buffer for .tar.gz */ +#define ZLIB_OUT_SIZE 4096 + +/*------------------------------------------------------------------------- + * WalDirectoryMethod - write wal to a directory looking like pg_xlog + *------------------------------------------------------------------------- + */ + +/* + * Global static data for this method + */ +typedef struct DirectoryMethodData +{ + char *basedir; + bool sync; +} DirectoryMethodData; +static DirectoryMethodData *dir_data = NULL; + +/* + * Local file handle + */ +typedef struct DirectoryMethodFile +{ + int fd; + off_t currpos; + char *pathname; + char *fullpath; + char *temp_suffix; +} DirectoryMethodFile; + +static char * +dir_getlasterror(void) +{ + /* Directory method always sets errno, so just use strerror */ + return strerror(errno); +} + +static Walfile +dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +{ + static char tmppath[MAXPGPATH]; + int fd; + DirectoryMethodFile *f; + + snprintf(tmppath, sizeof(tmppath), "%s/%s%s", + dir_data->basedir, pathname, temp_suffix ? temp_suffix : ""); + + fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + return NULL; + + if (pad_to_size) + { + /* Always pre-pad on regular files */ + char *zerobuf; + int bytes; + + zerobuf = pg_malloc0(XLOG_BLCKSZ); + for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ) + { + if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + int save_errno = errno; + + pg_free(zerobuf); + close(fd); + errno = save_errno; + return NULL; + } + } + pg_free(zerobuf); + + if (lseek(fd, 0, SEEK_SET) != 0) + { + int save_errno = errno; + + close(fd); + errno = save_errno; + return NULL; + } + } + + /* + * fsync WAL file and containing directory, to ensure the file is + * persistently created and zeroed (if padded). That's particularly + * important when using synchronous mode, where the file is modified and + * fsynced in-place, without a directory fsync. + */ + if (dir_data->sync) + { + if (fsync_fname(tmppath, false, progname) != 0 || + fsync_parent_path(tmppath, progname) != 0) + { + close(fd); + return NULL; + } + } + + f = pg_malloc0(sizeof(DirectoryMethodFile)); + f->fd = fd; + f->currpos = 0; + f->pathname = pg_strdup(pathname); + f->fullpath = pg_strdup(tmppath); + if (temp_suffix) + f->temp_suffix = pg_strdup(temp_suffix); + + return f; +} + +static ssize_t +dir_write(Walfile f, const void *buf, size_t count) +{ + ssize_t r; + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + + Assert(f != NULL); + + r = write(df->fd, buf, count); + if (r > 0) + df->currpos += r; + return r; +} + +static off_t +dir_get_current_pos(Walfile f) +{ + Assert(f != NULL); + + /* Use a cached value to prevent lots of reseeks */ + return ((DirectoryMethodFile *) f)->currpos; +} + +static int +dir_close(Walfile f, WalCloseMethod method) +{ + int r; + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + static char tmppath[MAXPGPATH]; + static char tmppath2[MAXPGPATH]; + + Assert(f != NULL); + + r = close(df->fd); + + if (r == 0) + { + /* Build path to the current version of the file */ + if (method == CLOSE_NORMAL && df->temp_suffix) + { + /* + * If we have a temp prefix, normal operation is to rename the + * file. + */ + snprintf(tmppath, sizeof(tmppath), "%s/%s%s", + dir_data->basedir, df->pathname, df->temp_suffix); + snprintf(tmppath2, sizeof(tmppath2), "%s/%s", + dir_data->basedir, df->pathname); + r = durable_rename(tmppath, tmppath2, progname); + } + else if (method == CLOSE_UNLINK) + { + /* Unlink the file once it's closed */ + snprintf(tmppath, sizeof(tmppath), "%s/%s%s", + dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : ""); + r = unlink(tmppath); + } + else + { + /* + * Else either CLOSE_NORMAL and no temp suffix, or + * CLOSE_NO_RENAME. In this case, fsync the file and containing + * directory if sync mode is requested. + */ + if (dir_data->sync) + { + r = fsync_fname(df->fullpath, false, progname); + if (r == 0) + r = fsync_parent_path(df->fullpath, progname); + } + } + } + + pg_free(df->pathname); + pg_free(df->fullpath); + if (df->temp_suffix) + pg_free(df->temp_suffix); + pg_free(df); + + return r; +} + +static int +dir_fsync(Walfile f) +{ + Assert(f != NULL); + + if (!dir_data->sync) + return 0; + + return fsync(((DirectoryMethodFile *) f)->fd); +} + +static ssize_t +dir_get_file_size(const char *pathname) +{ + struct stat statbuf; + static char tmppath[MAXPGPATH]; + + snprintf(tmppath, sizeof(tmppath), "%s/%s", + dir_data->basedir, pathname); + + if (stat(tmppath, &statbuf) != 0) + return -1; + + return statbuf.st_size; +} + +static bool +dir_existsfile(const char *pathname) +{ + static char tmppath[MAXPGPATH]; + int fd; + + snprintf(tmppath, sizeof(tmppath), "%s/%s", + dir_data->basedir, pathname); + + fd = open(tmppath, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + return false; + close(fd); + return true; +} + +static bool +dir_finish(void) +{ + if (dir_data->sync) + { + /* + * Files are fsynced when they are closed, but we need to fsync the + * directory entry here as well. + */ + if (fsync_fname(dir_data->basedir, true, progname) != 0) + return false; + } + return true; +} + + +WalWriteMethod * +CreateWalDirectoryMethod(const char *basedir, bool sync) +{ + WalWriteMethod *method; + + method = pg_malloc0(sizeof(WalWriteMethod)); + method->open_for_write = dir_open_for_write; + method->write = dir_write; + method->get_current_pos = dir_get_current_pos; + method->get_file_size = dir_get_file_size; + method->close = dir_close; + method->fsync = dir_fsync; + method->existsfile = dir_existsfile; + method->finish = dir_finish; + method->getlasterror = dir_getlasterror; + + dir_data = pg_malloc0(sizeof(DirectoryMethodData)); + dir_data->basedir = pg_strdup(basedir); + dir_data->sync = sync; + + return method; +} + + +/*------------------------------------------------------------------------- + * WalTarMethod - write wal to a tar file containing pg_xlog contents + *------------------------------------------------------------------------- + */ + +typedef struct TarMethodFile +{ + off_t ofs_start; /* Where does the *header* for this file start */ + off_t currpos; + char header[512]; + char *pathname; + size_t pad_to_size; +} TarMethodFile; + +typedef struct TarMethodData +{ + char *tarfilename; + int fd; + int compression; + bool sync; + TarMethodFile *currentfile; + char lasterror[1024]; +#ifdef HAVE_LIBZ + z_streamp zp; + void *zlibOut; +#endif +} TarMethodData; +static TarMethodData *tar_data = NULL; + +#define tar_clear_error() tar_data->lasterror[0] = '\0' +#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror)) + +static char * +tar_getlasterror(void) +{ + /* + * If a custom error is set, return that one. Otherwise, assume errno is + * set and return that one. + */ + if (tar_data->lasterror[0]) + return tar_data->lasterror; + return strerror(errno); +} + +#ifdef HAVE_LIBZ +static bool +tar_write_compressed_data(void *buf, size_t count, bool flush) +{ + tar_data->zp->next_in = buf; + tar_data->zp->avail_in = count; + + while (tar_data->zp->avail_in || flush) + { + int r; + + r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH); + if (r == Z_STREAM_ERROR) + { + tar_set_error("deflate failed"); + return false; + } + + if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) + { + size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out; + + if (write(tar_data->fd, tar_data->zlibOut, len) != len) + return false; + + tar_data->zp->next_out = tar_data->zlibOut; + tar_data->zp->avail_out = ZLIB_OUT_SIZE; + } + + if (r == Z_STREAM_END) + break; + } + + if (flush) + { + /* Reset the stream for writing */ + if (deflateReset(tar_data->zp) != Z_OK) + { + tar_set_error("deflateReset failed"); + return false; + } + } + + return true; +} +#endif + +static ssize_t +tar_write(Walfile f, const void *buf, size_t count) +{ + ssize_t r; + + Assert(f != NULL); + tar_clear_error(); + + /* Tarfile will always be positioned at the end */ + if (!tar_data->compression) + { + r = write(tar_data->fd, buf, count); + if (r > 0) + ((TarMethodFile *) f)->currpos += r; + return r; + } +#ifdef HAVE_LIBZ + else + { + if (!tar_write_compressed_data((void *) buf, count, false)) + return -1; + ((TarMethodFile *) f)->currpos += count; + return count; + } +#endif +} + +static bool +tar_write_padding_data(TarMethodFile * f, size_t bytes) +{ + char *zerobuf = pg_malloc0(XLOG_BLCKSZ); + size_t bytesleft = bytes; + + while (bytesleft) + { + size_t bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft; + + size_t r = tar_write(f, zerobuf, bytestowrite); + + if (r < 0) + return false; + bytesleft -= r; + } + return true; +} + +static Walfile +tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +{ + int save_errno; + static char tmppath[MAXPGPATH]; + + tar_clear_error(); + + if (tar_data->fd < 0) + { + /* + * We open the tar file only when we first try to write to it. + */ + tar_data->fd = open(tar_data->tarfilename, + O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (tar_data->fd < 0) + return NULL; + +#ifdef HAVE_LIBZ + if (tar_data->compression) + { + tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + tar_data->zp->zalloc = Z_NULL; + tar_data->zp->zfree = Z_NULL; + tar_data->zp->opaque = Z_NULL; + tar_data->zp->next_out = tar_data->zlibOut; + tar_data->zp->avail_out = ZLIB_OUT_SIZE; + + /* + * Initialize deflation library. Adding the magic value 16 to the + * default 15 for the windowBits parameter makes the output be + * gzip instead of zlib. + */ + if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) + { + pg_free(tar_data->zp); + tar_data->zp = NULL; + tar_set_error("deflateInit2 failed"); + return NULL; + } + } +#endif + + /* There's no tar header itself, the file starts with regular files */ + } + + Assert(tar_data->currentfile == NULL); + if (tar_data->currentfile != NULL) + { + tar_set_error("implementation error: tar files can't have more than one open file\n"); + return NULL; + } + + tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile)); + + snprintf(tmppath, sizeof(tmppath), "%s%s", + pathname, temp_suffix ? temp_suffix : ""); + + /* Create a header with size set to 0 - we will fill out the size on close */ + if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK) + { + pg_free(tar_data->currentfile); + tar_data->currentfile = NULL; + tar_set_error("could not create tar header"); + return NULL; + } + +#ifdef HAVE_LIBZ + if (tar_data->compression) + { + /* Flush existing data */ + if (!tar_write_compressed_data(NULL, 0, true)) + return NULL; + + /* Turn off compression for header */ + if (deflateParams(tar_data->zp, 0, 0) != Z_OK) + { + tar_set_error("deflateParams failed"); + return NULL; + } + } +#endif + + tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR); + if (tar_data->currentfile->ofs_start == -1) + { + save_errno = errno; + pg_free(tar_data->currentfile); + tar_data->currentfile = NULL; + errno = save_errno; + return NULL; + } + tar_data->currentfile->currpos = 0; + + if (!tar_data->compression) + { + if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512) + { + save_errno = errno; + pg_free(tar_data->currentfile); + tar_data->currentfile = NULL; + errno = save_errno; + return NULL; + } + } +#ifdef HAVE_LIBZ + else + { + /* Write header through the zlib APIs but with no compression */ + if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true)) + return NULL; + + /* Re-enable compression for the rest of the file */ + if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK) + { + tar_set_error("deflateParams failed"); + return NULL; + } + } +#endif + + tar_data->currentfile->pathname = pg_strdup(pathname); + + /* + * Uncompressed files are padded on creation, but for compression we can't + * do that + */ + if (pad_to_size) + { + tar_data->currentfile->pad_to_size = pad_to_size; + if (!tar_data->compression) + { + /* Uncompressed, so pad now */ + tar_write_padding_data(tar_data->currentfile, pad_to_size); + /* Seek back to start */ + if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512) + return NULL; + + tar_data->currentfile->currpos = 0; + } + } + + return tar_data->currentfile; +} + +static ssize_t +tar_get_file_size(const char *pathname) +{ + tar_clear_error(); + + /* Currently not used, so not supported */ + errno = ENOSYS; + return -1; +} + +static off_t +tar_get_current_pos(Walfile f) +{ + Assert(f != NULL); + tar_clear_error(); + + return ((TarMethodFile *) f)->currpos; +} + +static int +tar_fsync(Walfile f) +{ + Assert(f != NULL); + tar_clear_error(); + + /* + * Always sync the whole tarfile, because that's all we can do. This makes + * no sense on compressed files, so just ignore those. + */ + if (tar_data->compression) + return 0; + + return fsync(tar_data->fd); +} + +static int +tar_close(Walfile f, WalCloseMethod method) +{ + ssize_t filesize; + int padding; + TarMethodFile *tf = (TarMethodFile *) f; + + Assert(f != NULL); + tar_clear_error(); + + if (method == CLOSE_UNLINK) + { + if (tar_data->compression) + { + tar_set_error("unlink not supported with compression"); + return -1; + } + + /* + * Unlink the file that we just wrote to the tar. We do this by + * truncating it to the start of the header. This is safe as we only + * allow writing of the very last file. + */ + if (ftruncate(tar_data->fd, tf->ofs_start) != 0) + return -1; + + pg_free(tf->pathname); + pg_free(tf); + tar_data->currentfile = NULL; + + return 0; + } + + /* + * Pad the file itself with zeroes if necessary. Note that this is + * different from the tar format padding -- this is the padding we asked + * for when the file was opened. + */ + if (tf->pad_to_size) + { + if (tar_data->compression) + { + /* + * A compressed tarfile is padded on close since we cannot know + * the size of the compressed output until the end. + */ + size_t sizeleft = tf->pad_to_size - tf->currpos; + + if (sizeleft) + { + if (!tar_write_padding_data(tf, sizeleft)) + return -1; + } + } + else + { + /* + * An uncompressed tarfile was padded on creation, so just adjust + * the current position as if we seeked to the end. + */ + tf->currpos = tf->pad_to_size; + } + } + + /* + * Get the size of the file, and pad the current data up to the nearest + * 512 byte boundary. + */ + filesize = tar_get_current_pos(f); + padding = ((filesize + 511) & ~511) - filesize; + if (padding) + { + char zerobuf[512]; + + MemSet(zerobuf, 0, padding); + if (tar_write(f, zerobuf, padding) != padding) + return -1; + } + + +#ifdef HAVE_LIBZ + if (tar_data->compression) + { + /* Flush the current buffer */ + if (!tar_write_compressed_data(NULL, 0, true)) + { + errno = EINVAL; + return -1; + } + } +#endif + + /* + * Now go back and update the header with the correct filesize and + * possibly also renaming the file. We overwrite the entire current header + * when done, including the checksum. + */ + print_tar_number(&(tf->header[124]), 12, filesize); + + if (method == CLOSE_NORMAL) + + /* + * We overwrite it with what it was before if we have no tempname, + * since we're going to write the buffer anyway. + */ + strlcpy(&(tf->header[0]), tf->pathname, 100); + + print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); + if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) + return -1; + if (!tar_data->compression) + { + if (write(tar_data->fd, tf->header, 512) != 512) + return -1; + } +#ifdef HAVE_LIBZ + else + { + /* Turn off compression */ + if (deflateParams(tar_data->zp, 0, 0) != Z_OK) + { + tar_set_error("deflateParams failed"); + return -1; + } + + /* Overwrite the header, assuming the size will be the same */ + if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true)) + return -1; + + /* Turn compression back on */ + if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK) + { + tar_set_error("deflateParams failed"); + return -1; + } + } +#endif + + /* Move file pointer back down to end, so we can write the next file */ + if (lseek(tar_data->fd, 0, SEEK_END) < 0) + return -1; + + /* Always fsync on close, so the padding gets fsynced */ + tar_fsync(f); + + /* Clean up and done */ + pg_free(tf->pathname); + pg_free(tf); + tar_data->currentfile = NULL; + + return 0; +} + +static bool +tar_existsfile(const char *pathname) +{ + tar_clear_error(); + /* We only deal with new tarfiles, so nothing externally created exists */ + return false; +} + +static bool +tar_finish(void) +{ + char zerobuf[1024]; + + tar_clear_error(); + + if (tar_data->currentfile) + { + if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0) + return false; + } + + /* A tarfile always ends with two empty blocks */ + MemSet(zerobuf, 0, sizeof(zerobuf)); + if (!tar_data->compression) + { + if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) + return false; + } +#ifdef HAVE_LIBZ + else + { + if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false)) + return false; + + /* Also flush all data to make sure the gzip stream is finished */ + tar_data->zp->next_in = NULL; + tar_data->zp->avail_in = 0; + while (true) + { + int r; + + r = deflate(tar_data->zp, Z_FINISH); + + if (r == Z_STREAM_ERROR) + { + tar_set_error("deflate failed"); + return false; + } + if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) + { + size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out; + + if (write(tar_data->fd, tar_data->zlibOut, len) != len) + return false; + } + if (r == Z_STREAM_END) + break; + } + + if (deflateEnd(tar_data->zp) != Z_OK) + { + tar_set_error("deflateEnd failed"); + return false; + } + } +#endif + + /* sync the empty blocks as well, since they're after the last file */ + fsync(tar_data->fd); + + if (close(tar_data->fd) != 0) + return false; + + tar_data->fd = -1; + + if (tar_data->sync) + { + if (fsync_fname(tar_data->tarfilename, false, progname) != 0) + return false; + if (fsync_parent_path(tar_data->tarfilename, progname) != 0) + return false; + } + + return true; +} + +WalWriteMethod * +CreateWalTarMethod(const char *tarbase, int compression, bool sync) +{ + WalWriteMethod *method; + const char *suffix = (compression != 0) ? ".tar.gz" : ".tar"; + + method = pg_malloc0(sizeof(WalWriteMethod)); + method->open_for_write = tar_open_for_write; + method->write = tar_write; + method->get_current_pos = tar_get_current_pos; + method->get_file_size = tar_get_file_size; + method->close = tar_close; + method->fsync = tar_fsync; + method->existsfile = tar_existsfile; + method->finish = tar_finish; + method->getlasterror = tar_getlasterror; + + tar_data = pg_malloc0(sizeof(TarMethodData)); + tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); + sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix); + tar_data->fd = -1; + tar_data->compression = compression; + tar_data->sync = sync; + if (compression) + tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + + return method; +} diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h new file mode 100644 index 0000000000..fa58f812f6 --- /dev/null +++ b/src/bin/pg_basebackup/walmethods.h @@ -0,0 +1,45 @@ +/*------------------------------------------------------------------------- + * + * walmethods.h + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/walmethods.h + *------------------------------------------------------------------------- + */ + + +typedef void *Walfile; + +typedef enum +{ + CLOSE_NORMAL, + CLOSE_UNLINK, + CLOSE_NO_RENAME, +} WalCloseMethod; + +typedef struct WalWriteMethod WalWriteMethod; +struct WalWriteMethod +{ + Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); + int (*close) (Walfile f, WalCloseMethod method); + bool (*existsfile) (const char *pathname); + ssize_t (*get_file_size) (const char *pathname); + + ssize_t (*write) (Walfile f, const void *buf, size_t count); + off_t (*get_current_pos) (Walfile f); + int (*fsync) (Walfile f); + bool (*finish) (void); + char *(*getlasterror) (void); +}; + +/* + * Available WAL methods: + * - WalDirectoryMethod - write WAL to regular files in a standard pg_xlog + * - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog + * (only implements the methods required for pg_basebackup, + * not all those required for pg_receivexlog) + */ +WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync); +WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync); diff --git a/src/include/pgtar.h b/src/include/pgtar.h index 45ca400f98..1d179f0df1 100644 --- a/src/include/pgtar.h +++ b/src/include/pgtar.h @@ -22,4 +22,5 @@ enum tarError extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime); extern uint64 read_tar_number(const char *s, int len); +extern void print_tar_number(char *s, int len, uint64 val); extern int tarChecksum(char *header); diff --git a/src/port/tar.c b/src/port/tar.c index 52a2113a47..f1da959dac 100644 --- a/src/port/tar.c +++ b/src/port/tar.c @@ -16,7 +16,7 @@ * support only non-negative numbers, so we don't worry about the GNU rules * for handling negative numbers.) */ -static void +void print_tar_number(char *s, int len, uint64 val) { if (val < (((uint64) 1) << ((len - 1) * 3)))