From: Fujii Masao Date: Fri, 4 Jul 2014 03:00:48 +0000 (+0900) Subject: Refactor pg_receivexlog main loop code, for readability. X-Git-Tag: REL9_5_ALPHA1~1765 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=74cbe966fe2d76de1d607d933c98c144dab58769;p=postgresql Refactor pg_receivexlog main loop code, for readability. Previously the source codes for receiving the data and for polling the socket were included in pg_receivexlog main loop. This commit splits out them as separate functions. This is useful for improving the readability of main loop code and making the future pg_receivexlog-related patch simpler. --- diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d76e605e21..4aa35da2fc 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); +static int CopyStreamPoll(PGconn *conn, long timeout_ms); +static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); @@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int bytes_written; int64 now; int hdr_len; - - if (copybuf != NULL) - { - PQfreemem(copybuf); - copybuf = NULL; - } + long sleeptime; /* * Check if we should continue streaming, or abort at this point. @@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, last_status = now; } - r = PQgetCopyData(conn, ©buf, 1); - if (r == 0) + /* + * Compute how long send/receive loops should sleep + */ + if (standby_message_timeout && still_sending) { - /* - * No data available. Wait for some to appear, but not longer than - * the specified timeout, so that we can ping the server. - */ - fd_set input_mask; - struct timeval timeout; - struct timeval *timeoutptr; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(conn), &input_mask); - if (standby_message_timeout && still_sending) + int64 targettime; + long secs; + int usecs; + + targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); + feTimestampDifference(now, + targettime, + &secs, + &usecs); + /* Always sleep at least 1 sec */ + if (secs <= 0) { - int64 targettime; - long secs; - int usecs; - - targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - feTimestampDifference(now, - targettime, - &secs, - &usecs); - if (secs <= 0) - timeout.tv_sec = 1; /* Always sleep at least 1 sec */ - else - timeout.tv_sec = secs; - timeout.tv_usec = usecs; - timeoutptr = &timeout; + secs = 1; + usecs = 0; } - else - timeoutptr = NULL; - r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (r == 0 || (r < 0 && errno == EINTR)) - { - /* - * Got a timeout or signal. Continue the loop and either - * deliver a status packet to the server or just go back into - * blocking. - */ - continue; - } - else if (r < 0) - { - fprintf(stderr, _("%s: select() failed: %s\n"), - progname, strerror(errno)); - goto error; - } - /* Else there is actually data on the socket */ - if (PQconsumeInput(conn) == 0) - { - fprintf(stderr, - _("%s: could not receive data from WAL stream: %s"), - progname, PQerrorMessage(conn)); - goto error; - } - continue; + sleeptime = secs * 1000 + usecs / 1000; } + else + sleeptime = -1; + + r = CopyStreamReceive(conn, sleeptime, ©buf); + if (r == 0) + continue; if (r == -1) + goto error; + if (r == -2) { PGresult *res = PQgetResult(conn); @@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } if (copybuf != NULL) PQfreemem(copybuf); + copybuf = NULL; *stoppos = blockpos; return res; } - if (r == -2) - { - fprintf(stderr, _("%s: could not read COPY data: %s"), - progname, PQerrorMessage(conn)); - goto error; - } /* Check the message type. */ if (copybuf[0] == 'k') @@ -1056,3 +1019,115 @@ error: PQfreemem(copybuf); return NULL; } + +/* + * Wait until we can read CopyData message, or timeout. + * + * Returns 1 if data has become available for reading, 0 if timed out + * or interrupted by signal, and -1 on an error. + */ +static int +CopyStreamPoll(PGconn *conn, long timeout_ms) +{ + int ret; + fd_set input_mask; + struct timeval timeout; + struct timeval *timeoutptr; + + if (PQsocket(conn) < 0) + { + fprintf(stderr, _("%s: socket not open"), progname); + return -1; + } + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + + if (timeout_ms < 0) + timeoutptr = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000L; + timeout.tv_usec = (timeout_ms % 1000L) * 1000L; + timeoutptr = &timeout; + } + + ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (ret == 0 || (ret < 0 && errno == EINTR)) + return 0; /* Got a timeout or signal */ + else if (ret < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + return -1; + } + + return 1; +} + +/* + * Receive CopyData message available from XLOG stream, blocking for + * maximum of 'timeout' ms. + * + * If data was received, returns the length of the data. *buffer is set to + * point to a buffer holding the received message. The buffer is only valid + * until the next CopyStreamReceive call. + * + * 0 if no data was available within timeout, or wait was interrupted + * by signal. -1 on error. -2 if the server ended the COPY. + */ +static int +CopyStreamReceive(PGconn *conn, long timeout, char **buffer) +{ + static char *copybuf = NULL; + int rawlen; + + if (copybuf != NULL) + PQfreemem(copybuf); + copybuf = NULL; + *buffer = NULL; + + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(conn, ©buf, 1); + if (rawlen == 0) + { + /* + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. + */ + if (timeout > 0) + { + int ret; + + ret = CopyStreamPoll(conn, timeout); + if (ret <= 0) + return ret; + } + + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL stream: %s"), + progname, PQerrorMessage(conn)); + return -1; + } + + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(conn, ©buf, 1); + if (rawlen == 0) + return 0; + } + if (rawlen == -1) /* end-of-streaming or error */ + return -2; + if (rawlen == -2) + { + fprintf(stderr, _("%s: could not read COPY data: %s"), + progname, PQerrorMessage(conn)); + return -1; + } + + /* Return received messages to caller */ + *buffer = copybuf; + return rawlen; +}