]> granicus.if.org Git - postgresql/commitdiff
Fix several mistakes around parallel workers and client_encoding.
authorRobert Haas <rhaas@postgresql.org>
Thu, 30 Jun 2016 22:35:32 +0000 (18:35 -0400)
committerRobert Haas <rhaas@postgresql.org>
Thu, 30 Jun 2016 22:35:32 +0000 (18:35 -0400)
Previously, workers sent data to the leader using the client encoding.
That mostly worked, but the leader the converted the data back to the
server encoding.  Since not all encoding conversions are reversible,
that could provoke failures.  Fix by using the database encoding for
all communication between worker and leader.

Also, while temporary changes to GUC settings, as from the SET clause
of a function, are in general OK for parallel query, changing
client_encoding this way inside of a parallel worker is not OK.
Previously, that would have confused the leader; with these changes,
it would not confuse the leader, but it wouldn't do anything either.
So refuse such changes in parallel workers.

Also, the previous code naively assumed that when it received a
NotifyResonse from the worker, it could pass that directly back to the
user.  But now that worker-to-leader communication always uses the
database encoding, that's clearly no longer correct - though,
actually, the old way was always broken for V2 clients.  So
disassemble and reconstitute the message instead.

Issues reported by Peter Eisentraut.  Patch by me, reviewed by
Peter Eisentraut.

src/backend/access/transam/parallel.c
src/backend/commands/async.c
src/backend/commands/variable.c
src/backend/libpq/pqformat.c
src/backend/libpq/pqmq.c
src/include/commands/async.h
src/include/libpq/pqformat.h

index 088700e17cb910a12e5ddb2cbdbe8ea131f4866e..eef1dc2b1843de5376e415a12ec1eb86a6a56182 100644 (file)
@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                case 'A':                               /* NotifyResponse */
                        {
                                /* Propagate NotifyResponse. */
-                               pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+                               int32           pid;
+                               const char *channel;
+                               const char *payload;
+
+                               pid = pq_getmsgint(msg, 4);
+                               channel = pq_getmsgrawstring(msg);
+                               payload = pq_getmsgrawstring(msg);
+                               pq_endmessage(msg);
+
+                               NotifyMyFrontEnd(channel, payload, pid);
+
                                break;
                        }
 
@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg)
        BackgroundWorkerInitializeConnectionByOid(fps->database_id,
                                                                                          fps->authenticated_user_id);
 
+       /*
+        * Set the client encoding to the database encoding, since that is what
+        * the leader will expect.
+        */
+       SetClientEncoding(GetDatabaseEncoding());
+
        /* Restore GUC values from launching backend. */
        gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
        Assert(gucspace != NULL);
index c39ac3aeef00d506308034c85b43f8223177cdcf..716f1c33183da6e2cc0d50bf463098bfcbf213a6 100644 (file)
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(const char *channel,
-                                const char *payload,
-                                int32 srcPid);
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
 /*
  * Send NOTIFY message to my front end.
  */
-static void
+void
 NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 {
        if (whereToSendOutput == DestRemote)
index 962d75db6e45be3afdd3059c6249bbc822eaea70..4ad8266a51c576edb7c60cf74b6c8cf14008caf3 100644 (file)
@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra)
 {
        int                     encoding = *((int *) extra);
 
+       /*
+        * Parallel workers send data to the leader, not the client.  They always
+        * send data using the database encoding.
+        */
+       if (IsParallelWorker())
+       {
+               /*
+                * During parallel worker startup, we want to accept the leader's
+                * client_encoding setting so that anyone who looks at the value in
+                * the worker sees the same value that they would see in the leader.
+                */
+               if (InitializingParallelWorker)
+                       return;
+
+               /*
+                * A change other than during startup, for example due to a SET clause
+                * attached to a function definition, should be rejected, as there is
+                * nothing we can do inside the worker to make it take effect.
+                */
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+                                errmsg("cannot change client_encoding in a parallel worker")));
+       }
+
        /* We do not expect an error if PrepareClientEncoding succeeded */
        if (SetClientEncoding(encoding) < 0)
                elog(LOG, "SetClientEncoding(%d) failed", encoding);
index 4ddea8285fc1758e330dd15d4def908004040722..b5d9d64e5479355759862df21db0787196831e45 100644 (file)
@@ -65,6 +65,7 @@
  *             pq_copymsgbytes - copy raw data from a message buffer
  *             pq_getmsgtext   - get a counted text string (with conversion)
  *             pq_getmsgstring - get a null-terminated text string (with conversion)
+ *             pq_getmsgrawstring - get a null-terminated text string - NO conversion
  *             pq_getmsgend    - verify message fully consumed
  */
 
@@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg)
        return pg_client_to_server(str, slen);
 }
 
+/* --------------------------------
+ *             pq_getmsgrawstring - get a null-terminated text string - NO conversion
+ *
+ *             Returns a pointer directly into the message buffer.
+ * --------------------------------
+ */
+const char *
+pq_getmsgrawstring(StringInfo msg)
+{
+       char       *str;
+       int                     slen;
+
+       str = &msg->data[msg->cursor];
+
+       /*
+        * It's safe to use strlen() here because a StringInfo is guaranteed to
+        * have a trailing null byte.  But check we found a null inside the
+        * message.
+        */
+       slen = strlen(str);
+       if (msg->cursor + slen >= msg->len)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg("invalid string in message")));
+       msg->cursor += slen + 1;
+
+       return str;
+}
+
 /* --------------------------------
  *             pq_getmsgend    - verify message fully consumed
  * --------------------------------
index 3225c1fa0e7f5c1673d13506710504a31e0ef7c3..0dcdee03db53fdd8584e5b4594afac5746b46c30 100644 (file)
@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
                        pq_getmsgend(msg);
                        break;
                }
-               value = pq_getmsgstring(msg);
+               value = pq_getmsgrawstring(msg);
 
                switch (code)
                {
index b4c13fac4a311498c020293b27058a5b90909d35..95559df19fea933ce1fb85da4ec166430062963e 100644 (file)
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
+extern void NotifyMyFrontEnd(const char *channel,
+                                                        const char *payload,
+                                                        int32 srcPid);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
index 65ebf37fbc0bae12e592f01bc2f7ddb7fe05a34a..3c0d4b2622bfffe9f2745871d98db10b9daef2b0 100644 (file)
@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
 extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
 extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
 extern const char *pq_getmsgstring(StringInfo msg);
+extern const char *pq_getmsgrawstring(StringInfo msg);
 extern void pq_getmsgend(StringInfo msg);
 
 #endif   /* PQFORMAT_H */