4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context
*client_db_handle(
41 struct ctdb_client_context
*client
,
44 struct ctdb_db_context
*db
;
46 for (db
= client
->db
; db
!= NULL
; db
= db
->next
) {
47 if (strcmp(db_name
, db
->db_name
) == 0) {
55 static bool ctdb_db_persistent(struct ctdb_db_context
*db
)
57 if (db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) {
63 static bool ctdb_db_replicated(struct ctdb_db_context
*db
)
65 if (db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
71 static bool ctdb_db_volatile(struct ctdb_db_context
*db
)
73 if (db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
||
74 db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
80 struct ctdb_set_db_flags_state
{
81 struct tevent_context
*ev
;
82 struct ctdb_client_context
*client
;
83 struct timeval timeout
;
86 bool readonly_done
, sticky_done
;
91 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
);
92 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
);
93 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
);
95 static struct tevent_req
*ctdb_set_db_flags_send(
97 struct tevent_context
*ev
,
98 struct ctdb_client_context
*client
,
99 uint32_t destnode
, struct timeval timeout
,
100 uint32_t db_id
, uint8_t db_flags
)
102 struct tevent_req
*req
, *subreq
;
103 struct ctdb_set_db_flags_state
*state
;
104 struct ctdb_req_control request
;
106 req
= tevent_req_create(mem_ctx
, &state
,
107 struct ctdb_set_db_flags_state
);
112 if (! (db_flags
& (CTDB_DB_FLAGS_READONLY
| CTDB_DB_FLAGS_STICKY
))) {
113 tevent_req_done(req
);
114 return tevent_req_post(req
, ev
);
118 state
->client
= client
;
119 state
->timeout
= timeout
;
120 state
->db_id
= db_id
;
121 state
->db_flags
= db_flags
;
123 ctdb_req_control_get_nodemap(&request
);
124 subreq
= ctdb_client_control_send(state
, ev
, client
, destnode
, timeout
,
126 if (tevent_req_nomem(subreq
, req
)) {
127 return tevent_req_post(req
, ev
);
129 tevent_req_set_callback(subreq
, ctdb_set_db_flags_nodemap_done
, req
);
134 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
)
136 struct tevent_req
*req
= tevent_req_callback_data(
137 subreq
, struct tevent_req
);
138 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
139 req
, struct ctdb_set_db_flags_state
);
140 struct ctdb_req_control request
;
141 struct ctdb_reply_control
*reply
;
142 struct ctdb_node_map
*nodemap
;
146 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
150 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
152 tevent_req_error(req
, ret
);
156 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &nodemap
);
160 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
162 tevent_req_error(req
, ret
);
166 state
->count
= list_of_connected_nodes(nodemap
, CTDB_UNKNOWN_PNN
,
167 state
, &state
->pnn_list
);
168 talloc_free(nodemap
);
169 if (state
->count
<= 0) {
171 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
172 state
->db_id
, state
->count
));
173 tevent_req_error(req
, ENOMEM
);
177 if (state
->db_flags
& CTDB_DB_FLAGS_READONLY
) {
178 ctdb_req_control_set_db_readonly(&request
, state
->db_id
);
179 subreq
= ctdb_client_control_multi_send(
180 state
, state
->ev
, state
->client
,
181 state
->pnn_list
, state
->count
,
182 state
->timeout
, &request
);
183 if (tevent_req_nomem(subreq
, req
)) {
186 tevent_req_set_callback(subreq
,
187 ctdb_set_db_flags_readonly_done
, req
);
189 state
->readonly_done
= true;
192 if (state
->db_flags
& CTDB_DB_FLAGS_STICKY
) {
193 ctdb_req_control_set_db_sticky(&request
, state
->db_id
);
194 subreq
= ctdb_client_control_multi_send(
195 state
, state
->ev
, state
->client
,
196 state
->pnn_list
, state
->count
,
197 state
->timeout
, &request
);
198 if (tevent_req_nomem(subreq
, req
)) {
201 tevent_req_set_callback(subreq
, ctdb_set_db_flags_sticky_done
,
204 state
->sticky_done
= true;
208 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
)
210 struct tevent_req
*req
= tevent_req_callback_data(
211 subreq
, struct tevent_req
);
212 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
213 req
, struct ctdb_set_db_flags_state
);
217 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
222 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
224 tevent_req_error(req
, ret
);
228 state
->readonly_done
= true;
230 if (state
->readonly_done
&& state
->sticky_done
) {
231 tevent_req_done(req
);
235 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
)
237 struct tevent_req
*req
= tevent_req_callback_data(
238 subreq
, struct tevent_req
);
239 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
240 req
, struct ctdb_set_db_flags_state
);
244 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
249 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
251 tevent_req_error(req
, ret
);
255 state
->sticky_done
= true;
257 if (state
->readonly_done
&& state
->sticky_done
) {
258 tevent_req_done(req
);
262 static bool ctdb_set_db_flags_recv(struct tevent_req
*req
, int *perr
)
266 if (tevent_req_is_unix_error(req
, &err
)) {
275 struct ctdb_attach_state
{
276 struct tevent_context
*ev
;
277 struct ctdb_client_context
*client
;
278 struct timeval timeout
;
281 struct ctdb_db_context
*db
;
284 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
);
285 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
);
286 static void ctdb_attach_health_done(struct tevent_req
*subreq
);
287 static void ctdb_attach_flags_done(struct tevent_req
*subreq
);
288 static void ctdb_attach_open_flags_done(struct tevent_req
*subreq
);
290 struct tevent_req
*ctdb_attach_send(TALLOC_CTX
*mem_ctx
,
291 struct tevent_context
*ev
,
292 struct ctdb_client_context
*client
,
293 struct timeval timeout
,
294 const char *db_name
, uint8_t db_flags
)
296 struct tevent_req
*req
, *subreq
;
297 struct ctdb_attach_state
*state
;
298 struct ctdb_req_control request
;
300 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_attach_state
);
305 state
->db
= client_db_handle(client
, db_name
);
306 if (state
->db
!= NULL
) {
307 tevent_req_done(req
);
308 return tevent_req_post(req
, ev
);
312 state
->client
= client
;
313 state
->timeout
= timeout
;
314 state
->destnode
= ctdb_client_pnn(client
);
315 state
->db_flags
= db_flags
;
317 state
->db
= talloc_zero(client
, struct ctdb_db_context
);
318 if (tevent_req_nomem(state
->db
, req
)) {
319 return tevent_req_post(req
, ev
);
322 state
->db
->db_name
= talloc_strdup(state
->db
, db_name
);
323 if (tevent_req_nomem(state
->db
, req
)) {
324 return tevent_req_post(req
, ev
);
327 state
->db
->db_flags
= db_flags
;
329 if (ctdb_db_persistent(state
->db
)) {
330 ctdb_req_control_db_attach_persistent(&request
,
332 } else if (ctdb_db_replicated(state
->db
)) {
333 ctdb_req_control_db_attach_replicated(&request
,
336 ctdb_req_control_db_attach(&request
, state
->db
->db_name
);
339 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
340 state
->destnode
, state
->timeout
,
342 if (tevent_req_nomem(subreq
, req
)) {
343 return tevent_req_post(req
, ev
);
345 tevent_req_set_callback(subreq
, ctdb_attach_dbid_done
, req
);
350 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
)
352 struct tevent_req
*req
= tevent_req_callback_data(
353 subreq
, struct tevent_req
);
354 struct ctdb_attach_state
*state
= tevent_req_data(
355 req
, struct ctdb_attach_state
);
356 struct ctdb_req_control request
;
357 struct ctdb_reply_control
*reply
;
361 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
364 DEBUG(DEBUG_ERR
, ("attach: %s %s failed, ret=%d\n",
366 (ctdb_db_persistent(state
->db
)
367 ? "DB_ATTACH_PERSISTENT"
368 : (ctdb_db_replicated(state
->db
)
369 ? "DB_ATTACH_REPLICATED"
372 tevent_req_error(req
, ret
);
376 if (ctdb_db_persistent(state
->db
)) {
377 ret
= ctdb_reply_control_db_attach_persistent(
378 reply
, &state
->db
->db_id
);
379 } else if (ctdb_db_replicated(state
->db
)) {
380 ret
= ctdb_reply_control_db_attach_replicated(
381 reply
, &state
->db
->db_id
);
383 ret
= ctdb_reply_control_db_attach(reply
, &state
->db
->db_id
);
387 DEBUG(DEBUG_ERR
, ("attach: %s failed to get db_id, ret=%d\n",
388 state
->db
->db_name
, ret
));
389 tevent_req_error(req
, ret
);
393 ctdb_req_control_getdbpath(&request
, state
->db
->db_id
);
394 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
395 state
->destnode
, state
->timeout
,
397 if (tevent_req_nomem(subreq
, req
)) {
400 tevent_req_set_callback(subreq
, ctdb_attach_dbpath_done
, req
);
403 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
)
405 struct tevent_req
*req
= tevent_req_callback_data(
406 subreq
, struct tevent_req
);
407 struct ctdb_attach_state
*state
= tevent_req_data(
408 req
, struct ctdb_attach_state
);
409 struct ctdb_reply_control
*reply
;
410 struct ctdb_req_control request
;
414 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
417 DEBUG(DEBUG_ERR
, ("attach: %s GETDBPATH failed, ret=%d\n",
418 state
->db
->db_name
, ret
));
419 tevent_req_error(req
, ret
);
423 ret
= ctdb_reply_control_getdbpath(reply
, state
->db
,
424 &state
->db
->db_path
);
427 DEBUG(DEBUG_ERR
, ("attach: %s GETDBPATH parse failed, ret=%d\n",
428 state
->db
->db_name
, ret
));
429 tevent_req_error(req
, ret
);
433 ctdb_req_control_db_get_health(&request
, state
->db
->db_id
);
434 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
435 state
->destnode
, state
->timeout
,
437 if (tevent_req_nomem(subreq
, req
)) {
440 tevent_req_set_callback(subreq
, ctdb_attach_health_done
, req
);
443 static void ctdb_attach_health_done(struct tevent_req
*subreq
)
445 struct tevent_req
*req
= tevent_req_callback_data(
446 subreq
, struct tevent_req
);
447 struct ctdb_attach_state
*state
= tevent_req_data(
448 req
, struct ctdb_attach_state
);
449 struct ctdb_reply_control
*reply
;
454 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
457 DEBUG(DEBUG_ERR
, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
458 state
->db
->db_name
, ret
));
459 tevent_req_error(req
, ret
);
463 ret
= ctdb_reply_control_db_get_health(reply
, state
, &reason
);
466 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
467 state
->db
->db_name
, ret
));
468 tevent_req_error(req
, ret
);
472 if (reason
!= NULL
) {
473 /* Database unhealthy, avoid attach */
474 DEBUG(DEBUG_ERR
, ("attach: %s database unhealthy (%s)\n",
475 state
->db
->db_name
, reason
));
476 tevent_req_error(req
, EIO
);
480 subreq
= ctdb_set_db_flags_send(state
, state
->ev
, state
->client
,
481 state
->destnode
, state
->timeout
,
482 state
->db
->db_id
, state
->db_flags
);
483 if (tevent_req_nomem(subreq
, req
)) {
486 tevent_req_set_callback(subreq
, ctdb_attach_flags_done
, req
);
489 static void ctdb_attach_flags_done(struct tevent_req
*subreq
)
491 struct tevent_req
*req
= tevent_req_callback_data(
492 subreq
, struct tevent_req
);
493 struct ctdb_attach_state
*state
= tevent_req_data(
494 req
, struct ctdb_attach_state
);
495 struct ctdb_req_control request
;
499 status
= ctdb_set_db_flags_recv(subreq
, &ret
);
502 DEBUG(DEBUG_ERR
, ("attach: %s set db flags 0x%08x failed\n",
503 state
->db
->db_name
, state
->db_flags
));
504 tevent_req_error(req
, ret
);
508 ctdb_req_control_db_open_flags(&request
, state
->db
->db_id
);
509 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
510 state
->destnode
, state
->timeout
,
512 if (tevent_req_nomem(subreq
, req
)) {
515 tevent_req_set_callback(subreq
, ctdb_attach_open_flags_done
, req
);
518 static void ctdb_attach_open_flags_done(struct tevent_req
*subreq
)
520 struct tevent_req
*req
= tevent_req_callback_data(
521 subreq
, struct tevent_req
);
522 struct ctdb_attach_state
*state
= tevent_req_data(
523 req
, struct ctdb_attach_state
);
524 struct ctdb_reply_control
*reply
;
528 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
531 DEBUG(DEBUG_ERR
, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
532 state
->db
->db_name
, ret
));
533 tevent_req_error(req
, ret
);
537 ret
= ctdb_reply_control_db_open_flags(reply
, &tdb_flags
);
540 DEBUG(DEBUG_ERR
, ("attach: %s DB_OPEN_FLAGS parse failed,"
541 " ret=%d\n", state
->db
->db_name
, ret
));
542 tevent_req_error(req
, ret
);
546 state
->db
->ltdb
= tdb_wrap_open(state
->db
, state
->db
->db_path
, 0,
547 tdb_flags
, O_RDWR
, 0);
548 if (tevent_req_nomem(state
->db
->ltdb
, req
)) {
549 DEBUG(DEBUG_ERR
, ("attach: %s tdb_wrap_open failed\n",
550 state
->db
->db_name
));
553 DLIST_ADD(state
->client
->db
, state
->db
);
555 tevent_req_done(req
);
558 bool ctdb_attach_recv(struct tevent_req
*req
, int *perr
,
559 struct ctdb_db_context
**out
)
561 struct ctdb_attach_state
*state
= tevent_req_data(
562 req
, struct ctdb_attach_state
);
565 if (tevent_req_is_unix_error(req
, &err
)) {
578 int ctdb_attach(struct tevent_context
*ev
,
579 struct ctdb_client_context
*client
,
580 struct timeval timeout
,
581 const char *db_name
, uint8_t db_flags
,
582 struct ctdb_db_context
**out
)
585 struct tevent_req
*req
;
589 mem_ctx
= talloc_new(client
);
590 if (mem_ctx
== NULL
) {
594 req
= ctdb_attach_send(mem_ctx
, ev
, client
, timeout
,
597 talloc_free(mem_ctx
);
601 tevent_req_poll(req
, ev
);
603 status
= ctdb_attach_recv(req
, &ret
, out
);
605 talloc_free(mem_ctx
);
610 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
611 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
612 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
615 talloc_free(mem_ctx
);
619 struct ctdb_detach_state
{
620 struct ctdb_client_context
*client
;
621 struct tevent_context
*ev
;
622 struct timeval timeout
;
627 static void ctdb_detach_dbname_done(struct tevent_req
*subreq
);
628 static void ctdb_detach_done(struct tevent_req
*subreq
);
630 struct tevent_req
*ctdb_detach_send(TALLOC_CTX
*mem_ctx
,
631 struct tevent_context
*ev
,
632 struct ctdb_client_context
*client
,
633 struct timeval timeout
, uint32_t db_id
)
635 struct tevent_req
*req
, *subreq
;
636 struct ctdb_detach_state
*state
;
637 struct ctdb_req_control request
;
639 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_detach_state
);
644 state
->client
= client
;
646 state
->timeout
= timeout
;
647 state
->db_id
= db_id
;
649 ctdb_req_control_get_dbname(&request
, db_id
);
650 subreq
= ctdb_client_control_send(state
, ev
, client
,
651 ctdb_client_pnn(client
), timeout
,
653 if (tevent_req_nomem(subreq
, req
)) {
654 return tevent_req_post(req
, ev
);
656 tevent_req_set_callback(subreq
, ctdb_detach_dbname_done
, req
);
661 static void ctdb_detach_dbname_done(struct tevent_req
*subreq
)
663 struct tevent_req
*req
= tevent_req_callback_data(
664 subreq
, struct tevent_req
);
665 struct ctdb_detach_state
*state
= tevent_req_data(
666 req
, struct ctdb_detach_state
);
667 struct ctdb_reply_control
*reply
;
668 struct ctdb_req_control request
;
672 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
675 DEBUG(DEBUG_ERR
, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
677 tevent_req_error(req
, ret
);
681 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
683 DEBUG(DEBUG_ERR
, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
685 tevent_req_error(req
, ret
);
689 ctdb_req_control_db_detach(&request
, state
->db_id
);
690 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
691 ctdb_client_pnn(state
->client
),
692 state
->timeout
, &request
);
693 if (tevent_req_nomem(subreq
, req
)) {
696 tevent_req_set_callback(subreq
, ctdb_detach_done
, req
);
700 static void ctdb_detach_done(struct tevent_req
*subreq
)
702 struct tevent_req
*req
= tevent_req_callback_data(
703 subreq
, struct tevent_req
);
704 struct ctdb_detach_state
*state
= tevent_req_data(
705 req
, struct ctdb_detach_state
);
706 struct ctdb_reply_control
*reply
;
707 struct ctdb_db_context
*db
;
711 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
714 DEBUG(DEBUG_ERR
, ("detach: %s DB_DETACH failed, ret=%d\n",
715 state
->db_name
, ret
));
716 tevent_req_error(req
, ret
);
720 ret
= ctdb_reply_control_db_detach(reply
);
722 DEBUG(DEBUG_ERR
, ("detach: %s DB_DETACH failed, ret=%d\n",
723 state
->db_name
, ret
));
724 tevent_req_error(req
, ret
);
728 db
= client_db_handle(state
->client
, state
->db_name
);
730 DLIST_REMOVE(state
->client
->db
, db
);
734 tevent_req_done(req
);
737 bool ctdb_detach_recv(struct tevent_req
*req
, int *perr
)
741 if (tevent_req_is_unix_error(req
, &ret
)) {
751 int ctdb_detach(struct tevent_context
*ev
,
752 struct ctdb_client_context
*client
,
753 struct timeval timeout
, uint32_t db_id
)
756 struct tevent_req
*req
;
760 mem_ctx
= talloc_new(client
);
761 if (mem_ctx
== NULL
) {
765 req
= ctdb_detach_send(mem_ctx
, ev
, client
, timeout
, db_id
);
767 talloc_free(mem_ctx
);
771 tevent_req_poll(req
, ev
);
773 status
= ctdb_detach_recv(req
, &ret
);
775 talloc_free(mem_ctx
);
779 talloc_free(mem_ctx
);
783 uint32_t ctdb_db_id(struct ctdb_db_context
*db
)
788 struct ctdb_db_traverse_local_state
{
789 ctdb_rec_parser_func_t parser
;
795 static int ctdb_db_traverse_local_handler(struct tdb_context
*tdb
,
796 TDB_DATA key
, TDB_DATA data
,
799 struct ctdb_db_traverse_local_state
*state
=
800 (struct ctdb_db_traverse_local_state
*)private_data
;
803 if (state
->extract_header
) {
804 struct ctdb_ltdb_header header
;
806 ret
= ctdb_ltdb_header_extract(&data
, &header
);
812 ret
= state
->parser(0, &header
, key
, data
, state
->private_data
);
814 ret
= state
->parser(0, NULL
, key
, data
, state
->private_data
);
825 int ctdb_db_traverse_local(struct ctdb_db_context
*db
, bool readonly
,
827 ctdb_rec_parser_func_t parser
, void *private_data
)
829 struct ctdb_db_traverse_local_state state
;
832 state
.parser
= parser
;
833 state
.private_data
= private_data
;
834 state
.extract_header
= extract_header
;
838 ret
= tdb_traverse_read(db
->ltdb
->tdb
,
839 ctdb_db_traverse_local_handler
,
842 ret
= tdb_traverse(db
->ltdb
->tdb
,
843 ctdb_db_traverse_local_handler
, &state
);
853 struct ctdb_db_traverse_state
{
854 struct tevent_context
*ev
;
855 struct ctdb_client_context
*client
;
856 struct ctdb_db_context
*db
;
859 struct timeval timeout
;
860 ctdb_rec_parser_func_t parser
;
865 static void ctdb_db_traverse_handler_set(struct tevent_req
*subreq
);
866 static void ctdb_db_traverse_started(struct tevent_req
*subreq
);
867 static void ctdb_db_traverse_handler(uint64_t srvid
, TDB_DATA data
,
869 static void ctdb_db_traverse_remove_handler(struct tevent_req
*req
);
870 static void ctdb_db_traverse_handler_removed(struct tevent_req
*subreq
);
872 struct tevent_req
*ctdb_db_traverse_send(TALLOC_CTX
*mem_ctx
,
873 struct tevent_context
*ev
,
874 struct ctdb_client_context
*client
,
875 struct ctdb_db_context
*db
,
877 struct timeval timeout
,
878 ctdb_rec_parser_func_t parser
,
881 struct tevent_req
*req
, *subreq
;
882 struct ctdb_db_traverse_state
*state
;
884 req
= tevent_req_create(mem_ctx
, &state
,
885 struct ctdb_db_traverse_state
);
891 state
->client
= client
;
893 state
->destnode
= destnode
;
894 state
->srvid
= CTDB_SRVID_CLIENT_RANGE
| getpid();
895 state
->timeout
= timeout
;
896 state
->parser
= parser
;
897 state
->private_data
= private_data
;
899 subreq
= ctdb_client_set_message_handler_send(state
, ev
, client
,
901 ctdb_db_traverse_handler
,
903 if (tevent_req_nomem(subreq
, req
)) {
904 return tevent_req_post(req
, ev
);
906 tevent_req_set_callback(subreq
, ctdb_db_traverse_handler_set
, req
);
911 static void ctdb_db_traverse_handler_set(struct tevent_req
*subreq
)
913 struct tevent_req
*req
= tevent_req_callback_data(
914 subreq
, struct tevent_req
);
915 struct ctdb_db_traverse_state
*state
= tevent_req_data(
916 req
, struct ctdb_db_traverse_state
);
917 struct ctdb_traverse_start_ext traverse
;
918 struct ctdb_req_control request
;
922 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
925 tevent_req_error(req
, ret
);
929 traverse
= (struct ctdb_traverse_start_ext
) {
930 .db_id
= ctdb_db_id(state
->db
),
932 .srvid
= state
->srvid
,
933 .withemptyrecords
= false,
936 ctdb_req_control_traverse_start_ext(&request
, &traverse
);
937 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
938 state
->destnode
, state
->timeout
,
940 if (subreq
== NULL
) {
941 state
->result
= ENOMEM
;
942 ctdb_db_traverse_remove_handler(req
);
945 tevent_req_set_callback(subreq
, ctdb_db_traverse_started
, req
);
948 static void ctdb_db_traverse_started(struct tevent_req
*subreq
)
950 struct tevent_req
*req
= tevent_req_callback_data(
951 subreq
, struct tevent_req
);
952 struct ctdb_db_traverse_state
*state
= tevent_req_data(
953 req
, struct ctdb_db_traverse_state
);
954 struct ctdb_reply_control
*reply
;
958 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
961 DEBUG(DEBUG_ERR
, ("traverse: control failed, ret=%d\n", ret
));
963 ctdb_db_traverse_remove_handler(req
);
967 ret
= ctdb_reply_control_traverse_start_ext(reply
);
970 DEBUG(DEBUG_ERR
, ("traverse: control reply failed, ret=%d\n",
973 ctdb_db_traverse_remove_handler(req
);
978 static void ctdb_db_traverse_handler(uint64_t srvid
, TDB_DATA data
,
981 struct tevent_req
*req
= talloc_get_type_abort(
982 private_data
, struct tevent_req
);
983 struct ctdb_db_traverse_state
*state
= tevent_req_data(
984 req
, struct ctdb_db_traverse_state
);
985 struct ctdb_rec_data
*rec
;
986 struct ctdb_ltdb_header header
;
990 ret
= ctdb_rec_data_pull(data
.dptr
, data
.dsize
, state
, &rec
, &np
);
995 if (rec
->key
.dsize
== 0 && rec
->data
.dsize
== 0) {
997 ctdb_db_traverse_remove_handler(req
);
1001 ret
= ctdb_ltdb_header_extract(&rec
->data
, &header
);
1007 if (rec
->data
.dsize
== 0) {
1012 ret
= state
->parser(rec
->reqid
, &header
, rec
->key
, rec
->data
,
1013 state
->private_data
);
1016 state
->result
= ret
;
1017 ctdb_db_traverse_remove_handler(req
);
1021 static void ctdb_db_traverse_remove_handler(struct tevent_req
*req
)
1023 struct ctdb_db_traverse_state
*state
= tevent_req_data(
1024 req
, struct ctdb_db_traverse_state
);
1025 struct tevent_req
*subreq
;
1027 subreq
= ctdb_client_remove_message_handler_send(state
, state
->ev
,
1030 if (tevent_req_nomem(subreq
, req
)) {
1033 tevent_req_set_callback(subreq
, ctdb_db_traverse_handler_removed
, req
);
1036 static void ctdb_db_traverse_handler_removed(struct tevent_req
*subreq
)
1038 struct tevent_req
*req
= tevent_req_callback_data(
1039 subreq
, struct tevent_req
);
1040 struct ctdb_db_traverse_state
*state
= tevent_req_data(
1041 req
, struct ctdb_db_traverse_state
);
1045 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
1046 TALLOC_FREE(subreq
);
1048 tevent_req_error(req
, ret
);
1052 if (state
->result
!= 0) {
1053 tevent_req_error(req
, state
->result
);
1057 tevent_req_done(req
);
1060 bool ctdb_db_traverse_recv(struct tevent_req
*req
, int *perr
)
1064 if (tevent_req_is_unix_error(req
, &ret
)) {
1074 int ctdb_db_traverse(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1075 struct ctdb_client_context
*client
,
1076 struct ctdb_db_context
*db
,
1077 uint32_t destnode
, struct timeval timeout
,
1078 ctdb_rec_parser_func_t parser
, void *private_data
)
1080 struct tevent_req
*req
;
1084 req
= ctdb_db_traverse_send(mem_ctx
, ev
, client
, db
, destnode
,
1085 timeout
, parser
, private_data
);
1090 tevent_req_poll(req
, ev
);
1092 status
= ctdb_db_traverse_recv(req
, &ret
);
1100 int ctdb_ltdb_fetch(struct ctdb_db_context
*db
, TDB_DATA key
,
1101 struct ctdb_ltdb_header
*header
,
1102 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
1108 rec
= tdb_fetch(db
->ltdb
->tdb
, key
);
1109 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1110 /* No record present */
1111 if (rec
.dptr
!= NULL
) {
1115 if (tdb_error(db
->ltdb
->tdb
) != TDB_ERR_NOEXIST
) {
1119 *header
= (struct ctdb_ltdb_header
) {
1120 .dmaster
= CTDB_UNKNOWN_PNN
,
1129 ret
= ctdb_ltdb_header_pull(rec
.dptr
, rec
.dsize
, header
, &np
);
1136 data
->dsize
= rec
.dsize
- np
;
1137 data
->dptr
= talloc_memdup(mem_ctx
, rec
.dptr
+ np
,
1139 if (data
->dptr
== NULL
) {
1149 * Fetch a record from volatile database
1152 * 1. Get a lock on the hash chain
1153 * 2. If the record does not exist, migrate the record
1154 * 3. If readonly=true and delegations do not exist, migrate the record.
1155 * 4. If readonly=false and delegations exist, migrate the record.
1156 * 5. If the local node is not dmaster, migrate the record.
1160 struct ctdb_fetch_lock_state
{
1161 struct tevent_context
*ev
;
1162 struct ctdb_client_context
*client
;
1163 struct ctdb_record_handle
*h
;
1168 static int ctdb_fetch_lock_check(struct tevent_req
*req
);
1169 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
);
1170 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
);
1172 struct tevent_req
*ctdb_fetch_lock_send(TALLOC_CTX
*mem_ctx
,
1173 struct tevent_context
*ev
,
1174 struct ctdb_client_context
*client
,
1175 struct ctdb_db_context
*db
,
1176 TDB_DATA key
, bool readonly
)
1178 struct ctdb_fetch_lock_state
*state
;
1179 struct tevent_req
*req
;
1182 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_fetch_lock_state
);
1188 state
->client
= client
;
1190 state
->h
= talloc_zero(db
, struct ctdb_record_handle
);
1191 if (tevent_req_nomem(state
->h
, req
)) {
1192 return tevent_req_post(req
, ev
);
1194 state
->h
->client
= client
;
1196 state
->h
->key
.dptr
= talloc_memdup(state
->h
, key
.dptr
, key
.dsize
);
1197 if (tevent_req_nomem(state
->h
->key
.dptr
, req
)) {
1198 return tevent_req_post(req
, ev
);
1200 state
->h
->key
.dsize
= key
.dsize
;
1201 state
->h
->readonly
= false;
1203 state
->readonly
= readonly
;
1204 state
->pnn
= ctdb_client_pnn(client
);
1206 /* Check that database is not persistent */
1207 if (! ctdb_db_volatile(db
)) {
1208 DEBUG(DEBUG_ERR
, ("fetch_lock: %s database not volatile\n",
1210 tevent_req_error(req
, EINVAL
);
1211 return tevent_req_post(req
, ev
);
1214 ret
= ctdb_fetch_lock_check(req
);
1216 tevent_req_done(req
);
1217 return tevent_req_post(req
, ev
);
1219 if (ret
!= EAGAIN
) {
1220 tevent_req_error(req
, ret
);
1221 return tevent_req_post(req
, ev
);
1226 static int ctdb_fetch_lock_check(struct tevent_req
*req
)
1228 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1229 req
, struct ctdb_fetch_lock_state
);
1230 struct ctdb_record_handle
*h
= state
->h
;
1231 struct ctdb_ltdb_header header
;
1232 TDB_DATA data
= tdb_null
;
1235 bool do_migrate
= false;
1237 ret
= tdb_chainlock(h
->db
->ltdb
->tdb
, h
->key
);
1240 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1241 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1246 data
= tdb_fetch(h
->db
->ltdb
->tdb
, h
->key
);
1247 if (data
.dptr
== NULL
) {
1248 if (tdb_error(h
->db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
) {
1256 /* Got the record */
1257 ret
= ctdb_ltdb_header_pull(data
.dptr
, data
.dsize
, &header
, &np
);
1263 if (! state
->readonly
) {
1264 /* Read/write access */
1265 if (header
.dmaster
== state
->pnn
&&
1266 header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
) {
1270 if (header
.dmaster
!= state
->pnn
) {
1274 /* Readonly access */
1275 if (header
.dmaster
!= state
->pnn
&&
1276 ! (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
1277 CTDB_REC_RO_HAVE_DELEGATIONS
))) {
1282 /* We are the dmaster or readonly delegation */
1285 if (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
1286 CTDB_REC_RO_HAVE_DELEGATIONS
)) {
1296 if (data
.dptr
!= NULL
) {
1299 ret
= tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
1302 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1303 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1308 ctdb_fetch_lock_migrate(req
);
1313 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
)
1315 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1316 req
, struct ctdb_fetch_lock_state
);
1317 struct ctdb_req_call request
;
1318 struct tevent_req
*subreq
;
1320 ZERO_STRUCT(request
);
1321 request
.flags
= CTDB_IMMEDIATE_MIGRATION
;
1322 if (state
->readonly
) {
1323 request
.flags
|= CTDB_WANT_READONLY
;
1325 request
.db_id
= state
->h
->db
->db_id
;
1326 request
.callid
= CTDB_NULL_FUNC
;
1327 request
.key
= state
->h
->key
;
1328 request
.calldata
= tdb_null
;
1330 subreq
= ctdb_client_call_send(state
, state
->ev
, state
->client
,
1332 if (tevent_req_nomem(subreq
, req
)) {
1336 tevent_req_set_callback(subreq
, ctdb_fetch_lock_migrate_done
, req
);
1339 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
)
1341 struct tevent_req
*req
= tevent_req_callback_data(
1342 subreq
, struct tevent_req
);
1343 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1344 req
, struct ctdb_fetch_lock_state
);
1345 struct ctdb_reply_call
*reply
;
1349 status
= ctdb_client_call_recv(subreq
, state
, &reply
, &ret
);
1350 TALLOC_FREE(subreq
);
1352 DEBUG(DEBUG_ERR
, ("fetch_lock: %s CALL failed, ret=%d\n",
1353 state
->h
->db
->db_name
, ret
));
1354 tevent_req_error(req
, ret
);
1358 if (reply
->status
!= 0) {
1359 tevent_req_error(req
, EIO
);
1364 ret
= ctdb_fetch_lock_check(req
);
1366 if (ret
!= EAGAIN
) {
1367 tevent_req_error(req
, ret
);
1372 tevent_req_done(req
);
1375 static int ctdb_record_handle_destructor(struct ctdb_record_handle
*h
)
1379 ret
= tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
1382 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1383 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1389 struct ctdb_record_handle
*ctdb_fetch_lock_recv(struct tevent_req
*req
,
1390 struct ctdb_ltdb_header
*header
,
1391 TALLOC_CTX
*mem_ctx
,
1392 TDB_DATA
*data
, int *perr
)
1394 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
1395 req
, struct ctdb_fetch_lock_state
);
1396 struct ctdb_record_handle
*h
= state
->h
;
1399 if (tevent_req_is_unix_error(req
, &err
)) {
1401 TALLOC_FREE(state
->h
);
1407 if (header
!= NULL
) {
1408 *header
= h
->header
;
1413 offset
= ctdb_ltdb_header_len(&h
->header
);
1415 data
->dsize
= h
->data
.dsize
- offset
;
1416 data
->dptr
= talloc_memdup(mem_ctx
, h
->data
.dptr
+ offset
,
1418 if (data
->dptr
== NULL
) {
1419 TALLOC_FREE(state
->h
);
1427 talloc_set_destructor(h
, ctdb_record_handle_destructor
);
1431 int ctdb_fetch_lock(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1432 struct ctdb_client_context
*client
,
1433 struct ctdb_db_context
*db
, TDB_DATA key
, bool readonly
,
1434 struct ctdb_record_handle
**out
,
1435 struct ctdb_ltdb_header
*header
, TDB_DATA
*data
)
1437 struct tevent_req
*req
;
1438 struct ctdb_record_handle
*h
;
1441 req
= ctdb_fetch_lock_send(mem_ctx
, ev
, client
, db
, key
, readonly
);
1446 tevent_req_poll(req
, ev
);
1448 h
= ctdb_fetch_lock_recv(req
, header
, mem_ctx
, data
, &ret
);
1457 int ctdb_store_record(struct ctdb_record_handle
*h
, TDB_DATA data
)
1459 uint8_t header
[sizeof(struct ctdb_ltdb_header
)];
1464 /* Cannot modify the record if it was obtained as a readonly copy */
1469 /* Check if the new data is same */
1470 if (h
->data
.dsize
== data
.dsize
&&
1471 memcmp(h
->data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
1472 /* No need to do anything */
1476 ctdb_ltdb_header_push(&h
->header
, header
, &np
);
1479 rec
[0].dptr
= header
;
1481 rec
[1].dsize
= data
.dsize
;
1482 rec
[1].dptr
= data
.dptr
;
1484 ret
= tdb_storev(h
->db
->ltdb
->tdb
, h
->key
, rec
, 2, TDB_REPLACE
);
1487 ("store_record: %s tdb_storev failed, %s\n",
1488 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1495 struct ctdb_delete_record_state
{
1496 struct ctdb_record_handle
*h
;
1499 static void ctdb_delete_record_done(struct tevent_req
*subreq
);
1501 struct tevent_req
*ctdb_delete_record_send(TALLOC_CTX
*mem_ctx
,
1502 struct tevent_context
*ev
,
1503 struct ctdb_record_handle
*h
)
1505 struct tevent_req
*req
, *subreq
;
1506 struct ctdb_delete_record_state
*state
;
1507 struct ctdb_key_data key
;
1508 struct ctdb_req_control request
;
1509 uint8_t header
[sizeof(struct ctdb_ltdb_header
)];
1514 req
= tevent_req_create(mem_ctx
, &state
,
1515 struct ctdb_delete_record_state
);
1522 /* Cannot delete the record if it was obtained as a readonly copy */
1524 DEBUG(DEBUG_ERR
, ("fetch_lock delete: %s readonly record\n",
1526 tevent_req_error(req
, EINVAL
);
1527 return tevent_req_post(req
, ev
);
1530 ctdb_ltdb_header_push(&h
->header
, header
, &np
);
1535 ret
= tdb_store(h
->db
->ltdb
->tdb
, h
->key
, rec
, TDB_REPLACE
);
1538 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1539 h
->db
->db_name
, tdb_errorstr(h
->db
->ltdb
->tdb
)));
1540 tevent_req_error(req
, EIO
);
1541 return tevent_req_post(req
, ev
);
1544 key
.db_id
= h
->db
->db_id
;
1545 key
.header
= h
->header
;
1548 ctdb_req_control_schedule_for_deletion(&request
, &key
);
1549 subreq
= ctdb_client_control_send(state
, ev
, h
->client
,
1550 ctdb_client_pnn(h
->client
),
1551 tevent_timeval_zero(),
1553 if (tevent_req_nomem(subreq
, req
)) {
1554 return tevent_req_post(req
, ev
);
1556 tevent_req_set_callback(subreq
, ctdb_delete_record_done
, req
);
1561 static void ctdb_delete_record_done(struct tevent_req
*subreq
)
1563 struct tevent_req
*req
= tevent_req_callback_data(
1564 subreq
, struct tevent_req
);
1565 struct ctdb_delete_record_state
*state
= tevent_req_data(
1566 req
, struct ctdb_delete_record_state
);
1570 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
1571 TALLOC_FREE(subreq
);
1574 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1575 "ret=%d\n", state
->h
->db
->db_name
, ret
));
1576 tevent_req_error(req
, ret
);
1580 tevent_req_done(req
);
1583 bool ctdb_delete_record_recv(struct tevent_req
*req
, int *perr
)
1587 if (tevent_req_is_unix_error(req
, &err
)) {
1598 int ctdb_delete_record(struct ctdb_record_handle
*h
)
1600 struct tevent_context
*ev
= h
->ev
;
1601 TALLOC_CTX
*mem_ctx
;
1602 struct tevent_req
*req
;
1606 mem_ctx
= talloc_new(NULL
);
1607 if (mem_ctx
== NULL
) {
1611 req
= ctdb_delete_record_send(mem_ctx
, ev
, h
);
1613 talloc_free(mem_ctx
);
1617 tevent_req_poll(req
, ev
);
1619 status
= ctdb_delete_record_recv(req
, &ret
);
1620 talloc_free(mem_ctx
);
1629 * Global lock functions
1632 struct ctdb_g_lock_lock_state
{
1633 struct tevent_context
*ev
;
1634 struct ctdb_client_context
*client
;
1635 struct ctdb_db_context
*db
;
1637 struct ctdb_server_id my_sid
;
1638 enum ctdb_g_lock_type lock_type
;
1639 struct ctdb_record_handle
*h
;
1640 /* state for verification of active locks */
1641 struct ctdb_g_lock_list
*lock_list
;
1642 unsigned int current
;
1645 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
);
1646 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
);
1647 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
);
1648 static int ctdb_g_lock_lock_update(struct tevent_req
*req
);
1649 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
);
1651 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1
,
1652 enum ctdb_g_lock_type l2
)
1654 if ((l1
== CTDB_G_LOCK_READ
) && (l2
== CTDB_G_LOCK_READ
)) {
1660 struct tevent_req
*ctdb_g_lock_lock_send(TALLOC_CTX
*mem_ctx
,
1661 struct tevent_context
*ev
,
1662 struct ctdb_client_context
*client
,
1663 struct ctdb_db_context
*db
,
1664 const char *keyname
,
1665 struct ctdb_server_id
*sid
,
1668 struct tevent_req
*req
, *subreq
;
1669 struct ctdb_g_lock_lock_state
*state
;
1671 req
= tevent_req_create(mem_ctx
, &state
,
1672 struct ctdb_g_lock_lock_state
);
1678 state
->client
= client
;
1680 state
->key
.dptr
= discard_const(keyname
);
1681 state
->key
.dsize
= strlen(keyname
) + 1;
1682 state
->my_sid
= *sid
;
1683 state
->lock_type
= (readonly
? CTDB_G_LOCK_READ
: CTDB_G_LOCK_WRITE
);
1685 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1687 if (tevent_req_nomem(subreq
, req
)) {
1688 return tevent_req_post(req
, ev
);
1690 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1695 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
)
1697 struct tevent_req
*req
= tevent_req_callback_data(
1698 subreq
, struct tevent_req
);
1699 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1700 req
, struct ctdb_g_lock_lock_state
);
1705 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1706 TALLOC_FREE(subreq
);
1707 if (state
->h
== NULL
) {
1708 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s fetch lock failed\n",
1709 (char *)state
->key
.dptr
));
1710 tevent_req_error(req
, ret
);
1714 if (state
->lock_list
!= NULL
) {
1715 TALLOC_FREE(state
->lock_list
);
1719 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1720 &state
->lock_list
, &np
);
1721 talloc_free(data
.dptr
);
1723 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s invalid lock data\n",
1724 (char *)state
->key
.dptr
));
1725 tevent_req_error(req
, ret
);
1729 ctdb_g_lock_lock_process_locks(req
);
1732 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
)
1734 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1735 req
, struct ctdb_g_lock_lock_state
);
1736 struct tevent_req
*subreq
;
1737 struct ctdb_g_lock
*lock
;
1738 bool check_server
= false;
1741 while (state
->current
< state
->lock_list
->num
) {
1742 lock
= &state
->lock_list
->lock
[state
->current
];
1744 /* We should not ask for the same lock more than once */
1745 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
1746 DEBUG(DEBUG_ERR
, ("g_lock_lock: %s deadlock\n",
1747 (char *)state
->key
.dptr
));
1748 tevent_req_error(req
, EDEADLK
);
1752 if (ctdb_g_lock_conflicts(lock
->type
, state
->lock_type
)) {
1753 check_server
= true;
1757 state
->current
+= 1;
1761 struct ctdb_req_control request
;
1763 ctdb_req_control_process_exists(&request
, lock
->sid
.pid
);
1764 subreq
= ctdb_client_control_send(state
, state
->ev
,
1767 tevent_timeval_zero(),
1769 if (tevent_req_nomem(subreq
, req
)) {
1772 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_checked
, req
);
1776 /* There is no conflict, add ourself to the lock_list */
1777 state
->lock_list
->lock
= talloc_realloc(state
->lock_list
,
1778 state
->lock_list
->lock
,
1780 state
->lock_list
->num
+ 1);
1781 if (state
->lock_list
->lock
== NULL
) {
1782 tevent_req_error(req
, ENOMEM
);
1786 lock
= &state
->lock_list
->lock
[state
->lock_list
->num
];
1787 lock
->type
= state
->lock_type
;
1788 lock
->sid
= state
->my_sid
;
1789 state
->lock_list
->num
+= 1;
1791 ret
= ctdb_g_lock_lock_update(req
);
1793 tevent_req_error(req
, ret
);
1797 TALLOC_FREE(state
->h
);
1798 tevent_req_done(req
);
1801 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
)
1803 struct tevent_req
*req
= tevent_req_callback_data(
1804 subreq
, struct tevent_req
);
1805 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1806 req
, struct ctdb_g_lock_lock_state
);
1807 struct ctdb_reply_control
*reply
;
1811 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1812 TALLOC_FREE(subreq
);
1815 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1816 (char *)state
->key
.dptr
, ret
));
1817 tevent_req_error(req
, ret
);
1821 ret
= ctdb_reply_control_process_exists(reply
, &value
);
1823 tevent_req_error(req
, ret
);
1829 /* server process exists, need to retry */
1830 TALLOC_FREE(state
->h
);
1831 subreq
= tevent_wakeup_send(state
, state
->ev
,
1832 tevent_timeval_current_ofs(0,1000));
1833 if (tevent_req_nomem(subreq
, req
)) {
1836 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_retry
, req
);
1840 /* server process does not exist, remove conflicting entry */
1841 state
->lock_list
->lock
[state
->current
] =
1842 state
->lock_list
->lock
[state
->lock_list
->num
-1];
1843 state
->lock_list
->num
-= 1;
1845 ret
= ctdb_g_lock_lock_update(req
);
1847 tevent_req_error(req
, ret
);
1851 ctdb_g_lock_lock_process_locks(req
);
1854 static int ctdb_g_lock_lock_update(struct tevent_req
*req
)
1856 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1857 req
, struct ctdb_g_lock_lock_state
);
1862 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
1863 data
.dptr
= talloc_size(state
, data
.dsize
);
1864 if (data
.dptr
== NULL
) {
1868 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
, &np
);
1869 ret
= ctdb_store_record(state
->h
, data
);
1870 talloc_free(data
.dptr
);
1874 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
)
1876 struct tevent_req
*req
= tevent_req_callback_data(
1877 subreq
, struct tevent_req
);
1878 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1879 req
, struct ctdb_g_lock_lock_state
);
1882 success
= tevent_wakeup_recv(subreq
);
1883 TALLOC_FREE(subreq
);
1885 tevent_req_error(req
, ENOMEM
);
1889 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
1890 state
->db
, state
->key
, false);
1891 if (tevent_req_nomem(subreq
, req
)) {
1894 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1897 bool ctdb_g_lock_lock_recv(struct tevent_req
*req
, int *perr
)
1899 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1900 req
, struct ctdb_g_lock_lock_state
);
1903 TALLOC_FREE(state
->h
);
1905 if (tevent_req_is_unix_error(req
, &err
)) {
1915 struct ctdb_g_lock_unlock_state
{
1916 struct tevent_context
*ev
;
1917 struct ctdb_client_context
*client
;
1918 struct ctdb_db_context
*db
;
1920 struct ctdb_server_id my_sid
;
1921 struct ctdb_record_handle
*h
;
1922 struct ctdb_g_lock_list
*lock_list
;
1925 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
);
1926 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
);
1927 static void ctdb_g_lock_unlock_deleted(struct tevent_req
*subreq
);
1929 struct tevent_req
*ctdb_g_lock_unlock_send(TALLOC_CTX
*mem_ctx
,
1930 struct tevent_context
*ev
,
1931 struct ctdb_client_context
*client
,
1932 struct ctdb_db_context
*db
,
1933 const char *keyname
,
1934 struct ctdb_server_id sid
)
1936 struct tevent_req
*req
, *subreq
;
1937 struct ctdb_g_lock_unlock_state
*state
;
1939 req
= tevent_req_create(mem_ctx
, &state
,
1940 struct ctdb_g_lock_unlock_state
);
1946 state
->client
= client
;
1948 state
->key
.dptr
= discard_const(keyname
);
1949 state
->key
.dsize
= strlen(keyname
) + 1;
1950 state
->my_sid
= sid
;
1952 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1954 if (tevent_req_nomem(subreq
, req
)) {
1955 return tevent_req_post(req
, ev
);
1957 tevent_req_set_callback(subreq
, ctdb_g_lock_unlock_fetched
, req
);
1962 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
)
1964 struct tevent_req
*req
= tevent_req_callback_data(
1965 subreq
, struct tevent_req
);
1966 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
1967 req
, struct ctdb_g_lock_unlock_state
);
1972 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1973 TALLOC_FREE(subreq
);
1974 if (state
->h
== NULL
) {
1975 DEBUG(DEBUG_ERR
, ("g_lock_unlock: %s fetch lock failed\n",
1976 (char *)state
->key
.dptr
));
1977 tevent_req_error(req
, ret
);
1981 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1982 &state
->lock_list
, &np
);
1984 DEBUG(DEBUG_ERR
, ("g_lock_unlock: %s invalid lock data\n",
1985 (char *)state
->key
.dptr
));
1986 tevent_req_error(req
, ret
);
1990 ret
= ctdb_g_lock_unlock_update(req
);
1992 tevent_req_error(req
, ret
);
1996 if (state
->lock_list
->num
== 0) {
1997 subreq
= ctdb_delete_record_send(state
, state
->ev
, state
->h
);
1998 if (tevent_req_nomem(subreq
, req
)) {
2001 tevent_req_set_callback(subreq
, ctdb_g_lock_unlock_deleted
,
2006 TALLOC_FREE(state
->h
);
2007 tevent_req_done(req
);
2010 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
)
2012 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2013 req
, struct ctdb_g_lock_unlock_state
);
2014 struct ctdb_g_lock
*lock
;
2017 for (i
=0; i
<state
->lock_list
->num
; i
++) {
2018 lock
= &state
->lock_list
->lock
[i
];
2020 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
2025 if (i
< state
->lock_list
->num
) {
2026 state
->lock_list
->lock
[i
] =
2027 state
->lock_list
->lock
[state
->lock_list
->num
-1];
2028 state
->lock_list
->num
-= 1;
2031 if (state
->lock_list
->num
!= 0) {
2035 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
2036 data
.dptr
= talloc_size(state
, data
.dsize
);
2037 if (data
.dptr
== NULL
) {
2041 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
, &np
);
2042 ret
= ctdb_store_record(state
->h
, data
);
2043 talloc_free(data
.dptr
);
2052 static void ctdb_g_lock_unlock_deleted(struct tevent_req
*subreq
)
2054 struct tevent_req
*req
= tevent_req_callback_data(
2055 subreq
, struct tevent_req
);
2056 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2057 req
, struct ctdb_g_lock_unlock_state
);
2061 status
= ctdb_delete_record_recv(subreq
, &ret
);
2064 ("g_lock_unlock %s delete record failed, ret=%d\n",
2065 (char *)state
->key
.dptr
, ret
));
2066 tevent_req_error(req
, ret
);
2070 TALLOC_FREE(state
->h
);
2071 tevent_req_done(req
);
2074 bool ctdb_g_lock_unlock_recv(struct tevent_req
*req
, int *perr
)
2076 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
2077 req
, struct ctdb_g_lock_unlock_state
);
2080 TALLOC_FREE(state
->h
);
2082 if (tevent_req_is_unix_error(req
, &err
)) {
2093 * Persistent database functions
2095 struct ctdb_transaction_start_state
{
2096 struct tevent_context
*ev
;
2097 struct ctdb_client_context
*client
;
2098 struct timeval timeout
;
2099 struct ctdb_transaction_handle
*h
;
2103 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
);
2104 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
);
2106 struct tevent_req
*ctdb_transaction_start_send(TALLOC_CTX
*mem_ctx
,
2107 struct tevent_context
*ev
,
2108 struct ctdb_client_context
*client
,
2109 struct timeval timeout
,
2110 struct ctdb_db_context
*db
,
2113 struct ctdb_transaction_start_state
*state
;
2114 struct tevent_req
*req
, *subreq
;
2115 struct ctdb_transaction_handle
*h
;
2117 req
= tevent_req_create(mem_ctx
, &state
,
2118 struct ctdb_transaction_start_state
);
2123 if (ctdb_db_volatile(db
)) {
2124 tevent_req_error(req
, EINVAL
);
2125 return tevent_req_post(req
, ev
);
2129 state
->client
= client
;
2130 state
->destnode
= ctdb_client_pnn(client
);
2132 h
= talloc_zero(db
, struct ctdb_transaction_handle
);
2133 if (tevent_req_nomem(h
, req
)) {
2134 return tevent_req_post(req
, ev
);
2140 h
->readonly
= readonly
;
2143 /* SRVID is unique for databases, so client can have transactions
2144 * active for multiple databases */
2145 h
->sid
= ctdb_client_get_server_id(client
, db
->db_id
);
2147 h
->recbuf
= ctdb_rec_buffer_init(h
, db
->db_id
);
2148 if (tevent_req_nomem(h
->recbuf
, req
)) {
2149 return tevent_req_post(req
, ev
);
2152 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x", db
->db_id
);
2153 if (tevent_req_nomem(h
->lock_name
, req
)) {
2154 return tevent_req_post(req
, ev
);
2159 subreq
= ctdb_attach_send(state
, ev
, client
, timeout
, "g_lock.tdb", 0);
2160 if (tevent_req_nomem(subreq
, req
)) {
2161 return tevent_req_post(req
, ev
);
2163 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_attached
, req
);
2168 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
)
2170 struct tevent_req
*req
= tevent_req_callback_data(
2171 subreq
, struct tevent_req
);
2172 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2173 req
, struct ctdb_transaction_start_state
);
2177 status
= ctdb_attach_recv(subreq
, &ret
, &state
->h
->db_g_lock
);
2178 TALLOC_FREE(subreq
);
2181 ("transaction_start: %s attach g_lock.tdb failed\n",
2182 state
->h
->db
->db_name
));
2183 tevent_req_error(req
, ret
);
2187 subreq
= ctdb_g_lock_lock_send(state
, state
->ev
, state
->client
,
2188 state
->h
->db_g_lock
,
2189 state
->h
->lock_name
,
2190 &state
->h
->sid
, state
->h
->readonly
);
2191 if (tevent_req_nomem(subreq
, req
)) {
2194 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_done
, req
);
2197 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
)
2199 struct tevent_req
*req
= tevent_req_callback_data(
2200 subreq
, struct tevent_req
);
2201 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2202 req
, struct ctdb_transaction_start_state
);
2206 status
= ctdb_g_lock_lock_recv(subreq
, &ret
);
2207 TALLOC_FREE(subreq
);
2210 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2211 state
->h
->db
->db_name
, ret
));
2212 tevent_req_error(req
, ret
);
2216 tevent_req_done(req
);
2219 struct ctdb_transaction_handle
*ctdb_transaction_start_recv(
2220 struct tevent_req
*req
,
2223 struct ctdb_transaction_start_state
*state
= tevent_req_data(
2224 req
, struct ctdb_transaction_start_state
);
2227 if (tevent_req_is_unix_error(req
, &err
)) {
2237 int ctdb_transaction_start(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
2238 struct ctdb_client_context
*client
,
2239 struct timeval timeout
,
2240 struct ctdb_db_context
*db
, bool readonly
,
2241 struct ctdb_transaction_handle
**out
)
2243 struct tevent_req
*req
;
2244 struct ctdb_transaction_handle
*h
;
2247 req
= ctdb_transaction_start_send(mem_ctx
, ev
, client
, timeout
, db
,
2253 tevent_req_poll(req
, ev
);
2255 h
= ctdb_transaction_start_recv(req
, &ret
);
2264 struct ctdb_transaction_record_fetch_state
{
2266 struct ctdb_ltdb_header header
;
2270 static int ctdb_transaction_record_fetch_traverse(
2272 struct ctdb_ltdb_header
*nullheader
,
2273 TDB_DATA key
, TDB_DATA data
,
2276 struct ctdb_transaction_record_fetch_state
*state
=
2277 (struct ctdb_transaction_record_fetch_state
*)private_data
;
2279 if (state
->key
.dsize
== key
.dsize
&&
2280 memcmp(state
->key
.dptr
, key
.dptr
, key
.dsize
) == 0) {
2283 ret
= ctdb_ltdb_header_extract(&data
, &state
->header
);
2286 ("record_fetch: Failed to extract header, "
2292 state
->found
= true;
2298 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle
*h
,
2300 struct ctdb_ltdb_header
*header
,
2303 struct ctdb_transaction_record_fetch_state state
;
2307 state
.found
= false;
2309 ret
= ctdb_rec_buffer_traverse(h
->recbuf
,
2310 ctdb_transaction_record_fetch_traverse
,
2317 if (header
!= NULL
) {
2318 *header
= state
.header
;
2329 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle
*h
,
2331 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
2334 struct ctdb_ltdb_header header
;
2337 ret
= ctdb_transaction_record_fetch(h
, key
, NULL
, &tmp_data
);
2339 data
->dptr
= talloc_memdup(mem_ctx
, tmp_data
.dptr
,
2341 if (data
->dptr
== NULL
) {
2344 data
->dsize
= tmp_data
.dsize
;
2348 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, mem_ctx
, data
);
2353 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, *data
);
2361 int ctdb_transaction_store_record(struct ctdb_transaction_handle
*h
,
2362 TDB_DATA key
, TDB_DATA data
)
2364 TALLOC_CTX
*tmp_ctx
;
2365 struct ctdb_ltdb_header header
;
2373 tmp_ctx
= talloc_new(h
);
2374 if (tmp_ctx
== NULL
) {
2378 ret
= ctdb_transaction_record_fetch(h
, key
, &header
, &old_data
);
2380 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, tmp_ctx
, &old_data
);
2386 if (old_data
.dsize
== data
.dsize
&&
2387 memcmp(old_data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
2388 talloc_free(tmp_ctx
);
2392 header
.dmaster
= ctdb_client_pnn(h
->client
);
2395 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, data
);
2396 talloc_free(tmp_ctx
);
2405 int ctdb_transaction_delete_record(struct ctdb_transaction_handle
*h
,
2408 return ctdb_transaction_store_record(h
, key
, tdb_null
);
2411 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle
*h
,
2414 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
2416 struct ctdb_ltdb_header header
;
2419 key
.dptr
= discard_const(keyname
);
2420 key
.dsize
= strlen(keyname
) + 1;
2422 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, h
, &data
);
2425 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2426 h
->db
->db_name
, ret
));
2430 if (data
.dsize
== 0) {
2436 if (data
.dsize
!= sizeof(uint64_t)) {
2437 talloc_free(data
.dptr
);
2441 *seqnum
= *(uint64_t *)data
.dptr
;
2443 talloc_free(data
.dptr
);
2447 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle
*h
,
2450 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
2453 key
.dptr
= discard_const(keyname
);
2454 key
.dsize
= strlen(keyname
) + 1;
2456 data
.dptr
= (uint8_t *)&seqnum
;
2457 data
.dsize
= sizeof(seqnum
);
2459 return ctdb_transaction_store_record(h
, key
, data
);
2462 struct ctdb_transaction_commit_state
{
2463 struct tevent_context
*ev
;
2464 struct timeval timeout
;
2465 struct ctdb_transaction_handle
*h
;
2469 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
);
2470 static void ctdb_transaction_commit_g_lock_done(struct tevent_req
*subreq
);
2472 struct tevent_req
*ctdb_transaction_commit_send(
2473 TALLOC_CTX
*mem_ctx
,
2474 struct tevent_context
*ev
,
2475 struct timeval timeout
,
2476 struct ctdb_transaction_handle
*h
)
2478 struct tevent_req
*req
, *subreq
;
2479 struct ctdb_transaction_commit_state
*state
;
2480 struct ctdb_req_control request
;
2483 req
= tevent_req_create(mem_ctx
, &state
,
2484 struct ctdb_transaction_commit_state
);
2490 state
->timeout
= timeout
;
2493 ret
= ctdb_transaction_fetch_db_seqnum(h
, &state
->seqnum
);
2495 tevent_req_error(req
, ret
);
2496 return tevent_req_post(req
, ev
);
2499 ret
= ctdb_transaction_store_db_seqnum(h
, state
->seqnum
+1);
2501 tevent_req_error(req
, ret
);
2502 return tevent_req_post(req
, ev
);
2505 ctdb_req_control_trans3_commit(&request
, h
->recbuf
);
2506 subreq
= ctdb_client_control_send(state
, ev
, h
->client
,
2507 ctdb_client_pnn(h
->client
),
2509 if (tevent_req_nomem(subreq
, req
)) {
2510 return tevent_req_post(req
, ev
);
2512 tevent_req_set_callback(subreq
, ctdb_transaction_commit_done
, req
);
2517 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
)
2519 struct tevent_req
*req
= tevent_req_callback_data(
2520 subreq
, struct tevent_req
);
2521 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2522 req
, struct ctdb_transaction_commit_state
);
2523 struct ctdb_transaction_handle
*h
= state
->h
;
2524 struct ctdb_reply_control
*reply
;
2529 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2530 TALLOC_FREE(subreq
);
2533 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2534 h
->db
->db_name
, ret
));
2535 tevent_req_error(req
, ret
);
2539 ret
= ctdb_reply_control_trans3_commit(reply
);
2543 /* Control failed due to recovery */
2545 ret
= ctdb_transaction_fetch_db_seqnum(h
, &seqnum
);
2547 tevent_req_error(req
, ret
);
2551 if (seqnum
== state
->seqnum
) {
2552 struct ctdb_req_control request
;
2555 ctdb_req_control_trans3_commit(&request
,
2557 subreq
= ctdb_client_control_send(
2558 state
, state
->ev
, state
->h
->client
,
2559 ctdb_client_pnn(state
->h
->client
),
2560 state
->timeout
, &request
);
2561 if (tevent_req_nomem(subreq
, req
)) {
2564 tevent_req_set_callback(subreq
,
2565 ctdb_transaction_commit_done
,
2570 if (seqnum
!= state
->seqnum
+ 1) {
2572 ("transaction_commit: %s seqnum mismatch "
2573 "0x%"PRIx64
" != 0x%"PRIx64
" + 1\n",
2574 state
->h
->db
->db_name
, seqnum
, state
->seqnum
));
2575 tevent_req_error(req
, EIO
);
2580 /* trans3_commit successful */
2581 subreq
= ctdb_g_lock_unlock_send(state
, state
->ev
, h
->client
,
2582 h
->db_g_lock
, h
->lock_name
, h
->sid
);
2583 if (tevent_req_nomem(subreq
, req
)) {
2586 tevent_req_set_callback(subreq
, ctdb_transaction_commit_g_lock_done
,
2590 static void ctdb_transaction_commit_g_lock_done(struct tevent_req
*subreq
)
2592 struct tevent_req
*req
= tevent_req_callback_data(
2593 subreq
, struct tevent_req
);
2594 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2595 req
, struct ctdb_transaction_commit_state
);
2599 status
= ctdb_g_lock_unlock_recv(subreq
, &ret
);
2600 TALLOC_FREE(subreq
);
2603 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2604 state
->h
->db
->db_name
, ret
));
2605 tevent_req_error(req
, ret
);
2609 talloc_free(state
->h
);
2610 tevent_req_done(req
);
2613 bool ctdb_transaction_commit_recv(struct tevent_req
*req
, int *perr
)
2617 if (tevent_req_is_unix_error(req
, &err
)) {
2627 int ctdb_transaction_commit(struct ctdb_transaction_handle
*h
)
2629 struct tevent_context
*ev
= h
->ev
;
2630 TALLOC_CTX
*mem_ctx
;
2631 struct tevent_req
*req
;
2635 if (h
->readonly
|| ! h
->updated
) {
2636 return ctdb_transaction_cancel(h
);
2639 mem_ctx
= talloc_new(NULL
);
2640 if (mem_ctx
== NULL
) {
2644 req
= ctdb_transaction_commit_send(mem_ctx
, ev
,
2645 tevent_timeval_zero(), h
);
2647 talloc_free(mem_ctx
);
2651 tevent_req_poll(req
, ev
);
2653 status
= ctdb_transaction_commit_recv(req
, &ret
);
2655 talloc_free(mem_ctx
);
2659 talloc_free(mem_ctx
);
2663 struct ctdb_transaction_cancel_state
{
2664 struct tevent_context
*ev
;
2665 struct ctdb_transaction_handle
*h
;
2666 struct timeval timeout
;
2669 static void ctdb_transaction_cancel_done(struct tevent_req
*subreq
);
2671 struct tevent_req
*ctdb_transaction_cancel_send(
2672 TALLOC_CTX
*mem_ctx
,
2673 struct tevent_context
*ev
,
2674 struct timeval timeout
,
2675 struct ctdb_transaction_handle
*h
)
2677 struct tevent_req
*req
, *subreq
;
2678 struct ctdb_transaction_cancel_state
*state
;
2680 req
= tevent_req_create(mem_ctx
, &state
,
2681 struct ctdb_transaction_cancel_state
);
2688 state
->timeout
= timeout
;
2690 subreq
= ctdb_g_lock_unlock_send(state
, state
->ev
, state
->h
->client
,
2691 state
->h
->db_g_lock
,
2692 state
->h
->lock_name
, state
->h
->sid
);
2693 if (tevent_req_nomem(subreq
, req
)) {
2694 return tevent_req_post(req
, ev
);
2696 tevent_req_set_callback(subreq
, ctdb_transaction_cancel_done
,
2702 static void ctdb_transaction_cancel_done(struct tevent_req
*subreq
)
2704 struct tevent_req
*req
= tevent_req_callback_data(
2705 subreq
, struct tevent_req
);
2706 struct ctdb_transaction_cancel_state
*state
= tevent_req_data(
2707 req
, struct ctdb_transaction_cancel_state
);
2711 status
= ctdb_g_lock_unlock_recv(subreq
, &ret
);
2712 TALLOC_FREE(subreq
);
2715 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2716 state
->h
->db
->db_name
, ret
));
2717 talloc_free(state
->h
);
2718 tevent_req_error(req
, ret
);
2722 talloc_free(state
->h
);
2723 tevent_req_done(req
);
2726 bool ctdb_transaction_cancel_recv(struct tevent_req
*req
, int *perr
)
2730 if (tevent_req_is_unix_error(req
, &err
)) {
2740 int ctdb_transaction_cancel(struct ctdb_transaction_handle
*h
)
2742 struct tevent_context
*ev
= h
->ev
;
2743 struct tevent_req
*req
;
2744 TALLOC_CTX
*mem_ctx
;
2748 mem_ctx
= talloc_new(NULL
);
2749 if (mem_ctx
== NULL
) {
2754 req
= ctdb_transaction_cancel_send(mem_ctx
, ev
,
2755 tevent_timeval_zero(), h
);
2757 talloc_free(mem_ctx
);
2762 tevent_req_poll(req
, ev
);
2764 status
= ctdb_transaction_cancel_recv(req
, &ret
);
2766 talloc_free(mem_ctx
);
2770 talloc_free(mem_ctx
);
2777 * In future Samba should register SERVER_ID.
2778 * Make that structure same as struct srvid {}.