colo-compare: track connection and enqueue packet
[qemu/kevin.git] / net / colo-compare.c
blobbcc1beb610c95a1e833c45f3a0bc57af091e6f39
/*
 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
 * (a.k.a. Fault Tolerance or Continuous Replication)
 *
 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
 * Copyright (c) 2016 FUJITSU LIMITED
 * Copyright (c) 2016 Intel Corporation
 *
 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 */
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
17 #include "trace.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
21 #include "net/net.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "qemu/typedefs.h"
26 #include "net/queue.h"
27 #include "sysemu/char.h"
28 #include "qemu/sockets.h"
29 #include "qapi-visit.h"
30 #include "net/colo.h"
/* QOM type name used when registering the comparer (see colo_compare_info). */
#define TYPE_COLO_COMPARE "colo-compare"

/* Checked cast from a generic object pointer to CompareState. */
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/*
 * Per-connection queue threshold: packet_enqueue() keeps appending while
 * the queue length is <= this value and drops the packet once it is larger.
 */
#define MAX_QUEUE_SIZE 1024
/*
  + CompareState ++
  |               |
  +---------------+   +---------------+         +---------------+
  |conn list      +--->conn           +--------->conn           |
  +---------------+   +---------------+         +---------------+
  |               |     |           |             |          |
  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
                    |primary |  |secondary    |primary | |secondary
                    |packet  |  |packet  +    |packet  | |packet  +
                    +--------+  +--------+    +--------+ +--------+
                        |           |             |          |
                    +---v----+  +---v----+    +---v----+ +---v----+
                    |primary |  |secondary    |primary | |secondary
                    |packet  |  |packet  +    |packet  | |packet  +
                    +--------+  +--------+    +--------+ +--------+
                        |           |             |          |
                    +---v----+  +---v----+    +---v----+ +---v----+
                    |primary |  |secondary    |primary | |secondary
                    |packet  |  |packet  +    |packet  | |packet  +
                    +--------+  +--------+    +--------+ +--------+
*/
60 typedef struct CompareState {
61 Object parent;
63 char *pri_indev;
64 char *sec_indev;
65 char *outdev;
66 CharDriverState *chr_pri_in;
67 CharDriverState *chr_sec_in;
68 CharDriverState *chr_out;
69 SocketReadState pri_rs;
70 SocketReadState sec_rs;
72 /* connection list: the connections belonged to this NIC could be found
73 * in this list.
74 * element type: Connection
76 GQueue conn_list;
77 /* hashtable to save connection */
78 GHashTable *connection_track_table;
79 } CompareState;
81 typedef struct CompareClass {
82 ObjectClass parent_class;
83 } CompareClass;
/*
 * Scratch record filled by compare_chardev_opts() while the options of a
 * chardev are being inspected.
 */
