/*
   efficient async ctdb traverse

   Copyright (C) Andrew Tridgell 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
21 #include "system/filesys.h"
22 #include "system/wait.h"
25 #include "../include/ctdb_private.h"
26 #include "lib/util/dlinklist.h"
28 typedef void (*ctdb_traverse_fn_t
)(void *private_data
, TDB_DATA key
, TDB_DATA data
);
/*
  handle returned to caller - freeing this handle will kill the child and
  terminate the traverse
 */
34 struct ctdb_traverse_local_handle
{
35 struct ctdb_traverse_local_handle
*next
, *prev
;
36 struct ctdb_db_context
*ctdb_db
;
40 uint32_t client_reqid
;
44 ctdb_traverse_fn_t callback
;
45 bool withemptyrecords
;
46 struct tevent_fd
*fde
;
/*
 * called when traverse is completed by child or on error
 */
54 static void ctdb_traverse_child_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
55 uint16_t flags
, void *private_data
)
57 struct ctdb_traverse_local_handle
*h
= talloc_get_type(private_data
,
58 struct ctdb_traverse_local_handle
);
59 ctdb_traverse_fn_t callback
= h
->callback
;
60 void *p
= h
->private_data
;
64 /* Read the number of records sent by traverse child */
65 n
= read(h
->fd
[0], &res
, sizeof(res
));
66 if (n
< 0 || n
!= sizeof(res
)) {
67 /* Traverse child failed */
68 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d\n",
69 h
->ctdb_db
->db_name
, h
->reqid
));
73 DEBUG(DEBUG_ERR
, ("Local traverse failed db:%s reqid:%d records:%d\n",
74 h
->ctdb_db
->db_name
, h
->reqid
, res
));
76 DEBUG(DEBUG_INFO
, ("Local traverse end db:%s reqid:%d records:%d\n",
77 h
->ctdb_db
->db_name
, h
->reqid
, res
));
80 callback(p
, tdb_null
, tdb_null
);
/*
  destroy an in-flight traverse operation
 */
86 static int traverse_local_destructor(struct ctdb_traverse_local_handle
*h
)
88 DLIST_REMOVE(h
->ctdb_db
->traverse
, h
);
89 ctdb_kill(h
->ctdb_db
->ctdb
, h
->child
, SIGKILL
);
/*
  callback from tdb_traverse_read()
 */
