fprintf(script, RMDIR_CMD " %s\n", fix_path_separator(old_cluster.pgdata));
/* delete old cluster's alternate tablespaces */
- for (tblnum = 0; tblnum < os_info.num_tablespaces; tblnum++)
+ for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++)
{
/*
* Do the old cluster's per-database directories share a directory
/* remove PG_VERSION? */
if (GET_MAJOR_VERSION(old_cluster.major_version) <= 804)
fprintf(script, RM_CMD " %s%s%cPG_VERSION\n",
- fix_path_separator(os_info.tablespaces[tblnum]),
+ fix_path_separator(os_info.old_tablespaces[tblnum]),
fix_path_separator(old_cluster.tablespace_suffix),
PATH_SEPARATOR);
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
{
fprintf(script, RMDIR_CMD " %s%s%c%d\n",
- fix_path_separator(os_info.tablespaces[tblnum]),
+ fix_path_separator(os_info.old_tablespaces[tblnum]),
fix_path_separator(old_cluster.tablespace_suffix),
PATH_SEPARATOR, old_cluster.dbarr.dbs[dbnum].db_oid);
}
* or a version-specific subdirectory.
*/
fprintf(script, RMDIR_CMD " %s%s\n",
- fix_path_separator(os_info.tablespaces[tblnum]),
+ fix_path_separator(os_info.old_tablespaces[tblnum]),
fix_path_separator(old_cluster.tablespace_suffix));
}
* relation belongs to the default tablespace, hence relfiles should
* exist in the data directories.
*/
- snprintf(map->old_dir, sizeof(map->old_dir), "%s/base/%u", old_data,
- old_db->db_oid);
- snprintf(map->new_dir, sizeof(map->new_dir), "%s/base/%u", new_data,
- new_db->db_oid);
+ strlcpy(map->old_tablespace, old_data, sizeof(map->old_tablespace));
+ strlcpy(map->new_tablespace, new_data, sizeof(map->new_tablespace));
+ strlcpy(map->old_tablespace_suffix, "/base", sizeof(map->old_tablespace_suffix));
+ strlcpy(map->new_tablespace_suffix, "/base", sizeof(map->new_tablespace_suffix));
}
else
{
/* relation belongs to a tablespace, so use the tablespace location */
- snprintf(map->old_dir, sizeof(map->old_dir), "%s%s/%u", old_rel->tablespace,
- old_cluster.tablespace_suffix, old_db->db_oid);
- snprintf(map->new_dir, sizeof(map->new_dir), "%s%s/%u", new_rel->tablespace,
- new_cluster.tablespace_suffix, new_db->db_oid);
+ strlcpy(map->old_tablespace, old_rel->tablespace, sizeof(map->old_tablespace));
+ strlcpy(map->new_tablespace, new_rel->tablespace, sizeof(map->new_tablespace));
+ strlcpy(map->old_tablespace_suffix, old_cluster.tablespace_suffix,
+ sizeof(map->old_tablespace_suffix));
+ strlcpy(map->new_tablespace_suffix, new_cluster.tablespace_suffix,
+ sizeof(map->new_tablespace_suffix));
}
+ map->old_db_oid = old_db->db_oid;
+ map->new_db_oid = new_db->db_oid;
+
/*
* old_relfilenode might differ from pg_class.oid (and hence
* new_relfilenode) because of CLUSTER, REINDEX, or VACUUM FULL.
char log_file[MAXPGPATH];
char opt_log_file[MAXPGPATH];
char cmd[MAX_STRING];
-} thread_arg;
+} exec_thread_arg;
-thread_arg **thread_args;
+typedef struct {
+ DbInfoArr *old_db_arr;
+ DbInfoArr *new_db_arr;
+ char old_pgdata[MAXPGPATH];
+ char new_pgdata[MAXPGPATH];
+ char old_tablespace[MAXPGPATH];
+} transfer_thread_arg;
+
+exec_thread_arg **exec_thread_args;
+transfer_thread_arg **transfer_thread_args;
+
+/* track current thread_args struct so reap_child() can be used for all cases */
+void **cur_thread_args;
-DWORD win32_exec_prog(thread_arg *args);
+DWORD win32_exec_prog(exec_thread_arg *args);
+DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
pid_t child;
#else
HANDLE child;
- thread_arg *new_arg;
+ exec_thread_arg *new_arg;
#endif
va_start(args, fmt);
else
{
/* parallel */
-
+#ifdef WIN32
+ cur_thread_args = (void **)exec_thread_args;
+#endif
/* harvest any dead children */
while (reap_child(false) == true)
;
int i;
thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
- thread_args = pg_malloc(user_opts.jobs * sizeof(thread_arg *));
+ exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
/*
* For safety and performance, we keep the args allocated during
* in a thread different from the one that allocated it.
*/
for (i = 0; i < user_opts.jobs; i++)
- thread_args[i] = pg_malloc(sizeof(thread_arg));
+ exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg));
}
/* use first empty array element */
- new_arg = thread_args[parallel_jobs-1];
+ new_arg = exec_thread_args[parallel_jobs-1];
/* Can only pass one pointer into the function, so use a struct */
strcpy(new_arg->log_file, log_file);
#ifdef WIN32
DWORD
-win32_exec_prog(thread_arg *args)
+win32_exec_prog(exec_thread_arg *args)
{
int ret;
#endif
+/*
+ * parallel_transfer_all_new_dbs
+ *
+ * This has the same API as transfer_all_new_dbs, except it does parallel execution
+ * by transfering multiple tablespaces in parallel
+ */
+void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
+ char *old_pgdata, char *new_pgdata,
+ char *old_tablespace)
+{
+#ifndef WIN32
+ pid_t child;
+#else
+ HANDLE child;
+ transfer_thread_arg *new_arg;
+#endif
+
+ if (user_opts.jobs <= 1)
+ /* throw_error must be true to allow jobs */
+ transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
+ else
+ {
+ /* parallel */
+#ifdef WIN32
+ cur_thread_args = (void **)transfer_thread_args;
+#endif
+ /* harvest any dead children */
+ while (reap_child(false) == true)
+ ;
+
+ /* must we wait for a dead child? */
+ if (parallel_jobs >= user_opts.jobs)
+ reap_child(true);
+
+ /* set this before we start the job */
+ parallel_jobs++;
+
+ /* Ensure stdio state is quiesced before forking */
+ fflush(NULL);
+
+#ifndef WIN32
+ child = fork();
+ if (child == 0)
+ {
+ transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
+ old_tablespace);
+ /* if we take another exit path, it will be non-zero */
+ /* use _exit to skip atexit() functions */
+ _exit(0);
+ }
+ else if (child < 0)
+ /* fork failed */
+ pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
+#else
+ if (thread_handles == NULL)
+ {
+ int i;
+
+ thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
+ transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
+
+ /*
+ * For safety and performance, we keep the args allocated during
+ * the entire life of the process, and we don't free the args
+ * in a thread different from the one that allocated it.
+ */
+ for (i = 0; i < user_opts.jobs; i++)
+ transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg));
+ }
+
+ /* use first empty array element */
+ new_arg = transfer_thread_args[parallel_jobs-1];
+
+ /* Can only pass one pointer into the function, so use a struct */
+ new_arg->old_db_arr = old_db_arr;
+ new_arg->new_db_arr = new_db_arr;
+ strcpy(new_arg->old_pgdata, old_pgdata);
+ strcpy(new_arg->new_pgdata, new_pgdata);
+ strcpy(new_arg->old_tablespace, old_tablespace);
+
+ child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
+ new_arg, 0, NULL);
+ if (child == 0)
+ pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
+
+ thread_handles[parallel_jobs-1] = child;
+#endif
+ }
+
+ return;
+}
+
+
+#ifdef WIN32
+DWORD
+win32_transfer_all_new_dbs(transfer_thread_arg *args)
+{
+ transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
+ args->new_pgdata, args->old_tablespace);
+
+ /* terminates thread */
+ return 0;
+}
+#endif
+
+
/*
* collect status from a completed worker child
*/
/* Move last slot into dead child's position */
if (thread_num != parallel_jobs - 1)
{
- thread_arg *tmp_args;
+ void *tmp_args;
thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
* reused by the next created thread. Instead, the new thread
* will use the arg struct of the thread that just died.
*/
- tmp_args = thread_args[thread_num];
- thread_args[thread_num] = thread_args[parallel_jobs - 1];
- thread_args[parallel_jobs - 1] = tmp_args;
+ tmp_args = cur_thread_args[thread_num];
+ cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
+ cur_thread_args[parallel_jobs - 1] = tmp_args;
}
#endif
if (user_opts.transfer_mode == TRANSFER_MODE_LINK)
disable_old_cluster();
- transfer_all_new_dbs(&old_cluster.dbarr, &new_cluster.dbarr,
+ transfer_all_new_tablespaces(&old_cluster.dbarr, &new_cluster.dbarr,
old_cluster.pgdata, new_cluster.pgdata);
/*
*/
typedef struct
{
- char old_dir[MAXPGPATH];
- char new_dir[MAXPGPATH];
+ char old_tablespace[MAXPGPATH];
+ char new_tablespace[MAXPGPATH];
+ char old_tablespace_suffix[MAXPGPATH];
+ char new_tablespace_suffix[MAXPGPATH];
+ Oid old_db_oid;
+ Oid new_db_oid;
/*
* old/new relfilenodes might differ for pg_largeobject(_metadata) indexes
const char *progname; /* complete pathname for this program */
char *exec_path; /* full path to my executable */
char *user; /* username for clusters */
- char **tablespaces; /* tablespaces */
- int num_tablespaces;
+ char **old_tablespaces; /* tablespaces */
+ int num_old_tablespaces;
char **libraries; /* loadable libraries */
int num_libraries;
ClusterInfo *running_cluster;
/* relfilenode.c */
void get_pg_database_relfilenode(ClusterInfo *cluster);
-void transfer_all_new_dbs(DbInfoArr *olddb_arr,
- DbInfoArr *newdb_arr, char *old_pgdata, char *new_pgdata);
-
+void transfer_all_new_tablespaces(DbInfoArr *old_db_arr,
+ DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata);
+void transfer_all_new_dbs(DbInfoArr *old_db_arr,
+ DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata,
+ char *old_tablespace);
/* tablespace.c */
char *old_8_3_create_sequence_script(ClusterInfo *cluster);
/* parallel.c */
-void parallel_exec_prog(const char *log_file, const char *opt_log_file,
+void parallel_exec_prog(const char *log_file, const char *opt_log_file,
const char *fmt,...)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
-
-bool reap_child(bool wait_for_child);
+void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
+ char *old_pgdata, char *new_pgdata,
+ char *old_tablespace);
+bool reap_child(bool wait_for_child);
static void transfer_single_new_db(pageCnvCtx *pageConverter,
- FileNameMap *maps, int size);
+ FileNameMap *maps, int size, char *old_tablespace);
static void transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
const char *suffix);
+/*
+ * transfer_all_new_tablespaces()
+ *
+ * Responsible for upgrading all database. invokes routines to generate mappings and then
+ * physically link the databases.
+ */
+void
+transfer_all_new_tablespaces(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
+ char *old_pgdata, char *new_pgdata)
+{
+ pg_log(PG_REPORT, "%s user relation files\n",
+ user_opts.transfer_mode == TRANSFER_MODE_LINK ? "Linking" : "Copying");
+
+ /*
+ * Transfering files by tablespace is tricky because a single database
+ * can use multiple tablespaces. For non-parallel mode, we just pass a
+ * NULL tablespace path, which matches all tablespaces. In parallel mode,
+ * we pass the default tablespace and all user-created tablespaces
+ * and let those operations happen in parallel.
+ */
+ if (user_opts.jobs <= 1)
+ parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata,
+ new_pgdata, NULL);
+ else
+ {
+ int tblnum;
+
+ /* transfer default tablespace */
+ parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata,
+ new_pgdata, old_pgdata);
+
+ for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++)
+ parallel_transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata,
+ new_pgdata, os_info.old_tablespaces[tblnum]);
+ /* reap all children */
+ while (reap_child(true) == true)
+ ;
+ }
+
+ end_progress_output();
+ check_ok();
+
+ return;
+}
+
+
/*
* transfer_all_new_dbs()
*
* physically link the databases.
*/
void
-transfer_all_new_dbs(DbInfoArr *old_db_arr,
- DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata)
+transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
+ char *old_pgdata, char *new_pgdata, char *old_tablespace)
{
int old_dbnum,
new_dbnum;
- pg_log(PG_REPORT, "%s user relation files\n",
- user_opts.transfer_mode == TRANSFER_MODE_LINK ? "Linking" : "Copying");
-
/* Scan the old cluster databases and transfer their files */
for (old_dbnum = new_dbnum = 0;
old_dbnum < old_db_arr->ndbs;
#ifdef PAGE_CONVERSION
pageConverter = setupPageConverter();
#endif
- transfer_single_new_db(pageConverter, mappings, n_maps);
+ transfer_single_new_db(pageConverter, mappings, n_maps,
+ old_tablespace);
pg_free(mappings);
}
}
- end_progress_output();
- check_ok();
-
return;
}
*/
static void
transfer_single_new_db(pageCnvCtx *pageConverter,
- FileNameMap *maps, int size)
+ FileNameMap *maps, int size, char *old_tablespace)
{
int mapnum;
bool vm_crashsafe_match = true;
for (mapnum = 0; mapnum < size; mapnum++)
{
- /* transfer primary file */
- transfer_relfile(pageConverter, &maps[mapnum], "");
-
- /* fsm/vm files added in PG 8.4 */
- if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804)
+ if (old_tablespace == NULL ||
+ strcmp(maps[mapnum].old_tablespace, old_tablespace) == 0)
{
- /*
- * Copy/link any fsm and vm files, if they exist
- */
- transfer_relfile(pageConverter, &maps[mapnum], "_fsm");
- if (vm_crashsafe_match)
- transfer_relfile(pageConverter, &maps[mapnum], "_vm");
+ /* transfer primary file */
+ transfer_relfile(pageConverter, &maps[mapnum], "");
+
+ /* fsm/vm files added in PG 8.4 */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804)
+ {
+ /*
+ * Copy/link any fsm and vm files, if they exist
+ */
+ transfer_relfile(pageConverter, &maps[mapnum], "_fsm");
+ if (vm_crashsafe_match)
+ transfer_relfile(pageConverter, &maps[mapnum], "_vm");
+ }
}
}
}
else
snprintf(extent_suffix, sizeof(extent_suffix), ".%d", segno);
- snprintf(old_file, sizeof(old_file), "%s/%u%s%s", map->old_dir,
- map->old_relfilenode, type_suffix, extent_suffix);
- snprintf(new_file, sizeof(new_file), "%s/%u%s%s", map->new_dir,
- map->new_relfilenode, type_suffix, extent_suffix);
+ snprintf(old_file, sizeof(old_file), "%s%s/%u/%u%s%s", map->old_tablespace,
+ map->old_tablespace_suffix, map->old_db_oid, map->old_relfilenode,
+ type_suffix, extent_suffix);
+ snprintf(new_file, sizeof(new_file), "%s%s/%u/%u%s%s", map->new_tablespace,
+ map->new_tablespace_suffix, map->new_db_oid, map->new_relfilenode,
+ type_suffix, extent_suffix);
/* Is it an extent, fsm, or vm file? */
if (type_suffix[0] != '\0' || segno != 0)
return;
}
+
set_tablespace_directory_suffix(&old_cluster);
set_tablespace_directory_suffix(&new_cluster);
- if (os_info.num_tablespaces > 0 &&
+ if (os_info.num_old_tablespaces > 0 &&
strcmp(old_cluster.tablespace_suffix, new_cluster.tablespace_suffix) == 0)
pg_log(PG_FATAL,
"Cannot upgrade to/from the same system catalog version when\n"
res = executeQueryOrDie(conn, "%s", query);
- if ((os_info.num_tablespaces = PQntuples(res)) != 0)
- os_info.tablespaces = (char **) pg_malloc(
- os_info.num_tablespaces * sizeof(char *));
+ if ((os_info.num_old_tablespaces = PQntuples(res)) != 0)
+ os_info.old_tablespaces = (char **) pg_malloc(
+ os_info.num_old_tablespaces * sizeof(char *));
else
- os_info.tablespaces = NULL;
+ os_info.old_tablespaces = NULL;
i_spclocation = PQfnumber(res, "spclocation");
- for (tblnum = 0; tblnum < os_info.num_tablespaces; tblnum++)
- os_info.tablespaces[tblnum] = pg_strdup(
+ for (tblnum = 0; tblnum < os_info.num_old_tablespaces; tblnum++)
+ os_info.old_tablespaces[tblnum] = pg_strdup(
PQgetvalue(res, tblnum, i_spclocation));
PQclear(res);
<para>
The <option>--jobs</> option allows multiple CPU cores to be used
- to dump and reload database schemas in parallel; a good place to
- start is the number of CPU cores on the server. This option can
- dramatically reduce the time to upgrade a multi-database server
- running on a multiprocessor machine.
+ for copying/linking of files and to dump and reload database schemas
+ in parallel; a good place to start is the maximum of the number of
+ CPU cores and tablespaces. This option can dramatically reduce the
+ time to upgrade a multi-database server running on a multiprocessor
+ machine.
</para>
<para>