/*-------------------------------------------------------------------------
 *
 *	Implements the basic DB functions used by the archiver.
 *
 *	$Header: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.40 2002/09/04 20:31:34 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
13 #include "pg_backup.h"
14 #include "pg_backup_archiver.h"
15 #include "pg_backup_db.h"
16 #include "dumputils.h"
26 #include "libpq/libpq-fs.h"
31 static const char *modulename = gettext_noop("archiver (db)");
33 static void _check_database_version(ArchiveHandle *AH, bool ignoreVersion);
34 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
35 static int _executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc);
36 static void notice_processor(void *arg, const char *message);
37 static char *_sendSQLLine(ArchiveHandle *AH, char *qry, char *eos);
38 static char *_sendCopyLine(ArchiveHandle *AH, char *qry, char *eos);
42 _parse_version(ArchiveHandle *AH, const char *versionString)
49 cnt = sscanf(versionString, "%d.%d.%d", &vmaj, &vmin, &vrev);
52 die_horribly(AH, modulename, "unable to parse version string \"%s\"\n", versionString);
57 return (100 * vmaj + vmin) * 100 + vrev;
61 _check_database_version(ArchiveHandle *AH, bool ignoreVersion)
65 const char *remoteversion_str;
67 PGconn *conn = AH->connection;
69 myversion = _parse_version(AH, PG_VERSION);
71 res = PQexec(conn, "SELECT version();");
73 PQresultStatus(res) != PGRES_TUPLES_OK ||
76 die_horribly(AH, modulename, "could not get version from server: %s", PQerrorMessage(conn));
78 remoteversion_str = PQgetvalue(res, 0, 0);
79 remoteversion = _parse_version(AH, remoteversion_str + 11);
83 AH->public.remoteVersion = remoteversion;
85 if (myversion != remoteversion
86 && (remoteversion < AH->public.minRemoteVersion || remoteversion > AH->public.maxRemoteVersion))
88 write_msg(NULL, "server version: %s; %s version: %s\n",
89 remoteversion_str, progname, PG_VERSION);
91 write_msg(NULL, "proceeding despite version mismatch\n");
93 die_horribly(AH, NULL, "aborting because of version mismatch (Use the -i option to proceed anyway.)\n");
98 * Reconnect to the server. If dbname is not NULL, use that database,
99 * else the one associated with the archive handle. If username is
100 * not NULL, use that user name, else the one from the handle. If
101 * both the database and the user and match the existing connection
102 * already, nothing will be done.
104 * Returns 1 in any case.
107 ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
110 const char *newdbname;
111 const char *newusername;
114 newdbname = PQdb(AH->connection);
119 newusername = PQuser(AH->connection);
121 newusername = username;
123 /* Let's see if the request is already satisfied */
124 if (strcmp(newusername, PQuser(AH->connection)) == 0
125 && strcmp(newdbname, PQdb(AH->connection)) == 0)
128 newConn = _connectDB(AH, newdbname, newusername);
130 PQfinish(AH->connection);
131 AH->connection = newConn;
133 /* don't assume we still know the output schema */
135 free(AH->currSchema);
136 AH->currSchema = strdup("");
142 * Connect to the db again.
145 _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
149 char *password = NULL;
156 newdb = PQdb(AH->connection);
158 newdb = (char *) reqdb;
160 if (!requser || (strlen(requser) == 0))
161 newuser = PQuser(AH->connection);
163 newuser = (char *) requser;
165 ahlog(AH, 1, "connecting to database %s as user %s\n", newdb, newuser);
167 if (AH->requirePassword)
169 password = simple_prompt("Password: ", 100, false);
170 if (password == NULL)
171 die_horribly(AH, modulename, "out of memory\n");
177 newConn = PQsetdbLogin(PQhost(AH->connection), PQport(AH->connection),
181 die_horribly(AH, modulename, "failed to reconnect to database\n");
183 if (PQstatus(newConn) == CONNECTION_BAD)
185 noPwd = (strcmp(PQerrorMessage(newConn),
186 "fe_sendauth: no password supplied\n") == 0);
187 badPwd = (strncmp(PQerrorMessage(newConn),
188 "Password authentication failed for user", 39) == 0);
194 fprintf(stderr, "Password incorrect\n");
196 fprintf(stderr, "Connecting to %s as %s\n",
197 PQdb(AH->connection), newuser);
202 password = simple_prompt("Password: ", 100, false);
205 die_horribly(AH, modulename, "could not reconnect to database: %s",
206 PQerrorMessage(newConn));
214 PQsetNoticeProcessor(newConn, notice_processor, NULL);
221 * Make a database connection with the given parameters. The
222 * connection handle is returned, the parameters are stored in AHX.
223 * An interactive password prompt is automatically issued if required.
226 ConnectDatabase(Archive *AHX,
230 const char *username,
232 const int ignoreVersion)
234 ArchiveHandle *AH = (ArchiveHandle *) AHX;
235 char *password = NULL;
236 bool need_pass = false;
239 die_horribly(AH, modulename, "already connected to a database\n");
243 password = simple_prompt("Password: ", 100, false);
244 if (password == NULL)
245 die_horribly(AH, modulename, "out of memory\n");
246 AH->requirePassword = true;
249 AH->requirePassword = false;
252 * Start the connection. Loop until we have a password if requested
258 AH->connection = PQsetdbLogin(pghost, pgport, NULL, NULL,
259 dbname, username, password);
262 die_horribly(AH, modulename, "failed to connect to database\n");
264 if (PQstatus(AH->connection) == CONNECTION_BAD &&
265 strcmp(PQerrorMessage(AH->connection), "fe_sendauth: no password supplied\n") == 0 &&
268 PQfinish(AH->connection);
272 password = simple_prompt("Password: ", 100, false);
279 /* check to see that the backend connection was successfully made */
280 if (PQstatus(AH->connection) == CONNECTION_BAD)
281 die_horribly(AH, modulename, "connection to database \"%s\" failed: %s",
282 PQdb(AH->connection), PQerrorMessage(AH->connection));
284 /* check for version mismatch */
285 _check_database_version(AH, ignoreVersion);
287 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
289 return AH->connection;
294 notice_processor(void *arg, const char *message)
296 write_msg(NULL, "%s", message);
300 /* Public interface */
301 /* Convenience function to send a query. Monitors result to handle COPY statements */
303 ExecuteSqlCommand(ArchiveHandle *AH, PQExpBuffer qry, char *desc, bool use_blob)
306 return _executeSqlCommand(AH, AH->blobConnection, qry, desc);
308 return _executeSqlCommand(AH, AH->connection, qry, desc);
312 * Handle command execution. This is used to execute a command on more than one connection,
313 * but the 'pgCopyIn' setting assumes the COPY commands are ONLY executed on the primary
314 * setting...an error will be raised otherwise.
317 _executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc)
321 /* fprintf(stderr, "Executing: '%s'\n\n", qry->data); */
322 res = PQexec(conn, qry->data);
324 die_horribly(AH, modulename, "%s: no result from server\n", desc);
326 if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)
328 if (PQresultStatus(res) == PGRES_COPY_IN)
330 if (conn != AH->connection)
331 die_horribly(AH, modulename, "COPY command executed in non-primary connection\n");
336 die_horribly(AH, modulename, "%s: %s",
337 desc, PQerrorMessage(AH->connection));
342 return strlen(qry->data);
346 * Used by ExecuteSqlCommandBuf to send one buffered line when running a COPY command.
/*
 * Send one line of COPY data taken from the bytes between qry and eos.
 *
 * The line ends at the first newline that is not escaped, that is, one
 * preceded by an even number of backslashes.  If the buffer ends before
 * such a newline is found, the whole buffer is appended to AH->pgCopyBuf.
 * Otherwise the line is appended to AH->pgCopyBuf and the buffer contents
 * are passed to PQputline() on AH->connection, after which the buffer is
 * reset.  A buffered line consisting only of backslash-period marks the
 * end of the COPY data.
 *
 * In the complete-line case the return value points just past the newline
 * that ended the line.
 */
