]> granicus.if.org Git - postgresql/commitdiff
Now that START_REPLICATION returns the next timeline's ID after reaching end
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 18 Jan 2013 09:48:29 +0000 (11:48 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 18 Jan 2013 09:59:34 +0000 (11:59 +0200)
of timeline, take advantage of that in walreceiver.

The startup process is still in control of choosing the target timeline, by
scanning the timeline history files present in pg_xlog, but walreceiver now
uses the next timeline's ID to fetch its history file immediately after it
has finished streaming the old timeline. Before, the standby would first try
to restart streaming on the old timeline, which fetches the missing timeline
history file as a side-effect, and only then restart from the new timeline.
This patch eliminates the extra iteration, which speeds up the timeline
switch and reduces the noise in the log caused by the extra restart on the
old timeline.

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/include/replication/walreceiver.h

index 84ab25b0e2a5666d310305128124b0bee0534148..e6e670e9e4bc8f8002d5b06b672fb46c2a79765b 100644 (file)
@@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo);
 static void libpqrcv_identify_system(TimeLineID *primary_tli);
 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
-static void libpqrcv_endstreaming(void);
+static void libpqrcv_endstreaming(TimeLineID *next_tli);
 static int libpqrcv_receive(int timeout, char **buffer);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
@@ -199,10 +199,11 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
 }
 
 /*
- * Stop streaming WAL data.
+ * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
+ * reported by the server, or 0 if it did not report it.
  */
 static void
-libpqrcv_endstreaming(void)
+libpqrcv_endstreaming(TimeLineID *next_tli)
 {
        PGresult   *res;
 
@@ -211,33 +212,42 @@ libpqrcv_endstreaming(void)
                                (errmsg("could not send end-of-streaming message to primary: %s",
                                                PQerrorMessage(streamConn))));
 
-       /* Read the command result after COPY is finished */
-
-       while ((res = PQgetResult(streamConn)) != NULL)
+       /*
+        * After COPY is finished, we should receive a result set indicating the
+        * next timeline's ID, or just CommandComplete if the server was shut down.
+        *
+        * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+        * would also be possible. However, at the moment this function is only
+        * called after receiving CopyDone from the backend - the walreceiver
+        * never terminates replication on its own initiative.
+        */
+       res = PQgetResult(streamConn);
+       if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
-               /*
-                * After Copy, if the streaming ended because we reached end of the
-                * timeline, server sends one result set with the next timeline's ID.
-                * We don't need it, so just slurp and ignore it.
-                *
-                * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
-                * is also possible. However, at the moment this function is only
-                * called after receiving CopyDone from the backend - the walreceiver
-                * never terminates replication on its own initiative.
-                */
-               switch (PQresultStatus(res))
-               {
-                       case PGRES_COMMAND_OK:
-                       case PGRES_TUPLES_OK:
-                               break;
-
-                       default:
-                               ereport(ERROR,
-                                               (errmsg("error reading result of streaming command: %s",
-                                                               PQerrorMessage(streamConn))));
-               }
+               /* Read the next timeline's ID */
+               if (PQnfields(res) != 1 || PQntuples(res) != 1)
+                       ereport(ERROR,
+                                       (errmsg("unexpected result set after end-of-streaming")));
+               *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
                PQclear(res);
+
+               /* the result set should be followed by CommandComplete */
+               res = PQgetResult(streamConn);
        }
+       else
+               *next_tli = 0;
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               ereport(ERROR,
+                               (errmsg("error reading result of streaming command: %s",
+                                               PQerrorMessage(streamConn))));
+
+       /* Verify that there are no more results */
+       res = PQgetResult(streamConn);
+       if (res != NULL)
+               ereport(ERROR,
+                               (errmsg("unexpected result after CommandComplete: %s",
+                                               PQerrorMessage(streamConn))));
 }
 
 /*
index 444be9463bc7dd50297bcabc9944ef662993b855..73592973ac6218c936b0f12abf5f337106dcb4d2 100644 (file)
@@ -505,8 +505,15 @@ WalReceiverMain(void)
                         * our side, too.
                         */
                        EnableWalRcvImmediateExit();
-                       walrcv_endstreaming();
+                       walrcv_endstreaming(&primaryTLI);
                        DisableWalRcvImmediateExit();
+
+                       /*
+                        * If the server had switched to a new timeline that we didn't know
+                        * about when we began streaming, fetch its timeline history file
+                        * now.
+                        */
+                       WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
                }
                else
                        ereport(LOG,
index 3afc4a86bbd6133ad6540a6c44827b08f4f46439..72878f84c618fd200f0dc5e8514d6d6feabd8175 100644 (file)
@@ -128,7 +128,7 @@ extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistor
 typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
 extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
 
-typedef void (*walrcv_endstreaming_type) (void);
+typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
 extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
 
 typedef int (*walrcv_receive_type) (int timeout, char **buffer);