]> granicus.if.org Git - postgresql/commitdiff
postgres_fdw: Allow cancellation of transaction control commands.
authorRobert Haas <rhaas@postgresql.org>
Wed, 7 Jun 2017 19:14:55 +0000 (15:14 -0400)
committerRobert Haas <rhaas@postgresql.org>
Wed, 7 Jun 2017 19:24:22 +0000 (15:24 -0400)
Commit f039eaac7131ef2a4cf63a10cf98486f8bcd09d2, later back-patched
with commit 1b812afb0eafe125b820cc3b95e7ca03821aa675, allowed many of
the queries issued by postgres_fdw to fetch remote data to respond to
cancel interrupts in a timely fashion.  However, it didn't do anything
about the transaction control commands, which remained
noninterruptible.

Improve the situation by changing do_sql_command() to retrieve query
results using pgfdw_get_result(), which uses the asynchronous
interface to libpq so that it can check for interrupts every time
libpq returns control.  Since this might result in a situation
where we can no longer be sure that the remote transaction state
matches the local transaction state, add a facility to force all
levels of the local transaction to abort if we've lost track of
the remote state; without this, an apparently-successful commit of
the local transaction might fail to commit changes made on the
remote side.  Also, add a 60-second timeout for queries issue during
transaction abort; if that expires, give up and mark the state of
the connection as unknown.  Drop all such connections when we exit
the local transaction.  Together, these changes mean that if we're
aborting the local toplevel transaction anyway, we can just drop the
remote connection in lieu of waiting (possibly for a very long time)
for it to complete an abort.

This still leaves quite a bit of room for improvement.  PQcancel()
has no asynchronous interface, so if we get stuck sending the cancel
request we'll still hang.  Also, PQsetnonblocking() is not used, which
means we could block uninterruptibly when sending a query.  There
might be some other optimizations possible as well.  Nonetheless,
this allows us to escape a wait for an unresponsive remote server
quickly in many more cases than previously.

Report by Suraj Kharage.  Patch by me and Rafia Sabih.  Review
and testing by Amit Kapila and Tushar Ahuja.

Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com

contrib/postgres_fdw/connection.c

index ff04f43cf20e205738555a56b3323516f3390e8b..a3b7bf8c22aedef6fd45987c12cf924d5d98dfbf 100644 (file)
 
 #include "postgres_fdw.h"
 
+#include "access/htup_details.h"
+#include "catalog/pg_user_mapping.h"
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
+#include "utils/syscache.h"
 
 
 /*
@@ -48,6 +51,7 @@ typedef struct ConnCacheEntry
                                                                 * one level of subxact open, etc */
        bool            have_prep_stmt; /* have we prepared any stmts in this xact? */
        bool            have_error;             /* have any subxacts aborted in this xact? */
