]> granicus.if.org Git - postgresql/commitdiff
Add compression support to pg_receivexlog
authorMagnus Hagander <magnus@hagander.net>
Tue, 17 Jan 2017 11:10:26 +0000 (12:10 +0100)
committerMagnus Hagander <magnus@hagander.net>
Tue, 17 Jan 2017 11:10:26 +0000 (12:10 +0100)
Author: Michael Paquier, review and small changes by me

doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/walmethods.c
src/bin/pg_basebackup/walmethods.h

index bfa055b58b3f9ee9220c8cf2d7198d038a73cf1f..8c1ea9a2e2e8f68b82204c47bdd6d1d902c554f0 100644 (file)
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  When compression is enabled (level greater than 0), the
+        suffix <filename>.gz</filename> is automatically added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
index c5ae1cc147b7d310fb7b889f1fc7fd4e434815af..ce1fe3bf0089321d292cb36a3fef3c6063235963 100644 (file)
@@ -494,7 +494,7 @@ LogStreamerMain(logstreamer_param *param)
                stream.replication_slot = psprintf("pg_basebackup_%d", (int) getpid());
 
        if (format == 'p')
-               stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+               stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
        else
                stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
index 11c31bbe137293ed922a2c93c1cb11137711e043..135e2070f37a104a7a02f92512f5c87b4db39dc3 100644 (file)
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int     verbose = 0;
+static int     compresslevel = 0;
 static int     noloop = 0;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -58,6 +59,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
        exit(code);                                                                     \
        }
 
+/* Segment name tests: XLOG_FNAME_LEN hex digits + ".gz" / ".gz.partial"; fname is evaluated repeatedly */
+#define IsCompressXLogFileName(fname)    \
+       (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") &&     \
+        strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&         \
+        strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname)    \
+       (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") &&     \
+        strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&         \
+        strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
 
 static void
 usage(void)
@@ -76,6 +86,7 @@ usage(void)
        printf(_("      --synchronous      flush transaction log immediately after writing\n"));
        printf(_("  -v, --verbose          output verbose messages\n"));
        printf(_("  -V, --version          output version information, then exit\n"));
+       printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
        printf(_("  -?, --help             show this help, then exit\n"));
        printf(_("\nConnection options:\n"));
        printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -188,14 +199,31 @@ FindStreamingStart(uint32 *tli)
                uint32          tli;
                XLogSegNo       segno;
                bool            ispartial;
+               bool            iscompress;
 
                /*
                 * Check if the filename looks like an xlog file, or a .partial file.
                 */
                if (IsXLogFileName(dirent->d_name))
+               {
                        ispartial = false;
+                       iscompress = false;
+               }
                else if (IsPartialXLogFileName(dirent->d_name))
+               {
+                       ispartial = true;
+                       iscompress = false;
+               }
+               else if (IsCompressXLogFileName(dirent->d_name))
+               {
+                       ispartial = false;
+                       iscompress = true;
+               }
+               else if (IsPartialCompressXLogFileName(dirent->d_name))
+               {
                        ispartial = true;
+                       iscompress = true;
+               }
                else
                        continue;
 
@@ -206,9 +234,15 @@ FindStreamingStart(uint32 *tli)
 
                /*
                 * Check that the segment has the right size, if it's supposed to be
-                * completed.
+                * completed.  For non-compressed segments just check the on-disk size
+                * and see if it matches a completed segment.
+                * For compressed segments, look at the last 4 bytes of the compressed
+                * file, which is where the uncompressed size is located for gz files
+                * with a size lower than 4GB, and then compare it to the size of a
+                * completed segment. The 4 last bytes correspond to the ISIZE member
+                * according to http://www.zlib.org/rfc-gzip.html.
                 */
-               if (!ispartial)
+               if (!ispartial && !iscompress)
                {
                        struct stat statbuf;
                        char            fullpath[MAXPGPATH];
@@ -229,6 +263,47 @@ FindStreamingStart(uint32 *tli)
                                continue;
                        }
                }
