s3:dbwrap_watch: add dbwrap_watched_watch_reset_alerting() helper
[Samba.git] / source3 / lib / dbwrap / dbwrap_watch.c
blob375d2effd531e4c0864775a1497f3d190b121929
1 /*
2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "dbwrap/dbwrap.h"
24 #include "dbwrap_watch.h"
25 #include "dbwrap_open.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
28 #include "serverid.h"
29 #include "server_id_watch.h"
30 #include "lib/dbwrap/dbwrap_private.h"
/*
 * One watcher entry, as kept in memory and serialized into the
 * on-disk record header (see the format description below).
 */
struct dbwrap_watcher {
	/*
	 * Process watching this record
	 */
	struct server_id pid;
	/*
	 * Individual instance inside the waiter, incremented each
	 * time a watcher is created
	 */
	uint64_t instance;
};

/* Serialized size of one watcher: server_id buffer + 64-bit instance. */
#define DBWRAP_WATCHER_BUF_LENGTH (SERVER_ID_BUF_LENGTH + sizeof(uint64_t))
/* Upper bound so the watcher array size always fits into an int32. */
#define DBWRAP_MAX_WATCHERS (INT32_MAX/DBWRAP_WATCHER_BUF_LENGTH)

/*
 * Watched records contain a header of:
 *
 * [uint32] num_records
 *
 * 0 [DBWRAP_WATCHER_BUF_LENGTH]              \
 * 1 [DBWRAP_WATCHER_BUF_LENGTH]              |
 * ..                                         |- Array of watchers
 * (num_records-1)[DBWRAP_WATCHER_BUF_LENGTH] /
 *
 * [Remainder of record....]
 *
 * If this header is absent then this is a
 * fresh record of length zero (no watchers).
 */
/*
 * Split a raw backend value into its watcher header and user payload.
 *
 * @pwatchers     if non-NULL, receives a pointer to the first watcher
 *                entry (points into data.dptr, not a copy; NULL for a
 *                fresh record)
 * @pnum_watchers if non-NULL, receives the number of watcher entries
 * @pdata         if non-NULL, receives the user data portion following
 *                the watcher array (again pointing into data.dptr)
 *
 * Returns false if the value is malformed: shorter than the uint32
 * count field, or claiming more watchers than the remaining bytes can
 * hold. An empty value is valid and means "no watchers, no data".
 */
static bool dbwrap_watch_rec_parse(
	TDB_DATA data,
	uint8_t **pwatchers,
	size_t *pnum_watchers,
	TDB_DATA *pdata)
{
	size_t num_watchers;

	if (data.dsize == 0) {
		/* Fresh record */
		if (pwatchers != NULL) {
			*pwatchers = NULL;
		}
		if (pnum_watchers != NULL) {
			*pnum_watchers = 0;
		}
		if (pdata != NULL) {
			*pdata = (TDB_DATA) { .dptr = NULL };
		}
		return true;
	}

	if (data.dsize < sizeof(uint32_t)) {
		/* Invalid record */
		return false;
	}

	num_watchers = IVAL(data.dptr, 0);

	/* Skip the count field before bounds-checking the array. */
	data.dptr += sizeof(uint32_t);
	data.dsize -= sizeof(uint32_t);

	if (num_watchers > data.dsize/DBWRAP_WATCHER_BUF_LENGTH) {
		/* Invalid record */
		return false;
	}

	if (pwatchers != NULL) {
		*pwatchers = data.dptr;
	}
	if (pnum_watchers != NULL) {
		*pnum_watchers = num_watchers;
	}
	if (pdata != NULL) {
		size_t watchers_len = num_watchers * DBWRAP_WATCHER_BUF_LENGTH;
		*pdata = (TDB_DATA) {
			.dptr = data.dptr + watchers_len,
			.dsize = data.dsize - watchers_len
		};
	}

	return true;
}
/* Deserialize one watcher entry from its wire format. */
static void dbwrap_watcher_get(struct dbwrap_watcher *w,
			       const uint8_t buf[DBWRAP_WATCHER_BUF_LENGTH])
{
	server_id_get(&w->pid, buf);
	w->instance = BVAL(buf, SERVER_ID_BUF_LENGTH);
}

/* Serialize one watcher entry into its wire format. */
static void dbwrap_watcher_put(uint8_t buf[DBWRAP_WATCHER_BUF_LENGTH],
			       const struct dbwrap_watcher *w)
{
	server_id_put(buf, w->pid);
	SBVAL(buf, SERVER_ID_BUF_LENGTH, w->instance);
}

/* Log (at error level) and hex-dump a record that failed to parse. */
static void dbwrap_watch_log_invalid_record(
	struct db_context *db, TDB_DATA key, TDB_DATA value)
{
	DBG_ERR("Found invalid record in %s\n", dbwrap_name(db));
	dump_data(1, key.dptr, key.dsize);
	dump_data(1, value.dptr, value.dsize);
}
/* Per-database state of the watch wrapper. */
struct db_watched_ctx {
	struct db_context *backend;	/* wrapped db, owns the real data */
	struct messaging_context *msg;	/* used to alert waiters */
};

/* Per-locked-record state of the watch wrapper. */
struct db_watched_record {
	struct db_record *rec;		/* the wrapper record handed out */
	struct server_id self;		/* our own messaging server_id */
	struct {
		struct db_record *rec;		/* locked backend record */
		TDB_DATA initial_value;		/* value at lock time */
		bool initial_valid;		/* false once overwritten */
	} backend;
	bool force_fini_store;		/* flush watcher changes on fini */
	struct dbwrap_watcher added;	/* pending watcher to append */
	bool removed_first;		/* we removed the head watcher */
	struct {
		/*
		 * This is the number of watcher records
		 * parsed from backend.initial_value
		 */
		size_t count;
		/*
		 * This is the pointer to
		 * the potentially first watcher record
		 * parsed from backend.initial_value
		 *
		 * The pointer actually points to memory
		 * in backend.initial_value.
		 *
		 * Note it might be NULL, if count is 0.
		 */
		uint8_t *first;
		/*
		 * This remembers if we already
		 * notified the watchers.
		 *
		 * As we only need to do that once during:
		 *   do_locked
		 * or:
		 *   between rec = fetch_locked
		 *   and
		 *   TALLOC_FREE(rec)
		 */
		bool alerted;
	} watchers;
	struct {
		/* the single watcher we will alert, if any */
		struct dbwrap_watcher watcher;
	} wakeup;
};

