migration/multifd: Cleanup multifd_save_cleanup()

Shrink the function by moving relevant works into helpers: move the thread
join()s into multifd_send_terminate_threads(), then create two more helpers
to cover channel/state cleanups.

Add a TODO entry for the thread terminate process because p->running is
still buggy.  We need to fix it at some point but not yet covered.

Suggested-by: Fabiano Rosas <farosas@suse.de>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Link: https://lore.kernel.org/r/20240202102857.110210-20-peterx@redhat.com
Signed-off-by: Peter Xu <peterx@redhat.com>
This commit is contained in:
Peter Xu 2024-02-02 18:28:53 +08:00
parent f88f86c4ee
commit 12808db3b8

View File

@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
* always set it. * always set it.
*/ */
qatomic_set(&multifd_send_state->exiting, 1); qatomic_set(&multifd_send_state->exiting, 1);
/*
* Firstly, kick all threads out; no matter whether they are just idle,
* or blocked in an IO system call.
*/
for (i = 0; i < migrate_multifd_channels(); i++) { for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i]; MultiFDSendParams *p = &multifd_send_state->params[i];
@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
} }
} }
/*
* Finally recycle all the threads.
*
* TODO: p->running is still buggy, e.g. we can reach here without the
* corresponding multifd_new_send_channel_async() get invoked yet,
* then a new thread can even be created after this function returns.
*/
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
if (p->running) {
qemu_thread_join(&p->thread);
}
}
} }
static int multifd_send_channel_destroy(QIOChannel *send) static int multifd_send_channel_destroy(QIOChannel *send)
@ -608,48 +628,32 @@ static int multifd_send_channel_destroy(QIOChannel *send)
return socket_send_channel_destroy(send); return socket_send_channel_destroy(send);
} }
void multifd_save_cleanup(void) static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{ {
int i; if (p->registered_yank) {
migration_ioc_unregister_yank(p->c);
if (!migrate_multifd()) {
return;
} }
multifd_send_terminate_threads(); multifd_send_channel_destroy(p->c);
for (i = 0; i < migrate_multifd_channels(); i++) { p->c = NULL;
MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
multifd_pages_clear(p->pages);
p->pages = NULL;
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
g_free(p->iov);
p->iov = NULL;
multifd_send_state->ops->send_cleanup(p, errp);
if (p->running) { return *errp == NULL;
qemu_thread_join(&p->thread); }
}
}
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
Error *local_err = NULL;
if (p->registered_yank) { static void multifd_send_cleanup_state(void)
migration_ioc_unregister_yank(p->c); {
}
multifd_send_channel_destroy(p->c);
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
multifd_pages_clear(p->pages);
p->pages = NULL;
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
g_free(p->iov);
p->iov = NULL;
multifd_send_state->ops->send_cleanup(p, &local_err);
if (local_err) {
migrate_set_error(migrate_get_current(), local_err);
error_free(local_err);
}
}
qemu_sem_destroy(&multifd_send_state->channels_ready); qemu_sem_destroy(&multifd_send_state->channels_ready);
g_free(multifd_send_state->params); g_free(multifd_send_state->params);
multifd_send_state->params = NULL; multifd_send_state->params = NULL;
@ -659,6 +663,29 @@ void multifd_save_cleanup(void)
multifd_send_state = NULL; multifd_send_state = NULL;
} }
void multifd_save_cleanup(void)
{
int i;
if (!migrate_multifd()) {
return;
}
multifd_send_terminate_threads();
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
Error *local_err = NULL;
if (!multifd_send_cleanup_channel(p, &local_err)) {
migrate_set_error(migrate_get_current(), local_err);
error_free(local_err);
}
}
multifd_send_cleanup_state();
}
static int multifd_zero_copy_flush(QIOChannel *c) static int multifd_zero_copy_flush(QIOChannel *c)
{ {
int ret; int ret;