+       bool            changing_xact_state;    /* xact state change in process */
 } ConnCacheEntry;
 
 /*
@@ -73,6 +77,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
                                           SubTransactionId mySubid,
                                           SubTransactionId parentSubid,
                                           void *arg);
+static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
+static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
+                                                bool ignore_errors);
+static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
+                                                PGresult **result);
 
 
 /*
@@ -138,8 +148,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
                entry->xact_depth = 0;
                entry->have_prep_stmt = false;
                entry->have_error = false;
+               entry->changing_xact_state = false;
        }
 
+       /* Reject further use of connections which failed abort cleanup. */
+       pgfdw_reject_incomplete_xact_state_change(entry);
+
        /*
         * We don't check the health of cached connection here, because it would
         * require some overhead.  Broken connection will be detected when the
@@ -352,7 +366,9 @@ do_sql_command(PGconn *conn, const char *sql)
 {
        PGresult   *res;
 
-       res = PQexec(conn, sql);
+       if (!PQsendQuery(conn, sql))
+               pgfdw_report_error(ERROR, NULL, conn, false, sql);
+       res = pgfdw_get_result(conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                pgfdw_report_error(ERROR, res, conn, true, sql);
        PQclear(res);
@@ -385,8 +401,10 @@ begin_remote_xact(ConnCacheEntry *entry)
                        sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
                else
                        sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
+               entry->changing_xact_state = true;
                do_sql_command(entry->conn, sql);
                entry->xact_depth = 1;
+               entry->changing_xact_state = false;
        }
 
        /*
@@ -399,8 +417,10 @@ begin_remote_xact(ConnCacheEntry *entry)
                char            sql[64];
 
                snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
+               entry->changing_xact_state = true;
                do_sql_command(entry->conn, sql);
                entry->xact_depth++;
+               entry->changing_xact_state = false;
        }
 }
 
@@ -613,6 +633,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                /* If it has an open remote transaction, try to close it */
                if (entry->xact_depth > 0)
                {
+                       bool            abort_cleanup_failure = false;
+
                        elog(DEBUG3, "closing remote transaction on connection %p",
                                 entry->conn);
 
@@ -620,8 +642,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                        {
                                case XACT_EVENT_PARALLEL_PRE_COMMIT:
                                case XACT_EVENT_PRE_COMMIT:
+
+                                       /*
+                                        * If abort cleanup previously failed for this connection,
+                                        * we can't issue any more commands against it.
+                                        */
+                                       pgfdw_reject_incomplete_xact_state_change(entry);
+
                                        /* Commit all remote transactions during pre-commit */
+                                       entry->changing_xact_state = true;
                                        do_sql_command(entry->conn, "COMMIT TRANSACTION");
+                                       entry->changing_xact_state = false;
 
                                        /*
                                         * If there were any errors in subtransactions, and we
@@ -669,6 +700,27 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                                        break;
                                case XACT_EVENT_PARALLEL_ABORT:
                                case XACT_EVENT_ABORT:
+
+                                       /*
+                                        * Don't try to clean up the connection if we're already
+                                        * in error recursion trouble.
+                                        */
+                                       if (in_error_recursion_trouble())
+                                               entry->changing_xact_state = true;
+
+                                       /*
+                                        * If connection is already unsalvageable, don't touch it
+                                        * further.
+                                        */
+                                       if (entry->changing_xact_state)
+                                               break;
+
+                                       /*
+                                        * Mark this connection as in the process of changing
+                                        * transaction state.
+                                        */
+                                       entry->changing_xact_state = true;
+
                                        /* Assume we might have lost track of prepared statements */
                                        entry->have_error = true;
 
@@ -679,40 +731,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                                         * command is still being processed by the remote server,
                                         * and if so, request cancellation of the command.
                                         */
-                                       if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+                                       if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+                                               !pgfdw_cancel_query(entry->conn))
                                        {
-                                               PGcancel   *cancel;
-                                               char            errbuf[256];
-
-                                               if ((cancel = PQgetCancel(entry->conn)))
-                                               {
-                                                       if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-                                                               ereport(WARNING,
-                                                                               (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                                 errmsg("could not send cancel request: %s",
-                                                                                errbuf)));
-                                                       PQfreeCancel(cancel);
-                                               }
+                                               /* Unable to cancel running query. */
+                                               abort_cleanup_failure = true;
+                                       }
+                                       else if (!pgfdw_exec_cleanup_query(entry->conn,
+                                                                                                          "ABORT TRANSACTION",
+                                                                                                          false))
+                                       {
+                                               /* Unable to abort remote transaction. */
+                                               abort_cleanup_failure = true;
+                                       }
+                                       else if (entry->have_prep_stmt && entry->have_error &&
+                                                        !pgfdw_exec_cleanup_query(entry->conn,
+                                                                                                          "DEALLOCATE ALL",
+                                                                                                          true))
+                                       {
+                                               /* Trouble clearing prepared statements. */
+                                               abort_cleanup_failure = true;
                                        }
-
-                                       /* If we're aborting, abort all remote transactions too */
-                                       res = PQexec(entry->conn, "ABORT TRANSACTION");
-                                       /* Note: can't throw ERROR, it would be infinite loop */
-                                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                               pgfdw_report_error(WARNING, res, entry->conn, true,
-                                                                                  "ABORT TRANSACTION");
                                        else
                                        {
-                                               PQclear(res);
-                                               /* As above, make sure to clear any prepared stmts */
-                                               if (entry->have_prep_stmt && entry->have_error)
-                                               {
-                                                       res = PQexec(entry->conn, "DEALLOCATE ALL");
-                                                       PQclear(res);
-                                               }
                                                entry->have_prep_stmt = false;
                                                entry->have_error = false;
                                        }
+
+                                       /* Disarm changing_xact_state if it all worked. */
+                                       entry->changing_xact_state = abort_cleanup_failure;
                                        break;
                        }
                }