349 _sendCopyLine(ArchiveHandle *AH, char *qry, char *eos)
351 size_t loc; /* Location of next newline */
352 int pos = 0; /* Current position */
353 int sPos = 0; /* Last pos of a slash char */
356 /* loop to find unquoted newline ending the line of COPY data */
/* search starts at pos; adding pos back makes loc an offset from qry */
359 loc = strcspn(&qry[pos], "\n") + pos;
361 /* If no match, then wait */
362 if (loc >= (eos - qry)) /* None found */
/* keep the partial line in pgCopyBuf */
364 appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
369 * fprintf(stderr, "Found cr at %d, prev char was %c, next was
370 * %c\n", loc, qry[loc-1], qry[loc+1]);
373 /* Count the number of preceding slashes */
375 while (sPos > 0 && qry[sPos - 1] == '\\')
381 * If an odd number of preceding slashes, then \n was escaped so
382 * set the next search pos, and loop (if any left).
386 /* fprintf(stderr, "cr was escaped\n"); */
388 if (pos >= (eos - qry))
390 appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
398 /* We found an unquoted newline */
/* add this line, newline-terminated, to whatever is already buffered */
400 appendPQExpBuffer(AH->pgCopyBuf, "%s\n", qry);
/* isEnd is set when the buffered line is exactly backslash-period */
401 isEnd = (strcmp(AH->pgCopyBuf->data, "\\.\n") == 0);
404 * fprintf(stderr, "Sending '%s' via
405 * COPY (at end = %d)\n\n", AH->pgCopyBuf->data, isEnd);
409 if (PQputline(AH->connection, AH->pgCopyBuf->data) != 0)
410 die_horribly(AH, modulename, "error returned by PQputline\n");
412 resetPQExpBuffer(AH->pgCopyBuf);
415 * fprintf(stderr, "Buffer is '%s'\n", AH->pgCopyBuf->data);
/* NOTE(review): presumably done only when isEnd is set - confirm */
420 if (PQendcopy(AH->connection) != 0)
421 die_horribly(AH, modulename, "error returned by PQendcopy\n");
/* resume right after the newline that ended this line */
426 return qry + loc + 1;
430 * Used by ExecuteSqlCommandBuf to send one buffered line of SQL (not data for the copy command).
/*
 * Scan the bytes between qry and eos for the end of one SQL statement and
 * execute the statement once it is complete.
 *
 * Every character is appended to AH->sqlBuf, and AH->sqlparse tracks the
 * scanner state across characters:
 *
 *	SQL_SCAN			plain SQL.  A semicolon seen while braceDepth is 0
 *						sends the buffered text through ExecuteSqlCommand()
 *						on the main connection, resets the buffer and skips
 *						any newlines that follow.  A single or double quote
 *						character enters SQL_IN_QUOTE, two dashes enter
 *						SQL_IN_SQL_COMMENT, slash-star enters
 *						SQL_IN_EXT_COMMENT, and parentheses raise or lower
 *						braceDepth.
 *	SQL_IN_SQL_COMMENT	left at the next newline.
 *	SQL_IN_EXT_COMMENT	left at star-slash.
 *	SQL_IN_QUOTE		left at the matching quote character unless the
 *						backSlash flag is set; the flag is updated from
 *						each backslash seen inside the quote.
 *
 * lastChar is updated after every character so that the two-character
 * sequences above can be recognised.
 *
 * NOTE(review): the return statements are not visible here; presumably the
 * function returns the position after the skipped newlines once a statement
 * has been sent, and eos otherwise - confirm.
 */
433 _sendSQLLine(ArchiveHandle *AH, char *qry, char *eos)
435 int pos = 0; /* Current position */
438 * The following is a mini state machine to assess the end of an SQL
439 * statement. It really only needs to parse good SQL, or at least
440 * that's the theory... End-of-statement is assumed to be an unquoted,
441 * un commented semi-colon.
445 * fprintf(stderr, "Buffer at start is: '%s'\n\n", AH->sqlBuf->data);
448 for (pos = 0; pos < (eos - qry); pos++)
/* the character joins the pending statement before it is classified */
450 appendPQExpBufferChar(AH->sqlBuf, qry[pos]);
451 /* fprintf(stderr, " %c",qry[pos]); */
453 switch (AH->sqlparse.state)
456 case SQL_SCAN: /* Default state == 0, set in _allocAH */
/* a semicolon inside parentheses does not end the statement */
458 if (qry[pos] == ';' && AH->sqlparse.braceDepth == 0)
460 /* Send It & reset the buffer */
463 * fprintf(stderr, " sending: '%s'\n\n",
466 ExecuteSqlCommand(AH, AH->sqlBuf, "could not execute query", false);
467 resetPQExpBuffer(AH->sqlBuf);
468 AH->sqlparse.lastChar = '\0';
471 * Remove any following newlines - so that embedded
472 * COPY commands don't get a starting newline.
475 for (; pos < (eos - qry) && qry[pos] == '\n'; pos++);
477 /* We've got our line, so exit */
482 if (qry[pos] == '"' || qry[pos] == '\'')
484 /* fprintf(stderr,"[startquote]\n"); */
485 AH->sqlparse.state = SQL_IN_QUOTE;
486 AH->sqlparse.quoteChar = qry[pos];
487 AH->sqlparse.backSlash = 0;
489 else if (qry[pos] == '-' && AH->sqlparse.lastChar == '-')
490 AH->sqlparse.state = SQL_IN_SQL_COMMENT;
491 else if (qry[pos] == '*' && AH->sqlparse.lastChar == '/')
492 AH->sqlparse.state = SQL_IN_EXT_COMMENT;
493 else if (qry[pos] == '(')
494 AH->sqlparse.braceDepth++;
495 else if (qry[pos] == ')')
496 AH->sqlparse.braceDepth--;
498 AH->sqlparse.lastChar = qry[pos];
503 case SQL_IN_SQL_COMMENT:
505 if (qry[pos] == '\n')
506 AH->sqlparse.state = SQL_SCAN;
509 case SQL_IN_EXT_COMMENT:
511 if (AH->sqlparse.lastChar == '*' && qry[pos] == '/')
512 AH->sqlparse.state = SQL_SCAN;
517 if (!AH->sqlparse.backSlash && AH->sqlparse.quoteChar == qry[pos])
519 /* fprintf(stderr,"[endquote]\n"); */
520 AH->sqlparse.state = SQL_SCAN;
525 if (qry[pos] == '\\')
527 if (AH->sqlparse.lastChar == '\\')
528 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
530 AH->sqlparse.backSlash = 1;
533 AH->sqlparse.backSlash = 0;
538 AH->sqlparse.lastChar = qry[pos];
539 /* fprintf(stderr, "\n"); */
543 * If we get here, we've processed entire string with no complete SQL
551 /* Convenience function to send one or more queries. Monitors result to handle COPY statements */
553 ExecuteSqlCommandBuf(ArchiveHandle *AH, void *qryv, size_t bufLen)
555 char *qry = (char *) qryv;
556 char *eos = qry + bufLen;
559 * fprintf(stderr, "\n\n*****\n
560 * Buffer:\n\n%s\n*******************\n\n", qry);
563 /* Could switch between command and COPY IN mode at each line */
567 qry = _sendCopyLine(AH, qry, eos);
569 qry = _sendSQLLine(AH, qry, eos);
576 FixupBlobRefs(ArchiveHandle *AH, TocEntry *te)
586 if (strcmp(te->tag, BLOB_XREF_TABLE) == 0)
589 tblName = createPQExpBuffer();
590 tblQry = createPQExpBuffer();
592 if (te->namespace && strlen(te->namespace) > 0)
593 appendPQExpBuffer(tblName, "%s.",
594 fmtId(te->namespace));
595 appendPQExpBuffer(tblName, "%s",
598 appendPQExpBuffer(tblQry,
599 "SELECT a.attname FROM "
600 "pg_catalog.pg_attribute a, pg_catalog.pg_type t "
601 "WHERE a.attnum > 0 AND a.attrelid = '%s'::pg_catalog.regclass "
602 "AND a.atttypid = t.oid AND t.typname in ('oid', 'lo')",
605 res = PQexec(AH->blobConnection, tblQry->data);
607 die_horribly(AH, modulename, "could not find oid columns of table \"%s\": %s",
608 te->tag, PQerrorMessage(AH->connection));
610 if ((n = PQntuples(res)) == 0)
613 ahlog(AH, 1, "no OID type columns in table %s\n", te->tag);
616 for (i = 0; i < n; i++)
618 attr = PQgetvalue(res, i, 0);
620 ahlog(AH, 1, "fixing large object cross-references for %s.%s\n",
623 resetPQExpBuffer(tblQry);
625 /* Can't use fmtId twice in one call... */
626 appendPQExpBuffer(tblQry,
627 "UPDATE %s SET %s = %s.newOid",
628 tblName->data, fmtId(attr),
630 appendPQExpBuffer(tblQry,
631 " FROM %s WHERE %s.oldOid = %s.%s",
634 tblName->data, fmtId(attr));
636 ahlog(AH, 10, "SQL: %s\n", tblQry->data);
638 uRes = PQexec(AH->blobConnection, tblQry->data);
640 die_horribly(AH, modulename,
641 "could not update column \"%s\" of table \"%s\": %s",
642 attr, te->tag, PQerrorMessage(AH->blobConnection));
644 if (PQresultStatus(uRes) != PGRES_COMMAND_OK)
645 die_horribly(AH, modulename,
646 "error while updating column \"%s\" of table \"%s\": %s",
647 attr, te->tag, PQerrorMessage(AH->blobConnection));
653 destroyPQExpBuffer(tblName);
654 destroyPQExpBuffer(tblQry);
658 * Convenient SQL calls
661 CreateBlobXrefTable(ArchiveHandle *AH)
663 PQExpBuffer qry = createPQExpBuffer();
665 /* IF we don't have a BLOB connection, then create one */
666 if (!AH->blobConnection)
667 AH->blobConnection = _connectDB(AH, NULL, NULL);
669 ahlog(AH, 1, "creating table for large object cross-references\n");
671 appendPQExpBuffer(qry, "Create Temporary Table %s(oldOid pg_catalog.oid, newOid pg_catalog.oid);", BLOB_XREF_TABLE);
673 ExecuteSqlCommand(AH, qry, "could not create large object cross-reference table", true);
675 resetPQExpBuffer(qry);
677 appendPQExpBuffer(qry, "Create Unique Index %s_ix on %s(oldOid)", BLOB_XREF_TABLE, BLOB_XREF_TABLE);
678 ExecuteSqlCommand(AH, qry, "could not create index on large object cross-reference table", true);
680 destroyPQExpBuffer(qry);
684 InsertBlobXref(ArchiveHandle *AH, Oid old, Oid new)
686 PQExpBuffer qry = createPQExpBuffer();
688 appendPQExpBuffer(qry,
689 "Insert Into %s(oldOid, newOid) Values ('%u', '%u');",
690 BLOB_XREF_TABLE, old, new);
692 ExecuteSqlCommand(AH, qry, "could not create large object cross-reference entry", true);
694 destroyPQExpBuffer(qry);
698 StartTransaction(ArchiveHandle *AH)
700 PQExpBuffer qry = createPQExpBuffer();
702 appendPQExpBuffer(qry, "Begin;");
704 ExecuteSqlCommand(AH, qry, "could not start database transaction", false);
707 destroyPQExpBuffer(qry);
711 StartTransactionXref(ArchiveHandle *AH)
713 PQExpBuffer qry = createPQExpBuffer();
715 appendPQExpBuffer(qry, "Begin;");
717 ExecuteSqlCommand(AH, qry,
718 "could not start transaction for large object cross-references", true);
719 AH->blobTxActive = true;
721 destroyPQExpBuffer(qry);
725 CommitTransaction(ArchiveHandle *AH)
727 PQExpBuffer qry = createPQExpBuffer();
729 appendPQExpBuffer(qry, "Commit;");
731 ExecuteSqlCommand(AH, qry, "could not commit database transaction", false);
732 AH->txActive = false;
734 destroyPQExpBuffer(qry);
738 CommitTransactionXref(ArchiveHandle *AH)
740 PQExpBuffer qry = createPQExpBuffer();
742 appendPQExpBuffer(qry, "Commit;");
744 ExecuteSqlCommand(AH, qry, "could not commit transaction for large object cross-references", true);
745 AH->blobTxActive = false;
747 destroyPQExpBuffer(qry);