source4/ntvfs/common/brlock_tdb.c

/*
   Unix SMB/CIFS implementation.

   generic byte range locking code - tdb backend

   Copyright (C) Andrew Tridgell 1992-2006
   Copyright (C) Jeremy Allison 1992-2000

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/* This module implements a tdb based byte range locking service,
   replacing the fcntl() based byte range locking previously
   used. This allows us to provide the same semantics as NT */

#include "includes.h"
#include "system/filesys.h"
#include "tdb_compat.h"
#include "messaging/messaging.h"
#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/messaging/irpc.h"
#include "libcli/libcli.h"
#include "cluster/cluster.h"
#include "ntvfs/common/brlock.h"
#include "ntvfs/ntvfs.h"
#include "param/param.h"

/*
  in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
  a file. For a local posix filesystem this will usually be a combination
  of the device and inode numbers of the file, but it can be anything
  that uniquely identifies a file for locking purposes, as long
  as it is applied consistently.
*/

/* this struct is typically attached to a tcon */
struct brl_context {
	struct tdb_wrap *w;
	struct server_id server;
	struct imessaging_context *imessaging_ctx;
};

/*
  the lock context contains the elements that define whether one
  lock is the same as another lock
*/
struct lock_context {
	struct server_id server;
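	/* the locking PID supplied by the SMB client; requests carrying
	   different client PIDs are treated as different lock owners */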
	uint32_t smbpid;
	struct brl_context *ctx;
};

/* The data in brlock records is an unsorted linear array of these
   records. It is unnecessary to store the count as tdb provides the
   size of the record */
struct lock_struct {
	struct lock_context context;
	struct ntvfs_handle *ntvfs;
	uint64_t start;
	uint64_t size;
	enum brl_type lock_type;
	void *notify_ptr;
};

/* this struct is attached to an open file handle */
struct brl_handle {
	DATA_BLOB key;
	struct ntvfs_handle *ntvfs;
	struct lock_struct last_lock;
};

/* see if we have wrapped locks, which are no longer allowed (windows
 * changed this in win7) */
static bool brl_invalid_lock_range(uint64_t start, uint64_t size)
{
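	/* a range is "wrapped" when start + size overflows 64 bits, e.g.
	   start 0xFFFFFFFFFFFFFFF0 with size 0x20; the size > 1 test
	   presumably keeps a one byte lock at the very top offset legal */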
	return (size > 1 && (start + size < start));
}

/*
  Open up the brlock.tdb database. Close it down using
  talloc_free(). We need the imessaging_ctx to allow for
  pending lock notifications.
*/
static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
					struct loadparm_context *lp_ctx,
					struct imessaging_context *imessaging_ctx)
{
	struct brl_context *brl;

	brl = talloc(mem_ctx, struct brl_context);
	if (brl == NULL) {
		return NULL;
	}

	brl->w = cluster_tdb_tmp_open(brl, lp_ctx, "brlock.tdb", TDB_DEFAULT);
	if (brl->w == NULL) {
		talloc_free(brl);
		return NULL;
	}

	brl->server = server;
	brl->imessaging_ctx = imessaging_ctx;

	return brl;
}

static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs,
						DATA_BLOB *file_key)
{
	struct brl_handle *brlh;

	brlh = talloc(mem_ctx, struct brl_handle);
	if (brlh == NULL) {
		return NULL;
	}

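	/* note that this copies only the DATA_BLOB header (pointer and
	   length); the key data itself must stay valid for as long as the
	   handle is in use */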
	brlh->key = *file_key;
	brlh->ntvfs = ntvfs;
	ZERO_STRUCT(brlh->last_lock);

	return brlh;
}

/*
  see if two locking contexts are equal
*/
static bool brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
{
	return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
		ctx1->smbpid == ctx2->smbpid &&
		ctx1->ctx == ctx2->ctx);
}

/*
  see if lck1 and lck2 overlap

  lck1 is the existing lock. lck2 is the new lock we are
  looking at adding
*/
static bool brl_tdb_overlap(struct lock_struct *lck1,
			    struct lock_struct *lck2)
{
	/* this extra check is not redundant - it copes with locks
	   that go beyond the end of 64 bit file space */
	if (lck1->size != 0 &&
	    lck1->start == lck2->start &&
	    lck1->size == lck2->size) {
		return true;
	}

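	/* standard half-open interval test: [start, start+size) ranges are
	   disjoint iff one starts at or beyond the end of the other */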
	if (lck1->start >= (lck2->start+lck2->size) ||
	    lck2->start >= (lck1->start+lck1->size)) {
		return false;
	}

	/* we have a conflict. Now check to see if lck1 really still
	 * exists, which involves checking if the process still
	 * exists. We leave this test to last as it's the most
	 * expensive test, especially when we are clustered */
	/* TODO: need to do this via a server_id_exists() call, which
	 * hasn't been written yet. When clustered this will need to
	 * call into ctdb */

	return true;
}

/*
  See if lock2 can be added when lock1 is in place.
*/
static bool brl_tdb_conflict(struct lock_struct *lck1,
			     struct lock_struct *lck2)
{
	/* pending locks don't conflict with anything */
	if (lck1->lock_type >= PENDING_READ_LOCK ||
	    lck2->lock_type >= PENDING_READ_LOCK) {
		return false;
	}

	if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
		return false;
	}

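	/* a read lock request never conflicts with one of our own existing
	   locks on the same open file handle */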
	if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
	    lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
		return false;
	}

	return brl_tdb_overlap(lck1, lck2);
}

/*
  Check to see if this lock conflicts, but ignore our own locks on the
  same fnum only.
*/
static bool brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
{
	/* pending locks don't conflict with anything */
	if (lck1->lock_type >= PENDING_READ_LOCK ||
	    lck2->lock_type >= PENDING_READ_LOCK) {
		return false;
	}

	if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK)
		return false;

	/*
	 * note that incoming write calls conflict with existing READ
	 * locks even if the context is the same. JRA. See LOCKTEST7
	 * in smbtorture.
	 */
	if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
	    lck1->ntvfs == lck2->ntvfs &&
	    (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
		return false;
	}

	return brl_tdb_overlap(lck1, lck2);
}

/*
  amazingly enough, w2k3 "remembers" whether the last lock failure
  is the same as this one and changes its error code. I wonder if any
  app depends on this?
*/
static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
{
	/*
	 * this function is only called for non pending lock!
	 */

	/* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
	if (lock->ntvfs->ctx->protocol >= PROTOCOL_SMB2_02) {
		return NT_STATUS_LOCK_NOT_GRANTED;
	}

	/*
	 * if the notify_ptr is non NULL,
	 * it means that we're at the end of a pending lock
	 * and the real lock is requested after the timeout went by
	 * In this case we need to remember the last_lock and always
	 * give FILE_LOCK_CONFLICT
	 */
	if (lock->notify_ptr) {
		brlh->last_lock = *lock;
		return NT_STATUS_FILE_LOCK_CONFLICT;
	}

	/*
	 * amazing the little things you learn with a test
	 * suite. Locks beyond this offset (as a 64 bit
	 * number!) always generate the conflict error code,
	 * unless the top bit is set
	 */
	if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
		brlh->last_lock = *lock;
		return NT_STATUS_FILE_LOCK_CONFLICT;
	}

	/*
	 * if the current lock matches the last failed lock on the file handle
	 * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
	 */
	if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
	    lock->context.ctx == brlh->last_lock.context.ctx &&
	    lock->ntvfs == brlh->last_lock.ntvfs &&
	    lock->start == brlh->last_lock.start) {
		return NT_STATUS_FILE_LOCK_CONFLICT;
	}

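	/* otherwise remember this failure so that an identical retry on the
	   same handle hits the check above and gets FILE_LOCK_CONFLICT */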
	brlh->last_lock = *lock;
	return NT_STATUS_LOCK_NOT_GRANTED;
}

/*
  Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in
  which case a real lock is first tried, and if that fails then a
  pending lock is created. When the pending lock is triggered (by
  someone else closing an overlapping lock range) a messaging
  notification is sent, identified by the notify_ptr
*/
static NTSTATUS brl_tdb_lock(struct brl_context *brl,
			     struct brl_handle *brlh,
			     uint32_t smbpid,
			     uint64_t start, uint64_t size,
			     enum brl_type lock_type,
			     void *notify_ptr)
{
	TDB_DATA kbuf, dbuf;
	int count=0, i;
	struct lock_struct lock, *locks=NULL;
	NTSTATUS status;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;

	if (brl_invalid_lock_range(start, size)) {
		return NT_STATUS_INVALID_LOCK_RANGE;
	}

	if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	/* if this is a pending lock, then with the chainlock held we
	   try to get the real lock. If we succeed then we don't need
	   to make it pending. This prevents a possible race condition
	   where the pending lock gets created after the lock that is
	   preventing the real lock gets removed */
	if (lock_type >= PENDING_READ_LOCK) {
		enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);

		/* here we need to force that the last_lock isn't overwritten */
		lock = brlh->last_lock;
		status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
		brlh->last_lock = lock;

		if (NT_STATUS_IS_OK(status)) {
			tdb_chainunlock(brl->w->tdb, kbuf);
			return NT_STATUS_OK;
		}
	}

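	/* either this is an ordinary lock request or the real lock attempt
	   above failed; build the record for this lock and check it against
	   the locks already stored under this file key */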
	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);

	lock.context.smbpid = smbpid;
	lock.context.server = brl->server;
	lock.context.ctx = brl;
	lock.ntvfs = brlh->ntvfs;
	lock.start = start;
	lock.size = size;
	lock.lock_type = lock_type;
	lock.notify_ptr = notify_ptr;

	if (dbuf.dptr) {
		/* there are existing locks - make sure they don't conflict */
		locks = (struct lock_struct *)dbuf.dptr;
		count = dbuf.dsize / sizeof(*locks);
		for (i=0; i<count; i++) {
			if (brl_tdb_conflict(&locks[i], &lock)) {
				status = brl_tdb_lock_failed(brlh, &lock);
				goto fail;
			}
		}
	}

	/* no conflicts - add it to the list of locks */
	locks = realloc_p(locks, struct lock_struct, count+1);
	if (!locks) {
		status = NT_STATUS_NO_MEMORY;
		goto fail;
	} else {
		dbuf.dptr = (uint8_t *)locks;
	}
	locks[count] = lock;
	dbuf.dsize += sizeof(lock);

	if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
		status = NT_STATUS_INTERNAL_DB_CORRUPTION;
		goto fail;
	}

	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);

	/* the caller needs to know if the real lock was granted. If
	   we have reached here then it must be a pending lock that
	   was granted, so tell them the lock failed */
	if (lock_type >= PENDING_READ_LOCK) {
		return NT_STATUS_LOCK_NOT_GRANTED;
	}

	return NT_STATUS_OK;

fail:
	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);
	return status;
}

/*
  we are removing a lock that might be holding up a pending lock. Scan for pending
  locks that cover this range and if we find any then notify the server that it should
  retry the lock
*/
static void brl_tdb_notify_unlock(struct brl_context *brl,
				  struct lock_struct *locks, int count,
				  struct lock_struct *removed_lock)
{
	int i, last_notice;

	/* the last_notice logic is to prevent stampeding on a lock
	   range. It prevents us sending hundreds of notifies on the
	   same range of bytes. It doesn't prevent all possible
	   stampedes, but it does prevent the most common problem */
	last_notice = -1;

	for (i=0;i<count;i++) {
		if (locks[i].lock_type >= PENDING_READ_LOCK &&
		    brl_tdb_overlap(&locks[i], removed_lock)) {
			if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
				continue;
			}
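			/* presumably only a pending write lock needs to mark the
			   range: later overlapping waiters would conflict with it
			   once granted, while pending reads can proceed together */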
			if (locks[i].lock_type == PENDING_WRITE_LOCK) {
				last_notice = i;
			}
			imessaging_send_ptr(brl->imessaging_ctx, locks[i].context.server,
					    MSG_BRL_RETRY, locks[i].notify_ptr);
		}
	}
}

/*
  send notifications for all pending locks - the file is being closed by this
  user
*/
static void brl_tdb_notify_all(struct brl_context *brl,
			       struct lock_struct *locks, int count)
{
	int i;
	for (i=0;i<count;i++) {
		if (locks[i].lock_type >= PENDING_READ_LOCK) {
			brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
		}
	}
}

/*
  Unlock a range of bytes.
*/
static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
			       struct brl_handle *brlh,
			       uint32_t smbpid,
			       uint64_t start, uint64_t size)
{
	TDB_DATA kbuf, dbuf;
	int count, i;
	struct lock_struct *locks, *lock;
	struct lock_context context;
	NTSTATUS status;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;

	if (brl_invalid_lock_range(start, size)) {
		return NT_STATUS_INVALID_LOCK_RANGE;
	}

	if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
	if (!dbuf.dptr) {
		tdb_chainunlock(brl->w->tdb, kbuf);
		return NT_STATUS_RANGE_NOT_LOCKED;
	}

	context.smbpid = smbpid;
	context.server = brl->server;
	context.ctx = brl;

	/* there are existing locks - find a match */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

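	/* two passes: first look for an exact-range WRITE_LOCK owned by this
	   context, and only if none is found fall back to any other
	   non-pending lock on the range, so a write lock is released in
	   preference to a read lock covering the same bytes */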
	for (i=0; i<count; i++) {
		lock = &locks[i];
		if (brl_tdb_same_context(&lock->context, &context) &&
		    lock->ntvfs == brlh->ntvfs &&
		    lock->start == start &&
		    lock->size == size &&
		    lock->lock_type == WRITE_LOCK) {
			break;
		}
	}
	if (i < count) goto found;

	for (i=0; i<count; i++) {
		lock = &locks[i];
		if (brl_tdb_same_context(&lock->context, &context) &&
		    lock->ntvfs == brlh->ntvfs &&
		    lock->start == start &&
		    lock->size == size &&
		    lock->lock_type < PENDING_READ_LOCK) {
			break;
		}
	}

found:
	if (i < count) {
		/* found it - delete it */
		if (count == 1) {
			if (tdb_delete(brl->w->tdb, kbuf) != 0) {
				status = NT_STATUS_INTERNAL_DB_CORRUPTION;
				goto fail;
			}
		} else {
			struct lock_struct removed_lock = *lock;
			if (i < count-1) {
				memmove(&locks[i], &locks[i+1],
					sizeof(*locks)*((count-1) - i));
			}
			count--;

			/* send notifications for any relevant pending locks */
			brl_tdb_notify_unlock(brl, locks, count, &removed_lock);

			dbuf.dsize = count * sizeof(*locks);

			if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
				status = NT_STATUS_INTERNAL_DB_CORRUPTION;
				goto fail;
			}
		}

		free(dbuf.dptr);
		tdb_chainunlock(brl->w->tdb, kbuf);
		return NT_STATUS_OK;
	}

	/* we didn't find it */
	status = NT_STATUS_RANGE_NOT_LOCKED;

fail:
	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);
	return status;
}

/*
  remove a pending lock. This is called when the caller has either
  given up trying to establish a lock or when they have succeeded in
  getting it. In either case they no longer need to be notified.
*/
static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
				       struct brl_handle *brlh,
				       void *notify_ptr)
{
	TDB_DATA kbuf, dbuf;
	int count, i;
	struct lock_struct *locks;
	NTSTATUS status;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;

	if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
	if (!dbuf.dptr) {
		tdb_chainunlock(brl->w->tdb, kbuf);
		return NT_STATUS_RANGE_NOT_LOCKED;
	}

	/* there are existing locks - find a match */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

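	/* a pending lock is matched by its notify_ptr and the owning server
	   id rather than by byte range */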
	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		if (lock->lock_type >= PENDING_READ_LOCK &&
		    lock->notify_ptr == notify_ptr &&
		    cluster_id_equal(&lock->context.server, &brl->server)) {
			/* found it - delete it */
			if (count == 1) {
				if (tdb_delete(brl->w->tdb, kbuf) != 0) {
					status = NT_STATUS_INTERNAL_DB_CORRUPTION;
					goto fail;
				}
			} else {
				if (i < count-1) {
					memmove(&locks[i], &locks[i+1],
						sizeof(*locks)*((count-1) - i));
				}
				count--;
				dbuf.dsize = count * sizeof(*locks);
				if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
					status = NT_STATUS_INTERNAL_DB_CORRUPTION;
					goto fail;
				}
			}

			free(dbuf.dptr);
			tdb_chainunlock(brl->w->tdb, kbuf);
			return NT_STATUS_OK;
		}
	}

	/* we didn't find it */
	status = NT_STATUS_RANGE_NOT_LOCKED;

fail:
	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);
	return status;
}

/*
  Test if we are allowed to perform IO on a region of an open file
*/
static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
				 struct brl_handle *brlh,
				 uint32_t smbpid,
				 uint64_t start, uint64_t size,
				 enum brl_type lock_type)
{
	TDB_DATA kbuf, dbuf;
	int count, i;
	struct lock_struct lock, *locks;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;

	if (brl_invalid_lock_range(start, size)) {
		return NT_STATUS_INVALID_LOCK_RANGE;
	}

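	/* note that this is only an advisory check: the lock list is read
	   without taking the chainlock and nothing is modified, so the
	   answer may already be stale by the time the caller acts on it */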
	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
	if (dbuf.dptr == NULL) {
		return NT_STATUS_OK;
	}

	lock.context.smbpid = smbpid;
	lock.context.server = brl->server;
	lock.context.ctx = brl;
	lock.ntvfs = brlh->ntvfs;
	lock.start = start;
	lock.size = size;
	lock.lock_type = lock_type;

	/* there are existing locks - make sure they don't conflict */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

	for (i=0; i<count; i++) {
		if (brl_tdb_conflict_other(&locks[i], &lock)) {
			free(dbuf.dptr);
			return NT_STATUS_FILE_LOCK_CONFLICT;
		}
	}

	free(dbuf.dptr);
	return NT_STATUS_OK;
}

/*
  Remove any locks associated with an open file.
*/
static NTSTATUS brl_tdb_close(struct brl_context *brl,
			      struct brl_handle *brlh)
{
	TDB_DATA kbuf, dbuf;
	int count, i, dcount=0;
	struct lock_struct *locks;
	NTSTATUS status;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;

	if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
	if (!dbuf.dptr) {
		tdb_chainunlock(brl->w->tdb, kbuf);
		return NT_STATUS_OK;
	}

	/* there are existing locks - remove any for this fnum */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

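	/* matching locks are removed in place by shifting the tail of the
	   array down one slot; i is decremented so the entry that has just
	   been moved into slot i is examined on the next pass */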
	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		if (lock->context.ctx == brl &&
		    cluster_id_equal(&lock->context.server, &brl->server) &&
		    lock->ntvfs == brlh->ntvfs) {
			/* found it - delete it */
			if (count > 1 && i < count-1) {
				memmove(&locks[i], &locks[i+1],
					sizeof(*locks)*((count-1) - i));
			}
			count--;
			i--;
			dcount++;
		}
	}

	status = NT_STATUS_OK;

	if (count == 0) {
		if (tdb_delete(brl->w->tdb, kbuf) != 0) {
			status = NT_STATUS_INTERNAL_DB_CORRUPTION;
		}
	} else if (dcount != 0) {
		/* tell all pending lock holders for this file that
		   they have a chance now. This is a bit indiscriminate,
		   but works OK */
		brl_tdb_notify_all(brl, locks, count);

		dbuf.dsize = count * sizeof(*locks);

		if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
			status = NT_STATUS_INTERNAL_DB_CORRUPTION;
		}
	}

	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);

	return status;
}

static NTSTATUS brl_tdb_count(struct brl_context *brl, struct brl_handle *brlh,
			      int *count)
{
	TDB_DATA kbuf, dbuf;

	kbuf.dptr = brlh->key.data;
	kbuf.dsize = brlh->key.length;
	*count = 0;

	if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
	if (!dbuf.dptr) {
		tdb_chainunlock(brl->w->tdb, kbuf);
		return NT_STATUS_OK;
	}

	*count = dbuf.dsize / sizeof(struct lock_struct);

	free(dbuf.dptr);
	tdb_chainunlock(brl->w->tdb, kbuf);

	return NT_STATUS_OK;
}

static const struct brlock_ops brlock_tdb_ops = {
	.brl_init = brl_tdb_init,
	.brl_create_handle = brl_tdb_create_handle,
	.brl_lock = brl_tdb_lock,
	.brl_unlock = brl_tdb_unlock,
	.brl_remove_pending = brl_tdb_remove_pending,
	.brl_locktest = brl_tdb_locktest,
	.brl_close = brl_tdb_close,
	.brl_count = brl_tdb_count
};

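/*
  register the tdb backend as the byte range locking implementation used by
  the ntvfs layer
*/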
void brl_tdb_init_ops(void)
{
	brlock_set_ops(&brlock_tdb_ops);
}