/* Map a wrapper db_record back to its db_watched_record. */
static struct db_watched_record *db_record_get_watched_record(struct db_record *rec)
{
	/*
	 * we can't use wrec = talloc_get_type_abort() here!
	 * because wrec is likely a stack variable in
	 * dbwrap_watched_do_locked_fn()
	 *
	 * In order to have at least some protection
	 * we verify the cross reference pointers
	 * between rec and wrec
	 */
	struct db_watched_record *wrec =
		(struct db_watched_record *)rec->private_data;
	SMB_ASSERT(wrec->rec == rec);
	return wrec;
}

static NTSTATUS dbwrap_watched_record_storev(
	struct db_watched_record *wrec,
	const TDB_DATA *dbufs, int num_dbufs, int flags);
static NTSTATUS dbwrap_watched_storev(struct db_record *rec,
				      const TDB_DATA *dbufs, int num_dbufs,
				      int flags);
static NTSTATUS dbwrap_watched_delete(struct db_record *rec);
static void dbwrap_watched_trigger_wakeup(struct messaging_context *msg_ctx,
					  struct dbwrap_watcher *watcher);
static int db_watched_record_destructor(struct db_watched_record *wrec);
/*
 * Initialize the wrapper record (*rec) and its bookkeeping (*wrec) on
 * top of an already locked backend record.
 *
 * Parses the watcher header out of backend_value: rec->value becomes
 * the user data portion, wrec->watchers.first/count the watcher array
 * (both pointing into backend_value). An unparsable value is logged
 * and presented to the caller as an empty record.
 */
static void db_watched_record_init(struct db_context *db,
				   struct messaging_context *msg_ctx,
				   struct db_record *rec,
				   struct db_watched_record *wrec,
				   struct db_record *backend_rec,
				   TDB_DATA backend_value)
{
	bool ok;

	*rec = (struct db_record) {
		.db = db,
		.key = dbwrap_record_get_key(backend_rec),
		.storev = dbwrap_watched_storev,
		.delete_rec = dbwrap_watched_delete,
		.private_data = wrec,
	};

	*wrec = (struct db_watched_record) {
		.rec = rec,
		.self = messaging_server_id(msg_ctx),
		.backend = {
			.rec = backend_rec,
			.initial_value = backend_value,
			.initial_valid = true,
		},
	};

	ok = dbwrap_watch_rec_parse(backend_value,
				    &wrec->watchers.first,
				    &wrec->watchers.count,
				    &rec->value);
	if (!ok) {
		dbwrap_watch_log_invalid_record(rec->db, rec->key, backend_value);
		/* wipe invalid data */
		rec->value = (TDB_DATA) { .dptr = NULL, .dsize = 0 };
	}
}
/*
 * db_context.fetch_locked implementation: lock the key in the backend
 * and hand out a talloc'ed wrapper record whose value excludes the
 * watcher header. The destructor on wrec flushes watcher changes back
 * and wakes the first waiter after the backend lock is dropped.
 */
static struct db_record *dbwrap_watched_fetch_locked(
	struct db_context *db, TALLOC_CTX *mem_ctx, TDB_DATA key)
{
	struct db_watched_ctx *ctx = talloc_get_type_abort(
		db->private_data, struct db_watched_ctx);
	struct db_record *rec = NULL;
	struct db_watched_record *wrec = NULL;
	struct db_record *backend_rec = NULL;
	TDB_DATA backend_value = { .dptr = NULL, };

	rec = talloc_zero(mem_ctx, struct db_record);
	if (rec == NULL) {
		return NULL;
	}
	/* wrec is a child of rec so both go away together */
	wrec = talloc_zero(rec, struct db_watched_record);
	if (wrec == NULL) {
		TALLOC_FREE(rec);
		return NULL;
	}

	backend_rec = dbwrap_fetch_locked(ctx->backend, wrec, key);
	if (backend_rec == NULL) {
		TALLOC_FREE(rec);
		return NULL;
	}
	backend_value = dbwrap_record_get_value(backend_rec);

	db_watched_record_init(db, ctx->msg,
			       rec, wrec,
			       backend_rec, backend_value);
	rec->value_valid = true;
	talloc_set_destructor(wrec, db_watched_record_destructor);

	return rec;
}
/* Scratch state for db_watched_record_fini(). */
struct db_watched_record_fini_state {
	struct db_watched_record *wrec;
	TALLOC_CTX *frame;	/* stackframe for re-fetched data copies */
	TDB_DATA dbufs[2];	/* payload chunks to store back */
	int num_dbufs;
	bool ok;
};

/*
 * dbwrap_parse_record() callback used when the record was modified
 * after locking: re-read the current user payload so fini can write it
 * back together with the updated watcher list.
 */
