]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/receivelog.c
Phase 2 of pgindent updates.
[postgresql] / src / bin / pg_basebackup / receivelog.c
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  *                                replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres_fe.h"
16
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22
23 /* local includes */
24 #include "receivelog.h"
25 #include "streamutil.h"
26
27 #include "libpq-fe.h"
28 #include "access/xlog_internal.h"
29 #include "common/file_utils.h"
30
31
32 /* fd and filename for currently open WAL file */
33 static Walfile *walfile = NULL;
34 static char current_walfile_name[MAXPGPATH] = "";
35 static bool reportFlushPosition = false;
36 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
37
38 static bool still_sending = true;       /* feedback still needs to be sent? */
39
40 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
41                                  XLogRecPtr *stoppos);
42 static int      CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
43 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
44                                   char **buffer);
45 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
46                                         int len, XLogRecPtr blockpos, TimestampTz *last_status);
47 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
48                                    XLogRecPtr *blockpos);
49 static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
50                                           XLogRecPtr blockpos, XLogRecPtr *stoppos);
51 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
52                                         XLogRecPtr *stoppos);
53 static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
54                                                          TimestampTz last_status);
55
56 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
57                                                  uint32 *timeline);
58
59 static bool
60 mark_file_as_archived(StreamCtl *stream, const char *fname)
61 {
62         Walfile    *f;
63         static char tmppath[MAXPGPATH];
64
65         snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
66                          fname);
67
68         f = stream->walmethod->open_for_write(tmppath, NULL, 0);
69         if (f == NULL)
70         {
71                 fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
72                                 progname, tmppath, stream->walmethod->getlasterror());
73                 return false;
74         }
75
76         stream->walmethod->close(f, CLOSE_NORMAL);
77
78         return true;
79 }
80
81 /*
82  * Open a new WAL file in the specified directory.
83  *
84  * Returns true if OK; on failure, returns false after printing an error msg.
85  * On success, 'walfile' is set to the FD for the file, and the base filename
86  * (without partial_suffix) is stored in 'current_walfile_name'.
87  *
88  * The file will be padded to 16Mb with zeroes.
89  */
90 static bool
91 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
92 {
93         Walfile    *f;
94         char            fn[MAXPGPATH];
95         ssize_t         size;
96         XLogSegNo       segno;
97
98         XLByteToSeg(startpoint, segno);
99         XLogFileName(current_walfile_name, stream->timeline, segno);
100
101         snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
102                          stream->partial_suffix ? stream->partial_suffix : "");
103
104         /*
105          * When streaming to files, if an existing file exists we verify that it's
106          * either empty (just created), or a complete XLogSegSize segment (in
107          * which case it has been created and padded). Anything else indicates a
108          * corrupt file.
109          *
110          * When streaming to tar, no file with this name will exist before, so we
111          * never have to verify a size.
112          */
113         if (stream->walmethod->existsfile(fn))
114         {
115                 size = stream->walmethod->get_file_size(fn);
116                 if (size < 0)
117                 {
118                         fprintf(stderr,
119                         _("%s: could not get size of write-ahead log file \"%s\": %s\n"),
120                                         progname, fn, stream->walmethod->getlasterror());
121                         return false;
122                 }
123                 if (size == XLogSegSize)
124                 {
125                         /* Already padded file. Open it for use */
126                         f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
127                         if (f == NULL)
128                         {
129                                 fprintf(stderr,
130                                                 _("%s: could not open existing write-ahead log file \"%s\": %s\n"),
131                                                 progname, fn, stream->walmethod->getlasterror());
132                                 return false;
133                         }
134
135                         /* fsync file in case of a previous crash */
136                         if (stream->walmethod->sync(f) != 0)
137                         {
138                                 fprintf(stderr,
139                                                 _("%s: could not sync existing write-ahead log file \"%s\": %s\n"),
140                                                 progname, fn, stream->walmethod->getlasterror());
141                                 stream->walmethod->close(f, CLOSE_UNLINK);
142                                 return false;
143                         }
144
145                         walfile = f;
146                         return true;
147                 }
148                 if (size != 0)
149                 {
150                         /* if write didn't set errno, assume problem is no disk space */
151                         if (errno == 0)
152                                 errno = ENOSPC;
153                         fprintf(stderr,
154                                         _("%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n"),
155                                         progname, fn, (int) size, XLogSegSize);
156                         return false;
157                 }
158                 /* File existed and was empty, so fall through and open */
159         }
160
161         /* No file existed, so create one */
162
163         f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
164         if (f == NULL)
165         {
166                 fprintf(stderr,
167                                 _("%s: could not open write-ahead log file \"%s\": %s\n"),
168                                 progname, fn, stream->walmethod->getlasterror());
169                 return false;
170         }
171
172         walfile = f;
173         return true;
174 }
175
176 /*
177  * Close the current WAL file (if open), and rename it to the correct
178  * filename if it's complete. On failure, prints an error message to stderr
179  * and returns false, otherwise returns true.
180  */
181 static bool
182 close_walfile(StreamCtl *stream, XLogRecPtr pos)
183 {
184         off_t           currpos;
185         int                     r;
186
187         if (walfile == NULL)
188                 return true;
189
190         currpos = stream->walmethod->get_current_pos(walfile);
191         if (currpos == -1)
192         {
193                 fprintf(stderr,
194                          _("%s: could not determine seek position in file \"%s\": %s\n"),
195                   progname, current_walfile_name, stream->walmethod->getlasterror());
196                 stream->walmethod->close(walfile, CLOSE_UNLINK);
197                 walfile = NULL;
198
199                 return false;
200         }
201
202         if (stream->partial_suffix)
203         {
204                 if (currpos == XLOG_SEG_SIZE)
205                         r = stream->walmethod->close(walfile, CLOSE_NORMAL);
206                 else
207                 {
208                         fprintf(stderr,
209                                         _("%s: not renaming \"%s%s\", segment is not complete\n"),
210                                         progname, current_walfile_name, stream->partial_suffix);
211                         r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
212                 }
213         }
214         else
215                 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
216
217         walfile = NULL;
218
219         if (r != 0)
220         {
221                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
222                   progname, current_walfile_name, stream->walmethod->getlasterror());
223                 return false;
224         }
225
226         /*
227          * Mark file as archived if requested by the caller - pg_basebackup needs
228          * to do so as files can otherwise get archived again after promotion of a
229          * new node. This is in line with walreceiver.c always doing a
230          * XLogArchiveForceDone() after a complete segment.
231          */
232         if (currpos == XLOG_SEG_SIZE && stream->mark_done)
233         {
234                 /* writes error message if failed */
235                 if (!mark_file_as_archived(stream, current_walfile_name))
236                         return false;
237         }
238
239         lastFlushPosition = pos;
240         return true;
241 }
242
243
244 /*
245  * Check if a timeline history file exists.
246  */
247 static bool
248 existsTimeLineHistoryFile(StreamCtl *stream)
249 {
250         char            histfname[MAXFNAMELEN];
251
252         /*
253          * Timeline 1 never has a history file. We treat that as if it existed,
254          * since we never need to stream it.
255          */
256         if (stream->timeline == 1)
257                 return true;
258
259         TLHistoryFileName(histfname, stream->timeline);
260
261         return stream->walmethod->existsfile(histfname);
262 }
263
264 static bool
265 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
266 {
267         int                     size = strlen(content);
268         char            histfname[MAXFNAMELEN];
269         Walfile    *f;
270
271         /*
272          * Check that the server's idea of how timeline history files should be
273          * named matches ours.
274          */
275         TLHistoryFileName(histfname, stream->timeline);
276         if (strcmp(histfname, filename) != 0)
277         {
278                 fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
279                                 progname, stream->timeline, filename);
280                 return false;
281         }
282
283         f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
284         if (f == NULL)
285         {
286                 fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
287                                 progname, histfname, stream->walmethod->getlasterror());
288                 return false;
289         }
290
291         if ((int) stream->walmethod->write(f, content, size) != size)
292         {
293                 fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
294                                 progname, histfname, stream->walmethod->getlasterror());
295
296                 /*
297                  * If we fail to make the file, delete it to release disk space
298                  */
299                 stream->walmethod->close(f, CLOSE_UNLINK);
300
301                 return false;
302         }
303
304         if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
305         {
306                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
307                                 progname, histfname, stream->walmethod->getlasterror());
308                 return false;
309         }
310
311         /* Maintain archive_status, check close_walfile() for details. */
312         if (stream->mark_done)
313         {
314                 /* writes error message if failed */
315                 if (!mark_file_as_archived(stream, histfname))
316                         return false;
317         }
318
319         return true;
320 }
321
322 /*
323  * Send a Standby Status Update message to server.
324  */
325 static bool
326 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
327 {
328         char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
329         int                     len = 0;
330
331         replybuf[len] = 'r';
332         len += 1;
333         fe_sendint64(blockpos, &replybuf[len]); /* write */
334         len += 8;
335         if (reportFlushPosition)
336                 fe_sendint64(lastFlushPosition, &replybuf[len]);        /* flush */
337         else
338                 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* flush */
339         len += 8;
340         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* apply */
341         len += 8;
342         fe_sendint64(now, &replybuf[len]);      /* sendTime */
343         len += 8;
344         replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
345         len += 1;
346
347         if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
348         {
349                 fprintf(stderr, _("%s: could not send feedback packet: %s"),
350                                 progname, PQerrorMessage(conn));
351                 return false;
352         }
353
354         return true;
355 }
356
357 /*
358  * Check that the server version we're connected to is supported by
359  * ReceiveXlogStream().
360  *
361  * If it's not, an error message is printed to stderr, and false is returned.
362  */
363 bool
364 CheckServerVersionForStreaming(PGconn *conn)
365 {
366         int                     minServerMajor,
367                                 maxServerMajor;
368         int                     serverMajor;
369
370         /*
371          * The message format used in streaming replication changed in 9.3, so we
372          * cannot stream from older servers. And we don't support servers newer
373          * than the client; it might work, but we don't know, so err on the safe
374          * side.
375          */
376         minServerMajor = 903;
377         maxServerMajor = PG_VERSION_NUM / 100;
378         serverMajor = PQserverVersion(conn) / 100;
379         if (serverMajor < minServerMajor)
380         {
381                 const char *serverver = PQparameterStatus(conn, "server_version");
382
383                 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
384                                 progname,
385                                 serverver ? serverver : "'unknown'",
386                                 "9.3");
387                 return false;
388         }
389         else if (serverMajor > maxServerMajor)
390         {
391                 const char *serverver = PQparameterStatus(conn, "server_version");
392
393                 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
394                                 progname,
395                                 serverver ? serverver : "'unknown'",
396                                 PG_VERSION);
397                 return false;
398         }
399         return true;
400 }
401
402 /*
403  * Receive a log stream starting at the specified position.
404  *
405  * Individual parameters are passed through the StreamCtl structure.
406  *
407  * If sysidentifier is specified, validate that both the system
408  * identifier and the timeline matches the specified ones
409  * (by sending an extra IDENTIFY_SYSTEM command)
410  *
411  * All received segments will be written to the directory
412  * specified by basedir. This will also fetch any missing timeline history
413  * files.
414  *
415  * The stream_stop callback will be called every time data
416  * is received, and whenever a segment is completed. If it returns
417  * true, the streaming will stop and the function
418  * return. As long as it returns false, streaming will continue
419  * indefinitely.
420  *
421  * If stream_stop() checks for external input, stop_socket should be set to
422  * the FD it checks.  This will allow such input to be detected promptly
423  * rather than after standby_message_timeout (which might be indefinite).
424  * Note that signals will interrupt waits for input as well, but that is
425  * race-y since a signal received while busy won't interrupt the wait.
426  *
427  * standby_message_timeout controls how often we send a message
428  * back to the master letting it know our progress, in milliseconds.
429  * Zero means no messages are sent.
430  * This message will only contain the write location, and never
431  * flush or replay.
432  *
433  * If 'partial_suffix' is not NULL, files are initially created with the
434  * given suffix, and the suffix is removed once the file is finished. That
435  * allows you to tell the difference between partial and completed files,
436  * so that you can continue later where you left.
437  *
438  * If 'synchronous' is true, the received WAL is flushed as soon as written,
439  * otherwise only when the WAL file is closed.
440  *
441  * Note: The WAL location *must* be at a log segment start!
442  */
443 bool
444 ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
445 {
446         char            query[128];
447         char            slotcmd[128];
448         PGresult   *res;
449         XLogRecPtr      stoppos;
450
451         /*
452          * The caller should've checked the server version already, but doesn't do
453          * any harm to check it here too.
454          */
455         if (!CheckServerVersionForStreaming(conn))
456                 return false;
457
458         /*
459          * Decide whether we want to report the flush position. If we report the
460          * flush position, the primary will know what WAL we'll possibly
461          * re-request, and it can then remove older WAL safely. We must always do
462          * that when we are using slots.
463          *
464          * Reporting the flush position makes one eligible as a synchronous
465          * replica. People shouldn't include generic names in
466          * synchronous_standby_names, but we've protected them against it so far,
467          * so let's continue to do so unless specifically requested.
468          */
469         if (stream->replication_slot != NULL)
470         {
471                 reportFlushPosition = true;
472                 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
473         }
474         else
475         {
476                 if (stream->synchronous)
477                         reportFlushPosition = true;
478                 else
479                         reportFlushPosition = false;
480                 slotcmd[0] = 0;
481         }
482
483         if (stream->sysidentifier != NULL)
484         {
485                 /* Validate system identifier hasn't changed */
486                 res = PQexec(conn, "IDENTIFY_SYSTEM");
487                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
488                 {
489                         fprintf(stderr,
490                                         _("%s: could not send replication command \"%s\": %s"),
491                                         progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
492                         PQclear(res);
493                         return false;
494                 }
495                 if (PQntuples(res) != 1 || PQnfields(res) < 3)
496                 {
497                         fprintf(stderr,
498                                         _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
499                                         progname, PQntuples(res), PQnfields(res), 1, 3);
500                         PQclear(res);
501                         return false;
502                 }
503                 if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
504                 {
505                         fprintf(stderr,
506                                         _("%s: system identifier does not match between base backup and streaming connection\n"),
507                                         progname);
508                         PQclear(res);
509                         return false;
510                 }
511                 if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
512                 {
513                         fprintf(stderr,
514                                 _("%s: starting timeline %u is not present in the server\n"),
515                                         progname, stream->timeline);
516                         PQclear(res);
517                         return false;
518                 }
519                 PQclear(res);
520         }
521
522         /*
523          * Create temporary replication slot if one is needed
524          */
525         if (stream->temp_slot)
526         {
527                 snprintf(query, sizeof(query),
528                          "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
529                                  stream->replication_slot);
530                 res = PQexec(conn, query);
531                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
532                 {
533                         fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
534                                         progname, stream->replication_slot, PQerrorMessage(conn));
535                         PQclear(res);
536                         return false;
537                 }
538         }
539
540         /*
541          * initialize flush position to starting point, it's the caller's
542          * responsibility that that's sane.
543          */
544         lastFlushPosition = stream->startpos;
545
546         while (1)
547         {
548                 /*
549                  * Fetch the timeline history file for this timeline, if we don't have
550                  * it already. When streaming log to tar, this will always return
551                  * false, as we are never streaming into an existing file and
552                  * therefore there can be no pre-existing timeline history file.
553                  */
554                 if (!existsTimeLineHistoryFile(stream))
555                 {
556                         snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
557                         res = PQexec(conn, query);
558                         if (PQresultStatus(res) != PGRES_TUPLES_OK)
559                         {
560                                 /* FIXME: we might send it ok, but get an error */
561                                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
562                                         progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
563                                 PQclear(res);
564                                 return false;
565                         }
566
567                         /*
568                          * The response to TIMELINE_HISTORY is a single row result set
569                          * with two fields: filename and content
570                          */
571                         if (PQnfields(res) != 2 || PQntuples(res) != 1)
572                         {
573                                 fprintf(stderr,
574                                                 _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
575                                                 progname, PQntuples(res), PQnfields(res), 1, 2);
576                         }
577
578                         /* Write the history file to disk */
579                         writeTimeLineHistoryFile(stream,
580                                                                          PQgetvalue(res, 0, 0),
581                                                                          PQgetvalue(res, 0, 1));
582
583                         PQclear(res);
584                 }
585
586                 /*
587                  * Before we start streaming from the requested location, check if the
588                  * callback tells us to stop here.
589                  */
590                 if (stream->stream_stop(stream->startpos, stream->timeline, false))
591                         return true;
592
593                 /* Initiate the replication stream at specified location */
594                 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
595                                  slotcmd,
596                                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
597                                  stream->timeline);
598                 res = PQexec(conn, query);
599                 if (PQresultStatus(res) != PGRES_COPY_BOTH)
600                 {
601                         fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
602                                         progname, "START_REPLICATION", PQresultErrorMessage(res));
603                         PQclear(res);
604                         return false;
605                 }
606                 PQclear(res);
607
608                 /* Stream the WAL */
609                 res = HandleCopyStream(conn, stream, &stoppos);
610                 if (res == NULL)
611                         goto error;
612
613                 /*
614                  * Streaming finished.
615                  *
616                  * There are two possible reasons for that: a controlled shutdown, or
617                  * we reached the end of the current timeline. In case of
618                  * end-of-timeline, the server sends a result set after Copy has
619                  * finished, containing information about the next timeline. Read
620                  * that, and restart streaming from the next timeline. In case of
621                  * controlled shutdown, stop here.
622                  */
623                 if (PQresultStatus(res) == PGRES_TUPLES_OK)
624                 {
625                         /*
626                          * End-of-timeline. Read the next timeline's ID and starting
627                          * position. Usually, the starting position will match the end of
628                          * the previous timeline, but there are corner cases like if the
629                          * server had sent us half of a WAL record, when it was promoted.
630                          * The new timeline will begin at the end of the last complete
631                          * record in that case, overlapping the partial WAL record on the
632                          * old timeline.
633                          */
634                         uint32          newtimeline;
635                         bool            parsed;
636
637                         parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
638                         PQclear(res);
639                         if (!parsed)
640                                 goto error;
641
642                         /* Sanity check the values the server gave us */
643                         if (newtimeline <= stream->timeline)
644                         {
645                                 fprintf(stderr,
646                                                 _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
647                                                 progname, newtimeline, stream->timeline);
648                                 goto error;
649                         }
650                         if (stream->startpos > stoppos)
651                         {
652                                 fprintf(stderr,
653                                                 _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
654                                                 progname,
655                                 stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
656                                                 newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
657                                 goto error;
658                         }
659
660                         /* Read the final result, which should be CommandComplete. */
661                         res = PQgetResult(conn);
662                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
663                         {
664                                 fprintf(stderr,
665                                    _("%s: unexpected termination of replication stream: %s"),
666                                                 progname, PQresultErrorMessage(res));
667                                 PQclear(res);
668                                 goto error;
669                         }
670                         PQclear(res);
671
672                         /*
673                          * Loop back to start streaming from the new timeline. Always
674                          * start streaming at the beginning of a segment.
675                          */
676                         stream->timeline = newtimeline;
677                         stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
678                         continue;
679                 }
680                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
681                 {
682                         PQclear(res);
683
684                         /*
685                          * End of replication (ie. controlled shut down of the server).
686                          *
687                          * Check if the callback thinks it's OK to stop here. If not,
688                          * complain.
689                          */
690                         if (stream->stream_stop(stoppos, stream->timeline, false))
691                                 return true;
692                         else
693                         {
694                                 fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
695                                                 progname);
696                                 goto error;
697                         }
698                 }
699                 else
700                 {
701                         /* Server returned an error. */
702                         fprintf(stderr,
703                                         _("%s: unexpected termination of replication stream: %s"),
704                                         progname, PQresultErrorMessage(res));
705                         PQclear(res);
706                         goto error;
707                 }
708         }
709
710 error:
711         if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
712                 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
713                   progname, current_walfile_name, stream->walmethod->getlasterror());
714         walfile = NULL;
715         return false;
716 }
717
718 /*
719  * Helper function to parse the result set returned by server after streaming
720  * has finished. On failure, prints an error to stderr and returns false.
721  */
722 static bool
723 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
724 {
725         uint32          startpos_xlogid,
726                                 startpos_xrecoff;
727
728         /*----------
729          * The result set consists of one row and two columns, e.g:
730          *
731          *      next_tli | next_tli_startpos
732          * ----------+-------------------
733          *                 4 | 0/9949AE0
734          *
735          * next_tli is the timeline ID of the next timeline after the one that
736          * just finished streaming. next_tli_startpos is the WAL location where
737          * the server switched to it.
738          *----------
739          */
740         if (PQnfields(res) < 2 || PQntuples(res) != 1)
741         {
742                 fprintf(stderr,
743                                 _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
744                                 progname, PQntuples(res), PQnfields(res), 1, 2);
745                 return false;
746         }
747
748         *timeline = atoi(PQgetvalue(res, 0, 0));
749         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
750                            &startpos_xrecoff) != 2)
751         {
752                 fprintf(stderr,
753                         _("%s: could not parse next timeline's starting point \"%s\"\n"),
754                                 progname, PQgetvalue(res, 0, 1));
755                 return false;
756         }
757         *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
758
759         return true;
760 }
761
762 /*
763  * The main loop of ReceiveXlogStream. Handles the COPY stream after
764  * initiating streaming with the START_STREAMING command.
765  *
766  * If the COPY ends (not necessarily successfully) due a message from the
767  * server, returns a PGresult and sets *stoppos to the last byte written.
768  * On any other sort of error, returns NULL.
769  */
770 static PGresult *
771 HandleCopyStream(PGconn *conn, StreamCtl *stream,
772                                  XLogRecPtr *stoppos)
773 {
774         char       *copybuf = NULL;
775         TimestampTz last_status = -1;
776         XLogRecPtr      blockpos = stream->startpos;
777
778         still_sending = true;
779
780         while (1)
781         {
782                 int                     r;
783                 TimestampTz now;
784                 long            sleeptime;
785
786                 /*
787                  * Check if we should continue streaming, or abort at this point.
788                  */
789                 if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
790                         goto error;
791
792                 now = feGetCurrentTimestamp();
793
794                 /*
795                  * If synchronous option is true, issue sync command as soon as there
796                  * are WAL data which has not been flushed yet.
797                  */
798                 if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
799                 {
800                         if (stream->walmethod->sync(walfile) != 0)
801                         {
802                                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
803                                                 progname, current_walfile_name, stream->walmethod->getlasterror());
804                                 goto error;
805                         }
806                         lastFlushPosition = blockpos;
807
808                         /*
809                          * Send feedback so that the server sees the latest WAL locations
810                          * immediately.
811                          */
812                         if (!sendFeedback(conn, blockpos, now, false))
813                                 goto error;
814                         last_status = now;
815                 }
816
817                 /*
818                  * Potentially send a status message to the master
819                  */
820                 if (still_sending && stream->standby_message_timeout > 0 &&
821                         feTimestampDifferenceExceeds(last_status, now,
822                                                                                  stream->standby_message_timeout))
823                 {
824                         /* Time to send feedback! */
825                         if (!sendFeedback(conn, blockpos, now, false))
826                                 goto error;
827                         last_status = now;
828                 }
829
830                 /*
831                  * Calculate how long send/receive loops should sleep
832                  */
833                 sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
834                                                                                                  last_status);
835
836                 r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
837                 while (r != 0)
838                 {
839                         if (r == -1)
840                                 goto error;
841                         if (r == -2)
842                         {
843                                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
844
845                                 if (res == NULL)
846                                         goto error;
847                                 else
848                                         return res;
849                         }
850
851                         /* Check the message type. */
852                         if (copybuf[0] == 'k')
853                         {
854                                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
855                                                                                  &last_status))
856                                         goto error;
857                         }
858                         else if (copybuf[0] == 'w')
859                         {
860                                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
861                                         goto error;
862
863                                 /*
864                                  * Check if we should continue streaming, or abort at this
865                                  * point.
866                                  */
867                                 if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
868                                         goto error;
869                         }
870                         else
871                         {
872                                 fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
873                                                 progname, copybuf[0]);
874                                 goto error;
875                         }
876
877                         /*
878                          * Process the received data, and any subsequent data we can read
879                          * without blocking.
880                          */
881                         r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
882                 }
883         }
884
885 error:
886         if (copybuf != NULL)
887                 PQfreemem(copybuf);
888         return NULL;
889 }
890
891 /*
892  * Wait until we can read a CopyData message,
893  * or timeout, or occurrence of a signal or input on the stop_socket.
894  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
895  *
896  * Returns 1 if data has become available for reading, 0 if timed out
897  * or interrupted by signal or stop_socket input, and -1 on an error.
898  */
899 static int
900 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
901 {
902         int                     ret;
903         fd_set          input_mask;
904         int                     connsocket;
905         int                     maxfd;
906         struct timeval timeout;
907         struct timeval *timeoutptr;
908
909         connsocket = PQsocket(conn);
910         if (connsocket < 0)
911         {
912                 fprintf(stderr, _("%s: invalid socket: %s"), progname,
913                                 PQerrorMessage(conn));
914                 return -1;
915         }
916
917         FD_ZERO(&input_mask);
918         FD_SET(connsocket, &input_mask);
919         maxfd = connsocket;
920         if (stop_socket != PGINVALID_SOCKET)
921         {
922                 FD_SET(stop_socket, &input_mask);
923                 maxfd = Max(maxfd, stop_socket);
924         }
925
926         if (timeout_ms < 0)
927                 timeoutptr = NULL;
928         else
929         {
930                 timeout.tv_sec = timeout_ms / 1000L;
931                 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
932                 timeoutptr = &timeout;
933         }
934
935         ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
936
937         if (ret < 0)
938         {
939                 if (errno == EINTR)
940                         return 0;                       /* Got a signal, so not an error */
941                 fprintf(stderr, _("%s: select() failed: %s\n"),
942                                 progname, strerror(errno));
943                 return -1;
944         }
945         if (ret > 0 && FD_ISSET(connsocket, &input_mask))
946                 return 1;                               /* Got input on connection socket */
947
948         return 0;                                       /* Got timeout or input on stop_socket */
949 }
950
951 /*
952  * Receive CopyData message available from XLOG stream, blocking for
953  * maximum of 'timeout' ms.
954  *
955  * If data was received, returns the length of the data. *buffer is set to
956  * point to a buffer holding the received message. The buffer is only valid
957  * until the next CopyStreamReceive call.
958  *
959  * Returns 0 if no data was available within timeout, or if wait was
960  * interrupted by signal or stop_socket input.
961  * -1 on error. -2 if the server ended the COPY.
962  */
963 static int
964 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
965                                   char **buffer)
966 {
967         char       *copybuf = NULL;
968         int                     rawlen;
969
970         if (*buffer != NULL)
971                 PQfreemem(*buffer);
972         *buffer = NULL;
973
974         /* Try to receive a CopyData message */
975         rawlen = PQgetCopyData(conn, &copybuf, 1);
976         if (rawlen == 0)
977         {
978                 int                     ret;
979
980                 /*
981                  * No data available.  Wait for some to appear, but not longer than
982                  * the specified timeout, so that we can ping the server.  Also stop
983                  * waiting if input appears on stop_socket.
984                  */
985                 ret = CopyStreamPoll(conn, timeout, stop_socket);
986                 if (ret <= 0)
987                         return ret;
988
989                 /* Now there is actually data on the socket */
990                 if (PQconsumeInput(conn) == 0)
991                 {
992                         fprintf(stderr,
993                                         _("%s: could not receive data from WAL stream: %s"),
994                                         progname, PQerrorMessage(conn));
995                         return -1;
996                 }
997
998                 /* Now that we've consumed some input, try again */
999                 rawlen = PQgetCopyData(conn, &copybuf, 1);
1000                 if (rawlen == 0)
1001                         return 0;
1002         }
1003         if (rawlen == -1)                       /* end-of-streaming or error */
1004                 return -2;
1005         if (rawlen == -2)
1006         {
1007                 fprintf(stderr, _("%s: could not read COPY data: %s"),
1008                                 progname, PQerrorMessage(conn));
1009                 return -1;
1010         }
1011
1012         /* Return received messages to caller */
1013         *buffer = copybuf;
1014         return rawlen;
1015 }
1016
1017 /*
1018  * Process the keepalive message.
1019  */
1020 static bool
1021 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1022                                         XLogRecPtr blockpos, TimestampTz *last_status)
1023 {
1024         int                     pos;
1025         bool            replyRequested;
1026         TimestampTz now;
1027
1028         /*
1029          * Parse the keepalive message, enclosed in the CopyData message. We just
1030          * check if the server requested a reply, and ignore the rest.
1031          */
1032         pos = 1;                                        /* skip msgtype 'k' */
1033         pos += 8;                                       /* skip walEnd */
1034         pos += 8;                                       /* skip sendTime */
1035
1036         if (len < pos + 1)
1037         {
1038                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
1039                                 progname, len);
1040                 return false;
1041         }
1042         replyRequested = copybuf[pos];
1043
1044         /* If the server requested an immediate reply, send one. */
1045         if (replyRequested && still_sending)
1046         {
1047                 if (reportFlushPosition && lastFlushPosition < blockpos &&
1048                         walfile != NULL)
1049                 {
1050                         /*
1051                          * If a valid flush location needs to be reported, flush the
1052                          * current WAL file so that the latest flush location is sent back
1053                          * to the server. This is necessary to see whether the last WAL
1054                          * data has been successfully replicated or not, at the normal
1055                          * shutdown of the server.
1056                          */
1057                         if (stream->walmethod->sync(walfile) != 0)
1058                         {
1059                                 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1060                                                 progname, current_walfile_name, stream->walmethod->getlasterror());
1061                                 return false;
1062                         }
1063                         lastFlushPosition = blockpos;
1064                 }
1065
1066                 now = feGetCurrentTimestamp();
1067                 if (!sendFeedback(conn, blockpos, now, false))
1068                         return false;
1069                 *last_status = now;
1070         }
1071
1072         return true;
1073 }
1074
1075 /*
1076  * Process XLogData message.
1077  */
1078 static bool
1079 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1080                                    XLogRecPtr *blockpos)
1081 {
1082         int                     xlogoff;
1083         int                     bytes_left;
1084         int                     bytes_written;
1085         int                     hdr_len;
1086
1087         /*
1088          * Once we've decided we don't want to receive any more, just ignore any
1089          * subsequent XLogData messages.
1090          */
1091         if (!(still_sending))
1092                 return true;
1093
1094         /*
1095          * Read the header of the XLogData message, enclosed in the CopyData
1096          * message. We only need the WAL location field (dataStart), the rest of
1097          * the header is ignored.
1098          */
1099         hdr_len = 1;                            /* msgtype 'w' */
1100         hdr_len += 8;                           /* dataStart */
1101         hdr_len += 8;                           /* walEnd */
1102         hdr_len += 8;                           /* sendTime */
1103         if (len < hdr_len)
1104         {
1105                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
1106                                 progname, len);
1107                 return false;
1108         }
1109         *blockpos = fe_recvint64(&copybuf[1]);
1110
1111         /* Extract WAL location for this block */
1112         xlogoff = *blockpos % XLOG_SEG_SIZE;
1113
1114         /*
1115          * Verify that the initial location in the stream matches where we think
1116          * we are.
1117          */
1118         if (walfile == NULL)
1119         {
1120                 /* No file open yet */
1121                 if (xlogoff != 0)
1122                 {
1123                         fprintf(stderr,
1124                                         _("%s: received write-ahead log record for offset %u with no file open\n"),
1125                                         progname, xlogoff);
1126                         return false;
1127                 }
1128         }
1129         else
1130         {
1131                 /* More data in existing segment */
1132                 if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1133                 {
1134                         fprintf(stderr,
1135                                         _("%s: got WAL data offset %08x, expected %08x\n"),
1136                                         progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1137                         return false;
1138                 }
1139         }
1140
1141         bytes_left = len - hdr_len;
1142         bytes_written = 0;
1143
1144         while (bytes_left)
1145         {
1146                 int                     bytes_to_write;
1147
1148                 /*
1149                  * If crossing a WAL boundary, only write up until we reach
1150                  * XLOG_SEG_SIZE.
1151                  */
1152                 if (xlogoff + bytes_left > XLOG_SEG_SIZE)
1153                         bytes_to_write = XLOG_SEG_SIZE - xlogoff;
1154                 else
1155                         bytes_to_write = bytes_left;
1156
1157                 if (walfile == NULL)
1158                 {
1159                         if (!open_walfile(stream, *blockpos))
1160                         {
1161                                 /* Error logged by open_walfile */
1162                                 return false;
1163                         }
1164                 }
1165
1166                 if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1167                                                                          bytes_to_write) != bytes_to_write)
1168                 {
1169                         fprintf(stderr,
1170                                   _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1171                                         progname, bytes_to_write, current_walfile_name,
1172                                         stream->walmethod->getlasterror());
1173                         return false;
1174                 }
1175
1176                 /* Write was successful, advance our position */
1177                 bytes_written += bytes_to_write;
1178                 bytes_left -= bytes_to_write;
1179                 *blockpos += bytes_to_write;
1180                 xlogoff += bytes_to_write;
1181
1182                 /* Did we reach the end of a WAL segment? */
1183                 if (*blockpos % XLOG_SEG_SIZE == 0)
1184                 {
1185                         if (!close_walfile(stream, *blockpos))
1186                                 /* Error message written in close_walfile() */
1187                                 return false;
1188
1189                         xlogoff = 0;
1190
1191                         if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1192                         {
1193                                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1194                                 {
1195                                         fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1196                                                         progname, PQerrorMessage(conn));
1197                                         return false;
1198                                 }
1199                                 still_sending = false;
1200                                 return true;    /* ignore the rest of this XLogData packet */
1201                         }
1202                 }
1203         }
1204         /* No more data left to write, receive next copy packet */
1205
1206         return true;
1207 }
1208
1209 /*
1210  * Handle end of the copy stream.
1211  */
1212 static PGresult *
1213 HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1214                                           XLogRecPtr blockpos, XLogRecPtr *stoppos)
1215 {
1216         PGresult   *res = PQgetResult(conn);
1217
1218         /*
1219          * The server closed its end of the copy stream.  If we haven't closed
1220          * ours already, we need to do so now, unless the server threw an error,
1221          * in which case we don't.
1222          */
1223         if (still_sending)
1224         {
1225                 if (!close_walfile(stream, blockpos))
1226                 {
1227                         /* Error message written in close_walfile() */
1228                         PQclear(res);
1229                         return NULL;
1230                 }
1231                 if (PQresultStatus(res) == PGRES_COPY_IN)
1232                 {
1233                         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1234                         {
1235                                 fprintf(stderr,
1236                                                 _("%s: could not send copy-end packet: %s"),
1237                                                 progname, PQerrorMessage(conn));
1238                                 PQclear(res);
1239                                 return NULL;
1240                         }
1241                         res = PQgetResult(conn);
1242                 }
1243                 still_sending = false;
1244         }
1245         if (copybuf != NULL)
1246                 PQfreemem(copybuf);
1247         *stoppos = blockpos;
1248         return res;
1249 }
1250
1251 /*
1252  * Check if we should continue streaming, or abort at this point.
1253  */
1254 static bool
1255 CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
1256                                         XLogRecPtr *stoppos)
1257 {
1258         if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1259         {
1260                 if (!close_walfile(stream, blockpos))
1261                 {
1262                         /* Potential error message is written by close_walfile */
1263                         return false;
1264                 }
1265                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1266                 {
1267                         fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1268                                         progname, PQerrorMessage(conn));
1269                         return false;
1270                 }
1271                 still_sending = false;
1272         }
1273
1274         return true;
1275 }
1276
1277 /*
1278  * Calculate how long send/receive loops should sleep
1279  */
1280 static long
1281 CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1282                                                          TimestampTz last_status)
1283 {
1284         TimestampTz status_targettime = 0;
1285         long            sleeptime;
1286
1287         if (standby_message_timeout && still_sending)
1288                 status_targettime = last_status +
1289                         (standby_message_timeout - 1) * ((int64) 1000);
1290
1291         if (status_targettime > 0)
1292         {
1293                 long            secs;
1294                 int                     usecs;
1295
1296                 feTimestampDifference(now,
1297                                                           status_targettime,
1298                                                           &secs,
1299                                                           &usecs);
1300                 /* Always sleep at least 1 sec */
1301                 if (secs <= 0)
1302                 {
1303                         secs = 1;
1304                         usecs = 0;
1305                 }
1306
1307                 sleeptime = secs * 1000 + usecs / 1000;
1308         }
1309         else
1310                 sleeptime = -1;
1311
1312         return sleeptime;
1313 }