#include "pgstat.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
{
ConnCacheKey key; /* hash key (must be first) */
PGconn *conn; /* connection to foreign server, or NULL */
+ /* Remaining fields are invalid when conn is NULL: */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
+ bool invalidated; /* true if reconnect is pending */
+ uint32 server_hashvalue; /* hash value of foreign server OID */
+ uint32 mapping_hashvalue; /* hash value of user mapping OID */
} ConnCacheEntry;
/*
/* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values);
static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql);
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg);
+static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
* will_prep_stmt must be true if caller intends to create any prepared
* statements. Since those don't go away automatically at transaction end
* (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches. For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry. We could not flush any active connections
- * mid-transaction anyway.
*/
PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt)
*/
RegisterXactCallback(pgfdw_xact_callback, NULL);
RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ pgfdw_inval_callback, (Datum) 0);
+ CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ pgfdw_inval_callback, (Datum) 0);
}
/* Set flag that we did GetConnection during the current transaction */
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
- /* initialize new hashtable entry (key is already filled in) */
+ /*
+ * We need only clear "conn" here; remaining fields will be filled
+ * later when "conn" is set.
+ */
entry->conn = NULL;
- entry->xact_depth = 0;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- entry->changing_xact_state = false;
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
+ /*
+ * If the connection needs to be remade due to invalidation, disconnect as
+ * soon as we're out of all transactions.
+ */
+ if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+ {
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
+ disconnect_pg_server(entry);
+ }
+
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
/*
* If cache entry doesn't have a connection, we have to establish a new
* connection. (If connect_pg_server throws an error, the cache entry
- * will be left in a valid empty state.)
+ * will remain in a valid empty state, ie conn == NULL.)
*/
if (entry->conn == NULL)
{
ForeignServer *server = GetForeignServer(user->serverid);
- entry->xact_depth = 0; /* just to be sure */
+ /* Reset all transient state fields, to be sure all are clean */
+ entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
+ entry->changing_xact_state = false;
+ entry->invalidated = false;
+ entry->server_hashvalue =
+ GetSysCacheHashValue1(FOREIGNSERVEROID,
+ ObjectIdGetDatum(server->serverid));
+ entry->mapping_hashvalue =
+ GetSysCacheHashValue1(USERMAPPINGOID,
+ ObjectIdGetDatum(user->umid));
+
+ /* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
return conn;
}
+/*
+ * Disconnect any open connection for a connection cache entry.
+ */
+static void
+disconnect_pg_server(ConnCacheEntry *entry)
+{
+ if (entry->conn != NULL)
+ {
+ PQfinish(entry->conn);
+ entry->conn = NULL;
+ }
+}
+
/*
* For non-superusers, insist that the connstr specify a password. This
* prevents a password from being picked up from .pgpass, a service file,
entry->changing_xact_state)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
- PQfinish(entry->conn);
- entry->conn = NULL;
- entry->changing_xact_state = false;
+ disconnect_pg_server(entry);
}
}
}
}
+/*
+ * Connection invalidation callback function
+ *
+ * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+ * mark connections depending on that entry as needing to be remade.
+ * We can't immediately destroy them, since they might be in the midst of
+ * a transaction, but we'll remake them at the next opportunity.
+ *
+ * Although most cache invalidation callbacks blow away all the related stuff
+ * regardless of the given hashvalue, connections are expensive enough that
+ * it's worth trying to avoid that.
+ *
+ * NB: We could avoid unnecessary disconnection more strictly by examining
+ * individual option values, but it seems too much effort for the gain.
+ */
+static void
+pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+
+ Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+ /* ConnectionHash must exist already, if we're registered */
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ /* Ignore invalid entries */
+ if (entry->conn == NULL)
+ continue;
+
+ /* hashvalue == 0 means a cache reset, must clear all state */
+ if (hashvalue == 0 ||
+ (cacheid == FOREIGNSERVEROID &&
+ entry->server_hashvalue == hashvalue) ||
+ (cacheid == USERMAPPINGOID &&
+ entry->mapping_hashvalue == hashvalue))
+ entry->invalidated = true;
+ }
+}
+
/*
* Raise an error if the given connection cache entry is marked as being
* in the middle of an xact state change. This should be called at which no
Form_pg_user_mapping umform;
ForeignServer *server;
- if (!entry->changing_xact_state)
+ /* nothing to do for inactive entries and entries of sane state */
+ if (entry->conn == NULL || !entry->changing_xact_state)
return;
+ /* make sure this entry is inactive */
+ disconnect_pg_server(entry);
+
+ /* find server name to be shown in the message below */
tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key));
if (!HeapTupleIsValid(tup))
public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') |
(6 rows)
+-- Test that alteration of server options causes reconnection
+SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
+ c3 | c4
+-------+------------------------------
+ 00001 | Fri Jan 02 00:00:00 1970 PST
+(1 row)
+
+ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ERROR: could not connect to server "loopback"
+DETAIL: FATAL: database "no such database" does not exist
+DO $d$
+ BEGIN
+ EXECUTE $$ALTER SERVER loopback
+ OPTIONS (SET dbname '$$||current_database()||$$')$$;
+ END;
+$d$;
+SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+ c3 | c4
+-------+------------------------------
+ 00001 | Fri Jan 02 00:00:00 1970 PST
+(1 row)
+
+-- Test that alteration of user mapping options causes reconnection
+ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+ OPTIONS (ADD user 'no such user');
+SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ERROR: could not connect to server "loopback"
+DETAIL: FATAL: role "no such user" does not exist
+ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+ OPTIONS (DROP user);
+SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+ c3 | c4
+-------+------------------------------
+ 00001 | Fri Jan 02 00:00:00 1970 PST
+(1 row)
+
-- Now we should be able to run ANALYZE.
-- To exercise multiple code paths, we use local stats on ft1
-- and remote-estimate mode on ft2.