/*
 * openais.git: exec/totempg.c
 * (gitweb extraction header: "Allow make from the exec directory.",
 *  blob 34f5f81571238abeba675ffa3ac1e67c3fa3cad0)
 */
1 /*
2 * Copyright (c) 2003-2005 MontaVista Software, Inc.
3 * Copyright (c) 2005 OSDL.
4 * Copyright (c) 2006-2007 Red Hat, Inc.
5 * Copyright (c) 2006 Sun Microsystems, Inc.
7 * All rights reserved.
9 * Author: Steven Dake (sdake@redhat.com)
10 * Mark Haverkamp (markh@osdl.org)
12 * This software licensed under BSD license, the text of which follows:
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are met:
17 * - Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * - Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * - Neither the name of the MontaVista Software, Inc. nor the names of its
23 * contributors may be used to endorse or promote products derived from this
24 * software without specific prior written permission.
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
27 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
29 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
30 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
31 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
32 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
33 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
34 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
35 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
36 * THE POSSIBILITY OF SUCH DAMAGE.
40 * FRAGMENTATION AND PACKING ALGORITHM:
42 * Assemble the entire message into one buffer
43 * if full fragment
44 * store fragment into lengths list
45 * for each full fragment
46 * multicast fragment
47 * set length and fragment fields of pg mesage
48 * store remaining multicast into head of fragmentation data and set lens field
50 * If a message exceeds the maximum packet size allowed by the totem
51 * single ring protocol, the protocol could lose forward progress.
52 * Statically calculating the allowed data amount doesn't work because
53 * the amount of data allowed depends on the number of fragments in
54 * each message. In this implementation, the maximum fragment size
55 * is dynamically calculated for each fragment added to the message.
57 * It is possible for a message to be two bytes short of the maximum
58 * packet size. This occurs when a message or collection of
59 * messages + the mcast header + the lens are two bytes short of the
60 * end of the packet. Since another len field consumes two bytes, the
61 * len field would consume the rest of the packet without room for data.
63 * One optimization would be to forgo the final len field and determine
64 * it from the size of the udp datagram. Then this condition would no
65 * longer occur.
69 * ASSEMBLY AND UNPACKING ALGORITHM:
71 * copy incoming packet into assembly data buffer indexed by current
72 * location of end of fragment
74 * if not fragmented
75 * deliver all messages in assembly data buffer
76 * else
77 * if msg_count > 1 and fragmented
78 * deliver all messages except last message in assembly data buffer
79 * copy last fragmented section to start of assembly data buffer
80 * else
81 * if msg_count = 1 and fragmented
82 * do nothing
86 #ifndef OPENAIS_BSD
87 #include <alloca.h>
88 #endif
89 #include <netinet/in.h>
90 #include <sys/uio.h>
91 #include <stdio.h>
92 #include <stdlib.h>
93 #include <string.h>
94 #include <assert.h>
95 #include <pthread.h>
96 #include <errno.h>
98 #include "swab.h"
99 #include "../include/hdb.h"
100 #include "../include/list.h"
101 #include "totempg.h"
102 #include "totemmrp.h"
103 #include "totemsrp.h"
104 #include "swab.h"
/*
 * Return the smaller of a and b.
 * Fully parenthesized so the macro is safe inside larger expressions;
 * the original expanded to "((a) < (b)) ? a : b", which mis-associates
 * in contexts such as "min(x,y) + 1".  Arguments may still be evaluated
 * twice — do not pass expressions with side effects.
 */
#define min(a,b) (((a) < (b)) ? (a) : (b))
/*
 * Common header carried by every totempg multicast frame.
 */
struct totempg_mcast_header {
	short version;	/* protocol version of this frame */
	short type;	/* frame type discriminator */
};

