2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "system/network.h"
23 #include "system/wait.h"
24 #include "system/time.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 typedef void (*ctdb_traverse_fn_t
)(void *private_data
, TDB_DATA key
, TDB_DATA data
);
47 handle returned to caller - freeing this handle will kill the child and
48 terminate the traverse
/*
 * State for one local traverse.  ctdb_traverse_local() allocates it as a
 * talloc child of the traverse_all_state and installs
 * traverse_local_destructor() on it.
 *
 * NOTE(review): only some member declarations are visible in this listing.
 * Other code in this file also uses h->fd[], h->child, h->reqid, h->srvid,
 * h->srcnode, h->private_data, h->records_sent and h->records_failed; their
 * declarations need to be confirmed against the full file.
 */
50 struct ctdb_traverse_local_handle
{
/* list linkage: handles are kept on ctdb_db->traverse (DLIST_ADD/DLIST_REMOVE) */
51 struct ctdb_traverse_local_handle
*next
, *prev
;
/* database being traversed */
52 struct ctdb_db_context
*ctdb_db
;
/*
 * copied from traverse_all_state.client_reqid; ctdb_control_traverse_kill()
 * matches it against the reqid carried in the kill request
 */
56 uint32_t client_reqid
;
/* invoked by ctdb_traverse_child_handler() with tdb_null key and data */
60 ctdb_traverse_fn_t callback
;
/*
 * consulted by ctdb_traverse_local_fn() in its "filter out zero-length
 * records" check for non-persistent databases
 */
61 bool withemptyrecords
;
/* tevent fd event returned by tevent_add_fd() for h->fd[0] */
62 struct tevent_fd
*fde
;
68 * called when traverse is completed by child or on error
/*
 * tevent fd handler that ctdb_traverse_local() registers for reads on
 * h->fd[0]; private_data is the ctdb_traverse_local_handle.
 *
 * It reads sizeof(res) bytes from the fd into res (the value the traverse
 * child writes to h->fd[1]), logs the outcome, and then invokes the
 * handle's callback with tdb_null for key and data.
 *
 * NOTE(review): the declarations of n and res and the branch structure that
 * selects between the second and third log message are not visible in this
 * listing.
 */
70 static void ctdb_traverse_child_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
71 uint16_t flags
, void *private_data
)
73 struct ctdb_traverse_local_handle
*h
= talloc_get_type(private_data
,
74 struct ctdb_traverse_local_handle
);
/* take local copies of the callback and its argument from the handle */
75 ctdb_traverse_fn_t callback
= h
->callback
;
76 void *p
= h
->private_data
;
80 /* Read the number of records sent by traverse child */
81 n
= sys_read(h
->fd
[0], &res
, sizeof(res
));
/* a read error or a short read is treated as a failed child */
82 if (n
< 0 || n
!= sizeof(res
)) {
83 /* Traverse child failed */
84 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d\n",
85 h
->ctdb_db
->db_name
, h
->reqid
));
89 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d records:%d\n",
90 h
->ctdb_db
->db_name
, h
->reqid
, res
));
92 DEBUG(DEBUG_INFO
, ("Local traverse end db:%s reqid:%d records:%d\n",
93 h
->ctdb_db
->db_name
, h
->reqid
, res
));
/* signal end of traverse to the callback */
96 callback(p
, tdb_null
, tdb_null
);
100 destroy an in-flight traverse operation
/*
 * talloc destructor that ctdb_traverse_local() installs on the handle.
 * It unlinks the handle from ctdb_db->traverse and sends SIGKILL to the
 * traverse child through ctdb_kill().
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
102 static int traverse_local_destructor(struct ctdb_traverse_local_handle
*h
)
/* remove from the per-database list of running traverses */
104 DLIST_REMOVE(h
->ctdb_db
->traverse
, h
);
/* stop the child process doing the traverse */
105 ctdb_kill(h
->ctdb_db
->ctdb
, h
->child
, SIGKILL
);
110 callback from tdb_traverse_read()
/*
 * Per-record callback handed to tdb_traverse_read() by the traverse child;
 * p is the ctdb_traverse_local_handle.
 *
 * For non-persistent databases two filters are applied: the zero-length
 * record check (skipped when h->withemptyrecords is set) and the check that
 * the record's dmaster equals this node's pnn.  The record is then
 * marshalled with ctdb_marshall_record() and sent to h->srcnode as a
 * CTDB_CONTROL_TRAVERSE_DATA control with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): the declarations of outdata, res and status, the statements
 * run when a filter matches, and the handling of marshalling or send
 * failures are not visible in this listing.  ctdb_traverse_local() reads
 * h->records_sent and h->records_failed after the traverse, so they are
 * presumably updated here - confirm against the full file.
 */
