2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
23 #include "qom/object_interfaces.h"
25 #include "qom/object.h"
26 #include "net/queue.h"
27 #include "chardev/char-fe.h"
28 #include "qemu/sockets.h"
29 #include "qapi-visit.h"
31 #include "sysemu/iothread.h"
33 #define TYPE_COLO_COMPARE "colo-compare"
34 #define COLO_COMPARE(obj) \
35 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
37 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
38 #define MAX_QUEUE_SIZE 1024
40 /* TODO: Should be configurable */
41 #define REGULAR_PACKET_CHECK_MS 3000
46 * +---------------+ +---------------+ +---------------+
47 * | conn list + - > conn + ------- > conn + -- > ......
48 * +---------------+ +---------------+ +---------------+
50 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
51 * |primary | |secondary |primary | |secondary
52 * |packet | |packet + |packet | |packet +
53 * +--------+ +--------+ +--------+ +--------+
55 * +---v----+ +---v----+ +---v----+ +---v----+
56 * |primary | |secondary |primary | |secondary
57 * |packet | |packet + |packet | |packet +
58 * +--------+ +--------+ +--------+ +--------+
60 * +---v----+ +---v----+ +---v----+ +---v----+
61 * |primary | |secondary |primary | |secondary
62 * |packet | |packet + |packet | |packet +
63 * +--------+ +--------+ +--------+ +--------+
65 typedef struct CompareState
{
71 CharBackend chr_pri_in
;
72 CharBackend chr_sec_in
;
74 SocketReadState pri_rs
;
75 SocketReadState sec_rs
;
79 * Record the connections that pass through the NIC
80 * Element type: Connection
83 /* Record the connection without repetition */
84 GHashTable
*connection_track_table
;
87 GMainContext
*worker_context
;
88 QEMUTimer
*packet_check_timer
;
91 typedef struct CompareClass
{
92 ObjectClass parent_class
;
100 static int compare_chr_send(CompareState
*s
,
103 uint32_t vnet_hdr_len
);
105 static gint
seq_sorter(Packet
*a
, Packet
*b
, gpointer data
)
107 struct tcphdr
*atcp
, *btcp
;
109 atcp
= (struct tcphdr
*)(a
->transport_header
);
110 btcp
= (struct tcphdr
*)(b
->transport_header
);
111 return ntohl(atcp
->th_seq
) - ntohl(btcp
->th_seq
);
115 * Return 1 on success; a return value of 0 means the
116 * packet will be dropped
118 static int colo_insert_packet(GQueue
*queue
, Packet
*pkt
)
120 if (g_queue_get_length(queue
) <= MAX_QUEUE_SIZE
) {
121 if (pkt
->ip
->ip_p
== IPPROTO_TCP
) {
122 g_queue_insert_sorted(queue
,
124 (GCompareDataFunc
)seq_sorter
,
127 g_queue_push_tail(queue
, pkt
);
135 * Return 0 on success; a return value of -1 means the pkt
136 * is unsupported (arp and ipv6) and will be sent later
138 static int packet_enqueue(CompareState
*s
, int mode
, Connection
**con
)
144 if (mode
== PRIMARY_IN
) {
145 pkt
= packet_new(s
->pri_rs
.buf
,
146 s
->pri_rs
.packet_len
,
147 s
->pri_rs
.vnet_hdr_len
);
149 pkt
= packet_new(s
->sec_rs
.buf
,
150 s
->sec_rs
.packet_len
,
151 s
->sec_rs
.vnet_hdr_len
);
154 if (parse_packet_early(pkt
)) {
155 packet_destroy(pkt
, NULL
);
159 fill_connection_key(pkt
, &key
);
161 conn
= connection_get(s
->connection_track_table
,
165 if (!conn
->processing
) {
166 g_queue_push_tail(&s
->conn_list
, conn
);
167 conn
->processing
= true;
170 if (mode
== PRIMARY_IN
) {
171 if (!colo_insert_packet(&conn
->primary_list
, pkt
)) {
172 error_report("colo compare primary queue size too big,"
176 if (!colo_insert_packet(&conn
->secondary_list
, pkt
)) {
177 error_report("colo compare secondary queue size too big,"
187 * The IP packets sent by primary and secondary
188 * will be compared in here
189 * TODO support ip fragment, Out-Of-Order
190 * return: 0 means packet same
191 * > 0 || < 0 means packet different
193 static int colo_compare_packet_payload(Packet
*ppkt
,
200 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
201 char pri_ip_src
[20], pri_ip_dst
[20], sec_ip_src
[20], sec_ip_dst
[20];
203 strcpy(pri_ip_src
, inet_ntoa(ppkt
->ip
->ip_src
));
204 strcpy(pri_ip_dst
, inet_ntoa(ppkt
->ip
->ip_dst
));
205 strcpy(sec_ip_src
, inet_ntoa(spkt
->ip
->ip_src
));
206 strcpy(sec_ip_dst
, inet_ntoa(spkt
->ip
->ip_dst
));
208 trace_colo_compare_ip_info(ppkt
->size
, pri_ip_src
,
209 pri_ip_dst
, spkt
->size
,
210 sec_ip_src
, sec_ip_dst
);
213 return memcmp(ppkt
->data
+ poffset
, spkt
->data
+ soffset
, len
);
217 * Called from the compare thread on the primary
218 * for compare tcp packet
219 * compare_tcp copied from Dr. David Alan Gilbert's branch
221 static int colo_packet_compare_tcp(Packet
*spkt
, Packet
*ppkt
)
223 struct tcphdr
*ptcp
, *stcp
;
226 trace_colo_compare_main("compare tcp");
228 ptcp
= (struct tcphdr
*)ppkt
->transport_header
;
229 stcp
= (struct tcphdr
*)spkt
->transport_header
;
232 * The 'identification' field in the IP header is *very* random
233 * it almost never matches. Fudge this by ignoring differences in
234 * unfragmented packets; they'll normally sort themselves out if different
235 * anyway, and it should recover at the TCP level.
236 * An alternative would be to get both the primary and secondary to rewrite
237 * somehow; but that would need some sync traffic to sync the state
239 if (ntohs(ppkt
->ip
->ip_off
) & IP_DF
) {
240 spkt
->ip
->ip_id
= ppkt
->ip
->ip_id
;
241 /* and the sum will be different if the IDs were different */
242 spkt
->ip
->ip_sum
= ppkt
->ip
->ip_sum
;
246 * Check tcp header length for tcp option field.
247 * th_off > 5 means this tcp packet have options field.
248 * The tcp options maybe always different.
251 * TCP Timestamps option (TSopt):
256 * +-------+-------+---------------------+---------------------+
257 * |Kind=8 | 10 | TS Value (TSval) |TS Echo Reply (TSecr)|
258 * +-------+-------+---------------------+---------------------+
261 * In this case the primary guest's timestamp always different with
262 * the secondary guest's timestamp. COLO just focus on payload,
263 * so we just need skip this field.
266 ptrdiff_t ptcp_offset
, stcp_offset
;
268 ptcp_offset
= ppkt
->transport_header
- (uint8_t *)ppkt
->data
269 + (ptcp
->th_off
<< 2) - ppkt
->vnet_hdr_len
;
270 stcp_offset
= spkt
->transport_header
- (uint8_t *)spkt
->data
271 + (stcp
->th_off
<< 2) - spkt
->vnet_hdr_len
;
272 if (ppkt
->size
- ptcp_offset
== spkt
->size
- stcp_offset
) {
273 res
= colo_compare_packet_payload(ppkt
, spkt
,
274 ptcp_offset
, stcp_offset
,
275 ppkt
->size
- ptcp_offset
);
277 trace_colo_compare_main("TCP: payload size of packets are different");
282 trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
283 char pri_ip_src
[20], pri_ip_dst
[20], sec_ip_src
[20], sec_ip_dst
[20];
285 strcpy(pri_ip_src
, inet_ntoa(ppkt
->ip
->ip_src
));
286 strcpy(pri_ip_dst
, inet_ntoa(ppkt
->ip
->ip_dst
));
287 strcpy(sec_ip_src
, inet_ntoa(spkt
->ip
->ip_src
));
288 strcpy(sec_ip_dst
, inet_ntoa(spkt
->ip
->ip_dst
));
290 trace_colo_compare_ip_info(ppkt
->size
, pri_ip_src
,
291 pri_ip_dst
, spkt
->size
,
292 sec_ip_src
, sec_ip_dst
);
294 trace_colo_compare_tcp_info("pri tcp packet",
300 trace_colo_compare_tcp_info("sec tcp packet",
306 qemu_hexdump((char *)ppkt
->data
, stderr
,
307 "colo-compare ppkt", ppkt
->size
);
308 qemu_hexdump((char *)spkt
->data
, stderr
,
309 "colo-compare spkt", spkt
->size
);
316 * Called from the compare thread on the primary
317 * for compare udp packet
319 static int colo_packet_compare_udp(Packet
*spkt
, Packet
*ppkt
)
321 uint16_t network_header_length
= ppkt
->ip
->ip_hl
<< 2;
322 uint16_t offset
= network_header_length
+ ETH_HLEN
+ ppkt
->vnet_hdr_len
;
324 trace_colo_compare_main("compare udp");
327 * Because of ppkt and spkt are both in the same connection,
328 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
329 * same with spkt. In addition, IP header's Identification is a random
330 * field, we can handle it in IP fragmentation function later.
331 * COLO just concern the response net packet payload from primary guest
332 * and secondary guest are same or not, So we ignored all IP header include
333 * other field like TOS,TTL,IP Checksum. we only need to compare
334 * the ip payload here.
336 if (ppkt
->size
!= spkt
->size
) {
337 trace_colo_compare_main("UDP: payload size of packets are different");
340 if (colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
341 ppkt
->size
- offset
)) {
342 trace_colo_compare_udp_miscompare("primary pkt size", ppkt
->size
);
343 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt
->size
);
344 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
345 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare pri pkt",
347 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare sec pkt",
357 * Called from the compare thread on the primary
358 * for compare icmp packet
360 static int colo_packet_compare_icmp(Packet
*spkt
, Packet
*ppkt
)
362 uint16_t network_header_length
= ppkt
->ip
->ip_hl
<< 2;
363 uint16_t offset
= network_header_length
+ ETH_HLEN
+ ppkt
->vnet_hdr_len
;
365 trace_colo_compare_main("compare icmp");
368 * Because of ppkt and spkt are both in the same connection,
369 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
370 * same with spkt. In addition, IP header's Identification is a random
371 * field, we can handle it in IP fragmentation function later.
372 * COLO just concern the response net packet payload from primary guest
373 * and secondary guest are same or not, So we ignored all IP header include
374 * other field like TOS,TTL,IP Checksum. we only need to compare
375 * the ip payload here.
377 if (ppkt
->size
!= spkt
->size
) {
378 trace_colo_compare_main("ICMP: payload size of packets are different");
381 if (colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
382 ppkt
->size
- offset
)) {
383 trace_colo_compare_icmp_miscompare("primary pkt size",
385 trace_colo_compare_icmp_miscompare("Secondary pkt size",
387 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
388 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare pri pkt",
390 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare sec pkt",
400 * Called from the compare thread on the primary
401 * for compare other packet
403 static int colo_packet_compare_other(Packet
*spkt
, Packet
*ppkt
)
405 uint16_t offset
= ppkt
->vnet_hdr_len
;
407 trace_colo_compare_main("compare other");
408 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
409 char pri_ip_src
[20], pri_ip_dst
[20], sec_ip_src
[20], sec_ip_dst
[20];
411 strcpy(pri_ip_src
, inet_ntoa(ppkt
->ip
->ip_src
));
412 strcpy(pri_ip_dst
, inet_ntoa(ppkt
->ip
->ip_dst
));
413 strcpy(sec_ip_src
, inet_ntoa(spkt
->ip
->ip_src
));
414 strcpy(sec_ip_dst
, inet_ntoa(spkt
->ip
->ip_dst
));
416 trace_colo_compare_ip_info(ppkt
->size
, pri_ip_src
,
417 pri_ip_dst
, spkt
->size
,
418 sec_ip_src
, sec_ip_dst
);
421 if (ppkt
->size
!= spkt
->size
) {
422 trace_colo_compare_main("Other: payload size of packets are different");
425 return colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
426 ppkt
->size
- offset
);
429 static int colo_old_packet_check_one(Packet
*pkt
, int64_t *check_time
)
431 int64_t now
= qemu_clock_get_ms(QEMU_CLOCK_HOST
);
433 if ((now
- pkt
->creation_ms
) > (*check_time
)) {
434 trace_colo_old_packet_check_found(pkt
->creation_ms
);
441 static int colo_old_packet_check_one_conn(Connection
*conn
,
444 GList
*result
= NULL
;
445 int64_t check_time
= REGULAR_PACKET_CHECK_MS
;
447 result
= g_queue_find_custom(&conn
->primary_list
,
449 (GCompareFunc
)colo_old_packet_check_one
);
452 /* Do checkpoint will flush old packet */
454 * TODO: Notify colo frame to do checkpoint.
455 * colo_compare_inconsistent_notify();
464 * Look for old packets that the secondary hasn't matched,
465 * if we have some then we have to checkpoint to wake
468 static void colo_old_packet_check(void *opaque
)
470 CompareState
*s
= opaque
;
473 * If we find one old packet, stop finding job and notify
474 * COLO frame do checkpoint.
476 g_queue_find_custom(&s
->conn_list
, NULL
,
477 (GCompareFunc
)colo_old_packet_check_one_conn
);
481 * Called from the compare thread on the primary
482 * for compare packet with secondary list of the
483 * specified connection when a new packet was
486 static void colo_compare_connection(void *opaque
, void *user_data
)
488 CompareState
*s
= user_data
;
489 Connection
*conn
= opaque
;
491 GList
*result
= NULL
;
494 while (!g_queue_is_empty(&conn
->primary_list
) &&
495 !g_queue_is_empty(&conn
->secondary_list
)) {
496 pkt
= g_queue_pop_head(&conn
->primary_list
);
497 switch (conn
->ip_proto
) {
499 result
= g_queue_find_custom(&conn
->secondary_list
,
500 pkt
, (GCompareFunc
)colo_packet_compare_tcp
);
503 result
= g_queue_find_custom(&conn
->secondary_list
,
504 pkt
, (GCompareFunc
)colo_packet_compare_udp
);
507 result
= g_queue_find_custom(&conn
->secondary_list
,
508 pkt
, (GCompareFunc
)colo_packet_compare_icmp
);
511 result
= g_queue_find_custom(&conn
->secondary_list
,
512 pkt
, (GCompareFunc
)colo_packet_compare_other
);
517 ret
= compare_chr_send(s
,
522 error_report("colo_send_primary_packet failed");
524 trace_colo_compare_main("packet same and release packet");
525 g_queue_remove(&conn
->secondary_list
, result
->data
);
526 packet_destroy(pkt
, NULL
);
529 * If one packet arrive late, the secondary_list or
530 * primary_list will be empty, so we can't compare it
531 * until next comparison.
533 trace_colo_compare_main("packet different");
534 g_queue_push_head(&conn
->primary_list
, pkt
);
535 /* TODO: colo_notify_checkpoint();*/
541 static int compare_chr_send(CompareState
*s
,
544 uint32_t vnet_hdr_len
)
547 uint32_t len
= htonl(size
);
553 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)&len
, sizeof(len
));
554 if (ret
!= sizeof(len
)) {
560 * We send vnet header len make other module(like filter-redirector)
561 * know how to parse net packet correctly.
563 len
= htonl(vnet_hdr_len
);
564 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)&len
, sizeof(len
));
565 if (ret
!= sizeof(len
)) {
570 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)buf
, size
);
578 return ret
< 0 ? ret
: -EIO
;
581 static int compare_chr_can_read(void *opaque
)
583 return COMPARE_READ_LEN_MAX
;
587 * Called from the main thread on the primary for packets
588 * arriving over the socket from the primary.
590 static void compare_pri_chr_in(void *opaque
, const uint8_t *buf
, int size
)
592 CompareState
*s
= COLO_COMPARE(opaque
);
595 ret
= net_fill_rstate(&s
->pri_rs
, buf
, size
);
597 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, NULL
, NULL
, NULL
, NULL
,
599 error_report("colo-compare primary_in error");
604 * Called from the main thread on the primary for packets
605 * arriving over the socket from the secondary.
607 static void compare_sec_chr_in(void *opaque
, const uint8_t *buf
, int size
)
609 CompareState
*s
= COLO_COMPARE(opaque
);
612 ret
= net_fill_rstate(&s
->sec_rs
, buf
, size
);
614 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, NULL
, NULL
, NULL
, NULL
,
616 error_report("colo-compare secondary_in error");
621 * Check old packet regularly so it can watch for any packets
622 * that the secondary hasn't produced equivalents of.
624 static void check_old_packet_regular(void *opaque
)
626 CompareState
*s
= opaque
;
628 /* if have old packet we will notify checkpoint */
629 colo_old_packet_check(s
);
630 timer_mod(s
->packet_check_timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
631 REGULAR_PACKET_CHECK_MS
);
634 static void colo_compare_timer_init(CompareState
*s
)
636 AioContext
*ctx
= iothread_get_aio_context(s
->iothread
);
638 s
->packet_check_timer
= aio_timer_new(ctx
, QEMU_CLOCK_VIRTUAL
,
639 SCALE_MS
, check_old_packet_regular
,
641 timer_mod(s
->packet_check_timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
642 REGULAR_PACKET_CHECK_MS
);
645 static void colo_compare_timer_del(CompareState
*s
)
647 if (s
->packet_check_timer
) {
648 timer_del(s
->packet_check_timer
);
649 timer_free(s
->packet_check_timer
);
650 s
->packet_check_timer
= NULL
;
654 static void colo_compare_iothread(CompareState
*s
)
656 object_ref(OBJECT(s
->iothread
));
657 s
->worker_context
= iothread_get_g_main_context(s
->iothread
);
659 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, compare_chr_can_read
,
660 compare_pri_chr_in
, NULL
, NULL
,
661 s
, s
->worker_context
, true);
662 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, compare_chr_can_read
,
663 compare_sec_chr_in
, NULL
, NULL
,
664 s
, s
->worker_context
, true);
666 colo_compare_timer_init(s
);
669 static char *compare_get_pri_indev(Object
*obj
, Error
**errp
)
671 CompareState
*s
= COLO_COMPARE(obj
);
673 return g_strdup(s
->pri_indev
);
676 static void compare_set_pri_indev(Object
*obj
, const char *value
, Error
**errp
)
678 CompareState
*s
= COLO_COMPARE(obj
);
680 g_free(s
->pri_indev
);
681 s
->pri_indev
= g_strdup(value
);
684 static char *compare_get_sec_indev(Object
*obj
, Error
**errp
)
686 CompareState
*s
= COLO_COMPARE(obj
);
688 return g_strdup(s
->sec_indev
);
691 static void compare_set_sec_indev(Object
*obj
, const char *value
, Error
**errp
)
693 CompareState
*s
= COLO_COMPARE(obj
);
695 g_free(s
->sec_indev
);
696 s
->sec_indev
= g_strdup(value
);
699 static char *compare_get_outdev(Object
*obj
, Error
**errp
)
701 CompareState
*s
= COLO_COMPARE(obj
);
703 return g_strdup(s
->outdev
);
706 static void compare_set_outdev(Object
*obj
, const char *value
, Error
**errp
)
708 CompareState
*s
= COLO_COMPARE(obj
);
711 s
->outdev
= g_strdup(value
);
714 static bool compare_get_vnet_hdr(Object
*obj
, Error
**errp
)
716 CompareState
*s
= COLO_COMPARE(obj
);
721 static void compare_set_vnet_hdr(Object
*obj
,
725 CompareState
*s
= COLO_COMPARE(obj
);
730 static void compare_pri_rs_finalize(SocketReadState
*pri_rs
)
732 CompareState
*s
= container_of(pri_rs
, CompareState
, pri_rs
);
733 Connection
*conn
= NULL
;
735 if (packet_enqueue(s
, PRIMARY_IN
, &conn
)) {
736 trace_colo_compare_main("primary: unsupported packet in");
740 pri_rs
->vnet_hdr_len
);
742 /* compare packet in the specified connection */
743 colo_compare_connection(conn
, s
);
747 static void compare_sec_rs_finalize(SocketReadState
*sec_rs
)
749 CompareState
*s
= container_of(sec_rs
, CompareState
, sec_rs
);
750 Connection
*conn
= NULL
;
752 if (packet_enqueue(s
, SECONDARY_IN
, &conn
)) {
753 trace_colo_compare_main("secondary: unsupported packet in");
755 /* compare packet in the specified connection */
756 colo_compare_connection(conn
, s
);
762 * Return 0 on success.
763 * Return 1 on failure.
765 static int find_and_check_chardev(Chardev
**chr
,
769 *chr
= qemu_chr_find(chr_name
);
771 error_setg(errp
, "Device '%s' not found",
776 if (!qemu_chr_has_feature(*chr
, QEMU_CHAR_FEATURE_RECONNECTABLE
)) {
777 error_setg(errp
, "chardev \"%s\" is not reconnectable",
786 * Called from the main thread on the primary
787 * to setup colo-compare.
789 static void colo_compare_complete(UserCreatable
*uc
, Error
**errp
)
791 CompareState
*s
= COLO_COMPARE(uc
);
794 if (!s
->pri_indev
|| !s
->sec_indev
|| !s
->outdev
|| !s
->iothread
) {
795 error_setg(errp
, "colo compare needs 'primary_in' ,"
796 "'secondary_in','outdev','iothread' property set");
798 } else if (!strcmp(s
->pri_indev
, s
->outdev
) ||
799 !strcmp(s
->sec_indev
, s
->outdev
) ||
800 !strcmp(s
->pri_indev
, s
->sec_indev
)) {
801 error_setg(errp
, "'indev' and 'outdev' could not be same "
802 "for compare module");
806 if (find_and_check_chardev(&chr
, s
->pri_indev
, errp
) ||
807 !qemu_chr_fe_init(&s
->chr_pri_in
, chr
, errp
)) {
811 if (find_and_check_chardev(&chr
, s
->sec_indev
, errp
) ||
812 !qemu_chr_fe_init(&s
->chr_sec_in
, chr
, errp
)) {
816 if (find_and_check_chardev(&chr
, s
->outdev
, errp
) ||
817 !qemu_chr_fe_init(&s
->chr_out
, chr
, errp
)) {
821 net_socket_rs_init(&s
->pri_rs
, compare_pri_rs_finalize
, s
->vnet_hdr
);
822 net_socket_rs_init(&s
->sec_rs
, compare_sec_rs_finalize
, s
->vnet_hdr
);
824 g_queue_init(&s
->conn_list
);
826 s
->connection_track_table
= g_hash_table_new_full(connection_key_hash
,
827 connection_key_equal
,
831 colo_compare_iothread(s
);
835 static void colo_flush_packets(void *opaque
, void *user_data
)
837 CompareState
*s
= user_data
;
838 Connection
*conn
= opaque
;
841 while (!g_queue_is_empty(&conn
->primary_list
)) {
842 pkt
= g_queue_pop_head(&conn
->primary_list
);
847 packet_destroy(pkt
, NULL
);
849 while (!g_queue_is_empty(&conn
->secondary_list
)) {
850 pkt
= g_queue_pop_head(&conn
->secondary_list
);
851 packet_destroy(pkt
, NULL
);
855 static void colo_compare_class_init(ObjectClass
*oc
, void *data
)
857 UserCreatableClass
*ucc
= USER_CREATABLE_CLASS(oc
);
859 ucc
->complete
= colo_compare_complete
;
862 static void colo_compare_init(Object
*obj
)
864 CompareState
*s
= COLO_COMPARE(obj
);
866 object_property_add_str(obj
, "primary_in",
867 compare_get_pri_indev
, compare_set_pri_indev
,
869 object_property_add_str(obj
, "secondary_in",
870 compare_get_sec_indev
, compare_set_sec_indev
,
872 object_property_add_str(obj
, "outdev",
873 compare_get_outdev
, compare_set_outdev
,
875 object_property_add_link(obj
, "iothread", TYPE_IOTHREAD
,
876 (Object
**)&s
->iothread
,
877 object_property_allow_set_link
,
878 OBJ_PROP_LINK_UNREF_ON_RELEASE
, NULL
);
881 object_property_add_bool(obj
, "vnet_hdr_support", compare_get_vnet_hdr
,
882 compare_set_vnet_hdr
, NULL
);
885 static void colo_compare_finalize(Object
*obj
)
887 CompareState
*s
= COLO_COMPARE(obj
);
889 qemu_chr_fe_deinit(&s
->chr_pri_in
, false);
890 qemu_chr_fe_deinit(&s
->chr_sec_in
, false);
891 qemu_chr_fe_deinit(&s
->chr_out
, false);
893 colo_compare_timer_del(s
);
895 /* Release all unhandled packets after compare thread exited */
896 g_queue_foreach(&s
->conn_list
, colo_flush_packets
, s
);
898 g_queue_clear(&s
->conn_list
);
900 if (s
->connection_track_table
) {
901 g_hash_table_destroy(s
->connection_track_table
);
905 object_unref(OBJECT(s
->iothread
));
907 g_free(s
->pri_indev
);
908 g_free(s
->sec_indev
);
912 static const TypeInfo colo_compare_info
= {
913 .name
= TYPE_COLO_COMPARE
,
914 .parent
= TYPE_OBJECT
,
915 .instance_size
= sizeof(CompareState
),
916 .instance_init
= colo_compare_init
,
917 .instance_finalize
= colo_compare_finalize
,
918 .class_size
= sizeof(CompareClass
),
919 .class_init
= colo_compare_class_init
,
920 .interfaces
= (InterfaceInfo
[]) {
921 { TYPE_USER_CREATABLE
},
926 static void register_types(void)
928 type_register_static(&colo_compare_info
);
931 type_init(register_types
);