/* Global options */
static char *basedir = NULL;
static int verbose = 0;
+static int compresslevel = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static volatile bool time_to_abort = false;
exit(code); \
}
+/* Routines to evaluate segment file format */
+#define IsCompressXLogFileName(fname) \
+ (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
+ strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
+ strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname) \
+ (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
+ strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
+ strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
static void
usage(void)
printf(_(" --synchronous flush transaction log immediately after writing\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
+ printf(_(" -Z, --compress=0-9 compress logs with given compression level\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
uint32 tli;
XLogSegNo segno;
bool ispartial;
+ bool iscompress;
/*
* Check if the filename looks like an xlog file, or a .partial file.
*/
if (IsXLogFileName(dirent->d_name))
+ {
ispartial = false;
+ iscompress = false;
+ }
else if (IsPartialXLogFileName(dirent->d_name))
+ {
+ ispartial = true;
+ iscompress = false;
+ }
+ else if (IsCompressXLogFileName(dirent->d_name))
+ {
+ ispartial = false;
+ iscompress = true;
+ }
+ else if (IsPartialCompressXLogFileName(dirent->d_name))
+ {
ispartial = true;
+ iscompress = true;
+ }
else
continue;
/*
* Check that the segment has the right size, if it's supposed to be
- * completed.
+ * completed. For non-compressed segments just check the on-disk size
+ * and see if it matches a completed segment.
+ * For compressed segments, look at the last 4 bytes of the compressed
+ * file, which is where the uncompressed size is located for gz files
+ * with a size lower than 4GB, and then compare it to the size of a
+ * completed segment. The 4 last bytes correspond to the ISIZE member
+ * according to http://www.zlib.org/rfc-gzip.html.
*/
- if (!ispartial)
+ if (!ispartial && !iscompress)
{
struct stat statbuf;
char fullpath[MAXPGPATH];
continue;
}
}
+ else if (!ispartial && iscompress)
+ {
+ int fd;
+ char buf[4];
+ int bytes_out;
+ char fullpath[MAXPGPATH];
+
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+ fd = open(fullpath, O_RDONLY | PG_BINARY);
+ if (fd < 0)
+ {
+ fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+ {
+ fprintf(stderr, _("%s: could not seek compressed file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+ {
+ fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ close(fd);
+ bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+ (buf[1] << 8) | buf[0];
+
+ if (bytes_out != XLOG_SEG_SIZE)
+ {
+ fprintf(stderr,
+ _("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
+ progname, dirent->d_name, bytes_out);
+ continue;
+ }
+ }
/* Looks like a valid segment. Remember that we saw it. */
if ((segno > high_segno) ||
stream.synchronous = synchronous;
stream.do_sync = true;
stream.mark_done = false;
- stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+ stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+ stream.do_sync);
stream.partial_suffix = ".partial";
stream.replication_slot = replication_slot;
stream.temp_slot = false;
{"status-interval", required_argument, NULL, 's'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
+ {"compress", required_argument, NULL, 'Z'},
/* action */
{"create-slot", no_argument, NULL, 1},
{"drop-slot", no_argument, NULL, 2},
}
}
- while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+ while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
long_options, &option_index)) != -1)
{
switch (c)
case 'v':
verbose++;
break;
+ case 'Z':
+ compresslevel = atoi(optarg);
+ if (compresslevel < 0 || compresslevel > 9)
+ {
+ fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
/* action */
case 1:
do_create_slot = true;
exit(1);
}
+#ifndef HAVE_LIBZ
+ if (compresslevel != 0)
+ {
+ fprintf(stderr,
+ _("%s: this build does not support compression\n"),
+ progname);
+ exit(1);
+ }
+#endif
+
/*
* Check existence of destination folder.
*/
typedef struct DirectoryMethodData
{
char *basedir;
+ int compression;
bool sync;
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
char *pathname;
char *fullpath;
char *temp_suffix;
+#ifdef HAVE_LIBZ
+ gzFile gzfp;
+#endif
} DirectoryMethodFile;
static char *
static char tmppath[MAXPGPATH];
int fd;
DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+ gzFile gzfp = NULL;
+#endif
- snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
- dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+ dir_data->basedir, pathname,
+ dir_data->compression > 0 ? ".gz" : "",
+ temp_suffix ? temp_suffix : "");
+ /*
+ * Open a file for non-compressed as well as compressed files. Tracking
+ * the file descriptor is important for dir_sync() method as gzflush()
+ * does not do any system calls to fsync() to make changes permanent on
+ * disk.
+ */
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
return NULL;
- if (pad_to_size)
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ {
+ gzfp = gzdopen(fd, "wb");
+ if (gzfp == NULL)
+ {
+ close(fd);
+ return NULL;
+ }
+
+ if (gzsetparams(gzfp, dir_data->compression,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ gzclose(gzfp);
+ return NULL;
+ }
+ }
+#endif
+
+ /* Do pre-padding on non-compressed files */
+ if (pad_to_size && dir_data->compression == 0)
{
- /* Always pre-pad on regular files */
char *zerobuf;
int bytes;
if (fsync_fname(tmppath, false, progname) != 0 ||
fsync_parent_path(tmppath, progname) != 0)
{
- close(fd);
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ gzclose(gzfp);
+ else
+#endif
+ close(fd);
return NULL;
}
}
f = pg_malloc0(sizeof(DirectoryMethodFile));
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ f->gzfp = gzfp;
+#endif
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
Assert(f != NULL);
- r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ r = (ssize_t) gzwrite(df->gzfp, buf, count);
+ else
+#endif
+ r = write(df->fd, buf, count);
if (r > 0)
df->currpos += r;
return r;
Assert(f != NULL);
- r = close(df->fd);
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ r = gzclose(df->gzfp);
+ else
+#endif
+ r = close(df->fd);
if (r == 0)
{
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
- dir_data->basedir, df->pathname, df->temp_suffix);
- snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
- dir_data->basedir, df->pathname);
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+ dir_data->basedir, df->pathname,
+ dir_data->compression > 0 ? ".gz" : "",
+ df->temp_suffix);
+ snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+ dir_data->basedir, df->pathname,
+ dir_data->compression > 0 ? ".gz" : "");
r = durable_rename(tmppath, tmppath2, progname);
}
else if (method == CLOSE_UNLINK)
{
/* Unlink the file once it's closed */
- snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
- dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+ snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+ dir_data->basedir, df->pathname,
+ dir_data->compression > 0 ? ".gz" : "",
+ df->temp_suffix ? df->temp_suffix : "");
r = unlink(tmppath);
}
else
if (!dir_data->sync)
return 0;
+#ifdef HAVE_LIBZ
+ if (dir_data->compression > 0)
+ {
+ if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+ return -1;
+ }
+#endif
+
return fsync(((DirectoryMethodFile *) f)->fd);
}
WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
{
WalWriteMethod *method;
method->getlasterror = dir_getlasterror;
dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+ dir_data->compression = compression;
dir_data->basedir = pg_strdup(basedir);
dir_data->sync = sync;