]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/pg_recvlogical.c
Phase 2 of pgindent updates.
[postgresql] / src / bin / pg_basebackup / pg_recvlogical.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4  *                                        fashion and write it to a local file.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                src/bin/pg_basebackup/pg_recvlogical.c
10  *-------------------------------------------------------------------------
11  */
12
13 #include "postgres_fe.h"
14
15 #include <dirent.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #ifdef HAVE_SYS_SELECT_H
19 #include <sys/select.h>
20 #endif
21
22 /* local includes */
23 #include "streamutil.h"
24
25 #include "access/xlog_internal.h"
26 #include "common/fe_memutils.h"
27 #include "getopt_long.h"
28 #include "libpq-fe.h"
29 #include "libpq/pqsignal.h"
30 #include "pqexpbuffer.h"
31
32
33 /* Time to sleep between reconnection attempts */
34 #define RECONNECT_SLEEP_TIME 5
35
36 /* Global Options */
37 static char *outfile = NULL;
38 static int      verbose = 0;
39 static int      noloop = 0;
40 static int      standby_message_timeout = 10 * 1000;    /* 10 sec = default */
41 static int      fsync_interval = 10 * 1000; /* 10 sec = default */
42 static XLogRecPtr startpos = InvalidXLogRecPtr;
43 static XLogRecPtr endpos = InvalidXLogRecPtr;
44 static bool do_create_slot = false;
45 static bool slot_exists_ok = false;
46 static bool do_start_slot = false;
47 static bool do_drop_slot = false;
48 static char *replication_slot = NULL;
49
50 /* filled pairwise with option, value. value may be NULL */
51 static char **options;
52 static size_t noptions = 0;
53 static const char *plugin = "test_decoding";
54
55 /* Global State */
56 static int      outfd = -1;
57 static volatile sig_atomic_t time_to_abort = false;
58 static volatile sig_atomic_t output_reopen = false;
59 static bool output_isfile;
60 static TimestampTz output_last_fsync = -1;
61 static bool output_needs_fsync = false;
62 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64
65 static void usage(void);
66 static void StreamLogicalLog(void);
67 static void disconnect_and_exit(int code);
68 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
69 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
70                                    bool keepalive, XLogRecPtr lsn);
71
72 static void
73 usage(void)
74 {
75         printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
76                    progname);
77         printf(_("Usage:\n"));
78         printf(_("  %s [OPTION]...\n"), progname);
79         printf(_("\nAction to be performed:\n"));
80         printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
81         printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
82         printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
83         printf(_("\nOptions:\n"));
84         printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
85         printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
86         printf(_("  -F  --fsync-interval=SECS\n"
87                          "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
88         printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
89         printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
90         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
91         printf(_("  -o, --option=NAME[=VALUE]\n"
92                          "                         pass option NAME with optional value VALUE to the\n"
93                          "                         output plugin\n"));
94         printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
95         printf(_("  -s, --status-interval=SECS\n"
96                          "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
97         printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
98         printf(_("  -v, --verbose          output verbose messages\n"));
99         printf(_("  -V, --version          output version information, then exit\n"));
100         printf(_("  -?, --help             show this help, then exit\n"));
101         printf(_("\nConnection options:\n"));
102         printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
103         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
104         printf(_("  -p, --port=PORT        database server port number\n"));
105         printf(_("  -U, --username=NAME    connect as specified database user\n"));
106         printf(_("  -w, --no-password      never prompt for password\n"));
107         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
108         printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
109 }
110
111 /*
112  * Send a Standby Status Update message to server.
113  */
114 static bool
115 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
116 {
117         static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
118         static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
119
120         char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
121         int                     len = 0;
122
123         /*
124          * we normally don't want to send superfluous feedbacks, but if it's
125          * because of a timeout we need to, otherwise wal_sender_timeout will kill
126          * us.
127          */
128         if (!force &&
129                 last_written_lsn == output_written_lsn &&
130                 last_fsync_lsn != output_fsync_lsn)
131                 return true;
132
133         if (verbose)
134                 fprintf(stderr,
135                    _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
136                                 progname,
137                         (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
138                                 (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
139                                 replication_slot);
140
141         replybuf[len] = 'r';
142         len += 1;
143         fe_sendint64(output_written_lsn, &replybuf[len]);       /* write */
144         len += 8;
145         fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
146         len += 8;
147         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* apply */
148         len += 8;
149         fe_sendint64(now, &replybuf[len]);      /* sendTime */
150         len += 8;
151         replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
152         len += 1;
153
154         startpos = output_written_lsn;
155         last_written_lsn = output_written_lsn;
156         last_fsync_lsn = output_fsync_lsn;
157
158         if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
159         {
160                 fprintf(stderr, _("%s: could not send feedback packet: %s"),
161                                 progname, PQerrorMessage(conn));
162                 return false;
163         }
164
165         return true;
166 }
167
168 static void
169 disconnect_and_exit(int code)
170 {
171         if (conn != NULL)
172                 PQfinish(conn);
173
174         exit(code);
175 }
176
177 static bool
178 OutputFsync(TimestampTz now)
179 {
180         output_last_fsync = now;
181
182         output_fsync_lsn = output_written_lsn;
183
184         if (fsync_interval <= 0)
185                 return true;
186
187         if (!output_needs_fsync)
188                 return true;
189
190         output_needs_fsync = false;
191
192         /* can only fsync if it's a regular file */
193         if (!output_isfile)
194                 return true;
195
196         if (fsync(outfd) != 0)
197         {
198                 fprintf(stderr,
199                                 _("%s: could not fsync log file \"%s\": %s\n"),
200                                 progname, outfile, strerror(errno));
201                 return false;
202         }
203
204         return true;
205 }
206
207 /*
208  * Start the log streaming
209  */
210 static void
211 StreamLogicalLog(void)
212 {
213         PGresult   *res;
214         char       *copybuf = NULL;
215         TimestampTz last_status = -1;
216         int                     i;
217         PQExpBuffer query;
218
219         output_written_lsn = InvalidXLogRecPtr;
220         output_fsync_lsn = InvalidXLogRecPtr;
221
222         query = createPQExpBuffer();
223
224         /*
225          * Connect in replication mode to the server
226          */
227         if (!conn)
228                 conn = GetConnection();
229         if (!conn)
230                 /* Error message already written in GetConnection() */
231                 return;
232
233         /*
234          * Start the replication
235          */
236         if (verbose)
237                 fprintf(stderr,
238                                 _("%s: starting log streaming at %X/%X (slot %s)\n"),
239                                 progname, (uint32) (startpos >> 32), (uint32) startpos,
240                                 replication_slot);
241
242         /* Initiate the replication stream at specified location */
243         appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
244                          replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
245
246         /* print options if there are any */
247         if (noptions)
248                 appendPQExpBufferStr(query, " (");
249
250         for (i = 0; i < noptions; i++)
251         {
252                 /* separator */
253                 if (i > 0)
254                         appendPQExpBufferStr(query, ", ");
255
256                 /* write option name */
257                 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
258
259                 /* write option value if specified */
260                 if (options[(i * 2) + 1] != NULL)
261                         appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
262         }
263
264         if (noptions)
265                 appendPQExpBufferChar(query, ')');
266
267         res = PQexec(conn, query->data);
268         if (PQresultStatus(res) != PGRES_COPY_BOTH)
269         {
270                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
271                                 progname, query->data, PQresultErrorMessage(res));
272                 PQclear(res);
273                 goto error;
274         }
275         PQclear(res);
276         resetPQExpBuffer(query);
277
278         if (verbose)
279                 fprintf(stderr,
280                                 _("%s: streaming initiated\n"),
281                                 progname);
282
283         while (!time_to_abort)
284         {
285                 int                     r;
286                 int                     bytes_left;
287                 int                     bytes_written;
288                 TimestampTz now;
289                 int                     hdr_len;
290                 XLogRecPtr      cur_record_lsn = InvalidXLogRecPtr;
291
292                 if (copybuf != NULL)
293                 {
294                         PQfreemem(copybuf);
295                         copybuf = NULL;
296                 }
297
298                 /*
299                  * Potentially send a status message to the master
300                  */
301                 now = feGetCurrentTimestamp();
302
303                 if (outfd != -1 &&
304                         feTimestampDifferenceExceeds(output_last_fsync, now,
305                                                                                  fsync_interval))
306                 {
307                         if (!OutputFsync(now))
308                                 goto error;
309                 }
310
311                 if (standby_message_timeout > 0 &&
312                         feTimestampDifferenceExceeds(last_status, now,
313                                                                                  standby_message_timeout))
314                 {
315                         /* Time to send feedback! */
316                         if (!sendFeedback(conn, now, true, false))
317                                 goto error;
318
319                         last_status = now;
320                 }
321
322                 /* got SIGHUP, close output file */
323                 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
324                 {
325                         now = feGetCurrentTimestamp();
326                         if (!OutputFsync(now))
327                                 goto error;
328                         close(outfd);
329                         outfd = -1;
330                 }
331                 output_reopen = false;
332
333                 /* open the output file, if not open yet */
334                 if (outfd == -1)
335                 {
336                         struct stat statbuf;
337
338                         if (strcmp(outfile, "-") == 0)
339                                 outfd = fileno(stdout);
340                         else
341                                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
342                                                          S_IRUSR | S_IWUSR);
343                         if (outfd == -1)
344                         {
345                                 fprintf(stderr,
346                                                 _("%s: could not open log file \"%s\": %s\n"),
347                                                 progname, outfile, strerror(errno));
348                                 goto error;
349                         }
350
351                         if (fstat(outfd, &statbuf) != 0)
352                                 fprintf(stderr,
353                                                 _("%s: could not stat file \"%s\": %s\n"),
354                                                 progname, outfile, strerror(errno));
355
356                         output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357                 }
358
359                 r = PQgetCopyData(conn, &copybuf, 1);
360                 if (r == 0)
361                 {
362                         /*
363                          * In async mode, and no data available. We block on reading but
364                          * not more than the specified timeout, so that we can send a
365                          * response back to the client.
366                          */
367                         fd_set          input_mask;
368                         TimestampTz message_target = 0;
369                         TimestampTz fsync_target = 0;
370                         struct timeval timeout;
371                         struct timeval *timeoutptr = NULL;
372
373                         if (PQsocket(conn) < 0)
374                         {
375                                 fprintf(stderr,
376                                                 _("%s: invalid socket: %s"),
377                                                 progname, PQerrorMessage(conn));
378                                 goto error;
379                         }
380
381                         FD_ZERO(&input_mask);
382                         FD_SET(PQsocket(conn), &input_mask);
383
384                         /* Compute when we need to wakeup to send a keepalive message. */
385                         if (standby_message_timeout)
386                                 message_target = last_status + (standby_message_timeout - 1) *
387                                         ((int64) 1000);
388
389                         /* Compute when we need to wakeup to fsync the output file. */
390                         if (fsync_interval > 0 && output_needs_fsync)
391                                 fsync_target = output_last_fsync + (fsync_interval - 1) *
392                                         ((int64) 1000);
393
394                         /* Now compute when to wakeup. */
395                         if (message_target > 0 || fsync_target > 0)
396                         {
397                                 TimestampTz targettime;
398                                 long            secs;
399                                 int                     usecs;
400
401                                 targettime = message_target;
402
403                                 if (fsync_target > 0 && fsync_target < targettime)
404                                         targettime = fsync_target;
405
406                                 feTimestampDifference(now,
407                                                                           targettime,
408                                                                           &secs,
409                                                                           &usecs);
410                                 if (secs <= 0)
411                                         timeout.tv_sec = 1; /* Always sleep at least 1 sec */
412                                 else
413                                         timeout.tv_sec = secs;
414                                 timeout.tv_usec = usecs;
415                                 timeoutptr = &timeout;
416                         }
417
418                         r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
419                         if (r == 0 || (r < 0 && errno == EINTR))
420                         {
421                                 /*
422                                  * Got a timeout or signal. Continue the loop and either
423                                  * deliver a status packet to the server or just go back into
424                                  * blocking.
425                                  */
426                                 continue;
427                         }
428                         else if (r < 0)
429                         {
430                                 fprintf(stderr, _("%s: select() failed: %s\n"),
431                                                 progname, strerror(errno));
432                                 goto error;
433                         }
434
435                         /* Else there is actually data on the socket */
436                         if (PQconsumeInput(conn) == 0)
437                         {
438                                 fprintf(stderr,
439                                                 _("%s: could not receive data from WAL stream: %s"),
440                                                 progname, PQerrorMessage(conn));
441                                 goto error;
442                         }
443                         continue;
444                 }
445
446                 /* End of copy stream */
447                 if (r == -1)
448                         break;
449
450                 /* Failure while reading the copy stream */
451                 if (r == -2)
452                 {
453                         fprintf(stderr, _("%s: could not read COPY data: %s"),
454                                         progname, PQerrorMessage(conn));
455                         goto error;
456                 }
457
458                 /* Check the message type. */
459                 if (copybuf[0] == 'k')
460                 {
461                         int                     pos;
462                         bool            replyRequested;
463                         XLogRecPtr      walEnd;
464                         bool            endposReached = false;
465
466                         /*
467                          * Parse the keepalive message, enclosed in the CopyData message.
468                          * We just check if the server requested a reply, and ignore the
469                          * rest.
470                          */
471                         pos = 1;                        /* skip msgtype 'k' */
472                         walEnd = fe_recvint64(&copybuf[pos]);
473                         output_written_lsn = Max(walEnd, output_written_lsn);
474
475                         pos += 8;                       /* read walEnd */
476
477                         pos += 8;                       /* skip sendTime */
478
479                         if (r < pos + 1)
480                         {
481                                 fprintf(stderr, _("%s: streaming header too small: %d\n"),
482                                                 progname, r);
483                                 goto error;
484                         }
485                         replyRequested = copybuf[pos];
486
487                         if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
488                         {
489                                 /*
490                                  * If there's nothing to read on the socket until a keepalive
491                                  * we know that the server has nothing to send us; and if
492                                  * walEnd has passed endpos, we know nothing else can have
493                                  * committed before endpos.  So we can bail out now.
494                                  */
495                                 endposReached = true;
496                         }
497
498                         /* Send a reply, if necessary */
499                         if (replyRequested || endposReached)
500                         {
501                                 if (!flushAndSendFeedback(conn, &now))
502                                         goto error;
503                                 last_status = now;
504                         }
505
506                         if (endposReached)
507                         {
508                                 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
509                                 time_to_abort = true;
510                                 break;
511                         }
512
513                         continue;
514                 }
515                 else if (copybuf[0] != 'w')
516                 {
517                         fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
518                                         progname, copybuf[0]);
519                         goto error;
520                 }
521
522                 /*
523                  * Read the header of the XLogData message, enclosed in the CopyData
524                  * message. We only need the WAL location field (dataStart), the rest
525                  * of the header is ignored.
526                  */
527                 hdr_len = 1;                    /* msgtype 'w' */
528                 hdr_len += 8;                   /* dataStart */
529                 hdr_len += 8;                   /* walEnd */
530                 hdr_len += 8;                   /* sendTime */
531                 if (r < hdr_len + 1)
532                 {
533                         fprintf(stderr, _("%s: streaming header too small: %d\n"),
534                                         progname, r);
535                         goto error;
536                 }
537
538                 /* Extract WAL location for this block */
539                 cur_record_lsn = fe_recvint64(&copybuf[1]);
540
541                 if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
542                 {
543                         /*
544                          * We've read past our endpoint, so prepare to go away being
545                          * cautious about what happens to our output data.
546                          */
547                         if (!flushAndSendFeedback(conn, &now))
548                                 goto error;
549                         prepareToTerminate(conn, endpos, false, cur_record_lsn);
550                         time_to_abort = true;
551                         break;
552                 }
553
554                 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
555
556                 bytes_left = r - hdr_len;
557                 bytes_written = 0;
558
559                 /* signal that a fsync is needed */
560                 output_needs_fsync = true;
561
562                 while (bytes_left)
563                 {
564                         int                     ret;
565
566                         ret = write(outfd,
567                                                 copybuf + hdr_len + bytes_written,
568                                                 bytes_left);
569
570                         if (ret < 0)
571                         {
572                                 fprintf(stderr,
573                                   _("%s: could not write %u bytes to log file \"%s\": %s\n"),
574                                                 progname, bytes_left, outfile,
575                                                 strerror(errno));
576                                 goto error;
577                         }
578
579                         /* Write was successful, advance our position */
580                         bytes_written += ret;
581                         bytes_left -= ret;
582                 }
583
584                 if (write(outfd, "\n", 1) != 1)
585                 {
586                         fprintf(stderr,
587                                   _("%s: could not write %u bytes to log file \"%s\": %s\n"),
588                                         progname, 1, outfile,
589                                         strerror(errno));
590                         goto error;
591                 }
592
593                 if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
594                 {
595                         /* endpos was exactly the record we just processed, we're done */
596                         if (!flushAndSendFeedback(conn, &now))
597                                 goto error;
598                         prepareToTerminate(conn, endpos, false, cur_record_lsn);
599                         time_to_abort = true;
600                         break;
601                 }
602         }
603
604         res = PQgetResult(conn);
605         if (PQresultStatus(res) == PGRES_COPY_OUT)
606         {
607                 /*
608                  * We're doing a client-initiated clean exit and have sent CopyDone to
609                  * the server. We've already sent replay confirmation and fsync'd so
610                  * we can just clean up the connection now.
611                  */
612                 goto error;
613         }
614         else if (PQresultStatus(res) != PGRES_COMMAND_OK)
615         {
616                 fprintf(stderr,
617                                 _("%s: unexpected termination of replication stream: %s"),
618                                 progname, PQresultErrorMessage(res));
619                 goto error;
620         }
621         PQclear(res);
622
623         if (outfd != -1 && strcmp(outfile, "-") != 0)
624         {
625                 TimestampTz t = feGetCurrentTimestamp();
626
627                 /* no need to jump to error on failure here, we're finishing anyway */
628                 OutputFsync(t);
629
630                 if (close(outfd) != 0)
631                         fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
632                                         progname, outfile, strerror(errno));
633         }
634         outfd = -1;
635 error:
636         if (copybuf != NULL)
637         {
638                 PQfreemem(copybuf);
639                 copybuf = NULL;
640         }
641         destroyPQExpBuffer(query);
642         PQfinish(conn);
643         conn = NULL;
644 }
645
646 /*
647  * Unfortunately we can't do sensible signal handling on windows...
648  */
649 #ifndef WIN32
650
651 /*
652  * When sigint is called, just tell the system to exit at the next possible
653  * moment.
654  */
655 static void
656 sigint_handler(int signum)
657 {
658         time_to_abort = true;
659 }
660
661 /*
662  * Trigger the output file to be reopened.
663  */
664 static void
665 sighup_handler(int signum)
666 {
667         output_reopen = true;
668 }
669 #endif
670
671
672 int
673 main(int argc, char **argv)
674 {
675         static struct option long_options[] = {
676 /* general options */
677                 {"file", required_argument, NULL, 'f'},
678                 {"fsync-interval", required_argument, NULL, 'F'},
679                 {"no-loop", no_argument, NULL, 'n'},
680                 {"verbose", no_argument, NULL, 'v'},
681                 {"version", no_argument, NULL, 'V'},
682                 {"help", no_argument, NULL, '?'},
683 /* connection options */
684                 {"dbname", required_argument, NULL, 'd'},
685                 {"host", required_argument, NULL, 'h'},
686                 {"port", required_argument, NULL, 'p'},
687                 {"username", required_argument, NULL, 'U'},
688                 {"no-password", no_argument, NULL, 'w'},
689                 {"password", no_argument, NULL, 'W'},
690 /* replication options */
691                 {"startpos", required_argument, NULL, 'I'},
692                 {"endpos", required_argument, NULL, 'E'},
693                 {"option", required_argument, NULL, 'o'},
694                 {"plugin", required_argument, NULL, 'P'},
695                 {"status-interval", required_argument, NULL, 's'},
696                 {"slot", required_argument, NULL, 'S'},
697 /* action */
698                 {"create-slot", no_argument, NULL, 1},
699                 {"start", no_argument, NULL, 2},
700                 {"drop-slot", no_argument, NULL, 3},
701                 {"if-not-exists", no_argument, NULL, 4},
702                 {NULL, 0, NULL, 0}
703         };
704         int                     c;
705         int                     option_index;
706         uint32          hi,
707                                 lo;
708         char       *db_name;
709
710         progname = get_progname(argv[0]);
711         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
712
713         if (argc > 1)
714         {
715                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
716                 {
717                         usage();
718                         exit(0);
719                 }
720                 else if (strcmp(argv[1], "-V") == 0 ||
721                                  strcmp(argv[1], "--version") == 0)
722                 {
723                         puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
724                         exit(0);
725                 }
726         }
727
728         while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
729                                                         long_options, &option_index)) != -1)
730         {
731                 switch (c)
732                 {
733 /* general options */
734                         case 'f':
735                                 outfile = pg_strdup(optarg);
736                                 break;
737                         case 'F':
738                                 fsync_interval = atoi(optarg) * 1000;
739                                 if (fsync_interval < 0)
740                                 {
741                                         fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
742                                                         progname, optarg);
743                                         exit(1);
744                                 }
745                                 break;
746                         case 'n':
747                                 noloop = 1;
748                                 break;
749                         case 'v':
750                                 verbose++;
751                                 break;
752 /* connection options */
753                         case 'd':
754                                 dbname = pg_strdup(optarg);
755                                 break;
756                         case 'h':
757                                 dbhost = pg_strdup(optarg);
758                                 break;
759                         case 'p':
760                                 if (atoi(optarg) <= 0)
761                                 {
762                                         fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
763                                                         progname, optarg);
764                                         exit(1);
765                                 }
766                                 dbport = pg_strdup(optarg);
767                                 break;
768                         case 'U':
769                                 dbuser = pg_strdup(optarg);
770                                 break;
771                         case 'w':
772                                 dbgetpassword = -1;
773                                 break;
774                         case 'W':
775                                 dbgetpassword = 1;
776                                 break;
777 /* replication options */
778                         case 'I':
779                                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
780                                 {
781                                         fprintf(stderr,
782                                                         _("%s: could not parse start position \"%s\"\n"),
783                                                         progname, optarg);
784                                         exit(1);
785                                 }
786                                 startpos = ((uint64) hi) << 32 | lo;
787                                 break;
788                         case 'E':
789                                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
790                                 {
791                                         fprintf(stderr,
792                                                         _("%s: could not parse end position \"%s\"\n"),
793                                                         progname, optarg);
794                                         exit(1);
795                                 }
796                                 endpos = ((uint64) hi) << 32 | lo;
797                                 break;
798                         case 'o':
799                                 {
800                                         char       *data = pg_strdup(optarg);
801                                         char       *val = strchr(data, '=');
802
803                                         if (val != NULL)
804                                         {
805                                                 /* remove =; separate data from val */
806                                                 *val = '\0';
807                                                 val++;
808                                         }
809
810                                         noptions += 1;
811                                         options = pg_realloc(options, sizeof(char *) * noptions * 2);
812
813                                         options[(noptions - 1) * 2] = data;
814                                         options[(noptions - 1) * 2 + 1] = val;
815                                 }
816
817                                 break;
818                         case 'P':
819                                 plugin = pg_strdup(optarg);
820                                 break;
821                         case 's':
822                                 standby_message_timeout = atoi(optarg) * 1000;
823                                 if (standby_message_timeout < 0)
824                                 {
825                                         fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
826                                                         progname, optarg);
827                                         exit(1);
828                                 }
829                                 break;
830                         case 'S':
831                                 replication_slot = pg_strdup(optarg);
832                                 break;
833 /* action */
834                         case 1:
835                                 do_create_slot = true;
836                                 break;
837                         case 2:
838                                 do_start_slot = true;
839                                 break;
840                         case 3:
841                                 do_drop_slot = true;
842                                 break;
843                         case 4:
844                                 slot_exists_ok = true;
845                                 break;
846
847                         default:
848
849                                 /*
850                                  * getopt_long already emitted a complaint
851                                  */
852                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
853                                                 progname);
854                                 exit(1);
855                 }
856         }
857
858         /*
859          * Any non-option arguments?
860          */
861         if (optind < argc)
862         {
863                 fprintf(stderr,
864                                 _("%s: too many command-line arguments (first is \"%s\")\n"),
865                                 progname, argv[optind]);
866                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
867                                 progname);
868                 exit(1);
869         }
870
871         /*
872          * Required arguments
873          */
874         if (replication_slot == NULL)
875         {
876                 fprintf(stderr, _("%s: no slot specified\n"), progname);
877                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
878                                 progname);
879                 exit(1);
880         }
881
882         if (do_start_slot && outfile == NULL)
883         {
884                 fprintf(stderr, _("%s: no target file specified\n"), progname);
885                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
886                                 progname);
887                 exit(1);
888         }
889
890         if (!do_drop_slot && dbname == NULL)
891         {
892                 fprintf(stderr, _("%s: no database specified\n"), progname);
893                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
894                                 progname);
895                 exit(1);
896         }
897
898         if (!do_drop_slot && !do_create_slot && !do_start_slot)
899         {
900                 fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
901                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
902                                 progname);
903                 exit(1);
904         }
905
906         if (do_drop_slot && (do_create_slot || do_start_slot))
907         {
908                 fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
909                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
910                                 progname);
911                 exit(1);
912         }
913
914         if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
915         {
916                 fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
917                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
918                                 progname);
919                 exit(1);
920         }
921
922         if (endpos != InvalidXLogRecPtr && !do_start_slot)
923         {
924                 fprintf(stderr,
925                                 _("%s: --endpos may only be specified with --start\n"),
926                                 progname);
927                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
928                                 progname);
929                 exit(1);
930         }
931
932 #ifndef WIN32
933         pqsignal(SIGINT, sigint_handler);
934         pqsignal(SIGHUP, sighup_handler);
935 #endif
936
937         /*
938          * Obtain a connection to server. This is not really necessary but it
939          * helps to get more precise error messages about authentication, required
940          * GUC parameters and such.
941          */
942         conn = GetConnection();
943         if (!conn)
944                 /* Error message already written in GetConnection() */
945                 exit(1);
946
947         /*
948          * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
949          * replication connection.
950          */
951         if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
952                 disconnect_and_exit(1);
953
954         if (db_name == NULL)
955         {
956                 fprintf(stderr,
957                                 _("%s: could not establish database-specific replication connection\n"),
958                                 progname);
959                 disconnect_and_exit(1);
960         }
961
962         /* Drop a replication slot. */
963         if (do_drop_slot)
964         {
965                 if (verbose)
966                         fprintf(stderr,
967                                         _("%s: dropping replication slot \"%s\"\n"),
968                                         progname, replication_slot);
969
970                 if (!DropReplicationSlot(conn, replication_slot))
971                         disconnect_and_exit(1);
972         }
973
974         /* Create a replication slot. */
975         if (do_create_slot)
976         {
977                 if (verbose)
978                         fprintf(stderr,
979                                         _("%s: creating replication slot \"%s\"\n"),
980                                         progname, replication_slot);
981
982                 if (!CreateReplicationSlot(conn, replication_slot, plugin,
983                                                                    false, slot_exists_ok))
984                         disconnect_and_exit(1);
985                 startpos = InvalidXLogRecPtr;
986         }
987
988         if (!do_start_slot)
989                 disconnect_and_exit(0);
990
991         /* Stream loop */
992         while (true)
993         {
994                 StreamLogicalLog();
995                 if (time_to_abort)
996                 {
997                         /*
998                          * We've been Ctrl-C'ed or reached an exit limit condition. That's
999                          * not an error, so exit without an errorcode.
1000                          */
1001                         disconnect_and_exit(0);
1002                 }
1003                 else if (noloop)
1004                 {
1005                         fprintf(stderr, _("%s: disconnected\n"), progname);
1006                         exit(1);
1007                 }
1008                 else
1009                 {
1010                         fprintf(stderr,
1011                         /* translator: check source for value for %d */
1012                                         _("%s: disconnected; waiting %d seconds to try again\n"),
1013                                         progname, RECONNECT_SLEEP_TIME);
1014                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1015                 }
1016         }
1017 }
1018
1019 /*
1020  * Fsync our output data, and send a feedback message to the server.  Returns
1021  * true if successful, false otherwise.
1022  *
1023  * If successful, *now is updated to the current timestamp just before sending
1024  * feedback.
1025  */
1026 static bool
1027 flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1028 {
1029         /* flush data to disk, so that we send a recent flush pointer */
1030         if (!OutputFsync(*now))
1031                 return false;
1032         *now = feGetCurrentTimestamp();
1033         if (!sendFeedback(conn, *now, true, false))
1034                 return false;
1035
1036         return true;
1037 }
1038
1039 /*
1040  * Try to inform the server about of upcoming demise, but don't wait around or
1041  * retry on failure.
1042  */
1043 static void
1044 prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1045 {
1046         (void) PQputCopyEnd(conn, NULL);
1047         (void) PQflush(conn);
1048
1049         if (verbose)
1050         {
1051                 if (keepalive)
1052                         fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
1053                                         progname,
1054                                         (uint32) (endpos >> 32), (uint32) endpos);
1055                 else
1056                         fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
1057                                         progname, (uint32) (endpos >> 32), (uint32) (endpos),
1058                                         (uint32) (lsn >> 32), (uint32) lsn);
1059
1060         }
1061 }