96 static int ctdb_traverse_local_fn(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *p
)
98 struct ctdb_traverse_local_handle
*h
= talloc_get_type(p
,
99 struct ctdb_traverse_local_handle
);
100 struct ctdb_rec_data
*d
;
101 struct ctdb_ltdb_header
*hdr
;
105 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
107 if (h
->ctdb_db
->persistent
== 0) {
108 /* filter out zero-length records */
109 if (!h
->withemptyrecords
&&
110 data
.dsize
<= sizeof(struct ctdb_ltdb_header
))
115 /* filter out non-authoritative records */
116 if (hdr
->dmaster
!= h
->ctdb_db
->ctdb
->pnn
) {
121 d
= ctdb_marshall_record(h
, h
->reqid
, key
, NULL
, data
);
123 /* error handling is tricky in this child code .... */
128 outdata
.dptr
= (uint8_t *)d
;
129 outdata
.dsize
= d
->length
;
131 res
= ctdb_control(h
->ctdb_db
->ctdb
, h
->srcnode
, 0, CTDB_CONTROL_TRAVERSE_DATA
,
132 CTDB_CTRL_FLAG_NOREPLY
, outdata
, NULL
, NULL
, &status
, NULL
, NULL
);
133 if (res
!= 0 || status
!= 0) {
142 struct traverse_all_state
{
143 struct ctdb_context
*ctdb
;
144 struct ctdb_traverse_local_handle
*h
;
147 uint32_t client_reqid
;
149 bool withemptyrecords
;
/*
  set up a non-blocking traverse of a local ltdb. The callback function
  will be called on every record in the local ltdb. To stop the
  traverse, talloc_free() the traverse_handle.

  The traverse is finished when the callback is called with tdb_null for
  key and data
 */
159 static struct ctdb_traverse_local_handle
*ctdb_traverse_local(struct ctdb_db_context
*ctdb_db
,
160 ctdb_traverse_fn_t callback
,
161 struct traverse_all_state
*all_state
)
163 struct ctdb_traverse_local_handle
*h
;
166 h
= talloc_zero(all_state
, struct ctdb_traverse_local_handle
);
178 h
->child
= ctdb_fork(ctdb_db
->ctdb
);
180 if (h
->child
== (pid_t
)-1) {
187 h
->callback
= callback
;
188 h
->private_data
= all_state
;
189 h
->ctdb_db
= ctdb_db
;
190 h
->client_reqid
= all_state
->client_reqid
;
191 h
->reqid
= all_state
->reqid
;
192 h
->srvid
= all_state
->srvid
;
193 h
->srcnode
= all_state
->srcnode
;
194 h
->withemptyrecords
= all_state
->withemptyrecords
;
197 /* start the traverse in the child */
199 pid_t parent
= getpid();
200 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
201 struct ctdb_rec_data
*d
;
206 ctdb_set_process_name("ctdb_traverse");
207 if (switch_from_server_to_client(ctdb
, "traverse_local-%s:",
208 ctdb_db
->db_name
) != 0) {
209 DEBUG(DEBUG_CRIT
, ("Failed to switch traverse child into client mode\n"));
213 d
= ctdb_marshall_record(h
, h
->reqid
, tdb_null
, NULL
, tdb_null
);
216 write(h
->fd
[1], &res
, sizeof(int));
220 res
= tdb_traverse_read(ctdb_db
->ltdb
->tdb
, ctdb_traverse_local_fn
, h
);
221 if (res
== -1 || h
->records_failed
> 0) {
222 /* traverse failed */
223 res
= -(h
->records_sent
);
225 res
= h
->records_sent
;
228 /* Wait till all the data is flushed from output queue */
229 while (ctdb_queue_length(ctdb
->daemon
.queue
) > 0) {
230 tevent_loop_once(ctdb
->ev
);
233 /* End traverse by sending empty record */
234 outdata
.dptr
= (uint8_t *)d
;
235 outdata
.dsize
= d
->length
;
236 ret
= ctdb_control(ctdb
, h
->srcnode
, 0,
237 CTDB_CONTROL_TRAVERSE_DATA
,
238 CTDB_CTRL_FLAG_NOREPLY
, outdata
,
239 NULL
, NULL
, &status
, NULL
, NULL
);
240 if (ret
== -1 || status
== -1) {
246 write(h
->fd
[1], &res
, sizeof(res
));
248 while (ctdb_kill(ctdb
, parent
, 0) == 0 || errno
!= ESRCH
) {
255 set_close_on_exec(h
->fd
[0]);
257 talloc_set_destructor(h
, traverse_local_destructor
);
259 DLIST_ADD(ctdb_db
->traverse
, h
);
261 h
->fde
= tevent_add_fd(ctdb_db
->ctdb
->ev
, h
, h
->fd
[0], EVENT_FD_READ
,
262 ctdb_traverse_child_handler
, h
);
263 if (h
->fde
== NULL
) {
268 tevent_fd_set_auto_close(h
->fde
);
274 struct ctdb_traverse_all_handle
{
275 struct ctdb_context
*ctdb
;
276 struct ctdb_db_context
*ctdb_db
;
278 ctdb_traverse_fn_t callback
;
/*
  destroy a traverse_all op
 */
287 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle
*state
)
289 ctdb_reqid_remove(state
->ctdb
, state
->reqid
);
293 struct ctdb_traverse_all
{
297 uint32_t client_reqid
;
301 struct ctdb_traverse_all_ext
{
305 uint32_t client_reqid
;
307 bool withemptyrecords
;
310 /* called when a traverse times out */
311 static void ctdb_traverse_all_timeout(struct event_context
*ev
, struct timed_event
*te
,
312 struct timeval t
, void *private_data
)
314 struct ctdb_traverse_all_handle
*state
= talloc_get_type(private_data
, struct ctdb_traverse_all_handle
);
316 DEBUG(DEBUG_ERR
,(__location__
" Traverse all timeout on database:%s\n", state
->ctdb_db
->db_name
));
317 CTDB_INCREMENT_STAT(state
->ctdb
, timeouts
.traverse
);
319 state
->timedout
= true;
320 state
->callback(state
->private_data
, tdb_null
, tdb_null
);
324 struct traverse_start_state
{
325 struct ctdb_context
*ctdb
;
326 struct ctdb_traverse_all_handle
*h
;
331 bool withemptyrecords
;
/*
  set up a cluster-wide non-blocking traverse of a ctdb. The
  callback function will be called on every record in the local
  ltdb. To stop the traverse, talloc_free() the traverse_handle.

  The traverse is finished when the callback is called with tdb_null
  for key and data
 */
344 static struct ctdb_traverse_all_handle
*ctdb_daemon_traverse_all(struct ctdb_db_context
*ctdb_db
,
345 ctdb_traverse_fn_t callback
,
346 struct traverse_start_state
*start_state
)
348 struct ctdb_traverse_all_handle
*state
;
349 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
352 struct ctdb_traverse_all r
;
353 struct ctdb_traverse_all_ext r_ext
;
354 uint32_t destination
;
356 state
= talloc(start_state
, struct ctdb_traverse_all_handle
);
362 state
->ctdb_db
= ctdb_db
;
363 state
->reqid
= ctdb_reqid_new(ctdb_db
->ctdb
, state
);
364 state
->callback
= callback
;
365 state
->private_data
= start_state
;
366 state
->null_count
= 0;
367 state
->timedout
= false;
369 talloc_set_destructor(state
, ctdb_traverse_all_destructor
);
371 if (start_state
->withemptyrecords
) {
372 r_ext
.db_id
= ctdb_db
->db_id
;
373 r_ext
.reqid
= state
->reqid
;
374 r_ext
.pnn
= ctdb
->pnn
;
375 r_ext
.client_reqid
= start_state
->reqid
;
376 r_ext
.srvid
= start_state
->srvid
;
377 r_ext
.withemptyrecords
= start_state
->withemptyrecords
;
379 data
.dptr
= (uint8_t *)&r_ext
;
380 data
.dsize
= sizeof(r_ext
);
382 r
.db_id
= ctdb_db
->db_id
;
383 r
.reqid
= state
->reqid
;
385 r
.client_reqid
= start_state
->reqid
;
386 r
.srvid
= start_state
->srvid
;
388 data
.dptr
= (uint8_t *)&r
;
389 data
.dsize
= sizeof(r
);
392 if (ctdb_db
->persistent
== 0) {
393 /* normal database, traverse all nodes */
394 destination
= CTDB_BROADCAST_VNNMAP
;
397 /* persistent database, traverse one node, preferably
400 destination
= ctdb
->pnn
;
401 /* check we are in the vnnmap */
402 for (i
=0; i
< ctdb
->vnn_map
->size
; i
++) {
403 if (ctdb
->vnn_map
->map
[i
] == ctdb
->pnn
) {
407 /* if we are not in the vnn map we just pick the first
410 if (i
== ctdb
->vnn_map
->size
) {
411 destination
= ctdb
->vnn_map
->map
[0];
415 /* tell all the nodes in the cluster to start sending records to this
416 * node, or if it is a persistent database, just tell the local
420 if (start_state
->withemptyrecords
) {
421 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
422 CTDB_CONTROL_TRAVERSE_ALL_EXT
,
423 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
425 ret
= ctdb_daemon_send_control(ctdb
, destination
, 0,
426 CTDB_CONTROL_TRAVERSE_ALL
,
427 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
435 DEBUG(DEBUG_NOTICE
,("Starting traverse on DB %s (id %d)\n",
436 ctdb_db
->db_name
, state
->reqid
));
438 /* timeout the traverse */
439 event_add_timed(ctdb
->ev
, state
,
440 timeval_current_ofs(ctdb
->tunable
.traverse_timeout
, 0),
441 ctdb_traverse_all_timeout
, state
);
/*
  called when local traverse ends
 */
449 static void traverse_all_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
451 struct traverse_all_state
*state
= talloc_get_type(p
, struct traverse_all_state
);
/*
 * extended version to take the "withemptyrecords" parameter
 */
460 int32_t ctdb_control_traverse_all_ext(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
462 struct ctdb_traverse_all_ext
*c
= (struct ctdb_traverse_all_ext
*)data
.dptr
;
463 struct traverse_all_state
*state
;
464 struct ctdb_db_context
*ctdb_db
;
466 if (data
.dsize
!= sizeof(struct ctdb_traverse_all_ext
)) {
467 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all_ext\n"));
471 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
472 if (ctdb_db
== NULL
) {
476 if (ctdb_db
->unhealthy_reason
) {
477 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
478 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
479 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
482 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
483 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
486 state
= talloc(ctdb_db
, struct traverse_all_state
);
491 state
->reqid
= c
->reqid
;
492 state
->srcnode
= c
->pnn
;
494 state
->client_reqid
= c
->client_reqid
;
495 state
->srvid
= c
->srvid
;
496 state
->withemptyrecords
= c
->withemptyrecords
;
498 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
499 if (state
->h
== NULL
) {
/*
  called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
  set up a traverse of our local ltdb, sending the records as
  CTDB_CONTROL_TRAVERSE_DATA records back to the originator
 */
512 int32_t ctdb_control_traverse_all(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
514 struct ctdb_traverse_all
*c
= (struct ctdb_traverse_all
*)data
.dptr
;
515 struct traverse_all_state
*state
;
516 struct ctdb_db_context
*ctdb_db
;
518 if (data
.dsize
!= sizeof(struct ctdb_traverse_all
)) {
519 DEBUG(DEBUG_ERR
,(__location__
" Invalid size in ctdb_control_traverse_all\n"));
523 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
524 if (ctdb_db
== NULL
) {
528 if (ctdb_db
->unhealthy_reason
) {
529 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
530 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
531 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
534 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
535 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
538 state
= talloc(ctdb_db
, struct traverse_all_state
);
543 state
->reqid
= c
->reqid
;
544 state
->srcnode
= c
->pnn
;
546 state
->client_reqid
= c
->client_reqid
;
547 state
->srvid
= c
->srvid
;
548 state
->withemptyrecords
= false;
550 state
->h
= ctdb_traverse_local(ctdb_db
, traverse_all_callback
, state
);
551 if (state
->h
== NULL
) {
/*
  called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
  call the traverse_all callback with the record
 */
564 int32_t ctdb_control_traverse_data(struct ctdb_context
*ctdb
, TDB_DATA data
, TDB_DATA
*outdata
)
566 struct ctdb_rec_data
*d
= (struct ctdb_rec_data
*)data
.dptr
;
567 struct ctdb_traverse_all_handle
*state
;
569 ctdb_traverse_fn_t callback
;
572 if (data
.dsize
< sizeof(uint32_t) || data
.dsize
!= d
->length
) {
573 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_data\n"));
577 state
= ctdb_reqid_find(ctdb
, d
->reqid
, struct ctdb_traverse_all_handle
);
578 if (state
== NULL
|| d
->reqid
!= state
->reqid
) {
579 /* traverse might have been terminated already */
583 key
.dsize
= d
->keylen
;
584 key
.dptr
= &d
->data
[0];
585 data
.dsize
= d
->datalen
;
586 data
.dptr
= &d
->data
[d
->keylen
];
588 if (key
.dsize
== 0 && data
.dsize
== 0) {
590 /* Persistent databases are only scanned on one node (the local
593 if (state
->ctdb_db
->persistent
== 0) {
594 if (state
->null_count
!= ctdb_get_num_active_nodes(ctdb
)) {
600 callback
= state
->callback
;
601 private_data
= state
->private_data
;
603 callback(private_data
, key
, data
);
/*
  kill an in-progress traverse, used when a client disconnects
 */
610 int32_t ctdb_control_traverse_kill(struct ctdb_context
*ctdb
, TDB_DATA data
,
611 TDB_DATA
*outdata
, uint32_t srcnode
)
613 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
614 struct ctdb_db_context
*ctdb_db
;
615 struct ctdb_traverse_local_handle
*t
;
617 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
618 if (ctdb_db
== NULL
) {
622 for (t
=ctdb_db
->traverse
; t
; t
=t
->next
) {
623 if (t
->client_reqid
== d
->reqid
&&
624 t
->srvid
== d
->srvid
) {
/*
  this is called when a client disconnects during a traverse;
  we need to notify all the nodes taking part in the search that they
  should kill their traverse children
 */
639 static int ctdb_traverse_start_destructor(struct traverse_start_state
*state
)
641 struct ctdb_traverse_start r
;
644 DEBUG(DEBUG_ERR
,(__location__
" Traverse cancelled by client disconnect for database:0x%08x\n", state
->db_id
));
645 r
.db_id
= state
->db_id
;
646 r
.reqid
= state
->reqid
;
647 r
.srvid
= state
->srvid
;
649 data
.dptr
= (uint8_t *)&r
;
650 data
.dsize
= sizeof(r
);
652 ctdb_daemon_send_control(state
->ctdb
, CTDB_BROADCAST_CONNECTED
, 0,
653 CTDB_CONTROL_TRAVERSE_KILL
,
654 0, CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
);
/*
  callback which sends records as messages to the client
 */
661 static void traverse_start_callback(void *p
, TDB_DATA key
, TDB_DATA data
)
663 struct traverse_start_state
*state
;
664 struct ctdb_rec_data
*d
;
667 state
= talloc_get_type(p
, struct traverse_start_state
);
669 d
= ctdb_marshall_record(state
, state
->reqid
, key
, NULL
, data
);
674 cdata
.dptr
= (uint8_t *)d
;
675 cdata
.dsize
= d
->length
;
677 ctdb_dispatch_message(state
->ctdb
, state
->srvid
, cdata
);
678 if (key
.dsize
== 0 && data
.dsize
== 0) {
679 DEBUG(DEBUG_NOTICE
, ("Ending traverse on DB %s (id %d), records %d\n",
680 state
->h
->ctdb_db
->db_name
, state
->h
->reqid
,
681 state
->num_records
));
683 if (state
->h
->timedout
) {
684 /* timed out, send TRAVERSE_KILL control */
687 /* end of traverse */
688 talloc_set_destructor(state
, NULL
);
692 state
->num_records
++;
/*
 * start a traverse_all - called as a control from a client.
 * extended version to take the "withemptyrecords" parameter.
 */
701 int32_t ctdb_control_traverse_start_ext(struct ctdb_context
*ctdb
,
707 struct ctdb_traverse_start_ext
*d
= (struct ctdb_traverse_start_ext
*)data
.dptr
;
708 struct traverse_start_state
*state
;
709 struct ctdb_db_context
*ctdb_db
;
710 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
712 if (client
== NULL
) {
713 DEBUG(DEBUG_ERR
,(__location__
" No client found\n"));
717 if (data
.dsize
!= sizeof(*d
)) {
718 DEBUG(DEBUG_ERR
,("Bad record size in ctdb_control_traverse_start\n"));
722 ctdb_db
= find_ctdb_db(ctdb
, d
->db_id
);
723 if (ctdb_db
== NULL
) {
727 if (ctdb_db
->unhealthy_reason
) {
728 if (ctdb
->tunable
.allow_unhealthy_db_read
== 0) {
729 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
730 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
733 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
734 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
737 state
= talloc(client
, struct traverse_start_state
);
742 state
->srcnode
= srcnode
;
743 state
->reqid
= d
->reqid
;
744 state
->srvid
= d
->srvid
;
745 state
->db_id
= d
->db_id
;
747 state
->withemptyrecords
= d
->withemptyrecords
;
748 state
->num_records
= 0;
750 state
->h
= ctdb_daemon_traverse_all(ctdb_db
, traverse_start_callback
, state
);
751 if (state
->h
== NULL
) {
756 talloc_set_destructor(state
, ctdb_traverse_start_destructor
);
/*
 * start a traverse_all - called as a control from a client.
 */
764 int32_t ctdb_control_traverse_start(struct ctdb_context
*ctdb
,
770 struct ctdb_traverse_start
*d
= (struct ctdb_traverse_start
*)data
.dptr
;
771 struct ctdb_traverse_start_ext d2
;
778 d2
.withemptyrecords
= false;
780 data2
.dsize
= sizeof(d2
);
781 data2
.dptr
= (uint8_t *)&d2
;
783 return ctdb_control_traverse_start_ext(ctdb
, data2
, outdata
, srcnode
, client_id
);