From: Magnus Hagander Date: Mon, 10 Jan 2011 13:03:55 +0000 (+0100) Subject: Backend support for streaming base backups X-Git-Tag: REL9_1_ALPHA4~476 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=0eb59c4591ecf4f1c69d89e9f043a18e7dce9e47;p=postgresql Backend support for streaming base backups Add BASE_BACKUP command to walsender, allowing it to stream a base backup to the client (in tar format). The syntax is still far from ideal, that will be fixed in the switch to use a proper grammar for walsender. No client included yet, will come as a separate commit. Magnus Hagander and Heikki Linnakangas --- diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 220a197286..80c14fb74c 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1458,6 +1458,98 @@ The commands accepted in walsender mode are: + + + BASE_BACKUP options;label + + + Instructs the server to start streaming a base backup. + The system will automatically be put in backup mode with the label + specified in label before the backup is started, and + taken out of it when the backup is complete. The following options + are accepted: + + + PROGRESS + + + Request information required to generate a progress report. This will + send back an approximate size in the header of each tablespace, which + can be used to calculate how far along the stream is done. This is + calculated by enumerating all the file sizes once before the transfer + is even started, and may as such have a negative impact on the + performance - in particular it may take longer before the first data + is streamed. Since the database files can change during the backup, + the size is only approximate and may both grow and shrink between + the time of approximation and the sending of the actual files. + + + + + + + When the backup is started, the server will first send a header in + ordinary result set format, followed by one or more CopyResponse + results, one for PGDATA and one for each additional tablespace other + than pg_default and pg_global. The data in + the CopyResponse results will be a tar format (using ustar00 + extensions) dump of the tablespace contents. + + + The header is an ordinary resultset with one row for each tablespace. + The fields in this row are: + + + spcoid + + + The oid of the tablespace, or NULL if it's the base + directory. + + + + + spclocation + + + The full path of the tablespace directory, or NULL + if it's the base directory. + + + + + size + + + The approximate size of the tablespace, if progress report has + been requested; otherwise it's NULL. + + + + + + + The tar archive for the data directory and each tablespace will contain + all files in the directories, regardless of whether they are + PostgreSQL files or other files added to the same + directory. The only excluded files are: + + + + postmaster.pid + + + + + pg_xlog (including subdirectories) + + + + Owner, group and file mode are set if the underlying filesystem on + the server supports it. + + + diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index e9d9886113..21fc096df3 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/replication top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = walsender.o walreceiverfuncs.o walreceiver.o +OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c new file mode 100644 index 0000000000..0ebeef23b1 --- /dev/null +++ b/src/backend/replication/basebackup.c @@ -0,0 +1,555 @@ +/*------------------------------------------------------------------------- + * + * basebackup.c + * code for taking a base backup and streaming it to a standby + * + * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include +#include +#include + +#include "access/xlog_internal.h" /* for pg_start/stop_backup */ +#include "catalog/pg_type.h" +#include "lib/stringinfo.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "nodes/pg_list.h" +#include "replication/basebackup.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "utils/builtins.h" +#include "utils/elog.h" +#include "utils/memutils.h" + +static int64 sendDir(char *path, int basepathlen, bool sizeonly); +static void sendFile(char *path, int basepathlen, struct stat * statbuf); +static void _tarWriteHeader(char *filename, char *linktarget, + struct stat * statbuf); +static void send_int8_string(StringInfoData *buf, int64 intval); +static void SendBackupHeader(List *tablespaces); +static void SendBackupDirectory(char *location, char *spcoid); +static void base_backup_cleanup(int code, Datum arg); + +typedef struct +{ + char *oid; + char *path; + int64 size; +} tablespaceinfo; + + +/* + * Called when ERROR or FATAL happens in SendBaseBackup() after + * we have started the backup - make sure we end it! + */ +static void +base_backup_cleanup(int code, Datum arg) +{ + do_pg_abort_backup(); +} + +/* + * SendBaseBackup() - send a complete base backup. + * + * The function will take care of running pg_start_backup() and + * pg_stop_backup() for the user. + */ +void +SendBaseBackup(const char *options) +{ + DIR *dir; + struct dirent *de; + char *backup_label = strchr(options, ';'); + bool progress = false; + List *tablespaces = NIL; + tablespaceinfo *ti; + MemoryContext backup_context; + MemoryContext old_context; + + backup_context = AllocSetContextCreate(CurrentMemoryContext, + "Streaming base backup context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + old_context = MemoryContextSwitchTo(backup_context); + + if (backup_label == NULL) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid base backup options: %s", options))); + backup_label++; /* Walk past the semicolon */ + + /* Currently the only option string supported is PROGRESS */ + if (strncmp(options, "PROGRESS", 8) == 0) + progress = true; + else if (options[0] != ';') + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid base backup options: %s", options))); + + /* Make sure we can open the directory with tablespaces in it */ + dir = AllocateDir("pg_tblspc"); + if (!dir) + ereport(ERROR, + (errmsg("unable to open directory pg_tblspc: %m"))); + + /* Add a node for the base directory */ + ti = palloc0(sizeof(tablespaceinfo)); + ti->size = progress ? sendDir(".", 1, true) : -1; + tablespaces = lappend(tablespaces, ti); + + /* Collect information about all tablespaces */ + while ((de = ReadDir(dir, "pg_tblspc")) != NULL) + { + char fullpath[MAXPGPATH]; + char linkpath[MAXPGPATH]; + + /* Skip special stuff */ + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) + continue; + + snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name); + + MemSet(linkpath, 0, sizeof(linkpath)); + if (readlink(fullpath, linkpath, sizeof(linkpath) - 1) == -1) + { + ereport(WARNING, + (errmsg("unable to read symbolic link %s: %m", fullpath))); + continue; + } + + ti = palloc(sizeof(tablespaceinfo)); + ti->oid = pstrdup(de->d_name); + ti->path = pstrdup(linkpath); + ti->size = progress ? sendDir(linkpath, strlen(linkpath), true) : -1; + tablespaces = lappend(tablespaces, ti); + } + FreeDir(dir); + + do_pg_start_backup(backup_label, true); + + PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); + { + ListCell *lc; + + /* Send tablespace header */ + SendBackupHeader(tablespaces); + + /* Send off our tablespaces one by one */ + foreach(lc, tablespaces) + { + ti = (tablespaceinfo *) lfirst(lc); + + SendBackupDirectory(ti->path, ti->oid); + } + } + PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); + + do_pg_stop_backup(); + + MemoryContextSwitchTo(old_context); + MemoryContextDelete(backup_context); +} + +static void +send_int8_string(StringInfoData *buf, int64 intval) +{ + char is[32]; + + sprintf(is, INT64_FORMAT, intval); + pq_sendint(buf, strlen(is), 4); + pq_sendbytes(buf, is, strlen(is)); +} + +static void +SendBackupHeader(List *tablespaces) +{ + StringInfoData buf; + ListCell *lc; + + /* Construct and send the directory information */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 3, 2); /* 3 fields */ + + /* First field - spcoid */ + pq_sendstring(&buf, "spcoid"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, OIDOID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* Second field - spcpath */ + pq_sendstring(&buf, "spclocation"); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_sendint(&buf, TEXTOID, 4); + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + + /* Third field - size */ + pq_sendstring(&buf, "size"); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_sendint(&buf, INT8OID, 4); + pq_sendint(&buf, 8, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + foreach(lc, tablespaces) + { + tablespaceinfo *ti = lfirst(lc); + + /* Send one datarow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 3, 2); /* number of columns */ + if (ti->path == NULL) + { + pq_sendint(&buf, -1, 4); /* Length = -1 ==> NULL */ + pq_sendint(&buf, -1, 4); + } + else + { + pq_sendint(&buf, strlen(ti->oid), 4); /* length */ + pq_sendbytes(&buf, ti->oid, strlen(ti->oid)); + pq_sendint(&buf, strlen(ti->path), 4); /* length */ + pq_sendbytes(&buf, ti->path, strlen(ti->path)); + } + if (ti->size >= 0) + send_int8_string(&buf, ti->size / 1024); + else + pq_sendint(&buf, -1, 4); /* NULL */ + + pq_endmessage(&buf); + } + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +static void +SendBackupDirectory(char *location, char *spcoid) +{ + StringInfoData buf; + + /* Send CopyOutResponse message */ + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint(&buf, 0, 2); /* natts */ + pq_endmessage(&buf); + + /* tar up the data directory if NULL, otherwise the tablespace */ + sendDir(location == NULL ? "." : location, + location == NULL ? 1 : strlen(location), + false); + + /* Send CopyDone message */ + pq_putemptymessage('c'); +} + + +static int64 +sendDir(char *path, int basepathlen, bool sizeonly) +{ + DIR *dir; + struct dirent *de; + char pathbuf[MAXPGPATH]; + struct stat statbuf; + int64 size = 0; + + dir = AllocateDir(path); + while ((de = ReadDir(dir, path)) != NULL) + { + /* Skip special stuff */ + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) + continue; + + snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name); + + /* Skip postmaster.pid in the data directory */ + if (strcmp(pathbuf, "./postmaster.pid") == 0) + continue; + + if (lstat(pathbuf, &statbuf) != 0) + { + if (errno != ENOENT) + ereport(ERROR, + (errcode(errcode_for_file_access()), + errmsg("could not stat file or directory \"%s\": %m", + pathbuf))); + + /* If the file went away while scanning, it's no error. */ + continue; + } + + /* + * We can skip pg_xlog, the WAL segments need to be fetched from the + * WAL archive anyway. But include it as an empty directory anyway, so + * we get permissions right. + */ + if (strcmp(pathbuf, "./pg_xlog") == 0) + { + if (!sizeonly) + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + size += 512; /* Size of the header just added */ + continue; /* don't recurse into pg_xlog */ + } + +#ifndef WIN32 + if (S_ISLNK(statbuf.st_mode) && strcmp(path, "./pg_tblspc") == 0) +#else + if (pgwin32_is_junction(pathbuf) && strcmp(path, "./pg_tblspc") == 0) +#endif + { + /* Allow symbolic links in pg_tblspc */ + char linkpath[MAXPGPATH]; + + MemSet(linkpath, 0, sizeof(linkpath)); + if (readlink(pathbuf, linkpath, sizeof(linkpath) - 1) == -1) + ereport(ERROR, + (errcode(errcode_for_file_access()), + errmsg("could not read symbolic link \"%s\": %m", + pathbuf))); + if (!sizeonly) + _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + size += 512; /* Size of the header just added */ + } + else if (S_ISDIR(statbuf.st_mode)) + { + /* + * Store a directory entry in the tar file so we can get the + * permissions right. + */ + if (!sizeonly) + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + size += 512; /* Size of the header just added */ + + /* call ourselves recursively for a directory */ + size += sendDir(pathbuf, basepathlen, sizeonly); + } + else if (S_ISREG(statbuf.st_mode)) + { + /* Add size, rounded up to 512byte block */ + size += ((statbuf.st_size + 511) & ~511); + if (!sizeonly) + sendFile(pathbuf, basepathlen, &statbuf); + size += 512; /* Size of the header of the file */ + } + else + ereport(WARNING, + (errmsg("skipping special file \"%s\"", pathbuf))); + } + FreeDir(dir); + return size; +} + +/***** + * Functions for handling tar file format + * + * Copied from pg_dump, but modified to work with libpq for sending + */ + + +/* + * Utility routine to print possibly larger than 32 bit integers in a + * portable fashion. Filled with zeros. + */ +static void +print_val(char *s, uint64 val, unsigned int base, size_t len) +{ + int i; + + for (i = len; i > 0; i--) + { + int digit = val % base; + + s[i - 1] = '0' + digit; + val = val / base; + } +} + +/* + * Maximum file size for a tar member: The limit inherent in the + * format is 2^33-1 bytes (nearly 8 GB). But we don't want to exceed + * what we can represent in pgoff_t. + */ +#define MAX_TAR_MEMBER_FILELEN (((int64) 1 << Min(33, sizeof(pgoff_t)*8 - 1)) - 1) + +static int +_tarChecksum(char *header) +{ + int i, + sum; + + sum = 0; + for (i = 0; i < 512; i++) + if (i < 148 || i >= 156) + sum += 0xFF & header[i]; + return sum + 256; /* Assume 8 blanks in checksum field */ +} + +/* Given the member, write the TAR header & send the file */ +static void +sendFile(char *filename, int basepathlen, struct stat * statbuf) +{ + FILE *fp; + char buf[32768]; + size_t cnt; + pgoff_t len = 0; + size_t pad; + + fp = AllocateFile(filename, "rb"); + if (fp == NULL) + ereport(ERROR, + (errcode(errcode_for_file_access()), + errmsg("could not open file \"%s\": %m", filename))); + + /* + * Some compilers will throw a warning knowing this test can never be true + * because pgoff_t can't exceed the compared maximum on their platform. + */ + if (statbuf->st_size > MAX_TAR_MEMBER_FILELEN) + ereport(ERROR, + (errmsg("archive member \"%s\" too large for tar format", + filename))); + + _tarWriteHeader(filename + basepathlen + 1, NULL, statbuf); + + while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0) + { + /* Send the chunk as a CopyData message */ + pq_putmessage('d', buf, cnt); + len += cnt; + + if (len >= statbuf->st_size) + { + /* + * Reached end of file. The file could be longer, if it was + * extended while we were sending it, but for a base backup we can + * ignore such extended data. It will be restored from WAL. + */ + break; + } + } + + /* If the file was truncated while we were sending it, pad it with zeros */ + if (len < statbuf->st_size) + { + MemSet(buf, 0, sizeof(buf)); + while (len < statbuf->st_size) + { + cnt = Min(sizeof(buf), statbuf->st_size - len); + pq_putmessage('d', buf, cnt); + len += cnt; + } + } + + /* Pad to 512 byte boundary, per tar format requirements */ + pad = ((len + 511) & ~511) - len; + if (pad > 0) + { + MemSet(buf, 0, pad); + pq_putmessage('d', buf, pad); + } + + FreeFile(fp); +} + + +static void +_tarWriteHeader(char *filename, char *linktarget, struct stat * statbuf) +{ + char h[512]; + int lastSum = 0; + int sum; + + memset(h, 0, sizeof(h)); + + /* Name 100 */ + sprintf(&h[0], "%.99s", filename); + if (linktarget != NULL || S_ISDIR(statbuf->st_mode)) + { + /* + * We only support symbolic links to directories, and this is + * indicated in the tar format by adding a slash at the end of the + * name, the same as for regular directories. + */ + h[strlen(filename)] = '/'; + h[strlen(filename) + 1] = '\0'; + } + + /* Mode 8 */ + sprintf(&h[100], "%07o ", statbuf->st_mode); + + /* User ID 8 */ + sprintf(&h[108], "%07o ", statbuf->st_uid); + + /* Group 8 */ + sprintf(&h[117], "%07o ", statbuf->st_gid); + + /* File size 12 - 11 digits, 1 space, no NUL */ + if (linktarget != NULL || S_ISDIR(statbuf->st_mode)) + /* Symbolic link or directory has size zero */ + print_val(&h[124], 0, 8, 11); + else + print_val(&h[124], statbuf->st_size, 8, 11); + sprintf(&h[135], " "); + + /* Mod Time 12 */ + sprintf(&h[136], "%011o ", (int) statbuf->st_mtime); + + /* Checksum 8 */ + sprintf(&h[148], "%06o ", lastSum); + + if (linktarget != NULL) + { + /* Type - Symbolic link */ + sprintf(&h[156], "2"); + strcpy(&h[157], linktarget); + } + else if (S_ISDIR(statbuf->st_mode)) + /* Type - directory */ + sprintf(&h[156], "5"); + else + /* Type - regular file */ + sprintf(&h[156], "0"); + + /* Link tag 100 (NULL) */ + + /* Magic 6 + Version 2 */ + sprintf(&h[257], "ustar00"); + + /* User 32 */ + /* XXX: Do we need to care about setting correct username? */ + sprintf(&h[265], "%.31s", "postgres"); + + /* Group 32 */ + /* XXX: Do we need to care about setting correct group name? */ + sprintf(&h[297], "%.31s", "postgres"); + + /* Maj Dev 8 */ + sprintf(&h[329], "%6o ", 0); + + /* Min Dev 8 */ + sprintf(&h[337], "%6o ", 0); + + while ((sum = _tarChecksum(h)) != lastSum) + { + sprintf(&h[148], "%06o ", sum); + lastSum = sum; + } + + pq_putmessage('d', h, 512); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f2a3ee2073..bd35f31e6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -44,6 +44,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/basebackup.h" #include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -54,6 +55,7 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/resowner.h" /* Array of WalSnds in shared memory */ @@ -136,6 +138,9 @@ WalSenderMain(void) ALLOCSET_DEFAULT_MAXSIZE); MemoryContextSwitchTo(walsnd_context); + /* Set up resource owner */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner"); + /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); @@ -305,6 +310,15 @@ WalSndHandshake(void) /* break out of the loop */ replication_started = true; } + else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0) + { + /* Command is BASE_BACKUP ;