]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_backup_db.c
aeea9d02c88b06a56daa3fce474fe58285d89722
[postgresql] / src / bin / pg_dump / pg_backup_db.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_db.c
4  *
5  *      Implements the basic DB functions used by the archiver.
6  *
7  * IDENTIFICATION
8  *        $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.29 2001/10/25 05:49:52 momjian Exp $
9  *
10  * NOTES
11  *
12  * Modifications - 04-Jan-2001 - pjw@rhyme.com.au
13  *
14  *        - Check results of PQ routines more carefully.
15  *
16  * Modifications - 19-Mar-2001 - pjw@rhyme.com.au
17  *
18  *        - Avoid forcing table name to lower case in FixupBlobXrefs!
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "pg_backup.h"
24 #include "pg_backup_archiver.h"
25 #include "pg_backup_db.h"
26
27 #include <unistd.h>                             /* for getopt() */
28 #include <ctype.h>
29
30 #ifdef HAVE_TERMIOS_H
31 #include <termios.h>
32 #endif
33
34 #include "libpq-fe.h"
35 #include "libpq/libpq-fs.h"
36 #ifndef HAVE_STRDUP
37 #include "strdup.h"
38 #endif
39
40 static const char *modulename = gettext_noop("archiver (db)");
41
42 static void _check_database_version(ArchiveHandle *AH, bool ignoreVersion);
43 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
44 static int      _executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc);
45 static void notice_processor(void *arg, const char *message);
46
47
48 /*
49  * simple_prompt  --- borrowed from psql
50  *
51  * Generalized function especially intended for reading in usernames and
52  * password interactively. Reads from /dev/tty or stdin/stderr.
53  *
54  * prompt:              The prompt to print
55  * maxlen:              How many characters to accept
56  * echo:                Set to false if you want to hide what is entered (for passwords)
57  *
58  * Returns a malloc()'ed string with the input (w/o trailing newline).
59  */
60 static bool prompt_state = false;
61
62 char *
63 simple_prompt(const char *prompt, int maxlen, bool echo)
64 {
65         int                     length;
66         char       *destination;
67         FILE       *termin,
68                            *termout;
69
70 #ifdef HAVE_TERMIOS_H
71         struct termios t_orig,
72                                 t;
73 #endif
74
75         destination = (char *) malloc(maxlen + 2);
76         if (!destination)
77                 return NULL;
78
79         prompt_state = true;            /* disable SIGINT */
80
81         /*
82          * Do not try to collapse these into one "w+" mode file. Doesn't work
83          * on some platforms (eg, HPUX 10.20).
84          */
85         termin = fopen("/dev/tty", "r");
86         termout = fopen("/dev/tty", "w");
87         if (!termin || !termout)
88         {
89                 if (termin)
90                         fclose(termin);
91                 if (termout)
92                         fclose(termout);
93                 termin = stdin;
94                 termout = stderr;
95         }
96
97 #ifdef HAVE_TERMIOS_H
98         if (!echo)
99         {
100                 tcgetattr(fileno(termin), &t);
101                 t_orig = t;
102                 t.c_lflag &= ~ECHO;
103                 tcsetattr(fileno(termin), TCSAFLUSH, &t);
104         }
105 #endif
106
107         if (prompt)
108         {
109                 fputs(gettext(prompt), termout);
110                 fflush(termout);
111         }
112
113         if (fgets(destination, maxlen, termin) == NULL)
114                 destination[0] = '\0';
115
116         length = strlen(destination);
117         if (length > 0 && destination[length - 1] != '\n')
118         {
119                 /* eat rest of the line */
120                 char            buf[128];
121                 int                     buflen;
122
123                 do
124                 {
125                         if (fgets(buf, sizeof(buf), termin) == NULL)
126                                 break;
127                         buflen = strlen(buf);
128                 } while (buflen > 0 && buf[buflen - 1] != '\n');
129         }
130
131         if (length > 0 && destination[length - 1] == '\n')
132                 /* remove trailing newline */
133                 destination[length - 1] = '\0';
134
135 #ifdef HAVE_TERMIOS_H
136         if (!echo)
137         {
138                 tcsetattr(fileno(termin), TCSAFLUSH, &t_orig);
139                 fputs("\n", termout);
140                 fflush(termout);
141         }
142 #endif
143
144         if (termin != stdin)
145         {
146                 fclose(termin);
147                 fclose(termout);
148         }
149
150         prompt_state = false;           /* SIGINT okay again */
151
152         return destination;
153 }
154
155
156 static int
157 _parse_version(ArchiveHandle *AH, const char *versionString)
158 {
159         int                     cnt;
160         int                     vmaj,
161                                 vmin,
162                                 vrev;
163
164         cnt = sscanf(versionString, "%d.%d.%d", &vmaj, &vmin, &vrev);
165
166         if (cnt < 2)
167                 die_horribly(AH, modulename, "unable to parse version string \"%s\"\n", versionString);
168
169         if (cnt == 2)
170                 vrev = 0;
171
172         return (100 * vmaj + vmin) * 100 + vrev;
173 }
174
175 static void
176 _check_database_version(ArchiveHandle *AH, bool ignoreVersion)
177 {
178         PGresult   *res;
179         int                     myversion;
180         const char *remoteversion_str;
181         int                     remoteversion;
182         PGconn     *conn = AH->connection;
183
184         myversion = _parse_version(AH, PG_VERSION);
185
186         res = PQexec(conn, "SELECT version();");
187         if (!res ||
188                 PQresultStatus(res) != PGRES_TUPLES_OK ||
189                 PQntuples(res) != 1)
190
191                 die_horribly(AH, modulename, "could not get version from server: %s", PQerrorMessage(conn));
192
193         remoteversion_str = PQgetvalue(res, 0, 0);
194         remoteversion = _parse_version(AH, remoteversion_str + 11);
195
196         PQclear(res);
197
198         AH->public.remoteVersion = remoteversion;
199
200         if (myversion != remoteversion
201                 && (remoteversion < AH->public.minRemoteVersion || remoteversion > AH->public.maxRemoteVersion))
202         {
203                 write_msg(NULL, "server version: %s; %s version: %s\n",
204                                   remoteversion_str, progname, PG_VERSION);
205                 if (ignoreVersion)
206                         write_msg(NULL, "proceeding despite version mismatch\n");
207                 else
208                         die_horribly(AH, NULL, "aborting because of version mismatch  (Use the -i option to proceed anyway.)\n");
209         }
210 }
211
212 /*
213  * Check if a given user is a superuser.
214  */
215 int
216 UserIsSuperuser(ArchiveHandle *AH, char *user)
217 {
218         PQExpBuffer qry = createPQExpBuffer();
219         PGresult   *res;
220         int                     i_usesuper;
221         int                     ntups;
222         int                     isSuper;
223
224         /* Get the superuser setting */
225         appendPQExpBuffer(qry, "select usesuper from pg_user where usename = '%s'", user);
226         res = PQexec(AH->connection, qry->data);
227
228         if (!res)
229                 die_horribly(AH, modulename, "null result checking superuser status of %s\n", user);
230
231         if (PQresultStatus(res) != PGRES_TUPLES_OK)
232                 die_horribly(AH, modulename, "could not check superuser status of %s: %s",
233                                          user, PQerrorMessage(AH->connection));
234
235         ntups = PQntuples(res);
236
237         if (ntups == 0)
238                 isSuper = 0;
239         else
240         {
241                 i_usesuper = PQfnumber(res, "usesuper");
242                 isSuper = (strcmp(PQgetvalue(res, 0, i_usesuper), "t") == 0);
243         }
244         PQclear(res);
245
246         destroyPQExpBuffer(qry);
247
248         return isSuper;
249 }
250
251 int
252 ConnectedUserIsSuperuser(ArchiveHandle *AH)
253 {
254         return UserIsSuperuser(AH, PQuser(AH->connection));
255 }
256
257 char *
258 ConnectedUser(ArchiveHandle *AH)
259 {
260         return PQuser(AH->connection);
261 }
262
263 /*
264  * Reconnect to the server.  If dbname is not NULL, use that database,
265  * else the one associated with the archive handle.  If username is
266  * not NULL, use that user name, else the one from the handle.  If
267  * both the database and the user and match the existing connection
268  * already, nothing will be done.
269  *
270  * Returns 1 in any case.
271  */
272 int
273 ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
274 {
275         PGconn     *newConn;
276         const char *newdbname;
277         const char *newusername;
278
279         if (!dbname)
280                 newdbname = PQdb(AH->connection);
281         else
282                 newdbname = dbname;
283
284         if (!username)
285                 newusername = PQuser(AH->connection);
286         else
287                 newusername = username;
288
289         /* Let's see if the request is already satisfied */
290         if (strcmp(newusername, PQuser(AH->connection)) == 0
291                 && strcmp(newdbname, PQdb(AH->connection)) == 0)
292                 return 1;
293
294         newConn = _connectDB(AH, newdbname, newusername);
295
296         PQfinish(AH->connection);
297         AH->connection = newConn;
298
299         free(AH->username);
300         AH->username = strdup(newusername);
301         /* XXX Why don't we update AH->dbname? */
302
303         return 1;
304 }
305
306 /*
307  * Connect to the db again.
308  */
309 static PGconn *
310 _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
311 {
312         int                     need_pass;
313         PGconn     *newConn;
314         char       *password = NULL;
315         int                     badPwd = 0;
316         int                     noPwd = 0;
317         char       *newdb;
318         char       *newuser;
319
320         if (!reqdb)
321                 newdb = PQdb(AH->connection);
322         else
323                 newdb = (char *) reqdb;
324
325         if (!requser || (strlen(requser) == 0))
326                 newuser = PQuser(AH->connection);
327         else
328                 newuser = (char *) requser;
329
330         ahlog(AH, 1, "connecting to database %s as user %s\n", newdb, newuser);
331
332         if (AH->requirePassword)
333         {
334                 password = simple_prompt("Password: ", 100, false);
335                 if (password == NULL)
336                         die_horribly(AH, modulename, "out of memory\n");
337         }
338
339         do
340         {
341                 need_pass = false;
342                 newConn = PQsetdbLogin(PQhost(AH->connection), PQport(AH->connection),
343                                                            NULL, NULL, newdb,
344                                                            newuser, password);
345                 if (!newConn)
346                         die_horribly(AH, modulename, "failed to reconnect to database\n");
347
348                 if (PQstatus(newConn) == CONNECTION_BAD)
349                 {
350                         noPwd = (strcmp(PQerrorMessage(newConn),
351                                                         "fe_sendauth: no password supplied\n") == 0);
352                         badPwd = (strncmp(PQerrorMessage(newConn),
353                                         "Password authentication failed for user", 39) == 0);
354
355                         if (noPwd || badPwd)
356                         {
357
358                                 if (badPwd)
359                                         fprintf(stderr, "Password incorrect\n");
360
361                                 fprintf(stderr, "Connecting to %s as %s\n",
362                                                 PQdb(AH->connection), newuser);
363
364                                 need_pass = true;
365                                 if (password)
366                                         free(password);
367                                 password = simple_prompt("Password: ", 100, false);
368                         }
369                         else
370                                 die_horribly(AH, modulename, "could not reconnect to database: %s",
371                                                          PQerrorMessage(newConn));
372                 }
373
374         } while (need_pass);
375
376         if (password)
377                 free(password);
378
379         PQsetNoticeProcessor(newConn, notice_processor, NULL);
380
381         return newConn;
382 }
383
384
385 /*
386  * Make a database connection with the given parameters.  The
387  * connection handle is returned, the parameters are stored in AHX.
388  * An interactive password prompt is automatically issued if required.
389  */
390 PGconn *
391 ConnectDatabase(Archive *AHX,
392                                 const char *dbname,
393                                 const char *pghost,
394                                 const char *pgport,
395                                 const char *username,
396                                 const int reqPwd,
397                                 const int ignoreVersion)
398 {
399         ArchiveHandle *AH = (ArchiveHandle *) AHX;
400         char       *password = NULL;
401         bool            need_pass = false;
402
403         if (AH->connection)
404                 die_horribly(AH, modulename, "already connected to a database\n");
405
406         if (!dbname && !(dbname = getenv("PGDATABASE")))
407                 die_horribly(AH, modulename, "no database name specified\n");
408
409         AH->dbname = strdup(dbname);
410
411         if (pghost != NULL)
412                 AH->pghost = strdup(pghost);
413         else
414                 AH->pghost = NULL;
415
416         if (pgport != NULL)
417                 AH->pgport = strdup(pgport);
418         else
419                 AH->pgport = NULL;
420
421         if (username != NULL)
422                 AH->username = strdup(username);
423         else
424                 AH->username = NULL;
425
426         if (reqPwd)
427         {
428                 password = simple_prompt("Password: ", 100, false);
429                 if (password == NULL)
430                         die_horribly(AH, modulename, "out of memory\n");
431                 AH->requirePassword = true;
432         }
433         else
434                 AH->requirePassword = false;
435
436         /*
437          * Start the connection.  Loop until we have a password if requested
438          * by backend.
439          */
440         do
441         {
442                 need_pass = false;
443                 AH->connection = PQsetdbLogin(AH->pghost, AH->pgport, NULL, NULL,
444                                                                           AH->dbname, AH->username, password);
445
446                 if (!AH->connection)
447                         die_horribly(AH, modulename, "failed to connect to database\n");
448
449                 if (PQstatus(AH->connection) == CONNECTION_BAD &&
450                         strcmp(PQerrorMessage(AH->connection), "fe_sendauth: no password supplied\n") == 0 &&
451                         !feof(stdin))
452                 {
453                         PQfinish(AH->connection);
454                         need_pass = true;
455                         free(password);
456                         password = NULL;
457                         password = simple_prompt("Password: ", 100, false);
458                 }
459         } while (need_pass);
460
461         if (password)
462                 free(password);
463
464         /* check to see that the backend connection was successfully made */
465         if (PQstatus(AH->connection) == CONNECTION_BAD)
466                 die_horribly(AH, modulename, "connection to database \"%s\" failed: %s",
467                                          AH->dbname, PQerrorMessage(AH->connection));
468
469         /* check for version mismatch */
470         _check_database_version(AH, ignoreVersion);
471
472         PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
473
474         /*
475          * AH->currUser = PQuser(AH->connection);
476          *
477          * Removed because it prevented an initial \connect when dumping to SQL
478          * in pg_dump.
479          */
480
481         return AH->connection;
482 }
483
484
485 static void
486 notice_processor(void *arg, const char *message)
487 {
488         write_msg(NULL, "%s", message);
489 }
490
491
492 /* Public interface */
493 /* Convenience function to send a query. Monitors result to handle COPY statements */
494 int
495 ExecuteSqlCommand(ArchiveHandle *AH, PQExpBuffer qry, char *desc, bool use_blob)
496 {
497         if (use_blob)
498                 return _executeSqlCommand(AH, AH->blobConnection, qry, desc);
499         else
500                 return _executeSqlCommand(AH, AH->connection, qry, desc);
501 }
502
503 /*
504  * Handle command execution. This is used to execute a command on more than one connection,
505  * but the 'pgCopyIn' setting assumes the COPY commands are ONLY executed on the primary
506  * setting...an error will be raised otherwise.
507  */
508 static int
509 _executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc)
510 {
511         PGresult   *res;
512
513         /* fprintf(stderr, "Executing: '%s'\n\n", qry->data); */
514         res = PQexec(conn, qry->data);
515         if (!res)
516                 die_horribly(AH, modulename, "%s: no result from server\n", desc);
517
518         if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)
519         {
520                 if (PQresultStatus(res) == PGRES_COPY_IN)
521                 {
522                         if (conn != AH->connection)
523                                 die_horribly(AH, modulename, "COPY command executed in non-primary connection\n");
524
525                         AH->pgCopyIn = 1;
526                 }
527                 else
528                         die_horribly(AH, modulename, "%s: %s",
529                                                  desc, PQerrorMessage(AH->connection));
530         }
531
532         PQclear(res);
533
534         return strlen(qry->data);
535 }
536
537 /* Convenience function to send one or more queries. Monitors result to handle COPY statements */
538 int
539 ExecuteSqlCommandBuf(ArchiveHandle *AH, void *qryv, int bufLen)
540 {
541         int                     loc;
542         int                     pos = 0;
543         int                     sPos = 0;
544         char       *qry = (char *) qryv;
545         int                     isEnd = 0;
546         char       *eos = qry + bufLen;
547
548         /*
549          * fprintf(stderr, "\n\n*****\n
550          * Buffer:\n\n%s\n*******************\n\n", qry);
551          */
552
553         /* If we're in COPY IN mode, then just break it into lines and send... */
554         if (AH->pgCopyIn)
555         {
556                 for (;;)
557                 {
558
559                         /* Find a lf */
560                         loc = strcspn(&qry[pos], "\n") + pos;
561                         pos = 0;
562
563                         /* If no match, then wait */
564                         if (loc >= (eos - qry))         /* None found */
565                         {
566                                 appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
567                                 break;
568                         };
569
570                         /*
571                          * fprintf(stderr, "Found cr at %d, prev char was %c, next was
572                          * %c\n", loc, qry[loc-1], qry[loc+1]);
573                          */
574
575                         /* Count the number of preceding slashes */
576                         sPos = loc;
577                         while (sPos > 0 && qry[sPos - 1] == '\\')
578                                 sPos--;
579
580                         sPos = loc - sPos;
581
582                         /*
583                          * If an odd number of preceding slashes, then \n was escaped
584                          * so set the next search pos, and restart (if any left).
585                          */
586                         if ((sPos & 1) == 1)
587                         {
588                                 /* fprintf(stderr, "cr was escaped\n"); */
589                                 pos = loc + 1;
590                                 if (pos >= (eos - qry))
591                                 {
592                                         appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
593                                         break;
594                                 }
595                         }
596                         else
597                         {
598                                 /* We got a good cr */
599                                 qry[loc] = '\0';
600                                 appendPQExpBuffer(AH->pgCopyBuf, "%s\n", qry);
601                                 qry += loc + 1;
602                                 isEnd = (strcmp(AH->pgCopyBuf->data, "\\.\n") == 0);
603
604                                 /*---------
605                                  * fprintf(stderr, "Sending '%s' via
606                                  *              COPY (at end = %d)\n\n", AH->pgCopyBuf->data, isEnd);
607                                  *---------
608                                  */
609
610                                 if (PQputline(AH->connection, AH->pgCopyBuf->data) != 0)
611                                         die_horribly(AH, modulename, "error returned by PQputline\n");
612
613                                 resetPQExpBuffer(AH->pgCopyBuf);
614
615                                 /*
616                                  * fprintf(stderr, "Buffer is '%s'\n",
617                                  * AH->pgCopyBuf->data);
618                                  */
619
620                                 if (isEnd)
621                                 {
622                                         if (PQendcopy(AH->connection) != 0)
623                                                 die_horribly(AH, modulename, "error returned by PQendcopy\n");
624
625                                         AH->pgCopyIn = 0;
626                                         break;
627                                 }
628
629                         }
630
631                         /* Make sure we're not past the original buffer end */
632                         if (qry >= eos)
633                                 break;
634
635                 }
636         }
637
638         /* We may have finished Copy In, and have a non-empty buffer */
639         if (!AH->pgCopyIn)
640         {
641                 /*
642                  * The following is a mini state machine to assess then of of an
643                  * SQL statement. It really only needs to parse good SQL, or at
644                  * least that's the theory... End-of-statement is assumed to be an
645                  * unquoted, un commented semi-colon.
646                  */
647
648                 /*
649                  * fprintf(stderr, "Buffer at start is: '%s'\n\n",
650                  * AH->sqlBuf->data);
651                  */
652
653                 for (pos = 0; pos < (eos - qry); pos++)
654                 {
655                         appendPQExpBufferChar(AH->sqlBuf, qry[pos]);
656                         /* fprintf(stderr, " %c",qry[pos]); */
657
658                         switch (AH->sqlparse.state)
659                         {
660
661                                 case SQL_SCAN:  /* Default state == 0, set in _allocAH */
662
663                                         if (qry[pos] == ';' && AH->sqlparse.braceDepth == 0)
664                                         {
665                                                 /* Send It & reset the buffer */
666
667                                                 /*
668                                                  * fprintf(stderr, "    sending: '%s'\n\n",
669                                                  * AH->sqlBuf->data);
670                                                  */
671                                                 ExecuteSqlCommand(AH, AH->sqlBuf, "could not execute query", false);
672                                                 resetPQExpBuffer(AH->sqlBuf);
673                                                 AH->sqlparse.lastChar = '\0';
674                                         }
675                                         else
676                                         {
677                                                 if (qry[pos] == '"' || qry[pos] == '\'')
678                                                 {
679                                                         /* fprintf(stderr,"[startquote]\n"); */
680                                                         AH->sqlparse.state = SQL_IN_QUOTE;
681                                                         AH->sqlparse.quoteChar = qry[pos];
682                                                         AH->sqlparse.backSlash = 0;
683                                                 }
684                                                 else if (qry[pos] == '-' && AH->sqlparse.lastChar == '-')
685                                                         AH->sqlparse.state = SQL_IN_SQL_COMMENT;
686                                                 else if (qry[pos] == '*' && AH->sqlparse.lastChar == '/')
687                                                         AH->sqlparse.state = SQL_IN_EXT_COMMENT;
688                                                 else if (qry[pos] == '(')
689                                                         AH->sqlparse.braceDepth++;
690                                                 else if (qry[pos] == ')')
691                                                         AH->sqlparse.braceDepth--;
692
693                                                 AH->sqlparse.lastChar = qry[pos];
694                                         }
695
696                                         break;
697
698                                 case SQL_IN_SQL_COMMENT:
699
700                                         if (qry[pos] == '\n')
701                                                 AH->sqlparse.state = SQL_SCAN;
702                                         break;
703
704                                 case SQL_IN_EXT_COMMENT:
705
706                                         if (AH->sqlparse.lastChar == '*' && qry[pos] == '/')
707                                                 AH->sqlparse.state = SQL_SCAN;
708                                         break;
709
710                                 case SQL_IN_QUOTE:
711
712                                         if (!AH->sqlparse.backSlash && AH->sqlparse.quoteChar == qry[pos])
713                                         {
714                                                 /* fprintf(stderr,"[endquote]\n"); */
715                                                 AH->sqlparse.state = SQL_SCAN;
716                                         }
717                                         else
718                                         {
719
720                                                 if (qry[pos] == '\\')
721                                                 {
722                                                         if (AH->sqlparse.lastChar == '\\')
723                                                                 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
724                                                         else
725                                                                 AH->sqlparse.backSlash = 1;
726                                                 }
727                                                 else
728                                                         AH->sqlparse.backSlash = 0;
729                                         }
730                                         break;
731
732                         }
733                         AH->sqlparse.lastChar = qry[pos];
734                         /* fprintf(stderr, "\n"); */
735                 }
736
737         }
738
739         return 1;
740 }
741
742 void
743 FixupBlobRefs(ArchiveHandle *AH, char *tablename)
744 {
745         PQExpBuffer tblQry;
746         PGresult   *res,
747                            *uRes;
748         int                     i,
749                                 n;
750         char       *attr;
751
752         if (strcmp(tablename, BLOB_XREF_TABLE) == 0)
753                 return;
754
755         tblQry = createPQExpBuffer();
756
757         appendPQExpBuffer(tblQry, "SELECT a.attname FROM pg_class c, pg_attribute a, pg_type t "
758          " WHERE a.attnum > 0 AND a.attrelid = c.oid AND a.atttypid = t.oid "
759          " AND t.typname in ('oid', 'lo') AND c.relname = '%s';", tablename);
760
761         res = PQexec(AH->blobConnection, tblQry->data);
762         if (!res)
763                 die_horribly(AH, modulename, "could not find oid columns of table \"%s\": %s",
764                                          tablename, PQerrorMessage(AH->connection));
765
766         if ((n = PQntuples(res)) == 0)
767         {
768                 /* nothing to do */
769                 ahlog(AH, 1, "no OID type columns in table %s\n", tablename);
770         }
771
772         for (i = 0; i < n; i++)
773         {
774                 attr = PQgetvalue(res, i, 0);
775
776                 ahlog(AH, 1, "fixing large object cross-references for %s.%s\n", tablename, attr);
777
778                 resetPQExpBuffer(tblQry);
779
780                 /*
781                  * We should use coalesce here (rather than 'exists'), but it
782                  * seems to be broken in 7.0.2 (weird optimizer strategy)
783                  */
784                 appendPQExpBuffer(tblQry, "UPDATE \"%s\" SET \"%s\" = ", tablename, attr);
785                 appendPQExpBuffer(tblQry, " (SELECT x.newOid FROM \"%s\" x WHERE x.oldOid = \"%s\".\"%s\")",
786                                                   BLOB_XREF_TABLE, tablename, attr);
787                 appendPQExpBuffer(tblQry, " where exists"
788                                   "(select * from %s x where x.oldOid = \"%s\".\"%s\");",
789                                                   BLOB_XREF_TABLE, tablename, attr);
790
791                 ahlog(AH, 10, "SQL: %s\n", tblQry->data);
792
793                 uRes = PQexec(AH->blobConnection, tblQry->data);
794                 if (!uRes)
795                         die_horribly(AH, modulename,
796                                         "could not update column \"%s\" of table \"%s\": %s",
797                                         attr, tablename, PQerrorMessage(AH->blobConnection));
798
799                 if (PQresultStatus(uRes) != PGRES_COMMAND_OK)
800                         die_horribly(AH, modulename,
801                                 "error while updating column \"%s\" of table \"%s\": %s",
802                                         attr, tablename, PQerrorMessage(AH->blobConnection));
803
804                 PQclear(uRes);
805         }
806
807         PQclear(res);
808         destroyPQExpBuffer(tblQry);
809 }
810
811 /**********
812  *      Convenient SQL calls
813  **********/
814 void
815 CreateBlobXrefTable(ArchiveHandle *AH)
816 {
817         PQExpBuffer qry = createPQExpBuffer();
818
819         /* IF we don't have a BLOB connection, then create one */
820         if (!AH->blobConnection)
821                 AH->blobConnection = _connectDB(AH, NULL, NULL);
822
823         ahlog(AH, 1, "creating table for large object cross-references\n");
824
825         appendPQExpBuffer(qry, "Create Temporary Table %s(oldOid oid, newOid oid);", BLOB_XREF_TABLE);
826
827         ExecuteSqlCommand(AH, qry, "could not create large object cross-reference table", true);
828
829         resetPQExpBuffer(qry);
830
831         appendPQExpBuffer(qry, "Create Unique Index %s_ix on %s(oldOid)", BLOB_XREF_TABLE, BLOB_XREF_TABLE);
832         ExecuteSqlCommand(AH, qry, "could not create index on large object cross-reference table", true);
833
834         destroyPQExpBuffer(qry);
835 }
836
837 void
838 InsertBlobXref(ArchiveHandle *AH, int old, int new)
839 {
840         PQExpBuffer qry = createPQExpBuffer();
841
842         appendPQExpBuffer(qry, "Insert Into %s(oldOid, newOid) Values (%d, %d);", BLOB_XREF_TABLE, old, new);
843
844         ExecuteSqlCommand(AH, qry, "could not create large object cross-reference entry", true);
845
846         destroyPQExpBuffer(qry);
847 }
848
849 void
850 StartTransaction(ArchiveHandle *AH)
851 {
852         PQExpBuffer qry = createPQExpBuffer();
853
854         appendPQExpBuffer(qry, "Begin;");
855
856         ExecuteSqlCommand(AH, qry, "could not start database transaction", false);
857         AH->txActive = true;
858
859         destroyPQExpBuffer(qry);
860 }
861
862 void
863 StartTransactionXref(ArchiveHandle *AH)
864 {
865         PQExpBuffer qry = createPQExpBuffer();
866
867         appendPQExpBuffer(qry, "Begin;");
868
869         ExecuteSqlCommand(AH, qry,
870                                           "could not start transaction for large object cross-references", true);
871         AH->blobTxActive = true;
872
873         destroyPQExpBuffer(qry);
874 }
875
876 void
877 CommitTransaction(ArchiveHandle *AH)
878 {
879         PQExpBuffer qry = createPQExpBuffer();
880
881         appendPQExpBuffer(qry, "Commit;");
882
883         ExecuteSqlCommand(AH, qry, "could not commit database transaction", false);
884         AH->txActive = false;
885
886         destroyPQExpBuffer(qry);
887 }
888
889 void
890 CommitTransactionXref(ArchiveHandle *AH)
891 {
892         PQExpBuffer qry = createPQExpBuffer();
893
894         appendPQExpBuffer(qry, "Commit;");
895
896         ExecuteSqlCommand(AH, qry, "could not commit transaction for large object cross-references", true);
897         AH->blobTxActive = false;
898
899         destroyPQExpBuffer(qry);
900 }