4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context
*client_db_handle(
41 struct ctdb_client_context
*client
,
44 struct ctdb_db_context
*db
;
46 for (db
= client
->db
; db
!= NULL
; db
= db
->next
) {
47 if (strcmp(db_name
, db
->db_name
) == 0) {
55 static bool ctdb_db_persistent(struct ctdb_db_context
*db
)
57 if (db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) {
63 static bool ctdb_db_replicated(struct ctdb_db_context
*db
)
65 if (db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
71 static bool ctdb_db_volatile(struct ctdb_db_context
*db
)
73 if (db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
||
74 db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
80 struct ctdb_set_db_flags_state
{
81 struct tevent_context
*ev
;
82 struct ctdb_client_context
*client
;
83 struct timeval timeout
;
86 bool readonly_done
, sticky_done
;
91 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
);
92 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
);
93 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
);
95 static struct tevent_req
*ctdb_set_db_flags_send(
97 struct tevent_context
*ev
,
98 struct ctdb_client_context
*client
,
99 uint32_t destnode
, struct timeval timeout
,
100 uint32_t db_id
, uint8_t db_flags
)
102 struct tevent_req
*req
, *subreq
;
103 struct ctdb_set_db_flags_state
*state
;
104 struct ctdb_req_control request
;
106 req
= tevent_req_create(mem_ctx
, &state
,
107 struct ctdb_set_db_flags_state
);
112 if (! (db_flags
& (CTDB_DB_FLAGS_READONLY
| CTDB_DB_FLAGS_STICKY
))) {
113 tevent_req_done(req
);
114 return tevent_req_post(req
, ev
);
118 state
->client
= client
;
119 state
->timeout
= timeout
;
120 state
->db_id
= db_id
;
121 state
->db_flags
= db_flags
;
123 ctdb_req_control_get_nodemap(&request
);
124 subreq
= ctdb_client_control_send(state
, ev
, client
, destnode
, timeout
,
126 if (tevent_req_nomem(subreq
, req
)) {
127 return tevent_req_post(req
, ev
);
129 tevent_req_set_callback(subreq
, ctdb_set_db_flags_nodemap_done
, req
);
134 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
)
136 struct tevent_req
*req
= tevent_req_callback_data(
137 subreq
, struct tevent_req
);
138 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
139 req
, struct ctdb_set_db_flags_state
);
140 struct ctdb_req_control request
;
141 struct ctdb_reply_control
*reply
;
142 struct ctdb_node_map
*nodemap
;
146 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
150 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
152 tevent_req_error(req
, ret
);
156 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &nodemap
);
160 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
162 tevent_req_error(req
, ret
);
166 state
->count
= list_of_connected_nodes(nodemap
, CTDB_UNKNOWN_PNN
,
167 state
, &state
->pnn_list
);
168 talloc_free(nodemap
);
169 if (state
->count
<= 0) {
171 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
172 state
->db_id
, state
->count
));
173 tevent_req_error(req
, ENOMEM
);
177 if (state
->db_flags
& CTDB_DB_FLAGS_READONLY
) {
178 ctdb_req_control_set_db_readonly(&request
, state
->db_id
);
179 subreq
= ctdb_client_control_multi_send(
180 state
, state
->ev
, state
->client
,
181 state
->pnn_list
, state
->count
,
182 state
->timeout
, &request
);
183 if (tevent_req_nomem(subreq
, req
)) {
186 tevent_req_set_callback(subreq
,
187 ctdb_set_db_flags_readonly_done
, req
);
189 state
->readonly_done
= true;
192 if (state
->db_flags
& CTDB_DB_FLAGS_STICKY
) {
193 ctdb_req_control_set_db_sticky(&request
, state
->db_id
);
194 subreq
= ctdb_client_control_multi_send(
195 state
, state
->ev
, state
->client
,
196 state
->pnn_list
, state
->count
,
197 state
->timeout
, &request
);
198 if (tevent_req_nomem(subreq
, req
)) {
201 tevent_req_set_callback(subreq
, ctdb_set_db_flags_sticky_done
,
204 state
->sticky_done
= true;
208 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
)
210 struct tevent_req
*req
= tevent_req_callback_data(
211 subreq
, struct tevent_req
);
212 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
213 req
, struct ctdb_set_db_flags_state
);
217 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
222 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
224 tevent_req_error(req
, ret
);
228 state
->readonly_done
= true;
230 if (state
->readonly_done
&& state
->sticky_done
) {
231 tevent_req_done(req
);
235 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
)
237 struct tevent_req
*req
= tevent_req_callback_data(
238 subreq
, struct tevent_req
);
239 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
240 req
, struct ctdb_set_db_flags_state
);
244 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
249 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
251 tevent_req_error(req
, ret
);
255 state
->sticky_done
= true;
257 if (state
->readonly_done
&& state
->sticky_done
) {
258 tevent_req_done(req
);
262 static bool ctdb_set_db_flags_recv(struct tevent_req
*req
, int *perr
)
266 if (tevent_req_is_unix_error(req
, &err
)) {
275 struct ctdb_attach_state
{
276 struct tevent_context
*ev
;
277 struct ctdb_client_context
*client
;
278 struct timeval timeout
;
281 struct ctdb_db_context
*db
;
284 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
);
285 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
);
286 static void ctdb_attach_health_done(struct tevent_req
*subreq
);
287 static void ctdb_attach_flags_done(struct tevent_req
*subreq
);
288 static void ctdb_attach_open_flags_done(struct tevent_req
*subreq
);
290 struct tevent_req
*ctdb_attach_send(TALLOC_CTX
*mem_ctx
,
291 struct tevent_context
*ev
,
292 struct ctdb_client_context
*client
,
293 struct timeval timeout
,
294 const char *db_name
, uint8_t db_flags
)
296 struct tevent_req
*req
, *subreq
;
297 struct ctdb_attach_state
*state
;
298 struct ctdb_req_control request
;
300 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_attach_state
);
305 state
->db
= client_db_handle(client
, db_name
);
306 if (state
->db
!= NULL
) {
307 tevent_req_done(req
);
308 return tevent_req_post(req
, ev
);
312 state
->client
= client
;
313 state
->timeout
= timeout
;
314 state
->destnode
= ctdb_client_pnn(client
);
315 state
->db_flags
= db_flags
;
317 state
->db
= talloc_zero(client
, struct ctdb_db_context
);
318 if (tevent_req_nomem(state
->db
, req
)) {
319 return tevent_req_post(req
, ev
);
322 state
->db
->db_name
= talloc_strdup(state
->db
, db_name
);
323 if (tevent_req_nomem(state
->db
, req
)) {
324 return tevent_req_post(req
, ev
);
327 state
->db
->db_flags
= db_flags
;
329 if (ctdb_db_persistent(state
->db
)) {
330 ctdb_req_control_db_attach_persistent(&request
,
332 } else if (ctdb_db_replicated(state
->db
)) {
333 ctdb_req_control_db_attach_replicated(&request
,
336 ctdb_req_control_db_attach(&request
, state
->db
->db_name
);
339 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
340 state
->destnode
, state
->timeout
,
342 if (tevent_req_nomem(subreq
, req
)) {
343 return tevent_req_post(req
, ev
);
345 tevent_req_set_callback(subreq
, ctdb_attach_dbid_done
, req
);
350 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
)
352 struct tevent_req
*req
= tevent_req_callback_data(
353 subreq
, struct tevent_req
);
354 struct ctdb_attach_state
*state
= tevent_req_data(
355 req
, struct ctdb_attach_state
);
356 struct ctdb_req_control request
;
357 struct ctdb_reply_control
*reply
;
361 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
364 DEBUG(DEBUG_ERR
, ("attach: %s %s failed, ret=%d\n",
366 (ctdb_db_persistent(state
->db
)
367 ? "DB_ATTACH_PERSISTENT"
368 : (ctdb_db_replicated(state
->db
)
369 ? "DB_ATTACH_REPLICATED"
372 tevent_req_error(req
, ret
);
376 if (ctdb_db_persistent(state
->db
)) {
377 ret
= ctdb_reply_control_db_attach_persistent(
378 reply
, &state
->db
->db_id
);
379 } else if (ctdb_db_replicated(state
->db
)) {
380 ret
= ctdb_reply_control_db_attach_replicated(
381 reply
, &state
->db
->db_id
);
383 ret
= ctdb_reply_control_db_attach(reply
, &state
->db
->db_id
);
387 DEBUG(DEBUG_ERR
, ("attach: %s failed to get db_id, ret=%d\n",
388 state
->db
->db_name
, ret
));
389 tevent_req_error(req
, ret
);
393 ctdb_req_control_getdbpath(&request
, state
->db
->db_id
);
394 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
395 state
->destnode
, state
->timeout
,
397 if (tevent_req_nomem(subreq
, req
)) {
400 tevent_req_set_callback(subreq
, ctdb_attach_dbpath_done
, req
);
403 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
)
405 struct tevent_req
*req
= tevent_req_callback_data(
406 subreq
, struct tevent_req
);
407 struct ctdb_attach_state
*state
= tevent_req_data(
408 req
, struct ctdb_attach_state
);
409 struct ctdb_reply_control
*reply
;
410 struct ctdb_req_control request
;
414 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
417 DEBUG(DEBUG_ERR
, ("attach: %s GETDBPATH failed, ret=%d\n",
418 state
->db
->db_name
, ret
));
419 tevent_req_error(req
, ret
);
423 ret
= ctdb_reply_control_getdbpath(reply
, state
->db
,
424 &state
->db
->db_path
);
427 DEBUG(DEBUG_ERR
, ("attach: %s GETDBPATH parse failed, ret=%d\n",
428 state
->db
->db_name
, ret
));
429 tevent_req_error(req
, ret
);
433 ctdb_req_control_db_get_health(&request
, state
->db
->db_id
);
434 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
435 state
->destnode
, state
->timeout
,
437 if (tevent_req_nomem(subreq
, req
)) {
440 tevent_req_set_callback(subreq
, ctdb_attach_health_done
, req
);
443 static void ctdb_attach_health_done(struct tevent_req
*subreq
)
445 struct tevent_req
*req
= tevent_req_callback_data(
446 subreq
, struct tevent_req
);
447 struct ctdb_attach_state
*state
= tevent_req_data(
448 req
, struct ctdb_attach_state
);
449 struct ctdb_reply_control
*reply
;
454 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
457 DEBUG(DEBUG_ERR
, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
458 state
->db
->db_name
, ret
));
459 tevent_req_error(req
, ret
);
463 ret
= ctdb_reply_control_db_get_health(reply
, state
, &reason
);
466 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
467 state
->db
->db_name
, ret
));
468 tevent_req_error(req
, ret
);
472 if (reason
!= NULL
) {
473 /* Database unhealthy, avoid attach */
474 DEBUG(DEBUG_ERR
, ("attach: %s database unhealthy (%s)\n",
475 state
->db
->db_name
, reason
));
476 tevent_req_error(req
, EIO
);
480 subreq
= ctdb_set_db_flags_send(state
, state
->ev
, state
->client
,
481 state
->destnode
, state
->timeout
,
482 state
->db
->db_id
, state
->db_flags
);
483 if (tevent_req_nomem(subreq
, req
)) {
486 tevent_req_set_callback(subreq
, ctdb_attach_flags_done
, req
);
489 static void ctdb_attach_flags_done(struct tevent_req
*subreq
)
491 struct tevent_req
*req
= tevent_req_callback_data(
492 subreq
, struct tevent_req
);
493 struct ctdb_attach_state
*state
= tevent_req_data(
494 req
, struct ctdb_attach_state
);
495 struct ctdb_req_control request
;
499 status
= ctdb_set_db_flags_recv(subreq
, &ret
);
502 DEBUG(DEBUG_ERR
, ("attach: %s set db flags 0x%08x failed\n",
503 state
->db
->db_name
, state
->db_flags
));
504 tevent_req_error(req
, ret
);
508 ctdb_req_control_db_open_flags(&request
, state
->db
->db_id
);
509 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
510 state
->destnode
, state
->timeout
,
512 if (tevent_req_nomem(subreq
, req
)) {
515 tevent_req_set_callback(subreq
, ctdb_attach_open_flags_done
, req
);
518 static void ctdb_attach_open_flags_done(struct tevent_req
*subreq
)
520 struct tevent_req
*req
= tevent_req_callback_data(
521 subreq
, struct tevent_req
);
522 struct ctdb_attach_state
*state
= tevent_req_data(
523 req
, struct ctdb_attach_state
);
524 struct ctdb_reply_control
*reply
;
528 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
531 DEBUG(DEBUG_ERR
, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
532 state
->db
->db_name
, ret
));
533 tevent_req_error(req
, ret
);
537 ret
= ctdb_reply_control_db_open_flags(reply
, &tdb_flags
);
540 DEBUG(DEBUG_ERR
, ("attach: %s DB_OPEN_FLAGS parse failed,"
541 " ret=%d\n", state
->db
->db_name
, ret
));
542 tevent_req_error(req
, ret
);
546 state
->db
->ltdb
= tdb_wrap_open(state
->db
, state
->db
->db_path
, 0,
547 tdb_flags
, O_RDWR
, 0);
548 if (tevent_req_nomem(state
->db
->ltdb
, req
)) {
549 DEBUG(DEBUG_ERR
, ("attach: %s tdb_wrap_open failed\n",
550 state
->db
->db_name
));
553 DLIST_ADD(state
->client
->db
, state
->db
);
555 tevent_req_done(req
);
558 bool ctdb_attach_recv(struct tevent_req
*req
, int *perr
,
559 struct ctdb_db_context
**out
)
561 struct ctdb_attach_state
*state
= tevent_req_data(
562 req
, struct ctdb_attach_state
);
565 if (tevent_req_is_unix_error(req
, &err
)) {
578 int ctdb_attach(struct tevent_context
*ev
,
579 struct ctdb_client_context
*client
,
580 struct timeval timeout
,
581 const char *db_name
, uint8_t db_flags
,
582 struct ctdb_db_context
**out
)
585 struct tevent_req
*req
;
589 mem_ctx
= talloc_new(client
);
590 if (mem_ctx
== NULL
) {
594 req
= ctdb_attach_send(mem_ctx
, ev
, client
, timeout
,
597 talloc_free(mem_ctx
);
601 tevent_req_poll(req
, ev
);
603 status
= ctdb_attach_recv(req
, &ret
, out
);
605 talloc_free(mem_ctx
);
610 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
611 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
612 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
615 talloc_free(mem_ctx
);
619 struct ctdb_detach_state
{
620 struct ctdb_client_context
*client
;
621 struct tevent_context
*ev
;
622 struct timeval timeout
;
627 static void ctdb_detach_dbname_done(struct tevent_req
*subreq
);
628 static void ctdb_detach_done(struct tevent_req
*subreq
);
630 struct tevent_req
*ctdb_detach_send(TALLOC_CTX
*mem_ctx
,
631 struct tevent_context
*ev
,
632 struct ctdb_client_context
*client
,
633 struct timeval timeout
, uint32_t db_id
)
635 struct tevent_req
*req
, *subreq
;
636 struct ctdb_detach_state
*state
;
637 struct ctdb_req_control request
;
639 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_detach_state
);
644 state
->client
= client
;
646 state
->timeout
= timeout
;
647 state
->db_id
= db_id
;
649 ctdb_req_control_get_dbname(&request
, db_id
);
650 subreq
= ctdb_client_control_send(state
, ev
, client
,
651 ctdb_client_pnn(client
), timeout
,
653 if (tevent_req_nomem(subreq
, req
)) {
654 return tevent_req_post(req
, ev
);
656 tevent_req_set_callback(subreq
, ctdb_detach_dbname_done
, req
);
661 static void ctdb_detach_dbname_done(struct tevent_req
*subreq
)
663 struct tevent_req
*req
= tevent_req_callback_data(
664 subreq
, struct tevent_req
);
665 struct ctdb_detach_state
*state
= tevent_req_data(
666 req
, struct ctdb_detach_state
);
667 struct ctdb_reply_control
*reply
;
668 struct ctdb_req_control request
;
672 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
675 DEBUG(DEBUG_ERR
, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
677 tevent_req_error(req
, ret
);
681 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
683 DEBUG(DEBUG_ERR
, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
685 tevent_req_error(req
, ret
);
689 ctdb_req_control_db_detach(&request
, state
->db_id
);
690 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
691 ctdb_client_pnn(state
->client
),
692 state
->timeout
, &request
);
693 if (tevent_req_nomem(subreq
, req
)) {
696 tevent_req_set_callback(subreq
, ctdb_detach_done
, req
);
700 static void ctdb_detach_done(struct tevent_req
*subreq
)
702 struct tevent_req
*req
= tevent_req_callback_data(
703 subreq
, struct tevent_req
);
704 struct ctdb_detach_state
*state
= tevent_req_data(
705 req
, struct ctdb_detach_state
);
706 struct ctdb_reply_control
*reply
;
707 struct ctdb_db_context
*db
;
711 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
714 DEBUG(DEBUG_ERR
, ("detach: %s DB_DETACH failed, ret=%d\n",
715 state
->db_name
, ret
));
716 tevent_req_error(req
, ret
);
720 ret
= ctdb_reply_control_db_detach(reply
);
722 DEBUG(DEBUG_ERR
, ("detach: %s DB_DETACH failed, ret=%d\n",
723 state
->db_name
, ret
));
724 tevent_req_error(req
, ret
);
728 db
= client_db_handle(state
->client
, state
->db_name
);
730 DLIST_REMOVE(state
->client
->db
, db
);
734 tevent_req_done(req
);
737 bool ctdb_detach_recv(struct tevent_req
*req
, int *perr
)
741 if (tevent_req_is_unix_error(req
, &ret
)) {
751 int ctdb_detach(struct tevent_context
*ev
,
752 struct ctdb_client_context
*client
,
753 struct timeval timeout
, uint32_t db_id
)
756 struct tevent_req
*req
;
760 mem_ctx
= talloc_new(client
);
761 if (mem_ctx
== NULL
) {
765 req
= ctdb_detach_send(mem_ctx
, ev
, client
, timeout
, db_id
);
767 talloc_free(mem_ctx
);
771 tevent_req_poll(req
, ev
);
773 status
= ctdb_detach_recv(req
, &ret
);
775 talloc_free(mem_ctx
);
779 talloc_free(mem_ctx
);
783 uint32_t ctdb_db_id(struct ctdb_db_context
*db
)
788 struct ctdb_db_traverse_local_state
{
789 ctdb_rec_parser_func_t parser
;
795 static int ctdb_db_traverse_local_handler(struct tdb_context
*tdb
,
796 TDB_DATA key
, TDB_DATA data
,
799 struct ctdb_db_traverse_local_state
*state
=
800 (struct ctdb_db_traverse_local_state
*)private_data
;
803 if (state
->extract_header
) {
804 struct ctdb_ltdb_header header
;
806 ret
= ctdb_ltdb_header_extract(&data
, &header
);
812 ret
= state
->parser(0, &header
, key
, data
, state
->private_data
);
814 ret
= state
->parser(0, NULL
, key
, data
, state
->private_data
);
825 int ctdb_db_traverse_local(struct ctdb_db_context
*db
, bool readonly
,
827 ctdb_rec_parser_func_t parser
, void *private_data
)
829 struct ctdb_db_traverse_local_state state
;
832 state
.parser
= parser
;
833 state
.private_data
= private_data
;
834 state
.extract_header
= extract_header
;
838 ret
= tdb_traverse_read(db
->ltdb
->tdb
,
839 ctdb_db_traverse_local_handler
,
842 ret
= tdb_traverse(db
->ltdb
->tdb
,
843 ctdb_db_traverse_local_handler
, &state
);
853 struct ctdb_db_traverse_state
{
854 struct tevent_context
*ev
;
855 struct ctdb_client_context
*client
;
856 struct ctdb_db_context
*db
;
859 struct timeval timeout
;
860 ctdb_rec_parser_func_t parser
;
865 static void ctdb_db_traverse_handler_set(struct tevent_req
*subreq
);
866 static void ctdb_db_traverse_started(struct tevent_req
*subreq
);
867 static void ctdb_db_traverse_handler(uint64_t srvid
, TDB_DATA data
,
869 static void ctdb_db_traverse_remove_handler(struct tevent_req
*req
);
870 static void ctdb_db_traverse_handler_removed(struct tevent_req
*subreq
);
872 struct tevent_req
*ctdb_db_traverse_send(TALLOC_CTX
*mem_ctx
,
873 struct tevent_context
*ev
,
874 struct ctdb_client_context
*client
,
875 struct ctdb_db_context
*db
,
877 struct timeval timeout
,
878 ctdb_rec_parser_func_t parser
,
881 struct tevent_req
*req
, *subreq
;
882 struct ctdb_db_traverse_state
*state
;
884 req
= tevent_req_create(mem_ctx
, &state
,
885 struct ctdb_db_traverse_state
);
891 state
->client
= client
;
893 state
->destnode
= destnode
;
894 state
->srvid
= CTDB_SRVID_CLIENT_RANGE
| getpid();
895 state
->timeout
= timeout
;
896 state
->parser
= parser
;
897 state
->private_data
= private_data
;
899 subreq
= ctdb_client_set_message_handler_send(state
, ev
, client
,
901 ctdb_db_traverse_handler
,
903 if (tevent_req_nomem(subreq
, req
)) {
904 return tevent_req_post(req
, ev
);
906 tevent_req_set_callback(subreq
, ctdb_db_traverse_handler_set
, req
);
911 static void ctdb_db_traverse_handler_set(struct tevent_req
*subreq
)
913 struct tevent_req
*req
= tevent_req_callback_data(
914 subreq
, struct tevent_req
);
915 struct ctdb_db_traverse_state
*state
= tevent_req_data(
916 req
, struct ctdb_db_traverse_state
);
917 struct ctdb_traverse_start_ext traverse
;
918 struct ctdb_req_control request
;
922 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
925 tevent_req_error(req
, ret
);
929 traverse
= (struct ctdb_traverse_start_ext
) {
930 .db_id
= ctdb_db_id(state
->db
),
932 .srvid
= state
->srvid
,
933 .withemptyrecords
= false,
936 ctdb_req_control_traverse_start_ext(&request
, &traverse
);
937 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
938 state
->destnode
, state
->timeout
,
940 if (subreq
== NULL
) {
941 state
->result
= ENOMEM
;
942 ctdb_db_traverse_remove_handler(req
);
945 tevent_req_set_callback(subreq
, ctdb_db_traverse_started
, req
);
948 static void ctdb_db_traverse_started(struct tevent_req
*subreq
)
950 struct tevent_req
*req
= tevent_req_callback_data(
951 subreq
, struct tevent_req
);
952 struct ctdb_db_traverse_state
*state
= tevent_req_data(
953 req
, struct ctdb_db_traverse_state
);
954 struct ctdb_reply_control
*reply
;
958 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
961 DEBUG(DEBUG_ERR
, ("traverse: control failed, ret=%d\n", ret
));
963 ctdb_db_traverse_remove_handler(req
);
967 ret
= ctdb_reply_control_traverse_start_ext(reply
);
970 DEBUG(DEBUG_ERR
, ("traverse: control reply failed, ret=%d\n",
973 ctdb_db_traverse_remove_handler(req
);
978 static void ctdb_db_traverse_handler(uint64_t srvid
, TDB_DATA data
,
981 struct tevent_req
*req
= talloc_get_type_abort(
982 private_data
, struct tevent_req
);
983 struct ctdb_db_traverse_state
*state
= tevent_req_data(
984 req
, struct ctdb_db_traverse_state
);
985 struct ctdb_rec_data
*rec
;
986 struct ctdb_ltdb_header header
;
990 ret
= ctdb_rec_data_pull(data
.dptr
, data
.dsize
, state
, &rec
, &np
);
995 if (rec
->key
.dsize
== 0 && rec
->data
.dsize
== 0) {
997 ctdb_db_traverse_remove_handler(req
);
1001 ret
= ctdb_ltdb_header_extract(&rec
->data
, &header
);
1007 if (rec
->data
.dsize
== 0) {
1012 ret
= state
->parser(rec
->reqid
, &header
, rec
->key
, rec
->data
,
1013 state
->private_data
);
1016 state
->result
= ret
;
1017 ctdb_db_traverse_remove_handler(req
);
1021 static void ctdb_db_traverse_remove_handler(struct tevent_req
*req
)
1023 struct ctdb_db_traverse_state
*state
= tevent_req_data(
1024 req
, struct ctdb_db_traverse_state
);
1025 struct tevent_req
*subreq
;
1027 subreq
= ctdb_client_remove_message_handler_send(state
, state
->ev
,
1030 if (tevent_req_nomem(subreq
, req
)) {
1033 tevent_req_set_callback(subreq
, ctdb_db_traverse_handler_removed
, req
);
1036 static void ctdb_db_traverse_handler_removed(struct tevent_req
*subreq
)
1038 struct tevent_req
*req
= tevent_req_callback_data(
1039 subreq
, struct tevent_req
);
1040 struct ctdb_db_traverse_state
*state
= tevent_req_data(
1041 req
, struct ctdb_db_traverse_state
);
1045 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
1046 TALLOC_FREE(subreq
);
1048 tevent_req_error(req
, ret
);
1052 if (state
->result
!= 0) {
1053 tevent_req_error(req
, state
->result
);
1057 tevent_req_done(req
);
1060 bool ctdb_db_traverse_recv(struct tevent_req
*req
, int *perr
)
1064 if (tevent_req_is_unix_error(req
, &ret
)) {
1074 int ctdb_db_traverse(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1075 struct ctdb_client_context
*client
,
1076 struct ctdb_db_context
*db
,
1077 uint32_t destnode
, struct timeval timeout
,
1078 ctdb_rec_parser_func_t parser
, void *private_data
)
1080 struct tevent_req
*req
;
1084 req
= ctdb_db_traverse_send(mem_ctx
, ev
, client
, db
, destnode
,
1085 timeout
, parser
, private_data
);
1090 tevent_req_poll(req
, ev
);
1092 status
= ctdb_db_traverse_recv(req
, &ret
);
1100 int ctdb_ltdb_fetch(struct ctdb_db_context
*db
, TDB_DATA key
,
1101 struct ctdb_ltdb_header
*header
,
1102 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
1108 rec
= tdb_fetch(db
->ltdb
->tdb
, key
);
1109 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1110 /* No record present */
1111 if (rec
.dptr
!= NULL
) {
1115 if (tdb_error(db
->ltdb
->tdb
) != TDB_ERR_NOEXIST
) {
1119 *header
= (struct ctdb_ltdb_header
) {
1120 .dmaster
= CTDB_UNKNOWN_PNN
,
1129 ret
= ctdb_ltdb_header_pull(rec
.dptr
, rec
.dsize
, header
, &np
);
1136 data
->dsize
= rec
.dsize
- np
;
1137 data
->dptr
= talloc_memdup(mem_ctx
, rec
.dptr
+ np
,
1139 if (data
->dptr
== NULL
) {
1149 * Fetch a record from volatile database
1152 * 1. Get a lock on the hash chain
1153 * 2. If the record does not exist, migrate the record
1154 * 3. If readonly=true and delegations do not exist, migrate the record.
1155 * 4. If readonly=false and delegations exist, migrate the record.
1156 * 5. If the local node is not dmaster, migrate the record.
1160 struct ctdb_fetch_lock_state
{
1161 struct tevent_context
*ev
;
1162 struct ctdb_client_context
*client
;
1163 struct ctdb_record_handle
*h
;
1168 static int ctdb_fetch_lock_check(struct tevent_req
*req
);
1169 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
);
1170 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
);
1172 struct tevent_req
*ctdb_fetch_lock_send(TALLOC_CTX
*mem_ctx
,
1173 struct tevent_context
*ev
,
1174 struct ctdb_client_context
*client
,
1175 struct ctdb_db_context
*db
,
1176 TDB_DATA key
, bool readonly
)
1178 struct ctdb_fetch_lock_state
*state
;
1179 struct tevent_req
*req
;
1182 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_fetch_lock_state
);
1188 state
->client
= client
;
1190 state
->h
= talloc_zero(db
, struct ctdb_record_handle
);
1191 if (tevent_req_nomem(state
->h
, req
)) {
1192 return tevent_req_post(req
, ev
);
1195 state
->h
->client
= client
;
1197 state
->h
->key
.dptr
= talloc_memdup(state
->h
, key
.dptr
, key
.dsize
);
1198 if (tevent_req_nomem(state
->h
->key
.dptr
, req
)) {
1199 return tevent_req_post(req
, ev
);
1201 state
->h
->key
.dsize
= key
.dsize
;
1202 state
->h
->readonly
= false;
1204 state
->readonly
= readonly
;
1205 state
->pnn
= ctdb_client_pnn(client
);
1207 /* Check that database is not persistent */
1208 if (! ctdb_db_volatile(db
)) {
1209 DEBUG(DEBUG_ERR
, ("fetch_lock: %s database not volatile\n",
1211 tevent_req_error(req
, EINVAL
);
1212 return tevent_req_post(req
, ev
);
1215 ret
= ctdb_fetch_lock_check(req
);
1217 tevent_req_done(req
);
1218 return tevent_req_post(req
, ev
);
1220 if (ret
!= EAGAIN
) {
1221 tevent_req_error(req
, ret
);
1222 return tevent_req_post(req
, ev
);
1227 static int ctdb_fetch_lock_check(struct tevent_req
*req
)
1229 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1230 req
, struct ctdb_fetch_lock_state
);
1231 struct ctdb_record_handle
*h
= state
->h
;
1232 struct ctdb_ltdb_header header
;
1233 TDB_DATA data
= tdb_null
;
1236 bool do_migrate
= false;
1238 ret
= tdb_chainlock(h
->db
->ltdb
->tdb
, h
->key
);
1241 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1242 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1247 data
= tdb_fetch(h
->db
->ltdb
->tdb
, h
->key
);
1248 if (data
.dptr
== NULL
) {
1249 if (tdb_error(h
->db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
) {
1257 /* Got the record */
1258 ret
= ctdb_ltdb_header_pull(data
.dptr
, data
.dsize
, &header
, &np
);
1264 if (! state
->readonly
) {
1265 /* Read/write access */
1266 if (header
.dmaster
== state
->pnn
&&
1267 header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
) {
1271 if (header
.dmaster
!= state
->pnn
) {
1275 /* Readonly access */
1276 if (header
.dmaster
!= state
->pnn
&&
1277 ! (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
1278 CTDB_REC_RO_HAVE_DELEGATIONS
))) {
1283 /* We are the dmaster or readonly delegation */
1286 if (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
1287 CTDB_REC_RO_HAVE_DELEGATIONS
)) {
1297 if (data
.dptr
!= NULL
) {
1300 ret
= tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
1303 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1304 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1309 ctdb_fetch_lock_migrate(req
);
1314 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
)
1316 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1317 req
, struct ctdb_fetch_lock_state
);
1318 struct ctdb_req_call request
;
1319 struct tevent_req
*subreq
;
1321 ZERO_STRUCT(request
);
1322 request
.flags
= CTDB_IMMEDIATE_MIGRATION
;
1323 if (state
->readonly
) {
1324 request
.flags
|= CTDB_WANT_READONLY
;
1326 request
.db_id
= state
->h
->db
->db_id
;
1327 request
.callid
= CTDB_NULL_FUNC
;
1328 request
.key
= state
->h
->key
;
1329 request
.calldata
= tdb_null
;
1331 subreq
= ctdb_client_call_send(state
, state
->ev
, state
->client
,
1333 if (tevent_req_nomem(subreq
, req
)) {
1337 tevent_req_set_callback(subreq
, ctdb_fetch_lock_migrate_done
, req
);
1340 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
)
1342 struct tevent_req
*req
= tevent_req_callback_data(
1343 subreq
, struct tevent_req
);
1344 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1345 req
, struct ctdb_fetch_lock_state
);
1346 struct ctdb_reply_call
*reply
;
1350 status
= ctdb_client_call_recv(subreq
, state
, &reply
, &ret
);
1351 TALLOC_FREE(subreq
);
1353 DEBUG(DEBUG_ERR
, ("fetch_lock: %s CALL failed, ret=%d\n",
1354 state
->h
->db
->db_name
, ret
));
1355 tevent_req_error(req
, ret
);
1359 if (reply
->status
!= 0) {
1360 tevent_req_error(req
, EIO
);
1365 ret
= ctdb_fetch_lock_check(req
);
1367 if (ret
!= EAGAIN
) {
1368 tevent_req_error(req
, ret
);
1373 tevent_req_done(req
);
1376 static int ctdb_record_handle_destructor(struct ctdb_record_handle
*h
)
1380 ret
= tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
1383 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1384 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1390 struct ctdb_record_handle
*ctdb_fetch_lock_recv(struct tevent_req
*req
,
1391 struct ctdb_ltdb_header
*header
,
1392 TALLOC_CTX
*mem_ctx
,
1393 TDB_DATA
*data
, int *perr
)
1395 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1396 req
, struct ctdb_fetch_lock_state
);
1397 struct ctdb_record_handle
*h
= state
->h
;
1400 if (tevent_req_is_unix_error(req
, &err
)) {
1402 TALLOC_FREE(state
->h
);
1408 if (header
!= NULL
) {
1409 *header
= h
->header
;
1414 offset
= ctdb_ltdb_header_len(&h
->header
);
1416 data
->dsize
= h
->data
.dsize
- offset
;
1417 if (data
->dsize
== 0) {
1420 data
->dptr
= talloc_memdup(mem_ctx
,
1421 h
->data
.dptr
+ offset
,
1423 if (data
->dptr
== NULL
) {
1424 TALLOC_FREE(state
->h
);
1433 talloc_set_destructor(h
, ctdb_record_handle_destructor
);
1437 int ctdb_fetch_lock(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1438 struct ctdb_client_context
*client
,
1439 struct ctdb_db_context
*db
, TDB_DATA key
, bool readonly
,
1440 struct ctdb_record_handle
**out
,
1441 struct ctdb_ltdb_header
*header
, TDB_DATA
*data
)
1443 struct tevent_req
*req
;
1444 struct ctdb_record_handle
*h
;
1447 req
= ctdb_fetch_lock_send(mem_ctx
, ev
, client
, db
, key
, readonly
);
1452 tevent_req_poll(req
, ev
);
1454 h
= ctdb_fetch_lock_recv(req
, header
, mem_ctx
, data
, &ret
);
1463 int ctdb_store_record(struct ctdb_record_handle
*h
, TDB_DATA data
)
1465 uint8_t header
[sizeof(struct ctdb_ltdb_header
)];
1470 /* Cannot modify the record if it was obtained as a readonly copy */
1475 /* Check if the new data is same */
1476 if (h
->data
.dsize
== data
.dsize
&&
1477 memcmp(h
->data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
1478 /* No need to do anything */
1482 ctdb_ltdb_header_push(&h
->header
, header
, &np
);
1485 rec
[0].dptr
= header
;
1487 rec
[1].dsize
= data
.dsize
;
1488 rec
[1].dptr
= data
.dptr
;
1490 ret
= tdb_storev(h
->db
->ltdb
->tdb
, h
->key
, rec
, 2, TDB_REPLACE
);
1493 ("store_record: %s tdb_storev failed, %s\n",
1494 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1501 struct ctdb_delete_record_state
{
1502 struct ctdb_record_handle
*h
;
1505 static void ctdb_delete_record_done(struct tevent_req
*subreq
);
1507 struct tevent_req
*ctdb_delete_record_send(TALLOC_CTX
*mem_ctx
,
1508 struct tevent_context
*ev
,
1509 struct ctdb_record_handle
*h
)
1511 struct tevent_req
*req
, *subreq
;
1512 struct ctdb_delete_record_state
*state
;
1513 struct ctdb_key_data key
;
1514 struct ctdb_req_control request
;
1515 uint8_t header
[sizeof(struct ctdb_ltdb_header
)];
1520 req
= tevent_req_create(mem_ctx
, &state
,
1521 struct ctdb_delete_record_state
);
1528 /* Cannot delete the record if it was obtained as a readonly copy */
1530 DEBUG(DEBUG_ERR
, ("fetch_lock delete: %s readonly record\n",
1532 tevent_req_error(req
, EINVAL
);
1533 return tevent_req_post(req
, ev
);
1536 ctdb_ltdb_header_push(&h
->header
, header
, &np
);
1541 ret
= tdb_store(h
->db
->ltdb
->tdb
, h
->key
, rec
, TDB_REPLACE
);
1544 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1545 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1546 tevent_req_error(req
, EIO
);
1547 return tevent_req_post(req
, ev
);
1550 key
.db_id
= h
->db
->db_id
;
1551 key
.header
= h
->header
;
1554 ctdb_req_control_schedule_for_deletion(&request
, &key
);
1555 subreq
= ctdb_client_control_send(state
, ev
, h
->client
,
1556 ctdb_client_pnn(h
->client
),
1557 tevent_timeval_zero(),
1559 if (tevent_req_nomem(subreq
, req
)) {
1560 return tevent_req_post(req
, ev
);
1562 tevent_req_set_callback(subreq
, ctdb_delete_record_done
, req
);
1567 static void ctdb_delete_record_done(struct tevent_req
*subreq
)
1569 struct tevent_req
*req
= tevent_req_callback_data(
1570 subreq
, struct tevent_req
);
1571 struct ctdb_delete_record_state
*state
= tevent_req_data(
1572 req
, struct ctdb_delete_record_state
);
1576 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
1577 TALLOC_FREE(subreq
);
1580 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1581 "ret=%d\n", state
->h
->db
->db_name
, ret
));
1582 tevent_req_error(req
, ret
);
1586 tevent_req_done(req
);
1589 bool ctdb_delete_record_recv(struct tevent_req
*req
, int *perr
)
1593 if (tevent_req_is_unix_error(req
, &err
)) {
1604 int ctdb_delete_record(struct ctdb_record_handle
*h
)
1606 struct tevent_context
*ev
= h
->ev
;
1607 TALLOC_CTX
*mem_ctx
;
1608 struct tevent_req
*req
;
1612 mem_ctx
= talloc_new(NULL
);
1613 if (mem_ctx
== NULL
) {
1617 req
= ctdb_delete_record_send(mem_ctx
, ev
, h
);
1619 talloc_free(mem_ctx
);
1623 tevent_req_poll(req
, ev
);
1625 status
= ctdb_delete_record_recv(req
, &ret
);
1626 talloc_free(mem_ctx
);
1635 * Global lock functions
1638 struct ctdb_g_lock_lock_state
{
1639 struct tevent_context
*ev
;
1640 struct ctdb_client_context
*client
;
1641 struct ctdb_db_context
*db
;
1643 struct ctdb_server_id my_sid
;
1644 enum ctdb_g_lock_type lock_type
;
1645 struct ctdb_record_handle
*h
;
1646 /* state for verification of active locks */
1647 struct ctdb_g_lock_list
*lock_list
;
1648 unsigned int current
;
1651 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
);
1652 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
);
1653 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
);
1654 static int ctdb_g_lock_lock_update(struct tevent_req
*req
);
1655 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
);
1657 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1
,
1658 enum ctdb_g_lock_type l2
)
1660 if ((l1
== CTDB_G_LOCK_READ
) && (l2
== CTDB_G_LOCK_READ
)) {
1666 struct tevent_req
*ctdb_g_lock_lock_send(TALLOC_CTX
*mem_ctx
,
1667 struct tevent_context
*ev
,
1668 struct ctdb_client_context
*client
,
1669 struct ctdb_db_context
*db
,
1670 const char *keyname
,
1671 struct ctdb_server_id
*sid
,
1674 struct tevent_req
*req
, *subreq
;
1675 struct ctdb_g_lock_lock_state
*state
;
1677 req
= tevent_req_create(mem_ctx
, &state
,
1678 struct ctdb_g_lock_lock_state
);
1684 state
->client
= client
;
1686 state
->key
.dptr
= discard_const(keyname
);
1687 state
->key
.dsize
= strlen(keyname
) + 1;
1688 state
->my_sid
= *sid
;
1689 state
->lock_type
= (readonly
? CTDB_G_LOCK_READ
: CTDB_G_LOCK_WRITE
);
1691 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1693 if (tevent_req_nomem(subreq
, req
)) {
1694 return tevent_req_post(req
, ev
);
1696 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1701 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
)
1703 struct tevent_req
*req
= tevent_req_callback_data(
1704 subreq
, struct tevent_req
);
1705 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1706 req
, struct ctdb_g_lock_lock_state
);
1711 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1712 TALLOC_FREE(subreq
);
1713 if (state
->h
== NULL
) {
1714 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s fetch lock failed\n",
1715 (char *)state
->key
.dptr
));
1716 tevent_req_error(req
, ret
);
1720 if (state
->lock_list
!= NULL
) {
1721 TALLOC_FREE(state
->lock_list
);
1725 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1726 &state
->lock_list
, &np
);
1727 talloc_free(data
.dptr
);
1729 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s invalid lock data\n",
1730 (char *)state
->key
.dptr
));
1731 tevent_req_error(req
, ret
);
1735 ctdb_g_lock_lock_process_locks(req
);
1738 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
)
1740 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1741 req
, struct ctdb_g_lock_lock_state
);
1742 struct tevent_req
*subreq
;
1743 struct ctdb_g_lock
*lock
;
1744 bool check_server
= false;
1747 while (state
->current
< state
->lock_list
->num
) {
1748 lock
= &state
->lock_list
->lock
[state
->current
];
1750 /* We should not ask for the same lock more than once */
1751 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
1752 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s deadlock\n",
1753 (char *)state
->key
.dptr
));
1754 tevent_req_error(req
, EDEADLK
);
1758 if (ctdb_g_lock_conflicts(lock
->type
, state
->lock_type
)) {
1759 check_server
= true;
1763 state
->current
+= 1;
1767 struct ctdb_req_control request
;
1769 ctdb_req_control_process_exists(&request
, lock
->sid
.pid
);
1770 subreq
= ctdb_client_control_send(state
, state
->ev
,
1773 tevent_timeval_zero(),
1775 if (tevent_req_nomem(subreq
, req
)) {
1778 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_checked
, req
);
1782 /* There is no conflict, add ourself to the lock_list */
1783 state
->lock_list
->lock
= talloc_realloc(state
->lock_list
,
1784 state
->lock_list
->lock
,
1786 state
->lock_list
->num
+ 1);
1787 if (state
->lock_list
->lock
== NULL
) {
1788 tevent_req_error(req
, ENOMEM
);
1792 lock
= &state
->lock_list
->lock
[state
->lock_list
->num
];
1793 lock
->type
= state
->lock_type
;
1794 lock
->sid
= state
->my_sid
;
1795 state
->lock_list
->num
+= 1;
1797 ret
= ctdb_g_lock_lock_update(req
);
1799 tevent_req_error(req
, ret
);
1803 TALLOC_FREE(state
->h
);
1804 tevent_req_done(req
);
1807 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
)
1809 struct tevent_req
*req
= tevent_req_callback_data(
1810 subreq
, struct tevent_req
);
1811 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1812 req
, struct ctdb_g_lock_lock_state
);
1813 struct ctdb_reply_control
*reply
;
1817 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1818 TALLOC_FREE(subreq
);
1821 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1822 (char *)state
->key
.dptr
, ret
));
1823 tevent_req_error(req
, ret
);
1827 ret
= ctdb_reply_control_process_exists(reply
, &value
);
1829 tevent_req_error(req
, ret
);
1835 /* server process exists, need to retry */
1836 TALLOC_FREE(state
->h
);
1837 subreq
= tevent_wakeup_send(state
, state
->ev
,
1838 tevent_timeval_current_ofs(0,1000));
1839 if (tevent_req_nomem(subreq
, req
)) {
1842 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_retry
, req
);
1846 /* server process does not exist, remove conflicting entry */
1847 state
->lock_list
->lock
[state
->current
] =
1848 state
->lock_list
->lock
[state
->lock_list
->num
-1];
1849 state
->lock_list
->num
-= 1;
1851 ret
= ctdb_g_lock_lock_update(req
);
1853 tevent_req_error(req
, ret
);
1857 ctdb_g_lock_lock_process_locks(req
);
1860 static int ctdb_g_lock_lock_update(struct tevent_req
*req
)
1862 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1863 req
, struct ctdb_g_lock_lock_state
);
1868 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
1869 data
.dptr
= talloc_size(state
, data
.dsize
);
1870 if (data
.dptr
== NULL
) {
1874 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
, &np
);
1875 ret
= ctdb_store_record(state
->h
, data
);
1876 talloc_free(data
.dptr
);
1880 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
)
1882 struct tevent_req
*req
= tevent_req_callback_data(
1883 subreq
, struct tevent_req
);
1884 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1885 req
, struct ctdb_g_lock_lock_state
);
1888 success
= tevent_wakeup_recv(subreq
);
1889 TALLOC_FREE(subreq
);
1891 tevent_req_error(req
, ENOMEM
);
1895 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
1896 state
->db
, state
->key
, false);
1897 if (tevent_req_nomem(subreq
, req
)) {
1900 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1903 bool ctdb_g_lock_lock_recv(struct tevent_req
*req
, int *perr
)
1905 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1906 req
, struct ctdb_g_lock_lock_state
);
1909 TALLOC_FREE(state
->h
);
1911 if (tevent_req_is_unix_error(req
, &err
)) {
1921 struct ctdb_g_lock_unlock_state
{
1922 struct tevent_context
*ev
;
1923 struct ctdb_client_context
*client
;
1924 struct ctdb_db_context
*db
;
1926 struct ctdb_server_id my_sid
;
1927 struct ctdb_record_handle
*h
;
1928 struct ctdb_g_lock_list
*lock_list
;
1931 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
);
1932 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
);
1933 static void ctdb_g_lock_unlock_deleted(struct tevent_req
*subreq
);
1935 struct tevent_req
*ctdb_g_lock_unlock_send(TALLOC_CTX
*mem_ctx
,
1936 struct tevent_context
*ev
,
1937 struct ctdb_client_context
*client
,
1938 struct ctdb_db_context
*db
,
1939 const char *keyname
,
1940 struct ctdb_server_id sid
)
1942 struct tevent_req
*req
, *subreq
;
1943 struct ctdb_g_lock_unlock_state
*state
;
1945 req
= tevent_req_create(mem_ctx
, &state
,
1946 struct ctdb_g_lock_unlock_state
);
1952 state
->client
= client
;
1954 state
->key
.dptr
= discard_const(keyname
);
1955 state
->key
.dsize
= strlen(keyname
) + 1;
1956 state
->my_sid
= sid
;
1958 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1960 if (tevent_req_nomem(subreq
, req
)) {
1961 return tevent_req_post(req
, ev
);
1963 tevent_req_set_callback(subreq
, ctdb_g_lock_unlock_fetched
, req
);
1968 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
)
1970 struct tevent_req
*req
= tevent_req_callback_data(
1971 subreq
, struct tevent_req
);
1972 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
1973 req
, struct ctdb_g_lock_unlock_state
);
1978 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1979 TALLOC_FREE(subreq
);
1980 if (state
->h
== NULL
) {
1981 DEBUG(DEBUG_ERR
, ("g_lock_unlock: %s fetch lock failed\n",
1982 (char *)state
->key
.dptr
));
1983 tevent_req_error(req
, ret
);
1987 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1988 &state
->lock_list
, &np
);
1990 DEBUG(DEBUG_ERR
, ("g_lock_unlock: %s invalid lock data\n",
1991 (char *)state
->key
.dptr
));
1992 tevent_req_error(req
, ret
);
1996 ret
= ctdb_g_lock_unlock_update(req
);
1998 tevent_req_error(req
, ret
);
2002 if (state
->lock_list
->num
== 0) {
2003 subreq
= ctdb_delete_record_send(state
, state
->ev
, state
->h
);
2004 if (tevent_req_nomem(subreq
, req
)) {
2007 tevent_req_set_callback(subreq
, ctdb_g_lock_unlock_deleted
,
2012 TALLOC_FREE(state
->h
);
2013 tevent_req_done(req
);
2016 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
)
2018 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2019 req
, struct ctdb_g_lock_unlock_state
);
2020 struct ctdb_g_lock
*lock
;
2023 for (i
=0; i
<state
->lock_list
->num
; i
++) {
2024 lock
= &state
->lock_list
->lock
[i
];
2026 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
2031 if (i
< state
->lock_list
->num
) {
2032 state
->lock_list
->lock
[i
] =
2033 state
->lock_list
->lock
[state
->lock_list
->num
-1];
2034 state
->lock_list
->num
-= 1;
2037 if (state
->lock_list
->num
!= 0) {
2041 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
2042 data
.dptr
= talloc_size(state
, data
.dsize
);
2043 if (data
.dptr
== NULL
) {
2047 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
, &np
);
2048 ret
= ctdb_store_record(state
->h
, data
);
2049 talloc_free(data
.dptr
);
2058 static void ctdb_g_lock_unlock_deleted(struct tevent_req
*subreq
)
2060 struct tevent_req
*req
= tevent_req_callback_data(
2061 subreq
, struct tevent_req
);
2062 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2063 req
, struct ctdb_g_lock_unlock_state
);
2067 status
= ctdb_delete_record_recv(subreq
, &ret
);
2070 ("g_lock_unlock %s delete record failed, ret=%d\n",
2071 (char *)state
->key
.dptr
, ret
));
2072 tevent_req_error(req
, ret
);
2076 TALLOC_FREE(state
->h
);
2077 tevent_req_done(req
);
2080 bool ctdb_g_lock_unlock_recv(struct tevent_req
*req
, int *perr
)
2082 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2083 req
, struct ctdb_g_lock_unlock_state
);
2086 TALLOC_FREE(state
->h
);
2088 if (tevent_req_is_unix_error(req
, &err
)) {
2099 * Persistent database functions
2101 struct ctdb_transaction_start_state
{
2102 struct tevent_context
*ev
;
2103 struct ctdb_client_context
*client
;
2104 struct timeval timeout
;
2105 struct ctdb_transaction_handle
*h
;
2109 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
);
2110 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
);
2112 struct tevent_req
*ctdb_transaction_start_send(TALLOC_CTX
*mem_ctx
,
2113 struct tevent_context
*ev
,
2114 struct ctdb_client_context
*client
,
2115 struct timeval timeout
,
2116 struct ctdb_db_context
*db
,
2119 struct ctdb_transaction_start_state
*state
;
2120 struct tevent_req
*req
, *subreq
;
2121 struct ctdb_transaction_handle
*h
;
2123 req
= tevent_req_create(mem_ctx
, &state
,
2124 struct ctdb_transaction_start_state
);
2129 if (ctdb_db_volatile(db
)) {
2130 tevent_req_error(req
, EINVAL
);
2131 return tevent_req_post(req
, ev
);
2135 state
->client
= client
;
2136 state
->destnode
= ctdb_client_pnn(client
);
2138 h
= talloc_zero(db
, struct ctdb_transaction_handle
);
2139 if (tevent_req_nomem(h
, req
)) {
2140 return tevent_req_post(req
, ev
);
2146 h
->readonly
= readonly
;
2149 /* SRVID is unique for databases, so client can have transactions
2150 * active for multiple databases */
2151 h
->sid
= ctdb_client_get_server_id(client
, db
->db_id
);
2153 h
->recbuf
= ctdb_rec_buffer_init(h
, db
->db_id
);
2154 if (tevent_req_nomem(h
->recbuf
, req
)) {
2155 return tevent_req_post(req
, ev
);
2158 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x", db
->db_id
);
2159 if (tevent_req_nomem(h
->lock_name
, req
)) {
2160 return tevent_req_post(req
, ev
);
2165 subreq
= ctdb_attach_send(state
, ev
, client
, timeout
, "g_lock.tdb", 0);
2166 if (tevent_req_nomem(subreq
, req
)) {
2167 return tevent_req_post(req
, ev
);
2169 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_attached
, req
);
2174 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
)
2176 struct tevent_req
*req
= tevent_req_callback_data(
2177 subreq
, struct tevent_req
);
2178 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2179 req
, struct ctdb_transaction_start_state
);
2183 status
= ctdb_attach_recv(subreq
, &ret
, &state
->h
->db_g_lock
);
2184 TALLOC_FREE(subreq
);
2187 ("transaction_start: %s attach g_lock.tdb failed\n",
2188 state
->h
->db
->db_name
));
2189 tevent_req_error(req
, ret
);
2193 subreq
= ctdb_g_lock_lock_send(state
, state
->ev
, state
->client
,
2194 state
->h
->db_g_lock
,
2195 state
->h
->lock_name
,
2196 &state
->h
->sid
, state
->h
->readonly
);
2197 if (tevent_req_nomem(subreq
, req
)) {
2200 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_done
, req
);
2203 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
)
2205 struct tevent_req
*req
= tevent_req_callback_data(
2206 subreq
, struct tevent_req
);
2207 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2208 req
, struct ctdb_transaction_start_state
);
2212 status
= ctdb_g_lock_lock_recv(subreq
, &ret
);
2213 TALLOC_FREE(subreq
);
2216 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2217 state
->h
->db
->db_name
, ret
));
2218 tevent_req_error(req
, ret
);
2222 tevent_req_done(req
);
2225 struct ctdb_transaction_handle
*ctdb_transaction_start_recv(
2226 struct tevent_req
*req
,
2229 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2230 req
, struct ctdb_transaction_start_state
);
2233 if (tevent_req_is_unix_error(req
, &err
)) {
2243 int ctdb_transaction_start(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
2244 struct ctdb_client_context
*client
,
2245 struct timeval timeout
,
2246 struct ctdb_db_context
*db
, bool readonly
,
2247 struct ctdb_transaction_handle
**out
)
2249 struct tevent_req
*req
;
2250 struct ctdb_transaction_handle
*h
;
2253 req
= ctdb_transaction_start_send(mem_ctx
, ev
, client
, timeout
, db
,
2259 tevent_req_poll(req
, ev
);
2261 h
= ctdb_transaction_start_recv(req
, &ret
);
2270 struct ctdb_transaction_record_fetch_state
{
2272 struct ctdb_ltdb_header header
;
2276 static int ctdb_transaction_record_fetch_traverse(
2278 struct ctdb_ltdb_header
*nullheader
,
2279 TDB_DATA key
, TDB_DATA data
,
2282 struct ctdb_transaction_record_fetch_state
*state
=
2283 (struct ctdb_transaction_record_fetch_state
*)private_data
;
2285 if (state
->key
.dsize
== key
.dsize
&&
2286 memcmp(state
->key
.dptr
, key
.dptr
, key
.dsize
) == 0) {
2289 ret
= ctdb_ltdb_header_extract(&data
, &state
->header
);
2292 ("record_fetch: Failed to extract header, "
2298 state
->found
= true;
2304 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle
*h
,
2306 struct ctdb_ltdb_header
*header
,
2309 struct ctdb_transaction_record_fetch_state state
;
2313 state
.found
= false;
2315 ret
= ctdb_rec_buffer_traverse(h
->recbuf
,
2316 ctdb_transaction_record_fetch_traverse
,
2323 if (header
!= NULL
) {
2324 *header
= state
.header
;
2335 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle
*h
,
2337 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
2340 struct ctdb_ltdb_header header
;
2343 ret
= ctdb_transaction_record_fetch(h
, key
, NULL
, &tmp_data
);
2345 data
->dptr
= talloc_memdup(mem_ctx
, tmp_data
.dptr
,
2347 if (data
->dptr
== NULL
) {
2350 data
->dsize
= tmp_data
.dsize
;
2354 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, mem_ctx
, data
);
2359 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, *data
);
2367 int ctdb_transaction_store_record(struct ctdb_transaction_handle
*h
,
2368 TDB_DATA key
, TDB_DATA data
)
2370 TALLOC_CTX
*tmp_ctx
;
2371 struct ctdb_ltdb_header header
;
2379 tmp_ctx
= talloc_new(h
);
2380 if (tmp_ctx
== NULL
) {
2384 ret
= ctdb_transaction_record_fetch(h
, key
, &header
, &old_data
);
2386 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, tmp_ctx
, &old_data
);
2392 if (old_data
.dsize
== data
.dsize
&&
2393 memcmp(old_data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
2394 talloc_free(tmp_ctx
);
2398 header
.dmaster
= ctdb_client_pnn(h
->client
);
2401 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, data
);
2402 talloc_free(tmp_ctx
);
2411 int ctdb_transaction_delete_record(struct ctdb_transaction_handle
*h
,
2414 return ctdb_transaction_store_record(h
, key
, tdb_null
);
2417 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle
*h
,
2420 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
2422 struct ctdb_ltdb_header header
;
2425 key
.dptr
= discard_const(keyname
);
2426 key
.dsize
= strlen(keyname
) + 1;
2428 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, h
, &data
);
2431 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2432 h
->db
->db_name
, ret
));
2436 if (data
.dsize
== 0) {
2442 if (data
.dsize
!= sizeof(uint64_t)) {
2443 talloc_free(data
.dptr
);
2447 *seqnum
= *(uint64_t *)data
.dptr
;
2449 talloc_free(data
.dptr
);
2453 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle
*h
,
2456 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
2459 key
.dptr
= discard_const(keyname
);
2460 key
.dsize
= strlen(keyname
) + 1;
2462 data
.dptr
= (uint8_t *)&seqnum
;
2463 data
.dsize
= sizeof(seqnum
);
2465 return ctdb_transaction_store_record(h
, key
, data
);
2468 struct ctdb_transaction_commit_state
{
2469 struct tevent_context
*ev
;
2470 struct timeval timeout
;
2471 struct ctdb_transaction_handle
*h
;
2475 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
);
2476 static void ctdb_transaction_commit_g_lock_done(struct tevent_req
*subreq
);
2478 struct tevent_req
*ctdb_transaction_commit_send(
2479 TALLOC_CTX
*mem_ctx
,
2480 struct tevent_context
*ev
,
2481 struct timeval timeout
,
2482 struct ctdb_transaction_handle
*h
)
2484 struct tevent_req
*req
, *subreq
;
2485 struct ctdb_transaction_commit_state
*state
;
2486 struct ctdb_req_control request
;
2489 req
= tevent_req_create(mem_ctx
, &state
,
2490 struct ctdb_transaction_commit_state
);
2496 state
->timeout
= timeout
;
2499 ret
= ctdb_transaction_fetch_db_seqnum(h
, &state
->seqnum
);
2501 tevent_req_error(req
, ret
);
2502 return tevent_req_post(req
, ev
);
2505 ret
= ctdb_transaction_store_db_seqnum(h
, state
->seqnum
+1);
2507 tevent_req_error(req
, ret
);
2508 return tevent_req_post(req
, ev
);
2511 ctdb_req_control_trans3_commit(&request
, h
->recbuf
);
2512 subreq
= ctdb_client_control_send(state
, ev
, h
->client
,
2513 ctdb_client_pnn(h
->client
),
2515 if (tevent_req_nomem(subreq
, req
)) {
2516 return tevent_req_post(req
, ev
);
2518 tevent_req_set_callback(subreq
, ctdb_transaction_commit_done
, req
);
2523 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
)
2525 struct tevent_req
*req
= tevent_req_callback_data(
2526 subreq
, struct tevent_req
);
2527 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2528 req
, struct ctdb_transaction_commit_state
);
2529 struct ctdb_transaction_handle
*h
= state
->h
;
2530 struct ctdb_reply_control
*reply
;
2535 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2536 TALLOC_FREE(subreq
);
2539 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2540 h
->db
->db_name
, ret
));
2541 tevent_req_error(req
, ret
);
2545 ret
= ctdb_reply_control_trans3_commit(reply
);
2549 /* Control failed due to recovery */
2551 ret
= ctdb_transaction_fetch_db_seqnum(h
, &seqnum
);
2553 tevent_req_error(req
, ret
);
2557 if (seqnum
== state
->seqnum
) {
2558 struct ctdb_req_control request
;
2561 ctdb_req_control_trans3_commit(&request
,
2563 subreq
= ctdb_client_control_send(
2564 state
, state
->ev
, state
->h
->client
,
2565 ctdb_client_pnn(state
->h
->client
),
2566 state
->timeout
, &request
);
2567 if (tevent_req_nomem(subreq
, req
)) {
2570 tevent_req_set_callback(subreq
,
2571 ctdb_transaction_commit_done
,
2576 if (seqnum
!= state
->seqnum
+ 1) {
2578 ("transaction_commit: %s seqnum mismatch "
2579 "0x%"PRIx64
" != 0x%"PRIx64
" + 1\n",
2580 state
->h
->db
->db_name
, seqnum
, state
->seqnum
));
2581 tevent_req_error(req
, EIO
);
2586 /* trans3_commit successful */
2587 subreq
= ctdb_g_lock_unlock_send(state
, state
->ev
, h
->client
,
2588 h
->db_g_lock
, h
->lock_name
, h
->sid
);
2589 if (tevent_req_nomem(subreq
, req
)) {
2592 tevent_req_set_callback(subreq
, ctdb_transaction_commit_g_lock_done
,
2596 static void ctdb_transaction_commit_g_lock_done(struct tevent_req
*subreq
)
2598 struct tevent_req
*req
= tevent_req_callback_data(
2599 subreq
, struct tevent_req
);
2600 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2601 req
, struct ctdb_transaction_commit_state
);
2605 status
= ctdb_g_lock_unlock_recv(subreq
, &ret
);
2606 TALLOC_FREE(subreq
);
2609 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2610 state
->h
->db
->db_name
, ret
));
2611 tevent_req_error(req
, ret
);
2615 talloc_free(state
->h
);
2616 tevent_req_done(req
);
2619 bool ctdb_transaction_commit_recv(struct tevent_req
*req
, int *perr
)
2623 if (tevent_req_is_unix_error(req
, &err
)) {
2633 int ctdb_transaction_commit(struct ctdb_transaction_handle
*h
)
2635 struct tevent_context
*ev
= h
->ev
;
2636 TALLOC_CTX
*mem_ctx
;
2637 struct tevent_req
*req
;
2641 if (h
->readonly
|| ! h
->updated
) {
2642 return ctdb_transaction_cancel(h
);
2645 mem_ctx
= talloc_new(NULL
);
2646 if (mem_ctx
== NULL
) {
2650 req
= ctdb_transaction_commit_send(mem_ctx
, ev
,
2651 tevent_timeval_zero(), h
);
2653 talloc_free(mem_ctx
);
2657 tevent_req_poll(req
, ev
);
2659 status
= ctdb_transaction_commit_recv(req
, &ret
);
2661 talloc_free(mem_ctx
);
2665 talloc_free(mem_ctx
);
2669 struct ctdb_transaction_cancel_state
{
2670 struct tevent_context
*ev
;
2671 struct ctdb_transaction_handle
*h
;
2672 struct timeval timeout
;
2675 static void ctdb_transaction_cancel_done(struct tevent_req
*subreq
);
2677 struct tevent_req
*ctdb_transaction_cancel_send(
2678 TALLOC_CTX
*mem_ctx
,
2679 struct tevent_context
*ev
,
2680 struct timeval timeout
,
2681 struct ctdb_transaction_handle
*h
)
2683 struct tevent_req
*req
, *subreq
;
2684 struct ctdb_transaction_cancel_state
*state
;
2686 req
= tevent_req_create(mem_ctx
, &state
,
2687 struct ctdb_transaction_cancel_state
);
2694 state
->timeout
= timeout
;
2696 subreq
= ctdb_g_lock_unlock_send(state
, state
->ev
, state
->h
->client
,
2697 state
->h
->db_g_lock
,
2698 state
->h
->lock_name
, state
->h
->sid
);
2699 if (tevent_req_nomem(subreq
, req
)) {
2700 return tevent_req_post(req
, ev
);
2702 tevent_req_set_callback(subreq
, ctdb_transaction_cancel_done
,
2708 static void ctdb_transaction_cancel_done(struct tevent_req
*subreq
)
2710 struct tevent_req
*req
= tevent_req_callback_data(
2711 subreq
, struct tevent_req
);
2712 struct ctdb_transaction_cancel_state
*state
= tevent_req_data(
2713 req
, struct ctdb_transaction_cancel_state
);
2717 status
= ctdb_g_lock_unlock_recv(subreq
, &ret
);
2718 TALLOC_FREE(subreq
);
2721 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2722 state
->h
->db
->db_name
, ret
));
2723 talloc_free(state
->h
);
2724 tevent_req_error(req
, ret
);
2728 talloc_free(state
->h
);
2729 tevent_req_done(req
);
2732 bool ctdb_transaction_cancel_recv(struct tevent_req
*req
, int *perr
)
2736 if (tevent_req_is_unix_error(req
, &err
)) {
2746 int ctdb_transaction_cancel(struct ctdb_transaction_handle
*h
)
2748 struct tevent_context
*ev
= h
->ev
;
2749 struct tevent_req
*req
;
2750 TALLOC_CTX
*mem_ctx
;
2754 mem_ctx
= talloc_new(NULL
);
2755 if (mem_ctx
== NULL
) {
2760 req
= ctdb_transaction_cancel_send(mem_ctx
, ev
,
2761 tevent_timeval_zero(), h
);
2763 talloc_free(mem_ctx
);
2768 tevent_req_poll(req
, ev
);
2770 status
= ctdb_transaction_cancel_recv(req
, &ret
);
2772 talloc_free(mem_ctx
);
2776 talloc_free(mem_ctx
);
2783 * In future Samba should register SERVER_ID.
2784 * Make that structure same as struct srvid {}.