1 /*-------------------------------------------------------------------------
4 * code for taking a base backup and streaming it to a standby
6 * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
9 * src/backend/replication/basebackup.c
11 *-------------------------------------------------------------------------
15 #include <sys/types.h>
20 #include "access/xlog_internal.h" /* for pg_start/stop_backup */
21 #include "catalog/pg_type.h"
22 #include "lib/stringinfo.h"
23 #include "libpq/libpq.h"
24 #include "libpq/pqformat.h"
25 #include "nodes/pg_list.h"
26 #include "replication/basebackup.h"
27 #include "replication/walsender.h"
28 #include "storage/fd.h"
29 #include "storage/ipc.h"
30 #include "utils/builtins.h"
31 #include "utils/elog.h"
32 #include "utils/memutils.h"
33 #include "utils/ps_status.h"
/* Forward declarations of the file-local helpers defined further down. */
44 static int64 sendDir(char *path, int basepathlen, bool sizeonly);
45 static void sendFile(char *readfilename, char *tarfilename,
46 struct stat * statbuf);
47 static void sendFileWithContent(const char *filename, const char *content);
48 static void _tarWriteHeader(const char *filename, char *linktarget,
49 struct stat * statbuf);
50 static void send_int8_string(StringInfoData *buf, int64 intval);
51 static void SendBackupHeader(List *tablespaces);
52 static void base_backup_cleanup(int code, Datum arg);
53 static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
54 static void parse_basebackup_options(List *options, basebackup_options *opt);
55 static void SendXlogRecPtrResult(XLogRecPtr ptr);
58 * Size of each block sent into the tar stream for larger files.
60 * XLogSegSize *MUST* be evenly divisible by this
/* 32768 bytes; used below both as the send-buffer size and as the length of each 'd' message payload. */
62 #define TAR_SEND_SIZE 32768
73 * Called when ERROR or FATAL happens in perform_base_backup() after
74 * we have started the backup - make sure we end it!
/* Registered through PG_ENSURE_ERROR_CLEANUP() in perform_base_backup(), which passes (Datum) 0 as 'arg'. */
77 base_backup_cleanup(int code, Datum arg)
83 * Actually do a base backup for the specified tablespaces.
85 * This is split out mainly to avoid complaints about "variable might be
86 * clobbered by longjmp" from stupider versions of gcc.
/*
 * Parameters:
 *   opt       - parsed options; label, fastcheckpoint, progress and
 *               includewal are the fields read here
 *   tblspcdir - open directory handle that is scanned as "pg_tblspc"
 *
 * Outline: start the backup and report the start position, build a list
 * with one entry per tablespace link plus a final entry for the base
 * directory, send the header describing that list, stream every entry as
 * its own tar archive, stop the backup, optionally append WAL segments to
 * the last archive, and finally report the end position.
 */
