2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
22 #include "qom/object_interfaces.h"
24 #include "qom/object.h"
25 #include "qemu/typedefs.h"
26 #include "net/queue.h"
27 #include "sysemu/char.h"
28 #include "qemu/sockets.h"
29 #include "qapi-visit.h"
/* QOM type name of the colo-compare object. */
#define TYPE_COLO_COMPARE "colo-compare"
/* Checked cast from a generic object pointer to CompareState. */
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
/*
 * Connection tracking layout: a list of connections, each holding
 * a queue of primary packets and a queue of secondary packets.
 *
 * +---------------+   +---------------+         +---------------+
 * |conn list      +--->conn           +--------->conn           |
 * +---------------+   +---------------+         +---------------+
 *
 * +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                   |primary |  |secondary    |primary | |secondary
 *                   |packet  |  |packet  +    |packet  | |packet  +
 *                   +--------+  +--------+    +--------+ +--------+
 *
 *                   +---v----+  +---v----+    +---v----+ +---v----+
 *                   |primary |  |secondary    |primary | |secondary
 *                   |packet  |  |packet  +    |packet  | |packet  +
 *                   +--------+  +--------+    +--------+ +--------+
 *
 *                   +---v----+  +---v----+    +---v----+ +---v----+
 *                   |primary |  |secondary    |primary | |secondary
 *                   |packet  |  |packet  +    |packet  | |packet  +
 *                   +--------+  +--------+    +--------+ +--------+
 */
