2 * Copyright (c) 2004-2006 Mark Haverkamp
3 * Copyright (c) 2004-2006 Open Source Development Lab
4 * Copyright (c) 2006 Sun Microsystems, Inc.
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Open Source Development Lab nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
35 #define DUMP_CHAN_INFO
36 #define RECOVERY_EVENT_DEBUG LOG_LEVEL_DEBUG
37 #define RECOVERY_DEBUG LOG_LEVEL_DEBUG
38 #define CHAN_DEL_DEBUG LOG_LEVEL_DEBUG
39 #define CHAN_OPEN_DEBUG LOG_LEVEL_DEBUG
40 #define CHAN_UNLINK_DEBUG LOG_LEVEL_DEBUG
41 #define REMOTE_OP_DEBUG LOG_LEVEL_DEBUG
42 #define RETENTION_TIME_DEBUG LOG_LEVEL_DEBUG
44 #include <sys/types.h>
49 #include <sys/socket.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
52 #include "../include/hdb.h"
53 #include "../include/ipc_evt.h"
54 #include "../include/list.h"
55 #include "../include/queue.h"
56 #include "../lcr/lcr_comp.h"
72 LOGSYS_DECLARE_SUBSYS ("EVT", LOG_INFO
);
74 * event instance structure. Contains information about the
75 * active connection to the API library.
77 * esi_version: Version that the library is running.
78 * esi_open_chans: list of open channels associated with this
79 * instance. Used to clean up any data left
80 * allocated when the finalize is done.
81 * (event_svr_channel_open.eco_instance_entry)
82 * esi_events: list of pending events to be delivered on this
83 * instance (struct chan_event_list.cel_entry)
84 * esi_queue_blocked: non-zero if the delivery queue got too full
85 * and we're blocking new messages until we
86 * drain some of the queued messages.
87 * esi_nevents: Number of events in events lists to be sent.
88 * esi_hdb: Handle data base for open channels on this
89 * instance. Used for a quick lookup of
90 * open channel data from a lib api message.
93 SaVersionT esi_version
;
94 struct list_head esi_open_chans
;
95 struct list_head esi_events
[SA_EVT_LOWEST_PRIORITY
+1];
97 int esi_queue_blocked
;
98 struct hdb_handle_database esi_hdb
;
102 enum evt_message_req_types
{
103 MESSAGE_REQ_EXEC_EVT_EVENTDATA
= 0,
104 MESSAGE_REQ_EXEC_EVT_CHANCMD
= 1,
105 MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
= 2
108 static void lib_evt_open_channel(void *conn
, void *message
);
109 static void lib_evt_open_channel_async(void *conn
, void *message
);
110 static void lib_evt_close_channel(void *conn
, void *message
);
111 static void lib_evt_unlink_channel(void *conn
, void *message
);
112 static void lib_evt_event_subscribe(void *conn
, void *message
);
113 static void lib_evt_event_unsubscribe(void *conn
, void *message
);
114 static void lib_evt_event_publish(void *conn
, void *message
);
115 static void lib_evt_event_clear_retentiontime(void *conn
, void *message
);
116 static void lib_evt_event_data_get(void *conn
, void *message
);
118 static void evt_conf_change(
119 enum totem_configuration_type configuration_type
,
120 unsigned int *member_list
, int member_list_entries
,
121 unsigned int *left_list
, int left_list_entries
,
122 unsigned int *joined_list
, int joined_list_entries
,
123 struct memb_ring_id
*ring_id
);
125 static int evt_lib_init(void *conn
);
126 static int evt_lib_exit(void *conn
);
127 static int evt_exec_init(struct objdb_iface_ver0
*objdb
);
130 * Recovery sync functions
132 static void evt_sync_init(void);
133 static int evt_sync_process(void);
134 static void evt_sync_activate(void);
135 static void evt_sync_abort(void);
137 static void convert_event(void *msg
);
138 static void convert_chan_packet(void *msg
);
140 static struct openais_lib_handler evt_lib_service
[] = {
142 .lib_handler_fn
= lib_evt_open_channel
,
143 .response_size
= sizeof(struct res_evt_channel_open
),
144 .response_id
= MESSAGE_RES_EVT_OPEN_CHANNEL
,
145 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
148 .lib_handler_fn
= lib_evt_open_channel_async
,
149 .response_size
= sizeof(struct res_evt_channel_open
),
150 .response_id
= MESSAGE_RES_EVT_OPEN_CHANNEL
,
151 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
154 .lib_handler_fn
= lib_evt_close_channel
,
155 .response_size
= sizeof(struct res_evt_channel_close
),
156 .response_id
= MESSAGE_RES_EVT_CLOSE_CHANNEL
,
157 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
160 .lib_handler_fn
= lib_evt_unlink_channel
,
161 .response_size
= sizeof(struct res_evt_channel_unlink
),
162 .response_id
= MESSAGE_RES_EVT_UNLINK_CHANNEL
,
163 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
166 .lib_handler_fn
= lib_evt_event_subscribe
,
167 .response_size
= sizeof(struct res_evt_event_subscribe
),
168 .response_id
= MESSAGE_RES_EVT_SUBSCRIBE
,
169 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
172 .lib_handler_fn
= lib_evt_event_unsubscribe
,
173 .response_size
= sizeof(struct res_evt_event_unsubscribe
),
174 .response_id
= MESSAGE_RES_EVT_UNSUBSCRIBE
,
175 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
178 .lib_handler_fn
= lib_evt_event_publish
,
179 .response_size
= sizeof(struct res_evt_event_publish
),
180 .response_id
= MESSAGE_RES_EVT_PUBLISH
,
181 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
184 .lib_handler_fn
= lib_evt_event_clear_retentiontime
,
185 .response_size
= sizeof(struct res_evt_event_clear_retentiontime
),
186 .response_id
= MESSAGE_RES_EVT_CLEAR_RETENTIONTIME
,
187 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
190 .lib_handler_fn
= lib_evt_event_data_get
,
191 .response_size
= sizeof(struct lib_event_data
),
192 .response_id
= MESSAGE_RES_EVT_EVENT_DATA
,
193 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
198 static void evt_remote_evt(void *msg
, unsigned int nodeid
);
199 static void evt_remote_recovery_evt(void *msg
, unsigned int nodeid
);
200 static void evt_remote_chan_op(void *msg
, unsigned int nodeid
);
202 static struct openais_exec_handler evt_exec_service
[] = {
204 .exec_handler_fn
= evt_remote_evt
,
205 .exec_endian_convert_fn
= convert_event
208 .exec_handler_fn
= evt_remote_chan_op
,
209 .exec_endian_convert_fn
= convert_chan_packet
212 .exec_handler_fn
= evt_remote_recovery_evt
,
213 .exec_endian_convert_fn
= convert_event
217 struct openais_service_handler evt_service_handler
= {
218 .name
= "openais event service B.01.01",
220 .private_data_size
= sizeof (struct libevt_pd
),
221 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
,
222 .lib_init_fn
= evt_lib_init
,
223 .lib_exit_fn
= evt_lib_exit
,
224 .lib_service
= evt_lib_service
,
225 .lib_service_count
= sizeof(evt_lib_service
) / sizeof(struct openais_lib_handler
),
226 .exec_init_fn
= evt_exec_init
,
227 .exec_service
= evt_exec_service
,
228 .exec_service_count
= sizeof(evt_exec_service
) / sizeof(struct openais_exec_handler
),
229 .exec_dump_fn
= NULL
,
230 .confchg_fn
= evt_conf_change
,
231 .sync_init
= evt_sync_init
,
232 .sync_process
= evt_sync_process
,
233 .sync_activate
= evt_sync_activate
,
234 .sync_abort
= evt_sync_abort
237 static struct openais_service_handler
*evt_get_handler_ver0(void);
239 static struct openais_service_handler_iface_ver0 evt_service_handler_iface
= {
240 .openais_get_service_handler_ver0
= evt_get_handler_ver0
243 static struct lcr_iface openais_evt_ver0
[1] = {
245 .name
= "openais_evt",
247 .versions_replace
= 0,
248 .versions_replace_count
= 0,
250 .dependency_count
= 0,
257 static struct lcr_comp evt_comp_ver0
= {
259 .ifaces
= openais_evt_ver0
262 static struct openais_service_handler
*evt_get_handler_ver0(void)
264 return (&evt_service_handler
);
267 __attribute__ ((constructor
)) static void evt_comp_register (void) {
268 lcr_interfaces_set (&openais_evt_ver0
[0], &evt_service_handler_iface
);
270 lcr_component_register (&evt_comp_ver0
);
274 * MESSAGE_REQ_EXEC_EVT_CHANCMD
276 * Used for various event related operations.
280 EVT_OPEN_CHAN_OP
, /* chc_chan */
281 EVT_CLOSE_CHAN_OP
, /* chc_close_unlink_chan */
282 EVT_UNLINK_CHAN_OP
, /* chc_close_unlink_chan */
283 EVT_CLEAR_RET_OP
, /* chc_event_id */
284 EVT_SET_ID_OP
, /* chc_set_id */
285 EVT_CONF_DONE
, /* no data used */
286 EVT_OPEN_COUNT
, /* chc_set_opens */
287 EVT_OPEN_COUNT_DONE
/* no data used */
291 * Used during recovery to set the next issued event ID
292 * based on the highest ID seen by any of the members
295 mar_uint32_t chc_nodeid
__attribute__((aligned(8)));
296 mar_uint64_t chc_last_id
__attribute__((aligned(8)));
300 * For set open count, used during recovery to synchronize all nodes
302 * chc_chan_name: Channel name.
303 * chc_open_count: number of local opens of this channel.
305 struct evt_set_opens
{
306 mar_name_t chc_chan_name
__attribute__((aligned(8)));
307 mar_uint32_t chc_open_count
__attribute__((aligned(8)));
311 * Used to communicate channel to close or unlink.
313 #define EVT_CHAN_ACTIVE 0
314 struct evt_close_unlink_chan
{
315 mar_name_t chcu_name
__attribute__((aligned(8)));
316 mar_uint64_t chcu_unlink_id
__attribute__((aligned(8)));
319 struct open_chan_req
{
320 mar_name_t ocr_name
__attribute__((aligned(8)));
321 mar_uint64_t ocr_serial_no
__attribute__((aligned(8)));
325 * Sent via MESSAGE_REQ_EXEC_EVT_CHANCMD
327 * chc_head: Request head
328 * chc_op: Channel operation (open, close, clear retentiontime)
329 * u: union of operation options.
331 struct req_evt_chan_command
{
332 mar_req_header_t chc_head
__attribute__((aligned(8)));
333 mar_uint32_t chc_op
__attribute__((aligned(8)));
335 struct open_chan_req chc_chan
__attribute__((aligned(8)));
336 mar_evteventid_t chc_event_id
__attribute__((aligned(8)));
337 struct evt_set_id chc_set_id
__attribute__((aligned(8)));
338 struct evt_set_opens chc_set_opens
__attribute__((aligned(8)));
339 struct evt_close_unlink_chan chcu
__attribute__((aligned(8)));
344 * list of all retained events
347 static DECLARE_LIST_INIT(retained_list
);
350 * list of all event channel information
351 * struct event_svr_channel_instance
353 static DECLARE_LIST_INIT(esc_head
);
356 * list of all unlinked event channel information
357 * struct event_svr_channel_instance
359 static DECLARE_LIST_INIT(esc_unlinked_head
);
362 * Track the state of event service recovery.
364 * evt_recovery_complete: Normal operational mode
366 * evt_send_event_id: Node is sending known last
369 * evt_send_open_count: Node is sending its open
370 * Channel information.
372 * evt_wait_open_count_done: Node is done sending open channel data and
373 * is waiting for the other nodes to finish.
375 * evt_send_retained_events: Node is sending retained event data.
377 * evt_send_retained_events_done: Node is sending done message.
379 * evt_wait_send_retained_events: Node is waiting for other nodes to
380 * finish sending retained event data.
382 enum recovery_phases
{
383 evt_recovery_complete
,
386 evt_wait_open_count_done
,
387 evt_send_retained_events
,
388 evt_send_retained_events_done
,
389 evt_wait_send_retained_events
393 * Global variables used by the event service
395 * base_id_top: upper bits of next event ID to assign
396 * base_id: Lower bits of Next event ID to assign
397 * my_node_id: My cluster node id
398 * checked_in: keep track during config change.
399 * recovery_node: True if we're the recovery node. i.e. the
400 * node that sends the retained events.
401 * next_retained: pointer to next retained message to send
403 * next_chan: pointer to next channel to send during recovery.
404 * recovery_phase: Indicates what recovery is taking place.
405 * left_member_count: How many left this configuration.
406 * left_member_list: Members that left this config
407 * joined_member_count: How many joined this configuration.
408 * joined_member_list: Members that joined this config
409 * total_member_count: how many members in this cluster
410 * current_member_list: Total membership this config
411 * trans_member_count: Node count in transitional membership
412 * trans_member_list: Total membership from the transitional membership
413 * add_count: count of joined members used for sending event id
415 * add_list: pointer to joined list used for sending event id
417 * processed_open_counts: Flag used to coordinate clearing open
418 * channel counts for config change recovery.
422 #define BASE_ID_MASK 0xffffffffLL
423 static mar_evteventid_t base_id
= 1;
424 static mar_evteventid_t base_id_top
= 0;
425 static mar_uint32_t my_node_id
= 0;
426 static int checked_in
= 0;
427 static int recovery_node
= 0;
428 static struct list_head
*next_retained
= 0;
429 static struct list_head
*next_chan
= 0;
430 static enum recovery_phases recovery_phase
= evt_recovery_complete
;
431 static int left_member_count
= 0;
432 static unsigned int *left_member_list
= 0;
433 static int joined_member_count
= 0;
434 static unsigned int *joined_member_list
= 0;
435 static int total_member_count
= 0;
436 static unsigned int *current_member_list
= 0;
437 static int trans_member_count
= 0;
438 static unsigned int *trans_member_list
= 0;
439 static int add_count
= 0;
440 static unsigned int *add_list
= 0;
441 static int processed_open_counts
= 0;
444 * Structure to track pending channel open requests.
445 * ocp_async: 1 for async open
446 * ocp_invocation: invocation for async open
447 * ocp_chan_name: requested channel
448 * ocp_conn: conn for returning to the library.
449 * ocp_open_flags: channel open flags
450 * ocp_timer_handle: timer handle for sync open
451 * ocp_serial_no: Identifier for the request
452 * ocp_entry: list entry for pending open list.
454 struct open_chan_pending
{
456 mar_invocation_t ocp_invocation
;
457 mar_name_t ocp_chan_name
;
459 mar_evtchannelopenflags_t ocp_open_flag
;
460 timer_handle ocp_timer_handle
;
461 uint64_t ocp_c_handle
;
462 uint64_t ocp_serial_no
;
463 struct list_head ocp_entry
;
465 static uint64_t open_serial_no
= 0;
468 * code to indicate that the open request has timed out. The
469 * invocation data element is used for this code since it is
470 * only used by the open async call which cannot have a timeout.
472 #define OPEN_TIMED_OUT 0x5a6b5a6b5a6b5a6bLLU
475 * list of pending channel opens
477 static DECLARE_LIST_INIT(open_pending
);
478 static void chan_open_timeout(void *data
);
481 * Structure to track pending channel unlink requests.
482 * ucp_unlink_id: unlink ID of unlinked channel.
483 * ucp_conn: conn for returning to the library.
484 * ucp_entry: list entry for pending unlink list.
486 struct unlink_chan_pending
{
487 uint64_t ucp_unlink_id
;
489 struct list_head ucp_entry
;
493 * list of pending unlink requests
495 static DECLARE_LIST_INIT(unlink_pending
);
498 * Structure to track pending retention time clear requests.
499 * rtc_event_id: event ID to clear.
500 * rtc_conn: conn for returning to the library.
501 * rtc_entry: list entry for pending clear list.
503 struct retention_time_clear_pending
{
504 mar_evteventid_t rtc_event_id
;
506 struct list_head rtc_entry
;
510 * list of pending clear requests.
512 static DECLARE_LIST_INIT(clear_pending
);
517 static uint64_t base_unlink_id
= 0;
519 next_chan_unlink_id()
521 uint64_t uid
= my_node_id
;
522 uid
= (uid
<< 32ULL) | base_unlink_id
;
523 base_unlink_id
= (base_unlink_id
+ 1ULL) & BASE_ID_MASK
;
/*
 * min(a,b): smaller of a and b. NOTE: the selected argument is evaluated
 * twice (once in the comparison, once as the result), so do not pass
 * expressions with side effects such as i++ or function calls with effects.
 */
527 #define min(a,b) ((a) < (b) ? (a) : (b))
530 * Throttle event delivery to applications to keep
531 * the exec from using too much memory if the app is
532 * slow to process its events.
534 #define MAX_EVT_DELIVERY_QUEUE 1000
535 #define MIN_EVT_QUEUE_RESUME (MAX_EVT_DELIVERY_QUEUE / 2)
537 static unsigned int evt_delivery_queue_size
= MAX_EVT_DELIVERY_QUEUE
;
538 static unsigned int evt_delivery_queue_resume
= MIN_EVT_QUEUE_RESUME
;
541 #define LOST_PUB "EVENT_SERIVCE"
542 #define LOST_CHAN "LOST EVENT"
544 * Event to send when the delivery queue gets too full
546 char lost_evt
[] = SA_EVT_LOST_EVENT
;
547 static int dropped_event_size
;
548 static struct event_data
*dropped_event
;
550 mar_evt_event_pattern_t pat
;
551 char str
[sizeof(lost_evt
)];
553 static struct evt_pattern dropped_pattern
= {
557 (SaUint8T
*) &dropped_pattern
.str
[0]},
558 .str
= {SA_EVT_LOST_EVENT
}
561 mar_name_t lost_chan
= {
563 .length
= sizeof(LOST_CHAN
)
566 mar_name_t dropped_publisher
= {
568 .length
= sizeof(LOST_PUB
)
571 struct event_svr_channel_open
;
572 struct event_svr_channel_subscr
;
575 mar_uint32_t oc_node_id
;
576 int32_t oc_open_count
;
580 * Structure to contain global channel related information
582 * esc_channel_name: The name of this channel.
583 * esc_total_opens: The total number of opens on this channel including
585 * esc_local_opens: The number of opens on this channel for this node.
586 * esc_oc_size: The total number of entries in esc_node_opens.
587 * esc_node_opens: list of node IDs and how many opens are associated.
588 * esc_retained_count: How many retained events for this channel
589 * esc_open_chans: list of opens of this channel.
590 * (event_svr_channel_open.eco_entry)
591 * esc_entry: links to other channels. (used by esc_head)
592 * esc_unlink_id: If non-zero, then the channel has been marked
593 * for unlink. This unlink ID is used to
594 * mark events still associated with current openings
595 * so they get delivered to the proper recipients.
597 struct event_svr_channel_instance
{
598 mar_name_t esc_channel_name
;
599 int32_t esc_total_opens
;
600 int32_t esc_local_opens
;
601 uint32_t esc_oc_size
;
602 struct open_count
*esc_node_opens
;
603 uint32_t esc_retained_count
;
604 struct list_head esc_open_chans
;
605 struct list_head esc_entry
;
606 uint64_t esc_unlink_id
;
610 * Holds the event data in the correct format to send to the library API
611 * with additional fields for accounting.
613 * ed_ref_count: how many other structures are referencing it.
614 * ed_retained: retained event list.
615 * ed_timer_handle: Timer handle for retained event expiration.
616 * ed_delivered: arrays of open channel pointers that this event
617 * has been delivered to. (only used for events
618 * with a retention time).
619 * ed_delivered_count: Number of entries available in ed_delivered.
620 * ed_delivered_next: Next free spot in ed_delivered
621 * ed_my_chan: pointer to the global channel instance associated
623 * ed_event: The event data formatted to be ready to send.
626 uint32_t ed_ref_count
;
627 struct list_head ed_retained
;
628 timer_handle ed_timer_handle
;
629 struct event_svr_channel_open
**ed_delivered
;
630 uint32_t ed_delivered_count
;
631 uint32_t ed_delivered_next
;
632 struct event_svr_channel_instance
*ed_my_chan
;
633 struct lib_event_data ed_event
;
637 * Contains a list of pending events to be delivered to a subscribed
640 * cel_chan_handle: associated library channel handle
641 * cel_sub_id: associated library subscription ID
642 * cel_event: event structure to deliver.
643 * cel_entry: list of pending events
644 * (struct event_server_instance.esi_events)
646 struct chan_event_list
{
647 uint64_t cel_chan_handle
;
649 struct event_data
* cel_event
;
650 struct list_head cel_entry
;
654 * Contains information about each open for a given channel
656 * eco_flags: How the channel was opened.
657 * eco_lib_handle: channel handle in the app. Used for event delivery.
658 * eco_my_handle: the handle used to access this data structure.
659 * eco_channel: Pointer to global channel info.
660 * eco_entry: links to other openings of this channel.
661 * eco_instance_entry: links to other channel openings for the
662 * associated server instance.
663 * eco_subscr: head of list of subscriptions for this channel open.
664 * (event_svr_channel_subscr.ecs_entry)
665 * eco_conn: reference to EvtInitialize who owns this open.
667 struct event_svr_channel_open
{
669 uint64_t eco_lib_handle
;
670 uint32_t eco_my_handle
;
671 struct event_svr_channel_instance
*eco_channel
;
672 struct list_head eco_entry
;
673 struct list_head eco_instance_entry
;
674 struct list_head eco_subscr
;
679 * Contains information about each channel subscription
681 * ecs_open_chan: Link to our open channel.
682 * ecs_sub_id: Subscription ID.
683 * ecs_filter_count: number of filters in ecs_filters
684 * ecs_filters: filters for determining event delivery.
685 * ecs_entry: Links to other subscriptions to this channel opening.
687 struct event_svr_channel_subscr
{
688 struct event_svr_channel_open
*ecs_open_chan
;
690 mar_evt_event_filter_array_t
*ecs_filters
;
691 struct list_head ecs_entry
;
697 * mn_node_info: cluster node info from membership
698 * mn_last_msg_id: last seen message ID for this node
699 * mn_started: Indicates that event service has started
701 * mn_next: pointer to the next node in the hash chain.
702 * mn_entry: List of all nodes.
704 struct member_node_data
{
705 unsigned int mn_nodeid
;
706 SaClmClusterNodeT mn_node_info
;
707 mar_evteventid_t mn_last_msg_id
;
708 mar_uint32_t mn_started
;
709 struct member_node_data
*mn_next
;
710 struct list_head mn_entry
;
712 DECLARE_LIST_INIT(mnd
);
714 * Take the filters we received from the application via the library and
715 * make them into a real mar_evt_event_filter_array_t
717 static SaAisErrorT
evtfilt_to_aisfilt(struct req_evt_event_subscribe
*req
,
718 mar_evt_event_filter_array_t
**evtfilters
)
721 mar_evt_event_filter_array_t
*filta
=
722 (mar_evt_event_filter_array_t
*)req
->ics_filter_data
;
723 mar_evt_event_filter_array_t
*filters
;
724 mar_evt_event_filter_t
*filt
= (void *)filta
+ sizeof(mar_evt_event_filter_array_t
);
725 SaUint8T
*str
= (void *)filta
+ sizeof(mar_evt_event_filter_array_t
) +
726 (sizeof(mar_evt_event_filter_t
) * filta
->filters_number
);
730 filters
= malloc(sizeof(mar_evt_event_filter_array_t
));
732 return SA_AIS_ERR_NO_MEMORY
;
735 filters
->filters_number
= filta
->filters_number
;
736 filters
->filters
= malloc(sizeof(mar_evt_event_filter_t
) *
737 filta
->filters_number
);
738 if (!filters
->filters
) {
740 return SA_AIS_ERR_NO_MEMORY
;
743 for (i
= 0; i
< filters
->filters_number
; i
++) {
744 filters
->filters
[i
].filter
.pattern
=
745 malloc(filt
[i
].filter
.pattern_size
);
747 if (!filters
->filters
[i
].filter
.pattern
) {
748 for (j
= 0; j
< i
; j
++) {
749 free(filters
->filters
[j
].filter
.pattern
);
751 free(filters
->filters
);
753 return SA_AIS_ERR_NO_MEMORY
;
755 filters
->filters
[i
].filter
.pattern_size
=
756 filt
[i
].filter
.pattern_size
;
757 filters
->filters
[i
].filter
.allocated_size
=
758 filt
[i
].filter
.pattern_size
;
759 memcpy(filters
->filters
[i
].filter
.pattern
,
760 str
, filters
->filters
[i
].filter
.pattern_size
);
761 filters
->filters
[i
].filter_type
= filt
[i
].filter_type
;
762 str
+= filters
->filters
[i
].filter
.pattern_size
;
765 *evtfilters
= filters
;
771 * Free up filter data
773 static void free_filters(mar_evt_event_filter_array_t
*fp
)
777 for (i
= 0; i
< fp
->filters_number
; i
++) {
778 free(fp
->filters
[i
].filter
.pattern
);
786 * Look up a channel in the global channel list
788 static struct event_svr_channel_instance
*
789 find_channel(mar_name_t
*chan_name
, uint64_t unlink_id
)
791 struct list_head
*l
, *head
;
792 struct event_svr_channel_instance
*eci
;
795 * choose which list to look through
797 if (unlink_id
== EVT_CHAN_ACTIVE
) {
800 head
= &esc_unlinked_head
;
803 for (l
= head
->next
; l
!= head
; l
= l
->next
) {
805 eci
= list_entry(l
, struct event_svr_channel_instance
, esc_entry
);
806 if (!mar_name_match(chan_name
, &eci
->esc_channel_name
)) {
808 } else if (unlink_id
!= eci
->esc_unlink_id
) {
817 * Find the last unlinked version of a channel.
819 static struct event_svr_channel_instance
*
820 find_last_unlinked_channel(mar_name_t
*chan_name
)
823 struct event_svr_channel_instance
*eci
;
826 * unlinked channels are added to the head of the list
827 * so the first one we see is the last one added.
829 for (l
= esc_unlinked_head
.next
; l
!= &esc_unlinked_head
; l
= l
->next
) {
831 eci
= list_entry(l
, struct event_svr_channel_instance
, esc_entry
);
832 if (!mar_name_match(chan_name
, &eci
->esc_channel_name
)) {
840 * Create and initialize a channel instance structure
842 static struct event_svr_channel_instance
*create_channel(mar_name_t
*cn
)
844 struct event_svr_channel_instance
*eci
;
845 eci
= (struct event_svr_channel_instance
*) malloc(sizeof(*eci
));
850 memset(eci
, 0, sizeof(*eci
));
851 list_init(&eci
->esc_entry
);
852 list_init(&eci
->esc_open_chans
);
853 eci
->esc_oc_size
= total_member_count
;
854 eci
->esc_node_opens
=
855 malloc(sizeof(struct open_count
) * total_member_count
);
856 if (!eci
->esc_node_opens
) {
860 memset(eci
->esc_node_opens
, 0,
861 sizeof(struct open_count
) * total_member_count
);
862 eci
->esc_channel_name
= *cn
;
863 eci
->esc_channel_name
.value
[eci
->esc_channel_name
.length
] = '\0';
864 list_add(&eci
->esc_entry
, &esc_head
);
871 * Make sure that the list of nodes is large enough to hold the whole
874 static int check_open_size(struct event_svr_channel_instance
*eci
)
876 struct open_count
*esc_node_opens_tmp
;
878 if (total_member_count
> eci
->esc_oc_size
) {
879 esc_node_opens_tmp
= realloc (eci
->esc_node_opens
,
880 sizeof(struct open_count
) * total_member_count
);
881 if (esc_node_opens_tmp
== NULL
) {
882 log_printf(LOG_LEVEL_WARNING
,
883 "Memory error realloc of node list\n");
886 eci
->esc_node_opens
= esc_node_opens_tmp
;
887 memset(&eci
->esc_node_opens
[eci
->esc_oc_size
], 0,
888 sizeof(struct open_count
) *
889 (total_member_count
- eci
->esc_oc_size
));
890 eci
->esc_oc_size
= total_member_count
;
896 * Find the specified node ID in the node list of the channel.
897 * If it's not in the list, add it.
899 static struct open_count
* find_open_count(
900 struct event_svr_channel_instance
*eci
,
901 mar_uint32_t node_id
)
905 for (i
= 0; i
< eci
->esc_oc_size
; i
++) {
906 if (eci
->esc_node_opens
[i
].oc_node_id
== 0) {
907 eci
->esc_node_opens
[i
].oc_node_id
= node_id
;
908 eci
->esc_node_opens
[i
].oc_open_count
= 0;
910 if (eci
->esc_node_opens
[i
].oc_node_id
== node_id
) {
911 return &eci
->esc_node_opens
[i
];
914 log_printf(LOG_LEVEL_DEBUG
,
915 "find_open_count: node id %s not found\n",
916 totempg_ifaces_print (node_id
));
920 static void dump_chan_opens(struct event_svr_channel_instance
*eci
)
923 log_printf(LOG_LEVEL_NOTICE
,
924 "Channel %s, total %d, local %d\n",
925 eci
->esc_channel_name
.value
,
926 eci
->esc_total_opens
,
927 eci
->esc_local_opens
);
928 for (i
= 0; i
< eci
->esc_oc_size
; i
++) {
929 if (eci
->esc_node_opens
[i
].oc_node_id
== 0) {
932 log_printf(LOG_LEVEL_NOTICE
, "Node %s, count %d\n",
933 totempg_ifaces_print (eci
->esc_node_opens
[i
].oc_node_id
),
934 eci
->esc_node_opens
[i
].oc_open_count
);
938 #ifdef DUMP_CHAN_INFO
940 * Scan the list of channels and dump the open count info.
942 static void dump_all_chans()
945 struct event_svr_channel_instance
*eci
;
947 for (l
= esc_head
.next
; l
!= &esc_head
; l
= l
->next
) {
948 eci
= list_entry(l
, struct event_svr_channel_instance
, esc_entry
);
949 dump_chan_opens(eci
);
956 * Scan the list of channels and zero out the open counts
958 static void zero_chan_open_counts()
961 struct event_svr_channel_instance
*eci
;
964 for (l
= esc_head
.next
; l
!= &esc_head
; l
= l
->next
) {
965 eci
= list_entry(l
, struct event_svr_channel_instance
, esc_entry
);
966 for (i
= 0; i
< eci
->esc_oc_size
; i
++) {
967 if (eci
->esc_node_opens
[i
].oc_node_id
== 0) {
970 eci
->esc_node_opens
[i
].oc_open_count
= 0;
972 eci
->esc_total_opens
= 0;
976 * Replace the current open count for a node with the specified value.
978 static int set_open_count(struct event_svr_channel_instance
*eci
,
979 mar_uint32_t node_id
, uint32_t open_count
)
981 struct open_count
*oc
;
984 if ((i
= check_open_size(eci
)) != 0) {
988 oc
= find_open_count(eci
, node_id
);
990 log_printf(RECOVERY_DEBUG
,
991 "Set count: Chan %s for node %s, was %d, now %d\n",
992 eci
->esc_channel_name
.value
, totempg_ifaces_print (node_id
),
993 oc
->oc_open_count
, open_count
);
995 eci
->esc_total_opens
-= oc
->oc_open_count
;
996 eci
->esc_total_opens
+= open_count
;
997 oc
->oc_open_count
= open_count
;
1004 * Increment the open count for the specified node.
1006 static int inc_open_count(struct event_svr_channel_instance
*eci
,
1007 mar_uint32_t node_id
)
1010 struct open_count
*oc
;
1013 if ((i
= check_open_size(eci
)) != 0) {
1017 if (node_id
== my_node_id
) {
1018 eci
->esc_local_opens
++;
1020 oc
= find_open_count(eci
, node_id
);
1022 eci
->esc_total_opens
++;
1023 oc
->oc_open_count
++;
1030 * Decrement the open count for the specified node in the
1031 * specified channel.
1033 static int dec_open_count(struct event_svr_channel_instance
*eci
,
1034 mar_uint32_t node_id
)
1037 struct open_count
*oc
;
1040 if ((i
= check_open_size(eci
)) != 0) {
1044 if (node_id
== my_node_id
) {
1045 eci
->esc_local_opens
--;
1047 oc
= find_open_count(eci
, node_id
);
1049 eci
->esc_total_opens
--;
1050 oc
->oc_open_count
--;
1051 if ((eci
->esc_total_opens
< 0) || (oc
->oc_open_count
< 0)) {
1052 log_printf(LOG_LEVEL_ERROR
, "Channel open decrement error\n");
1053 dump_chan_opens(eci
);
1062 * Remove a channel and free its memory if it's not in use anymore.
1064 static void delete_channel(struct event_svr_channel_instance
*eci
)
1067 log_printf(CHAN_DEL_DEBUG
,
1068 "Called Delete channel %s t %d, l %d, r %d\n",
1069 eci
->esc_channel_name
.value
,
1070 eci
->esc_total_opens
, eci
->esc_local_opens
,
1071 eci
->esc_retained_count
);
1073 * If no one has the channel open anywhere and there are no unexpired
1074 * retained events for this channel, and it has been marked for deletion
1075 * by an unlink, then it is OK to delete the data structure.
1077 if ((eci
->esc_retained_count
== 0) && (eci
->esc_total_opens
== 0) &&
1078 (eci
->esc_unlink_id
!= EVT_CHAN_ACTIVE
)) {
1079 log_printf(CHAN_DEL_DEBUG
, "Delete channel %s\n",
1080 eci
->esc_channel_name
.value
);
1081 log_printf(CHAN_UNLINK_DEBUG
, "Delete channel %s, unlink_id %0llx\n",
1082 eci
->esc_channel_name
.value
, (unsigned long long)eci
->esc_unlink_id
);
1084 if (!list_empty(&eci
->esc_open_chans
)) {
1085 log_printf(LOG_LEVEL_NOTICE
,
1086 "Last channel close request for %s (still open)\n",
1087 eci
->esc_channel_name
.value
);
1088 dump_chan_opens(eci
);
1093 * adjust if we're sending open counts on a config change.
1095 if ((recovery_phase
!= evt_recovery_complete
) &&
1096 (&eci
->esc_entry
== next_chan
)) {
1097 next_chan
= eci
->esc_entry
.next
;
1100 list_del(&eci
->esc_entry
);
1101 if (eci
->esc_node_opens
) {
1102 free(eci
->esc_node_opens
);
1109 * Free up an event structure if it isn't being used anymore.
1112 free_event_data(struct event_data
*edp
)
1114 if (--edp
->ed_ref_count
) {
1117 log_printf(LOG_LEVEL_DEBUG
, "Freeing event ID: 0x%llx\n",
1118 (unsigned long long)edp
->ed_event
.led_event_id
);
1119 if (edp
->ed_delivered
) {
1120 free(edp
->ed_delivered
);
1127 * Mark a channel for deletion.
1129 static void unlink_channel(struct event_svr_channel_instance
*eci
,
1132 struct event_data
*edp
;
1133 struct list_head
*l
, *nxt
;
1135 log_printf(CHAN_UNLINK_DEBUG
, "Unlink request: %s, id 0x%llx\n",
1136 eci
->esc_channel_name
.value
, (unsigned long long)unlink_id
);
1138 * The unlink ID is used to note that the channel has been marked
1139 * for deletion and is a way to distinguish between multiple
1140 * channels of the same name each marked for deletion.
1142 eci
->esc_unlink_id
= unlink_id
;
1145 * Move the unlinked channel to the unlinked list. This way
1146 * we don't have to worry about filtering it out when we need to
1147 * distribute retained events at recovery time.
1149 list_del(&eci
->esc_entry
);
1150 list_add(&eci
->esc_entry
, &esc_unlinked_head
);
1153 * Scan the retained event list and remove any retained events.
1154 * Since no new opens can occur there won't be any need of sending
1155 * retained events on the channel.
1157 for (l
= retained_list
.next
; l
!= &retained_list
; l
= nxt
) {
1159 edp
= list_entry(l
, struct event_data
, ed_retained
);
1160 if ((edp
->ed_my_chan
== eci
) &&
1161 (edp
->ed_event
.led_chan_unlink_id
== EVT_CHAN_ACTIVE
)) {
1162 openais_timer_delete(edp
->ed_timer_handle
);
1163 edp
->ed_event
.led_retention_time
= 0;
1164 list_del(&edp
->ed_retained
);
1165 list_init(&edp
->ed_retained
);
1166 edp
->ed_my_chan
->esc_retained_count
--;
1168 log_printf(CHAN_UNLINK_DEBUG
,
1169 "Unlink: Delete retained event id 0x%llx\n",
1170 (unsigned long long)edp
->ed_event
.led_event_id
);
1171 free_event_data(edp
);
1175 delete_channel(eci
);
1179 * Remove the specified node from the node list in this channel.
1181 static int remove_open_count(
1182 struct event_svr_channel_instance
*eci
,
1183 mar_uint32_t node_id
)
1189 * Find the node, remove it and re-pack the array.
1191 for (i
= 0; i
< eci
->esc_oc_size
; i
++) {
1192 if (eci
->esc_node_opens
[i
].oc_node_id
== 0) {
1196 log_printf(RECOVERY_DEBUG
, "roc: %s/%s, t %d, oc %d\n",
1197 totempg_ifaces_print (node_id
),
1198 totempg_ifaces_print (eci
->esc_node_opens
[i
].oc_node_id
),
1199 eci
->esc_total_opens
, eci
->esc_node_opens
[i
].oc_open_count
);
1201 if (eci
->esc_node_opens
[i
].oc_node_id
== node_id
) {
1203 eci
->esc_total_opens
-= eci
->esc_node_opens
[i
].oc_open_count
;
1205 for (j
= i
+1; j
< eci
->esc_oc_size
; j
++, i
++) {
1206 eci
->esc_node_opens
[i
].oc_node_id
=
1207 eci
->esc_node_opens
[j
].oc_node_id
;
1208 eci
->esc_node_opens
[i
].oc_open_count
=
1209 eci
->esc_node_opens
[j
].oc_open_count
;
1212 eci
->esc_node_opens
[eci
->esc_oc_size
-1].oc_node_id
= 0;
1213 eci
->esc_node_opens
[eci
->esc_oc_size
-1].oc_open_count
= 0;
1216 * Remove the channel if it's not being used anymore
1218 delete_channel(eci
);
1227 * Send a request to open a channel to the rest of the cluster.
1229 static SaAisErrorT
evt_open_channel(mar_name_t
*cn
, SaUint8T flgs
)
1231 struct req_evt_chan_command cpkt
;
1232 struct event_svr_channel_instance
*eci
;
1233 struct iovec chn_iovec
;
1239 eci
= find_channel(cn
, EVT_CHAN_ACTIVE
);
1242 * If the create flag set, and it doesn't exist, we can make the channel.
1243 * Otherwise, it's an error since we're notified of channels being created
1246 if (!eci
&& !(flgs
& SA_EVT_CHANNEL_CREATE
)) {
1247 ret
= SA_AIS_ERR_NOT_EXIST
;
1252 * create the channel packet to send. Tell the the cluster
1253 * to create the channel.
1255 memset(&cpkt
, 0, sizeof(cpkt
));
1257 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
1258 cpkt
.chc_head
.size
= sizeof(cpkt
);
1259 cpkt
.chc_op
= EVT_OPEN_CHAN_OP
;
1260 cpkt
.u
.chc_chan
.ocr_name
= *cn
;
1261 cpkt
.u
.chc_chan
.ocr_serial_no
= ++open_serial_no
;
1262 chn_iovec
.iov_base
= (char *)&cpkt
;
1263 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
1264 log_printf(CHAN_OPEN_DEBUG
, "evt_open_channel: Send open mcast\n");
1265 res
= totempg_groups_mcast_joined(openais_group_handle
,
1266 &chn_iovec
, 1, TOTEMPG_AGREED
);
1267 log_printf(CHAN_OPEN_DEBUG
, "evt_open_channel: Open mcast result: %d\n",
1270 ret
= SA_AIS_ERR_LIBRARY
;
1279 * Send a request to close a channel with the rest of the cluster.
1281 static SaAisErrorT
evt_close_channel(mar_name_t
*cn
, uint64_t unlink_id
)
1283 struct req_evt_chan_command cpkt
;
1284 struct iovec chn_iovec
;
1291 * create the channel packet to send. Tell the the cluster
1292 * to close the channel.
1294 memset(&cpkt
, 0, sizeof(cpkt
));
1296 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
1297 cpkt
.chc_head
.size
= sizeof(cpkt
);
1298 cpkt
.chc_op
= EVT_CLOSE_CHAN_OP
;
1299 cpkt
.u
.chcu
.chcu_name
= *cn
;
1300 cpkt
.u
.chcu
.chcu_unlink_id
= unlink_id
;
1301 chn_iovec
.iov_base
= (char *)&cpkt
;
1302 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
1303 res
= totempg_groups_mcast_joined(openais_group_handle
,
1304 &chn_iovec
, 1, TOTEMPG_AGREED
);
1306 ret
= SA_AIS_ERR_LIBRARY
;
1313 * Node data access functions. Used to keep track of event IDs
1314 * delivery of messages.
1316 * add_node: Add a new member node to our list.
1317 * remove_node: Remove a node that left membership.
1318 * find_node: Given the node ID return a pointer to node information.
1321 #define NODE_HASH_SIZE 256
1322 static struct member_node_data
*nl
[NODE_HASH_SIZE
] = {0};
1324 hash_sock_addr(unsigned int nodeid
)
1326 return nodeid
& (NODE_HASH_SIZE
- 1);
1329 static struct member_node_data
**lookup_node(unsigned int nodeid
)
1331 int index
= hash_sock_addr(nodeid
);
1332 struct member_node_data
**nlp
;
1335 for (nlp
= &nl
[index
]; *nlp
; nlp
= &((*nlp
)->mn_next
)) {
1336 if ((*(nlp
))->mn_nodeid
== nodeid
) {
1344 static struct member_node_data
*
1345 evt_find_node(unsigned int nodeid
)
1347 struct member_node_data
**nlp
;
1349 nlp
= lookup_node(nodeid
);
1352 log_printf(LOG_LEVEL_DEBUG
, "find_node: Got NULL nlp?\n");
1361 unsigned int nodeid
,
1362 SaClmClusterNodeT
*cn
)
1364 struct member_node_data
**nlp
;
1365 struct member_node_data
*nl
;
1366 SaAisErrorT err
= SA_AIS_ERR_EXIST
;
1368 nlp
= lookup_node(nodeid
);
1371 log_printf(LOG_LEVEL_DEBUG
, "add_node: Got NULL nlp?\n");
1379 *nlp
= malloc(sizeof(struct member_node_data
));
1381 return SA_AIS_ERR_NO_MEMORY
;
1385 memset(nl
, 0, sizeof(*nl
));
1387 nl
->mn_nodeid
= nodeid
;
1390 list_init(&nl
->mn_entry
);
1391 list_add(&nl
->mn_entry
, &mnd
);
1392 nl
->mn_node_info
= *cn
;
1399 * Find the oldest node in the membership. This is the one we choose to
1400 * perform some cluster-wide functions like distributing retained events.
1401 * We only check nodes that were in our transitional configuration. In this
1402 * way there is a recovery node chosen for each original partition in case
1405 static struct member_node_data
* oldest_node()
1407 struct member_node_data
*mn
= 0;
1408 struct member_node_data
*oldest
= 0;
1411 for (i
= 0; i
< trans_member_count
; i
++) {
1412 mn
= evt_find_node(trans_member_list
[i
]);
1413 if (!mn
|| (mn
->mn_started
== 0)) {
1414 log_printf(LOG_LEVEL_ERROR
,
1415 "Transitional config Node %s not active\n",
1416 totempg_ifaces_print (trans_member_list
[i
]));
1419 if ((oldest
== NULL
) ||
1420 (mn
->mn_node_info
.bootTimestamp
<
1421 oldest
->mn_node_info
.bootTimestamp
)) {
1423 } else if (mn
->mn_node_info
.bootTimestamp
==
1424 oldest
->mn_node_info
.bootTimestamp
) {
1425 if (mn
->mn_node_info
.nodeId
< oldest
->mn_node_info
.nodeId
) {
1435 * keep track of the last event ID from a node.
1436 * If we get an event ID less than our last, we've already
1437 * seen it. It's probably a retained event being sent to
1440 static int check_last_event(
1441 struct lib_event_data
*evtpkt
,
1442 unsigned int nodeid
)
1444 struct member_node_data
*nd
;
1445 SaClmClusterNodeT
*cn
;
1448 nd
= evt_find_node(nodeid
);
1450 log_printf(LOG_LEVEL_DEBUG
,
1451 "Node ID %s not found for event %llx\n",
1452 totempg_ifaces_print (evtpkt
->led_publisher_node_id
),
1453 (unsigned long long)evtpkt
->led_event_id
);
1454 cn
= main_clm_get_by_nodeid(nodeid
);
1456 log_printf(LOG_LEVEL_DEBUG
,
1457 "Cluster Node 0x%s not found for event %llx\n",
1458 totempg_ifaces_print (evtpkt
->led_publisher_node_id
),
1459 (unsigned long long)evtpkt
->led_event_id
);
1461 evt_add_node(nodeid
, cn
);
1462 nd
= evt_find_node(nodeid
);
1470 if ((nd
->mn_last_msg_id
< evtpkt
->led_msg_id
)) {
1471 nd
->mn_last_msg_id
= evtpkt
->led_msg_id
;
1478 * event id generating code. We use the node ID for this node for the
1479 * upper 32 bits of the event ID to make sure that we can generate a cluster
1480 * wide unique event ID for a given event.
1482 SaAisErrorT
set_event_id(mar_uint32_t node_id
)
1484 SaAisErrorT err
= SA_AIS_OK
;
1486 err
= SA_AIS_ERR_EXIST
;
1488 base_id_top
= (mar_evteventid_t
)node_id
<< 32;
1493 * See if an event Id is still in use in the retained event
1496 static int id_in_use(uint64_t id
, uint64_t base
)
1498 struct list_head
*l
;
1499 struct event_data
*edp
;
1500 mar_evteventid_t evtid
= (id
<< 32) | (base
& BASE_ID_MASK
);
1502 for (l
= retained_list
.next
; l
!= &retained_list
; l
= l
->next
) {
1503 edp
= list_entry(l
, struct event_data
, ed_retained
);
1504 if (edp
->ed_event
.led_event_id
== evtid
) {
1511 static SaAisErrorT
get_event_id(uint64_t *event_id
, uint64_t *msg_id
)
1514 * Don't reuse an event ID if it is still valid because of
1517 while (id_in_use(base_id_top
, base_id
)) {
1521 *event_id
= base_id_top
| (base_id
& BASE_ID_MASK
) ;
1522 *msg_id
= base_id
++;
1529 * Timer handler to delete expired events.
1533 event_retention_timeout(void *data
)
1535 struct event_data
*edp
= data
;
1536 log_printf(RETENTION_TIME_DEBUG
, "Event ID %llx expired\n",
1537 (unsigned long long)edp
->ed_event
.led_event_id
);
1539 * adjust next_retained if we're in recovery and
1540 * were in charge of sending retained events.
1542 if (recovery_phase
!= evt_recovery_complete
&& recovery_node
) {
1543 if (next_retained
== &edp
->ed_retained
) {
1544 next_retained
= edp
->ed_retained
.next
;
1547 list_del(&edp
->ed_retained
);
1548 list_init(&edp
->ed_retained
);
1550 * Check to see if the channel isn't in use anymore.
1552 edp
->ed_my_chan
->esc_retained_count
--;
1553 if (edp
->ed_my_chan
->esc_retained_count
== 0) {
1554 delete_channel(edp
->ed_my_chan
);
1556 free_event_data(edp
);
1560 * clear a particular event's retention time.
1561 * This will free the event as long as it isn't being
1566 clear_retention_time(mar_evteventid_t event_id
)
1568 struct event_data
*edp
;
1569 struct list_head
*l
, *nxt
;
1571 log_printf(RETENTION_TIME_DEBUG
, "Search for Event ID %llx\n",
1572 (unsigned long long)event_id
);
1573 for (l
= retained_list
.next
; l
!= &retained_list
; l
= nxt
) {
1575 edp
= list_entry(l
, struct event_data
, ed_retained
);
1576 if (edp
->ed_event
.led_event_id
!= event_id
) {
1580 log_printf(RETENTION_TIME_DEBUG
,
1581 "Clear retention time for Event ID %llx\n",
1582 (unsigned long long)edp
->ed_event
.led_event_id
);
1583 openais_timer_delete(edp
->ed_timer_handle
);
1584 edp
->ed_event
.led_retention_time
= 0;
1585 list_del(&edp
->ed_retained
);
1586 list_init(&edp
->ed_retained
);
1589 * Check to see if the channel isn't in use anymore.
1591 edp
->ed_my_chan
->esc_retained_count
--;
1592 if (edp
->ed_my_chan
->esc_retained_count
== 0) {
1593 delete_channel(edp
->ed_my_chan
);
1595 free_event_data(edp
);
1598 return SA_AIS_ERR_NOT_EXIST
;
1602 * Remove specified channel from event delivery list
1605 remove_delivered_channel(struct event_svr_channel_open
*eco
)
1608 struct list_head
*l
;
1609 struct event_data
*edp
;
1611 for (l
= retained_list
.next
; l
!= &retained_list
; l
= l
->next
) {
1612 edp
= list_entry(l
, struct event_data
, ed_retained
);
1614 for (i
= 0; i
< edp
->ed_delivered_next
; i
++) {
1615 if (edp
->ed_delivered
[i
] == eco
) {
1616 edp
->ed_delivered_next
--;
1617 if (edp
->ed_delivered_next
== i
) {
1620 memmove(&edp
->ed_delivered
[i
],
1621 &edp
->ed_delivered
[i
+1],
1622 &edp
->ed_delivered
[edp
->ed_delivered_next
] -
1623 &edp
->ed_delivered
[i
]);
1631 * If there is a retention time, add this open channel to the event so
1632 * we can check if we've already delivered this message later if a new
1633 * subscription matches.
1635 #define DELIVER_SIZE 8
1637 evt_delivered(struct event_data
*evt
, struct event_svr_channel_open
*eco
)
1639 if (!evt
->ed_event
.led_retention_time
) {
1643 log_printf(LOG_LEVEL_DEBUG
, "delivered ID %llx to eco %p\n",
1644 (unsigned long long)evt
->ed_event
.led_event_id
, eco
);
1645 if (evt
->ed_delivered_count
== evt
->ed_delivered_next
) {
1646 struct event_svr_channel_open
**ed_delivered_tmp
;
1648 ed_delivered_tmp
= realloc (evt
->ed_delivered
,
1649 DELIVER_SIZE
* sizeof(struct event_svr_channel_open
*));
1650 if (ed_delivered_tmp
== NULL
) {
1651 log_printf(LOG_LEVEL_WARNING
, "Memory error realloc\n");
1654 evt
->ed_delivered
= ed_delivered_tmp
;
1655 memset(evt
->ed_delivered
+ evt
->ed_delivered_next
, 0,
1656 DELIVER_SIZE
* sizeof(struct event_svr_channel_open
*));
1657 evt
->ed_delivered_next
= evt
->ed_delivered_count
;
1658 evt
->ed_delivered_count
+= DELIVER_SIZE
;
1661 evt
->ed_delivered
[evt
->ed_delivered_next
++] = eco
;
1665 * Check to see if an event has already been delivered to this open channel
1668 evt_already_delivered(struct event_data
*evt
,
1669 struct event_svr_channel_open
*eco
)
1673 if (!evt
->ed_event
.led_retention_time
) {
1677 log_printf(LOG_LEVEL_DEBUG
, "Deliver count: %d deliver_next %d\n",
1678 evt
->ed_delivered_count
, evt
->ed_delivered_next
);
1679 for (i
= 0; i
< evt
->ed_delivered_next
; i
++) {
1680 log_printf(LOG_LEVEL_DEBUG
, "Checking ID %llx delivered %p eco %p\n",
1681 (unsigned long long)evt
->ed_event
.led_event_id
,
1682 evt
->ed_delivered
[i
], eco
);
1683 if (evt
->ed_delivered
[i
] == eco
) {
1691 * Compare a filter to a given pattern.
1692 * return SA_AIS_OK if the pattern matches a filter
1695 filter_match(mar_evt_event_pattern_t
*ep
, mar_evt_event_filter_t
*ef
)
1698 ret
= SA_AIS_ERR_FAILED_OPERATION
;
1700 switch (ef
->filter_type
) {
1701 case SA_EVT_PREFIX_FILTER
:
1702 if (ef
->filter
.pattern_size
> ep
->pattern_size
) {
1705 if (strncmp((char *)ef
->filter
.pattern
, (char *)ep
->pattern
,
1706 ef
->filter
.pattern_size
) == 0) {
1710 case SA_EVT_SUFFIX_FILTER
:
1711 if (ef
->filter
.pattern_size
> ep
->pattern_size
) {
1714 if (strncmp((char *)ef
->filter
.pattern
,
1715 (char *)&ep
->pattern
[ep
->pattern_size
- ef
->filter
.pattern_size
],
1716 ef
->filter
.pattern_size
) == 0) {
1721 case SA_EVT_EXACT_FILTER
:
1722 if (ef
->filter
.pattern_size
!= ep
->pattern_size
) {
1725 if (strncmp((char *)ef
->filter
.pattern
, (char *)ep
->pattern
,
1726 ef
->filter
.pattern_size
) == 0) {
1730 case SA_EVT_PASS_ALL_FILTER
:
1740 * compare the event's patterns with the subscription's filter rules.
1741 * SA_AIS_OK is returned if the event matches the filter rules.
1744 event_match(struct event_data
*evt
,
1745 struct event_svr_channel_subscr
*ecs
)
1747 mar_evt_event_filter_t
*ef
;
1748 mar_evt_event_pattern_t
*ep
;
1749 uint32_t filt_count
;
1750 SaAisErrorT ret
= SA_AIS_OK
;
1753 ep
= (mar_evt_event_pattern_t
*)(&evt
->ed_event
.led_body
[0]);
1754 ef
= ecs
->ecs_filters
->filters
;
1755 filt_count
= min(ecs
->ecs_filters
->filters_number
,
1756 evt
->ed_event
.led_patterns_number
);
1758 for (i
= 0; i
< filt_count
; i
++) {
1759 ret
= filter_match(ep
, ef
);
1760 if (ret
!= SA_AIS_OK
) {
1770 * Scan undelivered pending events and either remove them if no subscription
1771 * filters match anymore or re-assign them to another matching subscription
1774 filter_undelivered_events(struct event_svr_channel_open
*op_chan
)
1776 struct event_svr_channel_open
*eco
;
1777 struct event_svr_channel_instance
*eci
;
1778 struct event_svr_channel_subscr
*ecs
;
1779 struct chan_event_list
*cel
;
1780 struct libevt_pd
*esip
;
1781 struct list_head
*l
, *nxt
;
1782 struct list_head
*l1
, *l2
;
1785 esip
= (struct libevt_pd
*)openais_conn_private_data_get(op_chan
->eco_conn
);
1786 eci
= op_chan
->eco_channel
;
1789 * Scan each of the priority queues for messages
1791 for (i
= SA_EVT_HIGHEST_PRIORITY
; i
<= SA_EVT_LOWEST_PRIORITY
; i
++) {
1793 * examine each message queued for delivery
1795 for (l
= esip
->esi_events
[i
].next
; l
!= &esip
->esi_events
[i
]; l
= nxt
) {
1797 cel
= list_entry(l
, struct chan_event_list
, cel_entry
);
1799 * Check open channels
1801 for (l1
= eci
->esc_open_chans
.next
;
1802 l1
!= &eci
->esc_open_chans
; l1
= l1
->next
) {
1803 eco
= list_entry(l1
, struct event_svr_channel_open
, eco_entry
);
1806 * See if this channel open instance belongs
1807 * to this evtinitialize instance
1809 if (eco
->eco_conn
!= op_chan
->eco_conn
) {
1814 * See if enabled to receive
1816 if (!(eco
->eco_flags
& SA_EVT_CHANNEL_SUBSCRIBER
)) {
1821 * Check subscriptions
1823 for (l2
= eco
->eco_subscr
.next
;
1824 l2
!= &eco
->eco_subscr
; l2
= l2
->next
) {
1825 ecs
= list_entry(l2
,
1826 struct event_svr_channel_subscr
, ecs_entry
);
1827 if (event_match(cel
->cel_event
, ecs
) == SA_AIS_OK
) {
1829 * Something still matches.
1830 * We'll assign it to
1831 * the new subscription.
1833 cel
->cel_sub_id
= ecs
->ecs_sub_id
;
1834 cel
->cel_chan_handle
= eco
->eco_lib_handle
;
1840 * No subscription filter matches anymore. We
1841 * can delete this event.
1843 list_del(&cel
->cel_entry
);
1844 list_init(&cel
->cel_entry
);
1845 esip
->esi_nevents
--;
1847 free_event_data(cel
->cel_event
);
1856 * Notify the library of a pending event
1858 static void __notify_event(void *conn
)
1860 struct res_evt_event_data res
;
1861 struct libevt_pd
*esip
;
1863 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
1864 log_printf(LOG_LEVEL_DEBUG
, "DELIVER: notify\n");
1865 if (esip
->esi_nevents
!= 0) {
1866 res
.evd_head
.size
= sizeof(res
);
1867 res
.evd_head
.id
= MESSAGE_RES_EVT_AVAILABLE
;
1868 res
.evd_head
.error
= SA_AIS_OK
;
1869 openais_conn_send_response(openais_conn_partner_get(conn
),
1874 inline void notify_event(void *conn
)
1876 struct libevt_pd
*esip
;
1878 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
1881 * Give the library a kick if there aren't already
1882 * events queued for delivery.
1884 if (esip
->esi_nevents
++ == 0) {
1885 __notify_event(conn
);
1890 * sends/queues up an event for a subscribed channel.
1893 deliver_event(struct event_data
*evt
,
1894 struct event_svr_channel_open
*eco
,
1895 struct event_svr_channel_subscr
*ecs
)
1897 struct chan_event_list
*ep
;
1898 SaEvtEventPriorityT evt_prio
= evt
->ed_event
.led_priority
;
1899 struct chan_event_list
*cel
;
1900 int do_deliver_event
= 0;
1901 int do_deliver_warning
= 0;
1903 struct libevt_pd
*esip
;
1905 esip
= (struct libevt_pd
*)openais_conn_private_data_get(eco
->eco_conn
);
1907 if (evt_prio
> SA_EVT_LOWEST_PRIORITY
) {
1908 evt_prio
= SA_EVT_LOWEST_PRIORITY
;
1912 * Delivery queue check.
1913 * - If the queue is blocked, see if we've sent enough messages to
1915 * - If it isn't blocked, see if this message will put us over the top.
1916 * - If we can't deliver this message, see if we can toss some lower
1917 * priority message to make room for this one.
1918 * - If we toss any messages, queue up an event of SA_EVT_LOST_EVENT_PATTERN
1919 * to let the application know that we dropped some messages.
1921 if (esip
->esi_queue_blocked
) {
1922 if (esip
->esi_nevents
< evt_delivery_queue_resume
) {
1923 esip
->esi_queue_blocked
= 0;
1924 log_printf(LOG_LEVEL_DEBUG
, "unblock\n");
1928 assert (esip
->esi_nevents
>= 0);
1929 if (!esip
->esi_queue_blocked
&&
1930 (esip
->esi_nevents
>= evt_delivery_queue_size
)) {
1931 log_printf(LOG_LEVEL_DEBUG
, "block\n");
1932 esip
->esi_queue_blocked
= 1;
1933 do_deliver_warning
= 1;
1936 if (esip
->esi_queue_blocked
) {
1937 do_deliver_event
= 0;
1938 for (i
= SA_EVT_LOWEST_PRIORITY
; i
> evt_prio
; i
--) {
1939 if (!list_empty(&esip
->esi_events
[i
])) {
1941 * Get the last item on the list, so we drop the most
1942 * recent lowest priority event.
1944 cel
= list_entry(esip
->esi_events
[i
].prev
,
1945 struct chan_event_list
, cel_entry
);
1946 log_printf(LOG_LEVEL_DEBUG
, "Drop 0x%0llx\n",
1947 (unsigned long long)cel
->cel_event
->ed_event
.led_event_id
);
1948 list_del(&cel
->cel_entry
);
1949 free_event_data(cel
->cel_event
);
1951 esip
->esi_nevents
--;
1952 do_deliver_event
= 1;
1957 do_deliver_event
= 1;
1961 * Queue the event for delivery
1963 if (do_deliver_event
) {
1964 ep
= malloc(sizeof(*ep
));
1966 log_printf(LOG_LEVEL_WARNING
,
1967 "3Memory allocation error, can't deliver event\n");
1970 evt
->ed_ref_count
++;
1971 ep
->cel_chan_handle
= eco
->eco_lib_handle
;
1972 ep
->cel_sub_id
= ecs
->ecs_sub_id
;
1973 list_init(&ep
->cel_entry
);
1974 ep
->cel_event
= evt
;
1975 list_add_tail(&ep
->cel_entry
, &esip
->esi_events
[evt_prio
]);
1976 evt_delivered(evt
, eco
);
1977 notify_event(eco
->eco_conn
);
1981 * If we dropped an event, queue this so that the application knows
1982 * what has happened.
1984 if (do_deliver_warning
) {
1985 struct event_data
*ed
;
1986 ed
= malloc(dropped_event_size
);
1988 log_printf(LOG_LEVEL_WARNING
,
1989 "4Memory allocation error, can't deliver event\n");
1992 log_printf(LOG_LEVEL_DEBUG
, "Warn 0x%0llx\n",
1993 (unsigned long long)evt
->ed_event
.led_event_id
);
1994 memcpy(ed
, dropped_event
, dropped_event_size
);
1995 ed
->ed_event
.led_publish_time
= clust_time_now();
1996 ed
->ed_event
.led_event_id
= SA_EVT_EVENTID_LOST
;
1997 list_init(&ed
->ed_retained
);
1999 ep
= malloc(sizeof(*ep
));
2001 log_printf(LOG_LEVEL_WARNING
,
2002 "5Memory allocation error, can't deliver event\n");
2005 ep
->cel_chan_handle
= eco
->eco_lib_handle
;
2006 ep
->cel_sub_id
= ecs
->ecs_sub_id
;
2007 list_init(&ep
->cel_entry
);
2009 list_add_tail(&ep
->cel_entry
, &esip
->esi_events
[SA_EVT_HIGHEST_PRIORITY
]);
2010 notify_event(eco
->eco_conn
);
2015 * Take the event data and swap the elements so they match our architectures
2019 convert_event(void *msg
)
2021 struct lib_event_data
*evt
= (struct lib_event_data
*)msg
;
2022 mar_evt_event_pattern_t
*eps
;
2026 * The following elements don't require processing:
2028 * converted in the main deliver_fn:
2029 * led_head.id, led_head.size.
2032 * source_addr, publisher_node_id, receive_time.
2034 * Used internaly only:
2035 * led_svr_channel_handle and led_lib_channel_handle.
2038 swab_mar_name_t (&evt
->led_chan_name
);
2039 evt
->led_chan_unlink_id
= swab64(evt
->led_chan_unlink_id
);
2040 evt
->led_event_id
= swab64(evt
->led_event_id
);
2041 evt
->led_sub_id
= swab32(evt
->led_sub_id
);
2042 swab_mar_name_t (&evt
->led_publisher_name
);
2043 evt
->led_retention_time
= swab64(evt
->led_retention_time
);
2044 evt
->led_publish_time
= swab64(evt
->led_publish_time
);
2045 evt
->led_user_data_offset
= swab32(evt
->led_user_data_offset
);
2046 evt
->led_user_data_size
= swab32(evt
->led_user_data_size
);
2047 evt
->led_patterns_number
= swab32(evt
->led_patterns_number
);
2050 * Now we need to go through the led_body and swizzle pattern counts.
2051 * We can't do anything about user data since it doesn't have a specified
2052 * format. The application is on its own here.
2054 eps
= (mar_evt_event_pattern_t
*)evt
->led_body
;
2055 for (i
= 0; i
< evt
->led_patterns_number
; i
++) {
2056 eps
->pattern_size
= swab32(eps
->pattern_size
);
2057 eps
->allocated_size
= swab32(eps
->allocated_size
);
2064 * Take an event received from the network and fix it up to be usable.
2065 * - fix up pointers for pattern list.
2066 * - fill in some channel info
2068 static struct event_data
*
2069 make_local_event(struct lib_event_data
*p
,
2070 struct event_svr_channel_instance
*eci
)
2072 struct event_data
*ed
;
2073 mar_evt_event_pattern_t
*eps
;
2078 ed_size
= sizeof(*ed
) + p
->led_user_data_offset
+ p
->led_user_data_size
;
2079 ed
= malloc(ed_size
);
2081 log_printf(LOG_LEVEL_WARNING
,
2082 "Failed to allocate %u bytes for event, offset %u, data size %u\n",
2083 ed_size
, p
->led_user_data_offset
, p
->led_user_data_size
);
2086 memset(ed
, 0, ed_size
);
2087 list_init(&ed
->ed_retained
);
2088 ed
->ed_my_chan
= eci
;
2091 * Fill in lib_event_data and make the pattern pointers valid
2093 memcpy(&ed
->ed_event
, p
, sizeof(*p
) +
2094 p
->led_user_data_offset
+ p
->led_user_data_size
);
2096 eps
= (mar_evt_event_pattern_t
*)ed
->ed_event
.led_body
;
2097 str
= ed
->ed_event
.led_body
+
2098 (ed
->ed_event
.led_patterns_number
* sizeof(mar_evt_event_pattern_t
));
2099 for (i
= 0; i
< ed
->ed_event
.led_patterns_number
; i
++) {
2101 str
+= eps
->pattern_size
;
2110 * Set an event to be retained.
2112 static void retain_event(struct event_data
*evt
)
2115 evt
->ed_ref_count
++;
2116 evt
->ed_my_chan
->esc_retained_count
++;
2117 list_add_tail(&evt
->ed_retained
, &retained_list
);
2119 ret
= openais_timer_add_duration (
2120 evt
->ed_event
.led_retention_time
,
2122 event_retention_timeout
,
2123 &evt
->ed_timer_handle
);
2126 log_printf(LOG_LEVEL_ERROR
,
2127 "retention of event id 0x%llx failed\n",
2128 (unsigned long long)evt
->ed_event
.led_event_id
);
2130 log_printf(RETENTION_TIME_DEBUG
, "Retain event ID 0x%llx for %llu ms\n",
2131 (unsigned long long)evt
->ed_event
.led_event_id
, evt
->ed_event
.led_retention_time
/100000LL);
2136 * Scan the subscription list and look for the specified subsctiption ID.
2137 * Only look for the ID in subscriptions that are associated with the
2138 * saEvtInitialize associated with the specified open channel.
2140 static struct event_svr_channel_subscr
*find_subscr(
2141 struct event_svr_channel_open
*open_chan
, SaEvtSubscriptionIdT sub_id
)
2143 struct event_svr_channel_instance
*eci
;
2144 struct event_svr_channel_subscr
*ecs
;
2145 struct event_svr_channel_open
*eco
;
2146 struct list_head
*l
, *l1
;
2147 void *conn
= open_chan
->eco_conn
;
2149 eci
= open_chan
->eco_channel
;
2152 * Check for subscription id already in use.
2153 * Subscriptions are unique within saEvtInitialize (Callback scope).
2155 for (l
= eci
->esc_open_chans
.next
; l
!= &eci
->esc_open_chans
; l
= l
->next
) {
2156 eco
= list_entry(l
, struct event_svr_channel_open
, eco_entry
);
2158 * Don't bother with open channels associated with another
2161 if (eco
->eco_conn
!= conn
) {
2165 for (l1
= eco
->eco_subscr
.next
; l1
!= &eco
->eco_subscr
; l1
= l1
->next
) {
2166 ecs
= list_entry(l1
, struct event_svr_channel_subscr
, ecs_entry
);
2167 if (ecs
->ecs_sub_id
== sub_id
) {
2176 * Handler for saEvtInitialize
2178 static int evt_lib_init(void *conn
)
2180 struct libevt_pd
*libevt_pd
;
2183 libevt_pd
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2186 log_printf(LOG_LEVEL_DEBUG
, "saEvtInitialize request.\n");
2189 * Initailze event instance data
2191 memset(libevt_pd
, 0, sizeof(*libevt_pd
));
2194 * Initialize the open channel handle database.
2196 hdb_create(&libevt_pd
->esi_hdb
);
2199 * list of channels open on this instance
2201 list_init(&libevt_pd
->esi_open_chans
);
2204 * pending event lists for each piriority
2206 for (i
= SA_EVT_HIGHEST_PRIORITY
; i
<= SA_EVT_LOWEST_PRIORITY
; i
++) {
2207 list_init(&libevt_pd
->esi_events
[i
]);
2214 * Handler for saEvtChannelOpen
2216 static void lib_evt_open_channel(void *conn
, void *message
)
2219 struct req_evt_channel_open
*req
;
2220 struct res_evt_channel_open res
;
2221 struct open_chan_pending
*ocp
;
2227 log_printf(CHAN_OPEN_DEBUG
,
2228 "saEvtChannelOpen (Open channel request)\n");
2229 log_printf(CHAN_OPEN_DEBUG
,
2230 "handle 0x%llx, to 0x%llx\n",
2231 (unsigned long long)req
->ico_c_handle
,
2232 (unsigned long long)req
->ico_timeout
);
2233 log_printf(CHAN_OPEN_DEBUG
, "flags %x, channel name(%d) %s\n",
2235 req
->ico_channel_name
.length
,
2236 req
->ico_channel_name
.value
);
2241 error
= evt_open_channel(&req
->ico_channel_name
, req
->ico_open_flag
);
2243 if (error
!= SA_AIS_OK
) {
2247 ocp
= malloc(sizeof(struct open_chan_pending
));
2249 error
= SA_AIS_ERR_NO_MEMORY
;
2254 ocp
->ocp_invocation
= 0;
2255 ocp
->ocp_chan_name
= req
->ico_channel_name
;
2256 ocp
->ocp_open_flag
= req
->ico_open_flag
;
2257 ocp
->ocp_conn
= conn
;
2258 ocp
->ocp_c_handle
= req
->ico_c_handle
;
2259 ocp
->ocp_timer_handle
= 0;
2260 ocp
->ocp_serial_no
= open_serial_no
;
2261 list_init(&ocp
->ocp_entry
);
2262 list_add_tail(&ocp
->ocp_entry
, &open_pending
);
2263 ret
= openais_timer_add_duration (
2267 &ocp
->ocp_timer_handle
);
2269 log_printf(LOG_LEVEL_WARNING
,
2270 "Error setting timeout for open channel %s\n",
2271 req
->ico_channel_name
.value
);
2277 res
.ico_head
.size
= sizeof(res
);
2278 res
.ico_head
.id
= MESSAGE_RES_EVT_OPEN_CHANNEL
;
2279 res
.ico_head
.error
= error
;
2280 openais_conn_send_response(conn
, &res
, sizeof(res
));
2284 * Handler for saEvtChannelOpen
2286 static void lib_evt_open_channel_async(void *conn
, void *message
)
2289 struct req_evt_channel_open
*req
;
2290 struct res_evt_channel_open res
;
2291 struct open_chan_pending
*ocp
;
2296 log_printf(CHAN_OPEN_DEBUG
,
2297 "saEvtChannelOpenAsync (Async Open channel request)\n");
2298 log_printf(CHAN_OPEN_DEBUG
,
2299 "handle 0x%llx, to 0x%llx\n",
2300 (unsigned long long)req
->ico_c_handle
,
2301 (unsigned long long)req
->ico_invocation
);
2302 log_printf(CHAN_OPEN_DEBUG
, "flags %x, channel name(%d) %s\n",
2304 req
->ico_channel_name
.length
,
2305 req
->ico_channel_name
.value
);
2310 error
= evt_open_channel(&req
->ico_channel_name
, req
->ico_open_flag
);
2312 if (error
!= SA_AIS_OK
) {
2316 ocp
= malloc(sizeof(struct open_chan_pending
));
2318 error
= SA_AIS_ERR_NO_MEMORY
;
2323 ocp
->ocp_invocation
= req
->ico_invocation
;
2324 ocp
->ocp_c_handle
= req
->ico_c_handle
;
2325 ocp
->ocp_chan_name
= req
->ico_channel_name
;
2326 ocp
->ocp_open_flag
= req
->ico_open_flag
;
2327 ocp
->ocp_conn
= conn
;
2328 ocp
->ocp_timer_handle
= 0;
2329 ocp
->ocp_serial_no
= open_serial_no
;
2330 list_init(&ocp
->ocp_entry
);
2331 list_add_tail(&ocp
->ocp_entry
, &open_pending
);
2334 res
.ico_head
.size
= sizeof(res
);
2335 res
.ico_head
.id
= MESSAGE_RES_EVT_OPEN_CHANNEL
;
2336 res
.ico_head
.error
= error
;
2337 openais_conn_send_response(conn
, &res
, sizeof(res
));
2343 * Used by the channel close code and by the implicit close
2344 * when saEvtFinalize is called with channels open.
2347 common_chan_close(struct event_svr_channel_open
*eco
, struct libevt_pd
*esip
)
2349 struct event_svr_channel_subscr
*ecs
;
2350 struct list_head
*l
, *nxt
;
2352 log_printf(LOG_LEVEL_DEBUG
, "Close channel %s flags 0x%02x\n",
2353 eco
->eco_channel
->esc_channel_name
.value
,
2357 * Disconnect the channel open structure.
2359 * Check for subscriptions and deal with them. In this case
2360 * if there are any, we just implicitly unsubscribe.
2362 * When We're done with the channel open data then we can
2363 * remove it's handle (this frees the memory too).
2366 list_del(&eco
->eco_entry
);
2367 list_del(&eco
->eco_instance_entry
);
2369 for (l
= eco
->eco_subscr
.next
; l
!= &eco
->eco_subscr
; l
= nxt
) {
2371 ecs
= list_entry(l
, struct event_svr_channel_subscr
, ecs_entry
);
2372 log_printf(LOG_LEVEL_DEBUG
, "Unsubscribe ID: %x\n",
2374 list_del(&ecs
->ecs_entry
);
2377 * Purge any pending events associated with this subscription
2378 * that don't match another subscription.
2380 filter_undelivered_events(eco
);
2384 * Remove this channel from the retained event's notion
2385 * of who they have been delivered to.
2387 remove_delivered_channel(eco
);
2388 return evt_close_channel(&eco
->eco_channel
->esc_channel_name
,
2389 eco
->eco_channel
->esc_unlink_id
);
2393 * Handler for saEvtChannelClose
2395 static void lib_evt_close_channel(void *conn
, void *message
)
2397 struct req_evt_channel_close
*req
;
2398 struct res_evt_channel_close res
;
2399 struct event_svr_channel_open
*eco
;
2402 struct libevt_pd
*esip
;
2404 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2408 log_printf(LOG_LEVEL_DEBUG
,
2409 "saEvtChannelClose (Close channel request)\n");
2410 log_printf(LOG_LEVEL_DEBUG
, "handle 0x%x\n", req
->icc_channel_handle
);
2413 * look up the channel handle
2415 ret
= hdb_handle_get(&esip
->esi_hdb
,
2416 req
->icc_channel_handle
, &ptr
);
2418 goto chan_close_done
;
2422 common_chan_close(eco
, esip
);
2423 hdb_handle_destroy(&esip
->esi_hdb
, req
->icc_channel_handle
);
2424 hdb_handle_put(&esip
->esi_hdb
, req
->icc_channel_handle
);
2427 res
.icc_head
.size
= sizeof(res
);
2428 res
.icc_head
.id
= MESSAGE_RES_EVT_CLOSE_CHANNEL
;
2429 res
.icc_head
.error
= ((ret
== 0) ? SA_AIS_OK
: SA_AIS_ERR_BAD_HANDLE
);
2430 openais_conn_send_response(conn
, &res
, sizeof(res
));
2434 * Handler for saEvtChannelUnlink
2436 static void lib_evt_unlink_channel(void *conn
, void *message
)
2438 struct req_evt_channel_unlink
*req
;
2439 struct res_evt_channel_unlink res
;
2440 struct iovec chn_iovec
;
2441 struct unlink_chan_pending
*ucp
= 0;
2442 struct req_evt_chan_command cpkt
;
2443 SaAisErrorT error
= SA_AIS_ERR_LIBRARY
;
2447 log_printf(CHAN_UNLINK_DEBUG
,
2448 "saEvtChannelUnlink (Unlink channel request)\n");
2449 log_printf(CHAN_UNLINK_DEBUG
, "Channel Name %s\n",
2450 req
->iuc_channel_name
.value
);
2452 if (!find_channel(&req
->iuc_channel_name
, EVT_CHAN_ACTIVE
)) {
2453 log_printf(CHAN_UNLINK_DEBUG
, "Channel Name doesn't exist\n");
2454 error
= SA_AIS_ERR_NOT_EXIST
;
2455 goto evt_unlink_err
;
2459 * Set up the data structure so that the channel op
2460 * mcast handler can complete the unlink comamnd back to the
2463 ucp
= malloc(sizeof(*ucp
));
2465 log_printf(LOG_LEVEL_ERROR
,
2466 "saEvtChannelUnlink: Memory allocation failure\n");
2467 error
= SA_AIS_ERR_TRY_AGAIN
;
2468 goto evt_unlink_err
;
2471 ucp
->ucp_unlink_id
= next_chan_unlink_id();
2472 ucp
->ucp_conn
= conn
;
2473 list_init(&ucp
->ucp_entry
);
2474 list_add_tail(&ucp
->ucp_entry
, &unlink_pending
);
2477 * Put together a mcast packet to notify everyone
2478 * of the channel unlink.
2480 memset(&cpkt
, 0, sizeof(cpkt
));
2482 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
2483 cpkt
.chc_head
.size
= sizeof(cpkt
);
2484 cpkt
.chc_op
= EVT_UNLINK_CHAN_OP
;
2485 cpkt
.u
.chcu
.chcu_name
= req
->iuc_channel_name
;
2486 cpkt
.u
.chcu
.chcu_unlink_id
= ucp
->ucp_unlink_id
;
2487 chn_iovec
.iov_base
= (char *)&cpkt
;
2488 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
2489 if (totempg_groups_mcast_joined(openais_group_handle
,
2490 &chn_iovec
, 1, TOTEMPG_AGREED
) == 0) {
2496 list_del(&ucp
->ucp_entry
);
2499 res
.iuc_head
.size
= sizeof(res
);
2500 res
.iuc_head
.id
= MESSAGE_RES_EVT_UNLINK_CHANNEL
;
2501 res
.iuc_head
.error
= error
;
2502 openais_conn_send_response(conn
, &res
, sizeof(res
));
2506 * Subscribe to an event channel.
2508 * - First look up the channel to subscribe.
2509 * - Make sure that the subscription ID is not already in use.
2510 * - Fill in the subscription data structures and add them to the channels
2511 * subscription list.
2512 * - See if there are any events with retetion times that need to be delivered
2513 * because of the new subscription.
2515 static char *filter_types
[] = {
2516 "INVALID FILTER TYPE",
2517 "SA_EVT_PREFIX_FILTER",
2518 "SA_EVT_SUFFIX_FILTER",
2519 "SA_EVT_EXACT_FILTER",
2520 "SA_EVT_PASS_ALL_FILTER",
2524 * saEvtEventSubscribe Handler
2526 static void lib_evt_event_subscribe(void *conn
, void *message
)
2528 struct req_evt_event_subscribe
*req
;
2529 struct res_evt_event_subscribe res
;
2530 mar_evt_event_filter_array_t
*filters
;
2532 struct event_svr_channel_open
*eco
;
2533 struct event_svr_channel_instance
*eci
;
2534 struct event_svr_channel_subscr
*ecs
;
2535 struct event_data
*evt
;
2536 struct list_head
*l
;
2540 struct libevt_pd
*esip
;
2542 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2546 log_printf(LOG_LEVEL_DEBUG
,
2547 "saEvtEventSubscribe (Subscribe request)\n");
2548 log_printf(LOG_LEVEL_DEBUG
, "subscription Id: 0x%llx\n",
2549 (unsigned long long)req
->ics_sub_id
);
2552 * look up the channel handle
2554 ret
= hdb_handle_get(&esip
->esi_hdb
, req
->ics_channel_handle
, &ptr
);
2556 error
= SA_AIS_ERR_BAD_HANDLE
;
2561 eci
= eco
->eco_channel
;
2564 * See if the id is already being used
2566 ecs
= find_subscr(eco
, req
->ics_sub_id
);
2568 error
= SA_AIS_ERR_EXIST
;
2572 error
= evtfilt_to_aisfilt(req
, &filters
);
2574 if (error
== SA_AIS_OK
) {
2575 log_printf(LOG_LEVEL_DEBUG
, "Subscribe filters count %d\n",
2576 (int)filters
->filters_number
);
2577 for (i
= 0; i
< filters
->filters_number
; i
++) {
2578 log_printf(LOG_LEVEL_DEBUG
, "type %s(%d) sz %d, <%s>\n",
2579 filter_types
[filters
->filters
[i
].filter_type
],
2580 filters
->filters
[i
].filter_type
,
2581 (int)filters
->filters
[i
].filter
.pattern_size
,
2582 (filters
->filters
[i
].filter
.pattern_size
)
2583 ? (char *)filters
->filters
[i
].filter
.pattern
2588 if (error
!= SA_AIS_OK
) {
2592 ecs
= (struct event_svr_channel_subscr
*)malloc(sizeof(*ecs
));
2594 error
= SA_AIS_ERR_NO_MEMORY
;
2597 ecs
->ecs_filters
= filters
;
2598 ecs
->ecs_sub_id
= req
->ics_sub_id
;
2599 list_init(&ecs
->ecs_entry
);
2600 list_add(&ecs
->ecs_entry
, &eco
->eco_subscr
);
2603 res
.ics_head
.size
= sizeof(res
);
2604 res
.ics_head
.id
= MESSAGE_RES_EVT_SUBSCRIBE
;
2605 res
.ics_head
.error
= error
;
2606 openais_conn_send_response(conn
, &res
, sizeof(res
));
2609 * See if an existing event with a retention time
2610 * needs to be delivered based on this subscription
2612 for (l
= retained_list
.next
; l
!= &retained_list
; l
= l
->next
) {
2613 evt
= list_entry(l
, struct event_data
, ed_retained
);
2614 log_printf(LOG_LEVEL_DEBUG
,
2615 "Checking event ID %llx chanp %p -- sub chanp %p\n",
2616 (unsigned long long)evt
->ed_event
.led_event_id
,
2617 evt
->ed_my_chan
, eci
);
2618 if (evt
->ed_my_chan
== eci
) {
2619 if (evt_already_delivered(evt
, eco
)) {
2622 if (event_match(evt
, ecs
) == SA_AIS_OK
) {
2623 log_printf(LOG_LEVEL_DEBUG
,
2624 "deliver event ID: 0x%llx\n",
2625 (unsigned long long)evt
->ed_event
.led_event_id
);
2626 deliver_event(evt
, eco
, ecs
);
2630 hdb_handle_put(&esip
->esi_hdb
, req
->ics_channel_handle
);
2634 hdb_handle_put(&esip
->esi_hdb
, req
->ics_channel_handle
);
2636 res
.ics_head
.size
= sizeof(res
);
2637 res
.ics_head
.id
= MESSAGE_RES_EVT_SUBSCRIBE
;
2638 res
.ics_head
.error
= error
;
2639 openais_conn_send_response(conn
, &res
, sizeof(res
));
2643 * saEvtEventUnsubscribe Handler
2645 static void lib_evt_event_unsubscribe(void *conn
, void *message
)
2647 struct req_evt_event_unsubscribe
*req
;
2648 struct res_evt_event_unsubscribe res
;
2649 struct event_svr_channel_open
*eco
;
2650 struct event_svr_channel_instance
*eci
;
2651 struct event_svr_channel_subscr
*ecs
;
2652 SaAisErrorT error
= SA_AIS_OK
;
2655 struct libevt_pd
*esip
;
2657 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2661 log_printf(LOG_LEVEL_DEBUG
,
2662 "saEvtEventUnsubscribe (Unsubscribe request)\n");
2663 log_printf(LOG_LEVEL_DEBUG
, "subscription Id: 0x%llx\n",
2664 (unsigned long long)req
->icu_sub_id
);
2667 * look up the channel handle, get the open channel
2670 ret
= hdb_handle_get(&esip
->esi_hdb
,
2671 req
->icu_channel_handle
, &ptr
);
2673 error
= SA_AIS_ERR_BAD_HANDLE
;
2678 eci
= eco
->eco_channel
;
2681 * Make sure that the id exists.
2683 ecs
= find_subscr(eco
, req
->icu_sub_id
);
2685 error
= SA_AIS_ERR_NOT_EXIST
;
2689 list_del(&ecs
->ecs_entry
);
2691 log_printf(LOG_LEVEL_DEBUG
,
2692 "unsubscribe from channel %s subscription ID 0x%x "
2693 "with %d filters\n",
2694 eci
->esc_channel_name
.value
,
2695 ecs
->ecs_sub_id
, (int)ecs
->ecs_filters
->filters_number
);
2697 free_filters(ecs
->ecs_filters
);
2701 hdb_handle_put(&esip
->esi_hdb
, req
->icu_channel_handle
);
2703 res
.icu_head
.size
= sizeof(res
);
2704 res
.icu_head
.id
= MESSAGE_RES_EVT_UNSUBSCRIBE
;
2705 res
.icu_head
.error
= error
;
2706 openais_conn_send_response(conn
, &res
, sizeof(res
));
2710 * saEvtEventPublish Handler
2712 static void lib_evt_event_publish(void *conn
, void *message
)
2714 struct lib_event_data
*req
;
2715 struct res_evt_event_publish res
;
2716 struct event_svr_channel_open
*eco
;
2717 struct event_svr_channel_instance
*eci
;
2718 mar_evteventid_t event_id
= 0;
2719 uint64_t msg_id
= 0;
2720 SaAisErrorT error
= SA_AIS_OK
;
2721 struct iovec pub_iovec
;
2725 struct libevt_pd
*esip
;
2727 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2732 log_printf(LOG_LEVEL_DEBUG
,
2733 "saEvtEventPublish (Publish event request)\n");
2737 * look up and validate open channel info
2739 ret
= hdb_handle_get(&esip
->esi_hdb
,
2740 req
->led_svr_channel_handle
, &ptr
);
2742 error
= SA_AIS_ERR_BAD_HANDLE
;
2747 eci
= eco
->eco_channel
;
2750 * modify the request structure for sending event data to subscribed
2753 get_event_id(&event_id
, &msg_id
);
2754 req
->led_head
.id
= SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_EVENTDATA
);
2755 req
->led_chan_name
= eci
->esc_channel_name
;
2756 req
->led_event_id
= event_id
;
2757 req
->led_msg_id
= msg_id
;
2758 req
->led_chan_unlink_id
= eci
->esc_unlink_id
;
2761 * Distribute the event.
2762 * The multicasted event will be picked up and delivered
2763 * locally by the local network event receiver.
2765 pub_iovec
.iov_base
= (char *)req
;
2766 pub_iovec
.iov_len
= req
->led_head
.size
;
2767 result
= totempg_groups_mcast_joined(openais_group_handle
, &pub_iovec
, 1, TOTEMPG_AGREED
);
2769 error
= SA_AIS_ERR_LIBRARY
;
2772 hdb_handle_put(&esip
->esi_hdb
, req
->led_svr_channel_handle
);
2774 res
.iep_head
.size
= sizeof(res
);
2775 res
.iep_head
.id
= MESSAGE_RES_EVT_PUBLISH
;
2776 res
.iep_head
.error
= error
;
2777 res
.iep_event_id
= event_id
;
2778 openais_conn_send_response(conn
, &res
, sizeof(res
));
2782 * saEvtEventRetentionTimeClear handler
2784 static void lib_evt_event_clear_retentiontime(void *conn
, void *message
)
2786 struct req_evt_event_clear_retentiontime
*req
;
2787 struct res_evt_event_clear_retentiontime res
;
2788 struct req_evt_chan_command cpkt
;
2789 struct retention_time_clear_pending
*rtc
= 0;
2790 struct iovec rtn_iovec
;
2796 log_printf(RETENTION_TIME_DEBUG
,
2797 "saEvtEventRetentionTimeClear (Clear event retentiontime request)\n");
2798 log_printf(RETENTION_TIME_DEBUG
,
2799 "event ID 0x%llx, chan handle 0x%x\n",
2800 (unsigned long long)req
->iec_event_id
,
2801 req
->iec_channel_handle
);
2803 rtc
= malloc(sizeof(*rtc
));
2805 log_printf(LOG_LEVEL_ERROR
,
2806 "saEvtEventRetentionTimeClear: Memory allocation failure\n");
2807 error
= SA_AIS_ERR_TRY_AGAIN
;
2808 goto evt_ret_clr_err
;
2810 rtc
->rtc_event_id
= req
->iec_event_id
;
2811 rtc
->rtc_conn
= conn
;
2812 list_init(&rtc
->rtc_entry
);
2813 list_add_tail(&rtc
->rtc_entry
, &clear_pending
);
2816 * Send the clear request
2818 memset(&cpkt
, 0, sizeof(cpkt
));
2820 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
2821 cpkt
.chc_head
.size
= sizeof(cpkt
);
2822 cpkt
.chc_op
= EVT_CLEAR_RET_OP
;
2823 cpkt
.u
.chc_event_id
= req
->iec_event_id
;
2824 rtn_iovec
.iov_base
= (char *)&cpkt
;
2825 rtn_iovec
.iov_len
= cpkt
.chc_head
.size
;
2826 ret
= totempg_groups_mcast_joined(openais_group_handle
,
2827 &rtn_iovec
, 1, TOTEMPG_AGREED
);
2829 // TODO this should really assert
2832 error
= SA_AIS_ERR_LIBRARY
;
2836 list_del(&rtc
->rtc_entry
);
2839 res
.iec_head
.size
= sizeof(res
);
2840 res
.iec_head
.id
= MESSAGE_RES_EVT_CLEAR_RETENTIONTIME
;
2841 res
.iec_head
.error
= error
;
2842 openais_conn_send_response(conn
, &res
, sizeof(res
));
2847 * Send requested event data to the application
2849 static void lib_evt_event_data_get(void *conn
, void *message
)
2851 struct lib_event_data res
;
2852 struct chan_event_list
*cel
;
2853 struct event_data
*edp
;
2855 struct libevt_pd
*esip
;
2857 esip
= (struct libevt_pd
*)openais_conn_private_data_get(conn
);
2861 * Deliver events in publish order within priority
2863 for (i
= SA_EVT_HIGHEST_PRIORITY
; i
<= SA_EVT_LOWEST_PRIORITY
; i
++) {
2864 if (!list_empty(&esip
->esi_events
[i
])) {
2865 cel
= list_entry(esip
->esi_events
[i
].next
, struct chan_event_list
,
2867 list_del(&cel
->cel_entry
);
2868 list_init(&cel
->cel_entry
);
2869 esip
->esi_nevents
--;
2870 if (esip
->esi_queue_blocked
&&
2871 (esip
->esi_nevents
< evt_delivery_queue_resume
)) {
2872 esip
->esi_queue_blocked
= 0;
2873 log_printf(LOG_LEVEL_DEBUG
, "unblock\n");
2875 edp
= cel
->cel_event
;
2876 edp
->ed_event
.led_lib_channel_handle
= cel
->cel_chan_handle
;
2877 edp
->ed_event
.led_sub_id
= cel
->cel_sub_id
;
2878 edp
->ed_event
.led_head
.id
= MESSAGE_RES_EVT_EVENT_DATA
;
2879 edp
->ed_event
.led_head
.error
= SA_AIS_OK
;
2881 openais_conn_send_response(conn
, &edp
->ed_event
,
2882 edp
->ed_event
.led_head
.size
);
2883 free_event_data(edp
);
2888 res
.led_head
.size
= sizeof(res
.led_head
);
2889 res
.led_head
.id
= MESSAGE_RES_EVT_EVENT_DATA
;
2890 res
.led_head
.error
= SA_AIS_ERR_NOT_EXIST
;
2891 openais_conn_send_response(conn
, &res
, res
.led_head
.size
);
2894 * See if there are any events that the app doesn't know about
2895 * because the notify pipe was full.
2898 if (esip
->esi_nevents
) {
2899 __notify_event(conn
);
2904 * Scan the list of channels and remove the specified node.
2906 static void remove_chan_open_info(mar_uint32_t node_id
)
2908 struct list_head
*l
, *nxt
;
2909 struct event_svr_channel_instance
*eci
;
2911 for (l
= esc_head
.next
; l
!= &esc_head
; l
= nxt
) {
2913 eci
= list_entry(l
, struct event_svr_channel_instance
, esc_entry
);
2914 remove_open_count(eci
, node_id
);
2921 * Called when there is a configuration change in the cluster.
2922 * This function looks at any joiners and leavers and updates the evt
2923 * node list. The node list is used to keep track of event IDs
2924 * received for each node for the detection of duplicate events.
2926 static void evt_conf_change(
2927 enum totem_configuration_type configuration_type
,
2928 unsigned int *member_list
, int member_list_entries
,
2929 unsigned int *left_list
, int left_list_entries
,
2930 unsigned int *joined_list
, int joined_list_entries
,
2931 struct memb_ring_id
*ring_id
)
2933 log_printf(RECOVERY_DEBUG
, "Evt conf change %d\n",
2934 configuration_type
);
2935 log_printf(RECOVERY_DEBUG
, "m %d, j %d, l %d\n",
2936 member_list_entries
,
2937 joined_list_entries
,
2941 * Save the various membership lists for later processing by
2942 * the synchronization functions. The left list is only
2943 * valid in the transitional configuration, the joined list is
2944 * only valid in the regular configuration. Other than for the
2945 * purposes of delivering retained events from merging partitions,
2946 * we only care about the final membership from the regular
2949 if (configuration_type
== TOTEM_CONFIGURATION_TRANSITIONAL
) {
2951 left_member_count
= left_list_entries
;
2952 trans_member_count
= member_list_entries
;
2954 if (left_member_list
) {
2955 free(left_member_list
);
2956 left_member_list
= 0;
2958 if (left_list_entries
) {
2960 malloc(sizeof(unsigned int) * left_list_entries
);
2961 if (!left_member_list
) {
2963 * ERROR: No recovery.
2965 log_printf(LOG_LEVEL_ERROR
,
2966 "Config change left list allocation error\n");
2969 memcpy(left_member_list
, left_list
,
2970 sizeof(unsigned int) * left_list_entries
);
2973 if (trans_member_list
) {
2974 free(trans_member_list
);
2975 trans_member_list
= 0;
2977 if (member_list_entries
) {
2979 malloc(sizeof(unsigned int) * member_list_entries
);
2981 if (!trans_member_list
) {
2983 * ERROR: No recovery.
2985 log_printf(LOG_LEVEL_ERROR
,
2986 "Config change transitional member list allocation error\n");
2989 memcpy(trans_member_list
, member_list
,
2990 sizeof(unsigned int) * member_list_entries
);
2994 if (configuration_type
== TOTEM_CONFIGURATION_REGULAR
) {
2996 joined_member_count
= joined_list_entries
;
2997 total_member_count
= member_list_entries
;
2999 if (joined_member_list
) {
3000 free(joined_member_list
);
3001 joined_member_list
= 0;
3003 if (joined_list_entries
) {
3004 joined_member_list
=
3005 malloc(sizeof(unsigned int) * joined_list_entries
);
3006 if (!joined_member_list
) {
3008 * ERROR: No recovery.
3010 log_printf(LOG_LEVEL_ERROR
,
3011 "Config change joined list allocation error\n");
3014 memcpy(joined_member_list
, joined_list
,
3015 sizeof(unsigned int) * joined_list_entries
);
3019 if (current_member_list
) {
3020 free(current_member_list
);
3021 current_member_list
= 0;
3023 if (member_list_entries
) {
3024 current_member_list
=
3025 malloc(sizeof(unsigned int) * member_list_entries
);
3027 if (!current_member_list
) {
3029 * ERROR: No recovery.
3031 log_printf(LOG_LEVEL_ERROR
,
3032 "Config change member list allocation error\n");
3035 memcpy(current_member_list
, member_list
,
3036 sizeof(unsigned int) * member_list_entries
);
3042 * saEvtFinalize Handler
3044 static int evt_lib_exit(void *conn
)
3047 struct event_svr_channel_open
*eco
;
3048 struct list_head
*l
, *nxt
;
3049 struct open_chan_pending
*ocp
;
3050 struct unlink_chan_pending
*ucp
;
3051 struct retention_time_clear_pending
*rtc
;
3052 struct libevt_pd
*esip
=
3053 openais_conn_private_data_get(openais_conn_partner_get(conn
));
3055 log_printf(LOG_LEVEL_DEBUG
, "saEvtFinalize (Event exit request)\n");
3056 log_printf(LOG_LEVEL_DEBUG
, "saEvtFinalize %d evts on list\n",
3060 * Clean up any open channels and associated subscriptions.
3062 for (l
= esip
->esi_open_chans
.next
; l
!= &esip
->esi_open_chans
; l
= nxt
) {
3064 eco
= list_entry(l
, struct event_svr_channel_open
, eco_instance_entry
);
3065 common_chan_close(eco
, esip
);
3066 hdb_handle_destroy(&esip
->esi_hdb
, eco
->eco_my_handle
);
3070 * Clean up any pending async operations
3072 for (l
= open_pending
.next
; l
!= &open_pending
; l
= nxt
) {
3074 ocp
= list_entry(l
, struct open_chan_pending
, ocp_entry
);
3075 if (esip
== openais_conn_private_data_get(ocp
->ocp_conn
)) {
3076 list_del(&ocp
->ocp_entry
);
3081 for (l
= unlink_pending
.next
; l
!= &unlink_pending
; l
= nxt
) {
3083 ucp
= list_entry(l
, struct unlink_chan_pending
, ucp_entry
);
3084 if (esip
== openais_conn_private_data_get(ucp
->ucp_conn
)) {
3085 list_del(&ucp
->ucp_entry
);
3090 for (l
= clear_pending
.next
;
3091 l
!= &clear_pending
; l
= nxt
) {
3093 rtc
= list_entry(l
, struct retention_time_clear_pending
, rtc_entry
);
3094 if (esip
== openais_conn_private_data_get(rtc
->rtc_conn
)) {
3095 list_del(&rtc
->rtc_entry
);
3101 * Destroy the open channel handle database
3103 hdb_destroy(&esip
->esi_hdb
);
3109 * Called at service start time.
3111 static int evt_exec_init(struct objdb_iface_ver0
*objdb
)
3113 unsigned int object_service_handle
;
3116 log_printf(LOG_LEVEL_DEBUG
, "Evt exec init request\n");
3118 objdb
->object_find_reset (OBJECT_PARENT_HANDLE
);
3119 if (objdb
->object_find (
3120 OBJECT_PARENT_HANDLE
,
3123 &object_service_handle
) == 0) {
3126 if ( !objdb
->object_key_get (object_service_handle
,
3127 "delivery_queue_size",
3128 strlen ("delivery_queue_size"),
3131 evt_delivery_queue_size
= atoi(value
);
3132 log_printf(LOG_LEVEL_NOTICE
,
3133 "event delivery_queue_size set to %u\n",
3134 evt_delivery_queue_size
);
3137 if ( !objdb
->object_key_get (object_service_handle
,
3138 "delivery_queue_resume",
3139 strlen ("delivery_queue_resume"),
3142 evt_delivery_queue_resume
= atoi(value
);
3143 log_printf(LOG_LEVEL_NOTICE
,
3144 "event delivery_queue_resume set to %u\n",
3145 evt_delivery_queue_size
);
3150 * Create an event to be sent when we have to drop messages
3151 * for an application.
3153 dropped_event_size
= sizeof(*dropped_event
) + sizeof(dropped_pattern
);
3154 dropped_event
= malloc(dropped_event_size
);
3155 if (dropped_event
== 0) {
3156 log_printf(LOG_LEVEL_ERROR
,
3157 "Memory Allocation Failure, event service not started\n");
3161 memset(dropped_event
, 0, sizeof(*dropped_event
) + sizeof(dropped_pattern
));
3162 dropped_event
->ed_ref_count
= 1;
3163 list_init(&dropped_event
->ed_retained
);
3164 dropped_event
->ed_event
.led_head
.size
=
3165 sizeof(*dropped_event
) + sizeof(dropped_pattern
);
3166 dropped_event
->ed_event
.led_head
.error
= SA_AIS_OK
;
3167 dropped_event
->ed_event
.led_priority
= SA_EVT_HIGHEST_PRIORITY
;
3168 dropped_event
->ed_event
.led_chan_name
= lost_chan
;
3169 dropped_event
->ed_event
.led_publisher_name
= dropped_publisher
;
3170 dropped_event
->ed_event
.led_patterns_number
= 1;
3171 memcpy(&dropped_event
->ed_event
.led_body
[0],
3172 &dropped_pattern
, sizeof(dropped_pattern
));
3177 try_deliver_event(struct event_data
*evt
,
3178 struct event_svr_channel_instance
*eci
)
3180 struct list_head
*l
, *l1
;
3181 struct event_svr_channel_open
*eco
;
3182 struct event_svr_channel_subscr
*ecs
;
3183 int delivered_event
= 0;
3185 * Check open channels
3187 for (l
= eci
->esc_open_chans
.next
; l
!= &eci
->esc_open_chans
; l
= l
->next
) {
3188 eco
= list_entry(l
, struct event_svr_channel_open
, eco_entry
);
3190 * See if enabled to receive
3192 if (!(eco
->eco_flags
& SA_EVT_CHANNEL_SUBSCRIBER
)) {
3197 * Check subscriptions
3199 for (l1
= eco
->eco_subscr
.next
; l1
!= &eco
->eco_subscr
; l1
= l1
->next
) {
3200 ecs
= list_entry(l1
, struct event_svr_channel_subscr
, ecs_entry
);
3202 * Apply filter rules and deliver if patterns
3204 * Only deliver one event per open channel
3206 if (event_match(evt
, ecs
) == SA_AIS_OK
) {
3207 deliver_event(evt
, eco
, ecs
);
3213 return delivered_event
;
3217 * Receive the network event message and distribute it to local subscribers
3219 static void evt_remote_evt(void *msg
, unsigned int nodeid
)
3222 * - retain events that have a retention time
3223 * - Find assocated channel
3224 * - Scan list of subscribers
3226 * - Deliver events that pass the filter test
3228 struct lib_event_data
*evtpkt
= msg
;
3229 struct event_svr_channel_instance
*eci
;
3230 struct event_data
*evt
;
3231 SaClmClusterNodeT
*cn
;
3233 log_printf(LOG_LEVEL_DEBUG
, "Remote event data received from nodeid %s\n",
3234 totempg_ifaces_print (nodeid
));
3237 * See where the message came from so that we can set the
3238 * publishing node id in the message before delivery.
3240 cn
= main_clm_get_by_nodeid(nodeid
);
3243 * Not sure how this can happen...
3245 log_printf(LOG_LEVEL_DEBUG
, "No cluster node data for nodeid %s\n",
3246 totempg_ifaces_print (nodeid
));
3250 log_printf(LOG_LEVEL_DEBUG
, "Cluster node ID %s name %s\n",
3251 totempg_ifaces_print (cn
->nodeId
), cn
->nodeName
.value
);
3253 evtpkt
->led_publisher_node_id
= nodeid
;
3254 evtpkt
->led_nodeid
= nodeid
;
3255 evtpkt
->led_receive_time
= clust_time_now();
3257 if (evtpkt
->led_chan_unlink_id
!= EVT_CHAN_ACTIVE
) {
3258 log_printf(CHAN_UNLINK_DEBUG
,
3259 "evt_remote_evt(0): chan %s, id 0x%llx\n",
3260 evtpkt
->led_chan_name
.value
,
3261 (unsigned long long)evtpkt
->led_chan_unlink_id
);
3263 eci
= find_channel(&evtpkt
->led_chan_name
, evtpkt
->led_chan_unlink_id
);
3265 * We may have had some events that were already queued when an
3266 * unlink happened, if we don't find the channel in the active list
3267 * look for the last unlinked channel of the same name. If this channel
3268 * is re-opened the messages will still be routed correctly because new
3269 * active channel messages will be ordered behind the open.
3271 if (!eci
&& (evtpkt
->led_chan_unlink_id
== EVT_CHAN_ACTIVE
)) {
3272 log_printf(CHAN_UNLINK_DEBUG
,
3273 "evt_remote_evt(1): chan %s, id 0x%llx\n",
3274 evtpkt
->led_chan_name
.value
,
3275 (unsigned long long)evtpkt
->led_chan_unlink_id
);
3276 eci
= find_last_unlinked_channel(&evtpkt
->led_chan_name
);
3280 * We shouldn't normally see an event for a channel that we
3284 log_printf(LOG_LEVEL_DEBUG
, "Channel %s doesn't exist\n",
3285 evtpkt
->led_chan_name
.value
);
3289 if (check_last_event(evtpkt
, nodeid
)) {
3293 evt
= make_local_event(evtpkt
, eci
);
3295 log_printf(LOG_LEVEL_WARNING
,
3296 "1Memory allocation error, can't deliver event\n");
3300 if (evt
->ed_event
.led_retention_time
) {
3304 try_deliver_event(evt
, eci
);
3305 free_event_data(evt
);
3309 * Calculate the remaining retention time of a received event during recovery
3311 inline mar_time_t
calc_retention_time(mar_time_t retention
,
3312 mar_time_t received
, mar_time_t now
)
3314 if ((received
< now
) && ((now
- received
) < retention
)) {
3315 return retention
- (now
- received
);
3322 * Receive a recovery network event message and save it in the retained list
3324 static void evt_remote_recovery_evt(void *msg
, unsigned int nodeid
)
3327 * - calculate remaining retention time
3328 * - Find assocated channel
3329 * - Scan list of subscribers
3331 * - Deliver events that pass the filter test
3333 struct lib_event_data
*evtpkt
= msg
;
3334 struct event_svr_channel_instance
*eci
;
3335 struct event_data
*evt
;
3336 struct member_node_data
*md
;
3340 now
= clust_time_now();
3342 log_printf(RECOVERY_EVENT_DEBUG
,
3343 "Remote recovery event data received from nodeid %d\n", nodeid
);
3345 if (recovery_phase
== evt_recovery_complete
) {
3346 log_printf(RECOVERY_EVENT_DEBUG
,
3347 "Received recovery data, not in recovery mode\n");
3351 log_printf(RECOVERY_EVENT_DEBUG
,
3352 "Processing recovery of retained events\n");
3353 if (recovery_node
) {
3354 log_printf(RECOVERY_EVENT_DEBUG
, "This node is the recovery node\n");
3357 log_printf(RECOVERY_EVENT_DEBUG
, "(1)EVT ID: %llx, Time: %llx\n",
3358 (unsigned long long)evtpkt
->led_event_id
,
3359 (unsigned long long)evtpkt
->led_retention_time
);
3361 * Calculate remaining retention time
3363 evtpkt
->led_retention_time
= calc_retention_time(
3364 evtpkt
->led_retention_time
,
3365 evtpkt
->led_receive_time
,
3368 log_printf(RECOVERY_EVENT_DEBUG
,
3369 "(2)EVT ID: %llx, ret: %llx, rec: %llx, now: %llx\n",
3370 (unsigned long long)evtpkt
->led_event_id
,
3371 (unsigned long long)evtpkt
->led_retention_time
,
3372 (unsigned long long)evtpkt
->led_receive_time
,
3373 (unsigned long long)now
);
3376 * If we haven't seen this event yet and it has remaining time, process
3379 if (!check_last_event(evtpkt
, evtpkt
->led_nodeid
) &&
3380 evtpkt
->led_retention_time
) {
3382 * See where the message came from so that we can set the
3383 * publishing node id in the message before delivery.
3385 md
= evt_find_node(evtpkt
->led_nodeid
);
3388 * Not sure how this can happen
3390 log_printf(LOG_LEVEL_NOTICE
, "No node for nodeid %s\n",
3391 totempg_ifaces_print (evtpkt
->led_nodeid
));
3394 log_printf(LOG_LEVEL_DEBUG
, "Cluster node ID %s name %s\n",
3395 totempg_ifaces_print (md
->mn_node_info
.nodeId
),
3396 md
->mn_node_info
.nodeName
.value
);
3398 log_printf(CHAN_UNLINK_DEBUG
,
3399 "evt_recovery_event: chan %s, id 0x%llx\n",
3400 evtpkt
->led_chan_name
.value
,
3401 (unsigned long long)evtpkt
->led_chan_unlink_id
);
3402 eci
= find_channel(&evtpkt
->led_chan_name
, evtpkt
->led_chan_unlink_id
);
3405 * We shouldn't normally see an event for a channel that we don't
3409 log_printf(RECOVERY_EVENT_DEBUG
, "Channel %s doesn't exist\n",
3410 evtpkt
->led_chan_name
.value
);
3414 evt
= make_local_event(evtpkt
, eci
);
3416 log_printf(LOG_LEVEL_WARNING
,
3417 "2Memory allocation error, can't deliver event\n");
3423 num_delivered
= try_deliver_event(evt
, eci
);
3424 log_printf(RECOVERY_EVENT_DEBUG
, "Delivered to %d subscribers\n",
3426 free_event_data(evt
);
3432 * Timeout handler for event channel open. We flag the structure
3433 * as timed out. Then if the open request is ever returned, we can
3434 * issue a close channel and keep the reference counting correct.
3436 static void chan_open_timeout(void *data
)
3438 struct open_chan_pending
*ocp
= (struct open_chan_pending
*)data
;
3439 struct res_evt_channel_open res
;
3441 res
.ico_head
.size
= sizeof(res
);
3442 res
.ico_head
.id
= MESSAGE_RES_EVT_OPEN_CHANNEL
;
3443 res
.ico_head
.error
= SA_AIS_ERR_TIMEOUT
;
3444 ocp
->ocp_invocation
= OPEN_TIMED_OUT
;
3445 openais_conn_send_response(ocp
->ocp_conn
, &res
, sizeof(res
));
3449 * Called by the channel open exec handler to finish the open and
3450 * respond to the application
3452 static void evt_chan_open_finish(struct open_chan_pending
*ocp
,
3453 struct event_svr_channel_instance
*eci
)
3455 struct event_svr_channel_open
*eco
;
3456 SaAisErrorT error
= SA_AIS_OK
;
3457 unsigned int ret
= 0;
3458 unsigned int timer_del_status
= 0;
3460 uint32_t handle
= 0;
3461 struct libevt_pd
*esip
;
3463 esip
= (struct libevt_pd
*)openais_conn_private_data_get(ocp
->ocp_conn
);
3465 log_printf(CHAN_OPEN_DEBUG
, "Open channel finish %s\n",
3466 get_mar_name_t(&ocp
->ocp_chan_name
));
3467 if (ocp
->ocp_timer_handle
) {
3468 openais_timer_delete (ocp
->ocp_timer_handle
);
3472 * If this is a finished open for a timed out request, then
3473 * send out a close on this channel to clean things up.
3475 if (ocp
->ocp_invocation
== OPEN_TIMED_OUT
) {
3476 log_printf(CHAN_OPEN_DEBUG
, "Closing timed out open of %s\n",
3477 get_mar_name_t(&ocp
->ocp_chan_name
));
3478 error
= evt_close_channel(&ocp
->ocp_chan_name
, EVT_CHAN_ACTIVE
);
3479 if (error
!= SA_AIS_OK
) {
3480 log_printf(CHAN_OPEN_DEBUG
,
3481 "Close of timed out open failed for %s\n",
3482 get_mar_name_t(&ocp
->ocp_chan_name
));
3484 list_del(&ocp
->ocp_entry
);
3490 * Create a handle to give back to the caller to associate
3491 * with this channel open instance.
3493 ret
= hdb_handle_create(&esip
->esi_hdb
, sizeof(*eco
), &handle
);
3497 ret
= hdb_handle_get(&esip
->esi_hdb
, handle
, &ptr
);
3504 * Initailize and link into the global channel structure.
3506 list_init(&eco
->eco_subscr
);
3507 list_init(&eco
->eco_entry
);
3508 list_init(&eco
->eco_instance_entry
);
3509 eco
->eco_flags
= ocp
->ocp_open_flag
;
3510 eco
->eco_channel
= eci
;
3511 eco
->eco_lib_handle
= ocp
->ocp_c_handle
;
3512 eco
->eco_my_handle
= handle
;
3513 eco
->eco_conn
= ocp
->ocp_conn
;
3514 list_add_tail(&eco
->eco_entry
, &eci
->esc_open_chans
);
3515 list_add_tail(&eco
->eco_instance_entry
, &esip
->esi_open_chans
);
3518 * respond back with a handle to access this channel
3519 * open instance for later subscriptions, etc.
3521 hdb_handle_put(&esip
->esi_hdb
, handle
);
3524 log_printf(CHAN_OPEN_DEBUG
, "Open channel finish %s send response %d\n",
3525 get_mar_name_t(&ocp
->ocp_chan_name
),
3527 if (ocp
->ocp_async
) {
3528 struct res_evt_open_chan_async resa
;
3529 resa
.ica_head
.size
= sizeof(resa
);
3530 resa
.ica_head
.id
= MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK
;
3531 resa
.ica_head
.error
= (ret
== 0 ? SA_AIS_OK
: SA_AIS_ERR_BAD_HANDLE
);
3532 resa
.ica_channel_handle
= handle
;
3533 resa
.ica_c_handle
= ocp
->ocp_c_handle
;
3534 resa
.ica_invocation
= ocp
->ocp_invocation
;
3535 openais_conn_send_response(openais_conn_partner_get(ocp
->ocp_conn
),
3536 &resa
, sizeof(resa
));
3538 struct res_evt_channel_open res
;
3539 res
.ico_head
.size
= sizeof(res
);
3540 res
.ico_head
.id
= MESSAGE_RES_EVT_OPEN_CHANNEL
;
3541 res
.ico_head
.error
= (ret
== 0 ? SA_AIS_OK
: SA_AIS_ERR_BAD_HANDLE
);
3542 res
.ico_channel_handle
= handle
;
3543 openais_conn_send_response(ocp
->ocp_conn
, &res
, sizeof(res
));
3546 if (timer_del_status
== 0) {
3547 list_del(&ocp
->ocp_entry
);
3553 * Called by the channel unlink exec handler to
3554 * respond to the application.
3556 static void evt_chan_unlink_finish(struct unlink_chan_pending
*ucp
)
3558 struct res_evt_channel_unlink res
;
3560 log_printf(CHAN_UNLINK_DEBUG
, "Unlink channel finish ID 0x%llx\n",
3561 (unsigned long long)ucp
->ucp_unlink_id
);
3563 list_del(&ucp
->ucp_entry
);
3565 res
.iuc_head
.size
= sizeof(res
);
3566 res
.iuc_head
.id
= MESSAGE_RES_EVT_UNLINK_CHANNEL
;
3567 res
.iuc_head
.error
= SA_AIS_OK
;
3568 openais_conn_send_response(ucp
->ucp_conn
, &res
, sizeof(res
));
3574 * Called by the retention time clear exec handler to
3575 * respond to the application.
3577 static void evt_ret_time_clr_finish(struct retention_time_clear_pending
*rtc
,
3580 struct res_evt_event_clear_retentiontime res
;
3582 log_printf(RETENTION_TIME_DEBUG
, "Retention Time Clear finish ID 0x%llx\n",
3583 (unsigned long long)rtc
->rtc_event_id
);
3585 res
.iec_head
.size
= sizeof(res
);
3586 res
.iec_head
.id
= MESSAGE_RES_EVT_CLEAR_RETENTIONTIME
;
3587 res
.iec_head
.error
= ret
;
3588 openais_conn_send_response(rtc
->rtc_conn
, &res
, sizeof(res
));
3590 list_del(&rtc
->rtc_entry
);
3595 * Take the channel command data and swap the elements so they match
3596 * our architectures word order.
3599 convert_chan_packet(void *msg
)
3601 struct req_evt_chan_command
*cpkt
= (struct req_evt_chan_command
*)msg
;
3604 * converted in the main deliver_fn:
3605 * led_head.id, led_head.size.
3609 cpkt
->chc_op
= swab32(cpkt
->chc_op
);
3612 * Which elements of the packet that are converted depend
3615 switch (cpkt
->chc_op
) {
3617 case EVT_OPEN_CHAN_OP
:
3618 swab_mar_name_t (&cpkt
->u
.chc_chan
.ocr_name
);
3619 cpkt
->u
.chc_chan
.ocr_serial_no
= swab64(cpkt
->u
.chc_chan
.ocr_serial_no
);
3622 case EVT_UNLINK_CHAN_OP
:
3623 case EVT_CLOSE_CHAN_OP
:
3624 swab_mar_name_t (&cpkt
->u
.chcu
.chcu_name
);
3625 cpkt
->u
.chcu
.chcu_unlink_id
= swab64(cpkt
->u
.chcu
.chcu_unlink_id
);
3628 case EVT_CLEAR_RET_OP
:
3629 cpkt
->u
.chc_event_id
= swab64(cpkt
->u
.chc_event_id
);
3633 cpkt
->u
.chc_set_id
.chc_nodeid
=
3634 swab32(cpkt
->u
.chc_set_id
.chc_nodeid
);
3635 cpkt
->u
.chc_set_id
.chc_last_id
= swab64(cpkt
->u
.chc_set_id
.chc_last_id
);
3638 case EVT_OPEN_COUNT
:
3639 swab_mar_name_t (&cpkt
->u
.chc_set_opens
.chc_chan_name
);
3640 cpkt
->u
.chc_set_opens
.chc_open_count
=
3641 swab32(cpkt
->u
.chc_set_opens
.chc_open_count
);
3645 * No data assocaited with these ops.
3648 case EVT_OPEN_COUNT_DONE
:
3652 * Make sure that this function is updated when new ops are added.
3661 * Receive and process remote event operations.
3662 * Used to communicate channel opens/closes, clear retention time,
3663 * config change updates...
3665 static void evt_remote_chan_op(void *msg
, unsigned int nodeid
)
3667 struct req_evt_chan_command
*cpkt
= msg
;
3668 unsigned int local_node
= {SA_CLM_LOCAL_NODE_ID
};
3669 SaClmClusterNodeT
*cn
, *my_node
;
3670 struct member_node_data
*mn
;
3671 struct event_svr_channel_instance
*eci
;
3673 log_printf(REMOTE_OP_DEBUG
, "Remote channel operation request\n");
3674 my_node
= main_clm_get_by_nodeid(local_node
);
3675 log_printf(REMOTE_OP_DEBUG
, "my node ID: 0x%x\n", my_node
->nodeId
);
3677 mn
= evt_find_node(nodeid
);
3679 cn
= main_clm_get_by_nodeid(nodeid
);
3681 log_printf(LOG_LEVEL_WARNING
,
3682 "Evt remote channel op: Node data for nodeid %d is NULL\n",
3686 evt_add_node(nodeid
, cn
);
3687 mn
= evt_find_node(nodeid
);
3691 switch (cpkt
->chc_op
) {
3693 * Open channel remote command. The open channel request is done
3694 * in two steps. When an pplication tries to open, we send an open
3695 * channel message to the other nodes. When we receive an open channel
3696 * message, we may create the channel structure if it doesn't exist
3697 * and also complete applicaiton open requests for the specified
3698 * channel. We keep a counter of total opens for the given channel and
3699 * later when it has been completely closed (everywhere in the cluster)
3700 * we will free up the assocated channel data.
3702 case EVT_OPEN_CHAN_OP
: {
3703 struct open_chan_pending
*ocp
;
3704 struct list_head
*l
, *nxt
;
3706 log_printf(CHAN_OPEN_DEBUG
, "Opening channel %s for node %s\n",
3707 cpkt
->u
.chc_chan
.ocr_name
.value
,
3708 totempg_ifaces_print (mn
->mn_node_info
.nodeId
));
3709 eci
= find_channel(&cpkt
->u
.chc_chan
.ocr_name
, EVT_CHAN_ACTIVE
);
3712 eci
= create_channel(&cpkt
->u
.chc_chan
.ocr_name
);
3715 log_printf(LOG_LEVEL_WARNING
, "Could not create channel %s\n",
3716 get_mar_name_t(&cpkt
->u
.chc_chan
.ocr_name
));
3720 inc_open_count(eci
, mn
->mn_node_info
.nodeId
);
3722 if (mn
->mn_node_info
.nodeId
== my_node
->nodeId
) {
3724 * Complete one of our pending open requests
3726 for (l
= open_pending
.next
; l
!= &open_pending
; l
= nxt
) {
3728 ocp
= list_entry(l
, struct open_chan_pending
, ocp_entry
);
3729 log_printf(CHAN_OPEN_DEBUG
,
3730 "Compare channel req no %llu %llu\n",
3731 (unsigned long long)ocp
->ocp_serial_no
,
3732 (unsigned long long)cpkt
->u
.chc_chan
.ocr_serial_no
);
3733 if (ocp
->ocp_serial_no
== cpkt
->u
.chc_chan
.ocr_serial_no
) {
3734 evt_chan_open_finish(ocp
, eci
);
3739 log_printf(CHAN_OPEN_DEBUG
,
3740 "Open channel %s t %d, l %d, r %d\n",
3741 get_mar_name_t(&eci
->esc_channel_name
),
3742 eci
->esc_total_opens
, eci
->esc_local_opens
,
3743 eci
->esc_retained_count
);
3748 * Handle a channel close. We'll decrement the global open counter and
3749 * free up channel associated data when all instances are closed.
3751 case EVT_CLOSE_CHAN_OP
:
3752 log_printf(LOG_LEVEL_DEBUG
, "Closing channel %s for node 0x%x\n",
3753 cpkt
->u
.chcu
.chcu_name
.value
, mn
->mn_node_info
.nodeId
);
3754 eci
= find_channel(&cpkt
->u
.chcu
.chcu_name
, cpkt
->u
.chcu
.chcu_unlink_id
);
3756 log_printf(LOG_LEVEL_NOTICE
,
3757 "Channel close request for %s not found\n",
3758 cpkt
->u
.chcu
.chcu_name
.value
);
3763 * if last instance, we can free up assocated data.
3765 dec_open_count(eci
, mn
->mn_node_info
.nodeId
);
3766 log_printf(LOG_LEVEL_DEBUG
,
3767 "Close channel %s t %d, l %d, r %d\n",
3768 eci
->esc_channel_name
.value
,
3769 eci
->esc_total_opens
, eci
->esc_local_opens
,
3770 eci
->esc_retained_count
);
3771 delete_channel(eci
);
3775 * Handle a request for channel unlink saEvtChannelUnlink().
3776 * We'll look up the channel and mark it as unlinked. Respond to
3777 * local library unlink commands.
3779 case EVT_UNLINK_CHAN_OP
: {
3780 struct unlink_chan_pending
*ucp
;
3781 struct list_head
*l
, *nxt
;
3783 log_printf(CHAN_UNLINK_DEBUG
,
3784 "Unlink request channel %s unlink ID 0x%llx from node 0x%x\n",
3785 cpkt
->u
.chcu
.chcu_name
.value
,
3786 (unsigned long long)cpkt
->u
.chcu
.chcu_unlink_id
,
3787 mn
->mn_node_info
.nodeId
);
3791 * look up the channel name and get its assoicated data.
3793 eci
= find_channel(&cpkt
->u
.chcu
.chcu_name
,
3796 log_printf(LOG_LEVEL_NOTICE
,
3797 "Channel unlink request for %s not found\n",
3798 cpkt
->u
.chcu
.chcu_name
.value
);
3801 * Mark channel as unlinked.
3803 unlink_channel(eci
, cpkt
->u
.chcu
.chcu_unlink_id
);
3807 * respond only to local library requests.
3809 if (mn
->mn_node_info
.nodeId
== my_node
->nodeId
) {
3811 * Complete one of our pending unlink requests
3813 for (l
= unlink_pending
.next
; l
!= &unlink_pending
; l
= nxt
) {
3815 ucp
= list_entry(l
, struct unlink_chan_pending
, ucp_entry
);
3816 log_printf(CHAN_UNLINK_DEBUG
,
3817 "Compare channel id 0x%llx 0x%llx\n",
3818 (unsigned long long)ucp
->ucp_unlink_id
,
3819 (unsigned long long)cpkt
->u
.chcu
.chcu_unlink_id
);
3820 if (ucp
->ucp_unlink_id
== cpkt
->u
.chcu
.chcu_unlink_id
) {
3821 evt_chan_unlink_finish(ucp
);
3831 * saEvtClearRetentionTime handler.
3833 case EVT_CLEAR_RET_OP
:
3835 SaAisErrorT did_clear
;
3836 struct retention_time_clear_pending
*rtc
;
3837 struct list_head
*l
, *nxt
;
3839 log_printf(RETENTION_TIME_DEBUG
, "Clear retention time request %llx\n",
3840 (unsigned long long)cpkt
->u
.chc_event_id
);
3841 did_clear
= clear_retention_time(cpkt
->u
.chc_event_id
);
3844 * Respond to local library requests
3846 if (mn
->mn_node_info
.nodeId
== my_node
->nodeId
) {
3848 * Complete pending request
3850 for (l
= clear_pending
.next
; l
!= &clear_pending
; l
= nxt
) {
3852 rtc
= list_entry(l
, struct retention_time_clear_pending
,
3854 if (rtc
->rtc_event_id
== cpkt
->u
.chc_event_id
) {
3855 evt_ret_time_clr_finish(rtc
, did_clear
);
3864 * Set our next event ID based on the largest event ID seen
3865 * by others in the cluster. This way, if we've left and re-joined, we'll
3866 * start using an event ID that is unique.
3868 case EVT_SET_ID_OP
: {
3869 int log_level
= LOG_LEVEL_DEBUG
;
3870 if (cpkt
->u
.chc_set_id
.chc_nodeid
== my_node
->nodeId
) {
3871 log_level
= RECOVERY_DEBUG
;
3873 log_printf(log_level
,
3874 "Received Set event ID OP from nodeid %x to %llx for %x my addr %s base %llx\n",
3876 (unsigned long long)cpkt
->u
.chc_set_id
.chc_last_id
,
3877 cpkt
->u
.chc_set_id
.chc_nodeid
,
3878 totempg_ifaces_print (my_node
->nodeId
),
3879 (unsigned long long)base_id
);
3880 if (cpkt
->u
.chc_set_id
.chc_nodeid
== my_node
->nodeId
) {
3881 if (cpkt
->u
.chc_set_id
.chc_last_id
>= base_id
) {
3882 log_printf(RECOVERY_DEBUG
,
3883 "Set event ID from nodeid %s to %llx\n",
3884 totempg_ifaces_print (nodeid
),
3885 (unsigned long long)cpkt
->u
.chc_set_id
.chc_last_id
);
3886 base_id
= cpkt
->u
.chc_set_id
.chc_last_id
+ 1;
3893 * Receive the open count for a particular channel during recovery.
3894 * This insures that everyone has the same notion of who has a channel
3895 * open so that it can be removed when no one else has it open anymore.
3897 case EVT_OPEN_COUNT
:
3898 if (recovery_phase
== evt_recovery_complete
) {
3899 log_printf(LOG_LEVEL_ERROR
,
3900 "Evt open count msg from nodeid %s, but not in membership change\n",
3901 totempg_ifaces_print (nodeid
));
3905 * Zero out all open counts because we're setting then based
3906 * on each nodes local counts.
3908 if (!processed_open_counts
) {
3909 zero_chan_open_counts();
3910 processed_open_counts
= 1;
3912 log_printf(RECOVERY_DEBUG
,
3913 "Open channel count %s is %d for node %s\n",
3914 cpkt
->u
.chc_set_opens
.chc_chan_name
.value
,
3915 cpkt
->u
.chc_set_opens
.chc_open_count
,
3916 totempg_ifaces_print (mn
->mn_node_info
.nodeId
));
3918 eci
= find_channel(&cpkt
->u
.chc_set_opens
.chc_chan_name
,
3921 eci
= create_channel(&cpkt
->u
.chc_set_opens
.chc_chan_name
);
3924 log_printf(LOG_LEVEL_WARNING
, "Could not create channel %s\n",
3925 get_mar_name_t(&cpkt
->u
.chc_set_opens
.chc_chan_name
));
3928 if (set_open_count(eci
, mn
->mn_node_info
.nodeId
,
3929 cpkt
->u
.chc_set_opens
.chc_open_count
)) {
3930 log_printf(LOG_LEVEL_ERROR
,
3931 "Error setting Open channel count %s for node %s\n",
3932 cpkt
->u
.chc_set_opens
.chc_chan_name
.value
,
3933 totempg_ifaces_print (mn
->mn_node_info
.nodeId
));
3938 * Once we get all the messages from
3939 * the current membership, determine who delivers any retained events.
3941 case EVT_OPEN_COUNT_DONE
: {
3942 if (recovery_phase
== evt_recovery_complete
) {
3943 log_printf(LOG_LEVEL_ERROR
,
3944 "Evt config msg from nodeid %s, but not in membership change\n",
3945 totempg_ifaces_print (nodeid
));
3947 log_printf(RECOVERY_DEBUG
,
3948 "Receive EVT_CONF_CHANGE_DONE from nodeid %s members %d checked in %d\n",
3949 totempg_ifaces_print (nodeid
), total_member_count
, checked_in
+1);
3951 log_printf(RECOVERY_DEBUG
,
3952 "NO NODE DATA AVAILABLE FOR nodeid %s\n", totempg_ifaces_print (nodeid
));
3955 if (++checked_in
== total_member_count
) {
3957 * We're all here, now figure out who should send the
3961 if (mn
&& mn
->mn_node_info
.nodeId
== my_node_id
) {
3962 log_printf(RECOVERY_DEBUG
,
3963 "I am oldest in my transitional config\n");
3965 recovery_phase
= evt_send_retained_events
;
3967 recovery_phase
= evt_send_retained_events_done
;
3976 * Count up the nodes again, when all the nodes have responded, we've
3977 * distributed the retained events and we're done with recovery and can
3978 * continue operations.
3980 case EVT_CONF_DONE
: {
3981 log_printf(RECOVERY_DEBUG
,
3982 "Receive EVT_CONF_DONE from nodeid %s, members %d checked in %d\n",
3983 totempg_ifaces_print (nodeid
),
3984 total_member_count
, checked_in
+1);
3985 if (++checked_in
== total_member_count
) {
3987 * All recovery complete, carry on.
3989 recovery_phase
= evt_recovery_complete
;
3990 #ifdef DUMP_CHAN_INFO
3999 log_printf(LOG_LEVEL_NOTICE
, "Invalid channel operation %d\n",
4006 * Set up initial conditions for processing event service
4009 static void evt_sync_init(void)
4011 SaClmClusterNodeT
*cn
;
4012 struct member_node_data
*md
;
4013 unsigned int my_node
= {SA_CLM_LOCAL_NODE_ID
};
4014 int left_list_entries
= left_member_count
;
4015 unsigned int *left_list
= left_member_list
;
4017 log_printf(RECOVERY_DEBUG
, "Evt synchronize initialization\n");
4020 * Set the base event id
4023 cn
= main_clm_get_by_nodeid(my_node
);
4024 log_printf(RECOVERY_DEBUG
, "My node ID %s\n",
4025 totempg_ifaces_print (cn
->nodeId
));
4026 my_node_id
= cn
->nodeId
;
4027 set_event_id(my_node_id
);
4031 * account for nodes that left the membership
4033 while (left_list_entries
--) {
4034 md
= evt_find_node(*left_list
);
4036 log_printf(LOG_LEVEL_WARNING
,
4037 "Can't find cluster node at %s\n",
4038 totempg_ifaces_print (*left_list
));
4040 * Mark this one as down.
4043 log_printf(RECOVERY_DEBUG
, "cluster node at %s down\n",
4044 totempg_ifaces_print(*left_list
));
4046 remove_chan_open_info(md
->mn_node_info
.nodeId
);
4052 * set up for recovery processing, first phase:
4054 recovery_phase
= evt_send_event_id
;
4057 * List used to distribute last know event IDs.
4059 add_list
= current_member_list
;
4060 add_count
= total_member_count
;
4061 processed_open_counts
= 0;
4064 * List used for distributing open channel counts
4066 next_chan
= esc_head
.next
;
4069 * List used for distributing retained events
4071 next_retained
= retained_list
.next
;
4074 * Member check in counts for open channel counts and
4081 * Handle event service recovery. It passes through a number of states to
4082 * finish the recovery.
4084 * First, the node broadcasts the highest event ID that it has seen for any
4085 * joinig node. This helps to make sure that rejoining nodes don't re-use
4086 * event IDs that have already been seen.
4088 * Next, The node broadcasts its open channel information to the other nodes.
4089 * This makes sure that any joining nodes have complete data on any channels
4092 * Once done sending open channel information the node waits in a state for
4093 * the rest of the nodes to finish sending their data. When the last node
4094 * has checked in, then the remote channel operation handler selects the next
4095 * state which is evt_send_retained_events if this is the oldest node in the
4096 * cluster, or otherwise to evt_wait_send_retained_events to wait for the
4097 * retained events to be sent. When the retained events have been sent, the
4098 * state is changed to evt_recovery_complete and this function exits with
4099 * zero to inidicate that recovery is done.
4101 static int evt_sync_process(void)
4104 log_printf(RECOVERY_DEBUG
, "Process Evt synchronization \n");
4106 switch (recovery_phase
) {
4109 * Send last know event ID to joining nodes to prevent duplicate
4112 case evt_send_event_id
:
4114 struct member_node_data
*md
;
4115 SaClmClusterNodeT
*cn
;
4116 struct req_evt_chan_command cpkt
;
4117 struct iovec chn_iovec
;
4120 log_printf(RECOVERY_DEBUG
, "Send max event ID updates\n");
4123 * If we've seen this node before, send out the last msg ID
4124 * that we've seen from him. He will set his base ID for
4125 * generating event and message IDs to the highest one seen.
4127 md
= evt_find_node(*add_list
);
4129 log_printf(RECOVERY_DEBUG
,
4130 "Send set evt ID %llx to %s\n",
4131 (unsigned long long)md
->mn_last_msg_id
,
4132 totempg_ifaces_print (*add_list
));
4134 memset(&cpkt
, 0, sizeof(cpkt
));
4136 SERVICE_ID_MAKE (EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
4137 cpkt
.chc_head
.size
= sizeof(cpkt
);
4138 cpkt
.chc_op
= EVT_SET_ID_OP
;
4139 cpkt
.u
.chc_set_id
.chc_nodeid
= *add_list
;
4140 cpkt
.u
.chc_set_id
.chc_last_id
= md
->mn_last_msg_id
;
4141 chn_iovec
.iov_base
= (char *)&cpkt
;
4142 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
4143 res
= totempg_groups_mcast_joined(openais_group_handle
,
4144 &chn_iovec
, 1, TOTEMPG_AGREED
);
4146 log_printf(RECOVERY_DEBUG
,
4147 "Unable to send event id to %s\n",
4148 totempg_ifaces_print (*add_list
));
4150 * We'll try again later.
4157 * Not seen before, add it to our list of nodes.
4159 cn
= main_clm_get_by_nodeid(*add_list
);
4162 * Error: shouldn't happen
4164 log_printf(LOG_LEVEL_ERROR
,
4165 "recovery error node: %s not found\n",
4166 totempg_ifaces_print (*add_list
));
4169 evt_add_node(*add_list
, cn
);
4176 recovery_phase
= evt_send_open_count
;
4181 * Send channel open counts so all members have the same channel open
4184 case evt_send_open_count
:
4186 struct req_evt_chan_command cpkt
;
4187 struct iovec chn_iovec
;
4188 struct event_svr_channel_instance
*eci
;
4191 log_printf(RECOVERY_DEBUG
, "Send open count updates\n");
4193 * Process messages. When we're done, send the done message
4196 memset(&cpkt
, 0, sizeof(cpkt
));
4197 for (;next_chan
!= &esc_head
;
4198 next_chan
= next_chan
->next
) {
4199 log_printf(RECOVERY_DEBUG
, "Sending next open count\n");
4200 eci
= list_entry(next_chan
, struct event_svr_channel_instance
,
4203 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
4204 cpkt
.chc_head
.size
= sizeof(cpkt
);
4205 cpkt
.chc_op
= EVT_OPEN_COUNT
;
4206 cpkt
.u
.chc_set_opens
.chc_chan_name
= eci
->esc_channel_name
;
4207 cpkt
.u
.chc_set_opens
.chc_open_count
= eci
->esc_local_opens
;
4208 chn_iovec
.iov_base
= (char *)&cpkt
;
4209 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
4210 res
= totempg_groups_mcast_joined(openais_group_handle
,
4211 &chn_iovec
, 1, TOTEMPG_AGREED
);
4220 memset(&cpkt
, 0, sizeof(cpkt
));
4222 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
4223 cpkt
.chc_head
.size
= sizeof(cpkt
);
4224 cpkt
.chc_op
= EVT_OPEN_COUNT_DONE
;
4225 chn_iovec
.iov_base
= (char *)&cpkt
;
4226 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
4227 res
= totempg_groups_mcast_joined(openais_group_handle
,
4228 &chn_iovec
, 1,TOTEMPG_AGREED
);
4235 log_printf(RECOVERY_DEBUG
, "DONE Sending open counts\n");
4237 recovery_phase
= evt_wait_open_count_done
;
4242 * Wait for all nodes to finish sending open updates before proceding.
4243 * the EVT_OPEN_COUNT_DONE handler will set the state to
4244 * evt_send_retained_events to get us out of this.
4246 case evt_wait_open_count_done
:
4248 log_printf(RECOVERY_DEBUG
, "Wait for open count done\n");
4253 * If I'm the oldest node, send out retained events so that new nodes
4254 * have all the information.
4256 case evt_send_retained_events
:
4258 struct iovec chn_iovec
;
4259 struct event_data
*evt
;
4262 log_printf(RECOVERY_DEBUG
, "Send retained event updates\n");
4265 * Process messages. When we're done, send the done message
4268 for (;next_retained
!= &retained_list
;
4269 next_retained
= next_retained
->next
) {
4270 log_printf(LOG_LEVEL_DEBUG
, "Sending next retained event\n");
4271 evt
= list_entry(next_retained
, struct event_data
, ed_retained
);
4272 evt
->ed_event
.led_head
.id
=
4273 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
);
4274 chn_iovec
.iov_base
= (char *)&evt
->ed_event
;
4275 chn_iovec
.iov_len
= evt
->ed_event
.led_head
.size
;
4276 res
= totempg_groups_mcast_joined(openais_group_handle
,
4277 &chn_iovec
, 1, TOTEMPG_AGREED
);
4287 recovery_phase
= evt_send_retained_events_done
;
4291 case evt_send_retained_events_done
:
4293 struct req_evt_chan_command cpkt
;
4294 struct iovec chn_iovec
;
4297 log_printf(RECOVERY_DEBUG
, "DONE Sending retained events\n");
4298 memset(&cpkt
, 0, sizeof(cpkt
));
4300 SERVICE_ID_MAKE(EVT_SERVICE
, MESSAGE_REQ_EXEC_EVT_CHANCMD
);
4301 cpkt
.chc_head
.size
= sizeof(cpkt
);
4302 cpkt
.chc_op
= EVT_CONF_DONE
;
4303 chn_iovec
.iov_base
= (char *)&cpkt
;
4304 chn_iovec
.iov_len
= cpkt
.chc_head
.size
;
4305 res
= totempg_groups_mcast_joined(openais_group_handle
,
4306 &chn_iovec
, 1, TOTEMPG_AGREED
);
4308 recovery_phase
= evt_wait_send_retained_events
;
4313 * Wait for send of retained events to finish
4314 * the EVT_CONF_DONE handler will set the state to
4315 * evt_recovery_complete to get us out of this.
4317 case evt_wait_send_retained_events
:
4319 log_printf(RECOVERY_DEBUG
, "Wait for retained events\n");
4323 case evt_recovery_complete
:
4325 log_printf(RECOVERY_DEBUG
, "Recovery complete\n");
4330 log_printf(LOG_LEVEL_WARNING
, "Bad recovery phase state: %u\n",
4332 recovery_phase
= evt_recovery_complete
;
4340 * Not used at this time
4342 static void evt_sync_activate(void)
4344 log_printf(RECOVERY_DEBUG
, "Evt synchronize activation\n");
4348 * Not used at this time
4350 static void evt_sync_abort(void)
4352 log_printf(RECOVERY_DEBUG
, "Abort Evt synchronization\n");
4356 * vi: set autoindent tabstop=4 shiftwidth=4 :