4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context
*client_db_handle(
41 struct ctdb_client_context
*client
,
44 struct ctdb_db_context
*db
;
46 for (db
= client
->db
; db
!= NULL
; db
= db
->next
) {
47 if (strcmp(db_name
, db
->db_name
) == 0) {
55 struct ctdb_set_db_flags_state
{
56 struct tevent_context
*ev
;
57 struct ctdb_client_context
*client
;
58 struct timeval timeout
;
61 bool readonly_done
, sticky_done
;
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
);
70 static struct tevent_req
*ctdb_set_db_flags_send(
72 struct tevent_context
*ev
,
73 struct ctdb_client_context
*client
,
74 uint32_t destnode
, struct timeval timeout
,
75 uint32_t db_id
, uint8_t db_flags
)
77 struct tevent_req
*req
, *subreq
;
78 struct ctdb_set_db_flags_state
*state
;
79 struct ctdb_req_control request
;
81 req
= tevent_req_create(mem_ctx
, &state
,
82 struct ctdb_set_db_flags_state
);
87 if (! (db_flags
& (CTDB_DB_FLAGS_READONLY
| CTDB_DB_FLAGS_STICKY
))) {
89 return tevent_req_post(req
, ev
);
93 state
->client
= client
;
94 state
->timeout
= timeout
;
96 state
->db_flags
= db_flags
;
98 ctdb_req_control_get_nodemap(&request
);
99 subreq
= ctdb_client_control_send(state
, ev
, client
, destnode
, timeout
,
101 if (tevent_req_nomem(subreq
, req
)) {
102 return tevent_req_post(req
, ev
);
104 tevent_req_set_callback(subreq
, ctdb_set_db_flags_nodemap_done
, req
);
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req
*subreq
)
111 struct tevent_req
*req
= tevent_req_callback_data(
112 subreq
, struct tevent_req
);
113 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
114 req
, struct ctdb_set_db_flags_state
);
115 struct ctdb_req_control request
;
116 struct ctdb_reply_control
*reply
;
117 struct ctdb_node_map
*nodemap
;
121 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
124 tevent_req_error(req
, ret
);
128 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &nodemap
);
131 tevent_req_error(req
, ret
);
135 state
->count
= list_of_connected_nodes(nodemap
, CTDB_UNKNOWN_PNN
,
136 state
, &state
->pnn_list
);
137 talloc_free(nodemap
);
138 if (state
->count
<= 0) {
139 tevent_req_error(req
, ENOMEM
);
143 if (state
->db_flags
& CTDB_DB_FLAGS_READONLY
) {
144 ctdb_req_control_set_db_readonly(&request
, state
->db_id
);
145 subreq
= ctdb_client_control_multi_send(
146 state
, state
->ev
, state
->client
,
147 state
->pnn_list
, state
->count
,
148 state
->timeout
, &request
);
149 if (tevent_req_nomem(subreq
, req
)) {
152 tevent_req_set_callback(subreq
,
153 ctdb_set_db_flags_readonly_done
, req
);
155 state
->readonly_done
= true;
158 if (state
->db_flags
& CTDB_DB_FLAGS_STICKY
) {
159 ctdb_req_control_set_db_sticky(&request
, state
->db_id
);
160 subreq
= ctdb_client_control_multi_send(
161 state
, state
->ev
, state
->client
,
162 state
->pnn_list
, state
->count
,
163 state
->timeout
, &request
);
164 if (tevent_req_nomem(subreq
, req
)) {
167 tevent_req_set_callback(subreq
, ctdb_set_db_flags_sticky_done
,
170 state
->sticky_done
= true;
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req
*subreq
)
176 struct tevent_req
*req
= tevent_req_callback_data(
177 subreq
, struct tevent_req
);
178 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
179 req
, struct ctdb_set_db_flags_state
);
183 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
187 tevent_req_error(req
, ret
);
191 state
->readonly_done
= true;
193 if (state
->readonly_done
&& state
->sticky_done
) {
194 tevent_req_done(req
);
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req
*subreq
)
200 struct tevent_req
*req
= tevent_req_callback_data(
201 subreq
, struct tevent_req
);
202 struct ctdb_set_db_flags_state
*state
= tevent_req_data(
203 req
, struct ctdb_set_db_flags_state
);
207 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, NULL
,
211 tevent_req_error(req
, ret
);
215 state
->sticky_done
= true;
217 if (state
->readonly_done
&& state
->sticky_done
) {
218 tevent_req_done(req
);
222 static bool ctdb_set_db_flags_recv(struct tevent_req
*req
, int *perr
)
226 if (tevent_req_is_unix_error(req
, &err
)) {
235 struct ctdb_attach_state
{
236 struct tevent_context
*ev
;
237 struct ctdb_client_context
*client
;
238 struct timeval timeout
;
242 struct ctdb_db_context
*db
;
245 static void ctdb_attach_mutex_done(struct tevent_req
*subreq
);
246 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
);
247 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
);
248 static void ctdb_attach_health_done(struct tevent_req
*subreq
);
249 static void ctdb_attach_flags_done(struct tevent_req
*subreq
);
251 struct tevent_req
*ctdb_attach_send(TALLOC_CTX
*mem_ctx
,
252 struct tevent_context
*ev
,
253 struct ctdb_client_context
*client
,
254 struct timeval timeout
,
255 const char *db_name
, uint8_t db_flags
)
257 struct tevent_req
*req
, *subreq
;
258 struct ctdb_attach_state
*state
;
259 struct ctdb_req_control request
;
261 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_attach_state
);
266 state
->db
= client_db_handle(client
, db_name
);
267 if (state
->db
!= NULL
) {
268 tevent_req_done(req
);
269 return tevent_req_post(req
, ev
);
273 state
->client
= client
;
274 state
->timeout
= timeout
;
275 state
->destnode
= ctdb_client_pnn(client
);
276 state
->db_flags
= db_flags
;
278 state
->db
= talloc_zero(client
, struct ctdb_db_context
);
279 if (tevent_req_nomem(state
->db
, req
)) {
280 return tevent_req_post(req
, ev
);
283 state
->db
->db_name
= talloc_strdup(state
->db
, db_name
);
284 if (tevent_req_nomem(state
->db
, req
)) {
285 return tevent_req_post(req
, ev
);
288 if (db_flags
& CTDB_DB_FLAGS_PERSISTENT
) {
289 state
->db
->persistent
= true;
292 ctdb_req_control_get_tunable(&request
, "TDBMutexEnabled");
293 subreq
= ctdb_client_control_send(state
, ev
, client
,
294 ctdb_client_pnn(client
), timeout
,
296 if (tevent_req_nomem(subreq
, req
)) {
297 return tevent_req_post(req
, ev
);
299 tevent_req_set_callback(subreq
, ctdb_attach_mutex_done
, req
);
304 static void ctdb_attach_mutex_done(struct tevent_req
*subreq
)
306 struct tevent_req
*req
= tevent_req_callback_data(
307 subreq
, struct tevent_req
);
308 struct ctdb_attach_state
*state
= tevent_req_data(
309 req
, struct ctdb_attach_state
);
310 struct ctdb_reply_control
*reply
;
311 struct ctdb_req_control request
;
312 uint32_t mutex_enabled
;
316 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
319 tevent_req_error(req
, ret
);
323 ret
= ctdb_reply_control_get_tunable(reply
, &mutex_enabled
);
325 /* Treat error as mutex support not available */
329 state
->tdb_flags
= TDB_DEFAULT
;
330 if (! state
->db
->persistent
) {
331 state
->tdb_flags
|= (TDB_INCOMPATIBLE_HASH
|
334 if (mutex_enabled
== 1) {
335 state
->tdb_flags
|= TDB_MUTEX_LOCKING
;
338 if (state
->db
->persistent
) {
339 ctdb_req_control_db_attach_persistent(&request
,
343 ctdb_req_control_db_attach(&request
, state
->db
->db_name
,
347 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
348 state
->destnode
, state
->timeout
,
350 if (tevent_req_nomem(subreq
, req
)) {
353 tevent_req_set_callback(subreq
, ctdb_attach_dbid_done
, req
);
356 static void ctdb_attach_dbid_done(struct tevent_req
*subreq
)
358 struct tevent_req
*req
= tevent_req_callback_data(
359 subreq
, struct tevent_req
);
360 struct ctdb_attach_state
*state
= tevent_req_data(
361 req
, struct ctdb_attach_state
);
362 struct ctdb_req_control request
;
363 struct ctdb_reply_control
*reply
;
367 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
370 tevent_req_error(req
, ret
);
374 if (state
->db
->persistent
) {
375 ret
= ctdb_reply_control_db_attach_persistent(
376 reply
, &state
->db
->db_id
);
378 ret
= ctdb_reply_control_db_attach(reply
, &state
->db
->db_id
);
382 tevent_req_error(req
, ret
);
386 ctdb_req_control_getdbpath(&request
, state
->db
->db_id
);
387 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
388 state
->destnode
, state
->timeout
,
390 if (tevent_req_nomem(subreq
, req
)) {
393 tevent_req_set_callback(subreq
, ctdb_attach_dbpath_done
, req
);
396 static void ctdb_attach_dbpath_done(struct tevent_req
*subreq
)
398 struct tevent_req
*req
= tevent_req_callback_data(
399 subreq
, struct tevent_req
);
400 struct ctdb_attach_state
*state
= tevent_req_data(
401 req
, struct ctdb_attach_state
);
402 struct ctdb_reply_control
*reply
;
403 struct ctdb_req_control request
;
407 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
410 tevent_req_error(req
, ret
);
414 ret
= ctdb_reply_control_getdbpath(reply
, state
->db
,
415 &state
->db
->db_path
);
418 tevent_req_error(req
, ret
);
422 ctdb_req_control_db_get_health(&request
, state
->db
->db_id
);
423 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
424 state
->destnode
, state
->timeout
,
426 if (tevent_req_nomem(subreq
, req
)) {
429 tevent_req_set_callback(subreq
, ctdb_attach_health_done
, req
);
432 static void ctdb_attach_health_done(struct tevent_req
*subreq
)
434 struct tevent_req
*req
= tevent_req_callback_data(
435 subreq
, struct tevent_req
);
436 struct ctdb_attach_state
*state
= tevent_req_data(
437 req
, struct ctdb_attach_state
);
438 struct ctdb_reply_control
*reply
;
443 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
446 tevent_req_error(req
, ret
);
450 ret
= ctdb_reply_control_db_get_health(reply
, state
, &reason
);
452 tevent_req_error(req
, ret
);
456 if (reason
!= NULL
) {
457 /* Database unhealthy, avoid attach */
458 /* FIXME: Log here */
459 tevent_req_error(req
, EIO
);
463 subreq
= ctdb_set_db_flags_send(state
, state
->ev
, state
->client
,
464 state
->destnode
, state
->timeout
,
465 state
->db
->db_id
, state
->db_flags
);
466 if (tevent_req_nomem(subreq
, req
)) {
469 tevent_req_set_callback(subreq
, ctdb_attach_flags_done
, req
);
472 static void ctdb_attach_flags_done(struct tevent_req
*subreq
)
474 struct tevent_req
*req
= tevent_req_callback_data(
475 subreq
, struct tevent_req
);
476 struct ctdb_attach_state
*state
= tevent_req_data(
477 req
, struct ctdb_attach_state
);
481 status
= ctdb_set_db_flags_recv(subreq
, &ret
);
484 tevent_req_error(req
, ret
);
488 state
->db
->ltdb
= tdb_wrap_open(state
->db
, state
->db
->db_path
, 0,
489 state
->tdb_flags
, O_RDWR
, 0);
490 if (tevent_req_nomem(state
->db
->ltdb
, req
)) {
493 DLIST_ADD(state
->client
->db
, state
->db
);
495 tevent_req_done(req
);
498 bool ctdb_attach_recv(struct tevent_req
*req
, int *perr
,
499 struct ctdb_db_context
**out
)
501 struct ctdb_attach_state
*state
= tevent_req_data(
502 req
, struct ctdb_attach_state
);
505 if (tevent_req_is_unix_error(req
, &err
)) {
518 int ctdb_attach(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
519 struct ctdb_client_context
*client
,
520 struct timeval timeout
,
521 const char *db_name
, uint8_t db_flags
,
522 struct ctdb_db_context
**out
)
524 struct tevent_req
*req
;
528 req
= ctdb_attach_send(mem_ctx
, ev
, client
, timeout
,
534 tevent_req_poll(req
, ev
);
536 status
= ctdb_attach_recv(req
, &ret
, out
);
542 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
550 int ctdb_detach(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
551 struct ctdb_client_context
*client
,
552 struct timeval timeout
, uint32_t db_id
)
554 struct ctdb_db_context
*db
;
557 ret
= ctdb_ctrl_db_detach(mem_ctx
, ev
, client
, client
->pnn
, timeout
,
563 for (db
= client
->db
; db
!= NULL
; db
= db
->next
) {
564 if (db
->db_id
== db_id
) {
565 DLIST_REMOVE(client
->db
, db
);
573 uint32_t ctdb_db_id(struct ctdb_db_context
*db
)
578 struct ctdb_db_traverse_state
{
579 ctdb_rec_parser_func_t parser
;
585 static int ctdb_db_traverse_handler(struct tdb_context
*tdb
, TDB_DATA key
,
586 TDB_DATA data
, void *private_data
)
588 struct ctdb_db_traverse_state
*state
=
589 (struct ctdb_db_traverse_state
*)private_data
;
592 if (state
->extract_header
) {
593 struct ctdb_ltdb_header header
;
595 ret
= ctdb_ltdb_header_extract(&data
, &header
);
601 ret
= state
->parser(0, &header
, key
, data
, state
->private_data
);
603 ret
= state
->parser(0, NULL
, key
, data
, state
->private_data
);
614 int ctdb_db_traverse(struct ctdb_db_context
*db
, bool readonly
,
616 ctdb_rec_parser_func_t parser
, void *private_data
)
618 struct ctdb_db_traverse_state state
;
621 state
.parser
= parser
;
622 state
.private_data
= private_data
;
623 state
.extract_header
= extract_header
;
627 ret
= tdb_traverse_read(db
->ltdb
->tdb
,
628 ctdb_db_traverse_handler
, &state
);
630 ret
= tdb_traverse(db
->ltdb
->tdb
,
631 ctdb_db_traverse_handler
, &state
);
641 static int ctdb_ltdb_fetch(struct ctdb_db_context
*db
, TDB_DATA key
,
642 struct ctdb_ltdb_header
*header
,
643 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
648 rec
= tdb_fetch(db
->ltdb
->tdb
, key
);
649 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
650 /* No record present */
651 if (rec
.dptr
!= NULL
) {
655 if (tdb_error(db
->ltdb
->tdb
) != TDB_ERR_NOEXIST
) {
660 header
->dmaster
= CTDB_UNKNOWN_PNN
;
669 ret
= ctdb_ltdb_header_pull(rec
.dptr
, rec
.dsize
, header
);
676 size_t offset
= ctdb_ltdb_header_len(header
);
678 data
->dsize
= rec
.dsize
- offset
;
679 data
->dptr
= talloc_memdup(mem_ctx
, rec
.dptr
+ offset
,
681 if (data
->dptr
== NULL
) {
691 * Fetch a record from volatile database
694 * 1. Get a lock on the hash chain
695 * 2. If the record does not exist, migrate the record
696 * 3. If readonly=true and delegations do not exist, migrate the record.
697 * 4. If readonly=false and delegations exist, migrate the record.
698 * 5. If the local node is not dmaster, migrate the record.
702 struct ctdb_fetch_lock_state
{
703 struct tevent_context
*ev
;
704 struct ctdb_client_context
*client
;
705 struct ctdb_record_handle
*h
;
710 static int ctdb_fetch_lock_check(struct tevent_req
*req
);
711 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
);
712 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
);
714 struct tevent_req
*ctdb_fetch_lock_send(TALLOC_CTX
*mem_ctx
,
715 struct tevent_context
*ev
,
716 struct ctdb_client_context
*client
,
717 struct ctdb_db_context
*db
,
718 TDB_DATA key
, bool readonly
)
720 struct ctdb_fetch_lock_state
*state
;
721 struct tevent_req
*req
;
724 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_fetch_lock_state
);
730 state
->client
= client
;
732 state
->h
= talloc_zero(db
, struct ctdb_record_handle
);
733 if (tevent_req_nomem(state
->h
, req
)) {
734 return tevent_req_post(req
, ev
);
736 state
->h
->client
= client
;
738 state
->h
->key
.dptr
= talloc_memdup(state
->h
, key
.dptr
, key
.dsize
);
739 if (tevent_req_nomem(state
->h
->key
.dptr
, req
)) {
740 return tevent_req_post(req
, ev
);
742 state
->h
->key
.dsize
= key
.dsize
;
743 state
->h
->readonly
= false;
745 state
->readonly
= readonly
;
746 state
->pnn
= ctdb_client_pnn(client
);
748 /* Check that database is not persistent */
749 if (db
->persistent
) {
750 tevent_req_error(req
, EINVAL
);
751 return tevent_req_post(req
, ev
);
754 ret
= ctdb_fetch_lock_check(req
);
756 tevent_req_done(req
);
757 return tevent_req_post(req
, ev
);
760 tevent_req_error(req
, ret
);
761 return tevent_req_post(req
, ev
);
766 static int ctdb_fetch_lock_check(struct tevent_req
*req
)
768 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
769 req
, struct ctdb_fetch_lock_state
);
770 struct ctdb_record_handle
*h
= state
->h
;
771 struct ctdb_ltdb_header header
;
772 TDB_DATA data
= tdb_null
;
774 bool do_migrate
= false;
776 ret
= tdb_chainlock(state
->h
->db
->ltdb
->tdb
, state
->h
->key
);
782 data
= tdb_fetch(h
->db
->ltdb
->tdb
, h
->key
);
783 if (data
.dptr
== NULL
) {
784 if (tdb_error(h
->db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
) {
793 ret
= ctdb_ltdb_header_pull(data
.dptr
, data
.dsize
, &header
);
799 if (! state
->readonly
) {
800 /* Read/write access */
801 if (header
.dmaster
== state
->pnn
&&
802 header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
) {
806 if (header
.dmaster
!= state
->pnn
) {
810 /* Readonly access */
811 if (header
.dmaster
!= state
->pnn
&&
812 ! (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
813 CTDB_REC_RO_HAVE_DELEGATIONS
))) {
818 /* We are the dmaster or readonly delegation */
821 if (header
.flags
& (CTDB_REC_RO_HAVE_READONLY
|
822 CTDB_REC_RO_HAVE_DELEGATIONS
)) {
832 if (data
.dptr
!= NULL
) {
835 ret
= tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
837 DEBUG(DEBUG_ERR
, ("tdb_chainunlock failed on %s\n",
843 ctdb_fetch_lock_migrate(req
);
848 static void ctdb_fetch_lock_migrate(struct tevent_req
*req
)
850 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
851 req
, struct ctdb_fetch_lock_state
);
852 struct ctdb_req_call request
;
853 struct tevent_req
*subreq
;
855 ZERO_STRUCT(request
);
856 request
.flags
= CTDB_IMMEDIATE_MIGRATION
;
857 if (state
->readonly
) {
858 request
.flags
|= CTDB_WANT_READONLY
;
860 request
.db_id
= state
->h
->db
->db_id
;
861 request
.callid
= CTDB_NULL_FUNC
;
862 request
.key
= state
->h
->key
;
864 subreq
= ctdb_client_call_send(state
, state
->ev
, state
->client
,
866 if (tevent_req_nomem(subreq
, req
)) {
870 tevent_req_set_callback(subreq
, ctdb_fetch_lock_migrate_done
, req
);
873 static void ctdb_fetch_lock_migrate_done(struct tevent_req
*subreq
)
875 struct tevent_req
*req
= tevent_req_callback_data(
876 subreq
, struct tevent_req
);
877 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
878 req
, struct ctdb_fetch_lock_state
);
879 struct ctdb_reply_call
*reply
;
883 status
= ctdb_client_call_recv(subreq
, state
, &reply
, &ret
);
886 tevent_req_error(req
, ret
);
890 if (reply
->status
!= 0) {
891 tevent_req_error(req
, EIO
);
896 ret
= ctdb_fetch_lock_check(req
);
898 tevent_req_error(req
, ret
);
902 tevent_req_done(req
);
905 static int ctdb_record_handle_destructor(struct ctdb_record_handle
*h
)
907 tdb_chainunlock(h
->db
->ltdb
->tdb
, h
->key
);
912 struct ctdb_record_handle
*ctdb_fetch_lock_recv(struct tevent_req
*req
,
913 struct ctdb_ltdb_header
*header
,
915 TDB_DATA
*data
, int *perr
)
917 struct ctdb_fetch_lock_state
*state
= tevent_req_data(
918 req
, struct ctdb_fetch_lock_state
);
919 struct ctdb_record_handle
*h
= state
->h
;
922 if (tevent_req_is_unix_error(req
, &err
)) {
929 if (header
!= NULL
) {
935 offset
= ctdb_ltdb_header_len(&h
->header
);
937 data
->dsize
= h
->data
.dsize
- offset
;
938 data
->dptr
= talloc_memdup(mem_ctx
, h
->data
.dptr
+ offset
,
940 if (data
->dptr
== NULL
) {
941 TALLOC_FREE(state
->h
);
949 talloc_set_destructor(h
, ctdb_record_handle_destructor
);
953 int ctdb_fetch_lock(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
954 struct ctdb_client_context
*client
,
955 struct ctdb_db_context
*db
, TDB_DATA key
, bool readonly
,
956 struct ctdb_record_handle
**out
,
957 struct ctdb_ltdb_header
*header
, TDB_DATA
*data
)
959 struct tevent_req
*req
;
960 struct ctdb_record_handle
*h
;
963 req
= ctdb_fetch_lock_send(mem_ctx
, ev
, client
, db
, key
, readonly
);
968 tevent_req_poll(req
, ev
);
970 h
= ctdb_fetch_lock_recv(req
, header
, mem_ctx
, data
, &ret
);
979 int ctdb_store_record(struct ctdb_record_handle
*h
, TDB_DATA data
)
985 /* Cannot modify the record if it was obtained as a readonly copy */
990 /* Check if the new data is same */
991 if (h
->data
.dsize
== data
.dsize
&&
992 memcmp(h
->data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
993 /* No need to do anything */
997 offset
= ctdb_ltdb_header_len(&h
->header
);
998 rec
.dsize
= offset
+ data
.dsize
;
999 rec
.dptr
= talloc_size(h
, rec
.dsize
);
1000 if (rec
.dptr
== NULL
) {
1004 ctdb_ltdb_header_push(&h
->header
, rec
.dptr
);
1005 memcpy(rec
.dptr
+ offset
, data
.dptr
, data
.dsize
);
1007 ret
= tdb_store(h
->db
->ltdb
->tdb
, h
->key
, rec
, TDB_REPLACE
);
1009 DEBUG(DEBUG_ERR
, ("Failed to store record in DB %s\n",
1014 talloc_free(rec
.dptr
);
1018 int ctdb_delete_record(struct ctdb_record_handle
*h
)
1021 struct ctdb_key_data key
;
1024 /* Cannot delete the record if it was obtained as a readonly copy */
1029 rec
.dsize
= ctdb_ltdb_header_len(&h
->header
);
1030 rec
.dptr
= talloc_size(h
, rec
.dsize
);
1031 if (rec
.dptr
== NULL
) {
1035 ctdb_ltdb_header_push(&h
->header
, rec
.dptr
);
1037 ret
= tdb_store(h
->db
->ltdb
->tdb
, h
->key
, rec
, TDB_REPLACE
);
1038 talloc_free(rec
.dptr
);
1040 DEBUG(DEBUG_ERR
, ("Failed to delete record in DB %s\n",
1045 key
.db_id
= h
->db
->db_id
;
1046 key
.header
= h
->header
;
1049 ret
= ctdb_ctrl_schedule_for_deletion(h
, h
->ev
, h
->client
,
1051 tevent_timeval_zero(), &key
);
1053 DEBUG(DEBUG_WARNING
,
1054 ("Failed to mark record to be deleted in DB %s\n",
1063 * Global lock functions
1066 struct ctdb_g_lock_lock_state
{
1067 struct tevent_context
*ev
;
1068 struct ctdb_client_context
*client
;
1069 struct ctdb_db_context
*db
;
1071 struct ctdb_server_id my_sid
;
1072 enum ctdb_g_lock_type lock_type
;
1073 struct ctdb_record_handle
*h
;
1074 /* state for verification of active locks */
1075 struct ctdb_g_lock_list
*lock_list
;
1076 unsigned int current
;
1079 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
);
1080 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
);
1081 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
);
1082 static int ctdb_g_lock_lock_update(struct tevent_req
*req
);
1083 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
);
1085 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1
,
1086 enum ctdb_g_lock_type l2
)
1088 if ((l1
== CTDB_G_LOCK_READ
) && (l2
== CTDB_G_LOCK_READ
)) {
1094 struct tevent_req
*ctdb_g_lock_lock_send(TALLOC_CTX
*mem_ctx
,
1095 struct tevent_context
*ev
,
1096 struct ctdb_client_context
*client
,
1097 struct ctdb_db_context
*db
,
1098 const char *keyname
,
1099 struct ctdb_server_id
*sid
,
1102 struct tevent_req
*req
, *subreq
;
1103 struct ctdb_g_lock_lock_state
*state
;
1105 req
= tevent_req_create(mem_ctx
, &state
,
1106 struct ctdb_g_lock_lock_state
);
1112 state
->client
= client
;
1114 state
->key
.dptr
= discard_const(keyname
);
1115 state
->key
.dsize
= strlen(keyname
) + 1;
1116 state
->my_sid
= *sid
;
1117 state
->lock_type
= (readonly
? CTDB_G_LOCK_READ
: CTDB_G_LOCK_WRITE
);
1119 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1121 if (tevent_req_nomem(subreq
, req
)) {
1122 return tevent_req_post(req
, ev
);
1124 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1129 static void ctdb_g_lock_lock_fetched(struct tevent_req
*subreq
)
1131 struct tevent_req
*req
= tevent_req_callback_data(
1132 subreq
, struct tevent_req
);
1133 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1134 req
, struct ctdb_g_lock_lock_state
);
1138 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1139 TALLOC_FREE(subreq
);
1140 if (state
->h
== NULL
) {
1141 tevent_req_error(req
, ret
);
1145 if (state
->lock_list
!= NULL
) {
1146 TALLOC_FREE(state
->lock_list
);
1150 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1152 talloc_free(data
.dptr
);
1154 tevent_req_error(req
, ret
);
1158 ctdb_g_lock_lock_process_locks(req
);
1161 static void ctdb_g_lock_lock_process_locks(struct tevent_req
*req
)
1163 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1164 req
, struct ctdb_g_lock_lock_state
);
1165 struct tevent_req
*subreq
;
1166 struct ctdb_g_lock
*lock
;
1167 bool check_server
= false;
1170 while (state
->current
< state
->lock_list
->num
) {
1171 lock
= &state
->lock_list
->lock
[state
->current
];
1173 /* We should not ask for the same lock more than once */
1174 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
1175 tevent_req_error(req
, EDEADLK
);
1179 if (ctdb_g_lock_conflicts(lock
->type
, state
->lock_type
)) {
1180 check_server
= true;
1184 state
->current
+= 1;
1188 struct ctdb_req_control request
;
1189 struct ctdb_uint64_array u64_array
;
1192 u64_array
.val
= &lock
->sid
.unique_id
;
1194 ctdb_req_control_check_srvids(&request
, &u64_array
);
1195 subreq
= ctdb_client_control_send(state
, state
->ev
,
1198 tevent_timeval_zero(),
1200 if (tevent_req_nomem(subreq
, req
)) {
1203 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_checked
, req
);
1207 /* There is no conflict, add ourself to the lock_list */
1208 state
->lock_list
->lock
= talloc_realloc(state
->lock_list
,
1209 state
->lock_list
->lock
,
1211 state
->lock_list
->num
+ 1);
1212 if (state
->lock_list
->lock
== NULL
) {
1213 tevent_req_error(req
, ENOMEM
);
1217 lock
= &state
->lock_list
->lock
[state
->lock_list
->num
];
1218 lock
->type
= state
->lock_type
;
1219 lock
->sid
= state
->my_sid
;
1220 state
->lock_list
->num
+= 1;
1222 ret
= ctdb_g_lock_lock_update(req
);
1224 tevent_req_error(req
, ret
);
1228 tevent_req_done(req
);
1231 static void ctdb_g_lock_lock_checked(struct tevent_req
*subreq
)
1233 struct tevent_req
*req
= tevent_req_callback_data(
1234 subreq
, struct tevent_req
);
1235 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1236 req
, struct ctdb_g_lock_lock_state
);
1237 struct ctdb_reply_control
*reply
;
1238 struct ctdb_uint8_array
*u8_array
;
1243 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1244 TALLOC_FREE(subreq
);
1246 tevent_req_error(req
, ret
);
1250 ret
= ctdb_reply_control_check_srvids(reply
, state
, &u8_array
);
1252 tevent_req_error(req
, ENOMEM
);
1256 if (u8_array
->num
!= 1) {
1257 talloc_free(u8_array
);
1258 tevent_req_error(req
, EIO
);
1262 val
= u8_array
->val
[0];
1263 talloc_free(u8_array
);
1266 /* server process exists, need to retry */
1267 subreq
= tevent_wakeup_send(state
, state
->ev
,
1268 tevent_timeval_current_ofs(1,0));
1269 if (tevent_req_nomem(subreq
, req
)) {
1272 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_retry
, req
);
1276 /* server process does not exist, remove conflicting entry */
1277 state
->lock_list
->lock
[state
->current
] =
1278 state
->lock_list
->lock
[state
->lock_list
->num
-1];
1279 state
->lock_list
->num
-= 1;
1281 ret
= ctdb_g_lock_lock_update(req
);
1283 tevent_req_error(req
, ret
);
1287 ctdb_g_lock_lock_process_locks(req
);
1290 static int ctdb_g_lock_lock_update(struct tevent_req
*req
)
1292 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1293 req
, struct ctdb_g_lock_lock_state
);
1297 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
1298 data
.dptr
= talloc_size(state
, data
.dsize
);
1299 if (data
.dptr
== NULL
) {
1303 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
);
1304 ret
= ctdb_store_record(state
->h
, data
);
1305 talloc_free(data
.dptr
);
1310 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state
*state
,
1311 struct ctdb_g_lock_list
*lock_list
,
1312 struct ctdb_record_handle
*h
)
1314 struct ctdb_g_lock
*lock
;
1315 bool conflict
= false;
1316 bool modified
= false;
1319 for (i
=0; i
<lock_list
->num
; i
++) {
1320 lock
= &lock_list
->lock
[i
];
1322 /* We should not ask for lock more than once */
1323 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
1327 if (ctdb_g_lock_conflicts(lock
->type
, state
->lock_type
)) {
1331 ret
= ctdb_server_id_exists(state
->client
, &lock
->sid
,
1341 /* Server does not exist, delete conflicting entry */
1342 lock_list
->lock
[i
] = lock_list
->lock
[lock_list
->num
-1];
1343 lock_list
->num
-= 1;
1349 lock
= talloc_realloc(lock_list
, lock_list
->lock
,
1350 struct ctdb_g_lock
, lock_list
->num
+1);
1355 lock
[lock_list
->num
].type
= state
->lock_type
;
1356 lock
[lock_list
->num
].sid
= state
->my_sid
;
1357 lock_list
->lock
= lock
;
1358 lock_list
->num
+= 1;
1365 data
.dsize
= ctdb_g_lock_list_len(lock_list
);
1366 data
.dptr
= talloc_size(state
, data
.dsize
);
1367 if (data
.dptr
== NULL
) {
1371 ctdb_g_lock_list_push(lock_list
, data
.dptr
);
1372 ret
= ctdb_store_record(h
, data
);
1373 talloc_free(data
.dptr
);
1386 static void ctdb_g_lock_lock_retry(struct tevent_req
*subreq
)
1388 struct tevent_req
*req
= tevent_req_callback_data(
1389 subreq
, struct tevent_req
);
1390 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1391 req
, struct ctdb_g_lock_lock_state
);
1394 success
= tevent_wakeup_recv(subreq
);
1395 TALLOC_FREE(subreq
);
1397 tevent_req_error(req
, ENOMEM
);
1401 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
1402 state
->db
, state
->key
, false);
1403 if (tevent_req_nomem(subreq
, req
)) {
1406 tevent_req_set_callback(subreq
, ctdb_g_lock_lock_fetched
, req
);
1409 bool ctdb_g_lock_lock_recv(struct tevent_req
*req
, int *perr
)
1411 struct ctdb_g_lock_lock_state
*state
= tevent_req_data(
1412 req
, struct ctdb_g_lock_lock_state
);
1415 TALLOC_FREE(state
->h
);
1417 if (tevent_req_is_unix_error(req
, &err
)) {
1427 struct ctdb_g_lock_unlock_state
{
1428 struct tevent_context
*ev
;
1429 struct ctdb_client_context
*client
;
1430 struct ctdb_db_context
*db
;
1432 struct ctdb_server_id my_sid
;
1433 struct ctdb_record_handle
*h
;
1434 struct ctdb_g_lock_list
*lock_list
;
1437 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
);
1438 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
);
1440 struct tevent_req
*ctdb_g_lock_unlock_send(TALLOC_CTX
*mem_ctx
,
1441 struct tevent_context
*ev
,
1442 struct ctdb_client_context
*client
,
1443 struct ctdb_db_context
*db
,
1444 const char *keyname
,
1445 struct ctdb_server_id sid
)
1447 struct tevent_req
*req
, *subreq
;
1448 struct ctdb_g_lock_unlock_state
*state
;
1450 req
= tevent_req_create(mem_ctx
, &state
,
1451 struct ctdb_g_lock_unlock_state
);
1457 state
->client
= client
;
1459 state
->key
.dptr
= discard_const(keyname
);
1460 state
->key
.dsize
= strlen(keyname
) + 1;
1461 state
->my_sid
= sid
;
1463 subreq
= ctdb_fetch_lock_send(state
, ev
, client
, db
, state
->key
,
1465 if (tevent_req_nomem(subreq
, req
)) {
1466 return tevent_req_post(req
, ev
);
1468 tevent_req_set_callback(subreq
, ctdb_g_lock_unlock_fetched
, req
);
1473 static void ctdb_g_lock_unlock_fetched(struct tevent_req
*subreq
)
1475 struct tevent_req
*req
= tevent_req_callback_data(
1476 subreq
, struct tevent_req
);
1477 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
1478 req
, struct ctdb_g_lock_unlock_state
);
1482 state
->h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
1483 TALLOC_FREE(subreq
);
1484 if (state
->h
== NULL
) {
1485 tevent_req_error(req
, ret
);
1489 ret
= ctdb_g_lock_list_pull(data
.dptr
, data
.dsize
, state
,
1492 tevent_req_error(req
, ret
);
1496 ret
= ctdb_g_lock_unlock_update(req
);
1498 tevent_req_error(req
, ret
);
1502 tevent_req_done(req
);
/*
 * Remove the entry for state->my_sid from state->lock_list and write
 * the result back through the record handle state->h.
 *
 * The record is deleted when the list is empty; the visible code also
 * serialises the list and stores it with ctdb_store_record().
 * NOTE(review): the return statements and the branch separating the
 * delete and store paths are not visible in this view.
 */
1505 static int ctdb_g_lock_unlock_update(struct tevent_req
*req
)
1507 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
1508 req
, struct ctdb_g_lock_unlock_state
);
1509 struct ctdb_g_lock
*lock
;
/* Scan the list for an entry whose server id equals ours. */
1512 for (i
=0; i
<state
->lock_list
->num
; i
++) {
1513 lock
= &state
->lock_list
->lock
[i
];
1515 if (ctdb_server_id_equal(&lock
->sid
, &state
->my_sid
)) {
/* Overwrite slot i with the last entry and shrink the list by one;
 * the order of the remaining entries is not preserved.
 * NOTE(review): presumably i < num means the scan stopped on a match;
 * the loop exit is not visible here. */
1520 if (i
< state
->lock_list
->num
) {
1521 state
->lock_list
->lock
[i
] =
1522 state
->lock_list
->lock
[state
->lock_list
->num
-1];
1523 state
->lock_list
->num
-= 1;
/* No holders left: delete the record. */
1526 if (state
->lock_list
->num
== 0) {
1527 ctdb_delete_record(state
->h
);
/* Serialise the list into a temporary buffer sized by
 * ctdb_g_lock_list_len(), store it, then free the buffer. */
1531 data
.dsize
= ctdb_g_lock_list_len(state
->lock_list
);
1532 data
.dptr
= talloc_size(state
, data
.dsize
);
1533 if (data
.dptr
== NULL
) {
1537 ctdb_g_lock_list_push(state
->lock_list
, data
.dptr
);
1538 ret
= ctdb_store_record(state
->h
, data
);
1539 talloc_free(data
.dptr
);
/*
 * Collect the result of an unlock request.
 *
 * The record handle in state->h is freed before the request's error
 * status is examined.
 * NOTE(review): how err is reported through perr and what is returned
 * are not visible in this view.
 */
1548 bool ctdb_g_lock_unlock_recv(struct tevent_req
*req
, int *perr
)
1550 struct ctdb_g_lock_unlock_state
*state
= tevent_req_data(
1551 req
, struct ctdb_g_lock_unlock_state
);
1554 TALLOC_FREE(state
->h
);
1556 if (tevent_req_is_unix_error(req
, &err
)) {
1567 * Persistent database functions
/*
 * Per-request state for starting a transaction: the event context,
 * client and timeout used for the sub-requests, and the transaction
 * handle (h) being set up.
 * NOTE(review): the request code also uses state->destnode, which is
 * not among the members visible in this view.
 */
1569 struct ctdb_transaction_start_state
{
1570 struct tevent_context
*ev
;
1571 struct ctdb_client_context
*client
;
1572 struct timeval timeout
;
1573 struct ctdb_transaction_handle
*h
;
/*
 * Completion callbacks for the steps of transaction start (attach to
 * the g_lock database, register the SRVID, take the g_lock), followed
 * by the destructor installed on the transaction handle.
 */
1577 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
);
1578 static void ctdb_transaction_register_done(struct tevent_req
*subreq
);
1579 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
);
1580 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle
*h
);
/*
 * Begin starting a transaction on db.
 *
 * Fails the request with EINVAL when db is not persistent.  Otherwise
 * it allocates a transaction handle, fills in its server id, record
 * buffer and lock name, and starts attaching to "g_lock.tdb".
 * ctdb_transaction_g_lock_attached() runs when the attach completes.
 * Allocation failures post the request to ev in its failed state.
 * NOTE(review): the tail of the parameter list and some assignments
 * (for example readonly and state->h) are not visible in this view.
 */
1582 struct tevent_req
*ctdb_transaction_start_send(TALLOC_CTX
*mem_ctx
,
1583 struct tevent_context
*ev
,
1584 struct ctdb_client_context
*client
,
1585 struct timeval timeout
,
1586 struct ctdb_db_context
*db
,
1589 struct ctdb_transaction_start_state
*state
;
1590 struct tevent_req
*req
, *subreq
;
1591 struct ctdb_transaction_handle
*h
;
1593 req
= tevent_req_create(mem_ctx
, &state
,
1594 struct ctdb_transaction_start_state
);
/* Only persistent databases are accepted. */
1599 if (! db
->persistent
) {
1600 tevent_req_error(req
, EINVAL
);
1601 return tevent_req_post(req
, ev
);
1605 state
->client
= client
;
1606 state
->destnode
= ctdb_client_pnn(client
);
/* The handle is a talloc child of db, not of the request state. */
1608 h
= talloc_zero(db
, struct ctdb_transaction_handle
);
1609 if (tevent_req_nomem(h
, req
)) {
1610 return tevent_req_post(req
, ev
);
1616 h
->readonly
= readonly
;
1619 /* SRVID is unique for databases, so client can have transactions active
1620 * for multiple databases */
1621 h
->sid
.pid
= getpid();
1622 h
->sid
.task_id
= db
->db_id
;
1623 h
->sid
.vnn
= state
->destnode
;
/* unique_id is task_id (the db id) shifted left by 32, OR-ed with
 * the pid. */
1624 h
->sid
.unique_id
= h
->sid
.task_id
;
1625 h
->sid
.unique_id
= (h
->sid
.unique_id
<< 32) | h
->sid
.pid
;
/* The record buffer and lock name are talloc children of h. */
1627 h
->recbuf
= ctdb_rec_buffer_init(h
, db
->db_id
);
1628 if (tevent_req_nomem(h
->recbuf
, req
)) {
1629 return tevent_req_post(req
, ev
);
1632 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x", db
->db_id
);
1633 if (tevent_req_nomem(h
->lock_name
, req
)) {
1634 return tevent_req_post(req
, ev
);
/* First asynchronous step: attach to the g_lock database. */
1639 subreq
= ctdb_attach_send(state
, ev
, client
, timeout
, "g_lock.tdb", 0);
1640 if (tevent_req_nomem(subreq
, req
)) {
1641 return tevent_req_post(req
, ev
);
1643 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_attached
, req
);
/*
 * Completion callback for the attach to "g_lock.tdb".
 *
 * Stores the attached database in state->h->db_g_lock, then sends a
 * register-SRVID control for the handle's unique_id to
 * state->destnode.  ctdb_transaction_register_done() runs when the
 * control completes.
 * NOTE(review): the check on status before tevent_req_error() is not
 * visible in this view.
 */
1648 static void ctdb_transaction_g_lock_attached(struct tevent_req
*subreq
)
1650 struct tevent_req
*req
= tevent_req_callback_data(
1651 subreq
, struct tevent_req
);
1652 struct ctdb_transaction_start_state
*state
= tevent_req_data(
1653 req
, struct ctdb_transaction_start_state
);
1654 struct ctdb_req_control request
;
1658 status
= ctdb_attach_recv(subreq
, &ret
, &state
->h
->db_g_lock
);
1659 TALLOC_FREE(subreq
);
1661 tevent_req_error(req
, ret
);
/* Build and send the register-SRVID control. */
1665 ctdb_req_control_register_srvid(&request
, state
->h
->sid
.unique_id
);
1666 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
1667 state
->destnode
, state
->timeout
,
1669 if (tevent_req_nomem(subreq
, req
)) {
1672 tevent_req_set_callback(subreq
, ctdb_transaction_register_done
, req
);
/*
 * Completion callback for the register-SRVID control.
 *
 * Receives and decodes the control reply, then starts taking the
 * g_lock named state->h->lock_name in state->h->db_g_lock for the
 * handle's server id, passing the handle's readonly flag.
 * ctdb_transaction_g_lock_done() runs when the lock request
 * completes.
 * NOTE(review): the conditions guarding the tevent_req_error() calls
 * are not visible in this view.
 */
1675 static void ctdb_transaction_register_done(struct tevent_req
*subreq
)
1677 struct tevent_req
*req
= tevent_req_callback_data(
1678 subreq
, struct tevent_req
);
1679 struct ctdb_transaction_start_state
*state
= tevent_req_data(
1680 req
, struct ctdb_transaction_start_state
);
1681 struct ctdb_reply_control
*reply
;
1685 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1686 TALLOC_FREE(subreq
);
1688 tevent_req_error(req
, ret
);
/* Decode the reply to the register-SRVID control. */
1692 ret
= ctdb_reply_control_register_srvid(reply
);
1695 tevent_req_error(req
, ret
);
/* Next step: take the per-database transaction lock. */
1699 subreq
= ctdb_g_lock_lock_send(state
, state
->ev
, state
->client
,
1700 state
->h
->db_g_lock
, state
->h
->lock_name
,
1701 &state
->h
->sid
, state
->h
->readonly
);
1702 if (tevent_req_nomem(subreq
, req
)) {
1705 tevent_req_set_callback(subreq
, ctdb_transaction_g_lock_done
, req
);
/*
 * Completion callback for the g_lock request; the last step of
 * transaction start.  Receives the lock result and marks the request
 * done, with a tevent_req_error() call for the failure case.
 * NOTE(review): the check on status before tevent_req_error() is not
 * visible in this view.
 */
1708 static void ctdb_transaction_g_lock_done(struct tevent_req
*subreq
)
1710 struct tevent_req
*req
= tevent_req_callback_data(
1711 subreq
, struct tevent_req
);
1715 status
= ctdb_g_lock_lock_recv(subreq
, &ret
);
1716 TALLOC_FREE(subreq
);
1718 tevent_req_error(req
, ret
);
1722 tevent_req_done(req
);
/*
 * Collect the result of a transaction-start request.
 *
 * Checks the request's error status and installs
 * ctdb_transaction_handle_destructor() on the handle held in
 * state->h.
 * NOTE(review): the error-path body, the second parameter and the
 * return statements are not visible in this view.
 */
1725 struct ctdb_transaction_handle
*ctdb_transaction_start_recv(
1726 struct tevent_req
*req
,
1729 struct ctdb_transaction_start_state
*state
= tevent_req_data(
1730 req
, struct ctdb_transaction_start_state
);
1731 struct ctdb_transaction_handle
*h
= state
->h
;
1734 if (tevent_req_is_unix_error(req
, &err
)) {
/* Freeing the handle will now run the destructor. */
1741 talloc_set_destructor(h
, ctdb_transaction_handle_destructor
);
/*
 * talloc destructor for a transaction handle.
 *
 * Calls ctdb_ctrl_deregister_srvid() against the client's own pnn,
 * passing tevent_timeval_zero() as the timeout, and has a warning log
 * for the failure case.
 * NOTE(review): the remaining arguments, the failure check and the
 * return value are not visible in this view.
 */
1745 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle
*h
)
1749 ret
= ctdb_ctrl_deregister_srvid(h
, h
->ev
, h
->client
, h
->client
->pnn
,
1750 tevent_timeval_zero(),
1753 DEBUG(DEBUG_WARNING
, ("Failed to deregister SRVID\n"));
/*
 * Synchronous wrapper around ctdb_transaction_start_send() and
 * ctdb_transaction_start_recv(): sends the request, polls ev until it
 * completes and receives the handle.
 * NOTE(review): how the handle reaches *out and which values are
 * returned are not visible in this view.
 */
1759 int ctdb_transaction_start(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1760 struct ctdb_client_context
*client
,
1761 struct timeval timeout
,
1762 struct ctdb_db_context
*db
, bool readonly
,
1763 struct ctdb_transaction_handle
**out
)
1765 struct tevent_req
*req
;
1766 struct ctdb_transaction_handle
*h
;
1769 req
= ctdb_transaction_start_send(mem_ctx
, ev
, client
, timeout
, db
,
/* Run the event loop until the request completes. */
1775 tevent_req_poll(req
, ev
);
1777 h
= ctdb_transaction_start_recv(req
, &ret
);
/*
 * Search state passed to the record-buffer traverse callback; header
 * receives a copy of the matching record's header.
 * NOTE(review): the callback also uses the key and found members,
 * which are not visible in this view.
 */
1786 struct ctdb_transaction_record_fetch_state
{
1788 struct ctdb_ltdb_header header
;
/*
 * Record-buffer traverse callback that looks for state->key.
 *
 * A record matches when its key has the same size and the same bytes;
 * on a match the record's header is copied into state->header and
 * state->found is set.
 * NOTE(review): the remaining parameters, any copying of the record
 * data and the return value are not visible in this view.
 */
1792 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid
,
1793 struct ctdb_ltdb_header
*header
,
1798 struct ctdb_transaction_record_fetch_state
*state
=
1799 (struct ctdb_transaction_record_fetch_state
*)private_data
;
/* The size is compared first, so memcmp() only runs on equal-length
 * keys. */
1801 if (state
->key
.dsize
== key
.dsize
&&
1802 memcmp(state
->key
.dptr
, key
.dptr
, key
.dsize
) == 0) {
1804 state
->header
= *header
;
1805 state
->found
= true;
/*
 * Look a record up in the transaction's record buffer (h->recbuf).
 *
 * Traverses the buffer with ctdb_transaction_record_fetch_traverse()
 * and copies the header found to *header when header is not NULL.
 * NOTE(review): the key and data parameters, the handling of the
 * found flag and the return values are not visible in this view.
 */
1811 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle
*h
,
1813 struct ctdb_ltdb_header
*header
,
1816 struct ctdb_transaction_record_fetch_state state
;
1820 state
.found
= false;
1822 ret
= ctdb_rec_buffer_traverse(h
->recbuf
,
1823 ctdb_transaction_record_fetch_traverse
,
/* The header output is optional. */
1830 if (header
!= NULL
) {
1831 *header
= state
.header
;
/*
 * Fetch the record for key as seen by transaction h.
 *
 * The visible code looks the key up in the transaction's record
 * buffer and duplicates the data onto mem_ctx.  It also reads the
 * record from h->db with ctdb_ltdb_fetch() and adds that record to
 * h->recbuf.
 * NOTE(review): the branching between these two paths and the return
 * values are not visible in this view.
 */
1842 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle
*h
,
1844 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
1847 struct ctdb_ltdb_header header
;
/* Look in the record buffer; the header is not requested (NULL). */
1850 ret
= ctdb_transaction_record_fetch(h
, key
, NULL
, &tmp_data
);
/* Give the caller its own copy, allocated on mem_ctx. */
1852 data
->dptr
= talloc_memdup(mem_ctx
, tmp_data
.dptr
,
1854 if (data
->dptr
== NULL
) {
1857 data
->dsize
= tmp_data
.dsize
;
/* Read the record from the database itself. */
1861 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, mem_ctx
, data
);
/* Add the fetched record to the transaction's record buffer. */
1866 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, *data
);
/*
 * Queue a store of data under key in transaction h.
 *
 * The visible code obtains the old record from the transaction's
 * record buffer and from h->db, compares the old data with the new
 * data, sets the header's dmaster to the client's pnn and adds the
 * record to h->recbuf.  Temporary data lives on tmp_ctx, which is
 * freed on both visible paths.
 * NOTE(review): the branching between the two fetches, any other
 * header changes and the return values are not visible in this view.
 */
1874 int ctdb_transaction_store_record(struct ctdb_transaction_handle
*h
,
1875 TDB_DATA key
, TDB_DATA data
)
1877 TALLOC_CTX
*tmp_ctx
;
1878 struct ctdb_ltdb_header header
;
1886 tmp_ctx
= talloc_new(h
);
1887 if (tmp_ctx
== NULL
) {
/* Obtain the old header and data for this key. */
1891 ret
= ctdb_transaction_record_fetch(h
, key
, &header
, &old_data
);
1893 ret
= ctdb_ltdb_fetch(h
->db
, key
, &header
, tmp_ctx
, &old_data
);
/* Old and new data are byte-for-byte identical.
 * NOTE(review): presumably the store is skipped here; only the free of
 * tmp_ctx is visible. */
1899 if (old_data
.dsize
== data
.dsize
&&
1900 memcmp(old_data
.dptr
, data
.dptr
, data
.dsize
) == 0) {
1901 talloc_free(tmp_ctx
);
1905 header
.dmaster
= ctdb_client_pnn(h
->client
);
/* Add the new record to the transaction's record buffer. */
1908 ret
= ctdb_rec_buffer_add(h
, h
->recbuf
, 0, &header
, key
, data
);
1909 talloc_free(tmp_ctx
);
/*
 * Queue a delete of key in transaction h by storing tdb_null as the
 * record's data via ctdb_transaction_store_record(); returns that
 * call's result.
 */
1918 int ctdb_transaction_delete_record(struct ctdb_transaction_handle
*h
,
1921 return ctdb_transaction_store_record(h
, key
, tdb_null
);
/*
 * Queue a store of the database sequence number in transaction h.
 *
 * The record key is the CTDB_DB_SEQNUM_KEY string including its
 * terminating NUL byte, and the record data is the in-memory bytes of
 * seqnum.  Returns the result of ctdb_transaction_store_record().
 */
1924 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle
*h
,
1927 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
1930 key
.dptr
= discard_const(keyname
);
1931 key
.dsize
= strlen(keyname
) + 1;
/* The value is stored as raw bytes, with no conversion. */
1933 data
.dptr
= (uint8_t *)&seqnum
;
1934 data
.dsize
= sizeof(seqnum
);
1936 return ctdb_transaction_store_record(h
, key
, data
);
/*
 * Per-request state for committing a transaction: the event context
 * and the transaction handle being committed.
 * NOTE(review): the commit code also uses state->seqnum, which is not
 * among the members visible in this view.
 */
1939 struct ctdb_transaction_commit_state
{
1940 struct tevent_context
*ev
;
1941 struct ctdb_transaction_handle
*h
;
/*
 * Completion callbacks of the commit request: commit_try sends the
 * commit control after a recovery wait, and commit_done handles the
 * control's reply.
 */
1945 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
);
1946 static void ctdb_transaction_commit_try(struct tevent_req
*subreq
);
/*
 * Begin committing transaction h.
 *
 * Reads the database's current sequence number into state->seqnum
 * with ctdb_ctrl_get_db_seqnum(), queues a store of seqnum + 1 in the
 * transaction, then starts a recovery wait.
 * ctdb_transaction_commit_try() runs when that wait completes.  The
 * visible failure paths post the request to ev in its failed state.
 * NOTE(review): the conditions guarding the tevent_req_error() calls
 * are not visible in this view.
 */
1948 struct tevent_req
*ctdb_transaction_commit_send(
1949 TALLOC_CTX
*mem_ctx
,
1950 struct tevent_context
*ev
,
1951 struct ctdb_transaction_handle
*h
)
1953 struct tevent_req
*req
, *subreq
;
1954 struct ctdb_transaction_commit_state
*state
;
1957 req
= tevent_req_create(mem_ctx
, &state
,
1958 struct ctdb_transaction_commit_state
);
/* Query the client's own node for the current sequence number. */
1966 ret
= ctdb_ctrl_get_db_seqnum(state
, ev
, h
->client
,
1967 h
->client
->pnn
, tevent_timeval_zero(),
1968 h
->db
->db_id
, &state
->seqnum
);
1970 tevent_req_error(req
, ret
);
1971 return tevent_req_post(req
, ev
);
/* The incremented sequence number is queued in the transaction. */
1974 ret
= ctdb_transaction_store_db_seqnum(h
, state
->seqnum
+1);
1976 tevent_req_error(req
, ret
);
1977 return tevent_req_post(req
, ev
);
/* Wait on recovery before attempting the commit. */
1980 subreq
= ctdb_recovery_wait_send(state
, ev
, h
->client
);
1981 if (tevent_req_nomem(subreq
, req
)) {
1982 return tevent_req_post(req
, ev
);
1984 tevent_req_set_callback(subreq
, ctdb_transaction_commit_try
, req
);
/*
 * Completion callback for the recovery wait; makes one commit
 * attempt.
 *
 * Builds a trans3-commit control from the transaction's record buffer
 * and sends it to the client's own pnn.
 * ctdb_transaction_commit_done() runs when the control completes.
 * NOTE(review): the check on status before tevent_req_error() is not
 * visible in this view.
 */
1989 static void ctdb_transaction_commit_try(struct tevent_req
*subreq
)
1991 struct tevent_req
*req
= tevent_req_callback_data(
1992 subreq
, struct tevent_req
);
1993 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
1994 req
, struct ctdb_transaction_commit_state
);
1995 struct ctdb_req_control request
;
1999 status
= ctdb_recovery_wait_recv(subreq
, &ret
);
2000 TALLOC_FREE(subreq
);
2002 tevent_req_error(req
, ret
);
/* Build and send the commit control. */
2006 ctdb_req_control_trans3_commit(&request
, state
->h
->recbuf
);
2007 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->h
->client
,
2008 state
->h
->client
->pnn
,
2009 tevent_timeval_zero(), &request
);
2010 if (tevent_req_nomem(subreq
, req
)) {
2013 tevent_req_set_callback(subreq
, ctdb_transaction_commit_done
, req
);
/*
 * Completion callback for the trans3-commit control.
 *
 * Receives and decodes the reply.  When the control failed due to
 * recovery, it starts another recovery wait that leads back to
 * ctdb_transaction_commit_try().  Otherwise it re-reads the database
 * sequence number:
 *  - equal to state->seqnum: another recovery wait and retry;
 *  - not equal to state->seqnum + 1: the request fails with EIO;
 *  - otherwise the request is marked done.
 * NOTE(review): the conditions guarding several error and retry paths
 * are not visible in this view.
 */
2016 static void ctdb_transaction_commit_done(struct tevent_req
*subreq
)
2018 struct tevent_req
*req
= tevent_req_callback_data(
2019 subreq
, struct tevent_req
);
2020 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2021 req
, struct ctdb_transaction_commit_state
);
2022 struct ctdb_reply_control
*reply
;
2027 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2028 TALLOC_FREE(subreq
);
2030 tevent_req_error(req
, ret
);
2034 ret
= ctdb_reply_control_trans3_commit(reply
);
2036 /* Control failed due to recovery */
2037 subreq
= ctdb_recovery_wait_send(state
, state
->ev
,
2039 if (tevent_req_nomem(subreq
, req
)) {
2042 tevent_req_set_callback(subreq
, ctdb_transaction_commit_try
,
/* Re-read the sequence number to check the commit. */
2047 ret
= ctdb_ctrl_get_db_seqnum(state
, state
->ev
, state
->h
->client
,
2048 state
->h
->client
->pnn
,
2049 tevent_timeval_zero(),
2050 state
->h
->db
->db_id
, &seqnum
);
2052 tevent_req_error(req
, ret
);
/* Sequence number unchanged: wait on recovery and retry. */
2056 if (seqnum
== state
->seqnum
) {
2057 subreq
= ctdb_recovery_wait_send(state
, state
->ev
,
2059 if (tevent_req_nomem(subreq
, req
)) {
2062 tevent_req_set_callback(subreq
, ctdb_transaction_commit_try
,
/* Any value other than the expected seqnum + 1 fails with EIO. */
2067 if (seqnum
!= state
->seqnum
+ 1) {
2068 tevent_req_error(req
, EIO
);
2072 tevent_req_done(req
);
/*
 * Collect the result of a commit request.
 *
 * The transaction handle in state->h is freed in two places: once
 * inside the error branch and once after it.
 * NOTE(review): how err is reported through perr and the returned
 * values are not visible in this view.
 */
2075 bool ctdb_transaction_commit_recv(struct tevent_req
*req
, int *perr
)
2077 struct ctdb_transaction_commit_state
*state
= tevent_req_data(
2078 req
, struct ctdb_transaction_commit_state
);
2081 if (tevent_req_is_unix_error(req
, &err
)) {
2085 TALLOC_FREE(state
->h
);
2089 TALLOC_FREE(state
->h
);
/*
 * Synchronous commit of transaction h.
 *
 * Sends a commit request with h as the talloc parent, using the
 * handle's own event context, polls that context until the request
 * completes and receives the result.
 * NOTE(review): read-only or not-updated handles take a separate
 * path; its body and the return values are not visible in this view.
 */
2093 int ctdb_transaction_commit(struct ctdb_transaction_handle
*h
)
2095 struct tevent_req
*req
;
2099 if (h
->readonly
|| ! h
->updated
) {
2104 req
= ctdb_transaction_commit_send(h
, h
->ev
, h
);
/* Run the event loop until the request completes. */
2110 tevent_req_poll(req
, h
->ev
);
2112 status
= ctdb_transaction_commit_recv(req
, &ret
);
2122 int ctdb_transaction_cancel(struct ctdb_transaction_handle
*h
)
2131 * In future Samba should register SERVER_ID.
2132 * Make that structure same as struct srvid {}.