2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "system/network.h"
23 #include "system/wait.h"
24 #include "system/time.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/util_process.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/reqid.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 typedef void (*ctdb_traverse_fn_t
)(void *private_data
, TDB_DATA key
, TDB_DATA data
);
46 handle returned to caller - freeing this handle will kill the child and
47 terminate the traverse
49 struct ctdb_traverse_local_handle
{
50 struct ctdb_traverse_local_handle
*next
, *prev
;
51 struct ctdb_db_context
*ctdb_db
;
55 uint32_t client_reqid
;
59 ctdb_traverse_fn_t callback
;
60 bool withemptyrecords
;
61 struct tevent_fd
*fde
;
67 * called when traverse is completed by child or on error
69 static void ctdb_traverse_child_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
70 uint16_t flags
, void *private_data
)
72 struct ctdb_traverse_local_handle
*h
= talloc_get_type(private_data
,
73 struct ctdb_traverse_local_handle
);
74 ctdb_traverse_fn_t callback
= h
->callback
;
75 void *p
= h
->private_data
;
79 /* Read the number of records sent by traverse child */
80 n
= sys_read(h
->fd
[0], &res
, sizeof(res
));
81 if (n
< 0 || n
!= sizeof(res
)) {
82 /* Traverse child failed */
83 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d\n",
84 h
->ctdb_db
->db_name
, h
->reqid
));
88 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d records:%d\n",
89 h
->ctdb_db
->db_name
, h
->reqid
, res
));
91 DEBUG(DEBUG_INFO
, ("Local traverse end db:%s reqid:%d records:%d\n",
92 h
->ctdb_db
->db_name
, h
->reqid
, res
));
95 callback(p
, tdb_null
, tdb_null
);
99 destroy a in-flight traverse operation
101 static int traverse_local_destructor(struct ctdb_traverse_local_handle
*h
)
103 DLIST_REMOVE(h
->ctdb_db
->traverse
, h
);
104 ctdb_kill(h
->ctdb_db
->ctdb
, h
->child
, SIGKILL
);
109 callback from tdb_traverse_read()
111 static int ctdb_traverse_local_fn(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *p
)
113 struct ctdb_traverse_local_handle
*h
= talloc_get_type(p
,
114 struct ctdb_traverse_local_handle
);
115 struct ctdb_rec_data_old
*d
;
116 struct ctdb_ltdb_header
*hdr
;
120 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
122 if (h
->ctdb_db
->persistent
== 0) {
123 /* filter out zero-length records */
124 if (!h
->withemptyrecords
&&
125 data
.dsize
<= sizeof(struct ctdb_ltdb_header
))
130 /* filter out non-authoritative records */
131 if (hdr
->dmaster
!= h
->ctdb_db
->ctdb
->pnn
) {
136 d
= ctdb_marshall_record(h
, h
->reqid
, key
, NULL
, data
);
138 /* error handling is tricky in this child code .... */
143 outdata
.dptr
= (uint8_t *)d
;
144 outdata
.dsize
= d
->length
;
146 res
= ctdb_control(h
->ctdb_db
->ctdb
, h
->srcnode
, 0, CTDB_CONTROL_TRAVERSE_DATA
,
147 CTDB_CTRL_FLAG_NOREPLY
, outdata
, NULL
, NULL
, &status
, NULL
, NULL
);
148 if (res
!= 0 || status
!= 0) {
157 struct traverse_all_state
{
158 struct ctdb_context
*ctdb
;
159 struct ctdb_traverse_local_handle
*h
;
162 uint32_t client_reqid
;
164 bool withemptyrecords
;
168 setup a non-blocking traverse of a local ltdb. The callback function
169 will be called on every record in the local ltdb. To stop the
170 traverse, talloc_free() the traverse_handle.
172 The traverse is finished when the callback is called with tdb_null for key and data
174 static struct ctdb_traverse_local_handle
*ctdb_traverse_local(struct ctdb_db_context
*ctdb_db
,
175 ctdb_traverse_fn_t callback
,
176 struct traverse_all_state
*all_state
)
178 struct ctdb_traverse_local_handle
*h
;
181 h
= talloc_zero(all_state
, struct ctdb_traverse_local_handle
);
193 h
->child
= ctdb_fork(ctdb_db
->ctdb
);
195 if (h
->child
== (pid_t
)-1) {
202 h
->callback
= callback
;
203 h
->private_data
= all_state
;
204 h
->ctdb_db
= ctdb_db
;
205 h
->client_reqid
= all_state
->client_reqid
;
206 h
->reqid
= all_state
->reqid
;
207 h
->srvid
= all_state
->srvid
;
208 h
->srcnode
= all_state
->srcnode
;
209 h
->withemptyrecords
= all_state
->withemptyrecords
;
212 /* start the traverse in the child */
214 pid_t parent
= getpid();
215 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
216 struct ctdb_rec_data_old
*d
;
221 prctl_set_comment("ctdb_traverse");
222 if (switch_from_server_to_client(ctdb
, "traverse_local-%s:",
223 ctdb_db
->db_name
) != 0) {
224 DEBUG(DEBUG_CRIT
, ("Failed to switch traverse child into client mode\n"));
228 d
= ctdb_marshall_record(h
, h
->reqid
, tdb_null
, NULL
, tdb_null
);
231 sys_write(h
->fd
[1], &res
, sizeof(int));
235 res
= tdb_traverse_read(ctdb_db
->ltdb
->tdb
, ctdb_traverse_local_fn
, h
);
236 if (res
== -1 || h
->records_failed
> 0) {
237 /* traverse failed */
238 res
= -(h
->records_sent
);
240 res
= h
->records_sent
;
243 /* Wait till all the data is flushed from output queue */
244 while (ctdb_queue_length(ctdb
->daemon
.queue
) > 0) {
245 tevent_loop_once(ctdb
->ev
);
248 /* End traverse by sending empty record */
249 outdata
.dptr
= (uint8_t *)d
;
250 outdata
.dsize
= d
->length
;
251 ret
= ctdb_control(ctdb
, h
->srcnode
, 0,
252 CTDB_CONTROL_TRAVERSE_DATA
,
253 CTDB_CTRL_FLAG_NOREPLY
, outdata
,
254 NULL
, NULL
, &status
, NULL
, NULL
);
255 if (ret
== -1 || status
== -1) {
261 sys_write(h
->fd
[1], &res
, sizeof(res
));
263 while (ctdb_kill(ctdb
, parent
, 0) == 0 || errno
!= ESRCH
) {
270 set_close_on_exec(h
->fd
[0]);
272 talloc_set_destructor(h
, traverse_local_destructor
);
274 DLIST_ADD(ctdb_db
->traverse
, h
);
276 h
->fde
= tevent_add_fd(ctdb_db
->ctdb
->ev
, h
, h
->fd
[0], TEVENT_FD_READ
,
277 ctdb_traverse_child_handler
, h
);
278 if (h
->fde
== NULL
) {
283 tevent_fd_set_auto_close(h
->fde
);
289 struct ctdb_traverse_all_handle
{
290 struct ctdb_context
*ctdb
;
291 struct ctdb_db_context
*ctdb_db
;
293 ctdb_traverse_fn_t callback
;
300 destroy a traverse_all op
302 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle
*state
)
304 reqid_remove(state
->ctdb
->idr
, state
->reqid
);
308 /* called when a traverse times out */
309 static void ctdb_traverse_all_timeout(struct tevent_context
*ev
,
310 struct tevent_timer
*te
,
311 struct timeval t
, void *private_data
)
313 struct ctdb_traverse_all_handle
*state
= talloc_get_type(private_data
, struct ctdb_traverse_all_handle
);
315 DEBUG(DEBUG_ERR
,(__location__
" Traverse all timeout on database:%s\n", state
->ctdb_db
->db_name
));
316 CTDB_INCREMENT_STAT(state
->ctdb
, timeouts
.traverse
);
318 state
->timedout
= true;
319 state
->callback(state
->private_data
, tdb_null
, tdb_null
);
323 struct traverse_start_state
{
324 struct ctdb_context
*ctdb
;
325 struct ctdb_traverse_all_handle
*h
;
330 bool withemptyrecords
;
336 setup a cluster-wide non-blocking traverse of a ctdb. The
337 callback function will be called on every record in the local
338 ltdb. To stop the traverse, talloc_free() the traverse_handle.
340 The traverse is finished when the callback is called with tdb_null for key and data
343 static struct ctdb_traverse_all_handle
*ctdb_daemon_traverse_all(struct ctdb_db_context
*ctdb_db
,
344 ctdb_traverse_fn_t callback
,
345 struct traverse_start_state
*start_state
)
347 struct ctdb_traverse_all_handle
*state
;
348 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
351 struct ctdb_traverse_all r
;
352 struct ctdb_traverse_all_ext r_ext
;
353 uint32_t destination
;
355 state
= talloc(start_state
, struct ctdb_traverse_all_handle
);
361 state
->ctdb_db
= ctdb_db
;
362 state
->reqid
= reqid_new(ctdb_db
->ctdb
->idr
, state
);
363 state
->callback
= callback
;
364 state
->private_data
= start_state
;
365 state
->null_count
= 0;
366 state
->timedout
= false;
368 talloc_set_destructor(state
, ctdb_traverse_all_destructor
);
370 if (start_state
->withemptyrecords
) {
371 r_ext
.db_id
= ctdb_db
->db_id
;
372 r_ext
.reqid
= state
->reqid
;
373 r_ext
.pnn
= ctdb
->pnn
;
374 r_ext
.client_reqid
= start_state
->reqid
;
375 r_ext
.srvid
= start_state
->srvid
;
376 r_ext
.withemptyrecords
= start_state
->withemptyrecords
;
378 data
.dptr
= (uint8_t *)&r_ext
;
379 data
.dsize
= sizeof(r_ext
);
381 r
.db_id
= ctdb_db
->db_id
;
382 r
.reqid
= state
->reqid
;
384 r
.client_reqid
= start_state
->reqid
;
385 r
.srvid
= start_state
->srvid
;
387 data
.dptr
= (uint8_t *)&r
;
388 data
.dsize
= sizeof(r
);
391 if (ctdb_db
->persistent
== 0) {
392 /* normal database, traverse all nodes */
393 destination
= CTDB_BROADCAST_VNNMAP
;
396 /* persistent database, traverse one node, preferably
399 destination
= ctdb
->pnn
;
400 /* check we are in the vnnmap */
401 for (i
=0; i
< ctdb
->vnn_map
->size
; i
++) {
402 if (ctdb
->vnn_map
->map
[i
] == ctdb
->pnn
) {
406 /* if we are not in the vnn map we just pick the first
409 if (i
== ctdb
->vnn_map
->size
) {
410 destination
= ctdb
->vnn_map
->map
[0];
414 /* tell all the nodes in the cluster to start sending records to this
415 * node, or if it is a persistent database, just tell the local
419 if (start_state
->withemptyrecords
) {
420 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
421 CTDB_CONTROL_TRAVERSE_ALL_EXT
,
422 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
424 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
425 CTDB_CONTROL_TRAVERSE_ALL
,
426 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
434 DEBUG(DEBUG_NOTICE
,("Starting traverse on DB %s (id %d)\n",
435 ctdb_db
->db_name
, state
->reqid
));
437 /* timeout the traverse */
438 tevent_add_timer(ctdb
->ev
, state
,
439 timeval_current_ofs(ctdb
->tunable
.traverse_timeout
, 0),
440 ctdb_traverse_all_timeout
, state
);
446 called when local traverse ends
448 static void traverse_all_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
450 struct traverse_all_state
*state
= talloc_get_type(p
, struct traverse_all_state
);
457 * extended version to take the "withemptyrecords" parameter
459 int32_t ctdb_control_traverse_all_ext(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
461 struct ctdb_traverse_all_ext
*c
= (struct ctdb_traverse_all_ext
*)data
.dptr
;
462 struct traverse_all_state
*state
;
463 struct ctdb_db_context
*ctdb_db
;
465 if (data
.dsize
!= sizeof(struct ctdb_traverse_all_ext
)) {
466 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all_ext\n"));
470 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
471 if (ctdb_db
== NULL
) {
475 if (ctdb_db
->unhealthy_reason
) {
476 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
477 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
478 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
481 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
482 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
485 state
= talloc(ctdb_db
, struct traverse_all_state
);
490 state
->reqid
= c
->reqid
;
491 state
->srcnode
= c
->pnn
;
493 state
->client_reqid
= c
->client_reqid
;
494 state
->srvid
= c
->srvid
;
495 state
->withemptyrecords
= c
->withemptyrecords
;
497 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
498 if (state
->h
== NULL
) {
507 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
508 setup a traverse of our local ltdb, sending the records as
509 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
511 int32_t ctdb_control_traverse_all(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
513 struct ctdb_traverse_all
*c
= (struct ctdb_traverse_all
*)data
.dptr
;
514 struct traverse_all_state
*state
;
515 struct ctdb_db_context
*ctdb_db
;
517 if (data
.dsize
!= sizeof(struct ctdb_traverse_all
)) {
518 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all\n"));
522 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
523 if (ctdb_db
== NULL
) {
527 if (ctdb_db
->unhealthy_reason
) {
528 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
529 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
530 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
533 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
534 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
537 state
= talloc(ctdb_db
, struct traverse_all_state
);
542 state
->reqid
= c
->reqid
;
543 state
->srcnode
= c
->pnn
;
545 state
->client_reqid
= c
->client_reqid
;
546 state
->srvid
= c
->srvid
;
547 state
->withemptyrecords
= false;
549 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
550 if (state
->h
== NULL
) {
560 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
561 call the traverse_all callback with the record
563 int32_t ctdb_control_traverse_data(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
565 struct ctdb_rec_data_old
*d
= (struct ctdb_rec_data_old
*)data
.dptr
;
566 struct ctdb_traverse_all_handle
*state
;
568 ctdb_traverse_fn_t callback
;
571 if (data
.dsize
< sizeof(uint32_t) || data
.dsize
!= d
->length
) {
572 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_data\n"));
576 state
= reqid_find(ctdb
->idr
, d
->reqid
, struct ctdb_traverse_all_handle
);
577 if (state
== NULL
|| d
->reqid
!= state
->reqid
) {
578 /* traverse might have been terminated already */
582 key
.dsize
= d
->keylen
;
583 key
.dptr
= &d
->data
[0];
584 data
.dsize
= d
->datalen
;
585 data
.dptr
= &d
->data
[d
->keylen
];
587 if (key
.dsize
== 0 && data
.dsize
== 0) {
589 /* Persistent databases are only scanned on one node (the local
592 if (state
->ctdb_db
->persistent
== 0) {
593 if (state
->null_count
!= ctdb_get_num_active_nodes(ctdb
)) {
599 callback
= state
->callback
;
600 private_data
= state
->private_data
;
602 callback(private_data
, key
, data
);
607 kill an in-progress traverse, used when a client disconnects
609 int32_t ctdb_control_traverse_kill(struct ctdb_context
*ctdb
, TDB_DATA data
,
610 TDB_DATA
*outdata
, uint32_t srcnode
)
612 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
613 struct ctdb_db_context
*ctdb_db
;
614 struct ctdb_traverse_local_handle
*t
;
616 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
617 if (ctdb_db
== NULL
) {
621 for (t
=ctdb_db
->traverse
; t
; t
=t
->next
) {
622 if (t
->client_reqid
== d
->reqid
&&
623 t
->srvid
== d
->srvid
) {
634 this is called when a client disconnects during a traverse
635 we need to notify all the nodes taking part in the search that they
636 should kill their traverse children
638 static int ctdb_traverse_start_destructor(struct traverse_start_state
*state
)
640 struct ctdb_traverse_start r
;
643 DEBUG(DEBUG_ERR
,(__location__
" Traverse cancelled by client disconnect for database:0x%08x\n", state
->db_id
));
644 r
.db_id
= state
->db_id
;
645 r
.reqid
= state
->reqid
;
646 r
.srvid
= state
->srvid
;
648 data
.dptr
= (uint8_t *)&r
;
649 data
.dsize
= sizeof(r
);
651 ctdb_daemon_send_control(state
->ctdb
, CTDB_BROADCAST_CONNECTED
, 0,
652 CTDB_CONTROL_TRAVERSE_KILL
,
653 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
658 callback which sends records as messages to the client
660 static void traverse_start_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
662 struct traverse_start_state
*state
;
663 struct ctdb_rec_data_old
*d
;
666 state
= talloc_get_type(p
, struct traverse_start_state
);
668 d
= ctdb_marshall_record(state
, state
->reqid
, key
, NULL
, data
);
673 cdata
.dptr
= (uint8_t *)d
;
674 cdata
.dsize
= d
->length
;
676 srvid_dispatch(state
->ctdb
->srv
, state
->srvid
, 0, cdata
);
677 if (key
.dsize
== 0 && data
.dsize
== 0) {
678 DEBUG(DEBUG_NOTICE
, ("Ending traverse on DB %s (id %d), records %d\n",
679 state
->h
->ctdb_db
->db_name
, state
->h
->reqid
,
680 state
->num_records
));
682 if (state
->h
->timedout
) {
683 /* timed out, send TRAVERSE_KILL control */
686 /* end of traverse */
687 talloc_set_destructor(state
, NULL
);
691 state
->num_records
++;
697 * start a traverse_all - called as a control from a client.
698 * extended version to take the "withemptyrecords" parameter.
700 int32_t ctdb_control_traverse_start_ext(struct ctdb_context
*ctdb
,
706 struct ctdb_traverse_start_ext
*d
= (struct ctdb_traverse_start_ext
*)data
.dptr
;
707 struct traverse_start_state
*state
;
708 struct ctdb_db_context
*ctdb_db
;
709 struct ctdb_client
*client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
711 if (client
== NULL
) {
712 DEBUG(DEBUG_ERR
,(__location__
" No client found\n"));
716 if (data
.dsize
!= sizeof(*d
)) {
717 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_start\n"));
721 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
722 if (ctdb_db
== NULL
) {
726 if (ctdb_db
->unhealthy_reason
) {
727 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
728 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
729 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
732 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
733 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
736 state
= talloc(client
, struct traverse_start_state
);
741 state
->srcnode
= srcnode
;
742 state
->reqid
= d
->reqid
;
743 state
->srvid
= d
->srvid
;
744 state
->db_id
= d
->db_id
;
746 state
->withemptyrecords
= d
->withemptyrecords
;
747 state
->num_records
= 0;
749 state
->h
= ctdb_daemon_traverse_all(ctdb_db
, traverse_start_callback
, state
);
750 if (state
->h
== NULL
) {
755 talloc_set_destructor(state
, ctdb_traverse_start_destructor
);
761 * start a traverse_all - called as a control from a client.
763 int32_t ctdb_control_traverse_start(struct ctdb_context
*ctdb
,
769 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
770 struct ctdb_traverse_start_ext d2
;
777 d2
.withemptyrecords
= false;
779 data2
.dsize
= sizeof(d2
);
780 data2
.dptr
= (uint8_t *)&d2
;
782 return ctdb_control_traverse_start_ext(ctdb
, data2
, outdata
, srcnode
, client_id
);