+               else if (!ispartial && iscompress)
+               {
+                       int             fd;
+                       char    buf[4];
+                       int             bytes_out;
+                       char    fullpath[MAXPGPATH];
+
+                       snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+                       fd = open(fullpath, O_RDONLY | PG_BINARY);
+                       if (fd < 0)
+                       {
+                               fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
+                                               progname, fullpath, strerror(errno));
+                               disconnect_and_exit(1);
+                       }
+                       if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+                       {
+                               fprintf(stderr, _("%s: could not seek compressed file \"%s\": %s\n"),
+                                               progname, fullpath, strerror(errno));
+                               disconnect_and_exit(1);
+                       }
+                       if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+                       {
+                               fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
+                                               progname, fullpath, strerror(errno));
+                               disconnect_and_exit(1);
+                       }
+
+                       close(fd);
+                       bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+                                               (buf[1] << 8) | buf[0];
+
+                       if (bytes_out != XLOG_SEG_SIZE)
+                       {
+                               fprintf(stderr,
+                                               _("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
+                                               progname, dirent->d_name, bytes_out);
+                               continue;
+                       }
+               }
 
                /* Looks like a valid segment. Remember that we saw it. */
                if ((segno > high_segno) ||
@@ -339,7 +414,8 @@ StreamLog(void)
        stream.synchronous = synchronous;
        stream.do_sync = true;
        stream.mark_done = false;
-       stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+       stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+                                                                                               stream.do_sync);
        stream.partial_suffix = ".partial";
        stream.replication_slot = replication_slot;
        stream.temp_slot = false;
@@ -392,6 +468,7 @@ main(int argc, char **argv)
                {"status-interval", required_argument, NULL, 's'},
                {"slot", required_argument, NULL, 'S'},
                {"verbose", no_argument, NULL, 'v'},
+               {"compress", required_argument, NULL, 'Z'},
 /* action */
                {"create-slot", no_argument, NULL, 1},
                {"drop-slot", no_argument, NULL, 2},
@@ -422,7 +499,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -472,6 +549,15 @@ main(int argc, char **argv)
                        case 'v':
                                verbose++;
                                break;
+                       case 'Z':
+                               compresslevel = atoi(optarg);
+                               if (compresslevel < 0 || compresslevel > 9)
+                               {
+                                       fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               break;
 /* action */
                        case 1:
                                do_create_slot = true;
@@ -538,6 +624,16 @@ main(int argc, char **argv)
                exit(1);
        }
 
+#ifndef HAVE_LIBZ
+       if (compresslevel != 0)
+       {
+               fprintf(stderr,
+                               _("%s: this build does not support compression\n"),
+                               progname);
+               exit(1);
+       }
+#endif
+
        /*
         * Check existence of destination folder.
         */
index 88ee603b8bced7f74107e89a23bc364bcea03579..d9ad596bf0603d887686e1413d95e627dab8718c 100644 (file)
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData     /* settings of the directory write method, held in dir_data */
 {
        char       *basedir;            /* directory prefix used when building file paths */
+       int                     compression;    /* > 0: write through gzip and add ".gz"; 0: plain files */
        bool            sync;           /* when false, dir_sync() returns 0 without syncing */
 }      DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
        char       *pathname;
        char       *fullpath;
        char       *temp_suffix;
+#ifdef HAVE_LIBZ
+       gzFile          gzfp;
+#endif
 }      DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,47 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
        static char tmppath[MAXPGPATH];
        int                     fd;
        DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+       gzFile          gzfp = NULL;
+#endif
 
-       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-                        dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+       snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+                        dir_data->basedir, pathname,
+                        dir_data->compression > 0 ? ".gz" : "",
+                        temp_suffix ? temp_suffix : "");
 
+       /*
+        * Open a file for non-compressed as well as compressed files. Tracking
+        * the file descriptor is important for dir_sync() method as gzflush()
+        * does not do any system calls to fsync() to make changes permanent on
+        * disk.
+        */
        fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
        if (fd < 0)
                return NULL;
 
