From ff00bed1897c3d27adc5b0cec6f6eeb5a7d13176 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 2 Mar 2023 11:10:56 +0100 Subject: [PATCH 1/9] qatomic: add smp_mb__before/after_rmw() On ARM, seqcst loads and stores (which QEMU does not use) are compiled respectively as LDAR and STLR instructions. Even though LDAR is also used for load-acquire operations, it also waits for all STLRs to leave the store buffer. Thus, LDAR and STLR alone are load-acquire and store-release operations, but LDAR also provides store-against-load ordering as long as the previous store is a STLR. Compare this to ARMv7, where store-release is DMB+STR and load-acquire is LDR+DMB, but an additional DMB is needed between store-seqcst and load-seqcst (e.g. DMB+STR+DMB+LDR+DMB); or with x86, where MOV provides load-acquire and store-release semantics and the two can be reordered. Likewise, on ARM sequentially consistent read-modify-write operations only need to use LDAXR and STLXR respectively for the load and the store, while on x86 they need to use the stronger LOCK prefix. In a strange twist of events, however, the _stronger_ semantics of the ARM instructions can end up causing bugs on ARM, not on x86. The problems occur when seqcst atomics are mixed with relaxed atomics. QEMU's atomics try to bridge the Linux API (that most of the developers are familiar with) and the C11 API, and the two have a substantial difference: - in Linux, strongly-ordered atomics such as atomic_add_return() affect the global ordering of _all_ memory operations, including for example READ_ONCE()/WRITE_ONCE() - in C11, sequentially consistent atomics (except for seq-cst fences) only affect the ordering of sequentially consistent operations. In particular, since relaxed loads are done with LDR on ARM, they are not ordered against seqcst stores (which are done with STLR). QEMU implements high-level synchronization primitives with the idea that the primitives contain the necessary memory barriers, and the callers can use relaxed atomics (qatomic_read/qatomic_set) or even regular accesses. This is very much incompatible with the C11 view that seqcst accesses are only ordered against other seqcst accesses, and requires using seqcst fences as in the following example: qatomic_set(&y, 1); qatomic_set(&x, 1); smp_mb(); smp_mb(); ... qatomic_read(&x) ... ... qatomic_read(&y) ... When a qatomic_*() read-modify write operation is used instead of one or both stores, developers that are more familiar with the Linux API may be tempted to omit the smp_mb(), which will work on x86 but not on ARM. This nasty difference between Linux and C11 read-modify-write operations has already caused issues in util/async.c and more are being found. Provide something similar to Linux smp_mb__before/after_atomic(); this has the double function of documenting clearly why there is a memory barrier, and avoiding a double barrier on x86 and s390x systems. The new macro can already be put to use in qatomic_mb_set(). Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- docs/devel/atomics.rst | 26 +++++++++++++++++++++----- include/qemu/atomic.h | 17 ++++++++++++++++- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/docs/devel/atomics.rst b/docs/devel/atomics.rst index 7957310071..633df65a97 100644 --- a/docs/devel/atomics.rst +++ b/docs/devel/atomics.rst @@ -27,7 +27,8 @@ provides macros that fall in three camps: - weak atomic access and manual memory barriers: ``qatomic_read()``, ``qatomic_set()``, ``smp_rmb()``, ``smp_wmb()``, ``smp_mb()``, - ``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``; + ``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``, + ``smp_mb__before_rmw()``, ``smp_mb__after_rmw()``; - sequentially consistent atomic access: everything else. @@ -472,7 +473,7 @@ and memory barriers, and the equivalents in QEMU: sequential consistency. - in QEMU, ``qatomic_read()`` and ``qatomic_set()`` do not participate in - the total ordering enforced by sequentially-consistent operations. + the ordering enforced by read-modify-write operations. This is because QEMU uses the C11 memory model. The following example is correct in Linux but not in QEMU: @@ -488,9 +489,24 @@ and memory barriers, and the equivalents in QEMU: because the read of ``y`` can be moved (by either the processor or the compiler) before the write of ``x``. - Fixing this requires an ``smp_mb()`` memory barrier between the write - of ``x`` and the read of ``y``. In the common case where only one thread - writes ``x``, it is also possible to write it like this: + Fixing this requires a full memory barrier between the write of ``x`` and + the read of ``y``. QEMU provides ``smp_mb__before_rmw()`` and + ``smp_mb__after_rmw()``; they act both as an optimization, + avoiding the memory barrier on processors where it is unnecessary, + and as a clarification of this corner case of the C11 memory model: + + +--------------------------------+ + | QEMU (correct) | + +================================+ + | :: | + | | + | a = qatomic_fetch_add(&x, 2);| + | smp_mb__after_rmw(); | + | b = qatomic_read(&y); | + +--------------------------------+ + + In the common case where only one thread writes ``x``, it is also possible + to write it like this: +--------------------------------+ | QEMU (correct) | diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h index 874134fd19..f85834ee8b 100644 --- a/include/qemu/atomic.h +++ b/include/qemu/atomic.h @@ -245,6 +245,20 @@ #define smp_wmb() smp_mb_release() #define smp_rmb() smp_mb_acquire() +/* + * SEQ_CST is weaker than the older __sync_* builtins and Linux + * kernel read-modify-write atomics. Provide a macro to obtain + * the same semantics. + */ +#if !defined(QEMU_SANITIZE_THREAD) && \ + (defined(__i386__) || defined(__x86_64__) || defined(__s390x__)) +# define smp_mb__before_rmw() signal_barrier() +# define smp_mb__after_rmw() signal_barrier() +#else +# define smp_mb__before_rmw() smp_mb() +# define smp_mb__after_rmw() smp_mb() +#endif + /* qatomic_mb_read/set semantics map Java volatile variables. They are * less expensive on some platforms (notably POWER) than fully * sequentially consistent operations. @@ -259,7 +273,8 @@ #if !defined(QEMU_SANITIZE_THREAD) && \ (defined(__i386__) || defined(__x86_64__) || defined(__s390x__)) /* This is more efficient than a store plus a fence. */ -# define qatomic_mb_set(ptr, i) ((void)qatomic_xchg(ptr, i)) +# define qatomic_mb_set(ptr, i) \ + ({ (void)qatomic_xchg(ptr, i); smp_mb__after_rmw(); }) #else # define qatomic_mb_set(ptr, i) \ ({ qatomic_store_release(ptr, i); smp_mb(); }) From 9586a1329f5dce6c1d7f4de53cf0536644d7e593 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 2 Mar 2023 11:19:52 +0100 Subject: [PATCH 2/9] qemu-thread-posix: cleanup, fix, document QemuEvent QemuEvent is currently broken on ARM due to missing memory barriers after qatomic_*(). Apart from adding the memory barrier, a closer look reveals some unpaired memory barriers too. Document more clearly what is going on. Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- util/qemu-thread-posix.c | 69 ++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index 93d2505797..b2e26e2120 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -384,13 +384,21 @@ void qemu_event_destroy(QemuEvent *ev) void qemu_event_set(QemuEvent *ev) { - /* qemu_event_set has release semantics, but because it *loads* + assert(ev->initialized); + + /* + * Pairs with both qemu_event_reset() and qemu_event_wait(). + * + * qemu_event_set has release semantics, but because it *loads* * ev->value we need a full memory barrier here. */ - assert(ev->initialized); smp_mb(); if (qatomic_read(&ev->value) != EV_SET) { - if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) { + int old = qatomic_xchg(&ev->value, EV_SET); + + /* Pairs with memory barrier in kernel futex_wait system call. */ + smp_mb__after_rmw(); + if (old == EV_BUSY) { /* There were waiters, wake them up. */ qemu_futex_wake(ev, INT_MAX); } @@ -399,18 +407,19 @@ void qemu_event_set(QemuEvent *ev) void qemu_event_reset(QemuEvent *ev) { - unsigned value; - assert(ev->initialized); - value = qatomic_read(&ev->value); - smp_mb_acquire(); - if (value == EV_SET) { - /* - * If there was a concurrent reset (or even reset+wait), - * do nothing. Otherwise change EV_SET->EV_FREE. - */ - qatomic_or(&ev->value, EV_FREE); - } + + /* + * If there was a concurrent reset (or even reset+wait), + * do nothing. Otherwise change EV_SET->EV_FREE. + */ + qatomic_or(&ev->value, EV_FREE); + + /* + * Order reset before checking the condition in the caller. + * Pairs with the first memory barrier in qemu_event_set(). + */ + smp_mb__after_rmw(); } void qemu_event_wait(QemuEvent *ev) @@ -418,20 +427,40 @@ void qemu_event_wait(QemuEvent *ev) unsigned value; assert(ev->initialized); - value = qatomic_read(&ev->value); - smp_mb_acquire(); + + /* + * qemu_event_wait must synchronize with qemu_event_set even if it does + * not go down the slow path, so this load-acquire is needed that + * synchronizes with the first memory barrier in qemu_event_set(). + * + * If we do go down the slow path, there is no requirement at all: we + * might miss a qemu_event_set() here but ultimately the memory barrier in + * qemu_futex_wait() will ensure the check is done correctly. + */ + value = qatomic_load_acquire(&ev->value); if (value != EV_SET) { if (value == EV_FREE) { /* - * Leave the event reset and tell qemu_event_set that there - * are waiters. No need to retry, because there cannot be - * a concurrent busy->free transition. After the CAS, the - * event will be either set or busy. + * Leave the event reset and tell qemu_event_set that there are + * waiters. No need to retry, because there cannot be a concurrent + * busy->free transition. After the CAS, the event will be either + * set or busy. + * + * This cmpxchg doesn't have particular ordering requirements if it + * succeeds (moving the store earlier can only cause qemu_event_set() + * to issue _more_ wakeups), the failing case needs acquire semantics + * like the load above. */ if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) { return; } } + + /* + * This is the final check for a concurrent set, so it does need + * a smp_mb() pairing with the second barrier of qemu_event_set(). + * The barrier is inside the FUTEX_WAIT system call. + */ qemu_futex_wait(ev, EV_BUSY); } } From 6c5df4b48f0c52a61342ecb307a43f4c2a3565c4 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 2 Mar 2023 11:22:50 +0100 Subject: [PATCH 3/9] qemu-thread-win32: cleanup, fix, document QemuEvent QemuEvent is currently broken on ARM due to missing memory barriers after qatomic_*(). Apart from adding the memory barrier, a closer look reveals some unpaired memory barriers that are not really needed and complicated the functions unnecessarily. Also, it is relying on a memory barrier in ResetEvent(); the barrier _ought_ to be there but there is really no documentation about it, so make it explicit. Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- util/qemu-thread-win32.c | 82 +++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c index 69db254ac7..a7fe3cc345 100644 --- a/util/qemu-thread-win32.c +++ b/util/qemu-thread-win32.c @@ -272,12 +272,20 @@ void qemu_event_destroy(QemuEvent *ev) void qemu_event_set(QemuEvent *ev) { assert(ev->initialized); - /* qemu_event_set has release semantics, but because it *loads* + + /* + * Pairs with both qemu_event_reset() and qemu_event_wait(). + * + * qemu_event_set has release semantics, but because it *loads* * ev->value we need a full memory barrier here. */ smp_mb(); if (qatomic_read(&ev->value) != EV_SET) { - if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) { + int old = qatomic_xchg(&ev->value, EV_SET); + + /* Pairs with memory barrier after ResetEvent. */ + smp_mb__after_rmw(); + if (old == EV_BUSY) { /* There were waiters, wake them up. */ SetEvent(ev->event); } @@ -286,17 +294,19 @@ void qemu_event_set(QemuEvent *ev) void qemu_event_reset(QemuEvent *ev) { - unsigned value; - assert(ev->initialized); - value = qatomic_read(&ev->value); - smp_mb_acquire(); - if (value == EV_SET) { - /* If there was a concurrent reset (or even reset+wait), - * do nothing. Otherwise change EV_SET->EV_FREE. - */ - qatomic_or(&ev->value, EV_FREE); - } + + /* + * If there was a concurrent reset (or even reset+wait), + * do nothing. Otherwise change EV_SET->EV_FREE. + */ + qatomic_or(&ev->value, EV_FREE); + + /* + * Order reset before checking the condition in the caller. + * Pairs with the first memory barrier in qemu_event_set(). + */ + smp_mb__after_rmw(); } void qemu_event_wait(QemuEvent *ev) @@ -304,29 +314,49 @@ void qemu_event_wait(QemuEvent *ev) unsigned value; assert(ev->initialized); - value = qatomic_read(&ev->value); - smp_mb_acquire(); + + /* + * qemu_event_wait must synchronize with qemu_event_set even if it does + * not go down the slow path, so this load-acquire is needed that + * synchronizes with the first memory barrier in qemu_event_set(). + * + * If we do go down the slow path, there is no requirement at all: we + * might miss a qemu_event_set() here but ultimately the memory barrier in + * qemu_futex_wait() will ensure the check is done correctly. + */ + value = qatomic_load_acquire(&ev->value); if (value != EV_SET) { if (value == EV_FREE) { - /* qemu_event_set is not yet going to call SetEvent, but we are - * going to do another check for EV_SET below when setting EV_BUSY. - * At that point it is safe to call WaitForSingleObject. + /* + * Here the underlying kernel event is reset, but qemu_event_set is + * not yet going to call SetEvent. However, there will be another + * check for EV_SET below when setting EV_BUSY. At that point it + * is safe to call WaitForSingleObject. */ ResetEvent(ev->event); - /* Tell qemu_event_set that there are waiters. No need to retry - * because there cannot be a concurrent busy->free transition. - * After the CAS, the event will be either set or busy. + /* + * It is not clear whether ResetEvent provides this barrier; kernel + * APIs (KeResetEvent/KeClearEvent) do not. Better safe than sorry! + */ + smp_mb(); + + /* + * Leave the event reset and tell qemu_event_set that there are + * waiters. No need to retry, because there cannot be a concurrent + * busy->free transition. After the CAS, the event will be either + * set or busy. */ if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) { - value = EV_SET; - } else { - value = EV_BUSY; + return; } } - if (value == EV_BUSY) { - WaitForSingleObject(ev->event, INFINITE); - } + + /* + * ev->value is now EV_BUSY. Since we didn't observe EV_SET, + * qemu_event_set() must observe EV_BUSY and call SetEvent(). + */ + WaitForSingleObject(ev->event, INFINITE); } } From 2482aeea4195ad84cf3d4e5b15b28ec5b420ed5a Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 2 Mar 2023 11:16:13 +0100 Subject: [PATCH 4/9] edu: add smp_mb__after_rmw() Ensure ordering between clearing the COMPUTING flag and checking IRQFACT, and between setting the IRQFACT flag and checking COMPUTING. This ensures that no wakeups are lost. Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- hw/misc/edu.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hw/misc/edu.c b/hw/misc/edu.c index e935c418d4..a1f8bc77e7 100644 --- a/hw/misc/edu.c +++ b/hw/misc/edu.c @@ -267,6 +267,8 @@ static void edu_mmio_write(void *opaque, hwaddr addr, uint64_t val, case 0x20: if (val & EDU_STATUS_IRQFACT) { qatomic_or(&edu->status, EDU_STATUS_IRQFACT); + /* Order check of the COMPUTING flag after setting IRQFACT. */ + smp_mb__after_rmw(); } else { qatomic_and(&edu->status, ~EDU_STATUS_IRQFACT); } @@ -349,6 +351,9 @@ static void *edu_fact_thread(void *opaque) qemu_mutex_unlock(&edu->thr_mutex); qatomic_and(&edu->status, ~EDU_STATUS_COMPUTING); + /* Clear COMPUTING flag before checking IRQFACT. */ + smp_mb__after_rmw(); + if (qatomic_read(&edu->status) & EDU_STATUS_IRQFACT) { qemu_mutex_lock_iothread(); edu_raise_irq(edu, FACT_IRQ); From b532526a07ef3b903ead2e055fe6cc87b41057a3 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 11:03:52 +0100 Subject: [PATCH 5/9] aio-wait: switch to smp_mb__after_rmw() The barrier comes after an atomic increment, so it is enough to use smp_mb__after_rmw(); this avoids a double barrier on x86 systems. Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- include/block/aio-wait.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h index dd9a7f6461..da13357bb8 100644 --- a/include/block/aio-wait.h +++ b/include/block/aio-wait.h @@ -85,7 +85,7 @@ extern AioWait global_aio_wait; /* Increment wait_->num_waiters before evaluating cond. */ \ qatomic_inc(&wait_->num_waiters); \ /* Paired with smp_mb in aio_wait_kick(). */ \ - smp_mb(); \ + smp_mb__after_rmw(); \ if (ctx_ && in_aio_context_home_thread(ctx_)) { \ while ((cond)) { \ aio_poll(ctx_, true); \ From e3a3b6ec8169eab2feb241b4982585001512cd55 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 10:52:59 +0100 Subject: [PATCH 6/9] qemu-coroutine-lock: add smp_mb__after_rmw() mutex->from_push and mutex->handoff in qemu-coroutine-lock implement the familiar pattern: write a write b smp_mb() smp_mb() read b read a The memory barrier is required by the C memory model even after a SEQ_CST read-modify-write operation such as QSLIST_INSERT_HEAD_ATOMIC. Add it and avoid the unclear qatomic_mb_read() operation. Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- util/qemu-coroutine-lock.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c index 58f3f77181..84a50a9e91 100644 --- a/util/qemu-coroutine-lock.c +++ b/util/qemu-coroutine-lock.c @@ -201,10 +201,16 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx, trace_qemu_co_mutex_lock_entry(mutex, self); push_waiter(mutex, &w); + /* + * Add waiter before reading mutex->handoff. Pairs with qatomic_mb_set + * in qemu_co_mutex_unlock. + */ + smp_mb__after_rmw(); + /* This is the "Responsibility Hand-Off" protocol; a lock() picks from * a concurrent unlock() the responsibility of waking somebody up. */ - old_handoff = qatomic_mb_read(&mutex->handoff); + old_handoff = qatomic_read(&mutex->handoff); if (old_handoff && has_waiters(mutex) && qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) { @@ -303,6 +309,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) } our_handoff = mutex->sequence; + /* Set handoff before checking for waiters. */ qatomic_mb_set(&mutex->handoff, our_handoff); if (!has_waiters(mutex)) { /* The concurrent lock has not added itself yet, so it From 33828ca11da08436e1b32f3e79dabce3061a0427 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 14:36:32 +0100 Subject: [PATCH 7/9] physmem: add missing memory barrier Reviewed-by: Richard Henderson Reviewed-by: David Hildenbrand Signed-off-by: Paolo Bonzini --- softmmu/physmem.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/softmmu/physmem.c b/softmmu/physmem.c index 47143edb4f..a6efd8e8dd 100644 --- a/softmmu/physmem.c +++ b/softmmu/physmem.c @@ -2927,6 +2927,8 @@ void cpu_register_map_client(QEMUBH *bh) qemu_mutex_lock(&map_client_list_lock); client->bh = bh; QLIST_INSERT_HEAD(&map_client_list, client, link); + /* Write map_client_list before reading in_use. */ + smp_mb(); if (!qatomic_read(&bounce.in_use)) { cpu_notify_map_clients_locked(); } @@ -3116,6 +3118,7 @@ void address_space_unmap(AddressSpace *as, void *buffer, hwaddr len, qemu_vfree(bounce.buffer); bounce.buffer = NULL; memory_region_unref(bounce.mr); + /* Clear in_use before reading map_client_list. */ qatomic_mb_set(&bounce.in_use, false); cpu_notify_map_clients(); } From 8dd48650b43dfde4ebea34191ac267e474bcc29e Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 6 Mar 2023 10:15:06 +0100 Subject: [PATCH 8/9] async: update documentation of the memory barriers Ever since commit 8c6b0356b539 ("util/async: make bh_aio_poll() O(1)", 2020-02-22), synchronization between qemu_bh_schedule() and aio_bh_poll() is happening when the bottom half is enqueued in the bh_list; not when the flags are set. Update the documentation to match. Reviewed-by: Stefan Hajnoczi Signed-off-by: Paolo Bonzini --- util/async.c | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/util/async.c b/util/async.c index 0657b75397..e4b494150e 100644 --- a/util/async.c +++ b/util/async.c @@ -74,14 +74,21 @@ static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags) unsigned old_flags; /* - * The memory barrier implicit in qatomic_fetch_or makes sure that: - * 1. idle & any writes needed by the callback are done before the - * locations are read in the aio_bh_poll. - * 2. ctx is loaded before the callback has a chance to execute and bh - * could be freed. + * Synchronizes with atomic_fetch_and() in aio_bh_dequeue(), ensuring that + * insertion starts after BH_PENDING is set. */ old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags); + if (!(old_flags & BH_PENDING)) { + /* + * At this point the bottom half becomes visible to aio_bh_poll(). + * This insertion thus synchronizes with QSLIST_MOVE_ATOMIC in + * aio_bh_poll(), ensuring that: + * 1. any writes needed by the callback are visible from the callback + * after aio_bh_dequeue() returns bh. + * 2. ctx is loaded before the callback has a chance to execute and bh + * could be freed. + */ QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next); } @@ -107,11 +114,8 @@ static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags) QSLIST_REMOVE_HEAD(head, next); /* - * The qatomic_and is paired with aio_bh_enqueue(). The implicit memory - * barrier ensures that the callback sees all writes done by the scheduling - * thread. It also ensures that the scheduling thread sees the cleared - * flag before bh->cb has run, and thus will call aio_notify again if - * necessary. + * Synchronizes with qatomic_fetch_or() in aio_bh_enqueue(), ensuring that + * the removal finishes before BH_PENDING is reset. */ *flags = qatomic_fetch_and(&bh->flags, ~(BH_PENDING | BH_SCHEDULED | BH_IDLE)); @@ -158,6 +162,7 @@ int aio_bh_poll(AioContext *ctx) BHListSlice *s; int ret = 0; + /* Synchronizes with QSLIST_INSERT_HEAD_ATOMIC in aio_bh_enqueue(). */ QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list); QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next); @@ -448,15 +453,15 @@ LuringState *aio_get_linux_io_uring(AioContext *ctx) void aio_notify(AioContext *ctx) { /* - * Write e.g. bh->flags before writing ctx->notified. Pairs with smp_mb in - * aio_notify_accept. + * Write e.g. ctx->bh_list before writing ctx->notified. Pairs with + * smp_mb() in aio_notify_accept(). */ smp_wmb(); qatomic_set(&ctx->notified, true); /* - * Write ctx->notified before reading ctx->notify_me. Pairs - * with smp_mb in aio_ctx_prepare or aio_poll. + * Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me. + * Pairs with smp_mb() in aio_ctx_prepare or aio_poll. */ smp_mb(); if (qatomic_read(&ctx->notify_me)) { From 6229438cca037d42f44a96d38feb15cb102a444f Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 6 Mar 2023 10:43:52 +0100 Subject: [PATCH 9/9] async: clarify usage of barriers in the polling case Explain that aio_context_notifier_poll() relies on aio_notify_accept() to catch all the memory writes that were done before ctx->notified was set to true. Reviewed-by: Richard Henderson Reviewed-by: Stefan Hajnoczi Signed-off-by: Paolo Bonzini --- util/async.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/util/async.c b/util/async.c index e4b494150e..21016a1ac7 100644 --- a/util/async.c +++ b/util/async.c @@ -474,8 +474,9 @@ void aio_notify_accept(AioContext *ctx) qatomic_set(&ctx->notified, false); /* - * Write ctx->notified before reading e.g. bh->flags. Pairs with smp_wmb - * in aio_notify. + * Order reads of ctx->notified (in aio_context_notifier_poll()) and the + * above clearing of ctx->notified before reads of e.g. bh->flags. Pairs + * with smp_wmb() in aio_notify. */ smp_mb(); } @@ -498,6 +499,11 @@ static bool aio_context_notifier_poll(void *opaque) EventNotifier *e = opaque; AioContext *ctx = container_of(e, AioContext, notifier); + /* + * No need for load-acquire because we just want to kick the + * event loop. aio_notify_accept() takes care of synchronizing + * the event loop with the producers. + */ return qatomic_read(&ctx->notified); }