Previously, workers sent data to the leader using the client encoding.
That mostly worked, but the leader the converted the data back to the
server encoding. Since not all encoding conversions are reversible,
that could provoke failures. Fix by using the database encoding for
all communication between worker and leader.
Also, while temporary changes to GUC settings, as from the SET clause
of a function, are in general OK for parallel query, changing
client_encoding this way inside of a parallel worker is not OK.
Previously, that would have confused the leader; with these changes,
it would not confuse the leader, but it wouldn't do anything either.
So refuse such changes in parallel workers.
Also, the previous code naively assumed that when it received a
NotifyResonse from the worker, it could pass that directly back to the
user. But now that worker-to-leader communication always uses the
database encoding, that's clearly no longer correct - though,
actually, the old way was always broken for V2 clients. So
disassemble and reconstitute the message instead.
Issues reported by Peter Eisentraut. Patch by me, reviewed by
Peter Eisentraut.
case 'A': /* NotifyResponse */
{
/* Propagate NotifyResponse. */
- pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+ int32 pid;
+ const char *channel;
+ const char *payload;
+
+ pid = pq_getmsgint(msg, 4);
+ channel = pq_getmsgrawstring(msg);
+ payload = pq_getmsgrawstring(msg);
+ pq_endmessage(msg);
+
+ NotifyMyFrontEnd(channel, payload, pid);
+
break;
}
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id);
+ /*
+ * Set the client encoding to the database encoding, since that is what
+ * the leader will expect.
+ */
+ SetClientEncoding(GetDatabaseEncoding());
+
/* Restore GUC values from launching backend. */
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
Assert(gucspace != NULL);
char *page_buffer);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(const char *channel,
- const char *payload,
- int32 srcPid);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
/*
* Send NOTIFY message to my front end.
*/
-static void
+void
NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{
if (whereToSendOutput == DestRemote)
{
int encoding = *((int *) extra);
+ /*
+ * Parallel workers send data to the leader, not the client. They always
+ * send data using the database encoding.
+ */
+ if (IsParallelWorker())
+ {
+ /*
+ * During parallel worker startup, we want to accept the leader's
+ * client_encoding setting so that anyone who looks at the value in
+ * the worker sees the same value that they would see in the leader.
+ */
+ if (InitializingParallelWorker)
+ return;
+
+ /*
+ * A change other than during startup, for example due to a SET clause
+ * attached to a function definition, should be rejected, as there is
+ * nothing we can do inside the worker to make it take effect.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot change client_encoding in a parallel worker")));
+ }
+
/* We do not expect an error if PrepareClientEncoding succeeded */
if (SetClientEncoding(encoding) < 0)
elog(LOG, "SetClientEncoding(%d) failed", encoding);
* pq_copymsgbytes - copy raw data from a message buffer
* pq_getmsgtext - get a counted text string (with conversion)
* pq_getmsgstring - get a null-terminated text string (with conversion)
+ * pq_getmsgrawstring - get a null-terminated text string - NO conversion
* pq_getmsgend - verify message fully consumed
*/
return pg_client_to_server(str, slen);
}
+/* --------------------------------
+ * pq_getmsgrawstring - get a null-terminated text string - NO conversion
+ *
+ * Returns a pointer directly into the message buffer.
+ * --------------------------------
+ */
+const char *
+pq_getmsgrawstring(StringInfo msg)
+{
+ char *str;
+ int slen;
+
+ str = &msg->data[msg->cursor];
+
+ /*
+ * It's safe to use strlen() here because a StringInfo is guaranteed to
+ * have a trailing null byte. But check we found a null inside the
+ * message.
+ */
+ slen = strlen(str);
+ if (msg->cursor + slen >= msg->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid string in message")));
+ msg->cursor += slen + 1;
+
+ return str;
+}
+
/* --------------------------------
* pq_getmsgend - verify message fully consumed
* --------------------------------
pq_getmsgend(msg);
break;
}
- value = pq_getmsgstring(msg);
+ value = pq_getmsgrawstring(msg);
switch (code)
{
extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
+extern void NotifyMyFrontEnd(const char *channel,
+ const char *payload,
+ int32 srcPid);
+
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
extern const char *pq_getmsgstring(StringInfo msg);
+extern const char *pq_getmsgrawstring(StringInfo msg);
extern void pq_getmsgend(StringInfo msg);
#endif /* PQFORMAT_H */