89 perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
95 startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
96 SendXlogRecPtrResult(startptr);
98 PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
100 List *tablespaces = NIL;
105 /* Collect information about all tablespaces */
106 while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL)
108 char fullpath[MAXPGPATH];
109 char linkpath[MAXPGPATH];
111 /* Skip special stuff */
112 if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
/* linkpath is zero-filled first and readlink() is given one byte less than its size, so the result stays NUL-terminated */
115 snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name);
117 MemSet(linkpath, 0, sizeof(linkpath));
118 if (readlink(fullpath, linkpath, sizeof(linkpath) - 1) == -1)
121 (errmsg("unable to read symbolic link %s: %m", fullpath)));
125 ti = palloc(sizeof(tablespaceinfo));
126 ti->oid = pstrdup(de->d_name);
127 ti->path = pstrdup(linkpath);
/* the size is only computed (sendDir with sizeonly = true) when progress reporting was requested; otherwise it is -1 */
128 ti->size = opt->progress ? sendDir(linkpath, strlen(linkpath), true) : -1;
129 tablespaces = lappend(tablespaces, ti);
132 /* Add a node for the base directory at the end */
/* presumably palloc0 zero-fills, leaving ti->path NULL; a NULL path is how the base directory entry is recognised below */
133 ti = palloc0(sizeof(tablespaceinfo));
134 ti->size = opt->progress ? sendDir(".", 1, true) : -1;
135 tablespaces = lappend(tablespaces, ti);
137 /* Send tablespace header */
138 SendBackupHeader(tablespaces);
140 /* Send off our tablespaces one by one */
141 foreach(lc, tablespaces)
143 tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
146 /* Send CopyOutResponse message */
147 pq_beginmessage(&buf, 'H');
148 pq_sendbyte(&buf, 0); /* overall format */
149 pq_sendint(&buf, 0, 2); /* natts */
152 /* In the main tar, include the backup_label first. */
153 if (ti->path == NULL)
154 sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
156 sendDir(ti->path == NULL ? "." : ti->path,
157 ti->path == NULL ? 1 : strlen(ti->path),
161 * If we're including WAL, and this is the main data directory we
162 * don't terminate the tar stream here. Instead, we will append
163 * the xlog files below and terminate it then. This is safe since
164 * the main data directory is always sent *last*.
166 if (opt->includewal && ti->path == NULL)
168 Assert(lnext(lc) == NULL);
171 pq_putemptymessage('c'); /* CopyDone */
174 PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
176 endptr = do_pg_stop_backup(labelfile);
181 * We've left the last tar file "open", so we can now append the
182 * required WAL files to it.
190 MemSet(&statbuf, 0, sizeof(statbuf));
191 statbuf.st_mode = S_IRUSR | S_IWUSR;
193 statbuf.st_uid = geteuid();
194 statbuf.st_gid = getegid();
196 statbuf.st_size = XLogSegSize;
197 statbuf.st_mtime = time(NULL);
199 XLByteToSeg(startptr, logid, logseg);
200 XLByteToPrevSeg(endptr, endlogid, endlogseg);
204 /* Send another xlog segment */
208 XLogFilePath(fn, ThisTimeLineID, logid, logseg);
209 _tarWriteHeader(fn, NULL, &statbuf);
211 /* Send the actual WAL file contents, block-by-block */
212 for (i = 0; i < XLogSegSize / TAR_SEND_SIZE; i++)
214 char buf[TAR_SEND_SIZE];
218 ptr.xrecoff = logseg * XLogSegSize + TAR_SEND_SIZE * i;
221 * Some old compilers, e.g. 2.95.3/x86, think that passing
222 * a struct in the same function as a longjump might clobber
223 * a variable. bjm 2011-02-04
224 * http://lists.apple.com/archives/xcode-users/2003/Dec//msg00051.html
226 XLogRead(buf, ptr, TAR_SEND_SIZE);
227 if (pq_putmessage('d', buf, TAR_SEND_SIZE))
229 (errmsg("base backup could not send data, aborting backup")));
233 * Files are always fixed size, and always end on a 512 byte
234 * boundary, so padding is never necessary.
238 /* Advance to the next WAL file */
239 NextLogSeg(logid, logseg);
241 /* Have we reached our stop position yet? */
242 if (logid > endlogid ||
243 (logid == endlogid && logseg > endlogseg))
247 /* Send CopyDone message for the last tar file */
248 pq_putemptymessage('c');
250 SendXlogRecPtrResult(endptr);
254 * Parse the base backup options passed down by the parser
/*
 * Fills *opt from a list of DefElem nodes.  *opt is zeroed first, so any
 * option that is not mentioned stays false/NULL.
 *
 * Recognised option names: "label" (string value taken from the node's
 * argument), "progress", "fast" and "wal" (each sets the matching flag).
 * A repeated option is reported with ERRCODE_SYNTAX_ERROR; any other name
 * raises an "option ... not recognized" error.  When no label was given it
 * defaults to "base backup".
 */