/*
 * totempg_mcast — on-wire layout of one packed multicast frame.
 *
 * header:       identifies the mcast.
 * fragmented:   nonzero if the last packed message continues into the
 *               next frame (value is the fragment sequence number).
 * continuation: nonzero if the first packed message continues one from
 *               the previous frame.
 * msg_count:    number of packed messages contained in this frame.
 *
 * When sent, an array of per-message lengths
 * (unsigned short msg_len[msg_count]) followed by the message data is
 * appended immediately after this structure.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * unsigned short msg_len[msg_count]; follows on the wire
	 */
	/*
	 * ... then the packed message data
	 */
};
139 * Maximum packet size for totem pg messages
141 #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
142 sizeof (struct totempg_mcast))
145 * Local variables used for packing small messages
147 static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
149 static int mcast_packed_msg_count = 0;
152 * Function and data used to log messages
154 static int totempg_log_level_security;
155 static int totempg_log_level_error;
156 static int totempg_log_level_warning;
157 static int totempg_log_level_notice;
158 static int totempg_log_level_debug;
159 static void (*totempg_log_printf) (char *file, int line, int level, char *format, ...) __attribute__((format(printf, 4, 5))) = NULL;
161 struct totem_config *totempg_totem_config;
163 struct assembly {
164 unsigned int nodeid;
165 unsigned char data[MESSAGE_SIZE_MAX];
166 int index;
167 unsigned char last_frag_num;
168 struct list_head list;
171 DECLARE_LIST_INIT(assembly_list_inuse);
173 DECLARE_LIST_INIT(assembly_list_free);
176 * Staging buffer for packed messages. Messages are staged in this buffer
177 * before sending. Multiple messages may fit which cuts down on the
178 * number of mcasts sent. If a message doesn't completely fit, then
179 * the mcast header has a fragment bit set that says that there are more
180 * data to follow. fragment_size is an index into the buffer. It indicates
181 * the size of message data and where to place new message data.
182 * fragment_contuation indicates whether the first packed message in
183 * the buffer is a continuation of a previously packed fragment.
185 static char *fragmentation_data;
187 static int fragment_size = 0;
189 static int fragment_continuation = 0;
191 static struct iovec iov_delv;
193 static unsigned int totempg_max_handle = 0;
195 struct totempg_group_instance {
196 void (*deliver_fn) (
197 unsigned int nodeid,
198 struct iovec *iovec,
199 int iov_len,
200 int endian_conversion_required);
202 void (*confchg_fn) (
203 enum totem_configuration_type configuration_type,
204 unsigned int *member_list, int member_list_entries,
205 unsigned int *left_list, int left_list_entries,
206 unsigned int *joined_list, int joined_list_entries,
207 struct memb_ring_id *ring_id);
209 struct totempg_group *groups;
211 int groups_cnt;
214 static struct hdb_handle_database totempg_groups_instance_database = {
215 .handle_count = 0,
216 .handles = 0,
217 .iterator = 0,
218 .mutex = PTHREAD_MUTEX_INITIALIZER
221 static int send_ok (int msg_size);
223 static unsigned char next_fragment = 1;
225 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
227 static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
229 static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
231 #define log_printf(level, format, args...) \
232 totempg_log_printf (__FILE__, __LINE__, level, format, ##args)
234 static struct assembly *assembly_ref (unsigned int nodeid)
236 struct assembly *assembly;
237 struct list_head *list;
240 * Search inuse list for node id and return assembly buffer if found
242 for (list = assembly_list_inuse.next;
243 list != &assembly_list_inuse;
244 list = list->next) {
246 assembly = list_entry (list, struct assembly, list);
248 if (nodeid == assembly->nodeid) {
249 return (assembly);
254 * Nothing found in inuse list get one from free list if available
256 if (list_empty (&assembly_list_free) == 0) {
257 assembly = list_entry (assembly_list_free.next, struct assembly, list);
258 list_del (&assembly->list);
259 list_add (&assembly->list, &assembly_list_inuse);
260 assembly->nodeid = nodeid;
261 return (assembly);
265 * Nothing available in inuse or free list, so allocate a new one
267 assembly = malloc (sizeof (struct assembly));
268 memset (assembly, 0, sizeof (struct assembly));
270 * TODO handle memory allocation failure here
272 assert (assembly);
273 assembly->nodeid = nodeid;
274 list_init (&assembly->list);
275 list_add (&assembly->list, &assembly_list_inuse);
277 return (assembly);
280 void assembly_deref (struct assembly *assembly)
282 list_del (&assembly->list);
283 list_add (&assembly->list, &assembly_list_free);
286 static inline void app_confchg_fn (
287 enum totem_configuration_type configuration_type,
288 unsigned int *member_list, int member_list_entries,
289 unsigned int *left_list, int left_list_entries,
290 unsigned int *joined_list, int joined_list_entries,
291 struct memb_ring_id *ring_id)
293 int i;
294 struct totempg_group_instance *instance;
295 unsigned int res;
297 for (i = 0; i <= totempg_max_handle; i++) {
298 res = hdb_handle_get (&totempg_groups_instance_database,
299 i, (void *)&instance);
301 if (res == 0) {
302 if (instance->confchg_fn) {
303 instance->confchg_fn (
304 configuration_type,
305 member_list,
306 member_list_entries,
307 left_list,
308 left_list_entries,
309 joined_list,
310 joined_list_entries,
311 ring_id);
314 hdb_handle_put (&totempg_groups_instance_database, i);
319 static inline void group_endian_convert (
320 struct iovec *iovec)
322 unsigned short *group_len;
323 int i;
325 group_len = (unsigned short *)iovec->iov_base;
326 group_len[0] = swab16(group_len[0]);
327 for (i = 1; i < group_len[0] + 1; i++) {
328 group_len[i] = swab16(group_len[i]);
333 static inline int group_matches (
334 struct iovec *iovec,
335 unsigned int iov_len,
336 struct totempg_group *groups_b,
337 unsigned int group_b_cnt,
338 unsigned int *adjust_iovec)
340 unsigned short *group_len;
341 char *group_name;
342 int i;
343 int j;
344 #ifdef __sparc
345 struct iovec iovec_aligned = { NULL, 0 };
346 #endif
348 assert (iov_len == 1);
350 #ifdef __sparc
351 if ((size_t)iovec->iov_base % 4 != 0) {
352 iovec_aligned.iov_base = alloca(iovec->iov_len);
353 memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
354 iovec_aligned.iov_len = iovec->iov_len;
355 iovec = &iovec_aligned;
357 #endif
359 group_len = (unsigned short *)iovec->iov_base;
360 group_name = ((char *)iovec->iov_base) +
361 sizeof (unsigned short) * (group_len[0] + 1);
364 * Calculate amount to adjust the iovec by before delivering to app
366 *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
367 for (i = 1; i < group_len[0] + 1; i++) {
368 *adjust_iovec += group_len[i];
372 * Determine if this message should be delivered to this instance
374 for (i = 1; i < group_len[0] + 1; i++) {
375 for (j = 0; j < group_b_cnt; j++) {
376 if ((group_len[i] == groups_b[j].group_len) &&
377 (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
378 return (1);
381 group_name += group_len[i];
383 return (0);
387 static inline void app_deliver_fn (
388 unsigned int nodeid,
389 struct iovec *iovec,
390 unsigned int iov_len,
391 int endian_conversion_required)
393 int i;
394 struct totempg_group_instance *instance;
395 struct iovec stripped_iovec;
396 #ifdef __sparc
397 struct iovec aligned_iovec = { NULL, 0 };
398 #endif
399 unsigned int adjust_iovec;
400 unsigned int res;
402 if (endian_conversion_required) {
403 #ifdef __sparc
404 if ((size_t)iovec->iov_base % 4 != 0) {
405 /* Deal with misalignment */
406 aligned_iovec.iov_base = alloca(iovec->iov_len);
407 aligned_iovec.iov_len = iovec->iov_len;
408 memcpy(aligned_iovec.iov_base, iovec->iov_base,
409 iovec->iov_len);
410 iovec = &aligned_iovec;
412 #endif
413 group_endian_convert (iovec);
415 for (i = 0; i <= totempg_max_handle; i++) {
416 res = hdb_handle_get (&totempg_groups_instance_database,
417 i, (void *)&instance);
419 if (res == 0) {
420 assert (iov_len == 1);
421 if (group_matches (iovec, iov_len, instance->groups,
422 instance->groups_cnt, &adjust_iovec)) {
423 stripped_iovec.iov_len =
424 iovec->iov_len - adjust_iovec;
425 #ifndef __sparc
426 stripped_iovec.iov_base =
427 (char *)iovec->iov_base + adjust_iovec;
428 #else
429 if (iovec->iov_base + adjust_iovec % 4 != 0) {
430 /* Deal with misalignment */
432 * XXX Using alloca() is dangerous,
433 * since it may be called multiple
434 * times within the for() loop
436 stripped_iovec.iov_base = alloca(
437 stripped_iovec.iov_len);
438 memcpy(stripped_iovec.iov_base,
439 iovec->iov_base + adjust_iovec,
440 stripped_iovec.iov_len);
442 #endif
443 instance->deliver_fn (
444 nodeid,
445 &stripped_iovec,
446 iov_len,
447 endian_conversion_required);
450 hdb_handle_put (&totempg_groups_instance_database, i);
454 static void totempg_confchg_fn (
455 enum totem_configuration_type configuration_type,
456 unsigned int *member_list, int member_list_entries,
457 unsigned int *left_list, int left_list_entries,
458 unsigned int *joined_list, int joined_list_entries,
459 struct memb_ring_id *ring_id)
461 // TODO optimize this
462 app_confchg_fn (configuration_type,
463 member_list, member_list_entries,
464 left_list, left_list_entries,
465 joined_list, joined_list_entries,
466 ring_id);
469 static void totempg_deliver_fn (
470 unsigned int nodeid,
471 struct iovec *iovec,
472 int iov_len,
473 int endian_conversion_required)
475 struct totempg_mcast *mcast;
476 unsigned short *msg_lens;
477 int i;
478 struct assembly *assembly;
479 char header[FRAME_SIZE_MAX];
480 int h_index;
481 int a_i = 0;
482 int msg_count;
483 int continuation;
484 int start;
486 assembly = assembly_ref (nodeid);
487 assert (assembly);
490 * Assemble the header into one block of data and
491 * assemble the packet contents into one block of data to simplify delivery
493 if (iov_len == 1) {
495 * This message originated from external processor
496 * because there is only one iovec for the full msg.
498 char *data;
499 int datasize;
501 mcast = (struct totempg_mcast *)iovec[0].iov_base;
502 if (endian_conversion_required) {
503 mcast->msg_count = swab16 (mcast->msg_count);
506 msg_count = mcast->msg_count;
507 datasize = sizeof (struct totempg_mcast) +
508 msg_count * sizeof (unsigned short);
510 assert (iovec[0].iov_len >= datasize);
512 memcpy (header, iovec[0].iov_base, datasize);
513 assert(iovec);
514 data = iovec[0].iov_base;
516 msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
517 if (endian_conversion_required) {
518 for (i = 0; i < mcast->msg_count; i++) {
519 msg_lens[i] = swab16 (msg_lens[i]);
523 memcpy (&assembly->data[assembly->index], &data[datasize],
524 iovec[0].iov_len - datasize);
525 } else {
527 * The message originated from local processor
528 * because there is greater than one iovec for then full msg.
530 h_index = 0;
531 for (i = 0; i < 2; i++) {
532 memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len);
533 h_index += iovec[i].iov_len;
536 mcast = (struct totempg_mcast *)header;
537 // TODO make sure we are using a copy of mcast not the actual data itself
539 msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
541 for (i = 2; i < iov_len; i++) {
542 a_i = assembly->index;
543 assert (iovec[i].iov_len + a_i <= MESSAGE_SIZE_MAX);
544 memcpy (&assembly->data[a_i], iovec[i].iov_base, iovec[i].iov_len);
545 a_i += msg_lens[i - 2];
547 iov_len -= 2;
551 * If the last message in the buffer is a fragment, then we
552 * can't deliver it. We'll first deliver the full messages
553 * then adjust the assembly buffer so we can add the rest of the
554 * fragment when it arrives.
556 msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
557 continuation = mcast->continuation;
558 iov_delv.iov_base = (char *)&assembly->data[0];
559 iov_delv.iov_len = assembly->index + msg_lens[0];
562 * Make sure that if this message is a continuation, that it
563 * matches the sequence number of the previous fragment.
564 * Also, if the first packed message is a continuation
565 * of a previous message, but the assembly buffer
566 * is empty, then we need to discard it since we can't
567 * assemble a complete message. Likewise, if this message isn't a
568 * continuation and the assembly buffer is empty, we have to discard
569 * the continued message.
571 start = 0;
572 if (continuation) {
574 if (continuation != assembly->last_frag_num) {
575 log_printf (totempg_log_level_error,
576 "Message continuation doesn't match previous frag e: %u - a: %u\n",
577 assembly->last_frag_num, continuation);
578 continuation = 0;
581 if ((assembly->index == 0) ||
582 (!continuation && assembly->index)) {
583 log_printf (totempg_log_level_error,
584 "Throwing away broken message: continuation %u, index %u\n",
585 continuation, assembly->index);
586 continuation = 0;
590 * we decided to throw away the first continued message
591 * in this buffer, if continuation was set to zero.
593 if (!continuation) {
594 assembly->index += msg_lens[0];
595 iov_delv.iov_base = (char *)&assembly->data[assembly->index];
596 iov_delv.iov_len = msg_lens[1];
597 start = 1;
602 for (i = start; i < msg_count; i++) {
603 app_deliver_fn(nodeid, &iov_delv, 1,
604 endian_conversion_required);
605 assembly->index += msg_lens[i];
606 iov_delv.iov_base = (char *)&assembly->data[assembly->index];
607 if (i < (msg_count - 1)) {
608 iov_delv.iov_len = msg_lens[i + 1];
612 if (mcast->fragmented == 0) {
614 * End of messages, dereference assembly struct
616 assembly->last_frag_num = 0;
617 assembly->index = 0;
618 assembly_deref (assembly);
619 } else {
621 * Message is fragmented, keep around assembly list
623 assembly->last_frag_num = mcast->fragmented;
624 if (mcast->msg_count > 1) {
625 memmove (&assembly->data[0],
626 &assembly->data[assembly->index],
627 msg_lens[msg_count]);
629 assembly->index = 0;
631 assembly->index += msg_lens[msg_count];
636 * Totem Process Group Abstraction
637 * depends on poll abstraction, POSIX, IPV4
640 void *callback_token_received_handle;
642 int callback_token_received_fn (enum totem_callback_token_type type,
643 void *data)
645 struct totempg_mcast mcast;
646 struct iovec iovecs[3];
647 int res;
649 pthread_mutex_lock (&mcast_msg_mutex);
650 if (mcast_packed_msg_count == 0) {
651 pthread_mutex_unlock (&mcast_msg_mutex);
652 return (0);
654 if (totemmrp_avail() == 0) {
655 pthread_mutex_unlock (&mcast_msg_mutex);
656 return (0);
658 mcast.fragmented = 0;
661 * Was the first message in this buffer a continuation of a
662 * fragmented message?
664 mcast.continuation = fragment_continuation;
665 fragment_continuation = 0;
667 mcast.msg_count = mcast_packed_msg_count;
669 iovecs[0].iov_base = (char *)&mcast;
670 iovecs[0].iov_len = sizeof (struct totempg_mcast);
671 iovecs[1].iov_base = (char *)mcast_packed_msg_lens;
672 iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
673 iovecs[2].iov_base = &fragmentation_data[0];
674 iovecs[2].iov_len = fragment_size;
675 res = totemmrp_mcast (iovecs, 3, 0);
677 mcast_packed_msg_count = 0;
678 fragment_size = 0;
680 pthread_mutex_unlock (&mcast_msg_mutex);
681 return (0);
685 * Initialize the totem process group abstraction
687 int totempg_initialize (
688 poll_handle poll_handle,
689 struct totem_config *totem_config)
691 int res;
693 totempg_totem_config = totem_config;
694 totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
695 totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
696 totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
697 totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
698 totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
699 totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
701 fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
702 if (fragmentation_data == 0) {
703 return (-1);
706 res = totemmrp_initialize (
707 poll_handle,
708 totem_config,
709 totempg_deliver_fn,
710 totempg_confchg_fn);
712 totemmrp_callback_token_create (
713 &callback_token_received_handle,
714 TOTEM_CALLBACK_TOKEN_RECEIVED,
716 callback_token_received_fn,
719 totemsrp_net_mtu_adjust (totem_config);
721 return (res);
724 void totempg_finalize (void)
726 pthread_mutex_lock (&totempg_mutex);
727 totemmrp_finalize ();
728 pthread_mutex_unlock (&totempg_mutex);
732 * Multicast a message
734 static int mcast_msg (
735 struct iovec *iovec,
736 int iov_len,
737 int guarantee)
739 int res = 0;
740 struct totempg_mcast mcast;
741 struct iovec iovecs[3];
742 int i;
743 int max_packet_size = 0;
744 int copy_len = 0;
745 int copy_base = 0;
746 int total_size = 0;
748 pthread_mutex_lock (&mcast_msg_mutex);
749 totemmrp_new_msg_signal ();
751 max_packet_size = TOTEMPG_PACKET_SIZE -
752 (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
754 mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
757 * Check if we would overwrite new message queue
759 for (i = 0; i < iov_len; i++) {
760 total_size += iovec[i].iov_len;
763 if (send_ok (total_size + sizeof(unsigned short) *
764 (mcast_packed_msg_count+1)) == 0) {
766 pthread_mutex_unlock (&mcast_msg_mutex);
767 return(-1);
770 for (i = 0; i < iov_len; ) {
771 mcast.fragmented = 0;
772 mcast.continuation = fragment_continuation;
773 copy_len = iovec[i].iov_len - copy_base;
776 * If it all fits with room left over, copy it in.
777 * We need to leave at least sizeof(short) + 1 bytes in the
778 * fragment_buffer on exit so that max_packet_size + fragment_size
779 * doesn't exceed the size of the fragment_buffer on the next call.
781 if ((copy_len + fragment_size) <
782 (max_packet_size - sizeof (unsigned short))) {
784 memcpy (&fragmentation_data[fragment_size],
785 iovec[i].iov_base + copy_base, copy_len);
786 fragment_size += copy_len;
787 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
788 copy_len = 0;
789 copy_base = 0;
790 i++;
791 continue;
794 * If it just fits or is too big, then send out what fits.
796 } else {
797 char *data_ptr;
799 copy_len = min(copy_len, max_packet_size - fragment_size);
800 if( copy_len == max_packet_size )
801 data_ptr = iovec[i].iov_base + copy_base;
802 else {
803 data_ptr = fragmentation_data;
804 memcpy (&fragmentation_data[fragment_size],
805 iovec[i].iov_base + copy_base, copy_len);
808 memcpy (&fragmentation_data[fragment_size],
809 iovec[i].iov_base + copy_base, copy_len);
810 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
813 * if we're not on the last iovec or the iovec is too large to
814 * fit, then indicate a fragment. This also means that the next
815 * message will have the continuation of this one.
817 if ((i < (iov_len - 1)) ||
818 ((copy_base + copy_len) < iovec[i].iov_len)) {
819 if (!next_fragment) {
820 next_fragment++;
822 fragment_continuation = next_fragment;
823 mcast.fragmented = next_fragment++;
824 assert(fragment_continuation != 0);
825 assert(mcast.fragmented != 0);
826 } else {
827 fragment_continuation = 0;
831 * assemble the message and send it
833 mcast.msg_count = ++mcast_packed_msg_count;
834 iovecs[0].iov_base = (char *)&mcast;
835 iovecs[0].iov_len = sizeof(struct totempg_mcast);
836 iovecs[1].iov_base = (char *)mcast_packed_msg_lens;
837 iovecs[1].iov_len = mcast_packed_msg_count *
838 sizeof(unsigned short);
839 iovecs[2].iov_base = data_ptr;
840 iovecs[2].iov_len = max_packet_size;
841 assert (totemmrp_avail() > 0);
842 res = totemmrp_mcast (iovecs, 3, guarantee);
845 * Recalculate counts and indexes for the next.
847 mcast_packed_msg_lens[0] = 0;
848 mcast_packed_msg_count = 0;
849 fragment_size = 0;
850 max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
853 * If the iovec all fit, go to the next iovec
855 if ((copy_base + copy_len) == iovec[i].iov_len) {
856 copy_len = 0;
857 copy_base = 0;
858 i++;
861 * Continue with the rest of the current iovec.
863 } else {
864 copy_base += copy_len;
870 * Bump only if we added message data. This may be zero if
871 * the last buffer just fit into the fragmentation_data buffer
872 * and we were at the last iovec.
874 if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
875 mcast_packed_msg_count++;
878 pthread_mutex_unlock (&mcast_msg_mutex);
879 return (res);
883 * Determine if a message of msg_size could be queued
885 static int send_ok (
886 int msg_size)
888 int avail = 0;
889 int total;
891 avail = totemmrp_avail ();
894 * msg size less then totempg_totem_config->net_mtu - 25 will take up
895 * a full message, so add +1
896 * totempg_totem_config->net_mtu - 25 is for the totempg_mcast header
898 total = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1;
900 return (avail >= total);
903 int totempg_callback_token_create (
904 void **handle_out,
905 enum totem_callback_token_type type,
906 int delete,
907 int (*callback_fn) (enum totem_callback_token_type type, void *),
908 void *data)
910 unsigned int res;
911 pthread_mutex_lock (&callback_token_mutex);
912 res = totemmrp_callback_token_create (handle_out, type, delete,
913 callback_fn, data);
914 pthread_mutex_unlock (&callback_token_mutex);
915 return (res);
918 void totempg_callback_token_destroy (
919 void *handle_out)
921 pthread_mutex_lock (&callback_token_mutex);
922 totemmrp_callback_token_destroy (handle_out);
923 pthread_mutex_unlock (&callback_token_mutex);
927 * vi: set autoindent tabstop=4 shiftwidth=4 :
930 int totempg_groups_initialize (
931 totempg_groups_handle *handle,
933 void (*deliver_fn) (
934 unsigned int nodeid,
935 struct iovec *iovec,
936 int iov_len,
937 int endian_conversion_required),
939 void (*confchg_fn) (
940 enum totem_configuration_type configuration_type,
941 unsigned int *member_list, int member_list_entries,
942 unsigned int *left_list, int left_list_entries,
943 unsigned int *joined_list, int joined_list_entries,
944 struct memb_ring_id *ring_id))
946 struct totempg_group_instance *instance;
947 unsigned int res;
949 pthread_mutex_lock (&totempg_mutex);
950 res = hdb_handle_create (&totempg_groups_instance_database,
951 sizeof (struct totempg_group_instance), handle);
952 if (res != 0) {
953 goto error_exit;
956 if (*handle > totempg_max_handle) {
957 totempg_max_handle = *handle;
960 res = hdb_handle_get (&totempg_groups_instance_database, *handle,
961 (void *)&instance);
962 if (res != 0) {
963 goto error_destroy;
966 instance->deliver_fn = deliver_fn;
967 instance->confchg_fn = confchg_fn;
968 instance->groups = 0;
969 instance->groups_cnt = 0;
972 hdb_handle_put (&totempg_groups_instance_database, *handle);
974 pthread_mutex_unlock (&totempg_mutex);
975 return (0);
976 error_destroy:
977 hdb_handle_destroy (&totempg_groups_instance_database, *handle);
979 error_exit:
980 pthread_mutex_unlock (&totempg_mutex);
981 return (-1);
984 int totempg_groups_join (
985 totempg_groups_handle handle,
986 struct totempg_group *groups,
987 int group_cnt)
989 struct totempg_group_instance *instance;
990 struct totempg_group *new_groups;
991 unsigned int res;
993 pthread_mutex_lock (&totempg_mutex);
994 res = hdb_handle_get (&totempg_groups_instance_database, handle,
995 (void *)&instance);
996 if (res != 0) {
997 goto error_exit;
1000 new_groups = realloc (instance->groups,
1001 sizeof (struct totempg_group) *
1002 (instance->groups_cnt + group_cnt));
1003 if (new_groups == NULL) {
1004 res = ENOMEM;
1005 goto error_exit;
1007 memcpy (&new_groups[instance->groups_cnt],
1008 groups, group_cnt * sizeof (struct totempg_group));
1009 instance->groups = new_groups;
1010 instance->groups_cnt = instance->groups_cnt = group_cnt;
1012 hdb_handle_put (&totempg_groups_instance_database, handle);
1014 error_exit:
1015 pthread_mutex_unlock (&totempg_mutex);
1016 return (res);
1019 int totempg_groups_leave (
1020 totempg_groups_handle handle,
1021 struct totempg_group *groups,
1022 int group_cnt)
1024 struct totempg_group_instance *instance;
1025 unsigned int res;
1027 pthread_mutex_lock (&totempg_mutex);
1028 res = hdb_handle_get (&totempg_groups_instance_database, handle,
1029 (void *)&instance);
1030 if (res != 0) {
1031 goto error_exit;
1034 hdb_handle_put (&totempg_groups_instance_database, handle);
1036 error_exit:
1037 pthread_mutex_unlock (&totempg_mutex);
1038 return (res);
1041 #define MAX_IOVECS_FROM_APP 32
1042 #define MAX_GROUPS_PER_MSG 32
1044 int totempg_groups_mcast_joined (
1045 totempg_groups_handle handle,
1046 struct iovec *iovec,
1047 int iov_len,
1048 int guarantee)
1050 struct totempg_group_instance *instance;
1051 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1052 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1053 int i;
1054 unsigned int res;
1056 pthread_mutex_lock (&totempg_mutex);
1057 res = hdb_handle_get (&totempg_groups_instance_database, handle,
1058 (void *)&instance);
1059 if (res != 0) {
1060 goto error_exit;
1064 * Build group_len structure and the iovec_mcast structure
1066 group_len[0] = instance->groups_cnt;
1067 for (i = 0; i < instance->groups_cnt; i++) {
1068 group_len[i + 1] = instance->groups[i].group_len;
1069 iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1070 iovec_mcast[i + 1].iov_base = instance->groups[i].group;
1072 iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1073 iovec_mcast[0].iov_base = (char *)group_len;
1074 for (i = 0; i < iov_len; i++) {
1075 iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1076 iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1079 res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1080 hdb_handle_put (&totempg_groups_instance_database, handle);
1082 error_exit:
1083 pthread_mutex_unlock (&totempg_mutex);
1084 return (res);
1087 int totempg_groups_send_ok_joined (
1088 totempg_groups_handle handle,
1089 struct iovec *iovec,
1090 int iov_len)
1092 struct totempg_group_instance *instance;
1093 unsigned int size = 0;
1094 unsigned int i;
1095 unsigned int res;
1097 pthread_mutex_lock (&totempg_mutex);
1098 pthread_mutex_lock (&mcast_msg_mutex);
1099 res = hdb_handle_get (&totempg_groups_instance_database, handle,
1100 (void *)&instance);
1101 if (res != 0) {
1102 goto error_exit;
1105 for (i = 0; i < instance->groups_cnt; i++) {
1106 size += instance->groups[i].group_len;
1108 for (i = 0; i < iov_len; i++) {
1109 size += iovec[i].iov_len;
1112 res = send_ok (size);
1114 hdb_handle_put (&totempg_groups_instance_database, handle);
1116 error_exit:
1117 pthread_mutex_unlock (&mcast_msg_mutex);
1118 pthread_mutex_unlock (&totempg_mutex);
1119 return (res);
1122 int totempg_groups_mcast_groups (
1123 totempg_groups_handle handle,
1124 int guarantee,
1125 struct totempg_group *groups,
1126 int groups_cnt,
1127 struct iovec *iovec,
1128 int iov_len)
1130 struct totempg_group_instance *instance;
1131 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1132 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1133 int i;
1134 unsigned int res;
1136 pthread_mutex_lock (&totempg_mutex);
1137 res = hdb_handle_get (&totempg_groups_instance_database, handle,
1138 (void *)&instance);
1139 if (res != 0) {
1140 goto error_exit;
1144 * Build group_len structure and the iovec_mcast structure
1146 group_len[0] = groups_cnt;
1147 for (i = 0; i < groups_cnt; i++) {
1148 group_len[i + 1] = groups[i].group_len;
1149 iovec_mcast[i + 1].iov_len = groups[i].group_len;
1150 iovec_mcast[i + 1].iov_base = groups[i].group;
1152 iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1153 iovec_mcast[0].iov_base = (char *)group_len;
1154 for (i = 0; i < iov_len; i++) {
1155 iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1156 iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1159 res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1161 hdb_handle_put (&totempg_groups_instance_database, handle);
1163 error_exit:
1164 pthread_mutex_unlock (&totempg_mutex);
1165 return (res);
1169 * Returns -1 if error, 0 if can't send, 1 if can send the message
1171 int totempg_groups_send_ok_groups (
1172 totempg_groups_handle handle,
1173 struct totempg_group *groups,
1174 int groups_cnt,
1175 struct iovec *iovec,
1176 int iov_len)
1178 struct totempg_group_instance *instance;
1179 unsigned int size = 0;
1180 unsigned int i;
1181 unsigned int res;
1183 pthread_mutex_lock (&totempg_mutex);
1184 res = hdb_handle_get (&totempg_groups_instance_database, handle,
1185 (void *)&instance);
1186 if (res != 0) {
1187 goto error_exit;
1190 for (i = 0; i < groups_cnt; i++) {
1191 size += groups[i].group_len;
1193 for (i = 0; i < iov_len; i++) {
1194 size += iovec[i].iov_len;
1197 res = send_ok (size);
1199 hdb_handle_put (&totempg_groups_instance_database, handle);
1200 error_exit:
1201 pthread_mutex_unlock (&totempg_mutex);
1202 return (res);
/*
 * Fetch the interface addresses and status strings for a node id;
 * thin pass-through to the MRP layer.
 */
int totempg_ifaces_get (
	unsigned int nodeid,
	struct totem_ip_address *interfaces,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemmrp_ifaces_get (
		nodeid,
		interfaces,
		status,
		iface_count);

	return (res);
}
/*
 * Re-enable a previously failed ring; pass-through to the MRP layer.
 */
int totempg_ring_reenable (void)
{
	int res;

	res = totemmrp_ring_reenable ();

	return (res);
}
1231 char *totempg_ifaces_print (unsigned int nodeid)
1233 static char iface_string[256 * INTERFACE_MAX];
1234 char one_iface[64];
1235 struct totem_ip_address interfaces[INTERFACE_MAX];
1236 char **status;
1237 unsigned int iface_count;
1238 unsigned int i;
1239 int res;
1241 iface_string[0] = '\0';
1243 res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count);
1244 if (res == -1) {
1245 return ("no interface found for nodeid");
1248 for (i = 0; i < iface_count; i++) {
1249 sprintf (one_iface, "r(%d) ip(%s) ",
1250 i, totemip_print (&interfaces[i]));
1251 strcat (iface_string, one_iface);
1253 return (iface_string);
/*
 * Return this processor's node id; pass-through to the MRP layer.
 */
int totempg_my_nodeid_get (void)
{
	return (totemmrp_my_nodeid_get());
}
/*
 * Return this processor's address family; pass-through to the MRP layer.
 */
int totempg_my_family_get (void)
{
	return (totemmrp_my_family_get());
}