/*-------------------------------------------------------------------------
 *
 * Functions for fetching files from a remote server.
 *
 * Copyright (c) 2013-2019, PostgreSQL Global Development Group
 *
 *-------------------------------------------------------------------------
 */
10 #include "postgres_fe.h"
17 #include "catalog/pg_type_d.h"
18 #include "datapagemap.h"
19 #include "fe_utils/connect.h"
23 #include "pg_rewind.h"
24 #include "port/pg_bswap.h"
/*
 * Files are fetched at most CHUNKSIZE bytes at a time.
 *
 * (This only applies to files that are copied in whole, or for truncated
 * files where we copy the tail. Relation files, where we know the individual
 * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
 */
/*
 * Largest number of bytes requested by a single row queued by
 * fetch_file_range(); longer ranges are split into pieces of this size.
 */
35 #define CHUNKSIZE 1000000
/* Forward declarations of file-local helpers that are used before they are defined. */
37 static void receiveFileChunks(const char *sql);
38 static void execute_pagemap(datapagemap_t *pagemap, const char *path);
39 static char *run_simple_query(const char *sql);
40 static void run_simple_command(const char *sql);
/*
 * Connect to the source server described by connstr and prepare the session.
 *
 * Steps visible in this function, in order:
 *   - PQconnectdb(); pg_fatal() if the connection status is CONNECTION_BAD
 *   - set statement_timeout, lock_timeout and
 *     idle_in_transaction_session_timeout to 0
 *   - run ALWAYS_SECURE_SEARCH_PATH_SQL; pg_fatal() unless it returns tuples
 *   - pg_fatal() unless pg_is_in_recovery() reports "f"
 *   - pg_fatal() unless full_page_writes reports "on"
 *   - set synchronous_commit to off
 *
 * NOTE(review): the declarations of conn, res and str are not part of the
 * lines shown here. conn is presumably the file-level connection handle that
 * the other functions in this file use as well -- confirm.
 */
43 libpqConnect(const char *connstr)
48 conn = PQconnectdb(connstr);
49 if (PQstatus(conn) == CONNECTION_BAD)
50 pg_fatal("could not connect to server: %s",
51 PQerrorMessage(conn));
54 pg_log_info("connected to server");
56 /* disable all types of timeouts */
57 run_simple_command("SET statement_timeout = 0");
58 run_simple_command("SET lock_timeout = 0");
59 run_simple_command("SET idle_in_transaction_session_timeout = 0");
/* This statement is checked against PGRES_TUPLES_OK, i.e. it is expected to return rows. */
61 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
62 if (PQresultStatus(res) != PGRES_TUPLES_OK)
63 pg_fatal("could not clear search_path: %s",
64 PQresultErrorMessage(res));
68 * Check that the server is not in hot standby mode. There is no
69 * fundamental reason that couldn't be made to work, but it doesn't
70 * currently because we use a temporary table. Better to check for it
71 * explicitly than error out, for a better error message.
/* run_simple_query() hands back the single result value as text; anything but "f" is rejected. */
73 str = run_simple_query("SELECT pg_is_in_recovery()");
74 if (strcmp(str, "f") != 0)
75 pg_fatal("source server must not be in recovery mode");
79 * Also check that full_page_writes is enabled. We can get torn pages if
80 * a page is modified while we read it with pg_read_binary_file(), and we
81 * rely on full page images to fix them.
83 str = run_simple_query("SHOW full_page_writes");
84 if (strcmp(str, "on") != 0)
85 pg_fatal("full_page_writes must be enabled in the source server");
89 * Although we don't do any "real" updates, we do work with a temporary
90 * table. We don't care about synchronous commit for that. It doesn't
91 * otherwise matter much, but if the server is using synchronous
92 * replication, and replication isn't working for some reason, we don't
93 * want to get stuck, waiting for it to start working again.
95 run_simple_command("SET synchronous_commit = off");
/*
 * Runs a query that returns a single value.
 * The result should be pg_free'd after use.
 */
