]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/pg_receivewal.c
pg_basebackup: Add option to create replication slot
[postgresql] / src / bin / pg_basebackup / pg_receivewal.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_receivewal.c - receive streaming WAL data and write it
4  *                                        to a local file.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/pg_receivewal.c
12  *-------------------------------------------------------------------------
13  */
14
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "getopt_long.h"

#include "receivelog.h"
#include "streamutil.h"

28
29
30 /* Time to sleep between reconnection attempts */
31 #define RECONNECT_SLEEP_TIME 5
32
33 /* Global options */
34 static char *basedir = NULL;
35 static int      verbose = 0;
36 static int      compresslevel = 0;
37 static int      noloop = 0;
38 static int      standby_message_timeout = 10 * 1000;    /* 10 sec = default */
39 static volatile bool time_to_stop = false;
40 static bool do_create_slot = false;
41 static bool slot_exists_ok = false;
42 static bool do_drop_slot = false;
43 static bool synchronous = false;
44 static char *replication_slot = NULL;
45 static XLogRecPtr endpos = InvalidXLogRecPtr;
46
47
48 static void usage(void);
49 static DIR *get_destination_dir(char *dest_folder);
50 static void close_destination_dir(DIR *dest_dir, char *dest_folder);
51 static XLogRecPtr FindStreamingStart(uint32 *tli);
52 static void StreamLog(void);
53 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
54                            bool segment_finished);
55
/*
 * Close the server connection, if one is open, and terminate the process
 * with the given exit status.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to a
 * single statement; "if (x) disconnect_and_exit(1); else ..." then parses
 * as intended.
 */
#define disconnect_and_exit(code)				\
	do {										\
		if (conn != NULL)						\
			PQfinish(conn);						\
		exit(code);								\
	} while (0)
61
/*
 * Routines to evaluate segment file format.
 *
 * Both tests accept a name made of exactly XLOG_FNAME_LEN upper-case
 * hexadecimal digits followed by the given suffix: ".gz" for a compressed
 * segment, ".gz.partial" for a compressed segment marked as partial.
 *
 * The length test comes first so that (fname) + XLOG_FNAME_LEN is only
 * evaluated for names long enough to contain that offset.
 */
#define IsXLogFileNameWithSuffix(fname, suffix) \
	(strlen(fname) == XLOG_FNAME_LEN + strlen(suffix) && \
	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
	 strcmp((fname) + XLOG_FNAME_LEN, suffix) == 0)

#define IsCompressXLogFileName(fname) \
	IsXLogFileNameWithSuffix(fname, ".gz")
#define IsPartialCompressXLogFileName(fname) \
	IsXLogFileNameWithSuffix(fname, ".gz.partial")