static void db_watched_record_fini_fetcher(TDB_DATA key,
					   TDB_DATA backend_value,
					   void *private_data)
{
	struct db_watched_record_fini_state *state =
		(struct db_watched_record_fini_state *)private_data;
	struct db_watched_record *wrec = state->wrec;
	struct db_record *rec = wrec->rec;
	TDB_DATA value = {};
	bool ok;
	size_t copy_size;

	/*
	 * We're within dbwrap_parse_record()
	 * and backend_value directly points into
	 * the mmap'ed tdb, so we need to copy the
	 * parts we require.
	 */

	ok = dbwrap_watch_rec_parse(backend_value, NULL, NULL, &value);
	if (!ok) {
		struct db_context *db = dbwrap_record_get_db(rec);

		dbwrap_watch_log_invalid_record(db, key, backend_value);

		/* wipe invalid data */
		value = (TDB_DATA) { .dptr = NULL, .dsize = 0 };
	}

	copy_size = MIN(rec->value.dsize, value.dsize);
	if (copy_size != 0) {
		/*
		 * First reuse the buffer we already had
		 * as much as we can.
		 */
		memcpy(rec->value.dptr, value.dptr, copy_size);
		state->dbufs[state->num_dbufs++] = rec->value;
		value.dsize -= copy_size;
		value.dptr += copy_size;
	}

	if (value.dsize != 0) {
		uint8_t *p = NULL;

		/*
		 * There's still new data left
		 * allocate it on callers stackframe
		 */
		p = talloc_memdup(state->frame, value.dptr, value.dsize);
		if (p == NULL) {
			DBG_WARNING("failed to allocate %zu bytes\n",
				    value.dsize);
			return;
		}

		state->dbufs[state->num_dbufs++] = (TDB_DATA) {
			.dptr = p, .dsize = value.dsize,
		};
	}

	state->ok = true;
}
/*
 * Flush pending watcher changes (added/removed entries) back to the
 * backend record. Only does work when force_fini_store was set. If the
 * caller already stored a new value, backend.initial_valid is false
 * and the current payload is re-fetched via dbwrap_parse_record().
 */
static void db_watched_record_fini(struct db_watched_record *wrec)
{
	struct db_watched_record_fini_state state = { .wrec = wrec, };
	struct db_context *backend = dbwrap_record_get_db(wrec->backend.rec);
	struct db_record *rec = wrec->rec;
	TDB_DATA key = dbwrap_record_get_key(wrec->backend.rec);
	NTSTATUS status;

	if (!wrec->force_fini_store) {
		return;
	}

	if (wrec->backend.initial_valid) {
		if (rec->value.dsize != 0) {
			state.dbufs[state.num_dbufs++] = rec->value;
		}
	} else {
		/*
		 * We need to fetch the current
		 * value from the backend again,
		 * which may need to allocate memory
		 * on the provided stackframe.
		 */

		state.frame = talloc_stackframe();

		status = dbwrap_parse_record(backend, key,
				db_watched_record_fini_fetcher, &state);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("dbwrap_parse_record failed: %s\n",
				    nt_errstr(status));
			TALLOC_FREE(state.frame);
			return;
		}
		if (!state.ok) {
			TALLOC_FREE(state.frame);
			return;
		}
	}

	/*
	 * We don't want to wake up others just because
	 * we added ourself as new watcher. But if we
	 * removed ourself from the first position
	 * we need to alert the next one.
	 */
	if (!wrec->removed_first) {
		dbwrap_watched_watch_skip_alerting(rec);
	}

	status = dbwrap_watched_record_storev(wrec, state.dbufs, state.num_dbufs, 0);
	TALLOC_FREE(state.frame);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("dbwrap_watched_record_storev failed: %s\n",
			    nt_errstr(status));
		return;
	}

	return;
}

/*
 * Talloc destructor for fetch_locked records: flush watcher state,
 * release the backend lock, then (with the lock already dropped)
 * message the first waiter, if any was selected.
 */
static int db_watched_record_destructor(struct db_watched_record *wrec)
{
	struct db_record *rec = wrec->rec;
	struct db_watched_ctx *ctx = talloc_get_type_abort(
		rec->db->private_data, struct db_watched_ctx);

	db_watched_record_fini(wrec);
	TALLOC_FREE(wrec->backend.rec);
	dbwrap_watched_trigger_wakeup(ctx->msg, &wrec->wakeup.watcher);
	return 0;
}
/* Trampoline state for dbwrap_watched_do_locked(). */
struct dbwrap_watched_do_locked_state {
	struct db_context *db;
	struct messaging_context *msg_ctx;
	struct db_watched_record *wrec;	/* points at stack storage */
	struct db_record *rec;		/* points at stack storage */
	void (*fn)(struct db_record *rec,
		   TDB_DATA value,
		   void *private_data);
	void *private_data;
};

/*
 * Backend do_locked callback: build the wrapper record around the
 * backend record, run the user's fn, then flush watcher changes while
 * still under the backend lock.
 */
static void dbwrap_watched_do_locked_fn(
	struct db_record *backend_rec,
	TDB_DATA backend_value,
	void *private_data)
{
	struct dbwrap_watched_do_locked_state *state =
		(struct dbwrap_watched_do_locked_state *)private_data;

	db_watched_record_init(state->db, state->msg_ctx,
			       state->rec, state->wrec,
			       backend_rec, backend_value);

	state->fn(state->rec, state->rec->value, state->private_data);

	db_watched_record_fini(state->wrec);
}

/*
 * db_context.do_locked implementation. wrec/rec live on this stack
 * frame (hence the pointer check in db_record_get_watched_record()).
 * The waiter wakeup happens after dbwrap_do_locked() returned, i.e.
 * after the backend lock was released.
 */
static NTSTATUS dbwrap_watched_do_locked(struct db_context *db, TDB_DATA key,
					 void (*fn)(struct db_record *rec,
						    TDB_DATA value,
						    void *private_data),
					 void *private_data)
{
	struct db_watched_ctx *ctx = talloc_get_type_abort(
		db->private_data, struct db_watched_ctx);
	struct db_watched_record wrec;
	struct db_record rec;
	struct dbwrap_watched_do_locked_state state = {
		.db = db, .msg_ctx = ctx->msg,
		.rec = &rec, .wrec = &wrec,
		.fn = fn, .private_data = private_data,
	};
	NTSTATUS status;

	status = dbwrap_do_locked(
		ctx->backend, key, dbwrap_watched_do_locked_fn, &state);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
		return status;
	}

	DBG_DEBUG("dbwrap_watched_do_locked_fn returned\n");

	dbwrap_watched_trigger_wakeup(state.msg_ctx, &wrec.wakeup.watcher);

	return NT_STATUS_OK;
}
/*
 * Select the watcher to alert: skim dead processes off the front of
 * the in-memory watcher list and remember the first live one in
 * wrec->wakeup.watcher. Runs at most once per locked record.
 */
