ctdb-locking: Run debug locks script only if the node is active
[Samba.git] / ctdb / server / ctdb_lock.c
blob84c0de77eee1e68ebcb3574106169eca0b033c0d
/*
   ctdb lock handling
   provide API to do non-blocking locks for single or all databases

   Copyright (C) Amitay Isaacs  2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
/*
 * Non-blocking Locking API
 *
 * 1. Create a child process to do blocking locks.
 * 2. Once the locks are obtained, signal parent process via fd.
 * 3. Invoke registered callback routine with locking status.
 * 4. If the child process cannot get locks within certain time,
 *    execute an external script to debug.
 *
 * ctdb_lock_record()      - get a lock on a record
 * ctdb_lock_db()          - get a lock on a DB
 * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
 * ctdb_lock_alldb()       - get a lock on all DBs
 *
 *  auto_mark              - whether to mark/unmark DBs in before/after callback
 */
/*
 * Kind of lock a request asks for.
 *
 * The numeric values are used as indices into lock_type_str[], so the
 * order of the enumerators must not change independently of that table.
 */
enum lock_type {
	LOCK_RECORD,
	LOCK_DB,
	LOCK_ALLDB_PRIO,
	LOCK_ALLDB,
};
/*
 * Printable name for each lock type, indexed by enum lock_type.
 * Used as the label when latency statistics are recorded.
 */
