qemu-nbd: throttle requests

Limiting the number of in-flight requests is implemented very simply
with a can_read callback.  It does not require a semaphore, unlike the
client side in block/nbd.c, because we can throttle directly the creation
of coroutines.  The client side can have a coroutine created at any time
when an I/O request is made.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Paolo Bonzini 2011-09-19 15:25:40 +02:00
parent 262db38871
commit 41996e3803

25
nbd.c
View File

@ -587,6 +587,8 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply)
return 0; return 0;
} }
#define MAX_NBD_REQUESTS 16
typedef struct NBDRequest NBDRequest; typedef struct NBDRequest NBDRequest;
struct NBDRequest { struct NBDRequest {
@ -614,6 +616,8 @@ struct NBDClient {
CoMutex send_lock; CoMutex send_lock;
Coroutine *send_coroutine; Coroutine *send_coroutine;
int nb_requests;
}; };
static void nbd_client_get(NBDClient *client) static void nbd_client_get(NBDClient *client)
@ -644,6 +648,9 @@ static NBDRequest *nbd_request_get(NBDClient *client)
NBDRequest *req; NBDRequest *req;
NBDExport *exp = client->exp; NBDExport *exp = client->exp;
assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
client->nb_requests++;
if (QSIMPLEQ_EMPTY(&exp->requests)) { if (QSIMPLEQ_EMPTY(&exp->requests)) {
req = g_malloc0(sizeof(NBDRequest)); req = g_malloc0(sizeof(NBDRequest));
req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE);
@ -660,6 +667,9 @@ static void nbd_request_put(NBDRequest *req)
{ {
NBDClient *client = req->client; NBDClient *client = req->client;
QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry);
if (client->nb_requests-- == MAX_NBD_REQUESTS) {
qemu_notify_event();
}
nbd_client_put(client); nbd_client_put(client);
} }
@ -688,6 +698,7 @@ void nbd_export_close(NBDExport *exp)
g_free(exp); g_free(exp);
} }
static int nbd_can_read(void *opaque);
static void nbd_read(void *opaque); static void nbd_read(void *opaque);
static void nbd_restart_write(void *opaque); static void nbd_restart_write(void *opaque);
@ -699,7 +710,8 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
int rc, ret; int rc, ret;
qemu_co_mutex_lock(&client->send_lock); qemu_co_mutex_lock(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client); qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
nbd_restart_write, client);
client->send_coroutine = qemu_coroutine_self(); client->send_coroutine = qemu_coroutine_self();
if (!len) { if (!len) {
@ -724,7 +736,7 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
} }
client->send_coroutine = NULL; client->send_coroutine = NULL;
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
qemu_co_mutex_unlock(&client->send_lock); qemu_co_mutex_unlock(&client->send_lock);
return rc; return rc;
} }
@ -900,6 +912,13 @@ out:
nbd_client_close(client); nbd_client_close(client);
} }
static int nbd_can_read(void *opaque)
{
NBDClient *client = opaque;
return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS;
}
static void nbd_read(void *opaque) static void nbd_read(void *opaque)
{ {
NBDClient *client = opaque; NBDClient *client = opaque;
@ -931,6 +950,6 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
client->sock = csock; client->sock = csock;
client->close = close; client->close = close;
qemu_co_mutex_init(&client->send_lock); qemu_co_mutex_init(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
return client; return client;
} }