#define DefaultOption ""
#define DefaultAuthtype ""
#define DefaultPassword ""
+#define DefaultTargetSessionAttrs "any"
#ifdef USE_SSL
#define DefaultSSLMode "prefer"
#else
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"target_session_attrs", "PGTARGETSESSIONATTRS",
+ DefaultTargetSessionAttrs, NULL,
+ "Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
+ offsetof(struct pg_conn, target_session_attrs)},
+
/* Terminating entry --- MUST BE LAST */
{NULL, NULL, NULL, NULL,
NULL, NULL, 0}
static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
static void freePGconn(PGconn *conn);
static void closePGconn(PGconn *conn);
+static void release_all_addrinfo(PGconn *conn);
+static void sendTerminateConn(PGconn *conn);
static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage);
static PQconninfoOption *parse_connection_string(const char *conninfo,
PQExpBuffer errorMessage, bool use_defaults);
goto oom_error;
}
+ /*
+ * Validate target_session_attrs option.
+ */
+ if (conn->target_session_attrs)
+ {
+ if (strcmp(conn->target_session_attrs, "any") != 0
+ && strcmp(conn->target_session_attrs, "read-write") != 0)
+ {
+ conn->status = CONNECTION_BAD;
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("invalid target_session_attrs value: \"%s\"\n"),
+ conn->target_session_attrs);
+ return false;
+ }
+ }
+
/*
* Only if we get this far is it appropriate to try to connect. (We need a
* state flag, rather than just the boolean result of this function, in
/* Special cases: proceed without waiting. */
case CONNECTION_SSL_STARTUP:
case CONNECTION_NEEDED:
+ case CONNECTION_CHECK_WRITABLE:
break;
default:
goto error_return;
}
- /* We can release the address lists now. */
- if (conn->connhost != NULL)
- {
- int i;
-
- for (i = 0; i < conn->nconnhost; ++i)
- {
- int family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
- if (conn->connhost[i].type == CHT_UNIX_SOCKET)
- family = AF_UNIX;
-#endif
-
- pg_freeaddrinfo_all(family,
- conn->connhost[i].addrlist);
- conn->connhost[i].addrlist = NULL;
- }
- }
- conn->addr_cur = NULL;
-
/* Fire up post-connection housekeeping if needed */
if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
{
return PGRES_POLLING_WRITING;
}
- /* Otherwise, we are open for business! */
+ /*
+ * If a read-write connection is required, see if we have one.
+ */
+ if (conn->target_session_attrs != NULL &&
+ strcmp(conn->target_session_attrs, "read-write") == 0)
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQsendQuery(conn,
+ "show transaction_read_only"))
+ goto error_return;
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
+ /* We are open for business! */
conn->status = CONNECTION_OK;
return PGRES_POLLING_OK;
}
goto error_return;
}
+ /*
+ * If a read-write connection is requisted check for same.
+ */
+ if (conn->target_session_attrs != NULL &&
+ strcmp(conn->target_session_attrs, "read-write") == 0)
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQsendQuery(conn,
+ "show transaction_read_only"))
+ goto error_return;
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
/* We are open for business! */
conn->status = CONNECTION_OK;
return PGRES_POLLING_OK;
+ case CONNECTION_CHECK_WRITABLE:
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQconsumeInput(conn))
+ goto error_return;
+
+ if (PQisBusy(conn))
+ {
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ res = PQgetResult(conn);
+ if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
+ PQntuples(res) == 1)
+ {
+ char *val;
+
+ val = PQgetvalue(res, 0, 0);
+ if (strncmp(val, "on", 2) == 0)
+ {
+ PQclear(res);
+
+ /* Not writable; close connection. */
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("could not make a writable "
+ "connection to server "
+ "\"%s:%s\"\n"),
+ conn->connhost[conn->whichhost].host,
+ conn->connhost[conn->whichhost].port);
+ conn->status = CONNECTION_OK;
+ sendTerminateConn(conn);
+ pqDropConnection(conn, true);
+
+ /* Skip any remaining addresses for this host. */
+ conn->addr_cur = NULL;
+ if (conn->whichhost + 1 < conn->nconnhost)
+ {
+ conn->status = CONNECTION_NEEDED;
+ goto keep_going;
+ }
+
+ /* No more addresses to try. So we fail. */
+ goto error_return;
+ }
+ PQclear(res);
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
+ /* We are open for business! */
+ conn->status = CONNECTION_OK;
+ return PGRES_POLLING_OK;
+ }
+
+ /*
+ * Something went wrong with "show transaction_read_only". We
+ * should try next addresses.
+ */
+ if (res)
+ PQclear(res);
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("test \"show transaction_read_only\" failed "
+ " on \"%s:%s\" \n"),
+ conn->connhost[conn->whichhost].host,
+ conn->connhost[conn->whichhost].port);
+ conn->status = CONNECTION_OK;
+ sendTerminateConn(conn);
+ pqDropConnection(conn, true);
+
+ if (conn->addr_cur->ai_next != NULL ||
+ conn->whichhost + 1 < conn->nconnhost)
+ {
+ conn->addr_cur = conn->addr_cur->ai_next;
+ conn->status = CONNECTION_NEEDED;
+ goto keep_going;
+ }
+
+ /* No more addresses to try. So we fail. */
+ goto error_return;
+ }
+
default:
appendPQExpBuffer(&conn->errorMessage,
libpq_gettext("invalid connection state %d, "
free(conn->outBuffer);
if (conn->rowBuf)
free(conn->rowBuf);
+ if (conn->target_session_attrs)
+ free(conn->target_session_attrs);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
}
/*
- * closePGconn
- * - properly close a connection to the backend
- *
- * This should reset or release all transient state, but NOT the connection
- * parameters. On exit, the PGconn should be in condition to start a fresh
- * connection with the same parameters (see PQreset()).
+ * release_all_addrinfo
+ * - free addrinfo of all hostconn elements.
*/
+
static void
-closePGconn(PGconn *conn)
+release_all_addrinfo(PGconn *conn)
{
- PGnotify *notify;
- pgParameterStatus *pstatus;
+ if (conn->connhost != NULL)
+ {
+ int i;
+
+ for (i = 0; i < conn->nconnhost; ++i)
+ {
+ int family = AF_UNSPEC;
+
+#ifdef HAVE_UNIX_SOCKETS
+ if (conn->connhost[i].type == CHT_UNIX_SOCKET)
+ family = AF_UNIX;
+#endif
+ pg_freeaddrinfo_all(family,
+ conn->connhost[i].addrlist);
+ conn->connhost[i].addrlist = NULL;
+ }
+ }
+ conn->addr_cur = NULL;
+}
+
+/*
+ * sendTerminateConn
+ * - Send a terminate message to backend.
+ */
+static void
+sendTerminateConn(PGconn *conn)
+{
/*
* Note that the protocol doesn't allow us to send Terminate messages
* during the startup phase.
pqPutMsgEnd(conn);
(void) pqFlush(conn);
}
+}
+
+/*
+ * closePGconn
+ * - properly close a connection to the backend
+ *
+ * This should reset or release all transient state, but NOT the connection
+ * parameters. On exit, the PGconn should be in condition to start a fresh
+ * connection with the same parameters (see PQreset()).
+ */
+static void
+closePGconn(PGconn *conn)
+{
+ PGnotify *notify;
+ pgParameterStatus *pstatus;
+
+ sendTerminateConn(conn);
/*
* Must reset the blocking status so a possible reconnect will work.
conn->asyncStatus = PGASYNC_IDLE;
pqClearAsyncResult(conn); /* deallocate result */
resetPQExpBuffer(&conn->errorMessage);
- if (conn->connhost != NULL)
- {
- int i;
-
- for (i = 0; i < conn->nconnhost; ++i)
- {
- int family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
- if (conn->connhost[i].type == CHT_UNIX_SOCKET)
- family = AF_UNIX;
-#endif
+ release_all_addrinfo(conn);
- pg_freeaddrinfo_all(family,
- conn->connhost[i].addrlist);
- conn->connhost[i].addrlist = NULL;
- }
- }
- conn->addr_cur = NULL;
notify = conn->notifyHead;
while (notify != NULL)
{