]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/pg_receivexlog.c
Replace the XLogInsert slots with regular LWLocks.
[postgresql] / src / bin / pg_basebackup / pg_receivexlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_receivexlog.c - receive streaming transaction log data and write it
4  *                                        to a local file.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/pg_receivexlog.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres_fe.h"
16
#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
22
23 #include "libpq-fe.h"
24 #include "access/xlog_internal.h"
25 #include "getopt_long.h"
26
27 #include "receivelog.h"
28 #include "streamutil.h"
29
30
/* Time to sleep between reconnection attempts, in seconds */
#define RECONNECT_SLEEP_TIME 5

/* Global options */
static char *basedir = NULL;	/* -D: directory that receives the WAL files */
static int	verbose = 0;		/* -v: incremented once per occurrence */
static int	noloop = 0;			/* -n: exit instead of reconnecting */
static int	standby_message_timeout = 10 * 1000;	/* -s, kept in
													 * milliseconds; 10 sec =
													 * default */

/*
 * Set by the SIGINT handler, checked by stop_streaming() and main().
 *
 * The C standard only guarantees well-defined behavior when a signal handler
 * writes to an object of type "volatile sig_atomic_t", so use that rather
 * than bool.
 */
static volatile sig_atomic_t time_to_abort = false;
40
41
42 static void usage(void);
43 static XLogRecPtr FindStreamingStart(uint32 *tli);
44 static void StreamLog();
45 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
46                            bool segment_finished);
47
/*
 * Close the server connection, if there is one, and terminate the process
 * with the given exit status.
 *
 * The body is wrapped in do { ... } while (0) so that the expansion is a
 * single statement: "disconnect_and_exit(n);" then behaves correctly even as
 * the unbraced body of an if/else.
 */
#define disconnect_and_exit(code)				\
	do {										\
		if (conn != NULL)						\
			PQfinish(conn);						\
		exit(code);								\
	} while (0)
