]> granicus.if.org Git - postgresql/commitdiff
Pre-pad WAL files when streaming transaction log
authorMagnus Hagander <magnus@hagander.net>
Thu, 3 Nov 2011 14:37:08 +0000 (15:37 +0100)
committerMagnus Hagander <magnus@hagander.net>
Thu, 3 Nov 2011 14:37:08 +0000 (15:37 +0100)
Instead of filling files as they appear, pre-pad the
WAL files received when streaming xlog the same way
that the server does. Data is streamed into a .partial
file which is then rename()d into place when it's complete,
but it will always be 16MB.

This also means that the starting position for pg_receivexlog
is now simply right after the last complete segment, and we
never need to deal with partial segments there.

Patch by me, review by Fujii Masao

src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c

index ba533d35978d91ac07a9f02b81e5dc5575d3cff9..9facc198e4e176638e98993ce1183ead18eeb930 100644 (file)
@@ -71,33 +71,10 @@ usage(void)
 static bool
 segment_callback(XLogRecPtr segendpos, uint32 timeline)
 {
-       char            fn[MAXPGPATH];
-       struct stat statbuf;
-
        if (verbose)
                fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
                                progname, segendpos.xlogid, segendpos.xrecoff, timeline);
 
-       /*
-        * Check if there is a partial file for the name we just finished, and if
-        * there is, remove it under the assumption that we have now got all the
-        * data we need.
-        */
-       segendpos.xrecoff /= XLOG_SEG_SIZE;
-       PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
-       snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
-                        basedir, timeline,
-                        segendpos.xlogid,
-                        segendpos.xrecoff);
-       if (stat(fn, &statbuf) == 0)
-       {
-               /* File existed, get rid of it */
-               if (verbose)
-                       fprintf(stderr, _("%s: removing file \"%s\"\n"),
-                                       progname, fn);
-               unlink(fn);
-       }
-
        /*
         * Never abort from this - we handle all aborting in continue_streaming()
         */
@@ -119,9 +96,8 @@ continue_streaming(void)
 /*
  * Determine starting location for streaming, based on:
  * 1. If there are existing xlog segments, start at the end of the last one
- * 2. If the last one is a partial segment, rename it and start over, since
- *       we don't sync after every write.
- * 3. If no existing xlog exists, start from the beginning of the current
+ *    that is complete (size matches XLogSegSize)
+ * 2. If no valid xlog exists, start from the beginning of the current
  *       WAL segment.
  */
 static XLogRecPtr
@@ -133,7 +109,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
        bool            b;
        uint32          high_log = 0;
        uint32          high_seg = 0;
-       bool            partial = false;
 
        dir = opendir(basedir);
        if (dir == NULL)
@@ -195,7 +170,7 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                        disconnect_and_exit(1);
                }
 
-               if (statbuf.st_size == 16 * 1024 * 1024)
+               if (statbuf.st_size == XLOG_SEG_SIZE)
                {
                        /* Completed segment */
                        if (log > high_log ||
@@ -208,37 +183,9 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                }
                else
                {
-                       /*
-                        * This is a partial file. Rename it out of the way.
-                        */
-                       char            newfn[MAXPGPATH];
-
-                       fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
-                                       progname, dirent->d_name, dirent->d_name);
-
-                       snprintf(newfn, sizeof(newfn), "%s/%s.partial",
-                                        basedir, dirent->d_name);
-
-                       if (stat(newfn, &statbuf) == 0)
-                       {
-                               /*
-                                * XXX: perhaps we should only error out if the existing file
-                                * is larger?
-                                */
-                               fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
-                                               progname, newfn);
-                               disconnect_and_exit(1);
-                       }
-                       if (rename(fullpath, newfn) != 0)
-                       {
-                               fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
-                                               progname, fullpath, newfn, strerror(errno));
-                               disconnect_and_exit(1);
-                       }
-
-                       /* Don't continue looking for more, we assume this is the last */
-                       partial = true;
-                       break;
+                       fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"),
+                                       progname, dirent->d_name, (int) statbuf.st_size);
+                       continue;
                }
        }
 
