2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
23 #include "qom/object_interfaces.h"
25 #include "qom/object.h"
26 #include "net/queue.h"
27 #include "chardev/char-fe.h"
28 #include "qemu/sockets.h"
29 #include "qapi-visit.h"
31 #include "sysemu/iothread.h"
33 #define TYPE_COLO_COMPARE "colo-compare"
34 #define COLO_COMPARE(obj) \
35 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
37 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
38 #define MAX_QUEUE_SIZE 1024
40 #define COLO_COMPARE_FREE_PRIMARY 0x01
41 #define COLO_COMPARE_FREE_SECONDARY 0x02
43 /* TODO: Should be configurable */
44 #define REGULAR_PACKET_CHECK_MS 3000
49 * +---------------+ +---------------+ +---------------+
50 * | conn list + - > conn + ------- > conn + -- > ......
51 * +---------------+ +---------------+ +---------------+
53 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
54 * |primary | |secondary |primary | |secondary
55 * |packet | |packet + |packet | |packet +
56 * +--------+ +--------+ +--------+ +--------+
58 * +---v----+ +---v----+ +---v----+ +---v----+
59 * |primary | |secondary |primary | |secondary
60 * |packet | |packet + |packet | |packet +
61 * +--------+ +--------+ +--------+ +--------+
63 * +---v----+ +---v----+ +---v----+ +---v----+
64 * |primary | |secondary |primary | |secondary
65 * |packet | |packet + |packet | |packet +
66 * +--------+ +--------+ +--------+ +--------+
68 typedef struct CompareState
{
74 CharBackend chr_pri_in
;
75 CharBackend chr_sec_in
;
77 SocketReadState pri_rs
;
78 SocketReadState sec_rs
;
82 * Record the connection that through the NIC
83 * Element type: Connection
86 /* Record the connection without repetition */
87 GHashTable
*connection_track_table
;
90 GMainContext
*worker_context
;
91 QEMUTimer
*packet_check_timer
;
94 typedef struct CompareClass
{
95 ObjectClass parent_class
;
103 static int compare_chr_send(CompareState
*s
,
106 uint32_t vnet_hdr_len
);
108 static gint
seq_sorter(Packet
*a
, Packet
*b
, gpointer data
)
110 struct tcphdr
*atcp
, *btcp
;
112 atcp
= (struct tcphdr
*)(a
->transport_header
);
113 btcp
= (struct tcphdr
*)(b
->transport_header
);
114 return ntohl(atcp
->th_seq
) - ntohl(btcp
->th_seq
);
117 static void fill_pkt_tcp_info(void *data
, uint32_t *max_ack
)
120 struct tcphdr
*tcphd
;
122 tcphd
= (struct tcphdr
*)pkt
->transport_header
;
124 pkt
->tcp_seq
= ntohl(tcphd
->th_seq
);
125 pkt
->tcp_ack
= ntohl(tcphd
->th_ack
);
126 *max_ack
= *max_ack
> pkt
->tcp_ack
? *max_ack
: pkt
->tcp_ack
;
127 pkt
->header_size
= pkt
->transport_header
- (uint8_t *)pkt
->data
128 + (tcphd
->th_off
<< 2) - pkt
->vnet_hdr_len
;
129 pkt
->payload_size
= pkt
->size
- pkt
->header_size
;
130 pkt
->seq_end
= pkt
->tcp_seq
+ pkt
->payload_size
;
131 pkt
->flags
= tcphd
->th_flags
;
135 * Return 1 on success, if return 0 means the
136 * packet will be dropped
138 static int colo_insert_packet(GQueue
*queue
, Packet
*pkt
, uint32_t *max_ack
)
140 if (g_queue_get_length(queue
) <= MAX_QUEUE_SIZE
) {
141 if (pkt
->ip
->ip_p
== IPPROTO_TCP
) {
142 fill_pkt_tcp_info(pkt
, max_ack
);
143 g_queue_insert_sorted(queue
,
145 (GCompareDataFunc
)seq_sorter
,
148 g_queue_push_tail(queue
, pkt
);
156 * Return 0 on success, if return -1 means the pkt
157 * is unsupported(arp and ipv6) and will be sent later
159 static int packet_enqueue(CompareState
*s
, int mode
, Connection
**con
)
165 if (mode
== PRIMARY_IN
) {
166 pkt
= packet_new(s
->pri_rs
.buf
,
167 s
->pri_rs
.packet_len
,
168 s
->pri_rs
.vnet_hdr_len
);
170 pkt
= packet_new(s
->sec_rs
.buf
,
171 s
->sec_rs
.packet_len
,
172 s
->sec_rs
.vnet_hdr_len
);
175 if (parse_packet_early(pkt
)) {
176 packet_destroy(pkt
, NULL
);
180 fill_connection_key(pkt
, &key
);
182 conn
= connection_get(s
->connection_track_table
,
186 if (!conn
->processing
) {
187 g_queue_push_tail(&s
->conn_list
, conn
);
188 conn
->processing
= true;
191 if (mode
== PRIMARY_IN
) {
192 if (!colo_insert_packet(&conn
->primary_list
, pkt
, &conn
->pack
)) {
193 error_report("colo compare primary queue size too big,"
197 if (!colo_insert_packet(&conn
->secondary_list
, pkt
, &conn
->sack
)) {
198 error_report("colo compare secondary queue size too big,"
207 static inline bool after(uint32_t seq1
, uint32_t seq2
)
209 return (int32_t)(seq1
- seq2
) > 0;
212 static void colo_release_primary_pkt(CompareState
*s
, Packet
*pkt
)
215 ret
= compare_chr_send(s
,
220 error_report("colo send primary packet failed");
222 trace_colo_compare_main("packet same and release packet");
223 packet_destroy(pkt
, NULL
);
227 * The IP packets sent by primary and secondary
228 * will be compared in here
229 * TODO support ip fragment, Out-Of-Order
230 * return: 0 means packet same
231 * > 0 || < 0 means packet different
233 static int colo_compare_packet_payload(Packet
*ppkt
,
240 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
241 char pri_ip_src
[20], pri_ip_dst
[20], sec_ip_src
[20], sec_ip_dst
[20];
243 strcpy(pri_ip_src
, inet_ntoa(ppkt
->ip
->ip_src
));
244 strcpy(pri_ip_dst
, inet_ntoa(ppkt
->ip
->ip_dst
));
245 strcpy(sec_ip_src
, inet_ntoa(spkt
->ip
->ip_src
));
246 strcpy(sec_ip_dst
, inet_ntoa(spkt
->ip
->ip_dst
));
248 trace_colo_compare_ip_info(ppkt
->size
, pri_ip_src
,
249 pri_ip_dst
, spkt
->size
,
250 sec_ip_src
, sec_ip_dst
);
253 return memcmp(ppkt
->data
+ poffset
, spkt
->data
+ soffset
, len
);
257 * return true means that the payload is consist and
258 * need to make the next comparison, false means do
261 static bool colo_mark_tcp_pkt(Packet
*ppkt
, Packet
*spkt
,
262 int8_t *mark
, uint32_t max_ack
)
266 if (ppkt
->tcp_seq
== spkt
->tcp_seq
&& ppkt
->seq_end
== spkt
->seq_end
) {
267 if (colo_compare_packet_payload(ppkt
, spkt
,
268 ppkt
->header_size
, spkt
->header_size
,
269 ppkt
->payload_size
)) {
270 *mark
= COLO_COMPARE_FREE_SECONDARY
| COLO_COMPARE_FREE_PRIMARY
;
274 if (ppkt
->tcp_seq
== spkt
->tcp_seq
&& ppkt
->seq_end
== spkt
->seq_end
) {
275 if (colo_compare_packet_payload(ppkt
, spkt
,
276 ppkt
->header_size
, spkt
->header_size
,
277 ppkt
->payload_size
)) {
278 *mark
= COLO_COMPARE_FREE_SECONDARY
| COLO_COMPARE_FREE_PRIMARY
;
283 /* one part of secondary packet payload still need to be compared */
284 if (!after(ppkt
->seq_end
, spkt
->seq_end
)) {
285 if (colo_compare_packet_payload(ppkt
, spkt
,
286 ppkt
->header_size
+ ppkt
->offset
,
287 spkt
->header_size
+ spkt
->offset
,
288 ppkt
->payload_size
- ppkt
->offset
)) {
289 if (!after(ppkt
->tcp_ack
, max_ack
)) {
290 *mark
= COLO_COMPARE_FREE_PRIMARY
;
291 spkt
->offset
+= ppkt
->payload_size
- ppkt
->offset
;
294 /* secondary guest hasn't ack the data, don't send
301 /* primary packet is longer than secondary packet, compare
302 * the same part and mark the primary packet offset
304 if (colo_compare_packet_payload(ppkt
, spkt
,
305 ppkt
->header_size
+ ppkt
->offset
,
306 spkt
->header_size
+ spkt
->offset
,
307 spkt
->payload_size
- spkt
->offset
)) {
308 *mark
= COLO_COMPARE_FREE_SECONDARY
;
309 ppkt
->offset
+= spkt
->payload_size
- spkt
->offset
;
317 static void colo_compare_tcp(CompareState
*s
, Connection
*conn
)
319 Packet
*ppkt
= NULL
, *spkt
= NULL
;
323 * If ppkt and spkt have the same payload, but ppkt's ACK
324 * is greater than spkt's ACK, in this case we can not
325 * send the ppkt because it will cause the secondary guest
326 * to miss sending some data in the next. Therefore, we
327 * record the maximum ACK in the current queue at both
328 * primary side and secondary side. Only when the ack is
329 * less than the smaller of the two maximum ack, then we
330 * can ensure that the packet's payload is acknowledged by
331 * primary and secondary.
333 uint32_t min_ack
= conn
->pack
> conn
->sack
? conn
->sack
: conn
->pack
;
336 if (g_queue_is_empty(&conn
->primary_list
)) {
339 ppkt
= g_queue_pop_head(&conn
->primary_list
);
341 if (g_queue_is_empty(&conn
->secondary_list
)) {
342 g_queue_push_head(&conn
->primary_list
, ppkt
);
345 spkt
= g_queue_pop_head(&conn
->secondary_list
);
347 if (ppkt
->tcp_seq
== ppkt
->seq_end
) {
348 colo_release_primary_pkt(s
, ppkt
);
352 if (ppkt
&& conn
->compare_seq
&& !after(ppkt
->seq_end
, conn
->compare_seq
)) {
353 trace_colo_compare_main("pri: this packet has compared");
354 colo_release_primary_pkt(s
, ppkt
);
358 if (spkt
->tcp_seq
== spkt
->seq_end
) {
359 packet_destroy(spkt
, NULL
);
366 if (conn
->compare_seq
&& !after(spkt
->seq_end
, conn
->compare_seq
)) {
367 trace_colo_compare_main("sec: this packet has compared");
368 packet_destroy(spkt
, NULL
);
376 g_queue_push_head(&conn
->secondary_list
, spkt
);
381 if (colo_mark_tcp_pkt(ppkt
, spkt
, &mark
, min_ack
)) {
382 trace_colo_compare_tcp_info("pri",
383 ppkt
->tcp_seq
, ppkt
->tcp_ack
,
384 ppkt
->header_size
, ppkt
->payload_size
,
385 ppkt
->offset
, ppkt
->flags
);
387 trace_colo_compare_tcp_info("sec",
388 spkt
->tcp_seq
, spkt
->tcp_ack
,
389 spkt
->header_size
, spkt
->payload_size
,
390 spkt
->offset
, spkt
->flags
);
392 if (mark
== COLO_COMPARE_FREE_PRIMARY
) {
393 conn
->compare_seq
= ppkt
->seq_end
;
394 colo_release_primary_pkt(s
, ppkt
);
395 g_queue_push_head(&conn
->secondary_list
, spkt
);
398 if (mark
== COLO_COMPARE_FREE_SECONDARY
) {
399 conn
->compare_seq
= spkt
->seq_end
;
400 packet_destroy(spkt
, NULL
);
403 if (mark
== (COLO_COMPARE_FREE_PRIMARY
| COLO_COMPARE_FREE_SECONDARY
)) {
404 conn
->compare_seq
= ppkt
->seq_end
;
405 colo_release_primary_pkt(s
, ppkt
);
406 packet_destroy(spkt
, NULL
);
410 g_queue_push_head(&conn
->primary_list
, ppkt
);
411 g_queue_push_head(&conn
->secondary_list
, spkt
);
413 qemu_hexdump((char *)ppkt
->data
, stderr
,
414 "colo-compare ppkt", ppkt
->size
);
415 qemu_hexdump((char *)spkt
->data
, stderr
,
416 "colo-compare spkt", spkt
->size
);
419 * colo_compare_inconsistent_notify();
420 * TODO: notice to checkpoint();
427 * Called from the compare thread on the primary
428 * for compare udp packet
430 static int colo_packet_compare_udp(Packet
*spkt
, Packet
*ppkt
)
432 uint16_t network_header_length
= ppkt
->ip
->ip_hl
<< 2;
433 uint16_t offset
= network_header_length
+ ETH_HLEN
+ ppkt
->vnet_hdr_len
;
435 trace_colo_compare_main("compare udp");
438 * Because of ppkt and spkt are both in the same connection,
439 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
440 * same with spkt. In addition, IP header's Identification is a random
441 * field, we can handle it in IP fragmentation function later.
442 * COLO just concern the response net packet payload from primary guest
443 * and secondary guest are same or not, So we ignored all IP header include
444 * other field like TOS,TTL,IP Checksum. we only need to compare
445 * the ip payload here.
447 if (ppkt
->size
!= spkt
->size
) {
448 trace_colo_compare_main("UDP: payload size of packets are different");
451 if (colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
452 ppkt
->size
- offset
)) {
453 trace_colo_compare_udp_miscompare("primary pkt size", ppkt
->size
);
454 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt
->size
);
455 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
456 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare pri pkt",
458 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare sec pkt",
468 * Called from the compare thread on the primary
469 * for compare icmp packet
471 static int colo_packet_compare_icmp(Packet
*spkt
, Packet
*ppkt
)
473 uint16_t network_header_length
= ppkt
->ip
->ip_hl
<< 2;
474 uint16_t offset
= network_header_length
+ ETH_HLEN
+ ppkt
->vnet_hdr_len
;
476 trace_colo_compare_main("compare icmp");
479 * Because of ppkt and spkt are both in the same connection,
480 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
481 * same with spkt. In addition, IP header's Identification is a random
482 * field, we can handle it in IP fragmentation function later.
483 * COLO just concern the response net packet payload from primary guest
484 * and secondary guest are same or not, So we ignored all IP header include
485 * other field like TOS,TTL,IP Checksum. we only need to compare
486 * the ip payload here.
488 if (ppkt
->size
!= spkt
->size
) {
489 trace_colo_compare_main("ICMP: payload size of packets are different");
492 if (colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
493 ppkt
->size
- offset
)) {
494 trace_colo_compare_icmp_miscompare("primary pkt size",
496 trace_colo_compare_icmp_miscompare("Secondary pkt size",
498 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
499 qemu_hexdump((char *)ppkt
->data
, stderr
, "colo-compare pri pkt",
501 qemu_hexdump((char *)spkt
->data
, stderr
, "colo-compare sec pkt",
511 * Called from the compare thread on the primary
512 * for compare other packet
514 static int colo_packet_compare_other(Packet
*spkt
, Packet
*ppkt
)
516 uint16_t offset
= ppkt
->vnet_hdr_len
;
518 trace_colo_compare_main("compare other");
519 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE
)) {
520 char pri_ip_src
[20], pri_ip_dst
[20], sec_ip_src
[20], sec_ip_dst
[20];
522 strcpy(pri_ip_src
, inet_ntoa(ppkt
->ip
->ip_src
));
523 strcpy(pri_ip_dst
, inet_ntoa(ppkt
->ip
->ip_dst
));
524 strcpy(sec_ip_src
, inet_ntoa(spkt
->ip
->ip_src
));
525 strcpy(sec_ip_dst
, inet_ntoa(spkt
->ip
->ip_dst
));
527 trace_colo_compare_ip_info(ppkt
->size
, pri_ip_src
,
528 pri_ip_dst
, spkt
->size
,
529 sec_ip_src
, sec_ip_dst
);
532 if (ppkt
->size
!= spkt
->size
) {
533 trace_colo_compare_main("Other: payload size of packets are different");
536 return colo_compare_packet_payload(ppkt
, spkt
, offset
, offset
,
537 ppkt
->size
- offset
);
540 static int colo_old_packet_check_one(Packet
*pkt
, int64_t *check_time
)
542 int64_t now
= qemu_clock_get_ms(QEMU_CLOCK_HOST
);
544 if ((now
- pkt
->creation_ms
) > (*check_time
)) {
545 trace_colo_old_packet_check_found(pkt
->creation_ms
);
552 static int colo_old_packet_check_one_conn(Connection
*conn
,
555 GList
*result
= NULL
;
556 int64_t check_time
= REGULAR_PACKET_CHECK_MS
;
558 result
= g_queue_find_custom(&conn
->primary_list
,
560 (GCompareFunc
)colo_old_packet_check_one
);
563 /* Do checkpoint will flush old packet */
565 * TODO: Notify colo frame to do checkpoint.
566 * colo_compare_inconsistent_notify();
575 * Look for old packets that the secondary hasn't matched,
576 * if we have some then we have to checkpoint to wake
579 static void colo_old_packet_check(void *opaque
)
581 CompareState
*s
= opaque
;
584 * If we find one old packet, stop finding job and notify
585 * COLO frame do checkpoint.
587 g_queue_find_custom(&s
->conn_list
, NULL
,
588 (GCompareFunc
)colo_old_packet_check_one_conn
);
591 static void colo_compare_packet(CompareState
*s
, Connection
*conn
,
592 int (*HandlePacket
)(Packet
*spkt
,
596 GList
*result
= NULL
;
598 while (!g_queue_is_empty(&conn
->primary_list
) &&
599 !g_queue_is_empty(&conn
->secondary_list
)) {
600 pkt
= g_queue_pop_head(&conn
->primary_list
);
601 result
= g_queue_find_custom(&conn
->secondary_list
,
602 pkt
, (GCompareFunc
)HandlePacket
);
605 colo_release_primary_pkt(s
, pkt
);
606 g_queue_remove(&conn
->secondary_list
, result
->data
);
609 * If one packet arrive late, the secondary_list or
610 * primary_list will be empty, so we can't compare it
611 * until next comparison.
613 trace_colo_compare_main("packet different");
614 g_queue_push_head(&conn
->primary_list
, pkt
);
615 /* TODO: colo_notify_checkpoint();*/
622 * Called from the compare thread on the primary
623 * for compare packet with secondary list of the
624 * specified connection when a new packet was
627 static void colo_compare_connection(void *opaque
, void *user_data
)
629 CompareState
*s
= user_data
;
630 Connection
*conn
= opaque
;
632 switch (conn
->ip_proto
) {
634 colo_compare_tcp(s
, conn
);
637 colo_compare_packet(s
, conn
, colo_packet_compare_udp
);
640 colo_compare_packet(s
, conn
, colo_packet_compare_icmp
);
643 colo_compare_packet(s
, conn
, colo_packet_compare_other
);
648 static int compare_chr_send(CompareState
*s
,
651 uint32_t vnet_hdr_len
)
654 uint32_t len
= htonl(size
);
660 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)&len
, sizeof(len
));
661 if (ret
!= sizeof(len
)) {
667 * We send vnet header len make other module(like filter-redirector)
668 * know how to parse net packet correctly.
670 len
= htonl(vnet_hdr_len
);
671 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)&len
, sizeof(len
));
672 if (ret
!= sizeof(len
)) {
677 ret
= qemu_chr_fe_write_all(&s
->chr_out
, (uint8_t *)buf
, size
);
685 return ret
< 0 ? ret
: -EIO
;
688 static int compare_chr_can_read(void *opaque
)
690 return COMPARE_READ_LEN_MAX
;
694 * Called from the main thread on the primary for packets
695 * arriving over the socket from the primary.
697 static void compare_pri_chr_in(void *opaque
, const uint8_t *buf
, int size
)
699 CompareState
*s
= COLO_COMPARE(opaque
);
702 ret
= net_fill_rstate(&s
->pri_rs
, buf
, size
);
704 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, NULL
, NULL
, NULL
, NULL
,
706 error_report("colo-compare primary_in error");
711 * Called from the main thread on the primary for packets
712 * arriving over the socket from the secondary.
714 static void compare_sec_chr_in(void *opaque
, const uint8_t *buf
, int size
)
716 CompareState
*s
= COLO_COMPARE(opaque
);
719 ret
= net_fill_rstate(&s
->sec_rs
, buf
, size
);
721 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, NULL
, NULL
, NULL
, NULL
,
723 error_report("colo-compare secondary_in error");
728 * Check old packet regularly so it can watch for any packets
729 * that the secondary hasn't produced equivalents of.
731 static void check_old_packet_regular(void *opaque
)
733 CompareState
*s
= opaque
;
735 /* if have old packet we will notify checkpoint */
736 colo_old_packet_check(s
);
737 timer_mod(s
->packet_check_timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
738 REGULAR_PACKET_CHECK_MS
);
741 static void colo_compare_timer_init(CompareState
*s
)
743 AioContext
*ctx
= iothread_get_aio_context(s
->iothread
);
745 s
->packet_check_timer
= aio_timer_new(ctx
, QEMU_CLOCK_VIRTUAL
,
746 SCALE_MS
, check_old_packet_regular
,
748 timer_mod(s
->packet_check_timer
, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL
) +
749 REGULAR_PACKET_CHECK_MS
);
752 static void colo_compare_timer_del(CompareState
*s
)
754 if (s
->packet_check_timer
) {
755 timer_del(s
->packet_check_timer
);
756 timer_free(s
->packet_check_timer
);
757 s
->packet_check_timer
= NULL
;
761 static void colo_compare_iothread(CompareState
*s
)
763 object_ref(OBJECT(s
->iothread
));
764 s
->worker_context
= iothread_get_g_main_context(s
->iothread
);
766 qemu_chr_fe_set_handlers(&s
->chr_pri_in
, compare_chr_can_read
,
767 compare_pri_chr_in
, NULL
, NULL
,
768 s
, s
->worker_context
, true);
769 qemu_chr_fe_set_handlers(&s
->chr_sec_in
, compare_chr_can_read
,
770 compare_sec_chr_in
, NULL
, NULL
,
771 s
, s
->worker_context
, true);
773 colo_compare_timer_init(s
);
776 static char *compare_get_pri_indev(Object
*obj
, Error
**errp
)
778 CompareState
*s
= COLO_COMPARE(obj
);
780 return g_strdup(s
->pri_indev
);
783 static void compare_set_pri_indev(Object
*obj
, const char *value
, Error
**errp
)
785 CompareState
*s
= COLO_COMPARE(obj
);
787 g_free(s
->pri_indev
);
788 s
->pri_indev
= g_strdup(value
);
791 static char *compare_get_sec_indev(Object
*obj
, Error
**errp
)
793 CompareState
*s
= COLO_COMPARE(obj
);
795 return g_strdup(s
->sec_indev
);
798 static void compare_set_sec_indev(Object
*obj
, const char *value
, Error
**errp
)
800 CompareState
*s
= COLO_COMPARE(obj
);
802 g_free(s
->sec_indev
);
803 s
->sec_indev
= g_strdup(value
);
806 static char *compare_get_outdev(Object
*obj
, Error
**errp
)
808 CompareState
*s
= COLO_COMPARE(obj
);
810 return g_strdup(s
->outdev
);
813 static void compare_set_outdev(Object
*obj
, const char *value
, Error
**errp
)
815 CompareState
*s
= COLO_COMPARE(obj
);
818 s
->outdev
= g_strdup(value
);
821 static bool compare_get_vnet_hdr(Object
*obj
, Error
**errp
)
823 CompareState
*s
= COLO_COMPARE(obj
);
828 static void compare_set_vnet_hdr(Object
*obj
,
832 CompareState
*s
= COLO_COMPARE(obj
);
837 static void compare_pri_rs_finalize(SocketReadState
*pri_rs
)
839 CompareState
*s
= container_of(pri_rs
, CompareState
, pri_rs
);
840 Connection
*conn
= NULL
;
842 if (packet_enqueue(s
, PRIMARY_IN
, &conn
)) {
843 trace_colo_compare_main("primary: unsupported packet in");
847 pri_rs
->vnet_hdr_len
);
849 /* compare packet in the specified connection */
850 colo_compare_connection(conn
, s
);
854 static void compare_sec_rs_finalize(SocketReadState
*sec_rs
)
856 CompareState
*s
= container_of(sec_rs
, CompareState
, sec_rs
);
857 Connection
*conn
= NULL
;
859 if (packet_enqueue(s
, SECONDARY_IN
, &conn
)) {
860 trace_colo_compare_main("secondary: unsupported packet in");
862 /* compare packet in the specified connection */
863 colo_compare_connection(conn
, s
);
869 * Return 0 is success.
870 * Return 1 is failed.
872 static int find_and_check_chardev(Chardev
**chr
,
876 *chr
= qemu_chr_find(chr_name
);
878 error_setg(errp
, "Device '%s' not found",
883 if (!qemu_chr_has_feature(*chr
, QEMU_CHAR_FEATURE_RECONNECTABLE
)) {
884 error_setg(errp
, "chardev \"%s\" is not reconnectable",
893 * Called from the main thread on the primary
894 * to setup colo-compare.
896 static void colo_compare_complete(UserCreatable
*uc
, Error
**errp
)
898 CompareState
*s
= COLO_COMPARE(uc
);
901 if (!s
->pri_indev
|| !s
->sec_indev
|| !s
->outdev
|| !s
->iothread
) {
902 error_setg(errp
, "colo compare needs 'primary_in' ,"
903 "'secondary_in','outdev','iothread' property set");
905 } else if (!strcmp(s
->pri_indev
, s
->outdev
) ||
906 !strcmp(s
->sec_indev
, s
->outdev
) ||
907 !strcmp(s
->pri_indev
, s
->sec_indev
)) {
908 error_setg(errp
, "'indev' and 'outdev' could not be same "
909 "for compare module");
913 if (find_and_check_chardev(&chr
, s
->pri_indev
, errp
) ||
914 !qemu_chr_fe_init(&s
->chr_pri_in
, chr
, errp
)) {
918 if (find_and_check_chardev(&chr
, s
->sec_indev
, errp
) ||
919 !qemu_chr_fe_init(&s
->chr_sec_in
, chr
, errp
)) {
923 if (find_and_check_chardev(&chr
, s
->outdev
, errp
) ||
924 !qemu_chr_fe_init(&s
->chr_out
, chr
, errp
)) {
928 net_socket_rs_init(&s
->pri_rs
, compare_pri_rs_finalize
, s
->vnet_hdr
);
929 net_socket_rs_init(&s
->sec_rs
, compare_sec_rs_finalize
, s
->vnet_hdr
);
931 g_queue_init(&s
->conn_list
);
933 s
->connection_track_table
= g_hash_table_new_full(connection_key_hash
,
934 connection_key_equal
,
938 colo_compare_iothread(s
);
942 static void colo_flush_packets(void *opaque
, void *user_data
)
944 CompareState
*s
= user_data
;
945 Connection
*conn
= opaque
;
948 while (!g_queue_is_empty(&conn
->primary_list
)) {
949 pkt
= g_queue_pop_head(&conn
->primary_list
);
954 packet_destroy(pkt
, NULL
);
956 while (!g_queue_is_empty(&conn
->secondary_list
)) {
957 pkt
= g_queue_pop_head(&conn
->secondary_list
);
958 packet_destroy(pkt
, NULL
);
962 static void colo_compare_class_init(ObjectClass
*oc
, void *data
)
964 UserCreatableClass
*ucc
= USER_CREATABLE_CLASS(oc
);
966 ucc
->complete
= colo_compare_complete
;
969 static void colo_compare_init(Object
*obj
)
971 CompareState
*s
= COLO_COMPARE(obj
);
973 object_property_add_str(obj
, "primary_in",
974 compare_get_pri_indev
, compare_set_pri_indev
,
976 object_property_add_str(obj
, "secondary_in",
977 compare_get_sec_indev
, compare_set_sec_indev
,
979 object_property_add_str(obj
, "outdev",
980 compare_get_outdev
, compare_set_outdev
,
982 object_property_add_link(obj
, "iothread", TYPE_IOTHREAD
,
983 (Object
**)&s
->iothread
,
984 object_property_allow_set_link
,
985 OBJ_PROP_LINK_UNREF_ON_RELEASE
, NULL
);
988 object_property_add_bool(obj
, "vnet_hdr_support", compare_get_vnet_hdr
,
989 compare_set_vnet_hdr
, NULL
);
992 static void colo_compare_finalize(Object
*obj
)
994 CompareState
*s
= COLO_COMPARE(obj
);
996 qemu_chr_fe_deinit(&s
->chr_pri_in
, false);
997 qemu_chr_fe_deinit(&s
->chr_sec_in
, false);
998 qemu_chr_fe_deinit(&s
->chr_out
, false);
1000 colo_compare_timer_del(s
);
1002 /* Release all unhandled packets after compare thead exited */
1003 g_queue_foreach(&s
->conn_list
, colo_flush_packets
, s
);
1005 g_queue_clear(&s
->conn_list
);
1007 if (s
->connection_track_table
) {
1008 g_hash_table_destroy(s
->connection_track_table
);
1012 object_unref(OBJECT(s
->iothread
));
1014 g_free(s
->pri_indev
);
1015 g_free(s
->sec_indev
);
1019 static const TypeInfo colo_compare_info
= {
1020 .name
= TYPE_COLO_COMPARE
,
1021 .parent
= TYPE_OBJECT
,
1022 .instance_size
= sizeof(CompareState
),
1023 .instance_init
= colo_compare_init
,
1024 .instance_finalize
= colo_compare_finalize
,
1025 .class_size
= sizeof(CompareClass
),
1026 .class_init
= colo_compare_class_init
,
1027 .interfaces
= (InterfaceInfo
[]) {
1028 { TYPE_USER_CREATABLE
},
1033 static void register_types(void)
1035 type_register_static(&colo_compare_info
);
1038 type_init(register_types
);