]> granicus.if.org Git - postgresql/blob - src/bin/pg_rewind/libpq_fetch.c
pg_rewind: Improve some messages
[postgresql] / src / bin / pg_rewind / libpq_fetch.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_fetch.c
4  *        Functions for fetching files from a remote server.
5  *
6  * Copyright (c) 2013-2015, PostgreSQL Global Development Group
7  *
8  *-------------------------------------------------------------------------
9  */
10 #include "postgres_fe.h"
11
12 #include <sys/types.h>
13 #include <sys/stat.h>
14 #include <dirent.h>
15 #include <fcntl.h>
16 #include <unistd.h>
17
18 /* for ntohl/htonl */
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21
22 #include "pg_rewind.h"
23 #include "datapagemap.h"
24 #include "fetch.h"
25 #include "file_ops.h"
26 #include "filemap.h"
27 #include "logging.h"
28
29 #include "libpq-fe.h"
30 #include "catalog/catalog.h"
31 #include "catalog/pg_type.h"
32
33 static PGconn *conn = NULL;
34
35 /*
36  * Files are fetched max CHUNKSIZE bytes at a time.
37  *
38  * (This only applies to files that are copied in whole, or for truncated
39  * files where we copy the tail. Relation files, where we know the individual
40  * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
41  */
42 #define CHUNKSIZE 1000000
43
44 static void receiveFileChunks(const char *sql);
45 static void execute_pagemap(datapagemap_t *pagemap, const char *path);
46 static char *run_simple_query(const char *sql);
47
48 void
49 libpqConnect(const char *connstr)
50 {
51         char       *str;
52
53         conn = PQconnectdb(connstr);
54         if (PQstatus(conn) == CONNECTION_BAD)
55                 pg_fatal("could not connect to server: %s",
56                                  PQerrorMessage(conn));
57
58         pg_log(PG_PROGRESS, "connected to server\n");
59
60         /*
61          * Check that the server is not in hot standby mode. There is no
62          * fundamental reason that couldn't be made to work, but it doesn't
63          * currently because we use a temporary table. Better to check for it
64          * explicitly than error out, for a better error message.
65          */
66         str = run_simple_query("SELECT pg_is_in_recovery()");
67         if (strcmp(str, "f") != 0)
68                 pg_fatal("source server must not be in recovery mode\n");
69         pg_free(str);
70
71         /*
72          * Also check that full_page_writes is enabled.  We can get torn pages if
73          * a page is modified while we read it with pg_read_binary_file(), and we
74          * rely on full page images to fix them.
75          */
76         str = run_simple_query("SHOW full_page_writes");
77         if (strcmp(str, "on") != 0)
78                 pg_fatal("full_page_writes must be enabled in the source server\n");
79         pg_free(str);
80 }
81
82 /*
83  * Runs a query that returns a single value.
84  * The result should be pg_free'd after use.
85  */
86 static char *
87 run_simple_query(const char *sql)
88 {
89         PGresult   *res;
90         char       *result;
91
92         res = PQexec(conn, sql);
93
94         if (PQresultStatus(res) != PGRES_TUPLES_OK)
95                 pg_fatal("error running query (%s) in source server: %s",
96                                  sql, PQresultErrorMessage(res));
97
98         /* sanity check the result set */
99         if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
100                 pg_fatal("unexpected result set from query\n");
101
102         result = pg_strdup(PQgetvalue(res, 0, 0));
103
104         PQclear(res);
105
106         return result;
107 }
108
109 /*
110  * Calls pg_current_xlog_insert_location() function
111  */
112 XLogRecPtr
113 libpqGetCurrentXlogInsertLocation(void)
114 {
115         XLogRecPtr      result;
116         uint32          hi;
117         uint32          lo;
118         char       *val;
119
120         val = run_simple_query("SELECT pg_current_xlog_insert_location()");
121
122         if (sscanf(val, "%X/%X", &hi, &lo) != 2)
123                 pg_fatal("unrecognized result \"%s\" for current WAL insert location\n", val);
124
125         result = ((uint64) hi) << 32 | lo;
126
127         pg_free(val);
128
129         return result;
130 }
131
132 /*
133  * Get a list of all files in the data directory.
134  */
135 void
136 libpqProcessFileList(void)
137 {
138         PGresult   *res;
139         const char *sql;
140         int                     i;
141
142         /*
143          * Create a recursive directory listing of the whole data directory.
144          *
145          * The WITH RECURSIVE part does most of the work. The second part gets the
146          * targets of the symlinks in pg_tblspc directory.
147          *
148          * XXX: There is no backend function to get a symbolic link's target in
149          * general, so if the admin has put any custom symbolic links in the data
150          * directory, they won't be copied correctly.
151          */
152         sql =
153                 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
154                 "  SELECT '' AS path, filename, size, isdir FROM\n"
155                 "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
156                 "        pg_stat_file(fn.filename, true) AS this\n"
157                 "  UNION ALL\n"
158                 "  SELECT parent.path || parent.filename || '/' AS path,\n"
159                 "         fn, this.size, this.isdir\n"
160                 "  FROM files AS parent,\n"
161                 "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
162                 "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
163                 "       WHERE parent.isdir = 't'\n"
164                 ")\n"
165                 "SELECT path || filename, size, isdir,\n"
166                 "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
167                 "FROM files\n"
168                 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
169                 "                             AND oid::text = files.filename\n";
170         res = PQexec(conn, sql);
171
172         if (PQresultStatus(res) != PGRES_TUPLES_OK)
173                 pg_fatal("could not fetch file list: %s",
174                                  PQresultErrorMessage(res));
175
176         /* sanity check the result set */
177         if (PQnfields(res) != 4)
178                 pg_fatal("unexpected result set while fetching file list\n");
179
180         /* Read result to local variables */
181         for (i = 0; i < PQntuples(res); i++)
182         {
183                 char       *path = PQgetvalue(res, i, 0);
184                 int                     filesize = atoi(PQgetvalue(res, i, 1));
185                 bool            isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
186                 char       *link_target = PQgetvalue(res, i, 3);
187                 file_type_t type;
188
189                 if (PQgetisnull(res, 0, 1))
190                 {
191                         /*
192                          * The file was removed from the server while the query was
193                          * running. Ignore it.
194                          */
195                         continue;
196                 }
197
198                 if (link_target[0])
199                         type = FILE_TYPE_SYMLINK;
200                 else if (isdir)
201                         type = FILE_TYPE_DIRECTORY;
202                 else
203                         type = FILE_TYPE_REGULAR;
204
205                 process_source_file(path, type, filesize, link_target);
206         }
207         PQclear(res);
208 }
209
210 /*----
211  * Runs a query, which returns pieces of files from the remote source data
212  * directory, and overwrites the corresponding parts of target files with
213  * the received parts. The result set is expected to be of format:
214  *
215  * path         text    -- path in the data directory, e.g "base/1/123"
216  * begin        int4    -- offset within the file
217  * chunk        bytea   -- file content
218  *----
219  */
220 static void
221 receiveFileChunks(const char *sql)
222 {
223         PGresult   *res;
224
225         if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
226                 pg_fatal("could not send query: %s", PQerrorMessage(conn));
227
228         pg_log(PG_DEBUG, "getting file chunks\n");
229
230         if (PQsetSingleRowMode(conn) != 1)
231                 pg_fatal("could not set libpq connection to single row mode\n");
232
233         while ((res = PQgetResult(conn)) != NULL)
234         {
235                 char       *filename;
236                 int                     filenamelen;
237                 int                     chunkoff;
238                 int                     chunksize;
239                 char       *chunk;
240
241                 switch (PQresultStatus(res))
242                 {
243                         case PGRES_SINGLE_TUPLE:
244                                 break;
245
246                         case PGRES_TUPLES_OK:
247                                 PQclear(res);
248                                 continue;               /* final zero-row result */
249
250                         default:
251                                 pg_fatal("unexpected result while fetching remote files: %s",
252                                                  PQresultErrorMessage(res));
253                 }
254
255                 /* sanity check the result set */
256                 if (PQnfields(res) != 3 || PQntuples(res) != 1)
257                         pg_fatal("unexpected result set size while fetching remote files\n");
258
259                 if (PQftype(res, 0) != TEXTOID &&
260                         PQftype(res, 1) != INT4OID &&
261                         PQftype(res, 2) != BYTEAOID)
262                 {
263                         pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u\n",
264                                          PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
265                 }
266
267                 if (PQfformat(res, 0) != 1 &&
268                         PQfformat(res, 1) != 1 &&
269                         PQfformat(res, 2) != 1)
270                 {
271                         pg_fatal("unexpected result format while fetching remote files\n");
272                 }
273
274                 if (PQgetisnull(res, 0, 0) ||
275                         PQgetisnull(res, 0, 1))
276                 {
277                         pg_fatal("unexpected null values in result while fetching remote files\n");
278                 }
279
280                 if (PQgetlength(res, 0, 1) != sizeof(int32))
281                         pg_fatal("unexpected result length while fetching remote files\n");
282
283                 /* Read result set to local variables */
284                 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int32));
285                 chunkoff = ntohl(chunkoff);
286                 chunksize = PQgetlength(res, 0, 2);
287
288                 filenamelen = PQgetlength(res, 0, 0);
289                 filename = pg_malloc(filenamelen + 1);
290                 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
291                 filename[filenamelen] = '\0';
292
293                 chunk = PQgetvalue(res, 0, 2);
294
295                 /*
296                  * It's possible that the file was deleted on remote side after we
297                  * created the file map. In this case simply ignore it, as if it was
298                  * not there in the first place, and move on.
299                  */
300                 if (PQgetisnull(res, 0, 2))
301                 {
302                         pg_log(PG_DEBUG,
303                           "received null value for chunk for file \"%s\", file has been deleted\n",
304                                    filename);
305                         pg_free(filename);
306                         PQclear(res);
307                         continue;
308                 }
309
310                 pg_log(PG_DEBUG, "received chunk for file \"%s\", offset %d, size %d\n",
311                            filename, chunkoff, chunksize);
312
313                 open_target_file(filename, false);
314
315                 write_target_range(chunk, chunkoff, chunksize);
316
317                 pg_free(filename);
318
319                 PQclear(res);
320         }
321 }
322
323 /*
324  * Receive a single file as a malloc'd buffer.
325  */
326 char *
327 libpqGetFile(const char *filename, size_t *filesize)
328 {
329         PGresult   *res;
330         char       *result;
331         int                     len;
332         const char *paramValues[1];
333
334         paramValues[0] = filename;
335         res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
336                                            1, NULL, paramValues, NULL, NULL, 1);
337
338         if (PQresultStatus(res) != PGRES_TUPLES_OK)
339                 pg_fatal("could not fetch remote file \"%s\": %s",
340                                  filename, PQresultErrorMessage(res));
341
342         /* sanity check the result set */
343         if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
344                 pg_fatal("unexpected result set while fetching remote file \"%s\"\n",
345                                  filename);
346
347         /* Read result to local variables */
348         len = PQgetlength(res, 0, 0);
349         result = pg_malloc(len + 1);
350         memcpy(result, PQgetvalue(res, 0, 0), len);
351         result[len] = '\0';
352
353         PQclear(res);
354
355         pg_log(PG_DEBUG, "fetched file \"%s\", length %d\n", filename, len);
356
357         if (filesize)
358                 *filesize = len;
359         return result;
360 }
361
362 /*
363  * Write a file range to a temporary table in the server.
364  *
365  * The range is sent to the server as a COPY formatted line, to be inserted
366  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
367  * function to actually fetch the data.
368  */
369 static void
370 fetch_file_range(const char *path, unsigned int begin, unsigned int end)
371 {
372         char            linebuf[MAXPGPATH + 23];
373
374         /* Split the range into CHUNKSIZE chunks */
375         while (end - begin > 0)
376         {
377                 unsigned int len;
378
379                 if (end - begin > CHUNKSIZE)
380                         len = CHUNKSIZE;
381                 else
382                         len = end - begin;
383
384                 snprintf(linebuf, sizeof(linebuf), "%s\t%u\t%u\n", path, begin, len);
385
386                 if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
387                         pg_fatal("could not send COPY data: %s",
388                                          PQerrorMessage(conn));
389
390                 begin += len;
391         }
392 }
393
394 /*
395  * Fetch all changed blocks from remote source data directory.
396  */
397 void
398 libpq_executeFileMap(filemap_t *map)
399 {
400         file_entry_t *entry;
401         const char *sql;
402         PGresult   *res;
403         int                     i;
404
405         /*
406          * First create a temporary table, and load it with the blocks that we
407          * need to fetch.
408          */
409         sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int4, len int4);";
410         res = PQexec(conn, sql);
411
412         if (PQresultStatus(res) != PGRES_COMMAND_OK)
413                 pg_fatal("could not create temporary table: %s",
414                                  PQresultErrorMessage(res));
415         PQclear(res);
416
417         sql = "COPY fetchchunks FROM STDIN";
418         res = PQexec(conn, sql);
419
420         if (PQresultStatus(res) != PGRES_COPY_IN)
421                 pg_fatal("could not send file list: %s",
422                                  PQresultErrorMessage(res));
423         PQclear(res);
424
425         for (i = 0; i < map->narray; i++)
426         {
427                 entry = map->array[i];
428
429                 /* If this is a relation file, copy the modified blocks */
430                 execute_pagemap(&entry->pagemap, entry->path);
431
432                 switch (entry->action)
433                 {
434                         case FILE_ACTION_NONE:
435                                 /* nothing else to do */
436                                 break;
437
438                         case FILE_ACTION_COPY:
439                                 /* Truncate the old file out of the way, if any */
440                                 open_target_file(entry->path, true);
441                                 fetch_file_range(entry->path, 0, entry->newsize);
442                                 break;
443
444                         case FILE_ACTION_TRUNCATE:
445                                 truncate_target_file(entry->path, entry->newsize);
446                                 break;
447
448                         case FILE_ACTION_COPY_TAIL:
449                                 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
450                                 break;
451
452                         case FILE_ACTION_REMOVE:
453                                 remove_target(entry);
454                                 break;
455
456                         case FILE_ACTION_CREATE:
457                                 create_target(entry);
458                                 break;
459                 }
460         }
461
462         if (PQputCopyEnd(conn, NULL) != 1)
463                 pg_fatal("could not send end-of-COPY: %s",
464                                  PQerrorMessage(conn));
465
466         while ((res = PQgetResult(conn)) != NULL)
467         {
468                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
469                         pg_fatal("unexpected result while sending file list: %s",
470                                          PQresultErrorMessage(res));
471                 PQclear(res);
472         }
473
474         /*
475          * We've now copied the list of file ranges that we need to fetch to the
476          * temporary table. Now, actually fetch all of those ranges.
477          */
478         sql =
479                 "SELECT path, begin, \n"
480                 "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
481                 "FROM fetchchunks\n";
482
483         receiveFileChunks(sql);
484 }
485
486 static void
487 execute_pagemap(datapagemap_t *pagemap, const char *path)
488 {
489         datapagemap_iterator_t *iter;
490         BlockNumber blkno;
491         off_t           offset;
492
493         iter = datapagemap_iterate(pagemap);
494         while (datapagemap_next(iter, &blkno))
495         {
496                 offset = blkno * BLCKSZ;
497
498                 fetch_file_range(path, offset, offset + BLCKSZ);
499         }
500         pg_free(iter);
501 }