Patch to remove segfault on the exiting of a service.
[openais.git] / exec / vsf_ykd.c
blobc61d479ac543ee18c232827bb32f254f9ae406ee
1 /*
2 * Copyright (c) 2005 MontaVista Software, Inc.
3 * Copyright (c) 2006 Red Hat, Inc.
4 * Copyright (c) 2006 Sun Microsystems, Inc.
6 * All rights reserved.
8 * Author: Steven Dake (sdake@mvista.com)
10 * This software licensed under BSD license, the text of which follows:
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are met:
15 * - Redistributions of source code must retain the above copyright notice,
16 * this list of conditions and the following disclaimer.
17 * - Redistributions in binary form must reproduce the above copyright notice,
18 * this list of conditions and the following disclaimer in the documentation
19 * and/or other materials provided with the distribution.
20 * - Neither the name of the MontaVista Software, Inc. nor the names of its
21 * contributors may be used to endorse or promote products derived from this
22 * software without specific prior written permission.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <assert.h>
#include <pwd.h>
#include <grp.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>

#include "main.h"
#include "logsys.h"
#include "swab.h"
#include "vsf.h"
#include "../lcr/lcr_comp.h"
/*
 * Declare the logsys subsystem used by this file's log_printf() calls,
 * named "YKD" with LOG_INFO passed as its level (presumably the default
 * logging threshold — see logsys.h to confirm).
 */
LOGSYS_DECLARE_SUBSYS ("YKD", LOG_INFO);
/* Capacity of every member list / session table used by this algorithm. */
#define YKD_PROCESSOR_COUNT_MAX 32

/* Value carried in ykd_header.id. */
enum ykd_header_values {
	YKD_HEADER_SENDSTATE	= 0,
	YKD_HEADER_ATTEMPT	= 1
};

/* Phase of the protocol within the current regular configuration. */
enum ykd_mode {
	YKD_MODE_SENDSTATE	= 0,
	YKD_MODE_ATTEMPT	= 1
};

/* Prefix of every multicast message. */
struct ykd_header {
	int id;
};

/*
 * A session: a set of nodeids plus the session number it was created with.
 * NOTE: this structure is multicast verbatim (inside struct ykd_state);
 * keep the field order unchanged.
 */
struct ykd_session {
	unsigned int member_list[YKD_PROCESSOR_COUNT_MAX];
	int member_list_entries;
	int session_id;
};

/*
 * Per-processor YKD state, multicast as the payload of a state message.
 * NOTE: wire format — do not reorder fields.
 */
struct ykd_state {
	struct ykd_session last_primary;

	struct ykd_session last_formed[YKD_PROCESSOR_COUNT_MAX];
	int last_formed_entries;

	struct ykd_session ambiguous_sessions[YKD_PROCESSOR_COUNT_MAX];
	int ambiguous_sessions_entries;

	int session_id;
};

