Allow make from the exec directory.
[openais.git] / exec / ckpt.c
blob9f26b63e7402f9e5a8b7eb2d4f2e9bc82e3ad89b
1 /*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2007 Red Hat, Inc.
5 * All rights reserved.
7 * Authors: Steven Dake (sdake@mvista.com)
8 * Muni Bajpai (muni.osdl@gmail.com)
10 * This software licensed under BSD license, the text of which follows:
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are met:
15 * - Redistributions of source code must retain the above copyright notice,
16 * this list of conditions and the following disclaimer.
17 * - Redistributions in binary form must reproduce the above copyright notice,
18 * this list of conditions and the following disclaimer in the documentation
19 * and/or other materials provided with the distribution.
20 * - Neither the name of the MontaVista Software, Inc. nor the names of its
21 * contributors may be used to endorse or promote products derived from this
22 * software without specific prior written permission.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
34 * THE POSSIBILITY OF SUCH DAMAGE.
36 #include <sys/types.h>
37 #include <sys/uio.h>
38 #include <sys/socket.h>
39 #include <sys/un.h>
40 #include <sys/time.h>
41 #include <netinet/in.h>
42 #include <unistd.h>
43 #include <fcntl.h>
44 #include <stdlib.h>
45 #include <stdio.h>
46 #include <errno.h>
47 #include <signal.h>
48 #include <arpa/inet.h>
50 #include "../include/saAis.h"
51 #include "../include/saCkpt.h"
52 #include "../include/mar_ckpt.h"
53 #include "../include/ipc_ckpt.h"
54 #include "../include/list.h"
55 #include "../include/queue.h"
56 #include "../include/hdb.h"
57 #include "../lcr/lcr_comp.h"
58 #include "objdb.h"
59 #include "totem.h"
60 #include "service.h"
61 #include "mempool.h"
62 #include "tlist.h"
63 #include "timer.h"
64 #include "util.h"
65 #include "main.h"
66 #include "flow.h"
67 #include "ipc.h"
68 #include "totempg.h"
69 #include "logsys.h"
71 LOGSYS_DECLARE_SUBSYS ("CKPT", LOG_INFO);
73 #define CKPT_MAX_SECTION_DATA_SEND (1024*400)
75 enum ckpt_message_req_types {
76 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN = 0,
77 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE = 1,
78 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK = 2,
79 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONSET = 3,
80 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONEXPIRE = 4,
81 MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE = 5,
82 MESSAGE_REQ_EXEC_CKPT_SECTIONDELETE = 6,
83 MESSAGE_REQ_EXEC_CKPT_SECTIONEXPIRATIONTIMESET = 7,
84 MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE = 8,
85 MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE = 9,
86 MESSAGE_REQ_EXEC_CKPT_SECTIONREAD = 10,
87 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINT = 11,
88 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTSECTION = 12,
89 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTREFCOUNT = 13
92 struct checkpoint_section {
93 struct list_head list;
94 mar_ckpt_section_descriptor_t section_descriptor;
95 void *section_data;
96 timer_handle expiration_timer;
99 enum sync_state {
100 SYNC_STATE_CHECKPOINT,
101 SYNC_STATE_REFCOUNT
104 enum iteration_state {
105 ITERATION_STATE_CHECKPOINT,
106 ITERATION_STATE_SECTION
109 struct refcount_set {
110 unsigned int refcount;
111 unsigned int nodeid;
114 typedef struct {
115 unsigned int refcount __attribute__((aligned(8)));
116 unsigned int nodeid __attribute__((aligned(8)));
117 } mar_refcount_set_t;
119 static inline void marshall_to_mar_refcount_set_t (
120 mar_refcount_set_t *dest,
121 struct refcount_set *src)
123 dest->refcount = src->refcount;
124 dest->nodeid = src->nodeid;
127 static inline void marshall_to_mar_refcount_set_t_all (
128 mar_refcount_set_t *dest,
129 struct refcount_set *src)
131 unsigned int i;
132 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
133 marshall_to_mar_refcount_set_t (&dest[i], &src[i]);
137 static inline void marshall_from_mar_refcount_set_t (
138 struct refcount_set *dest,
139 mar_refcount_set_t *src)
141 dest->refcount = src->refcount;
142 dest->nodeid = src->nodeid;
145 static inline void marshall_from_mar_refcount_set_t_all (
146 struct refcount_set *dest,
147 mar_refcount_set_t *src)
149 unsigned int i;
151 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
152 marshall_from_mar_refcount_set_t (&dest[i], &src[i]);
156 static inline void swab_mar_refcount_set_t (mar_refcount_set_t *to_swab)
158 swab_mar_uint32_t (&to_swab->refcount);
159 swab_mar_uint32_t (&to_swab->nodeid);
162 struct checkpoint {
163 struct list_head list;
164 mar_name_t name;
165 mar_uint32_t ckpt_id;
166 mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes;
167 struct list_head sections_list_head;
168 int reference_count;
169 int unlinked;
170 timer_handle retention_timer;
171 int active_replica_set;
172 int section_count;
173 struct refcount_set refcount_set[PROCESSOR_COUNT_MAX];
176 struct iteration_entry {
177 char *section_id;
178 unsigned int section_id_len;
181 struct iteration_instance {
182 struct iteration_entry *iteration_entries;
183 mar_name_t checkpoint_name;
184 mar_uint32_t ckpt_id;
185 int iteration_entries_count;
186 unsigned int iteration_pos;
189 struct ckpt_pd {
190 struct list_head checkpoint_list;
191 struct hdb_handle_database iteration_hdb;
192 unsigned int iteration_pos;
195 struct ckpt_identifier {
196 mar_name_t ckpt_name;
197 mar_uint32_t ckpt_id;
198 mar_ckpt_section_id_t ckpt_section_id;
201 static int ckpt_exec_init_fn (struct objdb_iface_ver0 *);
203 static int ckpt_lib_exit_fn (void *conn);
205 static int ckpt_lib_init_fn (void *conn);
207 static void ckpt_dump_fn (void);
209 static void message_handler_req_lib_ckpt_checkpointopen (
210 void *conn,
211 void *msg);
213 static void message_handler_req_lib_ckpt_checkpointclose (
214 void *conn,
215 void *msg);
217 static void message_handler_req_lib_ckpt_checkpointunlink (
218 void *conn,
219 void *msg);
221 static void message_handler_req_lib_ckpt_checkpointretentiondurationset (
222 void *conn,
223 void *msg);
225 static void message_handler_req_lib_ckpt_activereplicaset (
226 void *conn,
227 void *msg);
229 static void message_handler_req_lib_ckpt_checkpointstatusget (
230 void *conn,
231 void *msg);
233 static void message_handler_req_lib_ckpt_sectioncreate (
234 void *conn,
235 void *msg);
237 static void message_handler_req_lib_ckpt_sectiondelete (
238 void *conn,
239 void *msg);
241 static void message_handler_req_lib_ckpt_sectionexpirationtimeset (
242 void *conn,
243 void *msg);
245 static void message_handler_req_lib_ckpt_sectionwrite (
246 void *conn,
247 void *msg);
249 static void message_handler_req_lib_ckpt_sectionoverwrite (
250 void *conn,
251 void *msg);
253 static void message_handler_req_lib_ckpt_sectionread (
254 void *conn,
255 void *msg);
257 static void message_handler_req_lib_ckpt_checkpointsynchronize (
258 void *conn,
259 void *msg);
261 static void message_handler_req_lib_ckpt_checkpointsynchronizeasync (
262 void *conn,
263 void *msg);
265 static void message_handler_req_lib_ckpt_sectioniterationinitialize (
266 void *conn,
267 void *msg);
269 static void message_handler_req_lib_ckpt_sectioniterationfinalize (
270 void *conn,
271 void *msg);
273 static void message_handler_req_lib_ckpt_sectioniterationnext (
274 void *conn,
275 void *msg);
277 static void message_handler_req_exec_ckpt_checkpointopen (
278 void *message,
279 unsigned int nodeid);
281 static void message_handler_req_exec_ckpt_sync_checkpoint (
282 void *message,
283 unsigned int nodeid);
285 static void message_handler_req_exec_ckpt_sync_checkpoint_section (
286 void *message,
287 unsigned int nodeid);
289 static void message_handler_req_exec_ckpt_sync_checkpoint_refcount (
290 void *message,
291 unsigned int nodeid);
293 static void message_handler_req_exec_ckpt_checkpointclose (
294 void *message,
295 unsigned int nodeid);
297 static void message_handler_req_exec_ckpt_checkpointunlink (
298 void *message,
299 unsigned int nodeid);
301 static void message_handler_req_exec_ckpt_checkpointretentiondurationset (
302 void *message,
303 unsigned int nodeid);
305 static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire (
306 void *message,
307 unsigned int nodeid);
309 static void message_handler_req_exec_ckpt_sectioncreate (
310 void *message,
311 unsigned int nodeid);
313 static void message_handler_req_exec_ckpt_sectiondelete (
314 void *message,
315 unsigned int nodeid);
317 static void message_handler_req_exec_ckpt_sectionexpirationtimeset (
318 void *message,
319 unsigned int nodeid);
321 static void message_handler_req_exec_ckpt_sectionwrite (
322 void *message,
323 unsigned int nodeid);
325 static void message_handler_req_exec_ckpt_sectionoverwrite (
326 void *message,
327 unsigned int nodeid);
329 static void message_handler_req_exec_ckpt_sectionread (
330 void *message,
331 unsigned int nodeid);
333 static void exec_ckpt_checkpointopen_endian_convert (void *msg);
334 static void exec_ckpt_checkpointclose_endian_convert (void *msg);
335 static void exec_ckpt_checkpointunlink_endian_convert (void *msg);
336 static void exec_ckpt_checkpointretentiondurationset_endian_convert (void *msg);
337 static void exec_ckpt_checkpointretentiondurationexpire_endian_convert (void *msg);
338 static void exec_ckpt_sectioncreate_endian_convert (void *msg);
339 static void exec_ckpt_sectiondelete_endian_convert (void *msg);
340 static void exec_ckpt_sectrionexpirationtimeset_endian_convert (void *msg);
341 static void exec_ckpt_sectionwrite_endian_convert (void *msg);
342 static void exec_ckpt_sectionoverwrite_endian_convert (void *msg);
343 static void exec_ckpt_sectionread_endian_convert (void *msg);
344 static void exec_ckpt_sync_checkpoint_endian_convert (void *msg);
345 static void exec_ckpt_sync_checkpoint_section_endian_convert (void *msg);
346 static void exec_ckpt_sync_checkpoint_refcount_endian_convert (void *msg);
349 static void ckpt_sync_init (void);
350 static void ckpt_sync_activate (void);
351 static int ckpt_sync_process (void);
352 static void ckpt_sync_abort(void);
354 static void sync_refcount_increment (
355 struct checkpoint *checkpoint, unsigned int nodeid);
357 static void sync_refcount_decrement (
358 struct checkpoint *checkpoint, unsigned int nodeid);
360 static void sync_refcount_calculate (
361 struct checkpoint *checkpoint);
363 void checkpoint_release (struct checkpoint *checkpoint);
364 void timer_function_retention (void *data);
365 unsigned int abstime_to_msec (mar_time_t time);
366 void timer_function_section_expire (void *data);
367 void clean_checkpoint_list(struct list_head* head);
369 DECLARE_LIST_INIT(checkpoint_list_head);
371 DECLARE_LIST_INIT(sync_checkpoint_list_head);
373 DECLARE_LIST_INIT(checkpoint_iteration_list_head);
375 DECLARE_LIST_INIT(checkpoint_recovery_list_head);
377 static mar_uint32_t global_ckpt_id = 0;
379 static enum sync_state my_sync_state;
381 static enum iteration_state my_iteration_state;
383 static struct list_head *my_iteration_state_checkpoint;
385 static struct list_head *my_iteration_state_section;
387 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
389 static unsigned int my_member_list_entries = 0;
391 static unsigned int my_lowest_nodeid = 0;
393 struct checkpoint_cleanup {
394 struct list_head list;
395 mar_name_t checkpoint_name;
396 mar_uint32_t ckpt_id;
399 static struct memb_ring_id my_saved_ring_id;
401 static void ckpt_confchg_fn (
402 enum totem_configuration_type configuration_type,
403 unsigned int *member_list, int member_list_entries,
404 unsigned int *left_list, int left_list_entries,
405 unsigned int *joined_list, int joined_list_entries,
406 struct memb_ring_id *ring_id);
409 * Executive Handler Definition
411 static struct openais_lib_handler ckpt_lib_service[] =
413 { /* 0 */
414 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointopen,
415 .response_size = sizeof (struct res_lib_ckpt_checkpointopen),
416 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN,
417 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
419 { /* 1 */
420 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointclose,
421 .response_size = sizeof (struct res_lib_ckpt_checkpointclose),
422 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE,
423 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
425 { /* 2 */
426 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointunlink,
427 .response_size = sizeof (struct res_lib_ckpt_checkpointunlink),
428 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK,
429 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
431 { /* 3 */
432 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointretentiondurationset,
433 .response_size = sizeof (struct res_lib_ckpt_checkpointretentiondurationset),
434 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET,
435 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
437 { /* 4 */
438 .lib_handler_fn = message_handler_req_lib_ckpt_activereplicaset,
439 .response_size = sizeof (struct res_lib_ckpt_activereplicaset),
440 .response_id = MESSAGE_RES_CKPT_ACTIVEREPLICASET,
441 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
443 { /* 5 */
444 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointstatusget,
445 .response_size = sizeof (struct res_lib_ckpt_checkpointstatusget),
446 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET,
447 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
449 { /* 6 */
450 .lib_handler_fn = message_handler_req_lib_ckpt_sectioncreate,
451 .response_size = sizeof (struct res_lib_ckpt_sectioncreate),
452 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE,
453 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
455 { /* 7 */
456 .lib_handler_fn = message_handler_req_lib_ckpt_sectiondelete,
457 .response_size = sizeof (struct res_lib_ckpt_sectiondelete),
458 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE,
459 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
461 { /* 8 */
462 .lib_handler_fn = message_handler_req_lib_ckpt_sectionexpirationtimeset,
463 .response_size = sizeof (struct res_lib_ckpt_sectionexpirationtimeset),
464 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET,
465 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
467 { /* 9 */
468 .lib_handler_fn = message_handler_req_lib_ckpt_sectionwrite,
469 .response_size = sizeof (struct res_lib_ckpt_sectionwrite),
470 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE,
471 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
473 { /* 10 */
474 .lib_handler_fn = message_handler_req_lib_ckpt_sectionoverwrite,
475 .response_size = sizeof (struct res_lib_ckpt_sectionoverwrite),
476 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE,
477 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
479 { /* 11 */
480 .lib_handler_fn = message_handler_req_lib_ckpt_sectionread,
481 .response_size = sizeof (struct res_lib_ckpt_sectionread),
482 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD,
483 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
485 { /* 12 */
486 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronize,
487 .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronize),
488 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE,
489 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
491 { /* 13 */
492 .lib_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronizeasync,
493 .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */
494 .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
495 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
497 { /* 14 */
498 .lib_handler_fn = message_handler_req_lib_ckpt_sectioniterationinitialize,
499 .response_size = sizeof (struct res_lib_ckpt_sectioniterationinitialize),
500 .response_id = MESSAGE_RES_CKPT_SECTIONITERATIONINITIALIZE,
501 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
503 { /* 15 */
504 .lib_handler_fn = message_handler_req_lib_ckpt_sectioniterationfinalize,
505 .response_size = sizeof (struct res_lib_ckpt_sectioniterationfinalize),
506 .response_id = MESSAGE_RES_CKPT_SECTIONITERATIONFINALIZE,
507 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
509 { /* 16 */
510 .lib_handler_fn = message_handler_req_lib_ckpt_sectioniterationnext,
511 .response_size = sizeof (struct res_lib_ckpt_sectioniterationnext),
512 .response_id = MESSAGE_RES_CKPT_SECTIONITERATIONNEXT,
513 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
518 static struct openais_exec_handler ckpt_exec_service[] = {
520 .exec_handler_fn = message_handler_req_exec_ckpt_checkpointopen,
521 .exec_endian_convert_fn = exec_ckpt_checkpointopen_endian_convert
525 .exec_handler_fn = message_handler_req_exec_ckpt_checkpointclose,
526 .exec_endian_convert_fn = exec_ckpt_checkpointclose_endian_convert
529 .exec_handler_fn = message_handler_req_exec_ckpt_checkpointunlink,
530 .exec_endian_convert_fn = exec_ckpt_checkpointunlink_endian_convert
533 .exec_handler_fn = message_handler_req_exec_ckpt_checkpointretentiondurationset,
534 .exec_endian_convert_fn = exec_ckpt_checkpointretentiondurationset_endian_convert
537 .exec_handler_fn = message_handler_req_exec_ckpt_checkpointretentiondurationexpire,
538 .exec_endian_convert_fn = exec_ckpt_checkpointretentiondurationexpire_endian_convert
541 .exec_handler_fn = message_handler_req_exec_ckpt_sectioncreate,
542 .exec_endian_convert_fn = exec_ckpt_sectioncreate_endian_convert
545 .exec_handler_fn = message_handler_req_exec_ckpt_sectiondelete,
546 .exec_endian_convert_fn = exec_ckpt_sectiondelete_endian_convert
549 .exec_handler_fn = message_handler_req_exec_ckpt_sectionexpirationtimeset,
550 .exec_endian_convert_fn = exec_ckpt_sectrionexpirationtimeset_endian_convert
553 .exec_handler_fn = message_handler_req_exec_ckpt_sectionwrite,
554 .exec_endian_convert_fn = exec_ckpt_sectionwrite_endian_convert
557 .exec_handler_fn = message_handler_req_exec_ckpt_sectionoverwrite,
558 .exec_endian_convert_fn = exec_ckpt_sectionoverwrite_endian_convert
561 .exec_handler_fn = message_handler_req_exec_ckpt_sectionread,
562 .exec_endian_convert_fn = exec_ckpt_sectionread_endian_convert
565 .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint,
566 .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_endian_convert
569 .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint_section,
570 .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_section_endian_convert
573 .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint_refcount,
574 .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_refcount_endian_convert
578 struct openais_service_handler ckpt_service_handler = {
579 .name = "openais checkpoint service B.01.01",
580 .id = CKPT_SERVICE,
581 .private_data_size = sizeof (struct ckpt_pd),
582 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
583 .lib_init_fn = ckpt_lib_init_fn,
584 .lib_exit_fn = ckpt_lib_exit_fn,
585 .lib_service = ckpt_lib_service,
586 .lib_service_count = sizeof (ckpt_lib_service) / sizeof (struct openais_lib_handler),
587 .exec_init_fn = ckpt_exec_init_fn,
588 .exec_dump_fn = ckpt_dump_fn,
589 .exec_service = ckpt_exec_service,
590 .exec_service_count = sizeof (ckpt_exec_service) / sizeof (struct openais_exec_handler),
591 .confchg_fn = ckpt_confchg_fn,
592 .sync_init = ckpt_sync_init,
593 .sync_process = ckpt_sync_process,
594 .sync_activate = ckpt_sync_activate,
595 .sync_abort = ckpt_sync_abort,
599 * Dynamic loader definition
601 static struct openais_service_handler *ckpt_get_handler_ver0 (void);
603 static struct openais_service_handler_iface_ver0 ckpt_service_handler_iface = {
604 .openais_get_service_handler_ver0 = ckpt_get_handler_ver0
607 static struct lcr_iface openais_ckpt_ver0[1] = {
609 .name = "openais_ckpt",
610 .version = 0,
611 .versions_replace = 0,
612 .versions_replace_count = 0,
613 .dependencies = 0,
614 .dependency_count = 0,
615 .constructor = NULL,
616 .destructor = NULL,
617 .interfaces = NULL
621 static struct lcr_comp ckpt_comp_ver0 = {
622 .iface_count = 1,
623 .ifaces = openais_ckpt_ver0
626 static struct openais_service_handler *ckpt_get_handler_ver0 (void)
628 return (&ckpt_service_handler);
631 __attribute__ ((constructor)) static void register_this_component (void) {
632 lcr_interfaces_set (&openais_ckpt_ver0[0], &ckpt_service_handler_iface);
634 lcr_component_register (&ckpt_comp_ver0);
638 * All data types used for executive messages
640 struct req_exec_ckpt_checkpointopen {
641 mar_req_header_t header __attribute__((aligned(8)));
642 mar_message_source_t source __attribute__((aligned(8)));
643 mar_name_t checkpoint_name __attribute__((aligned(8)));
644 mar_uint32_t ckpt_id __attribute__((aligned(8)));
645 mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes __attribute__((aligned(8)));
646 mar_uint32_t checkpoint_creation_attributes_set __attribute__((aligned(8)));
647 mar_ckpt_checkpoint_open_flags_t checkpoint_open_flags __attribute__((aligned(8)));
648 mar_ckpt_checkpoint_handle_t checkpoint_handle __attribute__((aligned(8)));
649 mar_invocation_t invocation __attribute__((aligned(8)));
650 mar_uint32_t async_call __attribute__((aligned(8)));
651 mar_uint32_t fail_with_error __attribute__((aligned(8)));
654 struct req_exec_ckpt_checkpointclose {
655 mar_req_header_t header __attribute__((aligned(8)));
656 mar_message_source_t source __attribute__((aligned(8)));
657 mar_name_t checkpoint_name __attribute__((aligned(8)));
658 mar_uint32_t ckpt_id __attribute__((aligned(8)));
661 struct req_exec_ckpt_checkpointretentiondurationset {
662 mar_req_header_t header __attribute__((aligned(8)));
663 mar_message_source_t source __attribute__((aligned(8)));
664 mar_name_t checkpoint_name __attribute__((aligned(8)));
665 mar_uint32_t ckpt_id __attribute__((aligned(8)));
666 mar_time_t retention_duration __attribute__((aligned(8)));
669 struct req_exec_ckpt_checkpointretentiondurationexpire {
670 mar_req_header_t header __attribute__((aligned(8)));
671 mar_name_t checkpoint_name __attribute__((aligned(8)));
672 mar_uint32_t ckpt_id __attribute__((aligned(8)));
675 struct req_exec_ckpt_checkpointunlink {
676 mar_req_header_t header __attribute__((aligned(8)));
677 mar_message_source_t source __attribute__((aligned(8)));
678 mar_name_t checkpoint_name __attribute__((aligned(8)));
681 struct req_exec_ckpt_sectioncreate {
682 mar_req_header_t header __attribute__((aligned(8)));
683 mar_message_source_t source __attribute__((aligned(8)));
684 mar_name_t checkpoint_name __attribute__((aligned(8)));
685 mar_uint32_t ckpt_id __attribute__((aligned(8)));
686 mar_uint32_t id_len __attribute__((aligned(8)));
687 mar_time_t expiration_time __attribute__((aligned(8)));
688 mar_uint32_t initial_data_size __attribute__((aligned(8)));
691 struct req_exec_ckpt_sectiondelete {
692 mar_req_header_t header __attribute__((aligned(8)));
693 mar_message_source_t source __attribute__((aligned(8)));
694 mar_name_t checkpoint_name __attribute__((aligned(8)));
695 mar_uint32_t ckpt_id __attribute__((aligned(8)));
696 mar_uint32_t id_len __attribute__((aligned(8)));
699 struct req_exec_ckpt_sectionexpirationtimeset {
700 mar_req_header_t header __attribute__((aligned(8)));
701 mar_message_source_t source __attribute__((aligned(8)));
702 mar_name_t checkpoint_name __attribute__((aligned(8)));
703 mar_uint32_t ckpt_id __attribute__((aligned(8)));
704 mar_uint32_t id_len __attribute__((aligned(8)));
705 mar_time_t expiration_time __attribute__((aligned(8)));
708 struct req_exec_ckpt_sectionwrite {
709 mar_req_header_t header __attribute__((aligned(8)));
710 mar_message_source_t source __attribute__((aligned(8)));
711 mar_name_t checkpoint_name __attribute__((aligned(8)));
712 mar_uint32_t ckpt_id __attribute__((aligned(8)));
713 mar_uint32_t id_len __attribute__((aligned(8)));
714 mar_offset_t data_offset __attribute__((aligned(8)));
715 mar_offset_t data_size __attribute__((aligned(8)));
718 struct req_exec_ckpt_sectionoverwrite {
719 mar_req_header_t header __attribute__((aligned(8)));
720 mar_message_source_t source __attribute__((aligned(8)));
721 mar_name_t checkpoint_name __attribute__((aligned(8)));
722 mar_uint32_t ckpt_id __attribute__((aligned(8)));
723 mar_uint32_t id_len __attribute__((aligned(8)));
724 mar_offset_t data_size __attribute__((aligned(8)));
727 struct req_exec_ckpt_sectionread {
728 mar_req_header_t header __attribute__((aligned(8)));
729 mar_message_source_t source __attribute__((aligned(8)));
730 mar_name_t checkpoint_name __attribute__((aligned(8)));
731 mar_uint32_t ckpt_id __attribute__((aligned(8)));
732 mar_uint32_t id_len __attribute__((aligned(8)));
733 mar_offset_t data_offset __attribute__((aligned(8)));
734 mar_offset_t data_size __attribute__((aligned(8)));
737 struct req_exec_ckpt_sync_checkpoint {
738 mar_req_header_t header __attribute__((aligned(8)));
739 struct memb_ring_id ring_id __attribute__((aligned(8)));
740 mar_name_t checkpoint_name __attribute__((aligned(8)));
741 mar_uint32_t ckpt_id __attribute__((aligned(8)));
742 mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes __attribute__((aligned(8)));
743 mar_uint32_t checkpoint_creation_attributes_set __attribute__((aligned(8)));
744 mar_uint32_t active_replica_set __attribute__((aligned(8)));
745 mar_uint32_t unlinked __attribute__((aligned(8)));
748 struct req_exec_ckpt_sync_checkpoint_section {
749 mar_req_header_t header __attribute__((aligned(8)));
750 struct memb_ring_id ring_id __attribute__((aligned(8)));
751 mar_name_t checkpoint_name __attribute__((aligned(8)));
752 mar_uint32_t ckpt_id __attribute__((aligned(8)));
753 mar_uint32_t id_len __attribute__((aligned(8)));
754 mar_time_t expiration_time __attribute__((aligned(8)));
755 mar_uint32_t section_size __attribute__((aligned(8)));
758 struct req_exec_ckpt_sync_checkpoint_refcount {
759 mar_req_header_t header __attribute__((aligned(8)));
760 struct memb_ring_id ring_id __attribute__((aligned(8)));
761 mar_name_t checkpoint_name __attribute__((aligned(8)));
762 mar_uint32_t ckpt_id __attribute__((aligned(8)));
763 mar_refcount_set_t refcount_set[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
767 * Implementation
770 void clean_checkpoint_list(struct list_head *head)
772 struct list_head *checkpoint_list;
773 struct checkpoint *checkpoint;
775 if (list_empty(head)) {
776 log_printf (LOG_LEVEL_DEBUG, "clean_checkpoint_list: List is empty \n");
777 return;
780 checkpoint_list = head->next;
781 while (checkpoint_list != head) {
782 checkpoint = list_entry (checkpoint_list,
783 struct checkpoint, list);
784 assert (checkpoint > 0);
787 * If checkpoint has been unlinked and this is the last reference, delete it
789 if (checkpoint->unlinked && checkpoint->reference_count == 0) {
790 log_printf (LOG_LEVEL_DEBUG,"clean_checkpoint_list: deallocating checkpoint %s.\n",
791 checkpoint->name.value);
792 checkpoint_list = checkpoint_list->next;
793 checkpoint_release (checkpoint);
794 continue;
797 else if (checkpoint->reference_count == 0) {
798 log_printf (LOG_LEVEL_DEBUG, "clean_checkpoint_list: Starting timer to release checkpoint %s.\n",
799 checkpoint->name.value);
800 openais_timer_delete (checkpoint->retention_timer);
801 openais_timer_add_duration (
802 checkpoint->checkpoint_creation_attributes.retention_duration,
803 checkpoint,
804 timer_function_retention,
805 &checkpoint->retention_timer);
807 checkpoint_list = checkpoint_list->next;
811 static void ckpt_confchg_fn (
812 enum totem_configuration_type configuration_type,
813 unsigned int *member_list, int member_list_entries,
814 unsigned int *left_list, int left_list_entries,
815 unsigned int *joined_list, int joined_list_entries,
816 struct memb_ring_id *ring_id)
818 unsigned int i, j;
821 * Determine lowest nodeid in old regular configuration for the
822 * purpose of executing the synchronization algorithm
824 if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
825 for (i = 0; i < left_list_entries; i++) {
826 for (j = 0; j < my_member_list_entries; j++) {
827 if (left_list[i] == my_member_list[j]) {
828 my_member_list[j] = 0;
834 my_lowest_nodeid = 0xffffffff;
837 * Handle regular configuration
839 if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
840 memcpy (my_member_list, member_list,
841 sizeof (unsigned int) * member_list_entries);
842 my_member_list_entries = member_list_entries;
843 memcpy (&my_saved_ring_id, ring_id,
844 sizeof (struct memb_ring_id));
845 for (i = 0; i < my_member_list_entries; i++) {
846 if ((my_member_list[i] != 0) &&
847 (my_member_list[i] < my_lowest_nodeid)) {
849 my_lowest_nodeid = my_member_list[i];
855 static struct checkpoint *checkpoint_find (
856 struct list_head *ckpt_list_head,
857 mar_name_t *name,
858 mar_uint32_t ckpt_id)
860 struct list_head *list;
861 struct checkpoint *checkpoint;
863 for (list = ckpt_list_head->next;
864 list != ckpt_list_head;
865 list = list->next) {
867 checkpoint = list_entry (list,
868 struct checkpoint, list);
870 if (mar_name_match (name, &checkpoint->name) &&
871 ckpt_id == checkpoint->ckpt_id) {
872 return (checkpoint);
875 return (0);
878 static struct checkpoint *checkpoint_find_linked (
879 struct list_head *ckpt_list_head,
880 mar_name_t *name)
882 struct list_head *list;
883 struct checkpoint *checkpoint;
885 for (list = ckpt_list_head->next;
886 list != ckpt_list_head;
887 list = list->next) {
889 checkpoint = list_entry (list, struct checkpoint, list);
891 if (mar_name_match (name, &checkpoint->name) &&
892 checkpoint->unlinked == 0) {
893 return (checkpoint);
896 return (0);
899 static struct checkpoint *checkpoint_find_specific (
900 struct list_head *ckpt_list_head,
901 mar_name_t *name,
902 mar_uint32_t ckpt_id)
904 struct list_head *list;
905 struct checkpoint *checkpoint;
907 for (list = ckpt_list_head->next;
908 list != ckpt_list_head;
909 list = list->next) {
911 checkpoint = list_entry (list, struct checkpoint, list);
913 if (mar_name_match (name, &checkpoint->name) &&
914 (ckpt_id == checkpoint->ckpt_id)) {
915 return (checkpoint);
918 return (0);
921 static void ckpt_checkpoint_remove_cleanup (
922 void *conn,
923 mar_name_t checkpoint_name,
924 mar_uint32_t ckpt_id)
926 struct list_head *list;
927 struct checkpoint_cleanup *checkpoint_cleanup;
928 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
930 for (list = ckpt_pd->checkpoint_list.next;
931 list != &ckpt_pd->checkpoint_list;
932 list = list->next) {
934 checkpoint_cleanup = list_entry (list, struct checkpoint_cleanup, list);
935 if (mar_name_match (&checkpoint_cleanup->checkpoint_name,
936 &checkpoint_name) &&
937 (checkpoint_cleanup->ckpt_id == ckpt_id)) {
939 list_del (&checkpoint_cleanup->list);
940 free (checkpoint_cleanup);
941 return;
946 static struct checkpoint_section *checkpoint_section_find (
947 struct checkpoint *checkpoint,
948 char *id,
949 int id_len)
951 struct list_head *checkpoint_section_list;
952 struct checkpoint_section *checkpoint_section;
954 if (id_len != 0) {
955 log_printf (LOG_LEVEL_DEBUG, "Finding checkpoint section id %s %d\n", (char*)id, id_len);
957 else {
958 log_printf (LOG_LEVEL_DEBUG, "Finding default checkpoint section\n");
961 for (checkpoint_section_list = checkpoint->sections_list_head.next;
962 checkpoint_section_list != &checkpoint->sections_list_head;
963 checkpoint_section_list = checkpoint_section_list->next) {
965 checkpoint_section = list_entry (checkpoint_section_list,
966 struct checkpoint_section, list);
967 if (checkpoint_section->section_descriptor.section_id.id_len) {
968 log_printf (LOG_LEVEL_DEBUG, "Checking section id %d %*s\n",
969 checkpoint_section->section_descriptor.section_id.id_len,
970 checkpoint_section->section_descriptor.section_id.id_len,
971 checkpoint_section->section_descriptor.section_id.id);
973 else {
974 log_printf (LOG_LEVEL_DEBUG, "Checking default section id\n");
978 All 3 of these values being checked MUST be = 0 to return
979 The default section. If even one of them is NON zero follow
980 the normal route
982 if ((id_len ||
983 checkpoint_section->section_descriptor.section_id.id ||
984 checkpoint_section->section_descriptor.section_id.id_len) == 0) {
985 log_printf (LOG_LEVEL_DEBUG, "Returning default section\n");
986 return (checkpoint_section);
989 if (checkpoint_section->section_descriptor.section_id.id_len == id_len &&
990 (checkpoint_section->section_descriptor.section_id.id)&&
991 (id)&&
992 (memcmp (checkpoint_section->section_descriptor.section_id.id,
993 id, id_len) == 0)) {
995 log_printf (LOG_LEVEL_DEBUG, "Returning section %s(0x%p)\n", checkpoint_section->section_descriptor.section_id.id,
996 checkpoint_section);
998 return (checkpoint_section);
1001 return 0;
1004 void checkpoint_section_release (struct checkpoint_section *section)
1006 log_printf (LOG_LEVEL_DEBUG, "checkpoint_section_release expiration timer = 0x%p\n", section->expiration_timer);
1007 list_del (&section->list);
1009 openais_timer_delete (section->expiration_timer);
1010 if (section->section_descriptor.section_id.id) {
1011 free (section->section_descriptor.section_id.id);
1013 if (section->section_data) {
1014 free (section->section_data);
1016 free (section);
1020 void checkpoint_release (struct checkpoint *checkpoint)
1022 struct list_head *list;
1023 struct checkpoint_section *section;
1025 openais_timer_delete (checkpoint->retention_timer);
1028 * Release all checkpoint sections for this checkpoint
1030 for (list = checkpoint->sections_list_head.next;
1031 list != &checkpoint->sections_list_head;) {
1033 section = list_entry (list,
1034 struct checkpoint_section, list);
1036 list = list->next;
1037 checkpoint->section_count -= 1;
1038 openais_timer_delete (section->expiration_timer);
1039 checkpoint_section_release (section);
1041 list_del (&checkpoint->list);
1042 free (checkpoint);
1045 int ckpt_checkpoint_close (
1046 mar_name_t *checkpoint_name,
1047 mar_uint32_t ckpt_id)
1049 struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose;
1050 struct iovec iovec;
1052 req_exec_ckpt_checkpointclose.header.size =
1053 sizeof (struct req_exec_ckpt_checkpointclose);
1054 req_exec_ckpt_checkpointclose.header.id =
1055 SERVICE_ID_MAKE (CKPT_SERVICE,
1056 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE);
1058 memcpy (&req_exec_ckpt_checkpointclose.checkpoint_name,
1059 checkpoint_name, sizeof (mar_name_t));
1060 req_exec_ckpt_checkpointclose.ckpt_id = ckpt_id;
1061 memset (&req_exec_ckpt_checkpointclose.source, 0,
1062 sizeof (mar_message_source_t));
1064 iovec.iov_base = (char *)&req_exec_ckpt_checkpointclose;
1065 iovec.iov_len = sizeof (req_exec_ckpt_checkpointclose);
1067 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
1069 return (-1);
1072 static int ckpt_exec_init_fn (struct objdb_iface_ver0 *objdb)
1074 return (0);
1079 * Endian conversion routines for executive message handlers
1081 static void exec_ckpt_checkpointopen_endian_convert (void *msg)
1083 struct req_exec_ckpt_checkpointopen *req_exec_ckpt_checkpointopen = (struct req_exec_ckpt_checkpointopen *)msg;
1085 swab_mar_req_header_t (&req_exec_ckpt_checkpointopen->header);
1086 swab_mar_message_source_t (&req_exec_ckpt_checkpointopen->source);
1087 swab_mar_name_t (&req_exec_ckpt_checkpointopen->checkpoint_name);
1088 swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->ckpt_id);
1089 swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_checkpointopen->checkpoint_creation_attributes);
1090 swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->checkpoint_creation_attributes_set);
1091 swab_mar_ckpt_checkpoint_open_flags_t (&req_exec_ckpt_checkpointopen->checkpoint_open_flags);
1092 swab_mar_ckpt_checkpoint_handle_t (&req_exec_ckpt_checkpointopen->checkpoint_handle);
1093 swab_mar_invocation_t (&req_exec_ckpt_checkpointopen->invocation);
1094 swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->async_call);
1095 swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->fail_with_error);
1098 static void exec_ckpt_checkpointclose_endian_convert (void *msg)
1100 struct req_exec_ckpt_checkpointclose *req_exec_ckpt_checkpointclose = (struct req_exec_ckpt_checkpointclose *)msg;
1102 swab_mar_req_header_t (&req_exec_ckpt_checkpointclose->header);
1103 swab_mar_message_source_t (&req_exec_ckpt_checkpointclose->source);
1104 swab_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name);
1105 swab_mar_uint32_t (&req_exec_ckpt_checkpointclose->ckpt_id);
1107 static void exec_ckpt_checkpointunlink_endian_convert (void *msg)
1109 struct req_exec_ckpt_checkpointunlink *req_exec_ckpt_checkpointunlink = (struct req_exec_ckpt_checkpointunlink *)msg;
1111 swab_mar_req_header_t (&req_exec_ckpt_checkpointunlink->header);
1112 swab_mar_message_source_t (&req_exec_ckpt_checkpointunlink->source);
1113 swab_mar_name_t (&req_exec_ckpt_checkpointunlink->checkpoint_name);
1116 static void exec_ckpt_checkpointretentiondurationset_endian_convert (void *msg)
1118 struct req_exec_ckpt_checkpointretentiondurationset *req_exec_ckpt_checkpointretentiondurationset = (struct req_exec_ckpt_checkpointretentiondurationset *)msg;
1120 swab_mar_req_header_t (&req_exec_ckpt_checkpointretentiondurationset->header);
1121 swab_mar_message_source_t (&req_exec_ckpt_checkpointretentiondurationset->source);
1122 swab_mar_name_t (&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name);
1123 swab_mar_uint32_t (&req_exec_ckpt_checkpointretentiondurationset->ckpt_id);
1124 swab_mar_time_t (&req_exec_ckpt_checkpointretentiondurationset->retention_duration);
1127 static void exec_ckpt_checkpointretentiondurationexpire_endian_convert (void *msg)
1129 struct req_exec_ckpt_checkpointretentiondurationexpire *req_exec_ckpt_checkpointretentiondurationexpire = (struct req_exec_ckpt_checkpointretentiondurationexpire *)msg;
1131 swab_mar_req_header_t (&req_exec_ckpt_checkpointretentiondurationexpire->header);
1132 swab_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name);
1133 swab_mar_uint32_t (&req_exec_ckpt_checkpointretentiondurationexpire->ckpt_id);
1136 static void exec_ckpt_sectioncreate_endian_convert (void *msg)
1138 struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)msg;
1140 swab_mar_req_header_t (&req_exec_ckpt_sectioncreate->header);
1141 swab_mar_message_source_t (&req_exec_ckpt_sectioncreate->source);
1142 swab_mar_name_t (&req_exec_ckpt_sectioncreate->checkpoint_name);
1143 swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->ckpt_id);
1144 swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->id_len);
1145 swab_mar_time_t (&req_exec_ckpt_sectioncreate->expiration_time);
1146 swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->initial_data_size);
1149 static void exec_ckpt_sectiondelete_endian_convert (void *msg)
1151 struct req_exec_ckpt_sectiondelete *req_exec_ckpt_sectiondelete = (struct req_exec_ckpt_sectiondelete *)msg;
1153 swab_mar_req_header_t (&req_exec_ckpt_sectiondelete->header);
1154 swab_mar_message_source_t (&req_exec_ckpt_sectiondelete->source);
1155 swab_mar_name_t (&req_exec_ckpt_sectiondelete->checkpoint_name);
1156 swab_mar_uint32_t (&req_exec_ckpt_sectiondelete->ckpt_id);
1157 swab_mar_uint32_t (&req_exec_ckpt_sectiondelete->id_len);
1160 static void exec_ckpt_sectrionexpirationtimeset_endian_convert (void *msg)
1162 struct req_exec_ckpt_sectionexpirationtimeset *req_exec_ckpt_sectionexpirationtimeset = (struct req_exec_ckpt_sectionexpirationtimeset *)msg;
1164 swab_mar_req_header_t (&req_exec_ckpt_sectionexpirationtimeset->header);
1165 swab_mar_message_source_t (&req_exec_ckpt_sectionexpirationtimeset->source);
1166 swab_mar_name_t (&req_exec_ckpt_sectionexpirationtimeset->checkpoint_name);
1167 swab_mar_uint32_t (&req_exec_ckpt_sectionexpirationtimeset->ckpt_id);
1168 swab_mar_uint32_t (&req_exec_ckpt_sectionexpirationtimeset->id_len);
1169 swab_mar_time_t (&req_exec_ckpt_sectionexpirationtimeset->expiration_time);
1172 static void exec_ckpt_sectionwrite_endian_convert (void *msg)
1174 struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)msg;
1176 swab_mar_req_header_t (&req_exec_ckpt_sectionwrite->header);
1177 swab_mar_message_source_t (&req_exec_ckpt_sectionwrite->source);
1178 swab_mar_name_t (&req_exec_ckpt_sectionwrite->checkpoint_name);
1179 swab_mar_uint32_t (&req_exec_ckpt_sectionwrite->ckpt_id);
1180 swab_mar_uint32_t (&req_exec_ckpt_sectionwrite->id_len);
1181 swab_mar_offset_t (&req_exec_ckpt_sectionwrite->data_size);
1184 static void exec_ckpt_sectionoverwrite_endian_convert (void *msg)
1186 struct req_exec_ckpt_sectionoverwrite *req_exec_ckpt_sectionoverwrite = (struct req_exec_ckpt_sectionoverwrite *)msg;
1188 swab_mar_req_header_t (&req_exec_ckpt_sectionoverwrite->header);
1189 swab_mar_message_source_t (&req_exec_ckpt_sectionoverwrite->source);
1190 swab_mar_name_t (&req_exec_ckpt_sectionoverwrite->checkpoint_name);
1191 swab_mar_uint32_t (&req_exec_ckpt_sectionoverwrite->ckpt_id);
1192 swab_mar_uint32_t (&req_exec_ckpt_sectionoverwrite->id_len);
1193 swab_mar_offset_t (&req_exec_ckpt_sectionoverwrite->data_size);
1196 static void exec_ckpt_sectionread_endian_convert (void *msg)
1198 struct req_exec_ckpt_sectionread *req_exec_ckpt_sectionread = (struct req_exec_ckpt_sectionread *)msg;
1200 swab_mar_req_header_t (&req_exec_ckpt_sectionread->header);
1201 swab_mar_message_source_t (&req_exec_ckpt_sectionread->source);
1202 swab_mar_name_t (&req_exec_ckpt_sectionread->checkpoint_name);
1203 swab_mar_uint32_t (&req_exec_ckpt_sectionread->ckpt_id);
1204 swab_mar_uint32_t (&req_exec_ckpt_sectionread->id_len);
1205 swab_mar_offset_t (&req_exec_ckpt_sectionread->data_offset);
1206 swab_mar_offset_t (&req_exec_ckpt_sectionread->data_size);
1209 static void exec_ckpt_sync_checkpoint_endian_convert (void *msg)
1212 static void exec_ckpt_sync_checkpoint_section_endian_convert (void *msg)
1215 static void exec_ckpt_sync_checkpoint_refcount_endian_convert (void *msg)
1219 #ifdef ABC
1220 static void exec_ckpt_sync_state_endian_convert (void *msg)
1222 struct req_exec_ckpt_sync_state *req_exec_ckpt_sync_state = (struct req_exec_ckpt_sync_state *)msg;
1223 unsigned int i;
1225 swab_mar_req_header_t (&req_exec_ckpt_sync_state->header);
1226 // swab_mar_memb_ring_id_t (&req_exec_ckpt_sync_state->memb_ring_id);
1227 swab_mar_name_t (&req_exec_ckpt_sync_state->checkpoint_name);
1228 swab_mar_uint32_t (&req_exec_ckpt_sync_state->ckpt_id);
1229 swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_sync_state->checkpoint_creation_attributes);
1230 // swab_mar_ckpt_section_descriptor_t (&req_exec_ckpt_sync_state->section_descriptor);
1231 swab_mar_uint32_t (&req_exec_ckpt_sync_state->nodeid);
1232 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
1233 swab_mar_refcount_set_t (&req_exec_ckpt_sync_state->refcount_set[i]);
1237 static void exec_ckpt_sync_section_endian_convert (void *msg)
1239 struct req_exec_ckpt_sync_section *req_exec_ckpt_sync_section = (struct req_exec_ckpt_sync_section *)msg;
1240 swab_mar_req_header_t (&req_exec_ckpt_sync_section->header);
1241 // swab_mar_memb_ring_id_t (&req_exec_ckpt_sync_section->memb_ring_id);
1242 swab_mar_name_t (&req_exec_ckpt_sync_section->checkpoint_name);
1243 swab_mar_uint32_t (&req_exec_ckpt_sync_section->ckpt_id);
1244 swab_mar_uint32_t (&req_exec_ckpt_sync_section->id_len);
1245 swab_mar_offset_t (&req_exec_ckpt_sync_section->data_offset);
1246 swab_mar_offset_t (&req_exec_ckpt_sync_section->data_size);
1248 #endif
1251 * Executive message handlers
1253 static void message_handler_req_exec_ckpt_checkpointopen (
1254 void *message,
1255 unsigned int nodeid)
1257 struct req_exec_ckpt_checkpointopen *req_exec_ckpt_checkpointopen = (struct req_exec_ckpt_checkpointopen *)message;
1258 struct res_lib_ckpt_checkpointopen res_lib_ckpt_checkpointopen;
1259 struct res_lib_ckpt_checkpointopenasync res_lib_ckpt_checkpointopenasync;
1261 struct checkpoint *checkpoint = 0;
1262 struct checkpoint_section *checkpoint_section = 0;
1263 struct checkpoint_cleanup *checkpoint_cleanup = 0;
1264 struct ckpt_pd *ckpt_pd;
1265 SaAisErrorT error = SA_AIS_OK;
1267 log_printf (LOG_LEVEL_DEBUG, "Executive request to open checkpoint %p\n", req_exec_ckpt_checkpointopen);
1269 if (req_exec_ckpt_checkpointopen->fail_with_error != SA_AIS_OK) {
1270 error = req_exec_ckpt_checkpointopen->fail_with_error;
1271 goto error_exit;
1274 if (message_source_is_local(&req_exec_ckpt_checkpointopen->source)) {
1275 checkpoint_cleanup = malloc (sizeof (struct checkpoint_cleanup));
1276 if (checkpoint_cleanup == 0) {
1277 error = SA_AIS_ERR_NO_MEMORY;
1278 goto error_exit;
1282 checkpoint = checkpoint_find_linked (
1283 &checkpoint_list_head,
1284 &req_exec_ckpt_checkpointopen->checkpoint_name);
1287 * If checkpoint doesn't exist, create one
1289 if (checkpoint == 0) {
1290 if ((req_exec_ckpt_checkpointopen->checkpoint_open_flags & SA_CKPT_CHECKPOINT_CREATE) == 0) {
1291 error = SA_AIS_ERR_NOT_EXIST;
1292 goto error_exit;
1294 checkpoint = malloc (sizeof (struct checkpoint));
1295 if (checkpoint == 0) {
1296 error = SA_AIS_ERR_NO_MEMORY;
1297 goto error_exit;
1300 memcpy (&checkpoint->name,
1301 &req_exec_ckpt_checkpointopen->checkpoint_name,
1302 sizeof (mar_name_t));
1303 memcpy (&checkpoint->checkpoint_creation_attributes,
1304 &req_exec_ckpt_checkpointopen->checkpoint_creation_attributes,
1305 sizeof (mar_ckpt_checkpoint_creation_attributes_t));
1306 checkpoint->unlinked = 0;
1307 list_init (&checkpoint->list);
1308 list_init (&checkpoint->sections_list_head);
1309 list_add (&checkpoint->list, &checkpoint_list_head);
1310 checkpoint->reference_count = 1;
1311 checkpoint->retention_timer = 0;
1312 checkpoint->section_count = 0;
1313 checkpoint->ckpt_id = global_ckpt_id++;
1315 if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) &&
1316 (checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_CHECKPOINT_COLLOCATED) == 0) {
1317 checkpoint->active_replica_set = 1;
1318 } else
1319 if ((checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_WR_ALL_REPLICAS) == 1) {
1320 checkpoint->active_replica_set = 1;
1321 } else {
1322 checkpoint->active_replica_set = 0;
1325 memset (&checkpoint->refcount_set, 0,
1326 sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX);
1329 * Create default section id if max_sections is 1
1331 if (checkpoint->checkpoint_creation_attributes.max_sections == 1) {
1333 * Add in default checkpoint section
1335 checkpoint_section = malloc (sizeof (struct checkpoint_section));
1336 if (checkpoint_section == 0) {
1337 free (checkpoint);
1338 error = SA_AIS_ERR_NO_MEMORY;
1339 goto error_exit;
1342 list_init (&checkpoint_section->list);
1343 list_add (&checkpoint_section->list, &checkpoint->sections_list_head);
1346 checkpoint_section->section_descriptor.section_id.id = 0;
1347 checkpoint_section->section_descriptor.section_id.id_len = 0;
1348 checkpoint_section->section_descriptor.expiration_time = SA_TIME_END;
1349 checkpoint_section->section_descriptor.section_state = SA_CKPT_SECTION_VALID;
1350 checkpoint_section->section_descriptor.last_update = 0; /*current time*/
1351 checkpoint_section->section_descriptor.section_size = 0;
1352 checkpoint_section->section_data = NULL;
1353 checkpoint_section->expiration_timer = 0;
1355 } else {
1356 if (req_exec_ckpt_checkpointopen->checkpoint_creation_attributes_set &&
1357 memcmp (&checkpoint->checkpoint_creation_attributes,
1358 &req_exec_ckpt_checkpointopen->checkpoint_creation_attributes,
1359 sizeof (mar_ckpt_checkpoint_creation_attributes_t)) != 0) {
1361 error = SA_AIS_ERR_EXIST;
1362 goto error_exit;
1366 assert (checkpoint->unlinked == 0);
1369 * Setup connection information and mark checkpoint as referenced
1371 log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", checkpoint);
1373 sync_refcount_increment (checkpoint, nodeid);
1374 sync_refcount_calculate (checkpoint);
1377 * Reset retention duration since this checkpoint was just opened
1379 openais_timer_delete (checkpoint->retention_timer);
1380 checkpoint->retention_timer = 0;
1383 * Send error result to CKPT library
1385 error_exit:
1387 * If this node was the source of the message, respond to this node
1389 if (message_source_is_local(&req_exec_ckpt_checkpointopen->source)) {
1391 * If its an async call respond with the invocation and handle
1393 if (req_exec_ckpt_checkpointopen->async_call) {
1394 res_lib_ckpt_checkpointopenasync.header.size = sizeof (struct res_lib_ckpt_checkpointopenasync);
1395 res_lib_ckpt_checkpointopenasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
1396 res_lib_ckpt_checkpointopenasync.header.error = error;
1397 res_lib_ckpt_checkpointopenasync.checkpoint_handle = req_exec_ckpt_checkpointopen->checkpoint_handle;
1398 res_lib_ckpt_checkpointopenasync.invocation = req_exec_ckpt_checkpointopen->invocation;
1399 if (error == SA_AIS_OK) {
1400 res_lib_ckpt_checkpointopenasync.ckpt_id = checkpoint->ckpt_id;
1403 openais_conn_send_response (
1404 req_exec_ckpt_checkpointopen->source.conn,
1405 &res_lib_ckpt_checkpointopenasync,
1406 sizeof (struct res_lib_ckpt_checkpointopenasync));
1407 openais_conn_send_response (
1408 openais_conn_partner_get (req_exec_ckpt_checkpointopen->source.conn),
1409 &res_lib_ckpt_checkpointopenasync,
1410 sizeof (struct res_lib_ckpt_checkpointopenasync));
1411 } else {
1413 * otherwise respond with the normal checkpointopen response
1415 res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen);
1416 res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN;
1417 if (error == SA_AIS_OK) {
1418 res_lib_ckpt_checkpointopen.ckpt_id = checkpoint->ckpt_id;
1420 res_lib_ckpt_checkpointopen.header.error = error;
1422 openais_conn_send_response (
1423 req_exec_ckpt_checkpointopen->source.conn,
1424 &res_lib_ckpt_checkpointopen,
1425 sizeof (struct res_lib_ckpt_checkpointopen));
1429 * This is the path taken when all goes well and this call was local
1431 if (error == SA_AIS_OK) {
1432 ckpt_pd = openais_conn_private_data_get (req_exec_ckpt_checkpointopen->source.conn);
1434 memcpy(&checkpoint_cleanup->checkpoint_name,
1435 &checkpoint->name, sizeof (mar_name_t));
1436 checkpoint_cleanup->ckpt_id = checkpoint->ckpt_id;
1438 list_add (&checkpoint_cleanup->list,
1439 &ckpt_pd->checkpoint_list);
1440 } else {
1442 * We allocated this in the hope of using it but an error occured
1443 * so deallocate it.
1445 free (checkpoint_cleanup);
1450 unsigned int abstime_to_msec (mar_time_t time)
1452 struct timeval tv;
1453 unsigned long long curr_time;
1454 unsigned long long msec_time;
1456 gettimeofday (&tv, NULL);
1457 curr_time = ((((unsigned long long)tv.tv_sec) * ((unsigned long)1000)) +
1458 (((unsigned long long)tv.tv_usec) / ((unsigned long long)1000)));
1459 msec_time = (((unsigned long long)time) / 1000000) -
1460 (unsigned long long)curr_time;
1462 return ((unsigned int)(msec_time));
1465 void timer_function_section_expire (void *data)
1467 struct checkpoint *checkpoint = 0;
1468 struct checkpoint_section *checkpoint_section = 0;
1469 struct ckpt_identifier *ckpt_id = 0;
1471 ckpt_id = (struct ckpt_identifier *)data;
1472 log_printf (LOG_LEVEL_DEBUG, "timer_function_section_expire data = 0x%p\n",data);
1473 if (ckpt_id->ckpt_section_id.id_len && ckpt_id->ckpt_section_id.id) {
1474 log_printf (LOG_LEVEL_DEBUG, "Attempting to expire section %s in ckpt %s\n",
1475 ckpt_id->ckpt_section_id.id,
1476 ckpt_id->ckpt_name.value);
1478 else {
1479 log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire data incorect\n");
1480 goto free_mem;
1483 checkpoint = checkpoint_find (
1484 &checkpoint_list_head,
1485 &ckpt_id->ckpt_name,
1486 ckpt_id->ckpt_id);
1487 if (checkpoint == 0) {
1488 log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire could not find ckpt %s\n",
1489 ckpt_id->ckpt_name.value);
1490 goto free_mem;
1493 checkpoint_section = checkpoint_section_find (checkpoint,
1494 (char *)ckpt_id->ckpt_section_id.id,
1495 (int)ckpt_id->ckpt_section_id.id_len);
1496 if (checkpoint_section == 0) {
1497 log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire could not find section %s in ckpt %s\n",
1498 ckpt_id->ckpt_section_id.id,
1499 ckpt_id->ckpt_name.value);
1500 goto free_mem;
1503 log_printf (LOG_LEVEL_DEBUG, "Expiring section %s in ckpt %s\n",
1504 ckpt_id->ckpt_section_id.id,
1505 ckpt_id->ckpt_name.value);
1507 checkpoint->section_count -= 1;
1508 checkpoint_section_release (checkpoint_section);
1510 free_mem :
1511 free (ckpt_id);
1515 void timer_function_retention (void *data)
1517 struct checkpoint *checkpoint = (struct checkpoint *)data;
1518 struct req_exec_ckpt_checkpointretentiondurationexpire req_exec_ckpt_checkpointretentiondurationexpire;
1519 struct iovec iovec;
1521 checkpoint->retention_timer = 0;
1522 req_exec_ckpt_checkpointretentiondurationexpire.header.size =
1523 sizeof (struct req_exec_ckpt_checkpointretentiondurationexpire);
1524 req_exec_ckpt_checkpointretentiondurationexpire.header.id =
1525 SERVICE_ID_MAKE (CKPT_SERVICE,
1526 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONEXPIRE);
1528 memcpy (&req_exec_ckpt_checkpointretentiondurationexpire.checkpoint_name,
1529 &checkpoint->name,
1530 sizeof (mar_name_t));
1531 req_exec_ckpt_checkpointretentiondurationexpire.ckpt_id =
1532 checkpoint->ckpt_id;
1534 iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationexpire;
1535 iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationexpire);
1537 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
1540 static void message_handler_req_exec_ckpt_checkpointclose (
1541 void *message,
1542 unsigned int nodeid)
1544 struct req_exec_ckpt_checkpointclose *req_exec_ckpt_checkpointclose = (struct req_exec_ckpt_checkpointclose *)message;
1545 struct res_lib_ckpt_checkpointclose res_lib_ckpt_checkpointclose;
1546 struct checkpoint *checkpoint = 0;
1547 SaAisErrorT error = SA_AIS_OK;
1548 int release_checkpoint = 0;
1550 log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to close checkpoint %s\n",
1551 get_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name));
1553 checkpoint = checkpoint_find (
1554 &checkpoint_list_head,
1555 &req_exec_ckpt_checkpointclose->checkpoint_name,
1556 req_exec_ckpt_checkpointclose->ckpt_id);
1557 if (checkpoint == 0) {
1558 error = SA_AIS_ERR_NOT_EXIST;
1559 goto error_exit;
1562 sync_refcount_decrement (checkpoint, nodeid);
1563 sync_refcount_calculate (checkpoint);
1565 log_printf (LOG_LEVEL_DEBUG, "Close checkpoint->reference_count %d\n",
1566 checkpoint->reference_count);
1567 assert (checkpoint->reference_count >= 0);
1570 * If checkpoint has been unlinked and this is the last reference, delete it
1572 if (checkpoint->unlinked && checkpoint->reference_count == 0) {
1573 log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
1574 release_checkpoint = 1;
1575 } else
1576 if (checkpoint->reference_count == 0) {
1577 if (checkpoint->checkpoint_creation_attributes.retention_duration != SA_TIME_END) {
1578 openais_timer_add_duration (
1579 checkpoint->checkpoint_creation_attributes.retention_duration,
1580 checkpoint,
1581 timer_function_retention,
1582 &checkpoint->retention_timer);
1586 error_exit:
1588 * Remove the checkpoint from my connections checkpoint list
1590 if (message_source_is_local(&req_exec_ckpt_checkpointclose->source)) {
1592 res_lib_ckpt_checkpointclose.header.size = sizeof (struct res_lib_ckpt_checkpointclose);
1593 res_lib_ckpt_checkpointclose.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE;
1594 res_lib_ckpt_checkpointclose.header.error = error;
1595 openais_conn_send_response (req_exec_ckpt_checkpointclose->source.conn,
1596 &res_lib_ckpt_checkpointclose, sizeof (struct res_lib_ckpt_checkpointclose));
1600 * Release the checkpoint if instructed to do so.
1602 if (error == SA_AIS_OK && release_checkpoint) {
1603 checkpoint_release(checkpoint);
1607 static void message_handler_req_exec_ckpt_checkpointunlink (
1608 void *message,
1609 unsigned int nodeid)
1611 struct req_exec_ckpt_checkpointunlink *req_exec_ckpt_checkpointunlink = (struct req_exec_ckpt_checkpointunlink *)message;
1613 struct res_lib_ckpt_checkpointunlink res_lib_ckpt_checkpointunlink;
1614 struct checkpoint *checkpoint = 0;
1615 SaAisErrorT error = SA_AIS_OK;
1617 log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to unlink checkpoint %p\n", req_exec_ckpt_checkpointunlink);
1618 checkpoint = checkpoint_find_linked (
1619 &checkpoint_list_head,
1620 &req_exec_ckpt_checkpointunlink->checkpoint_name);
1621 if (checkpoint == 0) {
1622 error = SA_AIS_ERR_NOT_EXIST;
1623 goto error_exit;
1626 assert (checkpoint->unlinked == 0);
1628 checkpoint->unlinked = 1;
1630 * Immediately delete entry if reference count is zero
1632 if (checkpoint->reference_count == 0) {
1634 * Remove retention timer since this checkpoint was unlinked and is no
1635 * longer referenced
1637 checkpoint_release (checkpoint);
1640 error_exit:
1642 * If this node was the source of the message, respond to this node
1644 if (message_source_is_local(&req_exec_ckpt_checkpointunlink->source)) {
1645 res_lib_ckpt_checkpointunlink.header.size = sizeof (struct res_lib_ckpt_checkpointunlink);
1646 res_lib_ckpt_checkpointunlink.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK;
1647 res_lib_ckpt_checkpointunlink.header.error = error;
1648 openais_conn_send_response (
1649 req_exec_ckpt_checkpointunlink->source.conn,
1650 &res_lib_ckpt_checkpointunlink,
1651 sizeof (struct res_lib_ckpt_checkpointunlink));
1655 static void message_handler_req_exec_ckpt_checkpointretentiondurationset (
1656 void *message,
1657 unsigned int nodeid)
1659 struct req_exec_ckpt_checkpointretentiondurationset *req_exec_ckpt_checkpointretentiondurationset = (struct req_exec_ckpt_checkpointretentiondurationset *)message;
1660 struct res_lib_ckpt_checkpointretentiondurationset res_lib_ckpt_checkpointretentiondurationset;
1661 struct checkpoint *checkpoint;
1662 SaAisErrorT error = SA_AIS_ERR_BAD_OPERATION;
1664 checkpoint = checkpoint_find (
1665 &checkpoint_list_head,
1666 &req_exec_ckpt_checkpointretentiondurationset->checkpoint_name,
1667 req_exec_ckpt_checkpointretentiondurationset->ckpt_id);
1668 if (checkpoint) {
1669 log_printf (LOG_LEVEL_DEBUG, "Setting retention duration for checkpoint %s\n",
1670 get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name));
1671 if (checkpoint->unlinked == 0) {
1672 checkpoint->checkpoint_creation_attributes.retention_duration =
1673 req_exec_ckpt_checkpointretentiondurationset->retention_duration;
1675 if (checkpoint->reference_count == 0) {
1676 openais_timer_delete (checkpoint->retention_timer);
1678 openais_timer_add_duration (
1679 checkpoint->checkpoint_creation_attributes.retention_duration,
1680 checkpoint,
1681 timer_function_retention,
1682 &checkpoint->retention_timer);
1684 error = SA_AIS_OK;
1689 * Respond to library if this processor sent the duration set request
1691 if (message_source_is_local(&req_exec_ckpt_checkpointretentiondurationset->source)) {
1692 res_lib_ckpt_checkpointretentiondurationset.header.size = sizeof (struct res_lib_ckpt_checkpointretentiondurationset);
1693 res_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET;
1694 res_lib_ckpt_checkpointretentiondurationset.header.error = error;
1696 openais_conn_send_response (
1697 req_exec_ckpt_checkpointretentiondurationset->source.conn,
1698 &res_lib_ckpt_checkpointretentiondurationset,
1699 sizeof (struct res_lib_ckpt_checkpointretentiondurationset));
1703 static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire (
1704 void *message,
1705 unsigned int nodeid)
1707 struct req_exec_ckpt_checkpointretentiondurationexpire *req_exec_ckpt_checkpointretentiondurationexpire = (struct req_exec_ckpt_checkpointretentiondurationexpire *)message;
1708 struct req_exec_ckpt_checkpointunlink req_exec_ckpt_checkpointunlink;
1709 struct checkpoint *checkpoint;
1710 struct iovec iovec;
1712 checkpoint = checkpoint_find (
1713 &checkpoint_list_head,
1714 &req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name,
1715 req_exec_ckpt_checkpointretentiondurationexpire->ckpt_id);
1716 log_printf (LOG_LEVEL_NOTICE, "Expiring checkpoint %s\n",
1717 get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name));
1718 if (checkpoint && (checkpoint->reference_count == 0)) {
1719 log_printf (LOG_LEVEL_NOTICE, "Expiring checkpoint %s\n",
1720 get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name));
1722 req_exec_ckpt_checkpointunlink.header.size =
1723 sizeof (struct req_exec_ckpt_checkpointunlink);
1724 req_exec_ckpt_checkpointunlink.header.id =
1725 SERVICE_ID_MAKE (CKPT_SERVICE,
1726 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK);
1728 req_exec_ckpt_checkpointunlink.source.conn = 0;
1729 req_exec_ckpt_checkpointunlink.source.nodeid = 0;
1731 memcpy (&req_exec_ckpt_checkpointunlink.checkpoint_name,
1732 &req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name,
1733 sizeof (mar_name_t));
1735 iovec.iov_base = (char *)&req_exec_ckpt_checkpointunlink;
1736 iovec.iov_len = sizeof (req_exec_ckpt_checkpointunlink);
1738 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
1743 static void message_handler_req_exec_ckpt_sectioncreate (
1744 void *message,
1745 unsigned int nodeid)
1747 struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)message;
1748 struct res_lib_ckpt_sectioncreate res_lib_ckpt_sectioncreate;
1749 struct checkpoint *checkpoint;
1750 struct checkpoint_section *checkpoint_section;
1751 void *initial_data;
1752 void *section_id;
1753 struct ckpt_identifier *ckpt_id = 0;
1754 SaAisErrorT error = SA_AIS_OK;
1756 log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n");
1757 checkpoint = checkpoint_find (
1758 &checkpoint_list_head,
1759 &req_exec_ckpt_sectioncreate->checkpoint_name,
1760 req_exec_ckpt_sectioncreate->ckpt_id);
1761 if (checkpoint == 0) {
1762 error = SA_AIS_ERR_NOT_EXIST;
1763 goto error_exit;
1766 if (checkpoint->section_count == checkpoint->checkpoint_creation_attributes.max_sections) {
1767 error = SA_AIS_ERR_NO_SPACE;
1768 goto error_exit;
1771 if (checkpoint->checkpoint_creation_attributes.max_sections == 1) {
1772 error = SA_AIS_ERR_EXIST;
1773 goto error_exit;
1776 if (checkpoint->checkpoint_creation_attributes.max_section_size <
1777 req_exec_ckpt_sectioncreate->initial_data_size) {
1779 error = SA_AIS_ERR_INVALID_PARAM;
1780 goto error_exit;
1783 if (checkpoint->checkpoint_creation_attributes.max_section_id_size <
1784 req_exec_ckpt_sectioncreate->id_len) {
1786 error = SA_AIS_ERR_INVALID_PARAM;
1787 goto error_exit;
1791 * Determine if user-specified checkpoint section already exists
1793 checkpoint_section = checkpoint_section_find (checkpoint,
1794 ((char *)req_exec_ckpt_sectioncreate) +
1795 sizeof (struct req_exec_ckpt_sectioncreate),
1796 req_exec_ckpt_sectioncreate->id_len);
1797 if (checkpoint_section) {
1798 error = SA_AIS_ERR_EXIST;
1799 goto error_exit;
1803 * Allocate checkpoint section
1805 checkpoint_section = malloc (sizeof (struct checkpoint_section));
1806 if (checkpoint_section == 0) {
1807 error = SA_AIS_ERR_NO_MEMORY;
1808 goto error_exit;
1811 * Allocate checkpoint section data
1813 initial_data = malloc (req_exec_ckpt_sectioncreate->initial_data_size);
1814 if (initial_data == 0) {
1815 free (checkpoint_section);
1816 error = SA_AIS_ERR_NO_MEMORY;
1817 goto error_exit;
1820 * Allocate checkpoint section id
1822 section_id = malloc (req_exec_ckpt_sectioncreate->id_len + 1);
1823 if (section_id == 0) {
1824 free (checkpoint_section);
1825 free (initial_data);
1826 error = SA_AIS_ERR_NO_MEMORY;
1827 goto error_exit;
1831 * Copy checkpoint section and section ID
1833 memcpy (section_id,
1834 ((char *)req_exec_ckpt_sectioncreate) +
1835 sizeof (struct req_exec_ckpt_sectioncreate),
1836 req_exec_ckpt_sectioncreate->id_len);
1838 /*Must be null terminated if it already isn't*/
1839 ((char*)(section_id))[req_exec_ckpt_sectioncreate->id_len] = '\0';
1841 memcpy (initial_data,
1842 ((char *)req_exec_ckpt_sectioncreate) +
1843 sizeof (struct req_exec_ckpt_sectioncreate) +
1844 req_exec_ckpt_sectioncreate->id_len,
1845 req_exec_ckpt_sectioncreate->initial_data_size);
1848 * Configure checkpoint section
1850 checkpoint_section->section_descriptor.section_id.id = section_id;
1851 checkpoint_section->section_descriptor.section_id.id_len =
1852 req_exec_ckpt_sectioncreate->id_len;
1853 checkpoint_section->section_descriptor.section_size =
1854 req_exec_ckpt_sectioncreate->initial_data_size;
1855 checkpoint_section->section_descriptor.expiration_time =
1856 req_exec_ckpt_sectioncreate->expiration_time;
1857 checkpoint_section->section_descriptor.section_state =
1858 SA_CKPT_SECTION_VALID;
1859 checkpoint_section->section_descriptor.last_update = 0; /* TODO current time */
1860 checkpoint_section->section_data = initial_data;
1861 checkpoint_section->expiration_timer = 0;
1863 if (req_exec_ckpt_sectioncreate->expiration_time != SA_TIME_END) {
1864 ckpt_id = malloc (sizeof(struct ckpt_identifier));
1865 assert(ckpt_id);
1866 memcpy(&ckpt_id->ckpt_name,
1867 &req_exec_ckpt_sectioncreate->checkpoint_name,
1868 sizeof(mar_name_t));
1869 ckpt_id->ckpt_id = req_exec_ckpt_sectioncreate->ckpt_id;
1870 memcpy(&ckpt_id->ckpt_section_id,
1871 &checkpoint_section->section_descriptor.section_id,
1872 sizeof(mar_ckpt_section_id_t));
1873 log_printf (LOG_LEVEL_DEBUG, "req_exec_ckpt_sectioncreate Enqueuing Timer to Expire section %s in ckpt %s\n",
1874 ckpt_id->ckpt_section_id.id,
1875 ckpt_id->ckpt_name.value);
1876 openais_timer_add_absolute (
1877 checkpoint_section->section_descriptor.expiration_time,
1878 ckpt_id,
1879 timer_function_section_expire,
1880 &checkpoint_section->expiration_timer);
1881 log_printf (LOG_LEVEL_DEBUG,
1882 "req_exec_ckpt_sectionicreate expiration timer = 0x%p\n",
1883 checkpoint_section->expiration_timer);
1886 log_printf (LOG_LEVEL_DEBUG,
1887 "message_handler_req_exec_ckpt_sectioncreate created section with id = %s, id_len = %d\n",
1888 checkpoint_section->section_descriptor.section_id.id,
1889 checkpoint_section->section_descriptor.section_id.id_len);
1891 * Add checkpoint section to checkpoint
1893 list_init (&checkpoint_section->list);
1894 list_add (&checkpoint_section->list,
1895 &checkpoint->sections_list_head);
1896 checkpoint->section_count += 1;
1898 error_exit:
1899 if (message_source_is_local(&req_exec_ckpt_sectioncreate->source)) {
1900 res_lib_ckpt_sectioncreate.header.size = sizeof (struct res_lib_ckpt_sectioncreate);
1901 res_lib_ckpt_sectioncreate.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE;
1902 res_lib_ckpt_sectioncreate.header.error = error;
1904 openais_conn_send_response (req_exec_ckpt_sectioncreate->source.conn,
1905 &res_lib_ckpt_sectioncreate,
1906 sizeof (struct res_lib_ckpt_sectioncreate));
1910 static void message_handler_req_exec_ckpt_sectiondelete (
1911 void *message,
1912 unsigned int nodeid)
1914 struct req_exec_ckpt_sectiondelete *req_exec_ckpt_sectiondelete = (struct req_exec_ckpt_sectiondelete *)message;
1915 struct res_lib_ckpt_sectiondelete res_lib_ckpt_sectiondelete;
1916 struct checkpoint *checkpoint;
1917 struct checkpoint_section *checkpoint_section;
1918 SaAisErrorT error = SA_AIS_OK;
1920 checkpoint = checkpoint_find (
1921 &checkpoint_list_head,
1922 &req_exec_ckpt_sectiondelete->checkpoint_name,
1923 req_exec_ckpt_sectiondelete->ckpt_id);
1924 if (checkpoint == 0) {
1925 error = SA_AIS_ERR_NOT_EXIST;
1926 goto error_exit;
1929 if (checkpoint->active_replica_set == 0) {
1930 log_printf (LOG_LEVEL_DEBUG, "sectiondelete: no active replica, returning error.\n");
1931 error = SA_AIS_ERR_NOT_EXIST;
1932 goto error_exit;
1936 * Determine if the user is trying to delete the default section
1938 if (req_exec_ckpt_sectiondelete->id_len == 0) {
1939 error = SA_AIS_ERR_INVALID_PARAM;
1940 goto error_exit;
1944 * Find checkpoint section to be deleted
1946 checkpoint_section = checkpoint_section_find (checkpoint,
1947 ((char *)(req_exec_ckpt_sectiondelete) + sizeof (struct req_exec_ckpt_sectiondelete)),
1948 req_exec_ckpt_sectiondelete->id_len);
1949 if (checkpoint_section == 0) {
1950 error = SA_AIS_ERR_NOT_EXIST;
1951 goto error_exit;
1955 * Delete checkpoint section
1957 checkpoint->section_count -= 1;
1958 checkpoint_section_release (checkpoint_section);
1961 * return result to CKPT library
1963 error_exit:
1964 if (message_source_is_local(&req_exec_ckpt_sectiondelete->source)) {
1965 res_lib_ckpt_sectiondelete.header.size = sizeof (struct res_lib_ckpt_sectiondelete);
1966 res_lib_ckpt_sectiondelete.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE;
1967 res_lib_ckpt_sectiondelete.header.error = error;
1969 openais_conn_send_response (
1970 req_exec_ckpt_sectiondelete->source.conn,
1971 &res_lib_ckpt_sectiondelete,
1972 sizeof (struct res_lib_ckpt_sectiondelete));
1976 static void message_handler_req_exec_ckpt_sectionexpirationtimeset (
1977 void *message,
1978 unsigned int nodeid)
1980 struct req_exec_ckpt_sectionexpirationtimeset *req_exec_ckpt_sectionexpirationtimeset = (struct req_exec_ckpt_sectionexpirationtimeset *)message;
1981 struct res_lib_ckpt_sectionexpirationtimeset res_lib_ckpt_sectionexpirationtimeset;
1982 struct checkpoint *checkpoint;
1983 struct checkpoint_section *checkpoint_section;
1984 struct ckpt_identifier *ckpt_id = 0;
1985 SaAisErrorT error = SA_AIS_OK;
1987 log_printf (LOG_LEVEL_DEBUG, "Executive request to set section expiration time\n");
1988 checkpoint = checkpoint_find (
1989 &checkpoint_list_head,
1990 &req_exec_ckpt_sectionexpirationtimeset->checkpoint_name,
1991 req_exec_ckpt_sectionexpirationtimeset->ckpt_id);
1992 if (checkpoint == 0) {
1993 error = SA_AIS_ERR_NOT_EXIST;
1994 goto error_exit;
1997 if (checkpoint->active_replica_set == 0) {
1998 log_printf (LOG_LEVEL_DEBUG, "expirationset: no active replica, returning error.\n");
1999 error = SA_AIS_ERR_NOT_EXIST;
2000 goto error_exit;
2004 * Determine if the user is trying to set expiration time for the default section
2006 if (req_exec_ckpt_sectionexpirationtimeset->id_len == 0) {
2007 error = SA_AIS_ERR_INVALID_PARAM;
2008 goto error_exit;
2012 * Find checkpoint section that expiration time should be set for
2014 checkpoint_section = checkpoint_section_find (checkpoint,
2015 ((char *)req_exec_ckpt_sectionexpirationtimeset) +
2016 sizeof (struct req_exec_ckpt_sectionexpirationtimeset),
2017 req_exec_ckpt_sectionexpirationtimeset->id_len);
2019 if (checkpoint_section == 0) {
2020 error = SA_AIS_ERR_NOT_EXIST;
2021 goto error_exit;
2024 checkpoint_section->section_descriptor.expiration_time =
2025 req_exec_ckpt_sectionexpirationtimeset->expiration_time;
2027 openais_timer_delete (checkpoint_section->expiration_timer);
2028 checkpoint_section->expiration_timer = 0;
2030 if (req_exec_ckpt_sectionexpirationtimeset->expiration_time != SA_TIME_END) {
2031 ckpt_id = malloc (sizeof(struct ckpt_identifier));
2032 assert(ckpt_id);
2033 memcpy(&ckpt_id->ckpt_name,
2034 &req_exec_ckpt_sectionexpirationtimeset->checkpoint_name,
2035 sizeof(mar_name_t));
2036 ckpt_id->ckpt_id =
2037 req_exec_ckpt_sectionexpirationtimeset->ckpt_id;
2038 memcpy(&ckpt_id->ckpt_section_id,
2039 &checkpoint_section->section_descriptor.section_id,
2040 sizeof(mar_ckpt_section_id_t));
2041 log_printf (LOG_LEVEL_DEBUG, "req_exec_ckpt_sectionexpirationtimeset Enqueuing Timer to Expire section %s in ckpt %s, ref = 0x%p\n",
2042 ckpt_id->ckpt_section_id.id,
2043 ckpt_id->ckpt_name.value,
2044 ckpt_id);
2045 openais_timer_add_absolute (
2046 checkpoint_section->section_descriptor.expiration_time,
2047 ckpt_id,
2048 timer_function_section_expire,
2049 &checkpoint_section->expiration_timer);
2050 log_printf (LOG_LEVEL_DEBUG, "req_exec_ckpt_sectionexpirationtimeset expiration timer = 0x%p\n",
2051 checkpoint_section->expiration_timer);
2054 error_exit:
2055 if (message_source_is_local (&req_exec_ckpt_sectionexpirationtimeset->source)) {
2056 res_lib_ckpt_sectionexpirationtimeset.header.size =
2057 sizeof (struct res_lib_ckpt_sectionexpirationtimeset);
2058 res_lib_ckpt_sectionexpirationtimeset.header.id =
2059 MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET;
2060 res_lib_ckpt_sectionexpirationtimeset.header.error = error;
2062 openais_conn_send_response (
2063 req_exec_ckpt_sectionexpirationtimeset->source.conn,
2064 &res_lib_ckpt_sectionexpirationtimeset,
2065 sizeof (struct res_lib_ckpt_sectionexpirationtimeset));
2069 static void message_handler_req_exec_ckpt_sectionwrite (
2070 void *message,
2071 unsigned int nodeid)
2073 struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)message;
2074 struct res_lib_ckpt_sectionwrite res_lib_ckpt_sectionwrite;
2075 struct checkpoint *checkpoint;
2076 struct checkpoint_section *checkpoint_section = 0;
2077 int size_required;
2078 void *section_data;
2079 SaAisErrorT error = SA_AIS_OK;
2081 log_printf (LOG_LEVEL_DEBUG, "Executive request to section write.\n");
2082 checkpoint = checkpoint_find (
2083 &checkpoint_list_head,
2084 &req_exec_ckpt_sectionwrite->checkpoint_name,
2085 req_exec_ckpt_sectionwrite->ckpt_id);
2086 if (checkpoint == 0) {
2087 log_printf (LOG_LEVEL_ERROR, "checkpoint_find returned 0 Calling error_exit.\n");
2088 error = SA_AIS_ERR_NOT_EXIST;
2089 goto error_exit;
2092 if (checkpoint->active_replica_set == 0) {
2093 log_printf (LOG_LEVEL_DEBUG, "checkpointwrite: no active replica, returning error.\n");
2094 error = SA_AIS_ERR_NOT_EXIST;
2095 goto error_exit;
2098 if (checkpoint->checkpoint_creation_attributes.max_section_size < req_exec_ckpt_sectionwrite->data_size) {
2099 error = SA_AIS_ERR_INVALID_PARAM;
2100 goto error_exit;
2103 log_printf (LOG_LEVEL_DEBUG, "writing checkpoint section is %s\n",
2104 ((char *)req_exec_ckpt_sectionwrite) +
2105 sizeof (struct req_exec_ckpt_sectionwrite));
2108 * Find checkpoint section to be written
2110 checkpoint_section = checkpoint_section_find (checkpoint,
2111 ((char *)req_exec_ckpt_sectionwrite) +
2112 sizeof (struct req_exec_ckpt_sectionwrite),
2113 req_exec_ckpt_sectionwrite->id_len);
2114 if (checkpoint_section == 0) {
2115 if (req_exec_ckpt_sectionwrite->id_len == 0) {
2116 log_printf (LOG_LEVEL_DEBUG, "CANT FIND DEFAULT SECTION.\n");
2118 else {
2119 log_printf (LOG_LEVEL_DEBUG, "CANT FIND SECTION '%s'\n",
2120 ((char *)req_exec_ckpt_sectionwrite) +
2121 sizeof (struct req_exec_ckpt_sectionwrite));
2123 error = SA_AIS_ERR_NOT_EXIST;
2124 goto error_exit;
2128 * If write would extend past end of section data, enlarge section
2130 size_required = req_exec_ckpt_sectionwrite->data_offset +
2131 req_exec_ckpt_sectionwrite->data_size;
2132 if (size_required > checkpoint_section->section_descriptor.section_size) {
2133 section_data = realloc (checkpoint_section->section_data, size_required);
2134 if (section_data == 0) {
2135 log_printf (LOG_LEVEL_ERROR, "section_data realloc returned 0 Calling error_exit.\n");
2136 error = SA_AIS_ERR_NO_MEMORY;
2137 goto error_exit;
2141 * Install new section data
2143 checkpoint_section->section_data = section_data;
2144 checkpoint_section->section_descriptor.section_size = size_required;
2148 * Write checkpoint section to section data
2150 if (req_exec_ckpt_sectionwrite->data_size > 0) {
2151 char *sd;
2152 int *val;
2153 val = checkpoint_section->section_data;
2154 sd = (char *)checkpoint_section->section_data;
2155 memcpy (&sd[req_exec_ckpt_sectionwrite->data_offset],
2156 ((char *)req_exec_ckpt_sectionwrite) +
2157 sizeof (struct req_exec_ckpt_sectionwrite) +
2158 req_exec_ckpt_sectionwrite->id_len,
2159 req_exec_ckpt_sectionwrite->data_size);
2162 * Write sectionwrite response to CKPT library
2164 error_exit:
2165 if (message_source_is_local(&req_exec_ckpt_sectionwrite->source)) {
2166 res_lib_ckpt_sectionwrite.header.size =
2167 sizeof (struct res_lib_ckpt_sectionwrite);
2168 res_lib_ckpt_sectionwrite.header.id =
2169 MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE;
2170 res_lib_ckpt_sectionwrite.header.error = error;
2172 openais_conn_send_response (
2173 req_exec_ckpt_sectionwrite->source.conn,
2174 &res_lib_ckpt_sectionwrite,
2175 sizeof (struct res_lib_ckpt_sectionwrite));
2179 static void message_handler_req_exec_ckpt_sectionoverwrite (
2180 void *message,
2181 unsigned int nodeid)
2183 struct req_exec_ckpt_sectionoverwrite *req_exec_ckpt_sectionoverwrite = (struct req_exec_ckpt_sectionoverwrite *)message;
2184 struct res_lib_ckpt_sectionoverwrite res_lib_ckpt_sectionoverwrite;
2185 struct checkpoint *checkpoint;
2186 struct checkpoint_section *checkpoint_section;
2187 void *section_data;
2188 SaAisErrorT error = SA_AIS_OK;
2190 log_printf (LOG_LEVEL_DEBUG, "Executive request to section overwrite.\n");
2191 checkpoint = checkpoint_find (
2192 &checkpoint_list_head,
2193 &req_exec_ckpt_sectionoverwrite->checkpoint_name,
2194 req_exec_ckpt_sectionoverwrite->ckpt_id);
2195 if (checkpoint == 0) {
2196 error = SA_AIS_ERR_NOT_EXIST;
2197 goto error_exit;
2200 if (checkpoint->active_replica_set == 0) {
2201 log_printf (LOG_LEVEL_DEBUG, "sectionoverwrite: no active replica, returning error.\n");
2202 error = SA_AIS_ERR_NOT_EXIST;
2203 goto error_exit;
2206 if (checkpoint->checkpoint_creation_attributes.max_section_size <
2207 req_exec_ckpt_sectionoverwrite->data_size) {
2209 error = SA_AIS_ERR_INVALID_PARAM;
2210 goto error_exit;
2214 * Find checkpoint section to be overwritten
2216 checkpoint_section = checkpoint_section_find (checkpoint,
2217 ((char *)req_exec_ckpt_sectionoverwrite) +
2218 sizeof (struct req_exec_ckpt_sectionoverwrite),
2219 req_exec_ckpt_sectionoverwrite->id_len);
2220 if (checkpoint_section == 0) {
2221 error = SA_AIS_ERR_NOT_EXIST;
2222 goto error_exit;
2226 * Allocate checkpoint section data
2228 section_data = malloc (req_exec_ckpt_sectionoverwrite->data_size);
2229 if (section_data == 0) {
2230 error = SA_AIS_ERR_NO_MEMORY;
2231 goto error_exit;
2234 memcpy (section_data,
2235 ((char *)req_exec_ckpt_sectionoverwrite) +
2236 sizeof (struct req_exec_ckpt_sectionoverwrite) +
2237 req_exec_ckpt_sectionoverwrite->id_len,
2238 req_exec_ckpt_sectionoverwrite->data_size);
2241 * release old checkpoint section data
2243 free (checkpoint_section->section_data);
2246 * Install overwritten checkpoint section data
2248 checkpoint_section->section_descriptor.section_size =
2249 req_exec_ckpt_sectionoverwrite->data_size;
2250 checkpoint_section->section_descriptor.section_state =
2251 SA_CKPT_SECTION_VALID;
2253 * TODO current time
2255 checkpoint_section->section_descriptor.last_update = 0;
2256 checkpoint_section->section_data = section_data;
2259 * return result to CKPT library
2261 error_exit:
2262 if (message_source_is_local(&req_exec_ckpt_sectionoverwrite->source)) {
2263 res_lib_ckpt_sectionoverwrite.header.size =
2264 sizeof (struct res_lib_ckpt_sectionoverwrite);
2265 res_lib_ckpt_sectionoverwrite.header.id =
2266 MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE;
2267 res_lib_ckpt_sectionoverwrite.header.error = error;
2269 openais_conn_send_response (
2270 req_exec_ckpt_sectionoverwrite->source.conn,
2271 &res_lib_ckpt_sectionoverwrite,
2272 sizeof (struct res_lib_ckpt_sectionoverwrite));
2276 static void message_handler_req_exec_ckpt_sectionread (
2277 void *message,
2278 unsigned int nodeid)
2280 struct req_exec_ckpt_sectionread *req_exec_ckpt_sectionread = (struct req_exec_ckpt_sectionread *)message;
2281 struct res_lib_ckpt_sectionread res_lib_ckpt_sectionread;
2282 struct checkpoint *checkpoint;
2283 struct checkpoint_section *checkpoint_section = 0;
2284 int section_size = 0;
2285 SaAisErrorT error = SA_AIS_OK;
2287 res_lib_ckpt_sectionread.data_read = 0;
2289 log_printf (LOG_LEVEL_DEBUG, "Executive request for section read.\n");
2291 checkpoint = checkpoint_find (
2292 &checkpoint_list_head,
2293 &req_exec_ckpt_sectionread->checkpoint_name,
2294 req_exec_ckpt_sectionread->ckpt_id);
2295 if (checkpoint == 0) {
2296 error = SA_AIS_ERR_LIBRARY;
2297 goto error_exit;
2300 if (checkpoint->active_replica_set == 0) {
2301 error = SA_AIS_ERR_NOT_EXIST;
2302 goto error_exit;
2306 * Find checkpoint section to be read
2308 checkpoint_section = checkpoint_section_find (checkpoint,
2309 ((char *)req_exec_ckpt_sectionread) +
2310 sizeof (struct req_exec_ckpt_sectionread),
2311 req_exec_ckpt_sectionread->id_len);
2312 if (checkpoint_section == 0) {
2313 error = SA_AIS_ERR_NOT_EXIST;
2314 goto error_exit;
2318 * If data size is greater then max section size, return INVALID_PARAM
2320 if (checkpoint->checkpoint_creation_attributes.max_section_size <
2321 req_exec_ckpt_sectionread->data_size) {
2323 error = SA_AIS_ERR_INVALID_PARAM;
2324 goto error_exit;
2328 * If data_offset is past end of data, return INVALID_PARAM
2330 if (req_exec_ckpt_sectionread->data_offset > checkpoint_section->section_descriptor.section_size) {
2331 error = SA_AIS_ERR_INVALID_PARAM;
2332 goto error_exit;
2336 * Determine the section size
2338 section_size = checkpoint_section->section_descriptor.section_size -
2339 req_exec_ckpt_sectionread->data_offset;
2342 * If the library has less space available then can be sent from the
2343 * section, reduce bytes sent to library to max requested
2345 if (section_size > req_exec_ckpt_sectionread->data_size) {
2346 section_size = req_exec_ckpt_sectionread->data_size;
2350 * Write read response to CKPT library
2352 error_exit:
2353 if (message_source_is_local(&req_exec_ckpt_sectionread->source)) {
2354 res_lib_ckpt_sectionread.header.size = sizeof (struct res_lib_ckpt_sectionread) + section_size;
2355 res_lib_ckpt_sectionread.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD;
2356 res_lib_ckpt_sectionread.header.error = error;
2358 if (section_size != 0) {
2359 res_lib_ckpt_sectionread.data_read = section_size;
2362 openais_conn_send_response (
2363 req_exec_ckpt_sectionread->source.conn,
2364 &res_lib_ckpt_sectionread,
2365 sizeof (struct res_lib_ckpt_sectionread));
2368 * Write checkpoint to CKPT library section if section has data
2370 if (error == SA_AIS_OK) {
2371 char *sd;
2372 sd = (char *)checkpoint_section->section_data;
2373 openais_conn_send_response (
2374 req_exec_ckpt_sectionread->source.conn,
2375 &sd[req_exec_ckpt_sectionread->data_offset],
2376 section_size);
2381 static int ckpt_lib_init_fn (void *conn)
2383 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
2385 hdb_create (&ckpt_pd->iteration_hdb);
2387 list_init (&ckpt_pd->checkpoint_list);
2389 return (0);
2393 static int ckpt_lib_exit_fn (void *conn)
2395 struct checkpoint_cleanup *checkpoint_cleanup;
2396 struct list_head *list;
2397 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
2399 log_printf (LOG_LEVEL_DEBUG, "checkpoint exit conn %p\n", conn);
2402 * close all checkpoints opened on this connection
2404 list = ckpt_pd->checkpoint_list.next;
2405 while (!list_empty(&ckpt_pd->checkpoint_list)) {
2407 checkpoint_cleanup = list_entry (list,
2408 struct checkpoint_cleanup, list);
2410 assert (checkpoint_cleanup->checkpoint_name.length != 0);
2411 ckpt_checkpoint_close (
2412 &checkpoint_cleanup->checkpoint_name,
2413 checkpoint_cleanup->ckpt_id);
2415 list_del (&checkpoint_cleanup->list);
2416 free (checkpoint_cleanup);
2418 list = ckpt_pd->checkpoint_list.next;
2421 hdb_destroy (&ckpt_pd->iteration_hdb);
2423 return (0);
2427 static void message_handler_req_lib_ckpt_checkpointopen (
2428 void *conn,
2429 void *msg)
2431 struct req_lib_ckpt_checkpointopen *req_lib_ckpt_checkpointopen = (struct req_lib_ckpt_checkpointopen *)msg;
2432 struct req_exec_ckpt_checkpointopen req_exec_ckpt_checkpointopen;
2433 struct iovec iovec;
2435 log_printf (LOG_LEVEL_DEBUG, "Library request to open checkpoint.\n");
2436 req_exec_ckpt_checkpointopen.header.size =
2437 sizeof (struct req_exec_ckpt_checkpointopen);
2438 req_exec_ckpt_checkpointopen.header.id =
2439 SERVICE_ID_MAKE (CKPT_SERVICE, MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN);
2441 message_source_set (&req_exec_ckpt_checkpointopen.source, conn);
2442 memcpy (&req_exec_ckpt_checkpointopen.checkpoint_name,
2443 &req_lib_ckpt_checkpointopen->checkpoint_name,
2444 sizeof (mar_name_t));
2445 req_exec_ckpt_checkpointopen.ckpt_id =
2446 req_lib_ckpt_checkpointopen->ckpt_id;
2447 memcpy (&req_exec_ckpt_checkpointopen.checkpoint_creation_attributes,
2448 &req_lib_ckpt_checkpointopen->checkpoint_creation_attributes,
2449 sizeof (mar_ckpt_checkpoint_creation_attributes_t));
2450 req_exec_ckpt_checkpointopen.checkpoint_creation_attributes_set =
2451 req_lib_ckpt_checkpointopen->checkpoint_creation_attributes_set;
2452 req_exec_ckpt_checkpointopen.checkpoint_open_flags =
2453 req_lib_ckpt_checkpointopen->checkpoint_open_flags;
2454 req_exec_ckpt_checkpointopen.invocation =
2455 req_lib_ckpt_checkpointopen->invocation;
2456 req_exec_ckpt_checkpointopen.checkpoint_handle =
2457 req_lib_ckpt_checkpointopen->checkpoint_handle;
2458 req_exec_ckpt_checkpointopen.fail_with_error =
2459 req_lib_ckpt_checkpointopen->fail_with_error;
2460 req_exec_ckpt_checkpointopen.async_call =
2461 req_lib_ckpt_checkpointopen->async_call;
2463 iovec.iov_base = (char *)&req_exec_ckpt_checkpointopen;
2464 iovec.iov_len = sizeof (req_exec_ckpt_checkpointopen);
2466 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
2469 static void message_handler_req_lib_ckpt_checkpointclose (
2470 void *conn,
2471 void *msg)
2473 struct req_lib_ckpt_checkpointclose *req_lib_ckpt_checkpointclose = (struct req_lib_ckpt_checkpointclose *)msg;
2474 struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose;
2475 struct iovec iovec;
2477 req_exec_ckpt_checkpointclose.header.size =
2478 sizeof (struct req_exec_ckpt_checkpointclose);
2479 req_exec_ckpt_checkpointclose.header.id =
2480 SERVICE_ID_MAKE (CKPT_SERVICE,
2481 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE);
2483 message_source_set (&req_exec_ckpt_checkpointclose.source, conn);
2485 memcpy (&req_exec_ckpt_checkpointclose.checkpoint_name,
2486 &req_lib_ckpt_checkpointclose->checkpoint_name, sizeof (mar_name_t));
2487 req_exec_ckpt_checkpointclose.ckpt_id =
2488 req_lib_ckpt_checkpointclose->ckpt_id;
2490 iovec.iov_base = (char *)&req_exec_ckpt_checkpointclose;
2491 iovec.iov_len = sizeof (req_exec_ckpt_checkpointclose);
2493 ckpt_checkpoint_remove_cleanup (
2494 conn,
2495 req_lib_ckpt_checkpointclose->checkpoint_name,
2496 req_lib_ckpt_checkpointclose->ckpt_id);
2497 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
2500 static void message_handler_req_lib_ckpt_checkpointunlink (
2501 void *conn,
2502 void *msg)
2504 struct req_lib_ckpt_checkpointunlink *req_lib_ckpt_checkpointunlink = (struct req_lib_ckpt_checkpointunlink *)msg;
2505 struct req_exec_ckpt_checkpointunlink req_exec_ckpt_checkpointunlink;
2506 struct iovec iovec;
2508 req_exec_ckpt_checkpointunlink.header.size =
2509 sizeof (struct req_exec_ckpt_checkpointunlink);
2510 req_exec_ckpt_checkpointunlink.header.id =
2511 SERVICE_ID_MAKE (CKPT_SERVICE, MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK);
2513 message_source_set (&req_exec_ckpt_checkpointunlink.source, conn);
2515 memcpy (&req_exec_ckpt_checkpointunlink.checkpoint_name,
2516 &req_lib_ckpt_checkpointunlink->checkpoint_name,
2517 sizeof (mar_name_t));
2519 iovec.iov_base = (char *)&req_exec_ckpt_checkpointunlink;
2520 iovec.iov_len = sizeof (req_exec_ckpt_checkpointunlink);
2522 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
2523 TOTEMPG_AGREED) == 0);
2526 static void message_handler_req_lib_ckpt_checkpointretentiondurationset (
2527 void *conn,
2528 void *msg)
2530 struct req_lib_ckpt_checkpointretentiondurationset *req_lib_ckpt_checkpointretentiondurationset = (struct req_lib_ckpt_checkpointretentiondurationset *)msg;
2531 struct req_exec_ckpt_checkpointretentiondurationset req_exec_ckpt_checkpointretentiondurationset;
2532 struct iovec iovec;
2534 log_printf (LOG_LEVEL_DEBUG, "DURATION SET FROM API conn %p\n", conn);
2535 req_exec_ckpt_checkpointretentiondurationset.header.id =
2536 SERVICE_ID_MAKE (CKPT_SERVICE,
2537 MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONSET);
2538 req_exec_ckpt_checkpointretentiondurationset.header.size = sizeof (struct req_exec_ckpt_checkpointretentiondurationset);
2540 message_source_set (&req_exec_ckpt_checkpointretentiondurationset.source, conn);
2541 memcpy (&req_exec_ckpt_checkpointretentiondurationset.checkpoint_name,
2542 &req_lib_ckpt_checkpointretentiondurationset->checkpoint_name,
2543 sizeof (mar_name_t));
2544 req_exec_ckpt_checkpointretentiondurationset.ckpt_id =
2545 req_lib_ckpt_checkpointretentiondurationset->ckpt_id;
2546 req_exec_ckpt_checkpointretentiondurationset.retention_duration =
2547 req_lib_ckpt_checkpointretentiondurationset->retention_duration;
2549 iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationset;
2550 iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationset);
2552 assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
2553 TOTEMPG_AGREED) == 0);
2556 static void message_handler_req_lib_ckpt_activereplicaset (
2557 void *conn,
2558 void *msg)
2560 struct req_lib_ckpt_activereplicaset *req_lib_ckpt_activereplicaset = (struct req_lib_ckpt_activereplicaset *)msg;
2561 struct res_lib_ckpt_activereplicaset res_lib_ckpt_activereplicaset;
2562 struct checkpoint *checkpoint;
2563 SaAisErrorT error = SA_AIS_OK;
2565 checkpoint = checkpoint_find (
2566 &checkpoint_list_head,
2567 &req_lib_ckpt_activereplicaset->checkpoint_name,
2568 req_lib_ckpt_activereplicaset->ckpt_id);
2571 * Make sure checkpoint is collocated and async update option
2573 if (((checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_CHECKPOINT_COLLOCATED) == 0) ||
2574 (checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) {
2575 error = SA_AIS_ERR_BAD_OPERATION;
2577 checkpoint->active_replica_set = 1;
2578 res_lib_ckpt_activereplicaset.header.size = sizeof (struct res_lib_ckpt_activereplicaset);
2579 res_lib_ckpt_activereplicaset.header.id = MESSAGE_RES_CKPT_ACTIVEREPLICASET;
2580 res_lib_ckpt_activereplicaset.header.error = error;
2582 openais_conn_send_response (
2583 conn,
2584 &res_lib_ckpt_activereplicaset,
2585 sizeof (struct res_lib_ckpt_activereplicaset));
2588 static void message_handler_req_lib_ckpt_checkpointstatusget (
2589 void *conn,
2590 void *msg)
2592 struct req_lib_ckpt_checkpointstatusget *req_lib_ckpt_checkpointstatusget = (struct req_lib_ckpt_checkpointstatusget *)msg;
2593 struct res_lib_ckpt_checkpointstatusget res_lib_ckpt_checkpointstatusget;
2594 struct checkpoint *checkpoint;
2595 int memory_used = 0;
2596 int number_of_sections = 0;
2597 struct list_head *checkpoint_section_list;
2598 struct checkpoint_section *checkpointSection;
2601 * Count memory used by checkpoint sections
2603 checkpoint = checkpoint_find (
2604 &checkpoint_list_head,
2605 &req_lib_ckpt_checkpointstatusget->checkpoint_name,
2606 req_lib_ckpt_checkpointstatusget->ckpt_id);
2608 if (checkpoint) {
2610 for (checkpoint_section_list = checkpoint->sections_list_head.next;
2611 checkpoint_section_list != &checkpoint->sections_list_head;
2612 checkpoint_section_list = checkpoint_section_list->next) {
2614 checkpointSection = list_entry (checkpoint_section_list,
2615 struct checkpoint_section, list);
2617 memory_used += checkpointSection->section_descriptor.section_size;
2618 number_of_sections += 1;
2622 * Build checkpoint status get response
2624 res_lib_ckpt_checkpointstatusget.header.size = sizeof (struct res_lib_ckpt_checkpointstatusget);
2625 res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET;
2626 if (checkpoint->active_replica_set == 1) {
2627 res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_OK;
2628 } else {
2629 res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_ERR_NOT_EXIST;
2632 memcpy (&res_lib_ckpt_checkpointstatusget.checkpoint_descriptor.checkpoint_creation_attributes,
2633 &checkpoint->checkpoint_creation_attributes,
2634 sizeof (mar_ckpt_checkpoint_creation_attributes_t));
2635 res_lib_ckpt_checkpointstatusget.checkpoint_descriptor.number_of_sections = number_of_sections;
2636 res_lib_ckpt_checkpointstatusget.checkpoint_descriptor.memory_used = memory_used;
2638 else {
2639 log_printf (LOG_LEVEL_ERROR, "#### Could Not Find the Checkpoint's status so Returning Error. ####\n");
2641 res_lib_ckpt_checkpointstatusget.header.size = sizeof (struct res_lib_ckpt_checkpointstatusget);
2642 res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET;
2643 res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_ERR_NOT_EXIST;
2645 openais_conn_send_response (
2646 conn,
2647 &res_lib_ckpt_checkpointstatusget,
2648 sizeof (struct res_lib_ckpt_checkpointstatusget));
2651 static void message_handler_req_lib_ckpt_sectioncreate (
2652 void *conn,
2653 void *msg)
2655 struct req_lib_ckpt_sectioncreate *req_lib_ckpt_sectioncreate = (struct req_lib_ckpt_sectioncreate *)msg;
2656 struct req_exec_ckpt_sectioncreate req_exec_ckpt_sectioncreate;
2657 struct iovec iovecs[2];
2659 log_printf (LOG_LEVEL_DEBUG, "Section create from conn %p\n", conn);
2661 req_exec_ckpt_sectioncreate.header.id =
2662 SERVICE_ID_MAKE (CKPT_SERVICE,
2663 MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE);
2664 req_exec_ckpt_sectioncreate.header.size = sizeof (struct req_exec_ckpt_sectioncreate);
2666 message_source_set (&req_exec_ckpt_sectioncreate.source, conn);
2668 memcpy (&req_exec_ckpt_sectioncreate.checkpoint_name,
2669 &req_lib_ckpt_sectioncreate->checkpoint_name,
2670 sizeof (mar_name_t));
2671 req_exec_ckpt_sectioncreate.ckpt_id =
2672 req_lib_ckpt_sectioncreate->ckpt_id;
2673 req_exec_ckpt_sectioncreate.id_len = req_lib_ckpt_sectioncreate->id_len;
2674 req_exec_ckpt_sectioncreate.expiration_time =
2675 req_lib_ckpt_sectioncreate->expiration_time;
2676 req_exec_ckpt_sectioncreate.initial_data_size =
2677 req_lib_ckpt_sectioncreate->initial_data_size;
2679 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectioncreate;
2680 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectioncreate);
2683 * Send section name and initial data in message
2685 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectioncreate) + sizeof (struct req_lib_ckpt_sectioncreate);
2686 iovecs[1].iov_len = req_lib_ckpt_sectioncreate->header.size - sizeof (struct req_lib_ckpt_sectioncreate);
2687 req_exec_ckpt_sectioncreate.header.size += iovecs[1].iov_len;
2689 if (iovecs[1].iov_len) {
2690 log_printf (LOG_LEVEL_DEBUG, "message_handler_req_lib_ckpt_sectioncreate Section = %p, id_len = %d\n",
2691 iovecs[1].iov_base,
2692 (int)iovecs[1].iov_len);
2695 if (iovecs[1].iov_len > 0) {
2696 log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base);
2697 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2698 } else {
2699 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2703 static void message_handler_req_lib_ckpt_sectiondelete (
2704 void *conn,
2705 void *msg)
2707 struct req_lib_ckpt_sectiondelete *req_lib_ckpt_sectiondelete = (struct req_lib_ckpt_sectiondelete *)msg;
2708 struct req_exec_ckpt_sectiondelete req_exec_ckpt_sectiondelete;
2709 struct iovec iovecs[2];
2711 log_printf (LOG_LEVEL_DEBUG, "section delete from conn %p\n", conn);
2713 req_exec_ckpt_sectiondelete.header.id =
2714 SERVICE_ID_MAKE (CKPT_SERVICE,
2715 MESSAGE_REQ_EXEC_CKPT_SECTIONDELETE);
2716 req_exec_ckpt_sectiondelete.header.size = sizeof (struct req_exec_ckpt_sectiondelete);
2718 message_source_set (&req_exec_ckpt_sectiondelete.source, conn);
2720 memcpy (&req_exec_ckpt_sectiondelete.checkpoint_name,
2721 &req_lib_ckpt_sectiondelete->checkpoint_name,
2722 sizeof (mar_name_t));
2723 req_exec_ckpt_sectiondelete.ckpt_id =
2724 req_lib_ckpt_sectiondelete->ckpt_id;
2725 req_exec_ckpt_sectiondelete.id_len = req_lib_ckpt_sectiondelete->id_len;
2727 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectiondelete;
2728 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectiondelete);
2731 * Send section name
2733 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectiondelete) +
2734 sizeof (struct req_lib_ckpt_sectiondelete);
2735 iovecs[1].iov_len = req_lib_ckpt_sectiondelete->header.size -
2736 sizeof (struct req_lib_ckpt_sectiondelete);
2737 req_exec_ckpt_sectiondelete.header.size += iovecs[1].iov_len;
2739 if (iovecs[1].iov_len > 0) {
2740 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2741 } else {
2742 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2746 static void message_handler_req_lib_ckpt_sectionexpirationtimeset (
2747 void *conn,
2748 void *msg)
2750 struct req_lib_ckpt_sectionexpirationtimeset *req_lib_ckpt_sectionexpirationtimeset = (struct req_lib_ckpt_sectionexpirationtimeset *)msg;
2751 struct req_exec_ckpt_sectionexpirationtimeset req_exec_ckpt_sectionexpirationtimeset;
2752 struct iovec iovecs[2];
2754 log_printf (LOG_LEVEL_DEBUG, "section expiration time set from conn %p\n", conn);
2755 req_exec_ckpt_sectionexpirationtimeset.header.id =
2756 SERVICE_ID_MAKE (CKPT_SERVICE,
2757 MESSAGE_REQ_EXEC_CKPT_SECTIONEXPIRATIONTIMESET);
2758 req_exec_ckpt_sectionexpirationtimeset.header.size = sizeof (struct req_exec_ckpt_sectionexpirationtimeset);
2760 message_source_set (&req_exec_ckpt_sectionexpirationtimeset.source, conn);
2762 memcpy (&req_exec_ckpt_sectionexpirationtimeset.checkpoint_name,
2763 &req_lib_ckpt_sectionexpirationtimeset->checkpoint_name,
2764 sizeof (mar_name_t));
2765 req_exec_ckpt_sectionexpirationtimeset.ckpt_id =
2766 req_lib_ckpt_sectionexpirationtimeset->ckpt_id;
2767 req_exec_ckpt_sectionexpirationtimeset.id_len =
2768 req_lib_ckpt_sectionexpirationtimeset->id_len;
2769 req_exec_ckpt_sectionexpirationtimeset.expiration_time =
2770 req_lib_ckpt_sectionexpirationtimeset->expiration_time;
2772 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionexpirationtimeset;
2773 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionexpirationtimeset);
2776 * Send section name
2778 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionexpirationtimeset) +
2779 sizeof (struct req_lib_ckpt_sectionexpirationtimeset);
2780 iovecs[1].iov_len = req_lib_ckpt_sectionexpirationtimeset->header.size -
2781 sizeof (struct req_lib_ckpt_sectionexpirationtimeset);
2782 req_exec_ckpt_sectionexpirationtimeset.header.size += iovecs[1].iov_len;
2784 if (iovecs[1].iov_len > 0) {
2785 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2786 } else {
2787 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2791 static void message_handler_req_lib_ckpt_sectionwrite (
2792 void *conn,
2793 void *msg)
2795 struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)msg;
2796 struct req_exec_ckpt_sectionwrite req_exec_ckpt_sectionwrite;
2797 struct iovec iovecs[2];
2799 log_printf (LOG_LEVEL_DEBUG, "Received data from lib with len = %d and ref = 0x%lx\n",
2800 (int)req_lib_ckpt_sectionwrite->data_size,
2801 (long)req_lib_ckpt_sectionwrite->data_offset);
2803 log_printf (LOG_LEVEL_DEBUG, "Checkpoint section being written to is %s, id_len = %d\n",
2804 ((char *)req_lib_ckpt_sectionwrite) +
2805 sizeof (struct req_lib_ckpt_sectionwrite),
2806 req_lib_ckpt_sectionwrite->id_len);
2808 log_printf (LOG_LEVEL_DEBUG, "Section write from conn %p\n", conn);
2811 * checkpoint opened is writeable mode so send message to cluster
2813 req_exec_ckpt_sectionwrite.header.id =
2814 SERVICE_ID_MAKE (CKPT_SERVICE,
2815 MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE);
2816 req_exec_ckpt_sectionwrite.header.size =
2817 sizeof (struct req_exec_ckpt_sectionwrite);
2819 message_source_set (&req_exec_ckpt_sectionwrite.source, conn);
2821 memcpy (&req_exec_ckpt_sectionwrite.checkpoint_name,
2822 &req_lib_ckpt_sectionwrite->checkpoint_name,
2823 sizeof (mar_name_t));
2824 req_exec_ckpt_sectionwrite.ckpt_id =
2825 req_lib_ckpt_sectionwrite->ckpt_id;
2826 req_exec_ckpt_sectionwrite.id_len =
2827 req_lib_ckpt_sectionwrite->id_len;
2828 req_exec_ckpt_sectionwrite.data_offset =
2829 req_lib_ckpt_sectionwrite->data_offset;
2830 req_exec_ckpt_sectionwrite.data_size =
2831 req_lib_ckpt_sectionwrite->data_size;
2833 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionwrite;
2834 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionwrite);
2837 * Send section name and data to write in message
2839 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionwrite) +
2840 sizeof (struct req_lib_ckpt_sectionwrite);
2841 iovecs[1].iov_len = req_lib_ckpt_sectionwrite->header.size -
2842 sizeof (struct req_lib_ckpt_sectionwrite);
2843 req_exec_ckpt_sectionwrite.header.size += iovecs[1].iov_len;
2845 if (iovecs[1].iov_len > 0) {
2846 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2847 } else {
2848 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2852 static void message_handler_req_lib_ckpt_sectionoverwrite (
2853 void *conn,
2854 void *msg)
2856 struct req_lib_ckpt_sectionoverwrite *req_lib_ckpt_sectionoverwrite = (struct req_lib_ckpt_sectionoverwrite *)msg;
2857 struct req_exec_ckpt_sectionoverwrite req_exec_ckpt_sectionoverwrite;
2858 struct iovec iovecs[2];
2860 log_printf (LOG_LEVEL_DEBUG, "Section overwrite from conn %p\n", conn);
2862 req_exec_ckpt_sectionoverwrite.header.id =
2863 SERVICE_ID_MAKE (CKPT_SERVICE,
2864 MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE);
2865 req_exec_ckpt_sectionoverwrite.header.size =
2866 sizeof (struct req_exec_ckpt_sectionoverwrite);
2867 message_source_set (&req_exec_ckpt_sectionoverwrite.source, conn);
2868 memcpy (&req_exec_ckpt_sectionoverwrite.checkpoint_name,
2869 &req_lib_ckpt_sectionoverwrite->checkpoint_name,
2870 sizeof (mar_name_t));
2871 req_exec_ckpt_sectionoverwrite.ckpt_id =
2872 req_lib_ckpt_sectionoverwrite->ckpt_id;
2873 req_exec_ckpt_sectionoverwrite.id_len =
2874 req_lib_ckpt_sectionoverwrite->id_len;
2875 req_exec_ckpt_sectionoverwrite.data_size =
2876 req_lib_ckpt_sectionoverwrite->data_size;
2878 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionoverwrite;
2879 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionoverwrite);
2882 * Send section name and data to overwrite in message
2884 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionoverwrite) +
2885 sizeof (struct req_lib_ckpt_sectionoverwrite);
2886 iovecs[1].iov_len = req_lib_ckpt_sectionoverwrite->header.size -
2887 sizeof (struct req_lib_ckpt_sectionoverwrite);
2888 req_exec_ckpt_sectionoverwrite.header.size += iovecs[1].iov_len;
2890 if (iovecs[1].iov_len > 0) {
2891 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2892 } else {
2893 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2897 static void message_handler_req_lib_ckpt_sectionread (
2898 void *conn,
2899 void *msg)
2901 struct req_lib_ckpt_sectionread *req_lib_ckpt_sectionread = (struct req_lib_ckpt_sectionread *)msg;
2902 struct req_exec_ckpt_sectionread req_exec_ckpt_sectionread;
2903 struct iovec iovecs[2];
2905 log_printf (LOG_LEVEL_DEBUG, "Section read from conn %p\n", conn);
2908 * checkpoint opened is writeable mode so send message to cluster
2910 req_exec_ckpt_sectionread.header.id =
2911 SERVICE_ID_MAKE (CKPT_SERVICE,
2912 MESSAGE_REQ_EXEC_CKPT_SECTIONREAD);
2913 req_exec_ckpt_sectionread.header.size =
2914 sizeof (struct req_exec_ckpt_sectionread);
2915 message_source_set (&req_exec_ckpt_sectionread.source, conn);
2916 memcpy (&req_exec_ckpt_sectionread.checkpoint_name,
2917 &req_lib_ckpt_sectionread->checkpoint_name,
2918 sizeof (mar_name_t));
2919 req_exec_ckpt_sectionread.ckpt_id =
2920 req_lib_ckpt_sectionread->ckpt_id;
2921 req_exec_ckpt_sectionread.id_len =
2922 req_lib_ckpt_sectionread->id_len;
2923 req_exec_ckpt_sectionread.data_offset =
2924 req_lib_ckpt_sectionread->data_offset;
2925 req_exec_ckpt_sectionread.data_size =
2926 req_lib_ckpt_sectionread->data_size;
2928 iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionread;
2929 iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionread);
2932 * Send section name and data to overwrite in message
2934 iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionread) +
2935 sizeof (struct req_lib_ckpt_sectionread);
2936 iovecs[1].iov_len = req_lib_ckpt_sectionread->header.size -
2937 sizeof (struct req_lib_ckpt_sectionread);
2938 req_exec_ckpt_sectionread.header.size += iovecs[1].iov_len;
2940 if (iovecs[1].iov_len > 0) {
2941 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
2942 } else {
2943 assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
2947 static void message_handler_req_lib_ckpt_checkpointsynchronize (
2948 void *conn,
2949 void *msg)
2951 struct req_lib_ckpt_checkpointsynchronize *req_lib_ckpt_checkpointsynchronize = (struct req_lib_ckpt_checkpointsynchronize *)msg;
2952 struct res_lib_ckpt_checkpointsynchronize res_lib_ckpt_checkpointsynchronize;
2953 struct checkpoint *checkpoint;
2955 checkpoint = checkpoint_find (
2956 &checkpoint_list_head,
2957 &req_lib_ckpt_checkpointsynchronize->checkpoint_name,
2958 req_lib_ckpt_checkpointsynchronize->ckpt_id);
2959 if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) {
2960 res_lib_ckpt_checkpointsynchronize.header.error = SA_AIS_ERR_BAD_OPERATION;
2961 } else
2962 if (checkpoint->active_replica_set == 1) {
2963 res_lib_ckpt_checkpointsynchronize.header.error = SA_AIS_OK;
2964 } else {
2965 res_lib_ckpt_checkpointsynchronize.header.error = SA_AIS_ERR_NOT_EXIST;
2968 res_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct res_lib_ckpt_checkpointsynchronize);
2969 res_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
2971 openais_conn_send_response (
2972 conn,
2973 &res_lib_ckpt_checkpointsynchronize,
2974 sizeof (struct res_lib_ckpt_checkpointsynchronize));
2977 static void message_handler_req_lib_ckpt_checkpointsynchronizeasync (
2978 void *conn,
2979 void *msg)
2981 struct req_lib_ckpt_checkpointsynchronizeasync *req_lib_ckpt_checkpointsynchronizeasync = (struct req_lib_ckpt_checkpointsynchronizeasync *)msg;
2982 struct res_lib_ckpt_checkpointsynchronizeasync res_lib_ckpt_checkpointsynchronizeasync;
2983 struct checkpoint *checkpoint;
2985 checkpoint = checkpoint_find (
2986 &checkpoint_list_head,
2987 &req_lib_ckpt_checkpointsynchronizeasync->checkpoint_name,
2988 req_lib_ckpt_checkpointsynchronizeasync->ckpt_id);
2989 if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) {
2990 res_lib_ckpt_checkpointsynchronizeasync.header.error = SA_AIS_ERR_BAD_OPERATION;
2991 } else
2992 if (checkpoint->active_replica_set == 1) {
2993 res_lib_ckpt_checkpointsynchronizeasync.header.error = SA_AIS_OK;
2994 } else {
2995 res_lib_ckpt_checkpointsynchronizeasync.header.error = SA_AIS_ERR_NOT_EXIST;
2998 res_lib_ckpt_checkpointsynchronizeasync.header.size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync);
2999 res_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC;
3000 res_lib_ckpt_checkpointsynchronizeasync.invocation = req_lib_ckpt_checkpointsynchronizeasync->invocation;
3002 openais_conn_send_response (
3003 conn,
3004 &res_lib_ckpt_checkpointsynchronizeasync,
3005 sizeof (struct res_lib_ckpt_checkpointsynchronizeasync));
3007 openais_conn_send_response (
3008 openais_conn_partner_get (conn),
3009 &res_lib_ckpt_checkpointsynchronizeasync,
3010 sizeof (struct res_lib_ckpt_checkpointsynchronizeasync));
3013 static void message_handler_req_lib_ckpt_sectioniterationinitialize (
3014 void *conn,
3015 void *msg)
3017 struct req_lib_ckpt_sectioniterationinitialize *req_lib_ckpt_sectioniterationinitialize = (struct req_lib_ckpt_sectioniterationinitialize *)msg;
3018 struct res_lib_ckpt_sectioniterationinitialize res_lib_ckpt_sectioniterationinitialize;
3019 struct checkpoint *checkpoint;
3020 struct checkpoint_section *checkpoint_section;
3021 struct iteration_entry *iteration_entries;
3022 struct list_head *section_list;
3023 struct iteration_instance *iteration_instance;
3024 void *iteration_instance_p;
3025 unsigned int iteration_handle = 0;
3026 int res;
3027 SaAisErrorT error = SA_AIS_OK;
3029 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
3031 log_printf (LOG_LEVEL_DEBUG, "section iteration initialize\n");
3033 checkpoint = checkpoint_find (
3034 &checkpoint_list_head,
3035 &req_lib_ckpt_sectioniterationinitialize->checkpoint_name,
3036 req_lib_ckpt_sectioniterationinitialize->ckpt_id);
3037 if (checkpoint == 0) {
3038 error = SA_AIS_ERR_NOT_EXIST;
3039 goto error_exit;
3042 if (checkpoint->active_replica_set == 0) {
3043 log_printf (LOG_LEVEL_DEBUG, "iterationinitialize: no active replica, returning error.\n");
3044 error = SA_AIS_ERR_NOT_EXIST;
3045 goto error_exit;
3048 res = hdb_handle_create (&ckpt_pd->iteration_hdb, sizeof(struct iteration_instance),
3049 &iteration_handle);
3050 if (res != 0) {
3051 goto error_exit;
3054 res = hdb_handle_get (&ckpt_pd->iteration_hdb, iteration_handle,
3055 &iteration_instance_p);
3056 if (res != 0) {
3057 hdb_handle_destroy (&ckpt_pd->iteration_hdb, iteration_handle);
3058 goto error_exit;
3060 iteration_instance = (struct iteration_instance *)iteration_instance_p;
3062 iteration_instance->iteration_entries = NULL;
3063 iteration_instance->iteration_entries_count = 0;
3064 iteration_instance->iteration_pos = 0;
3066 memcpy (&iteration_instance->checkpoint_name,
3067 &req_lib_ckpt_sectioniterationinitialize->checkpoint_name,
3068 sizeof (mar_name_t));
3069 iteration_instance->ckpt_id =
3070 req_lib_ckpt_sectioniterationinitialize->ckpt_id;
3073 * Iterate list of checkpoint sections
3075 for (section_list = checkpoint->sections_list_head.next;
3076 section_list != &checkpoint->sections_list_head;
3077 section_list = section_list->next) {
3079 checkpoint_section = list_entry (section_list,
3080 struct checkpoint_section, list);
3082 switch (req_lib_ckpt_sectioniterationinitialize->sections_chosen) {
3083 case SA_CKPT_SECTIONS_FOREVER:
3084 if (checkpoint_section->section_descriptor.expiration_time != SA_TIME_END) {
3085 continue;
3087 break;
3088 case SA_CKPT_SECTIONS_LEQ_EXPIRATION_TIME:
3089 if (checkpoint_section->section_descriptor.expiration_time > req_lib_ckpt_sectioniterationinitialize->expiration_time) {
3090 continue;
3092 break;
3093 case SA_CKPT_SECTIONS_GEQ_EXPIRATION_TIME:
3094 if (checkpoint_section->section_descriptor.expiration_time < req_lib_ckpt_sectioniterationinitialize->expiration_time) {
3095 continue;
3097 break;
3098 case SA_CKPT_SECTIONS_CORRUPTED:
3099 /* there can be no corrupted sections - do nothing */
3100 break;
3101 case SA_CKPT_SECTIONS_ANY:
3102 /* iterate all sections - do nothing */
3103 break;
3105 iteration_entries = realloc (
3106 iteration_instance->iteration_entries,
3107 sizeof (struct iteration_entry) *
3108 (iteration_instance->iteration_entries_count + 1));
3109 if (iteration_entries == NULL) {
3110 error = SA_AIS_ERR_NO_MEMORY;
3111 goto error_put;
3113 iteration_instance->iteration_entries = iteration_entries;
3115 iteration_entries[iteration_instance->iteration_entries_count].section_id =
3116 malloc (checkpoint_section->section_descriptor.section_id.id_len);
3117 assert (iteration_entries[iteration_instance->iteration_entries_count].section_id);
3118 memcpy (iteration_entries[iteration_instance->iteration_entries_count].section_id,
3119 checkpoint_section->section_descriptor.section_id.id,
3120 checkpoint_section->section_descriptor.section_id.id_len);
3121 iteration_entries[iteration_instance->iteration_entries_count].section_id_len = checkpoint_section->section_descriptor.section_id.id_len;
3122 iteration_instance->iteration_entries_count += 1;
3126 error_put:
3127 hdb_handle_put (&ckpt_pd->iteration_hdb, iteration_handle);
3129 error_exit:
3130 res_lib_ckpt_sectioniterationinitialize.header.size = sizeof (struct res_lib_ckpt_sectioniterationinitialize);
3131 res_lib_ckpt_sectioniterationinitialize.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONINITIALIZE;
3132 res_lib_ckpt_sectioniterationinitialize.header.error = error;
3133 res_lib_ckpt_sectioniterationinitialize.iteration_handle = iteration_handle;
3134 res_lib_ckpt_sectioniterationinitialize.max_section_id_size =
3135 checkpoint->checkpoint_creation_attributes.max_section_id_size;
3137 openais_conn_send_response (
3138 conn,
3139 &res_lib_ckpt_sectioniterationinitialize,
3140 sizeof (struct res_lib_ckpt_sectioniterationinitialize));
3143 static void message_handler_req_lib_ckpt_sectioniterationfinalize (
3144 void *conn,
3145 void *msg)
3147 struct req_lib_ckpt_sectioniterationfinalize *req_lib_ckpt_sectioniterationfinalize = (struct req_lib_ckpt_sectioniterationfinalize *)msg;
3148 struct res_lib_ckpt_sectioniterationfinalize res_lib_ckpt_sectioniterationfinalize;
3149 SaAisErrorT error = SA_AIS_OK;
3150 struct iteration_instance *iteration_instance;
3151 void *iteration_instance_p;
3152 unsigned int res;
3154 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
3156 res = hdb_handle_get (&ckpt_pd->iteration_hdb,
3157 req_lib_ckpt_sectioniterationfinalize->iteration_handle,
3158 &iteration_instance_p);
3159 if (res != 0) {
3160 error = SA_AIS_ERR_LIBRARY;
3161 goto error_exit;
3163 iteration_instance = (struct iteration_instance *)iteration_instance_p;
3165 free (iteration_instance->iteration_entries);
3167 hdb_handle_put (&ckpt_pd->iteration_hdb,
3168 req_lib_ckpt_sectioniterationfinalize->iteration_handle);
3170 hdb_handle_destroy (&ckpt_pd->iteration_hdb,
3171 req_lib_ckpt_sectioniterationfinalize->iteration_handle);
3173 error_exit:
3174 res_lib_ckpt_sectioniterationfinalize.header.size = sizeof (struct res_lib_ckpt_sectioniterationfinalize);
3175 res_lib_ckpt_sectioniterationfinalize.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONFINALIZE;
3176 res_lib_ckpt_sectioniterationfinalize.header.error = error;
3178 openais_conn_send_response (
3179 conn,
3180 &res_lib_ckpt_sectioniterationfinalize,
3181 sizeof (struct res_lib_ckpt_sectioniterationfinalize));
3184 static void message_handler_req_lib_ckpt_sectioniterationnext (
3185 void *conn,
3186 void *msg)
3188 struct req_lib_ckpt_sectioniterationnext *req_lib_ckpt_sectioniterationnext = (struct req_lib_ckpt_sectioniterationnext *)msg;
3189 struct res_lib_ckpt_sectioniterationnext res_lib_ckpt_sectioniterationnext;
3190 SaAisErrorT error = SA_AIS_OK;
3191 int section_id_size = 0;
3192 unsigned int res;
3193 struct iteration_instance *iteration_instance = NULL;
3194 void *iteration_instance_p;
3195 struct checkpoint *checkpoint;
3196 struct checkpoint_section *checkpoint_section = NULL;
3198 struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);
3200 log_printf (LOG_LEVEL_DEBUG, "section iteration next\n");
3201 res = hdb_handle_get (&ckpt_pd->iteration_hdb,
3202 req_lib_ckpt_sectioniterationnext->iteration_handle,
3203 &iteration_instance_p);
3204 if (res != 0) {
3205 error = SA_AIS_ERR_LIBRARY;
3206 goto error_exit;
3209 iteration_instance = (struct iteration_instance *)iteration_instance_p;
3210 assert (iteration_instance);
3212 * Find active iteration entry
3214 for (;;) {
3216 * No more sections in iteration
3218 if (iteration_instance->iteration_pos == iteration_instance->iteration_entries_count) {
3219 error = SA_AIS_ERR_NO_SECTIONS;
3220 goto error_put;
3224 * Find the checkpoint section to respond to library
3226 checkpoint = checkpoint_find_specific (
3227 &checkpoint_list_head,
3228 &iteration_instance->checkpoint_name,
3229 iteration_instance->ckpt_id);
3231 assert (checkpoint);
3233 checkpoint_section = checkpoint_section_find (
3234 checkpoint,
3235 iteration_instance->iteration_entries[iteration_instance->iteration_pos].section_id,
3236 iteration_instance->iteration_entries[iteration_instance->iteration_pos].section_id_len);
3239 iteration_instance->iteration_pos += 1;
3241 * If checkpoint section found, then return it in iteration
3243 if (checkpoint_section) {
3244 section_id_size = checkpoint_section->section_descriptor.section_id.id_len;
3246 memcpy (&res_lib_ckpt_sectioniterationnext.section_descriptor,
3247 &checkpoint_section->section_descriptor,
3248 sizeof (mar_ckpt_section_descriptor_t));
3251 * This drops out of for loop
3253 break;
3258 error_put:
3259 hdb_handle_put (&ckpt_pd->iteration_hdb, req_lib_ckpt_sectioniterationnext->iteration_handle);
3261 error_exit:
3262 res_lib_ckpt_sectioniterationnext.header.size = sizeof (struct res_lib_ckpt_sectioniterationnext) + section_id_size;
3263 res_lib_ckpt_sectioniterationnext.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONNEXT;
3264 res_lib_ckpt_sectioniterationnext.header.error = error;
3266 openais_conn_send_response (
3267 conn,
3268 &res_lib_ckpt_sectioniterationnext,
3269 sizeof (struct res_lib_ckpt_sectioniterationnext));
3271 if (error == SA_AIS_OK) {
3272 openais_conn_send_response (
3273 conn,
3274 checkpoint_section->section_descriptor.section_id.id,
3275 checkpoint_section->section_descriptor.section_id.id_len);
3280 * Recovery after network partition or merge
3282 void sync_refcount_increment (
3283 struct checkpoint *checkpoint,
3284 unsigned int nodeid)
3286 unsigned int i;
3288 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
3289 if (checkpoint->refcount_set[i].nodeid == 0) {
3290 checkpoint->refcount_set[i].nodeid = nodeid;
3291 checkpoint->refcount_set[i].refcount = 1;
3292 break;
3294 if (checkpoint->refcount_set[i].nodeid == nodeid) {
3295 checkpoint->refcount_set[i].refcount += 1;
3296 break;
3301 void sync_refcount_add (
3302 struct checkpoint *checkpoint,
3303 unsigned int nodeid,
3304 unsigned int count)
3306 unsigned int i;
3308 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
3309 if (checkpoint->refcount_set[i].nodeid == 0) {
3310 checkpoint->refcount_set[i].nodeid = nodeid;
3311 checkpoint->refcount_set[i].refcount = count;
3312 break;
3314 if (checkpoint->refcount_set[i].nodeid == nodeid) {
3315 checkpoint->refcount_set[i].refcount += count;
3316 break;
3321 void sync_refcount_decrement (
3322 struct checkpoint *checkpoint,
3323 unsigned int nodeid)
3325 unsigned int i;
3327 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
3328 if (checkpoint->refcount_set[i].nodeid == 0) {
3329 break;
3331 if (checkpoint->refcount_set[i].nodeid == nodeid) {
3332 checkpoint->refcount_set[i].refcount -= 1;
3333 break;
3339 * Sum all reference counts for the checkpoint
3341 void sync_refcount_calculate (
3342 struct checkpoint *checkpoint)
3344 checkpoint->reference_count = 0;
3345 unsigned int i;
3347 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
3348 if (checkpoint->refcount_set[i].nodeid == 0) {
3349 break;
3352 checkpoint->reference_count += checkpoint->refcount_set[i].refcount;
3356 void sync_checkpoints_free (struct list_head *ckpt_list_head)
3358 struct checkpoint *checkpoint;
3359 struct list_head *list;
3361 list = ckpt_list_head->next;
3362 while (list != ckpt_list_head) {
3363 checkpoint = list_entry (list, struct checkpoint, list);
3364 list = list->next;
3365 checkpoint_release (checkpoint);
3367 list_init (ckpt_list_head);
3370 static inline void sync_checkpoints_enter (void)
3372 struct checkpoint *checkpoint;
3374 ENTER();
3376 my_sync_state = SYNC_STATE_CHECKPOINT;
3377 my_iteration_state = ITERATION_STATE_CHECKPOINT;
3378 my_iteration_state_checkpoint = checkpoint_list_head.next;
3380 checkpoint = list_entry (checkpoint_list_head.next, struct checkpoint,
3381 list);
3382 my_iteration_state_section = checkpoint->sections_list_head.next;
3384 LEAVE();
3387 static inline void sync_refcounts_enter (void)
3389 my_sync_state = SYNC_STATE_REFCOUNT;
3392 static void ckpt_sync_init (void)
3394 ENTER();
3396 sync_checkpoints_enter();
3398 LEAVE();
3401 static int sync_checkpoint_transmit (struct checkpoint *checkpoint)
3403 struct req_exec_ckpt_sync_checkpoint req_exec_ckpt_sync_checkpoint;
3404 struct iovec iovec;
3406 req_exec_ckpt_sync_checkpoint.header.size =
3407 sizeof (struct req_exec_ckpt_sync_checkpoint);
3408 req_exec_ckpt_sync_checkpoint.header.id =
3409 SERVICE_ID_MAKE (CKPT_SERVICE,
3410 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINT);
3412 memcpy (&req_exec_ckpt_sync_checkpoint.ring_id,
3413 &my_saved_ring_id, sizeof (struct memb_ring_id));
3415 memcpy (&req_exec_ckpt_sync_checkpoint.checkpoint_name,
3416 &checkpoint->name, sizeof (mar_name_t));
3418 req_exec_ckpt_sync_checkpoint.ckpt_id = checkpoint->ckpt_id;
3420 memcpy (&req_exec_ckpt_sync_checkpoint.checkpoint_creation_attributes,
3421 &checkpoint->checkpoint_creation_attributes,
3422 sizeof (mar_ckpt_checkpoint_creation_attributes_t));
3424 req_exec_ckpt_sync_checkpoint.active_replica_set =
3425 checkpoint->active_replica_set;
3427 req_exec_ckpt_sync_checkpoint.unlinked =
3428 checkpoint->unlinked;
3430 iovec.iov_base = (char *)&req_exec_ckpt_sync_checkpoint;
3431 iovec.iov_len = sizeof (req_exec_ckpt_sync_checkpoint);
3433 return (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED));
3436 static int sync_checkpoint_section_transmit (
3437 struct checkpoint *checkpoint,
3438 struct checkpoint_section *checkpoint_section)
3440 struct req_exec_ckpt_sync_checkpoint_section req_exec_ckpt_sync_checkpoint_section;
3441 struct iovec iovecs[3];
3443 ENTER();
3445 TRACE1 ("transmitting section\n");
3446 req_exec_ckpt_sync_checkpoint_section.header.size =
3447 sizeof (struct req_exec_ckpt_sync_checkpoint_section);
3448 req_exec_ckpt_sync_checkpoint_section.header.id =
3449 SERVICE_ID_MAKE (CKPT_SERVICE,
3450 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTSECTION);
3452 memcpy (&req_exec_ckpt_sync_checkpoint_section.ring_id,
3453 &my_saved_ring_id, sizeof (struct memb_ring_id));
3455 memcpy (&req_exec_ckpt_sync_checkpoint_section.checkpoint_name,
3456 &checkpoint->name, sizeof (mar_name_t));
3458 req_exec_ckpt_sync_checkpoint_section.ckpt_id = checkpoint->ckpt_id;
3460 req_exec_ckpt_sync_checkpoint_section.id_len =
3461 checkpoint_section->section_descriptor.section_id.id_len;
3463 req_exec_ckpt_sync_checkpoint_section.section_size =
3464 checkpoint_section->section_descriptor.section_size;
3466 req_exec_ckpt_sync_checkpoint_section.section_size =
3467 checkpoint_section->section_descriptor.section_size;
3469 req_exec_ckpt_sync_checkpoint_section.expiration_time =
3470 checkpoint_section->section_descriptor.expiration_time;
3472 iovecs[0].iov_base = (char *)&req_exec_ckpt_sync_checkpoint_section;
3473 iovecs[0].iov_len = sizeof (req_exec_ckpt_sync_checkpoint_section);
3474 iovecs[1].iov_base = (char *)checkpoint_section->section_descriptor.section_id.id;
3475 iovecs[1].iov_len = checkpoint_section->section_descriptor.section_id.id_len;
3476 iovecs[2].iov_base = checkpoint_section->section_data;
3477 iovecs[2].iov_len = checkpoint_section->section_descriptor.section_size;
3479 LEAVE();
3480 return (totempg_groups_mcast_joined (openais_group_handle, iovecs, 3, TOTEMPG_AGREED));
3483 static int sync_checkpoint_refcount_transmit (
3484 struct checkpoint *checkpoint)
3486 struct req_exec_ckpt_sync_checkpoint_refcount req_exec_ckpt_sync_checkpoint_refcount;
3487 struct iovec iovec;
3489 ENTER();
3491 TRACE1 ("transmitting refcounts for checkpoints\n");
3492 req_exec_ckpt_sync_checkpoint_refcount.header.size =
3493 sizeof (struct req_exec_ckpt_sync_checkpoint_refcount);
3494 req_exec_ckpt_sync_checkpoint_refcount.header.id =
3495 SERVICE_ID_MAKE (CKPT_SERVICE,
3496 MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTREFCOUNT);
3498 memcpy (&req_exec_ckpt_sync_checkpoint_refcount.ring_id,
3499 &my_saved_ring_id, sizeof (struct memb_ring_id));
3501 memcpy (&req_exec_ckpt_sync_checkpoint_refcount.checkpoint_name,
3502 &checkpoint->name, sizeof (mar_name_t));
3504 req_exec_ckpt_sync_checkpoint_refcount.ckpt_id = checkpoint->ckpt_id;
3506 marshall_to_mar_refcount_set_t_all (
3507 req_exec_ckpt_sync_checkpoint_refcount.refcount_set,
3508 checkpoint->refcount_set);
3510 iovec.iov_base = (char *)&req_exec_ckpt_sync_checkpoint_refcount;
3511 iovec.iov_len = sizeof (struct req_exec_ckpt_sync_checkpoint_refcount);
3513 LEAVE();
3514 return (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED));
3517 unsigned int sync_checkpoints_iterate (void)
3519 struct checkpoint *checkpoint;
3520 struct checkpoint_section *checkpoint_section;
3521 struct list_head *checkpoint_list;
3522 struct list_head *section_list;
3523 unsigned int res = 0;
3525 for (checkpoint_list = checkpoint_list_head.next;
3526 checkpoint_list != &checkpoint_list_head;
3527 checkpoint_list = checkpoint_list->next) {
3529 checkpoint = list_entry (checkpoint_list, struct checkpoint, list);
3531 res = sync_checkpoint_transmit (checkpoint);
3532 if (res != 0) {
3533 break;
3535 for (section_list = checkpoint->sections_list_head.next;
3536 section_list != &checkpoint->sections_list_head;
3537 section_list = section_list->next) {
3539 checkpoint_section = list_entry (section_list, struct checkpoint_section, list);
3540 res = sync_checkpoint_section_transmit (checkpoint, checkpoint_section);
3543 return (res);
3546 unsigned int sync_refcounts_iterate (void)
3548 struct checkpoint *checkpoint;
3549 struct list_head *list;
3550 unsigned int res = 0;
3552 for (list = checkpoint_list_head.next;
3553 list != &checkpoint_list_head;
3554 list = list->next) {
3556 checkpoint = list_entry (list, struct checkpoint, list);
3558 res = sync_checkpoint_refcount_transmit (checkpoint);
3559 if (res != 0) {
3560 break;
3563 return (res);
3566 static int ckpt_sync_process (void)
3568 unsigned int done_queueing = 1;
3569 unsigned int continue_processing = 0;
3570 unsigned int res;
3572 ENTER();
3574 switch (my_sync_state) {
3575 case SYNC_STATE_CHECKPOINT:
3576 if (my_lowest_nodeid == totempg_my_nodeid_get()) {
3577 TRACE1 ("should transmit checkpoints because lowest member in old configuration.\n");
3578 res = sync_checkpoints_iterate ();
3580 if (res == 0) {
3581 done_queueing = 1;
3584 if (done_queueing) {
3585 sync_refcounts_enter ();
3589 * TODO recover current iteration state
3591 continue_processing = 1;
3592 break;
3594 case SYNC_STATE_REFCOUNT:
3595 done_queueing = 1;
3596 if (my_lowest_nodeid == totempg_my_nodeid_get()) {
3597 TRACE1 ("transmit refcounts because this processor is the lowest member in old configuration.\n");
3598 res = sync_refcounts_iterate ();
3600 if (done_queueing) {
3601 continue_processing = 0;
3603 break;
3606 LEAVE();
3607 return (continue_processing);
3610 static void ckpt_sync_activate (void)
3612 ENTER();
3614 sync_checkpoints_free (&checkpoint_list_head);
3616 list_init (&checkpoint_list_head);
3618 if (!list_empty (&sync_checkpoint_list_head)) {
3619 list_splice (&sync_checkpoint_list_head, &checkpoint_list_head);
3622 list_init (&sync_checkpoint_list_head);
3624 my_sync_state = SYNC_STATE_CHECKPOINT;
3626 LEAVE();
3629 static void ckpt_sync_abort (void)
3631 sync_checkpoints_free (&sync_checkpoint_list_head);
3634 static void message_handler_req_exec_ckpt_sync_checkpoint (
3635 void *message,
3636 unsigned int nodeid)
3638 struct req_exec_ckpt_sync_checkpoint *req_exec_ckpt_sync_checkpoint =
3639 (struct req_exec_ckpt_sync_checkpoint *)message;
3640 struct checkpoint *checkpoint = 0;
3642 ENTER();
3645 * Ignore messages from previous ring ids
3647 if (memcmp (&req_exec_ckpt_sync_checkpoint->ring_id,
3648 &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) {
3649 return;
3652 checkpoint = checkpoint_find_specific (
3653 &sync_checkpoint_list_head,
3654 &req_exec_ckpt_sync_checkpoint->checkpoint_name,
3655 req_exec_ckpt_sync_checkpoint->ckpt_id);
3658 * If checkpoint doesn't exist, create one
3660 if (checkpoint == 0) {
3661 checkpoint = malloc (sizeof (struct checkpoint));
3662 if (checkpoint == 0) {
3663 LEAVE();
3664 openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
3666 memset (checkpoint, 0, sizeof (struct checkpoint));
3668 memcpy (&checkpoint->name,
3669 &req_exec_ckpt_sync_checkpoint->checkpoint_name,
3670 sizeof (mar_name_t));
3672 memcpy (&checkpoint->checkpoint_creation_attributes,
3673 &req_exec_ckpt_sync_checkpoint->checkpoint_creation_attributes,
3674 sizeof (mar_ckpt_checkpoint_creation_attributes_t));
3676 memset (&checkpoint->refcount_set, 0, sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX);
3677 checkpoint->ckpt_id = req_exec_ckpt_sync_checkpoint->ckpt_id;
3679 checkpoint->active_replica_set = req_exec_ckpt_sync_checkpoint->active_replica_set;
3681 checkpoint->unlinked = req_exec_ckpt_sync_checkpoint->unlinked;
3682 checkpoint->reference_count = 0;
3683 checkpoint->retention_timer = 0;
3684 checkpoint->section_count = 0;
3686 list_init (&checkpoint->list);
3687 list_init (&checkpoint->sections_list_head);
3688 list_add (&checkpoint->list, &sync_checkpoint_list_head);
3690 memset (checkpoint->refcount_set, 0,
3691 sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX);
3694 if (checkpoint->ckpt_id >= global_ckpt_id) {
3695 global_ckpt_id = checkpoint->ckpt_id + 1;
3698 LEAVE();
3701 static void message_handler_req_exec_ckpt_sync_checkpoint_section (
3702 void *message,
3703 unsigned int nodeid)
3705 struct req_exec_ckpt_sync_checkpoint_section *req_exec_ckpt_sync_checkpoint_section =
3706 (struct req_exec_ckpt_sync_checkpoint_section *)message;
3707 struct checkpoint *checkpoint;
3708 struct checkpoint_section *checkpoint_section;
3709 char *section_contents;
3710 char *section_id;
3712 ENTER();
3715 * Ignore messages from previous ring ids
3717 if (memcmp (&req_exec_ckpt_sync_checkpoint_section->ring_id,
3718 &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) {
3719 LEAVE();
3720 return;
3723 checkpoint = checkpoint_find_specific (
3724 &sync_checkpoint_list_head,
3725 &req_exec_ckpt_sync_checkpoint_section->checkpoint_name,
3726 req_exec_ckpt_sync_checkpoint_section->ckpt_id);
3728 assert (checkpoint != NULL);
3731 * Determine if user-specified checkpoint section already exists
3733 checkpoint_section = checkpoint_section_find (
3734 checkpoint,
3735 ((char *)req_exec_ckpt_sync_checkpoint_section) +
3736 sizeof (struct req_exec_ckpt_sync_checkpoint_section),
3737 req_exec_ckpt_sync_checkpoint_section->id_len);
3738 if (checkpoint_section == NULL) {
3740 * Allocate checkpoint section
3742 checkpoint_section = malloc (sizeof (struct checkpoint_section));
3743 if (checkpoint_section == 0) {
3744 LEAVE();
3745 openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
3747 section_contents = malloc (req_exec_ckpt_sync_checkpoint_section->section_size);
3748 if (section_contents == 0) {
3749 free (checkpoint_section);
3750 LEAVE();
3751 openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
3753 if (req_exec_ckpt_sync_checkpoint_section->id_len) {
3755 section_id = malloc (req_exec_ckpt_sync_checkpoint_section->id_len + 1);
3756 if (section_id == 0) {
3757 free (checkpoint_section);
3758 free (section_contents);
3759 LEAVE();
3760 openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
3764 * Copy checkpoint section and section ID
3766 memcpy (section_id,
3767 ((char *)req_exec_ckpt_sync_checkpoint_section) +
3768 sizeof (struct req_exec_ckpt_sync_checkpoint_section),
3769 req_exec_ckpt_sync_checkpoint_section->id_len);
3772 * Null terminate the section id for printing purposes
3774 ((char*)(section_id))[req_exec_ckpt_sync_checkpoint_section->id_len] = '\0';
3776 } else {
3778 * Default section
3780 section_id = NULL;
3783 memcpy (section_contents,
3784 ((char *)req_exec_ckpt_sync_checkpoint_section) +
3785 sizeof (struct req_exec_ckpt_sync_checkpoint_section) +
3786 req_exec_ckpt_sync_checkpoint_section->id_len,
3787 req_exec_ckpt_sync_checkpoint_section->section_size);
3790 * Configure checkpoint section
3792 checkpoint_section->section_descriptor.section_id.id = (unsigned char *)section_id;
3793 checkpoint_section->section_descriptor.section_id.id_len =
3794 req_exec_ckpt_sync_checkpoint_section->id_len;
3795 checkpoint_section->section_descriptor.section_size =
3796 req_exec_ckpt_sync_checkpoint_section->section_size;
3797 checkpoint_section->section_descriptor.expiration_time =
3798 req_exec_ckpt_sync_checkpoint_section->expiration_time;
3799 checkpoint_section->section_descriptor.section_state =
3800 SA_CKPT_SECTION_VALID;
3801 checkpoint_section->section_descriptor.last_update = 0; /* TODO current time */
3802 checkpoint_section->section_data = section_contents;
3803 checkpoint_section->expiration_timer = 0;
3806 * Add checkpoint section to checkpoint
3808 list_init (&checkpoint_section->list);
3809 list_add (&checkpoint_section->list,
3810 &checkpoint->sections_list_head);
3811 checkpoint->section_count += 1;
3814 LEAVE();
3817 static void message_handler_req_exec_ckpt_sync_checkpoint_refcount (
3818 void *message,
3819 unsigned int nodeid)
3821 struct req_exec_ckpt_sync_checkpoint_refcount *req_exec_ckpt_sync_checkpoint_refcount
3822 = (struct req_exec_ckpt_sync_checkpoint_refcount *)message;
3823 struct checkpoint *checkpoint;
3824 unsigned int i, j;
3826 ENTER();
3829 * Ignore messages from previous ring ids
3831 if (memcmp (&req_exec_ckpt_sync_checkpoint_refcount->ring_id,
3832 &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) {
3833 LEAVE();
3834 return;
3837 checkpoint = checkpoint_find_specific (
3838 &sync_checkpoint_list_head,
3839 &req_exec_ckpt_sync_checkpoint_refcount->checkpoint_name,
3840 req_exec_ckpt_sync_checkpoint_refcount->ckpt_id);
3842 assert (checkpoint != NULL);
3844 for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
3845 if (req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid == 0) {
3846 break;
3848 for (j = 0; j < PROCESSOR_COUNT_MAX; j++) {
3849 if (checkpoint->refcount_set[j].nodeid == 0) {
3850 checkpoint->refcount_set[j].nodeid =
3851 req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid;
3852 checkpoint->refcount_set[j].refcount =
3853 req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].refcount;
3855 * No match found, added processor reference count
3857 break;
3860 if (req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid == checkpoint->refcount_set[j].nodeid) {
3861 checkpoint->refcount_set[j].refcount +=
3862 req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].refcount;
3864 * Found match, so look at next processor ref count
3866 break;
3871 sync_refcount_calculate (checkpoint);
3873 LEAVE();
3877 static void ckpt_dump_fn (void)
3879 struct list_head *checkpoint_list;
3880 struct checkpoint *checkpoint;
3881 struct list_head *checkpoint_section_list;
3882 struct checkpoint_section *section;
3884 log_printf (LOG_LEVEL_NOTICE,
3885 "========== Checkpoint Information ===========");
3886 log_printf (LOG_LEVEL_NOTICE, "global_ckpt_id: %u", global_ckpt_id);
3888 for (checkpoint_list = checkpoint_list_head.next;
3889 checkpoint_list != &checkpoint_list_head;
3890 checkpoint_list = checkpoint_list->next) {
3892 checkpoint = list_entry (checkpoint_list, struct checkpoint, list);
3894 if (checkpoint == NULL) {
3895 return;
3898 log_printf (LOG_LEVEL_NOTICE, "Checkpoint %s (%d):",
3899 checkpoint->name.value, checkpoint->name.length);
3900 log_printf (LOG_LEVEL_NOTICE, " id: %u", checkpoint->ckpt_id);
3901 log_printf (LOG_LEVEL_NOTICE, " sec cnt: %u", checkpoint->section_count);
3902 log_printf (LOG_LEVEL_NOTICE, " ref cnt: %u", checkpoint->reference_count);
3903 log_printf (LOG_LEVEL_NOTICE, " unlinked: %u", checkpoint->unlinked);
3905 for (checkpoint_section_list = checkpoint->sections_list_head.next;
3906 checkpoint_section_list != &checkpoint->sections_list_head;
3907 checkpoint_section_list = checkpoint_section_list->next) {
3909 section = list_entry (checkpoint_section_list,
3910 struct checkpoint_section, list);
3912 log_printf (LOG_LEVEL_NOTICE, " Section %s (%d)",
3913 section->section_descriptor.section_id.id,
3914 section->section_descriptor.section_id.id_len);
3915 log_printf (LOG_LEVEL_NOTICE, " size: %llu",
3916 section->section_descriptor.section_size);
3917 log_printf (LOG_LEVEL_NOTICE, " state: %u",
3918 section->section_descriptor.section_state);
3919 log_printf (LOG_LEVEL_NOTICE, " exp time: %llu",
3920 section->section_descriptor.expiration_time);