]> granicus.if.org Git - postgresql/blob - src/bin/pg_rewind/libpq_fetch.c
Make the order of the header file includes consistent in non-backend modules.
[postgresql] / src / bin / pg_rewind / libpq_fetch.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_fetch.c
4  *        Functions for fetching files from a remote server.
5  *
6  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
7  *
8  *-------------------------------------------------------------------------
9  */
10 #include "postgres_fe.h"
11
12 #include <sys/stat.h>
13 #include <dirent.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16
17 #include "catalog/pg_type_d.h"
18 #include "datapagemap.h"
19 #include "fe_utils/connect.h"
20 #include "fetch.h"
21 #include "file_ops.h"
22 #include "filemap.h"
23 #include "pg_rewind.h"
24 #include "port/pg_bswap.h"
25
26 PGconn *conn = NULL;
27
28 /*
29  * Files are fetched max CHUNKSIZE bytes at a time.
30  *
31  * (This only applies to files that are copied in whole, or for truncated
32  * files where we copy the tail. Relation files, where we know the individual
33  * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
34  */
35 #define CHUNKSIZE 1000000
36
37 static void receiveFileChunks(const char *sql);
38 static void execute_pagemap(datapagemap_t *pagemap, const char *path);
39 static char *run_simple_query(const char *sql);
40 static void run_simple_command(const char *sql);
41
42 void
43 libpqConnect(const char *connstr)
44 {
45         char       *str;
46         PGresult   *res;
47
48         conn = PQconnectdb(connstr);
49         if (PQstatus(conn) == CONNECTION_BAD)
50                 pg_fatal("could not connect to server: %s",
51                                  PQerrorMessage(conn));
52
53         if (showprogress)
54                 pg_log_info("connected to server");
55
56         /* disable all types of timeouts */
57         run_simple_command("SET statement_timeout = 0");
58         run_simple_command("SET lock_timeout = 0");
59         run_simple_command("SET idle_in_transaction_session_timeout = 0");
60
61         res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
62         if (PQresultStatus(res) != PGRES_TUPLES_OK)
63                 pg_fatal("could not clear search_path: %s",
64                                  PQresultErrorMessage(res));
65         PQclear(res);
66
67         /*
68          * Check that the server is not in hot standby mode. There is no
69          * fundamental reason that couldn't be made to work, but it doesn't
70          * currently because we use a temporary table. Better to check for it
71          * explicitly than error out, for a better error message.
72          */
73         str = run_simple_query("SELECT pg_is_in_recovery()");
74         if (strcmp(str, "f") != 0)
75                 pg_fatal("source server must not be in recovery mode");
76         pg_free(str);
77
78         /*
79          * Also check that full_page_writes is enabled.  We can get torn pages if
80          * a page is modified while we read it with pg_read_binary_file(), and we
81          * rely on full page images to fix them.
82          */
83         str = run_simple_query("SHOW full_page_writes");
84         if (strcmp(str, "on") != 0)
85                 pg_fatal("full_page_writes must be enabled in the source server");
86         pg_free(str);
87
88         /*
89          * Although we don't do any "real" updates, we do work with a temporary
90          * table. We don't care about synchronous commit for that. It doesn't
91          * otherwise matter much, but if the server is using synchronous
92          * replication, and replication isn't working for some reason, we don't
93          * want to get stuck, waiting for it to start working again.
94          */
95         run_simple_command("SET synchronous_commit = off");
96 }
97
98 /*
99  * Runs a query that returns a single value.
100  * The result should be pg_free'd after use.
101  */
102 static char *
103 run_simple_query(const char *sql)
104 {
105         PGresult   *res;
106         char       *result;
107
108         res = PQexec(conn, sql);
109
110         if (PQresultStatus(res) != PGRES_TUPLES_OK)
111                 pg_fatal("error running query (%s) in source server: %s",
112                                  sql, PQresultErrorMessage(res));
113
114         /* sanity check the result set */
115         if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
116                 pg_fatal("unexpected result set from query");
117
118         result = pg_strdup(PQgetvalue(res, 0, 0));
119
120         PQclear(res);
121
122         return result;
123 }
124
125 /*
126  * Runs a command.
127  * In the event of a failure, exit immediately.
128  */
129 static void
130 run_simple_command(const char *sql)
131 {
132         PGresult   *res;
133
134         res = PQexec(conn, sql);
135
136         if (PQresultStatus(res) != PGRES_COMMAND_OK)
137                 pg_fatal("error running query (%s) in source server: %s",
138                                  sql, PQresultErrorMessage(res));
139
140         PQclear(res);
141 }
142
143 /*
144  * Calls pg_current_wal_insert_lsn() function
145  */
146 XLogRecPtr
147 libpqGetCurrentXlogInsertLocation(void)
148 {
149         XLogRecPtr      result;
150         uint32          hi;
151         uint32          lo;
152         char       *val;
153
154         val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
155
156         if (sscanf(val, "%X/%X", &hi, &lo) != 2)
157                 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
158
159         result = ((uint64) hi) << 32 | lo;
160
161         pg_free(val);
162
163         return result;
164 }
165
166 /*
167  * Get a list of all files in the data directory.
168  */
169 void
170 libpqProcessFileList(void)
171 {
172         PGresult   *res;
173         const char *sql;
174         int                     i;
175
176         /*
177          * Create a recursive directory listing of the whole data directory.
178          *
179          * The WITH RECURSIVE part does most of the work. The second part gets the
180          * targets of the symlinks in pg_tblspc directory.
181          *
182          * XXX: There is no backend function to get a symbolic link's target in
183          * general, so if the admin has put any custom symbolic links in the data
184          * directory, they won't be copied correctly.
185          */
186         sql =
187                 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
188                 "  SELECT '' AS path, filename, size, isdir FROM\n"
189                 "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
190                 "        pg_stat_file(fn.filename, true) AS this\n"
191                 "  UNION ALL\n"
192                 "  SELECT parent.path || parent.filename || '/' AS path,\n"
193                 "         fn, this.size, this.isdir\n"
194                 "  FROM files AS parent,\n"
195                 "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
196                 "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
197                 "       WHERE parent.isdir = 't'\n"
198                 ")\n"
199                 "SELECT path || filename, size, isdir,\n"
200                 "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
201                 "FROM files\n"
202                 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
203                 "                             AND oid::text = files.filename\n";
204         res = PQexec(conn, sql);
205
206         if (PQresultStatus(res) != PGRES_TUPLES_OK)
207                 pg_fatal("could not fetch file list: %s",
208                                  PQresultErrorMessage(res));
209
210         /* sanity check the result set */
211         if (PQnfields(res) != 4)
212                 pg_fatal("unexpected result set while fetching file list");
213
214         /* Read result to local variables */
215         for (i = 0; i < PQntuples(res); i++)
216         {
217                 char       *path = PQgetvalue(res, i, 0);
218                 int64           filesize = atol(PQgetvalue(res, i, 1));
219                 bool            isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
220                 char       *link_target = PQgetvalue(res, i, 3);
221                 file_type_t type;
222
223                 if (PQgetisnull(res, 0, 1))
224                 {
225                         /*
226                          * The file was removed from the server while the query was
227                          * running. Ignore it.
228                          */
229                         continue;
230                 }
231
232                 if (link_target[0])
233                         type = FILE_TYPE_SYMLINK;
234                 else if (isdir)
235                         type = FILE_TYPE_DIRECTORY;
236                 else
237                         type = FILE_TYPE_REGULAR;
238
239                 process_source_file(path, type, filesize, link_target);
240         }
241         PQclear(res);
242 }
243
244 /*----
245  * Runs a query, which returns pieces of files from the remote source data
246  * directory, and overwrites the corresponding parts of target files with
247  * the received parts. The result set is expected to be of format:
248  *
249  * path         text    -- path in the data directory, e.g "base/1/123"
250  * begin        int8    -- offset within the file
251  * chunk        bytea   -- file content
252  *----
253  */
254 static void
255 receiveFileChunks(const char *sql)
256 {
257         PGresult   *res;
258
259         if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
260                 pg_fatal("could not send query: %s", PQerrorMessage(conn));
261
262         pg_log_debug("getting file chunks");
263
264         if (PQsetSingleRowMode(conn) != 1)
265                 pg_fatal("could not set libpq connection to single row mode");
266
267         while ((res = PQgetResult(conn)) != NULL)
268         {
269                 char       *filename;
270                 int                     filenamelen;
271                 int64           chunkoff;
272                 int                     chunksize;
273                 char       *chunk;
274
275                 switch (PQresultStatus(res))
276                 {
277                         case PGRES_SINGLE_TUPLE:
278                                 break;
279
280                         case PGRES_TUPLES_OK:
281                                 PQclear(res);
282                                 continue;               /* final zero-row result */
283
284                         default:
285                                 pg_fatal("unexpected result while fetching remote files: %s",
286                                                  PQresultErrorMessage(res));
287                 }
288
289                 /* sanity check the result set */
290                 if (PQnfields(res) != 3 || PQntuples(res) != 1)
291                         pg_fatal("unexpected result set size while fetching remote files");
292
293                 if (PQftype(res, 0) != TEXTOID ||
294                         PQftype(res, 1) != INT8OID ||
295                         PQftype(res, 2) != BYTEAOID)
296                 {
297                         pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
298                                          PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
299                 }
300
301                 if (PQfformat(res, 0) != 1 &&
302                         PQfformat(res, 1) != 1 &&
303                         PQfformat(res, 2) != 1)
304                 {
305                         pg_fatal("unexpected result format while fetching remote files");
306                 }
307
308                 if (PQgetisnull(res, 0, 0) ||
309                         PQgetisnull(res, 0, 1))
310                 {
311                         pg_fatal("unexpected null values in result while fetching remote files");
312                 }
313
314                 if (PQgetlength(res, 0, 1) != sizeof(int64))
315                         pg_fatal("unexpected result length while fetching remote files");
316
317                 /* Read result set to local variables */
318                 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
319                 chunkoff = pg_ntoh64(chunkoff);
320                 chunksize = PQgetlength(res, 0, 2);
321
322                 filenamelen = PQgetlength(res, 0, 0);
323                 filename = pg_malloc(filenamelen + 1);
324                 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
325                 filename[filenamelen] = '\0';
326
327                 chunk = PQgetvalue(res, 0, 2);
328
329                 /*
330                  * If a file has been deleted on the source, remove it on the target
331                  * as well.  Note that multiple unlink() calls may happen on the same
332                  * file if multiple data chunks are associated with it, hence ignore
333                  * unconditionally anything missing.  If this file is not a relation
334                  * data file, then it has been already truncated when creating the
335                  * file chunk list at the previous execution of the filemap.
336                  */
337                 if (PQgetisnull(res, 0, 2))
338                 {
339                         pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
340                                                  filename);
341                         remove_target_file(filename, true);
342                         pg_free(filename);
343                         PQclear(res);
344                         continue;
345                 }
346
347                 pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
348                                          filename, (long long int) chunkoff, chunksize);
349
350                 open_target_file(filename, false);
351
352                 write_target_range(chunk, chunkoff, chunksize);
353
354                 pg_free(filename);
355
356                 PQclear(res);
357         }
358 }
359
360 /*
361  * Receive a single file as a malloc'd buffer.
362  */
363 char *
364 libpqGetFile(const char *filename, size_t *filesize)
365 {
366         PGresult   *res;
367         char       *result;
368         int                     len;
369         const char *paramValues[1];
370
371         paramValues[0] = filename;
372         res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
373                                            1, NULL, paramValues, NULL, NULL, 1);
374
375         if (PQresultStatus(res) != PGRES_TUPLES_OK)
376                 pg_fatal("could not fetch remote file \"%s\": %s",
377                                  filename, PQresultErrorMessage(res));
378
379         /* sanity check the result set */
380         if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
381                 pg_fatal("unexpected result set while fetching remote file \"%s\"",
382                                  filename);
383
384         /* Read result to local variables */
385         len = PQgetlength(res, 0, 0);
386         result = pg_malloc(len + 1);
387         memcpy(result, PQgetvalue(res, 0, 0), len);
388         result[len] = '\0';
389
390         PQclear(res);
391
392         pg_log_debug("fetched file \"%s\", length %d", filename, len);
393
394         if (filesize)
395                 *filesize = len;
396         return result;
397 }
398
399 /*
400  * Write a file range to a temporary table in the server.
401  *
402  * The range is sent to the server as a COPY formatted line, to be inserted
403  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
404  * function to actually fetch the data.
405  */
406 static void
407 fetch_file_range(const char *path, uint64 begin, uint64 end)
408 {
409         char            linebuf[MAXPGPATH + 23];
410
411         /* Split the range into CHUNKSIZE chunks */
412         while (end - begin > 0)
413         {
414                 unsigned int len;
415
416                 /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
417                 if (end - begin > CHUNKSIZE)
418                         len = CHUNKSIZE;
419                 else
420                         len = (unsigned int) (end - begin);
421
422                 snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
423
424                 if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
425                         pg_fatal("could not send COPY data: %s",
426                                          PQerrorMessage(conn));
427
428                 begin += len;
429         }
430 }
431
432 /*
433  * Fetch all changed blocks from remote source data directory.
434  */
435 void
436 libpq_executeFileMap(filemap_t *map)
437 {
438         file_entry_t *entry;
439         const char *sql;
440         PGresult   *res;
441         int                     i;
442
443         /*
444          * First create a temporary table, and load it with the blocks that we
445          * need to fetch.
446          */
447         sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
448         run_simple_command(sql);
449
450         sql = "COPY fetchchunks FROM STDIN";
451         res = PQexec(conn, sql);
452
453         if (PQresultStatus(res) != PGRES_COPY_IN)
454                 pg_fatal("could not send file list: %s",
455                                  PQresultErrorMessage(res));
456         PQclear(res);
457
458         for (i = 0; i < map->narray; i++)
459         {
460                 entry = map->array[i];
461
462                 /* If this is a relation file, copy the modified blocks */
463                 execute_pagemap(&entry->pagemap, entry->path);
464
465                 switch (entry->action)
466                 {
467                         case FILE_ACTION_NONE:
468                                 /* nothing else to do */
469                                 break;
470
471                         case FILE_ACTION_COPY:
472                                 /* Truncate the old file out of the way, if any */
473                                 open_target_file(entry->path, true);
474                                 fetch_file_range(entry->path, 0, entry->newsize);
475                                 break;
476
477                         case FILE_ACTION_TRUNCATE:
478                                 truncate_target_file(entry->path, entry->newsize);
479                                 break;
480
481                         case FILE_ACTION_COPY_TAIL:
482                                 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
483                                 break;
484
485                         case FILE_ACTION_REMOVE:
486                                 remove_target(entry);
487                                 break;
488
489                         case FILE_ACTION_CREATE:
490                                 create_target(entry);
491                                 break;
492                 }
493         }
494
495         if (PQputCopyEnd(conn, NULL) != 1)
496                 pg_fatal("could not send end-of-COPY: %s",
497                                  PQerrorMessage(conn));
498
499         while ((res = PQgetResult(conn)) != NULL)
500         {
501                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
502                         pg_fatal("unexpected result while sending file list: %s",
503                                          PQresultErrorMessage(res));
504                 PQclear(res);
505         }
506
507         /*
508          * We've now copied the list of file ranges that we need to fetch to the
509          * temporary table. Now, actually fetch all of those ranges.
510          */
511         sql =
512                 "SELECT path, begin,\n"
513                 "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
514                 "FROM fetchchunks\n";
515
516         receiveFileChunks(sql);
517 }
518
519 static void
520 execute_pagemap(datapagemap_t *pagemap, const char *path)
521 {
522         datapagemap_iterator_t *iter;
523         BlockNumber blkno;
524         off_t           offset;
525
526         iter = datapagemap_iterate(pagemap);
527         while (datapagemap_next(iter, &blkno))
528         {
529                 offset = blkno * BLCKSZ;
530
531                 fetch_file_range(path, offset, offset + BLCKSZ);
532         }
533         pg_free(iter);
534 }