@@ -725,11 +772,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                 * recover. Next GetConnection will open a new connection.
                 */
                if (PQstatus(entry->conn) != CONNECTION_OK ||
-                       PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+                       PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
+                       entry->changing_xact_state)
                {
                        elog(DEBUG3, "discarding connection %p", entry->conn);
                        PQfinish(entry->conn);
                        entry->conn = NULL;
+                       entry->changing_xact_state = false;
                }
        }
 
@@ -772,7 +821,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
        hash_seq_init(&scan, ConnectionHash);
        while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
        {
-               PGresult   *res;
                char            sql[100];
 
                /*
@@ -788,12 +836,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 
                if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
                {
+                       /*
+                        * If abort cleanup previously failed for this connection, we
+                        * can't issue any more commands against it.
+                        */
+                       pgfdw_reject_incomplete_xact_state_change(entry);
+
                        /* Commit all remote subtransactions during pre-commit */
                        snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+                       entry->changing_xact_state = true;
                        do_sql_command(entry->conn, sql);
+                       entry->changing_xact_state = false;
                }
-               else
+               else if (in_error_recursion_trouble())
+               {
+                       /*
+                        * Don't try to clean up the connection if we're already in error
+                        * recursion trouble.
+                        */
+                       entry->changing_xact_state = true;
+               }
+               else if (!entry->changing_xact_state)
                {
+                       bool            abort_cleanup_failure = false;
+
+                       /* Remember that abort cleanup is in progress. */
+                       entry->changing_xact_state = true;
+
                        /* Assume we might have lost track of prepared statements */
                        entry->have_error = true;
 
@@ -804,34 +873,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
                         * processed by the remote server, and if so, request cancellation
                         * of the command.
                         */
-                       if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+                       if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+                               !pgfdw_cancel_query(entry->conn))
+                               abort_cleanup_failure = true;
+                       else
                        {
-                               PGcancel   *cancel;
-                               char            errbuf[256];
-
-                               if ((cancel = PQgetCancel(entry->conn)))
-                               {
-                                       if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-                                               ereport(WARNING,
-                                                               (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                                errmsg("could not send cancel request: %s",
-                                                                               errbuf)));
-                                       PQfreeCancel(cancel);
-                               }
+                               /* Rollback all remote subtransactions during abort */
+                               snprintf(sql, sizeof(sql),
+                                                "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+                                                curlevel, curlevel);
+                               if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
+                                       abort_cleanup_failure = true;
                        }
 
-                       /* Rollback all remote subtransactions during abort */
-                       snprintf(sql, sizeof(sql),
-                                        "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
-                                        curlevel, curlevel);
-                       res = PQexec(entry->conn, sql);
-                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                               pgfdw_report_error(WARNING, res, entry->conn, true, sql);
-                       else
-                               PQclear(res);
+                       /* Disarm changing_xact_state if it all worked. */
+                       entry->changing_xact_state = abort_cleanup_failure;
                }
 
                /* OK, we're outta that level of subtransaction */
                entry->xact_depth--;
        }
 }