typedef struct CompareChardevProps {
    /* set once a "backend=socket" option has been seen */
    bool is_socket;
} CompareChardevProps;
/* Selects which input a packet arrived on; passed as @mode to packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
94 static int compare_chr_send(CharDriverState *out,
95 const uint8_t *buf,
96 uint32_t size);
99 * Return 0 on success, if return -1 means the pkt
100 * is unsupported(arp and ipv6) and will be sent later
102 static int packet_enqueue(CompareState *s, int mode)
104 ConnectionKey key;
105 Packet *pkt = NULL;
106 Connection *conn;
108 if (mode == PRIMARY_IN) {
109 pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
110 } else {
111 pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
114 if (parse_packet_early(pkt)) {
115 packet_destroy(pkt, NULL);
116 pkt = NULL;
117 return -1;
119 fill_connection_key(pkt, &key);
121 conn = connection_get(s->connection_track_table,
122 &key,
123 &s->conn_list);
125 if (!conn->processing) {
126 g_queue_push_tail(&s->conn_list, conn);
127 conn->processing = true;
130 if (mode == PRIMARY_IN) {
131 if (g_queue_get_length(&conn->primary_list) <=
132 MAX_QUEUE_SIZE) {
133 g_queue_push_tail(&conn->primary_list, pkt);
134 } else {
135 error_report("colo compare primary queue size too big,"
136 "drop packet");
138 } else {
139 if (g_queue_get_length(&conn->secondary_list) <=
140 MAX_QUEUE_SIZE) {
141 g_queue_push_tail(&conn->secondary_list, pkt);
142 } else {
143 error_report("colo compare secondary queue size too big,"
144 "drop packet");
148 return 0;
151 static int compare_chr_send(CharDriverState *out,
152 const uint8_t *buf,
153 uint32_t size)
155 int ret = 0;
156 uint32_t len = htonl(size);
158 if (!size) {
159 return 0;
162 ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
163 if (ret != sizeof(len)) {
164 goto err;
167 ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
168 if (ret != size) {
169 goto err;
172 return 0;
174 err:
175 return ret < 0 ? ret : -EIO;
178 static char *compare_get_pri_indev(Object *obj, Error **errp)
180 CompareState *s = COLO_COMPARE(obj);
182 return g_strdup(s->pri_indev);
185 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
187 CompareState *s = COLO_COMPARE(obj);
189 g_free(s->pri_indev);
190 s->pri_indev = g_strdup(value);
193 static char *compare_get_sec_indev(Object *obj, Error **errp)
195 CompareState *s = COLO_COMPARE(obj);
197 return g_strdup(s->sec_indev);
200 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
202 CompareState *s = COLO_COMPARE(obj);
204 g_free(s->sec_indev);
205 s->sec_indev = g_strdup(value);
208 static char *compare_get_outdev(Object *obj, Error **errp)
210 CompareState *s = COLO_COMPARE(obj);
212 return g_strdup(s->outdev);
215 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
217 CompareState *s = COLO_COMPARE(obj);
219 g_free(s->outdev);
220 s->outdev = g_strdup(value);
223 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
225 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
227 if (packet_enqueue(s, PRIMARY_IN)) {
228 trace_colo_compare_main("primary: unsupported packet in");
229 compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
233 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
235 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
237 if (packet_enqueue(s, SECONDARY_IN)) {
238 trace_colo_compare_main("secondary: unsupported packet in");
242 static int compare_chardev_opts(void *opaque,
243 const char *name, const char *value,
244 Error **errp)
246 CompareChardevProps *props = opaque;
248 if (strcmp(name, "backend") == 0 &&
249 strcmp(value, "socket") == 0) {
250 props->is_socket = true;
251 return 0;
252 } else if (strcmp(name, "host") == 0 ||
253 (strcmp(name, "port") == 0) ||
254 (strcmp(name, "server") == 0) ||
255 (strcmp(name, "wait") == 0) ||
256 (strcmp(name, "path") == 0)) {
257 return 0;
258 } else {
259 error_setg(errp,
260 "COLO-compare does not support a chardev with option %s=%s",
261 name, value);
262 return -1;
267 * Return 0 is success.
268 * Return 1 is failed.
270 static int find_and_check_chardev(CharDriverState **chr,
271 char *chr_name,
272 Error **errp)
274 CompareChardevProps props;
276 *chr = qemu_chr_find(chr_name);
277 if (*chr == NULL) {
278 error_setg(errp, "Device '%s' not found",
279 chr_name);
280 return 1;
283 memset(&props, 0, sizeof(props));
284 if (qemu_opt_foreach((*chr)->opts, compare_chardev_opts, &props, errp)) {
285 return 1;
288 if (!props.is_socket) {
289 error_setg(errp, "chardev \"%s\" is not a tcp socket",
290 chr_name);
291 return 1;
293 return 0;
297 * Called from the main thread on the primary
298 * to setup colo-compare.
300 static void colo_compare_complete(UserCreatable *uc, Error **errp)
302 CompareState *s = COLO_COMPARE(uc);
304 if (!s->pri_indev || !s->sec_indev || !s->outdev) {
305 error_setg(errp, "colo compare needs 'primary_in' ,"
306 "'secondary_in','outdev' property set");
307 return;
308 } else if (!strcmp(s->pri_indev, s->outdev) ||
309 !strcmp(s->sec_indev, s->outdev) ||
310 !strcmp(s->pri_indev, s->sec_indev)) {
311 error_setg(errp, "'indev' and 'outdev' could not be same "
312 "for compare module");
313 return;
316 if (find_and_check_chardev(&s->chr_pri_in, s->pri_indev, errp)) {
317 return;
320 if (find_and_check_chardev(&s->chr_sec_in, s->sec_indev, errp)) {
321 return;
324 if (find_and_check_chardev(&s->chr_out, s->outdev, errp)) {
325 return;
328 qemu_chr_fe_claim_no_fail(s->chr_pri_in);
330 qemu_chr_fe_claim_no_fail(s->chr_sec_in);
332 qemu_chr_fe_claim_no_fail(s->chr_out);
334 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
335 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
337 g_queue_init(&s->conn_list);
339 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
340 connection_key_equal,
341 g_free,
342 connection_destroy);
344 return;
347 static void colo_compare_class_init(ObjectClass *oc, void *data)
349 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
351 ucc->complete = colo_compare_complete;
354 static void colo_compare_init(Object *obj)
356 object_property_add_str(obj, "primary_in",
357 compare_get_pri_indev, compare_set_pri_indev,
358 NULL);
359 object_property_add_str(obj, "secondary_in",
360 compare_get_sec_indev, compare_set_sec_indev,
361 NULL);
362 object_property_add_str(obj, "outdev",
363 compare_get_outdev, compare_set_outdev,
364 NULL);
367 static void colo_compare_finalize(Object *obj)
369 CompareState *s = COLO_COMPARE(obj);
371 if (s->chr_pri_in) {
372 qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
373 qemu_chr_fe_release(s->chr_pri_in);
375 if (s->chr_sec_in) {
376 qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
377 qemu_chr_fe_release(s->chr_sec_in);
379 if (s->chr_out) {
380 qemu_chr_fe_release(s->chr_out);
383 g_queue_free(&s->conn_list);
385 g_free(s->pri_indev);
386 g_free(s->sec_indev);
387 g_free(s->outdev);
390 static const TypeInfo colo_compare_info = {
391 .name = TYPE_COLO_COMPARE,
392 .parent = TYPE_OBJECT,
393 .instance_size = sizeof(CompareState),
394 .instance_init = colo_compare_init,
395 .instance_finalize = colo_compare_finalize,
396 .class_size = sizeof(CompareClass),
397 .class_init = colo_compare_class_init,
398 .interfaces = (InterfaceInfo[]) {
399 { TYPE_USER_CREATABLE },
404 static void register_types(void)
406 type_register_static(&colo_compare_info);
409 type_init(register_types);