257 parse_basebackup_options(List *options, basebackup_options *opt)
260 bool o_label = false;
261 bool o_progress = false;
265 MemSet(opt, 0, sizeof(*opt));
266 foreach(lopt, options)
268 DefElem *defel = (DefElem *) lfirst(lopt);
270 if (strcmp(defel->defname, "label") == 0)
274 (errcode(ERRCODE_SYNTAX_ERROR),
275 errmsg("duplicate option \"%s\"", defel->defname)));
276 opt->label = strVal(defel->arg);
279 else if (strcmp(defel->defname, "progress") == 0)
283 (errcode(ERRCODE_SYNTAX_ERROR),
284 errmsg("duplicate option \"%s\"", defel->defname)));
285 opt->progress = true;
288 else if (strcmp(defel->defname, "fast") == 0)
292 (errcode(ERRCODE_SYNTAX_ERROR),
293 errmsg("duplicate option \"%s\"", defel->defname)));
294 opt->fastcheckpoint = true;
297 else if (strcmp(defel->defname, "wal") == 0)
301 (errcode(ERRCODE_SYNTAX_ERROR),
302 errmsg("duplicate option \"%s\"", defel->defname)));
303 opt->includewal = true;
307 elog(ERROR, "option \"%s\" not recognized",
310 if (opt->label == NULL)
311 opt->label = "base backup";
316 * SendBaseBackup() - send a complete base backup.
318 * The function will put the system into backup mode like pg_start_backup()
319 * does, so that the backup is consistent even though we read directly from
320 * the filesystem, bypassing the buffer cache.
/*
 * Entry point for a base backup command.
 *
 * Steps, in order: parse cmd->options, create a dedicated memory context
 * under CurrentMemoryContext and switch to it, mark the walsender as
 * WALSNDSTATE_BACKUP, optionally update the process title, open
 * "pg_tblspc", run perform_base_backup(), then switch back to the previous
 * context and delete the backup context.
 */
323 SendBaseBackup(BaseBackupCmd *cmd)
326 MemoryContext backup_context;
327 MemoryContext old_context;
328 basebackup_options opt;
330 parse_basebackup_options(cmd->options, &opt);
332 backup_context = AllocSetContextCreate(CurrentMemoryContext,
333 "Streaming base backup context",
334 ALLOCSET_DEFAULT_MINSIZE,
335 ALLOCSET_DEFAULT_INITSIZE,
336 ALLOCSET_DEFAULT_MAXSIZE);
337 old_context = MemoryContextSwitchTo(backup_context);
339 WalSndSetState(WALSNDSTATE_BACKUP);
341 if (update_process_title)
/* the title buffer is 50 bytes and snprintf() is bounded by it, so a long message is cut short rather than overflowing */
343 char activitymsg[50];
345 snprintf(activitymsg, sizeof(activitymsg), "sending backup \"%s\"",
347 set_ps_display(activitymsg, false);
350 /* Make sure we can open the directory with tablespaces in it */
351 dir = AllocateDir("pg_tblspc");
354 (errmsg("unable to open directory pg_tblspc: %m")));
356 perform_base_backup(&opt, dir);
360 MemoryContextSwitchTo(old_context);
361 MemoryContextDelete(backup_context);
365 send_int8_string(StringInfoData *buf, int64 intval)
369 sprintf(is, INT64_FORMAT, intval);
370 pq_sendint(buf, strlen(is), 4);
371 pq_sendbytes(buf, is, strlen(is));
/*
 * Send the result set that describes the archives about to be streamed.
 *
 * The RowDescription has three fields: "spcoid" (OIDOID), "spclocation"
 * (TEXTOID) and "size" (INT8OID).  One DataRow follows per list element.
 * For an element whose path is NULL the first two columns are sent as NULL
 * (length -1); otherwise the oid and path strings are sent.  The third
 * column is either ti->size / 1024 or NULL; the condition choosing between
 * the two is not visible in this excerpt.  A CommandComplete message ends
 * the result set.
 */
375 SendBackupHeader(List *tablespaces)
380 /* Construct and send the directory information */
381 pq_beginmessage(&buf, 'T'); /* RowDescription */
382 pq_sendint(&buf, 3, 2); /* 3 fields */
384 /* First field - spcoid */
385 pq_sendstring(&buf, "spcoid");
386 pq_sendint(&buf, 0, 4); /* table oid */
387 pq_sendint(&buf, 0, 2); /* attnum */
388 pq_sendint(&buf, OIDOID, 4); /* type oid */
389 pq_sendint(&buf, 4, 2); /* typlen */
390 pq_sendint(&buf, 0, 4); /* typmod */
391 pq_sendint(&buf, 0, 2); /* format code */
393 /* Second field - spclocation */
394 pq_sendstring(&buf, "spclocation");
395 pq_sendint(&buf, 0, 4);
396 pq_sendint(&buf, 0, 2);
397 pq_sendint(&buf, TEXTOID, 4);
398 pq_sendint(&buf, -1, 2);
399 pq_sendint(&buf, 0, 4);
400 pq_sendint(&buf, 0, 2);
402 /* Third field - size */
403 pq_sendstring(&buf, "size");
404 pq_sendint(&buf, 0, 4);
405 pq_sendint(&buf, 0, 2);
406 pq_sendint(&buf, INT8OID, 4);
407 pq_sendint(&buf, 8, 2);
408 pq_sendint(&buf, 0, 4);
409 pq_sendint(&buf, 0, 2);
412 foreach(lc, tablespaces)
414 tablespaceinfo *ti = lfirst(lc);
416 /* Send one datarow message */
417 pq_beginmessage(&buf, 'D');
418 pq_sendint(&buf, 3, 2); /* number of columns */
419 if (ti->path == NULL)
421 pq_sendint(&buf, -1, 4); /* Length = -1 ==> NULL */
422 pq_sendint(&buf, -1, 4);
426 pq_sendint(&buf, strlen(ti->oid), 4); /* length */
427 pq_sendbytes(&buf, ti->oid, strlen(ti->oid));
428 pq_sendint(&buf, strlen(ti->path), 4); /* length */
429 pq_sendbytes(&buf, ti->path, strlen(ti->path));
432 send_int8_string(&buf, ti->size / 1024);
434 pq_sendint(&buf, -1, 4); /* NULL */
439 /* Send a CommandComplete message */
440 pq_puttextmessage('C', "SELECT");
444 * Send a single resultset containing just a single
445 * XlogRecPtr record (in text format)
/*
 * ptr is rendered as "%X/%X" from its xlogid and xrecoff members and sent
 * as a one-column, one-row result set (column "recptr", TEXTOID), followed
 * by a CommandComplete message.
 */
448 SendXlogRecPtrResult(XLogRecPtr ptr)
451 char str[MAXFNAMELEN];
453 snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff);
455 pq_beginmessage(&buf, 'T'); /* RowDescription */
456 pq_sendint(&buf, 1, 2); /* 1 field */
459 pq_sendstring(&buf, "recptr");
460 pq_sendint(&buf, 0, 4); /* table oid */
461 pq_sendint(&buf, 0, 2); /* attnum */
462 pq_sendint(&buf, TEXTOID, 4); /* type oid */
463 pq_sendint(&buf, -1, 2);
464 pq_sendint(&buf, 0, 4);
465 pq_sendint(&buf, 0, 2);
469 pq_beginmessage(&buf, 'D');
470 pq_sendint(&buf, 1, 2); /* number of columns */
471 pq_sendint(&buf, strlen(str), 4); /* length */
472 pq_sendbytes(&buf, str, strlen(str));
475 /* Send a CommandComplete message */
476 pq_puttextmessage('C', "SELECT");
480 * Inject a file with given name and content in the output tar stream.
/*
 * filename - member name to put in the tar header
 * content  - NUL-terminated text; strlen(content) bytes are sent
 *
 * A stat struct is filled in by hand (owner, current time, S_IRUSR |
 * S_IWUSR, size = strlen(content)) and passed to _tarWriteHeader().  The
 * content follows as one 'd' message, then zero padding up to the next
 * multiple of 512 bytes.
 *
 * NOTE(review): the pq_putmessage() results are not checked in the lines
 * visible here, unlike in sendFile() - confirm whether that is intended.
 */
