nbd-server: Coroutine based negotiation

Create a coroutine in nbd_client_new, so that nbd_send_negotiate doesn't
need qemu_set_block().

Handlers need to be set temporarily for the csock fd in case the coroutine
yields during I/O.

With this, if the other end disappears in the middle of the negotiation,
we don't block the whole event loop.

To make the code clearer, unify the names of all functions that belong to
negotiation, so they are less likely to be misused. This is important
because we rely on negotiation staying in the main loop, as commented in
nbd_negotiate_read/write().

Signed-off-by: Fam Zheng <famz@redhat.com>
Message-Id: <1452760863-25350-4-git-send-email-famz@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
Fam Zheng 2016-01-14 16:41:03 +08:00 committed by Paolo Bonzini
parent 798bfe0006
commit 1a6245a5b0

View File

@ -93,13 +93,45 @@ static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client); static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client); static void nbd_update_can_read(NBDClient *client);
static ssize_t drop_sync(int fd, size_t size) static void nbd_negotiate_continue(void *opaque)
{
qemu_coroutine_enter(opaque, NULL);
}
static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
{
ssize_t ret;
assert(qemu_in_coroutine());
/* Negotiation are always in main loop. */
qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
qemu_coroutine_self());
ret = read_sync(fd, buffer, size);
qemu_set_fd_handler(fd, NULL, NULL, NULL);
return ret;
}
static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
{
ssize_t ret;
assert(qemu_in_coroutine());
/* Negotiation are always in main loop. */
qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
qemu_coroutine_self());
ret = write_sync(fd, buffer, size);
qemu_set_fd_handler(fd, NULL, NULL, NULL);
return ret;
}
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{ {
ssize_t ret, dropped = size; ssize_t ret, dropped = size;
uint8_t *buffer = g_malloc(MIN(65536, size)); uint8_t *buffer = g_malloc(MIN(65536, size));
while (size > 0) { while (size > 0) {
ret = read_sync(fd, buffer, MIN(65536, size)); ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
if (ret < 0) { if (ret < 0) {
g_free(buffer); g_free(buffer);
return ret; return ret;
@ -140,96 +172,96 @@ static ssize_t drop_sync(int fd, size_t size)
*/ */
static int nbd_send_rep(int csock, uint32_t type, uint32_t opt) static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
{ {
uint64_t magic; uint64_t magic;
uint32_t len; uint32_t len;
magic = cpu_to_be64(NBD_REP_MAGIC); magic = cpu_to_be64(NBD_REP_MAGIC);
if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (rep magic)"); LOG("write failed (rep magic)");
return -EINVAL; return -EINVAL;
} }
opt = cpu_to_be32(opt); opt = cpu_to_be32(opt);
if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (rep opt)"); LOG("write failed (rep opt)");
return -EINVAL; return -EINVAL;
} }
type = cpu_to_be32(type); type = cpu_to_be32(type);
if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) { if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (rep type)"); LOG("write failed (rep type)");
return -EINVAL; return -EINVAL;
} }
len = cpu_to_be32(0); len = cpu_to_be32(0);
if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (rep data length)"); LOG("write failed (rep data length)");
return -EINVAL; return -EINVAL;
} }
return 0; return 0;
} }
static int nbd_send_rep_list(int csock, NBDExport *exp) static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
{ {
uint64_t magic, name_len; uint64_t magic, name_len;
uint32_t opt, type, len; uint32_t opt, type, len;
name_len = strlen(exp->name); name_len = strlen(exp->name);
magic = cpu_to_be64(NBD_REP_MAGIC); magic = cpu_to_be64(NBD_REP_MAGIC);
if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (magic)"); LOG("write failed (magic)");
return -EINVAL; return -EINVAL;
} }
opt = cpu_to_be32(NBD_OPT_LIST); opt = cpu_to_be32(NBD_OPT_LIST);
if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (opt)"); LOG("write failed (opt)");
return -EINVAL; return -EINVAL;
} }
type = cpu_to_be32(NBD_REP_SERVER); type = cpu_to_be32(NBD_REP_SERVER);
if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) { if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (reply type)"); LOG("write failed (reply type)");
return -EINVAL; return -EINVAL;
} }
len = cpu_to_be32(name_len + sizeof(len)); len = cpu_to_be32(name_len + sizeof(len));
if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)"); LOG("write failed (length)");
return -EINVAL; return -EINVAL;
} }
len = cpu_to_be32(name_len); len = cpu_to_be32(name_len);
if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)"); LOG("write failed (length)");
return -EINVAL; return -EINVAL;
} }
if (write_sync(csock, exp->name, name_len) != name_len) { if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
LOG("write failed (buffer)"); LOG("write failed (buffer)");
return -EINVAL; return -EINVAL;
} }
return 0; return 0;
} }
static int nbd_handle_list(NBDClient *client, uint32_t length) static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
{ {
int csock; int csock;
NBDExport *exp; NBDExport *exp;
csock = client->sock; csock = client->sock;
if (length) { if (length) {
if (drop_sync(csock, length) != length) { if (nbd_negotiate_drop_sync(csock, length) != length) {
return -EIO; return -EIO;
} }
return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST); return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
} }
/* For each export, send a NBD_REP_SERVER reply. */ /* For each export, send a NBD_REP_SERVER reply. */
QTAILQ_FOREACH(exp, &exports, next) { QTAILQ_FOREACH(exp, &exports, next) {
if (nbd_send_rep_list(csock, exp)) { if (nbd_negotiate_send_rep_list(csock, exp)) {
return -EINVAL; return -EINVAL;
} }
} }
/* Finish with a NBD_REP_ACK. */ /* Finish with a NBD_REP_ACK. */
return nbd_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST); return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
} }
static int nbd_handle_export_name(NBDClient *client, uint32_t length) static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
{ {
int rc = -EINVAL, csock = client->sock; int rc = -EINVAL, csock = client->sock;
char name[256]; char name[256];
@ -242,7 +274,7 @@ static int nbd_handle_export_name(NBDClient *client, uint32_t length)
LOG("Bad length received"); LOG("Bad length received");
goto fail; goto fail;
} }
if (read_sync(csock, name, length) != length) { if (nbd_negotiate_read(csock, name, length) != length) {
LOG("read failed"); LOG("read failed");
goto fail; goto fail;
} }
@ -261,7 +293,7 @@ fail:
return rc; return rc;
} }
static int nbd_receive_options(NBDClient *client) static int nbd_negotiate_options(NBDClient *client)
{ {
int csock = client->sock; int csock = client->sock;
uint32_t flags; uint32_t flags;
@ -280,7 +312,7 @@ static int nbd_receive_options(NBDClient *client)
... Rest of request ... Rest of request
*/ */
if (read_sync(csock, &flags, sizeof(flags)) != sizeof(flags)) { if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
LOG("read failed"); LOG("read failed");
return -EIO; return -EIO;
} }
@ -296,7 +328,7 @@ static int nbd_receive_options(NBDClient *client)
uint32_t tmp, length; uint32_t tmp, length;
uint64_t magic; uint64_t magic;
if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("read failed"); LOG("read failed");
return -EINVAL; return -EINVAL;
} }
@ -306,12 +338,13 @@ static int nbd_receive_options(NBDClient *client)
return -EINVAL; return -EINVAL;
} }
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed"); LOG("read failed");
return -EINVAL; return -EINVAL;
} }
if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) { if (nbd_negotiate_read(csock, &length,
sizeof(length)) != sizeof(length)) {
LOG("read failed"); LOG("read failed");
return -EINVAL; return -EINVAL;
} }
@ -320,7 +353,7 @@ static int nbd_receive_options(NBDClient *client)
TRACE("Checking option"); TRACE("Checking option");
switch (be32_to_cpu(tmp)) { switch (be32_to_cpu(tmp)) {
case NBD_OPT_LIST: case NBD_OPT_LIST:
ret = nbd_handle_list(client, length); ret = nbd_negotiate_handle_list(client, length);
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
@ -330,19 +363,25 @@ static int nbd_receive_options(NBDClient *client)
return -EINVAL; return -EINVAL;
case NBD_OPT_EXPORT_NAME: case NBD_OPT_EXPORT_NAME:
return nbd_handle_export_name(client, length); return nbd_negotiate_handle_export_name(client, length);
default: default:
tmp = be32_to_cpu(tmp); tmp = be32_to_cpu(tmp);
LOG("Unsupported option 0x%x", tmp); LOG("Unsupported option 0x%x", tmp);
nbd_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp); nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
return -EINVAL; return -EINVAL;
} }
} }
} }
static int nbd_send_negotiate(NBDClient *client) typedef struct {
NBDClient *client;
Coroutine *co;
} NBDClientNewData;
static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
{ {
NBDClient *client = data->client;
int csock = client->sock; int csock = client->sock;
char buf[8 + 8 + 8 + 128]; char buf[8 + 8 + 8 + 128];
int rc; int rc;
@ -368,7 +407,6 @@ static int nbd_send_negotiate(NBDClient *client)
[28 .. 151] reserved (0) [28 .. 151] reserved (0)
*/ */
qemu_set_block(csock);
rc = -EINVAL; rc = -EINVAL;
TRACE("Beginning negotiation."); TRACE("Beginning negotiation.");
@ -385,16 +423,16 @@ static int nbd_send_negotiate(NBDClient *client)
} }
if (client->exp) { if (client->exp) {
if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed"); LOG("write failed");
goto fail; goto fail;
} }
} else { } else {
if (write_sync(csock, buf, 18) != 18) { if (nbd_negotiate_write(csock, buf, 18) != 18) {
LOG("write failed"); LOG("write failed");
goto fail; goto fail;
} }
rc = nbd_receive_options(client); rc = nbd_negotiate_options(client);
if (rc != 0) { if (rc != 0) {
LOG("option negotiation failed"); LOG("option negotiation failed");
goto fail; goto fail;
@ -403,7 +441,8 @@ static int nbd_send_negotiate(NBDClient *client)
assert ((client->exp->nbdflags & ~65535) == 0); assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size); cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags); cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) { if (nbd_negotiate_write(csock, buf + 18,
sizeof(buf) - 18) != sizeof(buf) - 18) {
LOG("write failed"); LOG("write failed");
goto fail; goto fail;
} }
@ -412,7 +451,6 @@ static int nbd_send_negotiate(NBDClient *client)
TRACE("Negotiation succeeded."); TRACE("Negotiation succeeded.");
rc = 0; rc = 0;
fail: fail:
qemu_set_nonblock(csock);
return rc; return rc;
} }
@ -1028,25 +1066,43 @@ static void nbd_update_can_read(NBDClient *client)
} }
} }
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *)) static coroutine_fn void nbd_co_client_start(void *opaque)
{ {
NBDClient *client; NBDClientNewData *data = opaque;
client = g_malloc0(sizeof(NBDClient)); NBDClient *client = data->client;
client->refcount = 1; NBDExport *exp = client->exp;
client->exp = exp;
client->sock = csock; if (exp) {
client->can_read = true; nbd_export_get(exp);
if (nbd_send_negotiate(client)) { }
shutdown(client->sock, 2); if (nbd_negotiate(data)) {
close_fn(client); shutdown(client->sock, 2);
return; client->close(client);
goto out;
} }
client->close = close_fn;
qemu_co_mutex_init(&client->send_lock); qemu_co_mutex_init(&client->send_lock);
nbd_set_handlers(client); nbd_set_handlers(client);
if (exp) { if (exp) {
QTAILQ_INSERT_TAIL(&exp->clients, client, next); QTAILQ_INSERT_TAIL(&exp->clients, client, next);
nbd_export_get(exp);
} }
out:
g_free(data);
}
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
{
NBDClient *client;
NBDClientNewData *data = g_new(NBDClientNewData, 1);
client = g_malloc0(sizeof(NBDClient));
client->refcount = 1;
client->exp = exp;
client->sock = csock;
client->can_read = true;
client->close = close_fn;
data->client = client;
data->co = qemu_coroutine_create(nbd_co_client_start);
qemu_coroutine_enter(data->co, data);
} }