From a89c46f9bc314ed549245d888da09b8c5cace104 Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Wed, 9 Jan 2013 08:57:47 -0500 Subject: [PATCH] Allow parallel copy/link in pg_upgrade This patch implements parallel copying/linking of files by tablespace using the --jobs option in pg_upgrade. --- contrib/pg_upgrade/check.c | 8 +- contrib/pg_upgrade/info.c | 21 +++-- contrib/pg_upgrade/parallel.c | 147 ++++++++++++++++++++++++++++--- contrib/pg_upgrade/pg_upgrade.c | 2 +- contrib/pg_upgrade/pg_upgrade.h | 28 +++--- contrib/pg_upgrade/relfilenode.c | 100 +++++++++++++++------ contrib/pg_upgrade/tablespace.c | 14 +-- doc/src/sgml/pgupgrade.sgml | 9 +- 8 files changed, 256 insertions(+), 73 deletions(-) diff --git a/contrib/pg_upgrade/check.c b/contrib/pg_upgrade/check.c index 59f8fd0219..1780788e00 100644 --- a/contrib/pg_upgrade/check.c +++ b/contrib/pg_upgrade/check.c @@ -606,7 +606,7 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name) fprintf(script, RMDIR_CMD " %s\n", fix_path_separator(old_cluster.pgdata)); /* delete old cluster's alternate tablespaces */ - for (tblnum = 0; tblnum < os_info.num_tablespaces; tblnum++) + for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++) { /* * Do the old cluster's per-database directories share a directory @@ -621,14 +621,14 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name) /* remove PG_VERSION? */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 804) fprintf(script, RM_CMD " %s%s%cPG_VERSION\n", - fix_path_separator(os_info.tablespaces[tblnum]), + fix_path_separator(os_info.old_tablespaces[tblnum]), fix_path_separator(old_cluster.tablespace_suffix), PATH_SEPARATOR); for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { fprintf(script, RMDIR_CMD " %s%s%c%d\n", - fix_path_separator(os_info.tablespaces[tblnum]), + fix_path_separator(os_info.old_tablespaces[tblnum]), fix_path_separator(old_cluster.tablespace_suffix), PATH_SEPARATOR, old_cluster.dbarr.dbs[dbnum].db_oid); } @@ -640,7 +640,7 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name) * or a version-specific subdirectory. */ fprintf(script, RMDIR_CMD " %s%s\n", - fix_path_separator(os_info.tablespaces[tblnum]), + fix_path_separator(os_info.old_tablespaces[tblnum]), fix_path_separator(old_cluster.tablespace_suffix)); } diff --git a/contrib/pg_upgrade/info.c b/contrib/pg_upgrade/info.c index 0c11ff8857..7fd4584dff 100644 --- a/contrib/pg_upgrade/info.c +++ b/contrib/pg_upgrade/info.c @@ -106,20 +106,25 @@ create_rel_filename_map(const char *old_data, const char *new_data, * relation belongs to the default tablespace, hence relfiles should * exist in the data directories. */ - snprintf(map->old_dir, sizeof(map->old_dir), "%s/base/%u", old_data, - old_db->db_oid); - snprintf(map->new_dir, sizeof(map->new_dir), "%s/base/%u", new_data, - new_db->db_oid); + strlcpy(map->old_tablespace, old_data, sizeof(map->old_tablespace)); + strlcpy(map->new_tablespace, new_data, sizeof(map->new_tablespace)); + strlcpy(map->old_tablespace_suffix, "/base", sizeof(map->old_tablespace_suffix)); + strlcpy(map->new_tablespace_suffix, "/base", sizeof(map->new_tablespace_suffix)); } else { /* relation belongs to a tablespace, so use the tablespace location */ - snprintf(map->old_dir, sizeof(map->old_dir), "%s%s/%u", old_rel->tablespace, - old_cluster.tablespace_suffix, old_db->db_oid); - snprintf(map->new_dir, sizeof(map->new_dir), "%s%s/%u", new_rel->tablespace, - new_cluster.tablespace_suffix, new_db->db_oid); + strlcpy(map->old_tablespace, old_rel->tablespace, sizeof(map->old_tablespace)); + strlcpy(map->new_tablespace, new_rel->tablespace, sizeof(map->new_tablespace)); + strlcpy(map->old_tablespace_suffix, old_cluster.tablespace_suffix, + sizeof(map->old_tablespace_suffix)); + strlcpy(map->new_tablespace_suffix, new_cluster.tablespace_suffix, + sizeof(map->new_tablespace_suffix)); } + map->old_db_oid = old_db->db_oid; + map->new_db_oid = new_db->db_oid; + /* * old_relfilenode might differ from pg_class.oid (and hence * new_relfilenode) because of CLUSTER, REINDEX, or VACUUM FULL. diff --git a/contrib/pg_upgrade/parallel.c b/contrib/pg_upgrade/parallel.c index 8ea36bc6b9..d157511781 100644 --- a/contrib/pg_upgrade/parallel.c +++ b/contrib/pg_upgrade/parallel.c @@ -34,11 +34,24 @@ typedef struct { char log_file[MAXPGPATH]; char opt_log_file[MAXPGPATH]; char cmd[MAX_STRING]; -} thread_arg; +} exec_thread_arg; -thread_arg **thread_args; +typedef struct { + DbInfoArr *old_db_arr; + DbInfoArr *new_db_arr; + char old_pgdata[MAXPGPATH]; + char new_pgdata[MAXPGPATH]; + char old_tablespace[MAXPGPATH]; +} transfer_thread_arg; + +exec_thread_arg **exec_thread_args; +transfer_thread_arg **transfer_thread_args; + +/* track current thread_args struct so reap_child() can be used for all cases */ +void **cur_thread_args; -DWORD win32_exec_prog(thread_arg *args); +DWORD win32_exec_prog(exec_thread_arg *args); +DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args); #endif @@ -58,7 +71,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, pid_t child; #else HANDLE child; - thread_arg *new_arg; + exec_thread_arg *new_arg; #endif va_start(args, fmt); @@ -71,7 +84,9 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, else { /* parallel */ - +#ifdef WIN32 + cur_thread_args = (void **)exec_thread_args; +#endif /* harvest any dead children */ while (reap_child(false) == true) ; @@ -100,7 +115,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, int i; thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); - thread_args = pg_malloc(user_opts.jobs * sizeof(thread_arg *)); + exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *)); /* * For safety and performance, we keep the args allocated during @@ -108,11 +123,11 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, * in a thread different from the one that allocated it. */ for (i = 0; i < user_opts.jobs; i++) - thread_args[i] = pg_malloc(sizeof(thread_arg)); + exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg)); } /* use first empty array element */ - new_arg = thread_args[parallel_jobs-1]; + new_arg = exec_thread_args[parallel_jobs-1]; /* Can only pass one pointer into the function, so use a struct */ strcpy(new_arg->log_file, log_file); @@ -134,7 +149,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, #ifdef WIN32 DWORD -win32_exec_prog(thread_arg *args) +win32_exec_prog(exec_thread_arg *args) { int ret; @@ -146,6 +161,112 @@ win32_exec_prog(thread_arg *args) #endif +/* + * parallel_transfer_all_new_dbs + * + * This has the same API as transfer_all_new_dbs, except it does parallel execution + * by transfering multiple tablespaces in parallel + */ +void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata, + char *old_tablespace) +{ +#ifndef WIN32 + pid_t child; +#else + HANDLE child; + transfer_thread_arg *new_arg; +#endif + + if (user_opts.jobs <= 1) + /* throw_error must be true to allow jobs */ + transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL); + else + { + /* parallel */ +#ifdef WIN32 + cur_thread_args = (void **)transfer_thread_args; +#endif + /* harvest any dead children */ + while (reap_child(false) == true) + ; + + /* must we wait for a dead child? */ + if (parallel_jobs >= user_opts.jobs) + reap_child(true); + + /* set this before we start the job */ + parallel_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, + old_tablespace); + /* if we take another exit path, it will be non-zero */ + /* use _exit to skip atexit() functions */ + _exit(0); + } + else if (child < 0) + /* fork failed */ + pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno)); +#else + if (thread_handles == NULL) + { + int i; + + thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); + transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *)); + + /* + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args + * in a thread different from the one that allocated it. + */ + for (i = 0; i < user_opts.jobs; i++) + transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg)); + } + + /* use first empty array element */ + new_arg = transfer_thread_args[parallel_jobs-1]; + + /* Can only pass one pointer into the function, so use a struct */ + new_arg->old_db_arr = old_db_arr; + new_arg->new_db_arr = new_db_arr; + strcpy(new_arg->old_pgdata, old_pgdata); + strcpy(new_arg->new_pgdata, new_pgdata); + strcpy(new_arg->old_tablespace, old_tablespace); + + child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, + new_arg, 0, NULL); + if (child == 0) + pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); + + thread_handles[parallel_jobs-1] = child; +#endif + } + + return; +} + + +#ifdef WIN32 +DWORD +win32_transfer_all_new_dbs(transfer_thread_arg *args) +{ + transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata, + args->new_pgdata, args->old_tablespace); + + /* terminates thread */ + return 0; +} +#endif + + /* * collect status from a completed worker child */ @@ -195,7 +316,7 @@ reap_child(bool wait_for_child) /* Move last slot into dead child's position */ if (thread_num != parallel_jobs - 1) { - thread_arg *tmp_args; + void *tmp_args; thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; @@ -205,9 +326,9 @@ reap_child(bool wait_for_child) * reused by the next created thread. Instead, the new thread * will use the arg struct of the thread that just died. */ - tmp_args = thread_args[thread_num]; - thread_args[thread_num] = thread_args[parallel_jobs - 1]; - thread_args[parallel_jobs - 1] = tmp_args; + tmp_args = cur_thread_args[thread_num]; + cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1]; + cur_thread_args[parallel_jobs - 1] = tmp_args; } #endif diff --git a/contrib/pg_upgrade/pg_upgrade.c b/contrib/pg_upgrade/pg_upgrade.c index 70c749d55b..85997e59bf 100644 --- a/contrib/pg_upgrade/pg_upgrade.c +++ b/contrib/pg_upgrade/pg_upgrade.c @@ -133,7 +133,7 @@ main(int argc, char **argv) if (user_opts.transfer_mode == TRANSFER_MODE_LINK) disable_old_cluster(); - transfer_all_new_dbs(&old_cluster.dbarr, &new_cluster.dbarr, + transfer_all_new_tablespaces(&old_cluster.dbarr, &new_cluster.dbarr, old_cluster.pgdata, new_cluster.pgdata); /* diff --git a/contrib/pg_upgrade/pg_upgrade.h b/contrib/pg_upgrade/pg_upgrade.h index c1a2f532e7..d5c3fa9e83 100644 --- a/contrib/pg_upgrade/pg_upgrade.h +++ b/contrib/pg_upgrade/pg_upgrade.h @@ -134,8 +134,12 @@ typedef struct */ typedef struct { - char old_dir[MAXPGPATH]; - char new_dir[MAXPGPATH]; + char old_tablespace[MAXPGPATH]; + char new_tablespace[MAXPGPATH]; + char old_tablespace_suffix[MAXPGPATH]; + char new_tablespace_suffix[MAXPGPATH]; + Oid old_db_oid; + Oid new_db_oid; /* * old/new relfilenodes might differ for pg_largeobject(_metadata) indexes @@ -276,8 +280,8 @@ typedef struct const char *progname; /* complete pathname for this program */ char *exec_path; /* full path to my executable */ char *user; /* username for clusters */ - char **tablespaces; /* tablespaces */ - int num_tablespaces; + char **old_tablespaces; /* tablespaces */ + int num_old_tablespaces; char **libraries; /* loadable libraries */ int num_libraries; ClusterInfo *running_cluster; @@ -398,9 +402,11 @@ void get_sock_dir(ClusterInfo *cluster, bool live_check); /* relfilenode.c */ void get_pg_database_relfilenode(ClusterInfo *cluster); -void transfer_all_new_dbs(DbInfoArr *olddb_arr, - DbInfoArr *newdb_arr, char *old_pgdata, char *new_pgdata); - +void transfer_all_new_tablespaces(DbInfoArr *old_db_arr, + DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata); +void transfer_all_new_dbs(DbInfoArr *old_db_arr, + DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, + char *old_tablespace); /* tablespace.c */ @@ -464,9 +470,11 @@ void old_8_3_invalidate_bpchar_pattern_ops_indexes(ClusterInfo *cluster, char *old_8_3_create_sequence_script(ClusterInfo *cluster); /* parallel.c */ -void parallel_exec_prog(const char *log_file, const char *opt_log_file, +void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4))); - -bool reap_child(bool wait_for_child); +void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata, + char *old_tablespace); +bool reap_child(bool wait_for_child); diff --git a/contrib/pg_upgrade/relfilenode.c b/contrib/pg_upgrade/relfilenode.c index 9d0d5a0917..552f2033c2 100644 --- a/contrib/pg_upgrade/relfilenode.c +++ b/contrib/pg_upgrade/relfilenode.c @@ -16,11 +16,57 @@ static void transfer_single_new_db(pageCnvCtx *pageConverter, - FileNameMap *maps, int size); + FileNameMap *maps, int size, char *old_tablespace); static void transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map, const char *suffix); +/* + * transfer_all_new_tablespaces() + * + * Responsible for upgrading all database. invokes routines to generate mappings and then + * physically link the databases. + */ +void +transfer_all_new_tablespaces(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata) +{ + pg_log(PG_REPORT, "%s user relation files\n", + user_opts.transfer_mode == TRANSFER_MODE_LINK ? "Linking" : "Copying"); + + /* + * Transfering files by tablespace is tricky because a single database + * can use multiple tablespaces. For non-parallel mode, we just pass a + * NULL tablespace path, which matches all tablespaces. In parallel mode, + * we pass the default tablespace and all user-created tablespaces + * and let those operations happen in parallel. + */ + if (user_opts.jobs <= 1) + parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, + new_pgdata, NULL); + else + { + int tblnum; + + /* transfer default tablespace */ + parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, + new_pgdata, old_pgdata); + + for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++) + parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, + new_pgdata, os_info.old_tablespaces[tblnum]); + /* reap all children */ + while (reap_child(true) == true) + ; + } + + end_progress_output(); + check_ok(); + + return; +} + + /* * transfer_all_new_dbs() * @@ -28,15 +74,12 @@ static void transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map, * physically link the databases. */ void -transfer_all_new_dbs(DbInfoArr *old_db_arr, - DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata) +transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata, char *old_tablespace) { int old_dbnum, new_dbnum; - pg_log(PG_REPORT, "%s user relation files\n", - user_opts.transfer_mode == TRANSFER_MODE_LINK ? "Linking" : "Copying"); - /* Scan the old cluster databases and transfer their files */ for (old_dbnum = new_dbnum = 0; old_dbnum < old_db_arr->ndbs; @@ -75,15 +118,13 @@ transfer_all_new_dbs(DbInfoArr *old_db_arr, #ifdef PAGE_CONVERSION pageConverter = setupPageConverter(); #endif - transfer_single_new_db(pageConverter, mappings, n_maps); + transfer_single_new_db(pageConverter, mappings, n_maps, + old_tablespace); pg_free(mappings); } } - end_progress_output(); - check_ok(); - return; } @@ -125,7 +166,7 @@ get_pg_database_relfilenode(ClusterInfo *cluster) */ static void transfer_single_new_db(pageCnvCtx *pageConverter, - FileNameMap *maps, int size) + FileNameMap *maps, int size, char *old_tablespace) { int mapnum; bool vm_crashsafe_match = true; @@ -140,18 +181,22 @@ transfer_single_new_db(pageCnvCtx *pageConverter, for (mapnum = 0; mapnum < size; mapnum++) { - /* transfer primary file */ - transfer_relfile(pageConverter, &maps[mapnum], ""); - - /* fsm/vm files added in PG 8.4 */ - if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804) + if (old_tablespace == NULL || + strcmp(maps[mapnum].old_tablespace, old_tablespace) == 0) { - /* - * Copy/link any fsm and vm files, if they exist - */ - transfer_relfile(pageConverter, &maps[mapnum], "_fsm"); - if (vm_crashsafe_match) - transfer_relfile(pageConverter, &maps[mapnum], "_vm"); + /* transfer primary file */ + transfer_relfile(pageConverter, &maps[mapnum], ""); + + /* fsm/vm files added in PG 8.4 */ + if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804) + { + /* + * Copy/link any fsm and vm files, if they exist + */ + transfer_relfile(pageConverter, &maps[mapnum], "_fsm"); + if (vm_crashsafe_match) + transfer_relfile(pageConverter, &maps[mapnum], "_vm"); + } } } } @@ -187,10 +232,12 @@ transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map, else snprintf(extent_suffix, sizeof(extent_suffix), ".%d", segno); - snprintf(old_file, sizeof(old_file), "%s/%u%s%s", map->old_dir, - map->old_relfilenode, type_suffix, extent_suffix); - snprintf(new_file, sizeof(new_file), "%s/%u%s%s", map->new_dir, - map->new_relfilenode, type_suffix, extent_suffix); + snprintf(old_file, sizeof(old_file), "%s%s/%u/%u%s%s", map->old_tablespace, + map->old_tablespace_suffix, map->old_db_oid, map->old_relfilenode, + type_suffix, extent_suffix); + snprintf(new_file, sizeof(new_file), "%s%s/%u/%u%s%s", map->new_tablespace, + map->new_tablespace_suffix, map->new_db_oid, map->new_relfilenode, + type_suffix, extent_suffix); /* Is it an extent, fsm, or vm file? */ if (type_suffix[0] != '\0' || segno != 0) @@ -239,3 +286,4 @@ transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map, return; } + diff --git a/contrib/pg_upgrade/tablespace.c b/contrib/pg_upgrade/tablespace.c index a93c51768a..321738dabc 100644 --- a/contrib/pg_upgrade/tablespace.c +++ b/contrib/pg_upgrade/tablespace.c @@ -23,7 +23,7 @@ init_tablespaces(void) set_tablespace_directory_suffix(&old_cluster); set_tablespace_directory_suffix(&new_cluster); - if (os_info.num_tablespaces > 0 && + if (os_info.num_old_tablespaces > 0 && strcmp(old_cluster.tablespace_suffix, new_cluster.tablespace_suffix) == 0) pg_log(PG_FATAL, "Cannot upgrade to/from the same system catalog version when\n" @@ -57,16 +57,16 @@ get_tablespace_paths(void) res = executeQueryOrDie(conn, "%s", query); - if ((os_info.num_tablespaces = PQntuples(res)) != 0) - os_info.tablespaces = (char **) pg_malloc( - os_info.num_tablespaces * sizeof(char *)); + if ((os_info.num_old_tablespaces = PQntuples(res)) != 0) + os_info.old_tablespaces = (char **) pg_malloc( + os_info.num_old_tablespaces * sizeof(char *)); else - os_info.tablespaces = NULL; + os_info.old_tablespaces = NULL; i_spclocation = PQfnumber(res, "spclocation"); - for (tblnum = 0; tblnum < os_info.num_tablespaces; tblnum++) - os_info.tablespaces[tblnum] = pg_strdup( + for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++) + os_info.old_tablespaces[tblnum] = pg_strdup( PQgetvalue(res, tblnum, i_spclocation)); PQclear(res); diff --git a/doc/src/sgml/pgupgrade.sgml b/doc/src/sgml/pgupgrade.sgml index 53781e45ed..e0765babfc 100644 --- a/doc/src/sgml/pgupgrade.sgml +++ b/doc/src/sgml/pgupgrade.sgml @@ -342,10 +342,11 @@ NET STOP pgsql-8.3 (PostgreSQL 8.3 and older used a different s The -- 2.40.0