483 sendFileWithContent(const char *filename, const char *content)
488 len = strlen(content);
491 * Construct a stat struct for the backup_label file we're injecting
494 /* Windows doesn't have the concept of uid and gid */
499 statbuf.st_uid = geteuid();
500 statbuf.st_gid = getegid();
502 statbuf.st_mtime = time(NULL);
503 statbuf.st_mode = S_IRUSR | S_IWUSR;
504 statbuf.st_size = len;
506 _tarWriteHeader(filename, NULL, &statbuf);
507 /* Send the contents as a CopyData message */
508 pq_putmessage('d', content, len);
510 /* Pad to 512 byte boundary, per tar format requirements */
511 pad = ((len + 511) & ~511) - len;
516 pq_putmessage('d', buf, pad);
521 * Include all files from the given directory in the output tar stream. If
522 * 'sizeonly' is true, we just calculate a total length and return it, without
523 * actually sending anything.
/*
 * Parameters:
 *   path        - directory to scan
 *   basepathlen - length of the leading part of each path that is left out
 *                 of member names (members are named
 *                 pathbuf + basepathlen + 1)
 *   sizeonly    - passed through unchanged to the recursive call; the
 *                 guards that skip sending when it is true are not visible
 *                 in this excerpt
 *
 * The running total counts 512 bytes for every header plus each regular
 * file's size rounded up to a multiple of 512.
 *
 * Entries skipped outright: "." and "..", names starting with
 * PG_TEMP_FILE_PREFIX, BACKUP_LABEL_FILE and "./postmaster.pid".
 * "./pg_xlog" gets a header only and is not descended into.  Links are
 * followed only directly under "./pg_tblspc".  Anything that is not a link
 * there, a directory or a regular file is reported as a skipped special
 * file.
 *
 * NOTE(review): both file-access errors below wrap errcode_for_file_access()
 * inside errcode(); that helper is normally passed to ereport() directly, and
 * wrapping it may replace the error code it selected - confirm.
 */
