Add qemu_aio_process_queue()

We'll leave some AIO completions unhandled when we can't call the callback.
qemu_aio_process_queue() is used later to run any callbacks that are left and
can be run then.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>
This commit is contained in:
Kevin Wolf 2009-10-22 17:54:36 +02:00 committed by Anthony Liguori
parent 59c7b155aa
commit 8febfa2684
5 changed files with 50 additions and 6 deletions

30
aio.c
View File

@ -33,6 +33,7 @@ struct AioHandler
IOHandler *io_read; IOHandler *io_read;
IOHandler *io_write; IOHandler *io_write;
AioFlushHandler *io_flush; AioFlushHandler *io_flush;
AioProcessQueue *io_process_queue;
int deleted; int deleted;
void *opaque; void *opaque;
QLIST_ENTRY(AioHandler) node; QLIST_ENTRY(AioHandler) node;
@ -55,6 +56,7 @@ int qemu_aio_set_fd_handler(int fd,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioFlushHandler *io_flush, AioFlushHandler *io_flush,
AioProcessQueue *io_process_queue,
void *opaque) void *opaque)
{ {
AioHandler *node; AioHandler *node;
@ -87,6 +89,7 @@ int qemu_aio_set_fd_handler(int fd,
node->io_read = io_read; node->io_read = io_read;
node->io_write = io_write; node->io_write = io_write;
node->io_flush = io_flush; node->io_flush = io_flush;
node->io_process_queue = io_process_queue;
node->opaque = opaque; node->opaque = opaque;
} }
@ -115,6 +118,26 @@ void qemu_aio_flush(void)
} while (qemu_bh_poll() || ret > 0); } while (qemu_bh_poll() || ret > 0);
} }
int qemu_aio_process_queue(void)
{
AioHandler *node;
int ret = 0;
walking_handlers = 1;
QLIST_FOREACH(node, &aio_handlers, node) {
if (node->io_process_queue) {
if (node->io_process_queue(node->opaque)) {
ret = 1;
}
}
}
walking_handlers = 0;
return ret;
}
void qemu_aio_wait(void) void qemu_aio_wait(void)
{ {
int ret; int ret;
@ -122,6 +145,13 @@ void qemu_aio_wait(void)
if (qemu_bh_poll()) if (qemu_bh_poll())
return; return;
/*
* If there are callbacks left that have been queued, we need to call then.
* Return afterwards to avoid waiting needlessly in select().
*/
if (qemu_aio_process_queue())
return;
do { do {
AioHandler *node; AioHandler *node;
fd_set rdfds, wrfds; fd_set rdfds, wrfds;

View File

@ -83,17 +83,17 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
dprintf("CURL (AIO): Sock action %d on fd %d\n", action, fd); dprintf("CURL (AIO): Sock action %d on fd %d\n", action, fd);
switch (action) { switch (action) {
case CURL_POLL_IN: case CURL_POLL_IN:
qemu_aio_set_fd_handler(fd, curl_multi_do, NULL, NULL, s); qemu_aio_set_fd_handler(fd, curl_multi_do, NULL, NULL, NULL, s);
break; break;
case CURL_POLL_OUT: case CURL_POLL_OUT:
qemu_aio_set_fd_handler(fd, NULL, curl_multi_do, NULL, s); qemu_aio_set_fd_handler(fd, NULL, curl_multi_do, NULL, NULL, s);
break; break;
case CURL_POLL_INOUT: case CURL_POLL_INOUT:
qemu_aio_set_fd_handler(fd, curl_multi_do, qemu_aio_set_fd_handler(fd, curl_multi_do,
curl_multi_do, NULL, s); curl_multi_do, NULL, NULL, s);
break; break;
case CURL_POLL_REMOVE: case CURL_POLL_REMOVE:
qemu_aio_set_fd_handler(fd, NULL, NULL, NULL, NULL); qemu_aio_set_fd_handler(fd, NULL, NULL, NULL, NULL, NULL);
break; break;
} }

View File

@ -192,7 +192,7 @@ void *laio_init(void)
goto out_close_efd; goto out_close_efd;
qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb,
NULL, qemu_laio_flush_cb, s); NULL, qemu_laio_flush_cb, NULL, s);
return s; return s;

View File

@ -624,7 +624,8 @@ void *paio_init(void)
fcntl(s->rfd, F_SETFL, O_NONBLOCK); fcntl(s->rfd, F_SETFL, O_NONBLOCK);
fcntl(s->wfd, F_SETFL, O_NONBLOCK); fcntl(s->wfd, F_SETFL, O_NONBLOCK);
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s); qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
ret = pthread_attr_init(&attr); ret = pthread_attr_init(&attr);
if (ret) if (ret)

View File

@ -20,6 +20,11 @@
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */ /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
typedef int (AioFlushHandler)(void *opaque); typedef int (AioFlushHandler)(void *opaque);
/* Runs all currently allowed AIO callbacks of completed requests in the
* respective AIO backend. Returns 0 if no requests was handled, non-zero
* if at least one queued request was handled. */
typedef int (AioProcessQueue)(void *opaque);
/* Flush any pending AIO operation. This function will block until all /* Flush any pending AIO operation. This function will block until all
* outstanding AIO operations have been completed or cancelled. */ * outstanding AIO operations have been completed or cancelled. */
void qemu_aio_flush(void); void qemu_aio_flush(void);
@ -30,6 +35,13 @@ void qemu_aio_flush(void);
* result of executing I/O completion or bh callbacks. */ * result of executing I/O completion or bh callbacks. */
void qemu_aio_wait(void); void qemu_aio_wait(void);
/*
* Runs all currently allowed AIO callbacks of completed requests. Returns 0
* if no requests were handled, non-zero if at least one request was
* processed.
*/
int qemu_aio_process_queue(void);
/* Register a file descriptor and associated callbacks. Behaves very similarly /* Register a file descriptor and associated callbacks. Behaves very similarly
* to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will * to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
* be invoked when using either qemu_aio_wait() or qemu_aio_flush(). * be invoked when using either qemu_aio_wait() or qemu_aio_flush().
@ -41,6 +53,7 @@ int qemu_aio_set_fd_handler(int fd,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioFlushHandler *io_flush, AioFlushHandler *io_flush,
AioProcessQueue *io_process_queue,
void *opaque); void *opaque);
#endif #endif