1 /*-------------------------------------------------------------------------
3 * pg_receivexlog.c - receive streaming transaction log data and write it
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
11 * src/bin/pg_basebackup/pg_receivexlog.c
12 *-------------------------------------------------------------------------
15 #include "postgres_fe.h"
20 #include <sys/types.h>
24 #include "access/xlog_internal.h"
25 #include "getopt_long.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
/*
 * Time to sleep between reconnection attempts, in seconds.  The value is
 * shown to the user in seconds and scaled by 1000000 when handed to
 * pg_usleep().
 */
#define RECONNECT_SLEEP_TIME 5

/* Global options and state */

/* Directory scanned for existing segments and handed to ReceiveXlogStream() */
static char *basedir = NULL;

/* Nonzero enables progress messages on stderr (e.g. "finished segment") */
static int verbose = 0;

/* NOTE(review): presumably set by -n/--no-loop; the assignment is not visible here */
static int noloop = 0;

/*
 * Interval between status packets sent to the server, in milliseconds.
 * The -s option is given in seconds and multiplied by 1000.
 */
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */

/*
 * Set by the SIGINT handler to request a clean shutdown.  It is written from
 * an asynchronous signal handler, so it must be a volatile sig_atomic_t:
 * that is the only object type ISO C guarantees can be stored to safely in
 * that context.
 */
static volatile sig_atomic_t time_to_abort = false;
42 static void usage(void);
43 static XLogRecPtr FindStreamingStart(uint32 *tli);
44 static void StreamLog();
45 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
46 bool segment_finished);
48 #define disconnect_and_exit(code) \
50 if (conn != NULL) PQfinish(conn); \
58 printf(_("%s receives PostgreSQL streaming transaction logs.\n\n"),
60 printf(_("Usage:\n"));
61 printf(_(" %s [OPTION]...\n"), progname);
62 printf(_("\nOptions:\n"));
63 printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
64 printf(_(" -n, --no-loop do not loop on connection lost\n"));
65 printf(_(" -v, --verbose output verbose messages\n"));
66 printf(_(" -V, --version output version information, then exit\n"));
67 printf(_(" -?, --help show this help, then exit\n"));
68 printf(_("\nConnection options:\n"));
69 printf(_(" -d, --dbname=CONNSTR connection string\n"));
70 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
71 printf(_(" -p, --port=PORT database server port number\n"));
72 printf(_(" -s, --status-interval=INTERVAL\n"
73 " time between status packets sent to server (in seconds)\n"));
74 printf(_(" -U, --username=NAME connect as specified database user\n"));
75 printf(_(" -w, --no-password never prompt for password\n"));
76 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
77 printf(_(" --slot=SLOTNAME replication slot to use\n"));
78 printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
82 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
84 static uint32 prevtimeline = 0;
85 static XLogRecPtr prevpos = InvalidXLogRecPtr;
87 /* we assume that we get called once at the end of each segment */
88 if (verbose && segment_finished)
89 fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
90 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
94 * Note that we report the previous, not current, position here. After a
95 * timeline switch, xlogpos points to the beginning of the segment because
96 * that's where we always begin streaming. Reporting the end of previous
97 * timeline isn't totally accurate, because the next timeline can begin
98 * slightly before the end of the WAL that we received on the previous
99 * timeline, but it's close enough for reporting purposes.
101 if (prevtimeline != 0 && prevtimeline != timeline)
102 fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
104 (uint32) (prevpos >> 32), (uint32) prevpos);
106 prevtimeline = timeline;
111 fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
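 * Determine starting location for streaming, based on any existing xlog
 * segments in the directory. We pick the file with the highest segment
 * number, breaking ties by highest timeline ID and then by preferring a
 * complete segment over its .partial counterpart. Streaming starts at the
 * end of that segment if it is complete (size matches XLOG_SEG_SIZE), or at
 * its beginning if it is a .partial file.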
123 * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
126 FindStreamingStart(uint32 *tli)
129 struct dirent *dirent;
130 XLogSegNo high_segno = 0;
132 bool high_ispartial = false;
134 dir = opendir(basedir);
137 fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
138 progname, basedir, strerror(errno));
139 disconnect_and_exit(1);
142 while (errno = 0, (dirent = readdir(dir)) != NULL)
149 * Check if the filename looks like an xlog file, or a .partial file.
150 * Xlog files are always 24 characters, and .partial files are 32
153 if (strlen(dirent->d_name) == 24)
155 if (strspn(dirent->d_name, "0123456789ABCDEF") != 24)
159 else if (strlen(dirent->d_name) == 32)
161 if (strspn(dirent->d_name, "0123456789ABCDEF") != 24)
163 if (strcmp(&dirent->d_name[24], ".partial") != 0)
171 * Looks like an xlog file. Parse its position.
173 XLogFromFileName(dirent->d_name, &tli, &segno);
176 * Check that the segment has the right size, if it's supposed to be
182 char fullpath[MAXPGPATH];
184 snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
185 if (stat(fullpath, &statbuf) != 0)
187 fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
188 progname, fullpath, strerror(errno));
189 disconnect_and_exit(1);
192 if (statbuf.st_size != XLOG_SEG_SIZE)
195 _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
196 progname, dirent->d_name, (int) statbuf.st_size);
201 /* Looks like a valid segment. Remember that we saw it. */
202 if ((segno > high_segno) ||
203 (segno == high_segno && tli > high_tli) ||
204 (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
208 high_ispartial = ispartial;
213 /* Bug in old Mingw dirent.c; fixed in mingw-runtime-3.2, 2003-10-10 */
214 if (GetLastError() == ERROR_NO_MORE_FILES)
220 fprintf(stderr, _("%s: could not read directory \"%s\": %s\n"),
221 progname, basedir, strerror(errno));
222 disconnect_and_exit(1);
227 fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
228 progname, basedir, strerror(errno));
229 disconnect_and_exit(1);
237 * Move the starting pointer to the start of the next segment, if
238 * the highest one we saw was completed. Otherwise start streaming
239 * from the beginning of the .partial segment.
244 XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
250 return InvalidXLogRecPtr;
254 * Start the log streaming
262 XLogRecPtr serverpos;
268 * Connect in replication mode to the server
270 conn = GetConnection();
272 /* Error message already written in GetConnection() */
275 if (!CheckServerVersionForStreaming(conn))
278 * Error message already written in CheckServerVersionForStreaming().
279 * There's no hope of recovering from a version mismatch, so don't
282 disconnect_and_exit(1);
286 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
289 res = PQexec(conn, "IDENTIFY_SYSTEM");
290 if (PQresultStatus(res) != PGRES_TUPLES_OK)
292 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
293 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
294 disconnect_and_exit(1);
296 if (PQntuples(res) != 1 || PQnfields(res) < 3)
299 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
300 progname, PQntuples(res), PQnfields(res), 1, 3);
301 disconnect_and_exit(1);
303 servertli = atoi(PQgetvalue(res, 0, 1));
304 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
307 _("%s: could not parse transaction log location \"%s\"\n"),
308 progname, PQgetvalue(res, 0, 2));
309 disconnect_and_exit(1);
311 serverpos = ((uint64) hi) << 32 | lo;
315 * Figure out where to start streaming.
317 startpos = FindStreamingStart(&starttli);
318 if (startpos == InvalidXLogRecPtr)
320 startpos = serverpos;
321 starttli = servertli;
325 * Always start streaming at the beginning of a segment
327 startpos -= startpos % XLOG_SEG_SIZE;
330 * Start the replication
334 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
335 progname, (uint32) (startpos >> 32), (uint32) startpos,
338 ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
339 stop_streaming, standby_message_timeout, ".partial");
345 * When sigint is called, just tell the system to exit at the next possible
351 sigint_handler(int signum)
353 time_to_abort = true;
358 main(int argc, char **argv)
360 static struct option long_options[] = {
361 {"help", no_argument, NULL, '?'},
362 {"version", no_argument, NULL, 'V'},
363 {"directory", required_argument, NULL, 'D'},
364 {"dbname", required_argument, NULL, 'd'},
365 {"host", required_argument, NULL, 'h'},
366 {"port", required_argument, NULL, 'p'},
367 {"username", required_argument, NULL, 'U'},
368 {"no-loop", no_argument, NULL, 'n'},
369 {"no-password", no_argument, NULL, 'w'},
370 {"password", no_argument, NULL, 'W'},
371 {"status-interval", required_argument, NULL, 's'},
372 {"slot", required_argument, NULL, 'S'},
373 {"verbose", no_argument, NULL, 'v'},
380 progname = get_progname(argv[0]);
381 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
385 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
390 else if (strcmp(argv[1], "-V") == 0 ||
391 strcmp(argv[1], "--version") == 0)
393 puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
398 while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
399 long_options, &option_index)) != -1)
404 basedir = pg_strdup(optarg);
407 connection_string = pg_strdup(optarg);
410 dbhost = pg_strdup(optarg);
413 if (atoi(optarg) <= 0)
415 fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
419 dbport = pg_strdup(optarg);
422 dbuser = pg_strdup(optarg);
431 standby_message_timeout = atoi(optarg) * 1000;
432 if (standby_message_timeout < 0)
434 fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
440 replication_slot = pg_strdup(optarg);
451 * getopt_long already emitted a complaint
453 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
460 * Any non-option arguments?
465 _("%s: too many command-line arguments (first is \"%s\")\n"),
466 progname, argv[optind]);
467 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
477 fprintf(stderr, _("%s: no target directory specified\n"), progname);
478 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
484 pqsignal(SIGINT, sigint_handler);
493 * We've been Ctrl-C'ed. That's not an error, so exit without an
500 fprintf(stderr, _("%s: disconnected\n"), progname);
506 /* translator: check source for value for %d */
507 _("%s: disconnected; waiting %d seconds to try again\n"),
508 progname, RECONNECT_SLEEP_TIME);
509 pg_usleep(RECONNECT_SLEEP_TIME * 1000000);