static void dbwrap_watched_record_prepare_wakeup(
	struct db_watched_record *wrec)
{
	/*
	 * Wakeup only needs to happen once (if at all)
	 */
	if (wrec->watchers.alerted) {
		/* already done */
		return;
	}
	wrec->watchers.alerted = true;

	if (wrec->watchers.count == 0) {
		DBG_DEBUG("No watchers\n");
		return;
	}

	while (wrec->watchers.count != 0) {
		struct server_id_buf tmp;
		bool exists;

		dbwrap_watcher_get(&wrec->wakeup.watcher, wrec->watchers.first);
		exists = serverid_exists(&wrec->wakeup.watcher.pid);
		if (!exists) {
			/* Drop dead waiters from the head of the list. */
			DBG_DEBUG("Discard non-existing waiter %s:%"PRIu64"\n",
				  server_id_str_buf(wrec->wakeup.watcher.pid, &tmp),
				  wrec->wakeup.watcher.instance);
			wrec->watchers.first += DBWRAP_WATCHER_BUF_LENGTH;
			wrec->watchers.count -= 1;
			continue;
		}

		/*
		 * We will only wakeup the first waiter, via
		 * dbwrap_watched_trigger_wakeup(), but keep
		 * all (including the first one) in the list that
		 * will be flushed back to the backend record
		 * again. Waiters are removing their entries
		 * via dbwrap_watched_watch_remove_instance()
		 * when they no longer want to monitor the record.
		 */
		DBG_DEBUG("Will alert first waiter %s:%"PRIu64"\n",
			  server_id_str_buf(wrec->wakeup.watcher.pid, &tmp),
			  wrec->wakeup.watcher.instance);
		break;
	}
}

/*
 * Send MSG_DBWRAP_MODIFIED (carrying the 64-bit instance) to the
 * watcher selected by prepare_wakeup. instance == 0 means "no one".
 * Send failures are logged and ignored; the record store succeeded.
 */
static void dbwrap_watched_trigger_wakeup(struct messaging_context *msg_ctx,
					  struct dbwrap_watcher *watcher)
{
	struct server_id_buf tmp;
	uint8_t instance_buf[8];
	NTSTATUS status;

	if (watcher->instance == 0) {
		DBG_DEBUG("No one to wakeup\n");
		return;
	}

	DBG_DEBUG("Alerting %s:%"PRIu64"\n",
		  server_id_str_buf(watcher->pid, &tmp),
		  watcher->instance);

	SBVAL(instance_buf, 0, watcher->instance);

	status = messaging_send_buf(
		msg_ctx,
		watcher->pid,
		MSG_DBWRAP_MODIFIED,
		instance_buf,
		sizeof(instance_buf));
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("messaging_send_buf to %s failed: %s - ignoring...\n",
			    server_id_str_buf(watcher->pid, &tmp),
			    nt_errstr(status));
	}
}
/*
 * Store the record back to the backend, prepending the watcher header:
 * [count][surviving watchers][optional newly added watcher][payload].
 * Deletes the backend record when neither watchers nor payload remain.
 * Also finalizes wakeup selection (prepare_wakeup) before storing.
 */
static NTSTATUS dbwrap_watched_record_storev(
	struct db_watched_record *wrec,
	const TDB_DATA *dbufs, int num_dbufs, int flags)
{
	uint8_t num_watchers_buf[4] = { 0 };
	uint8_t add_buf[DBWRAP_WATCHER_BUF_LENGTH];
	size_t num_store_watchers;
	/* VLA: header + watcher array + added watcher + caller bufs */
	TDB_DATA my_dbufs[num_dbufs+3];
	int num_my_dbufs = 0;
	NTSTATUS status;
	size_t add_count = 0;

	dbwrap_watched_record_prepare_wakeup(wrec);

	/* From here on the in-memory copy diverges from the backend. */
	wrec->backend.initial_valid = false;
	wrec->force_fini_store = false;

	if (wrec->added.pid.pid != 0) {
		dbwrap_watcher_put(add_buf, &wrec->added);
		add_count = 1;
	}

	num_store_watchers = wrec->watchers.count + add_count;
	if (num_store_watchers == 0 && num_dbufs == 0) {
		status = dbwrap_record_delete(wrec->backend.rec);
		return status;
	}
	if (num_store_watchers >= DBWRAP_MAX_WATCHERS) {
		DBG_WARNING("Can't handle %zu watchers\n",
			    num_store_watchers);
		return NT_STATUS_INSUFFICIENT_RESOURCES;
	}

	SIVAL(num_watchers_buf, 0, num_store_watchers);

	my_dbufs[num_my_dbufs++] = (TDB_DATA) {
		.dptr = num_watchers_buf, .dsize = sizeof(num_watchers_buf),
	};
	if (wrec->watchers.count != 0) {
		my_dbufs[num_my_dbufs++] = (TDB_DATA) {
			.dptr = wrec->watchers.first, .dsize = wrec->watchers.count * DBWRAP_WATCHER_BUF_LENGTH,
		};
	}
	if (add_count != 0) {
		my_dbufs[num_my_dbufs++] = (TDB_DATA) {
			.dptr = add_buf,
			.dsize = sizeof(add_buf),
		};
	}
	if (num_dbufs != 0) {
		memcpy(my_dbufs+num_my_dbufs, dbufs, num_dbufs * sizeof(*dbufs));
		num_my_dbufs += num_dbufs;
	}

	SMB_ASSERT(num_my_dbufs <= ARRAY_SIZE(my_dbufs));

	status = dbwrap_record_storev(
		wrec->backend.rec, my_dbufs, num_my_dbufs, flags);
	return status;
}

