]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/receivelog.c
f0f8760e2dbc81d3fcb2e55e333301e85fc4750e
[postgresql] / src / bin / pg_basebackup / receivelog.c
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive transaction log files using the streaming
4  *                                replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres_fe.h"
16
17 #include <sys/stat.h>
18 #include <unistd.h>
19
20 /* local includes */
21 #include "receivelog.h"
22 #include "streamutil.h"
23
24 #include "libpq-fe.h"
25 #include "access/xlog_internal.h"
26
27
28 /* fd and filename for currently open WAL file */
29 static int      walfile = -1;
30 static char current_walfile_name[MAXPGPATH] = "";
31 static bool reportFlushPosition = false;
32 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
33
34 static bool still_sending = true;               /* feedback still needs to be sent? */
35
36 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
37                                  uint32 timeline, char *basedir,
38                            stream_stop_callback stream_stop, int standby_message_timeout,
39                                   char *partial_suffix, XLogRecPtr *stoppos,
40                                   bool synchronous);
41 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
42 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
43 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
44                                                                 XLogRecPtr blockpos, int64 *last_status);
45 static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
46                                                            XLogRecPtr *blockpos, uint32 timeline,
47                                                            char *basedir, stream_stop_callback stream_stop,
48                                                            char *partial_suffix);
49 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
50                                                                            XLogRecPtr blockpos, char *basedir, char *partial_suffix,
51                                                                            XLogRecPtr *stoppos);
52 static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
53                                                                 uint32 timeline, char *basedir,
54                                                                 stream_stop_callback stream_stop,
55                                                                 char *partial_suffix, XLogRecPtr *stoppos);
56 static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
57                                                                                  int64 last_status);
58
59 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
60                                                  uint32 *timeline);
61
62 /*
63  * Open a new WAL file in the specified directory.
64  *
65  * The file will be padded to 16Mb with zeroes. The base filename (without
66  * partial_suffix) is stored in current_walfile_name.
67  */
68 static bool
69 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
70                          char *partial_suffix)
71 {
72         int                     f;
73         char            fn[MAXPGPATH];
74         struct stat statbuf;
75         char       *zerobuf;
76         int                     bytes;
77         XLogSegNo       segno;
78
79         XLByteToSeg(startpoint, segno);
80         XLogFileName(current_walfile_name, timeline, segno);
81
82         snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
83                          partial_suffix ? partial_suffix : "");
84         f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
85         if (f == -1)
86         {
87                 fprintf(stderr,
88                                 _("%s: could not open transaction log file \"%s\": %s\n"),
89                                 progname, fn, strerror(errno));
90                 return false;
91         }
92
93         /*
94          * Verify that the file is either empty (just created), or a complete
95          * XLogSegSize segment. Anything in between indicates a corrupt file.
96          */
97         if (fstat(f, &statbuf) != 0)
98         {
99                 fprintf(stderr,
100                                 _("%s: could not stat transaction log file \"%s\": %s\n"),
101                                 progname, fn, strerror(errno));
102                 close(f);
103                 return false;
104         }
105         if (statbuf.st_size == XLogSegSize)
106         {
107                 /* File is open and ready to use */
108                 walfile = f;
109                 return true;
110         }
111         if (statbuf.st_size != 0)
112         {
113                 fprintf(stderr,
114                                 _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
115                                 progname, fn, (int) statbuf.st_size, XLogSegSize);
116                 close(f);
117                 return false;
118         }
119
120         /* New, empty, file. So pad it to 16Mb with zeroes */
121         zerobuf = pg_malloc0(XLOG_BLCKSZ);
122         for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
123         {
124                 if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
125                 {
126                         fprintf(stderr,
127                                         _("%s: could not pad transaction log file \"%s\": %s\n"),
128                                         progname, fn, strerror(errno));
129                         free(zerobuf);
130                         close(f);
131                         unlink(fn);
132                         return false;
133                 }
134         }
135         free(zerobuf);
136
137         if (lseek(f, SEEK_SET, 0) != 0)
138         {
139                 fprintf(stderr,
140                                 _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
141                                 progname, fn, strerror(errno));
142                 close(f);
143                 return false;
144         }
145         walfile = f;
146         return true;
147 }
148
149 /*
150  * Close the current WAL file (if open), and rename it to the correct
151  * filename if it's complete. On failure, prints an error message to stderr
152  * and returns false, otherwise returns true.
153  */
154 static bool
155 close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
156 {
157         off_t           currpos;
158
159         if (walfile == -1)
160                 return true;
161
162         currpos = lseek(walfile, 0, SEEK_CUR);
163         if (currpos == -1)
164         {
165                 fprintf(stderr,
166                          _("%s: could not determine seek position in file \"%s\": %s\n"),
167                                 progname, current_walfile_name, strerror(errno));
168                 return false;
169         }
170
171         if (fsync(walfile) != 0)
172         {
173                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
174                                 progname, current_walfile_name, strerror(errno));
175                 return false;
176         }
177
178         if (close(walfile) != 0)
179         {
180                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
181                                 progname, current_walfile_name, strerror(errno));
182                 walfile = -1;
183                 return false;
184         }
185         walfile = -1;
186
187         /*
188          * If we finished writing a .partial file, rename it into place.
189          */
190         if (currpos == XLOG_SEG_SIZE && partial_suffix)
191         {
192                 char            oldfn[MAXPGPATH];
193                 char            newfn[MAXPGPATH];
194
195                 snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
196                 snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
197                 if (rename(oldfn, newfn) != 0)
198                 {
199                         fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
200                                         progname, current_walfile_name, strerror(errno));
201                         return false;
202                 }
203         }
204         else if (partial_suffix)
205                 fprintf(stderr,
206                                 _("%s: not renaming \"%s%s\", segment is not complete\n"),
207                                 progname, current_walfile_name, partial_suffix);
208
209         lastFlushPosition = pos;
210         return true;
211 }
212
213
214 /*
215  * Check if a timeline history file exists.
216  */
217 static bool
218 existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
219 {
220         char            path[MAXPGPATH];
221         char            histfname[MAXFNAMELEN];
222         int                     fd;
223
224         /*
225          * Timeline 1 never has a history file. We treat that as if it existed,
226          * since we never need to stream it.
227          */
228         if (tli == 1)
229                 return true;
230
231         TLHistoryFileName(histfname, tli);
232
233         snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
234
235         fd = open(path, O_RDONLY | PG_BINARY, 0);
236         if (fd < 0)
237         {
238                 if (errno != ENOENT)
239                         fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
240                                         progname, path, strerror(errno));
241                 return false;
242         }
243         else
244         {
245                 close(fd);
246                 return true;
247         }
248 }
249
250 static bool
251 writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
252 {
253         int                     size = strlen(content);
254         char            path[MAXPGPATH];
255         char            tmppath[MAXPGPATH];
256         char            histfname[MAXFNAMELEN];
257         int                     fd;
258
259         /*
260          * Check that the server's idea of how timeline history files should be
261          * named matches ours.
262          */
263         TLHistoryFileName(histfname, tli);
264         if (strcmp(histfname, filename) != 0)
265         {
266                 fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
267                                 progname, tli, filename);
268                 return false;
269         }
270
271         snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
272
273         /*
274          * Write into a temp file name.
275          */
276         snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
277
278         unlink(tmppath);
279
280         fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
281         if (fd < 0)
282         {
283                 fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
284                                 progname, tmppath, strerror(errno));
285                 return false;
286         }
287
288         errno = 0;
289         if ((int) write(fd, content, size) != size)
290         {
291                 int                     save_errno = errno;
292
293                 /*
294                  * If we fail to make the file, delete it to release disk space
295                  */
296                 close(fd);
297                 unlink(tmppath);
298                 errno = save_errno;
299
300                 fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
301                                 progname, tmppath, strerror(errno));
302                 return false;
303         }
304
305         if (fsync(fd) != 0)
306         {
307                 close(fd);
308                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
309                                 progname, tmppath, strerror(errno));
310                 return false;
311         }
312
313         if (close(fd) != 0)
314         {
315                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
316                                 progname, tmppath, strerror(errno));
317                 return false;
318         }
319
320         /*
321          * Now move the completed history file into place with its final name.
322          */
323         if (rename(tmppath, path) < 0)
324         {
325                 fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
326                                 progname, tmppath, path, strerror(errno));
327                 return false;
328         }
329
330         return true;
331 }
332
333 /*
334  * Send a Standby Status Update message to server.
335  */
336 static bool
337 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
338 {
339         char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
340         int                     len = 0;
341
342         replybuf[len] = 'r';
343         len += 1;
344         fe_sendint64(blockpos, &replybuf[len]);         /* write */
345         len += 8;
346         if (reportFlushPosition)
347                 fe_sendint64(lastFlushPosition, &replybuf[len]);                /* flush */
348         else
349                 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);                /* flush */
350         len += 8;
351         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* apply */
352         len += 8;
353         fe_sendint64(now, &replybuf[len]);      /* sendTime */
354         len += 8;
355         replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
356         len += 1;
357
358         if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359         {
360                 fprintf(stderr, _("%s: could not send feedback packet: %s"),
361                                 progname, PQerrorMessage(conn));
362                 return false;
363         }
364
365         return true;
366 }
367
368 /*
369  * Check that the server version we're connected to is supported by
370  * ReceiveXlogStream().
371  *
372  * If it's not, an error message is printed to stderr, and false is returned.
373  */
374 bool
375 CheckServerVersionForStreaming(PGconn *conn)
376 {
377         int                     minServerMajor,
378                                 maxServerMajor;
379         int                     serverMajor;
380
381         /*
382          * The message format used in streaming replication changed in 9.3, so we
383          * cannot stream from older servers. And we don't support servers newer
384          * than the client; it might work, but we don't know, so err on the safe
385          * side.
386          */
387         minServerMajor = 903;
388         maxServerMajor = PG_VERSION_NUM / 100;
389         serverMajor = PQserverVersion(conn) / 100;
390         if (serverMajor < minServerMajor)
391         {
392                 const char *serverver = PQparameterStatus(conn, "server_version");
393
394                 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
395                                 progname,
396                                 serverver ? serverver : "'unknown'",
397                                 "9.3");
398                 return false;
399         }
400         else if (serverMajor > maxServerMajor)
401         {
402                 const char *serverver = PQparameterStatus(conn, "server_version");
403
404                 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
405                                 progname,
406                                 serverver ? serverver : "'unknown'",
407                                 PG_VERSION);
408                 return false;
409         }
410         return true;
411 }
412
413 /*
414  * Receive a log stream starting at the specified position.
415  *
416  * If sysidentifier is specified, validate that both the system
417  * identifier and the timeline matches the specified ones
418  * (by sending an extra IDENTIFY_SYSTEM command)
419  *
420  * All received segments will be written to the directory
421  * specified by basedir. This will also fetch any missing timeline history
422  * files.
423  *
424  * The stream_stop callback will be called every time data
425  * is received, and whenever a segment is completed. If it returns
426  * true, the streaming will stop and the function
427  * return. As long as it returns false, streaming will continue
428  * indefinitely.
429  *
430  * standby_message_timeout controls how often we send a message
431  * back to the master letting it know our progress, in milliseconds.
432  * This message will only contain the write location, and never
433  * flush or replay.
434  *
435  * If 'partial_suffix' is not NULL, files are initially created with the
436  * given suffix, and the suffix is removed once the file is finished. That
437  * allows you to tell the difference between partial and completed files,
438  * so that you can continue later where you left.
439  *
440  * If 'synchronous' is true, the received WAL is flushed as soon as written,
441  * otherwise only when the WAL file is closed.
442  *
443  * Note: The log position *must* be at a log segment start!
444  */
445 bool
446 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
447                                   char *sysidentifier, char *basedir,
448                                   stream_stop_callback stream_stop,
449                                   int standby_message_timeout, char *partial_suffix,
450                                   bool synchronous)
451 {
452         char            query[128];
453         char            slotcmd[128];
454         PGresult   *res;
455         XLogRecPtr      stoppos;
456
457         /*
458          * The caller should've checked the server version already, but doesn't do
459          * any harm to check it here too.
460          */
461         if (!CheckServerVersionForStreaming(conn))
462                 return false;
463
464         if (replication_slot != NULL)
465         {
466                 /*
467                  * Report the flush position, so the primary can know what WAL we'll
468                  * possibly re-request, and remove older WAL safely.
469                  *
470                  * We only report it when a slot has explicitly been used, because
471                  * reporting the flush position makes one eligible as a synchronous
472                  * replica. People shouldn't include generic names in
473                  * synchronous_standby_names, but we've protected them against it so
474                  * far, so let's continue to do so in the situations when possible. If
475                  * they've got a slot, though, we need to report the flush position,
476                  * so that the master can remove WAL.
477                  */
478                 reportFlushPosition = true;
479                 sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
480         }
481         else
482         {
483                 reportFlushPosition = false;
484                 slotcmd[0] = 0;
485         }
486
487         if (sysidentifier != NULL)
488         {
489                 /* Validate system identifier hasn't changed */
490                 res = PQexec(conn, "IDENTIFY_SYSTEM");
491                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
492                 {
493                         fprintf(stderr,
494                                         _("%s: could not send replication command \"%s\": %s"),
495                                         progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
496                         PQclear(res);
497                         return false;
498                 }
499                 if (PQntuples(res) != 1 || PQnfields(res) < 3)
500                 {
501                         fprintf(stderr,
502                                         _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
503                                         progname, PQntuples(res), PQnfields(res), 1, 3);
504                         PQclear(res);
505                         return false;
506                 }
507                 if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
508                 {
509                         fprintf(stderr,
510                                         _("%s: system identifier does not match between base backup and streaming connection\n"),
511                                         progname);
512                         PQclear(res);
513                         return false;
514                 }
515                 if (timeline > atoi(PQgetvalue(res, 0, 1)))
516                 {
517                         fprintf(stderr,
518                                 _("%s: starting timeline %u is not present in the server\n"),
519                                         progname, timeline);
520                         PQclear(res);
521                         return false;
522                 }
523                 PQclear(res);
524         }
525
526         /*
527          * initialize flush position to starting point, it's the caller's
528          * responsibility that that's sane.
529          */
530         lastFlushPosition = startpos;
531
532         while (1)
533         {
534                 /*
535                  * Fetch the timeline history file for this timeline, if we don't have
536                  * it already.
537                  */
538                 if (!existsTimeLineHistoryFile(basedir, timeline))
539                 {
540                         snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
541                         res = PQexec(conn, query);
542                         if (PQresultStatus(res) != PGRES_TUPLES_OK)
543                         {
544                                 /* FIXME: we might send it ok, but get an error */
545                                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
546                                         progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
547                                 PQclear(res);
548                                 return false;
549                         }
550
551                         /*
552                          * The response to TIMELINE_HISTORY is a single row result set
553                          * with two fields: filename and content
554                          */
555                         if (PQnfields(res) != 2 || PQntuples(res) != 1)
556                         {
557                                 fprintf(stderr,
558                                                 _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
559                                                 progname, PQntuples(res), PQnfields(res), 1, 2);
560                         }
561
562                         /* Write the history file to disk */
563                         writeTimeLineHistoryFile(basedir, timeline,
564                                                                          PQgetvalue(res, 0, 0),
565                                                                          PQgetvalue(res, 0, 1));
566
567                         PQclear(res);
568                 }
569
570                 /*
571                  * Before we start streaming from the requested location, check if the
572                  * callback tells us to stop here.
573                  */
574                 if (stream_stop(startpos, timeline, false))
575                         return true;
576
577                 /* Initiate the replication stream at specified location */
578                 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
579                                  slotcmd,
580                                  (uint32) (startpos >> 32), (uint32) startpos,
581                                  timeline);
582                 res = PQexec(conn, query);
583                 if (PQresultStatus(res) != PGRES_COPY_BOTH)
584                 {
585                         fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
586                                         progname, "START_REPLICATION", PQresultErrorMessage(res));
587                         PQclear(res);
588                         return false;
589                 }
590                 PQclear(res);
591
592                 /* Stream the WAL */
593                 res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
594                                                            standby_message_timeout, partial_suffix,
595                                                            &stoppos, synchronous);
596                 if (res == NULL)
597                         goto error;
598
599                 /*
600                  * Streaming finished.
601                  *
602                  * There are two possible reasons for that: a controlled shutdown, or
603                  * we reached the end of the current timeline. In case of
604                  * end-of-timeline, the server sends a result set after Copy has
605                  * finished, containing information about the next timeline. Read
606                  * that, and restart streaming from the next timeline. In case of
607                  * controlled shutdown, stop here.
608                  */
609                 if (PQresultStatus(res) == PGRES_TUPLES_OK)
610                 {
611                         /*
612                          * End-of-timeline. Read the next timeline's ID and starting
613                          * position. Usually, the starting position will match the end of
614                          * the previous timeline, but there are corner cases like if the
615                          * server had sent us half of a WAL record, when it was promoted.
616                          * The new timeline will begin at the end of the last complete
617                          * record in that case, overlapping the partial WAL record on the
618                          * the old timeline.
619                          */
620                         uint32          newtimeline;
621                         bool            parsed;
622
623                         parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
624                         PQclear(res);
625                         if (!parsed)
626                                 goto error;
627
628                         /* Sanity check the values the server gave us */
629                         if (newtimeline <= timeline)
630                         {
631                                 fprintf(stderr,
632                                                 _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
633                                                 progname, newtimeline, timeline);
634                                 goto error;
635                         }
636                         if (startpos > stoppos)
637                         {
638                                 fprintf(stderr,
639                                                 _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
640                                                 progname,
641                                                 timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
642                                   newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
643                                 goto error;
644                         }
645
646                         /* Read the final result, which should be CommandComplete. */
647                         res = PQgetResult(conn);
648                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
649                         {
650                                 fprintf(stderr,
651                                    _("%s: unexpected termination of replication stream: %s"),
652                                                 progname, PQresultErrorMessage(res));
653                                 PQclear(res);
654                                 goto error;
655                         }
656                         PQclear(res);
657
658                         /*
659                          * Loop back to start streaming from the new timeline. Always
660                          * start streaming at the beginning of a segment.
661                          */
662                         timeline = newtimeline;
663                         startpos = startpos - (startpos % XLOG_SEG_SIZE);
664                         continue;
665                 }
666                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
667                 {
668                         PQclear(res);
669
670                         /*
671                          * End of replication (ie. controlled shut down of the server).
672                          *
673                          * Check if the callback thinks it's OK to stop here. If not,
674                          * complain.
675                          */
676                         if (stream_stop(stoppos, timeline, false))
677                                 return true;
678                         else
679                         {
680                                 fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
681                                                 progname);
682                                 goto error;
683                         }
684                 }
685                 else
686                 {
687                         /* Server returned an error. */
688                         fprintf(stderr,
689                                         _("%s: unexpected termination of replication stream: %s"),
690                                         progname, PQresultErrorMessage(res));
691                         PQclear(res);
692                         goto error;
693                 }
694         }
695
696 error:
697         if (walfile != -1 && close(walfile) != 0)
698                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
699                                 progname, current_walfile_name, strerror(errno));
700         walfile = -1;
701         return false;
702 }
703
704 /*
705  * Helper function to parse the result set returned by server after streaming
706  * has finished. On failure, prints an error to stderr and returns false.
707  */
708 static bool
709 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
710 {
711         uint32          startpos_xlogid,
712                                 startpos_xrecoff;
713
714         /*----------
715          * The result set consists of one row and two columns, e.g:
716          *
717          *      next_tli | next_tli_startpos
718          * ----------+-------------------
719          *                 4 | 0/9949AE0
720          *
721          * next_tli is the timeline ID of the next timeline after the one that
722          * just finished streaming. next_tli_startpos is the XLOG position where
723          * the server switched to it.
724          *----------
725          */
726         if (PQnfields(res) < 2 || PQntuples(res) != 1)
727         {
728                 fprintf(stderr,
729                                 _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
730                                 progname, PQntuples(res), PQnfields(res), 1, 2);
731                 return false;
732         }
733
734         *timeline = atoi(PQgetvalue(res, 0, 0));
735         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
736                            &startpos_xrecoff) != 2)
737         {
738                 fprintf(stderr,
739                         _("%s: could not parse next timeline's starting point \"%s\"\n"),
740                                 progname, PQgetvalue(res, 0, 1));
741                 return false;
742         }
743         *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
744
745         return true;
746 }
747
748 /*
749  * The main loop of ReceiveXlogStream. Handles the COPY stream after
750  * initiating streaming with the START_STREAMING command.
751  *
752  * If the COPY ends (not necessarily successfully) due a message from the
753  * server, returns a PGresult and sets *stoppos to the last byte written.
754  * On any other sort of error, returns NULL.
755  */
756 static PGresult *
757 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
758                                  char *basedir, stream_stop_callback stream_stop,
759                                  int standby_message_timeout, char *partial_suffix,
760                                  XLogRecPtr *stoppos, bool synchronous)
761 {
762         char       *copybuf = NULL;
763         int64           last_status = -1;
764         XLogRecPtr      blockpos = startpos;
765
766         still_sending = true;
767
768         while (1)
769         {
770                 int                     r;
771                 int64           now;
772                 long            sleeptime;
773
774                 /*
775                  * Check if we should continue streaming, or abort at this point.
776                  */
777                 if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
778                                                                 stream_stop, partial_suffix, stoppos))
779                         goto error;
780
781                 now = feGetCurrentTimestamp();
782
783                 /*
784                  * If synchronous option is true, issue sync command as soon as
785                  * there are WAL data which has not been flushed yet.
786                  */
787                 if (synchronous && lastFlushPosition < blockpos && walfile != -1)
788                 {
789                         if (fsync(walfile) != 0)
790                         {
791                                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
792                                                 progname, current_walfile_name, strerror(errno));
793                                 goto error;
794                         }
795                         lastFlushPosition = blockpos;
796
797                         /*
798                          * Send feedback so that the server sees the latest WAL locations
799                          * immediately.
800                          */
801                         if (!sendFeedback(conn, blockpos, now, false))
802                                 goto error;
803                         last_status = now;
804                 }
805
806                 /*
807                  * Potentially send a status message to the master
808                  */
809                 if (still_sending && standby_message_timeout > 0 &&
810                         feTimestampDifferenceExceeds(last_status, now,
811                                                                                  standby_message_timeout))
812                 {
813                         /* Time to send feedback! */
814                         if (!sendFeedback(conn, blockpos, now, false))
815                                 goto error;
816                         last_status = now;
817                 }
818
819                 /*
820                  * Calculate how long send/receive loops should sleep
821                  */
822                 sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
823                                                                                                  last_status);
824
825                 r = CopyStreamReceive(conn, sleeptime, &copybuf);
826                 while (r != 0)
827                 {
828                         if (r == -1)
829                                 goto error;
830                         if (r == -2)
831                         {
832                                 PGresult        *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
833                                                                                                                  basedir, partial_suffix, stoppos);
834                                 if (res == NULL)
835                                         goto error;
836                                 else
837                                         return res;
838                         }
839
840                         /* Check the message type. */
841                         if (copybuf[0] == 'k')
842                         {
843                                 if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
844                                                                                  &last_status))
845                                         goto error;
846                         }
847                         else if (copybuf[0] == 'w')
848                         {
849                                 if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
850                                                                                 timeline, basedir, stream_stop, partial_suffix))
851                                         goto error;
852
853                                 /*
854                                  * Check if we should continue streaming, or abort at this point.
855                                  */
856                                 if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
857                                                                                  stream_stop, partial_suffix, stoppos))
858                                         goto error;
859                         }
860                         else
861                         {
862                                 fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
863                                                 progname, copybuf[0]);
864                                 goto error;
865                         }
866
867                         /*
868                          * Process the received data, and any subsequent data we
869                          * can read without blocking.
870                          */
871                         r = CopyStreamReceive(conn, 0, &copybuf);
872                 }
873         }
874
875 error:
876         if (copybuf != NULL)
877                 PQfreemem(copybuf);
878         return NULL;
879 }
880
881 /*
882  * Wait until we can read CopyData message, or timeout.
883  *
884  * Returns 1 if data has become available for reading, 0 if timed out
885  * or interrupted by signal, and -1 on an error.
886  */
887 static int
888 CopyStreamPoll(PGconn *conn, long timeout_ms)
889 {
890         int                     ret;
891         fd_set          input_mask;
892         struct timeval timeout;
893         struct timeval *timeoutptr;
894
895         if (PQsocket(conn) < 0)
896         {
897                 fprintf(stderr, _("%s: socket not open"), progname);
898                 return -1;
899         }
900
901         FD_ZERO(&input_mask);
902         FD_SET(PQsocket(conn), &input_mask);
903
904         if (timeout_ms < 0)
905                 timeoutptr = NULL;
906         else
907         {
908                 timeout.tv_sec = timeout_ms / 1000L;
909                 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
910                 timeoutptr = &timeout;
911         }
912
913         ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
914         if (ret == 0 || (ret < 0 && errno == EINTR))
915                 return 0;               /* Got a timeout or signal */
916         else if (ret < 0)
917         {
918                 fprintf(stderr, _("%s: select() failed: %s\n"),
919                                 progname, strerror(errno));
920                 return -1;
921         }
922
923         return 1;
924 }
925
926 /*
927  * Receive CopyData message available from XLOG stream, blocking for
928  * maximum of 'timeout' ms.
929  *
930  * If data was received, returns the length of the data. *buffer is set to
931  * point to a buffer holding the received message. The buffer is only valid
932  * until the next CopyStreamReceive call.
933  *
934  * 0 if no data was available within timeout, or wait was interrupted
935  * by signal. -1 on error. -2 if the server ended the COPY.
936  */
937 static int
938 CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
939 {
940         char       *copybuf = NULL;
941         int                     rawlen;
942
943         if (*buffer != NULL)
944                 PQfreemem(*buffer);
945         *buffer = NULL;
946
947         /* Try to receive a CopyData message */
948         rawlen = PQgetCopyData(conn, &copybuf, 1);
949         if (rawlen == 0)
950         {
951                 /*
952                  * No data available. Wait for some to appear, but not longer than
953                  * the specified timeout, so that we can ping the server.
954                  */
955                 if (timeout != 0)
956                 {
957                         int             ret;
958
959                         ret = CopyStreamPoll(conn, timeout);
960                         if (ret <= 0)
961                                 return ret;
962                 }
963
964                 /* Else there is actually data on the socket */
965                 if (PQconsumeInput(conn) == 0)
966                 {
967                         fprintf(stderr,
968                                         _("%s: could not receive data from WAL stream: %s"),
969                                         progname, PQerrorMessage(conn));
970                         return -1;
971                 }
972
973                 /* Now that we've consumed some input, try again */
974                 rawlen = PQgetCopyData(conn, &copybuf, 1);
975                 if (rawlen == 0)
976                         return 0;
977         }
978         if (rawlen == -1)                       /* end-of-streaming or error */
979                 return -2;
980         if (rawlen == -2)
981         {
982                 fprintf(stderr, _("%s: could not read COPY data: %s"),
983                                 progname, PQerrorMessage(conn));
984                 return -1;
985         }
986
987         /* Return received messages to caller */
988         *buffer = copybuf;
989         return rawlen;
990 }
991
992 /*
993  * Process the keepalive message.
994  */
995 static bool
996 ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
997                                         XLogRecPtr blockpos, int64 *last_status)
998 {
999         int                     pos;
1000         bool            replyRequested;
1001         int64           now;
1002
1003         /*
1004          * Parse the keepalive message, enclosed in the CopyData message.
1005          * We just check if the server requested a reply, and ignore the
1006          * rest.
1007          */
1008         pos = 1;                        /* skip msgtype 'k' */
1009         pos += 8;                       /* skip walEnd */
1010         pos += 8;                       /* skip sendTime */
1011
1012         if (len < pos + 1)
1013         {
1014                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
1015                                 progname, len);
1016                 return false;
1017         }
1018         replyRequested = copybuf[pos];
1019
1020         /* If the server requested an immediate reply, send one. */
1021         if (replyRequested && still_sending)
1022         {
1023                 if (reportFlushPosition && lastFlushPosition < blockpos &&
1024                         walfile != -1)
1025                 {
1026                         /*
1027                          * If a valid flush location needs to be reported,
1028                          * flush the current WAL file so that the latest flush
1029                          * location is sent back to the server. This is necessary to
1030                          * see whether the last WAL data has been successfully
1031                          * replicated or not, at the normal shutdown of the server.
1032                          */
1033                         if (fsync(walfile) != 0)
1034                         {
1035                                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1036                                                 progname, current_walfile_name, strerror(errno));
1037                                 return false;
1038                         }
1039                         lastFlushPosition = blockpos;
1040                 }
1041
1042                 now = feGetCurrentTimestamp();
1043                 if (!sendFeedback(conn, blockpos, now, false))
1044                         return false;
1045                 *last_status = now;
1046         }
1047
1048         return true;
1049 }
1050
1051 /*
1052  * Process XLogData message.
1053  */
1054 static bool
1055 ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
1056                                    XLogRecPtr *blockpos, uint32 timeline,
1057                                    char *basedir, stream_stop_callback stream_stop,
1058                                    char *partial_suffix)
1059 {
1060         int                     xlogoff;
1061         int                     bytes_left;
1062         int                     bytes_written;
1063         int                     hdr_len;
1064
1065         /*
1066          * Once we've decided we don't want to receive any more, just
1067          * ignore any subsequent XLogData messages.
1068          */
1069         if (!(still_sending))
1070                 return true;
1071
1072         /*
1073          * Read the header of the XLogData message, enclosed in the
1074          * CopyData message. We only need the WAL location field
1075          * (dataStart), the rest of the header is ignored.
1076          */
1077         hdr_len = 1;            /* msgtype 'w' */
1078         hdr_len += 8;           /* dataStart */
1079         hdr_len += 8;           /* walEnd */
1080         hdr_len += 8;           /* sendTime */
1081         if (len < hdr_len)
1082         {
1083                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
1084                                 progname, len);
1085                 return false;
1086         }
1087         *blockpos = fe_recvint64(&copybuf[1]);
1088
1089         /* Extract WAL location for this block */
1090         xlogoff = *blockpos % XLOG_SEG_SIZE;
1091
1092         /*
1093          * Verify that the initial location in the stream matches where we
1094          * think we are.
1095          */
1096         if (walfile == -1)
1097         {
1098                 /* No file open yet */
1099                 if (xlogoff != 0)
1100                 {
1101                         fprintf(stderr,
1102                                         _("%s: received transaction log record for offset %u with no file open\n"),
1103                                         progname, xlogoff);
1104                         return false;
1105                 }
1106         }
1107         else
1108         {
1109                 /* More data in existing segment */
1110                 /* XXX: store seek value don't reseek all the time */
1111                 if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
1112                 {
1113                         fprintf(stderr,
1114                                         _("%s: got WAL data offset %08x, expected %08x\n"),
1115                                         progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
1116                         return false;
1117                 }
1118         }
1119
1120         bytes_left = len - hdr_len;
1121         bytes_written = 0;
1122
1123         while (bytes_left)
1124         {
1125                 int                     bytes_to_write;
1126
1127                 /*
1128                  * If crossing a WAL boundary, only write up until we reach
1129                  * XLOG_SEG_SIZE.
1130                  */
1131                 if (xlogoff + bytes_left > XLOG_SEG_SIZE)
1132                         bytes_to_write = XLOG_SEG_SIZE - xlogoff;
1133                 else
1134                         bytes_to_write = bytes_left;
1135
1136                 if (walfile == -1)
1137                 {
1138                         if (!open_walfile(*blockpos, timeline,
1139                                                           basedir, partial_suffix))
1140                         {
1141                                 /* Error logged by open_walfile */
1142                                 return false;
1143                         }
1144                 }
1145
1146                 if (write(walfile,
1147                                   copybuf + hdr_len + bytes_written,
1148                                   bytes_to_write) != bytes_to_write)
1149                 {
1150                         fprintf(stderr,
1151                                         _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1152                                         progname, bytes_to_write, current_walfile_name,
1153                                         strerror(errno));
1154                         return false;
1155                 }
1156
1157                 /* Write was successful, advance our position */
1158                 bytes_written += bytes_to_write;
1159                 bytes_left -= bytes_to_write;
1160                 *blockpos += bytes_to_write;
1161                 xlogoff += bytes_to_write;
1162
1163                 /* Did we reach the end of a WAL segment? */
1164                 if (*blockpos % XLOG_SEG_SIZE == 0)
1165                 {
1166                         if (!close_walfile(basedir, partial_suffix, *blockpos))
1167                                 /* Error message written in close_walfile() */
1168                                 return false;
1169
1170                         xlogoff = 0;
1171
1172                         if (still_sending && stream_stop(*blockpos, timeline, true))
1173                         {
1174                                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1175                                 {
1176                                         fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1177                                                         progname, PQerrorMessage(conn));
1178                                         return false;
1179                                 }
1180                                 still_sending = false;
1181                                 return true;    /* ignore the rest of this XLogData packet */
1182                         }
1183                 }
1184         }
1185         /* No more data left to write, receive next copy packet */
1186
1187         return true;
1188 }
1189
1190 /*
1191  * Handle end of the copy stream.
1192  */
1193 static PGresult *
1194 HandleEndOfCopyStream(PGconn *conn, char *copybuf,
1195                                           XLogRecPtr blockpos, char *basedir, char *partial_suffix,
1196                                           XLogRecPtr *stoppos)
1197 {
1198         PGresult   *res = PQgetResult(conn);
1199
1200         /*
1201          * The server closed its end of the copy stream.  If we haven't
1202          * closed ours already, we need to do so now, unless the server
1203          * threw an error, in which case we don't.
1204          */
1205         if (still_sending)
1206         {
1207                 if (!close_walfile(basedir, partial_suffix, blockpos))
1208                 {
1209                         /* Error message written in close_walfile() */
1210                         PQclear(res);
1211                         return NULL;
1212                 }
1213                 if (PQresultStatus(res) == PGRES_COPY_IN)
1214                 {
1215                         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1216                         {
1217                                 fprintf(stderr,
1218                                                 _("%s: could not send copy-end packet: %s"),
1219                                                 progname, PQerrorMessage(conn));
1220                                 PQclear(res);
1221                                 return NULL;
1222                         }
1223                         res = PQgetResult(conn);
1224                 }
1225                 still_sending = false;
1226         }
1227         if (copybuf != NULL)
1228                 PQfreemem(copybuf);
1229         *stoppos = blockpos;
1230         return res;
1231 }
1232
1233 /*
1234  * Check if we should continue streaming, or abort at this point.
1235  */
1236 static bool
1237 CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
1238                                         char *basedir, stream_stop_callback stream_stop,
1239                                         char *partial_suffix, XLogRecPtr *stoppos)
1240 {
1241         if (still_sending && stream_stop(blockpos, timeline, false))
1242         {
1243                 if (!close_walfile(basedir, partial_suffix, blockpos))
1244                 {
1245                         /* Potential error message is written by close_walfile */
1246                         return false;
1247                 }
1248                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1249                 {
1250                         fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1251                                         progname, PQerrorMessage(conn));
1252                         return false;
1253                 }
1254                 still_sending = false;
1255         }
1256
1257         return true;
1258 }
1259
1260 /*
1261  * Calculate how long send/receive loops should sleep
1262  */
1263 static long
1264 CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
1265                                                          int64 last_status)
1266 {
1267         int64           status_targettime = 0;
1268         long            sleeptime;
1269
1270         if (standby_message_timeout && still_sending)
1271                 status_targettime = last_status +
1272                         (standby_message_timeout - 1) * ((int64) 1000);
1273
1274         if (status_targettime > 0)
1275         {
1276                 long            secs;
1277                 int                     usecs;
1278
1279                 feTimestampDifference(now,
1280                                                           status_targettime,
1281                                                           &secs,
1282                                                           &usecs);
1283                 /* Always sleep at least 1 sec */
1284                 if (secs <= 0)
1285                 {
1286                         secs = 1;
1287                         usecs = 0;
1288                 }
1289
1290                 sleeptime = secs * 1000 + usecs / 1000;
1291         }
1292         else
1293                 sleeptime = -1;
1294
1295         return sleeptime;
1296 }