526 sendDir(char *path, int basepathlen, bool sizeonly)
530 char pathbuf[MAXPGPATH];
534 dir = AllocateDir(path);
535 while ((de = ReadDir(dir, path)) != NULL)
537 /* Skip special stuff */
538 if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
541 /* Skip temporary files */
542 if (strncmp(de->d_name,
544 strlen(PG_TEMP_FILE_PREFIX)) == 0)
548 * If there's a backup_label file, it belongs to a backup started by
549 * the user with pg_start_backup(). It is *not* correct for this
550 * backup, our backup_label is injected into the tar separately.
552 if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
556 * Check if the postmaster has signaled us to exit, and abort
557 * with an error in that case. The error handler further up
558 * will call do_pg_abort_backup() for us.
560 if (walsender_shutdown_requested || walsender_ready_to_stop)
562 (errmsg("shutdown requested, aborting active base backup")));
564 snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
566 /* Skip postmaster.pid in the data directory */
567 if (strcmp(pathbuf, "./postmaster.pid") == 0)
570 if (lstat(pathbuf, &statbuf) != 0)
574 (errcode(errcode_for_file_access()),
575 errmsg("could not stat file or directory \"%s\": %m",
578 /* If the file went away while scanning, it's no error. */
583 * We can skip pg_xlog, the WAL segments need to be fetched from the
584 * WAL archive anyway. But include it as an empty directory anyway, so
585 * we get permissions right.
587 if (strcmp(pathbuf, "./pg_xlog") == 0)
591 /* If pg_xlog is a symlink, write it as a directory anyway */
593 if (S_ISLNK(statbuf.st_mode))
595 if (pgwin32_is_junction(pathbuf))
597 statbuf.st_mode = S_IFDIR | S_IRWXU;
598 _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
600 size += 512; /* Size of the header just added */
601 continue; /* don't recurse into pg_xlog */
605 if (S_ISLNK(statbuf.st_mode) && strcmp(path, "./pg_tblspc") == 0)
607 if (pgwin32_is_junction(pathbuf) && strcmp(path, "./pg_tblspc") == 0)
610 /* Allow symbolic links in pg_tblspc */
611 char linkpath[MAXPGPATH];
613 MemSet(linkpath, 0, sizeof(linkpath));
614 if (readlink(pathbuf, linkpath, sizeof(linkpath) - 1) == -1)
616 (errcode(errcode_for_file_access()),
617 errmsg("could not read symbolic link \"%s\": %m",
620 _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
621 size += 512; /* Size of the header just added */
623 else if (S_ISDIR(statbuf.st_mode))
626 * Store a directory entry in the tar file so we can get the
630 _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
631 size += 512; /* Size of the header just added */
633 /* call ourselves recursively for a directory */
634 size += sendDir(pathbuf, basepathlen, sizeonly);
636 else if (S_ISREG(statbuf.st_mode))
638 /* Add size, rounded up to 512byte block */
639 size += ((statbuf.st_size + 511) & ~511);
641 sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf);
642 size += 512; /* Size of the header of the file */
646 (errmsg("skipping special file \"%s\"", pathbuf)));
653 * Functions for handling tar file format
655 * Copied from pg_dump, but modified to work with libpq for sending
/*
 * Utility routine to print possibly larger than 32 bit integers in a
 * portable fashion.  Filled with zeros.
 *
 * s    - destination; exactly 'len' characters are written, no NUL is added
 * val  - value to print
 * base - radix, which must be at least 2; digits above 9 are written as
 *        lowercase letters
 * len  - field width
 *
 * The least significant digit lands in s[len - 1].  High-order digits that
 * do not fit in 'len' characters are dropped.
 */
static void
print_val(char *s, uint64 val, unsigned int base, size_t len)
{
	size_t		pos;

	/* fill from the right so the field ends up zero-padded on the left */
	for (pos = len; pos > 0; pos--)
	{
		unsigned int digit = (unsigned int) (val % base);

		s[pos - 1] = (char) (digit < 10 ? '0' + digit : 'a' + (digit - 10));
		val /= base;
	}
}
678 * Maximum file size for a tar member: The limit inherent in the
679 * format is 2^33-1 bytes (nearly 8 GB). But we don't want to exceed
680 * what we can represent in pgoff_t.
/* Evaluates to 2^k - 1, where k is the smaller of 33 and (bit width of pgoff_t) - 1. */
682 #define MAX_TAR_MEMBER_FILELEN (((int64) 1 << Min(33, sizeof(pgoff_t)*8 - 1)) - 1)
/*
 * Compute the checksum of a 512-byte tar header.
 *
 * Every header byte is added as an unsigned value, except the eight bytes
 * of the checksum field itself (offsets 148..155), which are counted as if
 * they held blanks.  The header is only read, never modified.
 */
static int
_tarChecksum(const char *header)
{
	const unsigned char *bytes = (const unsigned char *) header;
	int			sum = 8 * ' ';	/* Assume 8 blanks in checksum field */
	int			i;

	/* everything before the checksum field ... */
	for (i = 0; i < 148; i++)
		sum += bytes[i];

	/* ... and everything after it */
	for (i = 156; i < 512; i++)
		sum += bytes[i];

	return sum;
}
697 /* Given the member, write the TAR header & send the file */
699 sendFile(char *readfilename, char *tarfilename, struct stat *statbuf)
702 char buf[TAR_SEND_SIZE];
707 fp = AllocateFile(readfilename, "rb");
710 (errcode(errcode_for_file_access()),
711 errmsg("could not open file \"%s\": %m", readfilename)));
714 * Some compilers will throw a warning knowing this test can never be true
715 * because pgoff_t can't exceed the compared maximum on their platform.
717 if (statbuf->st_size > MAX_TAR_MEMBER_FILELEN)
719 (errmsg("archive member \"%s\" too large for tar format",
722 _tarWriteHeader(tarfilename, NULL, statbuf);
724 while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
726 /* Send the chunk as a CopyData message */
727 if (pq_putmessage('d', buf, cnt))
729 (errmsg("base backup could not send data, aborting backup")));
733 if (len >= statbuf->st_size)
736 * Reached end of file. The file could be longer, if it was
737 * extended while we were sending it, but for a base backup we can
738 * ignore such extended data. It will be restored from WAL.
744 /* If the file was truncated while we were sending it, pad it with zeros */
745 if (len < statbuf->st_size)
747 MemSet(buf, 0, sizeof(buf));
748 while (len < statbuf->st_size)
750 cnt = Min(sizeof(buf), statbuf->st_size - len);
751 pq_putmessage('d', buf, cnt);
756 /* Pad to 512 byte boundary, per tar format requirements */
757 pad = ((len + 511) & ~511) - len;
761 pq_putmessage('d', buf, pad);
769 _tarWriteHeader(const char *filename, char *linktarget, struct stat * statbuf)
775 memset(h, 0, sizeof(h));
778 sprintf(&h[0], "%.99s", filename);
779 if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
782 * We only support symbolic links to directories, and this is
783 * indicated in the tar format by adding a slash at the end of the
784 * name, the same as for regular directories.
786 h[strlen(filename)] = '/';
787 h[strlen(filename) + 1] = '\0';
791 sprintf(&h[100], "%07o ", statbuf->st_mode);
794 sprintf(&h[108], "%07o ", statbuf->st_uid);
797 sprintf(&h[117], "%07o ", statbuf->st_gid);
799 /* File size 12 - 11 digits, 1 space, no NUL */
800 if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
801 /* Symbolic link or directory has size zero */
802 print_val(&h[124], 0, 8, 11);
804 print_val(&h[124], statbuf->st_size, 8, 11);
805 sprintf(&h[135], " ");
808 sprintf(&h[136], "%011o ", (int) statbuf->st_mtime);
811 sprintf(&h[148], "%06o ", lastSum);
813 if (linktarget != NULL)
815 /* Type - Symbolic link */
816 sprintf(&h[156], "2");
817 strcpy(&h[157], linktarget);
819 else if (S_ISDIR(statbuf->st_mode))
820 /* Type - directory */
821 sprintf(&h[156], "5");
823 /* Type - regular file */
824 sprintf(&h[156], "0");
826 /* Link tag 100 (NULL) */
828 /* Magic 6 + Version 2 */
829 sprintf(&h[257], "ustar00");
832 /* XXX: Do we need to care about setting correct username? */
833 sprintf(&h[265], "%.31s", "postgres");
836 /* XXX: Do we need to care about setting correct group name? */
837 sprintf(&h[297], "%.31s", "postgres");
840 sprintf(&h[329], "%6o ", 0);
843 sprintf(&h[337], "%6o ", 0);
845 while ((sum = _tarChecksum(h)) != lastSum)
847 sprintf(&h[148], "%06o ", sum);
851 pq_putmessage('d', h, 512);