/* db_record.storev implementation, thin wrapper over the above. */
static NTSTATUS dbwrap_watched_storev(struct db_record *rec,
				      const TDB_DATA *dbufs, int num_dbufs,
				      int flags)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);

	return dbwrap_watched_record_storev(wrec, dbufs, num_dbufs, flags);
}

/* db_record.delete_rec implementation. */
static NTSTATUS dbwrap_watched_delete(struct db_record *rec)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);

	/*
	 * dbwrap_watched_record_storev() will figure out
	 * if the record should be deleted or if there are still
	 * watchers to be stored.
	 */
	return dbwrap_watched_record_storev(wrec, NULL, 0, 0);
}
/* User callback forwarded by the traverse wrappers below. */
struct dbwrap_watched_traverse_state {
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;
};

/*
 * Backend traverse callback: strip the watcher header and present a
 * shallow copy of the record to the user's fn. Records that fail to
 * parse or have no user payload are skipped (return 0 = continue).
 */
static int dbwrap_watched_traverse_fn(struct db_record *rec,
				      void *private_data)
{
	struct dbwrap_watched_traverse_state *state = private_data;
	struct db_record prec = *rec;
	bool ok;

	ok = dbwrap_watch_rec_parse(rec->value, NULL, NULL, &prec.value);
	if (!ok) {
		return 0;
	}
	if (prec.value.dsize == 0) {
		return 0;
	}
	prec.value_valid = true;

	return state->fn(&prec, state->private_data);
}
679 static int dbwrap_watched_traverse(struct db_context *db,
680 int (*fn)(struct db_record *rec,
681 void *private_data),
682 void *private_data)
684 struct db_watched_ctx *ctx = talloc_get_type_abort(
685 db->private_data, struct db_watched_ctx);
686 struct dbwrap_watched_traverse_state state = {
687 .fn = fn, .private_data = private_data };
688 NTSTATUS status;
689 int ret;
691 status = dbwrap_traverse(
692 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
693 if (!NT_STATUS_IS_OK(status)) {
694 return -1;
696 return ret;
699 static int dbwrap_watched_traverse_read(struct db_context *db,
700 int (*fn)(struct db_record *rec,
701 void *private_data),
702 void *private_data)
704 struct db_watched_ctx *ctx = talloc_get_type_abort(
705 db->private_data, struct db_watched_ctx);
706 struct dbwrap_watched_traverse_state state = {
707 .fn = fn, .private_data = private_data };
708 NTSTATUS status;
709 int ret;
711 status = dbwrap_traverse_read(
712 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
713 if (!NT_STATUS_IS_OK(status)) {
714 return -1;
716 return ret;
719 static int dbwrap_watched_get_seqnum(struct db_context *db)
721 struct db_watched_ctx *ctx = talloc_get_type_abort(
722 db->private_data, struct db_watched_ctx);
723 return dbwrap_get_seqnum(ctx->backend);
726 static int dbwrap_watched_transaction_start(struct db_context *db)
728 struct db_watched_ctx *ctx = talloc_get_type_abort(
729 db->private_data, struct db_watched_ctx);
730 return dbwrap_transaction_start(ctx->backend);
733 static int dbwrap_watched_transaction_commit(struct db_context *db)
735 struct db_watched_ctx *ctx = talloc_get_type_abort(
736 db->private_data, struct db_watched_ctx);
737 return dbwrap_transaction_commit(ctx->backend);
740 static int dbwrap_watched_transaction_cancel(struct db_context *db)
742 struct db_watched_ctx *ctx = talloc_get_type_abort(
743 db->private_data, struct db_watched_ctx);
744 return dbwrap_transaction_cancel(ctx->backend);
/* Shared state of the sync and async parse_record paths. */
struct dbwrap_watched_parse_record_state {
	struct db_context *db;	/* only used for error logging */
	void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
	void *private_data;
	bool ok;		/* false => record was invalid */
};

/*
 * Backend parser callback: strip the watcher header and forward only
 * the user payload to the caller's parser.
 */
static void dbwrap_watched_parse_record_parser(TDB_DATA key, TDB_DATA data,
					       void *private_data)
{
	struct dbwrap_watched_parse_record_state *state = private_data;
	TDB_DATA userdata;

	state->ok = dbwrap_watch_rec_parse(data, NULL, NULL, &userdata);
	if (!state->ok) {
		dbwrap_watch_log_invalid_record(state->db, key, data);
		return;
	}

	state->parser(key, userdata, state->private_data);
}

/*
 * db_context.parse_record implementation. An invalid (unparsable)
 * record is reported to the caller as NT_STATUS_NOT_FOUND.
 */
static NTSTATUS dbwrap_watched_parse_record(
	struct db_context *db, TDB_DATA key,
	void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
	void *private_data)
{
	struct db_watched_ctx *ctx = talloc_get_type_abort(
		db->private_data, struct db_watched_ctx);
	struct dbwrap_watched_parse_record_state state = {
		.db = db,
		.parser = parser,
		.private_data = private_data,
	};
	NTSTATUS status;

	status = dbwrap_parse_record(
		ctx->backend, key, dbwrap_watched_parse_record_parser, &state);
	if (!NT_STATUS_IS_OK(status)) {
		return status;
	}
	if (!state.ok) {
		return NT_STATUS_NOT_FOUND;
	}
	return NT_STATUS_OK;
}
794 static void dbwrap_watched_parse_record_done(struct tevent_req *subreq);
796 static struct tevent_req *dbwrap_watched_parse_record_send(
797 TALLOC_CTX *mem_ctx,
798 struct tevent_context *ev,
799 struct db_context *db,
800 TDB_DATA key,
801 void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
802 void *private_data,
803 enum dbwrap_req_state *req_state)
805 struct db_watched_ctx *ctx = talloc_get_type_abort(
806 db->private_data, struct db_watched_ctx);
807 struct tevent_req *req = NULL;
808 struct tevent_req *subreq = NULL;
809 struct dbwrap_watched_parse_record_state *state = NULL;
811 req = tevent_req_create(mem_ctx, &state,
812 struct dbwrap_watched_parse_record_state);
813 if (req == NULL) {
814 *req_state = DBWRAP_REQ_ERROR;
815 return NULL;
818 *state = (struct dbwrap_watched_parse_record_state) {
819 .parser = parser,
820 .private_data = private_data,
821 .ok = true,
824 subreq = dbwrap_parse_record_send(state,
826 ctx->backend,
827 key,
828 dbwrap_watched_parse_record_parser,
829 state,
830 req_state);
831 if (tevent_req_nomem(subreq, req)) {
832 *req_state = DBWRAP_REQ_ERROR;
833 return tevent_req_post(req, ev);
836 tevent_req_set_callback(subreq, dbwrap_watched_parse_record_done, req);
837 return req;
840 static void dbwrap_watched_parse_record_done(struct tevent_req *subreq)
842 struct tevent_req *req = tevent_req_callback_data(
843 subreq, struct tevent_req);
844 struct dbwrap_watched_parse_record_state *state = tevent_req_data(
845 req, struct dbwrap_watched_parse_record_state);
846 NTSTATUS status;
848 status = dbwrap_parse_record_recv(subreq);
849 TALLOC_FREE(subreq);
850 if (tevent_req_nterror(req, status)) {
851 return;
854 if (!state->ok) {
855 tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
856 return;
859 tevent_req_done(req);
860 return;
863 static NTSTATUS dbwrap_watched_parse_record_recv(struct tevent_req *req)
865 NTSTATUS status;
867 if (tevent_req_is_nterror(req, &status)) {
868 tevent_req_received(req);
869 return status;
872 tevent_req_received(req);
873 return NT_STATUS_OK;
876 static int dbwrap_watched_exists(struct db_context *db, TDB_DATA key)
878 struct db_watched_ctx *ctx = talloc_get_type_abort(
879 db->private_data, struct db_watched_ctx);
881 return dbwrap_exists(ctx->backend, key);
884 static size_t dbwrap_watched_id(struct db_context *db, uint8_t *id,
885 size_t idlen)
887 struct db_watched_ctx *ctx = talloc_get_type_abort(
888 db->private_data, struct db_watched_ctx);
890 return dbwrap_db_id(ctx->backend, id, idlen);
/*
 * Wrap *backend in a watch-capable db_context.
 *
 * Takes ownership of *backend (talloc_move: *backend is set to NULL)
 * and steals its lock order; the backend itself is downgraded to
 * DBWRAP_LOCK_ORDER_NONE so only the wrapper participates in lock
 * ordering. msg is used to notify watchers of record changes.
 * Returns NULL on allocation failure.
 */
struct db_context *db_open_watched(TALLOC_CTX *mem_ctx,
				   struct db_context **backend,
				   struct messaging_context *msg)
{
	struct db_context *db;
	struct db_watched_ctx *ctx;

	db = talloc_zero(mem_ctx, struct db_context);
	if (db == NULL) {
		return NULL;
	}
	ctx = talloc_zero(db, struct db_watched_ctx);
	if (ctx == NULL) {
		TALLOC_FREE(db);
		return NULL;
	}
	db->private_data = ctx;

	ctx->msg = msg;

	ctx->backend = talloc_move(ctx, backend);
	db->lock_order = ctx->backend->lock_order;
	ctx->backend->lock_order = DBWRAP_LOCK_ORDER_NONE;

	db->fetch_locked = dbwrap_watched_fetch_locked;
	db->do_locked = dbwrap_watched_do_locked;
	db->traverse = dbwrap_watched_traverse;
	db->traverse_read = dbwrap_watched_traverse_read;
	db->get_seqnum = dbwrap_watched_get_seqnum;
	db->transaction_start = dbwrap_watched_transaction_start;
	db->transaction_commit = dbwrap_watched_transaction_commit;
	db->transaction_cancel = dbwrap_watched_transaction_cancel;
	db->parse_record = dbwrap_watched_parse_record;
	db->parse_record_send = dbwrap_watched_parse_record_send;
	db->parse_record_recv = dbwrap_watched_parse_record_recv;
	db->exists = dbwrap_watched_exists;
	db->id = dbwrap_watched_id;
	db->name = dbwrap_name(ctx->backend);

	return db;
}
/*
 * Register ourselves as a watcher on the locked record. The entry is
 * only appended to the backend record when the record is flushed
 * (force_fini_store). Returns the new instance id (never 0).
 *
 * NOTE(review): global_instance is a plain static counter, i.e. a
 * process-local monotonic sequence with no locking — presumably relies
 * on the single-threaded process model; confirm before reusing in
 * threaded code.
 */
uint64_t dbwrap_watched_watch_add_instance(struct db_record *rec)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);
	static uint64_t global_instance = 1;

	/* Only one pending add per locked record is allowed. */
	SMB_ASSERT(wrec->added.instance == 0);

	wrec->added = (struct dbwrap_watcher) {
		.pid = wrec->self,
		.instance = global_instance++,
	};

	wrec->force_fini_store = true;

	return wrec->added.instance;
}
/*
 * Remove our watcher entry (identified by instance) from the locked
 * record. Handles three places the entry can live: still pending in
 * wrec->added (revert the add), or inside the in-memory watcher array
 * (first / last / middle position, each with its own cheap removal).
 * instance == 0 is a no-op. A not-found instance is only logged.
 */
void dbwrap_watched_watch_remove_instance(struct db_record *rec, uint64_t instance)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);
	struct dbwrap_watcher clear_watcher = {
		.pid = wrec->self,
		.instance = instance,
	};
	size_t i;
	struct server_id_buf buf;

	if (instance == 0) {
		return;
	}

	if (wrec->added.instance == instance) {
		SMB_ASSERT(server_id_equal(&wrec->added.pid, &wrec->self));
		DBG_DEBUG("Watcher %s:%"PRIu64" reverted from adding\n",
			  server_id_str_buf(clear_watcher.pid, &buf),
			  clear_watcher.instance);
		ZERO_STRUCT(wrec->added);
	}

	for (i=0; i < wrec->watchers.count; i++) {
		struct dbwrap_watcher watcher;
		size_t off = i*DBWRAP_WATCHER_BUF_LENGTH;
		size_t next_off;
		size_t full_len;
		size_t move_len;

		dbwrap_watcher_get(&watcher, wrec->watchers.first + off);

		if (clear_watcher.instance != watcher.instance) {
			continue;
		}
		if (!server_id_equal(&clear_watcher.pid, &watcher.pid)) {
			continue;
		}

		/* Found: the change must be flushed back on fini. */
		wrec->force_fini_store = true;

		if (i == 0) {
			/* Head removal: just advance the window. */
			DBG_DEBUG("Watcher %s:%"PRIu64" removed from first position of %zu\n",
				  server_id_str_buf(clear_watcher.pid, &buf),
				  clear_watcher.instance,
				  wrec->watchers.count);
			wrec->watchers.first += DBWRAP_WATCHER_BUF_LENGTH;
			wrec->watchers.count -= 1;
			wrec->removed_first = true;
			return;
		}
		if (i == (wrec->watchers.count-1)) {
			/* Tail removal: just shrink the count. */
			DBG_DEBUG("Watcher %s:%"PRIu64" removed from last position of %zu\n",
				  server_id_str_buf(clear_watcher.pid, &buf),
				  clear_watcher.instance,
				  wrec->watchers.count);
			wrec->watchers.count -= 1;
			return;
		}

		/* Middle removal: close the gap with memmove. */
		DBG_DEBUG("Watcher %s:%"PRIu64" cleared at position %zu from %zu\n",
			  server_id_str_buf(clear_watcher.pid, &buf),
			  clear_watcher.instance, i+1,
			  wrec->watchers.count);

		next_off = off + DBWRAP_WATCHER_BUF_LENGTH;
		full_len = wrec->watchers.count * DBWRAP_WATCHER_BUF_LENGTH;
		move_len = full_len - next_off;
		memmove(wrec->watchers.first + off,
			wrec->watchers.first + next_off,
			move_len);
		wrec->watchers.count -= 1;
		return;
	}

	DBG_DEBUG("Watcher %s:%"PRIu64" not found in %zu watchers\n",
		  server_id_str_buf(clear_watcher.pid, &buf),
		  clear_watcher.instance,
		  wrec->watchers.count);
	return;
}
/*
 * Suppress waking any watcher for this locked record: clear the
 * pending wakeup target and mark alerting as already done so
 * prepare_wakeup() won't select a new one.
 */
void dbwrap_watched_watch_skip_alerting(struct db_record *rec)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);

	wrec->wakeup.watcher = (struct dbwrap_watcher) { .instance = 0, };
	wrec->watchers.alerted = true;
}

/*
 * Undo skip_alerting / a previous alert decision: clear the pending
 * wakeup target and allow prepare_wakeup() to pick a watcher again on
 * the next store.
 */
void dbwrap_watched_watch_reset_alerting(struct db_record *rec)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);

	wrec->wakeup.watcher = (struct dbwrap_watcher) { .instance = 0, };
	wrec->watchers.alerted = false;
}
/* Async state of a dbwrap_watched_watch_send() request. */
struct dbwrap_watched_watch_state {
	struct db_context *db;		/* db to clean our entry from */
	TDB_DATA key;			/* copied key of watched record */
	struct dbwrap_watcher watcher;	/* our pid + instance */
	struct server_id blocker;	/* optional process to watch */
	bool blockerdead;		/* set when blocker exited */
};

static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
				      void *private_data);
static void dbwrap_watched_watch_done(struct tevent_req *subreq);
static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq);
static int dbwrap_watched_watch_state_destructor(
	struct dbwrap_watched_watch_state *state);
