/*
   Unix SMB/CIFS implementation.
   global locks based on dbwrap and messaging
   Copyright (C) 2009 by Volker Lendecke

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
#include "system/filesys.h"
#include "lib/util/server_id.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/samba_util.h"
#include "lib/util_path.h"
#include "dbwrap/dbwrap.h"
#include "dbwrap/dbwrap_open.h"
#include "dbwrap/dbwrap_watch.h"
#include "../lib/util/tevent_ntstatus.h"
37 struct db_context
*db
;
38 struct messaging_context
*msg
;
39 enum dbwrap_lock_order lock_order
;
43 struct server_id exclusive
;
46 uint64_t unique_data_epoch
;
51 static bool g_lock_parse(uint8_t *buf
, size_t buflen
, struct g_lock
*lck
)
53 struct server_id exclusive
;
54 size_t num_shared
, shared_len
;
55 uint64_t unique_data_epoch
;
57 if (buflen
< (SERVER_ID_BUF_LENGTH
+ /* exclusive */
58 sizeof(uint64_t) + /* seqnum */
59 sizeof(uint32_t))) { /* num_shared */
62 .unique_data_epoch
= generate_unique_u64(0),
68 server_id_get(&exclusive
, buf
);
69 buf
+= SERVER_ID_BUF_LENGTH
;
70 buflen
-= SERVER_ID_BUF_LENGTH
;
72 unique_data_epoch
= BVAL(buf
, 0);
73 buf
+= sizeof(uint64_t);
74 buflen
-= sizeof(uint64_t);
76 num_shared
= IVAL(buf
, 0);
77 buf
+= sizeof(uint32_t);
78 buflen
-= sizeof(uint32_t);
80 if (num_shared
> buflen
/SERVER_ID_BUF_LENGTH
) {
81 DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
87 shared_len
= num_shared
* SERVER_ID_BUF_LENGTH
;
89 *lck
= (struct g_lock
) {
90 .exclusive
= exclusive
,
91 .num_shared
= num_shared
,
93 .unique_data_epoch
= unique_data_epoch
,
94 .datalen
= buflen
-shared_len
,
95 .data
= buf
+shared_len
,
101 static void g_lock_get_shared(const struct g_lock
*lck
,
103 struct server_id
*shared
)
105 if (i
>= lck
->num_shared
) {
108 server_id_get(shared
, lck
->shared
+ i
*SERVER_ID_BUF_LENGTH
);
111 static void g_lock_del_shared(struct g_lock
*lck
, size_t i
)
113 if (i
>= lck
->num_shared
) {
116 lck
->num_shared
-= 1;
117 if (i
< lck
->num_shared
) {
118 memcpy(lck
->shared
+ i
*SERVER_ID_BUF_LENGTH
,
119 lck
->shared
+ lck
->num_shared
*SERVER_ID_BUF_LENGTH
,
120 SERVER_ID_BUF_LENGTH
);
124 static NTSTATUS
g_lock_store(
125 struct db_record
*rec
,
127 struct server_id
*new_shared
,
128 const TDB_DATA
*new_dbufs
,
129 size_t num_new_dbufs
)
131 uint8_t exclusive
[SERVER_ID_BUF_LENGTH
];
132 uint8_t seqnum_buf
[sizeof(uint64_t)];
133 uint8_t sizebuf
[sizeof(uint32_t)];
134 uint8_t new_shared_buf
[SERVER_ID_BUF_LENGTH
];
136 struct TDB_DATA dbufs
[6 + num_new_dbufs
];
138 dbufs
[0] = (TDB_DATA
) {
139 .dptr
= exclusive
, .dsize
= sizeof(exclusive
),
141 dbufs
[1] = (TDB_DATA
) {
142 .dptr
= seqnum_buf
, .dsize
= sizeof(seqnum_buf
),
144 dbufs
[2] = (TDB_DATA
) {
145 .dptr
= sizebuf
, .dsize
= sizeof(sizebuf
),
147 dbufs
[3] = (TDB_DATA
) {
149 .dsize
= lck
->num_shared
* SERVER_ID_BUF_LENGTH
,
151 dbufs
[4] = (TDB_DATA
) { 0 };
152 dbufs
[5] = (TDB_DATA
) {
153 .dptr
= lck
->data
, .dsize
= lck
->datalen
,
156 if (num_new_dbufs
!= 0) {
159 num_new_dbufs
* sizeof(TDB_DATA
));
162 server_id_put(exclusive
, lck
->exclusive
);
163 SBVAL(seqnum_buf
, 0, lck
->unique_data_epoch
);
165 if (new_shared
!= NULL
) {
166 if (lck
->num_shared
>= UINT32_MAX
) {
167 return NT_STATUS_BUFFER_OVERFLOW
;
170 server_id_put(new_shared_buf
, *new_shared
);
172 dbufs
[4] = (TDB_DATA
) {
173 .dptr
= new_shared_buf
,
174 .dsize
= sizeof(new_shared_buf
),
177 lck
->num_shared
+= 1;
180 SIVAL(sizebuf
, 0, lck
->num_shared
);
182 return dbwrap_record_storev(rec
, dbufs
, ARRAY_SIZE(dbufs
), 0);
185 struct g_lock_ctx
*g_lock_ctx_init_backend(
187 struct messaging_context
*msg
,
188 struct db_context
**backend
)
190 struct g_lock_ctx
*result
;
192 result
= talloc(mem_ctx
, struct g_lock_ctx
);
193 if (result
== NULL
) {
197 result
->lock_order
= DBWRAP_LOCK_ORDER_NONE
;
199 result
->db
= db_open_watched(result
, backend
, msg
);
200 if (result
->db
== NULL
) {
201 DBG_WARNING("db_open_watched failed\n");
208 void g_lock_set_lock_order(struct g_lock_ctx
*ctx
,
209 enum dbwrap_lock_order lock_order
)
211 ctx
->lock_order
= lock_order
;
214 struct g_lock_ctx
*g_lock_ctx_init(TALLOC_CTX
*mem_ctx
,
215 struct messaging_context
*msg
)
217 char *db_path
= NULL
;
218 struct db_context
*backend
= NULL
;
219 struct g_lock_ctx
*ctx
= NULL
;
221 db_path
= lock_path(mem_ctx
, "g_lock.tdb");
222 if (db_path
== NULL
) {
230 TDB_CLEAR_IF_FIRST
|TDB_INCOMPATIBLE_HASH
,
235 TALLOC_FREE(db_path
);
236 if (backend
== NULL
) {
237 DBG_WARNING("Could not open g_lock.tdb\n");
241 ctx
= g_lock_ctx_init_backend(mem_ctx
, msg
, &backend
);
245 static NTSTATUS
g_lock_cleanup_dead(
246 struct db_record
*rec
,
248 struct server_id
*dead_blocker
)
250 bool modified
= false;
252 NTSTATUS status
= NT_STATUS_OK
;
253 struct server_id_buf tmp
;
255 if (dead_blocker
== NULL
) {
259 exclusive_died
= server_id_equal(dead_blocker
, &lck
->exclusive
);
261 if (exclusive_died
) {
262 DBG_DEBUG("Exclusive holder %s died\n",
263 server_id_str_buf(lck
->exclusive
, &tmp
));
264 lck
->exclusive
.pid
= 0;
268 if (lck
->num_shared
!= 0) {
270 struct server_id shared
;
272 g_lock_get_shared(lck
, 0, &shared
);
273 shared_died
= server_id_equal(dead_blocker
, &shared
);
276 DBG_DEBUG("Shared holder %s died\n",
277 server_id_str_buf(shared
, &tmp
));
278 g_lock_del_shared(lck
, 0);
284 status
= g_lock_store(rec
, lck
, NULL
, NULL
, 0);
285 if (!NT_STATUS_IS_OK(status
)) {
286 DBG_DEBUG("g_lock_store() failed: %s\n",
294 static ssize_t
g_lock_find_shared(
296 const struct server_id
*self
)
300 for (i
=0; i
<lck
->num_shared
; i
++) {
301 struct server_id shared
;
304 g_lock_get_shared(lck
, i
, &shared
);
306 same
= server_id_equal(self
, &shared
);
315 static void g_lock_cleanup_shared(struct g_lock
*lck
)
318 struct server_id check
;
321 if (lck
->num_shared
== 0) {
326 * Read locks can stay around forever if the process dies. Do
327 * a heuristic check for process existence: Check one random
328 * process for existence. Hopefully this will keep runaway
329 * read locks under control.
331 i
= generate_random() % lck
->num_shared
;
332 g_lock_get_shared(lck
, i
, &check
);
334 exists
= serverid_exists(&check
);
336 struct server_id_buf tmp
;
337 DBG_DEBUG("Shared locker %s died -- removing\n",
338 server_id_str_buf(check
, &tmp
));
339 g_lock_del_shared(lck
, i
);
343 struct g_lock_lock_state
{
344 struct tevent_context
*ev
;
345 struct g_lock_ctx
*ctx
;
347 enum g_lock_type type
;
351 struct g_lock_lock_fn_state
{
352 struct g_lock_lock_state
*req_state
;
353 struct server_id
*dead_blocker
;
355 struct tevent_req
*watch_req
;
359 static int g_lock_lock_state_destructor(struct g_lock_lock_state
*s
);
361 static NTSTATUS
g_lock_trylock(
362 struct db_record
*rec
,
363 struct g_lock_lock_fn_state
*state
,
365 struct server_id
*blocker
)
367 struct g_lock_lock_state
*req_state
= state
->req_state
;
368 struct server_id self
= messaging_server_id(req_state
->ctx
->msg
);
369 enum g_lock_type type
= req_state
->type
;
370 bool retry
= req_state
->retry
;
371 struct g_lock lck
= { .exclusive
.pid
= 0 };
372 struct server_id_buf tmp
;
376 ok
= g_lock_parse(data
.dptr
, data
.dsize
, &lck
);
378 DBG_DEBUG("g_lock_parse failed\n");
379 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
382 status
= g_lock_cleanup_dead(rec
, &lck
, state
->dead_blocker
);
383 if (!NT_STATUS_IS_OK(status
)) {
384 DBG_DEBUG("g_lock_cleanup_dead() failed: %s\n",
389 if (lck
.exclusive
.pid
!= 0) {
390 bool self_exclusive
= server_id_equal(&self
, &lck
.exclusive
);
392 if (!self_exclusive
) {
393 bool exists
= serverid_exists(&lck
.exclusive
);
395 lck
.exclusive
= (struct server_id
) { .pid
=0 };
399 DBG_DEBUG("%s has an exclusive lock\n",
400 server_id_str_buf(lck
.exclusive
, &tmp
));
402 if (type
== G_LOCK_DOWNGRADE
) {
403 struct server_id_buf tmp2
;
404 DBG_DEBUG("%s: Trying to downgrade %s\n",
405 server_id_str_buf(self
, &tmp
),
407 lck
.exclusive
, &tmp2
));
408 return NT_STATUS_NOT_LOCKED
;
411 if (type
== G_LOCK_UPGRADE
) {
413 shared_idx
= g_lock_find_shared(&lck
, &self
);
415 if (shared_idx
== -1) {
416 DBG_DEBUG("Trying to upgrade %s "
418 "existing shared lock\n",
421 return NT_STATUS_NOT_LOCKED
;
425 * We're trying to upgrade, and the
426 * exlusive lock is taken by someone
427 * else. This means that someone else
428 * is waiting for us to give up our
429 * shared lock. If we now also wait
430 * for someone to give their shared
431 * lock, we will deadlock.
434 DBG_DEBUG("Trying to upgrade %s while "
435 "someone else is also "
436 "trying to upgrade\n",
437 server_id_str_buf(self
, &tmp
));
438 return NT_STATUS_POSSIBLE_DEADLOCK
;
441 DBG_DEBUG("Waiting for lck.exclusive=%s\n",
442 server_id_str_buf(lck
.exclusive
, &tmp
));
444 *blocker
= lck
.exclusive
;
445 return NT_STATUS_LOCK_NOT_GRANTED
;
448 if (type
== G_LOCK_DOWNGRADE
) {
449 DBG_DEBUG("Downgrading %s from WRITE to READ\n",
450 server_id_str_buf(self
, &tmp
));
452 lck
.exclusive
= (struct server_id
) { .pid
= 0 };
457 DBG_DEBUG("%s already locked by self\n",
458 server_id_str_buf(self
, &tmp
));
459 return NT_STATUS_WAS_LOCKED
;
462 if (lck
.num_shared
!= 0) {
463 g_lock_get_shared(&lck
, 0, blocker
);
465 DBG_DEBUG("Continue waiting for shared lock %s\n",
466 server_id_str_buf(*blocker
, &tmp
));
468 return NT_STATUS_LOCK_NOT_GRANTED
;
471 talloc_set_destructor(req_state
, NULL
);
474 * Retry after a conflicting lock was released
481 if (type
== G_LOCK_UPGRADE
) {
482 ssize_t shared_idx
= g_lock_find_shared(&lck
, &self
);
484 if (shared_idx
== -1) {
485 DBG_DEBUG("Trying to upgrade %s without "
486 "existing shared lock\n",
487 server_id_str_buf(self
, &tmp
));
488 return NT_STATUS_NOT_LOCKED
;
491 g_lock_del_shared(&lck
, shared_idx
);
495 if (type
== G_LOCK_WRITE
) {
496 ssize_t shared_idx
= g_lock_find_shared(&lck
, &self
);
498 if (shared_idx
!= -1) {
499 DBG_DEBUG("Trying to writelock existing shared %s\n",
500 server_id_str_buf(self
, &tmp
));
501 return NT_STATUS_WAS_LOCKED
;
504 lck
.exclusive
= self
;
506 status
= g_lock_store(rec
, &lck
, NULL
, NULL
, 0);
507 if (!NT_STATUS_IS_OK(status
)) {
508 DBG_DEBUG("g_lock_store() failed: %s\n",
513 if (lck
.num_shared
!= 0) {
514 talloc_set_destructor(
515 req_state
, g_lock_lock_state_destructor
);
517 g_lock_get_shared(&lck
, 0, blocker
);
519 DBG_DEBUG("Waiting for %zu shared locks, "
520 "picking blocker %s\n",
522 server_id_str_buf(*blocker
, &tmp
));
524 return NT_STATUS_LOCK_NOT_GRANTED
;
527 talloc_set_destructor(req_state
, NULL
);
534 if (lck
.num_shared
== 0) {
535 status
= g_lock_store(rec
, &lck
, &self
, NULL
, 0);
536 if (!NT_STATUS_IS_OK(status
)) {
537 DBG_DEBUG("g_lock_store() failed: %s\n",
544 g_lock_cleanup_shared(&lck
);
546 status
= g_lock_store(rec
, &lck
, &self
, NULL
, 0);
547 if (!NT_STATUS_IS_OK(status
)) {
548 DBG_DEBUG("g_lock_store() failed: %s\n",
556 static void g_lock_lock_fn(
557 struct db_record
*rec
,
561 struct g_lock_lock_fn_state
*state
= private_data
;
562 struct server_id blocker
= {0};
564 state
->status
= g_lock_trylock(rec
, state
, value
, &blocker
);
565 if (!NT_STATUS_IS_OK(state
->status
)) {
566 DBG_DEBUG("g_lock_trylock returned %s\n",
567 nt_errstr(state
->status
));
569 if (!NT_STATUS_EQUAL(state
->status
, NT_STATUS_LOCK_NOT_GRANTED
)) {
573 state
->watch_req
= dbwrap_watched_watch_send(
574 state
->req_state
, state
->req_state
->ev
, rec
, blocker
);
575 if (state
->watch_req
== NULL
) {
576 state
->status
= NT_STATUS_NO_MEMORY
;
580 static int g_lock_lock_state_destructor(struct g_lock_lock_state
*s
)
582 NTSTATUS status
= g_lock_unlock(s
->ctx
, s
->key
);
583 if (!NT_STATUS_IS_OK(status
)) {
584 DBG_DEBUG("g_lock_unlock failed: %s\n", nt_errstr(status
));
589 static void g_lock_lock_retry(struct tevent_req
*subreq
);
591 struct tevent_req
*g_lock_lock_send(TALLOC_CTX
*mem_ctx
,
592 struct tevent_context
*ev
,
593 struct g_lock_ctx
*ctx
,
595 enum g_lock_type type
)
597 struct tevent_req
*req
;
598 struct g_lock_lock_state
*state
;
599 struct g_lock_lock_fn_state fn_state
;
603 req
= tevent_req_create(mem_ctx
, &state
, struct g_lock_lock_state
);
612 fn_state
= (struct g_lock_lock_fn_state
) {
616 status
= dbwrap_do_locked(ctx
->db
, key
, g_lock_lock_fn
, &fn_state
);
617 if (tevent_req_nterror(req
, status
)) {
618 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
620 return tevent_req_post(req
, ev
);
623 if (NT_STATUS_IS_OK(fn_state
.status
)) {
624 tevent_req_done(req
);
625 return tevent_req_post(req
, ev
);
627 if (!NT_STATUS_EQUAL(fn_state
.status
, NT_STATUS_LOCK_NOT_GRANTED
)) {
628 tevent_req_nterror(req
, fn_state
.status
);
629 return tevent_req_post(req
, ev
);
632 if (tevent_req_nomem(fn_state
.watch_req
, req
)) {
633 return tevent_req_post(req
, ev
);
636 ok
= tevent_req_set_endtime(
639 timeval_current_ofs(5 + generate_random() % 5, 0));
642 return tevent_req_post(req
, ev
);
644 tevent_req_set_callback(fn_state
.watch_req
, g_lock_lock_retry
, req
);
649 static void g_lock_lock_retry(struct tevent_req
*subreq
)
651 struct tevent_req
*req
= tevent_req_callback_data(
652 subreq
, struct tevent_req
);
653 struct g_lock_lock_state
*state
= tevent_req_data(
654 req
, struct g_lock_lock_state
);
655 struct g_lock_lock_fn_state fn_state
;
656 struct server_id blocker
= { .pid
= 0 };
657 bool blockerdead
= false;
660 status
= dbwrap_watched_watch_recv(subreq
, &blockerdead
, &blocker
);
661 DBG_DEBUG("watch_recv returned %s\n", nt_errstr(status
));
664 if (!NT_STATUS_IS_OK(status
) &&
665 !NT_STATUS_EQUAL(status
, NT_STATUS_IO_TIMEOUT
)) {
666 tevent_req_nterror(req
, status
);
672 fn_state
= (struct g_lock_lock_fn_state
) {
674 .dead_blocker
= blockerdead
? &blocker
: NULL
,
677 status
= dbwrap_do_locked(state
->ctx
->db
, state
->key
,
678 g_lock_lock_fn
, &fn_state
);
679 if (tevent_req_nterror(req
, status
)) {
680 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
685 if (NT_STATUS_IS_OK(fn_state
.status
)) {
686 tevent_req_done(req
);
689 if (!NT_STATUS_EQUAL(fn_state
.status
, NT_STATUS_LOCK_NOT_GRANTED
)) {
690 tevent_req_nterror(req
, fn_state
.status
);
694 if (tevent_req_nomem(fn_state
.watch_req
, req
)) {
698 if (!tevent_req_set_endtime(
699 fn_state
.watch_req
, state
->ev
,
700 timeval_current_ofs(5 + generate_random() % 5, 0))) {
703 tevent_req_set_callback(fn_state
.watch_req
, g_lock_lock_retry
, req
);
706 NTSTATUS
g_lock_lock_recv(struct tevent_req
*req
)
708 struct g_lock_lock_state
*state
= tevent_req_data(
709 req
, struct g_lock_lock_state
);
710 struct g_lock_ctx
*ctx
= state
->ctx
;
713 if (tevent_req_is_nterror(req
, &status
)) {
717 if ((ctx
->lock_order
!= DBWRAP_LOCK_ORDER_NONE
) &&
718 ((state
->type
== G_LOCK_READ
) ||
719 (state
->type
== G_LOCK_WRITE
))) {
720 const char *name
= dbwrap_name(ctx
->db
);
721 dbwrap_lock_order_lock(name
, ctx
->lock_order
);
727 struct g_lock_lock_simple_state
{
729 enum g_lock_type type
;
733 static void g_lock_lock_simple_fn(
734 struct db_record
*rec
,
738 struct g_lock_lock_simple_state
*state
= private_data
;
739 struct server_id_buf buf
;
740 struct g_lock lck
= { .exclusive
.pid
= 0 };
743 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
745 DBG_DEBUG("g_lock_parse failed\n");
746 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
750 if (lck
.exclusive
.pid
!= 0) {
751 DBG_DEBUG("locked by %s\n",
752 server_id_str_buf(lck
.exclusive
, &buf
));
756 if (state
->type
== G_LOCK_WRITE
) {
757 if (lck
.num_shared
!= 0) {
758 DBG_DEBUG("num_shared=%zu\n", lck
.num_shared
);
761 lck
.exclusive
= state
->me
;
762 state
->status
= g_lock_store(rec
, &lck
, NULL
, NULL
, 0);
766 if (state
->type
== G_LOCK_READ
) {
767 g_lock_cleanup_shared(&lck
);
768 state
->status
= g_lock_store(rec
, &lck
, &state
->me
, NULL
, 0);
773 state
->status
= NT_STATUS_LOCK_NOT_GRANTED
;
776 NTSTATUS
g_lock_lock(struct g_lock_ctx
*ctx
, TDB_DATA key
,
777 enum g_lock_type type
, struct timeval timeout
)
780 struct tevent_context
*ev
;
781 struct tevent_req
*req
;
785 if ((type
== G_LOCK_READ
) || (type
== G_LOCK_WRITE
)) {
787 * This is an abstraction violation: Normally we do
788 * the sync wrappers around async functions with full
789 * nested event contexts. However, this is used in
790 * very hot code paths, so avoid the event context
791 * creation for the good path where there's no lock
792 * contention. My benchmark gave a factor of 2
793 * improvement for lock/unlock.
795 struct g_lock_lock_simple_state state
= {
796 .me
= messaging_server_id(ctx
->msg
),
799 status
= dbwrap_do_locked(
800 ctx
->db
, key
, g_lock_lock_simple_fn
, &state
);
801 if (!NT_STATUS_IS_OK(status
)) {
802 DBG_DEBUG("dbwrap_do_locked() failed: %s\n",
807 DBG_DEBUG("status=%s, state.status=%s\n",
809 nt_errstr(state
.status
));
811 if (NT_STATUS_IS_OK(state
.status
)) {
812 if (ctx
->lock_order
!= DBWRAP_LOCK_ORDER_NONE
) {
813 const char *name
= dbwrap_name(ctx
->db
);
814 dbwrap_lock_order_lock(name
, ctx
->lock_order
);
818 if (!NT_STATUS_EQUAL(
819 state
.status
, NT_STATUS_LOCK_NOT_GRANTED
)) {
824 * Fall back to the full g_lock_trylock logic,
825 * g_lock_lock_simple_fn() called above only covers
826 * the uncontended path.
830 frame
= talloc_stackframe();
831 status
= NT_STATUS_NO_MEMORY
;
833 ev
= samba_tevent_context_init(frame
);
837 req
= g_lock_lock_send(frame
, ev
, ctx
, key
, type
);
841 end
= timeval_current_ofs(timeout
.tv_sec
, timeout
.tv_usec
);
842 if (!tevent_req_set_endtime(req
, ev
, end
)) {
845 if (!tevent_req_poll_ntstatus(req
, ev
, &status
)) {
848 status
= g_lock_lock_recv(req
);
854 struct g_lock_unlock_state
{
855 struct server_id self
;
859 static void g_lock_unlock_fn(
860 struct db_record
*rec
,
864 struct g_lock_unlock_state
*state
= private_data
;
865 struct server_id_buf tmp1
, tmp2
;
870 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
872 DBG_DEBUG("g_lock_parse() failed\n");
873 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
877 exclusive
= server_id_equal(&state
->self
, &lck
.exclusive
);
879 for (i
=0; i
<lck
.num_shared
; i
++) {
880 struct server_id shared
;
881 g_lock_get_shared(&lck
, i
, &shared
);
882 if (server_id_equal(&state
->self
, &shared
)) {
887 if (i
< lck
.num_shared
) {
889 DBG_DEBUG("%s both exclusive and shared (%zu)\n",
890 server_id_str_buf(state
->self
, &tmp1
),
892 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
895 g_lock_del_shared(&lck
, i
);
898 DBG_DEBUG("Lock not found, self=%s, lck.exclusive=%s, "
900 server_id_str_buf(state
->self
, &tmp1
),
901 server_id_str_buf(lck
.exclusive
, &tmp2
),
903 state
->status
= NT_STATUS_NOT_FOUND
;
906 lck
.exclusive
= (struct server_id
) { .pid
= 0 };
909 if ((lck
.exclusive
.pid
== 0) &&
910 (lck
.num_shared
== 0) &&
911 (lck
.datalen
== 0)) {
912 state
->status
= dbwrap_record_delete(rec
);
916 state
->status
= g_lock_store(rec
, &lck
, NULL
, NULL
, 0);
919 NTSTATUS
g_lock_unlock(struct g_lock_ctx
*ctx
, TDB_DATA key
)
921 struct g_lock_unlock_state state
= {
922 .self
= messaging_server_id(ctx
->msg
),
926 status
= dbwrap_do_locked(ctx
->db
, key
, g_lock_unlock_fn
, &state
);
927 if (!NT_STATUS_IS_OK(status
)) {
928 DBG_WARNING("dbwrap_do_locked failed: %s\n",
932 if (!NT_STATUS_IS_OK(state
.status
)) {
933 DBG_WARNING("g_lock_unlock_fn failed: %s\n",
934 nt_errstr(state
.status
));
938 if (ctx
->lock_order
!= DBWRAP_LOCK_ORDER_NONE
) {
939 const char *name
= dbwrap_name(ctx
->db
);
940 dbwrap_lock_order_unlock(name
, ctx
->lock_order
);
946 struct g_lock_writev_data_state
{
948 struct server_id self
;
949 const TDB_DATA
*dbufs
;
954 static void g_lock_writev_data_fn(
955 struct db_record
*rec
,
959 struct g_lock_writev_data_state
*state
= private_data
;
964 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
966 DBG_DEBUG("g_lock_parse for %s failed\n",
967 hex_encode_talloc(talloc_tos(),
970 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
974 exclusive
= server_id_equal(&state
->self
, &lck
.exclusive
);
977 * Make sure we're really exclusive. We are marked as
978 * exclusive when we are waiting for an exclusive lock
980 exclusive
&= (lck
.num_shared
== 0);
983 struct server_id_buf buf1
, buf2
;
984 DBG_DEBUG("Not locked by us: self=%s, lck.exclusive=%s, "
985 "lck.num_shared=%zu\n",
986 server_id_str_buf(state
->self
, &buf1
),
987 server_id_str_buf(lck
.exclusive
, &buf2
),
989 state
->status
= NT_STATUS_NOT_LOCKED
;
993 lck
.unique_data_epoch
= generate_unique_u64(lck
.unique_data_epoch
);
996 state
->status
= g_lock_store(
997 rec
, &lck
, NULL
, state
->dbufs
, state
->num_dbufs
);
1000 NTSTATUS
g_lock_writev_data(
1001 struct g_lock_ctx
*ctx
,
1003 const TDB_DATA
*dbufs
,
1006 struct g_lock_writev_data_state state
= {
1008 .self
= messaging_server_id(ctx
->msg
),
1010 .num_dbufs
= num_dbufs
,
1014 status
= dbwrap_do_locked(
1015 ctx
->db
, key
, g_lock_writev_data_fn
, &state
);
1016 if (!NT_STATUS_IS_OK(status
)) {
1017 DBG_WARNING("dbwrap_do_locked failed: %s\n",
1021 if (!NT_STATUS_IS_OK(state
.status
)) {
1022 DBG_WARNING("g_lock_writev_data_fn failed: %s\n",
1023 nt_errstr(state
.status
));
1024 return state
.status
;
1027 return NT_STATUS_OK
;
1030 NTSTATUS
g_lock_write_data(struct g_lock_ctx
*ctx
, TDB_DATA key
,
1031 const uint8_t *buf
, size_t buflen
)
1034 .dptr
= discard_const_p(uint8_t, buf
),
1037 return g_lock_writev_data(ctx
, key
, &dbuf
, 1);
1040 struct g_lock_locks_state
{
1041 int (*fn
)(TDB_DATA key
, void *private_data
);
1045 static int g_lock_locks_fn(struct db_record
*rec
, void *priv
)
1048 struct g_lock_locks_state
*state
= (struct g_lock_locks_state
*)priv
;
1050 key
= dbwrap_record_get_key(rec
);
1051 return state
->fn(key
, state
->private_data
);
1054 int g_lock_locks(struct g_lock_ctx
*ctx
,
1055 int (*fn
)(TDB_DATA key
, void *private_data
),
1058 struct g_lock_locks_state state
;
1063 state
.private_data
= private_data
;
1065 status
= dbwrap_traverse_read(ctx
->db
, g_lock_locks_fn
, &state
, &count
);
1066 if (!NT_STATUS_IS_OK(status
)) {
1072 struct g_lock_dump_state
{
1073 TALLOC_CTX
*mem_ctx
;
1075 void (*fn
)(struct server_id exclusive
,
1077 struct server_id
*shared
,
1078 const uint8_t *data
,
1080 void *private_data
);
1083 enum dbwrap_req_state req_state
;
1086 static void g_lock_dump_fn(TDB_DATA key
, TDB_DATA data
,
1089 struct g_lock_dump_state
*state
= private_data
;
1090 struct g_lock lck
= (struct g_lock
) { .exclusive
.pid
= 0 };
1091 struct server_id
*shared
= NULL
;
1095 ok
= g_lock_parse(data
.dptr
, data
.dsize
, &lck
);
1097 DBG_DEBUG("g_lock_parse failed for %s\n",
1098 hex_encode_talloc(talloc_tos(),
1101 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
1105 shared
= talloc_array(
1106 state
->mem_ctx
, struct server_id
, lck
.num_shared
);
1107 if (shared
== NULL
) {
1108 DBG_DEBUG("talloc failed\n");
1109 state
->status
= NT_STATUS_NO_MEMORY
;
1113 for (i
=0; i
<lck
.num_shared
; i
++) {
1114 g_lock_get_shared(&lck
, i
, &shared
[i
]);
1117 state
->fn(lck
.exclusive
,
1122 state
->private_data
);
1124 TALLOC_FREE(shared
);
1126 state
->status
= NT_STATUS_OK
;
1129 NTSTATUS
g_lock_dump(struct g_lock_ctx
*ctx
, TDB_DATA key
,
1130 void (*fn
)(struct server_id exclusive
,
1132 struct server_id
*shared
,
1133 const uint8_t *data
,
1135 void *private_data
),
1138 struct g_lock_dump_state state
= {
1139 .mem_ctx
= ctx
, .key
= key
,
1140 .fn
= fn
, .private_data
= private_data
1144 status
= dbwrap_parse_record(ctx
->db
, key
, g_lock_dump_fn
, &state
);
1145 if (!NT_STATUS_IS_OK(status
)) {
1146 DBG_DEBUG("dbwrap_parse_record returned %s\n",
1150 if (!NT_STATUS_IS_OK(state
.status
)) {
1151 DBG_DEBUG("g_lock_dump_fn returned %s\n",
1152 nt_errstr(state
.status
));
1153 return state
.status
;
1155 return NT_STATUS_OK
;
1158 static void g_lock_dump_done(struct tevent_req
*subreq
);
1160 struct tevent_req
*g_lock_dump_send(
1161 TALLOC_CTX
*mem_ctx
,
1162 struct tevent_context
*ev
,
1163 struct g_lock_ctx
*ctx
,
1165 void (*fn
)(struct server_id exclusive
,
1167 struct server_id
*shared
,
1168 const uint8_t *data
,
1170 void *private_data
),
1173 struct tevent_req
*req
= NULL
, *subreq
= NULL
;
1174 struct g_lock_dump_state
*state
= NULL
;
1176 req
= tevent_req_create(mem_ctx
, &state
, struct g_lock_dump_state
);
1180 state
->mem_ctx
= state
;
1183 state
->private_data
= private_data
;
1185 subreq
= dbwrap_parse_record_send(
1193 if (tevent_req_nomem(subreq
, req
)) {
1194 return tevent_req_post(req
, ev
);
1196 tevent_req_set_callback(subreq
, g_lock_dump_done
, req
);
1200 static void g_lock_dump_done(struct tevent_req
*subreq
)
1202 struct tevent_req
*req
= tevent_req_callback_data(
1203 subreq
, struct tevent_req
);
1204 struct g_lock_dump_state
*state
= tevent_req_data(
1205 req
, struct g_lock_dump_state
);
1208 status
= dbwrap_parse_record_recv(subreq
);
1209 TALLOC_FREE(subreq
);
1210 if (tevent_req_nterror(req
, status
) ||
1211 tevent_req_nterror(req
, state
->status
)) {
1214 tevent_req_done(req
);
1217 NTSTATUS
g_lock_dump_recv(struct tevent_req
*req
)
1219 return tevent_req_simple_recv_ntstatus(req
);
1222 int g_lock_seqnum(struct g_lock_ctx
*ctx
)
1224 return dbwrap_get_seqnum(ctx
->db
);
1227 struct g_lock_watch_data_state
{
1228 struct tevent_context
*ev
;
1229 struct g_lock_ctx
*ctx
;
1231 struct server_id blocker
;
1233 uint64_t unique_data_epoch
;
1237 static void g_lock_watch_data_done(struct tevent_req
*subreq
);
1239 static void g_lock_watch_data_send_fn(
1240 struct db_record
*rec
,
1244 struct tevent_req
*req
= talloc_get_type_abort(
1245 private_data
, struct tevent_req
);
1246 struct g_lock_watch_data_state
*state
= tevent_req_data(
1247 req
, struct g_lock_watch_data_state
);
1248 struct tevent_req
*subreq
= NULL
;
1252 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
1254 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
1257 state
->unique_data_epoch
= lck
.unique_data_epoch
;
1259 DBG_DEBUG("state->unique_data_epoch=%"PRIu64
"\n", state
->unique_data_epoch
);
1261 subreq
= dbwrap_watched_watch_send(
1262 state
, state
->ev
, rec
, state
->blocker
);
1263 if (subreq
== NULL
) {
1264 state
->status
= NT_STATUS_NO_MEMORY
;
1267 tevent_req_set_callback(subreq
, g_lock_watch_data_done
, req
);
1269 state
->status
= NT_STATUS_EVENT_PENDING
;
1272 struct tevent_req
*g_lock_watch_data_send(
1273 TALLOC_CTX
*mem_ctx
,
1274 struct tevent_context
*ev
,
1275 struct g_lock_ctx
*ctx
,
1277 struct server_id blocker
)
1279 struct tevent_req
*req
= NULL
;
1280 struct g_lock_watch_data_state
*state
= NULL
;
1283 req
= tevent_req_create(
1284 mem_ctx
, &state
, struct g_lock_watch_data_state
);
1290 state
->blocker
= blocker
;
1292 state
->key
= tdb_data_talloc_copy(state
, key
);
1293 if (tevent_req_nomem(state
->key
.dptr
, req
)) {
1294 return tevent_req_post(req
, ev
);
1297 status
= dbwrap_do_locked(
1298 ctx
->db
, key
, g_lock_watch_data_send_fn
, req
);
1299 if (tevent_req_nterror(req
, status
)) {
1300 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status
));
1301 return tevent_req_post(req
, ev
);
1304 if (NT_STATUS_IS_OK(state
->status
)) {
1305 tevent_req_done(req
);
1306 return tevent_req_post(req
, ev
);
1312 static void g_lock_watch_data_done_fn(
1313 struct db_record
*rec
,
1317 struct tevent_req
*req
= talloc_get_type_abort(
1318 private_data
, struct tevent_req
);
1319 struct g_lock_watch_data_state
*state
= tevent_req_data(
1320 req
, struct g_lock_watch_data_state
);
1321 struct tevent_req
*subreq
= NULL
;
1325 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
1327 state
->status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
1331 if (lck
.unique_data_epoch
!= state
->unique_data_epoch
) {
1332 DBG_DEBUG("lck.unique_data_epoch=%"PRIu64
", "
1333 "state->unique_data_epoch=%"PRIu64
"\n",
1334 lck
.unique_data_epoch
,
1335 state
->unique_data_epoch
);
1336 state
->status
= NT_STATUS_OK
;
1340 subreq
= dbwrap_watched_watch_send(
1341 state
, state
->ev
, rec
, state
->blocker
);
1342 if (subreq
== NULL
) {
1343 state
->status
= NT_STATUS_NO_MEMORY
;
1346 tevent_req_set_callback(subreq
, g_lock_watch_data_done
, req
);
1348 state
->status
= NT_STATUS_EVENT_PENDING
;
1351 static void g_lock_watch_data_done(struct tevent_req
*subreq
)
1353 struct tevent_req
*req
= tevent_req_callback_data(
1354 subreq
, struct tevent_req
);
1355 struct g_lock_watch_data_state
*state
= tevent_req_data(
1356 req
, struct g_lock_watch_data_state
);
1359 status
= dbwrap_watched_watch_recv(
1360 subreq
, &state
->blockerdead
, &state
->blocker
);
1361 TALLOC_FREE(subreq
);
1362 if (tevent_req_nterror(req
, status
)) {
1363 DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
1368 status
= dbwrap_do_locked(
1369 state
->ctx
->db
, state
->key
, g_lock_watch_data_done_fn
, req
);
1370 if (tevent_req_nterror(req
, status
)) {
1371 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status
));
1374 if (NT_STATUS_EQUAL(state
->status
, NT_STATUS_EVENT_PENDING
)) {
1377 if (tevent_req_nterror(req
, state
->status
)) {
1380 tevent_req_done(req
);
1383 NTSTATUS
g_lock_watch_data_recv(
1384 struct tevent_req
*req
,
1386 struct server_id
*blocker
)
1388 struct g_lock_watch_data_state
*state
= tevent_req_data(
1389 req
, struct g_lock_watch_data_state
);
1392 if (tevent_req_is_nterror(req
, &status
)) {
1395 if (blockerdead
!= NULL
) {
1396 *blockerdead
= state
->blockerdead
;
1398 if (blocker
!= NULL
) {
1399 *blocker
= state
->blocker
;
1402 return NT_STATUS_OK
;
1405 static void g_lock_wake_watchers_fn(
1406 struct db_record
*rec
,
1410 struct g_lock lck
= { .exclusive
.pid
= 0 };
1414 ok
= g_lock_parse(value
.dptr
, value
.dsize
, &lck
);
1416 DBG_WARNING("g_lock_parse failed\n");
1420 lck
.unique_data_epoch
= generate_unique_u64(lck
.unique_data_epoch
);
1422 status
= g_lock_store(rec
, &lck
, NULL
, NULL
, 0);
1423 if (!NT_STATUS_IS_OK(status
)) {
1424 DBG_WARNING("g_lock_store failed: %s\n", nt_errstr(status
));
1429 void g_lock_wake_watchers(struct g_lock_ctx
*ctx
, TDB_DATA key
)
1433 status
= dbwrap_do_locked(ctx
->db
, key
, g_lock_wake_watchers_fn
, NULL
);
1434 if (!NT_STATUS_IS_OK(status
)) {
1435 DBG_DEBUG("dbwrap_do_locked returned %s\n",