+
+/*
+ * Raise an error if the given connection cache entry is marked as being
+ * in the middle of an xact state change.  This should be called at which no
+ * such change is expected to be in progress; if one is found to be in
+ * progress, it means that we aborted in the middle of a previous state change
+ * and now don't know what the remote transaction state actually is.
+ * Such connections can't safely be further used.  Re-establishing the
+ * connection would change the snapshot and roll back any writes already
+ * performed, so that's not an option, either. Thus, we must abort.
+ */
+static void
+pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
+{
+       HeapTuple       tup;
+       Form_pg_user_mapping umform;
+       ForeignServer *server;
+
+       if (!entry->changing_xact_state)
+               return;
+
+       tup = SearchSysCache1(USERMAPPINGOID,
+                                                 ObjectIdGetDatum(entry->key));
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
+       umform = (Form_pg_user_mapping) GETSTRUCT(tup);
+       server = GetForeignServer(umform->umserver);
+       ReleaseSysCache(tup);
+
+       ereport(ERROR,
+                       (errcode(ERRCODE_CONNECTION_EXCEPTION),
+                        errmsg("connection to server \"%s\" was lost",
+                                       server->servername)));
+}
+
+/*
+ * Cancel the currently-in-progress query (whose query text we do not have)
+ * and ignore the result.  Returns true if we successfully cancel the query
+ * and discard any pending result, and false if not.
+ */
+static bool
+pgfdw_cancel_query(PGconn *conn)
+{
+       PGcancel   *cancel;
+       char            errbuf[256];
+       PGresult   *result = NULL;
+       TimestampTz endtime;
+
+       /*
+        * If it takes too long to cancel the query and discard the result, assume
+        * the connection is dead.
+        */
+       endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+       /*
+        * Issue cancel request.  Unfortunately, there's no good way to limit the
+        * amount of time that we might block inside PQgetCancel().
+        */
+       if ((cancel = PQgetCancel(conn)))
+       {
+               if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+               {
+                       ereport(WARNING,
+                                       (errcode(ERRCODE_CONNECTION_FAILURE),
+                                        errmsg("could not send cancel request: %s",
+                                                       errbuf)));
+                       PQfreeCancel(cancel);
+                       return false;
+               }
+               PQfreeCancel(cancel);
+       }
+
+       /* Get and discard the result of the query. */
+       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+               return false;
+       PQclear(result);
+
+       return true;
+}
+
+/*
+ * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
+ * result.  If the query is executed without error, the return value is true.
+ * If the query is executed successfully but returns an error, the return
+ * value is true if and only if ignore_errors is set.  If the query can't be
+ * sent or times out, the return value is false.
+ */
+static bool
+pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
+{
+       PGresult   *result = NULL;
+       TimestampTz endtime;
+
+       /*
+        * If it takes too long to execute a cleanup query, assume the connection
+        * is dead.  It's fairly likely that this is why we aborted in the first
+        * place (e.g. statement timeout, user cancel), so the timeout shouldn't
+        * be too long.
+        */
+       endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+       /*
+        * Submit a query.  Since we don't use non-blocking mode, this also can
+        * block.  But its risk is relatively small, so we ignore that for now.
+        */
+       if (!PQsendQuery(conn, query))
+       {
+               pgfdw_report_error(WARNING, NULL, conn, false, query);
+               return false;
+       }
+
+       /* Get the result of the query. */
+       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+               return false;
+
+       /* Issue a warning if not successful. */
+       if (PQresultStatus(result) != PGRES_COMMAND_OK)
+       {
+               pgfdw_report_error(WARNING, result, conn, true, query);
+               return ignore_errors;
+       }
+
+       return true;
+}
+
+/*
+ * Get, during abort cleanup, the result of a query that is in progress.  This
+ * might be a query that is being interrupted by transaction abort, or it might
+ * be a query that was initiated as part of transaction abort to get the remote
+ * side back to the appropriate state.
+ *
+ * It's not a huge problem if we throw an ERROR here, but if we get into error
+ * recursion trouble, we'll end up slamming the connection shut, which will
+ * necessitate failing the entire toplevel transaction even if subtransactions
+ * were used.  Try to use WARNING where we can.
+ *
+ * endtime is the time at which we should give up and assume the remote
+ * side is dead.  Returns true if the timeout expired, otherwise false.
+ * Sets *result except in case of a timeout.
+ */
+static bool
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+{
+       PGresult   *last_res = NULL;
+
+       for (;;)
+       {
+               PGresult   *res;
+
+               while (PQisBusy(conn))
+               {
+                       int                     wc;
+                       TimestampTz now = GetCurrentTimestamp();
+                       long            secs;
+                       int                     microsecs;
+                       long            cur_timeout;
+
+                       /* If timeout has expired, give up, else get sleep time. */
+                       if (now >= endtime)
+                               return true;
+                       TimestampDifference(now, endtime, &secs, &microsecs);
+
+                       /* To protect against clock skew, limit sleep to one minute. */
+                       cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
+
+                       /* Sleep until there's something to do */
+                       wc = WaitLatchOrSocket(MyLatch,
+                                                         WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
+                                                                  PQsocket(conn),
+                                                                  cur_timeout);
+                       ResetLatch(MyLatch);
+
+                       CHECK_FOR_INTERRUPTS();
+
+                       /* Data available in socket */
+                       if (wc & WL_SOCKET_READABLE)
+                       {
+                               if (!PQconsumeInput(conn))
+                               {
+                                       *result = NULL;
+                                       return false;
+                               }
+                       }
+               }
+
+               res = PQgetResult(conn);
+               if (res == NULL)
+                       break;                          /* query is complete */
+
+               PQclear(last_res);
+               last_res = res;
+       }
+
+       *result = last_res;
+       return false;
+}