53
54
55 static void
56 usage(void)
57 {
58         printf(_("%s receives PostgreSQL streaming transaction logs.\n\n"),
59                    progname);
60         printf(_("Usage:\n"));
61         printf(_("  %s [OPTION]...\n"), progname);
62         printf(_("\nOptions:\n"));
63         printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
64         printf(_("  -n, --no-loop          do not loop on connection lost\n"));
65         printf(_("  -v, --verbose          output verbose messages\n"));
66         printf(_("  -V, --version          output version information, then exit\n"));
67         printf(_("  -?, --help             show this help, then exit\n"));
68         printf(_("\nConnection options:\n"));
69         printf(_("  -d, --dbname=CONNSTR   connection string\n"));
70         printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
71         printf(_("  -p, --port=PORT        database server port number\n"));
72         printf(_("  -s, --status-interval=INTERVAL\n"
73                          "                         time between status packets sent to server (in seconds)\n"));
74         printf(_("  -U, --username=NAME    connect as specified database user\n"));
75         printf(_("  -w, --no-password      never prompt for password\n"));
76         printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
77         printf(_("      --slot=SLOTNAME    replication slot to use\n"));
78         printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
79 }
80
81 static bool
82 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
83 {
84         static uint32 prevtimeline = 0;
85         static XLogRecPtr prevpos = InvalidXLogRecPtr;
86
87         /* we assume that we get called once at the end of each segment */
88         if (verbose && segment_finished)
89                 fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
90                                 progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
91                                 timeline);
92
93         /*
94          * Note that we report the previous, not current, position here. After a
95          * timeline switch, xlogpos points to the beginning of the segment because
96          * that's where we always begin streaming. Reporting the end of previous
97          * timeline isn't totally accurate, because the next timeline can begin
98          * slightly before the end of the WAL that we received on the previous
99          * timeline, but it's close enough for reporting purposes.
100          */
101         if (prevtimeline != 0 && prevtimeline != timeline)
102                 fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
103                                 progname, timeline,
104                                 (uint32) (prevpos >> 32), (uint32) prevpos);
105
106         prevtimeline = timeline;
107         prevpos = xlogpos;
108
109         if (time_to_abort)
110         {
111                 fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
112                                 progname);
113                 return true;
114         }
115         return false;
116 }
117
118 /*
119  * Determine starting location for streaming, based on any existing xlog
120  * segments in the directory. We start at the end of the last one that is
121  * complete (size matches XLogSegSize), on the timeline with highest ID.
122  *
123  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
124  */
125 static XLogRecPtr
126 FindStreamingStart(uint32 *tli)
127 {
128         DIR                *dir;
129         struct dirent *dirent;
130         XLogSegNo       high_segno = 0;
131         uint32          high_tli = 0;
132         bool            high_ispartial = false;
133
134         dir = opendir(basedir);
135         if (dir == NULL)
136         {
137                 fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
138                                 progname, basedir, strerror(errno));
139                 disconnect_and_exit(1);
140         }
141
142         while ((dirent = readdir(dir)) != NULL)
143         {
144                 uint32          tli;
145                 XLogSegNo       segno;
146                 bool            ispartial;
147
148                 /*
149                  * Check if the filename looks like an xlog file, or a .partial file.
150                  * Xlog files are always 24 characters, and .partial files are 32
151                  * characters.
152                  */
153                 if (strlen(dirent->d_name) == 24)
154                 {
155                         if (strspn(dirent->d_name, "0123456789ABCDEF") != 24)
156                                 continue;
157                         ispartial = false;
158                 }
159                 else if (strlen(dirent->d_name) == 32)
160                 {
161                         if (strspn(dirent->d_name, "0123456789ABCDEF") != 24)
162                                 continue;
163                         if (strcmp(&dirent->d_name[24], ".partial") != 0)
164                                 continue;
165                         ispartial = true;
166                 }
167                 else
168                         continue;
169
170                 /*
171                  * Looks like an xlog file. Parse its position.
172                  */
173                 XLogFromFileName(dirent->d_name, &tli, &segno);
174
175                 /*
176                  * Check that the segment has the right size, if it's supposed to be
177                  * completed.
178                  */
179                 if (!ispartial)
180                 {
181                         struct stat statbuf;
182                         char            fullpath[MAXPGPATH];
183
184                         snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
185                         if (stat(fullpath, &statbuf) != 0)
186                         {
187                                 fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
188                                                 progname, fullpath, strerror(errno));
189                                 disconnect_and_exit(1);
190                         }
191
192                         if (statbuf.st_size != XLOG_SEG_SIZE)
193                         {
194                                 fprintf(stderr,
195                                                 _("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
196                                                 progname, dirent->d_name, (int) statbuf.st_size);
197                                 continue;
198                         }
199                 }
200
201                 /* Looks like a valid segment. Remember that we saw it. */
202                 if ((segno > high_segno) ||
203                         (segno == high_segno && tli > high_tli) ||
204                         (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
205                 {
206                         high_segno = segno;
207                         high_tli = tli;
208                         high_ispartial = ispartial;
209                 }
210         }
211
212         closedir(dir);
213
214         if (high_segno > 0)
215         {
216                 XLogRecPtr      high_ptr;
217
218                 /*
219                  * Move the starting pointer to the start of the next segment, if
220                  * the highest one we saw was completed. Otherwise start streaming
221                  * from the beginning of the .partial segment.
222                  */
223                 if (!high_ispartial)
224                         high_segno++;
225
226                 XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
227
228                 *tli = high_tli;
229                 return high_ptr;
230         }
231         else
232                 return InvalidXLogRecPtr;
233 }
234
235 /*
236  * Start the log streaming
237  */
238 static void
239 StreamLog(void)
240 {
241         PGresult   *res;
242         XLogRecPtr      startpos;
243         uint32          starttli;
244         XLogRecPtr      serverpos;
245         uint32          servertli;
246         uint32          hi,
247                                 lo;
248
249         /*
250          * Connect in replication mode to the server
251          */
252         conn = GetConnection();
253         if (!conn)
254                 /* Error message already written in GetConnection() */
255                 return;
256
257         if (!CheckServerVersionForStreaming(conn))
258         {
259                 /*
260                  * Error message already written in CheckServerVersionForStreaming().
261                  * There's no hope of recovering from a version mismatch, so don't
262                  * retry.
263                  */
264                 disconnect_and_exit(1);
265         }
266
267         /*
268          * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
269          * position.
270          */
271         res = PQexec(conn, "IDENTIFY_SYSTEM");
272         if (PQresultStatus(res) != PGRES_TUPLES_OK)
273         {
274                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
275                                 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
276                 disconnect_and_exit(1);
277         }
278         if (PQntuples(res) != 1 || PQnfields(res) < 3)
279         {
280                 fprintf(stderr,
281                                 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
282                                 progname, PQntuples(res), PQnfields(res), 1, 3);
283                 disconnect_and_exit(1);
284         }
285         servertli = atoi(PQgetvalue(res, 0, 1));
286         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
287         {
288                 fprintf(stderr,
289                                 _("%s: could not parse transaction log location \"%s\"\n"),
290                                 progname, PQgetvalue(res, 0, 2));
291                 disconnect_and_exit(1);
292         }
293         serverpos = ((uint64) hi) << 32 | lo;
294         PQclear(res);
295
296         /*
297          * Figure out where to start streaming.
298          */
299         startpos = FindStreamingStart(&starttli);
300         if (startpos == InvalidXLogRecPtr)
301         {
302                 startpos = serverpos;
303                 starttli = servertli;
304         }
305
306         /*
307          * Always start streaming at the beginning of a segment
308          */
309         startpos -= startpos % XLOG_SEG_SIZE;
310
311         /*
312          * Start the replication
313          */
314         if (verbose)
315                 fprintf(stderr,
316                                 _("%s: starting log streaming at %X/%X (timeline %u)\n"),
317                                 progname, (uint32) (startpos >> 32), (uint32) startpos,
318                                 starttli);
319
320         ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
321                                           stop_streaming, standby_message_timeout, ".partial");
322
323         PQfinish(conn);
324 }
325
326 /*
327  * When sigint is called, just tell the system to exit at the next possible
328  * moment.
329  */
330 #ifndef WIN32
331
332 static void
333 sigint_handler(int signum)
334 {
335         time_to_abort = true;
336 }
337 #endif
338
339 int
340 main(int argc, char **argv)
341 {
342         static struct option long_options[] = {
343                 {"help", no_argument, NULL, '?'},
344                 {"version", no_argument, NULL, 'V'},
345                 {"directory", required_argument, NULL, 'D'},
346                 {"dbname", required_argument, NULL, 'd'},
347                 {"host", required_argument, NULL, 'h'},
348                 {"port", required_argument, NULL, 'p'},
349                 {"username", required_argument, NULL, 'U'},
350                 {"no-loop", no_argument, NULL, 'n'},
351                 {"no-password", no_argument, NULL, 'w'},
352                 {"password", no_argument, NULL, 'W'},
353                 {"status-interval", required_argument, NULL, 's'},
354                 {"slot", required_argument, NULL, 'S'},
355                 {"verbose", no_argument, NULL, 'v'},
356                 {NULL, 0, NULL, 0}
357         };
358
359         int                     c;
360         int                     option_index;
361
362         progname = get_progname(argv[0]);
363         set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
364
365         if (argc > 1)
366         {
367                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
368                 {
369                         usage();
370                         exit(0);
371                 }
372                 else if (strcmp(argv[1], "-V") == 0 ||
373                                  strcmp(argv[1], "--version") == 0)
374                 {
375                         puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
376                         exit(0);
377                 }
378         }
379
380         while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
381                                                         long_options, &option_index)) != -1)
382         {
383                 switch (c)
384                 {
385                         case 'D':
386                                 basedir = pg_strdup(optarg);
387                                 break;
388                         case 'd':
389                                 connection_string = pg_strdup(optarg);
390                                 break;
391                         case 'h':
392                                 dbhost = pg_strdup(optarg);
393                                 break;
394                         case 'p':
395                                 if (atoi(optarg) <= 0)
396                                 {
397                                         fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
398                                                         progname, optarg);
399                                         exit(1);
400                                 }
401                                 dbport = pg_strdup(optarg);
402                                 break;
403                         case 'U':
404                                 dbuser = pg_strdup(optarg);
405                                 break;
406                         case 'w':
407                                 dbgetpassword = -1;
408                                 break;
409                         case 'W':
410                                 dbgetpassword = 1;
411                                 break;
412                         case 's':
413                                 standby_message_timeout = atoi(optarg) * 1000;
414                                 if (standby_message_timeout < 0)
415                                 {
416                                         fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
417                                                         progname, optarg);
418                                         exit(1);
419                                 }
420                                 break;
421                         case 'S':
422                                 replication_slot = pg_strdup(optarg);
423                                 break;
424                         case 'n':
425                                 noloop = 1;
426                                 break;
427                         case 'v':
428                                 verbose++;
429                                 break;
430                         default:
431
432                                 /*
433                                  * getopt_long already emitted a complaint
434                                  */
435                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
436                                                 progname);
437                                 exit(1);
438                 }
439         }
440
441         /*
442          * Any non-option arguments?
443          */
444         if (optind < argc)
445         {
446                 fprintf(stderr,
447                                 _("%s: too many command-line arguments (first is \"%s\")\n"),
448                                 progname, argv[optind]);
449                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
450                                 progname);
451                 exit(1);
452         }
453
454         /*
455          * Required arguments
456          */
457         if (basedir == NULL)
458         {
459                 fprintf(stderr, _("%s: no target directory specified\n"), progname);
460                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
461                                 progname);
462                 exit(1);
463         }
464
465 #ifndef WIN32
466         pqsignal(SIGINT, sigint_handler);
467 #endif
468
469         while (true)
470         {
471                 StreamLog();
472                 if (time_to_abort)
473                 {
474                         /*
475                          * We've been Ctrl-C'ed. That's not an error, so exit without an
476                          * errorcode.
477                          */
478                         exit(0);
479                 }
480                 else if (noloop)
481                 {
482                         fprintf(stderr, _("%s: disconnected\n"), progname);
483                         exit(1);
484                 }
485                 else
486                 {
487                         fprintf(stderr,
488                         /* translator: check source for value for %d */
489                                         _("%s: disconnected; waiting %d seconds to try again\n"),
490                                         progname, RECONNECT_SLEEP_TIME);
491                         pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
492                 }
493         }
494 }