qemu/migration/multifd.h
Peter Xu 98ea497d8b migration/multifd: Fix MultiFDSendParams.packet_num race
As reported correctly by Fabiano [1] (while per Fabiano, it sourced back to
Elena's initial report in Oct 2023), MultiFDSendParams.packet_num is buggy
to be assigned and stored.  Consider two consequent operations of: (1)
queue a job into multifd send thread X, then (2) queue another sync request
to the same send thread X.  Then the MultiFDSendParams.packet_num will be
assigned twice, and the first assignment can get lost already.

To avoid that, we move the packet_num assignment from p->packet_num into
where the thread will fill in the packet.  Use atomic operations to protect
the field, making sure there's no race.

Note that atomic fetch_add() may not be good for scaling purposes, however
multifd should be fine as number of threads should normally not go beyond
16 threads.  Let's leave that concern for later but fix the issue first.

There's also a trick on how to make it always work even on 32 bit hosts for
uint64_t packet number.  Switching to uintptr_t as of now to simply the
case.  It will cause packet number to overflow easier on 32 bit, but that
shouldn't be a major concern for now as 32 bit systems is not the major
audience for any performance concerns like what multifd wants to address.

We also need to move multifd_send_state definition upper, so that
multifd_send_fill_packet() can reference it.

[1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de

Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Link: https://lore.kernel.org/r/20240202102857.110210-23-peterx@redhat.com
Signed-off-by: Peter Xu <peterx@redhat.com>
2024-02-05 14:42:11 +08:00

221 lines
6.3 KiB
C

/*
* Multifd common functions
*
* Copyright (c) 2019-2020 Red Hat Inc
*
* Authors:
* Juan Quintela <quintela@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
int multifd_send_setup(Error **errp);
void multifd_send_shutdown(void);
int multifd_recv_setup(Error **errp);
void multifd_recv_cleanup(void);
void multifd_recv_shutdown(void);
bool multifd_recv_all_channels_created(void);
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
/* Multifd Compression flags */
#define MULTIFD_FLAG_SYNC (1 << 0)
/* We reserve 3 bits for compression methods */
#define MULTIFD_FLAG_COMPRESSION_MASK (7 << 1)
/* we need to be compatible. Before compression value was 0 */
#define MULTIFD_FLAG_NOCOMP (0 << 1)
#define MULTIFD_FLAG_ZLIB (1 << 1)
#define MULTIFD_FLAG_ZSTD (2 << 1)
/* This value needs to be a multiple of qemu_target_page_size() */
#define MULTIFD_PACKET_SIZE (512 * 1024)
typedef struct {
uint32_t magic;
uint32_t version;
uint32_t flags;
/* maximum number of allocated pages */
uint32_t pages_alloc;
/* non zero pages */
uint32_t normal_pages;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
uint64_t packet_num;
uint64_t unused[4]; /* Reserved for future use */
char ramblock[256];
uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;
typedef struct {
/* number of used pages */
uint32_t num;
/* number of allocated pages */
uint32_t allocated;
/* offset of each page */
ram_addr_t *offset;
RAMBlock *block;
} MultiFDPages_t;
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
/* channel number */
uint8_t id;
/* channel thread name */
char *name;
/* channel thread id */
QemuThread thread;
/* communication channel */
QIOChannel *c;
/* is the yank function registered */
bool registered_yank;
/* packet allocated len */
uint32_t packet_len;
/* guest page size */
uint32_t page_size;
/* number of pages in a full packet */
uint32_t page_count;
/* multifd flags for sending ram */
int write_flags;
/* sem where to wait for more work */
QemuSemaphore sem;
/* syncs main thread and channels */
QemuSemaphore sem_sync;
/* this mutex protects the following parameters */
QemuMutex mutex;
/* is this channel thread running */
bool running;
/* multifd flags for each packet */
uint32_t flags;
/*
* The sender thread has work to do if either of below boolean is set.
*
* @pending_job: a job is pending
* @pending_sync: a sync request is pending
*
* For both of these fields, they're only set by the requesters, and
* cleared by the multifd sender threads.
*/
bool pending_job;
bool pending_sync;
/* array of pages to sent.
* The owner of 'pages' depends of 'pending_job' value:
* pending_job == 0 -> migration_thread can use it.
* pending_job != 0 -> multifd_channel can use it.
*/
MultiFDPages_t *pages;
/* thread local variables. No locking required */
/* pointer to the packet */
MultiFDPacket_t *packet;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
/* packets sent through this channel */
uint64_t packets_sent;
/* non zero pages sent through this channel */
uint64_t total_normal_pages;
/* buffers to send */
struct iovec *iov;
/* number of iovs used */
uint32_t iovs_num;
/* used for compression methods */
void *data;
} MultiFDSendParams;
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
/* channel number */
uint8_t id;
/* channel thread name */
char *name;
/* channel thread id */
QemuThread thread;
/* communication channel */
QIOChannel *c;
/* packet allocated len */
uint32_t packet_len;
/* guest page size */
uint32_t page_size;
/* number of pages in a full packet */
uint32_t page_count;
/* syncs main thread and channels */
QemuSemaphore sem_sync;
/* this mutex protects the following parameters */
QemuMutex mutex;
/* is this channel thread running */
bool running;
/* should this thread finish */
bool quit;
/* multifd flags for each packet */
uint32_t flags;
/* global number of generated multifd packets */
uint64_t packet_num;
/* thread local variables. No locking required */
/* pointer to the packet */
MultiFDPacket_t *packet;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
/* packets received through this channel */
uint64_t packets_recved;
/* ramblock */
RAMBlock *block;
/* ramblock host address */
uint8_t *host;
/* non zero pages recv through this channel */
uint64_t total_normal_pages;
/* buffers to recv */
struct iovec *iov;
/* Pages that are not zero */
ram_addr_t *normal;
/* num of non zero pages */
uint32_t normal_num;
/* used for de-compression methods */
void *data;
} MultiFDRecvParams;
typedef struct {
/* Setup for sending side */
int (*send_setup)(MultiFDSendParams *p, Error **errp);
/* Cleanup for sending side */
void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
/* Prepare the send packet */
int (*send_prepare)(MultiFDSendParams *p, Error **errp);
/* Setup for receiving side */
int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
/* Cleanup for receiving side */
void (*recv_cleanup)(MultiFDRecvParams *p);
/* Read all pages */
int (*recv_pages)(MultiFDRecvParams *p, Error **errp);
} MultiFDMethods;
void multifd_register_ops(int method, MultiFDMethods *ops);
void multifd_send_fill_packet(MultiFDSendParams *p);
static inline void multifd_send_prepare_header(MultiFDSendParams *p)
{
p->iov[0].iov_len = p->packet_len;
p->iov[0].iov_base = p->packet;
p->iovs_num++;
}
#endif