2 * Copyright (c) 2005-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006 Ericsson AB.
4 * Copyright (c) 2006-2007 Red Hat, Inc.
6 * Author: Steven Dake (sdake@mvista.com)
12 * This software licensed under BSD license, the text of which follows:
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are met:
17 * - Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * - Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * - Neither the name of the MontaVista Software, Inc. nor the names of its
23 * contributors may be used to endorse or promote products derived from this
24 * software without specific prior written permission.
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
27 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
29 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
30 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
31 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
32 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
33 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
34 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
35 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
36 * THE POSSIBILITY OF SUCH DAMAGE.
38 #include <sys/types.h>
39 #include <sys/socket.h>
41 #include <sys/ioctl.h>
42 #include <netinet/in.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
55 #include "../include/saAis.h"
62 #include "../lcr/lcr_ifact.h"
66 LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO
);
68 #define MESSAGE_REQ_SYNC_BARRIER 0
69 #define MESSAGE_REQ_SYNC_REQUEST 1
76 static struct memb_ring_id
*sync_ring_id
;
78 static int vsf_none
= 0;
80 static int (*sync_callbacks_retrieve
) (int sync_id
,
81 struct sync_callbacks
*callack
);
83 static struct sync_callbacks sync_callbacks
;
85 static int sync_processing
= 0;
87 static void (*sync_synchronization_completed
) (void);
89 static int sync_recovery_index
= 0;
91 static void *sync_callback_token_handle
= 0;
92 static void *sync_request_token_handle
;
94 static struct barrier_data barrier_data_process
[PROCESSOR_COUNT_MAX
];
96 static struct openais_vsf_iface_ver0
*vsf_iface
;
98 static int sync_barrier_send (struct memb_ring_id
*ring_id
);
100 static int sync_start_process (
101 enum totem_callback_token_type type
, void *data
);
103 static void sync_service_init (struct memb_ring_id
*ring_id
);
105 static int sync_service_process (
106 enum totem_callback_token_type type
, void *data
);
108 static void sync_deliver_fn (
112 int endian_conversion_required
);
114 static void sync_confchg_fn (
115 enum totem_configuration_type configuration_type
,
116 unsigned int *member_list
, int member_list_entries
,
117 unsigned int *left_list
, int left_list_entries
,
118 unsigned int *joined_list
, int joined_list_entries
,
119 struct memb_ring_id
*ring_id
);
121 static void sync_primary_callback_fn (
122 unsigned int *view_list
,
123 int view_list_entries
,
124 int primary_designated
,
125 struct memb_ring_id
*ring_id
);
127 static struct totempg_group sync_group
= {
132 static totempg_groups_handle sync_group_handle
;
133 static char *service_name
;
134 static struct memb_ring_id deliver_ring_id
;
135 static unsigned int current_members
[PROCESSOR_COUNT_MAX
];
136 static unsigned int current_members_cnt
;
138 struct sync_barrier_start
{
141 struct sync_request
{
143 char name
[0] __attribute__((aligned(8)));
146 typedef struct sync_msg
{
147 mar_req_header_t header
;
148 struct memb_ring_id ring_id
;
150 struct sync_barrier_start sync_barrier_start
;
151 struct sync_request sync_request
;
156 * Send a barrier data structure
158 static int sync_barrier_send (struct memb_ring_id
*ring_id
)
164 msg
.header
.size
= sizeof (sync_msg_t
);
165 msg
.header
.id
= MESSAGE_REQ_SYNC_BARRIER
;
167 memcpy (&msg
.ring_id
, ring_id
, sizeof (struct memb_ring_id
));
169 iovec
.iov_base
= (char *)&msg
;
170 iovec
.iov_len
= sizeof (msg
);
172 res
= totempg_groups_mcast_joined (
173 sync_group_handle
, &iovec
, 1, TOTEMPG_AGREED
);
178 static void sync_start_init (struct memb_ring_id
*ring_id
)
181 totempg_callback_token_create (
182 &sync_callback_token_handle
,
183 TOTEM_CALLBACK_TOKEN_SENT
,
184 0, /* don't delete after callback */
190 static void sync_service_init (struct memb_ring_id
*ring_id
)
193 sync_callbacks
.sync_init ();
194 totempg_callback_token_destroy (&sync_callback_token_handle
);
197 * Create the token callback for the processing
199 totempg_callback_token_create (
200 &sync_callback_token_handle
,
201 TOTEM_CALLBACK_TOKEN_SENT
,
202 0, /* don't delete after callback */
203 sync_service_process
,
208 static int sync_start_process (
209 enum totem_callback_token_type type
, void *data
)
212 struct memb_ring_id
*ring_id
= (struct memb_ring_id
*)data
;
215 res
= sync_barrier_send (ring_id
);
218 * Delete the token callback for the barrier
220 totempg_callback_token_destroy (&sync_callback_token_handle
);
226 static void sync_callbacks_load (void)
232 res
= sync_callbacks_retrieve (sync_recovery_index
,
235 * No more service handlers have sync callbacks at this time
241 if ((service_name
!= NULL
) &&
242 strcmp (sync_callbacks
.name
, service_name
) != 0) {
243 sync_recovery_index
+= 1;
246 sync_recovery_index
+= 1;
247 if (sync_callbacks
.sync_init
) {
254 static int sync_service_process (
255 enum totem_callback_token_type type
, void *data
)
258 struct memb_ring_id
*ring_id
= (struct memb_ring_id
*)data
;
263 * If process operation not from this ring id, then ignore it and stop
266 if (memcmp (ring_id
, sync_ring_id
, sizeof (struct memb_ring_id
)) != 0) {
271 * If process returns 0, then its time to activate
272 * and start the next service's synchronization
274 res
= sync_callbacks
.sync_process ();
278 totempg_callback_token_destroy (&sync_callback_token_handle
);
280 sync_start_init (ring_id
);
288 int (*callbacks_retrieve
) (int sync_id
, struct sync_callbacks
*callack
),
289 void (*synchronization_completed
) (void),
293 unsigned int vsf_handle
;
295 char openais_vsf_type
[1024];
297 res
= totempg_groups_initialize (
302 log_printf (LOG_LEVEL_ERROR
,
303 "Couldn't initialize groups interface.\n");
307 res
= totempg_groups_join (
312 log_printf (LOG_LEVEL_ERROR
, "Couldn't join group.\n");
316 if (strcmp (vsf_type
, "none") == 0) {
317 log_printf (LOG_LEVEL_NOTICE
,
318 "Not using a virtual synchrony filter.\n");
323 sprintf (openais_vsf_type
, "openais_vsf_%s", vsf_type
);
324 res
= lcr_ifact_reference (
332 log_printf (LOG_LEVEL_NOTICE
,
333 "Couldn't load virtual synchrony filter %s\n",
338 log_printf (LOG_LEVEL_NOTICE
,
339 "Using virtual synchrony filter %s\n", openais_vsf_type
);
341 vsf_iface
= (struct openais_vsf_iface_ver0
*)vsf_iface_p
;
342 vsf_iface
->init (sync_primary_callback_fn
);
345 sync_callbacks_retrieve
= callbacks_retrieve
;
346 sync_synchronization_completed
= synchronization_completed
;
350 static void sync_primary_callback_fn (
351 unsigned int *view_list
,
352 int view_list_entries
,
353 int primary_designated
,
354 struct memb_ring_id
*ring_id
)
360 if (primary_designated
) {
361 log_printf (LOG_LEVEL_NOTICE
,
362 "This node is within the primary component and will provide"
365 log_printf (LOG_LEVEL_NOTICE
,
366 "This node is within the non-primary component and will NOT"
367 " provide any services.\n");
372 * Execute configuration change for synchronization service
376 totempg_callback_token_destroy (&sync_callback_token_handle
);
378 sync_recovery_index
= 0;
380 for (i
= 0; i
< view_list_entries
; i
++) {
381 barrier_data_process
[i
].nodeid
= view_list
[i
];
382 barrier_data_process
[i
].completed
= 0;
385 sync_start_init (sync_ring_id
);
389 static void sync_deliver_fn (
393 int endian_conversion_required
)
396 int barrier_completed
;
397 sync_msg_t
*msg
= (sync_msg_t
*)iovec
[0].iov_base
;
399 ENTER("type %d, len %d", msg
->header
.id
, (int)iovec
[0].iov_len
);
401 if (endian_conversion_required
) {
402 swab_mar_req_header_t (&msg
->header
);
403 swab_memb_ring_id_t (&msg
->ring_id
);
407 * If this message is not from this configuration, ignore it
409 if (memcmp (&msg
->ring_id
, sync_ring_id
,
410 sizeof (struct memb_ring_id
)) != 0) {
414 if (msg
->header
.id
== MESSAGE_REQ_SYNC_REQUEST
) {
415 if (endian_conversion_required
) {
416 swab_mar_uint32_t (&msg
->sync_request
.name_len
);
419 * If there is an ongoing sync, abort it. A requested sync is
420 * only allowed to abort other requested synchronizations,
421 * not full synchronizations.
423 if (sync_processing
&& sync_callbacks
.sync_abort
) {
424 sync_callbacks
.sync_abort();
425 sync_callbacks
.sync_activate
= NULL
;
427 assert (service_name
!= NULL
);
432 service_name
= malloc (msg
->sync_request
.name_len
);
433 strcpy (service_name
, msg
->sync_request
.name
);
436 * Start requested synchronization
438 sync_primary_callback_fn (current_members
, current_members_cnt
, 1,
444 barrier_completed
= 1;
446 memcpy (&deliver_ring_id
, &msg
->ring_id
, sizeof (struct memb_ring_id
));
449 * Set completion for source_addr's address
451 for (i
= 0; i
< current_members_cnt
; i
++) {
452 if (nodeid
== barrier_data_process
[i
].nodeid
) {
453 barrier_data_process
[i
].completed
= 1;
454 log_printf (LOG_LEVEL_DEBUG
,
455 "Barrier Start Received From %d\n",
456 barrier_data_process
[i
].nodeid
);
462 * Test if barrier is complete
464 for (i
= 0; i
< current_members_cnt
; i
++) {
465 log_printf (LOG_LEVEL_DEBUG
,
466 "Barrier completion status for nodeid %d = %d. \n",
467 barrier_data_process
[i
].nodeid
,
468 barrier_data_process
[i
].completed
);
470 if (barrier_data_process
[i
].completed
== 0) {
471 barrier_completed
= 0;
475 if (barrier_completed
) {
476 log_printf (LOG_LEVEL_DEBUG
, "Synchronization barrier completed\n");
478 * This sync is complete so activate and start next service sync
480 if (sync_callbacks
.sync_activate
) {
481 log_printf (LOG_LEVEL_DEBUG
,
482 "Committing synchronization for (%s)\n",
483 sync_callbacks
.name
);
485 sync_callbacks
.sync_activate ();
489 * Start synchronization if the barrier has completed
491 for (i
= 0; i
< current_members_cnt
; i
++) {
492 barrier_data_process
[i
].nodeid
= current_members
[i
];
493 barrier_data_process
[i
].completed
= 0;
496 sync_callbacks_load();
499 * if sync service found, execute it
501 if (sync_processing
&& sync_callbacks
.sync_init
) {
502 log_printf (LOG_LEVEL_DEBUG
,
503 "Synchronization actions starting for (%s)\n",
504 sync_callbacks
.name
);
505 sync_service_init (&deliver_ring_id
);
507 if (service_name
!= NULL
) {
517 static void sync_confchg_fn (
518 enum totem_configuration_type configuration_type
,
519 unsigned int *member_list
, int member_list_entries
,
520 unsigned int *left_list
, int left_list_entries
,
521 unsigned int *joined_list
, int joined_list_entries
,
522 struct memb_ring_id
*ring_id
)
528 if (configuration_type
!= TOTEM_CONFIGURATION_REGULAR
) {
534 * Save current members and ring ID for later use
536 for (i
= 0; i
< member_list_entries
; i
++) {
537 current_members
[i
] = member_list
[i
];
539 current_members_cnt
= member_list_entries
;
540 sync_ring_id
= ring_id
;
543 * If no virtual synchrony filter configured.
547 * If there is an ongoing synchronization, abort it.
549 if (sync_processing
&& sync_callbacks
.sync_abort
) {
550 sync_callbacks
.sync_abort();
551 sync_callbacks
.sync_activate
= NULL
;
553 if (service_name
!= NULL
) {
560 * Start new synchronization process
562 sync_primary_callback_fn (
563 member_list
, member_list_entries
, 1, ring_id
);
569 * TOTEM callback function used to multicast a sync_request
576 static int sync_request_send (
577 enum totem_callback_token_type type
, void *_name
)
582 struct iovec iovec
[2];
587 name_len
= strlen (name
) + 1;
588 msg
.header
.size
= sizeof (msg
) + name_len
;
589 msg
.header
.id
= MESSAGE_REQ_SYNC_REQUEST
;
591 memcpy (&msg
.ring_id
, sync_ring_id
, sizeof (struct memb_ring_id
));
592 msg
.sync_request
.name_len
= name_len
;
594 iovec
[0].iov_base
= (char *)&msg
;
595 iovec
[0].iov_len
= sizeof (msg
);
596 iovec
[1].iov_base
= _name
;
597 iovec
[1].iov_len
= name_len
;
599 res
= totempg_groups_mcast_joined (
600 sync_group_handle
, iovec
, 2, TOTEMPG_AGREED
);
604 * We managed to multicast the message so delete the token callback
605 * for the sync request.
607 totempg_callback_token_destroy (&sync_request_token_handle
);
611 * if we failed to multicast the message, this function will be called
619 int sync_in_process (void)
621 return (sync_processing
);
624 int sync_primary_designated (void)
629 return (vsf_iface
->primary());
634 * Execute synchronization upon request for the named service
639 int sync_request (char *name
)
641 assert (name
!= NULL
);
645 if (sync_processing
) {
649 totempg_callback_token_create (&sync_request_token_handle
,
650 TOTEM_CALLBACK_TOKEN_SENT
, 0, /* don't delete after callback */
651 sync_request_send
, name
);