-----BEGIN PGP SIGNATURE-----

Version: GnuPG v1
 
 iQEcBAABAgAGBQJbyUxzAAoJEO8Ells5jWIR24kIAKgKwm2Y4DU5vvaFVBzaK0/F
 97aUnnXDGQ9FjBC6xG2Eo99fCRCbQ3AGHKlFR/sdPhK/5RaV6Zes7DJuYGlmE5wk
 u+nieFDL3d+B3kmoJI2Uy2h2dQEtGwTGvLi+WnjrDteCIDAapiaN1ZWOnN5OvP3a
 c903tU1l5oZsgr/4SGONXn86dw6JaWR8VD+GW6yFQgiB4+icGxMJrLTFTG2Ef4RG
 5WFwqmFlTddTWWdcGDA+myLVLgakWwkxmFK75doeNDcNBjazZ+V8b0PH2Ph+aGXZ
 9AGH2l8W2xPs1TLTJFUbya+2XnRbOTiFl6klVUZvDH3/74nY+kOVtPpljFjiBn0=
 =AhoN
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/jasowang/tags/net-pull-request' into staging

# gpg: Signature made Fri 19 Oct 2018 04:16:03 BST
# gpg:                using RSA key EF04965B398D6211
# gpg: Good signature from "Jason Wang (Jason Wang on RedHat) <jasowang@redhat.com>"
# gpg: WARNING: This key is not certified with sufficiently trusted signatures!
# gpg:          It is not certain that the signature belongs to the owner.
# Primary key fingerprint: 215D 46F4 8246 689E C77F  3562 EF04 965B 398D 6211

* remotes/jasowang/tags/net-pull-request: (26 commits)
  qemu-options: Fix bad "macaddr" property in the documentation
  e1000: indicate dropped packets in HW counters
  net: ignore packet size greater than INT_MAX
  pcnet: fix possible buffer overflow
  rtl8139: fix possible out of bound access
  ne2000: fix possible out of bound access in ne2000_receive
  clean up callback when del virtqueue
  docs: Add COLO status diagram to COLO-FT.txt
  COLO: quick failover process by kick COLO thread
  COLO: notify net filters about checkpoint/failover event
  filter-rewriter: handle checkpoint and failover event
  filter: Add handle_event method for NetFilterClass
  COLO: flush host dirty ram from cache
  savevm: split the process of different stages for loadvm/savevm
  qapi: Add new command to query colo status
  qapi/migration.json: Rename COLO unknown mode to none mode.
  qmp event: Add COLO_EXIT event to notify users while exited COLO
  COLO: Flush memory data from ram cache
  ram/COLO: Record the dirty pages that SVM received
  COLO: Load dirty pages into SVM's RAM cache firstly
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2018-10-19 15:30:40 +01:00
commit 784c2e4f23
30 changed files with 963 additions and 157 deletions

View File

@ -110,6 +110,40 @@ Note:
HeartBeat has not been implemented yet, so you need to trigger failover process
by using 'x-colo-lost-heartbeat' command.
== COLO operation status ==
+-----------------+
| |
| Start COLO |
| |
+--------+--------+
|
| Main qmp command:
| migrate-set-capabilities with x-colo
| migrate
|
v
+--------+--------+
| |
| COLO running |
| |
+--------+--------+
|
| Main qmp command:
| x-colo-lost-heartbeat
| or
| some error happened
v
+--------+--------+
| | send qmp event:
| COLO failover | COLO_EXIT
| |
+-----------------+
COLO use the qmp command to switch and report operation status.
The diagram just shows the main qmp command, you can get the detail
in test procedure.
== Test procedure ==
1. Startup qemu
Primary:

View File

@ -36,6 +36,7 @@
#include "qemu/range.h"
#include "e1000x_common.h"
#include "trace.h"
static const uint8_t bcast[] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
@ -847,6 +848,15 @@ static uint64_t rx_desc_base(E1000State *s)
return (bah << 32) + bal;
}
static void
e1000_receiver_overrun(E1000State *s, size_t size)
{
trace_e1000_receiver_overrun(size, s->mac_reg[RDH], s->mac_reg[RDT]);
e1000x_inc_reg_if_not_full(s->mac_reg, RNBC);
e1000x_inc_reg_if_not_full(s->mac_reg, MPC);
set_ics(s, 0, E1000_ICS_RXO);
}
static ssize_t
e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt)
{
@ -916,7 +926,7 @@ e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt)
desc_offset = 0;
total_size = size + e1000x_fcs_len(s->mac_reg);
if (!e1000_has_rxbufs(s, total_size)) {
set_ics(s, 0, E1000_ICS_RXO);
e1000_receiver_overrun(s, total_size);
return -1;
}
do {
@ -969,7 +979,7 @@ e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt)
rdh_start >= s->mac_reg[RDLEN] / sizeof(desc)) {
DBGOUT(RXERR, "RDH wraparound @%x, RDT %x, RDLEN %x\n",
rdh_start, s->mac_reg[RDT], s->mac_reg[RDLEN]);
set_ics(s, 0, E1000_ICS_RXO);
e1000_receiver_overrun(s, total_size);
return -1;
}
} while (desc_offset < total_size);

View File

