nbd: do not block in nbd_wr_sync if no data at all is available

Right now, nbd_wr_sync will hang if no data at all is available on the
socket and the other side is not going to provide any.  Relax this by
making it loop only for writes or partial reads.  This fixes a race
where one thread is executing qemu_aio_wait() and another is executing
main_loop_wait().  Then, the select() call in main_loop_wait() can return
stale data and call the "readable" callback with no data in the socket.

Reported-by: Laurent Vivier <laurent@vivier.eu>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Paolo Bonzini 2012-03-05 09:10:35 +01:00
parent 185b43386a
commit 7fe7b68b32
2 changed files with 44 additions and 8 deletions

View File

@ -151,10 +151,18 @@ static void nbd_reply_ready(void *opaque)
{ {
BDRVNBDState *s = opaque; BDRVNBDState *s = opaque;
uint64_t i; uint64_t i;
int ret;
if (s->reply.handle == 0) { if (s->reply.handle == 0) {
/* No reply already in flight. Fetch a header. */ /* No reply already in flight. Fetch a header. It is possible
if (nbd_receive_reply(s->sock, &s->reply) < 0) { * that another thread has done the same thing in parallel, so
* the socket is not readable anymore.
*/
ret = nbd_receive_reply(s->sock, &s->reply);
if (ret == -EAGAIN) {
return;
}
if (ret < 0) {
s->reply.handle = 0; s->reply.handle = 0;
goto fail; goto fail;
} }

40
nbd.c
View File

@ -78,9 +78,6 @@
/* That's all folks */ /* That's all folks */
#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true)
#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false)
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{ {
size_t offset = 0; size_t offset = 0;
@ -107,7 +104,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
err = socket_error(); err = socket_error();
/* recoverable error */ /* recoverable error */
if (err == EINTR || err == EAGAIN) { if (err == EINTR || (offset > 0 && err == EAGAIN)) {
continue; continue;
} }
@ -126,6 +123,26 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
return offset; return offset;
} }
static ssize_t read_sync(int fd, void *buffer, size_t size)
{
/* Sockets are kept in blocking mode in the negotiation phase. After
* that, a non-readable socket simply means that another thread stole
* our request/reply. Synchronization is done with recv_coroutine, so
* that this is coroutine-safe.
*/
return nbd_wr_sync(fd, buffer, size, true);
}
static ssize_t write_sync(int fd, void *buffer, size_t size)
{
int ret;
do {
/* For writes, we do expect the socket to be writable. */
ret = nbd_wr_sync(fd, buffer, size, false);
} while (ret == -EAGAIN);
return ret;
}
static void combine_addr(char *buf, size_t len, const char* address, static void combine_addr(char *buf, size_t len, const char* address,
uint16_t port) uint16_t port)
{ {
@ -203,6 +220,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
[28 .. 151] reserved (0) [28 .. 151] reserved (0)
*/ */
socket_set_block(csock);
rc = -EINVAL; rc = -EINVAL;
TRACE("Beginning negotiation."); TRACE("Beginning negotiation.");
@ -222,6 +240,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
TRACE("Negotiation succeeded."); TRACE("Negotiation succeeded.");
rc = 0; rc = 0;
fail: fail:
socket_set_nonblock(csock);
return rc; return rc;
} }
@ -235,6 +254,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
TRACE("Receiving negotiation."); TRACE("Receiving negotiation.");
socket_set_block(csock);
rc = -EINVAL; rc = -EINVAL;
if (read_sync(csock, buf, 8) != 8) { if (read_sync(csock, buf, 8) != 8) {
@ -349,6 +369,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
rc = 0; rc = 0;
fail: fail:
socket_set_nonblock(csock);
return rc; return rc;
} }
@ -742,8 +763,11 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
ssize_t rc; ssize_t rc;
client->recv_coroutine = qemu_coroutine_self(); client->recv_coroutine = qemu_coroutine_self();
if (nbd_receive_request(csock, request) < 0) { rc = nbd_receive_request(csock, request);
rc = -EIO; if (rc < 0) {
if (rc != -EAGAIN) {
rc = -EIO;
}
goto out; goto out;
} }
@ -791,6 +815,9 @@ static void nbd_trip(void *opaque)
TRACE("Reading request."); TRACE("Reading request.");
ret = nbd_co_receive_request(req, &request); ret = nbd_co_receive_request(req, &request);
if (ret == -EAGAIN) {
goto done;
}
if (ret == -EIO) { if (ret == -EIO) {
goto out; goto out;
} }
@ -901,6 +928,7 @@ static void nbd_trip(void *opaque)
TRACE("Request/Reply complete"); TRACE("Request/Reply complete");
done:
nbd_request_put(req); nbd_request_put(req);
return; return;