Patch to fix a segfault that occurred when a service exits.
[openais.git] / exec / sync.c
blob 706e5ee460d18bc30c742b0c77ce1c91787f64ec
1 /*
2 * Copyright (c) 2005-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006 Ericsson AB.
4 * Copyright (c) 2006-2007 Red Hat, Inc.
6 * Author: Steven Dake (sdake@mvista.com)
7 * Author: Hans Feldt
9 * All rights reserved.
12 * This software licensed under BSD license, the text of which follows:
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are met:
17 * - Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * - Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * - Neither the name of the MontaVista Software, Inc. nor the names of its
23 * contributors may be used to endorse or promote products derived from this
24 * software without specific prior written permission.
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
27 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
29 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
30 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
31 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
32 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
33 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
34 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
35 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
36 * THE POSSIBILITY OF SUCH DAMAGE.
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <sys/ioctl.h>
42 #include <netinet/in.h>
43 #include <sys/uio.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <stdlib.h>
47 #include <stdio.h>
48 #include <errno.h>
49 #include <signal.h>
50 #include <time.h>
51 #include <unistd.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
55 #include "../include/saAis.h"
56 #include "main.h"
57 #include "sync.h"
58 #include "totempg.h"
59 #include "totemip.h"
60 #include "totem.h"
61 #include "vsf.h"
62 #include "../lcr/lcr_ifact.h"
63 #include "logsys.h"
64 #include "util.h"
66 LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
68 #define MESSAGE_REQ_SYNC_BARRIER 0
69 #define MESSAGE_REQ_SYNC_REQUEST 1
/*
 * Barrier bookkeeping for a single node.  sync_deliver_fn sets 'completed'
 * to 1 when a barrier message from 'nodeid' has been delivered.
 */
struct barrier_data {
	unsigned int nodeid;
	int completed;
};
76 static struct memb_ring_id *sync_ring_id;
78 static int vsf_none = 0;
80 static int (*sync_callbacks_retrieve) (int sync_id,
81 struct sync_callbacks *callack);
83 static struct sync_callbacks sync_callbacks;
85 static int sync_processing = 0;
87 static void (*sync_synchronization_completed) (void);
89 static int sync_recovery_index = 0;
91 static void *sync_callback_token_handle = 0;
92 static void *sync_request_token_handle;
94 static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX];
96 static struct openais_vsf_iface_ver0 *vsf_iface;
/*
 * Forward declarations of the file local functions
 */
static int sync_barrier_send (struct memb_ring_id *ring_id);

static int sync_start_process (enum totem_callback_token_type type,
	void *data);

static void sync_service_init (struct memb_ring_id *ring_id);

static int sync_service_process (enum totem_callback_token_type type,
	void *data);

static void sync_deliver_fn (unsigned int nodeid,
	struct iovec *iovec, int iov_len,
	int endian_conversion_required);

static void sync_confchg_fn (enum totem_configuration_type configuration_type,
	unsigned int *member_list, int member_list_entries,
	unsigned int *left_list, int left_list_entries,
	unsigned int *joined_list, int joined_list_entries,
	struct memb_ring_id *ring_id);

static void sync_primary_callback_fn (unsigned int *view_list,
	int view_list_entries,
	int primary_designated,
	struct memb_ring_id *ring_id);
