colo: compare the packet based on the tcp sequence number

Packet size some time different or when network is busy.
Based on same payload size, but TCP protocol can not
guarantee send the same one packet in the same way,

like that:
We send this payload:
------------------------------
| header |1|2|3|4|5|6|7|8|9|0|
------------------------------

primary:
ppkt1:
----------------
| header |1|2|3|
----------------
ppkt2:
------------------------
| header |4|5|6|7|8|9|0|
------------------------

secondary:
spkt1:
------------------------------
| header |1|2|3|4|5|6|7|8|9|0|
------------------------------

In the original method, ppkt1 and ppkt2 are different in size and
spkt1, so they can't compare and trigger the checkpoint.

I have tested FTP get 200M and 1G file many times, I found that
the performance was less than 1% of the native.

Now I reconstructed the comparison of TCP packets based on the
TCP sequence number. first of all, ppkt1 and spkt1 have the same
starting sequence number, so they can compare, even though their
length is different. And then ppkt1 with a smaller payload length
is used as the comparison length, if the payload is same, send
out the ppkt1 and record the offset(the length of ppkt1 payload)
in spkt1. The next comparison, ppkt2 and spkt1 can be compared
from the recorded position of spkt1.

like that:
----------------
| header |1|2|3| ppkt1
---------|-----|
         |     |
---------v-----v--------------
| header |1|2|3|4|5|6|7|8|9|0| spkt1
---------------|\------------|
               | \offset     |
      ---------v-------------v
      | header |4|5|6|7|8|9|0| ppkt2
      ------------------------

In this way, the performance can reach native 20% in my multiple
tests.

Cc: Zhang Chen <zhangckid@gmail.com>
Cc: Li Zhijian <lizhijian@cn.fujitsu.com>
Cc: Jason Wang <jasowang@redhat.com>

Signed-off-by: Mao Zhongyi <maozy.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Zhang Chen <zhangckid@gmail.com>
Reviewed-by: Zhang Chen <zhangckid@gmail.com>
Tested-by: Zhang Chen <zhangckid@gmail.com>
Signed-off-by: Jason Wang <jasowang@redhat.com>
This commit is contained in:
Mao Zhongyi 2017-12-25 10:54:12 +08:00 committed by Jason Wang
parent 9394133f86
commit f449c9e549
4 changed files with 251 additions and 120 deletions

View File

@ -37,6 +37,9 @@
#define COMPARE_READ_LEN_MAX NET_BUFSIZE #define COMPARE_READ_LEN_MAX NET_BUFSIZE
#define MAX_QUEUE_SIZE 1024 #define MAX_QUEUE_SIZE 1024
#define COLO_COMPARE_FREE_PRIMARY 0x01
#define COLO_COMPARE_FREE_SECONDARY 0x02
/* TODO: Should be configurable */ /* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000 #define REGULAR_PACKET_CHECK_MS 3000
@ -111,14 +114,32 @@ static gint seq_sorter(Packet *a, Packet *b, gpointer data)
return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
} }
static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
{
Packet *pkt = data;
struct tcphdr *tcphd;
tcphd = (struct tcphdr *)pkt->transport_header;
pkt->tcp_seq = ntohl(tcphd->th_seq);
pkt->tcp_ack = ntohl(tcphd->th_ack);
*max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
+ (tcphd->th_off << 2) - pkt->vnet_hdr_len;
pkt->payload_size = pkt->size - pkt->header_size;
pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
pkt->flags = tcphd->th_flags;
}
/* /*
* Return 1 on success, if return 0 means the * Return 1 on success, if return 0 means the
* packet will be dropped * packet will be dropped
*/ */
static int colo_insert_packet(GQueue *queue, Packet *pkt) static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
{ {
if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) { if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
if (pkt->ip->ip_p == IPPROTO_TCP) { if (pkt->ip->ip_p == IPPROTO_TCP) {
fill_pkt_tcp_info(pkt, max_ack);
g_queue_insert_sorted(queue, g_queue_insert_sorted(queue,
pkt, pkt,
(GCompareDataFunc)seq_sorter, (GCompareDataFunc)seq_sorter,
@ -168,12 +189,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
} }
if (mode == PRIMARY_IN) { if (mode == PRIMARY_IN) {
if (!colo_insert_packet(&conn->primary_list, pkt)) { if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
error_report("colo compare primary queue size too big," error_report("colo compare primary queue size too big,"
"drop packet"); "drop packet");
} }
} else { } else {
if (!colo_insert_packet(&conn->secondary_list, pkt)) { if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
error_report("colo compare secondary queue size too big," error_report("colo compare secondary queue size too big,"
"drop packet"); "drop packet");
} }
@ -183,6 +204,25 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
return 0; return 0;
} }
static inline bool after(uint32_t seq1, uint32_t seq2)
{
return (int32_t)(seq1 - seq2) > 0;
}
static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
{
int ret;
ret = compare_chr_send(s,
pkt->data,
pkt->size,
pkt->vnet_hdr_len);
if (ret < 0) {
error_report("colo send primary packet failed");
}
trace_colo_compare_main("packet same and release packet");
packet_destroy(pkt, NULL);
}
/* /*
* The IP packets sent by primary and secondary * The IP packets sent by primary and secondary
* will be compared in here * will be compared in here
@ -214,103 +254,174 @@ static int colo_compare_packet_payload(Packet *ppkt,
} }
/* /*
* Called from the compare thread on the primary * return true means that the payload is consist and
* for compare tcp packet * need to make the next comparison, false means do
* compare_tcp copied from Dr. David Alan Gilbert's branch * the checkpoint
*/ */
static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt) static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
int8_t *mark, uint32_t max_ack)
{ {
struct tcphdr *ptcp, *stcp; *mark = 0;
int res;
trace_colo_compare_main("compare tcp"); if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
if (colo_compare_packet_payload(ppkt, spkt,
ptcp = (struct tcphdr *)ppkt->transport_header; ppkt->header_size, spkt->header_size,
stcp = (struct tcphdr *)spkt->transport_header; ppkt->payload_size)) {
*mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
/* return true;
* The 'identification' field in the IP header is *very* random }
* it almost never matches. Fudge this by ignoring differences in }
* unfragmented packets; they'll normally sort themselves out if different if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
* anyway, and it should recover at the TCP level. if (colo_compare_packet_payload(ppkt, spkt,
* An alternative would be to get both the primary and secondary to rewrite ppkt->header_size, spkt->header_size,
* somehow; but that would need some sync traffic to sync the state ppkt->payload_size)) {
*/ *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
if (ntohs(ppkt->ip->ip_off) & IP_DF) { return true;
spkt->ip->ip_id = ppkt->ip->ip_id; }
/* and the sum will be different if the IDs were different */
spkt->ip->ip_sum = ppkt->ip->ip_sum;
} }
/* /* one part of secondary packet payload still need to be compared */
* Check tcp header length for tcp option field. if (!after(ppkt->seq_end, spkt->seq_end)) {
* th_off > 5 means this tcp packet have options field. if (colo_compare_packet_payload(ppkt, spkt,
* The tcp options maybe always different. ppkt->header_size + ppkt->offset,
* for example: spkt->header_size + spkt->offset,
* From RFC 7323. ppkt->payload_size - ppkt->offset)) {
* TCP Timestamps option (TSopt): if (!after(ppkt->tcp_ack, max_ack)) {
* Kind: 8 *mark = COLO_COMPARE_FREE_PRIMARY;
* spkt->offset += ppkt->payload_size - ppkt->offset;
* Length: 10 bytes return true;
*
* +-------+-------+---------------------+---------------------+
* |Kind=8 | 10 | TS Value (TSval) |TS Echo Reply (TSecr)|
* +-------+-------+---------------------+---------------------+
* 1 1 4 4
*
* In this case the primary guest's timestamp always different with
* the secondary guest's timestamp. COLO just focus on payload,
* so we just need skip this field.
*/
ptrdiff_t ptcp_offset, stcp_offset;
ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data
+ (ptcp->th_off << 2) - ppkt->vnet_hdr_len;
stcp_offset = spkt->transport_header - (uint8_t *)spkt->data
+ (stcp->th_off << 2) - spkt->vnet_hdr_len;
if (ppkt->size - ptcp_offset == spkt->size - stcp_offset) {
res = colo_compare_packet_payload(ppkt, spkt,
ptcp_offset, stcp_offset,
ppkt->size - ptcp_offset);
} else { } else {
trace_colo_compare_main("TCP: payload size of packets are different"); /* secondary guest hasn't ack the data, don't send
res = -1; * out this packet
*/
return false;
}
}
} else {
/* primary packet is longer than secondary packet, compare
* the same part and mark the primary packet offset
*/
if (colo_compare_packet_payload(ppkt, spkt,
ppkt->header_size + ppkt->offset,
spkt->header_size + spkt->offset,
spkt->payload_size - spkt->offset)) {
*mark = COLO_COMPARE_FREE_SECONDARY;
ppkt->offset += spkt->payload_size - spkt->offset;
return true;
}
} }
if (res != 0 && return false;
trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { }
char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); static void colo_compare_tcp(CompareState *s, Connection *conn)
strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); {
strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); Packet *ppkt = NULL, *spkt = NULL;
strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); int8_t mark;
trace_colo_compare_ip_info(ppkt->size, pri_ip_src, /*
pri_ip_dst, spkt->size, * If ppkt and spkt have the same payload, but ppkt's ACK
sec_ip_src, sec_ip_dst); * is greater than spkt's ACK, in this case we can not
* send the ppkt because it will cause the secondary guest
* to miss sending some data in the next. Therefore, we
* record the maximum ACK in the current queue at both
* primary side and secondary side. Only when the ack is
* less than the smaller of the two maximum ack, then we
* can ensure that the packet's payload is acknowledged by
* primary and secondary.
*/
uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
trace_colo_compare_tcp_info("pri tcp packet", pri:
ntohl(ptcp->th_seq), if (g_queue_is_empty(&conn->primary_list)) {
ntohl(ptcp->th_ack), return;
res, ptcp->th_flags, }
ppkt->size); ppkt = g_queue_pop_head(&conn->primary_list);
sec:
if (g_queue_is_empty(&conn->secondary_list)) {
g_queue_push_head(&conn->primary_list, ppkt);
return;
}
spkt = g_queue_pop_head(&conn->secondary_list);
trace_colo_compare_tcp_info("sec tcp packet", if (ppkt->tcp_seq == ppkt->seq_end) {
ntohl(stcp->th_seq), colo_release_primary_pkt(s, ppkt);
ntohl(stcp->th_ack), ppkt = NULL;
res, stcp->th_flags, }
spkt->size);
if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
trace_colo_compare_main("pri: this packet has compared");
colo_release_primary_pkt(s, ppkt);
ppkt = NULL;
}
if (spkt->tcp_seq == spkt->seq_end) {
packet_destroy(spkt, NULL);
if (!ppkt) {
goto pri;
} else {
goto sec;
}
} else {
if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
trace_colo_compare_main("sec: this packet has compared");
packet_destroy(spkt, NULL);
if (!ppkt) {
goto pri;
} else {
goto sec;
}
}
if (!ppkt) {
g_queue_push_head(&conn->secondary_list, spkt);
goto pri;
}
}
if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
trace_colo_compare_tcp_info("pri",
ppkt->tcp_seq, ppkt->tcp_ack,
ppkt->header_size, ppkt->payload_size,
ppkt->offset, ppkt->flags);
trace_colo_compare_tcp_info("sec",
spkt->tcp_seq, spkt->tcp_ack,
spkt->header_size, spkt->payload_size,
spkt->offset, spkt->flags);
if (mark == COLO_COMPARE_FREE_PRIMARY) {
conn->compare_seq = ppkt->seq_end;
colo_release_primary_pkt(s, ppkt);
g_queue_push_head(&conn->secondary_list, spkt);
goto pri;
}
if (mark == COLO_COMPARE_FREE_SECONDARY) {
conn->compare_seq = spkt->seq_end;
packet_destroy(spkt, NULL);
goto sec;
}
if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
conn->compare_seq = ppkt->seq_end;
colo_release_primary_pkt(s, ppkt);
packet_destroy(spkt, NULL);
goto pri;
}
} else {
g_queue_push_head(&conn->primary_list, ppkt);
g_queue_push_head(&conn->secondary_list, spkt);
qemu_hexdump((char *)ppkt->data, stderr, qemu_hexdump((char *)ppkt->data, stderr,
"colo-compare ppkt", ppkt->size); "colo-compare ppkt", ppkt->size);
qemu_hexdump((char *)spkt->data, stderr, qemu_hexdump((char *)spkt->data, stderr,
"colo-compare spkt", spkt->size); "colo-compare spkt", spkt->size);
/*
* colo_compare_inconsistent_notify();
* TODO: notice to checkpoint();
*/
}
} }
return res;
}
/* /*
* Called from the compare thread on the primary * Called from the compare thread on the primary
@ -477,53 +588,22 @@ static void colo_old_packet_check(void *opaque)
(GCompareFunc)colo_old_packet_check_one_conn); (GCompareFunc)colo_old_packet_check_one_conn);
} }
/* static void colo_compare_packet(CompareState *s, Connection *conn,
* Called from the compare thread on the primary int (*HandlePacket)(Packet *spkt,
* for compare packet with secondary list of the Packet *ppkt))
* specified connection when a new packet was
* queued to it.
*/
static void colo_compare_connection(void *opaque, void *user_data)
{ {
CompareState *s = user_data;
Connection *conn = opaque;
Packet *pkt = NULL; Packet *pkt = NULL;
GList *result = NULL; GList *result = NULL;
int ret;
while (!g_queue_is_empty(&conn->primary_list) && while (!g_queue_is_empty(&conn->primary_list) &&
!g_queue_is_empty(&conn->secondary_list)) { !g_queue_is_empty(&conn->secondary_list)) {
pkt = g_queue_pop_head(&conn->primary_list); pkt = g_queue_pop_head(&conn->primary_list);
switch (conn->ip_proto) {
case IPPROTO_TCP:
result = g_queue_find_custom(&conn->secondary_list, result = g_queue_find_custom(&conn->secondary_list,
pkt, (GCompareFunc)colo_packet_compare_tcp); pkt, (GCompareFunc)HandlePacket);
break;
case IPPROTO_UDP:
result = g_queue_find_custom(&conn->secondary_list,
pkt, (GCompareFunc)colo_packet_compare_udp);
break;
case IPPROTO_ICMP:
result = g_queue_find_custom(&conn->secondary_list,
pkt, (GCompareFunc)colo_packet_compare_icmp);
break;
default:
result = g_queue_find_custom(&conn->secondary_list,
pkt, (GCompareFunc)colo_packet_compare_other);
break;
}
if (result) { if (result) {
ret = compare_chr_send(s, colo_release_primary_pkt(s, pkt);
pkt->data,
pkt->size,
pkt->vnet_hdr_len);
if (ret < 0) {
error_report("colo_send_primary_packet failed");
}
trace_colo_compare_main("packet same and release packet");
g_queue_remove(&conn->secondary_list, result->data); g_queue_remove(&conn->secondary_list, result->data);
packet_destroy(pkt, NULL);
} else { } else {
/* /*
* If one packet arrive late, the secondary_list or * If one packet arrive late, the secondary_list or
@ -538,6 +618,33 @@ static void colo_compare_connection(void *opaque, void *user_data)
} }
} }
/*
* Called from the compare thread on the primary
* for compare packet with secondary list of the
* specified connection when a new packet was
* queued to it.
*/
static void colo_compare_connection(void *opaque, void *user_data)
{
CompareState *s = user_data;
Connection *conn = opaque;
switch (conn->ip_proto) {
case IPPROTO_TCP:
colo_compare_tcp(s, conn);
break;
case IPPROTO_UDP:
colo_compare_packet(s, conn, colo_packet_compare_udp);
break;
case IPPROTO_ICMP:
colo_compare_packet(s, conn, colo_packet_compare_icmp);
break;
default:
colo_compare_packet(s, conn, colo_packet_compare_other);
break;
}
}
static int compare_chr_send(CompareState *s, static int compare_chr_send(CompareState *s,
const uint8_t *buf, const uint8_t *buf,
uint32_t size, uint32_t size,

View File

@ -138,6 +138,8 @@ Connection *connection_new(ConnectionKey *key)
conn->processing = false; conn->processing = false;
conn->offset = 0; conn->offset = 0;
conn->syn_flag = 0; conn->syn_flag = 0;
conn->pack = 0;
conn->sack = 0;
g_queue_init(&conn->primary_list); g_queue_init(&conn->primary_list);
g_queue_init(&conn->secondary_list); g_queue_init(&conn->secondary_list);
@ -163,6 +165,13 @@ Packet *packet_new(const void *data, int size, int vnet_hdr_len)
pkt->size = size; pkt->size = size;
pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST); pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
pkt->vnet_hdr_len = vnet_hdr_len; pkt->vnet_hdr_len = vnet_hdr_len;
pkt->tcp_seq = 0;
pkt->tcp_ack = 0;
pkt->seq_end = 0;
pkt->header_size = 0;
pkt->payload_size = 0;
pkt->offset = 0;
pkt->flags = 0;
return pkt; return pkt;
} }

View File

@ -45,6 +45,15 @@ typedef struct Packet {
int64_t creation_ms; int64_t creation_ms;
/* Get vnet_hdr_len from filter */ /* Get vnet_hdr_len from filter */
uint32_t vnet_hdr_len; uint32_t vnet_hdr_len;
uint32_t tcp_seq; /* sequence number */
uint32_t tcp_ack; /* acknowledgement number */
/* the sequence number of the last byte of the packet */
uint32_t seq_end;
uint8_t header_size; /* the header length */
uint16_t payload_size; /* the payload length */
/* record the payload offset(the length that has been compared) */
uint16_t offset;
uint8_t flags; /* Flags(aka Control bits) */
} Packet; } Packet;
typedef struct ConnectionKey { typedef struct ConnectionKey {
@ -64,6 +73,12 @@ typedef struct Connection {
/* flag to enqueue unprocessed_connections */ /* flag to enqueue unprocessed_connections */
bool processing; bool processing;
uint8_t ip_proto; uint8_t ip_proto;
/* record the sequence number that has been compared */
uint32_t compare_seq;
/* the maximum of acknowledgement number in primary_list queue */
uint32_t pack;
/* the maximum of acknowledgement number in secondary_list queue */
uint32_t sack;
/* offset = secondary_seq - primary_seq */ /* offset = secondary_seq - primary_seq */
tcp_seq offset; tcp_seq offset;
/* /*

View File

@ -13,7 +13,7 @@ colo_compare_icmp_miscompare(const char *sta, int size) ": %s = %d"
colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s" colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
colo_old_packet_check_found(int64_t old_time) "%" PRId64 colo_old_packet_check_found(int64_t old_time) "%" PRId64
colo_compare_miscompare(void) "" colo_compare_miscompare(void) ""
colo_compare_tcp_info(const char *pkt, uint32_t seq, uint32_t ack, int res, uint32_t flag, int size) "side: %s seq/ack= %u/%u res= %d flags= 0x%x pkt_size: %d\n" colo_compare_tcp_info(const char *pkt, uint32_t seq, uint32_t ack, int hdlen, int pdlen, int offset, int flags) "%s: seq/ack= %u/%u hdlen= %d pdlen= %d offset= %d flags=%d\n"
# net/filter-rewriter.c # net/filter-rewriter.c
colo_filter_rewriter_debug(void) "" colo_filter_rewriter_debug(void) ""