58 typedef struct CompareState
{
64 CharDriverState
*chr_pri_in
;
65 CharDriverState
*chr_sec_in
;
66 CharDriverState
*chr_out
;
67 SocketReadState pri_rs
;
68 SocketReadState sec_rs
;
70 /* hashtable to save connection */
71 GHashTable
*connection_track_table
;
74 typedef struct CompareClass
{
75 ObjectClass parent_class
;
/*
 * Facts collected about a chardev while walking its options
 * (filled in by compare_chardev_opts()).
 */
typedef struct CompareChardevProps {
    /* Set once a "backend=socket" option has been seen. */
    bool is_socket;
} CompareChardevProps;
87 static int compare_chr_send(CharDriverState
*out
,
92 * Return 0 on success, if return -1 means the pkt
93 * is unsupported(arp and ipv6) and will be sent later
95 static int packet_enqueue(CompareState
*s
, int mode
)
99 if (mode
== PRIMARY_IN
) {
100 pkt
= packet_new(s
->pri_rs
.buf
, s
->pri_rs
.packet_len
);
102 pkt
= packet_new(s
->sec_rs
.buf
, s
->sec_rs
.packet_len
);
105 if (parse_packet_early(pkt
)) {
106 packet_destroy(pkt
, NULL
);
110 /* TODO: get connection key from pkt */
113 * TODO: use connection key get conn from
114 * connection_track_table
118 * TODO: insert pkt to it's conn->primary_list
119 * or conn->secondary_list
125 static int compare_chr_send(CharDriverState
*out
,
130 uint32_t len
= htonl(size
);
136 ret
= qemu_chr_fe_write_all(out
, (uint8_t *)&len
, sizeof(len
));
137 if (ret
!= sizeof(len
)) {
141 ret
= qemu_chr_fe_write_all(out
, (uint8_t *)buf
, size
);
149 return ret
< 0 ? ret
: -EIO
;
152 static char *compare_get_pri_indev(Object
*obj
, Error
**errp
)
154 CompareState
*s
= COLO_COMPARE(obj
);
156 return g_strdup(s
->pri_indev
);
159 static void compare_set_pri_indev(Object
*obj
, const char *value
, Error
**errp
)
161 CompareState
*s
= COLO_COMPARE(obj
);
163 g_free(s
->pri_indev
);
164 s
->pri_indev
= g_strdup(value
);
167 static char *compare_get_sec_indev(Object
*obj
, Error
**errp
)
169 CompareState
*s
= COLO_COMPARE(obj
);
171 return g_strdup(s
->sec_indev
);
174 static void compare_set_sec_indev(Object
*obj
, const char *value
, Error
**errp
)
176 CompareState
*s
= COLO_COMPARE(obj
);
178 g_free(s
->sec_indev
);
179 s
->sec_indev
= g_strdup(value
);
182 static char *compare_get_outdev(Object
*obj
, Error
**errp
)
184 CompareState
*s
= COLO_COMPARE(obj
);
186 return g_strdup(s
->outdev
);
189 static void compare_set_outdev(Object
*obj
, const char *value
, Error
**errp
)
191 CompareState
*s
= COLO_COMPARE(obj
);
194 s
->outdev
= g_strdup(value
);
197 static void compare_pri_rs_finalize(SocketReadState
*pri_rs
)
199 CompareState
*s
= container_of(pri_rs
, CompareState
, pri_rs
);
201 if (packet_enqueue(s
, PRIMARY_IN
)) {
202 trace_colo_compare_main("primary: unsupported packet in");
203 compare_chr_send(s
->chr_out
, pri_rs
->buf
, pri_rs
->packet_len
);
207 static void compare_sec_rs_finalize(SocketReadState
*sec_rs
)
209 CompareState
*s
= container_of(sec_rs
, CompareState
, sec_rs
);
211 if (packet_enqueue(s
, SECONDARY_IN
)) {
212 trace_colo_compare_main("secondary: unsupported packet in");
216 static int compare_chardev_opts(void *opaque
,
217 const char *name
, const char *value
,
220 CompareChardevProps
*props
= opaque
;
222 if (strcmp(name
, "backend") == 0 &&
223 strcmp(value
, "socket") == 0) {
224 props
->is_socket
= true;
226 } else if (strcmp(name
, "host") == 0 ||
227 (strcmp(name
, "port") == 0) ||
228 (strcmp(name
, "server") == 0) ||
229 (strcmp(name
, "wait") == 0) ||
230 (strcmp(name
, "path") == 0)) {
234 "COLO-compare does not support a chardev with option %s=%s",
241 * Return 0 is success.
242 * Return 1 is failed.
244 static int find_and_check_chardev(CharDriverState
**chr
,
248 CompareChardevProps props
;
250 *chr
= qemu_chr_find(chr_name
);
252 error_setg(errp
, "Device '%s' not found",
257 memset(&props
, 0, sizeof(props
));
258 if (qemu_opt_foreach((*chr
)->opts
, compare_chardev_opts
, &props
, errp
)) {
262 if (!props
.is_socket
) {
263 error_setg(errp
, "chardev \"%s\" is not a tcp socket",
271 * Called from the main thread on the primary
272 * to setup colo-compare.
274 static void colo_compare_complete(UserCreatable
*uc
, Error
**errp
)
276 CompareState
*s
= COLO_COMPARE(uc
);
278 if (!s
->pri_indev
|| !s
->sec_indev
|| !s
->outdev
) {
279 error_setg(errp
, "colo compare needs 'primary_in' ,"
280 "'secondary_in','outdev' property set");
282 } else if (!strcmp(s
->pri_indev
, s
->outdev
) ||
283 !strcmp(s
->sec_indev
, s
->outdev
) ||
284 !strcmp(s
->pri_indev
, s
->sec_indev
)) {
285 error_setg(errp
, "'indev' and 'outdev' could not be same "
286 "for compare module");
290 if (find_and_check_chardev(&s
->chr_pri_in
, s
->pri_indev
, errp
)) {
294 if (find_and_check_chardev(&s
->chr_sec_in
, s
->sec_indev
, errp
)) {
298 if (find_and_check_chardev(&s
->chr_out
, s
->outdev
, errp
)) {
302 qemu_chr_fe_claim_no_fail(s
->chr_pri_in
);
304 qemu_chr_fe_claim_no_fail(s
->chr_sec_in
);
306 qemu_chr_fe_claim_no_fail(s
->chr_out
);
308 net_socket_rs_init(&s
->pri_rs
, compare_pri_rs_finalize
);
309 net_socket_rs_init(&s
->sec_rs
, compare_sec_rs_finalize
);
311 /* use g_hash_table_new_full() to new a hashtable */
316 static void colo_compare_class_init(ObjectClass
*oc
, void *data
)
318 UserCreatableClass
*ucc
= USER_CREATABLE_CLASS(oc
);
320 ucc
->complete
= colo_compare_complete
;
323 static void colo_compare_init(Object
*obj
)
325 object_property_add_str(obj
, "primary_in",
326 compare_get_pri_indev
, compare_set_pri_indev
,
328 object_property_add_str(obj
, "secondary_in",
329 compare_get_sec_indev
, compare_set_sec_indev
,
331 object_property_add_str(obj
, "outdev",
332 compare_get_outdev
, compare_set_outdev
,
336 static void colo_compare_finalize(Object
*obj
)
338 CompareState
*s
= COLO_COMPARE(obj
);
341 qemu_chr_add_handlers(s
->chr_pri_in
, NULL
, NULL
, NULL
, NULL
);
342 qemu_chr_fe_release(s
->chr_pri_in
);
345 qemu_chr_add_handlers(s
->chr_sec_in
, NULL
, NULL
, NULL
, NULL
);
346 qemu_chr_fe_release(s
->chr_sec_in
);
349 qemu_chr_fe_release(s
->chr_out
);
352 g_free(s
->pri_indev
);
353 g_free(s
->sec_indev
);
357 static const TypeInfo colo_compare_info
= {
358 .name
= TYPE_COLO_COMPARE
,
359 .parent
= TYPE_OBJECT
,
360 .instance_size
= sizeof(CompareState
),
361 .instance_init
= colo_compare_init
,
362 .instance_finalize
= colo_compare_finalize
,
363 .class_size
= sizeof(CompareClass
),
364 .class_init
= colo_compare_class_init
,
365 .interfaces
= (InterfaceInfo
[]) {
366 { TYPE_USER_CREATABLE
},
371 static void register_types(void)
373 type_register_static(&colo_compare_info
);
376 type_init(register_types
);