]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/pg_receivewal.c
pg_receivewal: Add --no-sync option.
[postgresql] / src / bin / pg_basebackup / pg_receivewal.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_receivewal.c - receive streaming WAL data and write it
4  *                                        to a local file.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/pg_receivewal.c
12  *-------------------------------------------------------------------------
13  */
14
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "getopt_long.h"

#include "receivelog.h"
#include "streamutil.h"
28
29
30 /* Time to sleep between reconnection attempts */
31 #define RECONNECT_SLEEP_TIME 5
32
33 /* Global options */
34 static char *basedir = NULL;
35 static int      verbose = 0;
36 static int      compresslevel = 0;
37 static int      noloop = 0;
38 static int      standby_message_timeout = 10 * 1000;    /* 10 sec = default */
39 static volatile bool time_to_stop = false;
40 static bool do_create_slot = false;
41 static bool slot_exists_ok = false;
42 static bool do_drop_slot = false;
43 static bool do_sync = true;
44 static bool synchronous = false;
45 static char *replication_slot = NULL;
46 static XLogRecPtr endpos = InvalidXLogRecPtr;
47
48
49 static void usage(void);
50 static DIR *get_destination_dir(char *dest_folder);
51 static void close_destination_dir(DIR *dest_dir, char *dest_folder);
52 static XLogRecPtr FindStreamingStart(uint32 *tli);
53 static void StreamLog(void);
54 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
55                            bool segment_finished);
56
/*
 * Close the server connection, if one is open, and terminate the process
 * with the given exit status.
 *
 * The body is wrapped in do { ... } while (0) so that the macro expands to a
 * single statement; a bare brace block followed by the caller's semicolon
 * would break an unbraced "if (...) disconnect_and_exit(1); else ...".
 */
#define disconnect_and_exit(code)				\
	do {										\
		if (conn != NULL)						\
			PQfinish(conn);						\
		exit(code);								\
	} while (0)
62
/*
 * Filename classifiers for gzip-compressed WAL segments.
 *
 * A name matches when it is exactly XLOG_FNAME_LEN characters drawn from
 * "0123456789ABCDEF", immediately followed by the given suffix and nothing
 * else.  The length test comes first so the suffix comparison never reads
 * past the end of a short name.
 */
#define XLogFileNameHasSuffix(fname, suffix)	\
	(strlen(fname) == XLOG_FNAME_LEN + strlen(suffix) && \
	 strspn((fname), "0123456789ABCDEF") == XLOG_FNAME_LEN && \
	 strcmp((fname) + XLOG_FNAME_LEN, (suffix)) == 0)

/* completed, compressed segment: <segment name>.gz */
#define IsCompressXLogFileName(fname)	\
	XLogFileNameHasSuffix(fname, ".gz")

/* in-progress, compressed segment: <segment name>.gz.partial */
#define IsPartialCompressXLogFileName(fname)	\
	XLogFileNameHasSuffix(fname, ".gz.partial")
