Fix the logic in libpqrcv_receive() to determine if there's any incoming data
that can be read without blocking. It used to conclude that there isn't, even though there was data in the socket receive buffer. That lead walreceiver to flush the WAL after every received chunk, potentially causing big performance issues. Backpatch to 9.0, because the performance impact can be very significant.
This commit is contained in:
parent
c667cc24e8
commit
a5a02a7445
@ -41,7 +41,6 @@ void _PG_init(void);
|
||||
|
||||
/* Current connection to the primary, if any */
|
||||
static PGconn *streamConn = NULL;
|
||||
static bool justconnected = false;
|
||||
|
||||
/* Buffer for currently read records */
|
||||
static char *recvBuf = NULL;
|
||||
@ -168,7 +167,6 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
justconnected = true;
|
||||
ereport(LOG,
|
||||
(errmsg("streaming replication successfully connected to primary")));
|
||||
|
||||
@ -321,7 +319,6 @@ libpqrcv_disconnect(void)
|
||||
{
|
||||
PQfinish(streamConn);
|
||||
streamConn = NULL;
|
||||
justconnected = false;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -351,28 +348,30 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
|
||||
PQfreemem(recvBuf);
|
||||
recvBuf = NULL;
|
||||
|
||||
/*
|
||||
* If the caller requested to block, wait for data to arrive. But if this
|
||||
* is the first call after connecting, don't wait, because there might
|
||||
* already be some data in libpq buffer that we haven't returned to
|
||||
* caller.
|
||||
*/
|
||||
if (timeout > 0 && !justconnected)
|
||||
/* Try to receive a CopyData message */
|
||||
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
|
||||
if (rawlen == 0)
|
||||
{
|
||||
if (!libpq_select(timeout))
|
||||
return false;
|
||||
/*
|
||||
* No data available yet. If the caller requested to block, wait for
|
||||
* more data to arrive.
|
||||
*/
|
||||
if (timeout > 0)
|
||||
{
|
||||
if (!libpq_select(timeout))
|
||||
return false;
|
||||
}
|
||||
|
||||
if (PQconsumeInput(streamConn) == 0)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not receive data from WAL stream: %s",
|
||||
PQerrorMessage(streamConn))));
|
||||
}
|
||||
justconnected = false;
|
||||
|
||||
/* Receive CopyData message */
|
||||
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
|
||||
if (rawlen == 0) /* no data available yet, then return */
|
||||
return false;
|
||||
/* Now that we've consumed some input, try again */
|
||||
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
|
||||
if (rawlen == 0)
|
||||
return false;
|
||||
}
|
||||
if (rawlen == -1) /* end-of-streaming or error */
|
||||
{
|
||||
PGresult *res;
|
||||
|
Loading…
Reference in New Issue
Block a user