ctdb: make use of ctdb_canonicalize_ip_inplace() in ctdb_control_tcp_client()
[Samba.git] / source3 / lib / g_lock.c
blob33f088b2a438fcc98a7dbf289ec5e1f5107a9ed7
1 /*
2 Unix SMB/CIFS implementation.
3 global locks based on dbwrap and messaging
4 Copyright (C) 2009 by Volker Lendecke
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "lib/util/debug.h"
24 #include "lib/util/talloc_stack.h"
25 #include "lib/util/samba_util.h"
26 #include "lib/util_path.h"
27 #include "dbwrap/dbwrap.h"
28 #include "dbwrap/dbwrap_open.h"
29 #include "dbwrap/dbwrap_watch.h"
30 #include "g_lock.h"
31 #include "util_tdb.h"
32 #include "../lib/util/tevent_ntstatus.h"
33 #include "messages.h"
34 #include "serverid.h"
36 struct g_lock_ctx {
37 struct db_context *db;
38 struct messaging_context *msg;
39 enum dbwrap_lock_order lock_order;
40 bool busy;
43 struct g_lock {
44 struct server_id exclusive;
45 size_t num_shared;
46 uint8_t *shared;
47 uint64_t unique_lock_epoch;
48 uint64_t unique_data_epoch;
49 size_t datalen;
50 uint8_t *data;
53 static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
55 struct server_id exclusive;
56 size_t num_shared, shared_len;
57 uint64_t unique_lock_epoch;
58 uint64_t unique_data_epoch;
60 if (buflen < (SERVER_ID_BUF_LENGTH + /* exclusive */
61 sizeof(uint64_t) + /* seqnum */
62 sizeof(uint32_t))) { /* num_shared */
63 struct g_lock ret = {
64 .exclusive.pid = 0,
65 .unique_lock_epoch = generate_unique_u64(0),
66 .unique_data_epoch = generate_unique_u64(0),
68 *lck = ret;
69 return true;
72 server_id_get(&exclusive, buf);
73 buf += SERVER_ID_BUF_LENGTH;
74 buflen -= SERVER_ID_BUF_LENGTH;
76 unique_lock_epoch = BVAL(buf, 0);
77 buf += sizeof(uint64_t);
78 buflen -= sizeof(uint64_t);
80 unique_data_epoch = BVAL(buf, 0);
81 buf += sizeof(uint64_t);
82 buflen -= sizeof(uint64_t);
84 num_shared = IVAL(buf, 0);
85 buf += sizeof(uint32_t);
86 buflen -= sizeof(uint32_t);
88 if (num_shared > buflen/SERVER_ID_BUF_LENGTH) {
89 DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
90 num_shared,
91 buflen);
92 return false;
95 shared_len = num_shared * SERVER_ID_BUF_LENGTH;
97 *lck = (struct g_lock) {
98 .exclusive = exclusive,
99 .num_shared = num_shared,
100 .shared = buf,
101 .unique_lock_epoch = unique_lock_epoch,
102 .unique_data_epoch = unique_data_epoch,
103 .datalen = buflen-shared_len,
104 .data = buf+shared_len,
107 return true;
110 static void g_lock_get_shared(const struct g_lock *lck,
111 size_t i,
112 struct server_id *shared)
114 if (i >= lck->num_shared) {
115 abort();
117 server_id_get(shared, lck->shared + i*SERVER_ID_BUF_LENGTH);
120 static void g_lock_del_shared(struct g_lock *lck, size_t i)
122 if (i >= lck->num_shared) {
123 abort();
125 lck->num_shared -= 1;
126 if (i < lck->num_shared) {
127 memcpy(lck->shared + i*SERVER_ID_BUF_LENGTH,
128 lck->shared + lck->num_shared*SERVER_ID_BUF_LENGTH,
129 SERVER_ID_BUF_LENGTH);
133 static NTSTATUS g_lock_store(
134 struct db_record *rec,
135 struct g_lock *lck,
136 struct server_id *new_shared,
137 const TDB_DATA *new_dbufs,
138 size_t num_new_dbufs)
140 uint8_t exclusive[SERVER_ID_BUF_LENGTH];
141 uint8_t seqnum_buf[sizeof(uint64_t)*2];
142 uint8_t sizebuf[sizeof(uint32_t)];
143 uint8_t new_shared_buf[SERVER_ID_BUF_LENGTH];
145 struct TDB_DATA dbufs[6 + num_new_dbufs];
147 dbufs[0] = (TDB_DATA) {
148 .dptr = exclusive, .dsize = sizeof(exclusive),
150 dbufs[1] = (TDB_DATA) {
151 .dptr = seqnum_buf, .dsize = sizeof(seqnum_buf),
153 dbufs[2] = (TDB_DATA) {
154 .dptr = sizebuf, .dsize = sizeof(sizebuf),
156 dbufs[3] = (TDB_DATA) {
157 .dptr = lck->shared,
158 .dsize = lck->num_shared * SERVER_ID_BUF_LENGTH,
160 dbufs[4] = (TDB_DATA) { 0 };
161 dbufs[5] = (TDB_DATA) {
162 .dptr = lck->data, .dsize = lck->datalen,
165 if (num_new_dbufs != 0) {
166 memcpy(&dbufs[6],
167 new_dbufs,
168 num_new_dbufs * sizeof(TDB_DATA));
171 server_id_put(exclusive, lck->exclusive);
172 SBVAL(seqnum_buf, 0, lck->unique_lock_epoch);
173 SBVAL(seqnum_buf, 8, lck->unique_data_epoch);
175 if (new_shared != NULL) {
176 if (lck->num_shared >= UINT32_MAX) {
177 return NT_STATUS_BUFFER_OVERFLOW;
180 server_id_put(new_shared_buf, *new_shared);
182 dbufs[4] = (TDB_DATA) {
183 .dptr = new_shared_buf,
184 .dsize = sizeof(new_shared_buf),
187 lck->num_shared += 1;
190 SIVAL(sizebuf, 0, lck->num_shared);
192 return dbwrap_record_storev(rec, dbufs, ARRAY_SIZE(dbufs), 0);
195 struct g_lock_ctx *g_lock_ctx_init_backend(
196 TALLOC_CTX *mem_ctx,
197 struct messaging_context *msg,
198 struct db_context **backend)
200 struct g_lock_ctx *result;
202 result = talloc_zero(mem_ctx, struct g_lock_ctx);
203 if (result == NULL) {
204 return NULL;
206 result->msg = msg;
207 result->lock_order = DBWRAP_LOCK_ORDER_NONE;
209 result->db = db_open_watched(result, backend, msg);
210 if (result->db == NULL) {
211 DBG_WARNING("db_open_watched failed\n");
212 TALLOC_FREE(result);
213 return NULL;
215 return result;
218 void g_lock_set_lock_order(struct g_lock_ctx *ctx,
219 enum dbwrap_lock_order lock_order)
221 ctx->lock_order = lock_order;
224 struct g_lock_ctx *g_lock_ctx_init(TALLOC_CTX *mem_ctx,
225 struct messaging_context *msg)
227 char *db_path = NULL;
228 struct db_context *backend = NULL;
229 struct g_lock_ctx *ctx = NULL;
231 db_path = lock_path(mem_ctx, "g_lock.tdb");
232 if (db_path == NULL) {
233 return NULL;
236 backend = db_open(
237 mem_ctx,
238 db_path,
240 TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH|TDB_VOLATILE,
241 O_RDWR|O_CREAT,
242 0600,
243 DBWRAP_LOCK_ORDER_3,
244 DBWRAP_FLAG_NONE);
245 TALLOC_FREE(db_path);
246 if (backend == NULL) {
247 DBG_WARNING("Could not open g_lock.tdb\n");
248 return NULL;
251 ctx = g_lock_ctx_init_backend(mem_ctx, msg, &backend);
252 return ctx;
255 static void g_lock_cleanup_dead(
256 struct g_lock *lck,
257 struct server_id *dead_blocker)
259 bool exclusive_died;
260 struct server_id_buf tmp;
262 if (dead_blocker == NULL) {
263 return;
266 exclusive_died = server_id_equal(dead_blocker, &lck->exclusive);
268 if (exclusive_died) {
269 DBG_DEBUG("Exclusive holder %s died\n",
270 server_id_str_buf(lck->exclusive, &tmp));
271 lck->exclusive.pid = 0;
274 if (lck->num_shared != 0) {
275 bool shared_died;
276 struct server_id shared;
278 g_lock_get_shared(lck, 0, &shared);
279 shared_died = server_id_equal(dead_blocker, &shared);
281 if (shared_died) {
282 DBG_DEBUG("Shared holder %s died\n",
283 server_id_str_buf(shared, &tmp));
284 g_lock_del_shared(lck, 0);
289 static ssize_t g_lock_find_shared(
290 struct g_lock *lck,
291 const struct server_id *self)
293 size_t i;
295 for (i=0; i<lck->num_shared; i++) {
296 struct server_id shared;
297 bool same;
299 g_lock_get_shared(lck, i, &shared);
301 same = server_id_equal(self, &shared);
302 if (same) {
303 return i;
307 return -1;
310 static void g_lock_cleanup_shared(struct g_lock *lck)
312 size_t i;
313 struct server_id check;
314 bool exists;
316 if (lck->num_shared == 0) {
317 return;
321 * Read locks can stay around forever if the process dies. Do
322 * a heuristic check for process existence: Check one random
323 * process for existence. Hopefully this will keep runaway
324 * read locks under control.
326 i = generate_random() % lck->num_shared;
327 g_lock_get_shared(lck, i, &check);
329 exists = serverid_exists(&check);
330 if (!exists) {
331 struct server_id_buf tmp;
332 DBG_DEBUG("Shared locker %s died -- removing\n",
333 server_id_str_buf(check, &tmp));
334 g_lock_del_shared(lck, i);
338 struct g_lock_lock_cb_state {
339 struct g_lock_ctx *ctx;
340 struct db_record *rec;
341 struct g_lock *lck;
342 struct server_id *new_shared;
343 g_lock_lock_cb_fn_t cb_fn;
344 void *cb_private;
345 TALLOC_CTX *update_mem_ctx;
346 TDB_DATA updated_data;
347 bool existed;
348 bool modified;
349 bool unlock;
352 NTSTATUS g_lock_lock_cb_dump(struct g_lock_lock_cb_state *cb_state,
353 void (*fn)(struct server_id exclusive,
354 size_t num_shared,
355 const struct server_id *shared,
356 const uint8_t *data,
357 size_t datalen,
358 void *private_data),
359 void *private_data)
361 struct g_lock *lck = cb_state->lck;
363 /* We allow a cn_fn only for G_LOCK_WRITE for now... */
364 SMB_ASSERT(lck->num_shared == 0);
366 fn(lck->exclusive,
367 0, /* num_shared */
368 NULL, /* shared */
369 lck->data,
370 lck->datalen,
371 private_data);
373 return NT_STATUS_OK;
376 NTSTATUS g_lock_lock_cb_writev(struct g_lock_lock_cb_state *cb_state,
377 const TDB_DATA *dbufs,
378 size_t num_dbufs)
380 NTSTATUS status;
382 status = dbwrap_merge_dbufs(&cb_state->updated_data,
383 cb_state->update_mem_ctx,
384 dbufs, num_dbufs);
385 if (!NT_STATUS_IS_OK(status)) {
386 return status;
389 cb_state->modified = true;
390 cb_state->lck->data = cb_state->updated_data.dptr;
391 cb_state->lck->datalen = cb_state->updated_data.dsize;
393 return NT_STATUS_OK;
396 void g_lock_lock_cb_unlock(struct g_lock_lock_cb_state *cb_state)
398 cb_state->unlock = true;
401 struct g_lock_lock_cb_watch_data_state {
402 struct tevent_context *ev;
403 struct g_lock_ctx *ctx;
404 TDB_DATA key;
405 struct server_id blocker;
406 bool blockerdead;
407 uint64_t unique_lock_epoch;
408 uint64_t unique_data_epoch;
409 uint64_t watch_instance;
410 NTSTATUS status;
413 static void g_lock_lock_cb_watch_data_done(struct tevent_req *subreq);
415 struct tevent_req *g_lock_lock_cb_watch_data_send(
416 TALLOC_CTX *mem_ctx,
417 struct tevent_context *ev,
418 struct g_lock_lock_cb_state *cb_state,
419 struct server_id blocker)
421 struct tevent_req *req = NULL;
422 struct g_lock_lock_cb_watch_data_state *state = NULL;
423 struct tevent_req *subreq = NULL;
424 TDB_DATA key = dbwrap_record_get_key(cb_state->rec);
426 req = tevent_req_create(
427 mem_ctx, &state, struct g_lock_lock_cb_watch_data_state);
428 if (req == NULL) {
429 return NULL;
431 state->ev = ev;
432 state->ctx = cb_state->ctx;
433 state->blocker = blocker;
435 state->key = tdb_data_talloc_copy(state, key);
436 if (tevent_req_nomem(state->key.dptr, req)) {
437 return tevent_req_post(req, ev);
440 state->unique_lock_epoch = cb_state->lck->unique_lock_epoch;
441 state->unique_data_epoch = cb_state->lck->unique_data_epoch;
443 DBG_DEBUG("state->unique_data_epoch=%"PRIu64"\n", state->unique_data_epoch);
445 subreq = dbwrap_watched_watch_send(
446 state, state->ev, cb_state->rec, 0, state->blocker);
447 if (tevent_req_nomem(subreq, req)) {
448 return tevent_req_post(req, ev);
450 tevent_req_set_callback(subreq, g_lock_lock_cb_watch_data_done, req);
452 return req;
455 static void g_lock_lock_cb_watch_data_done_fn(
456 struct db_record *rec,
457 TDB_DATA value,
458 void *private_data)
460 struct tevent_req *req = talloc_get_type_abort(
461 private_data, struct tevent_req);
462 struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
463 req, struct g_lock_lock_cb_watch_data_state);
464 struct tevent_req *subreq = NULL;
465 struct g_lock lck;
466 bool ok;
468 ok = g_lock_parse(value.dptr, value.dsize, &lck);
469 if (!ok) {
470 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
471 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
472 return;
475 if (lck.unique_data_epoch != state->unique_data_epoch) {
476 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
477 DBG_DEBUG("lck.unique_data_epoch=%"PRIu64", "
478 "state->unique_data_epoch=%"PRIu64"\n",
479 lck.unique_data_epoch,
480 state->unique_data_epoch);
481 state->status = NT_STATUS_OK;
482 return;
486 * The lock epoch changed, so we better
487 * remove ourself from the waiter list
488 * (most likely the first position)
489 * and re-add us at the end of the list.
491 * This gives other lock waiters a change
492 * to make progress.
494 * Otherwise we'll keep our waiter instance alive,
495 * keep waiting (most likely at first position).
497 if (lck.unique_lock_epoch != state->unique_lock_epoch) {
498 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
499 state->watch_instance = dbwrap_watched_watch_add_instance(rec);
500 state->unique_lock_epoch = lck.unique_lock_epoch;
503 subreq = dbwrap_watched_watch_send(
504 state, state->ev, rec, state->watch_instance, state->blocker);
505 if (subreq == NULL) {
506 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
507 state->status = NT_STATUS_NO_MEMORY;
508 return;
510 tevent_req_set_callback(subreq, g_lock_lock_cb_watch_data_done, req);
512 state->status = NT_STATUS_EVENT_PENDING;
515 static void g_lock_lock_cb_watch_data_done(struct tevent_req *subreq)
517 struct tevent_req *req = tevent_req_callback_data(
518 subreq, struct tevent_req);
519 struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
520 req, struct g_lock_lock_cb_watch_data_state);
521 NTSTATUS status;
522 uint64_t instance = 0;
524 status = dbwrap_watched_watch_recv(
525 subreq, &instance, &state->blockerdead, &state->blocker);
526 TALLOC_FREE(subreq);
527 if (tevent_req_nterror(req, status)) {
528 DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
529 nt_errstr(status));
530 return;
533 state->watch_instance = instance;
535 status = dbwrap_do_locked(
536 state->ctx->db, state->key, g_lock_lock_cb_watch_data_done_fn, req);
537 if (tevent_req_nterror(req, status)) {
538 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
539 return;
541 if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
542 return;
544 if (tevent_req_nterror(req, state->status)) {
545 return;
547 tevent_req_done(req);
550 NTSTATUS g_lock_lock_cb_watch_data_recv(
551 struct tevent_req *req,
552 bool *blockerdead,
553 struct server_id *blocker)
555 struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
556 req, struct g_lock_lock_cb_watch_data_state);
557 NTSTATUS status;
559 if (tevent_req_is_nterror(req, &status)) {
560 return status;
562 if (blockerdead != NULL) {
563 *blockerdead = state->blockerdead;
565 if (blocker != NULL) {
566 *blocker = state->blocker;
569 return NT_STATUS_OK;
572 void g_lock_lock_cb_wake_watchers(struct g_lock_lock_cb_state *cb_state)
574 struct g_lock *lck = cb_state->lck;
576 lck->unique_data_epoch = generate_unique_u64(lck->unique_data_epoch);
577 cb_state->modified = true;
580 static NTSTATUS g_lock_lock_cb_run_and_store(struct g_lock_lock_cb_state *cb_state)
582 struct g_lock *lck = cb_state->lck;
583 NTSTATUS success_status = NT_STATUS_OK;
584 NTSTATUS status;
586 if (cb_state->cb_fn != NULL) {
588 SMB_ASSERT(lck->num_shared == 0);
589 SMB_ASSERT(cb_state->new_shared == NULL);
591 if (cb_state->ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
592 const char *name = dbwrap_name(cb_state->ctx->db);
593 dbwrap_lock_order_lock(name, cb_state->ctx->lock_order);
596 cb_state->ctx->busy = true;
597 cb_state->cb_fn(cb_state, cb_state->cb_private);
598 cb_state->ctx->busy = false;
600 if (cb_state->ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
601 const char *name = dbwrap_name(cb_state->ctx->db);
602 dbwrap_lock_order_unlock(name, cb_state->ctx->lock_order);
606 if (cb_state->unlock) {
608 * Unlocked should wake up watchers.
610 * We no longer need the lock, so
611 * force a wakeup of the next watchers,
612 * even if we don't do any update.
614 dbwrap_watched_watch_reset_alerting(cb_state->rec);
615 dbwrap_watched_watch_force_alerting(cb_state->rec);
616 if (!cb_state->modified) {
618 * The record was not changed at
619 * all, so we can also avoid
620 * storing the lck.unique_lock_epoch
621 * change
623 return NT_STATUS_WAS_UNLOCKED;
625 lck->exclusive = (struct server_id) { .pid = 0 };
626 cb_state->new_shared = NULL;
628 if (lck->datalen == 0) {
629 if (!cb_state->existed) {
630 return NT_STATUS_WAS_UNLOCKED;
633 status = dbwrap_record_delete(cb_state->rec);
634 if (!NT_STATUS_IS_OK(status)) {
635 DBG_WARNING("dbwrap_record_delete() failed: %s\n",
636 nt_errstr(status));
637 return status;
639 return NT_STATUS_WAS_UNLOCKED;
642 success_status = NT_STATUS_WAS_UNLOCKED;
645 status = g_lock_store(cb_state->rec,
646 cb_state->lck,
647 cb_state->new_shared,
648 NULL, 0);
649 if (!NT_STATUS_IS_OK(status)) {
650 DBG_WARNING("g_lock_store() failed: %s\n",
651 nt_errstr(status));
652 return status;
655 return success_status;
658 struct g_lock_lock_state {
659 struct tevent_context *ev;
660 struct g_lock_ctx *ctx;
661 TDB_DATA key;
662 enum g_lock_type type;
663 bool retry;
664 g_lock_lock_cb_fn_t cb_fn;
665 void *cb_private;
668 struct g_lock_lock_fn_state {
669 struct g_lock_lock_state *req_state;
670 struct server_id *dead_blocker;
672 struct tevent_req *watch_req;
673 uint64_t watch_instance;
674 NTSTATUS status;
677 static int g_lock_lock_state_destructor(struct g_lock_lock_state *s);
679 static NTSTATUS g_lock_trylock(
680 struct db_record *rec,
681 struct g_lock_lock_fn_state *state,
682 TDB_DATA data,
683 struct server_id *blocker)
685 struct g_lock_lock_state *req_state = state->req_state;
686 struct server_id self = messaging_server_id(req_state->ctx->msg);
687 enum g_lock_type type = req_state->type;
688 bool retry = req_state->retry;
689 struct g_lock lck = { .exclusive.pid = 0 };
690 struct g_lock_lock_cb_state cb_state = {
691 .ctx = req_state->ctx,
692 .rec = rec,
693 .lck = &lck,
694 .cb_fn = req_state->cb_fn,
695 .cb_private = req_state->cb_private,
696 .existed = data.dsize != 0,
697 .update_mem_ctx = talloc_tos(),
699 struct server_id_buf tmp;
700 NTSTATUS status;
701 bool ok;
703 ok = g_lock_parse(data.dptr, data.dsize, &lck);
704 if (!ok) {
705 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
706 DBG_DEBUG("g_lock_parse failed\n");
707 return NT_STATUS_INTERNAL_DB_CORRUPTION;
710 g_lock_cleanup_dead(&lck, state->dead_blocker);
712 lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);
714 if (lck.exclusive.pid != 0) {
715 bool self_exclusive = server_id_equal(&self, &lck.exclusive);
717 if (!self_exclusive) {
718 bool exists = serverid_exists(&lck.exclusive);
719 if (!exists) {
720 lck.exclusive = (struct server_id) { .pid=0 };
721 goto noexclusive;
724 DBG_DEBUG("%s has an exclusive lock\n",
725 server_id_str_buf(lck.exclusive, &tmp));
727 if (type == G_LOCK_DOWNGRADE) {
728 struct server_id_buf tmp2;
730 dbwrap_watched_watch_remove_instance(rec,
731 state->watch_instance);
733 DBG_DEBUG("%s: Trying to downgrade %s\n",
734 server_id_str_buf(self, &tmp),
735 server_id_str_buf(
736 lck.exclusive, &tmp2));
737 return NT_STATUS_NOT_LOCKED;
740 if (type == G_LOCK_UPGRADE) {
741 ssize_t shared_idx;
743 dbwrap_watched_watch_remove_instance(rec,
744 state->watch_instance);
746 shared_idx = g_lock_find_shared(&lck, &self);
748 if (shared_idx == -1) {
749 DBG_DEBUG("Trying to upgrade %s "
750 "without "
751 "existing shared lock\n",
752 server_id_str_buf(
753 self, &tmp));
754 return NT_STATUS_NOT_LOCKED;
758 * We're trying to upgrade, and the
759 * exclusive lock is taken by someone
760 * else. This means that someone else
761 * is waiting for us to give up our
762 * shared lock. If we now also wait
763 * for someone to give their shared
764 * lock, we will deadlock.
767 DBG_DEBUG("Trying to upgrade %s while "
768 "someone else is also "
769 "trying to upgrade\n",
770 server_id_str_buf(self, &tmp));
771 return NT_STATUS_POSSIBLE_DEADLOCK;
774 DBG_DEBUG("Waiting for lck.exclusive=%s\n",
775 server_id_str_buf(lck.exclusive, &tmp));
778 * We will return NT_STATUS_LOCK_NOT_GRANTED
779 * and need to monitor the record.
781 * If we don't have a watcher instance yet,
782 * we should add one.
784 if (state->watch_instance == 0) {
785 state->watch_instance =
786 dbwrap_watched_watch_add_instance(rec);
789 *blocker = lck.exclusive;
790 return NT_STATUS_LOCK_NOT_GRANTED;
793 if (type == G_LOCK_DOWNGRADE) {
794 DBG_DEBUG("Downgrading %s from WRITE to READ\n",
795 server_id_str_buf(self, &tmp));
797 lck.exclusive = (struct server_id) { .pid = 0 };
798 goto do_shared;
801 if (!retry) {
802 dbwrap_watched_watch_remove_instance(rec,
803 state->watch_instance);
805 DBG_DEBUG("%s already locked by self\n",
806 server_id_str_buf(self, &tmp));
807 return NT_STATUS_WAS_LOCKED;
810 g_lock_cleanup_shared(&lck);
812 if (lck.num_shared != 0) {
813 g_lock_get_shared(&lck, 0, blocker);
815 DBG_DEBUG("Continue waiting for shared lock %s\n",
816 server_id_str_buf(*blocker, &tmp));
819 * We will return NT_STATUS_LOCK_NOT_GRANTED
820 * and need to monitor the record.
822 * If we don't have a watcher instance yet,
823 * we should add one.
825 if (state->watch_instance == 0) {
826 state->watch_instance =
827 dbwrap_watched_watch_add_instance(rec);
830 return NT_STATUS_LOCK_NOT_GRANTED;
834 * Retry after a conflicting lock was released..
835 * All pending readers are gone so we got the lock...
837 goto got_lock;
840 noexclusive:
842 if (type == G_LOCK_UPGRADE) {
843 ssize_t shared_idx = g_lock_find_shared(&lck, &self);
845 if (shared_idx == -1) {
846 dbwrap_watched_watch_remove_instance(rec,
847 state->watch_instance);
849 DBG_DEBUG("Trying to upgrade %s without "
850 "existing shared lock\n",
851 server_id_str_buf(self, &tmp));
852 return NT_STATUS_NOT_LOCKED;
855 g_lock_del_shared(&lck, shared_idx);
856 type = G_LOCK_WRITE;
859 if (type == G_LOCK_WRITE) {
860 ssize_t shared_idx = g_lock_find_shared(&lck, &self);
862 if (shared_idx != -1) {
863 dbwrap_watched_watch_remove_instance(rec,
864 state->watch_instance);
865 DBG_DEBUG("Trying to writelock existing shared %s\n",
866 server_id_str_buf(self, &tmp));
867 return NT_STATUS_WAS_LOCKED;
870 lck.exclusive = self;
872 g_lock_cleanup_shared(&lck);
874 if (lck.num_shared == 0) {
876 * If we store ourself as exclusive writer,
877 * without any pending readers ...
879 goto got_lock;
882 if (state->watch_instance == 0) {
884 * Here we have lck.num_shared != 0.
886 * We will return NT_STATUS_LOCK_NOT_GRANTED
887 * below.
889 * And don't have a watcher instance yet!
891 * We add it here before g_lock_store()
892 * in order to trigger just one
893 * low level dbwrap_do_locked() call.
895 state->watch_instance =
896 dbwrap_watched_watch_add_instance(rec);
899 status = g_lock_store(rec, &lck, NULL, NULL, 0);
900 if (!NT_STATUS_IS_OK(status)) {
901 DBG_DEBUG("g_lock_store() failed: %s\n",
902 nt_errstr(status));
903 return status;
906 talloc_set_destructor(
907 req_state, g_lock_lock_state_destructor);
909 g_lock_get_shared(&lck, 0, blocker);
911 DBG_DEBUG("Waiting for %zu shared locks, "
912 "picking blocker %s\n",
913 lck.num_shared,
914 server_id_str_buf(*blocker, &tmp));
916 return NT_STATUS_LOCK_NOT_GRANTED;
919 do_shared:
921 g_lock_cleanup_shared(&lck);
922 cb_state.new_shared = &self;
923 goto got_lock;
925 got_lock:
927 * We got the lock we asked for, so we no
928 * longer need to monitor the record.
930 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
932 status = g_lock_lock_cb_run_and_store(&cb_state);
933 if (!NT_STATUS_IS_OK(status) &&
934 !NT_STATUS_EQUAL(status, NT_STATUS_WAS_UNLOCKED))
936 DBG_WARNING("g_lock_lock_cb_run_and_store() failed: %s\n",
937 nt_errstr(status));
938 return status;
941 talloc_set_destructor(req_state, NULL);
942 return status;
945 static void g_lock_lock_fn(
946 struct db_record *rec,
947 TDB_DATA value,
948 void *private_data)
950 struct g_lock_lock_fn_state *state = private_data;
951 struct server_id blocker = {0};
954 * We're trying to get a lock and if we are
955 * successful in doing that, we should not
956 * wakeup any other waiters, all they would
957 * find is that we're holding a lock they
958 * are conflicting with.
960 dbwrap_watched_watch_skip_alerting(rec);
962 state->status = g_lock_trylock(rec, state, value, &blocker);
963 if (!NT_STATUS_IS_OK(state->status)) {
964 DBG_DEBUG("g_lock_trylock returned %s\n",
965 nt_errstr(state->status));
967 if (!NT_STATUS_EQUAL(state->status, NT_STATUS_LOCK_NOT_GRANTED)) {
968 return;
971 state->watch_req = dbwrap_watched_watch_send(
972 state->req_state, state->req_state->ev, rec, state->watch_instance, blocker);
973 if (state->watch_req == NULL) {
974 state->status = NT_STATUS_NO_MEMORY;
978 static int g_lock_lock_state_destructor(struct g_lock_lock_state *s)
980 NTSTATUS status = g_lock_unlock(s->ctx, s->key);
981 if (!NT_STATUS_IS_OK(status)) {
982 DBG_DEBUG("g_lock_unlock failed: %s\n", nt_errstr(status));
984 return 0;
987 static void g_lock_lock_retry(struct tevent_req *subreq);
989 struct tevent_req *g_lock_lock_send(TALLOC_CTX *mem_ctx,
990 struct tevent_context *ev,
991 struct g_lock_ctx *ctx,
992 TDB_DATA key,
993 enum g_lock_type type,
994 g_lock_lock_cb_fn_t cb_fn,
995 void *cb_private)
997 struct tevent_req *req;
998 struct g_lock_lock_state *state;
999 struct g_lock_lock_fn_state fn_state;
1000 NTSTATUS status;
1001 bool ok;
1003 SMB_ASSERT(!ctx->busy);
1005 req = tevent_req_create(mem_ctx, &state, struct g_lock_lock_state);
1006 if (req == NULL) {
1007 return NULL;
1009 state->ev = ev;
1010 state->ctx = ctx;
1011 state->key = key;
1012 state->type = type;
1013 state->cb_fn = cb_fn;
1014 state->cb_private = cb_private;
1016 fn_state = (struct g_lock_lock_fn_state) {
1017 .req_state = state,
1021 * We allow a cn_fn only for G_LOCK_WRITE for now.
1023 * It's all we currently need and it makes a few things
1024 * easier to implement.
1026 if (unlikely(cb_fn != NULL && type != G_LOCK_WRITE)) {
1027 tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_6);
1028 return tevent_req_post(req, ev);
1031 status = dbwrap_do_locked(ctx->db, key, g_lock_lock_fn, &fn_state);
1032 if (tevent_req_nterror(req, status)) {
1033 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
1034 nt_errstr(status));
1035 return tevent_req_post(req, ev);
1038 if (NT_STATUS_IS_OK(fn_state.status)) {
1039 tevent_req_done(req);
1040 return tevent_req_post(req, ev);
1042 if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
1043 tevent_req_nterror(req, fn_state.status);
1044 return tevent_req_post(req, ev);
1047 if (tevent_req_nomem(fn_state.watch_req, req)) {
1048 return tevent_req_post(req, ev);
1051 ok = tevent_req_set_endtime(
1052 fn_state.watch_req,
1053 state->ev,
1054 timeval_current_ofs(5 + generate_random() % 5, 0));
1055 if (!ok) {
1056 tevent_req_oom(req);
1057 return tevent_req_post(req, ev);
1059 tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
1061 return req;
1064 static void g_lock_lock_retry(struct tevent_req *subreq)
1066 struct tevent_req *req = tevent_req_callback_data(
1067 subreq, struct tevent_req);
1068 struct g_lock_lock_state *state = tevent_req_data(
1069 req, struct g_lock_lock_state);
1070 struct g_lock_lock_fn_state fn_state;
1071 struct server_id blocker = { .pid = 0 };
1072 bool blockerdead = false;
1073 NTSTATUS status;
1074 uint64_t instance = 0;
1076 status = dbwrap_watched_watch_recv(subreq, &instance, &blockerdead, &blocker);
1077 DBG_DEBUG("watch_recv returned %s\n", nt_errstr(status));
1078 TALLOC_FREE(subreq);
1080 if (!NT_STATUS_IS_OK(status) &&
1081 !NT_STATUS_EQUAL(status, NT_STATUS_IO_TIMEOUT)) {
1082 tevent_req_nterror(req, status);
1083 return;
1086 state->retry = true;
1088 fn_state = (struct g_lock_lock_fn_state) {
1089 .req_state = state,
1090 .dead_blocker = blockerdead ? &blocker : NULL,
1091 .watch_instance = instance,
1094 status = dbwrap_do_locked(state->ctx->db, state->key,
1095 g_lock_lock_fn, &fn_state);
1096 if (tevent_req_nterror(req, status)) {
1097 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
1098 nt_errstr(status));
1099 return;
1102 if (NT_STATUS_IS_OK(fn_state.status)) {
1103 tevent_req_done(req);
1104 return;
1106 if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
1107 tevent_req_nterror(req, fn_state.status);
1108 return;
1111 if (tevent_req_nomem(fn_state.watch_req, req)) {
1112 return;
1115 if (!tevent_req_set_endtime(
1116 fn_state.watch_req, state->ev,
1117 timeval_current_ofs(5 + generate_random() % 5, 0))) {
1118 return;
1120 tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
1123 NTSTATUS g_lock_lock_recv(struct tevent_req *req)
1125 struct g_lock_lock_state *state = tevent_req_data(
1126 req, struct g_lock_lock_state);
1127 struct g_lock_ctx *ctx = state->ctx;
1128 NTSTATUS status;
1130 if (tevent_req_is_nterror(req, &status)) {
1131 if (NT_STATUS_EQUAL(status, NT_STATUS_WAS_UNLOCKED)) {
1132 return NT_STATUS_OK;
1134 return status;
1137 if ((ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) &&
1138 ((state->type == G_LOCK_READ) ||
1139 (state->type == G_LOCK_WRITE))) {
1140 const char *name = dbwrap_name(ctx->db);
1141 dbwrap_lock_order_lock(name, ctx->lock_order);
1144 return NT_STATUS_OK;
1147 struct g_lock_lock_simple_state {
1148 struct g_lock_ctx *ctx;
1149 struct server_id me;
1150 enum g_lock_type type;
1151 NTSTATUS status;
1152 g_lock_lock_cb_fn_t cb_fn;
1153 void *cb_private;
1156 static void g_lock_lock_simple_fn(
1157 struct db_record *rec,
1158 TDB_DATA value,
1159 void *private_data)
1161 struct g_lock_lock_simple_state *state = private_data;
1162 struct server_id_buf buf;
1163 struct g_lock lck = { .exclusive.pid = 0 };
1164 struct g_lock_lock_cb_state cb_state = {
1165 .ctx = state->ctx,
1166 .rec = rec,
1167 .lck = &lck,
1168 .cb_fn = state->cb_fn,
1169 .cb_private = state->cb_private,
1170 .existed = value.dsize != 0,
1171 .update_mem_ctx = talloc_tos(),
1173 bool ok;
1175 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1176 if (!ok) {
1177 DBG_DEBUG("g_lock_parse failed\n");
1178 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1179 return;
1182 if (lck.exclusive.pid != 0) {
1183 DBG_DEBUG("locked by %s\n",
1184 server_id_str_buf(lck.exclusive, &buf));
1185 goto not_granted;
1188 if (state->type == G_LOCK_WRITE) {
1189 if (lck.num_shared != 0) {
1190 DBG_DEBUG("num_shared=%zu\n", lck.num_shared);
1191 goto not_granted;
1193 lck.exclusive = state->me;
1194 } else if (state->type == G_LOCK_READ) {
1195 g_lock_cleanup_shared(&lck);
1196 cb_state.new_shared = &state->me;
1197 } else {
1198 smb_panic(__location__);
1201 lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);
1204 * We are going to store us as owner,
1205 * so we got what we were waiting for.
1207 * So we no longer need to monitor the
1208 * record.
1210 dbwrap_watched_watch_skip_alerting(rec);
1212 state->status = g_lock_lock_cb_run_and_store(&cb_state);
1213 if (!NT_STATUS_IS_OK(state->status) &&
1214 !NT_STATUS_EQUAL(state->status, NT_STATUS_WAS_UNLOCKED))
1216 DBG_WARNING("g_lock_lock_cb_run_and_store() failed: %s\n",
1217 nt_errstr(state->status));
1218 return;
1221 return;
1223 not_granted:
1224 state->status = NT_STATUS_LOCK_NOT_GRANTED;
1227 NTSTATUS g_lock_lock(struct g_lock_ctx *ctx, TDB_DATA key,
1228 enum g_lock_type type, struct timeval timeout,
1229 g_lock_lock_cb_fn_t cb_fn,
1230 void *cb_private)
1232 TALLOC_CTX *frame;
1233 struct tevent_context *ev;
1234 struct tevent_req *req;
1235 struct timeval end;
1236 NTSTATUS status;
1238 SMB_ASSERT(!ctx->busy);
1241 * We allow a cn_fn only for G_LOCK_WRITE for now.
1243 * It's all we currently need and it makes a few things
1244 * easier to implement.
1246 if (unlikely(cb_fn != NULL && type != G_LOCK_WRITE)) {
1247 return NT_STATUS_INVALID_PARAMETER_5;
1250 if ((type == G_LOCK_READ) || (type == G_LOCK_WRITE)) {
1252 * This is an abstraction violation: Normally we do
1253 * the sync wrappers around async functions with full
1254 * nested event contexts. However, this is used in
1255 * very hot code paths, so avoid the event context
1256 * creation for the good path where there's no lock
1257 * contention. My benchmark gave a factor of 2
1258 * improvement for lock/unlock.
1260 struct g_lock_lock_simple_state state = {
1261 .ctx = ctx,
1262 .me = messaging_server_id(ctx->msg),
1263 .type = type,
1264 .cb_fn = cb_fn,
1265 .cb_private = cb_private,
1267 status = dbwrap_do_locked(
1268 ctx->db, key, g_lock_lock_simple_fn, &state);
1269 if (!NT_STATUS_IS_OK(status)) {
1270 DBG_DEBUG("dbwrap_do_locked() failed: %s\n",
1271 nt_errstr(status));
1272 return status;
1275 DBG_DEBUG("status=%s, state.status=%s\n",
1276 nt_errstr(status),
1277 nt_errstr(state.status));
1279 if (NT_STATUS_IS_OK(state.status)) {
1280 if (ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
1281 const char *name = dbwrap_name(ctx->db);
1282 dbwrap_lock_order_lock(name, ctx->lock_order);
1284 return NT_STATUS_OK;
1286 if (NT_STATUS_EQUAL(state.status, NT_STATUS_WAS_UNLOCKED)) {
1287 /* without dbwrap_lock_order_lock() */
1288 return NT_STATUS_OK;
1290 if (!NT_STATUS_EQUAL(
1291 state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
1292 return state.status;
1295 if (timeval_is_zero(&timeout)) {
1296 return NT_STATUS_LOCK_NOT_GRANTED;
1300 * Fall back to the full g_lock_trylock logic,
1301 * g_lock_lock_simple_fn() called above only covers
1302 * the uncontended path.
1306 frame = talloc_stackframe();
1307 status = NT_STATUS_NO_MEMORY;
1309 ev = samba_tevent_context_init(frame);
1310 if (ev == NULL) {
1311 goto fail;
1313 req = g_lock_lock_send(frame, ev, ctx, key, type, cb_fn, cb_private);
1314 if (req == NULL) {
1315 goto fail;
1317 end = timeval_current_ofs(timeout.tv_sec, timeout.tv_usec);
1318 if (!tevent_req_set_endtime(req, ev, end)) {
1319 goto fail;
1321 if (!tevent_req_poll_ntstatus(req, ev, &status)) {
1322 goto fail;
1324 status = g_lock_lock_recv(req);
1325 fail:
1326 TALLOC_FREE(frame);
1327 return status;
1330 struct g_lock_unlock_state {
1331 struct server_id self;
1332 NTSTATUS status;
1335 static void g_lock_unlock_fn(
1336 struct db_record *rec,
1337 TDB_DATA value,
1338 void *private_data)
1340 struct g_lock_unlock_state *state = private_data;
1341 struct server_id_buf tmp1, tmp2;
1342 struct g_lock lck;
1343 size_t i;
1344 bool ok, exclusive;
1346 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1347 if (!ok) {
1348 DBG_DEBUG("g_lock_parse() failed\n");
1349 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1350 return;
1353 exclusive = server_id_equal(&state->self, &lck.exclusive);
1355 for (i=0; i<lck.num_shared; i++) {
1356 struct server_id shared;
1357 g_lock_get_shared(&lck, i, &shared);
1358 if (server_id_equal(&state->self, &shared)) {
1359 break;
1363 if (i < lck.num_shared) {
1364 if (exclusive) {
1365 DBG_DEBUG("%s both exclusive and shared (%zu)\n",
1366 server_id_str_buf(state->self, &tmp1),
1368 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1369 return;
1371 g_lock_del_shared(&lck, i);
1372 } else {
1373 if (!exclusive) {
1374 DBG_DEBUG("Lock not found, self=%s, lck.exclusive=%s, "
1375 "num_shared=%zu\n",
1376 server_id_str_buf(state->self, &tmp1),
1377 server_id_str_buf(lck.exclusive, &tmp2),
1378 lck.num_shared);
1379 state->status = NT_STATUS_NOT_FOUND;
1380 return;
1382 lck.exclusive = (struct server_id) { .pid = 0 };
1385 if ((lck.exclusive.pid == 0) &&
1386 (lck.num_shared == 0) &&
1387 (lck.datalen == 0)) {
1388 state->status = dbwrap_record_delete(rec);
1389 return;
1392 if (!exclusive && lck.exclusive.pid != 0) {
1394 * We only had a read lock and there's
1395 * someone waiting for an exclusive lock.
1397 * Don't alert the exclusive lock waiter
1398 * if there are still other read lock holders.
1400 g_lock_cleanup_shared(&lck);
1401 if (lck.num_shared != 0) {
1402 dbwrap_watched_watch_skip_alerting(rec);
1406 lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);
1408 state->status = g_lock_store(rec, &lck, NULL, NULL, 0);
1411 NTSTATUS g_lock_unlock(struct g_lock_ctx *ctx, TDB_DATA key)
1413 struct g_lock_unlock_state state = {
1414 .self = messaging_server_id(ctx->msg),
1416 NTSTATUS status;
1418 SMB_ASSERT(!ctx->busy);
1420 status = dbwrap_do_locked(ctx->db, key, g_lock_unlock_fn, &state);
1421 if (!NT_STATUS_IS_OK(status)) {
1422 DBG_WARNING("dbwrap_do_locked failed: %s\n",
1423 nt_errstr(status));
1424 return status;
1426 if (!NT_STATUS_IS_OK(state.status)) {
1427 DBG_WARNING("g_lock_unlock_fn failed: %s\n",
1428 nt_errstr(state.status));
1429 return state.status;
1432 if (ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
1433 const char *name = dbwrap_name(ctx->db);
1434 dbwrap_lock_order_unlock(name, ctx->lock_order);
1437 return NT_STATUS_OK;
1440 struct g_lock_writev_data_state {
1441 TDB_DATA key;
1442 struct server_id self;
1443 const TDB_DATA *dbufs;
1444 size_t num_dbufs;
1445 NTSTATUS status;
1448 static void g_lock_writev_data_fn(
1449 struct db_record *rec,
1450 TDB_DATA value,
1451 void *private_data)
1453 struct g_lock_writev_data_state *state = private_data;
1454 struct g_lock lck;
1455 bool exclusive;
1456 bool ok;
1459 * We're holding an exclusive write lock.
1461 * Now we're updating the content of the record.
1463 * We should not wakeup any other waiters, all they
1464 * would find is that we're still holding a lock they
1465 * are conflicting with.
1467 dbwrap_watched_watch_skip_alerting(rec);
1469 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1470 if (!ok) {
1471 DBG_DEBUG("g_lock_parse for %s failed\n",
1472 tdb_data_dbg(state->key));
1473 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1474 return;
1477 exclusive = server_id_equal(&state->self, &lck.exclusive);
1480 * Make sure we're really exclusive. We are marked as
1481 * exclusive when we are waiting for an exclusive lock
1483 exclusive &= (lck.num_shared == 0);
1485 if (!exclusive) {
1486 struct server_id_buf buf1, buf2;
1487 DBG_DEBUG("Not locked by us: self=%s, lck.exclusive=%s, "
1488 "lck.num_shared=%zu\n",
1489 server_id_str_buf(state->self, &buf1),
1490 server_id_str_buf(lck.exclusive, &buf2),
1491 lck.num_shared);
1492 state->status = NT_STATUS_NOT_LOCKED;
1493 return;
1496 lck.unique_data_epoch = generate_unique_u64(lck.unique_data_epoch);
1497 lck.data = NULL;
1498 lck.datalen = 0;
1499 state->status = g_lock_store(
1500 rec, &lck, NULL, state->dbufs, state->num_dbufs);
1503 NTSTATUS g_lock_writev_data(
1504 struct g_lock_ctx *ctx,
1505 TDB_DATA key,
1506 const TDB_DATA *dbufs,
1507 size_t num_dbufs)
1509 struct g_lock_writev_data_state state = {
1510 .key = key,
1511 .self = messaging_server_id(ctx->msg),
1512 .dbufs = dbufs,
1513 .num_dbufs = num_dbufs,
1515 NTSTATUS status;
1517 SMB_ASSERT(!ctx->busy);
1519 status = dbwrap_do_locked(
1520 ctx->db, key, g_lock_writev_data_fn, &state);
1521 if (!NT_STATUS_IS_OK(status)) {
1522 DBG_WARNING("dbwrap_do_locked failed: %s\n",
1523 nt_errstr(status));
1524 return status;
1526 if (!NT_STATUS_IS_OK(state.status)) {
1527 DBG_WARNING("g_lock_writev_data_fn failed: %s\n",
1528 nt_errstr(state.status));
1529 return state.status;
1532 return NT_STATUS_OK;
1535 NTSTATUS g_lock_write_data(struct g_lock_ctx *ctx, TDB_DATA key,
1536 const uint8_t *buf, size_t buflen)
1538 TDB_DATA dbuf = {
1539 .dptr = discard_const_p(uint8_t, buf),
1540 .dsize = buflen,
1542 return g_lock_writev_data(ctx, key, &dbuf, 1);
1545 struct g_lock_locks_state {
1546 int (*fn)(TDB_DATA key, void *private_data);
1547 void *private_data;
1550 static int g_lock_locks_fn(struct db_record *rec, void *priv)
1552 TDB_DATA key;
1553 struct g_lock_locks_state *state = (struct g_lock_locks_state *)priv;
1555 key = dbwrap_record_get_key(rec);
1556 return state->fn(key, state->private_data);
1559 int g_lock_locks(struct g_lock_ctx *ctx,
1560 int (*fn)(TDB_DATA key, void *private_data),
1561 void *private_data)
1563 struct g_lock_locks_state state;
1564 NTSTATUS status;
1565 int count;
1567 SMB_ASSERT(!ctx->busy);
1569 state.fn = fn;
1570 state.private_data = private_data;
1572 status = dbwrap_traverse_read(ctx->db, g_lock_locks_fn, &state, &count);
1573 if (!NT_STATUS_IS_OK(status)) {
1574 return -1;
1576 return count;
1579 struct g_lock_dump_state {
1580 TALLOC_CTX *mem_ctx;
1581 TDB_DATA key;
1582 void (*fn)(struct server_id exclusive,
1583 size_t num_shared,
1584 const struct server_id *shared,
1585 const uint8_t *data,
1586 size_t datalen,
1587 void *private_data);
1588 void *private_data;
1589 NTSTATUS status;
1590 enum dbwrap_req_state req_state;
1593 static void g_lock_dump_fn(TDB_DATA key, TDB_DATA data,
1594 void *private_data)
1596 struct g_lock_dump_state *state = private_data;
1597 struct g_lock lck = (struct g_lock) { .exclusive.pid = 0 };
1598 struct server_id *shared = NULL;
1599 size_t i;
1600 bool ok;
1602 ok = g_lock_parse(data.dptr, data.dsize, &lck);
1603 if (!ok) {
1604 DBG_DEBUG("g_lock_parse failed for %s\n",
1605 tdb_data_dbg(state->key));
1606 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1607 return;
1610 if (lck.num_shared > 0) {
1611 shared = talloc_array(
1612 state->mem_ctx, struct server_id, lck.num_shared);
1613 if (shared == NULL) {
1614 DBG_DEBUG("talloc failed\n");
1615 state->status = NT_STATUS_NO_MEMORY;
1616 return;
1620 for (i=0; i<lck.num_shared; i++) {
1621 g_lock_get_shared(&lck, i, &shared[i]);
1624 state->fn(lck.exclusive,
1625 lck.num_shared,
1626 shared,
1627 lck.data,
1628 lck.datalen,
1629 state->private_data);
1631 TALLOC_FREE(shared);
1633 state->status = NT_STATUS_OK;
1636 NTSTATUS g_lock_dump(struct g_lock_ctx *ctx, TDB_DATA key,
1637 void (*fn)(struct server_id exclusive,
1638 size_t num_shared,
1639 const struct server_id *shared,
1640 const uint8_t *data,
1641 size_t datalen,
1642 void *private_data),
1643 void *private_data)
1645 struct g_lock_dump_state state = {
1646 .mem_ctx = ctx, .key = key,
1647 .fn = fn, .private_data = private_data
1649 NTSTATUS status;
1651 SMB_ASSERT(!ctx->busy);
1653 status = dbwrap_parse_record(ctx->db, key, g_lock_dump_fn, &state);
1654 if (!NT_STATUS_IS_OK(status)) {
1655 DBG_DEBUG("dbwrap_parse_record returned %s\n",
1656 nt_errstr(status));
1657 return status;
1659 if (!NT_STATUS_IS_OK(state.status)) {
1660 DBG_DEBUG("g_lock_dump_fn returned %s\n",
1661 nt_errstr(state.status));
1662 return state.status;
1664 return NT_STATUS_OK;
1667 static void g_lock_dump_done(struct tevent_req *subreq);
1669 struct tevent_req *g_lock_dump_send(
1670 TALLOC_CTX *mem_ctx,
1671 struct tevent_context *ev,
1672 struct g_lock_ctx *ctx,
1673 TDB_DATA key,
1674 void (*fn)(struct server_id exclusive,
1675 size_t num_shared,
1676 const struct server_id *shared,
1677 const uint8_t *data,
1678 size_t datalen,
1679 void *private_data),
1680 void *private_data)
1682 struct tevent_req *req = NULL, *subreq = NULL;
1683 struct g_lock_dump_state *state = NULL;
1685 SMB_ASSERT(!ctx->busy);
1687 req = tevent_req_create(mem_ctx, &state, struct g_lock_dump_state);
1688 if (req == NULL) {
1689 return NULL;
1691 state->mem_ctx = state;
1692 state->key = key;
1693 state->fn = fn;
1694 state->private_data = private_data;
1696 SMB_ASSERT(!ctx->busy);
1698 subreq = dbwrap_parse_record_send(
1699 state,
1701 ctx->db,
1702 key,
1703 g_lock_dump_fn,
1704 state,
1705 &state->req_state);
1706 if (tevent_req_nomem(subreq, req)) {
1707 return tevent_req_post(req, ev);
1709 tevent_req_set_callback(subreq, g_lock_dump_done, req);
1710 return req;
1713 static void g_lock_dump_done(struct tevent_req *subreq)
1715 struct tevent_req *req = tevent_req_callback_data(
1716 subreq, struct tevent_req);
1717 struct g_lock_dump_state *state = tevent_req_data(
1718 req, struct g_lock_dump_state);
1719 NTSTATUS status;
1721 status = dbwrap_parse_record_recv(subreq);
1722 TALLOC_FREE(subreq);
1723 if (tevent_req_nterror(req, status) ||
1724 tevent_req_nterror(req, state->status)) {
1725 return;
1727 tevent_req_done(req);
1730 NTSTATUS g_lock_dump_recv(struct tevent_req *req)
1732 return tevent_req_simple_recv_ntstatus(req);
1735 int g_lock_seqnum(struct g_lock_ctx *ctx)
1737 return dbwrap_get_seqnum(ctx->db);
1740 struct g_lock_watch_data_state {
1741 struct tevent_context *ev;
1742 struct g_lock_ctx *ctx;
1743 TDB_DATA key;
1744 struct server_id blocker;
1745 bool blockerdead;
1746 uint64_t unique_lock_epoch;
1747 uint64_t unique_data_epoch;
1748 uint64_t watch_instance;
1749 NTSTATUS status;
1752 static void g_lock_watch_data_done(struct tevent_req *subreq);
1754 static void g_lock_watch_data_send_fn(
1755 struct db_record *rec,
1756 TDB_DATA value,
1757 void *private_data)
1759 struct tevent_req *req = talloc_get_type_abort(
1760 private_data, struct tevent_req);
1761 struct g_lock_watch_data_state *state = tevent_req_data(
1762 req, struct g_lock_watch_data_state);
1763 struct tevent_req *subreq = NULL;
1764 struct g_lock lck;
1765 bool ok;
1767 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1768 if (!ok) {
1769 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1770 return;
1772 state->unique_lock_epoch = lck.unique_lock_epoch;
1773 state->unique_data_epoch = lck.unique_data_epoch;
1775 DBG_DEBUG("state->unique_data_epoch=%"PRIu64"\n", state->unique_data_epoch);
1777 subreq = dbwrap_watched_watch_send(
1778 state, state->ev, rec, 0, state->blocker);
1779 if (subreq == NULL) {
1780 state->status = NT_STATUS_NO_MEMORY;
1781 return;
1783 tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1785 state->status = NT_STATUS_EVENT_PENDING;
1788 struct tevent_req *g_lock_watch_data_send(
1789 TALLOC_CTX *mem_ctx,
1790 struct tevent_context *ev,
1791 struct g_lock_ctx *ctx,
1792 TDB_DATA key,
1793 struct server_id blocker)
1795 struct tevent_req *req = NULL;
1796 struct g_lock_watch_data_state *state = NULL;
1797 NTSTATUS status;
1799 SMB_ASSERT(!ctx->busy);
1801 req = tevent_req_create(
1802 mem_ctx, &state, struct g_lock_watch_data_state);
1803 if (req == NULL) {
1804 return NULL;
1806 state->ev = ev;
1807 state->ctx = ctx;
1808 state->blocker = blocker;
1810 state->key = tdb_data_talloc_copy(state, key);
1811 if (tevent_req_nomem(state->key.dptr, req)) {
1812 return tevent_req_post(req, ev);
1815 status = dbwrap_do_locked(
1816 ctx->db, key, g_lock_watch_data_send_fn, req);
1817 if (tevent_req_nterror(req, status)) {
1818 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1819 return tevent_req_post(req, ev);
1822 if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
1823 return req;
1825 if (tevent_req_nterror(req, state->status)) {
1826 return tevent_req_post(req, ev);
1828 tevent_req_done(req);
1829 return tevent_req_post(req, ev);
1832 static void g_lock_watch_data_done_fn(
1833 struct db_record *rec,
1834 TDB_DATA value,
1835 void *private_data)
1837 struct tevent_req *req = talloc_get_type_abort(
1838 private_data, struct tevent_req);
1839 struct g_lock_watch_data_state *state = tevent_req_data(
1840 req, struct g_lock_watch_data_state);
1841 struct tevent_req *subreq = NULL;
1842 struct g_lock lck;
1843 bool ok;
1845 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1846 if (!ok) {
1847 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1848 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1849 return;
1852 if (lck.unique_data_epoch != state->unique_data_epoch) {
1853 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1854 DBG_DEBUG("lck.unique_data_epoch=%"PRIu64", "
1855 "state->unique_data_epoch=%"PRIu64"\n",
1856 lck.unique_data_epoch,
1857 state->unique_data_epoch);
1858 state->status = NT_STATUS_OK;
1859 return;
1863 * The lock epoch changed, so we better
1864 * remove ourself from the waiter list
1865 * (most likely the first position)
1866 * and re-add us at the end of the list.
1868 * This gives other lock waiters a change
1869 * to make progress.
1871 * Otherwise we'll keep our waiter instance alive,
1872 * keep waiting (most likely at first position).
1874 if (lck.unique_lock_epoch != state->unique_lock_epoch) {
1875 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1876 state->watch_instance = dbwrap_watched_watch_add_instance(rec);
1877 state->unique_lock_epoch = lck.unique_lock_epoch;
1880 subreq = dbwrap_watched_watch_send(
1881 state, state->ev, rec, state->watch_instance, state->blocker);
1882 if (subreq == NULL) {
1883 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1884 state->status = NT_STATUS_NO_MEMORY;
1885 return;
1887 tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1889 state->status = NT_STATUS_EVENT_PENDING;
1892 static void g_lock_watch_data_done(struct tevent_req *subreq)
1894 struct tevent_req *req = tevent_req_callback_data(
1895 subreq, struct tevent_req);
1896 struct g_lock_watch_data_state *state = tevent_req_data(
1897 req, struct g_lock_watch_data_state);
1898 NTSTATUS status;
1899 uint64_t instance = 0;
1901 status = dbwrap_watched_watch_recv(
1902 subreq, &instance, &state->blockerdead, &state->blocker);
1903 TALLOC_FREE(subreq);
1904 if (tevent_req_nterror(req, status)) {
1905 DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
1906 nt_errstr(status));
1907 return;
1910 state->watch_instance = instance;
1912 status = dbwrap_do_locked(
1913 state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
1914 if (tevent_req_nterror(req, status)) {
1915 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1916 return;
1918 if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
1919 return;
1921 if (tevent_req_nterror(req, state->status)) {
1922 return;
1924 tevent_req_done(req);
1927 NTSTATUS g_lock_watch_data_recv(
1928 struct tevent_req *req,
1929 bool *blockerdead,
1930 struct server_id *blocker)
1932 struct g_lock_watch_data_state *state = tevent_req_data(
1933 req, struct g_lock_watch_data_state);
1934 NTSTATUS status;
1936 if (tevent_req_is_nterror(req, &status)) {
1937 return status;
1939 if (blockerdead != NULL) {
1940 *blockerdead = state->blockerdead;
1942 if (blocker != NULL) {
1943 *blocker = state->blocker;
1946 return NT_STATUS_OK;
1949 static void g_lock_wake_watchers_fn(
1950 struct db_record *rec,
1951 TDB_DATA value,
1952 void *private_data)
1954 struct g_lock lck = { .exclusive.pid = 0 };
1955 NTSTATUS status;
1956 bool ok;
1958 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1959 if (!ok) {
1960 DBG_WARNING("g_lock_parse failed\n");
1961 return;
1964 lck.unique_data_epoch = generate_unique_u64(lck.unique_data_epoch);
1966 status = g_lock_store(rec, &lck, NULL, NULL, 0);
1967 if (!NT_STATUS_IS_OK(status)) {
1968 DBG_WARNING("g_lock_store failed: %s\n", nt_errstr(status));
1969 return;
1973 void g_lock_wake_watchers(struct g_lock_ctx *ctx, TDB_DATA key)
1975 NTSTATUS status;
1977 SMB_ASSERT(!ctx->busy);
1979 status = dbwrap_do_locked(ctx->db, key, g_lock_wake_watchers_fn, NULL);
1980 if (!NT_STATUS_IS_OK(status)) {
1981 DBG_DEBUG("dbwrap_do_locked returned %s\n",
1982 nt_errstr(status));