nbd: convert to using I/O channels for actual socket I/O
Now that all callers are converted to use I/O channels for initial connection setup, it is possible to switch the core NBD protocol handling core over to use QIOChannel APIs for actual sockets I/O. Signed-off-by: Daniel P. Berrange <berrange@redhat.com> Message-Id: <1455129674-17255-7-git-send-email-berrange@redhat.com> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
parent
ae39827802
commit
1c778ef729
@ -80,7 +80,7 @@ static void nbd_reply_ready(void *opaque)
|
|||||||
* that another thread has done the same thing in parallel, so
|
* that another thread has done the same thing in parallel, so
|
||||||
* the socket is not readable anymore.
|
* the socket is not readable anymore.
|
||||||
*/
|
*/
|
||||||
ret = nbd_receive_reply(s->sioc->fd, &s->reply);
|
ret = nbd_receive_reply(s->ioc, &s->reply);
|
||||||
if (ret == -EAGAIN) {
|
if (ret == -EAGAIN) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -131,6 +131,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
g_assert(qemu_in_coroutine());
|
||||||
assert(i < MAX_NBD_REQUESTS);
|
assert(i < MAX_NBD_REQUESTS);
|
||||||
request->handle = INDEX_TO_HANDLE(s, i);
|
request->handle = INDEX_TO_HANDLE(s, i);
|
||||||
|
|
||||||
@ -146,17 +147,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
|
|||||||
nbd_reply_ready, nbd_restart_write, bs);
|
nbd_reply_ready, nbd_restart_write, bs);
|
||||||
if (qiov) {
|
if (qiov) {
|
||||||
qio_channel_set_cork(s->ioc, true);
|
qio_channel_set_cork(s->ioc, true);
|
||||||
rc = nbd_send_request(s->sioc->fd, request);
|
rc = nbd_send_request(s->ioc, request);
|
||||||
if (rc >= 0) {
|
if (rc >= 0) {
|
||||||
ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov,
|
ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
|
||||||
offset, request->len);
|
offset, request->len, 0);
|
||||||
if (ret != request->len) {
|
if (ret != request->len) {
|
||||||
rc = -EIO;
|
rc = -EIO;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qio_channel_set_cork(s->ioc, false);
|
qio_channel_set_cork(s->ioc, false);
|
||||||
} else {
|
} else {
|
||||||
rc = nbd_send_request(s->sioc->fd, request);
|
rc = nbd_send_request(s->ioc, request);
|
||||||
}
|
}
|
||||||
aio_set_fd_handler(aio_context, s->sioc->fd, false,
|
aio_set_fd_handler(aio_context, s->sioc->fd, false,
|
||||||
nbd_reply_ready, NULL, bs);
|
nbd_reply_ready, NULL, bs);
|
||||||
@ -180,8 +181,8 @@ static void nbd_co_receive_reply(NbdClientSession *s,
|
|||||||
reply->error = EIO;
|
reply->error = EIO;
|
||||||
} else {
|
} else {
|
||||||
if (qiov && reply->error == 0) {
|
if (qiov && reply->error == 0) {
|
||||||
ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov,
|
ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
|
||||||
offset, request->len);
|
offset, request->len, 1);
|
||||||
if (ret != request->len) {
|
if (ret != request->len) {
|
||||||
reply->error = EIO;
|
reply->error = EIO;
|
||||||
}
|
}
|
||||||
@ -388,7 +389,7 @@ void nbd_client_close(BlockDriverState *bs)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nbd_send_request(client->sioc->fd, &request);
|
nbd_send_request(client->ioc, &request);
|
||||||
|
|
||||||
nbd_teardown_connection(bs);
|
nbd_teardown_connection(bs);
|
||||||
}
|
}
|
||||||
@ -403,7 +404,7 @@ int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc,
|
|||||||
logout("session init %s\n", export);
|
logout("session init %s\n", export);
|
||||||
qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
|
qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
|
||||||
|
|
||||||
ret = nbd_receive_negotiate(sioc->fd, export,
|
ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
|
||||||
&client->nbdflags, &client->size, errp);
|
&client->nbdflags, &client->size, errp);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
logout("Failed to negotiate with the NBD server\n");
|
logout("Failed to negotiate with the NBD server\n");
|
||||||
|
@ -27,7 +27,6 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
|
|||||||
gpointer opaque)
|
gpointer opaque)
|
||||||
{
|
{
|
||||||
QIOChannelSocket *cioc;
|
QIOChannelSocket *cioc;
|
||||||
int fd;
|
|
||||||
|
|
||||||
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
|
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
|
||||||
NULL);
|
NULL);
|
||||||
@ -35,10 +34,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
|
|||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
fd = dup(cioc->fd);
|
nbd_client_new(NULL, cioc, nbd_client_put);
|
||||||
if (fd >= 0) {
|
|
||||||
nbd_client_new(NULL, fd, nbd_client_put);
|
|
||||||
}
|
|
||||||
object_unref(OBJECT(cioc));
|
object_unref(OBJECT(cioc));
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
#include "qemu-common.h"
|
#include "qemu-common.h"
|
||||||
#include "qemu/option.h"
|
#include "qemu/option.h"
|
||||||
|
#include "io/channel-socket.h"
|
||||||
|
|
||||||
struct nbd_request {
|
struct nbd_request {
|
||||||
uint32_t magic;
|
uint32_t magic;
|
||||||
@ -73,12 +74,17 @@ enum {
|
|||||||
/* Maximum size of a single READ/WRITE data buffer */
|
/* Maximum size of a single READ/WRITE data buffer */
|
||||||
#define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
|
#define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
|
||||||
|
|
||||||
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
|
ssize_t nbd_wr_syncv(QIOChannel *ioc,
|
||||||
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
struct iovec *iov,
|
||||||
|
size_t niov,
|
||||||
|
size_t offset,
|
||||||
|
size_t length,
|
||||||
|
bool do_read);
|
||||||
|
int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
|
||||||
off_t *size, Error **errp);
|
off_t *size, Error **errp);
|
||||||
int nbd_init(int fd, int csock, uint32_t flags, off_t size);
|
int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size);
|
||||||
ssize_t nbd_send_request(int csock, struct nbd_request *request);
|
ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request);
|
||||||
ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
|
ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply);
|
||||||
int nbd_client(int fd);
|
int nbd_client(int fd);
|
||||||
int nbd_disconnect(int fd);
|
int nbd_disconnect(int fd);
|
||||||
|
|
||||||
@ -98,7 +104,9 @@ NBDExport *nbd_export_find(const char *name);
|
|||||||
void nbd_export_set_name(NBDExport *exp, const char *name);
|
void nbd_export_set_name(NBDExport *exp, const char *name);
|
||||||
void nbd_export_close_all(void);
|
void nbd_export_close_all(void);
|
||||||
|
|
||||||
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *));
|
void nbd_client_new(NBDExport *exp,
|
||||||
|
QIOChannelSocket *sioc,
|
||||||
|
void (*close)(NBDClient *));
|
||||||
void nbd_client_get(NBDClient *client);
|
void nbd_client_get(NBDClient *client);
|
||||||
void nbd_client_put(NBDClient *client);
|
void nbd_client_put(NBDClient *client);
|
||||||
|
|
||||||
|
40
nbd/client.c
40
nbd/client.c
@ -71,7 +71,7 @@ static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
|
||||||
off_t *size, Error **errp)
|
off_t *size, Error **errp)
|
||||||
{
|
{
|
||||||
char buf[256];
|
char buf[256];
|
||||||
@ -83,7 +83,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
|||||||
|
|
||||||
rc = -EINVAL;
|
rc = -EINVAL;
|
||||||
|
|
||||||
if (read_sync(csock, buf, 8) != 8) {
|
if (read_sync(ioc, buf, 8) != 8) {
|
||||||
error_setg(errp, "Failed to read data");
|
error_setg(errp, "Failed to read data");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -109,7 +109,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
|||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
|
if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
|
||||||
error_setg(errp, "Failed to read magic");
|
error_setg(errp, "Failed to read magic");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -130,35 +130,35 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
|||||||
}
|
}
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
||||||
error_setg(errp, "Failed to read server flags");
|
error_setg(errp, "Failed to read server flags");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
*flags = be16_to_cpu(tmp) << 16;
|
*flags = be16_to_cpu(tmp) << 16;
|
||||||
/* reserved for future use */
|
/* reserved for future use */
|
||||||
if (write_sync(csock, &reserved, sizeof(reserved)) !=
|
if (write_sync(ioc, &reserved, sizeof(reserved)) !=
|
||||||
sizeof(reserved)) {
|
sizeof(reserved)) {
|
||||||
error_setg(errp, "Failed to read reserved field");
|
error_setg(errp, "Failed to read reserved field");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
/* write the export name */
|
/* write the export name */
|
||||||
magic = cpu_to_be64(magic);
|
magic = cpu_to_be64(magic);
|
||||||
if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
|
if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
|
||||||
error_setg(errp, "Failed to send export name magic");
|
error_setg(errp, "Failed to send export name magic");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
|
opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
|
||||||
if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
|
if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
|
||||||
error_setg(errp, "Failed to send export name option number");
|
error_setg(errp, "Failed to send export name option number");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
namesize = cpu_to_be32(strlen(name));
|
namesize = cpu_to_be32(strlen(name));
|
||||||
if (write_sync(csock, &namesize, sizeof(namesize)) !=
|
if (write_sync(ioc, &namesize, sizeof(namesize)) !=
|
||||||
sizeof(namesize)) {
|
sizeof(namesize)) {
|
||||||
error_setg(errp, "Failed to send export name length");
|
error_setg(errp, "Failed to send export name length");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
|
if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) {
|
||||||
error_setg(errp, "Failed to send export name");
|
error_setg(errp, "Failed to send export name");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -175,7 +175,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
|
if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) {
|
||||||
error_setg(errp, "Failed to read export length");
|
error_setg(errp, "Failed to read export length");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -183,19 +183,19 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
|
|||||||
TRACE("Size is %" PRIu64, *size);
|
TRACE("Size is %" PRIu64, *size);
|
||||||
|
|
||||||
if (!name) {
|
if (!name) {
|
||||||
if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
|
if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) {
|
||||||
error_setg(errp, "Failed to read export flags");
|
error_setg(errp, "Failed to read export flags");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
*flags = be32_to_cpup(flags);
|
*flags = be32_to_cpup(flags);
|
||||||
} else {
|
} else {
|
||||||
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
||||||
error_setg(errp, "Failed to read export flags");
|
error_setg(errp, "Failed to read export flags");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
*flags |= be16_to_cpu(tmp);
|
*flags |= be16_to_cpu(tmp);
|
||||||
}
|
}
|
||||||
if (read_sync(csock, &buf, 124) != 124) {
|
if (read_sync(ioc, &buf, 124) != 124) {
|
||||||
error_setg(errp, "Failed to read reserved block");
|
error_setg(errp, "Failed to read reserved block");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -206,11 +206,11 @@ fail:
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
int nbd_init(int fd, int csock, uint32_t flags, off_t size)
|
int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size)
|
||||||
{
|
{
|
||||||
TRACE("Setting NBD socket");
|
TRACE("Setting NBD socket");
|
||||||
|
|
||||||
if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
|
if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) {
|
||||||
int serrno = errno;
|
int serrno = errno;
|
||||||
LOG("Failed to set NBD socket");
|
LOG("Failed to set NBD socket");
|
||||||
return -serrno;
|
return -serrno;
|
||||||
@ -283,7 +283,7 @@ int nbd_client(int fd)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
int nbd_init(int fd, int csock, uint32_t flags, off_t size)
|
int nbd_init(int fd, QIOChannelSocket *ioc, uint32_t flags, off_t size)
|
||||||
{
|
{
|
||||||
return -ENOTSUP;
|
return -ENOTSUP;
|
||||||
}
|
}
|
||||||
@ -294,7 +294,7 @@ int nbd_client(int fd)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
ssize_t nbd_send_request(int csock, struct nbd_request *request)
|
ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request)
|
||||||
{
|
{
|
||||||
uint8_t buf[NBD_REQUEST_SIZE];
|
uint8_t buf[NBD_REQUEST_SIZE];
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
@ -309,7 +309,7 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request)
|
|||||||
"{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
|
"{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
|
||||||
request->from, request->len, request->handle, request->type);
|
request->from, request->len, request->handle, request->type);
|
||||||
|
|
||||||
ret = write_sync(csock, buf, sizeof(buf));
|
ret = write_sync(ioc, buf, sizeof(buf));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -321,13 +321,13 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
|
ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply)
|
||||||
{
|
{
|
||||||
uint8_t buf[NBD_REPLY_SIZE];
|
uint8_t buf[NBD_REPLY_SIZE];
|
||||||
uint32_t magic;
|
uint32_t magic;
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
|
|
||||||
ret = read_sync(csock, buf, sizeof(buf));
|
ret = read_sync(ioc, buf, sizeof(buf));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
66
nbd/common.c
66
nbd/common.c
@ -19,47 +19,59 @@
|
|||||||
#include "qemu/osdep.h"
|
#include "qemu/osdep.h"
|
||||||
#include "nbd-internal.h"
|
#include "nbd-internal.h"
|
||||||
|
|
||||||
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
|
ssize_t nbd_wr_syncv(QIOChannel *ioc,
|
||||||
|
struct iovec *iov,
|
||||||
|
size_t niov,
|
||||||
|
size_t offset,
|
||||||
|
size_t length,
|
||||||
|
bool do_read)
|
||||||
{
|
{
|
||||||
size_t offset = 0;
|
ssize_t done = 0;
|
||||||
int err;
|
Error *local_err = NULL;
|
||||||
|
struct iovec *local_iov = g_new(struct iovec, niov);
|
||||||
|
struct iovec *local_iov_head = local_iov;
|
||||||
|
unsigned int nlocal_iov = niov;
|
||||||
|
|
||||||
if (qemu_in_coroutine()) {
|
nlocal_iov = iov_copy(local_iov, nlocal_iov,
|
||||||
if (do_read) {
|
iov, niov,
|
||||||
return qemu_co_recv(fd, buffer, size);
|
offset, length);
|
||||||
} else {
|
|
||||||
return qemu_co_send(fd, buffer, size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (offset < size) {
|
while (nlocal_iov > 0) {
|
||||||
ssize_t len;
|
ssize_t len;
|
||||||
|
|
||||||
if (do_read) {
|
if (do_read) {
|
||||||
len = qemu_recv(fd, buffer + offset, size - offset, 0);
|
len = qio_channel_readv(ioc, local_iov, nlocal_iov, &local_err);
|
||||||
} else {
|
} else {
|
||||||
len = send(fd, buffer + offset, size - offset, 0);
|
len = qio_channel_writev(ioc, local_iov, nlocal_iov, &local_err);
|
||||||
|
}
|
||||||
|
if (len == QIO_CHANNEL_ERR_BLOCK) {
|
||||||
|
if (qemu_in_coroutine()) {
|
||||||
|
/* XXX figure out if we can create a variant on
|
||||||
|
* qio_channel_yield() that works with AIO contexts
|
||||||
|
* and consider using that in this branch */
|
||||||
|
qemu_coroutine_yield();
|
||||||
|
} else {
|
||||||
|
qio_channel_wait(ioc,
|
||||||
|
do_read ? G_IO_IN : G_IO_OUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (len < 0) {
|
|
||||||
err = socket_error();
|
|
||||||
|
|
||||||
/* recoverable error */
|
|
||||||
if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (len < 0) {
|
||||||
/* unrecoverable error */
|
TRACE("I/O error: %s", error_get_pretty(local_err));
|
||||||
return -err;
|
error_free(local_err);
|
||||||
|
/* XXX handle Error objects */
|
||||||
|
done = -EIO;
|
||||||
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* eof */
|
if (do_read && len == 0) {
|
||||||
if (len == 0) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += len;
|
iov_discard_front(&local_iov, &nlocal_iov, len);
|
||||||
|
done += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
return offset;
|
cleanup:
|
||||||
|
g_free(local_iov_head);
|
||||||
|
return done;
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include "sysemu/block-backend.h"
|
#include "sysemu/block-backend.h"
|
||||||
|
|
||||||
#include "qemu/coroutine.h"
|
#include "qemu/coroutine.h"
|
||||||
|
#include "qemu/iov.h"
|
||||||
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
@ -29,7 +30,6 @@
|
|||||||
#include <linux/fs.h>
|
#include <linux/fs.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "qemu/sockets.h"
|
|
||||||
#include "qemu/queue.h"
|
#include "qemu/queue.h"
|
||||||
#include "qemu/main-loop.h"
|
#include "qemu/main-loop.h"
|
||||||
|
|
||||||
@ -90,24 +90,22 @@
|
|||||||
#define NBD_EINVAL 22
|
#define NBD_EINVAL 22
|
||||||
#define NBD_ENOSPC 28
|
#define NBD_ENOSPC 28
|
||||||
|
|
||||||
static inline ssize_t read_sync(int fd, void *buffer, size_t size)
|
static inline ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size)
|
||||||
{
|
{
|
||||||
|
struct iovec iov = { .iov_base = buffer, .iov_len = size };
|
||||||
/* Sockets are kept in blocking mode in the negotiation phase. After
|
/* Sockets are kept in blocking mode in the negotiation phase. After
|
||||||
* that, a non-readable socket simply means that another thread stole
|
* that, a non-readable socket simply means that another thread stole
|
||||||
* our request/reply. Synchronization is done with recv_coroutine, so
|
* our request/reply. Synchronization is done with recv_coroutine, so
|
||||||
* that this is coroutine-safe.
|
* that this is coroutine-safe.
|
||||||
*/
|
*/
|
||||||
return nbd_wr_sync(fd, buffer, size, true);
|
return nbd_wr_syncv(ioc, &iov, 1, 0, size, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline ssize_t write_sync(int fd, void *buffer, size_t size)
|
static inline ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size)
|
||||||
{
|
{
|
||||||
int ret;
|
struct iovec iov = { .iov_base = buffer, .iov_len = size };
|
||||||
do {
|
|
||||||
/* For writes, we do expect the socket to be writable. */
|
return nbd_wr_syncv(ioc, &iov, 1, 0, size, false);
|
||||||
ret = nbd_wr_sync(fd, buffer, size, false);
|
|
||||||
} while (ret == -EAGAIN);
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
150
nbd/server.c
150
nbd/server.c
@ -76,7 +76,8 @@ struct NBDClient {
|
|||||||
void (*close)(NBDClient *client);
|
void (*close)(NBDClient *client);
|
||||||
|
|
||||||
NBDExport *exp;
|
NBDExport *exp;
|
||||||
int sock;
|
QIOChannelSocket *sioc; /* The underlying data channel */
|
||||||
|
QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
|
||||||
|
|
||||||
Coroutine *recv_coroutine;
|
Coroutine *recv_coroutine;
|
||||||
|
|
||||||
@ -96,45 +97,56 @@ static void nbd_set_handlers(NBDClient *client);
|
|||||||
static void nbd_unset_handlers(NBDClient *client);
|
static void nbd_unset_handlers(NBDClient *client);
|
||||||
static void nbd_update_can_read(NBDClient *client);
|
static void nbd_update_can_read(NBDClient *client);
|
||||||
|
|
||||||
static void nbd_negotiate_continue(void *opaque)
|
static gboolean nbd_negotiate_continue(QIOChannel *ioc,
|
||||||
|
GIOCondition condition,
|
||||||
|
void *opaque)
|
||||||
{
|
{
|
||||||
qemu_coroutine_enter(opaque, NULL);
|
qemu_coroutine_enter(opaque, NULL);
|
||||||
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
|
static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size)
|
||||||
{
|
{
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
|
guint watch;
|
||||||
|
|
||||||
assert(qemu_in_coroutine());
|
assert(qemu_in_coroutine());
|
||||||
/* Negotiation are always in main loop. */
|
/* Negotiation are always in main loop. */
|
||||||
qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
|
watch = qio_channel_add_watch(ioc,
|
||||||
qemu_coroutine_self());
|
G_IO_IN,
|
||||||
ret = read_sync(fd, buffer, size);
|
nbd_negotiate_continue,
|
||||||
qemu_set_fd_handler(fd, NULL, NULL, NULL);
|
qemu_coroutine_self(),
|
||||||
|
NULL);
|
||||||
|
ret = read_sync(ioc, buffer, size);
|
||||||
|
g_source_remove(watch);
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
|
static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size)
|
||||||
{
|
{
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
|
guint watch;
|
||||||
|
|
||||||
assert(qemu_in_coroutine());
|
assert(qemu_in_coroutine());
|
||||||
/* Negotiation are always in main loop. */
|
/* Negotiation are always in main loop. */
|
||||||
qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
|
watch = qio_channel_add_watch(ioc,
|
||||||
qemu_coroutine_self());
|
G_IO_OUT,
|
||||||
ret = write_sync(fd, buffer, size);
|
nbd_negotiate_continue,
|
||||||
qemu_set_fd_handler(fd, NULL, NULL, NULL);
|
qemu_coroutine_self(),
|
||||||
|
NULL);
|
||||||
|
ret = write_sync(ioc, buffer, size);
|
||||||
|
g_source_remove(watch);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
|
static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size)
|
||||||
{
|
{
|
||||||
ssize_t ret, dropped = size;
|
ssize_t ret, dropped = size;
|
||||||
uint8_t *buffer = g_malloc(MIN(65536, size));
|
uint8_t *buffer = g_malloc(MIN(65536, size));
|
||||||
|
|
||||||
while (size > 0) {
|
while (size > 0) {
|
||||||
ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
|
ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
g_free(buffer);
|
g_free(buffer);
|
||||||
return ret;
|
return ret;
|
||||||
@ -175,66 +187,66 @@ static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
|
static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
|
||||||
{
|
{
|
||||||
uint64_t magic;
|
uint64_t magic;
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
|
|
||||||
magic = cpu_to_be64(NBD_REP_MAGIC);
|
magic = cpu_to_be64(NBD_REP_MAGIC);
|
||||||
if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
|
if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
|
||||||
LOG("write failed (rep magic)");
|
LOG("write failed (rep magic)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
opt = cpu_to_be32(opt);
|
opt = cpu_to_be32(opt);
|
||||||
if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
|
if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
|
||||||
LOG("write failed (rep opt)");
|
LOG("write failed (rep opt)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
type = cpu_to_be32(type);
|
type = cpu_to_be32(type);
|
||||||
if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
|
if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
|
||||||
LOG("write failed (rep type)");
|
LOG("write failed (rep type)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
len = cpu_to_be32(0);
|
len = cpu_to_be32(0);
|
||||||
if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
|
if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
|
||||||
LOG("write failed (rep data length)");
|
LOG("write failed (rep data length)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
|
static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp)
|
||||||
{
|
{
|
||||||
uint64_t magic, name_len;
|
uint64_t magic, name_len;
|
||||||
uint32_t opt, type, len;
|
uint32_t opt, type, len;
|
||||||
|
|
||||||
name_len = strlen(exp->name);
|
name_len = strlen(exp->name);
|
||||||
magic = cpu_to_be64(NBD_REP_MAGIC);
|
magic = cpu_to_be64(NBD_REP_MAGIC);
|
||||||
if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
|
if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
|
||||||
LOG("write failed (magic)");
|
LOG("write failed (magic)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
opt = cpu_to_be32(NBD_OPT_LIST);
|
opt = cpu_to_be32(NBD_OPT_LIST);
|
||||||
if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
|
if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
|
||||||
LOG("write failed (opt)");
|
LOG("write failed (opt)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
type = cpu_to_be32(NBD_REP_SERVER);
|
type = cpu_to_be32(NBD_REP_SERVER);
|
||||||
if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
|
if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
|
||||||
LOG("write failed (reply type)");
|
LOG("write failed (reply type)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
len = cpu_to_be32(name_len + sizeof(len));
|
len = cpu_to_be32(name_len + sizeof(len));
|
||||||
if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
|
if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
|
||||||
LOG("write failed (length)");
|
LOG("write failed (length)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
len = cpu_to_be32(name_len);
|
len = cpu_to_be32(name_len);
|
||||||
if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
|
if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
|
||||||
LOG("write failed (length)");
|
LOG("write failed (length)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
|
if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) {
|
||||||
LOG("write failed (buffer)");
|
LOG("write failed (buffer)");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
@ -243,30 +255,29 @@ static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
|
|||||||
|
|
||||||
static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
|
static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
|
||||||
{
|
{
|
||||||
int csock;
|
|
||||||
NBDExport *exp;
|
NBDExport *exp;
|
||||||
|
|
||||||
csock = client->sock;
|
|
||||||
if (length) {
|
if (length) {
|
||||||
if (nbd_negotiate_drop_sync(csock, length) != length) {
|
if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
|
||||||
return -EIO;
|
return -EIO;
|
||||||
}
|
}
|
||||||
return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
|
return nbd_negotiate_send_rep(client->ioc,
|
||||||
|
NBD_REP_ERR_INVALID, NBD_OPT_LIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For each export, send a NBD_REP_SERVER reply. */
|
/* For each export, send a NBD_REP_SERVER reply. */
|
||||||
QTAILQ_FOREACH(exp, &exports, next) {
|
QTAILQ_FOREACH(exp, &exports, next) {
|
||||||
if (nbd_negotiate_send_rep_list(csock, exp)) {
|
if (nbd_negotiate_send_rep_list(client->ioc, exp)) {
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Finish with a NBD_REP_ACK. */
|
/* Finish with a NBD_REP_ACK. */
|
||||||
return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
|
return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
|
static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
|
||||||
{
|
{
|
||||||
int rc = -EINVAL, csock = client->sock;
|
int rc = -EINVAL;
|
||||||
char name[256];
|
char name[256];
|
||||||
|
|
||||||
/* Client sends:
|
/* Client sends:
|
||||||
@ -277,7 +288,7 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
|
|||||||
LOG("Bad length received");
|
LOG("Bad length received");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
if (nbd_negotiate_read(csock, name, length) != length) {
|
if (nbd_negotiate_read(client->ioc, name, length) != length) {
|
||||||
LOG("read failed");
|
LOG("read failed");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -298,7 +309,6 @@ fail:
|
|||||||
|
|
||||||
static int nbd_negotiate_options(NBDClient *client)
|
static int nbd_negotiate_options(NBDClient *client)
|
||||||
{
|
{
|
||||||
int csock = client->sock;
|
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
|
|
||||||
/* Client sends:
|
/* Client sends:
|
||||||
@ -315,7 +325,8 @@ static int nbd_negotiate_options(NBDClient *client)
|
|||||||
... Rest of request
|
... Rest of request
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
|
if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) !=
|
||||||
|
sizeof(flags)) {
|
||||||
LOG("read failed");
|
LOG("read failed");
|
||||||
return -EIO;
|
return -EIO;
|
||||||
}
|
}
|
||||||
@ -331,7 +342,8 @@ static int nbd_negotiate_options(NBDClient *client)
|
|||||||
uint32_t tmp, length;
|
uint32_t tmp, length;
|
||||||
uint64_t magic;
|
uint64_t magic;
|
||||||
|
|
||||||
if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
|
if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) !=
|
||||||
|
sizeof(magic)) {
|
||||||
LOG("read failed");
|
LOG("read failed");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
@ -341,13 +353,13 @@ static int nbd_negotiate_options(NBDClient *client)
|
|||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
if (nbd_negotiate_read(client->ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
|
||||||
LOG("read failed");
|
LOG("read failed");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nbd_negotiate_read(csock, &length,
|
if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) !=
|
||||||
sizeof(length)) != sizeof(length)) {
|
sizeof(length)) {
|
||||||
LOG("read failed");
|
LOG("read failed");
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
@ -371,7 +383,7 @@ static int nbd_negotiate_options(NBDClient *client)
|
|||||||
default:
|
default:
|
||||||
tmp = be32_to_cpu(tmp);
|
tmp = be32_to_cpu(tmp);
|
||||||
LOG("Unsupported option 0x%x", tmp);
|
LOG("Unsupported option 0x%x", tmp);
|
||||||
nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
|
nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp);
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -385,7 +397,6 @@ typedef struct {
|
|||||||
static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
|
static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
|
||||||
{
|
{
|
||||||
NBDClient *client = data->client;
|
NBDClient *client = data->client;
|
||||||
int csock = client->sock;
|
|
||||||
char buf[8 + 8 + 8 + 128];
|
char buf[8 + 8 + 8 + 128];
|
||||||
int rc;
|
int rc;
|
||||||
const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
|
const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
|
||||||
@ -410,6 +421,7 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
|
|||||||
[28 .. 151] reserved (0)
|
[28 .. 151] reserved (0)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
qio_channel_set_blocking(client->ioc, false, NULL);
|
||||||
rc = -EINVAL;
|
rc = -EINVAL;
|
||||||
|
|
||||||
TRACE("Beginning negotiation.");
|
TRACE("Beginning negotiation.");
|
||||||
@ -426,12 +438,12 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (client->exp) {
|
if (client->exp) {
|
||||||
if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
|
if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) {
|
||||||
LOG("write failed");
|
LOG("write failed");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (nbd_negotiate_write(csock, buf, 18) != 18) {
|
if (nbd_negotiate_write(client->ioc, buf, 18) != 18) {
|
||||||
LOG("write failed");
|
LOG("write failed");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -444,8 +456,8 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
|
|||||||
assert ((client->exp->nbdflags & ~65535) == 0);
|
assert ((client->exp->nbdflags & ~65535) == 0);
|
||||||
stq_be_p(buf + 18, client->exp->size);
|
stq_be_p(buf + 18, client->exp->size);
|
||||||
stw_be_p(buf + 26, client->exp->nbdflags | myflags);
|
stw_be_p(buf + 26, client->exp->nbdflags | myflags);
|
||||||
if (nbd_negotiate_write(csock, buf + 18,
|
if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) !=
|
||||||
sizeof(buf) - 18) != sizeof(buf) - 18) {
|
sizeof(buf) - 18) {
|
||||||
LOG("write failed");
|
LOG("write failed");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
@ -475,13 +487,13 @@ int nbd_disconnect(int fd)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
|
static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request)
|
||||||
{
|
{
|
||||||
uint8_t buf[NBD_REQUEST_SIZE];
|
uint8_t buf[NBD_REQUEST_SIZE];
|
||||||
uint32_t magic;
|
uint32_t magic;
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
|
|
||||||
ret = read_sync(csock, buf, sizeof(buf));
|
ret = read_sync(ioc, buf, sizeof(buf));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -516,7 +528,7 @@ static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
|
static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
|
||||||
{
|
{
|
||||||
uint8_t buf[NBD_REPLY_SIZE];
|
uint8_t buf[NBD_REPLY_SIZE];
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
@ -534,7 +546,7 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
|
|||||||
|
|
||||||
TRACE("Sending response to client");
|
TRACE("Sending response to client");
|
||||||
|
|
||||||
ret = write_sync(csock, buf, sizeof(buf));
|
ret = write_sync(ioc, buf, sizeof(buf));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -562,8 +574,8 @@ void nbd_client_put(NBDClient *client)
|
|||||||
assert(client->closing);
|
assert(client->closing);
|
||||||
|
|
||||||
nbd_unset_handlers(client);
|
nbd_unset_handlers(client);
|
||||||
close(client->sock);
|
object_unref(OBJECT(client->sioc));
|
||||||
client->sock = -1;
|
object_unref(OBJECT(client->ioc));
|
||||||
if (client->exp) {
|
if (client->exp) {
|
||||||
QTAILQ_REMOVE(&client->exp->clients, client, next);
|
QTAILQ_REMOVE(&client->exp->clients, client, next);
|
||||||
nbd_export_put(client->exp);
|
nbd_export_put(client->exp);
|
||||||
@ -583,7 +595,8 @@ static void client_close(NBDClient *client)
|
|||||||
/* Force requests to finish. They will drop their own references,
|
/* Force requests to finish. They will drop their own references,
|
||||||
* then we'll close the socket and free the NBDClient.
|
* then we'll close the socket and free the NBDClient.
|
||||||
*/
|
*/
|
||||||
shutdown(client->sock, 2);
|
qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
|
||||||
|
NULL);
|
||||||
|
|
||||||
/* Also tell the client, so that they release their reference. */
|
/* Also tell the client, so that they release their reference. */
|
||||||
if (client->close) {
|
if (client->close) {
|
||||||
@ -789,25 +802,25 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
|
|||||||
int len)
|
int len)
|
||||||
{
|
{
|
||||||
NBDClient *client = req->client;
|
NBDClient *client = req->client;
|
||||||
int csock = client->sock;
|
|
||||||
ssize_t rc, ret;
|
ssize_t rc, ret;
|
||||||
|
|
||||||
|
g_assert(qemu_in_coroutine());
|
||||||
qemu_co_mutex_lock(&client->send_lock);
|
qemu_co_mutex_lock(&client->send_lock);
|
||||||
client->send_coroutine = qemu_coroutine_self();
|
client->send_coroutine = qemu_coroutine_self();
|
||||||
nbd_set_handlers(client);
|
nbd_set_handlers(client);
|
||||||
|
|
||||||
if (!len) {
|
if (!len) {
|
||||||
rc = nbd_send_reply(csock, reply);
|
rc = nbd_send_reply(client->ioc, reply);
|
||||||
} else {
|
} else {
|
||||||
socket_set_cork(csock, 1);
|
qio_channel_set_cork(client->ioc, true);
|
||||||
rc = nbd_send_reply(csock, reply);
|
rc = nbd_send_reply(client->ioc, reply);
|
||||||
if (rc >= 0) {
|
if (rc >= 0) {
|
||||||
ret = qemu_co_send(csock, req->data, len);
|
ret = write_sync(client->ioc, req->data, len);
|
||||||
if (ret != len) {
|
if (ret != len) {
|
||||||
rc = -EIO;
|
rc = -EIO;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
socket_set_cork(csock, 0);
|
qio_channel_set_cork(client->ioc, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
client->send_coroutine = NULL;
|
client->send_coroutine = NULL;
|
||||||
@ -819,14 +832,14 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
|
|||||||
static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
|
static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
|
||||||
{
|
{
|
||||||
NBDClient *client = req->client;
|
NBDClient *client = req->client;
|
||||||
int csock = client->sock;
|
|
||||||
uint32_t command;
|
uint32_t command;
|
||||||
ssize_t rc;
|
ssize_t rc;
|
||||||
|
|
||||||
|
g_assert(qemu_in_coroutine());
|
||||||
client->recv_coroutine = qemu_coroutine_self();
|
client->recv_coroutine = qemu_coroutine_self();
|
||||||
nbd_update_can_read(client);
|
nbd_update_can_read(client);
|
||||||
|
|
||||||
rc = nbd_receive_request(csock, request);
|
rc = nbd_receive_request(client->ioc, request);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
if (rc != -EAGAIN) {
|
if (rc != -EAGAIN) {
|
||||||
rc = -EIO;
|
rc = -EIO;
|
||||||
@ -861,7 +874,7 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
|
|||||||
if (command == NBD_CMD_WRITE) {
|
if (command == NBD_CMD_WRITE) {
|
||||||
TRACE("Reading %u byte(s)", request->len);
|
TRACE("Reading %u byte(s)", request->len);
|
||||||
|
|
||||||
if (qemu_co_recv(csock, req->data, request->len) != request->len) {
|
if (read_sync(client->ioc, req->data, request->len) != request->len) {
|
||||||
LOG("reading from socket failed");
|
LOG("reading from socket failed");
|
||||||
rc = -EIO;
|
rc = -EIO;
|
||||||
goto out;
|
goto out;
|
||||||
@ -1056,7 +1069,7 @@ static void nbd_restart_write(void *opaque)
|
|||||||
static void nbd_set_handlers(NBDClient *client)
|
static void nbd_set_handlers(NBDClient *client)
|
||||||
{
|
{
|
||||||
if (client->exp && client->exp->ctx) {
|
if (client->exp && client->exp->ctx) {
|
||||||
aio_set_fd_handler(client->exp->ctx, client->sock,
|
aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
|
||||||
true,
|
true,
|
||||||
client->can_read ? nbd_read : NULL,
|
client->can_read ? nbd_read : NULL,
|
||||||
client->send_coroutine ? nbd_restart_write : NULL,
|
client->send_coroutine ? nbd_restart_write : NULL,
|
||||||
@ -1067,7 +1080,7 @@ static void nbd_set_handlers(NBDClient *client)
|
|||||||
static void nbd_unset_handlers(NBDClient *client)
|
static void nbd_unset_handlers(NBDClient *client)
|
||||||
{
|
{
|
||||||
if (client->exp && client->exp->ctx) {
|
if (client->exp && client->exp->ctx) {
|
||||||
aio_set_fd_handler(client->exp->ctx, client->sock,
|
aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
|
||||||
true, NULL, NULL, NULL);
|
true, NULL, NULL, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1109,7 +1122,9 @@ out:
|
|||||||
g_free(data);
|
g_free(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
|
void nbd_client_new(NBDExport *exp,
|
||||||
|
QIOChannelSocket *sioc,
|
||||||
|
void (*close_fn)(NBDClient *))
|
||||||
{
|
{
|
||||||
NBDClient *client;
|
NBDClient *client;
|
||||||
NBDClientNewData *data = g_new(NBDClientNewData, 1);
|
NBDClientNewData *data = g_new(NBDClientNewData, 1);
|
||||||
@ -1117,7 +1132,10 @@ void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
|
|||||||
client = g_malloc0(sizeof(NBDClient));
|
client = g_malloc0(sizeof(NBDClient));
|
||||||
client->refcount = 1;
|
client->refcount = 1;
|
||||||
client->exp = exp;
|
client->exp = exp;
|
||||||
client->sock = csock;
|
client->sioc = sioc;
|
||||||
|
object_ref(OBJECT(client->sioc));
|
||||||
|
client->ioc = QIO_CHANNEL(sioc);
|
||||||
|
object_ref(OBJECT(client->ioc));
|
||||||
client->can_read = true;
|
client->can_read = true;
|
||||||
client->close = close_fn;
|
client->close = close_fn;
|
||||||
|
|
||||||
|
10
qemu-nbd.c
10
qemu-nbd.c
@ -249,7 +249,7 @@ static void *nbd_client_thread(void *arg)
|
|||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
|
ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags,
|
||||||
&size, &local_error);
|
&size, &local_error);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
if (local_error) {
|
if (local_error) {
|
||||||
@ -265,7 +265,7 @@ static void *nbd_client_thread(void *arg)
|
|||||||
goto out_socket;
|
goto out_socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = nbd_init(fd, sioc->fd, nbdflags, size);
|
ret = nbd_init(fd, sioc, nbdflags, size);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
goto out_fd;
|
goto out_fd;
|
||||||
}
|
}
|
||||||
@ -325,7 +325,6 @@ static void nbd_client_closed(NBDClient *client)
|
|||||||
static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
|
static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
|
||||||
{
|
{
|
||||||
QIOChannelSocket *cioc;
|
QIOChannelSocket *cioc;
|
||||||
int fd;
|
|
||||||
|
|
||||||
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
|
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
|
||||||
NULL);
|
NULL);
|
||||||
@ -340,10 +339,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
|
|||||||
|
|
||||||
nb_fds++;
|
nb_fds++;
|
||||||
nbd_update_server_watch();
|
nbd_update_server_watch();
|
||||||
fd = dup(cioc->fd);
|
nbd_client_new(exp, cioc, nbd_client_closed);
|
||||||
if (fd >= 0) {
|
|
||||||
nbd_client_new(exp, fd, nbd_client_closed);
|
|
||||||
}
|
|
||||||
object_unref(OBJECT(cioc));
|
object_unref(OBJECT(cioc));
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
Loading…
Reference in New Issue
Block a user