iothread: release iothread around aio_poll

This is the first step towards having fine-grained critical sections in
dataplane threads, which resolves lock ordering problems between
address_space_* functions (which need the BQL when doing MMIO, even
after we complete RCU-based dispatch) and the AioContext.

Because AioContext does not use contention callbacks anymore, the
unit test has to be changed.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Message-id: 1424449612-18215-4-git-send-email-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
Paolo Bonzini 2015-02-20 17:26:52 +01:00 committed by Kevin Wolf
parent 49110174f8
commit a0710f7995
3 changed files with 14 additions and 24 deletions

View File

@ -280,12 +280,6 @@ static void aio_timerlist_notify(void *opaque)
aio_notify(opaque); aio_notify(opaque);
} }
static void aio_rfifolock_cb(void *opaque)
{
/* Kick owner thread in case they are blocked in aio_poll() */
aio_notify(opaque);
}
AioContext *aio_context_new(Error **errp) AioContext *aio_context_new(Error **errp)
{ {
int ret; int ret;
@ -303,7 +297,7 @@ AioContext *aio_context_new(Error **errp)
event_notifier_test_and_clear); event_notifier_test_and_clear);
ctx->thread_pool = NULL; ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock); qemu_mutex_init(&ctx->bh_lock);
rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx); rfifolock_init(&ctx->lock, NULL, NULL);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
return ctx; return ctx;

View File

@ -31,21 +31,14 @@ typedef ObjectClass IOThreadClass;
static void *iothread_run(void *opaque) static void *iothread_run(void *opaque)
{ {
IOThread *iothread = opaque; IOThread *iothread = opaque;
bool blocking;
qemu_mutex_lock(&iothread->init_done_lock); qemu_mutex_lock(&iothread->init_done_lock);
iothread->thread_id = qemu_get_thread_id(); iothread->thread_id = qemu_get_thread_id();
qemu_cond_signal(&iothread->init_done_cond); qemu_cond_signal(&iothread->init_done_cond);
qemu_mutex_unlock(&iothread->init_done_lock); qemu_mutex_unlock(&iothread->init_done_lock);
while (!iothread->stopping) { while (!atomic_read(&iothread->stopping)) {
aio_context_acquire(iothread->ctx); aio_poll(iothread->ctx, true);
blocking = true;
while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
/* Progress was made, keep going */
blocking = false;
}
aio_context_release(iothread->ctx);
} }
return NULL; return NULL;
} }

View File

@ -107,6 +107,7 @@ static void test_notify(void)
typedef struct { typedef struct {
QemuMutex start_lock; QemuMutex start_lock;
EventNotifier notifier;
bool thread_acquired; bool thread_acquired;
} AcquireTestData; } AcquireTestData;
@ -118,6 +119,8 @@ static void *test_acquire_thread(void *opaque)
qemu_mutex_lock(&data->start_lock); qemu_mutex_lock(&data->start_lock);
qemu_mutex_unlock(&data->start_lock); qemu_mutex_unlock(&data->start_lock);
g_usleep(500000);
event_notifier_set(&data->notifier);
aio_context_acquire(ctx); aio_context_acquire(ctx);
aio_context_release(ctx); aio_context_release(ctx);
@ -126,20 +129,19 @@ static void *test_acquire_thread(void *opaque)
return NULL; return NULL;
} }
static void dummy_notifier_read(EventNotifier *unused) static void dummy_notifier_read(EventNotifier *n)
{ {
g_assert(false); /* should never be invoked */ event_notifier_test_and_clear(n);
} }
static void test_acquire(void) static void test_acquire(void)
{ {
QemuThread thread; QemuThread thread;
EventNotifier notifier;
AcquireTestData data; AcquireTestData data;
/* Dummy event notifier ensures aio_poll() will block */ /* Dummy event notifier ensures aio_poll() will block */
event_notifier_init(&notifier, false); event_notifier_init(&data.notifier, false);
aio_set_event_notifier(ctx, &notifier, dummy_notifier_read); aio_set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */ g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
qemu_mutex_init(&data.start_lock); qemu_mutex_init(&data.start_lock);
@ -153,12 +155,13 @@ static void test_acquire(void)
/* Block in aio_poll(), let other thread kick us and acquire context */ /* Block in aio_poll(), let other thread kick us and acquire context */
aio_context_acquire(ctx); aio_context_acquire(ctx);
qemu_mutex_unlock(&data.start_lock); /* let the thread run */ qemu_mutex_unlock(&data.start_lock); /* let the thread run */
g_assert(!aio_poll(ctx, true)); g_assert(aio_poll(ctx, true));
g_assert(!data.thread_acquired);
aio_context_release(ctx); aio_context_release(ctx);
qemu_thread_join(&thread); qemu_thread_join(&thread);
aio_set_event_notifier(ctx, &notifier, NULL); aio_set_event_notifier(ctx, &data.notifier, NULL);
event_notifier_cleanup(&notifier); event_notifier_cleanup(&data.notifier);
g_assert(data.thread_acquired); g_assert(data.thread_acquired);
} }