#include "postgres_fdw.h"
+#include "access/htup_details.h"
+#include "catalog/pg_user_mapping.h"
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
+#include "utils/syscache.h"
/*
* one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
+ bool changing_xact_state; /* xact state change in process */
} ConnCacheEntry;
/*
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg);
+static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
+static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
+ bool ignore_errors);
+static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
+ PGresult **result);
/*
entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
+ entry->changing_xact_state = false;
}
+ /* Reject further use of connections which failed abort cleanup. */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
{
PGresult *res;
- res = PQexec(conn, sql);
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ res = pgfdw_get_result(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
+ entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
+ entry->changing_xact_state = false;
}
/*
char sql[64];
snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
+ entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
entry->xact_depth++;
+ entry->changing_xact_state = false;
}
}
/* If it has an open remote transaction, try to close it */
if (entry->xact_depth > 0)
{
+ bool abort_cleanup_failure = false;
+
elog(DEBUG3, "closing remote transaction on connection %p",
entry->conn);
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
+
+ /*
+ * If abort cleanup previously failed for this connection,
+ * we can't issue any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
/* Commit all remote transactions during pre-commit */
+ entry->changing_xact_state = true;
do_sql_command(entry->conn, "COMMIT TRANSACTION");
+ entry->changing_xact_state = false;
/*
* If there were any errors in subtransactions, and we
break;
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
+
+ /*
+ * Don't try to clean up the connection if we're already
+ * in error recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
+
+ /*
+ * If connection is already unsalvageable, don't touch it
+ * further.
+ */
+ if (entry->changing_xact_state)
+ break;
+
+ /*
+ * Mark this connection as in the process of changing
+ * transaction state.
+ */
+ entry->changing_xact_state = true;
+
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
* command is still being processed by the remote server,
* and if so, request cancellation of the command.
*/
- if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+ !pgfdw_cancel_query(entry->conn))
{
- PGcancel *cancel;
- char errbuf[256];
-
- if ((cancel = PQgetCancel(entry->conn)))
- {
- if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
- ereport(WARNING,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not send cancel request: %s",
- errbuf)));
- PQfreeCancel(cancel);
- }
+ /* Unable to cancel running query. */
+ abort_cleanup_failure = true;
+ }
+ else if (!pgfdw_exec_cleanup_query(entry->conn,
+ "ABORT TRANSACTION",
+ false))
+ {
+ /* Unable to abort remote transaction. */
+ abort_cleanup_failure = true;
+ }
+ else if (entry->have_prep_stmt && entry->have_error &&
+ !pgfdw_exec_cleanup_query(entry->conn,
+ "DEALLOCATE ALL",
+ true))
+ {
+ /* Trouble clearing prepared statements. */
+ abort_cleanup_failure = true;
}
-
- /* If we're aborting, abort all remote transactions too */
- res = PQexec(entry->conn, "ABORT TRANSACTION");
- /* Note: can't throw ERROR, it would be infinite loop */
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(WARNING, res, entry->conn, true,
- "ABORT TRANSACTION");
else
{
- PQclear(res);
- /* As above, make sure to clear any prepared stmts */
- if (entry->have_prep_stmt && entry->have_error)
- {
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
- }
entry->have_prep_stmt = false;
entry->have_error = false;
}
+
+ /* Disarm changing_xact_state if it all worked. */
+ entry->changing_xact_state = abort_cleanup_failure;
break;
}
}
* recover. Next GetConnection will open a new connection.
*/
if (PQstatus(entry->conn) != CONNECTION_OK ||
- PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+ PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
+ entry->changing_xact_state)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
PQfinish(entry->conn);
entry->conn = NULL;
+ entry->changing_xact_state = false;
}
}
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
char sql[100];
/*
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
+ /*
+ * If abort cleanup previously failed for this connection, we
+ * can't issue any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
/* Commit all remote subtransactions during pre-commit */
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+ entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
+ entry->changing_xact_state = false;
}
- else
+ else if (in_error_recursion_trouble())
+ {
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ entry->changing_xact_state = true;
+ }
+ else if (!entry->changing_xact_state)
{
+ bool abort_cleanup_failure = false;
+
+ /* Remember that abort cleanup is in progress. */
+ entry->changing_xact_state = true;
+
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
* processed by the remote server, and if so, request cancellation
* of the command.
*/
- if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+ !pgfdw_cancel_query(entry->conn))
+ abort_cleanup_failure = true;
+ else
{
- PGcancel *cancel;
- char errbuf[256];
-
- if ((cancel = PQgetCancel(entry->conn)))
- {
- if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
- ereport(WARNING,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not send cancel request: %s",
- errbuf)));
- PQfreeCancel(cancel);
- }
+ /* Rollback all remote subtransactions during abort */
+ snprintf(sql, sizeof(sql),
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+ curlevel, curlevel);
+ if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
+ abort_cleanup_failure = true;
}
- /* Rollback all remote subtransactions during abort */
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- curlevel, curlevel);
- res = PQexec(entry->conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(WARNING, res, entry->conn, true, sql);
- else
- PQclear(res);
+ /* Disarm changing_xact_state if it all worked. */
+ entry->changing_xact_state = abort_cleanup_failure;
}
/* OK, we're outta that level of subtransaction */
entry->xact_depth--;
}
}
+
+/*
+ * Raise an error if the given connection cache entry is marked as being
+ * in the middle of an xact state change. This should be called at which no
+ * such change is expected to be in progress; if one is found to be in
+ * progress, it means that we aborted in the middle of a previous state change
+ * and now don't know what the remote transaction state actually is.
+ * Such connections can't safely be further used. Re-establishing the
+ * connection would change the snapshot and roll back any writes already
+ * performed, so that's not an option, either. Thus, we must abort.
+ */
+static void
+pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
+{
+ HeapTuple tup;
+ Form_pg_user_mapping umform;
+ ForeignServer *server;
+
+ if (!entry->changing_xact_state)
+ return;
+
+ tup = SearchSysCache1(USERMAPPINGOID,
+ ObjectIdGetDatum(entry->key));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
+ umform = (Form_pg_user_mapping) GETSTRUCT(tup);
+ server = GetForeignServer(umform->umserver);
+ ReleaseSysCache(tup);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("connection to server \"%s\" was lost",
+ server->servername)));
+}
+
+/*
+ * Cancel the currently-in-progress query (whose query text we do not have)
+ * and ignore the result. Returns true if we successfully cancel the query
+ * and discard any pending result, and false if not.
+ */
+static bool
+pgfdw_cancel_query(PGconn *conn)
+{
+ PGcancel *cancel;
+ char errbuf[256];
+ PGresult *result = NULL;
+ TimestampTz endtime;
+
+ /*
+ * If it takes too long to cancel the query and discard the result, assume
+ * the connection is dead.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+ /*
+ * Issue cancel request. Unfortunately, there's no good way to limit the
+ * amount of time that we might block inside PQgetCancel().
+ */
+ if ((cancel = PQgetCancel(conn)))
+ {
+ if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send cancel request: %s",
+ errbuf)));
+ PQfreeCancel(cancel);
+ return false;
+ }
+ PQfreeCancel(cancel);
+ }
+
+ /* Get and discard the result of the query. */
+ if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ return false;
+ PQclear(result);
+
+ return true;
+}
+
+/*
+ * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
+ * result. If the query is executed without error, the return value is true.
+ * If the query is executed successfully but returns an error, the return
+ * value is true if and only if ignore_errors is set. If the query can't be
+ * sent or times out, the return value is false.
+ */
+static bool
+pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
+{
+ PGresult *result = NULL;
+ TimestampTz endtime;
+
+ /*
+ * If it takes too long to execute a cleanup query, assume the connection
+ * is dead. It's fairly likely that this is why we aborted in the first
+ * place (e.g. statement timeout, user cancel), so the timeout shouldn't
+ * be too long.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also can
+ * block. But its risk is relatively small, so we ignore that for now.
+ */
+ if (!PQsendQuery(conn, query))
+ {
+ pgfdw_report_error(WARNING, NULL, conn, false, query);
+ return false;
+ }
+
+ /* Get the result of the query. */
+ if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ return false;
+
+ /* Issue a warning if not successful. */
+ if (PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ pgfdw_report_error(WARNING, result, conn, true, query);
+ return ignore_errors;
+ }
+
+ return true;
+}
+
+/*
+ * Get, during abort cleanup, the result of a query that is in progress. This
+ * might be a query that is being interrupted by transaction abort, or it might
+ * be a query that was initiated as part of transaction abort to get the remote
+ * side back to the appropriate state.
+ *
+ * It's not a huge problem if we throw an ERROR here, but if we get into error
+ * recursion trouble, we'll end up slamming the connection shut, which will
+ * necessitate failing the entire toplevel transaction even if subtransactions
+ * were used. Try to use WARNING where we can.
+ *
+ * endtime is the time at which we should give up and assume the remote
+ * side is dead. Returns true if the timeout expired, otherwise false.
+ * Sets *result except in case of a timeout.
+ */
+static bool
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+{
+ PGresult *last_res = NULL;
+
+ for (;;)
+ {
+ PGresult *res;
+
+ while (PQisBusy(conn))
+ {
+ int wc;
+ TimestampTz now = GetCurrentTimestamp();
+ long secs;
+ int microsecs;
+ long cur_timeout;
+
+ /* If timeout has expired, give up, else get sleep time. */
+ if (now >= endtime)
+ return true;
+ TimestampDifference(now, endtime, &secs, µsecs);
+
+ /* To protect against clock skew, limit sleep to one minute. */
+ cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
+
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
+ PQsocket(conn),
+ cur_timeout, PG_WAIT_EXTENSION);
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Data available in socket */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
+ {
+ *result = NULL;
+ return false;
+ }
+ }
+ }
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ break; /* query is complete */
+
+ PQclear(last_res);
+ last_res = res;
+ }
+
+ *result = last_res;
+ return false;
+}