* Implements the basic DB functions used by the archiver.
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.48 2003/06/22 00:56:58 tgl Exp $
+ * src/bin/pg_dump/pg_backup_db.c
*
*-------------------------------------------------------------------------
*/
-
-#include "pg_backup.h"
-#include "pg_backup_archiver.h"
-#include "pg_backup_db.h"
-#include "dumputils.h"
+#include "postgres_fe.h"
#include <unistd.h>
#include <ctype.h>
-
#ifdef HAVE_TERMIOS_H
#include <termios.h>
#endif
-#include "libpq-fe.h"
-#include "libpq/libpq-fs.h"
-#ifndef HAVE_STRDUP
-#include "strdup.h"
-#endif
-
-static const char *modulename = gettext_noop("archiver (db)");
+#include "dumputils.h"
+#include "fe_utils/connect.h"
+#include "fe_utils/string_utils.h"
+#include "parallel.h"
+#include "pg_backup_archiver.h"
+#include "pg_backup_db.h"
+#include "pg_backup_utils.h"
-static void _check_database_version(ArchiveHandle *AH, bool ignoreVersion);
+static void _check_database_version(ArchiveHandle *AH);
static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
-static int _executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc);
static void notice_processor(void *arg, const char *message);
-static char *_sendSQLLine(ArchiveHandle *AH, char *qry, char *eos);
-static char *_sendCopyLine(ArchiveHandle *AH, char *qry, char *eos);
-
-
-static int
-_parse_version(ArchiveHandle *AH, const char *versionString)
-{
- int v;
-
- v = parse_version(versionString);
- if (v < 0)
- die_horribly(AH, modulename, "unable to parse version string \"%s\"\n", versionString);
-
- return v;
-}
static void
-_check_database_version(ArchiveHandle *AH, bool ignoreVersion)
+_check_database_version(ArchiveHandle *AH)
{
- int myversion;
const char *remoteversion_str;
int remoteversion;
-
- myversion = _parse_version(AH, PG_VERSION);
+ PGresult *res;
remoteversion_str = PQparameterStatus(AH->connection, "server_version");
- if (!remoteversion_str)
- die_horribly(AH, modulename, "could not get server_version from libpq\n");
-
- remoteversion = _parse_version(AH, remoteversion_str);
+ remoteversion = PQserverVersion(AH->connection);
+ if (remoteversion == 0 || !remoteversion_str)
+ fatal("could not get server_version from libpq");
+ AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
AH->public.remoteVersion = remoteversion;
+ if (!AH->archiveRemoteVersion)
+ AH->archiveRemoteVersion = AH->public.remoteVersionStr;
- if (myversion != remoteversion
+ if (remoteversion != PG_VERSION_NUM
&& (remoteversion < AH->public.minRemoteVersion ||
remoteversion > AH->public.maxRemoteVersion))
{
- write_msg(NULL, "server version: %s; %s version: %s\n",
- remoteversion_str, progname, PG_VERSION);
- if (ignoreVersion)
- write_msg(NULL, "proceeding despite version mismatch\n");
- else
- die_horribly(AH, NULL, "aborting because of version mismatch (Use the -i option to proceed anyway.)\n");
+ pg_log_error("server version: %s; %s version: %s",
+ remoteversion_str, progname, PG_VERSION);
+ fatal("aborting because of server version mismatch");
+ }
+
+ /*
+ * When running against 9.0 or later, check if we are in recovery mode,
+ * which means we are on a hot standby.
+ */
+ if (remoteversion >= 90000)
+ {
+ res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
+
+ AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+ PQclear(res);
}
+ else
+ AH->public.isStandby = false;
}
/*
* Reconnect to the server. If dbname is not NULL, use that database,
* else the one associated with the archive handle. If username is
- * not NULL, use that user name, else the one from the handle. If
- * both the database and the user and match the existing connection
- * already, nothing will be done.
- *
- * Returns 1 in any case.
+ * not NULL, use that user name, else the one from the handle.
*/
-int
+void
ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
{
PGconn *newConn;
else
newusername = username;
- /* Let's see if the request is already satisfied */
- if (strcmp(newusername, PQuser(AH->connection)) == 0
- && strcmp(newdbname, PQdb(AH->connection)) == 0)
- return 1;
-
newConn = _connectDB(AH, newdbname, newusername);
+ /* Update ArchiveHandle's connCancel before closing old connection */
+ set_archive_cancel_info(AH, newConn);
+
PQfinish(AH->connection);
AH->connection = newConn;
- /* don't assume we still know the output schema */
- if (AH->currSchema)
- free(AH->currSchema);
- AH->currSchema = strdup("");
-
- return 1;
+ /* Start strict; later phases may override this. */
+ PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
+ ALWAYS_SECURE_SEARCH_PATH_SQL));
}
/*
* Connect to the db again.
+ *
+ * Note: it's not really all that sensible to use a single-entry password
+ * cache if the username keeps changing. In current usage, however, the
+ * username never does change, so one savedPassword is sufficient. We do
+ * update the cache on the off chance that the password has changed since the
+ * start of the run.
*/
static PGconn *
_connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
{
- int need_pass;
+ PQExpBufferData connstr;
PGconn *newConn;
- char *password = NULL;
- int badPwd = 0;
- int noPwd = 0;
- char *newdb;
- char *newuser;
+ const char *newdb;
+ const char *newuser;
+ char *password;
+ char passbuf[100];
+ bool new_pass;
if (!reqdb)
newdb = PQdb(AH->connection);
else
- newdb = (char *) reqdb;
+ newdb = reqdb;
- if (!requser || (strlen(requser) == 0))
+ if (!requser || strlen(requser) == 0)
newuser = PQuser(AH->connection);
else
- newuser = (char *) requser;
+ newuser = requser;
- ahlog(AH, 1, "connecting to database %s as user %s\n", newdb, newuser);
+ pg_log_info("connecting to database \"%s\" as user \"%s\"",
+ newdb, newuser);
- if (AH->requirePassword)
+ password = AH->savedPassword;
+
+ if (AH->promptPassword == TRI_YES && password == NULL)
{
- password = simple_prompt("Password: ", 100, false);
- if (password == NULL)
- die_horribly(AH, modulename, "out of memory\n");
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
}
+ initPQExpBuffer(&connstr);
+ appendPQExpBufferStr(&connstr, "dbname=");
+ appendConnStrVal(&connstr, newdb);
+
do
{
- need_pass = false;
- newConn = PQsetdbLogin(PQhost(AH->connection), PQport(AH->connection),
- NULL, NULL, newdb,
- newuser, password);
+ const char *keywords[7];
+ const char *values[7];
+
+ keywords[0] = "host";
+ values[0] = PQhost(AH->connection);
+ keywords[1] = "port";
+ values[1] = PQport(AH->connection);
+ keywords[2] = "user";
+ values[2] = newuser;
+ keywords[3] = "password";
+ values[3] = password;
+ keywords[4] = "dbname";
+ values[4] = connstr.data;
+ keywords[5] = "fallback_application_name";
+ values[5] = progname;
+ keywords[6] = NULL;
+ values[6] = NULL;
+
+ new_pass = false;
+ newConn = PQconnectdbParams(keywords, values, true);
+
if (!newConn)
- die_horribly(AH, modulename, "failed to reconnect to database\n");
+ fatal("could not reconnect to database");
if (PQstatus(newConn) == CONNECTION_BAD)
{
- noPwd = (strcmp(PQerrorMessage(newConn),
- "fe_sendauth: no password supplied\n") == 0);
- badPwd = (strncmp(PQerrorMessage(newConn),
- "Password authentication failed for user", 39) == 0);
-
- if (noPwd || badPwd)
- {
+ if (!PQconnectionNeedsPassword(newConn))
+ fatal("could not reconnect to database: %s",
+ PQerrorMessage(newConn));
+ PQfinish(newConn);
- if (badPwd)
- fprintf(stderr, "Password incorrect\n");
+ if (password)
+ fprintf(stderr, "Password incorrect\n");
- fprintf(stderr, "Connecting to %s as %s\n",
- PQdb(AH->connection), newuser);
+ fprintf(stderr, "Connecting to %s as %s\n",
+ newdb, newuser);
- need_pass = true;
- if (password)
- free(password);
- password = simple_prompt("Password: ", 100, false);
+ if (AH->promptPassword != TRI_NO)
+ {
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
}
else
- die_horribly(AH, modulename, "could not reconnect to database: %s",
- PQerrorMessage(newConn));
+ fatal("connection needs password");
+
+ new_pass = true;
}
- } while (need_pass);
+ } while (new_pass);
+
+ /*
+ * We want to remember connection's actual password, whether or not we got
+ * it by prompting. So we don't just store the password variable.
+ */
+ if (PQconnectionUsedPassword(newConn))
+ {
+ if (AH->savedPassword)
+ free(AH->savedPassword);
+ AH->savedPassword = pg_strdup(PQpass(newConn));
+ }
- if (password)
- free(password);
+ termPQExpBuffer(&connstr);
/* check for version mismatch */
- _check_database_version(AH, true);
+ _check_database_version(AH);
PQsetNoticeProcessor(newConn, notice_processor, NULL);
* Make a database connection with the given parameters. The
* connection handle is returned, the parameters are stored in AHX.
* An interactive password prompt is automatically issued if required.
+ *
+ * Note: it's not really all that sensible to use a single-entry password
+ * cache if the username keeps changing. In current usage, however, the
+ * username never does change, so one savedPassword is sufficient.
*/
-PGconn *
+void
ConnectDatabase(Archive *AHX,
const char *dbname,
const char *pghost,
const char *pgport,
const char *username,
- const int reqPwd,
- const int ignoreVersion)
+ trivalue prompt_password)
{
ArchiveHandle *AH = (ArchiveHandle *) AHX;
- char *password = NULL;
- bool need_pass = false;
+ char *password;
+ char passbuf[100];
+ bool new_pass;
if (AH->connection)
- die_horribly(AH, modulename, "already connected to a database\n");
+ fatal("already connected to a database");
+
+ password = AH->savedPassword;
- if (reqPwd)
+ if (prompt_password == TRI_YES && password == NULL)
{
- password = simple_prompt("Password: ", 100, false);
- if (password == NULL)
- die_horribly(AH, modulename, "out of memory\n");
- AH->requirePassword = true;
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
}
- else
- AH->requirePassword = false;
+ AH->promptPassword = prompt_password;
/*
- * Start the connection. Loop until we have a password if requested
- * by backend.
+ * Start the connection. Loop until we have a password if requested by
+ * backend.
*/
do
{
- need_pass = false;
- AH->connection = PQsetdbLogin(pghost, pgport, NULL, NULL,
- dbname, username, password);
+ const char *keywords[7];
+ const char *values[7];
+
+ keywords[0] = "host";
+ values[0] = pghost;
+ keywords[1] = "port";
+ values[1] = pgport;
+ keywords[2] = "user";
+ values[2] = username;
+ keywords[3] = "password";
+ values[3] = password;
+ keywords[4] = "dbname";
+ values[4] = dbname;
+ keywords[5] = "fallback_application_name";
+ values[5] = progname;
+ keywords[6] = NULL;
+ values[6] = NULL;
+
+ new_pass = false;
+ AH->connection = PQconnectdbParams(keywords, values, true);
if (!AH->connection)
- die_horribly(AH, modulename, "failed to connect to database\n");
+ fatal("could not connect to database");
if (PQstatus(AH->connection) == CONNECTION_BAD &&
- strcmp(PQerrorMessage(AH->connection), "fe_sendauth: no password supplied\n") == 0 &&
- !feof(stdin))
+ PQconnectionNeedsPassword(AH->connection) &&
+ password == NULL &&
+ prompt_password != TRI_NO)
{
PQfinish(AH->connection);
- need_pass = true;
- free(password);
- password = NULL;
- password = simple_prompt("Password: ", 100, false);
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
+ new_pass = true;
}
- } while (need_pass);
-
- if (password)
- free(password);
+ } while (new_pass);
/* check to see that the backend connection was successfully made */
if (PQstatus(AH->connection) == CONNECTION_BAD)
- die_horribly(AH, modulename, "connection to database \"%s\" failed: %s",
- PQdb(AH->connection), PQerrorMessage(AH->connection));
+ fatal("connection to database \"%s\" failed: %s",
+ PQdb(AH->connection) ? PQdb(AH->connection) : "",
+ PQerrorMessage(AH->connection));
- /* check for version mismatch */
- _check_database_version(AH, ignoreVersion);
-
- PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
-
- return AH->connection;
-}
+ /* Start strict; later phases may override this. */
+ PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
+ ALWAYS_SECURE_SEARCH_PATH_SQL));
+ /*
+ * We want to remember connection's actual password, whether or not we got
+ * it by prompting. So we don't just store the password variable.
+ */
+ if (PQconnectionUsedPassword(AH->connection))
+ {
+ if (AH->savedPassword)
+ free(AH->savedPassword);
+ AH->savedPassword = pg_strdup(PQpass(AH->connection));
+ }
-static void
-notice_processor(void *arg, const char *message)
-{
- write_msg(NULL, "%s", message);
-}
+ /* check for version mismatch */
+ _check_database_version(AH);
+ PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
-/* Public interface */
-/* Convenience function to send a query. Monitors result to handle COPY statements */
-int
-ExecuteSqlCommand(ArchiveHandle *AH, PQExpBuffer qry, char *desc, bool use_blob)
-{
- if (use_blob)
- return _executeSqlCommand(AH, AH->blobConnection, qry, desc);
- else
- return _executeSqlCommand(AH, AH->connection, qry, desc);
+ /* arrange for SIGINT to issue a query cancel on this connection */
+ set_archive_cancel_info(AH, AH->connection);
}
/*
- * Handle command execution. This is used to execute a command on more than one connection,
- * but the 'pgCopyIn' setting assumes the COPY commands are ONLY executed on the primary
- * setting...an error will be raised otherwise.
+ * Close the connection to the database and also cancel off the query if we
+ * have one running.
*/
-static int
-_executeSqlCommand(ArchiveHandle *AH, PGconn *conn, PQExpBuffer qry, char *desc)
+void
+DisconnectDatabase(Archive *AHX)
{
- PGresult *res;
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ char errbuf[1];
- /* fprintf(stderr, "Executing: '%s'\n\n", qry->data); */
- res = PQexec(conn, qry->data);
- if (!res)
- die_horribly(AH, modulename, "%s: no result from server\n", desc);
+ if (!AH->connection)
+ return;
- if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)
+ if (AH->connCancel)
{
- if (PQresultStatus(res) == PGRES_COPY_IN)
- {
- if (conn != AH->connection)
- die_horribly(AH, modulename, "COPY command executed in non-primary connection\n");
+ /*
+ * If we have an active query, send a cancel before closing, ignoring
+ * any errors. This is of no use for a normal exit, but might be
+ * helpful during fatal().
+ */
+ if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
+ (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
- AH->pgCopyIn = 1;
- }
- else
- die_horribly(AH, modulename, "%s: %s",
- desc, PQerrorMessage(AH->connection));
+ /*
+ * Prevent signal handler from sending a cancel after this.
+ */
+ set_archive_cancel_info(AH, NULL);
}
- PQclear(res);
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+}
- return strlen(qry->data);
+PGconn *
+GetConnection(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ return AH->connection;
}
-/*
- * Used by ExecuteSqlCommandBuf to send one buffered line when running a COPY command.
- */
-static char *
-_sendCopyLine(ArchiveHandle *AH, char *qry, char *eos)
+static void
+notice_processor(void *arg, const char *message)
{
- size_t loc; /* Location of next newline */
- int pos = 0; /* Current position */
- int sPos = 0; /* Last pos of a slash char */
- int isEnd = 0;
+ pg_log_generic(PG_LOG_INFO, "%s", message);
+}
- /* loop to find unquoted newline ending the line of COPY data */
- for (;;)
- {
- loc = strcspn(&qry[pos], "\n") + pos;
+/* Like fatal(), but with a complaint about a particular query. */
+static void
+die_on_query_failure(ArchiveHandle *AH, const char *query)
+{
+ pg_log_error("query failed: %s",
+ PQerrorMessage(AH->connection));
+ fatal("query was: %s", query);
+}
- /* If no match, then wait */
- if (loc >= (eos - qry)) /* None found */
- {
- appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
- return eos;
- }
+void
+ExecuteSqlStatement(Archive *AHX, const char *query)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ PGresult *res;
- /*
- * fprintf(stderr, "Found cr at %d, prev char was %c, next was
- * %c\n", loc, qry[loc-1], qry[loc+1]);
- */
+ res = PQexec(AH->connection, query);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ die_on_query_failure(AH, query);
+ PQclear(res);
+}
- /* Count the number of preceding slashes */
- sPos = loc;
- while (sPos > 0 && qry[sPos - 1] == '\\')
- sPos--;
+PGresult *
+ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ PGresult *res;
- sPos = loc - sPos;
+ res = PQexec(AH->connection, query);
+ if (PQresultStatus(res) != status)
+ die_on_query_failure(AH, query);
+ return res;
+}
- /*
- * If an odd number of preceding slashes, then \n was escaped so
- * set the next search pos, and loop (if any left).
- */
- if ((sPos & 1) == 1)
- {
- /* fprintf(stderr, "cr was escaped\n"); */
- pos = loc + 1;
- if (pos >= (eos - qry))
- {
- appendBinaryPQExpBuffer(AH->pgCopyBuf, qry, (eos - qry));
- return eos;
- }
- }
- else
- break;
- }
+/*
+ * Execute an SQL query and verify that we got exactly one row back.
+ */
+PGresult *
+ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
+{
+ PGresult *res;
+ int ntups;
- /* We found an unquoted newline */
- qry[loc] = '\0';
- appendPQExpBuffer(AH->pgCopyBuf, "%s\n", qry);
- isEnd = (strcmp(AH->pgCopyBuf->data, "\\.\n") == 0);
+ res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
- /*---------
- * fprintf(stderr, "Sending '%s' via
- * COPY (at end = %d)\n\n", AH->pgCopyBuf->data, isEnd);
- *---------
- */
+ /* Expecting a single result only */
+ ntups = PQntuples(res);
+ if (ntups != 1)
+ fatal(ngettext("query returned %d row instead of one: %s",
+ "query returned %d rows instead of one: %s",
+ ntups),
+ ntups, query);
- if (PQputline(AH->connection, AH->pgCopyBuf->data) != 0)
- die_horribly(AH, modulename, "error returned by PQputline\n");
+ return res;
+}
- resetPQExpBuffer(AH->pgCopyBuf);
+/*
+ * Convenience function to send a query.
+ * Monitors result to detect COPY statements
+ */
+static void
+ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
+{
+ PGconn *conn = AH->connection;
+ PGresult *res;
- /*
- * fprintf(stderr, "Buffer is '%s'\n", AH->pgCopyBuf->data);
- */
+#ifdef NOT_USED
+ fprintf(stderr, "Executing: '%s'\n\n", qry);
+#endif
+ res = PQexec(conn, qry);
- if (isEnd)
+ switch (PQresultStatus(res))
{
- if (PQendcopy(AH->connection) != 0)
- die_horribly(AH, modulename, "error returned by PQendcopy\n");
-
- AH->pgCopyIn = 0;
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* A-OK */
+ break;
+ case PGRES_COPY_IN:
+ /* Assume this is an expected result */
+ AH->pgCopyIn = true;
+ break;
+ default:
+ /* trouble */
+ warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
+ desc, PQerrorMessage(conn), qry);
+ break;
}
- return qry + loc + 1;
+ PQclear(res);
}
+
/*
- * Used by ExecuteSqlCommandBuf to send one buffered line of SQL (not data for the copy command).
+ * Process non-COPY table data (that is, INSERT commands).
+ *
+ * The commands have been run together as one long string for compressibility,
+ * and we are receiving them in bufferloads with arbitrary boundaries, so we
+ * have to locate command boundaries and save partial commands across calls.
+ * All state must be kept in AH->sqlparse, not in local variables of this
+ * routine. We assume that AH->sqlparse was filled with zeroes when created.
+ *
+ * We have to lex the data to the extent of identifying literals and quoted
+ * identifiers, so that we can recognize statement-terminating semicolons.
+ * We assume that INSERT data will not contain SQL comments, E'' literals,
+ * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
+ *
+ * Note: when restoring from a pre-9.0 dump file, this code is also used to
+ * process BLOB COMMENTS data, which has the same problem of containing
+ * multiple SQL commands that might be split across bufferloads. Fortunately,
+ * that data won't contain anything complicated to lex either.
*/
-static char *
-_sendSQLLine(ArchiveHandle *AH, char *qry, char *eos)
+static void
+ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
{
- int pos = 0; /* Current position */
+ const char *qry = buf;
+ const char *eos = buf + bufLen;
- /*
- * The following is a mini state machine to assess the end of an SQL
- * statement. It really only needs to parse good SQL, or at least
- * that's the theory... End-of-statement is assumed to be an unquoted,
- * un commented semi-colon.
- */
+ /* initialize command buffer if first time through */
+ if (AH->sqlparse.curCmd == NULL)
+ AH->sqlparse.curCmd = createPQExpBuffer();
- /*
- * fprintf(stderr, "Buffer at start is: '%s'\n\n", AH->sqlBuf->data);
- */
-
- for (pos = 0; pos < (eos - qry); pos++)
+ for (; qry < eos; qry++)
{
- appendPQExpBufferChar(AH->sqlBuf, qry[pos]);
- /* fprintf(stderr, " %c",qry[pos]); */
+ char ch = *qry;
+
+ /* For neatness, we skip any newlines between commands */
+ if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
+ appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
switch (AH->sqlparse.state)
{
-
case SQL_SCAN: /* Default state == 0, set in _allocAH */
- if (qry[pos] == ';' && AH->sqlparse.braceDepth == 0)
+ if (ch == ';')
{
- /* Send It & reset the buffer */
-
- /*
- * fprintf(stderr, " sending: '%s'\n\n",
- * AH->sqlBuf->data);
- */
- ExecuteSqlCommand(AH, AH->sqlBuf, "could not execute query", false);
- resetPQExpBuffer(AH->sqlBuf);
- AH->sqlparse.lastChar = '\0';
-
/*
- * Remove any following newlines - so that embedded
- * COPY commands don't get a starting newline.
+ * We've found the end of a statement. Send it and reset
+ * the buffer.
*/
- pos++;
- for (; pos < (eos - qry) && qry[pos] == '\n'; pos++);
-
- /* We've got our line, so exit */
- return qry + pos;
+ ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
+ "could not execute query");
+ resetPQExpBuffer(AH->sqlparse.curCmd);
}
- else
+ else if (ch == '\'')
{
- if (qry[pos] == '"' || qry[pos] == '\'')
- {
- /* fprintf(stderr,"[startquote]\n"); */
- AH->sqlparse.state = SQL_IN_QUOTE;
- AH->sqlparse.quoteChar = qry[pos];
- AH->sqlparse.backSlash = 0;
- }
- else if (qry[pos] == '-' && AH->sqlparse.lastChar == '-')
- AH->sqlparse.state = SQL_IN_SQL_COMMENT;
- else if (qry[pos] == '*' && AH->sqlparse.lastChar == '/')
- AH->sqlparse.state = SQL_IN_EXT_COMMENT;
- else if (qry[pos] == '(')
- AH->sqlparse.braceDepth++;
- else if (qry[pos] == ')')
- AH->sqlparse.braceDepth--;
-
- AH->sqlparse.lastChar = qry[pos];
+ AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
+ AH->sqlparse.backSlash = false;
+ }
+ else if (ch == '"')
+ {
+ AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
}
break;
- case SQL_IN_SQL_COMMENT:
- if (qry[pos] == '\n')
- AH->sqlparse.state = SQL_SCAN;
- break;
-
- case SQL_IN_EXT_COMMENT:
- if (AH->sqlparse.lastChar == '*' && qry[pos] == '/')
+ case SQL_IN_SINGLE_QUOTE:
+ /* We needn't handle '' specially */
+ if (ch == '\'' && !AH->sqlparse.backSlash)
AH->sqlparse.state = SQL_SCAN;
+ else if (ch == '\\' && !AH->public.std_strings)
+ AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
+ else
+ AH->sqlparse.backSlash = false;
break;
- case SQL_IN_QUOTE:
- if (!AH->sqlparse.backSlash && AH->sqlparse.quoteChar == qry[pos])
- {
- /* fprintf(stderr,"[endquote]\n"); */
+ case SQL_IN_DOUBLE_QUOTE:
+ /* We needn't handle "" specially */
+ if (ch == '"')
AH->sqlparse.state = SQL_SCAN;
- }
- else
- {
-
- if (qry[pos] == '\\')
- {
- if (AH->sqlparse.lastChar == '\\')
- AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
- else
- AH->sqlparse.backSlash = 1;
- }
- else
- AH->sqlparse.backSlash = 0;
- }
break;
-
}
- AH->sqlparse.lastChar = qry[pos];
- /* fprintf(stderr, "\n"); */
}
-
- /*
- * If we get here, we've processed entire string with no complete SQL
- * stmt
- */
- return eos;
}
-/* Convenience function to send one or more queries. Monitors result to handle COPY statements */
+/*
+ * Implement ahwrite() for direct-to-DB restore
+ */
int
-ExecuteSqlCommandBuf(ArchiveHandle *AH, void *qryv, size_t bufLen)
+ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
{
- char *qry = (char *) qryv;
- char *eos = qry + bufLen;
-
- /*
- * fprintf(stderr, "\n\n*****\n
- * Buffer:\n\n%s\n*******************\n\n", qry);
- */
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
- /* Could switch between command and COPY IN mode at each line */
- while (qry < eos)
+ if (AH->outputKind == OUTPUT_COPYDATA)
{
- if (AH->pgCopyIn)
- qry = _sendCopyLine(AH, qry, eos);
- else
- qry = _sendSQLLine(AH, qry, eos);
+ /*
+ * COPY data.
+ *
+ * We drop the data on the floor if libpq has failed to enter COPY
+ * mode; this allows us to behave reasonably when trying to continue
+ * after an error in a COPY command.
+ */
+ if (AH->pgCopyIn &&
+ PQputCopyData(AH->connection, buf, bufLen) <= 0)
+ fatal("error returned by PQputCopyData: %s",
+ PQerrorMessage(AH->connection));
}
-
- return 1;
-}
-
-void
-FixupBlobRefs(ArchiveHandle *AH, TocEntry *te)
-{
- PQExpBuffer tblName;
- PQExpBuffer tblQry;
- PGresult *res,
- *uRes;
- int i,
- n;
-
- if (strcmp(te->tag, BLOB_XREF_TABLE) == 0)
- return;
-
- tblName = createPQExpBuffer();
- tblQry = createPQExpBuffer();
-
- if (te->namespace && strlen(te->namespace) > 0)
- appendPQExpBuffer(tblName, "%s.",
- fmtId(te->namespace));
- appendPQExpBuffer(tblName, "%s",
- fmtId(te->tag));
-
- appendPQExpBuffer(tblQry,
- "SELECT a.attname, t.typname FROM "
- "pg_catalog.pg_attribute a, pg_catalog.pg_type t "
- "WHERE a.attnum > 0 AND a.attrelid = '%s'::pg_catalog.regclass "
- "AND a.atttypid = t.oid AND t.typname in ('oid', 'lo')",
- tblName->data);
-
- res = PQexec(AH->blobConnection, tblQry->data);
- if (!res)
- die_horribly(AH, modulename, "could not find oid columns of table \"%s\": %s",
- te->tag, PQerrorMessage(AH->connection));
-
- if ((n = PQntuples(res)) == 0)
+ else if (AH->outputKind == OUTPUT_OTHERDATA)
{
- /* nothing to do */
- ahlog(AH, 1, "no OID type columns in table %s\n", te->tag);
+ /*
+ * Table data expressed as INSERT commands; or, in old dump files,
+ * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
+ */
+ ExecuteSimpleCommands(AH, buf, bufLen);
}
-
- for (i = 0; i < n; i++)
+ else
{
- char *attr;
- char *typname;
- bool typeisoid;
-
- attr = PQgetvalue(res, i, 0);
- typname = PQgetvalue(res, i, 1);
-
- typeisoid = (strcmp(typname, "oid") == 0);
-
- ahlog(AH, 1, "fixing large object cross-references for %s.%s\n",
- te->tag, attr);
-
- resetPQExpBuffer(tblQry);
-
/*
- * Note: we use explicit typename() cast style here because if we
- * are dealing with a dump from a pre-7.3 database containing LO
- * columns, the dump probably will not have CREATE CAST commands
- * for lo<->oid conversions. What it will have is functions,
- * which we will invoke as functions.
+ * General SQL commands; we assume that commands will not be split
+ * across calls.
+ *
+ * In most cases the data passed to us will be a null-terminated
+ * string, but if it's not, we have to add a trailing null.
*/
-
- /* Can't use fmtId more than once per call... */
- appendPQExpBuffer(tblQry,
- "UPDATE %s SET %s = ",
- tblName->data, fmtId(attr));
- if (typeisoid)
- appendPQExpBuffer(tblQry,
- "%s.newOid",
- BLOB_XREF_TABLE);
+ if (buf[bufLen] == '\0')
+ ExecuteSqlCommand(AH, buf, "could not execute query");
else
- appendPQExpBuffer(tblQry,
- "%s(%s.newOid)",
- fmtId(typname),
- BLOB_XREF_TABLE);
- appendPQExpBuffer(tblQry,
- " FROM %s WHERE %s.oldOid = ",
- BLOB_XREF_TABLE,
- BLOB_XREF_TABLE);
- if (typeisoid)
- appendPQExpBuffer(tblQry,
- "%s.%s",
- tblName->data, fmtId(attr));
- else
- appendPQExpBuffer(tblQry,
- "oid(%s.%s)",
- tblName->data, fmtId(attr));
-
- ahlog(AH, 10, "SQL: %s\n", tblQry->data);
-
- uRes = PQexec(AH->blobConnection, tblQry->data);
- if (!uRes)
- die_horribly(AH, modulename,
- "could not update column \"%s\" of table \"%s\": %s",
- attr, te->tag, PQerrorMessage(AH->blobConnection));
-
- if (PQresultStatus(uRes) != PGRES_COMMAND_OK)
- die_horribly(AH, modulename,
- "error while updating column \"%s\" of table \"%s\": %s",
- attr, te->tag, PQerrorMessage(AH->blobConnection));
+ {
+ char *str = (char *) pg_malloc(bufLen + 1);
- PQclear(uRes);
+ memcpy(str, buf, bufLen);
+ str[bufLen] = '\0';
+ ExecuteSqlCommand(AH, str, "could not execute query");
+ free(str);
+ }
}
- PQclear(res);
- destroyPQExpBuffer(tblName);
- destroyPQExpBuffer(tblQry);
-}
-
-/**********
- * Convenient SQL calls
- **********/
-void
-CreateBlobXrefTable(ArchiveHandle *AH)
-{
- PQExpBuffer qry = createPQExpBuffer();
-
- /* IF we don't have a BLOB connection, then create one */
- if (!AH->blobConnection)
- AH->blobConnection = _connectDB(AH, NULL, NULL);
-
- ahlog(AH, 1, "creating table for large object cross-references\n");
-
- appendPQExpBuffer(qry, "Create Temporary Table %s(oldOid pg_catalog.oid, newOid pg_catalog.oid);", BLOB_XREF_TABLE);
-
- ExecuteSqlCommand(AH, qry, "could not create large object cross-reference table", true);
-
- resetPQExpBuffer(qry);
-
- appendPQExpBuffer(qry, "Create Unique Index %s_ix on %s(oldOid)", BLOB_XREF_TABLE, BLOB_XREF_TABLE);
- ExecuteSqlCommand(AH, qry, "could not create index on large object cross-reference table", true);
-
- destroyPQExpBuffer(qry);
+ return bufLen;
}
+/*
+ * Terminate a COPY operation during direct-to-DB restore
+ */
void
-InsertBlobXref(ArchiveHandle *AH, Oid old, Oid new)
+EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
{
- PQExpBuffer qry = createPQExpBuffer();
-
- appendPQExpBuffer(qry,
- "Insert Into %s(oldOid, newOid) Values ('%u', '%u');",
- BLOB_XREF_TABLE, old, new);
-
- ExecuteSqlCommand(AH, qry, "could not create large object cross-reference entry", true);
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
- destroyPQExpBuffer(qry);
-}
+ if (AH->pgCopyIn)
+ {
+ PGresult *res;
-void
-StartTransaction(ArchiveHandle *AH)
-{
- PQExpBuffer qry = createPQExpBuffer();
+ if (PQputCopyEnd(AH->connection, NULL) <= 0)
+ fatal("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(AH->connection));
- appendPQExpBuffer(qry, "Begin;");
+ /* Check command status and return to normal libpq state */
+ res = PQgetResult(AH->connection);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
+ tocEntryTag, PQerrorMessage(AH->connection));
+ PQclear(res);
- ExecuteSqlCommand(AH, qry, "could not start database transaction", false);
- AH->txActive = true;
+ /* Do this to ensure we've pumped libpq back to idle state */
+ if (PQgetResult(AH->connection) != NULL)
+ pg_log_warning("unexpected extra results during COPY of table \"%s\"",
+ tocEntryTag);
- destroyPQExpBuffer(qry);
+ AH->pgCopyIn = false;
+ }
}
void
-StartTransactionXref(ArchiveHandle *AH)
+StartTransaction(Archive *AHX)
{
- PQExpBuffer qry = createPQExpBuffer();
-
- appendPQExpBuffer(qry, "Begin;");
-
- ExecuteSqlCommand(AH, qry,
- "could not start transaction for large object cross-references", true);
- AH->blobTxActive = true;
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
- destroyPQExpBuffer(qry);
+ ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
}
void
-CommitTransaction(ArchiveHandle *AH)
+CommitTransaction(Archive *AHX)
{
- PQExpBuffer qry = createPQExpBuffer();
-
- appendPQExpBuffer(qry, "Commit;");
-
- ExecuteSqlCommand(AH, qry, "could not commit database transaction", false);
- AH->txActive = false;
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
- destroyPQExpBuffer(qry);
+ ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
}
void
-CommitTransactionXref(ArchiveHandle *AH)
+DropBlobIfExists(ArchiveHandle *AH, Oid oid)
{
- PQExpBuffer qry = createPQExpBuffer();
-
- appendPQExpBuffer(qry, "Commit;");
-
- ExecuteSqlCommand(AH, qry, "could not commit transaction for large object cross-references", true);
- AH->blobTxActive = false;
-
- destroyPQExpBuffer(qry);
+ /*
+ * If we are not restoring to a direct database connection, we have to
+ * guess about how to detect whether the blob exists. Assume new-style.
+ */
+ if (AH->connection == NULL ||
+ PQserverVersion(AH->connection) >= 90000)
+ {
+ ahprintf(AH,
+ "SELECT pg_catalog.lo_unlink(oid) "
+ "FROM pg_catalog.pg_largeobject_metadata "
+ "WHERE oid = '%u';\n",
+ oid);
+ }
+ else
+ {
+ /* Restoring to pre-9.0 server, so do it the old way */
+ ahprintf(AH,
+ "SELECT CASE WHEN EXISTS("
+ "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
+ ") THEN pg_catalog.lo_unlink('%u') END;\n",
+ oid, oid);
+ }
}