Allow make from the exec directory.
[openais.git] / exec / evt.c
blob13fb20dee09df4de125ca7e5aa2e3594f3c44239
1 /*
2 * Copyright (c) 2004-2006 Mark Haverkamp
3 * Copyright (c) 2004-2006 Open Source Development Lab
4 * Copyright (c) 2006 Sun Microsystems, Inc.
6 * All rights reserved.
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the Open Source Development Lab nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
/*
 * Compile-time diagnostic switches.
 *
 * DUMP_CHAN_INFO compiles in dump_all_chans().  Each *_DEBUG macro is the
 * log level handed to log_printf() for one class of diagnostic message.
 */
#define DUMP_CHAN_INFO

#define RECOVERY_EVENT_DEBUG	LOG_LEVEL_DEBUG
#define RECOVERY_DEBUG		LOG_LEVEL_DEBUG
#define CHAN_DEL_DEBUG		LOG_LEVEL_DEBUG
#define CHAN_OPEN_DEBUG		LOG_LEVEL_DEBUG
#define CHAN_UNLINK_DEBUG	LOG_LEVEL_DEBUG
#define REMOTE_OP_DEBUG		LOG_LEVEL_DEBUG
#define RETENTION_TIME_DEBUG	LOG_LEVEL_DEBUG
44 #include <sys/types.h>
45 #include <stdlib.h>
46 #include <errno.h>
47 #include <sys/time.h>
48 #include <sys/uio.h>
49 #include <sys/socket.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
52 #include "../include/hdb.h"
53 #include "../include/ipc_evt.h"
54 #include "../include/list.h"
55 #include "../include/queue.h"
56 #include "../lcr/lcr_comp.h"
57 #include "util.h"
58 #include "objdb.h"
59 #include "totem.h"
60 #include "service.h"
61 #include "mempool.h"
62 #include "main.h"
63 #include "flow.h"
64 #include "tlist.h"
65 #include "ipc.h"
66 #include "totempg.h"
67 #include "swab.h"
68 #include "logsys.h"
69 #include "tlist.h"
70 #include "timer.h"
72 LOGSYS_DECLARE_SUBSYS ("EVT", LOG_INFO);
74 * event instance structure. Contains information about the
75 * active connection to the API library.
77 * esi_version: Version that the library is running.
78 * esi_open_chans: list of open channels associated with this
79 * instance. Used to clean up any data left
80 * allocated when the finalize is done.
81 * (event_svr_channel_open.eco_instance_entry)
82 * esi_events: list of pending events to be delivered on this
83 * instance (struct chan_event_list.cel_entry)
84 * esi_queue_blocked: non-zero if the delivery queue got too full
85 * and we're blocking new messages until we
86 * drain some of the queued messages.
87 * esi_nevents: Number of events in events lists to be sent.
88 * esi_hdb: Handle data base for open channels on this
89 * instance. Used for a quick lookup of
90 * open channel data from a lib api message.
92 struct libevt_pd {
93 SaVersionT esi_version;
94 struct list_head esi_open_chans;
95 struct list_head esi_events[SA_EVT_LOWEST_PRIORITY+1];
96 int esi_nevents;
97 int esi_queue_blocked;
98 struct hdb_handle_database esi_hdb;
/*
 * Executive (cluster) message types of the event service.  The entries
 * of evt_exec_service[] are laid out in this order.
 */
enum evt_message_req_types {
	MESSAGE_REQ_EXEC_EVT_EVENTDATA		= 0,	/* published event */
	MESSAGE_REQ_EXEC_EVT_CHANCMD		= 1,	/* channel command */
	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA	= 2	/* event sent during recovery */
};
/* Library (IPC) request handlers, referenced from evt_lib_service[]. */
static void lib_evt_open_channel(void *conn, void *message);
static void lib_evt_open_channel_async(void *conn, void *message);
static void lib_evt_close_channel(void *conn, void *message);
static void lib_evt_unlink_channel(void *conn, void *message);
static void lib_evt_event_subscribe(void *conn, void *message);
static void lib_evt_event_unsubscribe(void *conn, void *message);
static void lib_evt_event_publish(void *conn, void *message);
static void lib_evt_event_clear_retentiontime(void *conn, void *message);
static void lib_evt_event_data_get(void *conn, void *message);

/* Membership change callback (evt_service_handler.confchg_fn). */
static void evt_conf_change(
	enum totem_configuration_type configuration_type,
	unsigned int *member_list, int member_list_entries,
	unsigned int *left_list, int left_list_entries,
	unsigned int *joined_list, int joined_list_entries,
	struct memb_ring_id *ring_id);

/* Library connection set-up/tear-down and executive initialization. */
static int evt_lib_init(void *conn);
static int evt_lib_exit(void *conn);
static int evt_exec_init(struct objdb_iface_ver0 *objdb);

/* Recovery sync functions (evt_service_handler.sync_*). */
static void evt_sync_init(void);
static int evt_sync_process(void);
static void evt_sync_activate(void);
static void evt_sync_abort(void);

/* Endian converters for incoming executive messages (exec_endian_convert_fn). */
static void convert_event(void *msg);
static void convert_chan_packet(void *msg);
140 static struct openais_lib_handler evt_lib_service[] = {
142 .lib_handler_fn = lib_evt_open_channel,
143 .response_size = sizeof(struct res_evt_channel_open),
144 .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL,
145 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
148 .lib_handler_fn = lib_evt_open_channel_async,
149 .response_size = sizeof(struct res_evt_channel_open),
150 .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL,
151 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
154 .lib_handler_fn = lib_evt_close_channel,
155 .response_size = sizeof(struct res_evt_channel_close),
156 .response_id = MESSAGE_RES_EVT_CLOSE_CHANNEL,
157 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
160 .lib_handler_fn = lib_evt_unlink_channel,
161 .response_size = sizeof(struct res_evt_channel_unlink),
162 .response_id = MESSAGE_RES_EVT_UNLINK_CHANNEL,
163 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
166 .lib_handler_fn = lib_evt_event_subscribe,
167 .response_size = sizeof(struct res_evt_event_subscribe),
168 .response_id = MESSAGE_RES_EVT_SUBSCRIBE,
169 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
172 .lib_handler_fn = lib_evt_event_unsubscribe,
173 .response_size = sizeof(struct res_evt_event_unsubscribe),
174 .response_id = MESSAGE_RES_EVT_UNSUBSCRIBE,
175 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
178 .lib_handler_fn = lib_evt_event_publish,
179 .response_size = sizeof(struct res_evt_event_publish),
180 .response_id = MESSAGE_RES_EVT_PUBLISH,
181 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
184 .lib_handler_fn = lib_evt_event_clear_retentiontime,
185 .response_size = sizeof(struct res_evt_event_clear_retentiontime),
186 .response_id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME,
187 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
190 .lib_handler_fn = lib_evt_event_data_get,
191 .response_size = sizeof(struct lib_event_data),
192 .response_id = MESSAGE_RES_EVT_EVENT_DATA,
193 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
198 static void evt_remote_evt(void *msg, unsigned int nodeid);
199 static void evt_remote_recovery_evt(void *msg, unsigned int nodeid);
200 static void evt_remote_chan_op(void *msg, unsigned int nodeid);
202 static struct openais_exec_handler evt_exec_service[] = {
204 .exec_handler_fn = evt_remote_evt,
205 .exec_endian_convert_fn = convert_event
208 .exec_handler_fn = evt_remote_chan_op,
209 .exec_endian_convert_fn = convert_chan_packet
212 .exec_handler_fn = evt_remote_recovery_evt,
213 .exec_endian_convert_fn = convert_event
217 struct openais_service_handler evt_service_handler = {
218 .name = "openais event service B.01.01",
219 .id = EVT_SERVICE,
220 .private_data_size = sizeof (struct libevt_pd),
221 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
222 .lib_init_fn = evt_lib_init,
223 .lib_exit_fn = evt_lib_exit,
224 .lib_service = evt_lib_service,
225 .lib_service_count = sizeof(evt_lib_service) / sizeof(struct openais_lib_handler),
226 .exec_init_fn = evt_exec_init,
227 .exec_service = evt_exec_service,
228 .exec_service_count = sizeof(evt_exec_service) / sizeof(struct openais_exec_handler),
229 .exec_dump_fn = NULL,
230 .confchg_fn = evt_conf_change,
231 .sync_init = evt_sync_init,
232 .sync_process = evt_sync_process,
233 .sync_activate = evt_sync_activate,
234 .sync_abort = evt_sync_abort
237 static struct openais_service_handler *evt_get_handler_ver0(void);
239 static struct openais_service_handler_iface_ver0 evt_service_handler_iface = {
240 .openais_get_service_handler_ver0 = evt_get_handler_ver0
243 static struct lcr_iface openais_evt_ver0[1] = {
245 .name = "openais_evt",
246 .version = 0,
247 .versions_replace = 0,
248 .versions_replace_count = 0,
249 .dependencies = 0,
250 .dependency_count = 0,
251 .constructor = NULL,
252 .destructor = NULL,
253 .interfaces = NULL,
257 static struct lcr_comp evt_comp_ver0 = {
258 .iface_count = 1,
259 .ifaces = openais_evt_ver0
262 static struct openais_service_handler *evt_get_handler_ver0(void)
264 return (&evt_service_handler);
267 __attribute__ ((constructor)) static void evt_comp_register (void) {
268 lcr_interfaces_set (&openais_evt_ver0[0], &evt_service_handler_iface);
270 lcr_component_register (&evt_comp_ver0);
274 * MESSAGE_REQ_EXEC_EVT_CHANCMD
276 * Used for various event related operations.
279 enum evt_chan_ops {
280 EVT_OPEN_CHAN_OP, /* chc_chan */
281 EVT_CLOSE_CHAN_OP, /* chc_close_unlink_chan */
282 EVT_UNLINK_CHAN_OP, /* chc_close_unlink_chan */
283 EVT_CLEAR_RET_OP, /* chc_event_id */
284 EVT_SET_ID_OP, /* chc_set_id */
285 EVT_CONF_DONE, /* no data used */
286 EVT_OPEN_COUNT, /* chc_set_opens */
287 EVT_OPEN_COUNT_DONE /* no data used */
291 * Used during recovery to set the next issued event ID
292 * based on the highest ID seen by any of the members
294 struct evt_set_id {
295 mar_uint32_t chc_nodeid __attribute__((aligned(8)));
296 mar_uint64_t chc_last_id __attribute__((aligned(8)));
300 * For set open count used during recovery to syncronize all nodes
302 * chc_chan_name: Channel name.
303 * chc_open_count: number of local opens of this channel.
305 struct evt_set_opens {
306 mar_name_t chc_chan_name __attribute__((aligned(8)));
307 mar_uint32_t chc_open_count __attribute__((aligned(8)));
311 * Used to communicate channel to close or unlink.
313 #define EVT_CHAN_ACTIVE 0
314 struct evt_close_unlink_chan {
315 mar_name_t chcu_name __attribute__((aligned(8)));
316 mar_uint64_t chcu_unlink_id __attribute__((aligned(8)));
319 struct open_chan_req {
320 mar_name_t ocr_name __attribute__((aligned(8)));
321 mar_uint64_t ocr_serial_no __attribute__((aligned(8)));
325 * Sent via MESSAGE_REQ_EXEC_EVT_CHANCMD
327 * chc_head: Request head
328 * chc_op: Channel operation (open, close, clear retentiontime)
329 * u: union of operation options.
331 struct req_evt_chan_command {
332 mar_req_header_t chc_head __attribute__((aligned(8)));
333 mar_uint32_t chc_op __attribute__((aligned(8)));
334 union {
335 struct open_chan_req chc_chan __attribute__((aligned(8)));
336 mar_evteventid_t chc_event_id __attribute__((aligned(8)));
337 struct evt_set_id chc_set_id __attribute__((aligned(8)));
338 struct evt_set_opens chc_set_opens __attribute__((aligned(8)));
339 struct evt_close_unlink_chan chcu __attribute__((aligned(8)));
340 } u;
/* Every retained event (struct event_data). */
static DECLARE_LIST_INIT(retained_list);

/* Every active channel (struct event_svr_channel_instance.esc_entry). */
static DECLARE_LIST_INIT(esc_head);

/*
 * Every channel marked for unlink, i.e. whose esc_unlink_id is not
 * EVT_CHAN_ACTIVE (struct event_svr_channel_instance.esc_entry).
 */
static DECLARE_LIST_INIT(esc_unlinked_head);
/*
 * State of event service recovery (current value in recovery_phase).
 */
enum recovery_phases {
	evt_recovery_complete,		/* normal operational mode */
	evt_send_event_id,		/* sending known last event IDs */
	evt_send_open_count,		/* sending open channel information */
	evt_wait_open_count_done,	/* done sending opens; waiting for the other nodes */
	evt_send_retained_events,	/* sending retained event data */
	evt_send_retained_events_done,	/* sending the done message */
	evt_wait_send_retained_events	/* waiting for others to finish retained events */
};
393 * Global varaibles used by the event service
395 * base_id_top: upper bits of next event ID to assign
396 * base_id: Lower bits of Next event ID to assign
397 * my_node_id: My cluster node id
398 * checked_in: keep track during config change.
399 * recovery_node: True if we're the recovery node. i.e. the
400 * node that sends the retained events.
401 * next_retained: pointer to next retained message to send
402 * during recovery.
403 * next_chan: pointer to next channel to send during recovery.
404 * recovery_phase: Indicates what recovery is taking place.
405 * left_member_count: How many left this configuration.
406 * left_member_list: Members that left this config
407 * joined_member_count: How many joined this configuration.
408 * joined_member_list: Members that joined this config
409 * total_member_count: how many members in this cluster
410 * current_member_list: Total membership this config
411 * trans_member_count: Node count in transitional membership
412 * trans_member_list: Total membership from the transitional membership
413 * add_count: count of joined members used for sending event id
414 * recovery data.
415 * add_list: pointer to joined list used for sending event id
416 * recovery data.
417 * processed_open_counts: Flag used to coordinate clearing open
418 * channel counts for config change recovery.
422 #define BASE_ID_MASK 0xffffffffLL
423 static mar_evteventid_t base_id = 1;
424 static mar_evteventid_t base_id_top = 0;
425 static mar_uint32_t my_node_id = 0;
426 static int checked_in = 0;
427 static int recovery_node = 0;
428 static struct list_head *next_retained = 0;
429 static struct list_head *next_chan = 0;
430 static enum recovery_phases recovery_phase = evt_recovery_complete;
431 static int left_member_count = 0;
432 static unsigned int *left_member_list = 0;
433 static int joined_member_count = 0;
434 static unsigned int *joined_member_list = 0;
435 static int total_member_count = 0;
436 static unsigned int *current_member_list = 0;
437 static int trans_member_count = 0;
438 static unsigned int *trans_member_list = 0;
439 static int add_count = 0;
440 static unsigned int *add_list = 0;
441 static int processed_open_counts = 0;
444 * Structure to track pending channel open requests.
445 * ocp_async: 1 for async open
446 * ocp_invocation: invocation for async open
447 * ocp_chan_name: requested channel
448 * ocp_conn: conn for returning to the library.
449 * ocp_open_flags: channel open flags
450 * ocp_timer_handle: timer handle for sync open
451 * ocp_serial_no: Identifier for the request
452 * ocp_entry: list entry for pending open list.
454 struct open_chan_pending {
455 int ocp_async;
456 mar_invocation_t ocp_invocation;
457 mar_name_t ocp_chan_name;
458 void *ocp_conn;
459 mar_evtchannelopenflags_t ocp_open_flag;
460 timer_handle ocp_timer_handle;
461 uint64_t ocp_c_handle;
462 uint64_t ocp_serial_no;
463 struct list_head ocp_entry;
465 static uint64_t open_serial_no = 0;
/*
 * Code marking an open request as timed out.  It is stored in the
 * invocation field, which only the async open uses — and an async open
 * cannot time out.
 */
#define OPEN_TIMED_OUT 0x5a6b5a6b5a6b5a6bLLU

/* Pending channel opens (struct open_chan_pending.ocp_entry). */
static DECLARE_LIST_INIT(open_pending);
static void chan_open_timeout(void *data);
481 * Structure to track pending channel unlink requests.
482 * ucp_unlink_id: unlink ID of unlinked channel.
483 * ucp_conn: conn for returning to the library.
484 * ucp_entry: list entry for pending unlink list.
486 struct unlink_chan_pending {
487 uint64_t ucp_unlink_id;
488 void *ucp_conn;
489 struct list_head ucp_entry;
493 * list of pending unlink requests
495 static DECLARE_LIST_INIT(unlink_pending);
498 * Structure to track pending retention time clear requests.
499 * rtc_event_id: event ID to clear.
500 * rtc_conn: conn for returning to the library.
501 * rtc_entry: list entry for pending clear list.
503 struct retention_time_clear_pending {
504 mar_evteventid_t rtc_event_id;
505 void *rtc_conn;
506 struct list_head rtc_entry;
510 * list of pending clear requests.
512 static DECLARE_LIST_INIT(clear_pending);
515 * Next unlink ID
517 static uint64_t base_unlink_id = 0;
518 inline uint64_t
519 next_chan_unlink_id()
521 uint64_t uid = my_node_id;
522 uid = (uid << 32ULL) | base_unlink_id;
523 base_unlink_id = (base_unlink_id + 1ULL) & BASE_ID_MASK;
524 return uid;
/* Smaller of two values (arguments may be evaluated twice). */
#define min(a,b) ((a) < (b) ? (a) : (b))

/*
 * Delivery throttling: keep the executive from using too much memory
 * when an application is slow to process its events.  Delivery blocks
 * at MAX_EVT_DELIVERY_QUEUE queued events and resumes once the queue
 * drains to MIN_EVT_QUEUE_RESUME.
 */
#define MAX_EVT_DELIVERY_QUEUE	1000
#define MIN_EVT_QUEUE_RESUME	(MAX_EVT_DELIVERY_QUEUE / 2)

static unsigned int evt_delivery_queue_size = MAX_EVT_DELIVERY_QUEUE;
static unsigned int evt_delivery_queue_resume = MIN_EVT_QUEUE_RESUME;
541 #define LOST_PUB "EVENT_SERIVCE"
542 #define LOST_CHAN "LOST EVENT"
544 * Event to send when the delivery queue gets too full
546 char lost_evt[] = SA_EVT_LOST_EVENT;
547 static int dropped_event_size;
548 static struct event_data *dropped_event;
549 struct evt_pattern {
550 mar_evt_event_pattern_t pat;
551 char str[sizeof(lost_evt)];
553 static struct evt_pattern dropped_pattern = {
554 .pat = {
555 sizeof(lost_evt),
556 sizeof(lost_evt),
557 (SaUint8T *) &dropped_pattern.str[0]},
558 .str = {SA_EVT_LOST_EVENT}
561 mar_name_t lost_chan = {
562 .value = LOST_CHAN,
563 .length = sizeof(LOST_CHAN)
566 mar_name_t dropped_publisher = {
567 .value = LOST_PUB,
568 .length = sizeof(LOST_PUB)
571 struct event_svr_channel_open;
572 struct event_svr_channel_subscr;
574 struct open_count {
575 mar_uint32_t oc_node_id;
576 int32_t oc_open_count;
580 * Structure to contain global channel releated information
582 * esc_channel_name: The name of this channel.
583 * esc_total_opens: The total number of opens on this channel including
584 * other nodes.
585 * esc_local_opens: The number of opens on this channel for this node.
586 * esc_oc_size: The total number of entries in esc_node_opens;
587 * esc_node_opens: list of node IDs and how many opens are associated.
588 * esc_retained_count: How many retained events for this channel
589 * esc_open_chans: list of opens of this channel.
590 * (event_svr_channel_open.eco_entry)
591 * esc_entry: links to other channels. (used by esc_head)
592 * esc_unlink_id: If non-zero, then the channel has been marked
593 * for unlink. This unlink ID is used to
594 * mark events still associated with current openings
595 * so they get delivered to the proper recipients.
597 struct event_svr_channel_instance {
598 mar_name_t esc_channel_name;
599 int32_t esc_total_opens;
600 int32_t esc_local_opens;
601 uint32_t esc_oc_size;
602 struct open_count *esc_node_opens;
603 uint32_t esc_retained_count;
604 struct list_head esc_open_chans;
605 struct list_head esc_entry;
606 uint64_t esc_unlink_id;
610 * Has the event data in the correct format to send to the library API
611 * with aditional field for accounting.
613 * ed_ref_count: how many other strutures are referencing.
614 * ed_retained: retained event list.
615 * ed_timer_handle: Timer handle for retained event expiration.
616 * ed_delivered: arrays of open channel pointers that this event
617 * has been delivered to. (only used for events
618 * with a retention time).
619 * ed_delivered_count: Number of entries available in ed_delivered.
620 * ed_delivered_next: Next free spot in ed_delivered
621 * ed_my_chan: pointer to the global channel instance associated
622 * with this event.
623 * ed_event: The event data formatted to be ready to send.
625 struct event_data {
626 uint32_t ed_ref_count;
627 struct list_head ed_retained;
628 timer_handle ed_timer_handle;
629 struct event_svr_channel_open **ed_delivered;
630 uint32_t ed_delivered_count;
631 uint32_t ed_delivered_next;
632 struct event_svr_channel_instance *ed_my_chan;
633 struct lib_event_data ed_event;
637 * Contains a list of pending events to be delivered to a subscribed
638 * application.
640 * cel_chan_handle: associated library channel handle
641 * cel_sub_id: associated library subscription ID
642 * cel_event: event structure to deliver.
643 * cel_entry: list of pending events
644 * (struct event_server_instance.esi_events)
646 struct chan_event_list {
647 uint64_t cel_chan_handle;
648 uint32_t cel_sub_id;
649 struct event_data* cel_event;
650 struct list_head cel_entry;
654 * Contains information about each open for a given channel
656 * eco_flags: How the channel was opened.
657 * eco_lib_handle: channel handle in the app. Used for event delivery.
658 * eco_my_handle: the handle used to access this data structure.
659 * eco_channel: Pointer to global channel info.
660 * eco_entry: links to other opeinings of this channel.
661 * eco_instance_entry: links to other channel opeinings for the
662 * associated server instance.
663 * eco_subscr: head of list of sbuscriptions for this channel open.
664 * (event_svr_channel_subscr.ecs_entry)
665 * eco_conn: refrence to EvtInitialize who owns this open.
667 struct event_svr_channel_open {
668 uint8_t eco_flags;
669 uint64_t eco_lib_handle;
670 uint32_t eco_my_handle;
671 struct event_svr_channel_instance *eco_channel;
672 struct list_head eco_entry;
673 struct list_head eco_instance_entry;
674 struct list_head eco_subscr;
675 void *eco_conn;
679 * Contains information about each channel subscription
681 * ecs_open_chan: Link to our open channel.
682 * ecs_sub_id: Subscription ID.
683 * ecs_filter_count: number of filters in ecs_filters
684 * ecs_filters: filters for determining event delivery.
685 * ecs_entry: Links to other subscriptions to this channel opening.
687 struct event_svr_channel_subscr {
688 struct event_svr_channel_open *ecs_open_chan;
689 uint32_t ecs_sub_id;
690 mar_evt_event_filter_array_t *ecs_filters;
691 struct list_head ecs_entry;
696 * Member node data
697 * mn_node_info: cluster node info from membership
698 * mn_last_msg_id: last seen message ID for this node
699 * mn_started: Indicates that event service has started
700 * on this node.
701 * mn_next: pointer to the next node in the hash chain.
702 * mn_entry: List of all nodes.
704 struct member_node_data {
705 unsigned int mn_nodeid;
706 SaClmClusterNodeT mn_node_info;
707 mar_evteventid_t mn_last_msg_id;
708 mar_uint32_t mn_started;
709 struct member_node_data *mn_next;
710 struct list_head mn_entry;
712 DECLARE_LIST_INIT(mnd);
714 * Take the filters we received from the application via the library and
715 * make them into a real mar_evt_event_filter_array_t
717 static SaAisErrorT evtfilt_to_aisfilt(struct req_evt_event_subscribe *req,
718 mar_evt_event_filter_array_t **evtfilters)
721 mar_evt_event_filter_array_t *filta =
722 (mar_evt_event_filter_array_t *)req->ics_filter_data;
723 mar_evt_event_filter_array_t *filters;
724 mar_evt_event_filter_t *filt = (void *)filta + sizeof(mar_evt_event_filter_array_t);
725 SaUint8T *str = (void *)filta + sizeof(mar_evt_event_filter_array_t) +
726 (sizeof(mar_evt_event_filter_t) * filta->filters_number);
727 int i;
728 int j;
730 filters = malloc(sizeof(mar_evt_event_filter_array_t));
731 if (!filters) {
732 return SA_AIS_ERR_NO_MEMORY;
735 filters->filters_number = filta->filters_number;
736 filters->filters = malloc(sizeof(mar_evt_event_filter_t) *
737 filta->filters_number);
738 if (!filters->filters) {
739 free(filters);
740 return SA_AIS_ERR_NO_MEMORY;
743 for (i = 0; i < filters->filters_number; i++) {
744 filters->filters[i].filter.pattern =
745 malloc(filt[i].filter.pattern_size);
747 if (!filters->filters[i].filter.pattern) {
748 for (j = 0; j < i; j++) {
749 free(filters->filters[j].filter.pattern);
751 free(filters->filters);
752 free(filters);
753 return SA_AIS_ERR_NO_MEMORY;
755 filters->filters[i].filter.pattern_size =
756 filt[i].filter.pattern_size;
757 filters->filters[i].filter.allocated_size =
758 filt[i].filter.pattern_size;
759 memcpy(filters->filters[i].filter.pattern,
760 str, filters->filters[i].filter.pattern_size);
761 filters->filters[i].filter_type = filt[i].filter_type;
762 str += filters->filters[i].filter.pattern_size;
765 *evtfilters = filters;
767 return SA_AIS_OK;
771 * Free up filter data
773 static void free_filters(mar_evt_event_filter_array_t *fp)
775 int i;
777 for (i = 0; i < fp->filters_number; i++) {
778 free(fp->filters[i].filter.pattern);
781 free(fp->filters);
782 free(fp);
786 * Look up a channel in the global channel list
788 static struct event_svr_channel_instance *
789 find_channel(mar_name_t *chan_name, uint64_t unlink_id)
791 struct list_head *l, *head;
792 struct event_svr_channel_instance *eci;
795 * choose which list to look through
797 if (unlink_id == EVT_CHAN_ACTIVE) {
798 head = &esc_head;
799 } else {
800 head = &esc_unlinked_head;
803 for (l = head->next; l != head; l = l->next) {
805 eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
806 if (!mar_name_match(chan_name, &eci->esc_channel_name)) {
807 continue;
808 } else if (unlink_id != eci->esc_unlink_id) {
809 continue;
811 return eci;
813 return 0;
817 * Find the last unlinked version of a channel.
819 static struct event_svr_channel_instance *
820 find_last_unlinked_channel(mar_name_t *chan_name)
822 struct list_head *l;
823 struct event_svr_channel_instance *eci;
826 * unlinked channels are added to the head of the list
827 * so the first one we see is the last one added.
829 for (l = esc_unlinked_head.next; l != &esc_unlinked_head; l = l->next) {
831 eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
832 if (!mar_name_match(chan_name, &eci->esc_channel_name)) {
833 continue;
836 return 0;
840 * Create and initialize a channel instance structure
842 static struct event_svr_channel_instance *create_channel(mar_name_t *cn)
844 struct event_svr_channel_instance *eci;
845 eci = (struct event_svr_channel_instance *) malloc(sizeof(*eci));
846 if (!eci) {
847 return (eci);
850 memset(eci, 0, sizeof(*eci));
851 list_init(&eci->esc_entry);
852 list_init(&eci->esc_open_chans);
853 eci->esc_oc_size = total_member_count;
854 eci->esc_node_opens =
855 malloc(sizeof(struct open_count) * total_member_count);
856 if (!eci->esc_node_opens) {
857 free(eci);
858 return 0;
860 memset(eci->esc_node_opens, 0,
861 sizeof(struct open_count) * total_member_count);
862 eci->esc_channel_name = *cn;
863 eci->esc_channel_name.value[eci->esc_channel_name.length] = '\0';
864 list_add(&eci->esc_entry, &esc_head);
866 return eci;
871 * Make sure that the list of nodes is large enough to hold the whole
872 * membership
874 static int check_open_size(struct event_svr_channel_instance *eci)
876 struct open_count *esc_node_opens_tmp;
878 if (total_member_count > eci->esc_oc_size) {
879 esc_node_opens_tmp = realloc (eci->esc_node_opens,
880 sizeof(struct open_count) * total_member_count);
881 if (esc_node_opens_tmp == NULL) {
882 log_printf(LOG_LEVEL_WARNING,
883 "Memory error realloc of node list\n");
884 return -1;
886 eci->esc_node_opens = esc_node_opens_tmp;
887 memset(&eci->esc_node_opens[eci->esc_oc_size], 0,
888 sizeof(struct open_count) *
889 (total_member_count - eci->esc_oc_size));
890 eci->esc_oc_size = total_member_count;
892 return 0;
896 * Find the specified node ID in the node list of the channel.
897 * If it's not in the list, add it.
899 static struct open_count* find_open_count(
900 struct event_svr_channel_instance *eci,
901 mar_uint32_t node_id)
903 int i;
905 for (i = 0; i < eci->esc_oc_size; i++) {
906 if (eci->esc_node_opens[i].oc_node_id == 0) {
907 eci->esc_node_opens[i].oc_node_id = node_id;
908 eci->esc_node_opens[i].oc_open_count = 0;
910 if (eci->esc_node_opens[i].oc_node_id == node_id) {
911 return &eci->esc_node_opens[i];
914 log_printf(LOG_LEVEL_DEBUG,
915 "find_open_count: node id %s not found\n",
916 totempg_ifaces_print (node_id));
917 return 0;
920 static void dump_chan_opens(struct event_svr_channel_instance *eci)
922 int i;
923 log_printf(LOG_LEVEL_NOTICE,
924 "Channel %s, total %d, local %d\n",
925 eci->esc_channel_name.value,
926 eci->esc_total_opens,
927 eci->esc_local_opens);
928 for (i = 0; i < eci->esc_oc_size; i++) {
929 if (eci->esc_node_opens[i].oc_node_id == 0) {
930 break;
932 log_printf(LOG_LEVEL_NOTICE, "Node %s, count %d\n",
933 totempg_ifaces_print (eci->esc_node_opens[i].oc_node_id),
934 eci->esc_node_opens[i].oc_open_count);
#ifdef DUMP_CHAN_INFO
/*
 * Log the open-count information of every active channel (esc_head).
 */
static void dump_all_chans()
{
	struct list_head *pos = esc_head.next;

	while (pos != &esc_head) {
		dump_chan_opens(list_entry(pos,
			struct event_svr_channel_instance, esc_entry));
		pos = pos->next;
	}
}
#endif
956 * Scan the list of channels and zero out the open counts
958 static void zero_chan_open_counts()
960 struct list_head *l;
961 struct event_svr_channel_instance *eci;
962 int i;
964 for (l = esc_head.next; l != &esc_head; l = l->next) {
965 eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
966 for (i = 0; i < eci->esc_oc_size; i++) {
967 if (eci->esc_node_opens[i].oc_node_id == 0) {
968 break;
970 eci->esc_node_opens[i].oc_open_count = 0;
972 eci->esc_total_opens = 0;
976 * Replace the current open count for a node with the specified value.
978 static int set_open_count(struct event_svr_channel_instance *eci,
979 mar_uint32_t node_id, uint32_t open_count)
981 struct open_count *oc;
982 int i;
984 if ((i = check_open_size(eci)) != 0) {
985 return i;
988 oc = find_open_count(eci, node_id);
989 if (oc) {
990 log_printf(RECOVERY_DEBUG,
991 "Set count: Chan %s for node %s, was %d, now %d\n",
992 eci->esc_channel_name.value, totempg_ifaces_print (node_id),
993 oc->oc_open_count, open_count);
995 eci->esc_total_opens -= oc->oc_open_count;
996 eci->esc_total_opens += open_count;
997 oc->oc_open_count = open_count;
998 return 0;
1000 return -1;
1004 * Increment the open count for the specified node.
1006 static int inc_open_count(struct event_svr_channel_instance *eci,
1007 mar_uint32_t node_id)
1010 struct open_count *oc;
1011 int i;
1013 if ((i = check_open_size(eci)) != 0) {
1014 return i;
1017 if (node_id == my_node_id) {
1018 eci->esc_local_opens++;
1020 oc = find_open_count(eci, node_id);
1021 if (oc) {
1022 eci->esc_total_opens++;
1023 oc->oc_open_count++;
1024 return 0;
1026 return -1;
1030 * Decrement the open count for the specified node in the
1031 * specified channel.
1033 static int dec_open_count(struct event_svr_channel_instance *eci,
1034 mar_uint32_t node_id)
1037 struct open_count *oc;
1038 int i;
1040 if ((i = check_open_size(eci)) != 0) {
1041 return i;
1044 if (node_id == my_node_id) {
1045 eci->esc_local_opens--;
1047 oc = find_open_count(eci, node_id);
1048 if (oc) {
1049 eci->esc_total_opens--;
1050 oc->oc_open_count--;
1051 if ((eci->esc_total_opens < 0) || (oc->oc_open_count < 0)) {
1052 log_printf(LOG_LEVEL_ERROR, "Channel open decrement error\n");
1053 dump_chan_opens(eci);
1055 return 0;
1057 return -1;
1062 * Remove a channel and free its memory if it's not in use anymore.
1064 static void delete_channel(struct event_svr_channel_instance *eci)
1067 log_printf(CHAN_DEL_DEBUG,
1068 "Called Delete channel %s t %d, l %d, r %d\n",
1069 eci->esc_channel_name.value,
1070 eci->esc_total_opens, eci->esc_local_opens,
1071 eci->esc_retained_count);
1073 * If no one has the channel open anywhere and there are no unexpired
1074 * retained events for this channel, and it has been marked for deletion
1075 * by an unlink, then it is OK to delete the data structure.
1077 if ((eci->esc_retained_count == 0) && (eci->esc_total_opens == 0) &&
1078 (eci->esc_unlink_id != EVT_CHAN_ACTIVE)) {
1079 log_printf(CHAN_DEL_DEBUG, "Delete channel %s\n",
1080 eci->esc_channel_name.value);
1081 log_printf(CHAN_UNLINK_DEBUG, "Delete channel %s, unlink_id %0llx\n",
1082 eci->esc_channel_name.value, (unsigned long long)eci->esc_unlink_id);
1084 if (!list_empty(&eci->esc_open_chans)) {
1085 log_printf(LOG_LEVEL_NOTICE,
1086 "Last channel close request for %s (still open)\n",
1087 eci->esc_channel_name.value);
1088 dump_chan_opens(eci);
1089 return;
1093 * adjust if we're sending open counts on a config change.
1095 if ((recovery_phase != evt_recovery_complete) &&
1096 (&eci->esc_entry == next_chan)) {
1097 next_chan = eci->esc_entry.next;
1100 list_del(&eci->esc_entry);
1101 if (eci->esc_node_opens) {
1102 free(eci->esc_node_opens);
1104 free(eci);
1109 * Free up an event structure if it isn't being used anymore.
1111 static void
1112 free_event_data(struct event_data *edp)
1114 if (--edp->ed_ref_count) {
1115 return;
1117 log_printf(LOG_LEVEL_DEBUG, "Freeing event ID: 0x%llx\n",
1118 (unsigned long long)edp->ed_event.led_event_id);
1119 if (edp->ed_delivered) {
1120 free(edp->ed_delivered);
1123 free(edp);
1127 * Mark a channel for deletion.
1129 static void unlink_channel(struct event_svr_channel_instance *eci,
1130 uint64_t unlink_id)
1132 struct event_data *edp;
1133 struct list_head *l, *nxt;
1135 log_printf(CHAN_UNLINK_DEBUG, "Unlink request: %s, id 0x%llx\n",
1136 eci->esc_channel_name.value, (unsigned long long)unlink_id);
1138 * The unlink ID is used to note that the channel has been marked
1139 * for deletion and is a way to distinguish between multiple
1140 * channels of the same name each marked for deletion.
1142 eci->esc_unlink_id = unlink_id;
1145 * Move the unlinked channel to the unlinked list. This way
1146 * we don't have to worry about filtering it out when we need to
1147 * distribute retained events at recovery time.
1149 list_del(&eci->esc_entry);
1150 list_add(&eci->esc_entry, &esc_unlinked_head);
1153 * Scan the retained event list and remove any retained events.
1154 * Since no new opens can occur there won't be any need of sending
1155 * retained events on the channel.
1157 for (l = retained_list.next; l != &retained_list; l = nxt) {
1158 nxt = l->next;
1159 edp = list_entry(l, struct event_data, ed_retained);
1160 if ((edp->ed_my_chan == eci) &&
1161 (edp->ed_event.led_chan_unlink_id == EVT_CHAN_ACTIVE)) {
1162 openais_timer_delete(edp->ed_timer_handle);
1163 edp->ed_event.led_retention_time = 0;
1164 list_del(&edp->ed_retained);
1165 list_init(&edp->ed_retained);
1166 edp->ed_my_chan->esc_retained_count--;
1168 log_printf(CHAN_UNLINK_DEBUG,
1169 "Unlink: Delete retained event id 0x%llx\n",
1170 (unsigned long long)edp->ed_event.led_event_id);
1171 free_event_data(edp);
1175 delete_channel(eci);
1179 * Remove the specified node from the node list in this channel.
1181 static int remove_open_count(
1182 struct event_svr_channel_instance *eci,
1183 mar_uint32_t node_id)
1185 int i;
1186 int j;
1189 * Find the node, remove it and re-pack the array.
1191 for (i = 0; i < eci->esc_oc_size; i++) {
1192 if (eci->esc_node_opens[i].oc_node_id == 0) {
1193 break;
1196 log_printf(RECOVERY_DEBUG, "roc: %s/%s, t %d, oc %d\n",
1197 totempg_ifaces_print (node_id),
1198 totempg_ifaces_print (eci->esc_node_opens[i].oc_node_id),
1199 eci->esc_total_opens, eci->esc_node_opens[i].oc_open_count);
1201 if (eci->esc_node_opens[i].oc_node_id == node_id) {
1203 eci->esc_total_opens -= eci->esc_node_opens[i].oc_open_count;
1205 for (j = i+1; j < eci->esc_oc_size; j++, i++) {
1206 eci->esc_node_opens[i].oc_node_id =
1207 eci->esc_node_opens[j].oc_node_id;
1208 eci->esc_node_opens[i].oc_open_count =
1209 eci->esc_node_opens[j].oc_open_count;
1212 eci->esc_node_opens[eci->esc_oc_size-1].oc_node_id = 0;
1213 eci->esc_node_opens[eci->esc_oc_size-1].oc_open_count = 0;
1216 * Remove the channel if it's not being used anymore
1218 delete_channel(eci);
1219 return 0;
1222 return -1;
1227 * Send a request to open a channel to the rest of the cluster.
1229 static SaAisErrorT evt_open_channel(mar_name_t *cn, SaUint8T flgs)
1231 struct req_evt_chan_command cpkt;
1232 struct event_svr_channel_instance *eci;
1233 struct iovec chn_iovec;
1234 int res;
1235 SaAisErrorT ret;
1237 ret = SA_AIS_OK;
1239 eci = find_channel(cn, EVT_CHAN_ACTIVE);
1242 * If the create flag set, and it doesn't exist, we can make the channel.
1243 * Otherwise, it's an error since we're notified of channels being created
1244 * and opened.
1246 if (!eci && !(flgs & SA_EVT_CHANNEL_CREATE)) {
1247 ret = SA_AIS_ERR_NOT_EXIST;
1248 goto chan_open_end;
1252 * create the channel packet to send. Tell the the cluster
1253 * to create the channel.
1255 memset(&cpkt, 0, sizeof(cpkt));
1256 cpkt.chc_head.id =
1257 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
1258 cpkt.chc_head.size = sizeof(cpkt);
1259 cpkt.chc_op = EVT_OPEN_CHAN_OP;
1260 cpkt.u.chc_chan.ocr_name = *cn;
1261 cpkt.u.chc_chan.ocr_serial_no = ++open_serial_no;
1262 chn_iovec.iov_base = (char *)&cpkt;
1263 chn_iovec.iov_len = cpkt.chc_head.size;
1264 log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Send open mcast\n");
1265 res = totempg_groups_mcast_joined(openais_group_handle,
1266 &chn_iovec, 1, TOTEMPG_AGREED);
1267 log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Open mcast result: %d\n",
1268 res);
1269 if (res != 0) {
1270 ret = SA_AIS_ERR_LIBRARY;
1273 chan_open_end:
1274 return ret;
1279 * Send a request to close a channel with the rest of the cluster.
1281 static SaAisErrorT evt_close_channel(mar_name_t *cn, uint64_t unlink_id)
1283 struct req_evt_chan_command cpkt;
1284 struct iovec chn_iovec;
1285 int res;
1286 SaAisErrorT ret;
1288 ret = SA_AIS_OK;
1291 * create the channel packet to send. Tell the the cluster
1292 * to close the channel.
1294 memset(&cpkt, 0, sizeof(cpkt));
1295 cpkt.chc_head.id =
1296 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
1297 cpkt.chc_head.size = sizeof(cpkt);
1298 cpkt.chc_op = EVT_CLOSE_CHAN_OP;
1299 cpkt.u.chcu.chcu_name = *cn;
1300 cpkt.u.chcu.chcu_unlink_id = unlink_id;
1301 chn_iovec.iov_base = (char *)&cpkt;
1302 chn_iovec.iov_len = cpkt.chc_head.size;
1303 res = totempg_groups_mcast_joined(openais_group_handle,
1304 &chn_iovec, 1, TOTEMPG_AGREED);
1305 if (res != 0) {
1306 ret = SA_AIS_ERR_LIBRARY;
1308 return ret;
/*
 * Node data access functions. Used to keep track of event IDs and
 * delivery of messages.
 *
 * add_node: Add a new member node to our list.
 * remove_node: Remove a node that left membership.
 * find_node: Given the node ID return a pointer to node information.
 */
#define NODE_HASH_SIZE 256

/* Hash buckets of member nodes, chained through mn_next (zero-initialized). */
static struct member_node_data *nl[NODE_HASH_SIZE];

/*
 * Map a node ID to a bucket index in nl[]. NODE_HASH_SIZE must be a power
 * of two for the mask to reach every bucket.
 *
 * Declared static inline: a plain (non-static) C99 "inline" definition
 * provides no external definition, so any call the compiler chooses not
 * to inline (e.g. at -O0) would fail to link.
 */
static inline int
hash_sock_addr(unsigned int nodeid)
{
	return nodeid & (NODE_HASH_SIZE - 1);
}
1329 static struct member_node_data **lookup_node(unsigned int nodeid)
1331 int index = hash_sock_addr(nodeid);
1332 struct member_node_data **nlp;
1334 nlp = &nl[index];
1335 for (nlp = &nl[index]; *nlp; nlp = &((*nlp)->mn_next)) {
1336 if ((*(nlp))->mn_nodeid == nodeid) {
1337 break;
1341 return nlp;
1344 static struct member_node_data *
1345 evt_find_node(unsigned int nodeid)
1347 struct member_node_data **nlp;
1349 nlp = lookup_node(nodeid);
1351 if (!nlp) {
1352 log_printf(LOG_LEVEL_DEBUG, "find_node: Got NULL nlp?\n");
1353 return 0;
1356 return *nlp;
1359 static SaAisErrorT
1360 evt_add_node(
1361 unsigned int nodeid,
1362 SaClmClusterNodeT *cn)
1364 struct member_node_data **nlp;
1365 struct member_node_data *nl;
1366 SaAisErrorT err = SA_AIS_ERR_EXIST;
1368 nlp = lookup_node(nodeid);
1370 if (!nlp) {
1371 log_printf(LOG_LEVEL_DEBUG, "add_node: Got NULL nlp?\n");
1372 goto an_out;
1375 if (*nlp) {
1376 goto an_out;
1379 *nlp = malloc(sizeof(struct member_node_data));
1380 if (!(*nlp)) {
1381 return SA_AIS_ERR_NO_MEMORY;
1383 nl = *nlp;
1384 if (nl) {
1385 memset(nl, 0, sizeof(*nl));
1386 err = SA_AIS_OK;
1387 nl->mn_nodeid = nodeid;
1388 nl->mn_started = 1;
1390 list_init(&nl->mn_entry);
1391 list_add(&nl->mn_entry, &mnd);
1392 nl->mn_node_info = *cn;
1394 an_out:
1395 return err;
1399 * Find the oldest node in the membership. This is the one we choose to
1400 * perform some cluster-wide functions like distributing retained events.
1401 * We only check nodes that were in our transitional configuration. In this
1402 * way there is a recovery node chosen for each original partition in case
1403 * of a merge.
1405 static struct member_node_data* oldest_node()
1407 struct member_node_data *mn = 0;
1408 struct member_node_data *oldest = 0;
1409 int i;
1411 for (i = 0; i < trans_member_count; i++) {
1412 mn = evt_find_node(trans_member_list[i]);
1413 if (!mn || (mn->mn_started == 0)) {
1414 log_printf(LOG_LEVEL_ERROR,
1415 "Transitional config Node %s not active\n",
1416 totempg_ifaces_print (trans_member_list[i]));
1417 continue;
1419 if ((oldest == NULL) ||
1420 (mn->mn_node_info.bootTimestamp <
1421 oldest->mn_node_info.bootTimestamp )) {
1422 oldest = mn;
1423 } else if (mn->mn_node_info.bootTimestamp ==
1424 oldest->mn_node_info.bootTimestamp) {
1425 if (mn->mn_node_info.nodeId < oldest->mn_node_info.nodeId) {
1426 oldest = mn;
1430 return oldest;
1435 * keep track of the last event ID from a node.
1436 * If we get an event ID less than our last, we've already
1437 * seen it. It's probably a retained event being sent to
1438 * a new node.
1440 static int check_last_event(
1441 struct lib_event_data *evtpkt,
1442 unsigned int nodeid)
1444 struct member_node_data *nd;
1445 SaClmClusterNodeT *cn;
1448 nd = evt_find_node(nodeid);
1449 if (!nd) {
1450 log_printf(LOG_LEVEL_DEBUG,
1451 "Node ID %s not found for event %llx\n",
1452 totempg_ifaces_print (evtpkt->led_publisher_node_id),
1453 (unsigned long long)evtpkt->led_event_id);
1454 cn = main_clm_get_by_nodeid(nodeid);
1455 if (!cn) {
1456 log_printf(LOG_LEVEL_DEBUG,
1457 "Cluster Node 0x%s not found for event %llx\n",
1458 totempg_ifaces_print (evtpkt->led_publisher_node_id),
1459 (unsigned long long)evtpkt->led_event_id);
1460 } else {
1461 evt_add_node(nodeid, cn);
1462 nd = evt_find_node(nodeid);
1466 if (!nd) {
1467 return 0;
1470 if ((nd->mn_last_msg_id < evtpkt->led_msg_id)) {
1471 nd->mn_last_msg_id = evtpkt->led_msg_id;
1472 return 0;
1474 return 1;
1478 * event id generating code. We use the node ID for this node for the
1479 * upper 32 bits of the event ID to make sure that we can generate a cluster
1480 * wide unique event ID for a given event.
1482 SaAisErrorT set_event_id(mar_uint32_t node_id)
1484 SaAisErrorT err = SA_AIS_OK;
1485 if (base_id_top) {
1486 err = SA_AIS_ERR_EXIST;
1488 base_id_top = (mar_evteventid_t)node_id << 32;
1489 return err;
1493 * See if an event Id is still in use in the retained event
1494 * list.
1496 static int id_in_use(uint64_t id, uint64_t base)
1498 struct list_head *l;
1499 struct event_data *edp;
1500 mar_evteventid_t evtid = (id << 32) | (base & BASE_ID_MASK);
1502 for (l = retained_list.next; l != &retained_list; l = l->next) {
1503 edp = list_entry(l, struct event_data, ed_retained);
1504 if (edp->ed_event.led_event_id == evtid) {
1505 return 1;
1508 return 0;
1511 static SaAisErrorT get_event_id(uint64_t *event_id, uint64_t *msg_id)
1514 * Don't reuse an event ID if it is still valid because of
1515 * a retained event.
1517 while (id_in_use(base_id_top, base_id)) {
1518 base_id++;
1521 *event_id = base_id_top | (base_id & BASE_ID_MASK) ;
1522 *msg_id = base_id++;
1523 return SA_AIS_OK;
1529 * Timer handler to delete expired events.
1532 static void
1533 event_retention_timeout(void *data)
1535 struct event_data *edp = data;
1536 log_printf(RETENTION_TIME_DEBUG, "Event ID %llx expired\n",
1537 (unsigned long long)edp->ed_event.led_event_id);
1539 * adjust next_retained if we're in recovery and
1540 * were in charge of sending retained events.
1542 if (recovery_phase != evt_recovery_complete && recovery_node) {
1543 if (next_retained == &edp->ed_retained) {
1544 next_retained = edp->ed_retained.next;
1547 list_del(&edp->ed_retained);
1548 list_init(&edp->ed_retained);
1550 * Check to see if the channel isn't in use anymore.
1552 edp->ed_my_chan->esc_retained_count--;
1553 if (edp->ed_my_chan->esc_retained_count == 0) {
1554 delete_channel(edp->ed_my_chan);
1556 free_event_data(edp);
1560 * clear a particular event's retention time.
1561 * This will free the event as long as it isn't being
1562 * currently used.
1565 static SaAisErrorT
1566 clear_retention_time(mar_evteventid_t event_id)
1568 struct event_data *edp;
1569 struct list_head *l, *nxt;
1571 log_printf(RETENTION_TIME_DEBUG, "Search for Event ID %llx\n",
1572 (unsigned long long)event_id);
1573 for (l = retained_list.next; l != &retained_list; l = nxt) {
1574 nxt = l->next;
1575 edp = list_entry(l, struct event_data, ed_retained);
1576 if (edp->ed_event.led_event_id != event_id) {
1577 continue;
1580 log_printf(RETENTION_TIME_DEBUG,
1581 "Clear retention time for Event ID %llx\n",
1582 (unsigned long long)edp->ed_event.led_event_id);
1583 openais_timer_delete(edp->ed_timer_handle);
1584 edp->ed_event.led_retention_time = 0;
1585 list_del(&edp->ed_retained);
1586 list_init(&edp->ed_retained);
1589 * Check to see if the channel isn't in use anymore.
1591 edp->ed_my_chan->esc_retained_count--;
1592 if (edp->ed_my_chan->esc_retained_count == 0) {
1593 delete_channel(edp->ed_my_chan);
1595 free_event_data(edp);
1596 return SA_AIS_OK;
1598 return SA_AIS_ERR_NOT_EXIST;
1602 * Remove specified channel from event delivery list
1604 static void
1605 remove_delivered_channel(struct event_svr_channel_open *eco)
1607 int i;
1608 struct list_head *l;
1609 struct event_data *edp;
1611 for (l = retained_list.next; l != &retained_list; l = l->next) {
1612 edp = list_entry(l, struct event_data, ed_retained);
1614 for (i = 0; i < edp->ed_delivered_next; i++) {
1615 if (edp->ed_delivered[i] == eco) {
1616 edp->ed_delivered_next--;
1617 if (edp->ed_delivered_next == i) {
1618 break;
1620 memmove(&edp->ed_delivered[i],
1621 &edp->ed_delivered[i+1],
1622 &edp->ed_delivered[edp->ed_delivered_next] -
1623 &edp->ed_delivered[i]);
1624 break;
1631 * If there is a retention time, add this open channel to the event so
1632 * we can check if we've already delivered this message later if a new
1633 * subscription matches.
1635 #define DELIVER_SIZE 8
1636 static void
1637 evt_delivered(struct event_data *evt, struct event_svr_channel_open *eco)
1639 if (!evt->ed_event.led_retention_time) {
1640 return;
1643 log_printf(LOG_LEVEL_DEBUG, "delivered ID %llx to eco %p\n",
1644 (unsigned long long)evt->ed_event.led_event_id, eco);
1645 if (evt->ed_delivered_count == evt->ed_delivered_next) {
1646 struct event_svr_channel_open **ed_delivered_tmp;
1648 ed_delivered_tmp = realloc (evt->ed_delivered,
1649 DELIVER_SIZE * sizeof(struct event_svr_channel_open *));
1650 if (ed_delivered_tmp == NULL) {
1651 log_printf(LOG_LEVEL_WARNING, "Memory error realloc\n");
1652 return;
1654 evt->ed_delivered = ed_delivered_tmp;
1655 memset(evt->ed_delivered + evt->ed_delivered_next, 0,
1656 DELIVER_SIZE * sizeof(struct event_svr_channel_open *));
1657 evt->ed_delivered_next = evt->ed_delivered_count;
1658 evt->ed_delivered_count += DELIVER_SIZE;
1661 evt->ed_delivered[evt->ed_delivered_next++] = eco;
1665 * Check to see if an event has already been delivered to this open channel
1667 static int
1668 evt_already_delivered(struct event_data *evt,
1669 struct event_svr_channel_open *eco)
1671 int i;
1673 if (!evt->ed_event.led_retention_time) {
1674 return 0;
1677 log_printf(LOG_LEVEL_DEBUG, "Deliver count: %d deliver_next %d\n",
1678 evt->ed_delivered_count, evt->ed_delivered_next);
1679 for (i = 0; i < evt->ed_delivered_next; i++) {
1680 log_printf(LOG_LEVEL_DEBUG, "Checking ID %llx delivered %p eco %p\n",
1681 (unsigned long long)evt->ed_event.led_event_id,
1682 evt->ed_delivered[i], eco);
1683 if (evt->ed_delivered[i] == eco) {
1684 return 1;
1687 return 0;
1691 * Compare a filter to a given pattern.
1692 * return SA_AIS_OK if the pattern matches a filter
1694 static SaAisErrorT
1695 filter_match(mar_evt_event_pattern_t *ep, mar_evt_event_filter_t *ef)
1697 int ret;
1698 ret = SA_AIS_ERR_FAILED_OPERATION;
1700 switch (ef->filter_type) {
1701 case SA_EVT_PREFIX_FILTER:
1702 if (ef->filter.pattern_size > ep->pattern_size) {
1703 break;
1705 if (strncmp((char *)ef->filter.pattern, (char *)ep->pattern,
1706 ef->filter.pattern_size) == 0) {
1707 ret = SA_AIS_OK;
1709 break;
1710 case SA_EVT_SUFFIX_FILTER:
1711 if (ef->filter.pattern_size > ep->pattern_size) {
1712 break;
1714 if (strncmp((char *)ef->filter.pattern,
1715 (char *)&ep->pattern[ep->pattern_size - ef->filter.pattern_size],
1716 ef->filter.pattern_size) == 0) {
1717 ret = SA_AIS_OK;
1720 break;
1721 case SA_EVT_EXACT_FILTER:
1722 if (ef->filter.pattern_size != ep->pattern_size) {
1723 break;
1725 if (strncmp((char *)ef->filter.pattern, (char *)ep->pattern,
1726 ef->filter.pattern_size) == 0) {
1727 ret = SA_AIS_OK;
1729 break;
1730 case SA_EVT_PASS_ALL_FILTER:
1731 ret = SA_AIS_OK;
1732 break;
1733 default:
1734 break;
1736 return ret;
1740 * compare the event's patterns with the subscription's filter rules.
1741 * SA_AIS_OK is returned if the event matches the filter rules.
1743 static SaAisErrorT
1744 event_match(struct event_data *evt,
1745 struct event_svr_channel_subscr *ecs)
1747 mar_evt_event_filter_t *ef;
1748 mar_evt_event_pattern_t *ep;
1749 uint32_t filt_count;
1750 SaAisErrorT ret = SA_AIS_OK;
1751 int i;
1753 ep = (mar_evt_event_pattern_t *)(&evt->ed_event.led_body[0]);
1754 ef = ecs->ecs_filters->filters;
1755 filt_count = min(ecs->ecs_filters->filters_number,
1756 evt->ed_event.led_patterns_number);
1758 for (i = 0; i < filt_count; i++) {
1759 ret = filter_match(ep, ef);
1760 if (ret != SA_AIS_OK) {
1761 break;
1763 ep++;
1764 ef++;
1766 return ret;
1770 * Scan undelivered pending events and either remove them if no subscription
1771 * filters match anymore or re-assign them to another matching subscription
1773 static void
1774 filter_undelivered_events(struct event_svr_channel_open *op_chan)
1776 struct event_svr_channel_open *eco;
1777 struct event_svr_channel_instance *eci;
1778 struct event_svr_channel_subscr *ecs;
1779 struct chan_event_list *cel;
1780 struct libevt_pd *esip;
1781 struct list_head *l, *nxt;
1782 struct list_head *l1, *l2;
1783 int i;
1785 esip = (struct libevt_pd *)openais_conn_private_data_get(op_chan->eco_conn);
1786 eci = op_chan->eco_channel;
1789 * Scan each of the priority queues for messages
1791 for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) {
1793 * examine each message queued for delivery
1795 for (l = esip->esi_events[i].next; l != &esip->esi_events[i]; l = nxt) {
1796 nxt = l->next;
1797 cel = list_entry(l, struct chan_event_list, cel_entry);
1799 * Check open channels
1801 for (l1 = eci->esc_open_chans.next;
1802 l1 != &eci->esc_open_chans; l1 = l1->next) {
1803 eco = list_entry(l1, struct event_svr_channel_open, eco_entry);
1806 * See if this channel open instance belongs
1807 * to this evtinitialize instance
1809 if (eco->eco_conn != op_chan->eco_conn) {
1810 continue;
1814 * See if enabled to receive
1816 if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) {
1817 continue;
1821 * Check subscriptions
1823 for (l2 = eco->eco_subscr.next;
1824 l2 != &eco->eco_subscr; l2 = l2->next) {
1825 ecs = list_entry(l2,
1826 struct event_svr_channel_subscr, ecs_entry);
1827 if (event_match(cel->cel_event, ecs) == SA_AIS_OK) {
1829 * Something still matches.
1830 * We'll assign it to
1831 * the new subscription.
1833 cel->cel_sub_id = ecs->ecs_sub_id;
1834 cel->cel_chan_handle = eco->eco_lib_handle;
1835 goto next_event;
1840 * No subscription filter matches anymore. We
1841 * can delete this event.
1843 list_del(&cel->cel_entry);
1844 list_init(&cel->cel_entry);
1845 esip->esi_nevents--;
1847 free_event_data(cel->cel_event);
1848 free(cel);
1849 next_event:
1850 continue;
1856 * Notify the library of a pending event
1858 static void __notify_event(void *conn)
1860 struct res_evt_event_data res;
1861 struct libevt_pd *esip;
1863 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
1864 log_printf(LOG_LEVEL_DEBUG, "DELIVER: notify\n");
1865 if (esip->esi_nevents != 0) {
1866 res.evd_head.size = sizeof(res);
1867 res.evd_head.id = MESSAGE_RES_EVT_AVAILABLE;
1868 res.evd_head.error = SA_AIS_OK;
1869 openais_conn_send_response(openais_conn_partner_get(conn),
1870 &res, sizeof(res));
1874 inline void notify_event(void *conn)
1876 struct libevt_pd *esip;
1878 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
1881 * Give the library a kick if there aren't already
1882 * events queued for delivery.
1884 if (esip->esi_nevents++ == 0) {
1885 __notify_event(conn);
1890 * sends/queues up an event for a subscribed channel.
1892 static void
1893 deliver_event(struct event_data *evt,
1894 struct event_svr_channel_open *eco,
1895 struct event_svr_channel_subscr *ecs)
1897 struct chan_event_list *ep;
1898 SaEvtEventPriorityT evt_prio = evt->ed_event.led_priority;
1899 struct chan_event_list *cel;
1900 int do_deliver_event = 0;
1901 int do_deliver_warning = 0;
1902 int i;
1903 struct libevt_pd *esip;
1905 esip = (struct libevt_pd *)openais_conn_private_data_get(eco->eco_conn);
1907 if (evt_prio > SA_EVT_LOWEST_PRIORITY) {
1908 evt_prio = SA_EVT_LOWEST_PRIORITY;
1912 * Delivery queue check.
1913 * - If the queue is blocked, see if we've sent enough messages to
1914 * unblock it.
1915 * - If it isn't blocked, see if this message will put us over the top.
1916 * - If we can't deliver this message, see if we can toss some lower
1917 * priority message to make room for this one.
1918 * - If we toss any messages, queue up an event of SA_EVT_LOST_EVENT_PATTERN
1919 * to let the application know that we dropped some messages.
1921 if (esip->esi_queue_blocked) {
1922 if (esip->esi_nevents < evt_delivery_queue_resume) {
1923 esip->esi_queue_blocked = 0;
1924 log_printf(LOG_LEVEL_DEBUG, "unblock\n");
1928 assert (esip->esi_nevents >= 0);
1929 if (!esip->esi_queue_blocked &&
1930 (esip->esi_nevents >= evt_delivery_queue_size)) {
1931 log_printf(LOG_LEVEL_DEBUG, "block\n");
1932 esip->esi_queue_blocked = 1;
1933 do_deliver_warning = 1;
1936 if (esip->esi_queue_blocked) {
1937 do_deliver_event = 0;
1938 for (i = SA_EVT_LOWEST_PRIORITY; i > evt_prio; i--) {
1939 if (!list_empty(&esip->esi_events[i])) {
1941 * Get the last item on the list, so we drop the most
1942 * recent lowest priority event.
1944 cel = list_entry(esip->esi_events[i].prev,
1945 struct chan_event_list, cel_entry);
1946 log_printf(LOG_LEVEL_DEBUG, "Drop 0x%0llx\n",
1947 (unsigned long long)cel->cel_event->ed_event.led_event_id);
1948 list_del(&cel->cel_entry);
1949 free_event_data(cel->cel_event);
1950 free(cel);
1951 esip->esi_nevents--;
1952 do_deliver_event = 1;
1953 break;
1956 } else {
1957 do_deliver_event = 1;
1961 * Queue the event for delivery
1963 if (do_deliver_event) {
1964 ep = malloc(sizeof(*ep));
1965 if (!ep) {
1966 log_printf(LOG_LEVEL_WARNING,
1967 "3Memory allocation error, can't deliver event\n");
1968 return;
1970 evt->ed_ref_count++;
1971 ep->cel_chan_handle = eco->eco_lib_handle;
1972 ep->cel_sub_id = ecs->ecs_sub_id;
1973 list_init(&ep->cel_entry);
1974 ep->cel_event = evt;
1975 list_add_tail(&ep->cel_entry, &esip->esi_events[evt_prio]);
1976 evt_delivered(evt, eco);
1977 notify_event(eco->eco_conn);
1981 * If we dropped an event, queue this so that the application knows
1982 * what has happened.
1984 if (do_deliver_warning) {
1985 struct event_data *ed;
1986 ed = malloc(dropped_event_size);
1987 if (!ed) {
1988 log_printf(LOG_LEVEL_WARNING,
1989 "4Memory allocation error, can't deliver event\n");
1990 return;
1992 log_printf(LOG_LEVEL_DEBUG, "Warn 0x%0llx\n",
1993 (unsigned long long)evt->ed_event.led_event_id);
1994 memcpy(ed, dropped_event, dropped_event_size);
1995 ed->ed_event.led_publish_time = clust_time_now();
1996 ed->ed_event.led_event_id = SA_EVT_EVENTID_LOST;
1997 list_init(&ed->ed_retained);
1999 ep = malloc(sizeof(*ep));
2000 if (!ep) {
2001 log_printf(LOG_LEVEL_WARNING,
2002 "5Memory allocation error, can't deliver event\n");
2003 return;
2005 ep->cel_chan_handle = eco->eco_lib_handle;
2006 ep->cel_sub_id = ecs->ecs_sub_id;
2007 list_init(&ep->cel_entry);
2008 ep->cel_event = ed;
2009 list_add_tail(&ep->cel_entry, &esip->esi_events[SA_EVT_HIGHEST_PRIORITY]);
2010 notify_event(eco->eco_conn);
2015 * Take the event data and swap the elements so they match our architectures
2016 * word order.
2018 static void
2019 convert_event(void *msg)
2021 struct lib_event_data *evt = (struct lib_event_data *)msg;
2022 mar_evt_event_pattern_t *eps;
2023 int i;
2026 * The following elements don't require processing:
2028 * converted in the main deliver_fn:
2029 * led_head.id, led_head.size.
2031 * Supplied localy:
2032 * source_addr, publisher_node_id, receive_time.
2034 * Used internaly only:
2035 * led_svr_channel_handle and led_lib_channel_handle.
2038 swab_mar_name_t (&evt->led_chan_name);
2039 evt->led_chan_unlink_id = swab64(evt->led_chan_unlink_id);
2040 evt->led_event_id = swab64(evt->led_event_id);
2041 evt->led_sub_id = swab32(evt->led_sub_id);
2042 swab_mar_name_t (&evt->led_publisher_name);
2043 evt->led_retention_time = swab64(evt->led_retention_time);
2044 evt->led_publish_time = swab64(evt->led_publish_time);
2045 evt->led_user_data_offset = swab32(evt->led_user_data_offset);
2046 evt->led_user_data_size = swab32(evt->led_user_data_size);
2047 evt->led_patterns_number = swab32(evt->led_patterns_number);
2050 * Now we need to go through the led_body and swizzle pattern counts.
2051 * We can't do anything about user data since it doesn't have a specified
2052 * format. The application is on its own here.
2054 eps = (mar_evt_event_pattern_t *)evt->led_body;
2055 for (i = 0; i < evt->led_patterns_number; i++) {
2056 eps->pattern_size = swab32(eps->pattern_size);
2057 eps->allocated_size = swab32(eps->allocated_size);
2058 eps++;
2064 * Take an event received from the network and fix it up to be usable.
2065 * - fix up pointers for pattern list.
2066 * - fill in some channel info
2068 static struct event_data *
2069 make_local_event(struct lib_event_data *p,
2070 struct event_svr_channel_instance *eci)
2072 struct event_data *ed;
2073 mar_evt_event_pattern_t *eps;
2074 SaUint8T *str;
2075 uint32_t ed_size;
2076 int i;
2078 ed_size = sizeof(*ed) + p->led_user_data_offset + p->led_user_data_size;
2079 ed = malloc(ed_size);
2080 if (!ed) {
2081 log_printf(LOG_LEVEL_WARNING,
2082 "Failed to allocate %u bytes for event, offset %u, data size %u\n",
2083 ed_size, p->led_user_data_offset, p->led_user_data_size);
2084 return 0;
2086 memset(ed, 0, ed_size);
2087 list_init(&ed->ed_retained);
2088 ed->ed_my_chan = eci;
2091 * Fill in lib_event_data and make the pattern pointers valid
2093 memcpy(&ed->ed_event, p, sizeof(*p) +
2094 p->led_user_data_offset + p->led_user_data_size);
2096 eps = (mar_evt_event_pattern_t *)ed->ed_event.led_body;
2097 str = ed->ed_event.led_body +
2098 (ed->ed_event.led_patterns_number * sizeof(mar_evt_event_pattern_t));
2099 for (i = 0; i < ed->ed_event.led_patterns_number; i++) {
2100 eps->pattern = str;
2101 str += eps->pattern_size;
2102 eps++;
2105 ed->ed_ref_count++;
2106 return ed;
2110 * Set an event to be retained.
2112 static void retain_event(struct event_data *evt)
2114 uint32_t ret;
2115 evt->ed_ref_count++;
2116 evt->ed_my_chan->esc_retained_count++;
2117 list_add_tail(&evt->ed_retained, &retained_list);
2119 ret = openais_timer_add_duration (
2120 evt->ed_event.led_retention_time,
2121 evt,
2122 event_retention_timeout,
2123 &evt->ed_timer_handle);
2125 if (ret != 0) {
2126 log_printf(LOG_LEVEL_ERROR,
2127 "retention of event id 0x%llx failed\n",
2128 (unsigned long long)evt->ed_event.led_event_id);
2129 } else {
2130 log_printf(RETENTION_TIME_DEBUG, "Retain event ID 0x%llx for %llu ms\n",
2131 (unsigned long long)evt->ed_event.led_event_id, evt->ed_event.led_retention_time/100000LL);
2136 * Scan the subscription list and look for the specified subsctiption ID.
2137 * Only look for the ID in subscriptions that are associated with the
2138 * saEvtInitialize associated with the specified open channel.
2140 static struct event_svr_channel_subscr *find_subscr(
2141 struct event_svr_channel_open *open_chan, SaEvtSubscriptionIdT sub_id)
2143 struct event_svr_channel_instance *eci;
2144 struct event_svr_channel_subscr *ecs;
2145 struct event_svr_channel_open *eco;
2146 struct list_head *l, *l1;
2147 void *conn = open_chan->eco_conn;
2149 eci = open_chan->eco_channel;
2152 * Check for subscription id already in use.
2153 * Subscriptions are unique within saEvtInitialize (Callback scope).
2155 for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) {
2156 eco = list_entry(l, struct event_svr_channel_open, eco_entry);
2158 * Don't bother with open channels associated with another
2159 * EvtInitialize
2161 if (eco->eco_conn != conn) {
2162 continue;
2165 for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) {
2166 ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry);
2167 if (ecs->ecs_sub_id == sub_id) {
2168 return ecs;
2172 return 0;
2176 * Handler for saEvtInitialize
2178 static int evt_lib_init(void *conn)
2180 struct libevt_pd *libevt_pd;
2181 int i;
2183 libevt_pd = (struct libevt_pd *)openais_conn_private_data_get(conn);
2186 log_printf(LOG_LEVEL_DEBUG, "saEvtInitialize request.\n");
2189 * Initailze event instance data
2191 memset(libevt_pd, 0, sizeof(*libevt_pd));
2194 * Initialize the open channel handle database.
2196 hdb_create(&libevt_pd->esi_hdb);
2199 * list of channels open on this instance
2201 list_init(&libevt_pd->esi_open_chans);
2204 * pending event lists for each piriority
2206 for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) {
2207 list_init(&libevt_pd->esi_events[i]);
2210 return 0;
2214 * Handler for saEvtChannelOpen
2216 static void lib_evt_open_channel(void *conn, void *message)
2218 SaAisErrorT error;
2219 struct req_evt_channel_open *req;
2220 struct res_evt_channel_open res;
2221 struct open_chan_pending *ocp;
2222 int ret;
2224 req = message;
2227 log_printf(CHAN_OPEN_DEBUG,
2228 "saEvtChannelOpen (Open channel request)\n");
2229 log_printf(CHAN_OPEN_DEBUG,
2230 "handle 0x%llx, to 0x%llx\n",
2231 (unsigned long long)req->ico_c_handle,
2232 (unsigned long long)req->ico_timeout);
2233 log_printf(CHAN_OPEN_DEBUG, "flags %x, channel name(%d) %s\n",
2234 req->ico_open_flag,
2235 req->ico_channel_name.length,
2236 req->ico_channel_name.value);
2238 * Open the channel.
2241 error = evt_open_channel(&req->ico_channel_name, req->ico_open_flag);
2243 if (error != SA_AIS_OK) {
2244 goto open_return;
2247 ocp = malloc(sizeof(struct open_chan_pending));
2248 if (!ocp) {
2249 error = SA_AIS_ERR_NO_MEMORY;
2250 goto open_return;
2253 ocp->ocp_async = 0;
2254 ocp->ocp_invocation = 0;
2255 ocp->ocp_chan_name = req->ico_channel_name;
2256 ocp->ocp_open_flag = req->ico_open_flag;
2257 ocp->ocp_conn = conn;
2258 ocp->ocp_c_handle = req->ico_c_handle;
2259 ocp->ocp_timer_handle = 0;
2260 ocp->ocp_serial_no = open_serial_no;
2261 list_init(&ocp->ocp_entry);
2262 list_add_tail(&ocp->ocp_entry, &open_pending);
2263 ret = openais_timer_add_duration (
2264 req->ico_timeout,
2265 ocp,
2266 chan_open_timeout,
2267 &ocp->ocp_timer_handle);
2268 if (ret != 0) {
2269 log_printf(LOG_LEVEL_WARNING,
2270 "Error setting timeout for open channel %s\n",
2271 req->ico_channel_name.value);
2273 return;
2276 open_return:
2277 res.ico_head.size = sizeof(res);
2278 res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
2279 res.ico_head.error = error;
2280 openais_conn_send_response(conn, &res, sizeof(res));
2284 * Handler for saEvtChannelOpen
2286 static void lib_evt_open_channel_async(void *conn, void *message)
2288 SaAisErrorT error;
2289 struct req_evt_channel_open *req;
2290 struct res_evt_channel_open res;
2291 struct open_chan_pending *ocp;
2293 req = message;
2296 log_printf(CHAN_OPEN_DEBUG,
2297 "saEvtChannelOpenAsync (Async Open channel request)\n");
2298 log_printf(CHAN_OPEN_DEBUG,
2299 "handle 0x%llx, to 0x%llx\n",
2300 (unsigned long long)req->ico_c_handle,
2301 (unsigned long long)req->ico_invocation);
2302 log_printf(CHAN_OPEN_DEBUG, "flags %x, channel name(%d) %s\n",
2303 req->ico_open_flag,
2304 req->ico_channel_name.length,
2305 req->ico_channel_name.value);
2307 * Open the channel.
2310 error = evt_open_channel(&req->ico_channel_name, req->ico_open_flag);
2312 if (error != SA_AIS_OK) {
2313 goto open_return;
2316 ocp = malloc(sizeof(struct open_chan_pending));
2317 if (!ocp) {
2318 error = SA_AIS_ERR_NO_MEMORY;
2319 goto open_return;
2322 ocp->ocp_async = 1;
2323 ocp->ocp_invocation = req->ico_invocation;
2324 ocp->ocp_c_handle = req->ico_c_handle;
2325 ocp->ocp_chan_name = req->ico_channel_name;
2326 ocp->ocp_open_flag = req->ico_open_flag;
2327 ocp->ocp_conn = conn;
2328 ocp->ocp_timer_handle = 0;
2329 ocp->ocp_serial_no = open_serial_no;
2330 list_init(&ocp->ocp_entry);
2331 list_add_tail(&ocp->ocp_entry, &open_pending);
2333 open_return:
2334 res.ico_head.size = sizeof(res);
2335 res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
2336 res.ico_head.error = error;
2337 openais_conn_send_response(conn, &res, sizeof(res));
2343 * Used by the channel close code and by the implicit close
2344 * when saEvtFinalize is called with channels open.
2346 static SaAisErrorT
2347 common_chan_close(struct event_svr_channel_open *eco, struct libevt_pd *esip)
2349 struct event_svr_channel_subscr *ecs;
2350 struct list_head *l, *nxt;
2352 log_printf(LOG_LEVEL_DEBUG, "Close channel %s flags 0x%02x\n",
2353 eco->eco_channel->esc_channel_name.value,
2354 eco->eco_flags);
2357 * Disconnect the channel open structure.
2359 * Check for subscriptions and deal with them. In this case
2360 * if there are any, we just implicitly unsubscribe.
2362 * When We're done with the channel open data then we can
2363 * remove it's handle (this frees the memory too).
2366 list_del(&eco->eco_entry);
2367 list_del(&eco->eco_instance_entry);
2369 for (l = eco->eco_subscr.next; l != &eco->eco_subscr; l = nxt) {
2370 nxt = l->next;
2371 ecs = list_entry(l, struct event_svr_channel_subscr, ecs_entry);
2372 log_printf(LOG_LEVEL_DEBUG, "Unsubscribe ID: %x\n",
2373 ecs->ecs_sub_id);
2374 list_del(&ecs->ecs_entry);
2375 free(ecs);
2377 * Purge any pending events associated with this subscription
2378 * that don't match another subscription.
2380 filter_undelivered_events(eco);
2384 * Remove this channel from the retained event's notion
2385 * of who they have been delivered to.
2387 remove_delivered_channel(eco);
2388 return evt_close_channel(&eco->eco_channel->esc_channel_name,
2389 eco->eco_channel->esc_unlink_id);
2393 * Handler for saEvtChannelClose
2395 static void lib_evt_close_channel(void *conn, void *message)
2397 struct req_evt_channel_close *req;
2398 struct res_evt_channel_close res;
2399 struct event_svr_channel_open *eco;
2400 unsigned int ret;
2401 void *ptr;
2402 struct libevt_pd *esip;
2404 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
2406 req = message;
2408 log_printf(LOG_LEVEL_DEBUG,
2409 "saEvtChannelClose (Close channel request)\n");
2410 log_printf(LOG_LEVEL_DEBUG, "handle 0x%x\n", req->icc_channel_handle);
2413 * look up the channel handle
2415 ret = hdb_handle_get(&esip->esi_hdb,
2416 req->icc_channel_handle, &ptr);
2417 if (ret != 0) {
2418 goto chan_close_done;
2420 eco = ptr;
2422 common_chan_close(eco, esip);
2423 hdb_handle_destroy(&esip->esi_hdb, req->icc_channel_handle);
2424 hdb_handle_put(&esip->esi_hdb, req->icc_channel_handle);
2426 chan_close_done:
2427 res.icc_head.size = sizeof(res);
2428 res.icc_head.id = MESSAGE_RES_EVT_CLOSE_CHANNEL;
2429 res.icc_head.error = ((ret == 0) ? SA_AIS_OK : SA_AIS_ERR_BAD_HANDLE);
2430 openais_conn_send_response(conn, &res, sizeof(res));
2434 * Handler for saEvtChannelUnlink
2436 static void lib_evt_unlink_channel(void *conn, void *message)
2438 struct req_evt_channel_unlink *req;
2439 struct res_evt_channel_unlink res;
2440 struct iovec chn_iovec;
2441 struct unlink_chan_pending *ucp = 0;
2442 struct req_evt_chan_command cpkt;
2443 SaAisErrorT error = SA_AIS_ERR_LIBRARY;
2445 req = message;
2447 log_printf(CHAN_UNLINK_DEBUG,
2448 "saEvtChannelUnlink (Unlink channel request)\n");
2449 log_printf(CHAN_UNLINK_DEBUG, "Channel Name %s\n",
2450 req->iuc_channel_name.value);
2452 if (!find_channel(&req->iuc_channel_name, EVT_CHAN_ACTIVE)) {
2453 log_printf(CHAN_UNLINK_DEBUG, "Channel Name doesn't exist\n");
2454 error = SA_AIS_ERR_NOT_EXIST;
2455 goto evt_unlink_err;
2459 * Set up the data structure so that the channel op
2460 * mcast handler can complete the unlink comamnd back to the
2461 * requestor.
2463 ucp = malloc(sizeof(*ucp));
2464 if (!ucp) {
2465 log_printf(LOG_LEVEL_ERROR,
2466 "saEvtChannelUnlink: Memory allocation failure\n");
2467 error = SA_AIS_ERR_TRY_AGAIN;
2468 goto evt_unlink_err;
2471 ucp->ucp_unlink_id = next_chan_unlink_id();
2472 ucp->ucp_conn = conn;
2473 list_init(&ucp->ucp_entry);
2474 list_add_tail(&ucp->ucp_entry, &unlink_pending);
2477 * Put together a mcast packet to notify everyone
2478 * of the channel unlink.
2480 memset(&cpkt, 0, sizeof(cpkt));
2481 cpkt.chc_head.id =
2482 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
2483 cpkt.chc_head.size = sizeof(cpkt);
2484 cpkt.chc_op = EVT_UNLINK_CHAN_OP;
2485 cpkt.u.chcu.chcu_name = req->iuc_channel_name;
2486 cpkt.u.chcu.chcu_unlink_id = ucp->ucp_unlink_id;
2487 chn_iovec.iov_base = (char *)&cpkt;
2488 chn_iovec.iov_len = cpkt.chc_head.size;
2489 if (totempg_groups_mcast_joined(openais_group_handle,
2490 &chn_iovec, 1, TOTEMPG_AGREED) == 0) {
2491 return;
2494 evt_unlink_err:
2495 if (ucp) {
2496 list_del(&ucp->ucp_entry);
2497 free(ucp);
2499 res.iuc_head.size = sizeof(res);
2500 res.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL;
2501 res.iuc_head.error = error;
2502 openais_conn_send_response(conn, &res, sizeof(res));
/*
 * Subscribe to an event channel.
 *
 * - First look up the channel to subscribe.
 * - Make sure that the subscription ID is not already in use.
 * - Fill in the subscription data structures and add them to the channel's
 *   subscription list.
 * - See if there are any events with retention times that need to be delivered
 *   because of the new subscription.
 */
/*
 * Printable names of the event filter types, indexed by the numeric
 * filter type value, for debug logging of subscription filters.
 * Slot 0 is not a valid filter type.
 */
static char *filter_types[] = {
	[0] = "INVALID FILTER TYPE",
	[1] = "SA_EVT_PREFIX_FILTER",
	[2] = "SA_EVT_SUFFIX_FILTER",
	[3] = "SA_EVT_EXACT_FILTER",
	[4] = "SA_EVT_PASS_ALL_FILTER",
};
2524 * saEvtEventSubscribe Handler
2526 static void lib_evt_event_subscribe(void *conn, void *message)
2528 struct req_evt_event_subscribe *req;
2529 struct res_evt_event_subscribe res;
2530 mar_evt_event_filter_array_t *filters;
2531 SaAisErrorT error;
2532 struct event_svr_channel_open *eco;
2533 struct event_svr_channel_instance *eci;
2534 struct event_svr_channel_subscr *ecs;
2535 struct event_data *evt;
2536 struct list_head *l;
2537 void *ptr;
2538 unsigned int ret;
2539 int i;
2540 struct libevt_pd *esip;
2542 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
2544 req = message;
2546 log_printf(LOG_LEVEL_DEBUG,
2547 "saEvtEventSubscribe (Subscribe request)\n");
2548 log_printf(LOG_LEVEL_DEBUG, "subscription Id: 0x%llx\n",
2549 (unsigned long long)req->ics_sub_id);
2552 * look up the channel handle
2554 ret = hdb_handle_get(&esip->esi_hdb, req->ics_channel_handle, &ptr);
2555 if (ret != 0) {
2556 error = SA_AIS_ERR_BAD_HANDLE;
2557 goto subr_done;
2559 eco = ptr;
2561 eci = eco->eco_channel;
2564 * See if the id is already being used
2566 ecs = find_subscr(eco, req->ics_sub_id);
2567 if (ecs) {
2568 error = SA_AIS_ERR_EXIST;
2569 goto subr_put;
2572 error = evtfilt_to_aisfilt(req, &filters);
2574 if (error == SA_AIS_OK) {
2575 log_printf(LOG_LEVEL_DEBUG, "Subscribe filters count %d\n",
2576 (int)filters->filters_number);
2577 for (i = 0; i < filters->filters_number; i++) {
2578 log_printf(LOG_LEVEL_DEBUG, "type %s(%d) sz %d, <%s>\n",
2579 filter_types[filters->filters[i].filter_type],
2580 filters->filters[i].filter_type,
2581 (int)filters->filters[i].filter.pattern_size,
2582 (filters->filters[i].filter.pattern_size)
2583 ? (char *)filters->filters[i].filter.pattern
2584 : "");
2588 if (error != SA_AIS_OK) {
2589 goto subr_put;
2592 ecs = (struct event_svr_channel_subscr *)malloc(sizeof(*ecs));
2593 if (!ecs) {
2594 error = SA_AIS_ERR_NO_MEMORY;
2595 goto subr_put;
2597 ecs->ecs_filters = filters;
2598 ecs->ecs_sub_id = req->ics_sub_id;
2599 list_init(&ecs->ecs_entry);
2600 list_add(&ecs->ecs_entry, &eco->eco_subscr);
2603 res.ics_head.size = sizeof(res);
2604 res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE;
2605 res.ics_head.error = error;
2606 openais_conn_send_response(conn, &res, sizeof(res));
2609 * See if an existing event with a retention time
2610 * needs to be delivered based on this subscription
2612 for (l = retained_list.next; l != &retained_list; l = l->next) {
2613 evt = list_entry(l, struct event_data, ed_retained);
2614 log_printf(LOG_LEVEL_DEBUG,
2615 "Checking event ID %llx chanp %p -- sub chanp %p\n",
2616 (unsigned long long)evt->ed_event.led_event_id,
2617 evt->ed_my_chan, eci);
2618 if (evt->ed_my_chan == eci) {
2619 if (evt_already_delivered(evt, eco)) {
2620 continue;
2622 if (event_match(evt, ecs) == SA_AIS_OK) {
2623 log_printf(LOG_LEVEL_DEBUG,
2624 "deliver event ID: 0x%llx\n",
2625 (unsigned long long)evt->ed_event.led_event_id);
2626 deliver_event(evt, eco, ecs);
2630 hdb_handle_put(&esip->esi_hdb, req->ics_channel_handle);
2631 return;
2633 subr_put:
2634 hdb_handle_put(&esip->esi_hdb, req->ics_channel_handle);
2635 subr_done:
2636 res.ics_head.size = sizeof(res);
2637 res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE;
2638 res.ics_head.error = error;
2639 openais_conn_send_response(conn, &res, sizeof(res));
2643 * saEvtEventUnsubscribe Handler
2645 static void lib_evt_event_unsubscribe(void *conn, void *message)
2647 struct req_evt_event_unsubscribe *req;
2648 struct res_evt_event_unsubscribe res;
2649 struct event_svr_channel_open *eco;
2650 struct event_svr_channel_instance *eci;
2651 struct event_svr_channel_subscr *ecs;
2652 SaAisErrorT error = SA_AIS_OK;
2653 unsigned int ret;
2654 void *ptr;
2655 struct libevt_pd *esip;
2657 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
2659 req = message;
2661 log_printf(LOG_LEVEL_DEBUG,
2662 "saEvtEventUnsubscribe (Unsubscribe request)\n");
2663 log_printf(LOG_LEVEL_DEBUG, "subscription Id: 0x%llx\n",
2664 (unsigned long long)req->icu_sub_id);
2667 * look up the channel handle, get the open channel
2668 * data.
2670 ret = hdb_handle_get(&esip->esi_hdb,
2671 req->icu_channel_handle, &ptr);
2672 if (ret != 0) {
2673 error = SA_AIS_ERR_BAD_HANDLE;
2674 goto unsubr_done;
2676 eco = ptr;
2678 eci = eco->eco_channel;
2681 * Make sure that the id exists.
2683 ecs = find_subscr(eco, req->icu_sub_id);
2684 if (!ecs) {
2685 error = SA_AIS_ERR_NOT_EXIST;
2686 goto unsubr_put;
2689 list_del(&ecs->ecs_entry);
2691 log_printf(LOG_LEVEL_DEBUG,
2692 "unsubscribe from channel %s subscription ID 0x%x "
2693 "with %d filters\n",
2694 eci->esc_channel_name.value,
2695 ecs->ecs_sub_id, (int)ecs->ecs_filters->filters_number);
2697 free_filters(ecs->ecs_filters);
2698 free(ecs);
2700 unsubr_put:
2701 hdb_handle_put(&esip->esi_hdb, req->icu_channel_handle);
2702 unsubr_done:
2703 res.icu_head.size = sizeof(res);
2704 res.icu_head.id = MESSAGE_RES_EVT_UNSUBSCRIBE;
2705 res.icu_head.error = error;
2706 openais_conn_send_response(conn, &res, sizeof(res));
2710 * saEvtEventPublish Handler
2712 static void lib_evt_event_publish(void *conn, void *message)
2714 struct lib_event_data *req;
2715 struct res_evt_event_publish res;
2716 struct event_svr_channel_open *eco;
2717 struct event_svr_channel_instance *eci;
2718 mar_evteventid_t event_id = 0;
2719 uint64_t msg_id = 0;
2720 SaAisErrorT error = SA_AIS_OK;
2721 struct iovec pub_iovec;
2722 void *ptr;
2723 int result;
2724 unsigned int ret;
2725 struct libevt_pd *esip;
2727 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
2730 req = message;
2732 log_printf(LOG_LEVEL_DEBUG,
2733 "saEvtEventPublish (Publish event request)\n");
2737 * look up and validate open channel info
2739 ret = hdb_handle_get(&esip->esi_hdb,
2740 req->led_svr_channel_handle, &ptr);
2741 if (ret != 0) {
2742 error = SA_AIS_ERR_BAD_HANDLE;
2743 goto pub_done;
2745 eco = ptr;
2747 eci = eco->eco_channel;
2750 * modify the request structure for sending event data to subscribed
2751 * processes.
2753 get_event_id(&event_id, &msg_id);
2754 req->led_head.id = SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_EVENTDATA);
2755 req->led_chan_name = eci->esc_channel_name;
2756 req->led_event_id = event_id;
2757 req->led_msg_id = msg_id;
2758 req->led_chan_unlink_id = eci->esc_unlink_id;
2761 * Distribute the event.
2762 * The multicasted event will be picked up and delivered
2763 * locally by the local network event receiver.
2765 pub_iovec.iov_base = (char *)req;
2766 pub_iovec.iov_len = req->led_head.size;
2767 result = totempg_groups_mcast_joined(openais_group_handle, &pub_iovec, 1, TOTEMPG_AGREED);
2768 if (result != 0) {
2769 error = SA_AIS_ERR_LIBRARY;
2772 hdb_handle_put(&esip->esi_hdb, req->led_svr_channel_handle);
2773 pub_done:
2774 res.iep_head.size = sizeof(res);
2775 res.iep_head.id = MESSAGE_RES_EVT_PUBLISH;
2776 res.iep_head.error = error;
2777 res.iep_event_id = event_id;
2778 openais_conn_send_response(conn, &res, sizeof(res));
2782 * saEvtEventRetentionTimeClear handler
2784 static void lib_evt_event_clear_retentiontime(void *conn, void *message)
2786 struct req_evt_event_clear_retentiontime *req;
2787 struct res_evt_event_clear_retentiontime res;
2788 struct req_evt_chan_command cpkt;
2789 struct retention_time_clear_pending *rtc = 0;
2790 struct iovec rtn_iovec;
2791 SaAisErrorT error;
2792 int ret;
2794 req = message;
2796 log_printf(RETENTION_TIME_DEBUG,
2797 "saEvtEventRetentionTimeClear (Clear event retentiontime request)\n");
2798 log_printf(RETENTION_TIME_DEBUG,
2799 "event ID 0x%llx, chan handle 0x%x\n",
2800 (unsigned long long)req->iec_event_id,
2801 req->iec_channel_handle);
2803 rtc = malloc(sizeof(*rtc));
2804 if (!rtc) {
2805 log_printf(LOG_LEVEL_ERROR,
2806 "saEvtEventRetentionTimeClear: Memory allocation failure\n");
2807 error = SA_AIS_ERR_TRY_AGAIN;
2808 goto evt_ret_clr_err;
2810 rtc->rtc_event_id = req->iec_event_id;
2811 rtc->rtc_conn = conn;
2812 list_init(&rtc->rtc_entry);
2813 list_add_tail(&rtc->rtc_entry, &clear_pending);
2816 * Send the clear request
2818 memset(&cpkt, 0, sizeof(cpkt));
2819 cpkt.chc_head.id =
2820 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
2821 cpkt.chc_head.size = sizeof(cpkt);
2822 cpkt.chc_op = EVT_CLEAR_RET_OP;
2823 cpkt.u.chc_event_id = req->iec_event_id;
2824 rtn_iovec.iov_base = (char *)&cpkt;
2825 rtn_iovec.iov_len = cpkt.chc_head.size;
2826 ret = totempg_groups_mcast_joined(openais_group_handle,
2827 &rtn_iovec, 1, TOTEMPG_AGREED);
2828 if (ret == 0) {
2829 // TODO this should really assert
2830 return;
2832 error = SA_AIS_ERR_LIBRARY;
2834 evt_ret_clr_err:
2835 if (rtc) {
2836 list_del(&rtc->rtc_entry);
2837 free(rtc);
2839 res.iec_head.size = sizeof(res);
2840 res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME;
2841 res.iec_head.error = error;
2842 openais_conn_send_response(conn, &res, sizeof(res));
2847 * Send requested event data to the application
2849 static void lib_evt_event_data_get(void *conn, void *message)
2851 struct lib_event_data res;
2852 struct chan_event_list *cel;
2853 struct event_data *edp;
2854 int i;
2855 struct libevt_pd *esip;
2857 esip = (struct libevt_pd *)openais_conn_private_data_get(conn);
2861 * Deliver events in publish order within priority
2863 for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) {
2864 if (!list_empty(&esip->esi_events[i])) {
2865 cel = list_entry(esip->esi_events[i].next, struct chan_event_list,
2866 cel_entry);
2867 list_del(&cel->cel_entry);
2868 list_init(&cel->cel_entry);
2869 esip->esi_nevents--;
2870 if (esip->esi_queue_blocked &&
2871 (esip->esi_nevents < evt_delivery_queue_resume)) {
2872 esip->esi_queue_blocked = 0;
2873 log_printf(LOG_LEVEL_DEBUG, "unblock\n");
2875 edp = cel->cel_event;
2876 edp->ed_event.led_lib_channel_handle = cel->cel_chan_handle;
2877 edp->ed_event.led_sub_id = cel->cel_sub_id;
2878 edp->ed_event.led_head.id = MESSAGE_RES_EVT_EVENT_DATA;
2879 edp->ed_event.led_head.error = SA_AIS_OK;
2880 free(cel);
2881 openais_conn_send_response(conn, &edp->ed_event,
2882 edp->ed_event.led_head.size);
2883 free_event_data(edp);
2884 goto data_get_done;
2888 res.led_head.size = sizeof(res.led_head);
2889 res.led_head.id = MESSAGE_RES_EVT_EVENT_DATA;
2890 res.led_head.error = SA_AIS_ERR_NOT_EXIST;
2891 openais_conn_send_response(conn, &res, res.led_head.size);
2894 * See if there are any events that the app doesn't know about
2895 * because the notify pipe was full.
2897 data_get_done:
2898 if (esip->esi_nevents) {
2899 __notify_event(conn);
2904 * Scan the list of channels and remove the specified node.
2906 static void remove_chan_open_info(mar_uint32_t node_id)
2908 struct list_head *l, *nxt;
2909 struct event_svr_channel_instance *eci;
2911 for (l = esc_head.next; l != &esc_head; l = nxt) {
2912 nxt = l->next;
2913 eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
2914 remove_open_count(eci, node_id);
2921 * Called when there is a configuration change in the cluster.
2922 * This function looks at any joiners and leavers and updates the evt
2923 * node list. The node list is used to keep track of event IDs
2924 * received for each node for the detection of duplicate events.
2926 static void evt_conf_change(
2927 enum totem_configuration_type configuration_type,
2928 unsigned int *member_list, int member_list_entries,
2929 unsigned int *left_list, int left_list_entries,
2930 unsigned int *joined_list, int joined_list_entries,
2931 struct memb_ring_id *ring_id)
2933 log_printf(RECOVERY_DEBUG, "Evt conf change %d\n",
2934 configuration_type);
2935 log_printf(RECOVERY_DEBUG, "m %d, j %d, l %d\n",
2936 member_list_entries,
2937 joined_list_entries,
2938 left_list_entries);
2941 * Save the various membership lists for later processing by
2942 * the synchronization functions. The left list is only
2943 * valid in the transitional configuration, the joined list is
2944 * only valid in the regular configuration. Other than for the
2945 * purposes of delivering retained events from merging partitions,
2946 * we only care about the final membership from the regular
2947 * configuration.
2949 if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
2951 left_member_count = left_list_entries;
2952 trans_member_count = member_list_entries;
2954 if (left_member_list) {
2955 free(left_member_list);
2956 left_member_list = 0;
2958 if (left_list_entries) {
2959 left_member_list =
2960 malloc(sizeof(unsigned int) * left_list_entries);
2961 if (!left_member_list) {
2963 * ERROR: No recovery.
2965 log_printf(LOG_LEVEL_ERROR,
2966 "Config change left list allocation error\n");
2967 assert(0);
2969 memcpy(left_member_list, left_list,
2970 sizeof(unsigned int) * left_list_entries);
2973 if (trans_member_list) {
2974 free(trans_member_list);
2975 trans_member_list = 0;
2977 if (member_list_entries) {
2978 trans_member_list =
2979 malloc(sizeof(unsigned int) * member_list_entries);
2981 if (!trans_member_list) {
2983 * ERROR: No recovery.
2985 log_printf(LOG_LEVEL_ERROR,
2986 "Config change transitional member list allocation error\n");
2987 assert(0);
2989 memcpy(trans_member_list, member_list,
2990 sizeof(unsigned int) * member_list_entries);
2994 if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
2996 joined_member_count = joined_list_entries;
2997 total_member_count = member_list_entries;
2999 if (joined_member_list) {
3000 free(joined_member_list);
3001 joined_member_list = 0;
3003 if (joined_list_entries) {
3004 joined_member_list =
3005 malloc(sizeof(unsigned int) * joined_list_entries);
3006 if (!joined_member_list) {
3008 * ERROR: No recovery.
3010 log_printf(LOG_LEVEL_ERROR,
3011 "Config change joined list allocation error\n");
3012 assert(0);
3014 memcpy(joined_member_list, joined_list,
3015 sizeof(unsigned int) * joined_list_entries);
3019 if (current_member_list) {
3020 free(current_member_list);
3021 current_member_list = 0;
3023 if (member_list_entries) {
3024 current_member_list =
3025 malloc(sizeof(unsigned int) * member_list_entries);
3027 if (!current_member_list) {
3029 * ERROR: No recovery.
3031 log_printf(LOG_LEVEL_ERROR,
3032 "Config change member list allocation error\n");
3033 assert(0);
3035 memcpy(current_member_list, member_list,
3036 sizeof(unsigned int) * member_list_entries);
3042 * saEvtFinalize Handler
3044 static int evt_lib_exit(void *conn)
3047 struct event_svr_channel_open *eco;
3048 struct list_head *l, *nxt;
3049 struct open_chan_pending *ocp;
3050 struct unlink_chan_pending *ucp;
3051 struct retention_time_clear_pending *rtc;
3052 struct libevt_pd *esip =
3053 openais_conn_private_data_get(openais_conn_partner_get(conn));
3055 log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize (Event exit request)\n");
3056 log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize %d evts on list\n",
3057 esip->esi_nevents);
3060 * Clean up any open channels and associated subscriptions.
3062 for (l = esip->esi_open_chans.next; l != &esip->esi_open_chans; l = nxt) {
3063 nxt = l->next;
3064 eco = list_entry(l, struct event_svr_channel_open, eco_instance_entry);
3065 common_chan_close(eco, esip);
3066 hdb_handle_destroy(&esip->esi_hdb, eco->eco_my_handle);
3070 * Clean up any pending async operations
3072 for (l = open_pending.next; l != &open_pending; l = nxt) {
3073 nxt = l->next;
3074 ocp = list_entry(l, struct open_chan_pending, ocp_entry);
3075 if (esip == openais_conn_private_data_get(ocp->ocp_conn)) {
3076 list_del(&ocp->ocp_entry);
3077 free(ocp);
3081 for (l = unlink_pending.next; l != &unlink_pending; l = nxt) {
3082 nxt = l->next;
3083 ucp = list_entry(l, struct unlink_chan_pending, ucp_entry);
3084 if (esip == openais_conn_private_data_get(ucp->ucp_conn)) {
3085 list_del(&ucp->ucp_entry);
3086 free(ucp);
3090 for (l = clear_pending.next;
3091 l != &clear_pending; l = nxt) {
3092 nxt = l->next;
3093 rtc = list_entry(l, struct retention_time_clear_pending, rtc_entry);
3094 if (esip == openais_conn_private_data_get(rtc->rtc_conn)) {
3095 list_del(&rtc->rtc_entry);
3096 free(rtc);
3101 * Destroy the open channel handle database
3103 hdb_destroy(&esip->esi_hdb);
3105 return 0;
3109 * Called at service start time.
3111 static int evt_exec_init(struct objdb_iface_ver0 *objdb)
3113 unsigned int object_service_handle;
3114 char *value;
3116 log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n");
3118 objdb->object_find_reset (OBJECT_PARENT_HANDLE);
3119 if (objdb->object_find (
3120 OBJECT_PARENT_HANDLE,
3121 "event",
3122 strlen ("event"),
3123 &object_service_handle) == 0) {
3125 value = NULL;
3126 if ( !objdb->object_key_get (object_service_handle,
3127 "delivery_queue_size",
3128 strlen ("delivery_queue_size"),
3129 (void *)&value,
3130 NULL) && value) {
3131 evt_delivery_queue_size = atoi(value);
3132 log_printf(LOG_LEVEL_NOTICE,
3133 "event delivery_queue_size set to %u\n",
3134 evt_delivery_queue_size);
3136 value = NULL;
3137 if ( !objdb->object_key_get (object_service_handle,
3138 "delivery_queue_resume",
3139 strlen ("delivery_queue_resume"),
3140 (void *)&value,
3141 NULL) && value) {
3142 evt_delivery_queue_resume = atoi(value);
3143 log_printf(LOG_LEVEL_NOTICE,
3144 "event delivery_queue_resume set to %u\n",
3145 evt_delivery_queue_size);
3150 * Create an event to be sent when we have to drop messages
3151 * for an application.
3153 dropped_event_size = sizeof(*dropped_event) + sizeof(dropped_pattern);
3154 dropped_event = malloc(dropped_event_size);
3155 if (dropped_event == 0) {
3156 log_printf(LOG_LEVEL_ERROR,
3157 "Memory Allocation Failure, event service not started\n");
3158 errno = ENOMEM;
3159 return -1;
3161 memset(dropped_event, 0, sizeof(*dropped_event) + sizeof(dropped_pattern));
3162 dropped_event->ed_ref_count = 1;
3163 list_init(&dropped_event->ed_retained);
3164 dropped_event->ed_event.led_head.size =
3165 sizeof(*dropped_event) + sizeof(dropped_pattern);
3166 dropped_event->ed_event.led_head.error = SA_AIS_OK;
3167 dropped_event->ed_event.led_priority = SA_EVT_HIGHEST_PRIORITY;
3168 dropped_event->ed_event.led_chan_name = lost_chan;
3169 dropped_event->ed_event.led_publisher_name = dropped_publisher;
3170 dropped_event->ed_event.led_patterns_number = 1;
3171 memcpy(&dropped_event->ed_event.led_body[0],
3172 &dropped_pattern, sizeof(dropped_pattern));
3173 return 0;
3176 static int
3177 try_deliver_event(struct event_data *evt,
3178 struct event_svr_channel_instance *eci)
3180 struct list_head *l, *l1;
3181 struct event_svr_channel_open *eco;
3182 struct event_svr_channel_subscr *ecs;
3183 int delivered_event = 0;
3185 * Check open channels
3187 for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) {
3188 eco = list_entry(l, struct event_svr_channel_open, eco_entry);
3190 * See if enabled to receive
3192 if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) {
3193 continue;
3197 * Check subscriptions
3199 for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) {
3200 ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry);
3202 * Apply filter rules and deliver if patterns
3203 * match filters.
3204 * Only deliver one event per open channel
3206 if (event_match(evt, ecs) == SA_AIS_OK) {
3207 deliver_event(evt, eco, ecs);
3208 delivered_event++;
3209 break;
3213 return delivered_event;
3217 * Receive the network event message and distribute it to local subscribers
3219 static void evt_remote_evt(void *msg, unsigned int nodeid)
3222 * - retain events that have a retention time
3223 * - Find assocated channel
3224 * - Scan list of subscribers
3225 * - Apply filters
3226 * - Deliver events that pass the filter test
3228 struct lib_event_data *evtpkt = msg;
3229 struct event_svr_channel_instance *eci;
3230 struct event_data *evt;
3231 SaClmClusterNodeT *cn;
3233 log_printf(LOG_LEVEL_DEBUG, "Remote event data received from nodeid %s\n",
3234 totempg_ifaces_print (nodeid));
3237 * See where the message came from so that we can set the
3238 * publishing node id in the message before delivery.
3240 cn = main_clm_get_by_nodeid(nodeid);
3241 if (!cn) {
3243 * Not sure how this can happen...
3245 log_printf(LOG_LEVEL_DEBUG, "No cluster node data for nodeid %s\n",
3246 totempg_ifaces_print (nodeid));
3247 errno = ENXIO;
3248 return;
3250 log_printf(LOG_LEVEL_DEBUG, "Cluster node ID %s name %s\n",
3251 totempg_ifaces_print (cn->nodeId), cn->nodeName.value);
3253 evtpkt->led_publisher_node_id = nodeid;
3254 evtpkt->led_nodeid = nodeid;
3255 evtpkt->led_receive_time = clust_time_now();
3257 if (evtpkt->led_chan_unlink_id != EVT_CHAN_ACTIVE) {
3258 log_printf(CHAN_UNLINK_DEBUG,
3259 "evt_remote_evt(0): chan %s, id 0x%llx\n",
3260 evtpkt->led_chan_name.value,
3261 (unsigned long long)evtpkt->led_chan_unlink_id);
3263 eci = find_channel(&evtpkt->led_chan_name, evtpkt->led_chan_unlink_id);
3265 * We may have had some events that were already queued when an
3266 * unlink happened, if we don't find the channel in the active list
3267 * look for the last unlinked channel of the same name. If this channel
3268 * is re-opened the messages will still be routed correctly because new
3269 * active channel messages will be ordered behind the open.
3271 if (!eci && (evtpkt->led_chan_unlink_id == EVT_CHAN_ACTIVE)) {
3272 log_printf(CHAN_UNLINK_DEBUG,
3273 "evt_remote_evt(1): chan %s, id 0x%llx\n",
3274 evtpkt->led_chan_name.value,
3275 (unsigned long long)evtpkt->led_chan_unlink_id);
3276 eci = find_last_unlinked_channel(&evtpkt->led_chan_name);
3280 * We shouldn't normally see an event for a channel that we
3281 * don't know about.
3283 if (!eci) {
3284 log_printf(LOG_LEVEL_DEBUG, "Channel %s doesn't exist\n",
3285 evtpkt->led_chan_name.value);
3286 return;
3289 if (check_last_event(evtpkt, nodeid)) {
3290 return;
3293 evt = make_local_event(evtpkt, eci);
3294 if (!evt) {
3295 log_printf(LOG_LEVEL_WARNING,
3296 "1Memory allocation error, can't deliver event\n");
3297 return;
3300 if (evt->ed_event.led_retention_time) {
3301 retain_event(evt);
3304 try_deliver_event(evt, eci);
3305 free_event_data(evt);
3309 * Calculate the remaining retention time of a received event during recovery
3311 inline mar_time_t calc_retention_time(mar_time_t retention,
3312 mar_time_t received, mar_time_t now)
3314 if ((received < now) && ((now - received) < retention)) {
3315 return retention - (now - received);
3316 } else {
3317 return 0;
3322 * Receive a recovery network event message and save it in the retained list
3324 static void evt_remote_recovery_evt(void *msg, unsigned int nodeid)
3327 * - calculate remaining retention time
3328 * - Find assocated channel
3329 * - Scan list of subscribers
3330 * - Apply filters
3331 * - Deliver events that pass the filter test
3333 struct lib_event_data *evtpkt = msg;
3334 struct event_svr_channel_instance *eci;
3335 struct event_data *evt;
3336 struct member_node_data *md;
3337 int num_delivered;
3338 mar_time_t now;
3340 now = clust_time_now();
3342 log_printf(RECOVERY_EVENT_DEBUG,
3343 "Remote recovery event data received from nodeid %d\n", nodeid);
3345 if (recovery_phase == evt_recovery_complete) {
3346 log_printf(RECOVERY_EVENT_DEBUG,
3347 "Received recovery data, not in recovery mode\n");
3348 return;
3351 log_printf(RECOVERY_EVENT_DEBUG,
3352 "Processing recovery of retained events\n");
3353 if (recovery_node) {
3354 log_printf(RECOVERY_EVENT_DEBUG, "This node is the recovery node\n");
3357 log_printf(RECOVERY_EVENT_DEBUG, "(1)EVT ID: %llx, Time: %llx\n",
3358 (unsigned long long)evtpkt->led_event_id,
3359 (unsigned long long)evtpkt->led_retention_time);
3361 * Calculate remaining retention time
3363 evtpkt->led_retention_time = calc_retention_time(
3364 evtpkt->led_retention_time,
3365 evtpkt->led_receive_time,
3366 now);
3368 log_printf(RECOVERY_EVENT_DEBUG,
3369 "(2)EVT ID: %llx, ret: %llx, rec: %llx, now: %llx\n",
3370 (unsigned long long)evtpkt->led_event_id,
3371 (unsigned long long)evtpkt->led_retention_time,
3372 (unsigned long long)evtpkt->led_receive_time,
3373 (unsigned long long)now);
3376 * If we haven't seen this event yet and it has remaining time, process
3377 * the event.
3379 if (!check_last_event(evtpkt, evtpkt->led_nodeid) &&
3380 evtpkt->led_retention_time) {
3382 * See where the message came from so that we can set the
3383 * publishing node id in the message before delivery.
3385 md = evt_find_node(evtpkt->led_nodeid);
3386 if (!md) {
3388 * Not sure how this can happen
3390 log_printf(LOG_LEVEL_NOTICE, "No node for nodeid %s\n",
3391 totempg_ifaces_print (evtpkt->led_nodeid));
3392 return;
3394 log_printf(LOG_LEVEL_DEBUG, "Cluster node ID %s name %s\n",
3395 totempg_ifaces_print (md->mn_node_info.nodeId),
3396 md->mn_node_info.nodeName.value);
3398 log_printf(CHAN_UNLINK_DEBUG,
3399 "evt_recovery_event: chan %s, id 0x%llx\n",
3400 evtpkt->led_chan_name.value,
3401 (unsigned long long)evtpkt->led_chan_unlink_id);
3402 eci = find_channel(&evtpkt->led_chan_name, evtpkt->led_chan_unlink_id);
3405 * We shouldn't normally see an event for a channel that we don't
3406 * know about.
3408 if (!eci) {
3409 log_printf(RECOVERY_EVENT_DEBUG, "Channel %s doesn't exist\n",
3410 evtpkt->led_chan_name.value);
3411 return;
3414 evt = make_local_event(evtpkt, eci);
3415 if (!evt) {
3416 log_printf(LOG_LEVEL_WARNING,
3417 "2Memory allocation error, can't deliver event\n");
3418 errno = ENOMEM;
3419 return;
3422 retain_event(evt);
3423 num_delivered = try_deliver_event(evt, eci);
3424 log_printf(RECOVERY_EVENT_DEBUG, "Delivered to %d subscribers\n",
3425 num_delivered);
3426 free_event_data(evt);
3432 * Timeout handler for event channel open. We flag the structure
3433 * as timed out. Then if the open request is ever returned, we can
3434 * issue a close channel and keep the reference counting correct.
3436 static void chan_open_timeout(void *data)
3438 struct open_chan_pending *ocp = (struct open_chan_pending *)data;
3439 struct res_evt_channel_open res;
3441 res.ico_head.size = sizeof(res);
3442 res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
3443 res.ico_head.error = SA_AIS_ERR_TIMEOUT;
3444 ocp->ocp_invocation = OPEN_TIMED_OUT;
3445 openais_conn_send_response(ocp->ocp_conn, &res, sizeof(res));
3449 * Called by the channel open exec handler to finish the open and
3450 * respond to the application
3452 static void evt_chan_open_finish(struct open_chan_pending *ocp,
3453 struct event_svr_channel_instance *eci)
3455 struct event_svr_channel_open *eco;
3456 SaAisErrorT error = SA_AIS_OK;
3457 unsigned int ret = 0;
3458 unsigned int timer_del_status = 0;
3459 void *ptr = 0;
3460 uint32_t handle = 0;
3461 struct libevt_pd *esip;
3463 esip = (struct libevt_pd *)openais_conn_private_data_get(ocp->ocp_conn);
3465 log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s\n",
3466 get_mar_name_t(&ocp->ocp_chan_name));
3467 if (ocp->ocp_timer_handle) {
3468 openais_timer_delete (ocp->ocp_timer_handle);
3472 * If this is a finished open for a timed out request, then
3473 * send out a close on this channel to clean things up.
3475 if (ocp->ocp_invocation == OPEN_TIMED_OUT) {
3476 log_printf(CHAN_OPEN_DEBUG, "Closing timed out open of %s\n",
3477 get_mar_name_t(&ocp->ocp_chan_name));
3478 error = evt_close_channel(&ocp->ocp_chan_name, EVT_CHAN_ACTIVE);
3479 if (error != SA_AIS_OK) {
3480 log_printf(CHAN_OPEN_DEBUG,
3481 "Close of timed out open failed for %s\n",
3482 get_mar_name_t(&ocp->ocp_chan_name));
3484 list_del(&ocp->ocp_entry);
3485 free(ocp);
3486 return;
3490 * Create a handle to give back to the caller to associate
3491 * with this channel open instance.
3493 ret = hdb_handle_create(&esip->esi_hdb, sizeof(*eco), &handle);
3494 if (ret != 0) {
3495 goto open_return;
3497 ret = hdb_handle_get(&esip->esi_hdb, handle, &ptr);
3498 if (ret != 0) {
3499 goto open_return;
3501 eco = ptr;
3504 * Initailize and link into the global channel structure.
3506 list_init(&eco->eco_subscr);
3507 list_init(&eco->eco_entry);
3508 list_init(&eco->eco_instance_entry);
3509 eco->eco_flags = ocp->ocp_open_flag;
3510 eco->eco_channel = eci;
3511 eco->eco_lib_handle = ocp->ocp_c_handle;
3512 eco->eco_my_handle = handle;
3513 eco->eco_conn = ocp->ocp_conn;
3514 list_add_tail(&eco->eco_entry, &eci->esc_open_chans);
3515 list_add_tail(&eco->eco_instance_entry, &esip->esi_open_chans);
3518 * respond back with a handle to access this channel
3519 * open instance for later subscriptions, etc.
3521 hdb_handle_put(&esip->esi_hdb, handle);
3523 open_return:
3524 log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s send response %d\n",
3525 get_mar_name_t(&ocp->ocp_chan_name),
3526 error);
3527 if (ocp->ocp_async) {
3528 struct res_evt_open_chan_async resa;
3529 resa.ica_head.size = sizeof(resa);
3530 resa.ica_head.id = MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK;
3531 resa.ica_head.error = (ret == 0 ? SA_AIS_OK: SA_AIS_ERR_BAD_HANDLE);
3532 resa.ica_channel_handle = handle;
3533 resa.ica_c_handle = ocp->ocp_c_handle;
3534 resa.ica_invocation = ocp->ocp_invocation;
3535 openais_conn_send_response(openais_conn_partner_get(ocp->ocp_conn),
3536 &resa, sizeof(resa));
3537 } else {
3538 struct res_evt_channel_open res;
3539 res.ico_head.size = sizeof(res);
3540 res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
3541 res.ico_head.error = (ret == 0 ? SA_AIS_OK : SA_AIS_ERR_BAD_HANDLE);
3542 res.ico_channel_handle = handle;
3543 openais_conn_send_response(ocp->ocp_conn, &res, sizeof(res));
3546 if (timer_del_status == 0) {
3547 list_del(&ocp->ocp_entry);
3548 free(ocp);
3553 * Called by the channel unlink exec handler to
3554 * respond to the application.
3556 static void evt_chan_unlink_finish(struct unlink_chan_pending *ucp)
3558 struct res_evt_channel_unlink res;
3560 log_printf(CHAN_UNLINK_DEBUG, "Unlink channel finish ID 0x%llx\n",
3561 (unsigned long long)ucp->ucp_unlink_id);
3563 list_del(&ucp->ucp_entry);
3565 res.iuc_head.size = sizeof(res);
3566 res.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL;
3567 res.iuc_head.error = SA_AIS_OK;
3568 openais_conn_send_response(ucp->ucp_conn, &res, sizeof(res));
3570 free(ucp);
3574 * Called by the retention time clear exec handler to
3575 * respond to the application.
3577 static void evt_ret_time_clr_finish(struct retention_time_clear_pending *rtc,
3578 SaAisErrorT ret)
3580 struct res_evt_event_clear_retentiontime res;
3582 log_printf(RETENTION_TIME_DEBUG, "Retention Time Clear finish ID 0x%llx\n",
3583 (unsigned long long)rtc->rtc_event_id);
3585 res.iec_head.size = sizeof(res);
3586 res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME;
3587 res.iec_head.error = ret;
3588 openais_conn_send_response(rtc->rtc_conn, &res, sizeof(res));
3590 list_del(&rtc->rtc_entry);
3591 free(rtc);
3595 * Take the channel command data and swap the elements so they match
3596 * our architectures word order.
3598 static void
3599 convert_chan_packet(void *msg)
3601 struct req_evt_chan_command *cpkt = (struct req_evt_chan_command *)msg;
3604 * converted in the main deliver_fn:
3605 * led_head.id, led_head.size.
3609 cpkt->chc_op = swab32(cpkt->chc_op);
3612 * Which elements of the packet that are converted depend
3613 * on the operation.
3615 switch (cpkt->chc_op) {
3617 case EVT_OPEN_CHAN_OP:
3618 swab_mar_name_t (&cpkt->u.chc_chan.ocr_name);
3619 cpkt->u.chc_chan.ocr_serial_no = swab64(cpkt->u.chc_chan.ocr_serial_no);
3620 break;
3622 case EVT_UNLINK_CHAN_OP:
3623 case EVT_CLOSE_CHAN_OP:
3624 swab_mar_name_t (&cpkt->u.chcu.chcu_name);
3625 cpkt->u.chcu.chcu_unlink_id = swab64(cpkt->u.chcu.chcu_unlink_id);
3626 break;
3628 case EVT_CLEAR_RET_OP:
3629 cpkt->u.chc_event_id = swab64(cpkt->u.chc_event_id);
3630 break;
3632 case EVT_SET_ID_OP:
3633 cpkt->u.chc_set_id.chc_nodeid =
3634 swab32(cpkt->u.chc_set_id.chc_nodeid);
3635 cpkt->u.chc_set_id.chc_last_id = swab64(cpkt->u.chc_set_id.chc_last_id);
3636 break;
3638 case EVT_OPEN_COUNT:
3639 swab_mar_name_t (&cpkt->u.chc_set_opens.chc_chan_name);
3640 cpkt->u.chc_set_opens.chc_open_count =
3641 swab32(cpkt->u.chc_set_opens.chc_open_count);
3642 break;
3645 * No data assocaited with these ops.
3647 case EVT_CONF_DONE:
3648 case EVT_OPEN_COUNT_DONE:
3649 break;
3652 * Make sure that this function is updated when new ops are added.
3654 default:
3655 assert(0);
3661 * Receive and process remote event operations.
3662 * Used to communicate channel opens/closes, clear retention time,
3663 * config change updates...
3665 static void evt_remote_chan_op(void *msg, unsigned int nodeid)
3667 struct req_evt_chan_command *cpkt = msg;
3668 unsigned int local_node = {SA_CLM_LOCAL_NODE_ID};
3669 SaClmClusterNodeT *cn, *my_node;
3670 struct member_node_data *mn;
3671 struct event_svr_channel_instance *eci;
3673 log_printf(REMOTE_OP_DEBUG, "Remote channel operation request\n");
3674 my_node = main_clm_get_by_nodeid(local_node);
3675 log_printf(REMOTE_OP_DEBUG, "my node ID: 0x%x\n", my_node->nodeId);
3677 mn = evt_find_node(nodeid);
3678 if (mn == NULL) {
3679 cn = main_clm_get_by_nodeid(nodeid);
3680 if (cn == NULL) {
3681 log_printf(LOG_LEVEL_WARNING,
3682 "Evt remote channel op: Node data for nodeid %d is NULL\n",
3683 nodeid);
3684 return;
3685 } else {
3686 evt_add_node(nodeid, cn);
3687 mn = evt_find_node(nodeid);
3691 switch (cpkt->chc_op) {
3693 * Open channel remote command. The open channel request is done
3694 * in two steps. When an pplication tries to open, we send an open
3695 * channel message to the other nodes. When we receive an open channel
3696 * message, we may create the channel structure if it doesn't exist
3697 * and also complete applicaiton open requests for the specified
3698 * channel. We keep a counter of total opens for the given channel and
3699 * later when it has been completely closed (everywhere in the cluster)
3700 * we will free up the assocated channel data.
3702 case EVT_OPEN_CHAN_OP: {
3703 struct open_chan_pending *ocp;
3704 struct list_head *l, *nxt;
3706 log_printf(CHAN_OPEN_DEBUG, "Opening channel %s for node %s\n",
3707 cpkt->u.chc_chan.ocr_name.value,
3708 totempg_ifaces_print (mn->mn_node_info.nodeId));
3709 eci = find_channel(&cpkt->u.chc_chan.ocr_name, EVT_CHAN_ACTIVE);
3711 if (!eci) {
3712 eci = create_channel(&cpkt->u.chc_chan.ocr_name);
3714 if (!eci) {
3715 log_printf(LOG_LEVEL_WARNING, "Could not create channel %s\n",
3716 get_mar_name_t(&cpkt->u.chc_chan.ocr_name));
3717 break;
3720 inc_open_count(eci, mn->mn_node_info.nodeId);
3722 if (mn->mn_node_info.nodeId == my_node->nodeId) {
3724 * Complete one of our pending open requests
3726 for (l = open_pending.next; l != &open_pending; l = nxt) {
3727 nxt = l->next;
3728 ocp = list_entry(l, struct open_chan_pending, ocp_entry);
3729 log_printf(CHAN_OPEN_DEBUG,
3730 "Compare channel req no %llu %llu\n",
3731 (unsigned long long)ocp->ocp_serial_no,
3732 (unsigned long long)cpkt->u.chc_chan.ocr_serial_no);
3733 if (ocp->ocp_serial_no == cpkt->u.chc_chan.ocr_serial_no) {
3734 evt_chan_open_finish(ocp, eci);
3735 break;
3739 log_printf(CHAN_OPEN_DEBUG,
3740 "Open channel %s t %d, l %d, r %d\n",
3741 get_mar_name_t(&eci->esc_channel_name),
3742 eci->esc_total_opens, eci->esc_local_opens,
3743 eci->esc_retained_count);
3744 break;
3748 * Handle a channel close. We'll decrement the global open counter and
3749 * free up channel associated data when all instances are closed.
3751 case EVT_CLOSE_CHAN_OP:
3752 log_printf(LOG_LEVEL_DEBUG, "Closing channel %s for node 0x%x\n",
3753 cpkt->u.chcu.chcu_name.value, mn->mn_node_info.nodeId);
3754 eci = find_channel(&cpkt->u.chcu.chcu_name, cpkt->u.chcu.chcu_unlink_id);
3755 if (!eci) {
3756 log_printf(LOG_LEVEL_NOTICE,
3757 "Channel close request for %s not found\n",
3758 cpkt->u.chcu.chcu_name.value);
3759 break;
3763 * if last instance, we can free up assocated data.
3765 dec_open_count(eci, mn->mn_node_info.nodeId);
3766 log_printf(LOG_LEVEL_DEBUG,
3767 "Close channel %s t %d, l %d, r %d\n",
3768 eci->esc_channel_name.value,
3769 eci->esc_total_opens, eci->esc_local_opens,
3770 eci->esc_retained_count);
3771 delete_channel(eci);
3772 break;
3775 * Handle a request for channel unlink saEvtChannelUnlink().
3776 * We'll look up the channel and mark it as unlinked. Respond to
3777 * local library unlink commands.
3779 case EVT_UNLINK_CHAN_OP: {
3780 struct unlink_chan_pending *ucp;
3781 struct list_head *l, *nxt;
3783 log_printf(CHAN_UNLINK_DEBUG,
3784 "Unlink request channel %s unlink ID 0x%llx from node 0x%x\n",
3785 cpkt->u.chcu.chcu_name.value,
3786 (unsigned long long)cpkt->u.chcu.chcu_unlink_id,
3787 mn->mn_node_info.nodeId);
3791 * look up the channel name and get its assoicated data.
3793 eci = find_channel(&cpkt->u.chcu.chcu_name,
3794 EVT_CHAN_ACTIVE);
3795 if (!eci) {
3796 log_printf(LOG_LEVEL_NOTICE,
3797 "Channel unlink request for %s not found\n",
3798 cpkt->u.chcu.chcu_name.value);
3799 } else {
3801 * Mark channel as unlinked.
3803 unlink_channel(eci, cpkt->u.chcu.chcu_unlink_id);
3807 * respond only to local library requests.
3809 if (mn->mn_node_info.nodeId == my_node->nodeId) {
3811 * Complete one of our pending unlink requests
3813 for (l = unlink_pending.next; l != &unlink_pending; l = nxt) {
3814 nxt = l->next;
3815 ucp = list_entry(l, struct unlink_chan_pending, ucp_entry);
3816 log_printf(CHAN_UNLINK_DEBUG,
3817 "Compare channel id 0x%llx 0x%llx\n",
3818 (unsigned long long)ucp->ucp_unlink_id,
3819 (unsigned long long)cpkt->u.chcu.chcu_unlink_id);
3820 if (ucp->ucp_unlink_id == cpkt->u.chcu.chcu_unlink_id) {
3821 evt_chan_unlink_finish(ucp);
3822 break;
3826 break;
3831 * saEvtClearRetentionTime handler.
3833 case EVT_CLEAR_RET_OP:
3835 SaAisErrorT did_clear;
3836 struct retention_time_clear_pending *rtc;
3837 struct list_head *l, *nxt;
3839 log_printf(RETENTION_TIME_DEBUG, "Clear retention time request %llx\n",
3840 (unsigned long long)cpkt->u.chc_event_id);
3841 did_clear = clear_retention_time(cpkt->u.chc_event_id);
3844 * Respond to local library requests
3846 if (mn->mn_node_info.nodeId == my_node->nodeId) {
3848 * Complete pending request
3850 for (l = clear_pending.next; l != &clear_pending; l = nxt) {
3851 nxt = l->next;
3852 rtc = list_entry(l, struct retention_time_clear_pending,
3853 rtc_entry);
3854 if (rtc->rtc_event_id == cpkt->u.chc_event_id) {
3855 evt_ret_time_clr_finish(rtc, did_clear);
3856 break;
3860 break;
3864 * Set our next event ID based on the largest event ID seen
3865 * by others in the cluster. This way, if we've left and re-joined, we'll
3866 * start using an event ID that is unique.
3868 case EVT_SET_ID_OP: {
3869 int log_level = LOG_LEVEL_DEBUG;
3870 if (cpkt->u.chc_set_id.chc_nodeid == my_node->nodeId) {
3871 log_level = RECOVERY_DEBUG;
3873 log_printf(log_level,
3874 "Received Set event ID OP from nodeid %x to %llx for %x my addr %s base %llx\n",
3875 nodeid,
3876 (unsigned long long)cpkt->u.chc_set_id.chc_last_id,
3877 cpkt->u.chc_set_id.chc_nodeid,
3878 totempg_ifaces_print (my_node->nodeId),
3879 (unsigned long long)base_id);
3880 if (cpkt->u.chc_set_id.chc_nodeid == my_node->nodeId) {
3881 if (cpkt->u.chc_set_id.chc_last_id >= base_id) {
3882 log_printf(RECOVERY_DEBUG,
3883 "Set event ID from nodeid %s to %llx\n",
3884 totempg_ifaces_print (nodeid),
3885 (unsigned long long)cpkt->u.chc_set_id.chc_last_id);
3886 base_id = cpkt->u.chc_set_id.chc_last_id + 1;
3889 break;
3893 * Receive the open count for a particular channel during recovery.
3894 * This insures that everyone has the same notion of who has a channel
3895 * open so that it can be removed when no one else has it open anymore.
3897 case EVT_OPEN_COUNT:
3898 if (recovery_phase == evt_recovery_complete) {
3899 log_printf(LOG_LEVEL_ERROR,
3900 "Evt open count msg from nodeid %s, but not in membership change\n",
3901 totempg_ifaces_print (nodeid));
3905 * Zero out all open counts because we're setting then based
3906 * on each nodes local counts.
3908 if (!processed_open_counts) {
3909 zero_chan_open_counts();
3910 processed_open_counts = 1;
3912 log_printf(RECOVERY_DEBUG,
3913 "Open channel count %s is %d for node %s\n",
3914 cpkt->u.chc_set_opens.chc_chan_name.value,
3915 cpkt->u.chc_set_opens.chc_open_count,
3916 totempg_ifaces_print (mn->mn_node_info.nodeId));
3918 eci = find_channel(&cpkt->u.chc_set_opens.chc_chan_name,
3919 EVT_CHAN_ACTIVE);
3920 if (!eci) {
3921 eci = create_channel(&cpkt->u.chc_set_opens.chc_chan_name);
3923 if (!eci) {
3924 log_printf(LOG_LEVEL_WARNING, "Could not create channel %s\n",
3925 get_mar_name_t(&cpkt->u.chc_set_opens.chc_chan_name));
3926 break;
3928 if (set_open_count(eci, mn->mn_node_info.nodeId,
3929 cpkt->u.chc_set_opens.chc_open_count)) {
3930 log_printf(LOG_LEVEL_ERROR,
3931 "Error setting Open channel count %s for node %s\n",
3932 cpkt->u.chc_set_opens.chc_chan_name.value,
3933 totempg_ifaces_print (mn->mn_node_info.nodeId));
3935 break;
3938 * Once we get all the messages from
3939 * the current membership, determine who delivers any retained events.
3941 case EVT_OPEN_COUNT_DONE: {
3942 if (recovery_phase == evt_recovery_complete) {
3943 log_printf(LOG_LEVEL_ERROR,
3944 "Evt config msg from nodeid %s, but not in membership change\n",
3945 totempg_ifaces_print (nodeid));
3947 log_printf(RECOVERY_DEBUG,
3948 "Receive EVT_CONF_CHANGE_DONE from nodeid %s members %d checked in %d\n",
3949 totempg_ifaces_print (nodeid), total_member_count, checked_in+1);
3950 if (!mn) {
3951 log_printf(RECOVERY_DEBUG,
3952 "NO NODE DATA AVAILABLE FOR nodeid %s\n", totempg_ifaces_print (nodeid));
3955 if (++checked_in == total_member_count) {
3957 * We're all here, now figure out who should send the
3958 * retained events.
3960 mn = oldest_node();
3961 if (mn && mn->mn_node_info.nodeId == my_node_id) {
3962 log_printf(RECOVERY_DEBUG,
3963 "I am oldest in my transitional config\n");
3964 recovery_node = 1;
3965 recovery_phase = evt_send_retained_events;
3966 } else {
3967 recovery_phase = evt_send_retained_events_done;
3968 recovery_node = 0;
3970 checked_in = 0;
3972 break;
3976 * Count up the nodes again, when all the nodes have responded, we've
3977 * distributed the retained events and we're done with recovery and can
3978 * continue operations.
3980 case EVT_CONF_DONE: {
3981 log_printf(RECOVERY_DEBUG,
3982 "Receive EVT_CONF_DONE from nodeid %s, members %d checked in %d\n",
3983 totempg_ifaces_print (nodeid),
3984 total_member_count, checked_in+1);
3985 if (++checked_in == total_member_count) {
3987 * All recovery complete, carry on.
3989 recovery_phase = evt_recovery_complete;
3990 #ifdef DUMP_CHAN_INFO
3991 dump_all_chans();
3992 #endif
3995 break;
3998 default:
3999 log_printf(LOG_LEVEL_NOTICE, "Invalid channel operation %d\n",
4000 cpkt->chc_op);
4001 break;
4006 * Set up initial conditions for processing event service
4007 * recovery.
4009 static void evt_sync_init(void)
4011 SaClmClusterNodeT *cn;
4012 struct member_node_data *md;
4013 unsigned int my_node = {SA_CLM_LOCAL_NODE_ID};
4014 int left_list_entries = left_member_count;
4015 unsigned int *left_list = left_member_list;
4017 log_printf(RECOVERY_DEBUG, "Evt synchronize initialization\n");
4020 * Set the base event id
4022 if (!my_node_id) {
4023 cn = main_clm_get_by_nodeid(my_node);
4024 log_printf(RECOVERY_DEBUG, "My node ID %s\n",
4025 totempg_ifaces_print (cn->nodeId));
4026 my_node_id = cn->nodeId;
4027 set_event_id(my_node_id);
4031 * account for nodes that left the membership
4033 while (left_list_entries--) {
4034 md = evt_find_node(*left_list);
4035 if (md == 0) {
4036 log_printf(LOG_LEVEL_WARNING,
4037 "Can't find cluster node at %s\n",
4038 totempg_ifaces_print (*left_list));
4040 * Mark this one as down.
4042 } else {
4043 log_printf(RECOVERY_DEBUG, "cluster node at %s down\n",
4044 totempg_ifaces_print(*left_list));
4045 md->mn_started = 0;
4046 remove_chan_open_info(md->mn_node_info.nodeId);
4048 left_list++;
4052 * set up for recovery processing, first phase:
4054 recovery_phase = evt_send_event_id;
4057 * List used to distribute last know event IDs.
4059 add_list = current_member_list;
4060 add_count = total_member_count;
4061 processed_open_counts = 0;
4064 * List used for distributing open channel counts
4066 next_chan = esc_head.next;
4069 * List used for distributing retained events
4071 next_retained = retained_list.next;
4074 * Member check in counts for open channel counts and
4075 * retained events.
4077 checked_in = 0;
4081 * Handle event service recovery. It passes through a number of states to
4082 * finish the recovery.
4084 * First, the node broadcasts the highest event ID that it has seen for any
4085 * joinig node. This helps to make sure that rejoining nodes don't re-use
4086 * event IDs that have already been seen.
4088 * Next, The node broadcasts its open channel information to the other nodes.
4089 * This makes sure that any joining nodes have complete data on any channels
4090 * already open.
4092 * Once done sending open channel information the node waits in a state for
4093 * the rest of the nodes to finish sending their data. When the last node
4094 * has checked in, then the remote channel operation handler selects the next
4095 * state which is evt_send_retained_events if this is the oldest node in the
4096 * cluster, or otherwise to evt_wait_send_retained_events to wait for the
4097 * retained events to be sent. When the retained events have been sent, the
4098 * state is changed to evt_recovery_complete and this function exits with
4099 * zero to inidicate that recovery is done.
4101 static int evt_sync_process(void)
4104 log_printf(RECOVERY_DEBUG, "Process Evt synchronization \n");
4106 switch (recovery_phase) {
4109 * Send last know event ID to joining nodes to prevent duplicate
4110 * event IDs.
4112 case evt_send_event_id:
4114 struct member_node_data *md;
4115 SaClmClusterNodeT *cn;
4116 struct req_evt_chan_command cpkt;
4117 struct iovec chn_iovec;
4118 int res;
4120 log_printf(RECOVERY_DEBUG, "Send max event ID updates\n");
4121 while (add_count) {
4123 * If we've seen this node before, send out the last msg ID
4124 * that we've seen from him. He will set his base ID for
4125 * generating event and message IDs to the highest one seen.
4127 md = evt_find_node(*add_list);
4128 if (md != NULL) {
4129 log_printf(RECOVERY_DEBUG,
4130 "Send set evt ID %llx to %s\n",
4131 (unsigned long long)md->mn_last_msg_id,
4132 totempg_ifaces_print (*add_list));
4133 md->mn_started = 1;
4134 memset(&cpkt, 0, sizeof(cpkt));
4135 cpkt.chc_head.id =
4136 SERVICE_ID_MAKE (EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
4137 cpkt.chc_head.size = sizeof(cpkt);
4138 cpkt.chc_op = EVT_SET_ID_OP;
4139 cpkt.u.chc_set_id.chc_nodeid = *add_list;
4140 cpkt.u.chc_set_id.chc_last_id = md->mn_last_msg_id;
4141 chn_iovec.iov_base = (char *)&cpkt;
4142 chn_iovec.iov_len = cpkt.chc_head.size;
4143 res = totempg_groups_mcast_joined(openais_group_handle,
4144 &chn_iovec, 1, TOTEMPG_AGREED);
4145 if (res != 0) {
4146 log_printf(RECOVERY_DEBUG,
4147 "Unable to send event id to %s\n",
4148 totempg_ifaces_print (*add_list));
4150 * We'll try again later.
4152 return 1;
4155 } else {
4157 * Not seen before, add it to our list of nodes.
4159 cn = main_clm_get_by_nodeid(*add_list);
4160 if (!cn) {
4162 * Error: shouldn't happen
4164 log_printf(LOG_LEVEL_ERROR,
4165 "recovery error node: %s not found\n",
4166 totempg_ifaces_print (*add_list));
4167 assert(0);
4168 } else {
4169 evt_add_node(*add_list, cn);
4173 add_list++;
4174 add_count--;
4176 recovery_phase = evt_send_open_count;
4177 return 1;
4181 * Send channel open counts so all members have the same channel open
4182 * counts.
4184 case evt_send_open_count:
4186 struct req_evt_chan_command cpkt;
4187 struct iovec chn_iovec;
4188 struct event_svr_channel_instance *eci;
4189 int res;
4191 log_printf(RECOVERY_DEBUG, "Send open count updates\n");
4193 * Process messages. When we're done, send the done message
4194 * to the nodes.
4196 memset(&cpkt, 0, sizeof(cpkt));
4197 for (;next_chan != &esc_head;
4198 next_chan = next_chan->next) {
4199 log_printf(RECOVERY_DEBUG, "Sending next open count\n");
4200 eci = list_entry(next_chan, struct event_svr_channel_instance,
4201 esc_entry);
4202 cpkt.chc_head.id =
4203 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
4204 cpkt.chc_head.size = sizeof(cpkt);
4205 cpkt.chc_op = EVT_OPEN_COUNT;
4206 cpkt.u.chc_set_opens.chc_chan_name = eci->esc_channel_name;
4207 cpkt.u.chc_set_opens.chc_open_count = eci->esc_local_opens;
4208 chn_iovec.iov_base = (char *)&cpkt;
4209 chn_iovec.iov_len = cpkt.chc_head.size;
4210 res = totempg_groups_mcast_joined(openais_group_handle,
4211 &chn_iovec, 1, TOTEMPG_AGREED);
4213 if (res != 0) {
4215 * Try again later.
4217 return 1;
4220 memset(&cpkt, 0, sizeof(cpkt));
4221 cpkt.chc_head.id =
4222 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
4223 cpkt.chc_head.size = sizeof(cpkt);
4224 cpkt.chc_op = EVT_OPEN_COUNT_DONE;
4225 chn_iovec.iov_base = (char *)&cpkt;
4226 chn_iovec.iov_len = cpkt.chc_head.size;
4227 res = totempg_groups_mcast_joined(openais_group_handle,
4228 &chn_iovec, 1,TOTEMPG_AGREED);
4229 if (res != 0) {
4231 * Try again later.
4233 return 1;
4235 log_printf(RECOVERY_DEBUG, "DONE Sending open counts\n");
4237 recovery_phase = evt_wait_open_count_done;
4238 return 1;
4242 * Wait for all nodes to finish sending open updates before proceding.
4243 * the EVT_OPEN_COUNT_DONE handler will set the state to
4244 * evt_send_retained_events to get us out of this.
4246 case evt_wait_open_count_done:
4248 log_printf(RECOVERY_DEBUG, "Wait for open count done\n");
4249 return 1;
4253 * If I'm the oldest node, send out retained events so that new nodes
4254 * have all the information.
4256 case evt_send_retained_events:
4258 struct iovec chn_iovec;
4259 struct event_data *evt;
4260 int res;
4262 log_printf(RECOVERY_DEBUG, "Send retained event updates\n");
4265 * Process messages. When we're done, send the done message
4266 * to the nodes.
4268 for (;next_retained != &retained_list;
4269 next_retained = next_retained->next) {
4270 log_printf(LOG_LEVEL_DEBUG, "Sending next retained event\n");
4271 evt = list_entry(next_retained, struct event_data, ed_retained);
4272 evt->ed_event.led_head.id =
4273 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA);
4274 chn_iovec.iov_base = (char *)&evt->ed_event;
4275 chn_iovec.iov_len = evt->ed_event.led_head.size;
4276 res = totempg_groups_mcast_joined(openais_group_handle,
4277 &chn_iovec, 1, TOTEMPG_AGREED);
4279 if (res != 0) {
4281 * Try again later.
4283 return -1;
4287 recovery_phase = evt_send_retained_events_done;
4288 return 1;
4291 case evt_send_retained_events_done:
4293 struct req_evt_chan_command cpkt;
4294 struct iovec chn_iovec;
4295 int res;
4297 log_printf(RECOVERY_DEBUG, "DONE Sending retained events\n");
4298 memset(&cpkt, 0, sizeof(cpkt));
4299 cpkt.chc_head.id =
4300 SERVICE_ID_MAKE(EVT_SERVICE, MESSAGE_REQ_EXEC_EVT_CHANCMD);
4301 cpkt.chc_head.size = sizeof(cpkt);
4302 cpkt.chc_op = EVT_CONF_DONE;
4303 chn_iovec.iov_base = (char *)&cpkt;
4304 chn_iovec.iov_len = cpkt.chc_head.size;
4305 res = totempg_groups_mcast_joined(openais_group_handle,
4306 &chn_iovec, 1, TOTEMPG_AGREED);
4308 recovery_phase = evt_wait_send_retained_events;
4309 return 1;
4313 * Wait for send of retained events to finish
4314 * the EVT_CONF_DONE handler will set the state to
4315 * evt_recovery_complete to get us out of this.
4317 case evt_wait_send_retained_events:
4319 log_printf(RECOVERY_DEBUG, "Wait for retained events\n");
4320 return 1;
4323 case evt_recovery_complete:
4325 log_printf(RECOVERY_DEBUG, "Recovery complete\n");
4326 return 0;
4329 default:
4330 log_printf(LOG_LEVEL_WARNING, "Bad recovery phase state: %u\n",
4331 recovery_phase);
4332 recovery_phase = evt_recovery_complete;
4333 return 0;
4336 return 0;
4340 * Not used at this time
4342 static void evt_sync_activate(void)
4344 log_printf(RECOVERY_DEBUG, "Evt synchronize activation\n");
4348 * Not used at this time
4350 static void evt_sync_abort(void)
4352 log_printf(RECOVERY_DEBUG, "Abort Evt synchronization\n");
4356 * vi: set autoindent tabstop=4 shiftwidth=4 :