/*
 * Execute sql with PQexec() on conn and copy out the value found in row 0,
 * column 0 of the result.
 *
 * pg_fatal() is called when the result status is not PGRES_TUPLES_OK, or
 * when the result is not exactly one row of one non-null column.
 *
 * NOTE(review): the return statement and any PQclear() call are not part of
 * the lines shown here; the forward declaration gives the return type as
 * char *, so the pg_strdup copy is presumably what is returned -- confirm.
 */
103 run_simple_query(const char *sql)
108 res = PQexec(conn, sql);
110 if (PQresultStatus(res) != PGRES_TUPLES_OK)
111 pg_fatal("error running query (%s) in source server: %s",
112 sql, PQresultErrorMessage(res));
114 /* sanity check the result set */
115 if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
116 pg_fatal("unexpected result set from query");
/* Duplicate the value so the caller gets its own copy rather than a pointer into res. */
118 result = pg_strdup(PQgetvalue(res, 0, 0));
/*
 * Runs a command on the source server.
 * In the event of a failure, exit immediately.
 */
/*
 * Execute sql with PQexec() on conn and require a PGRES_COMMAND_OK status,
 * i.e. a statement that completes without returning rows. Any other status
 * ends the program through pg_fatal(), with the statement text and the
 * server error message included in the report.
 */
130 run_simple_command(const char *sql)
134 res = PQexec(conn, sql);
136 if (PQresultStatus(res) != PGRES_COMMAND_OK)
137 pg_fatal("error running query (%s) in source server: %s",
138 sql, PQresultErrorMessage(res));
/*
 * Calls the pg_current_wal_insert_lsn() function on the source server.
 */
/*
 * Ask the source server for pg_current_wal_insert_lsn() and convert the
 * textual answer into a 64-bit value.
 *
 * The text is parsed as two hexadecimal numbers separated by a slash;
 * pg_fatal() is called if that parse does not yield both numbers.
 *
 * NOTE(review): the declarations of val, hi, lo and result, and the return
 * statement, are not part of the lines shown here.
 */