71
72 static void
73 usage(void)
74 {
75         printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
76                    progname);
77         printf(_("Usage:\n"));
78         printf(_("  %s [OPTION]...\n"), progname);
79         printf(_("\nOptions:\n"));
80         printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
81         printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
82         printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
83         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
84         printf(_("  -s, --status-interval=SECS\n"
85                          "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
86         printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
87         printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
88         printf(_("  -v, --verbose          output verbose messages\n"));
89         printf(_("  -V, --version          output version information, then exit\n"));
90         printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
91         printf(_("  -?, --help             show this help, then exit\n"));
92         printf(_("\nConnection options:\n"));
93         printf(_("  -d, --dbname=CONNSTR   connection string\n"));
94         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
95         printf(_("  -p, --port=PORT        database server port number\n"));
96         printf(_("  -U, --username=NAME    connect as specified database user\n"));
97         printf(_("  -w, --no-password      never prompt for password\n"));
98         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
99         printf(_("\nOptional actions:\n"));
100         printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
101         printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
102         printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
103 }
104
105 static bool
106 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
107 {
108         static uint32 prevtimeline = 0;
109         static XLogRecPtr prevpos = InvalidXLogRecPtr;
110
111         /* we assume that we get called once at the end of each segment */
112         if (verbose && segment_finished)
113                 fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
114                                 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
115                                 timeline);
116
117         if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
118         {
119                 if (verbose)
120                         fprintf(stderr, _("%s: stopped streaming at %X/%X (timeline %u)\n"),
121                                         progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
122                                         timeline);
123                 time_to_stop = true;
124                 return true;
125         }
126
127         /*
128          * Note that we report the previous, not current, position here. After a
129          * timeline switch, xlogpos points to the beginning of the segment because
130          * that's where we always begin streaming. Reporting the end of previous
131          * timeline isn't totally accurate, because the next timeline can begin
132          * slightly before the end of the WAL that we received on the previous
133          * timeline, but it's close enough for reporting purposes.
134          */
135         if (verbose && prevtimeline != 0 && prevtimeline != timeline)
136                 fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
137                                 progname, timeline,
138                                 (uint32) (prevpos >> 32), (uint32) prevpos);
139
140         prevtimeline = timeline;
141         prevpos = xlogpos;
142
143         if (time_to_stop)
144         {
145                 if (verbose)
146                         fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
147                                         progname);
148                 return true;
149         }
150         return false;
151 }
152
153
154 /*
155  * Get destination directory.
156  */
157 static DIR *
158 get_destination_dir(char *dest_folder)
159 {
160         DIR                *dir;
161
162         Assert(dest_folder != NULL);
163         dir = opendir(dest_folder);
164         if (dir == NULL)
165         {
166                 fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
167                                 progname, basedir, strerror(errno));
168                 disconnect_and_exit(1);
169         }
170
171         return dir;
172 }
173
174
175 /*
176  * Close existing directory.
177  */
178 static void
179 close_destination_dir(DIR *dest_dir, char *dest_folder)
180 {
181         Assert(dest_dir != NULL && dest_folder != NULL);
182         if (closedir(dest_dir))
183         {
184                 fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
185                                 progname, dest_folder, strerror(errno));
186                 disconnect_and_exit(1);
187         }
188 }
189
190
191 /*
192  * Determine starting location for streaming, based on any existing xlog
193  * segments in the directory. We start at the end of the last one that is
194  * complete (size matches wal segment size), on the timeline with highest ID.
195  *
196  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
197  */
198 static XLogRecPtr
199 FindStreamingStart(uint32 *tli)
200 {
201         DIR                *dir;
202         struct dirent *dirent;
203         XLogSegNo       high_segno = 0;
204         uint32          high_tli = 0;
205         bool            high_ispartial = false;
206
207         dir = get_destination_dir(basedir);
208
209         while (errno = 0, (dirent = readdir(dir)) != NULL)
210         {
211                 uint32          tli;
212                 XLogSegNo       segno;
213                 bool            ispartial;
214                 bool            iscompress;
215
216                 /*
217                  * Check if the filename looks like an xlog file, or a .partial file.
218                  */
219                 if (IsXLogFileName(dirent->d_name))
220                 {
221                         ispartial = false;
222                         iscompress = false;
223                 }
224                 else if (IsPartialXLogFileName(dirent->d_name))
225                 {
226                         ispartial = true;
227                         iscompress = false;
228                 }
229                 else if (IsCompressXLogFileName(dirent->d_name))
230                 {
231                         ispartial = false;
232                         iscompress = true;
233                 }
234                 else if (IsPartialCompressXLogFileName(dirent->d_name))
235                 {
236                         ispartial = true;
237                         iscompress = true;
238                 }
239                 else
240                         continue;
241
242                 /*
243                  * Looks like an xlog file. Parse its position.
244                  */
245                 XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
246
247                 /*
248                  * Check that the segment has the right size, if it's supposed to be
249                  * completed.  For non-compressed segments just check the on-disk size
250                  * and see if it matches a completed segment. For compressed segments,
251                  * look at the last 4 bytes of the compressed file, which is where the
252                  * uncompressed size is located for gz files with a size lower than
253                  * 4GB, and then compare it to the size of a completed segment. The 4
254                  * last bytes correspond to the ISIZE member according to
255                  * http://www.zlib.org/rfc-gzip.html.
256                  */
257                 if (!ispartial && !iscompress)
258                 {
259                         struct stat statbuf;
260                         char            fullpath[MAXPGPATH * 2];
261
262                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
263                         if (stat(fullpath, &statbuf) != 0)
264                         {
265                                 fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
266                                                 progname, fullpath, strerror(errno));
267                                 disconnect_and_exit(1);
268                         }
269
270                         if (statbuf.st_size != WalSegSz)
271                         {
272                                 fprintf(stderr,
273                                                 _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
274                                                 progname, dirent->d_name, (int) statbuf.st_size);
275                                 continue;
276                         }
277                 }
278                 else if (!ispartial && iscompress)
279                 {
280                         int                     fd;
281                         char            buf[4];
282                         int                     bytes_out;
283                         char            fullpath[MAXPGPATH * 2];
284
285                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
286
287                         fd = open(fullpath, O_RDONLY | PG_BINARY);
288                         if (fd < 0)
289                         {
290                                 fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
291                                                 progname, fullpath, strerror(errno));
292                                 disconnect_and_exit(1);
293                         }
294                         if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
295                         {
296                                 fprintf(stderr, _("%s: could not seek in compressed file \"%s\": %s\n"),
297                                                 progname, fullpath, strerror(errno));
298                                 disconnect_and_exit(1);
299                         }
300                         if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
301                         {
302                                 fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
303                                                 progname, fullpath, strerror(errno));
304                                 disconnect_and_exit(1);
305                         }
306
307                         close(fd);
308                         bytes_out = (buf[3] << 24) | (buf[2] << 16) |
309                                 (buf[1] << 8) | buf[0];
310
311                         if (bytes_out != WalSegSz)
312                         {
313                                 fprintf(stderr,
314                                                 _("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
315                                                 progname, dirent->d_name, bytes_out);
316                                 continue;
317                         }
318                 }
319
320                 /* Looks like a valid segment. Remember that we saw it. */
321                 if ((segno > high_segno) ||
322                         (segno == high_segno && tli > high_tli) ||
323                         (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
324                 {
325                         high_segno = segno;
326                         high_tli = tli;
327                         high_ispartial = ispartial;
328                 }
329         }
330
331         if (errno)
332         {
333                 fprintf(stderr, _("%s: could not read directory \"%s\": %s\n"),
334                                 progname, basedir, strerror(errno));
335                 disconnect_and_exit(1);
336         }
337
338         close_destination_dir(dir, basedir);
339
340         if (high_segno > 0)
341         {
342                 XLogRecPtr      high_ptr;
343
344                 /*
345                  * Move the starting pointer to the start of the next segment, if the
346                  * highest one we saw was completed. Otherwise start streaming from
347                  * the beginning of the .partial segment.
348                  */
349                 if (!high_ispartial)
350                         high_segno++;
351
352                 XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr, WalSegSz);
353
354                 *tli = high_tli;
355                 return high_ptr;
356         }
357         else
358                 return InvalidXLogRecPtr;
359 }
360
361 /*
362  * Start the log streaming
363  */
364 static void
365 StreamLog(void)
366 {
367         XLogRecPtr      serverpos;
368         TimeLineID      servertli;
369         StreamCtl       stream;
370
371         MemSet(&stream, 0, sizeof(stream));
372
373         /*
374          * Connect in replication mode to the server
375          */
376         if (conn == NULL)
377                 conn = GetConnection();
378         if (!conn)
379                 /* Error message already written in GetConnection() */
380                 return;
381
382         if (!CheckServerVersionForStreaming(conn))
383         {
384                 /*
385                  * Error message already written in CheckServerVersionForStreaming().
386                  * There's no hope of recovering from a version mismatch, so don't
387                  * retry.
388                  */
389                 disconnect_and_exit(1);
390         }
391
392         /*
393          * Identify server, obtaining start LSN position and current timeline ID
394          * at the same time, necessary if not valid data can be found in the
395          * existing output directory.
396          */
397         if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
398                 disconnect_and_exit(1);
399
400         /*
401          * Figure out where to start streaming.
402          */
403         stream.startpos = FindStreamingStart(&stream.timeline);
404         if (stream.startpos == InvalidXLogRecPtr)
405         {
406                 stream.startpos = serverpos;
407                 stream.timeline = servertli;
408         }
409
410         /*
411          * Always start streaming at the beginning of a segment
412          */
413         stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
414
415         /*
416          * Start the replication
417          */
418         if (verbose)
419                 fprintf(stderr,
420                                 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
421                                 progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
422                                 stream.timeline);
423
424         stream.stream_stop = stop_streaming;
425         stream.stop_socket = PGINVALID_SOCKET;
426         stream.standby_message_timeout = standby_message_timeout;
427         stream.synchronous = synchronous;
428         stream.do_sync = true;
429         stream.mark_done = false;
430         stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
431                                                                                                 stream.do_sync);
432         stream.partial_suffix = ".partial";
433         stream.replication_slot = replication_slot;
434
435         ReceiveXlogStream(conn, &stream);
436
437         if (!stream.walmethod->finish())
438         {
439                 fprintf(stderr,
440                                 _("%s: could not finish writing WAL files: %s\n"),
441                                 progname, strerror(errno));
442                 return;
443         }
444
445         PQfinish(conn);
446
447         FreeWalDirectoryMethod();
448         pg_free(stream.walmethod);
449
450         conn = NULL;
451 }
452
453 /*
454  * When sigint is called, just tell the system to exit at the next possible
455  * moment.
456  */
457 #ifndef WIN32
458
459 static void
460 sigint_handler(int signum)
461 {
462         time_to_stop = true;
463 }
464 #endif
465
466 int
467 main(int argc, char **argv)
468 {
469         static struct option long_options[] = {
470                 {"help", no_argument, NULL, '?'},
471                 {"version", no_argument, NULL, 'V'},
472                 {"directory", required_argument, NULL, 'D'},
473                 {"dbname", required_argument, NULL, 'd'},
474                 {"endpos", required_argument, NULL, 'E'},
475                 {"host", required_argument, NULL, 'h'},
476                 {"port", required_argument, NULL, 'p'},
477                 {"username", required_argument, NULL, 'U'},
478                 {"no-loop", no_argument, NULL, 'n'},
479                 {"no-password", no_argument, NULL, 'w'},
480                 {"password", no_argument, NULL, 'W'},
481                 {"status-interval", required_argument, NULL, 's'},
482                 {"slot", required_argument, NULL, 'S'},
483                 {"verbose", no_argument, NULL, 'v'},
484                 {"compress", required_argument, NULL, 'Z'},
485 /* action */
486                 {"create-slot", no_argument, NULL, 1},
487                 {"drop-slot", no_argument, NULL, 2},
488                 {"if-not-exists", no_argument, NULL, 3},
489                 {"synchronous", no_argument, NULL, 4},
490                 {NULL, 0, NULL, 0}
491         };
492
493         int                     c;
494         int                     option_index;
495         char       *db_name;
496         uint32          hi, lo;
497
498         progname = get_progname(argv[0]);
499         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
500
501         if (argc > 1)
502         {
503                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
504                 {
505                         usage();
506                         exit(0);
507                 }
508                 else if (strcmp(argv[1], "-V") == 0 ||
509                                  strcmp(argv[1], "--version") == 0)
510                 {
511                         puts("pg_receivewal (PostgreSQL) " PG_VERSION);
512                         exit(0);
513                 }
514         }
515
516         while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:",
517                                                         long_options, &option_index)) != -1)
518         {
519                 switch (c)
520                 {
521                         case 'D':
522                                 basedir = pg_strdup(optarg);
523                                 break;
524                         case 'd':
525                                 connection_string = pg_strdup(optarg);
526                                 break;
527                         case 'h':
528                                 dbhost = pg_strdup(optarg);
529                                 break;
530                         case 'p':
531                                 if (atoi(optarg) <= 0)
532                                 {
533                                         fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
534                                                         progname, optarg);
535                                         exit(1);
536                                 }
537                                 dbport = pg_strdup(optarg);
538                                 break;
539                         case 'U':
540                                 dbuser = pg_strdup(optarg);
541                                 break;
542                         case 'w':
543                                 dbgetpassword = -1;
544                                 break;
545                         case 'W':
546                                 dbgetpassword = 1;
547                                 break;
548                         case 's':
549                                 standby_message_timeout = atoi(optarg) * 1000;
550                                 if (standby_message_timeout < 0)
551                                 {
552                                         fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
553                                                         progname, optarg);
554                                         exit(1);
555                                 }
556                                 break;
557                         case 'S':
558                                 replication_slot = pg_strdup(optarg);
559                                 break;
560                         case 'E':
561                                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
562                                 {
563                                         fprintf(stderr,
564                                                         _("%s: could not parse end position \"%s\"\n"),
565                                                         progname, optarg);
566                                         exit(1);
567                                 }
568                                 endpos = ((uint64) hi) << 32 | lo;
569                                 break;
570                         case 'n':
571                                 noloop = 1;
572                                 break;
573                         case 'v':
574                                 verbose++;
575                                 break;
576                         case 'Z':
577                                 compresslevel = atoi(optarg);
578                                 if (compresslevel < 0 || compresslevel > 9)
579                                 {
580                                         fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
581                                                         progname, optarg);
582                                         exit(1);
583                                 }
584                                 break;
585 /* action */
586                         case 1:
587                                 do_create_slot = true;
588                                 break;
589                         case 2:
590                                 do_drop_slot = true;
591                                 break;
592                         case 3:
593                                 slot_exists_ok = true;
594                                 break;
595                         case 4:
596                                 synchronous = true;
597                                 break;
598                         default:
599
600                                 /*
601                                  * getopt_long already emitted a complaint
602                                  */
603                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
604                                                 progname);
605                                 exit(1);
606                 }
607         }
608
609         /*
610          * Any non-option arguments?
611          */
612         if (optind < argc)
613         {
614                 fprintf(stderr,
615                                 _("%s: too many command-line arguments (first is \"%s\")\n"),
616                                 progname, argv[optind]);
617                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
618                                 progname);
619                 exit(1);
620         }
621
622         if (do_drop_slot && do_create_slot)
623         {
624                 fprintf(stderr, _("%s: cannot use --create-slot together with --drop-slot\n"), progname);
625                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
626                                 progname);
627                 exit(1);
628         }
629
630         if (replication_slot == NULL && (do_drop_slot || do_create_slot))
631         {
632                 /* translator: second %s is an option name */
633                 fprintf(stderr, _("%s: %s needs a slot to be specified using --slot\n"), progname,
634                                 do_drop_slot ? "--drop-slot" : "--create-slot");
635                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
636                                 progname);
637                 exit(1);
638         }
639
640         /*
641          * Required arguments
642          */
643         if (basedir == NULL && !do_drop_slot && !do_create_slot)
644         {
645                 fprintf(stderr, _("%s: no target directory specified\n"), progname);
646                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
647                                 progname);
648                 exit(1);
649         }
650
651 #ifndef HAVE_LIBZ
652         if (compresslevel != 0)
653         {
654                 fprintf(stderr,
655                                 _("%s: this build does not support compression\n"),
656                                 progname);
657                 exit(1);
658         }
659 #endif
660
661         /*
662          * Check existence of destination folder.
663          */
664         if (!do_drop_slot && !do_create_slot)
665         {
666                 DIR                *dir = get_destination_dir(basedir);
667
668                 close_destination_dir(dir, basedir);
669         }
670
671 #ifndef WIN32
672         pqsignal(SIGINT, sigint_handler);
673 #endif
674
675         /*
676          * Obtain a connection before doing anything.
677          */
678         conn = GetConnection();
679         if (!conn)
680                 /* error message already written in GetConnection() */
681                 exit(1);
682
683         /*
684          * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
685          * replication connection and haven't connected using a database specific
686          * connection.
687          */
688         if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
689                 disconnect_and_exit(1);
690
691         /* determine remote server's xlog segment size */
692         if (!RetrieveWalSegSize(conn))
693                 disconnect_and_exit(1);
694
695         /*
696          * Check that there is a database associated with connection, none should
697          * be defined in this context.
698          */
699         if (db_name)
700         {
701                 fprintf(stderr,
702                                 _("%s: replication connection using slot \"%s\" is unexpectedly database specific\n"),
703                                 progname, replication_slot);
704                 disconnect_and_exit(1);
705         }
706
707         /*
708          * Drop a replication slot.
709          */
710         if (do_drop_slot)
711         {
712                 if (verbose)
713                         fprintf(stderr,
714                                         _("%s: dropping replication slot \"%s\"\n"),
715                                         progname, replication_slot);
716
717                 if (!DropReplicationSlot(conn, replication_slot))
718                         disconnect_and_exit(1);
719                 disconnect_and_exit(0);
720         }
721
722         /* Create a replication slot */
723         if (do_create_slot)
724         {
725                 if (verbose)
726                         fprintf(stderr,
727                                         _("%s: creating replication slot \"%s\"\n"),
728                                         progname, replication_slot);
729
730                 if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
731                                                                    slot_exists_ok))
732                         disconnect_and_exit(1);
733                 disconnect_and_exit(0);
734         }
735
736         /*
737          * Don't close the connection here so that subsequent StreamLog() can
738          * reuse it.
739          */
740
741         while (true)
742         {
743                 StreamLog();
744                 if (time_to_stop)
745                 {
746                         /*
747                          * We've been Ctrl-C'ed or end of streaming position has been
748                          * willingly reached, so exit without an error code.
749                          */
750                         exit(0);
751                 }
752                 else if (noloop)
753                 {
754                         fprintf(stderr, _("%s: disconnected\n"), progname);
755                         exit(1);
756                 }
757                 else
758                 {
759                         fprintf(stderr,
760                         /* translator: check source for value for %d */
761                                         _("%s: disconnected; waiting %d seconds to try again\n"),
762                                         progname, RECONNECT_SLEEP_TIME);
763                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
764                 }
765         }
766 }