From d3d414696f39e2b57072fab3dd4fa11e465be4ed Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Sat, 11 Dec 2010 09:27:37 -0500 Subject: [PATCH] Allow bidirectional copy messages in streaming replication mode. Fujii Masao. Review by Alvaro Herrera, Tom Lane, and myself. --- doc/src/sgml/libpq.sgml | 10 ++ doc/src/sgml/protocol.sgml | 98 +++++++++++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 22 ++++- src/backend/replication/walreceiver.c | 3 +- src/backend/replication/walsender.c | 4 +- src/include/replication/walreceiver.h | 3 + src/interfaces/libpq/fe-exec.c | 27 ++++- src/interfaces/libpq/fe-protocol2.c | 4 + src/interfaces/libpq/fe-protocol3.c | 18 +++- src/interfaces/libpq/libpq-fe.h | 1 + src/interfaces/libpq/libpq-int.h | 3 +- 11 files changed, 172 insertions(+), 21 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index c253c7c61c..c502439356 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -2194,6 +2194,16 @@ ExecStatusType PQresultStatus(const PGresult *res); + + PGRES_COPY_BOTH + + + Copy In/Out (to and from server) data transfer started. This is + currently used only for streaming replication. + + + + PGRES_BAD_RESPONSE diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 7b2482be5a..e3d636d557 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1033,12 +1033,25 @@ - The CopyInResponse and CopyOutResponse messages include fields that - inform the frontend of the number of columns per row and the format - codes being used for each column. (As of the present implementation, - all columns in a given COPY operation will use the same - format, but the message design does not assume this.) + There is another Copy-related mode called Copy-both, which allows + high-speed bulk data transfer to and from the server. + Copy-both mode is initiated when a backend in walsender mode + executes a START_REPLICATION statement. The + backend sends a CopyBothResponse message to the frontend. Both + the backend and the frontend may then send CopyData messages + until the connection is terminated. See see . + + + The CopyInResponse, CopyOutResponse and CopyBothResponse messages + include fields that inform the frontend of the number of columns + per row and the format codes being used for each column. (As of + the present implementation, all columns in a given COPY + operation will use the same format, but the message design does not + assume this.) + + @@ -1344,7 +1357,7 @@ The commands accepted in walsender mode are: WAL position XXX/XXX. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a - CopyOutResponse message, and then starts to stream WAL to the frontend. + CopyBothResponse message, and then starts to stream WAL to the frontend. WAL will continue to be streamed until the connection is broken; no further commands will be accepted. @@ -2694,6 +2707,79 @@ CopyOutResponse (B) + + +CopyBothResponse (B) + + + + + + + + Byte1('W') + + + + Identifies the message as a Start Copy Both response. + This message is used only for Streaming Replication. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + Int8 + + + + 0 indicates the overall COPY format + is textual (rows separated by newlines, columns + separated by separator characters, etc). 1 indicates + the overall copy format is binary (similar to DataRow + format). See for more information. + + + + + + Int16 + + + + The number of columns in the data to be copied + (denoted N below). + + + + + + Int16[N] + + + + The format codes to be used for each column. + Each must presently be zero (text) or one (binary). + All must be zero if the overall copy format is textual. + + + + + + + + + + DataRow (B) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f66a6b46b9..d1ab36755f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,6 +50,7 @@ static char *recvBuf = NULL; static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); +static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ @@ -64,10 +65,11 @@ _PG_init(void) { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || - walrcv_disconnect != NULL) + walrcv_send != NULL || walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; + walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; } @@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); res = libpqrcv_PQexec(cmd); - if (PQresultStatus(res) != PGRES_COPY_OUT) + if (PQresultStatus(res) != PGRES_COPY_BOTH) { PQclear(res); ereport(ERROR, @@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query) if (PQresultStatus(lastResult) == PGRES_COPY_IN || PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || PQstatus(streamConn) == CONNECTION_BAD) break; } @@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) return true; } + +/* + * Send a message to XLOG stream. + * + * ereports on error. + */ +static void +libpqrcv_send(const char *buffer, int nbytes) +{ + if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || + PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send data to WAL stream: %s", + PQerrorMessage(streamConn)))); +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a49ff6c896..fac3be340f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ bool am_walreceiver; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; +walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -247,7 +248,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) + walrcv_send == NULL || walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d2b9e5c5f9..c8d2433158 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -287,8 +287,8 @@ WalSndHandshake(void) (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); - /* Send a CopyOutResponse message, and start streaming */ - pq_beginmessage(&buf, 'H'); + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); pq_sendint(&buf, 0, 2); pq_endmessage(&buf); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index df7eadfa9c..485df782ff 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -84,6 +84,9 @@ typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; +typedef void (*walrcv_send_type) (const char *buffer, int nbytes); +extern PGDLLIMPORT walrcv_send_type walrcv_send; + typedef void (*walrcv_disconnect_type) (void); extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 8f25f5eb27..9858faeaa6 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -35,6 +35,7 @@ char *const pgresStatus[] = { "PGRES_TUPLES_OK", "PGRES_COPY_OUT", "PGRES_COPY_IN", + "PGRES_COPY_BOTH", "PGRES_BAD_RESPONSE", "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR" @@ -174,6 +175,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_TUPLES_OK: case PGRES_COPY_OUT: case PGRES_COPY_IN: + case PGRES_COPY_BOTH: /* non-error cases */ break; default: @@ -1591,6 +1593,12 @@ PQgetResult(PGconn *conn) else res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT); break; + case PGASYNC_COPY_BOTH: + if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH) + res = pqPrepareAsyncResult(conn); + else + res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH); + break; default: printfPQExpBuffer(&conn->errorMessage, libpq_gettext("unexpected asyncStatus: %d\n"), @@ -1775,6 +1783,13 @@ PQexecStart(PGconn *conn) return false; } } + else if (resultStatus == PGRES_COPY_BOTH) + { + /* We don't allow PQexec during COPY BOTH */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("PQexec not allowed during COPY BOTH\n")); + return false; + } /* check for loss of connection, too */ if (conn->status == CONNECTION_BAD) return false; @@ -1798,7 +1813,7 @@ PQexecFinish(PGconn *conn) * than one --- but merge error messages if we get more than one error * result. * - * We have to stop if we see copy in/out, however. We will resume parsing + * We have to stop if we see copy in/out/both, however. We will resume parsing * after application performs the data transfer. * * Also stop if the connection is lost (else we'll loop infinitely). @@ -1827,6 +1842,7 @@ PQexecFinish(PGconn *conn) lastResult = result; if (result->resultStatus == PGRES_COPY_IN || result->resultStatus == PGRES_COPY_OUT || + result->resultStatus == PGRES_COPY_BOTH || conn->status == CONNECTION_BAD) break; } @@ -2000,7 +2016,7 @@ PQnotifies(PGconn *conn) } /* - * PQputCopyData - send some data to the backend during COPY IN + * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH * * Returns 1 if successful, 0 if data could not be sent (only possible * in nonblock mode), or -1 if an error occurs. @@ -2010,7 +2026,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes) { if (!conn) return -1; - if (conn->asyncStatus != PGASYNC_COPY_IN) + if (conn->asyncStatus != PGASYNC_COPY_IN && + conn->asyncStatus != PGASYNC_COPY_BOTH) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); @@ -2148,6 +2165,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) /* * PQgetCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH * * If successful, sets *buffer to point to a malloc'd row of data, and * returns row length (always > 0) as result. @@ -2161,7 +2179,8 @@ PQgetCopyData(PGconn *conn, char **buffer, int async) *buffer = NULL; /* for all failure cases */ if (!conn) return -2; - if (conn->asyncStatus != PGASYNC_COPY_OUT) + if (conn->asyncStatus != PGASYNC_COPY_OUT && + conn->asyncStatus != PGASYNC_COPY_BOTH) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 31eff831ee..ccf1342329 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -541,6 +541,10 @@ pqParseInput2(PGconn *conn) case 'H': /* Start Copy Out */ conn->asyncStatus = PGASYNC_COPY_OUT; break; + /* + * Don't need to process CopyBothResponse here because + * it never arrives from the server during protocol 2.0. + */ default: printfPQExpBuffer(&conn->errorMessage, libpq_gettext( diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index cf9407de7a..c398304156 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -358,6 +358,12 @@ pqParseInput3(PGconn *conn) conn->asyncStatus = PGASYNC_COPY_OUT; conn->copy_already_done = 0; break; + case 'W': /* Start Copy Both */ + if (getCopyStart(conn, PGRES_COPY_BOTH)) + return; + conn->asyncStatus = PGASYNC_COPY_BOTH; + conn->copy_already_done = 0; + break; case 'd': /* Copy Data */ /* @@ -1196,7 +1202,8 @@ getNotify(PGconn *conn) } /* - * getCopyStart - process CopyInResponse or CopyOutResponse message + * getCopyStart - process CopyInResponse, CopyOutResponse or + * CopyBothResponse message * * parseInput already read the message type and length. */ @@ -1367,6 +1374,7 @@ getCopyDataMessage(PGconn *conn) /* * PQgetCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH * * If successful, sets *buffer to point to a malloc'd row of data, and * returns row length (always > 0) as result. @@ -1390,10 +1398,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) if (msgLength < 0) { /* - * On end-of-copy, exit COPY_OUT mode and let caller read status - * with PQgetResult(). The normal case is that it's Copy Done, - * but we let parseInput read that. If error, we expect the state - * was already changed. + * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller + * read status with PQgetResult(). The normal case is that it's + * Copy Done, but we let parseInput read that. If error, we expect + * the state was already changed. */ if (msgLength == -1) conn->asyncStatus = PGASYNC_BUSY; diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index d9e3067894..271afedb7b 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -85,6 +85,7 @@ typedef enum * contains the result tuples */ PGRES_COPY_OUT, /* Copy Out data transfer in progress */ PGRES_COPY_IN, /* Copy In data transfer in progress */ + PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ PGRES_BAD_RESPONSE, /* an unexpected response was recv'd from the * backend */ PGRES_NONFATAL_ERROR, /* notice or warning message */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index ce5f330f9e..bac3d0b3bf 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -218,7 +218,8 @@ typedef enum PGASYNC_BUSY, /* query in progress */ PGASYNC_READY, /* result ready for PQgetResult */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ - PGASYNC_COPY_OUT /* Copy Out data transfer in progress */ + PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ + PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */