diff --git a/net/colo-compare.c b/net/colo-compare.c index eee33b87a8..b3f35d729a 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -29,6 +29,7 @@ #include "qemu/sockets.h" #include "qapi-visit.h" #include "net/colo.h" +#include "sysemu/iothread.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ @@ -82,11 +83,10 @@ typedef struct CompareState { GQueue conn_list; /* Record the connection without repetition */ GHashTable *connection_track_table; - /* This thread just do packet compare job */ - QemuThread thread; + IOThread *iothread; GMainContext *worker_context; - GMainLoop *compare_loop; + QEMUTimer *packet_check_timer; } CompareState; typedef struct CompareClass { @@ -615,22 +615,40 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) * Check old packet regularly so it can watch for any packets * that the secondary hasn't produced equivalents of. */ -static gboolean check_old_packet_regular(void *opaque) +static void check_old_packet_regular(void *opaque) { CompareState *s = opaque; /* if have old packet we will notify checkpoint */ colo_old_packet_check(s); - - return TRUE; + timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + + REGULAR_PACKET_CHECK_MS); } -static void *colo_compare_thread(void *opaque) +static void colo_compare_timer_init(CompareState *s) { - CompareState *s = opaque; - GSource *timeout_source; + AioContext *ctx = iothread_get_aio_context(s->iothread); - s->worker_context = g_main_context_new(); + s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, + SCALE_MS, check_old_packet_regular, + s); + timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + + REGULAR_PACKET_CHECK_MS); +} + +static void colo_compare_timer_del(CompareState *s) +{ + if (s->packet_check_timer) { + timer_del(s->packet_check_timer); + timer_free(s->packet_check_timer); + s->packet_check_timer = NULL; + } + } + +static void colo_compare_iothread(CompareState *s) +{ + object_ref(OBJECT(s->iothread)); + s->worker_context = iothread_get_g_main_context(s->iothread); qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, compare_pri_chr_in, NULL, NULL, @@ -639,20 +657,7 @@ static void *colo_compare_thread(void *opaque) compare_sec_chr_in, NULL, NULL, s, s->worker_context, true); - s->compare_loop = g_main_loop_new(s->worker_context, FALSE); - - /* To kick any packets that the secondary doesn't match */ - timeout_source = g_timeout_source_new(REGULAR_PACKET_CHECK_MS); - g_source_set_callback(timeout_source, - (GSourceFunc)check_old_packet_regular, s, NULL); - g_source_attach(timeout_source, s->worker_context); - - g_main_loop_run(s->compare_loop); - - g_source_unref(timeout_source); - g_main_loop_unref(s->compare_loop); - g_main_context_unref(s->worker_context); - return NULL; + colo_compare_timer_init(s); } static char *compare_get_pri_indev(Object *obj, Error **errp) @@ -777,12 +782,10 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) { CompareState *s = COLO_COMPARE(uc); Chardev *chr; - char thread_name[64]; - static int compare_id; - if (!s->pri_indev || !s->sec_indev || !s->outdev) { + if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { error_setg(errp, "colo compare needs 'primary_in' ," - "'secondary_in','outdev' property set"); + "'secondary_in','outdev','iothread' property set"); return; } else if (!strcmp(s->pri_indev, s->outdev) || !strcmp(s->sec_indev, s->outdev) || @@ -817,12 +820,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_free, connection_destroy); - sprintf(thread_name, "colo-compare %d", compare_id); - qemu_thread_create(&s->thread, thread_name, - colo_compare_thread, s, - QEMU_THREAD_JOINABLE); - compare_id++; - + colo_compare_iothread(s); return; } @@ -866,6 +864,10 @@ static void colo_compare_init(Object *obj) object_property_add_str(obj, "outdev", compare_get_outdev, compare_set_outdev, NULL); + object_property_add_link(obj, "iothread", TYPE_IOTHREAD, + (Object **)&s->iothread, + object_property_allow_set_link, + OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL); s->vnet_hdr = false; object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, @@ -879,16 +881,21 @@ static void colo_compare_finalize(Object *obj) qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); qemu_chr_fe_deinit(&s->chr_out, false); - - g_main_loop_quit(s->compare_loop); - qemu_thread_join(&s->thread); - + if (s->iothread) { + colo_compare_timer_del(s); + } /* Release all unhandled packets after compare thead exited */ g_queue_foreach(&s->conn_list, colo_flush_packets, s); g_queue_clear(&s->conn_list); - g_hash_table_destroy(s->connection_track_table); + if (s->connection_track_table) { + g_hash_table_destroy(s->connection_track_table); + } + + if (s->iothread) { + object_unref(OBJECT(s->iothread)); + } g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev);