2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
23 #include "qom/object_interfaces.h"
25 #include "qom/object.h"
26 #include "qemu/typedefs.h"
27 #include "net/queue.h"
28 #include "sysemu/char.h"
29 #include "qemu/sockets.h"
30 #include "qapi-visit.h"
/* QOM type name of the colo-compare object and its checked-cast helper. */
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* Byte count reported by compare_chr_can_read() to the chardev layer. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE

/*
 * Packet queue length limit.
 * NOTE(review): the use site is not visible in this excerpt; presumably
 * it bounds the per-connection primary/secondary lists - confirm.
 */
#define MAX_QUEUE_SIZE 1024

/* TODO: Should be configurable */
/*
 * Interval, in milliseconds, at which the old-packet timer is re-armed;
 * also used as the age threshold when checking for old packets.
 */
#define REGULAR_PACKET_CHECK_MS 3000
46 +---------------+ +---------------+ +---------------+
47 |conn list +--->conn +--------->conn |
48 +---------------+ +---------------+ +---------------+
50 +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
51 |primary | |secondary |primary | |secondary
52 |packet | |packet + |packet | |packet +
53 +--------+ +--------+ +--------+ +--------+
55 +---v----+ +---v----+ +---v----+ +---v----+
56 |primary | |secondary |primary | |secondary
57 |packet | |packet + |packet | |packet +
58 +--------+ +--------+ +--------+ +--------+
60 +---v----+ +---v----+ +---v----+ +---v----+
61 |primary | |secondary |primary | |secondary
62 |packet | |packet + |packet | |packet +
63 +--------+ +--------+ +--------+ +--------+
65 typedef struct CompareState
{
71 CharBackend chr_pri_in
;
72 CharBackend chr_sec_in
;
74 SocketReadState pri_rs
;
75 SocketReadState sec_rs
;
77 /* connection list: the connections belonged to this NIC could be found
79 * element type: Connection
82 /* hashtable to save connection */
83 GHashTable
*connection_track_table
;
84 /* compare thread, a thread for each NIC */
86 /* Timer used on the primary to find packets that are never matched */
88 QemuMutex timer_check_lock
;
91 typedef struct CompareClass
{
92 ObjectClass parent_class
;
95 typedef struct CompareChardevProps
{
97 } CompareChardevProps
;
104 static int compare_chr_send(CharBackend
*out
,
/*
 * Return 0 on success. A return value of -1 means the packet is
 * unsupported (ARP or IPv6) and will be sent later.
 */
112 static int packet_enqueue(CompareState
*s
, int mode
)
118 if (mode
== PRIMARY_IN
) {
119 pkt
= packet_new(s
->pri_rs
.buf
, s
->pri_rs
.packet_len
);
121 pkt
= packet_new(s
->sec_rs
.buf
, s
->sec_rs
.packet_len
);
124 if (parse_packet_early(pkt
)) {
125 packet_destroy(pkt
, NULL
);
129 fill_connection_key(pkt
, &key
);
131 conn
= connection_get(s
->connection_track_table
,
135 if (!conn
->processing
) {
136 g_queue_push_tail(&s
->conn_list
, conn
);
137 conn
->processing
= true;
140 if (mode
== PRIMARY_IN
) {
141 if (g_queue_get_length(&conn
->primary_list
) <=
143 g_queue_push_tail(&conn
->primary_list
, pkt
);
145 error_report("colo compare primary queue size too big,"
149 if (g_queue_get_length(&conn
->secondary_list
) <=
151 g_queue_push_tail(&conn
->secondary_list
, pkt
);
153 error_report("colo compare secondary queue size too big,"
162 * The IP packets sent by primary and secondary
163 * will be compared in here
164 * TODO support ip fragment, Out-Of-Order
165 * return: 0 means packet same
166 * > 0 || < 0 means packet different
168 static int colo_packet_compare(Packet
*ppkt
, Packet
*spkt
)
170 trace_colo_compare_ip_info(ppkt
->size
, inet_ntoa(ppkt
->ip
->ip_src
),
171 inet_ntoa(ppkt
->ip
->ip_dst
), spkt
->size
,
172 inet_ntoa(spkt
->ip
->ip_src
),
173 inet_ntoa(spkt
->ip
->ip_dst
));
175 if (ppkt
->size
== spkt
->size
) {
176 return memcmp(ppkt
->data
, spkt
->data
, spkt
->size
);
183 * Called from the compare thread on the primary
184 * for compare tcp packet
185 * compare_tcp copied from Dr. David Alan Gilbert's branch
187 static int colo_packet_compare_tcp(Packet
*spkt
, Packet
*ppkt
)
189 struct tcphdr
*ptcp
, *stcp
;
191 char *sdebug
, *ddebug
;
193 trace_colo_compare_main("compare tcp");
194 if (ppkt
->size
!= spkt
->size
) {
195 if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE
)) {
196 trace_colo_compare_main("pkt size not same");
201 ptcp
= (struct tcphdr
*)ppkt
->transport_header
;
202 stcp
= (struct tcphdr
*)spkt
->transport_header
;
205 * The 'identification' field in the IP header is *very* random
206 * it almost never matches. Fudge this by ignoring differences in
207 * unfragmented packets; they'll normally sort themselves out if different
208 * anyway, and it should recover at the TCP level.
209 * An alternative would be to get both the primary and secondary to rewrite
210 * somehow; but that would need some sync traffic to sync the state
212 if (ntohs(ppkt
->ip
->ip_off
) & IP_DF
) {
213 spkt
->ip
->ip_id
= ppkt
->ip
->ip_id
;
214 /* and the sum will be different if the IDs were different */
215 spkt
->ip
->ip_sum
= ppkt
->ip
->ip_sum
;
218 res
= memcmp(ppkt
->data
+ ETH_HLEN
, spkt
->data
+ ETH_HLEN
,
219 (spkt
->size
- ETH_HLEN
));
221 if (res
!= 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE
)) {
222 sdebug
= strdup(inet_ntoa(ppkt
->ip
->ip_src
));
223 ddebug
= strdup(inet_ntoa(ppkt
->ip
->ip_dst
));
224 fprintf(stderr
, "%s: src/dst: %s/%s p: seq/ack=%u/%u"
225 " s: seq/ack=%u/%u res=%d flags=%x/%x\n",
226 __func__
, sdebug
, ddebug
,
227 (unsigned int)ntohl(ptcp
->th_seq
),
228 (unsigned int)ntohl(ptcp
->th_ack
),
229 (unsigned int)ntohl(stcp
->th_seq
),
230 (unsigned int)ntohl(stcp
->th_ack
),
231 res
, ptcp
->th_flags
, stcp
->th_flags
);
233 fprintf(stderr
, "Primary len = %d\n", ppkt
->size
);
234 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare", ppkt
->size
);
235 fprintf(stderr
, "Secondary len = %d\n", spkt
->size
);
236 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare", spkt
->size
);
246 * Called from the compare thread on the primary
247 * for compare udp packet
249 static int colo_packet_compare_udp(Packet
*spkt
, Packet
*ppkt
)
253 trace_colo_compare_main("compare udp");
254 ret
= colo_packet_compare(ppkt
, spkt
);
257 trace_colo_compare_udp_miscompare("primary pkt size", ppkt
->size
);
258 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare", ppkt
->size
);
259 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt
->size
);
260 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare", spkt
->size
);
267 * Called from the compare thread on the primary
268 * for compare icmp packet
270 static int colo_packet_compare_icmp(Packet
*spkt
, Packet
*ppkt
)
274 trace_colo_compare_main("compare icmp");
275 network_length
= ppkt
->ip
->ip_hl
* 4;
276 if (ppkt
->size
!= spkt
->size
||
277 ppkt
->size
< network_length
+ ETH_HLEN
) {
281 if (colo_packet_compare(ppkt
, spkt
)) {
282 trace_colo_compare_icmp_miscompare("primary pkt size",
284 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare",
286 trace_colo_compare_icmp_miscompare("Secondary pkt size",
288 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare",
297 * Called from the compare thread on the primary
298 * for compare other packet
300 static int colo_packet_compare_other(Packet
*spkt
, Packet
*ppkt
)
302 trace_colo_compare_main("compare other");
303 trace_colo_compare_ip_info(ppkt
->size
, inet_ntoa(ppkt
->ip
->ip_src
),
304 inet_ntoa(ppkt
->ip
->ip_dst
), spkt
->size
,
305 inet_ntoa(spkt
->ip
->ip_src
),
306 inet_ntoa(spkt
->ip
->ip_dst
));
307 return colo_packet_compare(ppkt
, spkt
);
310 static int colo_old_packet_check_one(Packet
*pkt
, int64_t *check_time
)
312 int64_t now
= qemu_clock_get_ms(QEMU_CLOCK_HOST
);
314 if ((now
- pkt
->creation_ms
) > (*check_time
)) {
315 trace_colo_old_packet_check_found(pkt
->creation_ms
);
322 static void colo_old_packet_check_one_conn(void *opaque
,
325 Connection
*conn
= opaque
;
326 GList
*result
= NULL
;
327 int64_t check_time
= REGULAR_PACKET_CHECK_MS
;
329 result
= g_queue_find_custom(&conn
->primary_list
,
331 (GCompareFunc
)colo_old_packet_check_one
);
334 /* do checkpoint will flush old packet */
335 /* TODO: colo_notify_checkpoint();*/
340 * Look for old packets that the secondary hasn't matched,
341 * if we have some then we have to checkpoint to wake
344 static void colo_old_packet_check(void *opaque
)
346 CompareState
*s
= opaque
;
348 g_queue_foreach(&s
->conn_list
, colo_old_packet_check_one_conn
, NULL
);
352 * Called from the compare thread on the primary
353 * for compare connection
355 static void colo_compare_connection(void *opaque
, void *user_data
)
357 CompareState
*s
= user_data
;
358 Connection
*conn
= opaque
;
360 GList
*result
= NULL
;
363 while (!g_queue_is_empty(&conn
->primary_list
) &&
364 !g_queue_is_empty(&conn
->secondary_list
)) {
365 qemu_mutex_lock(&s
->timer_check_lock
);
366 pkt
= g_queue_pop_tail(&conn
->primary_list
);
367 qemu_mutex_unlock(&s
->timer_check_lock
);
368 switch (conn
->ip_proto
) {
370 result
= g_queue_find_custom(&conn
->secondary_list
,
371 pkt
, (GCompareFunc
)colo_packet_compare_tcp
);
374 result
= g_queue_find_custom(&conn
->secondary_list
,
375 pkt
, (GCompareFunc
)colo_packet_compare_udp
);
378 result
= g_queue_find_custom(&conn
->secondary_list
,
379 pkt
, (GCompareFunc
)colo_packet_compare_icmp
);
382 result
= g_queue_find_custom(&conn
->secondary_list
,
383 pkt
, (GCompareFunc
)colo_packet_compare_other
);
388 ret
= compare_chr_send(&s
->chr_out
, pkt
->data
, pkt
->size
);
390 error_report("colo_send_primary_packet failed");
392 trace_colo_compare_main("packet same and release packet");
393 g_queue_remove(&conn
->secondary_list
, result
->data
);
394 packet_destroy(pkt
, NULL
);
397 * If one packet arrive late, the secondary_list or
398 * primary_list will be empty, so we can't compare it
399 * until next comparison.
401 trace_colo_compare_main("packet different");
402 qemu_mutex_lock(&s
->timer_check_lock
);
403 g_queue_push_tail(&conn
->primary_list
, pkt
);
404 qemu_mutex_unlock(&s
->timer_check_lock
);
405 /* TODO: colo_notify_checkpoint();*/
411 static int compare_chr_send(CharBackend
*out
,
416 uint32_t len
= htonl(size
);
422 ret
= qemu_chr_fe_write_all(out
, (uint8_t *)&len
, sizeof(len
));
423 if (ret
!= sizeof(len
)) {
427 ret
= qemu_chr_fe_write_all(out
, (uint8_t *)buf
, size
);
435 return ret
< 0 ? ret
: -EIO
;
438 static int compare_chr_can_read(void *opaque
)
440 return COMPARE_READ_LEN_MAX
;
444 * Called from the main thread on the primary for packets
445 * arriving over the socket from the primary.
447 static void compare_pri_chr_in(void *opaque
, const uint8_t *buf
, int size
)
449 CompareState
*s
= COLO_COMPARE(opaque
);
452 ret
= net_fill_rstate(&s
->pri_rs
, buf
, size
);
454 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, NULL
, NULL
, NULL
, NULL
, NULL
);
455 error_report("colo-compare primary_in error");
460 * Called from the main thread on the primary for packets
461 * arriving over the socket from the secondary.
463 static void compare_sec_chr_in(void *opaque
, const uint8_t *buf
, int size
)
465 CompareState
*s
= COLO_COMPARE(opaque
);
468 ret
= net_fill_rstate(&s
->sec_rs
, buf
, size
);
470 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, NULL
, NULL
, NULL
, NULL
, NULL
);
471 error_report("colo-compare secondary_in error");
475 static void *colo_compare_thread(void *opaque
)
477 GMainContext
*worker_context
;
478 GMainLoop
*compare_loop
;
479 CompareState
*s
= opaque
;
481 worker_context
= g_main_context_new();
483 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, compare_chr_can_read
,
484 compare_pri_chr_in
, NULL
, s
, worker_context
);
485 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, compare_chr_can_read
,
486 compare_sec_chr_in
, NULL
, s
, worker_context
);
488 compare_loop
= g_main_loop_new(worker_context
, FALSE
);
490 g_main_loop_run(compare_loop
);
492 g_main_loop_unref(compare_loop
);
493 g_main_context_unref(worker_context
);
497 static char *compare_get_pri_indev(Object
*obj
, Error
**errp
)
499 CompareState
*s
= COLO_COMPARE(obj
);
501 return g_strdup(s
->pri_indev
);
504 static void compare_set_pri_indev(Object
*obj
, const char *value
, Error
**errp
)
506 CompareState
*s
= COLO_COMPARE(obj
);
508 g_free(s
->pri_indev
);
509 s
->pri_indev
= g_strdup(value
);
512 static char *compare_get_sec_indev(Object
*obj
, Error
**errp
)
514 CompareState
*s
= COLO_COMPARE(obj
);
516 return g_strdup(s
->sec_indev
);
519 static void compare_set_sec_indev(Object
*obj
, const char *value
, Error
**errp
)
521 CompareState
*s
= COLO_COMPARE(obj
);
523 g_free(s
->sec_indev
);
524 s
->sec_indev
= g_strdup(value
);
527 static char *compare_get_outdev(Object
*obj
, Error
**errp
)
529 CompareState
*s
= COLO_COMPARE(obj
);
531 return g_strdup(s
->outdev
);
534 static void compare_set_outdev(Object
*obj
, const char *value
, Error
**errp
)
536 CompareState
*s
= COLO_COMPARE(obj
);
539 s
->outdev
= g_strdup(value
);
542 static void compare_pri_rs_finalize(SocketReadState
*pri_rs
)
544 CompareState
*s
= container_of(pri_rs
, CompareState
, pri_rs
);
546 if (packet_enqueue(s
, PRIMARY_IN
)) {
547 trace_colo_compare_main("primary: unsupported packet in");
548 compare_chr_send(&s
->chr_out
, pri_rs
->buf
, pri_rs
->packet_len
);
550 /* compare connection */
551 g_queue_foreach(&s
->conn_list
, colo_compare_connection
, s
);
555 static void compare_sec_rs_finalize(SocketReadState
*sec_rs
)
557 CompareState
*s
= container_of(sec_rs
, CompareState
, sec_rs
);
559 if (packet_enqueue(s
, SECONDARY_IN
)) {
560 trace_colo_compare_main("secondary: unsupported packet in");
562 /* compare connection */
563 g_queue_foreach(&s
->conn_list
, colo_compare_connection
, s
);
/*
 * Return 0 on success.
 * Return 1 on failure.
 */
572 static int find_and_check_chardev(CharDriverState
**chr
,
576 CompareChardevProps props
;
578 *chr
= qemu_chr_find(chr_name
);
580 error_setg(errp
, "Device '%s' not found",
585 memset(&props
, 0, sizeof(props
));
587 if (!qemu_chr_has_feature(*chr
, QEMU_CHAR_FEATURE_RECONNECTABLE
)) {
588 error_setg(errp
, "chardev \"%s\" is not reconnectable",
597 * Check old packet regularly so it can watch for any packets
598 * that the secondary hasn't produced equivalents of.
600 static void check_old_packet_regular(void *opaque
)
602 CompareState
*s
= opaque
;
604 timer_mod(s
->timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
605 REGULAR_PACKET_CHECK_MS
);
606 /* if have old packet we will notify checkpoint */
608 * TODO: Make timer handler run in compare thread
609 * like qemu_chr_add_handlers_full.
611 qemu_mutex_lock(&s
->timer_check_lock
);
612 colo_old_packet_check(s
);
613 qemu_mutex_unlock(&s
->timer_check_lock
);
617 * Called from the main thread on the primary
618 * to setup colo-compare.
620 static void colo_compare_complete(UserCreatable
*uc
, Error
**errp
)
622 CompareState
*s
= COLO_COMPARE(uc
);
623 CharDriverState
*chr
;
624 char thread_name
[64];
625 static int compare_id
;
627 if (!s
->pri_indev
|| !s
->sec_indev
|| !s
->outdev
) {
628 error_setg(errp
, "colo compare needs 'primary_in' ,"
629 "'secondary_in','outdev' property set");
631 } else if (!strcmp(s
->pri_indev
, s
->outdev
) ||
632 !strcmp(s
->sec_indev
, s
->outdev
) ||
633 !strcmp(s
->pri_indev
, s
->sec_indev
)) {
634 error_setg(errp
, "'indev' and 'outdev' could not be same "
635 "for compare module");
639 if (find_and_check_chardev(&chr
, s
->pri_indev
, errp
) ||
640 !qemu_chr_fe_init(&s
->chr_pri_in
, chr
, errp
)) {
644 if (find_and_check_chardev(&chr
, s
->sec_indev
, errp
) ||
645 !qemu_chr_fe_init(&s
->chr_sec_in
, chr
, errp
)) {
649 if (find_and_check_chardev(&chr
, s
->outdev
, errp
) ||
650 !qemu_chr_fe_init(&s
->chr_out
, chr
, errp
)) {
654 net_socket_rs_init(&s
->pri_rs
, compare_pri_rs_finalize
);
655 net_socket_rs_init(&s
->sec_rs
, compare_sec_rs_finalize
);
657 g_queue_init(&s
->conn_list
);
658 qemu_mutex_init(&s
->timer_check_lock
);
660 s
->connection_track_table
= g_hash_table_new_full(connection_key_hash
,
661 connection_key_equal
,
665 sprintf(thread_name
, "colo-compare %d", compare_id
);
666 qemu_thread_create(&s
->thread
, thread_name
,
667 colo_compare_thread
, s
,
668 QEMU_THREAD_JOINABLE
);
671 /* A regular timer to kick any packets that the secondary doesn't match */
672 s
->timer
= timer_new_ms(QEMU_CLOCK_VIRTUAL
, /* Only when guest runs */
673 check_old_packet_regular
, s
);
674 timer_mod(s
->timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
675 REGULAR_PACKET_CHECK_MS
);
680 static void colo_compare_class_init(ObjectClass
*oc
, void *data
)
682 UserCreatableClass
*ucc
= USER_CREATABLE_CLASS(oc
);
684 ucc
->complete
= colo_compare_complete
;
687 static void colo_compare_init(Object
*obj
)
689 object_property_add_str(obj
, "primary_in",
690 compare_get_pri_indev
, compare_set_pri_indev
,
692 object_property_add_str(obj
, "secondary_in",
693 compare_get_sec_indev
, compare_set_sec_indev
,
695 object_property_add_str(obj
, "outdev",
696 compare_get_outdev
, compare_set_outdev
,
700 static void colo_compare_finalize(Object
*obj
)
702 CompareState
*s
= COLO_COMPARE(obj
);
704 qemu_chr_fe_deinit(&s
->chr_pri_in
);
705 qemu_chr_fe_deinit(&s
->chr_sec_in
);
706 qemu_chr_fe_deinit(&s
->chr_out
);
708 g_queue_free(&s
->conn_list
);
710 if (qemu_thread_is_self(&s
->thread
)) {
711 /* compare connection */
712 g_queue_foreach(&s
->conn_list
, colo_compare_connection
, s
);
713 qemu_thread_join(&s
->thread
);
720 qemu_mutex_destroy(&s
->timer_check_lock
);
722 g_free(s
->pri_indev
);
723 g_free(s
->sec_indev
);
727 static const TypeInfo colo_compare_info
= {
728 .name
= TYPE_COLO_COMPARE
,
729 .parent
= TYPE_OBJECT
,
730 .instance_size
= sizeof(CompareState
),
731 .instance_init
= colo_compare_init
,
732 .instance_finalize
= colo_compare_finalize
,
733 .class_size
= sizeof(CompareClass
),
734 .class_init
= colo_compare_class_init
,
735 .interfaces
= (InterfaceInfo
[]) {
736 { TYPE_USER_CREATABLE
},
741 static void register_types(void)
743 type_register_static(&colo_compare_info
);
746 type_init(register_types
);