@ -174,7 +174,7 @@ static int ne2000_buffer_full(NE2000State *s)
ssize_t ne2000_receive(NetClientState *nc, const uint8_t *buf, size_t size_)
{
NE2000State *s = qemu_get_nic_opaque(nc);
int size = size_;
size_t size = size_;
uint8_t *p;
unsigned int total_len, next, avail, len, index, mcast_idx;
uint8_t buf1[60];
@ -182,7 +182,7 @@ ssize_t ne2000_receive(NetClientState *nc, const uint8_t *buf, size_t size_)
{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff };
#if defined(DEBUG_NE2000)
printf("NE2000: received len=%d\n", size);
printf("NE2000: received len=%zu\n", size);
#endif
if (s->cmd & E8390_STOP || ne2000_buffer_full(s))

View File

@ -988,14 +988,14 @@ ssize_t pcnet_receive(NetClientState *nc, const uint8_t *buf, size_t size_)
uint8_t buf1[60];
int remaining;
int crc_err = 0;
int size = size_;
size_t size = size_;
if (CSR_DRX(s) || CSR_STOP(s) || CSR_SPND(s) || !size ||
(CSR_LOOP(s) && !s->looptest)) {
return -1;
}
#ifdef PCNET_DEBUG
printf("pcnet_receive size=%d\n", size);
printf("pcnet_receive size=%zu\n", size);
#endif
/* if too small buffer, then expand it */

View File

@ -817,7 +817,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t
RTL8139State *s = qemu_get_nic_opaque(nc);
PCIDevice *d = PCI_DEVICE(s);
/* size is the length of the buffer passed to the driver */
int size = size_;
size_t size = size_;
const uint8_t *dot1q_buf = NULL;
uint32_t packet_header = 0;
@ -826,7 +826,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t
static const uint8_t broadcast_macaddr[6] =
{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff };
DPRINTF(">>> received len=%d\n", size);
DPRINTF(">>> received len=%zu\n", size);
/* test if board clock is stopped */
if (!s->clock_enabled)
@ -1035,7 +1035,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t
if (size+4 > rx_space)
{
DPRINTF("C+ Rx mode : descriptor %d size %d received %d + 4\n",
DPRINTF("C+ Rx mode : descriptor %d size %d received %zu + 4\n",
descriptor, rx_space, size);
s->IntrStatus |= RxOverflow;
@ -1148,7 +1148,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t
if (avail != 0 && RX_ALIGN(size + 8) >= avail)
{
DPRINTF("rx overflow: rx buffer length %d head 0x%04x "
"read 0x%04x === available 0x%04x need 0x%04x\n",
"read 0x%04x === available 0x%04x need 0x%04zx\n",
s->RxBufferSize, s->RxBufAddr, s->RxBufPtr, avail, size + 8);
s->IntrStatus |= RxOverflow;

View File

@ -98,6 +98,9 @@ net_rx_pkt_rss_ip6_ex(void) "Calculating IPv6/EX RSS hash"
net_rx_pkt_rss_hash(size_t rss_length, uint32_t rss_hash) "RSS hash for %zu bytes: 0x%X"
net_rx_pkt_rss_add_chunk(void* ptr, size_t size, size_t input_offset) "Add RSS chunk %p, %zu bytes, RSS input offset %zu bytes"
# hw/net/e1000.c
e1000_receiver_overrun(size_t s, uint32_t rdh, uint32_t rdt) "Receiver overrun: dropped packet of %zu bytes, RDH=%u, RDT=%u"
# hw/net/e1000x_common.c
e1000x_rx_can_recv_disabled(bool link_up, bool rx_enabled, bool pci_master) "link_up: %d, rx_enabled %d, pci_master %d"
e1000x_vlan_is_vlan_pkt(bool is_vlan_pkt, uint16_t eth_proto, uint16_t vet) "Is VLAN packet: %d, ETH proto: 0x%X, VET: 0x%X"

View File

@ -1611,6 +1611,8 @@ void virtio_del_queue(VirtIODevice *vdev, int n)
vdev->vq[n].vring.num = 0;
vdev->vq[n].vring.num_default = 0;
vdev->vq[n].handle_output = NULL;
vdev->vq[n].handle_aio_output = NULL;
}
static void virtio_set_isr(VirtIODevice *vdev, int value)

View File

@ -27,6 +27,7 @@ struct RAMBlock {
struct rcu_head rcu;
struct MemoryRegion *mr;
uint8_t *host;
uint8_t *colo_cache; /* For colo, VM's ram cache */
ram_addr_t offset;
ram_addr_t used_length;
ram_addr_t max_length;

View File

@ -16,14 +16,21 @@
#include "qemu-common.h"
#include "qapi/qapi-types-migration.h"
enum colo_event {
COLO_EVENT_NONE,
COLO_EVENT_CHECKPOINT,
COLO_EVENT_FAILOVER,
};
void colo_info_init(void);
void migrate_start_colo_process(MigrationState *s);
bool migration_in_colo_state(void);
/* loadvm */
bool migration_incoming_enable_colo(void);
void migration_incoming_exit_colo(void);
void migration_incoming_enable_colo(void);
void migration_incoming_disable_colo(void);
bool migration_incoming_colo_enabled(void);
void *colo_process_incoming_thread(void *opaque);
bool migration_incoming_in_colo_state(void);

View File

@ -38,6 +38,8 @@ typedef ssize_t (FilterReceiveIOV)(NetFilterState *nc,
typedef void (FilterStatusChanged) (NetFilterState *nf, Error **errp);
typedef void (FilterHandleEvent) (NetFilterState *nf, int event, Error **errp);
typedef struct NetFilterClass {
ObjectClass parent_class;
@ -45,6 +47,7 @@ typedef struct NetFilterClass {
FilterSetup *setup;
FilterCleanup *cleanup;
FilterStatusChanged *status_changed;
FilterHandleEvent *handle_event;
/* mandatory */
FilterReceiveIOV *receive_iov;
} NetFilterClass;
@ -77,4 +80,6 @@ ssize_t qemu_netfilter_pass_to_next(NetClientState *sender,
int iovcnt,
void *opaque);
void colo_notify_filters_event(int event, Error **errp);
#endif /* QEMU_NET_FILTER_H */

View File

@ -1,6 +1,6 @@
common-obj-y += migration.o socket.o fd.o exec.o
common-obj-y += tls.o channel.o savevm.o
common-obj-y += colo-comm.o colo.o colo-failover.o
common-obj-y += colo.o colo-failover.o
common-obj-y += vmstate.o vmstate-types.o page_cache.o
common-obj-y += qemu-file.o global_state.o
common-obj-y += qemu-file-channel.o

View File

@ -1,76 +0,0 @@
/*
* COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
* (a.k.a. Fault Tolerance or Continuous Replication)
*
* Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2016 FUJITSU LIMITED
* Copyright (c) 2016 Intel Corporation
*
* This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory.
*
*/
#include "qemu/osdep.h"
#include "migration.h"
#include "migration/colo.h"
#include "migration/vmstate.h"
#include "trace.h"
typedef struct {
bool colo_requested;
} COLOInfo;
static COLOInfo colo_info;
COLOMode get_colo_mode(void)
{
if (migration_in_colo_state()) {
return COLO_MODE_PRIMARY;
} else if (migration_incoming_in_colo_state()) {
return COLO_MODE_SECONDARY;
} else {
return COLO_MODE_UNKNOWN;
}
}
static int colo_info_pre_save(void *opaque)
{
COLOInfo *s = opaque;
s->colo_requested = migrate_colo_enabled();
return 0;
}
static bool colo_info_need(void *opaque)
{
return migrate_colo_enabled();
}
static const VMStateDescription colo_state = {
.name = "COLOState",
.version_id = 1,
.minimum_version_id = 1,
.pre_save = colo_info_pre_save,
.needed = colo_info_need,
.fields = (VMStateField[]) {
VMSTATE_BOOL(colo_requested, COLOInfo),
VMSTATE_END_OF_LIST()
},
};
void colo_info_init(void)
{
vmstate_register(NULL, 0, &colo_state, &colo_info);
}
bool migration_incoming_enable_colo(void)
{
return colo_info.colo_requested;
}
void migration_incoming_exit_colo(void)
{
colo_info.colo_requested = false;
}

View File

@ -77,7 +77,7 @@ FailoverStatus failover_get_state(void)
void qmp_x_colo_lost_heartbeat(Error **errp)
{
if (get_colo_mode() == COLO_MODE_UNKNOWN) {
if (get_colo_mode() == COLO_MODE_NONE) {
error_setg(errp, QERR_FEATURE_DISABLED, "colo");
return;
}

View File

@ -25,8 +25,16 @@
#include "qemu/error-report.h"
#include "migration/failover.h"
#include "replication.h"
#include "net/colo-compare.h"
#include "net/colo.h"
#include "block/block.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "sysemu/cpus.h"
#include "net/filter.h"
static bool vmstate_loading;
static Notifier packets_compare_notifier;
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
@ -53,6 +61,7 @@ static void secondary_vm_do_failover(void)
{
int old_state;
MigrationIncomingState *mis = migration_incoming_get_current();
Error *local_err = NULL;
/* Can not do failover during the process of VM's loading VMstate, Or
* it will break the secondary VM.
@ -70,6 +79,17 @@ static void secondary_vm_do_failover(void)
migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
MIGRATION_STATUS_COMPLETED);
replication_stop_all(true, &local_err);
if (local_err) {
error_report_err(local_err);
}
/* Notify all filters of all NIC to do checkpoint */
colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
if (local_err) {
error_report_err(local_err);
}
if (!autostart) {
error_report("\"-S\" qemu option will be ignored in secondary side");
/* recover runstate to normal migration finish state */
@ -107,9 +127,15 @@ static void primary_vm_do_failover(void)
{
MigrationState *s = migrate_get_current();
int old_state;
Error *local_err = NULL;
migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
MIGRATION_STATUS_COMPLETED);
/*
* kick COLO thread which might wait at
* qemu_sem_wait(&s->colo_checkpoint_sem).
*/
colo_checkpoint_notify(migrate_get_current());
/*
* Wake up COLO thread which may blocked in recv() or send(),
@ -130,10 +156,28 @@ static void primary_vm_do_failover(void)
FailoverStatus_str(old_state));
return;
}
replication_stop_all(true, &local_err);
if (local_err) {
error_report_err(local_err);
local_err = NULL;
}
/* Notify COLO thread that failover work is finished */
qemu_sem_post(&s->colo_exit_sem);
}
COLOMode get_colo_mode(void)
{
if (migration_in_colo_state()) {
return COLO_MODE_PRIMARY;
} else if (migration_incoming_in_colo_state()) {
return COLO_MODE_SECONDARY;
} else {
return COLO_MODE_NONE;
}
}
void colo_do_failover(MigrationState *s)
{
/* Make sure VM stopped while failover happened. */
@ -207,6 +251,26 @@ void qmp_xen_colo_do_checkpoint(Error **errp)
#endif
}
COLOStatus *qmp_query_colo_status(Error **errp)
{
COLOStatus *s = g_new0(COLOStatus, 1);
s->mode = get_colo_mode();
switch (failover_get_state()) {
case FAILOVER_STATUS_NONE:
s->reason = COLO_EXIT_REASON_NONE;
break;
case FAILOVER_STATUS_REQUIRE:
s->reason = COLO_EXIT_REASON_REQUEST;
break;
default:
s->reason = COLO_EXIT_REASON_ERROR;
}
return s;
}
static void colo_send_message(QEMUFile *f, COLOMessage msg,
Error **errp)
{
@ -343,20 +407,41 @@ static int colo_do_checkpoint_transaction(MigrationState *s,
goto out;
}
/* Disable block migration */
migrate_set_block_enabled(false, &local_err);
qemu_savevm_state_header(fb);
qemu_savevm_state_setup(fb);
qemu_mutex_lock_iothread();
qemu_savevm_state_complete_precopy(fb, false, false);
qemu_mutex_unlock_iothread();
qemu_fflush(fb);
colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
if (local_err) {
goto out;
}
/* Disable block migration */
migrate_set_block_enabled(false, &local_err);
qemu_mutex_lock_iothread();
replication_do_checkpoint_all(&local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
/* Note: device state is saved into buffer */
ret = qemu_save_device_state(fb);
qemu_mutex_unlock_iothread();
if (ret < 0) {
goto out;
}
/*
* Only save VM's live state, which not including device state.
* TODO: We may need a timeout mechanism to prevent COLO process
* to be blocked here.
*/
qemu_savevm_live_state(s->to_dst_file);
qemu_fflush(fb);
/*
* We need the size of the VMstate data in Secondary side,
* With which we can decide how much data should be read.
@ -400,6 +485,11 @@ out:
return ret;
}
static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
{
colo_checkpoint_notify(data);
}
static void colo_process_checkpoint(MigrationState *s)
{
QIOChannelBuffer *bioc;
@ -416,6 +506,9 @@ static void colo_process_checkpoint(MigrationState *s)
goto out;
}
packets_compare_notifier.notify = colo_compare_notify_checkpoint;
colo_compare_register_notifier(&packets_compare_notifier);
/*
* Wait for Secondary finish loading VM states and enter COLO
* restore.
@ -430,6 +523,12 @@ static void colo_process_checkpoint(MigrationState *s)
object_unref(OBJECT(bioc));
qemu_mutex_lock_iothread();
replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
vm_start();
qemu_mutex_unlock_iothread();
trace_colo_vm_state_change("stop", "run");
@ -445,6 +544,9 @@ static void colo_process_checkpoint(MigrationState *s)
qemu_sem_wait(&s->colo_checkpoint_sem);
if (s->state != MIGRATION_STATUS_COLO) {
goto out;
}
ret = colo_do_checkpoint_transaction(s, bioc, fb);
if (ret < 0) {
goto out;
@ -461,11 +563,38 @@ out:
qemu_fclose(fb);
}
timer_del(s->colo_delay_timer);
/*
* There are only two reasons we can get here, some error happened
* or the user triggered failover.
*/
switch (failover_get_state()) {
case FAILOVER_STATUS_NONE:
qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
COLO_EXIT_REASON_ERROR);
break;
case FAILOVER_STATUS_REQUIRE:
qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
COLO_EXIT_REASON_REQUEST);
break;
default:
abort();
}
/* Hope this not to be too long to wait here */
qemu_sem_wait(&s->colo_exit_sem);
qemu_sem_destroy(&s->colo_exit_sem);
/*
* It is safe to unregister notifier after failover finished.
* Besides, colo_delay_timer and colo_checkpoint_sem can't be
* released befor unregister notifier, or there will be use-after-free
* error.
*/
colo_compare_unregister_notifier(&packets_compare_notifier);
timer_del(s->colo_delay_timer);
timer_free(s->colo_delay_timer);
qemu_sem_destroy(&s->colo_checkpoint_sem);
/*
* Must be called after failover BH is completed,
* Or the failover BH may shutdown the wrong fd that
@ -533,6 +662,7 @@ void *colo_process_incoming_thread(void *opaque)
uint64_t total_size;
uint64_t value;
Error *local_err = NULL;
int ret;
rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);
@ -559,6 +689,16 @@ void *colo_process_incoming_thread(void *opaque)
fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
object_unref(OBJECT(bioc));
qemu_mutex_lock_iothread();
replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
vm_start();
trace_colo_vm_state_change("stop", "run");
qemu_mutex_unlock_iothread();
colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
&local_err);
if (local_err) {
@ -578,6 +718,11 @@ void *colo_process_incoming_thread(void *opaque)
goto out;
}
qemu_mutex_lock_iothread();
vm_stop_force_state(RUN_STATE_COLO);
trace_colo_vm_state_change("run", "stop");
qemu_mutex_unlock_iothread();
/* FIXME: This is unnecessary for periodic checkpoint mode */
colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
&local_err);
@ -591,6 +736,16 @@ void *colo_process_incoming_thread(void *opaque)
goto out;
}
qemu_mutex_lock_iothread();
cpu_synchronize_all_pre_loadvm();
ret = qemu_loadvm_state_main(mis->from_src_file, mis);
qemu_mutex_unlock_iothread();
if (ret < 0) {
error_report("Load VM's live state (ram) error");
goto out;
}
value = colo_receive_message_value(mis->from_src_file,
COLO_MESSAGE_VMSTATE_SIZE, &local_err);
if (local_err) {
@ -622,15 +777,37 @@ void *colo_process_incoming_thread(void *opaque)
}
qemu_mutex_lock_iothread();
qemu_system_reset(SHUTDOWN_CAUSE_NONE);
vmstate_loading = true;
if (qemu_loadvm_state(fb) < 0) {
error_report("COLO: loadvm failed");
ret = qemu_load_device_state(fb);
if (ret < 0) {
error_report("COLO: load device state failed");
qemu_mutex_unlock_iothread();
goto out;
}
replication_get_error_all(&local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
/* discard colo disk buffer */
replication_do_checkpoint_all(&local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
/* Notify all filters of all NIC to do checkpoint */
colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
if (local_err) {
qemu_mutex_unlock_iothread();
goto out;
}
vmstate_loading = false;
vm_start();
trace_colo_vm_state_change("stop", "run");
qemu_mutex_unlock_iothread();
if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
@ -654,6 +831,19 @@ out:
error_report_err(local_err);
}
switch (failover_get_state()) {
case FAILOVER_STATUS_NONE:
qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
COLO_EXIT_REASON_ERROR);
break;
case FAILOVER_STATUS_REQUIRE:
qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
COLO_EXIT_REASON_REQUEST);
break;
default:
abort();
}
if (fb) {
qemu_fclose(fb);
}
@ -665,7 +855,7 @@ out:
if (mis->to_src_file) {
qemu_fclose(mis->to_src_file);
}
migration_incoming_exit_colo();
migration_incoming_disable_colo();
rcu_unregister_thread();
return NULL;

View File

@ -76,10 +76,8 @@
/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
/* The delay time (in ms) between two COLO checkpoints
* Note: Please change this default value to 10000 when we support hybrid mode.
*/
#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
/* The delay time (in ms) between two COLO checkpoints */
#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
#define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
#define DEFAULT_MIGRATE_MULTIFD_PAGE_COUNT 16
@ -298,6 +296,22 @@ int migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
return migrate_send_rp_message(mis, msg_type, msglen, bufc);
}
static bool migration_colo_enabled;
bool migration_incoming_colo_enabled(void)
{
return migration_colo_enabled;
}
void migration_incoming_disable_colo(void)
{
migration_colo_enabled = false;
}
void migration_incoming_enable_colo(void)
{
migration_colo_enabled = true;
}
void qemu_start_incoming_migration(const char *uri, Error **errp)
{
const char *p;
@ -388,6 +402,7 @@ static void process_incoming_migration_co(void *opaque)
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyState ps;
int ret;
Error *local_err = NULL;
assert(mis->from_src_file);
mis->migration_incoming_co = qemu_coroutine_self();
@ -419,7 +434,21 @@ static void process_incoming_migration_co(void *opaque)
}
/* we get COLO info, and know if we are in COLO mode */
if (!ret && migration_incoming_enable_colo()) {
if (!ret && migration_incoming_colo_enabled()) {
/* Make sure all file formats flush their mutable metadata */
bdrv_invalidate_cache_all(&local_err);
if (local_err) {
migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
MIGRATION_STATUS_FAILED);
error_report_err(local_err);
exit(EXIT_FAILURE);
}
if (colo_init_ram_cache() < 0) {
error_report("Init ram cache failed");
exit(EXIT_FAILURE);
}
qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
mis->have_colo_incoming_thread = true;
@ -427,6 +456,8 @@ static void process_incoming_migration_co(void *opaque)
/* Wait checkpoint incoming thread exit before free resource */
qemu_thread_join(&mis->colo_incoming_thread);
/* We hold the global iothread lock, so it is safe here */
colo_release_ram_cache();
}
if (ret < 0) {
@ -3017,6 +3048,11 @@ static void *migration_thread(void *opaque)
qemu_savevm_send_postcopy_advise(s->to_dst_file);
}
if (migrate_colo_enabled()) {
/* Notify migration destination that we enable COLO */
qemu_savevm_send_colo_enable(s->to_dst_file);
}
qemu_savevm_state_setup(s->to_dst_file);
s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;

View File

@ -3447,6 +3447,29 @@ static inline void *host_from_ram_block_offset(RAMBlock *block,
return block->host + offset;
}
static inline void *colo_cache_from_block_offset(RAMBlock *block,
ram_addr_t offset)
{
if (!offset_in_ramblock(block, offset)) {
return NULL;
}
if (!block->colo_cache) {
error_report("%s: colo_cache is NULL in block :%s",
__func__, block->idstr);
return NULL;
}
/*
* During colo checkpoint, we need bitmap of these migrated pages.
* It help us to decide which pages in ram cache should be flushed
* into VM's RAM later.
*/
if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
ram_state->migration_dirty_pages++;
}
return block->colo_cache + offset;
}
/**
* ram_handle_compressed: handle the zero page case
*
@ -3651,6 +3674,88 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
qemu_mutex_unlock(&decomp_done_lock);
}
/*
* colo cache: this is for secondary VM, we cache the whole
* memory of the secondary VM, it is need to hold the global lock
* to call this helper.
*/
int colo_init_ram_cache(void)
{
RAMBlock *block;
rcu_read_lock();
RAMBLOCK_FOREACH_MIGRATABLE(block) {
block->colo_cache = qemu_anon_ram_alloc(block->used_length,
NULL,
false);
if (!block->colo_cache) {
error_report("%s: Can't alloc memory for COLO cache of block %s,"
"size 0x" RAM_ADDR_FMT, __func__, block->idstr,
block->used_length);
goto out_locked;
}
memcpy(block->colo_cache, block->host, block->used_length);
}
rcu_read_unlock();
/*
* Record the dirty pages that sent by PVM, we use this dirty bitmap together
* with to decide which page in cache should be flushed into SVM's RAM. Here
* we use the same name 'ram_bitmap' as for migration.
*/
if (ram_bytes_total()) {
RAMBlock *block;
RAMBLOCK_FOREACH_MIGRATABLE(block) {
unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
block->bmap = bitmap_new(pages);
bitmap_set(block->bmap, 0, pages);
}
}
ram_state = g_new0(RAMState, 1);
ram_state->migration_dirty_pages = 0;
memory_global_dirty_log_start();
return 0;
out_locked:
RAMBLOCK_FOREACH_MIGRATABLE(block) {
if (block->colo_cache) {
qemu_anon_ram_free(block->colo_cache, block->used_length);
block->colo_cache = NULL;
}
}
rcu_read_unlock();
return -errno;
}
/* It is need to hold the global lock to call this helper */
void colo_release_ram_cache(void)
{
RAMBlock *block;
memory_global_dirty_log_stop();
RAMBLOCK_FOREACH_MIGRATABLE(block) {
g_free(block->bmap);
block->bmap = NULL;
}
rcu_read_lock();
RAMBLOCK_FOREACH_MIGRATABLE(block) {
if (block->colo_cache) {
qemu_anon_ram_free(block->colo_cache, block->used_length);
block->colo_cache = NULL;
}
}
rcu_read_unlock();
g_free(ram_state);
ram_state = NULL;
}
/**
* ram_load_setup: Setup RAM for migration incoming side
*
@ -3667,6 +3772,7 @@ static int ram_load_setup(QEMUFile *f, void *opaque)
xbzrle_load_setup();
ramblock_recv_map_init();
return 0;
}
@ -3687,6 +3793,7 @@ static int ram_load_cleanup(void *opaque)
g_free(rb->receivedmap);
rb->receivedmap = NULL;
}
return 0;
}
@ -3869,6 +3976,46 @@ static bool postcopy_is_running(void)
return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
}
/*
* Flush content of RAM cache into SVM's memory.
* Only flush the pages that be dirtied by PVM or SVM or both.
*/
static void colo_flush_ram_cache(void)
{
RAMBlock *block = NULL;
void *dst_host;
void *src_host;
unsigned long offset = 0;
memory_global_dirty_log_sync();
rcu_read_lock();
RAMBLOCK_FOREACH_MIGRATABLE(block) {
migration_bitmap_sync_range(ram_state, block, 0, block->used_length);
}
rcu_read_unlock();
trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
rcu_read_lock();
block = QLIST_FIRST_RCU(&ram_list.blocks);
while (block) {
offset = migration_bitmap_find_dirty(ram_state, block, offset);
if (offset << TARGET_PAGE_BITS >= block->used_length) {
offset = 0;
block = QLIST_NEXT_RCU(block, next);
} else {
migration_bitmap_clear_dirty(ram_state, block, offset);
dst_host = block->host + (offset << TARGET_PAGE_BITS);
src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
}
}
rcu_read_unlock();
trace_colo_flush_ram_cache_end();
}
static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
int flags = 0, ret = 0, invalid_flags = 0;
@ -3924,13 +4071,24 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
RAMBlock *block = ram_block_from_stream(f, flags);
/*
* After going into COLO, we should load the Page into colo_cache.
*/
if (migration_incoming_in_colo_state()) {
host = colo_cache_from_block_offset(block, addr);
} else {
host = host_from_ram_block_offset(block, addr);
}
if (!host) {
error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
ret = -EINVAL;
break;
}
if (!migration_incoming_in_colo_state()) {
ramblock_recv_bitmap_set(block, host);
}
trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
}
@ -4034,6 +4192,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
ret |= wait_for_decompress_done();
rcu_read_unlock();
trace_ram_load_complete(ret, seq_iter);
if (!ret && migration_incoming_in_colo_state()) {
colo_flush_ram_cache();
}
return ret;
}

View File

@ -71,4 +71,8 @@ int64_t ramblock_recv_bitmap_send(QEMUFile *file,
const char *block_name);
int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb);
/* ram cache */
int colo_init_ram_cache(void);
void colo_release_ram_cache(void);
#endif

View File

@ -56,6 +56,7 @@
#include "io/channel-file.h"
#include "sysemu/replay.h"
#include "qjson.h"
#include "migration/colo.h"
#ifndef ETH_P_RARP
#define ETH_P_RARP 0x8035
@ -82,6 +83,7 @@ enum qemu_vm_cmd {
were previously sent during
precopy but are dirty. */
MIG_CMD_PACKAGED, /* Send a wrapped stream within this stream */
MIG_CMD_ENABLE_COLO, /* Enable COLO */
MIG_CMD_POSTCOPY_RESUME, /* resume postcopy on dest */
MIG_CMD_RECV_BITMAP, /* Request for recved bitmap on dst */
MIG_CMD_MAX
@ -841,6 +843,12 @@ static void qemu_savevm_command_send(QEMUFile *f,
qemu_fflush(f);
}
void qemu_savevm_send_colo_enable(QEMUFile *f)
{
trace_savevm_send_colo_enable();
qemu_savevm_command_send(f, MIG_CMD_ENABLE_COLO, 0, NULL);
}
void qemu_savevm_send_ping(QEMUFile *f, uint32_t value)
{
uint32_t buf;
@ -1370,13 +1378,21 @@ done:
return ret;
}
static int qemu_save_device_state(QEMUFile *f)
void qemu_savevm_live_state(QEMUFile *f)
{
/* save QEMU_VM_SECTION_END section */
qemu_savevm_state_complete_precopy(f, true, false);
qemu_put_byte(f, QEMU_VM_EOF);
}
int qemu_save_device_state(QEMUFile *f)
{
SaveStateEntry *se;
if (!migration_in_colo_state()) {
qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
qemu_put_be32(f, QEMU_VM_FILE_VERSION);
}
cpu_synchronize_all_states();
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
@ -1432,8 +1448,6 @@ enum LoadVMExitCodes {
LOADVM_QUIT = 1,
};
static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis);
/* ------ incoming postcopy messages ------ */
/* 'advise' arrives before any transfers just to tell us that a postcopy
* *might* happen - it might be skipped if precopy transferred everything
@ -1922,6 +1936,12 @@ static int loadvm_handle_recv_bitmap(MigrationIncomingState *mis,
return 0;
}
static int loadvm_process_enable_colo(MigrationIncomingState *mis)
{
migration_incoming_enable_colo();
return colo_init_ram_cache();
}
/*
* Process an incoming 'QEMU_VM_COMMAND'
* 0 just a normal return
@ -2001,6 +2021,9 @@ static int loadvm_process_command(QEMUFile *f)
case MIG_CMD_RECV_BITMAP:
return loadvm_handle_recv_bitmap(mis, len);
case MIG_CMD_ENABLE_COLO:
return loadvm_process_enable_colo(mis);
}
return 0;
@ -2230,7 +2253,7 @@ static bool postcopy_pause_incoming(MigrationIncomingState *mis)
return true;
}
static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis)
int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis)
{
uint8_t section_type;
int ret = 0;
@ -2401,6 +2424,22 @@ int qemu_loadvm_state(QEMUFile *f)
return ret;
}
int qemu_load_device_state(QEMUFile *f)
{
MigrationIncomingState *mis = migration_incoming_get_current();
int ret;
/* Load QEMU_VM_SECTION_FULL section */
ret = qemu_loadvm_state_main(f, mis);
if (ret < 0) {
error_report("Failed to load device state: %d", ret);
return ret;
}
cpu_synchronize_all_post_init();
return 0;
}
int save_snapshot(const char *name, Error **errp)
{
BlockDriverState *bs, *bs1;

View File

@ -55,8 +55,13 @@ void qemu_savevm_send_postcopy_ram_discard(QEMUFile *f, const char *name,
uint16_t len,
uint64_t *start_list,
uint64_t *length_list);
void qemu_savevm_send_colo_enable(QEMUFile *f);
void qemu_savevm_live_state(QEMUFile *f);
int qemu_save_device_state(QEMUFile *f);
int qemu_loadvm_state(QEMUFile *f);
void qemu_loadvm_state_cleanup(void);
int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis);
int qemu_load_device_state(QEMUFile *f);
#endif

View File

@ -37,6 +37,7 @@ savevm_send_ping(uint32_t val) "0x%x"
savevm_send_postcopy_listen(void) ""
savevm_send_postcopy_run(void) ""
savevm_send_postcopy_resume(void) ""
savevm_send_colo_enable(void) ""
savevm_send_recv_bitmap(char *name) "%s"
savevm_state_setup(void) ""
savevm_state_resume_prepare(void) ""
@ -101,6 +102,8 @@ ram_dirty_bitmap_sync_start(void) ""
ram_dirty_bitmap_sync_wait(void) ""
ram_dirty_bitmap_sync_complete(void) ""
ram_state_resume_prepare(uint64_t v) "%" PRId64
colo_flush_ram_cache_begin(uint64_t dirty_pages) "dirty_pages %" PRIu64
colo_flush_ram_cache_end(void) ""
# migration/migration.c
await_return_path_close_on_source_close(void) ""

View File

@ -27,11 +27,20 @@
#include "qemu/sockets.h"
#include "colo.h"
#include "sysemu/iothread.h"
#include "net/colo-compare.h"
#include "migration/colo.h"
#include "migration/migration.h"
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
static QTAILQ_HEAD(, CompareState) net_compares =
QTAILQ_HEAD_INITIALIZER(net_compares);
static NotifierList colo_compare_notifiers =
NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
#define MAX_QUEUE_SIZE 1024
@ -41,6 +50,10 @@
/* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
/*
* + CompareState ++
* | |
@ -87,6 +100,11 @@ typedef struct CompareState {
IOThread *iothread;
GMainContext *worker_context;
QEMUTimer *packet_check_timer;
QEMUBH *event_bh;
enum colo_event event;
QTAILQ_ENTRY(CompareState) next;
} CompareState;
typedef struct CompareClass {
@ -98,6 +116,12 @@ enum {
SECONDARY_IN,
};
static void colo_compare_inconsistency_notify(void)
{
notifier_list_notify(&colo_compare_notifiers,
migrate_get_current());
}
static int compare_chr_send(CompareState *s,
const uint8_t *buf,
uint32_t size,
@ -413,10 +437,7 @@ sec:
qemu_hexdump((char *)spkt->data, stderr,
"colo-compare spkt", spkt->size);
/*
* colo_compare_inconsistent_notify();
* TODO: notice to checkpoint();
*/
colo_compare_inconsistency_notify();
}
}
@ -547,6 +568,16 @@ static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
}
}
void colo_compare_register_notifier(Notifier *notify)
{
notifier_list_add(&colo_compare_notifiers, notify);
}
void colo_compare_unregister_notifier(Notifier *notify)
{
notifier_remove(notify);
}
static int colo_old_packet_check_one_conn(Connection *conn,
void *user_data)
{
@ -559,10 +590,7 @@ static int colo_old_packet_check_one_conn(Connection *conn,
if (result) {
/* Do checkpoint will flush old packet */
/*
* TODO: Notify colo frame to do checkpoint.
* colo_compare_inconsistent_notify();
*/
colo_compare_inconsistency_notify();
return 0;
}
@ -606,11 +634,12 @@ static void colo_compare_packet(CompareState *s, Connection *conn,
/*
* If one packet arrive late, the secondary_list or
* primary_list will be empty, so we can't compare it
* until next comparison.
* until next comparison. If the packets in the list are
* timeout, it will trigger a checkpoint request.
*/
trace_colo_compare_main("packet different");
g_queue_push_head(&conn->primary_list, pkt);
/* TODO: colo_notify_checkpoint();*/
colo_compare_inconsistency_notify();
break;
}
}
@ -736,6 +765,25 @@ static void check_old_packet_regular(void *opaque)
REGULAR_PACKET_CHECK_MS);
}
/* Public API, Used for COLO frame to notify compare event */
void colo_notify_compares_event(void *opaque, int event, Error **errp)
{
CompareState *s;
qemu_mutex_lock(&event_mtx);
QTAILQ_FOREACH(s, &net_compares, next) {
s->event = event;
qemu_bh_schedule(s->event_bh);
event_unhandled_count++;
}
/* Wait all compare threads to finish handling this event */
while (event_unhandled_count > 0) {
qemu_cond_wait(&event_complete_cond, &event_mtx);
}
qemu_mutex_unlock(&event_mtx);
}
static void colo_compare_timer_init(CompareState *s)
{
AioContext *ctx = iothread_get_aio_context(s->iothread);
@ -756,6 +804,30 @@ static void colo_compare_timer_del(CompareState *s)
}
}
static void colo_flush_packets(void *opaque, void *user_data);
static void colo_compare_handle_event(void *opaque)
{
CompareState *s = opaque;
switch (s->event) {
case COLO_EVENT_CHECKPOINT:
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
break;
case COLO_EVENT_FAILOVER:
break;
default:
break;
}
assert(event_unhandled_count > 0);
qemu_mutex_lock(&event_mtx);
event_unhandled_count--;
qemu_cond_broadcast(&event_complete_cond);
qemu_mutex_unlock(&event_mtx);
}
static void colo_compare_iothread(CompareState *s)
{
object_ref(OBJECT(s->iothread));
@ -769,6 +841,7 @@ static void colo_compare_iothread(CompareState *s)
s, s->worker_context, true);
colo_compare_timer_init(s);
s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
}
static char *compare_get_pri_indev(Object *obj, Error **errp)
@ -926,8 +999,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
QTAILQ_INSERT_TAIL(&net_compares, s, next);
g_queue_init(&s->conn_list);
qemu_mutex_init(&event_mtx);
qemu_cond_init(&event_complete_cond);
s->connection_track_table = g_hash_table_new_full(connection_key_hash,
connection_key_equal,
g_free,
@ -990,6 +1068,7 @@ static void colo_compare_init(Object *obj)
static void colo_compare_finalize(Object *obj)
{
CompareState *s = COLO_COMPARE(obj);
CompareState *tmp = NULL;
qemu_chr_fe_deinit(&s->chr_pri_in, false);
qemu_chr_fe_deinit(&s->chr_sec_in, false);
@ -997,6 +1076,16 @@ static void colo_compare_finalize(Object *obj)
if (s->iothread) {
colo_compare_timer_del(s);
}
qemu_bh_delete(s->event_bh);
QTAILQ_FOREACH(tmp, &net_compares, next) {
if (tmp == s) {
QTAILQ_REMOVE(&net_compares, s, next);
break;
}
}
/* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
@ -1009,6 +1098,10 @@ static void colo_compare_finalize(Object *obj)
if (s->iothread) {
object_unref(OBJECT(s->iothread));
}
qemu_mutex_destroy(&event_mtx);
qemu_cond_destroy(&event_complete_cond);
g_free(s->pri_indev);
g_free(s->sec_indev);
g_free(s->outdev);

24
net/colo-compare.h Normal file
View File

@ -0,0 +1,24 @@
/*
* COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
* (a.k.a. Fault Tolerance or Continuous Replication)
*
* Copyright (c) 2017 HUAWEI TECHNOLOGIES CO., LTD.
* Copyright (c) 2017 FUJITSU LIMITED
* Copyright (c) 2017 Intel Corporation
*
* Authors:
* zhanghailiang <zhang.zhanghailiang@huawei.com>
* Zhang Chen <zhangckid@gmail.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory.
*/
#ifndef QEMU_COLO_COMPARE_H
#define QEMU_COLO_COMPARE_H
void colo_notify_compares_event(void *opaque, int event, Error **errp);
void colo_compare_register_notifier(Notifier *notify);
void colo_compare_unregister_notifier(Notifier *notify);
#endif /* QEMU_COLO_COMPARE_H */

View File

@ -137,7 +137,7 @@ Connection *connection_new(ConnectionKey *key)
conn->ip_proto = key->ip_proto;
conn->processing = false;
conn->offset = 0;
conn->syn_flag = 0;
conn->tcp_state = TCPS_CLOSED;
conn->pack = 0;
conn->sack = 0;
g_queue_init(&conn->primary_list);
@ -221,3 +221,11 @@ Connection *connection_get(GHashTable *connection_track_table,
return conn;
}
bool connection_has_tracked(GHashTable *connection_track_table,
ConnectionKey *key)
{
Connection *conn = g_hash_table_lookup(connection_track_table, key);
return conn ? true : false;
}

View File

@ -18,6 +18,7 @@
#include "slirp/slirp.h"
#include "qemu/jhash.h"
#include "qemu/timer.h"
#include "slirp/tcp.h"
#define HASHTABLE_MAX_SIZE 16384
@ -81,11 +82,9 @@ typedef struct Connection {
uint32_t sack;
/* offset = secondary_seq - primary_seq */
tcp_seq offset;
/*
* we use this flag update offset func
* run once in independent tcp connection
*/
int syn_flag;
int tcp_state; /* TCP FSM state */
tcp_seq fin_ack_seq; /* the seq of 'fin=1,ack=1' */
} Connection;
uint32_t connection_key_hash(const void *opaque);
@ -99,6 +98,8 @@ void connection_destroy(void *opaque);
Connection *connection_get(GHashTable *connection_track_table,
ConnectionKey *key,
GQueue *conn_list);
bool connection_has_tracked(GHashTable *connection_track_table,
ConnectionKey *key);
void connection_hashtable_reset(GHashTable *connection_track_table);
Packet *packet_new(const void *data, int size, int vnet_hdr_len);
void packet_destroy(void *opaque, void *user_data);

View File

@ -20,11 +20,15 @@
#include "qemu/main-loop.h"
#include "qemu/iov.h"
#include "net/checksum.h"
#include "net/colo.h"
#include "migration/colo.h"
#define FILTER_COLO_REWRITER(obj) \
OBJECT_CHECK(RewriterState, (obj), TYPE_FILTER_REWRITER)
#define TYPE_FILTER_REWRITER "filter-rewriter"
#define FAILOVER_MODE_ON true
#define FAILOVER_MODE_OFF false
typedef struct RewriterState {
NetFilterState parent_obj;
@ -32,8 +36,14 @@ typedef struct RewriterState {
/* hashtable to save connection */
GHashTable *connection_track_table;
bool vnet_hdr;
bool failover_mode;
} RewriterState;
static void filter_rewriter_failover_mode(RewriterState *s)
{
s->failover_mode = FAILOVER_MODE_ON;
}
static void filter_rewriter_flush(NetFilterState *nf)
{
RewriterState *s = FILTER_COLO_REWRITER(nf);
@ -59,9 +69,9 @@ static int is_tcp_packet(Packet *pkt)
}
/* handle tcp packet from primary guest */
static int handle_primary_tcp_pkt(NetFilterState *nf,
static int handle_primary_tcp_pkt(RewriterState *rf,
Connection *conn,
Packet *pkt)
Packet *pkt, ConnectionKey *key)
{
struct tcphdr *tcp_pkt;
@ -74,23 +84,28 @@ static int handle_primary_tcp_pkt(NetFilterState *nf,
trace_colo_filter_rewriter_conn_offset(conn->offset);
}
if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN)) &&
conn->tcp_state == TCPS_SYN_SENT) {
conn->tcp_state = TCPS_ESTABLISHED;
}
if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_SYN)) {
/*
* we use this flag update offset func
* run once in independent tcp connection
*/
conn->syn_flag = 1;
conn->tcp_state = TCPS_SYN_RECEIVED;
}
if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_ACK)) {
if (conn->syn_flag) {
if (conn->tcp_state == TCPS_SYN_RECEIVED) {
/*
* offset = secondary_seq - primary seq
* ack packet sent by guest from primary node,
* so we use th_ack - 1 get primary_seq
*/
conn->offset -= (ntohl(tcp_pkt->th_ack) - 1);
conn->syn_flag = 0;
conn->tcp_state = TCPS_ESTABLISHED;
}
if (conn->offset) {
/* handle packets to the secondary from the primary */
@ -99,15 +114,66 @@ static int handle_primary_tcp_pkt(NetFilterState *nf,
net_checksum_calculate((uint8_t *)pkt->data + pkt->vnet_hdr_len,
pkt->size - pkt->vnet_hdr_len);
}
/*
* Passive close step 3
*/
if ((conn->tcp_state == TCPS_LAST_ACK) &&
(ntohl(tcp_pkt->th_ack) == (conn->fin_ack_seq + 1))) {
conn->tcp_state = TCPS_CLOSED;
g_hash_table_remove(rf->connection_track_table, key);
}
}
if ((tcp_pkt->th_flags & TH_FIN) == TH_FIN) {
/*
* Passive close.
* Step 1:
* The *server* side of this connect is VM, *client* tries to close
* the connection. We will into CLOSE_WAIT status.
*
* Step 2:
* In this step we will into LAST_ACK status.
*
* We got 'fin=1, ack=1' packet from server side, we need to
* record the seq of 'fin=1, ack=1' packet.
*
* Step 3:
* We got 'ack=1' packets from client side, it acks 'fin=1, ack=1'
* packet from server side. From this point, we can ensure that there
* will be no packets in the connection, except that, some errors
* happen between the path of 'filter object' and vNIC, if this rare
* case really happen, we can still create a new connection,
* So it is safe to remove the connection from connection_track_table.
*
*/
if (conn->tcp_state == TCPS_ESTABLISHED) {
conn->tcp_state = TCPS_CLOSE_WAIT;
}
/*
* Active close step 2.
*/
if (conn->tcp_state == TCPS_FIN_WAIT_1) {
conn->tcp_state = TCPS_TIME_WAIT;
/*
* For simplify implementation, we needn't wait 2MSL time
* in filter rewriter. Because guest kernel will track the
* TCP status and wait 2MSL time, if client resend the FIN
* packet, guest will apply the last ACK too.
*/
conn->tcp_state = TCPS_CLOSED;
g_hash_table_remove(rf->connection_track_table, key);
}
}
return 0;
}
/* handle tcp packet from secondary guest */
static int handle_secondary_tcp_pkt(NetFilterState *nf,
static int handle_secondary_tcp_pkt(RewriterState *rf,
Connection *conn,
Packet *pkt)
Packet *pkt, ConnectionKey *key)
{
struct tcphdr *tcp_pkt;
@ -121,7 +187,8 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf,
trace_colo_filter_rewriter_conn_offset(conn->offset);
}
if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN))) {
if (conn->tcp_state == TCPS_SYN_RECEIVED &&
((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN))) {
/*
* save offset = secondary_seq and then
* in handle_primary_tcp_pkt make offset
@ -130,6 +197,12 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf,
conn->offset = ntohl(tcp_pkt->th_seq);
}
/* VM active connect */
if (conn->tcp_state == TCPS_CLOSED &&
((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_SYN)) {
conn->tcp_state = TCPS_SYN_SENT;
}
if ((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_ACK) {
/* Only need to adjust seq while offset is Non-zero */
if (conn->offset) {
@ -141,6 +214,32 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf,
}
}
/*
* Passive close step 2:
*/
if (conn->tcp_state == TCPS_CLOSE_WAIT &&
(tcp_pkt->th_flags & (TH_ACK | TH_FIN)) == (TH_ACK | TH_FIN)) {
conn->fin_ack_seq = ntohl(tcp_pkt->th_seq);
conn->tcp_state = TCPS_LAST_ACK;
}
/*
* Active close
*
* Step 1:
* The *server* side of this connect is VM, *server* tries to close
* the connection.
*
* Step 2:
* We will into CLOSE_WAIT status.
* We simplify the TCPS_FIN_WAIT_2, TCPS_TIME_WAIT and
* CLOSING status.
*/
if (conn->tcp_state == TCPS_ESTABLISHED &&
(tcp_pkt->th_flags & (TH_ACK | TH_FIN)) == TH_FIN) {
conn->tcp_state = TCPS_FIN_WAIT_1;
}
return 0;
}
@ -184,13 +283,20 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf,
*/
reverse_connection_key(&key);
}
/* After failover we needn't change new TCP packet */
if (s->failover_mode &&
!connection_has_tracked(s->connection_track_table, &key)) {
goto out;
}
conn = connection_get(s->connection_track_table,
&key,
NULL);
if (sender == nf->netdev) {
/* NET_FILTER_DIRECTION_TX */
if (!handle_primary_tcp_pkt(nf, conn, pkt)) {
if (!handle_primary_tcp_pkt(s, conn, pkt, &key)) {
qemu_net_queue_send(s->incoming_queue, sender, 0,
(const uint8_t *)pkt->data, pkt->size, NULL);
packet_destroy(pkt, NULL);
@ -203,7 +309,7 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf,
}
} else {
/* NET_FILTER_DIRECTION_RX */
if (!handle_secondary_tcp_pkt(nf, conn, pkt)) {
if (!handle_secondary_tcp_pkt(s, conn, pkt, &key)) {
qemu_net_queue_send(s->incoming_queue, sender, 0,
(const uint8_t *)pkt->data, pkt->size, NULL);
packet_destroy(pkt, NULL);
@ -217,11 +323,49 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf,
}
}
out:
packet_destroy(pkt, NULL);
pkt = NULL;
return 0;
}
static void reset_seq_offset(gpointer key, gpointer value, gpointer user_data)
{
Connection *conn = (Connection *)value;
conn->offset = 0;
}
static gboolean offset_is_nonzero(gpointer key,
gpointer value,
gpointer user_data)
{
Connection *conn = (Connection *)value;
return conn->offset ? true : false;
}
static void colo_rewriter_handle_event(NetFilterState *nf, int event,
Error **errp)
{
RewriterState *rs = FILTER_COLO_REWRITER(nf);
switch (event) {
case COLO_EVENT_CHECKPOINT:
g_hash_table_foreach(rs->connection_track_table,
reset_seq_offset, NULL);
break;
case COLO_EVENT_FAILOVER:
if (!g_hash_table_find(rs->connection_track_table,
offset_is_nonzero, NULL)) {
filter_rewriter_failover_mode(rs);
}
break;
default:
break;
}
}
static void colo_rewriter_cleanup(NetFilterState *nf)
{
RewriterState *s = FILTER_COLO_REWRITER(nf);
@ -265,6 +409,7 @@ static void filter_rewriter_init(Object *obj)
RewriterState *s = FILTER_COLO_REWRITER(obj);
s->vnet_hdr = false;
s->failover_mode = FAILOVER_MODE_OFF;
object_property_add_bool(obj, "vnet_hdr_support",
filter_rewriter_get_vnet_hdr,
filter_rewriter_set_vnet_hdr, NULL);
@ -277,6 +422,7 @@ static void colo_rewriter_class_init(ObjectClass *oc, void *data)
nfc->setup = colo_rewriter_setup;
nfc->cleanup = colo_rewriter_cleanup;
nfc->receive_iov = colo_rewriter_receive_iov;
nfc->handle_event = colo_rewriter_handle_event;
}
static const TypeInfo colo_rewriter_info = {

View File

@ -17,6 +17,8 @@
#include "net/vhost_net.h"
#include "qom/object_interfaces.h"
#include "qemu/iov.h"
#include "net/colo.h"
#include "migration/colo.h"
static inline bool qemu_can_skip_netfilter(NetFilterState *nf)
{
@ -245,11 +247,26 @@ static void netfilter_finalize(Object *obj)
g_free(nf->netdev_id);
}
static void default_handle_event(NetFilterState *nf, int event, Error **errp)
{
switch (event) {
case COLO_EVENT_CHECKPOINT:
break;
case COLO_EVENT_FAILOVER:
object_property_set_str(OBJECT(nf), "off", "status", errp);
break;
default:
break;
}
}
static void netfilter_class_init(ObjectClass *oc, void *data)
{
UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
NetFilterClass *nfc = NETFILTER_CLASS(oc);
ucc->complete = netfilter_complete;
nfc->handle_event = default_handle_event;
}
static const TypeInfo netfilter_info = {

View File

@ -712,10 +712,15 @@ ssize_t qemu_deliver_packet_iov(NetClientState *sender,
void *opaque)
{
NetClientState *nc = opaque;
size_t size = iov_size(iov, iovcnt);
int ret;
if (size > INT_MAX) {
return size;
}
if (nc->link_down) {
return iov_size(iov, iovcnt);
return size;
}
if (nc->receive_disabled) {
@ -1335,6 +1340,25 @@ void hmp_info_network(Monitor *mon, const QDict *qdict)
}
}
void colo_notify_filters_event(int event, Error **errp)
{
NetClientState *nc;
NetFilterState *nf;
NetFilterClass *nfc = NULL;
Error *local_err = NULL;
QTAILQ_FOREACH(nc, &net_clients, next) {
QTAILQ_FOREACH(nf, &nc->filters, next) {
nfc = NETFILTER_GET_CLASS(OBJECT(nf));
nfc->handle_event(nf, event, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
}
}
}
}
void qmp_set_link(const char *name, bool up, Error **errp)
{
NetClientState *ncs[MAX_QUEUE_NUM];

View File

@ -923,18 +923,18 @@
##
# @COLOMode:
#
# The colo mode
# The COLO current mode.
#
# @unknown: unknown mode
# @none: COLO is disabled.
#
# @primary: master side
# @primary: COLO node in primary side.
#
# @secondary: slave side
# @secondary: COLO node in slave side.
#
# Since: 2.8
##
{ 'enum': 'COLOMode',
'data': [ 'unknown', 'primary', 'secondary'] }
'data': [ 'none', 'primary', 'secondary'] }
##
# @FailoverStatus:
@ -956,6 +956,44 @@
{ 'enum': 'FailoverStatus',
'data': [ 'none', 'require', 'active', 'completed', 'relaunch' ] }
##
# @COLO_EXIT:
#
# Emitted when VM finishes COLO mode due to some errors happening or
# at the request of users.
#
# @mode: report COLO mode when COLO exited.
#
# @reason: describes the reason for the COLO exit.
#
# Since: 3.1
#
# Example:
#
# <- { "timestamp": {"seconds": 2032141960, "microseconds": 417172},
# "event": "COLO_EXIT", "data": {"mode": "primary", "reason": "request" } }
#
##
{ 'event': 'COLO_EXIT',
'data': {'mode': 'COLOMode', 'reason': 'COLOExitReason' } }
##
# @COLOExitReason:
#
# The reason for a COLO exit
#
# @none: no failover has ever happened. This can't occur in the
# COLO_EXIT event, only in the result of query-colo-status.
#
# @request: COLO exit is due to an external request
#
# @error: COLO exit is due to an internal error
#
# Since: 3.1
##
{ 'enum': 'COLOExitReason',
'data': [ 'none', 'request', 'error' ] }
##
# @x-colo-lost-heartbeat:
#
@ -1269,6 +1307,38 @@
##
{ 'command': 'xen-colo-do-checkpoint' }
##
# @COLOStatus:
#
# The result format for 'query-colo-status'.
#
# @mode: COLO running mode. If COLO is running, this field will return
# 'primary' or 'secondary'.
#
# @reason: describes the reason for the COLO exit.
#
# Since: 3.0
##
{ 'struct': 'COLOStatus',
'data': { 'mode': 'COLOMode', 'reason': 'COLOExitReason' } }
##
# @query-colo-status:
#
# Query COLO status while the vm is running.
#
# Returns: A @COLOStatus object showing the status.
#
# Example:
#
# -> { "execute": "query-colo-status" }
# <- { "return": { "mode": "primary", "active": true, "reason": "request" } }
#
# Since: 3.0
##
{ 'command': 'query-colo-status',
'returns': 'COLOStatus' }
##
# @migrate-recover:
#

View File

@ -2256,7 +2256,7 @@ qemu-system-i386 linux.img \
-netdev socket,id=n2,mcast=230.0.0.1:1234
# launch yet another QEMU instance on same "bus"
qemu-system-i386 linux.img \
-device e1000,netdev=n3,macaddr=52:54:00:12:34:58 \
-device e1000,netdev=n3,mac=52:54:00:12:34:58 \
-netdev socket,id=n3,mcast=230.0.0.1:1234
@end example

2
vl.c
View File

@ -4365,8 +4365,6 @@ int main(int argc, char **argv, char **envp)
#endif
}
colo_info_init();
if (net_init_clients(&err) < 0) {
error_report_err(err);
exit(1);