/*
 * Wait asynchronously until the locked record is modified, or until
 * the (optional) blocker process dies.
 *
 * @resumed_instance 0 to register a fresh watcher instance; non-zero
 *                   to resume one registered earlier (either before
 *                   this do_locked round, or via an explicit
 *                   dbwrap_watched_watch_add_instance() call in it).
 *                   A mismatch with a pending add is rejected with
 *                   NT_STATUS_REQUEST_NOT_ACCEPTED.
 *
 * The request completes via MSG_DBWRAP_MODIFIED carrying our instance
 * id, or via blocker death. The state destructor removes our watcher
 * entry from the record unless the caller keeps the instance in recv.
 */
struct tevent_req *dbwrap_watched_watch_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct db_record *rec,
					     uint64_t resumed_instance,
					     struct server_id blocker)
{
	struct db_context *db = dbwrap_record_get_db(rec);
	struct db_watched_ctx *ctx = talloc_get_type_abort(
		db->private_data, struct db_watched_ctx);
	struct db_watched_record *wrec = db_record_get_watched_record(rec);
	struct tevent_req *req, *subreq;
	struct dbwrap_watched_watch_state *state;
	uint64_t instance;

	req = tevent_req_create(mem_ctx, &state,
				struct dbwrap_watched_watch_state);
	if (req == NULL) {
		return NULL;
	}
	state->db = db;
	state->blocker = blocker;

