4 Copyright (C) Rusty Russell 2010
5 Copyright (C) Ronnie Sahlberg 2011
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include <sys/socket.h>
30 #include <sys/socket.h>
32 #include <sys/ioctl.h>
34 #include "libctdb_private.h"
36 #include "local_tdb.h"
38 #include <dlinklist.h>
39 #include <ctdb_protocol.h>
41 /* Remove type-safety macros. */
42 #undef ctdb_attachdb_send
43 #undef ctdb_readrecordlock_async
44 #undef ctdb_readonlyrecordlock_async
48 struct ctdb_lock
*next
, *prev
;
50 struct ctdb_db
*ctdb_db
;
53 /* Is this a request for read-only lock ? */
56 /* This will always be set by the time user sees this. */
57 unsigned long held_magic
;
58 struct ctdb_ltdb_header
*hdr
;
60 /* For convenience, we stash original callback here. */
61 ctdb_rrl_callback_t callback
;
65 struct ctdb_connection
*ctdb
;
69 struct tdb_context
*tdb
;
71 ctdb_callback_t callback
;
75 static void remove_lock(struct ctdb_connection
*ctdb
, struct ctdb_lock
*lock
)
77 DLIST_REMOVE(ctdb
->locks
, lock
);
80 /* FIXME: for thread safety, need tid info too. */
81 static bool holding_lock(struct ctdb_connection
*ctdb
)
83 /* For the moment, you can't ever hold more than 1 lock. */
84 return (ctdb
->locks
!= NULL
);
87 static void add_lock(struct ctdb_connection
*ctdb
, struct ctdb_lock
*lock
)
89 DLIST_ADD(ctdb
->locks
, lock
);
92 static void cleanup_locks(struct ctdb_connection
*ctdb
, struct ctdb_db
*db
)
94 struct ctdb_lock
*i
, *next
;
96 for (i
= ctdb
->locks
; i
; i
= next
) {
97 /* Grab next pointer, as release_lock will free i */
99 if (i
->ctdb_db
== db
) {
100 ctdb_release_lock(db
, i
);
105 /* FIXME: Could be in shared util code with rest of ctdb */
106 static void close_noerr(int fd
)
113 /* FIXME: Could be in shared util code with rest of ctdb */
114 static void free_noerr(void *p
)
/*
 * Switch @fd to non-blocking mode, preserving its other file status flags.
 *
 * Best effort: the function has no way to report failure, so if the
 * current flags cannot be read the descriptor is left untouched.
 *
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL, 0);

	/* fcntl() signals failure with -1; OR-ing O_NONBLOCK into that
	 * would pass an all-ones flag word to F_SETFL. */
	if (flags == -1) {
		return;
	}
	fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
/*
 * Mark @fd close-on-exec, preserving its other descriptor flags.
 *
 * Best effort: the function has no way to report failure, so if the
 * current flags cannot be read the descriptor is left untouched.
 *
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void set_close_on_exec(int fd)
{
	int flags = fcntl(fd, F_GETFD, 0);

	/* fcntl() signals failure with -1; do not feed that back into
	 * F_SETFD as if it were a flag word. */
	if (flags == -1) {
		return;
	}
	fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}
137 static void set_pnn(struct ctdb_connection
*ctdb
,
138 struct ctdb_request
*req
,
141 if (!ctdb_getpnn_recv(ctdb
, req
, &ctdb
->pnn
)) {
142 DEBUG(ctdb
, LOG_CRIT
,
143 "ctdb_connect(async): failed to get pnn");
146 ctdb_request_free(req
);
149 struct ctdb_connection
*ctdb_connect(const char *addr
,
150 ctdb_log_fn_t log_fn
, void *log_priv
)
152 struct ctdb_connection
*ctdb
;
153 struct sockaddr_un sun
;
155 ctdb
= malloc(sizeof(*ctdb
));
157 /* With no format string, we hope it doesn't use ap! */
159 memset(&ap
, 0, sizeof(ap
));
161 log_fn(log_priv
, LOG_ERR
, "ctdb_connect: no memory", ap
);
168 ctdb
->inqueue
= NULL
;
169 ctdb
->message_handlers
= NULL
;
171 ctdb
->broken
= false;
173 ctdb
->log_priv
= log_priv
;
176 memset(&sun
, 0, sizeof(sun
));
177 sun
.sun_family
= AF_UNIX
;
180 strncpy(sun
.sun_path
, addr
, sizeof(sun
.sun_path
)-1);
181 ctdb
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
185 if (connect(ctdb
->fd
, (struct sockaddr
*)&sun
, sizeof(sun
)) == -1)
188 set_nonblocking(ctdb
->fd
);
189 set_close_on_exec(ctdb
->fd
);
191 /* Immediately queue a request to get our pnn. */
192 if (!ctdb_getpnn_send(ctdb
, CTDB_CURRENT_NODE
, set_pnn
, NULL
))
198 close_noerr(ctdb
->fd
);
205 void ctdb_disconnect(struct ctdb_connection
*ctdb
)
207 struct ctdb_request
*i
;
209 DEBUG(ctdb
, LOG_DEBUG
, "ctdb_disconnect");
211 while ((i
= ctdb
->outq
) != NULL
) {
212 DLIST_REMOVE(ctdb
->outq
, i
);
213 ctdb_request_free(i
);
216 while ((i
= ctdb
->doneq
) != NULL
) {
217 DLIST_REMOVE(ctdb
->doneq
, i
);
218 ctdb_request_free(i
);
222 free_io_elem(ctdb
->in
);
224 remove_message_handlers(ctdb
);
227 /* Just in case they try to reuse */
232 int ctdb_get_fd(struct ctdb_connection
*ctdb
)
237 int ctdb_which_events(struct ctdb_connection
*ctdb
)
246 struct ctdb_request
*new_ctdb_request(struct ctdb_connection
*ctdb
, size_t len
,
247 ctdb_callback_t cb
, void *cbdata
)
249 struct ctdb_request
*req
= malloc(sizeof(*req
));
252 req
->io
= new_io_elem(len
);
258 req
->hdr
.hdr
= io_elem_data(req
->io
, NULL
);
261 req
->priv_data
= cbdata
;
263 req
->extra_destructor
= NULL
;
/*
 * Free a request.
 *
 * A request that is still linked on a list (next or prev set) is reported
 * at LOG_ALERT and handed to ctdb_cancel().  Otherwise the request's
 * extra_destructor, if one is set, is invoked, and both the reply and the
 * outgoing io elements are freed.
 *
 * NOTE(review): the statements following the ctdb_cancel() call and the
 * final free of the request itself are not visible in this listing;
 * presumably the cancel path returns early -- confirm.
 */
267 void ctdb_request_free(struct ctdb_request
*req
)
269 struct ctdb_connection
*ctdb
= req
->ctdb
;
/* Still queued: the caller should have used ctdb_cancel(). */
271 if (req
->next
|| req
->prev
) {
272 DEBUG(ctdb
, LOG_ALERT
,
273 "ctdb_request_free: request not complete! ctdb_cancel? %p (id %u)",
274 req
, req
->hdr
.hdr
? req
->hdr
.hdr
->reqid
: 0);
275 ctdb_cancel(ctdb
, req
);
/* Give the request's owner a chance to release attached state. */
278 if (req
->extra_destructor
) {
279 req
->extra_destructor(ctdb
, req
);
282 free_io_elem(req
->reply
);
284 free_io_elem(req
->io
);
/*
 * Validate the reply attached to a call request and return it as a
 * struct ctdb_reply_call.
 *
 * Three checks are made, each logging on failure:
 *   - the request itself must have been a CTDB_REQ_CALL,
 *   - its callid must equal the expected callid,
 *   - the reply must be at least sizeof(struct ctdb_reply_call) long and
 *     carry operation CTDB_REPLY_CALL.
 *
 * NOTE(review): part of the signature, the declaration of len, and the
 * statements after each DEBUG() (including the returns) are not visible
 * in this listing.
 */
288 /* Sanity-checking wrapper for reply. */
289 static struct ctdb_reply_call
*unpack_reply_call(struct ctdb_request
*req
,
293 struct ctdb_reply_call
*inhdr
= io_elem_data(req
->reply
, &len
);
295 /* Library user error if the request was not a call. */
296 if (req
->hdr
.hdr
->operation
!= CTDB_REQ_CALL
) {
298 DEBUG(req
->ctdb
, LOG_ALERT
,
299 "This was not a ctdbd call request: operation %u",
300 req
->hdr
.hdr
->operation
);
/* ... or if it was a different call from the one expected. */
304 if (req
->hdr
.call
->callid
!= callid
) {
306 DEBUG(req
->ctdb
, LOG_ALERT
,
307 "This was not a ctdbd %u call request: %u",
308 callid
, req
->hdr
.call
->callid
);
312 /* ctdbd or our error if this isn't a reply call. */
313 if (len
< sizeof(*inhdr
) || inhdr
->hdr
.operation
!= CTDB_REPLY_CALL
) {
/* NOTE(review): inhdr->hdr.operation is logged even when it was the
 * length test that failed, i.e. possibly from a truncated header --
 * confirm this read is acceptable. */
315 DEBUG(req
->ctdb
, LOG_CRIT
,
316 "Invalid ctdbd call reply: len %zu, operation %u",
317 len
, inhdr
->hdr
.operation
);
/*
 * Validate the reply attached to a control request and return it as a
 * struct ctdb_reply_control.
 *
 * @req:     the completed request whose reply is to be checked.
 * @control: the control opcode the caller expects @req to have carried.
 *
 * Four checks are made, each logging on failure: the reply must be at
 * least sizeof(struct ctdb_reply_control) long, the request must have been
 * a CTDB_REQ_CONTROL, its opcode must equal @control, and the reply's
 * operation must be CTDB_REPLY_CONTROL.
 *
 * NOTE(review): the declaration of len and the statements after each
 * DEBUG() (including the returns) are not visible in this listing.
 */
324 /* Sanity-checking wrapper for reply. */
325 struct ctdb_reply_control
*unpack_reply_control(struct ctdb_request
*req
,
326 enum ctdb_controls control
)
329 struct ctdb_reply_control
*inhdr
= io_elem_data(req
->reply
, &len
);
331 /* Reply is too short to hold a control reply header. */
332 if (len
< sizeof(*inhdr
)) {
334 DEBUG(req
->ctdb
, LOG_ALERT
,
335 "Short ctdbd control reply: %zu bytes", len
);
/* Library user error if the request was not a control. */
338 if (req
->hdr
.hdr
->operation
!= CTDB_REQ_CONTROL
) {
340 DEBUG(req
->ctdb
, LOG_ALERT
,
341 "This was not a ctdbd control request: operation %u",
342 req
->hdr
.hdr
->operation
);
346 /* ... or if it was a different control from what we expected. */
347 if (req
->hdr
.control
->opcode
!= control
) {
349 DEBUG(req
->ctdb
, LOG_ALERT
,
350 "This was not an opcode %u ctdbd control request: %u",
351 control
, req
->hdr
.control
->opcode
);
355 /* ctdbd or our error if this isn't a control reply. */
356 if (inhdr
->hdr
.operation
!= CTDB_REPLY_CONTROL
) {
358 DEBUG(req
->ctdb
, LOG_CRIT
,
359 "Invalid ctdbd control reply: operation %u",
360 inhdr
->hdr
.operation
);
/*
 * Dispatch one complete incoming packet.
 *
 * A CTDB_REQ_MESSAGE packet is passed to deliver_message().  Any other
 * packet is matched by reqid against the requests on ctdb->doneq; the
 * matching request is unlinked from that list and its callback is invoked
 * with the request's priv_data.  A packet that matches nothing is logged
 * at LOG_WARNING.
 *
 * NOTE(review): the declaration of len, the statements that end each
 * branch, and what happens to @in afterwards are not visible in this
 * listing.
 */
367 static void handle_incoming(struct ctdb_connection
*ctdb
, struct io_elem
*in
)
369 struct ctdb_req_header
*hdr
;
371 struct ctdb_request
*i
;
373 hdr
= io_elem_data(in
, &len
);
374 /* FIXME: use len to check packet! */
376 if (hdr
->operation
== CTDB_REQ_MESSAGE
) {
377 deliver_message(ctdb
, hdr
);
/* Find the request this packet answers. */
381 for (i
= ctdb
->doneq
; i
; i
= i
->next
) {
382 if (i
->hdr
.hdr
->reqid
== hdr
->reqid
) {
383 DLIST_REMOVE(ctdb
->doneq
, i
);
385 i
->callback(ctdb
, i
, i
->priv_data
);
/* No request on doneq carries this reqid. */
389 DEBUG(ctdb
, LOG_WARNING
,
390 "Unexpected ctdbd request reply: operation %u reqid %u",
391 hdr
->operation
, hdr
->reqid
);
/*
 * Remove "harmless" errors from the result of a read/write-style call.
 *
 * @ret: the value returned by the I/O call; errno must still hold the
 *       error from that call when @ret is negative.
 *
 * Returns 0 when @ret is negative but errno is EINTR, EAGAIN or
 * EWOULDBLOCK (the operation should simply be retried later); otherwise
 * returns @ret unchanged.
 */
static ssize_t real_error(ssize_t ret)
{
	/* POSIX allows EAGAIN and EWOULDBLOCK to be distinct values, and a
	 * non-blocking descriptor may report either, so test for both. */
	if (ret < 0 &&
	    (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
		return 0;
	}
	return ret;
}
403 bool ctdb_service(struct ctdb_connection
*ctdb
, int revents
)
409 if (holding_lock(ctdb
)) {
410 DEBUG(ctdb
, LOG_ALERT
, "Do not block while holding lock!");
413 if (revents
& POLLOUT
) {
415 if (real_error(write_io_elem(ctdb
->fd
,
416 ctdb
->outq
->io
)) < 0) {
418 "ctdb_service: error writing to ctdbd");
422 if (io_elem_finished(ctdb
->outq
->io
)) {
423 struct ctdb_request
*done
= ctdb
->outq
;
424 DLIST_REMOVE(ctdb
->outq
, done
);
425 /* We add at the head: any dead ones
427 DLIST_ADD(ctdb
->doneq
, done
);
432 while (revents
& POLLIN
) {
436 if (ioctl(ctdb
->fd
, FIONREAD
, &num_ready
) != 0) {
438 "ctdb_service: ioctl(FIONREAD) %d", errno
);
442 if (num_ready
== 0) {
443 /* the descriptor has been closed or we have all our data */
449 ctdb
->in
= new_io_elem(sizeof(struct ctdb_req_header
));
452 "ctdb_service: allocating readbuf");
458 ret
= read_io_elem(ctdb
->fd
, ctdb
->in
);
459 if (real_error(ret
) < 0 || ret
== 0) {
460 /* They closed fd? */
464 "ctdb_service: error reading from ctdbd");
467 } else if (ret
< 0) {
468 /* No progress, stop loop. */
470 } else if (io_elem_finished(ctdb
->in
)) {
471 io_elem_queue(ctdb
, ctdb
->in
);
477 while (ctdb
->inqueue
!= NULL
) {
478 struct io_elem
*io
= ctdb
->inqueue
;
480 io_elem_dequeue(ctdb
, io
);
481 handle_incoming(ctdb
, io
);
487 /* This is inefficient. We could pull in idtree.c. */
488 static bool reqid_used(const struct ctdb_connection
*ctdb
, uint32_t reqid
)
490 struct ctdb_request
*i
;
492 for (i
= ctdb
->outq
; i
; i
= i
->next
) {
493 if (i
->hdr
.hdr
->reqid
== reqid
) {
497 for (i
= ctdb
->doneq
; i
; i
= i
->next
) {
498 if (i
->hdr
.hdr
->reqid
== reqid
) {
/*
 * Hand out a request id for connection @ctdb.
 *
 * Loops while reqid_used() reports that ctdb->next_id is already taken,
 * then returns ctdb->next_id and post-increments it.
 *
 * NOTE(review): the loop body is not visible in this listing; presumably
 * it advances ctdb->next_id -- confirm.
 */
505 uint32_t new_reqid(struct ctdb_connection
*ctdb
)
507 while (reqid_used(ctdb
, ctdb
->next_id
)) {
510 return ctdb
->next_id
++;
513 struct ctdb_request
*new_ctdb_control_request(struct ctdb_connection
*ctdb
,
516 const void *extra_data
,
518 ctdb_callback_t callback
,
521 struct ctdb_request
*req
;
522 struct ctdb_req_control
*pkt
;
524 req
= new_ctdb_request(
525 ctdb
, offsetof(struct ctdb_req_control
, data
) + extra
,
530 io_elem_init_req_header(req
->io
,
531 CTDB_REQ_CONTROL
, destnode
, new_reqid(ctdb
));
533 pkt
= req
->hdr
.control
;
535 pkt
->opcode
= opcode
;
539 pkt
->datalen
= extra
;
540 memcpy(pkt
->data
, extra_data
, extra
);
541 DLIST_ADD(ctdb
->outq
, req
);
545 void ctdb_cancel_callback(struct ctdb_connection
*ctdb
,
546 struct ctdb_request
*req
,
549 ctdb_request_free(req
);
/*
 * Cancel a request the caller no longer cares about.
 *
 * A request with both next and prev NULL is treated as already completed:
 * this is reported at LOG_ALERT and the request is passed to
 * ctdb_request_free().  Otherwise the request's callback is replaced with
 * ctdb_cancel_callback, so the request is freed when that callback runs
 * instead of being delivered to the user.
 *
 * NOTE(review): "unlinked" is inferred from next == prev == NULL; confirm
 * that DLIST_ADD never leaves the sole element of a list with both
 * pointers NULL, or such a queued request would be misclassified here.
 * NOTE(review): the statements between the ctdb_request_free() call and
 * the LOG_DEBUG line are not visible in this listing.
 */
552 void ctdb_cancel(struct ctdb_connection
*ctdb
, struct ctdb_request
*req
)
554 if (!req
->next
&& !req
->prev
) {
555 DEBUG(ctdb
, LOG_ALERT
,
556 "ctdb_cancel: request completed! ctdb_request_free? %p (id %u)",
557 req
, req
->hdr
.hdr
? req
->hdr
.hdr
->reqid
: 0);
558 ctdb_request_free(req
);
562 DEBUG(ctdb
, LOG_DEBUG
, "ctdb_cancel: %p (id %u)",
563 req
, req
->hdr
.hdr
? req
->hdr
.hdr
->reqid
: 0);
565 /* FIXME: If it's not sent, we could just free it right now. */
/* Swap in a callback that simply frees the request. */
566 req
->callback
= ctdb_cancel_callback
;
569 void ctdb_detachdb(struct ctdb_connection
*ctdb
, struct ctdb_db
*db
)
571 cleanup_locks(ctdb
, db
);
576 static void destroy_req_db(struct ctdb_connection
*ctdb
,
577 struct ctdb_request
*req
);
578 static void attachdb_done(struct ctdb_connection
*ctdb
,
579 struct ctdb_request
*req
,
581 static void attachdb_getdbpath_done(struct ctdb_connection
*ctdb
,
582 struct ctdb_request
*req
,
585 struct ctdb_request
*
586 ctdb_attachdb_send(struct ctdb_connection
*ctdb
,
587 const char *name
, bool persistent
, uint32_t tdb_flags
,
588 ctdb_callback_t callback
, void *private_data
)
590 struct ctdb_request
*req
;
594 /* FIXME: Search if db already open. */
595 db
= malloc(sizeof(*db
));
601 opcode
= CTDB_CONTROL_DB_ATTACH_PERSISTENT
;
603 opcode
= CTDB_CONTROL_DB_ATTACH
;
606 req
= new_ctdb_control_request(ctdb
, opcode
, CTDB_CURRENT_NODE
, name
,
607 strlen(name
) + 1, attachdb_done
, db
);
610 "ctdb_attachdb_send: failed allocating DB_ATTACH");
616 db
->tdb_flags
= tdb_flags
;
617 db
->persistent
= persistent
;
618 db
->callback
= callback
;
619 db
->private_data
= private_data
;
621 req
->extra_destructor
= destroy_req_db
;
622 /* This is set non-NULL when we succeed, see ctdb_attachdb_recv */
625 /* Flags get overloaded into srvid. */
626 req
->hdr
.control
->srvid
= tdb_flags
;
627 DEBUG(db
->ctdb
, LOG_DEBUG
,
628 "ctdb_attachdb_send: DB_ATTACH request %p", req
);
/*
 * extra_destructor installed on DB_ATTACH requests by ctdb_attachdb_send().
 *
 * Frees the db structure held in req->priv_data and frees the request
 * held in req->extra.
 *
 * NOTE(review): a line between the second comment and the
 * ctdb_request_free() call is not visible in this listing; presumably it
 * guards against req->extra being NULL -- confirm.
 */
632 static void destroy_req_db(struct ctdb_connection
*ctdb
,
633 struct ctdb_request
*req
)
635 /* Incomplete db is in priv_data. */
636 free(req
->priv_data
);
637 /* second request is chained off this one. */
639 ctdb_request_free(req
->extra
);
643 static void attachdb_done(struct ctdb_connection
*ctdb
,
644 struct ctdb_request
*req
,
647 struct ctdb_db
*db
= _db
;
648 struct ctdb_request
*req2
;
649 struct ctdb_reply_control
*reply
;
650 enum ctdb_controls control
= CTDB_CONTROL_DB_ATTACH
;
652 if (db
->persistent
) {
653 control
= CTDB_CONTROL_DB_ATTACH_PERSISTENT
;
656 reply
= unpack_reply_control(req
, control
);
657 if (!reply
|| reply
->status
!= 0) {
660 "ctdb_attachdb_send(async): DB_ATTACH status %i",
663 /* We failed. Hand request to user and have them discover it
664 * via ctdb_attachdb_recv. */
665 db
->callback(ctdb
, req
, db
->private_data
);
668 db
->id
= *(uint32_t *)reply
->data
;
670 /* Now we do another call, to get the dbpath. */
671 req2
= new_ctdb_control_request(db
->ctdb
, CTDB_CONTROL_GETDBPATH
,
673 &db
->id
, sizeof(db
->id
),
674 attachdb_getdbpath_done
, db
);
676 DEBUG(db
->ctdb
, LOG_ERR
,
677 "ctdb_attachdb_send(async): failed to allocate");
678 db
->callback(ctdb
, req
, db
->private_data
);
683 DEBUG(db
->ctdb
, LOG_DEBUG
,
684 "ctdb_attachdb_send(async): created getdbpath request");
687 static void attachdb_getdbpath_done(struct ctdb_connection
*ctdb
,
688 struct ctdb_request
*req
,
691 struct ctdb_db
*db
= _db
;
693 /* Do callback on original request. */
694 db
->callback(ctdb
, req
->extra
, db
->private_data
);
697 struct ctdb_db
*ctdb_attachdb_recv(struct ctdb_connection
*ctdb
,
698 struct ctdb_request
*req
)
700 struct ctdb_request
*dbpath_req
= req
->extra
;
701 struct ctdb_reply_control
*reply
;
702 struct ctdb_db
*db
= req
->priv_data
;
703 uint32_t tdb_flags
= db
->tdb_flags
;
704 struct tdb_logging_context log
;
706 /* Never sent the dbpath request? We've failed. */
708 /* FIXME: Save errno? */
713 reply
= unpack_reply_control(dbpath_req
, CTDB_CONTROL_GETDBPATH
);
717 if (reply
->status
!= 0) {
718 DEBUG(db
->ctdb
, LOG_ERR
,
719 "ctdb_attachdb_recv: reply status %i", reply
->status
);
723 tdb_flags
= db
->persistent
? TDB_DEFAULT
: TDB_NOSYNC
;
724 tdb_flags
|= TDB_DISALLOW_NESTING
;
726 log
.log_fn
= ctdb_tdb_log_bridge
;
727 log
.log_private
= ctdb
;
728 db
->tdb
= tdb_open_ex((char *)reply
->data
, 0, tdb_flags
, O_RDWR
, 0,
730 if (db
->tdb
== NULL
) {
731 DEBUG(db
->ctdb
, LOG_ERR
,
732 "ctdb_attachdb_recv: failed to tdb_open %s",
733 (char *)reply
->data
);
737 /* Finally, separate the db from the request (see destroy_req_db). */
738 req
->priv_data
= NULL
;
739 DEBUG(db
->ctdb
, LOG_DEBUG
,
740 "ctdb_attachdb_recv: db %p, tdb %s", db
, (char *)reply
->data
);
/*
 * Compute the magic value that marks @lock as held.
 *
 * The value is derived from the address stored in lock->key.dptr, XORed
 * with a copy of itself shifted left by 16 bits and with a fixed constant.
 * Callers compare it against lock->held_magic to validate a lock.
 *
 * NOTE(review): the tail of the return expression is not visible in this
 * listing, so how the "non-zero" property is guaranteed cannot be
 * confirmed from here.
 */
744 static unsigned long lock_magic(struct ctdb_lock
*lock
)
746 /* A non-zero magic specific to this structure. */
747 return ((unsigned long)lock
->key
.dptr
748 ^ (((unsigned long)lock
->key
.dptr
) << 16)
749 ^ 0xBADC0FFEEBADC0DEULL
)
/*
 * Dispose of a lock structure.
 *
 * A lock whose held_magic is still non-zero is reported at LOG_ALERT as
 * invalid.
 *
 * NOTE(review): the statements after the DEBUG() call, including the
 * actual release of the memory, are not visible in this listing.
 */
753 /* This is only called on locks before they're held. */
754 static void free_lock(struct ctdb_lock
*lock
)
756 if (lock
->held_magic
) {
757 DEBUG(lock
->ctdb_db
->ctdb
, LOG_ALERT
,
758 "free_lock invalid lock %p", lock
);
/*
 * Release a record lock.
 *
 * @ctdb_db: the database the caller believes the lock belongs to.
 * @lock:    the lock to release.
 *
 * Two sanity checks are logged at LOG_ALERT when they fail: held_magic
 * must equal lock_magic(lock), and lock->ctdb_db must equal @ctdb_db.
 * The release itself drops the tdb chain lock on lock->key, removes the
 * lock from the connection's lock list and clears held_magic.
 *
 * NOTE(review): the lines separating the failed-check branches from the
 * unlock sequence, and anything after held_magic is cleared, are not
 * visible in this listing; presumably the unlock sequence is skipped when
 * a check fails -- confirm.
 */
765 void ctdb_release_lock(struct ctdb_db
*ctdb_db
, struct ctdb_lock
*lock
)
/* Magic mismatch: not a lock that is currently held. */
767 if (lock
->held_magic
!= lock_magic(lock
)) {
768 DEBUG(lock
->ctdb_db
->ctdb
, LOG_ALERT
,
769 "ctdb_release_lock invalid lock %p", lock
);
770 } else if (lock
->ctdb_db
!= ctdb_db
) {
772 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
773 "ctdb_release_lock: wrong ctdb_db.");
775 tdb_chainunlock(lock
->ctdb_db
->tdb
, lock
->key
);
776 DEBUG(lock
->ctdb_db
->ctdb
, LOG_DEBUG
,
777 "ctdb_release_lock %p", lock
);
778 remove_lock(lock
->ctdb_db
->ctdb
, lock
);
/* Zero magic marks the lock as no longer held. */
780 lock
->held_magic
= 0;
785 /* We keep the lock if local node is the dmaster. */
786 static bool try_readrecordlock(struct ctdb_lock
*lock
, TDB_DATA
*data
)
788 struct ctdb_ltdb_header
*hdr
;
790 if (tdb_chainlock(lock
->ctdb_db
->tdb
, lock
->key
) != 0) {
791 DEBUG(lock
->ctdb_db
->ctdb
, LOG_WARNING
,
792 "ctdb_readrecordlock_async: failed to chainlock");
796 hdr
= ctdb_local_fetch(lock
->ctdb_db
->tdb
, lock
->key
, data
);
797 if (hdr
&& lock
->readonly
&& (hdr
->flags
& CTDB_REC_RO_HAVE_READONLY
) ) {
798 DEBUG(lock
->ctdb_db
->ctdb
, LOG_DEBUG
,
799 "ctdb_readrecordlock_async: got local lock for ro");
800 lock
->held_magic
= lock_magic(lock
);
802 add_lock(lock
->ctdb_db
->ctdb
, lock
);
805 if (hdr
&& hdr
->dmaster
== lock
->ctdb_db
->ctdb
->pnn
) {
806 DEBUG(lock
->ctdb_db
->ctdb
, LOG_DEBUG
,
807 "ctdb_readrecordlock_async: got local lock");
808 lock
->held_magic
= lock_magic(lock
);
810 add_lock(lock
->ctdb_db
->ctdb
, lock
);
814 /* we dont have the record locally,
815 * drop to writelock to force a migration
817 if (!hdr
&& lock
->readonly
) {
818 lock
->readonly
= false;
821 tdb_chainunlock(lock
->ctdb_db
->tdb
, lock
->key
);
826 /* If they shutdown before we hand them the lock, we free it here. */
827 static void destroy_lock(struct ctdb_connection
*ctdb
,
828 struct ctdb_request
*req
)
830 free_lock(req
->extra
);
833 static void readrecordlock_retry(struct ctdb_connection
*ctdb
,
834 struct ctdb_request
*req
, void *private)
836 struct ctdb_lock
*lock
= req
->extra
;
837 struct ctdb_reply_call
*reply
;
840 /* OK, we've received reply to fetch migration */
841 reply
= unpack_reply_call(req
, CTDB_FETCH_FUNC
);
842 if (!reply
|| reply
->status
!= 0) {
845 "ctdb_readrecordlock_async(async):"
846 " FETCH returned %i", reply
->status
);
848 lock
->callback(lock
->ctdb_db
, NULL
, tdb_null
, private);
849 ctdb_request_free(req
); /* Also frees lock. */
853 /* Can we get lock now? */
854 if (try_readrecordlock(lock
, &data
)) {
855 /* Now it's their responsibility to free lock & request! */
856 req
->extra_destructor
= NULL
;
857 lock
->callback(lock
->ctdb_db
, lock
, data
, private);
858 ctdb_request_free(req
);
862 /* Retransmit the same request again (we lost race). */
863 io_elem_reset(req
->io
);
864 DLIST_ADD(ctdb
->outq
, req
);
868 ctdb_readrecordlock_internal(struct ctdb_db
*ctdb_db
, TDB_DATA key
,
870 ctdb_rrl_callback_t callback
, void *cbdata
)
872 struct ctdb_request
*req
;
873 struct ctdb_lock
*lock
;
876 if (holding_lock(ctdb_db
->ctdb
)) {
877 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
878 "ctdb_readrecordlock_async: already holding lock");
883 lock
= malloc(sizeof(*lock
) + key
.dsize
);
885 DEBUG(ctdb_db
->ctdb
, LOG_ERR
,
886 "ctdb_readrecordlock_async: lock allocation failed");
889 lock
->key
.dptr
= (void *)(lock
+ 1);
890 memcpy(lock
->key
.dptr
, key
.dptr
, key
.dsize
);
891 lock
->key
.dsize
= key
.dsize
;
892 lock
->ctdb_db
= ctdb_db
;
894 lock
->held_magic
= 0;
895 lock
->readonly
= readonly
;
898 if (try_readrecordlock(lock
, &data
)) {
899 callback(ctdb_db
, lock
, data
, cbdata
);
903 /* Slow path: create request. */
904 req
= new_ctdb_request(
906 offsetof(struct ctdb_req_call
, data
) + key
.dsize
,
907 readrecordlock_retry
, cbdata
);
909 DEBUG(ctdb_db
->ctdb
, LOG_ERR
,
910 "ctdb_readrecordlock_async: allocation failed");
915 req
->extra_destructor
= destroy_lock
;
916 /* We store the original callback in the lock, and use our own. */
917 lock
->callback
= callback
;
919 io_elem_init_req_header(req
->io
, CTDB_REQ_CALL
, CTDB_CURRENT_NODE
,
920 new_reqid(ctdb_db
->ctdb
));
922 if (lock
->readonly
) {
923 req
->hdr
.call
->flags
= CTDB_WANT_READONLY
;
925 req
->hdr
.call
->flags
= CTDB_IMMEDIATE_MIGRATION
;
927 req
->hdr
.call
->db_id
= ctdb_db
->id
;
928 req
->hdr
.call
->callid
= CTDB_FETCH_FUNC
;
929 req
->hdr
.call
->hopcount
= 0;
930 req
->hdr
.call
->keylen
= key
.dsize
;
931 req
->hdr
.call
->calldatalen
= 0;
932 memcpy(req
->hdr
.call
->data
, key
.dptr
, key
.dsize
);
933 DLIST_ADD(ctdb_db
->ctdb
->outq
, req
);
938 ctdb_readrecordlock_async(struct ctdb_db
*ctdb_db
, TDB_DATA key
,
939 ctdb_rrl_callback_t callback
, void *cbdata
)
941 return ctdb_readrecordlock_internal(ctdb_db
, key
,
947 ctdb_readonlyrecordlock_async(struct ctdb_db
*ctdb_db
, TDB_DATA key
,
948 ctdb_rrl_callback_t callback
, void *cbdata
)
950 return ctdb_readrecordlock_internal(ctdb_db
, key
,
955 bool ctdb_writerecord(struct ctdb_db
*ctdb_db
,
956 struct ctdb_lock
*lock
, TDB_DATA data
)
958 if (lock
->readonly
) {
960 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
961 "ctdb_writerecord: Can not write, read-only record.");
965 if (lock
->ctdb_db
!= ctdb_db
) {
967 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
968 "ctdb_writerecord: Can not write, wrong ctdb_db.");
972 if (lock
->held_magic
!= lock_magic(lock
)) {
974 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
975 "ctdb_writerecord: Can not write. Lock has been released.");
979 if (ctdb_db
->persistent
) {
981 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
982 "ctdb_writerecord: cannot write to persistent db");
986 switch (ctdb_local_store(ctdb_db
->tdb
, lock
->key
, lock
->hdr
, data
)) {
988 DEBUG(ctdb_db
->ctdb
, LOG_DEBUG
,
989 "ctdb_writerecord: optimized away noop write.");
997 DEBUG(ctdb_db
->ctdb
, LOG_CRIT
,
998 "ctdb_writerecord: out of memory.");
1001 DEBUG(ctdb_db
->ctdb
, LOG_ALERT
,
1002 "ctdb_writerecord: record changed under lock?");
1004 default: /* TDB already logged. */
1012 struct ctdb_traverse_state
{
1013 struct ctdb_request
*handle
;
1014 struct ctdb_db
*ctdb_db
;
1017 ctdb_traverse_callback_t callback
;
1021 static void traverse_remhnd_cb(struct ctdb_connection
*ctdb
,
1022 struct ctdb_request
*req
, void *private_data
)
1024 struct ctdb_traverse_state
*state
= private_data
;
1026 if (!ctdb_remove_message_handler_recv(ctdb
, state
->handle
)) {
1027 DEBUG(ctdb
, LOG_ERR
,
1028 "Failed to remove message handler for"
1030 state
->callback(state
->ctdb_db
->ctdb
, state
->ctdb_db
,
1031 TRAVERSE_STATUS_ERROR
,
1035 ctdb_request_free(state
->handle
);
1036 state
->handle
= NULL
;
1040 static void msg_h(struct ctdb_connection
*ctdb
, uint64_t srvid
,
1041 TDB_DATA data
, void *private_data
)
1043 struct ctdb_traverse_state
*state
= private_data
;
1044 struct ctdb_db
*ctdb_db
= state
->ctdb_db
;
1045 struct ctdb_rec_data
*d
= (struct ctdb_rec_data
*)data
.dptr
;
1048 if (data
.dsize
< sizeof(uint32_t) ||
1049 d
->length
!= data
.dsize
) {
1050 DEBUG(ctdb
, LOG_ERR
,
1051 "Bad data size %u in traverse_handler",
1052 (unsigned)data
.dsize
);
1053 state
->callback(state
->ctdb_db
->ctdb
, state
->ctdb_db
,
1054 TRAVERSE_STATUS_ERROR
,
1057 state
->handle
= ctdb_remove_message_handler_send(
1058 state
->ctdb_db
->ctdb
, state
->srvid
,
1060 traverse_remhnd_cb
, state
);
1064 key
.dsize
= d
->keylen
;
1065 key
.dptr
= &d
->data
[0];
1066 data
.dsize
= d
->datalen
;
1067 data
.dptr
= &d
->data
[d
->keylen
];
1069 if (key
.dsize
== 0 && data
.dsize
== 0) {
1070 state
->callback(state
->ctdb_db
->ctdb
, state
->ctdb_db
,
1071 TRAVERSE_STATUS_FINISHED
,
1074 state
->handle
= ctdb_remove_message_handler_send(
1075 state
->ctdb_db
->ctdb
, state
->srvid
,
1077 traverse_remhnd_cb
, state
);
1081 if (data
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1082 /* empty records are deleted records in ctdb */
1086 data
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1087 data
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1089 if (state
->callback(ctdb
, ctdb_db
,
1090 TRAVERSE_STATUS_RECORD
,
1091 key
, data
, state
->cbdata
) != 0) {
1092 state
->handle
= ctdb_remove_message_handler_send(
1093 state
->ctdb_db
->ctdb
, state
->srvid
,
1095 traverse_remhnd_cb
, state
);
1100 static void traverse_start_cb(struct ctdb_connection
*ctdb
,
1101 struct ctdb_request
*req
, void *private_data
)
1103 struct ctdb_traverse_state
*state
= private_data
;
1105 ctdb_request_free(state
->handle
);
1106 state
->handle
= NULL
;
1109 static void traverse_msghnd_cb(struct ctdb_connection
*ctdb
,
1110 struct ctdb_request
*req
, void *private_data
)
1112 struct ctdb_traverse_state
*state
= private_data
;
1113 struct ctdb_db
*ctdb_db
= state
->ctdb_db
;
1114 struct ctdb_traverse_start t
;
1116 if (!ctdb_set_message_handler_recv(ctdb
, state
->handle
)) {
1117 DEBUG(ctdb
, LOG_ERR
,
1118 "Failed to register message handler for"
1120 state
->callback(state
->ctdb_db
->ctdb
, state
->ctdb_db
,
1121 TRAVERSE_STATUS_ERROR
,
1124 ctdb_request_free(state
->handle
);
1125 state
->handle
= NULL
;
1129 ctdb_request_free(state
->handle
);
1130 state
->handle
= NULL
;
1132 t
.db_id
= ctdb_db
->id
;
1133 t
.srvid
= state
->srvid
;
1136 state
->handle
= new_ctdb_control_request(ctdb
,
1137 CTDB_CONTROL_TRAVERSE_START
,
1140 traverse_start_cb
, state
);
1141 if (state
->handle
== NULL
) {
1142 DEBUG(ctdb
, LOG_ERR
,
1143 "ctdb_traverse_async:"
1144 " failed to send traverse_start control");
1145 state
->callback(state
->ctdb_db
->ctdb
, state
->ctdb_db
,
1146 TRAVERSE_STATUS_ERROR
,
1149 state
->handle
= ctdb_remove_message_handler_send(
1150 state
->ctdb_db
->ctdb
, state
->srvid
,
1152 traverse_remhnd_cb
, state
);
1157 bool ctdb_traverse_async(struct ctdb_db
*ctdb_db
,
1158 ctdb_traverse_callback_t callback
, void *cbdata
)
1160 struct ctdb_connection
*ctdb
= ctdb_db
->ctdb
;
1161 struct ctdb_traverse_state
*state
;
1162 static uint32_t tid
= 0;
1164 state
= malloc(sizeof(struct ctdb_traverse_state
));
1165 if (state
== NULL
) {
1166 DEBUG(ctdb
, LOG_ERR
,
1167 "ctdb_traverse_async: no memory."
1168 " allocate state failed");
1173 state
->srvid
= CTDB_SRVID_TRAVERSE_RANGE
|tid
;
1175 state
->callback
= callback
;
1176 state
->cbdata
= cbdata
;
1177 state
->ctdb_db
= ctdb_db
;
1179 state
->handle
= ctdb_set_message_handler_send(ctdb_db
->ctdb
,
1182 traverse_msghnd_cb
, state
);
1183 if (state
->handle
== NULL
) {
1184 DEBUG(ctdb
, LOG_ERR
,
1185 "ctdb_traverse_async:"
1186 " failed ctdb_set_message_handler_send");
1194 int ctdb_num_out_queue(struct ctdb_connection
*ctdb
)
1196 struct ctdb_request
*req
;
1199 for (i
= 0, req
= ctdb
->outq
; req
; req
= req
->next
, i
++)
1205 int ctdb_num_in_flight(struct ctdb_connection
*ctdb
)
1207 struct ctdb_request
*req
;
1210 for (i
= 0, req
= ctdb
->doneq
; req
; req
= req
->next
, i
++)
/*
 * Total number of requests tracked on the connection: those counted by
 * ctdb_num_out_queue() plus those counted by ctdb_num_in_flight().
 */
int ctdb_num_active(struct ctdb_connection *ctdb)
{
	int queued = ctdb_num_out_queue(ctdb);
	int in_flight = ctdb_num_in_flight(ctdb);

	return queued + in_flight;
}