72
73 static void
74 usage(void)
75 {
76         printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
77                    progname);
78         printf(_("Usage:\n"));
79         printf(_("  %s [OPTION]...\n"), progname);
80         printf(_("\nOptions:\n"));
81         printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
82         printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
83         printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
84         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
85         printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
86         printf(_("  -s, --status-interval=SECS\n"
87                          "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
88         printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
89         printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
90         printf(_("  -v, --verbose          output verbose messages\n"));
91         printf(_("  -V, --version          output version information, then exit\n"));
92         printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
93         printf(_("  -?, --help             show this help, then exit\n"));
94         printf(_("\nConnection options:\n"));
95         printf(_("  -d, --dbname=CONNSTR   connection string\n"));
96         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
97         printf(_("  -p, --port=PORT        database server port number\n"));
98         printf(_("  -U, --username=NAME    connect as specified database user\n"));
99         printf(_("  -w, --no-password      never prompt for password\n"));
100         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
101         printf(_("\nOptional actions:\n"));
102         printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
103         printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
104         printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
105 }
106
107 static bool
108 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
109 {
110         static uint32 prevtimeline = 0;
111         static XLogRecPtr prevpos = InvalidXLogRecPtr;
112
113         /* we assume that we get called once at the end of each segment */
114         if (verbose && segment_finished)
115                 fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
116                                 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
117                                 timeline);
118
119         if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
120         {
121                 if (verbose)
122                         fprintf(stderr, _("%s: stopped streaming at %X/%X (timeline %u)\n"),
123                                         progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
124                                         timeline);
125                 time_to_stop = true;
126                 return true;
127         }
128
129         /*
130          * Note that we report the previous, not current, position here. After a
131          * timeline switch, xlogpos points to the beginning of the segment because
132          * that's where we always begin streaming. Reporting the end of previous
133          * timeline isn't totally accurate, because the next timeline can begin
134          * slightly before the end of the WAL that we received on the previous
135          * timeline, but it's close enough for reporting purposes.
136          */
137         if (verbose && prevtimeline != 0 && prevtimeline != timeline)
138                 fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
139                                 progname, timeline,
140                                 (uint32) (prevpos >> 32), (uint32) prevpos);
141
142         prevtimeline = timeline;
143         prevpos = xlogpos;
144
145         if (time_to_stop)
146         {
147                 if (verbose)
148                         fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
149                                         progname);
150                 return true;
151         }
152         return false;
153 }
154
155
156 /*
157  * Get destination directory.
158  */
159 static DIR *
160 get_destination_dir(char *dest_folder)
161 {
162         DIR                *dir;
163
164         Assert(dest_folder != NULL);
165         dir = opendir(dest_folder);
166         if (dir == NULL)
167         {
168                 fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
169                                 progname, basedir, strerror(errno));
170                 disconnect_and_exit(1);
171         }
172
173         return dir;
174 }
175
176
177 /*
178  * Close existing directory.
179  */
180 static void
181 close_destination_dir(DIR *dest_dir, char *dest_folder)
182 {
183         Assert(dest_dir != NULL && dest_folder != NULL);
184         if (closedir(dest_dir))
185         {
186                 fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
187                                 progname, dest_folder, strerror(errno));
188                 disconnect_and_exit(1);
189         }
190 }
191
192
193 /*
194  * Determine starting location for streaming, based on any existing xlog
195  * segments in the directory. We start at the end of the last one that is
196  * complete (size matches wal segment size), on the timeline with highest ID.
197  *
198  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
199  */
200 static XLogRecPtr
201 FindStreamingStart(uint32 *tli)
202 {
203         DIR                *dir;
204         struct dirent *dirent;
205         XLogSegNo       high_segno = 0;
206         uint32          high_tli = 0;
207         bool            high_ispartial = false;
208
209         dir = get_destination_dir(basedir);
210
211         while (errno = 0, (dirent = readdir(dir)) != NULL)
212         {
213                 uint32          tli;
214                 XLogSegNo       segno;
215                 bool            ispartial;
216                 bool            iscompress;
217
218                 /*
219                  * Check if the filename looks like an xlog file, or a .partial file.
220                  */
221                 if (IsXLogFileName(dirent->d_name))
222                 {
223                         ispartial = false;
224                         iscompress = false;
225                 }
226                 else if (IsPartialXLogFileName(dirent->d_name))
227                 {
228                         ispartial = true;
229                         iscompress = false;
230                 }
231                 else if (IsCompressXLogFileName(dirent->d_name))
232                 {
233                         ispartial = false;
234                         iscompress = true;
235                 }
236                 else if (IsPartialCompressXLogFileName(dirent->d_name))
237                 {
238                         ispartial = true;
239                         iscompress = true;
240                 }
241                 else
242                         continue;
243
244                 /*
245                  * Looks like an xlog file. Parse its position.
246                  */
247                 XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
248
249                 /*
250                  * Check that the segment has the right size, if it's supposed to be
251                  * completed.  For non-compressed segments just check the on-disk size
252                  * and see if it matches a completed segment. For compressed segments,
253                  * look at the last 4 bytes of the compressed file, which is where the
254                  * uncompressed size is located for gz files with a size lower than
255                  * 4GB, and then compare it to the size of a completed segment. The 4
256                  * last bytes correspond to the ISIZE member according to
257                  * http://www.zlib.org/rfc-gzip.html.
258                  */
259                 if (!ispartial && !iscompress)
260                 {
261                         struct stat statbuf;
262                         char            fullpath[MAXPGPATH * 2];
263
264                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
265                         if (stat(fullpath, &statbuf) != 0)
266                         {
267                                 fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
268                                                 progname, fullpath, strerror(errno));
269                                 disconnect_and_exit(1);
270                         }
271
272                         if (statbuf.st_size != WalSegSz)
273                         {
274                                 fprintf(stderr,
275                                                 _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
276                                                 progname, dirent->d_name, (int) statbuf.st_size);
277                                 continue;
278                         }
279                 }
280                 else if (!ispartial && iscompress)
281                 {
282                         int                     fd;
283                         char            buf[4];
284                         int                     bytes_out;
285                         char            fullpath[MAXPGPATH * 2];
286
287                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
288
289                         fd = open(fullpath, O_RDONLY | PG_BINARY);
290                         if (fd < 0)
291                         {
292                                 fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
293                                                 progname, fullpath, strerror(errno));
294                                 disconnect_and_exit(1);
295                         }
296                         if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
297                         {
298                                 fprintf(stderr, _("%s: could not seek in compressed file \"%s\": %s\n"),
299                                                 progname, fullpath, strerror(errno));
300                                 disconnect_and_exit(1);
301                         }
302                         if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
303                         {
304                                 fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
305                                                 progname, fullpath, strerror(errno));
306                                 disconnect_and_exit(1);
307                         }
308
309                         close(fd);
310                         bytes_out = (buf[3] << 24) | (buf[2] << 16) |
311                                 (buf[1] << 8) | buf[0];
312
313                         if (bytes_out != WalSegSz)
314                         {
315                                 fprintf(stderr,
316                                                 _("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
317                                                 progname, dirent->d_name, bytes_out);
318                                 continue;
319                         }
320                 }
321
322                 /* Looks like a valid segment. Remember that we saw it. */
323                 if ((segno > high_segno) ||
324                         (segno == high_segno && tli > high_tli) ||
325                         (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
326                 {
327                         high_segno = segno;
328                         high_tli = tli;
329                         high_ispartial = ispartial;
330                 }
331         }
332
333         if (errno)
334         {
335                 fprintf(stderr, _("%s: could not read directory \"%s\": %s\n"),
336                                 progname, basedir, strerror(errno));
337                 disconnect_and_exit(1);
338         }
339
340         close_destination_dir(dir, basedir);
341
342         if (high_segno > 0)
343         {
344                 XLogRecPtr      high_ptr;
345
346                 /*
347                  * Move the starting pointer to the start of the next segment, if the
348                  * highest one we saw was completed. Otherwise start streaming from
349                  * the beginning of the .partial segment.
350                  */
351                 if (!high_ispartial)
352                         high_segno++;
353
354                 XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr, WalSegSz);
355
356                 *tli = high_tli;
357                 return high_ptr;
358         }
359         else
360                 return InvalidXLogRecPtr;
361 }
362
363 /*
364  * Start the log streaming
365  */
366 static void
367 StreamLog(void)
368 {
369         XLogRecPtr      serverpos;
370         TimeLineID      servertli;
371         StreamCtl       stream;
372
373         MemSet(&stream, 0, sizeof(stream));
374
375         /*
376          * Connect in replication mode to the server
377          */
378         if (conn == NULL)
379                 conn = GetConnection();
380         if (!conn)
381                 /* Error message already written in GetConnection() */
382                 return;
383
384         if (!CheckServerVersionForStreaming(conn))
385         {
386                 /*
387                  * Error message already written in CheckServerVersionForStreaming().
388                  * There's no hope of recovering from a version mismatch, so don't
389                  * retry.
390                  */
391                 disconnect_and_exit(1);
392         }
393
394         /*
395          * Identify server, obtaining start LSN position and current timeline ID
396          * at the same time, necessary if not valid data can be found in the
397          * existing output directory.
398          */
399         if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
400                 disconnect_and_exit(1);
401
402         /*
403          * Figure out where to start streaming.
404          */
405         stream.startpos = FindStreamingStart(&stream.timeline);
406         if (stream.startpos == InvalidXLogRecPtr)
407         {
408                 stream.startpos = serverpos;
409                 stream.timeline = servertli;
410         }
411
412         /*
413          * Always start streaming at the beginning of a segment
414          */
415         stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
416
417         /*
418          * Start the replication
419          */
420         if (verbose)
421                 fprintf(stderr,
422                                 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
423                                 progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
424                                 stream.timeline);
425
426         stream.stream_stop = stop_streaming;
427         stream.stop_socket = PGINVALID_SOCKET;
428         stream.standby_message_timeout = standby_message_timeout;
429         stream.synchronous = synchronous;
430         stream.do_sync = do_sync;
431         stream.mark_done = false;
432         stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
433                                                                                                 stream.do_sync);
434         stream.partial_suffix = ".partial";
435         stream.replication_slot = replication_slot;
436
437         ReceiveXlogStream(conn, &stream);
438
439         if (!stream.walmethod->finish())
440         {
441                 fprintf(stderr,
442                                 _("%s: could not finish writing WAL files: %s\n"),
443                                 progname, strerror(errno));
444                 return;
445         }
446
447         PQfinish(conn);
448
449         FreeWalDirectoryMethod();
450         pg_free(stream.walmethod);
451
452         conn = NULL;
453 }
454
455 /*
456  * When sigint is called, just tell the system to exit at the next possible
457  * moment.
458  */
459 #ifndef WIN32
460
461 static void
462 sigint_handler(int signum)
463 {
464         time_to_stop = true;
465 }
466 #endif
467
468 int
469 main(int argc, char **argv)
470 {
471         static struct option long_options[] = {
472                 {"help", no_argument, NULL, '?'},
473                 {"version", no_argument, NULL, 'V'},
474                 {"directory", required_argument, NULL, 'D'},
475                 {"dbname", required_argument, NULL, 'd'},
476                 {"endpos", required_argument, NULL, 'E'},
477                 {"host", required_argument, NULL, 'h'},
478                 {"port", required_argument, NULL, 'p'},
479                 {"username", required_argument, NULL, 'U'},
480                 {"no-loop", no_argument, NULL, 'n'},
481                 {"no-password", no_argument, NULL, 'w'},
482                 {"password", no_argument, NULL, 'W'},
483                 {"status-interval", required_argument, NULL, 's'},
484                 {"slot", required_argument, NULL, 'S'},
485                 {"verbose", no_argument, NULL, 'v'},
486                 {"compress", required_argument, NULL, 'Z'},
487 /* action */
488                 {"create-slot", no_argument, NULL, 1},
489                 {"drop-slot", no_argument, NULL, 2},
490                 {"if-not-exists", no_argument, NULL, 3},
491                 {"synchronous", no_argument, NULL, 4},
492                 {"no-sync", no_argument, NULL, 5},
493                 {NULL, 0, NULL, 0}
494         };
495
496         int                     c;
497         int                     option_index;
498         char       *db_name;
499         uint32          hi, lo;
500
501         progname = get_progname(argv[0]);
502         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
503
504         if (argc > 1)
505         {
506                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
507                 {
508                         usage();
509                         exit(0);
510                 }
511                 else if (strcmp(argv[1], "-V") == 0 ||
512                                  strcmp(argv[1], "--version") == 0)
513                 {
514                         puts("pg_receivewal (PostgreSQL) " PG_VERSION);
515                         exit(0);
516                 }
517         }
518
519         while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:",
520                                                         long_options, &option_index)) != -1)
521         {
522                 switch (c)
523                 {
524                         case 'D':
525                                 basedir = pg_strdup(optarg);
526                                 break;
527                         case 'd':
528                                 connection_string = pg_strdup(optarg);
529                                 break;
530                         case 'h':
531                                 dbhost = pg_strdup(optarg);
532                                 break;
533                         case 'p':
534                                 if (atoi(optarg) <= 0)
535                                 {
536                                         fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
537                                                         progname, optarg);
538                                         exit(1);
539                                 }
540                                 dbport = pg_strdup(optarg);
541                                 break;
542                         case 'U':
543                                 dbuser = pg_strdup(optarg);
544                                 break;
545                         case 'w':
546                                 dbgetpassword = -1;
547                                 break;
548                         case 'W':
549                                 dbgetpassword = 1;
550                                 break;
551                         case 's':
552                                 standby_message_timeout = atoi(optarg) * 1000;
553                                 if (standby_message_timeout < 0)
554                                 {
555                                         fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
556                                                         progname, optarg);
557                                         exit(1);
558                                 }
559                                 break;
560                         case 'S':
561                                 replication_slot = pg_strdup(optarg);
562                                 break;
563                         case 'E':
564                                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
565                                 {
566                                         fprintf(stderr,
567                                                         _("%s: could not parse end position \"%s\"\n"),
568                                                         progname, optarg);
569                                         exit(1);
570                                 }
571                                 endpos = ((uint64) hi) << 32 | lo;
572                                 break;
573                         case 'n':
574                                 noloop = 1;
575                                 break;
576                         case 'v':
577                                 verbose++;
578                                 break;
579                         case 'Z':
580                                 compresslevel = atoi(optarg);
581                                 if (compresslevel < 0 || compresslevel > 9)
582                                 {
583                                         fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
584                                                         progname, optarg);
585                                         exit(1);
586                                 }
587                                 break;
588 /* action */
589                         case 1:
590                                 do_create_slot = true;
591                                 break;
592                         case 2:
593                                 do_drop_slot = true;
594                                 break;
595                         case 3:
596                                 slot_exists_ok = true;
597                                 break;
598                         case 4:
599                                 synchronous = true;
600                                 break;
601                         case 5:
602                                 do_sync = false;
603                                 break;
604                         default:
605
606                                 /*
607                                  * getopt_long already emitted a complaint
608                                  */
609                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
610                                                 progname);
611                                 exit(1);
612                 }
613         }
614
615         /*
616          * Any non-option arguments?
617          */
618         if (optind < argc)
619         {
620                 fprintf(stderr,
621                                 _("%s: too many command-line arguments (first is \"%s\")\n"),
622                                 progname, argv[optind]);
623                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
624                                 progname);
625                 exit(1);
626         }
627
628         if (do_drop_slot && do_create_slot)
629         {
630                 fprintf(stderr, _("%s: cannot use --create-slot together with --drop-slot\n"), progname);
631                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
632                                 progname);
633                 exit(1);
634         }
635
636         if (replication_slot == NULL && (do_drop_slot || do_create_slot))
637         {
638                 /* translator: second %s is an option name */
639                 fprintf(stderr, _("%s: %s needs a slot to be specified using --slot\n"), progname,
640                                 do_drop_slot ? "--drop-slot" : "--create-slot");
641                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
642                                 progname);
643                 exit(1);
644         }
645
646         if (synchronous && !do_sync)
647         {
648                 fprintf(stderr, _("%s: cannot use --synchronous together with --no-sync\n"), progname);
649                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
650                                 progname);
651                 exit(1);
652         }
653
654         /*
655          * Required arguments
656          */
657         if (basedir == NULL && !do_drop_slot && !do_create_slot)
658         {
659                 fprintf(stderr, _("%s: no target directory specified\n"), progname);
660                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
661                                 progname);
662                 exit(1);
663         }
664
665 #ifndef HAVE_LIBZ
666         if (compresslevel != 0)
667         {
668                 fprintf(stderr,
669                                 _("%s: this build does not support compression\n"),
670                                 progname);
671                 exit(1);
672         }
673 #endif
674
675         /*
676          * Check existence of destination folder.
677          */
678         if (!do_drop_slot && !do_create_slot)
679         {
680                 DIR                *dir = get_destination_dir(basedir);
681
682                 close_destination_dir(dir, basedir);
683         }
684
685 #ifndef WIN32
686         pqsignal(SIGINT, sigint_handler);
687 #endif
688
689         /*
690          * Obtain a connection before doing anything.
691          */
692         conn = GetConnection();
693         if (!conn)
694                 /* error message already written in GetConnection() */
695                 exit(1);
696
697         /*
698          * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
699          * replication connection and haven't connected using a database specific
700          * connection.
701          */
702         if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
703                 disconnect_and_exit(1);
704
705         /* determine remote server's xlog segment size */
706         if (!RetrieveWalSegSize(conn))
707                 disconnect_and_exit(1);
708
709         /*
710          * Check that there is a database associated with connection, none should
711          * be defined in this context.
712          */
713         if (db_name)
714         {
715                 fprintf(stderr,
716                                 _("%s: replication connection using slot \"%s\" is unexpectedly database specific\n"),
717                                 progname, replication_slot);
718                 disconnect_and_exit(1);
719         }
720
721         /*
722          * Drop a replication slot.
723          */
724         if (do_drop_slot)
725         {
726                 if (verbose)
727                         fprintf(stderr,
728                                         _("%s: dropping replication slot \"%s\"\n"),
729                                         progname, replication_slot);
730
731                 if (!DropReplicationSlot(conn, replication_slot))
732                         disconnect_and_exit(1);
733                 disconnect_and_exit(0);
734         }
735
736         /* Create a replication slot */
737         if (do_create_slot)
738         {
739                 if (verbose)
740                         fprintf(stderr,
741                                         _("%s: creating replication slot \"%s\"\n"),
742                                         progname, replication_slot);
743
744                 if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
745                                                                    slot_exists_ok))
746                         disconnect_and_exit(1);
747                 disconnect_and_exit(0);
748         }
749
750         /*
751          * Don't close the connection here so that subsequent StreamLog() can
752          * reuse it.
753          */
754
755         while (true)
756         {
757                 StreamLog();
758                 if (time_to_stop)
759                 {
760                         /*
761                          * We've been Ctrl-C'ed or end of streaming position has been
762                          * willingly reached, so exit without an error code.
763                          */
764                         exit(0);
765                 }
766                 else if (noloop)
767                 {
768                         fprintf(stderr, _("%s: disconnected\n"), progname);
769                         exit(1);
770                 }
771                 else
772                 {
773                         fprintf(stderr,
774                         /* translator: check source for value for %d */
775                                         _("%s: disconnected; waiting %d seconds to try again\n"),
776                                         progname, RECONNECT_SLEEP_TIME);
777                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
778                 }
779         }
780 }