static const char * const lock_type_str[] = {
	"lock_record",
	"lock_db",
	"lock_alldb_prio",
	"lock_alldb",
};
60 struct lock_request;
62 /* lock_context is the common part for a lock request */
63 struct lock_context {
64 struct lock_context *next, *prev;
65 enum lock_type type;
66 struct ctdb_context *ctdb;
67 struct ctdb_db_context *ctdb_db;
68 TDB_DATA key;
69 uint32_t priority;
70 bool auto_mark;
71 struct lock_request *request;
72 pid_t child;
73 int fd[2];
74 struct tevent_fd *tfd;
75 struct tevent_timer *ttimer;
76 struct timeval start_time;
77 uint32_t key_hash;
78 bool can_schedule;
/* lock_request is the client specific part for a lock request */
struct lock_request {
	/* owning lock context */
	struct lock_context *lctx;
	/* invoked as callback(private_data, locked) once the outcome is known */
	void (*callback)(void *, bool);
	void *private_data;
};
90 * Support samba 3.6.x (and older) versions which do not set db priority.
92 * By default, all databases are set to priority 1. So only when priority
93 * is set to 1, check for databases that need higher priority.
95 static bool later_db(struct ctdb_context *ctdb, const char *name)
97 if (ctdb->tunable.samba3_hack == 0) {
98 return false;
101 if (strstr(name, "brlock") ||
102 strstr(name, "g_lock") ||
103 strstr(name, "notify_onelevel") ||
104 strstr(name, "serverid") ||
105 strstr(name, "xattr_tdb")) {
106 return true;
109 return false;
112 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
113 uint32_t priority,
114 void *private_data);
116 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
117 db_handler_t handler, void *private_data)
119 struct ctdb_db_context *ctdb_db;
120 int ret;
122 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
123 if (ctdb_db->priority != priority) {
124 continue;
126 if (later_db(ctdb, ctdb_db->db_name)) {
127 continue;
129 ret = handler(ctdb_db, priority, private_data);
130 if (ret != 0) {
131 return -1;
135 /* If priority != 1, later_db check is not required and can return */
136 if (priority != 1) {
137 return 0;
140 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
141 if (!later_db(ctdb, ctdb_db->db_name)) {
142 continue;
144 ret = handler(ctdb_db, priority, private_data);
145 if (ret != 0) {
146 return -1;
150 return 0;
155 * lock all databases - mark only
157 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
158 void *private_data)
160 int tdb_transaction_write_lock_mark(struct tdb_context *);
162 DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
163 ctdb_db->db_name, priority));
165 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
166 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
167 ctdb_db->db_name));
168 return -1;
171 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
172 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
173 ctdb_db->db_name));
174 return -1;
177 return 0;
180 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
183 * This function is only used by the main dameon during recovery.
184 * At this stage, the databases have already been locked, by a
185 * dedicated child process. The freeze_mode variable is used to track
186 * whether the actual locks are held by the child process or not.
189 if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
190 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
191 return -1;
194 return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
197 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
199 uint32_t priority;
201 for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
202 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
203 return -1;
207 return 0;
212 * lock all databases - unmark only
214 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
215 void *private_data)
217 int tdb_transaction_write_lock_unmark(struct tdb_context *);
219 DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
220 ctdb_db->db_name, priority));
222 if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
223 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
224 ctdb_db->db_name));
225 return -1;
228 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
229 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
230 ctdb_db->db_name));
231 return -1;
234 return 0;
237 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
240 * This function is only used by the main daemon during recovery.
241 * At this stage, the databases have already been locked, by a
242 * dedicated child process. The freeze_mode variable is used to track
243 * whether the actual locks are held by the child process or not.
246 if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
247 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
248 return -1;
251 return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
254 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
256 uint32_t priority;
258 for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
259 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
260 return -1;
264 return 0;
268 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
271 * Destructor to kill the child locking process
273 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
275 if (lock_ctx->child > 0) {
276 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
277 if (lock_ctx->type == LOCK_RECORD) {
278 DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
279 } else {
280 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
282 if (lock_ctx->ctdb_db) {
283 lock_ctx->ctdb_db->lock_num_current--;
285 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
286 if (lock_ctx->ctdb_db) {
287 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
289 } else {
290 if (lock_ctx->type == LOCK_RECORD) {
291 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
292 } else {
293 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
295 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
296 if (lock_ctx->ctdb_db) {
297 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
301 ctdb_lock_schedule(lock_ctx->ctdb);
303 return 0;
308 * Destructor to remove lock request
310 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
312 lock_request->lctx->request = NULL;
313 return 0;
316 void ctdb_lock_free_request_context(struct lock_request *lock_req)
318 struct lock_context *lock_ctx;
320 lock_ctx = lock_req->lctx;
321 talloc_free(lock_req);
322 talloc_free(lock_ctx);
327 * Process all the callbacks waiting for lock
329 * If lock has failed, callback is executed with locked=false
331 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
333 struct lock_request *request;
335 if (lock_ctx->auto_mark && locked) {
336 switch (lock_ctx->type) {
337 case LOCK_RECORD:
338 tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
339 break;
341 case LOCK_DB:
342 tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
343 break;
345 case LOCK_ALLDB_PRIO:
346 ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
347 break;
349 case LOCK_ALLDB:
350 ctdb_lockall_mark(lock_ctx->ctdb);
351 break;
355 request = lock_ctx->request;
356 if (lock_ctx->auto_mark) {
357 /* Reset the destructor, so request is not removed from the list */
358 talloc_set_destructor(request, NULL);
360 request->callback(request->private_data, locked);
362 if (lock_ctx->auto_mark && locked) {
363 switch (lock_ctx->type) {
364 case LOCK_RECORD:
365 tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
366 break;
368 case LOCK_DB:
369 tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
370 break;
372 case LOCK_ALLDB_PRIO:
373 ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
374 break;
376 case LOCK_ALLDB:
377 ctdb_lockall_unmark(lock_ctx->ctdb);
378 break;
/*
 * Map a lock latency in seconds to a histogram bucket index (0..10).
 *
 * Bucket i covers latencies below the i-th upper bound listed here and
 * not below the previous one; anything of 64 seconds or more lands in
 * bucket 10.
 */
static int lock_bucket_id(double t)
{
	const double ms = 1.e-3, s = 1;
	const double upper_bound[] = {
		1*ms, 10*ms, 100*ms,
		1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
	};
	const int num_bounds = (int)(sizeof(upper_bound) / sizeof(upper_bound[0]));
	int id;

	for (id = 0; id < num_bounds; id++) {
		if (t < upper_bound[id]) {
			break;
		}
	}

	return id;
}
417 * Callback routine when the required locks are obtained.
418 * Called from parent context
420 static void ctdb_lock_handler(struct tevent_context *ev,
421 struct tevent_fd *tfd,
422 uint16_t flags,
423 void *private_data)
425 struct lock_context *lock_ctx;
426 TALLOC_CTX *tmp_ctx = NULL;
427 char c;
428 bool locked;
429 double t;
430 int id;
432 lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
434 /* cancel the timeout event */
435 TALLOC_FREE(lock_ctx->ttimer);
437 t = timeval_elapsed(&lock_ctx->start_time);
438 id = lock_bucket_id(t);
440 if (lock_ctx->auto_mark) {
441 tmp_ctx = talloc_new(ev);
442 talloc_steal(tmp_ctx, lock_ctx);
445 /* Read the status from the child process */
446 if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
447 locked = false;
448 } else {
449 locked = (c == 0 ? true : false);
452 /* Update statistics */
453 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
454 if (lock_ctx->ctdb_db) {
455 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
458 if (locked) {
459 if (lock_ctx->ctdb_db) {
460 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
461 CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
462 lock_type_str[lock_ctx->type], locks.latency,
463 lock_ctx->start_time);
465 CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
466 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
468 } else {
469 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
470 if (lock_ctx->ctdb_db) {
471 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
475 process_callbacks(lock_ctx, locked);
477 if (lock_ctx->auto_mark) {
478 talloc_free(tmp_ctx);
484 * Callback routine when required locks are not obtained within timeout
485 * Called from parent context
487 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
488 struct tevent_timer *ttimer,
489 struct timeval current_time,
490 void *private_data)
492 static const char * debug_locks = NULL;
493 struct lock_context *lock_ctx;
494 struct ctdb_context *ctdb;
495 pid_t pid;
497 lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
498 ctdb = lock_ctx->ctdb;
500 /* If a node stopped/banned, don't spam the logs */
501 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
502 return;
504 if (lock_ctx->ctdb_db) {
505 DEBUG(DEBUG_WARNING,
506 ("Unable to get %s lock on database %s for %.0lf seconds\n",
507 (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
508 lock_ctx->ctdb_db->db_name,
509 timeval_elapsed(&lock_ctx->start_time)));
510 } else {
511 DEBUG(DEBUG_WARNING,
512 ("Unable to get ALLDB locks for %.0lf seconds\n",
513 timeval_elapsed(&lock_ctx->start_time)));
516 /* Fire a child process to find the blocking process. */
517 if (debug_locks == NULL) {
518 debug_locks = getenv("CTDB_DEBUG_LOCKS");
519 if (debug_locks == NULL) {
520 debug_locks = talloc_asprintf(ctdb,
521 "%s/debug_locks.sh",
522 getenv("CTDB_BASE"));
525 if (debug_locks != NULL) {
526 pid = vfork();
527 if (pid == 0) {
528 execl(debug_locks, debug_locks, NULL);
529 _exit(0);
531 ctdb_track_child(ctdb, pid);
532 } else {
533 DEBUG(DEBUG_WARNING,
534 (__location__
535 " Unable to setup lock debugging - no memory?\n"));
538 /* reset the timeout timer */
539 // talloc_free(lock_ctx->ttimer);
540 lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
541 lock_ctx,
542 timeval_current_ofs(10, 0),
543 ctdb_lock_timeout_handler,
544 (void *)lock_ctx);
/*
 * ctdb_db_iterator() handler that counts databases: private_data points to
 * an int which is incremented once per call.  Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
			    void *private_data)
{
	int *counter = private_data;

	*counter += 1;

	return 0;
}
/* Cursor used by db_name_handler() to fill an argument vector */
struct db_namelist {
	/* array being filled; also used as the talloc parent of each string */
	char **names;
	/* index of the next free slot in names */
	int n;
};
563 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
564 void *private_data)
566 struct db_namelist *list = (struct db_namelist *)private_data;
568 list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
569 list->n++;
571 return 0;
574 static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
576 struct ctdb_context *ctdb = lock_ctx->ctdb;
577 char **args = NULL;
578 int nargs, i;
579 int priority;
580 struct db_namelist list;
582 switch (lock_ctx->type) {
583 case LOCK_RECORD:
584 nargs = 6;
585 break;
587 case LOCK_DB:
588 nargs = 5;
589 break;
591 case LOCK_ALLDB_PRIO:
592 nargs = 4;
593 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
594 break;
596 case LOCK_ALLDB:
597 nargs = 4;
598 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
599 ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
601 break;
604 /* Add extra argument for null termination */
605 nargs++;
607 args = talloc_array(mem_ctx, char *, nargs);
608 if (args == NULL) {
609 return NULL;
612 args[0] = talloc_strdup(args, "ctdb_lock_helper");
613 args[1] = talloc_asprintf(args, "%d", getpid());
614 args[2] = talloc_asprintf(args, "%d", fd);
616 switch (lock_ctx->type) {
617 case LOCK_RECORD:
618 args[3] = talloc_strdup(args, "RECORD");
619 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
620 if (lock_ctx->key.dsize == 0) {
621 args[5] = talloc_strdup(args, "NULL");
622 } else {
623 args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
625 break;
627 case LOCK_DB:
628 args[3] = talloc_strdup(args, "DB");
629 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
630 break;
632 case LOCK_ALLDB_PRIO:
633 args[3] = talloc_strdup(args, "DB");
634 list.names = args;
635 list.n = 4;
636 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
637 break;
639 case LOCK_ALLDB:
640 args[3] = talloc_strdup(args, "DB");
641 list.names = args;
642 list.n = 4;
643 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
644 ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
646 break;
649 /* Make sure last argument is NULL */
650 args[nargs-1] = NULL;
652 for (i=0; i<nargs-1; i++) {
653 if (args[i] == NULL) {
654 talloc_free(args);
655 return NULL;
659 return args;
663 * Find a lock request that can be scheduled
665 struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
667 struct lock_context *lock_ctx, *next_ctx;
668 struct ctdb_db_context *ctdb_db;
670 /* First check if there are database lock requests */
672 for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
673 lock_ctx = next_ctx) {
675 if (lock_ctx->request != NULL) {
676 /* Found a lock context with a request */
677 return lock_ctx;
680 next_ctx = lock_ctx->next;
682 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
683 "request\n"));
684 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
685 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
686 if (lock_ctx->ctdb_db) {
687 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
688 locks.num_pending);
690 talloc_free(lock_ctx);
693 /* Next check database queues */
694 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
695 if (ctdb_db->lock_num_current ==
696 ctdb->tunable.lock_processes_per_db) {
697 continue;
700 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
701 lock_ctx = next_ctx) {
703 next_ctx = lock_ctx->next;
705 if (lock_ctx->request != NULL) {
706 return lock_ctx;
709 DEBUG(DEBUG_INFO, ("Removing lock context without "
710 "lock request\n"));
711 DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
712 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
713 CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
714 talloc_free(lock_ctx);
718 return NULL;
722 * Schedule a new lock child process
723 * Set up callback handler and timeout handler
725 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
727 struct lock_context *lock_ctx;
728 int ret;
729 TALLOC_CTX *tmp_ctx;
730 const char *helper = BINDIR "/ctdb_lock_helper";
731 static const char *prog = NULL;
732 char **args;
734 if (prog == NULL) {
735 const char *t;
737 t = getenv("CTDB_LOCK_HELPER");
738 if (t != NULL) {
739 prog = talloc_strdup(ctdb, t);
740 } else {
741 prog = talloc_strdup(ctdb, helper);
743 CTDB_NO_MEMORY_VOID(ctdb, prog);
746 /* Find a lock context with requests */
747 lock_ctx = ctdb_find_lock_context(ctdb);
748 if (lock_ctx == NULL) {
749 return;
752 lock_ctx->child = -1;
753 ret = pipe(lock_ctx->fd);
754 if (ret != 0) {
755 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
756 return;
759 set_close_on_exec(lock_ctx->fd[0]);
761 /* Create data for child process */
762 tmp_ctx = talloc_new(lock_ctx);
763 if (tmp_ctx == NULL) {
764 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
765 close(lock_ctx->fd[0]);
766 close(lock_ctx->fd[1]);
767 return;
770 /* Create arguments for lock helper */
771 args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
772 if (args == NULL) {
773 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
774 close(lock_ctx->fd[0]);
775 close(lock_ctx->fd[1]);
776 talloc_free(tmp_ctx);
777 return;
780 lock_ctx->child = vfork();
782 if (lock_ctx->child == (pid_t)-1) {
783 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
784 close(lock_ctx->fd[0]);
785 close(lock_ctx->fd[1]);
786 talloc_free(tmp_ctx);
787 return;
791 /* Child process */
792 if (lock_ctx->child == 0) {
793 ret = execv(prog, args);
794 if (ret < 0) {
795 DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
796 prog, errno, strerror(errno)));
798 _exit(1);
801 /* Parent process */
802 ctdb_track_child(ctdb, lock_ctx->child);
803 close(lock_ctx->fd[1]);
805 talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
807 talloc_free(tmp_ctx);
809 /* Set up timeout handler */
810 lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
811 lock_ctx,
812 timeval_current_ofs(10, 0),
813 ctdb_lock_timeout_handler,
814 (void *)lock_ctx);
815 if (lock_ctx->ttimer == NULL) {
816 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
817 lock_ctx->child = -1;
818 talloc_set_destructor(lock_ctx, NULL);
819 close(lock_ctx->fd[0]);
820 return;
823 /* Set up callback */
824 lock_ctx->tfd = tevent_add_fd(ctdb->ev,
825 lock_ctx,
826 lock_ctx->fd[0],
827 EVENT_FD_READ,
828 ctdb_lock_handler,
829 (void *)lock_ctx);
830 if (lock_ctx->tfd == NULL) {
831 TALLOC_FREE(lock_ctx->ttimer);
832 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
833 lock_ctx->child = -1;
834 talloc_set_destructor(lock_ctx, NULL);
835 close(lock_ctx->fd[0]);
836 return;
838 tevent_fd_set_auto_close(lock_ctx->tfd);
840 /* Move the context from pending to current */
841 if (lock_ctx->type == LOCK_RECORD) {
842 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
843 DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
844 } else {
845 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
846 DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
848 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
849 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
850 if (lock_ctx->ctdb_db) {
851 lock_ctx->ctdb_db->lock_num_current++;
852 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
853 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
859 * Lock record / db depending on type
861 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
862 struct ctdb_db_context *ctdb_db,
863 TDB_DATA key,
864 uint32_t priority,
865 void (*callback)(void *, bool),
866 void *private_data,
867 enum lock_type type,
868 bool auto_mark)
870 struct lock_context *lock_ctx = NULL;
871 struct lock_request *request;
873 if (callback == NULL) {
874 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
875 return NULL;
878 lock_ctx = talloc_zero(ctdb, struct lock_context);
879 if (lock_ctx == NULL) {
880 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
881 return NULL;
884 if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
885 talloc_free(lock_ctx);
886 return NULL;
889 lock_ctx->type = type;
890 lock_ctx->ctdb = ctdb;
891 lock_ctx->ctdb_db = ctdb_db;
892 lock_ctx->key.dsize = key.dsize;
893 if (key.dsize > 0) {
894 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
895 if (lock_ctx->key.dptr == NULL) {
896 DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
897 talloc_free(lock_ctx);
898 return NULL;
900 lock_ctx->key_hash = ctdb_hash(&key);
901 } else {
902 lock_ctx->key.dptr = NULL;
904 lock_ctx->priority = priority;
905 lock_ctx->auto_mark = auto_mark;
907 lock_ctx->request = request;
908 lock_ctx->child = -1;
910 /* Non-record locks are required by recovery and should be scheduled
911 * immediately, so keep them at the head of the pending queue.
913 if (lock_ctx->type == LOCK_RECORD) {
914 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
915 } else {
916 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
918 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
919 if (ctdb_db) {
920 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
923 /* Start the timer when we activate the context */
924 lock_ctx->start_time = timeval_current();
926 request->lctx = lock_ctx;
927 request->callback = callback;
928 request->private_data = private_data;
930 talloc_set_destructor(request, ctdb_lock_request_destructor);
932 ctdb_lock_schedule(ctdb);
934 return request;
939 * obtain a lock on a record in a database
941 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
942 TDB_DATA key,
943 bool auto_mark,
944 void (*callback)(void *, bool),
945 void *private_data)
947 return ctdb_lock_internal(ctdb_db->ctdb,
948 ctdb_db,
949 key,
951 callback,
952 private_data,
953 LOCK_RECORD,
954 auto_mark);
959 * obtain a lock on a database
961 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
962 bool auto_mark,
963 void (*callback)(void *, bool),
964 void *private_data)
966 return ctdb_lock_internal(ctdb_db->ctdb,
967 ctdb_db,
968 tdb_null,
970 callback,
971 private_data,
972 LOCK_DB,
973 auto_mark);
978 * obtain locks on all databases of specified priority
980 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
981 uint32_t priority,
982 bool auto_mark,
983 void (*callback)(void *, bool),
984 void *private_data)
986 if (priority < 1 || priority > NUM_DB_PRIORITIES) {
987 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
988 return NULL;
991 return ctdb_lock_internal(ctdb,
992 NULL,
993 tdb_null,
994 priority,
995 callback,
996 private_data,
997 LOCK_ALLDB_PRIO,
998 auto_mark);
1003 * obtain locks on all databases
1005 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1006 bool auto_mark,
1007 void (*callback)(void *, bool),
1008 void *private_data)
1010 return ctdb_lock_internal(ctdb,
1011 NULL,
1012 tdb_null,
1014 callback,
1015 private_data,
1016 LOCK_ALLDB,
1017 auto_mark);