diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 290236e8b6..b6ba22e567 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -376,6 +376,10 @@ PostgresPollingStatusType *PQconnectPoll(PQconn *conn)
tested under Windows, and so it is currently off by default. This may be
changed in the future.
+
+ These functions leave the socket in a non-blocking state as if
+ PQsetnonblocking had been called.
+
These functions are not thread-safe.
@@ -1168,8 +1172,58 @@ discarded by PQexec.
Applications that do not like these limitations can instead use the
underlying functions that PQexec is built from:
PQsendQuery and PQgetResult.
+
+
+Older programs that used this functionality as well as
+PQputline and PQputnbytes
+could block waiting to send data to the backend, to
+address that issue, the function PQsetnonblocking
+was added.
+
+
+Old applications can neglect to use PQsetnonblocking
+and get the older potentially blocking behavior. Newer programs can use
+PQsetnonblocking to achieve a completely non-blocking
+connection to the backend.
+
+
+ PQsetnonblocking Sets the state of the connection
+ to non-blocking.
+
+int PQsetnonblocking(PGconn *conn)
+
+ this function will ensure that calls to
+ PQputline, PQputnbytes,
+ PQsendQuery and PQendcopy
+ will not block but instead return an error if they need to be called
+ again.
+
+
+ When a database connection has been set to non-blocking mode and
+ PQexec is called, it will temporarily set the state
+ of the connection to blocking until the PQexec
+ completes.
+
+
+ More of libpq is expected to be made safe for
+ PQsetnonblocking functionality in the near future.
+
+
+
+
+
+PQisnonblocking
+ Returns the blocking status of the database connection.
+
+int PQisnonblocking(const PGconn *conn)
+
+ Returns TRUE if the connection is set to non-blocking mode,
+ FALSE if blocking.
+
+
+
PQsendQuery
@@ -1265,23 +1319,46 @@ state will never end.
+
+
+PQflush Attempt to flush any data queued to the backend,
+returns 0 if successful (or if the send queue is empty) or EOF if it failed for
+some reason.
+
+int PQflush(PGconn *conn);
+
+PQflush needs to be called on a non-blocking connection
+before calling select to determine if a responce has
+arrived. If 0 is returned it ensures that there is no data queued to the
+backend that has not actually been sent. Only applications that have used
+PQsetnonblocking have a need for this.
+
+
+
PQsocket
Obtain the file descriptor number for the backend connection socket.
- A valid descriptor will be >= 0; a result of -1 indicates that
+ A valid descriptor will be >= 0; a result of -1 indicates that
no backend connection is currently open.
int PQsocket(const PGconn *conn);
PQsocket should be used to obtain the backend socket descriptor
in preparation for executing select(2). This allows an
-application to wait for either backend responses or other conditions.
+application using a blocking connection to wait for either backend responses or
+other conditions.
If the result of select(2) indicates that data can be read from
the backend socket, then PQconsumeInput should be called to read the
data; after which, PQisBusy, PQgetResult,
and/or PQnotifies can be used to process the response.
+
+Non-blocking connections (that have used PQsetnonblocking)
+should not use select until PQflush
+has returned 0 indicating that there is no buffered data waiting to be sent
+to the backend.
+
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 4318357360..818b85f0ea 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-connect.c,v 1.111 2000/01/16 21:18:52 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-connect.c,v 1.112 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -594,31 +594,6 @@ update_db_info(PGconn *conn)
return 0;
}
-
-/* ----------
- * connectMakeNonblocking -
- * Make a connection non-blocking.
- * Returns 1 if successful, 0 if not.
- * ----------
- */
-static int
-connectMakeNonblocking(PGconn *conn)
-{
-#ifndef WIN32
- if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0)
-#else
- if (ioctlsocket(conn->sock, FIONBIO, &on) != 0)
-#endif
- {
- printfPQExpBuffer(&conn->errorMessage,
- "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n",
- errno, strerror(errno));
- return 0;
- }
-
- return 1;
-}
-
/* ----------
* connectNoDelay -
* Sets the TCP_NODELAY socket option.
@@ -789,7 +764,7 @@ connectDBStart(PGconn *conn)
* Ewan Mellor .
* ---------- */
#if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
- if (!connectMakeNonblocking(conn))
+ if (PQsetnonblocking(conn, TRUE) != 0)
goto connect_errReturn;
#endif
@@ -898,7 +873,7 @@ connectDBStart(PGconn *conn)
/* This makes the connection non-blocking, for all those cases which forced us
not to do it above. */
#if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
- if (!connectMakeNonblocking(conn))
+ if (PQsetnonblocking(conn, TRUE) != 0)
goto connect_errReturn;
#endif
@@ -1720,6 +1695,7 @@ makeEmptyPGconn(void)
conn->inBuffer = (char *) malloc(conn->inBufSize);
conn->outBufSize = 8 * 1024;
conn->outBuffer = (char *) malloc(conn->outBufSize);
+ conn->nonblocking = FALSE;
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
if (conn->inBuffer == NULL ||
@@ -1830,6 +1806,7 @@ closePGconn(PGconn *conn)
conn->lobjfuncs = NULL;
conn->inStart = conn->inCursor = conn->inEnd = 0;
conn->outCount = 0;
+ conn->nonblocking = FALSE;
}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index e6fb7e9ee7..9840cc3b9c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -7,12 +7,13 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.86 1999/11/11 00:10:14 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v 1.87 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include
#include
+#include
#include "postgres.h"
#include "libpq-fe.h"
@@ -24,7 +25,6 @@
#include
#endif
-
/* keep this in same order as ExecStatusType in libpq-fe.h */
const char *const pgresStatus[] = {
"PGRES_EMPTY_QUERY",
@@ -514,13 +514,53 @@ PQsendQuery(PGconn *conn, const char *query)
conn->curTuple = NULL;
/* send the query to the backend; */
- /* the frontend-backend protocol uses 'Q' to designate queries */
- if (pqPutnchar("Q", 1, conn) ||
- pqPuts(query, conn) ||
- pqFlush(conn))
+
+ /*
+ * in order to guarantee that we don't send a partial query
+ * where we would become out of sync with the backend and/or
+ * block during a non-blocking connection we must first flush
+ * the send buffer before sending more data
+ *
+ * an alternative is to implement 'queue reservations' where
+ * we are able to roll up a transaction
+ * (the 'Q' along with our query) and make sure we have
+ * enough space for it all in the send buffer.
+ */
+ if (pqIsnonblocking(conn))
{
- handleSendFailure(conn);
- return 0;
+ /*
+ * the buffer must have emptied completely before we allow
+ * a new query to be buffered
+ */
+ if (pqFlush(conn))
+ return 0;
+ /* 'Q' == queries */
+ /* XXX: if we fail here we really ought to not block */
+ if (pqPutnchar("Q", 1, conn) ||
+ pqPuts(query, conn))
+ {
+ handleSendFailure(conn);
+ return 0;
+ }
+ /*
+ * give the data a push, ignore the return value as
+ * ConsumeInput() will do any aditional flushing if needed
+ */
+ (void) pqFlush(conn);
+ }
+ else
+ {
+ /*
+ * the frontend-backend protocol uses 'Q' to
+ * designate queries
+ */
+ if (pqPutnchar("Q", 1, conn) ||
+ pqPuts(query, conn) ||
+ pqFlush(conn))
+ {
+ handleSendFailure(conn);
+ return 0;
+ }
}
/* OK, it's launched! */
@@ -574,7 +614,17 @@ PQconsumeInput(PGconn *conn)
* we will NOT block waiting for more input.
*/
if (pqReadData(conn) < 0)
+ {
+ /*
+ * for non-blocking connections
+ * try to flush the send-queue otherwise we may never get a
+ * responce for something that may not have already been sent
+ * because it's in our write buffer!
+ */
+ if (pqIsnonblocking(conn))
+ (void) pqFlush(conn);
return 0;
+ }
/* Parsing of the data waits till later. */
return 1;
}
@@ -1088,6 +1138,16 @@ PQexec(PGconn *conn, const char *query)
{
PGresult *result;
PGresult *lastResult;
+ bool savedblocking;
+
+ /*
+ * we assume anyone calling PQexec wants blocking behaviour,
+ * we force the blocking status of the connection to blocking
+ * for the duration of this function and restore it on return
+ */
+ savedblocking = pqIsnonblocking(conn);
+ if (PQsetnonblocking(conn, FALSE) == -1)
+ return NULL;
/*
* Silently discard any prior query result that application didn't
@@ -1102,14 +1162,15 @@ PQexec(PGconn *conn, const char *query)
PQclear(result);
printfPQExpBuffer(&conn->errorMessage,
"PQexec: you gotta get out of a COPY state yourself.\n");
- return NULL;
+ /* restore blocking status */
+ goto errout;
}
PQclear(result);
}
/* OK to send the message */
if (!PQsendQuery(conn, query))
- return NULL;
+ goto errout; /* restore blocking status */
/*
* For backwards compatibility, return the last result if there are
@@ -1142,7 +1203,15 @@ PQexec(PGconn *conn, const char *query)
result->resultStatus == PGRES_COPY_OUT)
break;
}
+
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
return lastResult;
+
+errout:
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
+ return NULL;
}
@@ -1432,7 +1501,16 @@ PQendcopy(PGconn *conn)
return 1;
}
- (void) pqFlush(conn); /* make sure no data is waiting to be sent */
+ /*
+ * make sure no data is waiting to be sent,
+ * abort if we are non-blocking and the flush fails
+ */
+ if (pqFlush(conn) && pqIsnonblocking(conn))
+ return (1);
+
+ /* non blocking connections may have to abort at this point. */
+ if (pqIsnonblocking(conn) && PQisBusy(conn))
+ return (1);
/* Return to active duty */
conn->asyncStatus = PGASYNC_BUSY;
@@ -2026,3 +2104,89 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
else
return 0;
}
+
+/* PQsetnonblocking:
+ sets the PGconn's database connection non-blocking if the arg is TRUE
+ or makes it non-blocking if the arg is FALSE, this will not protect
+ you from PQexec(), you'll only be safe when using the non-blocking
+ API
+ Needs to be called only on a connected database connection.
+*/
+
+int
+PQsetnonblocking(PGconn *conn, int arg)
+{
+ int fcntlarg;
+
+ arg = (arg == TRUE) ? 1 : 0;
+ /* early out if the socket is already in the state requested */
+ if (arg == conn->nonblocking)
+ return (0);
+
+ /*
+ * to guarantee constancy for flushing/query/result-polling behavior
+ * we need to flush the send queue at this point in order to guarantee
+ * proper behavior.
+ * this is ok because either they are making a transition
+ * _from_ or _to_ blocking mode, either way we can block them.
+ */
+ /* if we are going from blocking to non-blocking flush here */
+ if (!pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+
+#ifdef USE_SSL
+ if (conn->ssl)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetnonblocking() -- not supported when using SSL\n");
+ return (-1);
+ }
+#endif /* USE_SSL */
+
+#ifndef WIN32
+ fcntlarg = fcntl(conn->sock, F_GETFL, 0);
+ if (fcntlarg == -1)
+ return (-1);
+
+ if ((arg == TRUE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
+ (arg == FALSE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
+#else
+ fcntlarg = arg;
+ if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
+#endif
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetblocking() -- unable to set nonblocking status to %s\n",
+ arg == TRUE ? "TRUE" : "FALSE");
+ return (-1);
+ }
+
+ conn->nonblocking = arg;
+
+ /* if we are going from non-blocking to blocking flush here */
+ if (pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+ return (0);
+}
+
+/* return the blocking status of the database connection, TRUE == nonblocking,
+ FALSE == blocking
+*/
+int
+PQisnonblocking(const PGconn *conn)
+{
+
+ return (pqIsnonblocking(conn));
+}
+
+/* try to force data out, really only useful for non-blocking users */
+int
+PQflush(PGconn *conn)
+{
+
+ return (pqFlush(conn));
+}
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 89425e0034..a936e93742 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -24,7 +24,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v 1.33 1999/11/30 03:08:19 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v 1.34 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -86,6 +86,37 @@ pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
{
size_t avail = Max(conn->outBufSize - conn->outCount, 0);
+ /*
+ * if we are non-blocking and the send queue is too full to buffer this
+ * request then try to flush some and return an error
+ */
+ if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
+ {
+ /*
+ * even if the flush failed we may still have written some
+ * data, recalculate the size of the send-queue relative
+ * to the amount we have to send, we may be able to queue it
+ * afterall even though it's not sent to the database it's
+ * ok, any routines that check the data coming from the
+ * database better call pqFlush() anyway.
+ */
+ if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "pqPutBytes -- pqFlush couldn't flush enough"
+ " data: space available: %d, space needed %d\n",
+ Max(conn->outBufSize - conn->outCount, 0), nbytes);
+ return EOF;
+ }
+ }
+
+ /*
+ * is the amount of data to be sent is larger than the size of the
+ * output buffer then we must flush it to make more room.
+ *
+ * the code above will make sure the loop conditional is never
+ * true for non-blocking connections
+ */
while (nbytes > avail)
{
memcpy(conn->outBuffer + conn->outCount, s, avail);
@@ -548,6 +579,14 @@ pqFlush(PGconn *conn)
return EOF;
}
+ /*
+ * don't try to send zero data, allows us to use this function
+ * without too much worry about overhead
+ */
+ if (len == 0)
+ return (0);
+
+ /* while there's still data to send */
while (len > 0)
{
/* Prevent being SIGPIPEd if backend has closed the connection. */
@@ -556,6 +595,7 @@ pqFlush(PGconn *conn)
#endif
int sent;
+
#ifdef USE_SSL
if (conn->ssl)
sent = SSL_write(conn->ssl, ptr, len);
@@ -585,6 +625,8 @@ pqFlush(PGconn *conn)
case EWOULDBLOCK:
break;
#endif
+ case EINTR:
+ continue;
case EPIPE:
#ifdef ECONNRESET
@@ -616,13 +658,31 @@ pqFlush(PGconn *conn)
ptr += sent;
len -= sent;
}
+
if (len > 0)
{
/* We didn't send it all, wait till we can send more */
- /* At first glance this looks as though it should block. I think
- * that it will be OK though, as long as the socket is
- * non-blocking. */
+ /*
+ * if the socket is in non-blocking mode we may need
+ * to abort here
+ */
+#ifdef USE_SSL
+ /* can't do anything for our SSL users yet */
+ if (conn->ssl == NULL)
+ {
+#endif
+ if (pqIsnonblocking(conn))
+ {
+ /* shift the contents of the buffer */
+ memmove(conn->outBuffer, ptr, len);
+ conn->outCount = len;
+ return EOF;
+ }
+#ifdef USE_SSL
+ }
+#endif
+
if (pqWait(FALSE, TRUE, conn))
return EOF;
}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ea07bf11cb..677e35966d 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: libpq-fe.h,v 1.55 2000/01/15 05:37:21 ishii Exp $
+ * $Id: libpq-fe.h,v 1.56 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -263,6 +263,13 @@ extern "C"
extern int PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
extern int PQendcopy(PGconn *conn);
+ /* Set blocking/nonblocking connection to the backend */
+ extern int PQsetnonblocking(PGconn *conn, int arg);
+ extern int PQisnonblocking(const PGconn *conn);
+
+ /* Force the write buffer to be written (or at least try) */
+ extern int PQflush(PGconn *conn);
+
/*
* "Fast path" interface --- not really recommended for application
* use
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index b310beb4c4..fd1d776b48 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -11,7 +11,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: libpq-int.h,v 1.16 2000/01/15 05:37:21 ishii Exp $
+ * $Id: libpq-int.h,v 1.17 2000/01/18 06:09:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -214,6 +214,9 @@ struct pg_conn
int inEnd; /* offset to first position after avail
* data */
+ int nonblocking; /* whether this connection is using a blocking
+ * socket to the backend or not */
+
/* Buffer for data not yet sent to backend */
char *outBuffer; /* currently allocated buffer */
int outBufSize; /* allocated size of buffer */
@@ -300,4 +303,10 @@ extern char *sys_errlist[];
#endif /* sunos4 */
#endif /* !strerror */
+/*
+ * this is so that we can check is a connection is non-blocking internally
+ * without the overhead of a function call
+ */
+#define pqIsnonblocking(conn) (conn->nonblocking)
+
#endif /* LIBPQ_INT_H */