112 static int ctdb_traverse_local_fn(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *p
)
114 struct ctdb_traverse_local_handle
*h
= talloc_get_type(p
,
115 struct ctdb_traverse_local_handle
);
116 struct ctdb_rec_data_old
*d
;
117 struct ctdb_ltdb_header
*hdr
;
/* the record data is interpreted as starting with a ctdb_ltdb_header */
121 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
/* the filters below apply only to non-persistent databases */
123 if (h
->ctdb_db
->persistent
== 0) {
124 /* filter out zero-length records */
125 if (!h
->withemptyrecords
&&
126 data
.dsize
<= sizeof(struct ctdb_ltdb_header
))
/*
 * NOTE(review): when withemptyrecords is set, no visible check guarantees
 * that data.dsize covers a full header before hdr->dmaster is read - confirm.
 */
131 /* filter out non-authoritative records */
132 if (hdr
->dmaster
!= h
->ctdb_db
->ctdb
->pnn
) {
/* marshall key and data, tagged with the traverse reqid */
137 d
= ctdb_marshall_record(h
, h
->reqid
, key
, NULL
, data
);
139 /* error handling is tricky in this child code .... */
144 outdata
.dptr
= (uint8_t *)d
;
145 outdata
.dsize
= d
->length
;
/* send the record to the node that requested the traverse; no reply wanted */
147 res
= ctdb_control(h
->ctdb_db
->ctdb
, h
->srcnode
, 0, CTDB_CONTROL_TRAVERSE_DATA
,
148 CTDB_CTRL_FLAG_NOREPLY
, outdata
, NULL
, NULL
, &status
, NULL
, NULL
);
149 if (res
!= 0 || status
!= 0) {
/*
 * State for one incoming TRAVERSE_ALL / TRAVERSE_ALL_EXT request.  It is
 * filled in by ctdb_control_traverse_all() and
 * ctdb_control_traverse_all_ext() and handed to ctdb_traverse_local().
 *
 * NOTE(review): those functions also assign state->reqid, state->srcnode
 * and state->srvid; the declarations of these members are not visible in
 * this listing.
 */
158 struct traverse_all_state
{
159 struct ctdb_context
*ctdb
;
/* handle returned by ctdb_traverse_local() */
160 struct ctdb_traverse_local_handle
*h
;
/* client_reqid copied from the incoming request */
163 uint32_t client_reqid
;
/* taken from the request for the _ext control, false for the plain control */
165 bool withemptyrecords
;
169 setup a non-blocking traverse of a local ltdb. The callback function
170 will be called on every record in the local ltdb. To stop the
171 traverse, talloc_free() the traverse_handle.
173 The traverse is finished when the callback is called with tdb_null for key and data
/*
 * Start a local traverse of ctdb_db on behalf of all_state.
 *
 * Visible steps: a zeroed handle is allocated as a talloc child of
 * all_state, a child process is created with ctdb_fork(), and the handle is
 * filled in from the arguments (callback, reqids, srvid, srcnode,
 * withemptyrecords).
 *
 * Child side: names the process "ctdb_traverse", switches to client mode,
 * pre-marshalls an empty record, runs tdb_traverse_read() with
 * ctdb_traverse_local_fn(), drains the daemon queue, sends the empty record
 * as the end-of-traverse marker, writes the result to h->fd[1] and calls
 * ctdb_wait_for_process_to_exit() on the parent pid.  The result is
 * h->records_sent, negated when the traverse or any record failed.
 *
 * Parent side: marks h->fd[0] close-on-exec, installs
 * traverse_local_destructor(), links the handle onto ctdb_db->traverse and
 * registers ctdb_traverse_child_handler() for reads on h->fd[0].
 *
 * NOTE(review): creation of h->fd, the parent/child branch, the
 * declarations of res, ret, status and outdata, all error paths and the
 * return statements are not visible in this listing.
 */
175 static struct ctdb_traverse_local_handle
*ctdb_traverse_local(struct ctdb_db_context
*ctdb_db
,
176 ctdb_traverse_fn_t callback
,
177 struct traverse_all_state
*all_state
)
179 struct ctdb_traverse_local_handle
*h
;
/* the handle is owned by all_state and starts zero-initialised */
182 h
= talloc_zero(all_state
, struct ctdb_traverse_local_handle
);
194 h
->child
= ctdb_fork(ctdb_db
->ctdb
);
/* fork failure */
196 if (h
->child
== (pid_t
)-1) {
/* fill in the handle from the arguments */
203 h
->callback
= callback
;
204 h
->private_data
= all_state
;
205 h
->ctdb_db
= ctdb_db
;
206 h
->client_reqid
= all_state
->client_reqid
;
207 h
->reqid
= all_state
->reqid
;
208 h
->srvid
= all_state
->srvid
;
209 h
->srcnode
= all_state
->srcnode
;
210 h
->withemptyrecords
= all_state
->withemptyrecords
;
213 /* start the traverse in the child */
/* remember the parent pid; it is passed to ctdb_wait_for_process_to_exit() below */
215 pid_t parent
= getpid();
216 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
217 struct ctdb_rec_data_old
*d
;
222 prctl_set_comment("ctdb_traverse");
223 if (switch_from_server_to_client(ctdb
) != 0) {
224 DEBUG(DEBUG_CRIT
, ("Failed to switch traverse child into client mode\n"));
/* empty record (tdb_null key and data), sent later as the end marker */
228 d
= ctdb_marshall_record(h
, h
->reqid
, tdb_null
, NULL
, tdb_null
);
231 sys_write(h
->fd
[1], &res
, sizeof(int));
/* walk the local tdb; each record goes through ctdb_traverse_local_fn() */
235 res
= tdb_traverse_read(ctdb_db
->ltdb
->tdb
, ctdb_traverse_local_fn
, h
);
236 if (res
== -1 || h
->records_failed
> 0) {
237 /* traverse failed */
238 res
= -(h
->records_sent
);
240 res
= h
->records_sent
;
243 /* Wait till all the data is flushed from output queue */
244 while (ctdb_queue_length(ctdb
->daemon
.queue
) > 0) {
245 tevent_loop_once(ctdb
->ev
);
248 /* End traverse by sending empty record */
249 outdata
.dptr
= (uint8_t *)d
;
250 outdata
.dsize
= d
->length
;
251 ret
= ctdb_control(ctdb
, h
->srcnode
, 0,
252 CTDB_CONTROL_TRAVERSE_DATA
,
253 CTDB_CTRL_FLAG_NOREPLY
, outdata
,
254 NULL
, NULL
, &status
, NULL
, NULL
);
255 if (ret
== -1 || status
== -1) {
/* report the result to the parent; ctdb_traverse_child_handler() reads it */
261 sys_write(h
->fd
[1], &res
, sizeof(res
));
263 ctdb_wait_for_process_to_exit(parent
);
/* parent side: keep only the read end and watch it for the child's result */
268 set_close_on_exec(h
->fd
[0]);
270 talloc_set_destructor(h
, traverse_local_destructor
);
272 DLIST_ADD(ctdb_db
->traverse
, h
);
274 h
->fde
= tevent_add_fd(ctdb_db
->ctdb
->ev
, h
, h
->fd
[0], TEVENT_FD_READ
,
275 ctdb_traverse_child_handler
, h
);
276 if (h
->fde
== NULL
) {
281 tevent_fd_set_auto_close(h
->fde
);
/*
 * State for one cluster-wide traverse started by
 * ctdb_daemon_traverse_all().
 *
 * NOTE(review): other code in this file also uses state->reqid,
 * state->private_data, state->null_count and state->timedout; the
 * declarations of those members are not visible in this listing.
 */
287 struct ctdb_traverse_all_handle
{
288 struct ctdb_context
*ctdb
;
/* database being traversed */
289 struct ctdb_db_context
*ctdb_db
;
/* called by ctdb_control_traverse_data() per record and by the timeout handler */
291 ctdb_traverse_fn_t callback
;
298 destroy a traverse_all op
/*
 * talloc destructor that ctdb_daemon_traverse_all() installs on the
 * handle.  It releases the handle's reqid from ctdb->idr.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
300 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle
*state
)
302 reqid_remove(state
->ctdb
->idr
, state
->reqid
);
306 /* called when a traverse times out */
/*
 * Timer handler armed by ctdb_daemon_traverse_all(); private_data is the
 * ctdb_traverse_all_handle.  It logs the timeout, bumps the
 * timeouts.traverse statistic, marks the handle as timed out and ends the
 * traverse by invoking the callback with tdb_null key and data.
 */
307 static void ctdb_traverse_all_timeout(struct tevent_context
*ev
,
308 struct tevent_timer
*te
,
309 struct timeval t
, void *private_data
)
311 struct ctdb_traverse_all_handle
*state
= talloc_get_type(private_data
, struct ctdb_traverse_all_handle
);
313 DEBUG(DEBUG_ERR
,(__location__
" Traverse all timeout on database:%s\n", state
->ctdb_db
->db_name
));
314 CTDB_INCREMENT_STAT(state
->ctdb
, timeouts
.traverse
);
/* traverse_start_callback() checks this flag when it sees the end marker */
316 state
->timedout
= true;
317 state
->callback(state
->private_data
, tdb_null
, tdb_null
);
/*
 * State for one client-initiated traverse, allocated in
 * ctdb_control_traverse_start_ext() as a talloc child of the client.
 *
 * NOTE(review): other code in this file also uses state->srcnode,
 * state->reqid, state->srvid, state->db_id and state->num_records; the
 * declarations of those members are not visible in this listing.
 */
321 struct traverse_start_state
{
322 struct ctdb_context
*ctdb
;
/* handle returned by ctdb_daemon_traverse_all() */
323 struct ctdb_traverse_all_handle
*h
;
/* copied from the client's start request; selects the _EXT control */
328 bool withemptyrecords
;
334 setup a cluster-wide non-blocking traverse of a ctdb. The
335 callback function will be called on every record in the local
336 ltdb. To stop the traverse, talloc_free() the traverse_handle.
338 The traverse is finished when the callback is called with tdb_null for key and data
/*
 * Start a cluster-wide traverse of ctdb_db on behalf of start_state.
 *
 * Allocates a ctdb_traverse_all_handle under start_state, gives it a new
 * reqid from ctdb->idr and installs ctdb_traverse_all_destructor().  It
 * then builds either a ctdb_traverse_all_ext request (when
 * start_state->withemptyrecords is set) or a ctdb_traverse_all request.
 *
 * Destination: CTDB_BROADCAST_VNNMAP for non-persistent databases.  For
 * persistent databases it is this node's pnn, replaced by vnn_map->map[0]
 * when the search loop over the vnn map runs to the end.
 *
 * The request is sent with CTDB_CTRL_FLAG_NOREPLY as
 * CTDB_CONTROL_TRAVERSE_ALL_EXT or CTDB_CONTROL_TRAVERSE_ALL, and a timer
 * using the traverse_timeout tunable is armed to run
 * ctdb_traverse_all_timeout().
 *
 * NOTE(review): the declarations of data, ret and i, the assignment of
 * state->ctdb and r.pnn, the else branches, failure handling and the return
 * statement are not visible in this listing.  The result of
 * tevent_add_timer() is not visibly checked.
 */
341 static struct ctdb_traverse_all_handle
*ctdb_daemon_traverse_all(struct ctdb_db_context
*ctdb_db
,
342 ctdb_traverse_fn_t callback
,
343 struct traverse_start_state
*start_state
)
345 struct ctdb_traverse_all_handle
*state
;
346 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
349 struct ctdb_traverse_all r
;
350 struct ctdb_traverse_all_ext r_ext
;
351 uint32_t destination
;
/* the handle is owned by start_state */
353 state
= talloc(start_state
, struct ctdb_traverse_all_handle
);
359 state
->ctdb_db
= ctdb_db
;
/* incoming TRAVERSE_DATA records are matched to this handle by reqid */
360 state
->reqid
= reqid_new(ctdb_db
->ctdb
->idr
, state
);
361 state
->callback
= callback
;
362 state
->private_data
= start_state
;
363 state
->null_count
= 0;
364 state
->timedout
= false;
366 talloc_set_destructor(state
, ctdb_traverse_all_destructor
);
/* build the extended request when empty records were asked for */
368 if (start_state
->withemptyrecords
) {
369 r_ext
.db_id
= ctdb_db
->db_id
;
370 r_ext
.reqid
= state
->reqid
;
371 r_ext
.pnn
= ctdb
->pnn
;
372 r_ext
.client_reqid
= start_state
->reqid
;
373 r_ext
.srvid
= start_state
->srvid
;
374 r_ext
.withemptyrecords
= start_state
->withemptyrecords
;
376 data
.dptr
= (uint8_t *)&r_ext
;
377 data
.dsize
= sizeof(r_ext
);
379 r
.db_id
= ctdb_db
->db_id
;
380 r
.reqid
= state
->reqid
;
382 r
.client_reqid
= start_state
->reqid
;
383 r
.srvid
= start_state
->srvid
;
385 data
.dptr
= (uint8_t *)&r
;
386 data
.dsize
= sizeof(r
);
389 if (ctdb_db
->persistent
== 0) {
390 /* normal database, traverse all nodes */
391 destination
= CTDB_BROADCAST_VNNMAP
;
394 /* persistent database, traverse one node, preferably
397 destination
= ctdb
->pnn
;
398 /* check we are in the vnnmap */
399 for (i
=0; i
< ctdb
->vnn_map
->size
; i
++) {
400 if (ctdb
->vnn_map
->map
[i
] == ctdb
->pnn
) {
404 /* if we are not in the vnn map we just pick the first
407 if (i
== ctdb
->vnn_map
->size
) {
408 destination
= ctdb
->vnn_map
->map
[0];
412 /* tell all the nodes in the cluster to start sending records to this
413 * node, or if it is a persistent database, just tell the local
417 if (start_state
->withemptyrecords
) {
418 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
419 CTDB_CONTROL_TRAVERSE_ALL_EXT
,
420 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
422 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
423 CTDB_CONTROL_TRAVERSE_ALL
,
424 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
432 DEBUG(DEBUG_NOTICE
,("Starting traverse on DB %s (id %d)\n",
433 ctdb_db
->db_name
, state
->reqid
));
435 /* timeout the traverse */
436 tevent_add_timer(ctdb
->ev
, state
,
437 timeval_current_ofs(ctdb
->tunable
.traverse_timeout
, 0),
438 ctdb_traverse_all_timeout
, state
);
444 called when local traverse ends
/*
 * Callback passed to ctdb_traverse_local() by the TRAVERSE_ALL control
 * handlers; p is the traverse_all_state.
 *
 * NOTE(review): only the recovery of the state pointer is visible in this
 * listing; what the function does with it is not shown.
 */
446 static void traverse_all_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
448 struct traverse_all_state
*state
= talloc_get_type(p
, struct traverse_all_state
);
455 * extended version to take the "withemptyrecords" parameter
/*
 * Handler for the extended TRAVERSE_ALL control.  data must be exactly one
 * struct ctdb_traverse_all_ext.
 *
 * It looks up the database by c->db_id, checks its health (a database with
 * an unhealthy_reason is logged as an error when the
 * allow_unhealthy_db_read tunable is 0, otherwise as a warning), allocates
 * a traverse_all_state under ctdb_db, copies the request fields into it
 * (including withemptyrecords) and starts ctdb_traverse_local() with
 * traverse_all_callback().
 *
 * NOTE(review): return values, the else branch of the health check and the
 * failure paths are not visible in this listing.  The log strings name
 * ctdb_control_traverse_all rather than the _ext variant and spell
 * "unhealthy" as "unhealty".
 */
457 int32_t ctdb_control_traverse_all_ext(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
459 struct ctdb_traverse_all_ext
*c
= (struct ctdb_traverse_all_ext
*)data
.dptr
;
460 struct traverse_all_state
*state
;
461 struct ctdb_db_context
*ctdb_db
;
/* reject requests whose payload is not exactly the expected struct */
463 if (data
.dsize
!= sizeof(struct ctdb_traverse_all_ext
)) {
464 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all_ext\n"));
468 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
469 if (ctdb_db
== NULL
) {
473 if (ctdb_db
->unhealthy_reason
) {
474 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
475 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
476 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
479 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
480 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
/* the state is owned by the database context */
483 state
= talloc(ctdb_db
, struct traverse_all_state
);
488 state
->reqid
= c
->reqid
;
/* records are sent back to the node named in the request */
489 state
->srcnode
= c
->pnn
;
491 state
->client_reqid
= c
->client_reqid
;
492 state
->srvid
= c
->srvid
;
493 state
->withemptyrecords
= c
->withemptyrecords
;
495 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
496 if (state
->h
== NULL
) {
505 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
506 setup a traverse of our local ltdb, sending the records as
507 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
/*
 * Handler for the CTDB_CONTROL_TRAVERSE_ALL control.  data must be exactly
 * one struct ctdb_traverse_all.
 *
 * Same visible flow as ctdb_control_traverse_all_ext(): look up the
 * database by c->db_id, apply the health check governed by the
 * allow_unhealthy_db_read tunable, allocate a traverse_all_state under
 * ctdb_db, copy the request fields and start ctdb_traverse_local() with
 * traverse_all_callback().  withemptyrecords is always false here.
 *
 * NOTE(review): return values, the else branch of the health check and the
 * failure paths are not visible in this listing.  The log strings spell
 * "unhealthy" as "unhealty".
 */
509 int32_t ctdb_control_traverse_all(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
511 struct ctdb_traverse_all
*c
= (struct ctdb_traverse_all
*)data
.dptr
;
512 struct traverse_all_state
*state
;
513 struct ctdb_db_context
*ctdb_db
;
/* reject requests whose payload is not exactly the expected struct */
515 if (data
.dsize
!= sizeof(struct ctdb_traverse_all
)) {
516 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all\n"));
520 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
521 if (ctdb_db
== NULL
) {
525 if (ctdb_db
->unhealthy_reason
) {
526 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
527 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
528 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
531 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
532 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
/* the state is owned by the database context */
535 state
= talloc(ctdb_db
, struct traverse_all_state
);
540 state
->reqid
= c
->reqid
;
/* records are sent back to the node named in the request */
541 state
->srcnode
= c
->pnn
;
543 state
->client_reqid
= c
->client_reqid
;
544 state
->srvid
= c
->srvid
;
/* the non-extended control has no such field */
545 state
->withemptyrecords
= false;
547 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
548 if (state
->h
== NULL
) {
558 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
559 call the traverse_all callback with the record
/*
 * Handler for the CTDB_CONTROL_TRAVERSE_DATA control.  data is a marshalled
 * ctdb_rec_data_old record.
 *
 * After checking the payload size against d->length, it finds the
 * ctdb_traverse_all_handle registered under d->reqid, splits the payload
 * into key and data using d->keylen and d->datalen, and passes them to the
 * handle's callback.  An empty key with empty data is the end-of-traverse
 * marker; for non-persistent databases state->null_count is compared with
 * ctdb_get_num_active_nodes() before the callback is reached.
 *
 * NOTE(review): the declarations of key and private_data, the update of
 * null_count, the early returns and the final return are not visible in
 * this listing.  No visible check confirms that d->keylen + d->datalen fit
 * inside data.dsize before the pointers are formed - confirm against the
 * full file.
 */
561 int32_t ctdb_control_traverse_data(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
563 struct ctdb_rec_data_old
*d
= (struct ctdb_rec_data_old
*)data
.dptr
;
564 struct ctdb_traverse_all_handle
*state
;
566 ctdb_traverse_fn_t callback
;
/* the payload must hold at least a length word and match the marshalled length */
569 if (data
.dsize
< sizeof(uint32_t) || data
.dsize
!= d
->length
) {
570 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_data\n"));
/* map the reqid back to the traverse that requested this record */
574 state
= reqid_find(ctdb
->idr
, d
->reqid
, struct ctdb_traverse_all_handle
);
575 if (state
== NULL
|| d
->reqid
!= state
->reqid
) {
576 /* traverse might have been terminated already */
/* key bytes come first in d->data, followed by the record data */
580 key
.dsize
= d
->keylen
;
581 key
.dptr
= &d
->data
[0];
582 data
.dsize
= d
->datalen
;
583 data
.dptr
= &d
->data
[d
->keylen
];
585 if (key
.dsize
== 0 && data
.dsize
== 0) {
587 /* Persistent databases are only scanned on one node (the local
590 if (state
->ctdb_db
->persistent
== 0) {
591 if (state
->null_count
!= ctdb_get_num_active_nodes(ctdb
)) {
597 callback
= state
->callback
;
598 private_data
= state
->private_data
;
600 callback(private_data
, key
, data
);
605 kill an in-progress traverse, used when a client disconnects
/*
 * Handler for the TRAVERSE_KILL control.  data is read as a struct
 * ctdb_traverse_start identifying the traverse by db_id, reqid and srvid.
 *
 * It looks up the database and walks its list of local traverse handles,
 * matching each handle's client_reqid and srvid against the request.
 *
 * NOTE(review): what is done with a matching handle and the return values
 * are not visible in this listing.  No check of data.dsize is visible
 * before d is dereferenced - confirm against the full file.
 */
607 int32_t ctdb_control_traverse_kill(struct ctdb_context
*ctdb
, TDB_DATA data
,
608 TDB_DATA
*outdata
, uint32_t srcnode
)
610 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
611 struct ctdb_db_context
*ctdb_db
;
612 struct ctdb_traverse_local_handle
*t
;
614 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
615 if (ctdb_db
== NULL
) {
/* scan the running local traverses of this database */
619 for (t
=ctdb_db
->traverse
; t
; t
=t
->next
) {
620 if (t
->client_reqid
== d
->reqid
&&
621 t
->srvid
== d
->srvid
) {
632 this is called when a client disconnects during a traverse
633 we need to notify all the nodes taking part in the search that they
634 should kill their traverse children
/*
 * talloc destructor that ctdb_control_traverse_start_ext() installs on the
 * traverse_start_state.  It logs the cancellation, then broadcasts a
 * CTDB_CONTROL_TRAVERSE_KILL carrying the traverse's db_id, reqid and srvid
 * to CTDB_BROADCAST_CONNECTED with CTDB_CTRL_FLAG_NOREPLY.
 *
 * traverse_start_callback() clears this destructor at the normal end of a
 * traverse.
 *
 * NOTE(review): the declaration of data and the return statement are not
 * visible in this listing; the send result is not visibly checked.
 */
636 static int ctdb_traverse_start_destructor(struct traverse_start_state
*state
)
638 struct ctdb_traverse_start r
;
641 DEBUG(DEBUG_ERR
,(__location__
" Traverse cancelled by client disconnect for database:0x%08x\n", state
->db_id
));
/* identify the traverse to the other nodes */
642 r
.db_id
= state
->db_id
;
643 r
.reqid
= state
->reqid
;
644 r
.srvid
= state
->srvid
;
646 data
.dptr
= (uint8_t *)&r
;
647 data
.dsize
= sizeof(r
);
649 ctdb_daemon_send_control(state
->ctdb
, CTDB_BROADCAST_CONNECTED
, 0,
650 CTDB_CONTROL_TRAVERSE_KILL
,
651 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
656 callback which sends records as messages to the client
/*
 * Callback passed to ctdb_daemon_traverse_all(); p is the
 * traverse_start_state.
 *
 * Each record is marshalled with the client's reqid and dispatched to the
 * client's srvid through srvid_dispatch().  An empty key with empty data
 * marks the end of the traverse: the end is logged, the timedout flag on
 * the handle is checked, and the state's destructor is cleared.  The
 * num_records counter is incremented for records.
 *
 * NOTE(review): the declaration of cdata, the handling of a marshalling
 * failure, the statements in the timed-out branch and any freeing of d or
 * state are not visible in this listing.
 */
658 static void traverse_start_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
660 struct traverse_start_state
*state
;
661 struct ctdb_rec_data_old
*d
;
664 state
= talloc_get_type(p
, struct traverse_start_state
);
/* tag the record with the reqid the client supplied */
666 d
= ctdb_marshall_record(state
, state
->reqid
, key
, NULL
, data
);
671 cdata
.dptr
= (uint8_t *)d
;
672 cdata
.dsize
= d
->length
;
/* deliver the record to the client via its srvid */
674 srvid_dispatch(state
->ctdb
->srv
, state
->srvid
, 0, cdata
);
675 if (key
.dsize
== 0 && data
.dsize
== 0) {
676 DEBUG(DEBUG_NOTICE
, ("Ending traverse on DB %s (id %d), records %d\n",
677 state
->h
->ctdb_db
->db_name
, state
->h
->reqid
,
678 state
->num_records
));
680 if (state
->h
->timedout
) {
681 /* timed out, send TRAVERSE_KILL control */
684 /* end of traverse */
/* normal completion: disarm the destructor that broadcasts TRAVERSE_KILL */
685 talloc_set_destructor(state
, NULL
);
689 state
->num_records
++;
695 * start a traverse_all - called as a control from a client.
696 * extended version to take the "withemptyrecords" parameter.
/*
 * Handler for the extended traverse-start control issued by a client.
 * data must be exactly one struct ctdb_traverse_start_ext.
 *
 * It resolves the client from client_id, looks up the database by
 * d->db_id, applies the health check governed by the
 * allow_unhealthy_db_read tunable, allocates a traverse_start_state under
 * the client, copies the request fields into it, starts
 * ctdb_daemon_traverse_all() with traverse_start_callback() and finally
 * installs ctdb_traverse_start_destructor() on the state.
 *
 * NOTE(review): most of the parameter list (data, outdata, srcnode and
 * client_id are used below), the return values, the else branch of the
 * health check and the failure paths are not visible in this listing.  The
 * log strings name ctdb_control_traverse_start rather than the _ext variant
 * and spell "unhealthy" as "unhealty".
 */
698 int32_t ctdb_control_traverse_start_ext(struct ctdb_context
*ctdb
,
704 struct ctdb_traverse_start_ext
*d
= (struct ctdb_traverse_start_ext
*)data
.dptr
;
705 struct traverse_start_state
*state
;
706 struct ctdb_db_context
*ctdb_db
;
707 struct ctdb_client
*client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
709 if (client
== NULL
) {
710 DEBUG(DEBUG_ERR
,(__location__
" No client found\n"));
/* reject requests whose payload is not exactly the expected struct */
714 if (data
.dsize
!= sizeof(*d
)) {
715 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_start\n"));
719 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
720 if (ctdb_db
== NULL
) {
724 if (ctdb_db
->unhealthy_reason
) {
725 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
726 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
727 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
730 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
731 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
/* the state is a talloc child of the client */
734 state
= talloc(client
, struct traverse_start_state
);
739 state
->srcnode
= srcnode
;
740 state
->reqid
= d
->reqid
;
741 state
->srvid
= d
->srvid
;
742 state
->db_id
= d
->db_id
;
744 state
->withemptyrecords
= d
->withemptyrecords
;
745 state
->num_records
= 0;
747 state
->h
= ctdb_daemon_traverse_all(ctdb_db
, traverse_start_callback
, state
);
748 if (state
->h
== NULL
) {
/* from here on, freeing the state broadcasts TRAVERSE_KILL */
753 talloc_set_destructor(state
, ctdb_traverse_start_destructor
);
759 * start a traverse_all - called as a control from a client.
/*
 * Handler for the non-extended traverse-start control issued by a client.
 * It builds a struct ctdb_traverse_start_ext with withemptyrecords set to
 * false, wraps it in a TDB_DATA and forwards the call to
 * ctdb_control_traverse_start_ext().
 *
 * NOTE(review): most of the parameter list, the declaration of data2, the
 * copy of the remaining fields from d into d2 and any check of data.dsize
 * are not visible in this listing.
 */
761 int32_t ctdb_control_traverse_start(struct ctdb_context
*ctdb
,
767 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
768 struct ctdb_traverse_start_ext d2
;
/* the plain control never asks for empty records */
775 d2
.withemptyrecords
= false;
777 data2
.dsize
= sizeof(d2
);
778 data2
.dptr
= (uint8_t *)&d2
;
780 return ctdb_control_traverse_start_ext(ctdb
, data2
, outdata
, srcnode
, client_id
);