147 libpqGetCurrentXlogInsertLocation(void)
154 val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
156 if (sscanf(val, "%X/%X", &hi, &lo) != 2)
157 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
/* The number before the slash becomes the upper 32 bits, the one after it the lower 32 bits. */
159 result = ((uint64) hi) << 32 | lo;
167 * Get a list of all files in the data directory.
170 libpqProcessFileList(void)
177 * Create a recursive directory listing of the whole data directory.
179 * The WITH RECURSIVE part does most of the work. The second part gets the
180 * targets of the symlinks in pg_tblspc directory.
182 * XXX: There is no backend function to get a symbolic link's target in
183 * general, so if the admin has put any custom symbolic links in the data
184 * directory, they won't be copied correctly.
187 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
188 " SELECT '' AS path, filename, size, isdir FROM\n"
189 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
190 " pg_stat_file(fn.filename, true) AS this\n"
192 " SELECT parent.path || parent.filename || '/' AS path,\n"
193 " fn, this.size, this.isdir\n"
194 " FROM files AS parent,\n"
195 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
196 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
197 " WHERE parent.isdir = 't'\n"
199 "SELECT path || filename, size, isdir,\n"
200 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
202 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
203 " AND oid::text = files.filename\n";
204 res = PQexec(conn, sql);
206 if (PQresultStatus(res) != PGRES_TUPLES_OK)
207 pg_fatal("could not fetch file list: %s",
208 PQresultErrorMessage(res));
210 /* sanity check the result set */
211 if (PQnfields(res) != 4)
212 pg_fatal("unexpected result set while fetching file list");
214 /* Read result to local variables */
215 for (i = 0; i < PQntuples(res); i++)
217 char *path = PQgetvalue(res, i, 0);
218 int64 filesize = atol(PQgetvalue(res, i, 1));
219 bool isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
220 char *link_target = PQgetvalue(res, i, 3);
223 if (PQgetisnull(res, 0, 1))
226 * The file was removed from the server while the query was
227 * running. Ignore it.
233 type = FILE_TYPE_SYMLINK;
235 type = FILE_TYPE_DIRECTORY;
237 type = FILE_TYPE_REGULAR;
239 process_source_file(path, type, filesize, link_target);
245 * Runs a query, which returns pieces of files from the remote source data
246 * directory, and overwrites the corresponding parts of target files with
247 * the received parts. The result set is expected to be of format:
249 * path text -- path in the data directory, e.g "base/1/123"
250 * begin int8 -- offset within the file
251 * chunk bytea -- file content
255 receiveFileChunks(const char *sql)
259 if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
260 pg_fatal("could not send query: %s", PQerrorMessage(conn));
262 pg_log_debug("getting file chunks");
264 if (PQsetSingleRowMode(conn) != 1)
265 pg_fatal("could not set libpq connection to single row mode");
267 while ((res = PQgetResult(conn)) != NULL)
275 switch (PQresultStatus(res))
277 case PGRES_SINGLE_TUPLE:
280 case PGRES_TUPLES_OK:
282 continue; /* final zero-row result */
285 pg_fatal("unexpected result while fetching remote files: %s",
286 PQresultErrorMessage(res));
289 /* sanity check the result set */
290 if (PQnfields(res) != 3 || PQntuples(res) != 1)
291 pg_fatal("unexpected result set size while fetching remote files");
293 if (PQftype(res, 0) != TEXTOID ||
294 PQftype(res, 1) != INT8OID ||
295 PQftype(res, 2) != BYTEAOID)
297 pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
298 PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
301 if (PQfformat(res, 0) != 1 &&
302 PQfformat(res, 1) != 1 &&
303 PQfformat(res, 2) != 1)
305 pg_fatal("unexpected result format while fetching remote files");
308 if (PQgetisnull(res, 0, 0) ||
309 PQgetisnull(res, 0, 1))
311 pg_fatal("unexpected null values in result while fetching remote files");
314 if (PQgetlength(res, 0, 1) != sizeof(int64))
315 pg_fatal("unexpected result length while fetching remote files");
317 /* Read result set to local variables */
318 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
319 chunkoff = pg_ntoh64(chunkoff);
320 chunksize = PQgetlength(res, 0, 2);
322 filenamelen = PQgetlength(res, 0, 0);
323 filename = pg_malloc(filenamelen + 1);
324 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
325 filename[filenamelen] = '\0';
327 chunk = PQgetvalue(res, 0, 2);
330 * If a file has been deleted on the source, remove it on the target
331 * as well. Note that multiple unlink() calls may happen on the same
332 * file if multiple data chunks are associated with it, hence ignore
333 * unconditionally anything missing. If this file is not a relation
334 * data file, then it has been already truncated when creating the
335 * file chunk list at the previous execution of the filemap.
337 if (PQgetisnull(res, 0, 2))
339 pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
341 remove_target_file(filename, true);
347 pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
348 filename, (long long int) chunkoff, chunksize);
350 open_target_file(filename, false);
352 write_target_range(chunk, chunkoff, chunksize);
/*
 * Receive a single file as a malloc'd buffer.
 */
/*
 * Fetch the whole contents of one file from the source server.
 *
 * The file name is passed as the single query parameter of
 * pg_read_binary_file($1), and the result is requested in binary format
 * (last PQexecParams argument is 1). The returned bytes are copied into a
 * buffer obtained from pg_malloc().
 *
 * pg_fatal() is called if the query fails, or if the result is not exactly
 * one row with a non-null first column.
 *
 * NOTE(review): the lines that use the filesize parameter, terminate or
 * return the buffer, and release res are not part of the lines shown here.
 */