	if (ctx->msg == NULL) {
		tevent_req_nterror(req, NT_STATUS_NOT_SUPPORTED);
		return tevent_req_post(req, ev);
	}

	if (resumed_instance == 0 && wrec->added.instance == 0) {
		/*
		 * Adding a new instance
		 */
		instance = dbwrap_watched_watch_add_instance(rec);
	} else if (resumed_instance != 0 && wrec->added.instance == 0) {
		/*
		 * Resuming an existing instance that was
		 * already present before do_locked started
		 */
		instance = resumed_instance;
	} else if (resumed_instance == wrec->added.instance) {
		/*
		 * The caller used dbwrap_watched_watch_add_instance()
		 * already during this do_locked() invocation.
		 */
		instance = resumed_instance;
	} else {
		tevent_req_nterror(req, NT_STATUS_REQUEST_NOT_ACCEPTED);
		return tevent_req_post(req, ev);
	}

	state->watcher = (struct dbwrap_watcher) {
		.pid = messaging_server_id(ctx->msg),
		.instance = instance,
	};

	/* The record (and its key memory) go away before we complete. */
	state->key = tdb_data_talloc_copy(state, rec->key);
	if (tevent_req_nomem(state->key.dptr, req)) {
		return tevent_req_post(req, ev);
	}