127 static struct totempg_group sync_group = {
128 .group = "sync",
129 .group_len = 4
132 static totempg_groups_handle sync_group_handle;
133 static char *service_name;
134 static struct memb_ring_id deliver_ring_id;
135 static unsigned int current_members[PROCESSOR_COUNT_MAX];
136 static unsigned int current_members_cnt;
/*
 * Payload of a barrier message: it has no fields
 */
struct sync_barrier_start {
};
/*
 * Payload of a sync request message.  The sender sets name_len to the
 * length of the service name including its terminator; the name bytes
 * follow the structure.
 */
struct sync_request {
	uint32_t name_len;
	char name[0] __attribute__((aligned(8)));
};
146 typedef struct sync_msg {
147 mar_req_header_t header;
148 struct memb_ring_id ring_id;
149 union {
150 struct sync_barrier_start sync_barrier_start;
151 struct sync_request sync_request;
153 } sync_msg_t;
156 * Send a barrier data structure
158 static int sync_barrier_send (struct memb_ring_id *ring_id)
160 sync_msg_t msg;
161 struct iovec iovec;
162 int res;
164 msg.header.size = sizeof (sync_msg_t);
165 msg.header.id = MESSAGE_REQ_SYNC_BARRIER;
167 memcpy (&msg.ring_id, ring_id, sizeof (struct memb_ring_id));
169 iovec.iov_base = (char *)&msg;
170 iovec.iov_len = sizeof (msg);
172 res = totempg_groups_mcast_joined (
173 sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
175 return (res);
178 static void sync_start_init (struct memb_ring_id *ring_id)
180 ENTER("");
181 totempg_callback_token_create (
182 &sync_callback_token_handle,
183 TOTEM_CALLBACK_TOKEN_SENT,
184 0, /* don't delete after callback */
185 sync_start_process,
186 (void *)ring_id);
187 LEAVE("");
190 static void sync_service_init (struct memb_ring_id *ring_id)
192 ENTER("");
193 sync_callbacks.sync_init ();
194 totempg_callback_token_destroy (&sync_callback_token_handle);
197 * Create the token callback for the processing
199 totempg_callback_token_create (
200 &sync_callback_token_handle,
201 TOTEM_CALLBACK_TOKEN_SENT,
202 0, /* don't delete after callback */
203 sync_service_process,
204 (void *)ring_id);
205 LEAVE("");
208 static int sync_start_process (
209 enum totem_callback_token_type type, void *data)
211 int res;
212 struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
214 ENTER("");
215 res = sync_barrier_send (ring_id);
216 if (res == 0) {
218 * Delete the token callback for the barrier
220 totempg_callback_token_destroy (&sync_callback_token_handle);
222 LEAVE("");
223 return (0);
226 static void sync_callbacks_load (void)
228 int res;
230 ENTER("");
231 for (;;) {
232 res = sync_callbacks_retrieve (sync_recovery_index,
233 &sync_callbacks);
235 * No more service handlers have sync callbacks at this time
237 if (res == -1) {
238 sync_processing = 0;
239 break;
241 if ((service_name != NULL) &&
242 strcmp (sync_callbacks.name, service_name) != 0) {
243 sync_recovery_index += 1;
244 continue;
246 sync_recovery_index += 1;
247 if (sync_callbacks.sync_init) {
248 break;
251 LEAVE("");
254 static int sync_service_process (
255 enum totem_callback_token_type type, void *data)
257 int res;
258 struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
260 ENTER("");
263 * If process operation not from this ring id, then ignore it and stop
264 * processing
266 if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
267 goto end;
271 * If process returns 0, then its time to activate
272 * and start the next service's synchronization
274 res = sync_callbacks.sync_process ();
275 if (res != 0) {
276 goto end;
278 totempg_callback_token_destroy (&sync_callback_token_handle);
280 sync_start_init (ring_id);
282 end:
283 LEAVE("");
284 return (0);
287 int sync_register (
288 int (*callbacks_retrieve) (int sync_id, struct sync_callbacks *callack),
289 void (*synchronization_completed) (void),
290 char *vsf_type)
292 unsigned int res;
293 unsigned int vsf_handle;
294 void *vsf_iface_p;
295 char openais_vsf_type[1024];
297 res = totempg_groups_initialize (
298 &sync_group_handle,
299 sync_deliver_fn,
300 sync_confchg_fn);
301 if (res == -1) {
302 log_printf (LOG_LEVEL_ERROR,
303 "Couldn't initialize groups interface.\n");
304 return (-1);
307 res = totempg_groups_join (
308 sync_group_handle,
309 &sync_group,
311 if (res == -1) {
312 log_printf (LOG_LEVEL_ERROR, "Couldn't join group.\n");
313 return (-1);
316 if (strcmp (vsf_type, "none") == 0) {
317 log_printf (LOG_LEVEL_NOTICE,
318 "Not using a virtual synchrony filter.\n");
319 vsf_none = 1;
320 } else {
321 vsf_none = 0;
323 sprintf (openais_vsf_type, "openais_vsf_%s", vsf_type);
324 res = lcr_ifact_reference (
325 &vsf_handle,
326 openais_vsf_type,
328 &vsf_iface_p,
331 if (res == -1) {
332 log_printf (LOG_LEVEL_NOTICE,
333 "Couldn't load virtual synchrony filter %s\n",
334 vsf_type);
335 return (-1);
338 log_printf (LOG_LEVEL_NOTICE,
339 "Using virtual synchrony filter %s\n", openais_vsf_type);
341 vsf_iface = (struct openais_vsf_iface_ver0 *)vsf_iface_p;
342 vsf_iface->init (sync_primary_callback_fn);
345 sync_callbacks_retrieve = callbacks_retrieve;
346 sync_synchronization_completed = synchronization_completed;
347 return (0);
350 static void sync_primary_callback_fn (
351 unsigned int *view_list,
352 int view_list_entries,
353 int primary_designated,
354 struct memb_ring_id *ring_id)
356 int i;
358 ENTER("");
360 if (primary_designated) {
361 log_printf (LOG_LEVEL_NOTICE,
362 "This node is within the primary component and will provide"
363 " service.\n");
364 } else {
365 log_printf (LOG_LEVEL_NOTICE,
366 "This node is within the non-primary component and will NOT"
367 " provide any services.\n");
368 return;
372 * Execute configuration change for synchronization service
374 sync_processing = 1;
376 totempg_callback_token_destroy (&sync_callback_token_handle);
378 sync_recovery_index = 0;
380 for (i = 0; i < view_list_entries; i++) {
381 barrier_data_process[i].nodeid = view_list[i];
382 barrier_data_process[i].completed = 0;
385 sync_start_init (sync_ring_id);
386 LEAVE("");
389 static void sync_deliver_fn (
390 unsigned int nodeid,
391 struct iovec *iovec,
392 int iov_len,
393 int endian_conversion_required)
395 int i;
396 int barrier_completed;
397 sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base;
399 ENTER("type %d, len %d", msg->header.id, (int)iovec[0].iov_len);
401 if (endian_conversion_required) {
402 swab_mar_req_header_t (&msg->header);
403 swab_memb_ring_id_t (&msg->ring_id);
407 * If this message is not from this configuration, ignore it
409 if (memcmp (&msg->ring_id, sync_ring_id,
410 sizeof (struct memb_ring_id)) != 0) {
411 goto end;
414 if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
415 if (endian_conversion_required) {
416 swab_mar_uint32_t (&msg->sync_request.name_len);
419 * If there is an ongoing sync, abort it. A requested sync is
420 * only allowed to abort other requested synchronizations,
421 * not full synchronizations.
423 if (sync_processing && sync_callbacks.sync_abort) {
424 sync_callbacks.sync_abort();
425 sync_callbacks.sync_activate = NULL;
426 sync_processing = 0;
427 assert (service_name != NULL);
428 free (service_name);
429 service_name = NULL;
432 service_name = malloc (msg->sync_request.name_len);
433 strcpy (service_name, msg->sync_request.name);
436 * Start requested synchronization
438 sync_primary_callback_fn (current_members, current_members_cnt, 1,
439 sync_ring_id);
441 goto end;
444 barrier_completed = 1;
446 memcpy (&deliver_ring_id, &msg->ring_id, sizeof (struct memb_ring_id));
449 * Set completion for source_addr's address
451 for (i = 0; i < current_members_cnt; i++) {
452 if (nodeid == barrier_data_process[i].nodeid) {
453 barrier_data_process[i].completed = 1;
454 log_printf (LOG_LEVEL_DEBUG,
455 "Barrier Start Received From %d\n",
456 barrier_data_process[i].nodeid);
457 break;
462 * Test if barrier is complete
464 for (i = 0; i < current_members_cnt; i++) {
465 log_printf (LOG_LEVEL_DEBUG,
466 "Barrier completion status for nodeid %d = %d. \n",
467 barrier_data_process[i].nodeid,
468 barrier_data_process[i].completed);
470 if (barrier_data_process[i].completed == 0) {
471 barrier_completed = 0;
475 if (barrier_completed) {
476 log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n");
478 * This sync is complete so activate and start next service sync
480 if (sync_callbacks.sync_activate) {
481 log_printf (LOG_LEVEL_DEBUG,
482 "Committing synchronization for (%s)\n",
483 sync_callbacks.name);
485 sync_callbacks.sync_activate ();
489 * Start synchronization if the barrier has completed
491 for (i = 0; i < current_members_cnt; i++) {
492 barrier_data_process[i].nodeid = current_members[i];
493 barrier_data_process[i].completed = 0;
496 sync_callbacks_load();
499 * if sync service found, execute it
501 if (sync_processing && sync_callbacks.sync_init) {
502 log_printf (LOG_LEVEL_DEBUG,
503 "Synchronization actions starting for (%s)\n",
504 sync_callbacks.name);
505 sync_service_init (&deliver_ring_id);
506 } else {
507 if (service_name != NULL) {
508 free (service_name);
509 service_name = NULL;
513 end:
514 LEAVE("");
517 static void sync_confchg_fn (
518 enum totem_configuration_type configuration_type,
519 unsigned int *member_list, int member_list_entries,
520 unsigned int *left_list, int left_list_entries,
521 unsigned int *joined_list, int joined_list_entries,
522 struct memb_ring_id *ring_id)
524 int i;
526 ENTER("");
528 if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
529 LEAVE("");
530 return;
534 * Save current members and ring ID for later use
536 for (i = 0; i < member_list_entries; i++) {
537 current_members[i] = member_list[i];
539 current_members_cnt = member_list_entries;
540 sync_ring_id = ring_id;
543 * If no virtual synchrony filter configured.
545 if (vsf_none == 1) {
547 * If there is an ongoing synchronization, abort it.
549 if (sync_processing && sync_callbacks.sync_abort) {
550 sync_callbacks.sync_abort();
551 sync_callbacks.sync_activate = NULL;
552 sync_processing = 0;
553 if (service_name != NULL) {
554 free (service_name);
555 service_name = NULL;
560 * Start new synchronization process
562 sync_primary_callback_fn (
563 member_list, member_list_entries, 1, ring_id);
565 LEAVE("");
569 * TOTEM callback function used to multicast a sync_request
570 * message
571 * @param type
572 * @param _name
574 * @return int
576 static int sync_request_send (
577 enum totem_callback_token_type type, void *_name)
579 int res;
580 char *name = _name;
581 sync_msg_t msg;
582 struct iovec iovec[2];
583 int name_len;
585 ENTER("'%s'", name);
587 name_len = strlen (name) + 1;
588 msg.header.size = sizeof (msg) + name_len;
589 msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
591 memcpy (&msg.ring_id, sync_ring_id, sizeof (struct memb_ring_id));
592 msg.sync_request.name_len = name_len;
594 iovec[0].iov_base = (char *)&msg;
595 iovec[0].iov_len = sizeof (msg);
596 iovec[1].iov_base = _name;
597 iovec[1].iov_len = name_len;
599 res = totempg_groups_mcast_joined (
600 sync_group_handle, iovec, 2, TOTEMPG_AGREED);
602 if (res == 0) {
604 * We managed to multicast the message so delete the token callback
605 * for the sync request.
607 totempg_callback_token_destroy (&sync_request_token_handle);
611 * if we failed to multicast the message, this function will be called
612 * again.
615 LEAVE("");
616 return (0);
619 int sync_in_process (void)
621 return (sync_processing);
624 int sync_primary_designated (void)
626 if (vsf_none == 1) {
627 return (1);
628 } else {
629 return (vsf_iface->primary());
634 * Execute synchronization upon request for the named service
635 * @param name
637 * @return int
639 int sync_request (char *name)
641 assert (name != NULL);
643 ENTER("'%s'", name);
645 if (sync_processing) {
646 return -1;
649 totempg_callback_token_create (&sync_request_token_handle,
650 TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
651 sync_request_send, name);
653 LEAVE("");
655 return 0;