364 libpqGetFile(const char *filename, size_t *filesize)
369 const char *paramValues[1];
371 paramValues[0] = filename;
372 res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
373 1, NULL, paramValues, NULL, NULL, 1);
375 if (PQresultStatus(res) != PGRES_TUPLES_OK)
376 pg_fatal("could not fetch remote file \"%s\": %s",
377 filename, PQresultErrorMessage(res));
379 /* sanity check the result set */
380 if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
381 pg_fatal("unexpected result set while fetching remote file \"%s\"",
384 /* Read result to local variables */
385 len = PQgetlength(res, 0, 0);
/* One byte more than the payload is allocated; presumably room for a terminator -- confirm. */
386 result = pg_malloc(len + 1);
387 memcpy(result, PQgetvalue(res, 0, 0), len);
392 pg_log_debug("fetched file \"%s\", length %d", filename, len);
400 * Write a file range to a temporary table in the server.
402 * The range is sent to the server as a COPY formatted line, to be inserted
403 * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
404 * function to actually fetch the data.
407 fetch_file_range(const char *path, uint64 begin, uint64 end)
409 char linebuf[MAXPGPATH + 23];
411 /* Split the range into CHUNKSIZE chunks */
412 while (end - begin > 0)
416 /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
417 if (end - begin > CHUNKSIZE)
420 len = (unsigned int) (end - begin);
422 snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
424 if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
425 pg_fatal("could not send COPY data: %s",
426 PQerrorMessage(conn));
/*
 * Fetch all changed blocks from remote source data directory.
 */
/*
 * Carry out the actions recorded in the file map.
 *
 * The function works in two phases:
 *   1. It creates the temporary table fetchchunks, opens a COPY into it and
 *      walks map->array. For every entry it first queues the blocks from the
 *      entry page map, then acts on entry->action: queueing byte ranges
 *      (copy, copy tail) or calling the local target-file helpers
 *      (truncate, remove, create).
 *   2. After ending the COPY, it runs a query over fetchchunks that reads
 *      each queued range with pg_read_binary_file() and passes the query to
 *      receiveFileChunks().
 *
 * pg_fatal() is called whenever a libpq call reports an unexpected status.
 *
 * NOTE(review): declarations, break statements and cleanup calls are not
 * part of the lines shown here.
 */
436 libpq_executeFileMap(filemap_t *map)
444 * First create a temporary table, and load it with the blocks that we
447 sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
448 run_simple_command(sql);
/* Starting the COPY must put the connection into PGRES_COPY_IN state. */
450 sql = "COPY fetchchunks FROM STDIN";
451 res = PQexec(conn, sql);
453 if (PQresultStatus(res) != PGRES_COPY_IN)
454 pg_fatal("could not send file list: %s",
455 PQresultErrorMessage(res));
/* Every fetch_file_range() call made inside this loop adds rows to the open COPY. */
458 for (i = 0; i < map->narray; i++)
460 entry = map->array[i];
462 /* If this is a relation file, copy the modified blocks */
463 execute_pagemap(&entry->pagemap, entry->path);
465 switch (entry->action)
467 case FILE_ACTION_NONE:
468 /* nothing else to do */
471 case FILE_ACTION_COPY:
472 /* Truncate the old file out of the way, if any */
473 open_target_file(entry->path, true);
474 fetch_file_range(entry->path, 0, entry->newsize);
477 case FILE_ACTION_TRUNCATE:
478 truncate_target_file(entry->path, entry->newsize);
481 case FILE_ACTION_COPY_TAIL:
482 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
485 case FILE_ACTION_REMOVE:
486 remove_target(entry);
489 case FILE_ACTION_CREATE:
490 create_target(entry);
/* Finish the COPY; a NULL error message argument ends it normally. */
495 if (PQputCopyEnd(conn, NULL) != 1)
496 pg_fatal("could not send end-of-COPY: %s",
497 PQerrorMessage(conn));
/* Drain all results of the COPY; each one must report PGRES_COMMAND_OK. */
499 while ((res = PQgetResult(conn)) != NULL)
501 if (PQresultStatus(res) != PGRES_COMMAND_OK)
502 pg_fatal("unexpected result while sending file list: %s",
503 PQresultErrorMessage(res));
508 * We've now copied the list of file ranges that we need to fetch to the
509 * temporary table. Now, actually fetch all of those ranges.
512 "SELECT path, begin,\n"
513 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
514 "FROM fetchchunks\n";
516 receiveFileChunks(sql);
520 execute_pagemap(datapagemap_t *pagemap, const char *path)
522 datapagemap_iterator_t *iter;
526 iter = datapagemap_iterate(pagemap);
527 while (datapagemap_next(iter, &blkno))
529 offset = blkno * BLCKSZ;
531 fetch_file_range(path, offset, offset + BLCKSZ);