/* Bookkeeping for one member of the current configuration. */
struct state_received {
	unsigned int nodeid;
	int received;			/* set once this member's message arrived */
	struct ykd_state ykd_state;	/* that member's last reported state */
};
109 struct ykd_state ykd_state;
111 static totempg_groups_handle ykd_group_handle;
113 static struct state_received state_received_confchg[YKD_PROCESSOR_COUNT_MAX];
115 static int state_received_confchg_entries;
117 static struct state_received state_received_process[YKD_PROCESSOR_COUNT_MAX];
119 static int state_received_process_entries;
121 static enum ykd_mode ykd_mode;
123 static unsigned int view_list[YKD_PROCESSOR_COUNT_MAX];
125 static int view_list_entries;
127 static int session_id_max;
129 static struct ykd_session *last_primary_max;
131 static struct ykd_session ambiguous_sessions_max[YKD_PROCESSOR_COUNT_MAX];
133 static int ambiguous_sessions_max_entries;
135 static int primary_designated = 0;
137 static struct memb_ring_id ykd_ring_id;
139 static void *ykd_attempt_send_callback_token_handle = 0;
141 static void *ykd_state_send_callback_token_handle = 0;
143 static void (*ykd_primary_callback_fn) (
144 unsigned int *view_list,
145 int view_list_entries,
146 int primary_designated,
147 struct memb_ring_id *ring_id) = NULL;
149 void ykd_state_init (void)
151 ykd_state.session_id = 0;
152 ykd_state.last_formed_entries = 0;
153 ykd_state.ambiguous_sessions_entries = 0;
154 ykd_state.last_primary.session_id = 0;
155 ykd_state.last_primary.member_list_entries = 0;
158 static int ykd_state_send_msg (enum totem_callback_token_type type, void *context)
160 struct iovec iovec[2];
161 struct ykd_header header;
162 int res;
164 header.id = YKD_HEADER_SENDSTATE;
166 iovec[0].iov_base = (char *)&header;
167 iovec[0].iov_len = sizeof (struct ykd_header);
168 iovec[1].iov_base = (char *)&ykd_state;
169 iovec[1].iov_len = sizeof (struct ykd_state);
171 res = totempg_groups_mcast_joined (ykd_group_handle, iovec, 2,
172 TOTEMPG_AGREED);
174 return (res);
177 static void ykd_state_send (void)
179 totempg_callback_token_create (
180 &ykd_state_send_callback_token_handle,
181 TOTEM_CALLBACK_TOKEN_SENT,
182 1, /* delete after callback */
183 ykd_state_send_msg,
184 NULL);
187 static int ykd_attempt_send_msg (enum totem_callback_token_type type, void *context)
189 struct iovec iovec;
190 struct ykd_header header;
191 int res;
193 header.id = YKD_HEADER_SENDSTATE;
195 iovec.iov_base = (char *)&header;
196 iovec.iov_len = sizeof (struct ykd_header);
198 res = totempg_groups_mcast_joined (ykd_group_handle, &iovec, 1,
199 TOTEMPG_AGREED);
201 return (res);
204 static void ykd_attempt_send (void)
206 totempg_callback_token_create (
207 &ykd_attempt_send_callback_token_handle,
208 TOTEM_CALLBACK_TOKEN_SENT,
209 1, /* delete after callback */
210 ykd_attempt_send_msg,
211 NULL);
214 static void compute (void)
216 int i;
217 int j;
219 session_id_max = 0;
220 last_primary_max = &state_received_process[0].ykd_state.last_primary;
221 ambiguous_sessions_max_entries = 0;
223 for (i = 0; i < state_received_process_entries; i++) {
225 * Calculate maximum session id
227 if (state_received_process[i].ykd_state.session_id > session_id_max) {
228 session_id_max = state_received_process[i].ykd_state.session_id;
232 * Calculate maximum primary id
234 if (state_received_process[i].ykd_state.last_primary.session_id > last_primary_max->session_id) {
235 last_primary_max = &state_received_process[i].ykd_state.last_primary;
239 * generate the maximum ambiguous sessions list
241 for (j = 0; j < state_received_process[i].ykd_state.ambiguous_sessions_entries; j++) {
242 if (state_received_process[i].ykd_state.ambiguous_sessions[j].session_id > last_primary_max->session_id) {
243 memcpy (&ambiguous_sessions_max[ambiguous_sessions_max_entries],
244 &state_received_process[i].ykd_state.ambiguous_sessions[j],
245 sizeof (struct ykd_session));
246 ambiguous_sessions_max_entries += 1;
252 static int subquorum (
253 unsigned int *member_list,
254 int member_list_entries,
255 struct ykd_session *session)
257 int intersections = 0;
258 int i;
259 int j;
261 for (i = 0; i < member_list_entries; i++) {
262 for (j = 0; j < session->member_list_entries; j++) {
263 if (member_list[i] == session->member_list[j]) {
264 intersections += 1;
270 * even split
272 if (intersections == (session->member_list_entries - intersections)) {
273 return (1);
274 } else
277 * majority split
279 if (intersections > (session->member_list_entries - intersections)) {
280 return (1);
282 return (0);
285 static int decide (void)
287 int i;
290 * Determine if there is a subquorum
292 if (subquorum (view_list, view_list_entries, last_primary_max) == 0) {
293 return (0);
296 for (i = 0; i < ambiguous_sessions_max_entries; i++) {
297 if (subquorum (view_list, view_list_entries, &ambiguous_sessions_max[i]) == 0) {
298 return (0);
302 return (1);
305 static void ykd_session_endian_convert (struct ykd_session *ykd_session)
307 int i;
309 ykd_session->member_list_entries =
310 swab32 (ykd_session->member_list_entries);
311 ykd_session->session_id = swab32 (ykd_session->session_id);
312 for (i = 0; i < ykd_session->member_list_entries; i++) {
313 ykd_session->member_list[i] =
314 swab32 (ykd_session->member_list[i]);
318 static void ykd_state_endian_convert (struct ykd_state *ykd_state)
320 int i;
322 ykd_session_endian_convert (&ykd_state->last_primary);
323 ykd_state->last_formed_entries = swab32 (ykd_state->last_formed_entries);
324 ykd_state->ambiguous_sessions_entries = swab32 (ykd_state->ambiguous_sessions_entries);
325 ykd_state->session_id = swab32 (ykd_state->session_id);
327 for (i = 0; i < ykd_state->last_formed_entries; i++) {
328 ykd_session_endian_convert (&ykd_state->last_formed[i]);
331 for (i = 0; i < ykd_state->ambiguous_sessions_entries; i++) {
332 ykd_session_endian_convert (&ykd_state->ambiguous_sessions[i]);
336 static void ykd_deliver_fn (
337 unsigned int nodeid,
338 struct iovec *iovec,
339 int iov_len,
340 int endian_conversion_required)
342 int all_received = 1;
343 int state_position = 0;
344 int i;
345 char *msg_state = iovec->iov_base + sizeof (struct ykd_header);
348 * If this is a localhost address, this node is always primary
350 #ifdef TODO
351 if (totemip_localhost_check (source_addr)) {
352 log_printf (LOG_LEVEL_NOTICE,
353 "This processor is within the primary component.\n");
354 primary_designated = 1;
356 ykd_primary_callback_fn (
357 view_list,
358 view_list_entries,
359 primary_designated,
360 &ykd_ring_id);
361 return;
363 #endif
364 if (endian_conversion_required &&
365 (iovec->iov_len > sizeof (struct ykd_header))) {
366 ykd_state_endian_convert ((struct ykd_state *)msg_state);
370 * Set completion for source_addr's address
372 for (state_position = 0; state_position < state_received_confchg_entries; state_position++) {
373 if (nodeid == state_received_process[state_position].nodeid) {
375 * State position contains the address of the state to modify
376 * This may be used later by the other algorithms
378 state_received_process[state_position].received = 1;
379 break;
384 * Test if all nodes have submitted their state data
386 for (i = 0; i < state_received_confchg_entries; i++) {
387 if (state_received_process[i].received == 0) {
388 all_received = 0;
392 switch (ykd_mode) {
393 case YKD_MODE_SENDSTATE:
394 assert (iovec->iov_len > sizeof (struct ykd_header));
396 * Copy state information for the sending processor
398 memcpy (&state_received_process[state_position].ykd_state,
399 msg_state, sizeof (struct ykd_state));
402 * Try to form a component
404 if (all_received) {
405 for (i = 0; i < state_received_confchg_entries; i++) {
406 state_received_process[i].received = 0;
408 ykd_mode = YKD_MODE_ATTEMPT;
410 // TODO resolve optimizes for failure conditions during ykd calculation
411 // resolve();
412 compute();
414 if (decide ()) {
415 ykd_state.session_id = session_id_max + 1;
416 memcpy (ykd_state.ambiguous_sessions[ykd_state.ambiguous_sessions_entries].member_list,
417 view_list, sizeof (unsigned int) * view_list_entries);
418 ykd_state.ambiguous_sessions[ykd_state.ambiguous_sessions_entries].member_list_entries = view_list_entries;
419 ykd_state.ambiguous_sessions_entries += 1;
420 ykd_attempt_send();
423 break;
425 case YKD_MODE_ATTEMPT:
426 if (all_received) {
427 log_printf (LOG_LEVEL_NOTICE,
428 "This processor is within the primary component.\n");
429 primary_designated = 1;
431 ykd_primary_callback_fn (
432 view_list,
433 view_list_entries,
434 primary_designated,
435 &ykd_ring_id);
437 memcpy (ykd_state.last_primary.member_list, view_list, sizeof (view_list));
438 ykd_state.last_primary.member_list_entries = view_list_entries;
439 ykd_state.last_primary.session_id = ykd_state.session_id;
440 ykd_state.ambiguous_sessions_entries = 0;
442 break;
446 int first_run = 1;
447 static void ykd_confchg_fn (
448 enum totem_configuration_type configuration_type,
449 unsigned int *member_list, int member_list_entries,
450 unsigned int *left_list, int left_list_entries,
451 unsigned int *joined_list, int joined_list_entries,
452 struct memb_ring_id *ring_id)
454 int i;
456 if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
457 return;
460 memcpy (&ykd_ring_id, ring_id, sizeof (struct memb_ring_id));
462 if (first_run) {
463 ykd_state.last_primary.member_list[0] = totempg_my_nodeid_get();
464 ykd_state.last_primary.member_list_entries = 1;
465 ykd_state.last_primary.session_id = 0;
466 first_run = 0;
468 memcpy (view_list, member_list,
469 member_list_entries * sizeof (unsigned int));
470 view_list_entries = member_list_entries;
472 ykd_mode = YKD_MODE_SENDSTATE;
474 primary_designated = 0;
476 ykd_primary_callback_fn (
477 view_list,
478 view_list_entries,
479 primary_designated,
480 &ykd_ring_id);
482 memset (&state_received_confchg, 0, sizeof (state_received_confchg));
483 for (i = 0; i < member_list_entries; i++) {
484 state_received_confchg[i].nodeid = member_list[i];
485 state_received_confchg[i].received = 0;
487 memcpy (state_received_process, state_received_confchg,
488 sizeof (state_received_confchg));
490 state_received_confchg_entries = member_list_entries;
491 state_received_process_entries = member_list_entries;
493 ykd_state_send ();
496 struct totempg_group ykd_group = {
497 .group = "ykd",
498 .group_len = 3
501 static int ykd_init (
502 void (*primary_callback_fn) (
503 unsigned int *view_list,
504 int view_list_entries,
505 int primary_designated,
506 struct memb_ring_id *ring_id))
508 ykd_primary_callback_fn = primary_callback_fn;
510 totempg_groups_initialize (
511 &ykd_group_handle,
512 ykd_deliver_fn,
513 ykd_confchg_fn);
515 totempg_groups_join (
516 ykd_group_handle,
517 &ykd_group,
520 ykd_state_init ();
522 return (0);
526 * Returns 1 if this processor is in the primary
528 static int ykd_primary (void) {
529 return (primary_designated);
533 * lcrso object definition
535 static struct openais_vsf_iface_ver0 vsf_ykd_iface_ver0 = {
536 .init = ykd_init,
537 .primary = ykd_primary
540 static struct lcr_iface openais_vsf_ykd_ver0[1] = {
542 .name = "openais_vsf_ykd",
543 .version = 0,
544 .versions_replace = 0,
545 .versions_replace_count = 0,
546 .dependencies = 0,
547 .dependency_count = 0,
548 .constructor = NULL,
549 .destructor = NULL,
550 .interfaces = (void **)(void *)&vsf_ykd_iface_ver0,
554 static struct lcr_comp vsf_ykd_comp_ver0 = {
555 .iface_count = 1,
556 .ifaces = openais_vsf_ykd_ver0
559 __attribute__ ((constructor)) static void vsf_ykd_comp_register (void) {
560 lcr_component_register (&vsf_ykd_comp_ver0);