	subreq = messaging_filtered_read_send(
		state, ev, ctx->msg, dbwrap_watched_msg_filter, state);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, dbwrap_watched_watch_done, req);

	talloc_set_destructor(state, dbwrap_watched_watch_state_destructor);

	if (blocker.pid != 0) {
		subreq = server_id_watch_send(state, ev, blocker);
		if (tevent_req_nomem(subreq, req)) {
			return tevent_req_post(req, ev);
		}
		tevent_req_set_callback(
			subreq, dbwrap_watched_watch_blocker_died, req);
	}

	return req;
}
/* server_id_watch completion: the blocker process exited. */
static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct dbwrap_watched_watch_state *state = tevent_req_data(
		req, struct dbwrap_watched_watch_state);
	int ret;

	ret = server_id_watch_recv(subreq, NULL);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		tevent_req_nterror(req, map_nt_error_from_unix(ret));
		return;
	}
	state->blockerdead = true;
	tevent_req_done(req);
}

/* do_locked callback used by the state destructor below. */
static void dbwrap_watched_watch_state_destructor_fn(
	struct db_record *rec,
	TDB_DATA value,
	void *private_data)
{
	struct dbwrap_watched_watch_state *state = talloc_get_type_abort(
		private_data, struct dbwrap_watched_watch_state);

	/*
	 * Here we just remove ourself from the in memory
	 * watchers array and let db_watched_record_fini()
	 * call dbwrap_watched_record_storev() to do the magic
	 * of writing back the modified in memory copy.
	 */
	dbwrap_watched_watch_remove_instance(rec, state->watcher.instance);
	return;
}

/*
 * Talloc destructor: if the request is torn down without the caller
 * keeping the instance, remove our watcher entry from the record
 * under the record lock. Failures are logged and ignored; dead
 * watcher entries are also skimmed off by prepare_wakeup().
 */
static int dbwrap_watched_watch_state_destructor(
	struct dbwrap_watched_watch_state *state)
{
	NTSTATUS status;

	status = dbwrap_do_locked(
		state->db,
		state->key,
		dbwrap_watched_watch_state_destructor_fn,
		state);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("dbwrap_do_locked failed: %s\n",
			    nt_errstr(status));
	}
	return 0;
}
/*
 * Messaging filter: accept only MSG_DBWRAP_MODIFIED messages without
 * fds whose 8-byte payload equals our watcher instance id.
 */
static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
				      void *private_data)
{
	struct dbwrap_watched_watch_state *state = talloc_get_type_abort(
		private_data, struct dbwrap_watched_watch_state);
	uint64_t instance;

	if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
		return false;
	}
	if (rec->num_fds != 0) {
		return false;
	}

	if (rec->buf.length != sizeof(instance)) {
		DBG_DEBUG("Got size %zu, expected %zu\n",
			  rec->buf.length,
			  sizeof(instance));
		return false;
	}

	instance = BVAL(rec->buf.data, 0);

	if (instance != state->watcher.instance) {
		DBG_DEBUG("Got instance %"PRIu64", expected %"PRIu64"\n",
			  instance,
			  state->watcher.instance);
		return false;
	}

	return true;
}

/* The filtered read completed: the record was modified for us. */
static void dbwrap_watched_watch_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct dbwrap_watched_watch_state *state = tevent_req_data(
		req, struct dbwrap_watched_watch_state);
	struct messaging_rec *rec;
	int ret;

	ret = messaging_filtered_read_recv(subreq, state, &rec);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		tevent_req_nterror(req, map_nt_error_from_unix(ret));
		return;
	}
	tevent_req_done(req);
}
/*
 * Receive the result of dbwrap_watched_watch_send().
 *
 * @pkeep_instance if non-NULL, receives our watcher instance and
 *                 transfers cleanup responsibility to the caller (the
 *                 destructor that would remove the entry is disarmed).
 * @blockerdead    if non-NULL, whether completion was due to blocker
 *                 death rather than a record modification.
 * @blocker        if non-NULL, the blocker passed to _send().
 */
NTSTATUS dbwrap_watched_watch_recv(struct tevent_req *req,
				   uint64_t *pkeep_instance,
				   bool *blockerdead,
				   struct server_id *blocker)
{
	struct dbwrap_watched_watch_state *state = tevent_req_data(
		req, struct dbwrap_watched_watch_state);
	NTSTATUS status;

	if (tevent_req_is_nterror(req, &status)) {
		tevent_req_received(req);
		return status;
	}
	if (pkeep_instance != NULL) {
		*pkeep_instance = state->watcher.instance;
		/*
		 * No need to remove ourselves anymore,
		 * the caller will take care of removing itself.
		 */
		talloc_set_destructor(state, NULL);
	}
	if (blockerdead != NULL) {
		*blockerdead = state->blockerdead;
	}
	if (blocker != NULL) {
		*blocker = state->blocker;
	}
	tevent_req_received(req);
	return NT_STATUS_OK;
}