@@ -247,17 +194,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
        if (high_log > 0 || high_seg > 0)
        {
                XLogRecPtr      high_ptr;
-
-               if (!partial)
-               {
-                       /*
-                        * If the segment was partial, the pointer is already at the right
-                        * location since we want to re-transmit that segment. If it was
-                        * not, we need to move it to the next segment, since we are
-                        * tracking the last one that was complete.
-                        */
-                       NextLogSeg(high_log, high_seg);
-               }
+               /*
+                * Move the starting pointer to the start of the next segment,
+                * since the highest one we've seen was completed.
+                */
+               NextLogSeg(high_log, high_seg);
 
                high_ptr.xlogid = high_log;
                high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
index 0ca30c425f32559bd50fe55aa7ee944b3d54d014..dea944beb947d42ef4f75b11cf23cc4ef22b66e0 100644 (file)
@@ -27,6 +27,7 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#include <sys/stat.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
@@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
  * Open a new WAL file in the specified directory. Store the name
  * (not including the full directory) in namebuf. Assumes there is
  * enough room in this buffer...
+ *
+ * The file will be padded to 16MB with zeroes.
  */
 static int
 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
 {
        int                     f;
        char            fn[MAXPGPATH];
+       struct stat     statbuf;
+       char       *zerobuf;
+       int                     bytes;
 
        XLogFileName(namebuf, timeline, startpoint.xlogid,
                                 startpoint.xrecoff / XLOG_SEG_SIZE);
 
-       snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
-       f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+       snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
+       f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
        if (f == -1)
+       {
                fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
-                               progname, namebuf, strerror(errno));
+                               progname, fn, strerror(errno));
+               return -1;
+       }
+
+       /*
+        * Verify that the file is either empty (just created), or a complete
+        * XLogSegSize segment. Anything in between indicates a corrupt file.
+        */
+       if (fstat(f, &statbuf) != 0)
+       {
+               fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
+                               progname, fn, strerror(errno));
+               close(f);
+               return -1;
+       }
+       if (statbuf.st_size == XLogSegSize)
+               return f; /* File is open and ready to use */
+       if (statbuf.st_size != 0)
+       {
+               fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
+                               progname, fn, (int) statbuf.st_size, XLogSegSize);
+               close(f);
+               return -1;
+       }
+
+       /* New, empty, file. So pad it to 16Mb with zeroes */
+       zerobuf = xmalloc0(XLOG_BLCKSZ);
+       for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
+       {
+               if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+               {
+                       fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
+                                       progname, fn, strerror(errno));
+                       close(f);
+                       unlink(fn);
+                       return -1;
+               }
+       }
+       free(zerobuf);
+
+       if (lseek(f, SEEK_SET, 0) != 0)
+       {
+               fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
+                               progname, fn, strerror(errno));
+               close(f);
+               return -1;
+       }
        return f;
 }
 
+static bool
+close_walfile(int walfile, char *basedir, char *walname)
+{
+       off_t           currpos = lseek(walfile, 0, SEEK_CUR);
+
+       if (currpos == -1)
+       {
+               fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+                               progname, walname, strerror(errno));
+               return false;
+       }
+
+       if (fsync(walfile) != 0)
+       {
+               fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+                               progname, walname, strerror(errno));
+               return false;
+       }
+
+       if (close(walfile) != 0)
+       {
+               fprintf(stderr, _("%s: could not close file %s: %s\n"),
+                               progname, walname, strerror(errno));
+               return false;
+       }
+
+       /*
+        * Rename the .partial file to its final name only if the file position
+        * sampled before closing equals XLOG_SEG_SIZE, i.e. a whole segment.
+        */
+       if (currpos == XLOG_SEG_SIZE)
+       {
+               char            oldfn[MAXPGPATH];
+               char            newfn[MAXPGPATH];
+
+               snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+               snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+               if (rename(oldfn, newfn) != 0)
+               {
+                       fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+                                       progname, walname, strerror(errno));
+                       return false;
+               }
+       }
+       else
+               fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+                               progname, walname);
+
+       return true;
+}
+
+
 /*
  * Local version of GetCurrentTimestamp(), since we are not linked with
  * backend code.
@@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
                if (stream_continue && stream_continue())
                {
                        if (walfile != -1)
-                       {
-                               fsync(walfile);
-                               close(walfile);
-                       }
+                               /* Potential error message is written by close_walfile */
+                               return close_walfile(walfile, basedir, current_walfile_name);
                        return true;
                }
 
@@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
                        /* Did we reach the end of a WAL segment? */
                        if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
                        {
-                               fsync(walfile);
-                               close(walfile);
+                               if (!close_walfile(walfile, basedir, current_walfile_name))
+                                       /* Error message written in close_walfile() */
+                                       return false;
+
                                walfile = -1;
                                xlogoff = 0;