Block layer patches
- Protect BlockBackend.queued_requests with its own lock - Switch to AIO_WAIT_WHILE_UNLOCKED() where possible - AioContext removal: LinuxAioState/LuringState/ThreadPool - Add more coroutine_fn annotations, use bdrv/blk_co_* - Fix crash when execute hmp_commit -----BEGIN PGP SIGNATURE----- iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmRH0b0RHGt3b2xmQHJl ZGhhdC5jb20ACgkQfwmycsiPL9Y0yw/6A/vzA4TGgFUP3WIvH/sQri4/V3gyR+PT u3hOQUCYZ99nioTpKV91TSuUPuU/Mdspy/0NKM+K92yIXqxa9172A2zLOsGOu21l qKpse+nBf1zqEgB8YzUHyCBdetPz916C/f9RS26SNUCW85GCHYGHA3u7nKvWLMyV oKIoTlA8QOglOuEKlRoYh7hCFm7ET51NOSEftm8GsYbsW/I2Vzl8a1SHN1lHufjd We3+898zUrmFqNMp6Rjdhn+yZmmoGzoZqV4YQi83z7xjiv+Ms4VHVVW7X8d20xRX 5BLFiLHAuZ/1d26HyVhgBUr7KHyf94odocz8BylWKXGl5SXMCZun1Td1vgVKlGK+ GRxzB2cWGWqzC2UmqSTc0Z0aIWbXukKwvcX76uBKsQZ+kB2A7jFobxHiaoQEDJ8B WRNEMH2+CqCAu9rsrNRinnJKhT2nXcr9F9YfwRIlagdAePGWin+EUW8huf14dDBm Z2Y34aKW4RQibF8xirMHeRBbOLmcq2VpKLKwNfBHUDgZB8iuD7bLn4n9nwWXMG1w zgNsTybkv46vLPamTpEaUoNTHfuRDTAuE7Z7lkcc7jF41Z0V1DC/DCCWcL/0LvhP GIxFdkYug3hetdF2U/OZhUoEfxvkqcuBnrr55LFzqheKEllQpPwPpt7UF0aH8bg3 i/YpjHsf3xU= =mpYX -----END PGP SIGNATURE----- Merge tag 'for-upstream' of https://repo.or.cz/qemu/kevin into staging Block layer patches - Protect BlockBackend.queued_requests with its own lock - Switch to AIO_WAIT_WHILE_UNLOCKED() where possible - AioContext removal: LinuxAioState/LuringState/ThreadPool - Add more coroutine_fn annotations, use bdrv/blk_co_* - Fix crash when execute hmp_commit # -----BEGIN PGP SIGNATURE----- # # iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmRH0b0RHGt3b2xmQHJl # ZGhhdC5jb20ACgkQfwmycsiPL9Y0yw/6A/vzA4TGgFUP3WIvH/sQri4/V3gyR+PT # u3hOQUCYZ99nioTpKV91TSuUPuU/Mdspy/0NKM+K92yIXqxa9172A2zLOsGOu21l # qKpse+nBf1zqEgB8YzUHyCBdetPz916C/f9RS26SNUCW85GCHYGHA3u7nKvWLMyV # oKIoTlA8QOglOuEKlRoYh7hCFm7ET51NOSEftm8GsYbsW/I2Vzl8a1SHN1lHufjd # We3+898zUrmFqNMp6Rjdhn+yZmmoGzoZqV4YQi83z7xjiv+Ms4VHVVW7X8d20xRX # 5BLFiLHAuZ/1d26HyVhgBUr7KHyf94odocz8BylWKXGl5SXMCZun1Td1vgVKlGK+ # GRxzB2cWGWqzC2UmqSTc0Z0aIWbXukKwvcX76uBKsQZ+kB2A7jFobxHiaoQEDJ8B # WRNEMH2+CqCAu9rsrNRinnJKhT2nXcr9F9YfwRIlagdAePGWin+EUW8huf14dDBm # Z2Y34aKW4RQibF8xirMHeRBbOLmcq2VpKLKwNfBHUDgZB8iuD7bLn4n9nwWXMG1w # zgNsTybkv46vLPamTpEaUoNTHfuRDTAuE7Z7lkcc7jF41Z0V1DC/DCCWcL/0LvhP # GIxFdkYug3hetdF2U/OZhUoEfxvkqcuBnrr55LFzqheKEllQpPwPpt7UF0aH8bg3 # i/YpjHsf3xU= # =mpYX # -----END PGP SIGNATURE----- # gpg: Signature made Tue 25 Apr 2023 02:12:29 PM BST # gpg: using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6 # gpg: issuer "kwolf@redhat.com" # gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full] * tag 'for-upstream' of https://repo.or.cz/qemu/kevin: (25 commits) block/monitor: Fix crash when executing HMP commit vmdk: make vmdk_is_cid_valid a coroutine_fn qcow2: mark various functions as coroutine_fn and GRAPH_RDLOCK tests: mark more coroutine_fns qemu-pr-helper: mark more coroutine_fns 9pfs: mark more coroutine_fns nbd: mark more coroutine_fns, do not use co_wrappers mirror: make mirror_flush a coroutine_fn, do not use co_wrappers blkdebug: add missing coroutine_fn annotation vvfat: mark various functions as coroutine_fn thread-pool: avoid passing the pool parameter every time thread-pool: use ThreadPool from the running thread io_uring: use LuringState from the running thread linux-aio: use LinuxAioState from the running thread block: add missing coroutine_fn to bdrv_sum_allocated_file_size() include/block: fixup typos monitor: convert monitor_cleanup() to AIO_WAIT_WHILE_UNLOCKED() hmp: convert handle_hmp_command() to AIO_WAIT_WHILE_UNLOCKED() block: convert bdrv_drain_all_begin() to AIO_WAIT_WHILE_UNLOCKED() block: convert bdrv_graph_wrlock() to AIO_WAIT_WHILE_UNLOCKED() ... Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
This commit is contained in:
commit
4d1467a568
@ -100,8 +100,6 @@ bool tpm_backend_had_startup_error(TPMBackend *s)
|
||||
|
||||
void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
|
||||
if (s->cmd != NULL) {
|
||||
error_report("There is a TPM request pending");
|
||||
return;
|
||||
@ -109,7 +107,7 @@ void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
|
||||
|
||||
s->cmd = cmd;
|
||||
object_ref(OBJECT(s));
|
||||
thread_pool_submit_aio(pool, tpm_backend_worker_thread, s,
|
||||
thread_pool_submit_aio(tpm_backend_worker_thread, s,
|
||||
tpm_backend_request_completed, s);
|
||||
}
|
||||
|
||||
|
2
block.c
2
block.c
@ -5750,7 +5750,7 @@ exit:
|
||||
* sums the size of all data-bearing children. (This excludes backing
|
||||
* children.)
|
||||
*/
|
||||
static int64_t bdrv_sum_allocated_file_size(BlockDriverState *bs)
|
||||
static int64_t coroutine_fn bdrv_sum_allocated_file_size(BlockDriverState *bs)
|
||||
{
|
||||
BdrvChild *child;
|
||||
int64_t child_size, sum = 0;
|
||||
|
@ -583,8 +583,8 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int rule_check(BlockDriverState *bs, uint64_t offset, uint64_t bytes,
|
||||
BlkdebugIOType iotype)
|
||||
static int coroutine_fn rule_check(BlockDriverState *bs, uint64_t offset,
|
||||
uint64_t bytes, BlkdebugIOType iotype)
|
||||
{
|
||||
BDRVBlkdebugState *s = bs->opaque;
|
||||
BlkdebugRule *rule = NULL;
|
||||
|
@ -80,9 +80,10 @@ struct BlockBackend {
|
||||
NotifierList remove_bs_notifiers, insert_bs_notifiers;
|
||||
QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
|
||||
|
||||
int quiesce_counter;
|
||||
int quiesce_counter; /* atomic: written under BQL, read by other threads */
|
||||
QemuMutex queued_requests_lock; /* protects queued_requests */
|
||||
CoQueue queued_requests;
|
||||
bool disable_request_queuing;
|
||||
bool disable_request_queuing; /* atomic */
|
||||
|
||||
VMChangeStateEntry *vmsh;
|
||||
bool force_allow_inactivate;
|
||||
@ -368,6 +369,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
|
||||
|
||||
block_acct_init(&blk->stats);
|
||||
|
||||
qemu_mutex_init(&blk->queued_requests_lock);
|
||||
qemu_co_queue_init(&blk->queued_requests);
|
||||
notifier_list_init(&blk->remove_bs_notifiers);
|
||||
notifier_list_init(&blk->insert_bs_notifiers);
|
||||
@ -485,6 +487,8 @@ static void blk_delete(BlockBackend *blk)
|
||||
assert(QLIST_EMPTY(&blk->remove_bs_notifiers.notifiers));
|
||||
assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
|
||||
assert(QLIST_EMPTY(&blk->aio_notifiers));
|
||||
assert(qemu_co_queue_empty(&blk->queued_requests));
|
||||
qemu_mutex_destroy(&blk->queued_requests_lock);
|
||||
QTAILQ_REMOVE(&block_backends, blk, link);
|
||||
drive_info_del(blk->legacy_dinfo);
|
||||
block_acct_cleanup(&blk->stats);
|
||||
@ -1057,7 +1061,7 @@ void blk_set_dev_ops(BlockBackend *blk, const BlockDevOps *ops,
|
||||
blk->dev_opaque = opaque;
|
||||
|
||||
/* Are we currently quiesced? Should we enforce this right now? */
|
||||
if (blk->quiesce_counter && ops && ops->drained_begin) {
|
||||
if (qatomic_read(&blk->quiesce_counter) && ops && ops->drained_begin) {
|
||||
ops->drained_begin(opaque);
|
||||
}
|
||||
}
|
||||
@ -1232,7 +1236,7 @@ void blk_set_allow_aio_context_change(BlockBackend *blk, bool allow)
|
||||
void blk_set_disable_request_queuing(BlockBackend *blk, bool disable)
|
||||
{
|
||||
IO_CODE();
|
||||
blk->disable_request_queuing = disable;
|
||||
qatomic_set(&blk->disable_request_queuing, disable);
|
||||
}
|
||||
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
@ -1271,10 +1275,18 @@ static void coroutine_fn blk_wait_while_drained(BlockBackend *blk)
|
||||
{
|
||||
assert(blk->in_flight > 0);
|
||||
|
||||
if (blk->quiesce_counter && !blk->disable_request_queuing) {
|
||||
if (qatomic_read(&blk->quiesce_counter) &&
|
||||
!qatomic_read(&blk->disable_request_queuing)) {
|
||||
/*
|
||||
* Take lock before decrementing in flight counter so main loop thread
|
||||
* waits for us to enqueue ourselves before it can leave the drained
|
||||
* section.
|
||||
*/
|
||||
qemu_mutex_lock(&blk->queued_requests_lock);
|
||||
blk_dec_in_flight(blk);
|
||||
qemu_co_queue_wait(&blk->queued_requests, NULL);
|
||||
qemu_co_queue_wait(&blk->queued_requests, &blk->queued_requests_lock);
|
||||
blk_inc_in_flight(blk);
|
||||
qemu_mutex_unlock(&blk->queued_requests_lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1862,14 +1874,8 @@ void blk_drain_all(void)
|
||||
bdrv_drain_all_begin();
|
||||
|
||||
while ((blk = blk_all_next(blk)) != NULL) {
|
||||
AioContext *ctx = blk_get_aio_context(blk);
|
||||
|
||||
aio_context_acquire(ctx);
|
||||
|
||||
/* We may have -ENOMEDIUM completions in flight */
|
||||
AIO_WAIT_WHILE(ctx, qatomic_read(&blk->in_flight) > 0);
|
||||
|
||||
aio_context_release(ctx);
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL, qatomic_read(&blk->in_flight) > 0);
|
||||
}
|
||||
|
||||
bdrv_drain_all_end();
|
||||
@ -2595,7 +2601,7 @@ static void blk_root_drained_begin(BdrvChild *child)
|
||||
BlockBackend *blk = child->opaque;
|
||||
ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
|
||||
|
||||
if (++blk->quiesce_counter == 1) {
|
||||
if (qatomic_fetch_inc(&blk->quiesce_counter) == 0) {
|
||||
if (blk->dev_ops && blk->dev_ops->drained_begin) {
|
||||
blk->dev_ops->drained_begin(blk->dev_opaque);
|
||||
}
|
||||
@ -2613,7 +2619,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
|
||||
{
|
||||
BlockBackend *blk = child->opaque;
|
||||
bool busy = false;
|
||||
assert(blk->quiesce_counter);
|
||||
assert(qatomic_read(&blk->quiesce_counter));
|
||||
|
||||
if (blk->dev_ops && blk->dev_ops->drained_poll) {
|
||||
busy = blk->dev_ops->drained_poll(blk->dev_opaque);
|
||||
@ -2624,18 +2630,21 @@ static bool blk_root_drained_poll(BdrvChild *child)
|
||||
static void blk_root_drained_end(BdrvChild *child)
|
||||
{
|
||||
BlockBackend *blk = child->opaque;
|
||||
assert(blk->quiesce_counter);
|
||||
assert(qatomic_read(&blk->quiesce_counter));
|
||||
|
||||
assert(blk->public.throttle_group_member.io_limits_disabled);
|
||||
qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);
|
||||
|
||||
if (--blk->quiesce_counter == 0) {
|
||||
if (qatomic_fetch_dec(&blk->quiesce_counter) == 1) {
|
||||
if (blk->dev_ops && blk->dev_ops->drained_end) {
|
||||
blk->dev_ops->drained_end(blk->dev_opaque);
|
||||
}
|
||||
while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
|
||||
qemu_mutex_lock(&blk->queued_requests_lock);
|
||||
while (qemu_co_enter_next(&blk->queued_requests,
|
||||
&blk->queued_requests_lock)) {
|
||||
/* Resume all queued requests */
|
||||
}
|
||||
qemu_mutex_unlock(&blk->queued_requests_lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +306,7 @@ void blk_exp_close_all_type(BlockExportType type)
|
||||
blk_exp_request_shutdown(exp);
|
||||
}
|
||||
|
||||
AIO_WAIT_WHILE(NULL, blk_exp_has_type(type));
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL, blk_exp_has_type(type));
|
||||
}
|
||||
|
||||
void blk_exp_close_all(void)
|
||||
|
@ -2040,12 +2040,9 @@ out:
|
||||
return result;
|
||||
}
|
||||
|
||||
static int coroutine_fn raw_thread_pool_submit(BlockDriverState *bs,
|
||||
ThreadPoolFunc func, void *arg)
|
||||
static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
|
||||
{
|
||||
/* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
|
||||
ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
|
||||
return thread_pool_submit_co(pool, func, arg);
|
||||
return thread_pool_submit_co(func, arg);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2089,16 +2086,13 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
|
||||
type |= QEMU_AIO_MISALIGNED;
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
} else if (s->use_linux_io_uring) {
|
||||
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
|
||||
assert(qiov->size == bytes);
|
||||
return luring_co_submit(bs, aio, s->fd, offset, qiov, type);
|
||||
return luring_co_submit(bs, s->fd, offset, qiov, type);
|
||||
#endif
|
||||
#ifdef CONFIG_LINUX_AIO
|
||||
} else if (s->use_linux_aio) {
|
||||
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
|
||||
assert(qiov->size == bytes);
|
||||
return laio_co_submit(bs, aio, s->fd, offset, qiov, type,
|
||||
s->aio_max_batch);
|
||||
return laio_co_submit(s->fd, offset, qiov, type, s->aio_max_batch);
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -2115,7 +2109,7 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
|
||||
};
|
||||
|
||||
assert(qiov->size == bytes);
|
||||
return raw_thread_pool_submit(bs, handle_aiocb_rw, &acb);
|
||||
return raw_thread_pool_submit(handle_aiocb_rw, &acb);
|
||||
}
|
||||
|
||||
static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
|
||||
@ -2137,14 +2131,12 @@ static void coroutine_fn raw_co_io_plug(BlockDriverState *bs)
|
||||
BDRVRawState __attribute__((unused)) *s = bs->opaque;
|
||||
#ifdef CONFIG_LINUX_AIO
|
||||
if (s->use_linux_aio) {
|
||||
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
|
||||
laio_io_plug(bs, aio);
|
||||
laio_io_plug();
|
||||
}
|
||||
#endif
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
if (s->use_linux_io_uring) {
|
||||
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
|
||||
luring_io_plug(bs, aio);
|
||||
luring_io_plug();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@ -2154,14 +2146,12 @@ static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs)
|
||||
BDRVRawState __attribute__((unused)) *s = bs->opaque;
|
||||
#ifdef CONFIG_LINUX_AIO
|
||||
if (s->use_linux_aio) {
|
||||
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
|
||||
laio_io_unplug(bs, aio, s->aio_max_batch);
|
||||
laio_io_unplug(s->aio_max_batch);
|
||||
}
|
||||
#endif
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
if (s->use_linux_io_uring) {
|
||||
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
|
||||
luring_io_unplug(bs, aio);
|
||||
luring_io_unplug();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@ -2185,11 +2175,10 @@ static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
|
||||
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
if (s->use_linux_io_uring) {
|
||||
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
|
||||
return luring_co_submit(bs, aio, s->fd, 0, NULL, QEMU_AIO_FLUSH);
|
||||
return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
|
||||
}
|
||||
#endif
|
||||
return raw_thread_pool_submit(bs, handle_aiocb_flush, &acb);
|
||||
return raw_thread_pool_submit(handle_aiocb_flush, &acb);
|
||||
}
|
||||
|
||||
static void raw_aio_attach_aio_context(BlockDriverState *bs,
|
||||
@ -2251,7 +2240,7 @@ raw_regular_truncate(BlockDriverState *bs, int fd, int64_t offset,
|
||||
},
|
||||
};
|
||||
|
||||
return raw_thread_pool_submit(bs, handle_aiocb_truncate, &acb);
|
||||
return raw_thread_pool_submit(handle_aiocb_truncate, &acb);
|
||||
}
|
||||
|
||||
static int coroutine_fn raw_co_truncate(BlockDriverState *bs, int64_t offset,
|
||||
@ -3000,7 +2989,7 @@ raw_do_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes,
|
||||
acb.aio_type |= QEMU_AIO_BLKDEV;
|
||||
}
|
||||
|
||||
ret = raw_thread_pool_submit(bs, handle_aiocb_discard, &acb);
|
||||
ret = raw_thread_pool_submit(handle_aiocb_discard, &acb);
|
||||
raw_account_discard(s, bytes, ret);
|
||||
return ret;
|
||||
}
|
||||
@ -3075,7 +3064,7 @@ raw_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
|
||||
handler = handle_aiocb_write_zeroes;
|
||||
}
|
||||
|
||||
return raw_thread_pool_submit(bs, handler, &acb);
|
||||
return raw_thread_pool_submit(handler, &acb);
|
||||
}
|
||||
|
||||
static int coroutine_fn raw_co_pwrite_zeroes(
|
||||
@ -3313,7 +3302,7 @@ raw_co_copy_range_to(BlockDriverState *bs,
|
||||
},
|
||||
};
|
||||
|
||||
return raw_thread_pool_submit(bs, handle_aiocb_copy_range, &acb);
|
||||
return raw_thread_pool_submit(handle_aiocb_copy_range, &acb);
|
||||
}
|
||||
|
||||
BlockDriver bdrv_file = {
|
||||
@ -3643,7 +3632,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
|
||||
struct sg_io_hdr *io_hdr = buf;
|
||||
if (io_hdr->cmdp[0] == PERSISTENT_RESERVE_OUT ||
|
||||
io_hdr->cmdp[0] == PERSISTENT_RESERVE_IN) {
|
||||
return pr_manager_execute(s->pr_mgr, bdrv_get_aio_context(bs),
|
||||
return pr_manager_execute(s->pr_mgr, qemu_get_current_aio_context(),
|
||||
s->fd, io_hdr);
|
||||
}
|
||||
}
|
||||
@ -3659,7 +3648,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
|
||||
},
|
||||
};
|
||||
|
||||
return raw_thread_pool_submit(bs, handle_aiocb_ioctl, &acb);
|
||||
return raw_thread_pool_submit(handle_aiocb_ioctl, &acb);
|
||||
}
|
||||
#endif /* linux */
|
||||
|
||||
|
@ -153,7 +153,6 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
|
||||
BlockCompletionFunc *cb, void *opaque, int type)
|
||||
{
|
||||
RawWin32AIOData *acb = g_new(RawWin32AIOData, 1);
|
||||
ThreadPool *pool;
|
||||
|
||||
acb->bs = bs;
|
||||
acb->hfile = hfile;
|
||||
@ -168,8 +167,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
|
||||
acb->aio_offset = offset;
|
||||
|
||||
trace_file_paio_submit(acb, opaque, offset, count, type);
|
||||
pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
|
||||
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
|
||||
return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
|
||||
}
|
||||
|
||||
int qemu_ftruncate64(int fd, int64_t length)
|
||||
|
@ -127,7 +127,7 @@ void bdrv_graph_wrlock(void)
|
||||
* reader lock.
|
||||
*/
|
||||
qatomic_set(&has_writer, 0);
|
||||
AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1);
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
|
||||
qatomic_set(&has_writer, 1);
|
||||
|
||||
/*
|
||||
|
@ -524,7 +524,7 @@ void bdrv_drain_all_begin(void)
|
||||
bdrv_drain_all_begin_nopoll();
|
||||
|
||||
/* Now poll the in-flight requests */
|
||||
AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL, bdrv_drain_all_poll());
|
||||
|
||||
while ((bs = bdrv_next_all_states(bs))) {
|
||||
bdrv_drain_assert_idle(bs);
|
||||
|
@ -18,6 +18,9 @@
|
||||
#include "qapi/error.h"
|
||||
#include "trace.h"
|
||||
|
||||
/* Only used for assertions. */
|
||||
#include "qemu/coroutine_int.h"
|
||||
|
||||
/* io_uring ring size */
|
||||
#define MAX_ENTRIES 128
|
||||
|
||||
@ -50,10 +53,9 @@ typedef struct LuringState {
|
||||
|
||||
struct io_uring ring;
|
||||
|
||||
/* io queue for submit at batch. Protected by AioContext lock. */
|
||||
/* No locking required, only accessed from AioContext home thread */
|
||||
LuringQueue io_q;
|
||||
|
||||
/* I/O completion processing. Only runs in I/O thread. */
|
||||
QEMUBH *completion_bh;
|
||||
} LuringState;
|
||||
|
||||
@ -209,6 +211,7 @@ end:
|
||||
* eventually runs later. Coroutines cannot be entered recursively
|
||||
* so avoid doing that!
|
||||
*/
|
||||
assert(luringcb->co->ctx == s->aio_context);
|
||||
if (!qemu_coroutine_entered(luringcb->co)) {
|
||||
aio_co_wake(luringcb->co);
|
||||
}
|
||||
@ -262,13 +265,11 @@ static int ioq_submit(LuringState *s)
|
||||
|
||||
static void luring_process_completions_and_submit(LuringState *s)
|
||||
{
|
||||
aio_context_acquire(s->aio_context);
|
||||
luring_process_completions(s);
|
||||
|
||||
if (!s->io_q.plugged && s->io_q.in_queue > 0) {
|
||||
ioq_submit(s);
|
||||
}
|
||||
aio_context_release(s->aio_context);
|
||||
}
|
||||
|
||||
static void qemu_luring_completion_bh(void *opaque)
|
||||
@ -306,14 +307,18 @@ static void ioq_init(LuringQueue *io_q)
|
||||
io_q->blocked = false;
|
||||
}
|
||||
|
||||
void luring_io_plug(BlockDriverState *bs, LuringState *s)
|
||||
void luring_io_plug(void)
|
||||
{
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
LuringState *s = aio_get_linux_io_uring(ctx);
|
||||
trace_luring_io_plug(s);
|
||||
s->io_q.plugged++;
|
||||
}
|
||||
|
||||
void luring_io_unplug(BlockDriverState *bs, LuringState *s)
|
||||
void luring_io_unplug(void)
|
||||
{
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
LuringState *s = aio_get_linux_io_uring(ctx);
|
||||
assert(s->io_q.plugged);
|
||||
trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
|
||||
s->io_q.in_queue, s->io_q.in_flight);
|
||||
@ -373,10 +378,12 @@ static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
|
||||
return 0;
|
||||
}
|
||||
|
||||
int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
|
||||
uint64_t offset, QEMUIOVector *qiov, int type)
|
||||
int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
|
||||
QEMUIOVector *qiov, int type)
|
||||
{
|
||||
int ret;
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
LuringState *s = aio_get_linux_io_uring(ctx);
|
||||
LuringAIOCB luringcb = {
|
||||
.co = qemu_coroutine_self(),
|
||||
.ret = -EINPROGRESS,
|
||||
|
@ -16,6 +16,9 @@
|
||||
#include "qemu/coroutine.h"
|
||||
#include "qapi/error.h"
|
||||
|
||||
/* Only used for assertions. */
|
||||
#include "qemu/coroutine_int.h"
|
||||
|
||||
#include <libaio.h>
|
||||
|
||||
/*
|
||||
@ -56,10 +59,8 @@ struct LinuxAioState {
|
||||
io_context_t ctx;
|
||||
EventNotifier e;
|
||||
|
||||
/* io queue for submit at batch. Protected by AioContext lock. */
|
||||
/* No locking required, only accessed from AioContext home thread */
|
||||
LaioQueue io_q;
|
||||
|
||||
/* I/O completion processing. Only runs in I/O thread. */
|
||||
QEMUBH *completion_bh;
|
||||
int event_idx;
|
||||
int event_max;
|
||||
@ -102,6 +103,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
|
||||
* later. Coroutines cannot be entered recursively so avoid doing
|
||||
* that!
|
||||
*/
|
||||
assert(laiocb->co->ctx == laiocb->ctx->aio_context);
|
||||
if (!qemu_coroutine_entered(laiocb->co)) {
|
||||
aio_co_wake(laiocb->co);
|
||||
}
|
||||
@ -232,13 +234,11 @@ static void qemu_laio_process_completions(LinuxAioState *s)
|
||||
|
||||
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
|
||||
{
|
||||
aio_context_acquire(s->aio_context);
|
||||
qemu_laio_process_completions(s);
|
||||
|
||||
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
|
||||
ioq_submit(s);
|
||||
}
|
||||
aio_context_release(s->aio_context);
|
||||
}
|
||||
|
||||
static void qemu_laio_completion_bh(void *opaque)
|
||||
@ -354,14 +354,19 @@ static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch)
|
||||
return max_batch;
|
||||
}
|
||||
|
||||
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
|
||||
void laio_io_plug(void)
|
||||
{
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
LinuxAioState *s = aio_get_linux_aio(ctx);
|
||||
|
||||
s->io_q.plugged++;
|
||||
}
|
||||
|
||||
void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s,
|
||||
uint64_t dev_max_batch)
|
||||
void laio_io_unplug(uint64_t dev_max_batch)
|
||||
{
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
LinuxAioState *s = aio_get_linux_aio(ctx);
|
||||
|
||||
assert(s->io_q.plugged);
|
||||
s->io_q.plugged--;
|
||||
|
||||
@ -411,15 +416,15 @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
|
||||
return 0;
|
||||
}
|
||||
|
||||
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
|
||||
uint64_t offset, QEMUIOVector *qiov, int type,
|
||||
uint64_t dev_max_batch)
|
||||
int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
|
||||
int type, uint64_t dev_max_batch)
|
||||
{
|
||||
int ret;
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
struct qemu_laiocb laiocb = {
|
||||
.co = qemu_coroutine_self(),
|
||||
.nbytes = qiov->size,
|
||||
.ctx = s,
|
||||
.ctx = aio_get_linux_aio(ctx),
|
||||
.ret = -EINPROGRESS,
|
||||
.is_read = (type == QEMU_AIO_READ),
|
||||
.qiov = qiov,
|
||||
|
@ -886,9 +886,9 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
|
||||
/* Called when going out of the streaming phase to flush the bulk of the
|
||||
* data to the medium, or just before completing.
|
||||
*/
|
||||
static int mirror_flush(MirrorBlockJob *s)
|
||||
static int coroutine_fn mirror_flush(MirrorBlockJob *s)
|
||||
{
|
||||
int ret = blk_flush(s->target);
|
||||
int ret = blk_co_flush(s->target);
|
||||
if (ret < 0) {
|
||||
if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
|
||||
s->ret = ret;
|
||||
|
@ -214,15 +214,17 @@ void hmp_commit(Monitor *mon, const QDict *qdict)
|
||||
error_report("Device '%s' not found", device);
|
||||
return;
|
||||
}
|
||||
if (!blk_is_available(blk)) {
|
||||
error_report("Device '%s' has no medium", device);
|
||||
return;
|
||||
}
|
||||
|
||||
bs = bdrv_skip_implicit_filters(blk_bs(blk));
|
||||
aio_context = bdrv_get_aio_context(bs);
|
||||
aio_context_acquire(aio_context);
|
||||
|
||||
if (!blk_is_available(blk)) {
|
||||
error_report("Device '%s' has no medium", device);
|
||||
aio_context_release(aio_context);
|
||||
return;
|
||||
}
|
||||
|
||||
ret = bdrv_commit(bs);
|
||||
|
||||
aio_context_release(aio_context);
|
||||
|
@ -1221,7 +1221,7 @@ out:
|
||||
}
|
||||
|
||||
/* Checks to see if it's safe to resize bitmaps */
|
||||
int qcow2_truncate_bitmaps_check(BlockDriverState *bs, Error **errp)
|
||||
int coroutine_fn qcow2_truncate_bitmaps_check(BlockDriverState *bs, Error **errp)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
Qcow2BitmapList *bm_list;
|
||||
|
@ -1126,7 +1126,7 @@ err:
|
||||
* Frees the allocated clusters because the request failed and they won't
|
||||
* actually be linked.
|
||||
*/
|
||||
void qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m)
|
||||
void coroutine_fn qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
if (!has_data_file(bs) && !m->keep_old_clusters) {
|
||||
@ -1156,9 +1156,11 @@ void qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m)
|
||||
*
|
||||
* Returns 0 on success, -errno on failure.
|
||||
*/
|
||||
static int calculate_l2_meta(BlockDriverState *bs, uint64_t host_cluster_offset,
|
||||
uint64_t guest_offset, unsigned bytes,
|
||||
uint64_t *l2_slice, QCowL2Meta **m, bool keep_old)
|
||||
static int coroutine_fn calculate_l2_meta(BlockDriverState *bs,
|
||||
uint64_t host_cluster_offset,
|
||||
uint64_t guest_offset, unsigned bytes,
|
||||
uint64_t *l2_slice, QCowL2Meta **m,
|
||||
bool keep_old)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
int sc_index, l2_index = offset_to_l2_slice_index(s, guest_offset);
|
||||
@ -1599,8 +1601,10 @@ out:
|
||||
* function has been waiting for another request and the allocation must be
|
||||
* restarted, but the whole request should not be failed.
|
||||
*/
|
||||
static int do_alloc_cluster_offset(BlockDriverState *bs, uint64_t guest_offset,
|
||||
uint64_t *host_offset, uint64_t *nb_clusters)
|
||||
static int coroutine_fn do_alloc_cluster_offset(BlockDriverState *bs,
|
||||
uint64_t guest_offset,
|
||||
uint64_t *host_offset,
|
||||
uint64_t *nb_clusters)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
|
||||
@ -2065,8 +2069,9 @@ static int zero_in_l2_slice(BlockDriverState *bs, uint64_t offset,
|
||||
return nb_clusters;
|
||||
}
|
||||
|
||||
static int zero_l2_subclusters(BlockDriverState *bs, uint64_t offset,
|
||||
unsigned nb_subclusters)
|
||||
static int coroutine_fn
|
||||
zero_l2_subclusters(BlockDriverState *bs, uint64_t offset,
|
||||
unsigned nb_subclusters)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
uint64_t *l2_slice;
|
||||
|
@ -1030,8 +1030,8 @@ int64_t qcow2_alloc_clusters(BlockDriverState *bs, uint64_t size)
|
||||
return offset;
|
||||
}
|
||||
|
||||
int64_t qcow2_alloc_clusters_at(BlockDriverState *bs, uint64_t offset,
|
||||
int64_t nb_clusters)
|
||||
int64_t coroutine_fn qcow2_alloc_clusters_at(BlockDriverState *bs, uint64_t offset,
|
||||
int64_t nb_clusters)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
uint64_t cluster_index, refcount;
|
||||
@ -1069,7 +1069,7 @@ int64_t qcow2_alloc_clusters_at(BlockDriverState *bs, uint64_t offset,
|
||||
|
||||
/* only used to allocate compressed sectors. We try to allocate
|
||||
contiguous sectors. size must be <= cluster_size */
|
||||
int64_t qcow2_alloc_bytes(BlockDriverState *bs, int size)
|
||||
int64_t coroutine_fn qcow2_alloc_bytes(BlockDriverState *bs, int size)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
int64_t offset;
|
||||
@ -3685,7 +3685,7 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size)
|
||||
int64_t coroutine_fn qcow2_get_last_cluster(BlockDriverState *bs, int64_t size)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
int64_t i;
|
||||
|
@ -77,10 +77,11 @@ void qcow2_free_snapshots(BlockDriverState *bs)
|
||||
* qcow2_check_refcounts() does not do anything with snapshots'
|
||||
* extra data.)
|
||||
*/
|
||||
static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
int *nb_clusters_reduced,
|
||||
int *extra_data_dropped,
|
||||
Error **errp)
|
||||
static coroutine_fn GRAPH_RDLOCK
|
||||
int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
int *nb_clusters_reduced,
|
||||
int *extra_data_dropped,
|
||||
Error **errp)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
QCowSnapshotHeader h;
|
||||
@ -108,7 +109,7 @@ static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
|
||||
/* Read statically sized part of the snapshot header */
|
||||
offset = ROUND_UP(offset, 8);
|
||||
ret = bdrv_pread(bs->file, offset, sizeof(h), &h, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, sizeof(h), &h, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "Failed to read snapshot table");
|
||||
goto fail;
|
||||
@ -146,8 +147,8 @@ static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
}
|
||||
|
||||
/* Read known extra data */
|
||||
ret = bdrv_pread(bs->file, offset,
|
||||
MIN(sizeof(extra), sn->extra_data_size), &extra, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset,
|
||||
MIN(sizeof(extra), sn->extra_data_size), &extra, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "Failed to read snapshot table");
|
||||
goto fail;
|
||||
@ -184,8 +185,8 @@ static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
/* Store unknown extra data */
|
||||
unknown_extra_data_size = sn->extra_data_size - sizeof(extra);
|
||||
sn->unknown_extra_data = g_malloc(unknown_extra_data_size);
|
||||
ret = bdrv_pread(bs->file, offset, unknown_extra_data_size,
|
||||
sn->unknown_extra_data, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, unknown_extra_data_size,
|
||||
sn->unknown_extra_data, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret,
|
||||
"Failed to read snapshot table");
|
||||
@ -196,7 +197,7 @@ static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
|
||||
/* Read snapshot ID */
|
||||
sn->id_str = g_malloc(id_str_size + 1);
|
||||
ret = bdrv_pread(bs->file, offset, id_str_size, sn->id_str, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, id_str_size, sn->id_str, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "Failed to read snapshot table");
|
||||
goto fail;
|
||||
@ -206,7 +207,7 @@ static int qcow2_do_read_snapshots(BlockDriverState *bs, bool repair,
|
||||
|
||||
/* Read snapshot name */
|
||||
sn->name = g_malloc(name_size + 1);
|
||||
ret = bdrv_pread(bs->file, offset, name_size, sn->name, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, name_size, sn->name, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "Failed to read snapshot table");
|
||||
goto fail;
|
||||
@ -261,7 +262,7 @@ fail:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int qcow2_read_snapshots(BlockDriverState *bs, Error **errp)
|
||||
int coroutine_fn qcow2_read_snapshots(BlockDriverState *bs, Error **errp)
|
||||
{
|
||||
return qcow2_do_read_snapshots(bs, false, NULL, NULL, errp);
|
||||
}
|
||||
|
@ -43,7 +43,6 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
int ret;
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
while (s->nb_threads >= QCOW2_MAX_THREADS) {
|
||||
@ -52,7 +51,7 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
|
||||
s->nb_threads++;
|
||||
qemu_co_mutex_unlock(&s->lock);
|
||||
|
||||
ret = thread_pool_submit_co(pool, func, arg);
|
||||
ret = thread_pool_submit_co(func, arg);
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
s->nb_threads--;
|
||||
|
@ -199,10 +199,10 @@ qcow2_extract_crypto_opts(QemuOpts *opts, const char *fmt, Error **errp)
|
||||
* unknown magic is skipped (future extension this version knows nothing about)
|
||||
* return 0 upon success, non-0 otherwise
|
||||
*/
|
||||
static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
uint64_t end_offset, void **p_feature_table,
|
||||
int flags, bool *need_update_header,
|
||||
Error **errp)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
uint64_t end_offset, void **p_feature_table,
|
||||
int flags, bool *need_update_header, Error **errp)
|
||||
{
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
QCowExtension ext;
|
||||
@ -228,7 +228,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
printf("attempting to read extended header in offset %lu\n", offset);
|
||||
#endif
|
||||
|
||||
ret = bdrv_pread(bs->file, offset, sizeof(ext), &ext, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, sizeof(ext), &ext, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "qcow2_read_extension: ERROR: "
|
||||
"pread fail from offset %" PRIu64, offset);
|
||||
@ -256,7 +256,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
sizeof(bs->backing_format));
|
||||
return 2;
|
||||
}
|
||||
ret = bdrv_pread(bs->file, offset, ext.len, bs->backing_format, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, ext.len, bs->backing_format, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "ERROR: ext_backing_format: "
|
||||
"Could not read format name");
|
||||
@ -272,7 +272,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
case QCOW2_EXT_MAGIC_FEATURE_TABLE:
|
||||
if (p_feature_table != NULL) {
|
||||
void *feature_table = g_malloc0(ext.len + 2 * sizeof(Qcow2Feature));
|
||||
ret = bdrv_pread(bs->file, offset, ext.len, feature_table, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, ext.len, feature_table, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "ERROR: ext_feature_table: "
|
||||
"Could not read table");
|
||||
@ -298,7 +298,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
ret = bdrv_pread(bs->file, offset, ext.len, &s->crypto_header, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, ext.len, &s->crypto_header, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret,
|
||||
"Unable to read CRYPTO header extension");
|
||||
@ -354,7 +354,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
break;
|
||||
}
|
||||
|
||||
ret = bdrv_pread(bs->file, offset, ext.len, &bitmaps_ext, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, ext.len, &bitmaps_ext, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "bitmaps_ext: "
|
||||
"Could not read ext header");
|
||||
@ -418,7 +418,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
case QCOW2_EXT_MAGIC_DATA_FILE:
|
||||
{
|
||||
s->image_data_file = g_malloc0(ext.len + 1);
|
||||
ret = bdrv_pread(bs->file, offset, ext.len, s->image_data_file, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, ext.len, s->image_data_file, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret,
|
||||
"ERROR: Could not read data file name");
|
||||
@ -442,7 +442,7 @@ static int qcow2_read_extensions(BlockDriverState *bs, uint64_t start_offset,
|
||||
uext->len = ext.len;
|
||||
QLIST_INSERT_HEAD(&s->unknown_header_ext, uext, next);
|
||||
|
||||
ret = bdrv_pread(bs->file, offset, uext->len, uext->data, 0);
|
||||
ret = bdrv_co_pread(bs->file, offset, uext->len, uext->data, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "ERROR: unknown extension: "
|
||||
"Could not read data");
|
||||
@ -1241,8 +1241,9 @@ static void qcow2_update_options_abort(BlockDriverState *bs,
|
||||
qapi_free_QCryptoBlockOpenOptions(r->crypto_opts);
|
||||
}
|
||||
|
||||
static int qcow2_update_options(BlockDriverState *bs, QDict *options,
|
||||
int flags, Error **errp)
|
||||
static int coroutine_fn
|
||||
qcow2_update_options(BlockDriverState *bs, QDict *options, int flags,
|
||||
Error **errp)
|
||||
{
|
||||
Qcow2ReopenState r = {};
|
||||
int ret;
|
||||
|
@ -862,9 +862,9 @@ int64_t qcow2_refcount_area(BlockDriverState *bs, uint64_t offset,
|
||||
uint64_t new_refblock_offset);
|
||||
|
||||
int64_t qcow2_alloc_clusters(BlockDriverState *bs, uint64_t size);
|
||||
int64_t qcow2_alloc_clusters_at(BlockDriverState *bs, uint64_t offset,
|
||||
int64_t nb_clusters);
|
||||
int64_t qcow2_alloc_bytes(BlockDriverState *bs, int size);
|
||||
int64_t coroutine_fn qcow2_alloc_clusters_at(BlockDriverState *bs, uint64_t offset,
|
||||
int64_t nb_clusters);
|
||||
int64_t coroutine_fn qcow2_alloc_bytes(BlockDriverState *bs, int size);
|
||||
void qcow2_free_clusters(BlockDriverState *bs,
|
||||
int64_t offset, int64_t size,
|
||||
enum qcow2_discard_type type);
|
||||
@ -894,7 +894,7 @@ int qcow2_change_refcount_order(BlockDriverState *bs, int refcount_order,
|
||||
BlockDriverAmendStatusCB *status_cb,
|
||||
void *cb_opaque, Error **errp);
|
||||
int coroutine_fn GRAPH_RDLOCK qcow2_shrink_reftable(BlockDriverState *bs);
|
||||
int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size);
|
||||
int64_t coroutine_fn qcow2_get_last_cluster(BlockDriverState *bs, int64_t size);
|
||||
int coroutine_fn qcow2_detect_metadata_preallocation(BlockDriverState *bs);
|
||||
|
||||
/* qcow2-cluster.c functions */
|
||||
@ -924,7 +924,7 @@ void qcow2_parse_compressed_l2_entry(BlockDriverState *bs, uint64_t l2_entry,
|
||||
int coroutine_fn GRAPH_RDLOCK
|
||||
qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m);
|
||||
|
||||
void qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m);
|
||||
void coroutine_fn qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m);
|
||||
int qcow2_cluster_discard(BlockDriverState *bs, uint64_t offset,
|
||||
uint64_t bytes, enum qcow2_discard_type type,
|
||||
bool full_discard);
|
||||
@ -951,7 +951,8 @@ int qcow2_snapshot_load_tmp(BlockDriverState *bs,
|
||||
Error **errp);
|
||||
|
||||
void qcow2_free_snapshots(BlockDriverState *bs);
|
||||
int qcow2_read_snapshots(BlockDriverState *bs, Error **errp);
|
||||
int coroutine_fn GRAPH_RDLOCK
|
||||
qcow2_read_snapshots(BlockDriverState *bs, Error **errp);
|
||||
int qcow2_write_snapshots(BlockDriverState *bs);
|
||||
|
||||
int coroutine_fn GRAPH_RDLOCK
|
||||
@ -994,7 +995,7 @@ bool coroutine_fn qcow2_load_dirty_bitmaps(BlockDriverState *bs,
|
||||
bool qcow2_get_bitmap_info_list(BlockDriverState *bs,
|
||||
Qcow2BitmapInfoList **info_list, Error **errp);
|
||||
int qcow2_reopen_bitmaps_rw(BlockDriverState *bs, Error **errp);
|
||||
int qcow2_truncate_bitmaps_check(BlockDriverState *bs, Error **errp);
|
||||
int coroutine_fn qcow2_truncate_bitmaps_check(BlockDriverState *bs, Error **errp);
|
||||
bool qcow2_store_persistent_dirty_bitmaps(BlockDriverState *bs,
|
||||
bool release_stored, Error **errp);
|
||||
int qcow2_reopen_bitmaps_ro(BlockDriverState *bs, Error **errp);
|
||||
|
@ -376,7 +376,7 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int vmdk_is_cid_valid(BlockDriverState *bs)
|
||||
static int coroutine_fn vmdk_is_cid_valid(BlockDriverState *bs)
|
||||
{
|
||||
BDRVVmdkState *s = bs->opaque;
|
||||
uint32_t cur_pcid;
|
||||
|
@ -1053,7 +1053,7 @@ static BDRVVVFATState *vvv = NULL;
|
||||
#endif
|
||||
|
||||
static int enable_write_target(BlockDriverState *bs, Error **errp);
|
||||
static int is_consistent(BDRVVVFATState *s);
|
||||
static int coroutine_fn is_consistent(BDRVVVFATState *s);
|
||||
|
||||
static QemuOptsList runtime_opts = {
|
||||
.name = "vvfat",
|
||||
@ -1469,8 +1469,8 @@ static void print_mapping(const mapping_t* mapping)
|
||||
}
|
||||
#endif
|
||||
|
||||
static int vvfat_read(BlockDriverState *bs, int64_t sector_num,
|
||||
uint8_t *buf, int nb_sectors)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
vvfat_read(BlockDriverState *bs, int64_t sector_num, uint8_t *buf, int nb_sectors)
|
||||
{
|
||||
BDRVVVFATState *s = bs->opaque;
|
||||
int i;
|
||||
@ -1490,8 +1490,8 @@ static int vvfat_read(BlockDriverState *bs, int64_t sector_num,
|
||||
DLOG(fprintf(stderr, "sectors %" PRId64 "+%" PRId64
|
||||
" allocated\n", sector_num,
|
||||
n >> BDRV_SECTOR_BITS));
|
||||
if (bdrv_pread(s->qcow, sector_num * BDRV_SECTOR_SIZE, n,
|
||||
buf + i * 0x200, 0) < 0) {
|
||||
if (bdrv_co_pread(s->qcow, sector_num * BDRV_SECTOR_SIZE, n,
|
||||
buf + i * 0x200, 0) < 0) {
|
||||
return -1;
|
||||
}
|
||||
i += (n >> BDRV_SECTOR_BITS) - 1;
|
||||
@ -1532,7 +1532,7 @@ static int vvfat_read(BlockDriverState *bs, int64_t sector_num,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int coroutine_fn
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
vvfat_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
|
||||
QEMUIOVector *qiov, BdrvRequestFlags flags)
|
||||
{
|
||||
@ -1796,8 +1796,8 @@ static inline uint32_t modified_fat_get(BDRVVVFATState* s,
|
||||
}
|
||||
}
|
||||
|
||||
static inline bool cluster_was_modified(BDRVVVFATState *s,
|
||||
uint32_t cluster_num)
|
||||
static inline bool coroutine_fn GRAPH_RDLOCK
|
||||
cluster_was_modified(BDRVVVFATState *s, uint32_t cluster_num)
|
||||
{
|
||||
int was_modified = 0;
|
||||
int i;
|
||||
@ -1852,8 +1852,8 @@ typedef enum {
|
||||
* Further, the files/directories handled by this function are
|
||||
* assumed to be *not* deleted (and *only* those).
|
||||
*/
|
||||
static uint32_t get_cluster_count_for_direntry(BDRVVVFATState* s,
|
||||
direntry_t* direntry, const char* path)
|
||||
static uint32_t coroutine_fn GRAPH_RDLOCK
|
||||
get_cluster_count_for_direntry(BDRVVVFATState* s, direntry_t* direntry, const char* path)
|
||||
{
|
||||
/*
|
||||
* This is a little bit tricky:
|
||||
@ -1979,9 +1979,9 @@ static uint32_t get_cluster_count_for_direntry(BDRVVVFATState* s,
|
||||
if (res) {
|
||||
return -1;
|
||||
}
|
||||
res = bdrv_pwrite(s->qcow, offset * BDRV_SECTOR_SIZE,
|
||||
BDRV_SECTOR_SIZE, s->cluster_buffer,
|
||||
0);
|
||||
res = bdrv_co_pwrite(s->qcow, offset * BDRV_SECTOR_SIZE,
|
||||
BDRV_SECTOR_SIZE, s->cluster_buffer,
|
||||
0);
|
||||
if (res < 0) {
|
||||
return -2;
|
||||
}
|
||||
@ -2011,8 +2011,8 @@ static uint32_t get_cluster_count_for_direntry(BDRVVVFATState* s,
|
||||
* It returns 0 upon inconsistency or error, and the number of clusters
|
||||
* used by the directory, its subdirectories and their files.
|
||||
*/
|
||||
static int check_directory_consistency(BDRVVVFATState *s,
|
||||
int cluster_num, const char* path)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
check_directory_consistency(BDRVVVFATState *s, int cluster_num, const char* path)
|
||||
{
|
||||
int ret = 0;
|
||||
unsigned char* cluster = g_malloc(s->cluster_size);
|
||||
@ -2138,7 +2138,8 @@ DLOG(fprintf(stderr, "check direntry %d:\n", i); print_direntry(direntries + i))
|
||||
}
|
||||
|
||||
/* returns 1 on success */
|
||||
static int is_consistent(BDRVVVFATState* s)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
is_consistent(BDRVVVFATState* s)
|
||||
{
|
||||
int i, check;
|
||||
int used_clusters_count = 0;
|
||||
@ -2414,8 +2415,8 @@ static int commit_mappings(BDRVVVFATState* s,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int commit_direntries(BDRVVVFATState* s,
|
||||
int dir_index, int parent_mapping_index)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
commit_direntries(BDRVVVFATState* s, int dir_index, int parent_mapping_index)
|
||||
{
|
||||
direntry_t* direntry = array_get(&(s->directory), dir_index);
|
||||
uint32_t first_cluster = dir_index == 0 ? 0 : begin_of_direntry(direntry);
|
||||
@ -2504,8 +2505,8 @@ static int commit_direntries(BDRVVVFATState* s,
|
||||
|
||||
/* commit one file (adjust contents, adjust mapping),
|
||||
return first_mapping_index */
|
||||
static int commit_one_file(BDRVVVFATState* s,
|
||||
int dir_index, uint32_t offset)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
commit_one_file(BDRVVVFATState* s, int dir_index, uint32_t offset)
|
||||
{
|
||||
direntry_t* direntry = array_get(&(s->directory), dir_index);
|
||||
uint32_t c = begin_of_direntry(direntry);
|
||||
@ -2770,7 +2771,7 @@ static int handle_renames_and_mkdirs(BDRVVVFATState* s)
|
||||
/*
|
||||
* TODO: make sure that the short name is not matching *another* file
|
||||
*/
|
||||
static int handle_commits(BDRVVVFATState* s)
|
||||
static int coroutine_fn GRAPH_RDLOCK handle_commits(BDRVVVFATState* s)
|
||||
{
|
||||
int i, fail = 0;
|
||||
|
||||
@ -2913,7 +2914,7 @@ static int handle_deletes(BDRVVVFATState* s)
|
||||
* - recurse direntries from root (using bs->bdrv_pread)
|
||||
* - delete files corresponding to mappings marked as deleted
|
||||
*/
|
||||
static int do_commit(BDRVVVFATState* s)
|
||||
static int coroutine_fn GRAPH_RDLOCK do_commit(BDRVVVFATState* s)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
@ -2963,7 +2964,7 @@ DLOG(checkpoint());
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int try_commit(BDRVVVFATState* s)
|
||||
static int coroutine_fn GRAPH_RDLOCK try_commit(BDRVVVFATState* s)
|
||||
{
|
||||
vvfat_close_current_file(s);
|
||||
DLOG(checkpoint());
|
||||
@ -2972,8 +2973,9 @@ DLOG(checkpoint());
|
||||
return do_commit(s);
|
||||
}
|
||||
|
||||
static int vvfat_write(BlockDriverState *bs, int64_t sector_num,
|
||||
const uint8_t *buf, int nb_sectors)
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
vvfat_write(BlockDriverState *bs, int64_t sector_num,
|
||||
const uint8_t *buf, int nb_sectors)
|
||||
{
|
||||
BDRVVVFATState *s = bs->opaque;
|
||||
int i, ret;
|
||||
@ -3082,8 +3084,8 @@ DLOG(checkpoint());
|
||||
* Use qcow backend. Commit later.
|
||||
*/
|
||||
DLOG(fprintf(stderr, "Write to qcow backend: %d + %d\n", (int)sector_num, nb_sectors));
|
||||
ret = bdrv_pwrite(s->qcow, sector_num * BDRV_SECTOR_SIZE,
|
||||
nb_sectors * BDRV_SECTOR_SIZE, buf, 0);
|
||||
ret = bdrv_co_pwrite(s->qcow, sector_num * BDRV_SECTOR_SIZE,
|
||||
nb_sectors * BDRV_SECTOR_SIZE, buf, 0);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error writing to qcow backend\n");
|
||||
return ret;
|
||||
@ -3103,7 +3105,7 @@ DLOG(checkpoint());
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int coroutine_fn
|
||||
static int coroutine_fn GRAPH_RDLOCK
|
||||
vvfat_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
|
||||
QEMUIOVector *qiov, BdrvRequestFlags flags)
|
||||
{
|
||||
|
@ -203,7 +203,7 @@ typedef struct V9fsDir {
|
||||
QemuMutex readdir_mutex_L;
|
||||
} V9fsDir;
|
||||
|
||||
static inline void v9fs_readdir_lock(V9fsDir *dir)
|
||||
static inline void coroutine_fn v9fs_readdir_lock(V9fsDir *dir)
|
||||
{
|
||||
if (dir->proto_version == V9FS_PROTO_2000U) {
|
||||
qemu_co_mutex_lock(&dir->readdir_mutex_u);
|
||||
@ -212,7 +212,7 @@ static inline void v9fs_readdir_lock(V9fsDir *dir)
|
||||
}
|
||||
}
|
||||
|
||||
static inline void v9fs_readdir_unlock(V9fsDir *dir)
|
||||
static inline void coroutine_fn v9fs_readdir_unlock(V9fsDir *dir)
|
||||
{
|
||||
if (dir->proto_version == V9FS_PROTO_2000U) {
|
||||
qemu_co_mutex_unlock(&dir->readdir_mutex_u);
|
||||
|
@ -68,9 +68,9 @@ int coroutine_fn v9fs_co_readdir(V9fsPDU *pdu, V9fsFidState *fidp,
|
||||
*
|
||||
* See v9fs_co_readdir_many() (as its only user) below for details.
|
||||
*/
|
||||
static int do_readdir_many(V9fsPDU *pdu, V9fsFidState *fidp,
|
||||
struct V9fsDirEnt **entries, off_t offset,
|
||||
int32_t maxsize, bool dostat)
|
||||
static int coroutine_fn
|
||||
do_readdir_many(V9fsPDU *pdu, V9fsFidState *fidp, struct V9fsDirEnt **entries,
|
||||
off_t offset, int32_t maxsize, bool dostat)
|
||||
{
|
||||
V9fsState *s = pdu->s;
|
||||
V9fsString name;
|
||||
|
@ -41,6 +41,5 @@ static int coroutine_enter_func(void *arg)
|
||||
void co_run_in_worker_bh(void *opaque)
|
||||
{
|
||||
Coroutine *co = opaque;
|
||||
thread_pool_submit_aio(aio_get_thread_pool(qemu_get_aio_context()),
|
||||
coroutine_enter_func, co, coroutine_enter_cb, co);
|
||||
thread_pool_submit_aio(coroutine_enter_func, co, coroutine_enter_cb, co);
|
||||
}
|
||||
|
@ -496,7 +496,6 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
|
||||
{
|
||||
SpaprNVDIMMDevice *s_nvdimm = (SpaprNVDIMMDevice *)opaque;
|
||||
SpaprNVDIMMDeviceFlushState *state;
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
HostMemoryBackend *backend = MEMORY_BACKEND(PC_DIMM(s_nvdimm)->hostmem);
|
||||
bool is_pmem = object_property_get_bool(OBJECT(backend), "pmem", NULL);
|
||||
bool pmem_override = object_property_get_bool(OBJECT(s_nvdimm),
|
||||
@ -517,7 +516,7 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
|
||||
}
|
||||
|
||||
QLIST_FOREACH(state, &s_nvdimm->pending_nvdimm_flush_states, node) {
|
||||
thread_pool_submit_aio(pool, flush_worker_cb, state,
|
||||
thread_pool_submit_aio(flush_worker_cb, state,
|
||||
spapr_nvdimm_flush_completion_cb, state);
|
||||
}
|
||||
|
||||
@ -664,7 +663,6 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
|
||||
PCDIMMDevice *dimm;
|
||||
HostMemoryBackend *backend = NULL;
|
||||
SpaprNVDIMMDeviceFlushState *state;
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
int fd;
|
||||
|
||||
if (!drc || !drc->dev ||
|
||||
@ -699,7 +697,7 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
|
||||
|
||||
state->drcidx = drc_index;
|
||||
|
||||
thread_pool_submit_aio(pool, flush_worker_cb, state,
|
||||
thread_pool_submit_aio(flush_worker_cb, state,
|
||||
spapr_nvdimm_flush_completion_cb, state);
|
||||
|
||||
continue_token = state->continue_token;
|
||||
|
@ -70,7 +70,6 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
|
||||
VirtIODeviceRequest *req_data;
|
||||
VirtIOPMEM *pmem = VIRTIO_PMEM(vdev);
|
||||
HostMemoryBackend *backend = MEMORY_BACKEND(pmem->memdev);
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
|
||||
trace_virtio_pmem_flush_request();
|
||||
req_data = virtqueue_pop(vq, sizeof(VirtIODeviceRequest));
|
||||
@ -88,7 +87,7 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
|
||||
req_data->fd = memory_region_get_fd(&backend->mr);
|
||||
req_data->pmem = pmem;
|
||||
req_data->vdev = vdev;
|
||||
thread_pool_submit_aio(pool, worker_cb, req_data, done_cb, req_data);
|
||||
thread_pool_submit_aio(worker_cb, req_data, done_cb, req_data);
|
||||
}
|
||||
|
||||
static void virtio_pmem_get_config(VirtIODevice *vdev, uint8_t *config)
|
||||
|
@ -63,7 +63,7 @@ extern AioWait global_aio_wait;
|
||||
* @ctx: the aio context, or NULL if multiple aio contexts (for which the
|
||||
* caller does not hold a lock) are involved in the polling condition.
|
||||
* @cond: wait while this conditional expression is true
|
||||
* @unlock: whether to unlock and then lock again @ctx. This apples
|
||||
* @unlock: whether to unlock and then lock again @ctx. This applies
|
||||
* only when waiting for another AioContext from the main loop.
|
||||
* Otherwise it's ignored.
|
||||
*
|
||||
|
@ -208,17 +208,9 @@ struct AioContext {
|
||||
struct ThreadPool *thread_pool;
|
||||
|
||||
#ifdef CONFIG_LINUX_AIO
|
||||
/*
|
||||
* State for native Linux AIO. Uses aio_context_acquire/release for
|
||||
* locking.
|
||||
*/
|
||||
struct LinuxAioState *linux_aio;
|
||||
#endif
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
/*
|
||||
* State for Linux io_uring. Uses aio_context_acquire/release for
|
||||
* locking.
|
||||
*/
|
||||
struct LuringState *linux_io_uring;
|
||||
|
||||
/* State for file descriptor monitoring using Linux io_uring */
|
||||
|
@ -1260,7 +1260,7 @@ extern QemuOptsList bdrv_create_opts_simple;
|
||||
/*
|
||||
* Common functions that are neither I/O nor Global State.
|
||||
*
|
||||
* See include/block/block-commmon.h for more information about
|
||||
* See include/block/block-common.h for more information about
|
||||
* the Common API.
|
||||
*/
|
||||
|
||||
|
@ -49,26 +49,39 @@
|
||||
typedef struct LinuxAioState LinuxAioState;
|
||||
LinuxAioState *laio_init(Error **errp);
|
||||
void laio_cleanup(LinuxAioState *s);
|
||||
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
|
||||
uint64_t offset, QEMUIOVector *qiov, int type,
|
||||
uint64_t dev_max_batch);
|
||||
|
||||
/* laio_co_submit: submit I/O requests in the thread's current AioContext. */
|
||||
int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
|
||||
int type, uint64_t dev_max_batch);
|
||||
|
||||
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context);
|
||||
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
|
||||
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
|
||||
void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s,
|
||||
uint64_t dev_max_batch);
|
||||
|
||||
/*
|
||||
* laio_io_plug/unplug work in the thread's current AioContext, therefore the
|
||||
* caller must ensure that they are paired in the same IOThread.
|
||||
*/
|
||||
void laio_io_plug(void);
|
||||
void laio_io_unplug(uint64_t dev_max_batch);
|
||||
#endif
|
||||
/* io_uring.c - Linux io_uring implementation */
|
||||
#ifdef CONFIG_LINUX_IO_URING
|
||||
typedef struct LuringState LuringState;
|
||||
LuringState *luring_init(Error **errp);
|
||||
void luring_cleanup(LuringState *s);
|
||||
int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
|
||||
uint64_t offset, QEMUIOVector *qiov, int type);
|
||||
|
||||
/* luring_co_submit: submit I/O requests in the thread's current AioContext. */
|
||||
int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
|
||||
QEMUIOVector *qiov, int type);
|
||||
void luring_detach_aio_context(LuringState *s, AioContext *old_context);
|
||||
void luring_attach_aio_context(LuringState *s, AioContext *new_context);
|
||||
void luring_io_plug(BlockDriverState *bs, LuringState *s);
|
||||
void luring_io_unplug(BlockDriverState *bs, LuringState *s);
|
||||
|
||||
/*
|
||||
* luring_io_plug/unplug work in the thread's current AioContext, therefore the
|
||||
* caller must ensure that they are paired in the same IOThread.
|
||||
*/
|
||||
void luring_io_plug(void);
|
||||
void luring_io_unplug(void);
|
||||
#endif
|
||||
|
||||
#ifdef _WIN32
|
||||
|
@ -29,12 +29,15 @@ typedef struct ThreadPool ThreadPool;
|
||||
ThreadPool *thread_pool_new(struct AioContext *ctx);
|
||||
void thread_pool_free(ThreadPool *pool);
|
||||
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque);
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg);
|
||||
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
|
||||
/*
|
||||
* thread_pool_submit* API: submit I/O requests in the thread's
|
||||
* current AioContext.
|
||||
*/
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque);
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
|
||||
void thread_pool_submit(ThreadPoolFunc *func, void *arg);
|
||||
|
||||
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
|
||||
|
||||
#endif
|
||||
|
@ -90,6 +90,11 @@ void blk_iostatus_set_err(BlockBackend *blk, int error);
|
||||
int blk_get_max_iov(BlockBackend *blk);
|
||||
int blk_get_max_hw_iov(BlockBackend *blk);
|
||||
|
||||
/*
|
||||
* blk_io_plug/unplug are thread-local operations. This means that multiple
|
||||
* IOThreads can simultaneously call plug/unplug, but the caller must ensure
|
||||
* that each unplug() is called in the same IOThread of the matching plug().
|
||||
*/
|
||||
void coroutine_fn blk_co_io_plug(BlockBackend *blk);
|
||||
void co_wrapper blk_io_plug(BlockBackend *blk);
|
||||
|
||||
|
@ -1167,7 +1167,7 @@ void handle_hmp_command(MonitorHMP *mon, const char *cmdline)
|
||||
Coroutine *co = qemu_coroutine_create(handle_hmp_command_co, &data);
|
||||
monitor_set_cur(co, &mon->common);
|
||||
aio_co_enter(qemu_get_aio_context(), co);
|
||||
AIO_WAIT_WHILE(qemu_get_aio_context(), !data.done);
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL, !data.done);
|
||||
}
|
||||
|
||||
qobject_unref(qdict);
|
||||
|
@ -666,7 +666,7 @@ void monitor_cleanup(void)
|
||||
* We need to poll both qemu_aio_context and iohandler_ctx to make
|
||||
* sure that the dispatcher coroutine keeps making progress and
|
||||
* eventually terminates. qemu_aio_context is automatically
|
||||
* polled by calling AIO_WAIT_WHILE on it, but we must poll
|
||||
* polled by calling AIO_WAIT_WHILE_UNLOCKED on it, but we must poll
|
||||
* iohandler_ctx manually.
|
||||
*
|
||||
* Letting the iothread continue while shutting down the dispatcher
|
||||
@ -679,7 +679,7 @@ void monitor_cleanup(void)
|
||||
aio_co_wake(qmp_dispatcher_co);
|
||||
}
|
||||
|
||||
AIO_WAIT_WHILE(qemu_get_aio_context(),
|
||||
AIO_WAIT_WHILE_UNLOCKED(NULL,
|
||||
(aio_poll(iohandler_get_aio_context(), false),
|
||||
qatomic_mb_read(&qmp_dispatcher_co_busy)));
|
||||
|
||||
|
48
nbd/server.c
48
nbd/server.c
@ -1409,8 +1409,8 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int nbd_receive_request(NBDClient *client, NBDRequest *request,
|
||||
Error **errp)
|
||||
static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *request,
|
||||
Error **errp)
|
||||
{
|
||||
uint8_t buf[NBD_REQUEST_SIZE];
|
||||
uint32_t magic;
|
||||
@ -1893,12 +1893,12 @@ static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
|
||||
stq_be_p(&reply->handle, handle);
|
||||
}
|
||||
|
||||
static int nbd_co_send_simple_reply(NBDClient *client,
|
||||
uint64_t handle,
|
||||
uint32_t error,
|
||||
void *data,
|
||||
size_t len,
|
||||
Error **errp)
|
||||
static int coroutine_fn nbd_co_send_simple_reply(NBDClient *client,
|
||||
uint64_t handle,
|
||||
uint32_t error,
|
||||
void *data,
|
||||
size_t len,
|
||||
Error **errp)
|
||||
{
|
||||
NBDSimpleReply reply;
|
||||
int nbd_err = system_errno_to_nbd_errno(error);
|
||||
@ -2036,8 +2036,8 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
|
||||
stl_be_p(&chunk.length, pnum);
|
||||
ret = nbd_co_send_iov(client, iov, 1, errp);
|
||||
} else {
|
||||
ret = blk_pread(exp->common.blk, offset + progress, pnum,
|
||||
data + progress, 0);
|
||||
ret = blk_co_pread(exp->common.blk, offset + progress, pnum,
|
||||
data + progress, 0);
|
||||
if (ret < 0) {
|
||||
error_setg_errno(errp, -ret, "reading from file failed");
|
||||
break;
|
||||
@ -2196,9 +2196,9 @@ static int coroutine_fn blockalloc_to_extents(BlockBackend *blk,
|
||||
* @ea is converted to BE by the function
|
||||
* @last controls whether NBD_REPLY_FLAG_DONE is sent.
|
||||
*/
|
||||
static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
|
||||
NBDExtentArray *ea,
|
||||
bool last, uint32_t context_id, Error **errp)
|
||||
static int coroutine_fn
|
||||
nbd_co_send_extents(NBDClient *client, uint64_t handle, NBDExtentArray *ea,
|
||||
bool last, uint32_t context_id, Error **errp)
|
||||
{
|
||||
NBDStructuredMeta chunk;
|
||||
struct iovec iov[] = {
|
||||
@ -2275,10 +2275,10 @@ static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
|
||||
bdrv_dirty_bitmap_unlock(bitmap);
|
||||
}
|
||||
|
||||
static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
|
||||
BdrvDirtyBitmap *bitmap, uint64_t offset,
|
||||
uint32_t length, bool dont_fragment, bool last,
|
||||
uint32_t context_id, Error **errp)
|
||||
static int coroutine_fn nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
|
||||
BdrvDirtyBitmap *bitmap, uint64_t offset,
|
||||
uint32_t length, bool dont_fragment, bool last,
|
||||
uint32_t context_id, Error **errp)
|
||||
{
|
||||
unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
|
||||
g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
|
||||
@ -2295,8 +2295,8 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
|
||||
* to the client (although the caller may still need to disconnect after
|
||||
* reporting the error).
|
||||
*/
|
||||
static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
|
||||
Error **errp)
|
||||
static int coroutine_fn nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
|
||||
Error **errp)
|
||||
{
|
||||
NBDClient *client = req->client;
|
||||
int valid_flags;
|
||||
@ -2444,7 +2444,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
|
||||
data, request->len, errp);
|
||||
}
|
||||
|
||||
ret = blk_pread(exp->common.blk, request->from, request->len, data, 0);
|
||||
ret = blk_co_pread(exp->common.blk, request->from, request->len, data, 0);
|
||||
if (ret < 0) {
|
||||
return nbd_send_generic_reply(client, request->handle, ret,
|
||||
"reading from file failed", errp);
|
||||
@ -2511,8 +2511,8 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
|
||||
if (request->flags & NBD_CMD_FLAG_FUA) {
|
||||
flags |= BDRV_REQ_FUA;
|
||||
}
|
||||
ret = blk_pwrite(exp->common.blk, request->from, request->len, data,
|
||||
flags);
|
||||
ret = blk_co_pwrite(exp->common.blk, request->from, request->len, data,
|
||||
flags);
|
||||
return nbd_send_generic_reply(client, request->handle, ret,
|
||||
"writing to file failed", errp);
|
||||
|
||||
@ -2527,8 +2527,8 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
|
||||
if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
|
||||
flags |= BDRV_REQ_NO_FALLBACK;
|
||||
}
|
||||
ret = blk_pwrite_zeroes(exp->common.blk, request->from, request->len,
|
||||
flags);
|
||||
ret = blk_co_pwrite_zeroes(exp->common.blk, request->from, request->len,
|
||||
flags);
|
||||
return nbd_send_generic_reply(client, request->handle, ret,
|
||||
"writing to file failed", errp);
|
||||
|
||||
|
@ -51,7 +51,6 @@ static int pr_manager_worker(void *opaque)
|
||||
int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
|
||||
struct sg_io_hdr *hdr)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(ctx);
|
||||
PRManagerData data = {
|
||||
.pr_mgr = pr_mgr,
|
||||
.fd = fd,
|
||||
@ -62,7 +61,7 @@ int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
|
||||
|
||||
/* The matching object_unref is in pr_manager_worker. */
|
||||
object_ref(OBJECT(pr_mgr));
|
||||
return thread_pool_submit_co(pool, pr_manager_worker, &data);
|
||||
return thread_pool_submit_co(pr_manager_worker, &data);
|
||||
}
|
||||
|
||||
bool pr_manager_is_connected(PRManager *pr_mgr)
|
||||
|
@ -177,10 +177,9 @@ static int do_sgio_worker(void *opaque)
|
||||
return status;
|
||||
}
|
||||
|
||||
static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *buf, int *sz, int dir)
|
||||
static int coroutine_fn do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *buf, int *sz, int dir)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
int r;
|
||||
|
||||
PRHelperSGIOData data = {
|
||||
@ -192,7 +191,7 @@ static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
.dir = dir,
|
||||
};
|
||||
|
||||
r = thread_pool_submit_co(pool, do_sgio_worker, &data);
|
||||
r = thread_pool_submit_co(do_sgio_worker, &data);
|
||||
*sz = data.sz;
|
||||
return r;
|
||||
}
|
||||
@ -320,7 +319,7 @@ static SCSISense mpath_generic_sense(int r)
|
||||
}
|
||||
}
|
||||
|
||||
static int mpath_reconstruct_sense(int fd, int r, uint8_t *sense)
|
||||
static int coroutine_fn mpath_reconstruct_sense(int fd, int r, uint8_t *sense)
|
||||
{
|
||||
switch (r) {
|
||||
case MPATH_PR_SUCCESS:
|
||||
@ -372,8 +371,8 @@ static int mpath_reconstruct_sense(int fd, int r, uint8_t *sense)
|
||||
}
|
||||
}
|
||||
|
||||
static int multipath_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *data, int sz)
|
||||
static int coroutine_fn multipath_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *data, int sz)
|
||||
{
|
||||
int rq_servact = cdb[1];
|
||||
struct prin_resp resp;
|
||||
@ -427,8 +426,8 @@ static int multipath_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
return mpath_reconstruct_sense(fd, r, sense);
|
||||
}
|
||||
|
||||
static int multipath_pr_out(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
const uint8_t *param, int sz)
|
||||
static int coroutine_fn multipath_pr_out(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
const uint8_t *param, int sz)
|
||||
{
|
||||
int rq_servact = cdb[1];
|
||||
int rq_scope = cdb[2] >> 4;
|
||||
@ -545,8 +544,8 @@ static int multipath_pr_out(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
}
|
||||
#endif
|
||||
|
||||
static int do_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *data, int *resp_sz)
|
||||
static int coroutine_fn do_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *data, int *resp_sz)
|
||||
{
|
||||
#ifdef CONFIG_MPATH
|
||||
if (is_mpath(fd)) {
|
||||
@ -563,8 +562,8 @@ static int do_pr_in(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
SG_DXFER_FROM_DEV);
|
||||
}
|
||||
|
||||
static int do_pr_out(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
const uint8_t *param, int sz)
|
||||
static int coroutine_fn do_pr_out(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
const uint8_t *param, int sz)
|
||||
{
|
||||
int resp_sz;
|
||||
|
||||
|
@ -8,7 +8,6 @@
|
||||
#include "qemu/main-loop.h"
|
||||
|
||||
static AioContext *ctx;
|
||||
static ThreadPool *pool;
|
||||
static int active;
|
||||
|
||||
typedef struct {
|
||||
@ -47,7 +46,7 @@ static void done_cb(void *opaque, int ret)
|
||||
static void test_submit(void)
|
||||
{
|
||||
WorkerTestData data = { .n = 0 };
|
||||
thread_pool_submit(pool, worker_cb, &data);
|
||||
thread_pool_submit(worker_cb, &data);
|
||||
while (data.n == 0) {
|
||||
aio_poll(ctx, true);
|
||||
}
|
||||
@ -57,7 +56,7 @@ static void test_submit(void)
|
||||
static void test_submit_aio(void)
|
||||
{
|
||||
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
|
||||
data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
|
||||
data.aiocb = thread_pool_submit_aio(worker_cb, &data,
|
||||
done_cb, &data);
|
||||
|
||||
/* The callbacks are not called until after the first wait. */
|
||||
@ -71,14 +70,14 @@ static void test_submit_aio(void)
|
||||
g_assert_cmpint(data.ret, ==, 0);
|
||||
}
|
||||
|
||||
static void co_test_cb(void *opaque)
|
||||
static void coroutine_fn co_test_cb(void *opaque)
|
||||
{
|
||||
WorkerTestData *data = opaque;
|
||||
|
||||
active = 1;
|
||||
data->n = 0;
|
||||
data->ret = -EINPROGRESS;
|
||||
thread_pool_submit_co(pool, worker_cb, data);
|
||||
thread_pool_submit_co(worker_cb, data);
|
||||
|
||||
/* The test continues in test_submit_co, after qemu_coroutine_enter... */
|
||||
|
||||
@ -122,7 +121,7 @@ static void test_submit_many(void)
|
||||
for (i = 0; i < 100; i++) {
|
||||
data[i].n = 0;
|
||||
data[i].ret = -EINPROGRESS;
|
||||
thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
|
||||
thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
|
||||
}
|
||||
|
||||
active = 100;
|
||||
@ -150,7 +149,7 @@ static void do_test_cancel(bool sync)
|
||||
for (i = 0; i < 100; i++) {
|
||||
data[i].n = 0;
|
||||
data[i].ret = -EINPROGRESS;
|
||||
data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
|
||||
data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
|
||||
done_cb, &data[i]);
|
||||
}
|
||||
|
||||
@ -235,7 +234,6 @@ int main(int argc, char **argv)
|
||||
{
|
||||
qemu_init_main_loop(&error_abort);
|
||||
ctx = qemu_get_current_aio_context();
|
||||
pool = aio_get_thread_pool(ctx);
|
||||
|
||||
g_test_init(&argc, &argv, NULL);
|
||||
g_test_add_func("/thread-pool/submit", test_submit);
|
||||
|
@ -48,7 +48,7 @@ struct ThreadPoolElement {
|
||||
/* Access to this list is protected by lock. */
|
||||
QTAILQ_ENTRY(ThreadPoolElement) reqs;
|
||||
|
||||
/* Access to this list is protected by the global mutex. */
|
||||
/* This list is only written by the thread pool's mother thread. */
|
||||
QLIST_ENTRY(ThreadPoolElement) all;
|
||||
};
|
||||
|
||||
@ -175,7 +175,6 @@ static void thread_pool_completion_bh(void *opaque)
|
||||
ThreadPool *pool = opaque;
|
||||
ThreadPoolElement *elem, *next;
|
||||
|
||||
aio_context_acquire(pool->ctx);
|
||||
restart:
|
||||
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
|
||||
if (elem->state != THREAD_DONE) {
|
||||
@ -195,9 +194,7 @@ restart:
|
||||
*/
|
||||
qemu_bh_schedule(pool->completion_bh);
|
||||
|
||||
aio_context_release(pool->ctx);
|
||||
elem->common.cb(elem->common.opaque, elem->ret);
|
||||
aio_context_acquire(pool->ctx);
|
||||
|
||||
/* We can safely cancel the completion_bh here regardless of someone
|
||||
* else having scheduled it meanwhile because we reenter the
|
||||
@ -211,7 +208,6 @@ restart:
|
||||
qemu_aio_unref(elem);
|
||||
}
|
||||
}
|
||||
aio_context_release(pool->ctx);
|
||||
}
|
||||
|
||||
static void thread_pool_cancel(BlockAIOCB *acb)
|
||||
@ -245,11 +241,15 @@ static const AIOCBInfo thread_pool_aiocb_info = {
|
||||
.get_aio_context = thread_pool_get_aio_context,
|
||||
};
|
||||
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque)
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque)
|
||||
{
|
||||
ThreadPoolElement *req;
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
ThreadPool *pool = aio_get_thread_pool(ctx);
|
||||
|
||||
/* Assert that the thread submitting work is the same running the pool */
|
||||
assert(pool->ctx == qemu_get_current_aio_context());
|
||||
|
||||
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
|
||||
req->func = func;
|
||||
@ -284,19 +284,18 @@ static void thread_pool_co_cb(void *opaque, int ret)
|
||||
aio_co_wake(co->co);
|
||||
}
|
||||
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
|
||||
void *arg)
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
|
||||
assert(qemu_in_coroutine());
|
||||
thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
|
||||
thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
|
||||
qemu_coroutine_yield();
|
||||
return tpc.ret;
|
||||
}
|
||||
|
||||
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
|
||||
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
||||
thread_pool_submit_aio(func, arg, NULL, NULL);
|
||||
}
|
||||
|
||||
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user