smbd: Simplify validate_lock_entries
[Samba.git] / ctdb / server / ctdb_traverse.c
blob99e7e8f3e0dd76cf9f01949a036619fbc8492352
1 /*
2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/wait.h"
23 #include "db_wrap.h"
24 #include "tdb.h"
25 #include "../include/ctdb_private.h"
26 #include "lib/util/dlinklist.h"
28 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
31 handle returned to caller - freeing this handler will kill the child and
32 terminate the traverse
34 struct ctdb_traverse_local_handle {
35 struct ctdb_traverse_local_handle *next, *prev;
36 struct ctdb_db_context *ctdb_db;
37 int fd[2];
38 pid_t child;
39 uint64_t srvid;
40 uint32_t client_reqid;
41 uint32_t reqid;
42 int srcnode;
43 void *private_data;
44 ctdb_traverse_fn_t callback;
45 bool withemptyrecords;
46 struct tevent_fd *fde;
47 int records_failed;
48 int records_sent;
52 * called when traverse is completed by child or on error
54 static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
55 uint16_t flags, void *private_data)
57 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
58 struct ctdb_traverse_local_handle);
59 ctdb_traverse_fn_t callback = h->callback;
60 void *p = h->private_data;
61 int res;
62 ssize_t n;
64 /* Read the number of records sent by traverse child */
65 n = read(h->fd[0], &res, sizeof(res));
66 if (n < 0 || n != sizeof(res)) {
67 /* Traverse child failed */
68 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
69 h->ctdb_db->db_name, h->reqid));
70 } else if (res < 0) {
71 /* Traverse failed */
72 res = -res;
73 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
74 h->ctdb_db->db_name, h->reqid, res));
75 } else {
76 DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
77 h->ctdb_db->db_name, h->reqid, res));
80 callback(p, tdb_null, tdb_null);
84 destroy a in-flight traverse operation
86 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
88 DLIST_REMOVE(h->ctdb_db->traverse, h);
89 ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
90 return 0;
94 callback from tdb_traverse_read()
96 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
98 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
99 struct ctdb_traverse_local_handle);
100 struct ctdb_rec_data *d;
101 struct ctdb_ltdb_header *hdr;
102 int res, status;
103 TDB_DATA outdata;
105 hdr = (struct ctdb_ltdb_header *)data.dptr;
107 if (h->ctdb_db->persistent == 0) {
108 /* filter out zero-length records */
109 if (!h->withemptyrecords &&
110 data.dsize <= sizeof(struct ctdb_ltdb_header))
112 return 0;
115 /* filter out non-authoritative records */
116 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
117 return 0;
121 d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
122 if (d == NULL) {
123 /* error handling is tricky in this child code .... */
124 h->records_failed++;
125 return -1;
128 outdata.dptr = (uint8_t *)d;
129 outdata.dsize = d->length;
131 res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
132 CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
133 if (res != 0 || status != 0) {
134 h->records_failed++;
135 return -1;
138 h->records_sent++;
139 return 0;
142 struct traverse_all_state {
143 struct ctdb_context *ctdb;
144 struct ctdb_traverse_local_handle *h;
145 uint32_t reqid;
146 uint32_t srcnode;
147 uint32_t client_reqid;
148 uint64_t srvid;
149 bool withemptyrecords;
153 setup a non-blocking traverse of a local ltdb. The callback function
154 will be called on every record in the local ltdb. To stop the
155 traverse, talloc_free() the traverse_handle.
157 The traverse is finished when the callback is called with tdb_null for key and data
159 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
160 ctdb_traverse_fn_t callback,
161 struct traverse_all_state *all_state)
163 struct ctdb_traverse_local_handle *h;
164 int ret;
166 h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
167 if (h == NULL) {
168 return NULL;
171 ret = pipe(h->fd);
173 if (ret != 0) {
174 talloc_free(h);
175 return NULL;
178 h->child = ctdb_fork(ctdb_db->ctdb);
180 if (h->child == (pid_t)-1) {
181 close(h->fd[0]);
182 close(h->fd[1]);
183 talloc_free(h);
184 return NULL;
187 h->callback = callback;
188 h->private_data = all_state;
189 h->ctdb_db = ctdb_db;
190 h->client_reqid = all_state->client_reqid;
191 h->reqid = all_state->reqid;
192 h->srvid = all_state->srvid;
193 h->srcnode = all_state->srcnode;
194 h->withemptyrecords = all_state->withemptyrecords;
196 if (h->child == 0) {
197 /* start the traverse in the child */
198 int res, status;
199 pid_t parent = getpid();
200 struct ctdb_context *ctdb = ctdb_db->ctdb;
201 struct ctdb_rec_data *d;
202 TDB_DATA outdata;
204 close(h->fd[0]);
206 ctdb_set_process_name("ctdb_traverse");
207 if (switch_from_server_to_client(ctdb, "traverse_local-%s:",
208 ctdb_db->db_name) != 0) {
209 DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
210 _exit(0);
213 d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
214 if (d == NULL) {
215 res = 0;
216 write(h->fd[1], &res, sizeof(int));
217 _exit(0);
220 res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
221 if (res == -1 || h->records_failed > 0) {
222 /* traverse failed */
223 res = -(h->records_sent);
224 } else {
225 res = h->records_sent;
228 /* Wait till all the data is flushed from output queue */
229 while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
230 tevent_loop_once(ctdb->ev);
233 /* End traverse by sending empty record */
234 outdata.dptr = (uint8_t *)d;
235 outdata.dsize = d->length;
236 ret = ctdb_control(ctdb, h->srcnode, 0,
237 CTDB_CONTROL_TRAVERSE_DATA,
238 CTDB_CTRL_FLAG_NOREPLY, outdata,
239 NULL, NULL, &status, NULL, NULL);
240 if (ret == -1 || status == -1) {
241 if (res > 0) {
242 res = -res;
246 write(h->fd[1], &res, sizeof(res));
248 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
249 sleep(5);
251 _exit(0);
254 close(h->fd[1]);
255 set_close_on_exec(h->fd[0]);
257 talloc_set_destructor(h, traverse_local_destructor);
259 DLIST_ADD(ctdb_db->traverse, h);
261 h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], EVENT_FD_READ,
262 ctdb_traverse_child_handler, h);
263 if (h->fde == NULL) {
264 close(h->fd[0]);
265 talloc_free(h);
266 return NULL;
268 tevent_fd_set_auto_close(h->fde);
270 return h;
274 struct ctdb_traverse_all_handle {
275 struct ctdb_context *ctdb;
276 struct ctdb_db_context *ctdb_db;
277 uint32_t reqid;
278 ctdb_traverse_fn_t callback;
279 void *private_data;
280 uint32_t null_count;
281 bool timedout;
285 destroy a traverse_all op
287 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
289 ctdb_reqid_remove(state->ctdb, state->reqid);
290 return 0;
293 struct ctdb_traverse_all {
294 uint32_t db_id;
295 uint32_t reqid;
296 uint32_t pnn;
297 uint32_t client_reqid;
298 uint64_t srvid;
301 struct ctdb_traverse_all_ext {
302 uint32_t db_id;
303 uint32_t reqid;
304 uint32_t pnn;
305 uint32_t client_reqid;
306 uint64_t srvid;
307 bool withemptyrecords;
310 /* called when a traverse times out */
311 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
312 struct timeval t, void *private_data)
314 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
316 DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
317 CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
319 state->timedout = true;
320 state->callback(state->private_data, tdb_null, tdb_null);
324 struct traverse_start_state {
325 struct ctdb_context *ctdb;
326 struct ctdb_traverse_all_handle *h;
327 uint32_t srcnode;
328 uint32_t reqid;
329 uint32_t db_id;
330 uint64_t srvid;
331 bool withemptyrecords;
332 int num_records;
337 setup a cluster-wide non-blocking traverse of a ctdb. The
338 callback function will be called on every record in the local
339 ltdb. To stop the traverse, talloc_free() the traverse_handle.
341 The traverse is finished when the callback is called with tdb_null
342 for key and data
344 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
345 ctdb_traverse_fn_t callback,
346 struct traverse_start_state *start_state)
348 struct ctdb_traverse_all_handle *state;
349 struct ctdb_context *ctdb = ctdb_db->ctdb;
350 int ret;
351 TDB_DATA data;
352 struct ctdb_traverse_all r;
353 struct ctdb_traverse_all_ext r_ext;
354 uint32_t destination;
356 state = talloc(start_state, struct ctdb_traverse_all_handle);
357 if (state == NULL) {
358 return NULL;
361 state->ctdb = ctdb;
362 state->ctdb_db = ctdb_db;
363 state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
364 state->callback = callback;
365 state->private_data = start_state;
366 state->null_count = 0;
367 state->timedout = false;
369 talloc_set_destructor(state, ctdb_traverse_all_destructor);
371 if (start_state->withemptyrecords) {
372 r_ext.db_id = ctdb_db->db_id;
373 r_ext.reqid = state->reqid;
374 r_ext.pnn = ctdb->pnn;
375 r_ext.client_reqid = start_state->reqid;
376 r_ext.srvid = start_state->srvid;
377 r_ext.withemptyrecords = start_state->withemptyrecords;
379 data.dptr = (uint8_t *)&r_ext;
380 data.dsize = sizeof(r_ext);
381 } else {
382 r.db_id = ctdb_db->db_id;
383 r.reqid = state->reqid;
384 r.pnn = ctdb->pnn;
385 r.client_reqid = start_state->reqid;
386 r.srvid = start_state->srvid;
388 data.dptr = (uint8_t *)&r;
389 data.dsize = sizeof(r);
392 if (ctdb_db->persistent == 0) {
393 /* normal database, traverse all nodes */
394 destination = CTDB_BROADCAST_VNNMAP;
395 } else {
396 int i;
397 /* persistent database, traverse one node, preferably
398 * the local one
400 destination = ctdb->pnn;
401 /* check we are in the vnnmap */
402 for (i=0; i < ctdb->vnn_map->size; i++) {
403 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
404 break;
407 /* if we are not in the vnn map we just pick the first
408 * node instead
410 if (i == ctdb->vnn_map->size) {
411 destination = ctdb->vnn_map->map[0];
415 /* tell all the nodes in the cluster to start sending records to this
416 * node, or if it is a persistent database, just tell the local
417 * node
420 if (start_state->withemptyrecords) {
421 ret = ctdb_daemon_send_control(ctdb, destination, 0,
422 CTDB_CONTROL_TRAVERSE_ALL_EXT,
423 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
424 } else {
425 ret = ctdb_daemon_send_control(ctdb, destination, 0,
426 CTDB_CONTROL_TRAVERSE_ALL,
427 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
430 if (ret != 0) {
431 talloc_free(state);
432 return NULL;
435 DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
436 ctdb_db->db_name, state->reqid));
438 /* timeout the traverse */
439 event_add_timed(ctdb->ev, state,
440 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
441 ctdb_traverse_all_timeout, state);
443 return state;
447 called when local traverse ends
449 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
451 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
453 /* we're done */
454 talloc_free(state);
458 * extended version to take the "withemptyrecords" parameter"
460 int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
462 struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
463 struct traverse_all_state *state;
464 struct ctdb_db_context *ctdb_db;
466 if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
467 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
468 return -1;
471 ctdb_db = find_ctdb_db(ctdb, c->db_id);
472 if (ctdb_db == NULL) {
473 return -1;
476 if (ctdb_db->unhealthy_reason) {
477 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
478 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
479 ctdb_db->db_name, ctdb_db->unhealthy_reason));
480 return -1;
482 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
483 ctdb_db->db_name, ctdb_db->unhealthy_reason));
486 state = talloc(ctdb_db, struct traverse_all_state);
487 if (state == NULL) {
488 return -1;
491 state->reqid = c->reqid;
492 state->srcnode = c->pnn;
493 state->ctdb = ctdb;
494 state->client_reqid = c->client_reqid;
495 state->srvid = c->srvid;
496 state->withemptyrecords = c->withemptyrecords;
498 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
499 if (state->h == NULL) {
500 talloc_free(state);
501 return -1;
504 return 0;
508 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
509 setup a traverse of our local ltdb, sending the records as
510 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
512 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
514 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
515 struct traverse_all_state *state;
516 struct ctdb_db_context *ctdb_db;
518 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
519 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
520 return -1;
523 ctdb_db = find_ctdb_db(ctdb, c->db_id);
524 if (ctdb_db == NULL) {
525 return -1;
528 if (ctdb_db->unhealthy_reason) {
529 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
530 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
531 ctdb_db->db_name, ctdb_db->unhealthy_reason));
532 return -1;
534 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
535 ctdb_db->db_name, ctdb_db->unhealthy_reason));
538 state = talloc(ctdb_db, struct traverse_all_state);
539 if (state == NULL) {
540 return -1;
543 state->reqid = c->reqid;
544 state->srcnode = c->pnn;
545 state->ctdb = ctdb;
546 state->client_reqid = c->client_reqid;
547 state->srvid = c->srvid;
548 state->withemptyrecords = false;
550 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
551 if (state->h == NULL) {
552 talloc_free(state);
553 return -1;
556 return 0;
561 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
562 call the traverse_all callback with the record
564 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
566 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
567 struct ctdb_traverse_all_handle *state;
568 TDB_DATA key;
569 ctdb_traverse_fn_t callback;
570 void *private_data;
572 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
573 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
574 return -1;
577 state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
578 if (state == NULL || d->reqid != state->reqid) {
579 /* traverse might have been terminated already */
580 return -1;
583 key.dsize = d->keylen;
584 key.dptr = &d->data[0];
585 data.dsize = d->datalen;
586 data.dptr = &d->data[d->keylen];
588 if (key.dsize == 0 && data.dsize == 0) {
589 state->null_count++;
590 /* Persistent databases are only scanned on one node (the local
591 * node)
593 if (state->ctdb_db->persistent == 0) {
594 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
595 return 0;
600 callback = state->callback;
601 private_data = state->private_data;
603 callback(private_data, key, data);
604 return 0;
608 kill a in-progress traverse, used when a client disconnects
610 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
611 TDB_DATA *outdata, uint32_t srcnode)
613 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
614 struct ctdb_db_context *ctdb_db;
615 struct ctdb_traverse_local_handle *t;
617 ctdb_db = find_ctdb_db(ctdb, d->db_id);
618 if (ctdb_db == NULL) {
619 return -1;
622 for (t=ctdb_db->traverse; t; t=t->next) {
623 if (t->client_reqid == d->reqid &&
624 t->srvid == d->srvid) {
625 talloc_free(t);
626 break;
630 return 0;
635 this is called when a client disconnects during a traverse
636 we need to notify all the nodes taking part in the search that they
637 should kill their traverse children
639 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
641 struct ctdb_traverse_start r;
642 TDB_DATA data;
644 DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
645 r.db_id = state->db_id;
646 r.reqid = state->reqid;
647 r.srvid = state->srvid;
649 data.dptr = (uint8_t *)&r;
650 data.dsize = sizeof(r);
652 ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
653 CTDB_CONTROL_TRAVERSE_KILL,
654 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
655 return 0;
659 callback which sends records as messages to the client
661 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
663 struct traverse_start_state *state;
664 struct ctdb_rec_data *d;
665 TDB_DATA cdata;
667 state = talloc_get_type(p, struct traverse_start_state);
669 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
670 if (d == NULL) {
671 return;
674 cdata.dptr = (uint8_t *)d;
675 cdata.dsize = d->length;
677 ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
678 if (key.dsize == 0 && data.dsize == 0) {
679 DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
680 state->h->ctdb_db->db_name, state->h->reqid,
681 state->num_records));
683 if (state->h->timedout) {
684 /* timed out, send TRAVERSE_KILL control */
685 talloc_free(state);
686 } else {
687 /* end of traverse */
688 talloc_set_destructor(state, NULL);
689 talloc_free(state);
691 } else {
692 state->num_records++;
698 * start a traverse_all - called as a control from a client.
699 * extended version to take the "withemptyrecords" parameter.
701 int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
702 TDB_DATA data,
703 TDB_DATA *outdata,
704 uint32_t srcnode,
705 uint32_t client_id)
707 struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
708 struct traverse_start_state *state;
709 struct ctdb_db_context *ctdb_db;
710 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
712 if (client == NULL) {
713 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
714 return -1;
717 if (data.dsize != sizeof(*d)) {
718 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
719 return -1;
722 ctdb_db = find_ctdb_db(ctdb, d->db_id);
723 if (ctdb_db == NULL) {
724 return -1;
727 if (ctdb_db->unhealthy_reason) {
728 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
729 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
730 ctdb_db->db_name, ctdb_db->unhealthy_reason));
731 return -1;
733 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
734 ctdb_db->db_name, ctdb_db->unhealthy_reason));
737 state = talloc(client, struct traverse_start_state);
738 if (state == NULL) {
739 return -1;
742 state->srcnode = srcnode;
743 state->reqid = d->reqid;
744 state->srvid = d->srvid;
745 state->db_id = d->db_id;
746 state->ctdb = ctdb;
747 state->withemptyrecords = d->withemptyrecords;
748 state->num_records = 0;
750 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
751 if (state->h == NULL) {
752 talloc_free(state);
753 return -1;
756 talloc_set_destructor(state, ctdb_traverse_start_destructor);
758 return 0;
762 * start a traverse_all - called as a control from a client.
764 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
765 TDB_DATA data,
766 TDB_DATA *outdata,
767 uint32_t srcnode,
768 uint32_t client_id)
770 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
771 struct ctdb_traverse_start_ext d2;
772 TDB_DATA data2;
774 ZERO_STRUCT(d2);
775 d2.db_id = d->db_id;
776 d2.reqid = d->reqid;
777 d2.srvid = d->srvid;
778 d2.withemptyrecords = false;
780 data2.dsize = sizeof(d2);
781 data2.dptr = (uint8_t *)&d2;
783 return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);