-       if (pad_to_size)
+#ifdef HAVE_LIBZ
+       if (dir_data->compression > 0)
+       {
+               gzfp = gzdopen(fd, "wb");
+               if (gzfp == NULL)
+               {
+                       close(fd);
+                       return NULL;
+               }
+
+               if (gzsetparams(gzfp, dir_data->compression,
+                                               Z_DEFAULT_STRATEGY) != Z_OK)
+               {
+                       gzclose(gzfp);
+                       return NULL;
+               }
+       }
+#endif
+
+       /* Do pre-padding on non-compressed files */
+       if (pad_to_size && dir_data->compression == 0)
        {
-               /* Always pre-pad on regular files */
                char       *zerobuf;
                int                     bytes;
 
@@ -120,12 +154,21 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                if (fsync_fname(tmppath, false, progname) != 0 ||
                        fsync_parent_path(tmppath, progname) != 0)
                {
-                       close(fd);
+#ifdef HAVE_LIBZ
+                       if (dir_data->compression > 0)
+                               gzclose(gzfp);
+                       else
+#endif
+                               close(fd);
                        return NULL;
                }
        }
 
        f = pg_malloc0(sizeof(DirectoryMethodFile));
+#ifdef HAVE_LIBZ
+       if (dir_data->compression > 0)
+               f->gzfp = gzfp;
+#endif
        f->fd = fd;
        f->currpos = 0;
        f->pathname = pg_strdup(pathname);
@@ -144,7 +187,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
        Assert(f != NULL);
 
-       r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+       if (dir_data->compression > 0)
+               r = (ssize_t) gzwrite(df->gzfp, buf, count);
+       else
+#endif
+               r = write(df->fd, buf, count);
        if (r > 0)
                df->currpos += r;
        return r;
@@ -169,7 +217,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
        Assert(f != NULL);
 
-       r = close(df->fd);
+#ifdef HAVE_LIBZ
+       if (dir_data->compression > 0)
+               r = gzclose(df->gzfp);
+       else
+#endif
+               r = close(df->fd);
 
        if (r == 0)
        {
@@ -180,17 +233,22 @@ dir_close(Walfile f, WalCloseMethod method)
                         * If we have a temp prefix, normal operation is to rename the
                         * file.
                         */
-                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-                                        dir_data->basedir, df->pathname, df->temp_suffix);
-                       snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-                                        dir_data->basedir, df->pathname);
+                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+                                        dir_data->basedir, df->pathname,
+                                        dir_data->compression > 0 ? ".gz" : "",
+                                        df->temp_suffix);
+                       snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+                                        dir_data->basedir, df->pathname,
+                                        dir_data->compression > 0 ? ".gz" : "");
                        r = durable_rename(tmppath, tmppath2, progname);
                }
                else if (method == CLOSE_UNLINK)
                {
                        /* Unlink the file once it's closed */
-                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-                                        dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+                       snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+                                        dir_data->basedir, df->pathname,
+                                        dir_data->compression > 0 ? ".gz" : "",
+                                        df->temp_suffix ? df->temp_suffix : "");
                        r = unlink(tmppath);
                }
                else
@@ -226,6 +284,14 @@ dir_sync(Walfile f)
        if (!dir_data->sync)
                return 0;
 
+#ifdef HAVE_LIBZ
+       if (dir_data->compression > 0)
+       {
+               if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+                       return -1;
+       }
+#endif
+
        return fsync(((DirectoryMethodFile *) f)->fd);
 }
 
@@ -277,7 +343,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
        WalWriteMethod *method;
 
@@ -293,6 +359,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
        method->getlasterror = dir_getlasterror;
 
        dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+       dir_data->compression = compression;
        dir_data->basedir = pg_strdup(basedir);
        dir_data->sync = sync;
 
index c1723d53b51b6c6a244e0b47daa3ed3e92f1d431..2cd8b6d75551897d2a8c20ca89d64f57742187a9 100644 (file)
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *                                                (only implements the methods required for pg_basebackup,
  *                                                not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+                                                                                int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */