smbd: Don't start the notify cleanup anymore
[Samba.git] / source3 / smbd / notify_internal.c
blobdc50d4f2b2543550ad0eab003d0016c013128038
/*
   Unix SMB/CIFS implementation.

   Copyright (C) Andrew Tridgell 2006
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
  This is the change notify database. It implements mechanisms for
  storing current change notify waiters in a tdb, and for checking
  whether a given event matches any of the stored notify waiters.
*/
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
/*
 * One notify request registered by this process through notify_add().
 * The requests are chained in notify_context->list and looked up by
 * private_data when a MSG_PVFS_NOTIFY message arrives.
 */
struct notify_list {
	struct notify_list *next, *prev;
	/* Watched path; also the key of the request's db_notify record. */
	const char *path;
	/* Called from notify_handler() for a matching message. */
	void (*callback)(void *, struct timespec, const struct notify_event *);
	/* Caller's cookie; identifies this request. */
	void *private_data;
};
/*
 * Per-process change notify state, created by notify_init().
 */
struct notify_context {
	/* Messaging context receiving MSG_PVFS_NOTIFY; may be NULL. */
	struct messaging_context *msg;
	/* Notify requests registered by this process (see notify_add()). */
	struct notify_list *list;

	/*
	 * Notify state is kept in two databases: a relatively static
	 * index db, and the notify db holding the volatile entries.
	 */

	/*
	 * "db_notify" is keyed by path name. Each record holds an array
	 * of struct notify_db_entry, one per notify request made by an
	 * SMB client. This database is always local; it is never
	 * clustered.
	 */
	struct db_context *db_notify;

	/*
	 * "db_index" is keyed by path name. Each record holds an array
	 * of the VNNs that are interested in notifies for this path.
	 *
	 * In the non-clustered case this database is cached per process
	 * by db_open_cache. Cache consistency is maintained through the
	 * tdb sequence number.
	 *
	 * In the clustered case the tdb sequence number cannot be used
	 * right now, but read-only records should help to avoid many
	 * full record migrations.
	 *
	 * In both cases updates to db_index must be kept to a minimum.
	 * This is done by delayed deletion. The db_index record is
	 * created together with the first db_notify entry for a path.
	 * Further notifies on that path only modify the db_notify
	 * record. When the last entry of a db_notify record is removed,
	 * the db_index record is kept and the db_notify record is
	 * replaced by the current timestamp. notify_cleanup() later
	 * removes both records once that timestamp is older than the
	 * cleanup interval (60 seconds unless
	 * "smbd:notify cleanup interval" is set).
	 */
	struct db_context *db_index;
};
98 static void notify_trigger_local(struct notify_context *notify,
99 uint32_t action, uint32_t filter,
100 const char *path, size_t path_len,
101 bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103 struct server_id *pid,
104 const char *path, uint32_t action,
105 void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107 const struct notify_db_entry *e,
108 bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112 const struct server_id *pid,
113 void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
116 static int notify_context_destructor(struct notify_context *notify);
118 static void notify_handler(struct messaging_context *msg_ctx,
119 void *private_data, uint32_t msg_type,
120 struct server_id server_id, DATA_BLOB *data);
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123 struct messaging_context *msg,
124 struct tevent_context *ev)
126 struct loadparm_context *lp_ctx;
127 struct notify_context *notify;
128 char *db_path;
130 notify = talloc(mem_ctx, struct notify_context);
131 if (notify == NULL) {
132 goto fail;
134 notify->msg = msg;
135 notify->list = NULL;
137 lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
139 db_path = lock_path("notify.tdb");
140 if (db_path == NULL) {
141 goto fail;
144 notify->db_notify = db_open_tdb(
145 notify, lp_ctx, db_path,
146 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
147 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2, DBWRAP_FLAG_NONE);
148 talloc_unlink(notify, lp_ctx);
149 TALLOC_FREE(db_path);
150 if (notify->db_notify == NULL) {
151 goto fail;
154 db_path = lock_path("notify_index.tdb");
155 if (db_path == NULL) {
156 goto fail;
159 notify->db_index = db_open(
160 notify, db_path,
161 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
162 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3, DBWRAP_FLAG_NONE);
163 TALLOC_FREE(db_path);
164 if (notify->db_index == NULL) {
165 goto fail;
167 if (!lp_clustering()) {
168 notify->db_index = db_open_cache(notify, notify->db_index);
169 if (notify->db_index == NULL) {
170 goto fail;
174 if (notify->msg != NULL) {
175 NTSTATUS status;
177 status = messaging_register(notify->msg, notify,
178 MSG_PVFS_NOTIFY, notify_handler);
179 if (!NT_STATUS_IS_OK(status)) {
180 DEBUG(1, ("messaging_register returned %s\n",
181 nt_errstr(status)));
182 goto fail;
186 talloc_set_destructor(notify, notify_context_destructor);
188 return notify;
189 fail:
190 TALLOC_FREE(notify);
191 return NULL;
194 static int notify_context_destructor(struct notify_context *notify)
196 DEBUG(10, ("notify_context_destructor called\n"));
198 if (notify->msg != NULL) {
199 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
202 while (notify->list != NULL) {
203 DEBUG(10, ("Removing private_data=%p\n",
204 notify->list->private_data));
205 notify_remove(notify, notify->list->private_data);
207 return 0;
210 NTSTATUS notify_add(struct notify_context *notify,
211 const char *path, uint32_t filter, uint32_t subdir_filter,
212 void (*callback)(void *, struct timespec,
213 const struct notify_event *),
214 void *private_data)
216 struct notify_db_entry e;
217 struct notify_list *listel;
218 struct db_record *notify_rec, *idx_rec;
219 bool add_idx;
220 NTSTATUS status;
221 TDB_DATA key, notify_copy;
223 if (notify == NULL) {
224 return NT_STATUS_NOT_IMPLEMENTED;
227 DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
228 private_data));
230 listel = talloc(notify, struct notify_list);
231 if (listel == NULL) {
232 return NT_STATUS_NO_MEMORY;
234 listel->callback = callback;
235 listel->private_data = private_data;
236 listel->path = talloc_strdup(listel, path);
237 if (listel->path == NULL) {
238 TALLOC_FREE(listel);
239 return NT_STATUS_NO_MEMORY;
241 DLIST_ADD(notify->list, listel);
243 ZERO_STRUCT(e);
244 e.filter = filter;
245 e.subdir_filter = subdir_filter;
246 e.server = messaging_server_id(notify->msg);
247 e.private_data = private_data;
249 key = string_tdb_data(path);
251 notify_rec = dbwrap_fetch_locked(notify->db_notify,
252 talloc_tos(), key);
253 if (notify_rec == NULL) {
254 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
255 goto fail;
259 * Make a copy of the notify_rec for easy restore in case
260 * updating the index_rec fails;
262 notify_copy = dbwrap_record_get_value(notify_rec);
263 if (notify_copy.dsize != 0) {
264 notify_copy.dptr = (uint8_t *)talloc_memdup(
265 notify_rec, notify_copy.dptr,
266 notify_copy.dsize);
267 if (notify_copy.dptr == NULL) {
268 TALLOC_FREE(notify_rec);
269 status = NT_STATUS_NO_MEMORY;
270 goto fail;
274 if (DEBUGLEVEL >= 10) {
275 NDR_PRINT_DEBUG(notify_db_entry, &e);
278 status = notify_add_entry(notify_rec, &e, &add_idx);
279 if (!NT_STATUS_IS_OK(status)) {
280 goto fail;
282 if (!add_idx) {
284 * Someone else has added the idx entry already
286 TALLOC_FREE(notify_rec);
287 return NT_STATUS_OK;
290 idx_rec = dbwrap_fetch_locked(notify->db_index,
291 talloc_tos(), key);
292 if (idx_rec == NULL) {
293 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
294 goto restore_notify;
296 status = notify_add_idx(idx_rec, get_my_vnn());
297 if (!NT_STATUS_IS_OK(status)) {
298 goto restore_notify;
301 TALLOC_FREE(idx_rec);
302 TALLOC_FREE(notify_rec);
303 return NT_STATUS_OK;
305 restore_notify:
306 if (notify_copy.dsize != 0) {
307 dbwrap_record_store(notify_rec, notify_copy, 0);
308 } else {
309 dbwrap_record_delete(notify_rec);
311 TALLOC_FREE(notify_rec);
312 fail:
313 DLIST_REMOVE(notify->list, listel);
314 TALLOC_FREE(listel);
315 return status;
318 static NTSTATUS notify_add_entry(struct db_record *rec,
319 const struct notify_db_entry *e,
320 bool *p_add_idx)
322 TDB_DATA value = dbwrap_record_get_value(rec);
323 struct notify_db_entry *entries;
324 size_t num_entries;
325 bool add_idx = true;
326 NTSTATUS status;
328 if (value.dsize == sizeof(time_t)) {
329 DEBUG(10, ("Re-using deleted entry\n"));
330 value.dsize = 0;
331 add_idx = false;
334 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
335 DEBUG(1, ("Invalid value.dsize = %u\n",
336 (unsigned)value.dsize));
337 return NT_STATUS_INTERNAL_DB_CORRUPTION;
339 num_entries = value.dsize / sizeof(struct notify_db_entry);
341 if (num_entries != 0) {
342 add_idx = false;
345 entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
346 if (entries == NULL) {
347 return NT_STATUS_NO_MEMORY;
349 memcpy(entries, value.dptr, value.dsize);
351 entries[num_entries] = *e;
352 value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
353 status = dbwrap_record_store(rec, value, 0);
354 TALLOC_FREE(entries);
355 if (!NT_STATUS_IS_OK(status)) {
356 return status;
358 *p_add_idx = add_idx;
359 return NT_STATUS_OK;
362 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
364 TDB_DATA value = dbwrap_record_get_value(rec);
365 uint32_t *vnns;
366 size_t i, num_vnns;
367 NTSTATUS status;
369 if ((value.dsize % sizeof(uint32_t)) != 0) {
370 DEBUG(1, ("Invalid value.dsize = %u\n",
371 (unsigned)value.dsize));
372 return NT_STATUS_INTERNAL_DB_CORRUPTION;
374 num_vnns = value.dsize / sizeof(uint32_t);
375 vnns = (uint32_t *)value.dptr;
377 for (i=0; i<num_vnns; i++) {
378 if (vnns[i] == vnn) {
379 return NT_STATUS_OK;
381 if (vnns[i] > vnn) {
382 break;
386 value.dptr = (uint8_t *)talloc_realloc(
387 rec, value.dptr, uint32_t, num_vnns + 1);
388 if (value.dptr == NULL) {
389 return NT_STATUS_NO_MEMORY;
391 value.dsize = talloc_get_size(value.dptr);
393 vnns = (uint32_t *)value.dptr;
395 memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
396 vnns[i] = vnn;
398 status = dbwrap_record_store(rec, value, 0);
399 if (!NT_STATUS_IS_OK(status)) {
400 return status;
402 return NT_STATUS_OK;
405 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
407 struct server_id pid;
408 struct notify_list *listel;
409 struct db_record *notify_rec;
410 NTSTATUS status;
412 if ((notify == NULL) || (notify->msg == NULL)) {
413 return NT_STATUS_NOT_IMPLEMENTED;
416 DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
418 pid = messaging_server_id(notify->msg);
420 for (listel=notify->list;listel;listel=listel->next) {
421 if (listel->private_data == private_data) {
422 DLIST_REMOVE(notify->list, listel);
423 break;
426 if (listel == NULL) {
427 DEBUG(10, ("%p not found\n", private_data));
428 return NT_STATUS_NOT_FOUND;
430 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
431 string_tdb_data(listel->path));
432 TALLOC_FREE(listel);
433 if (notify_rec == NULL) {
434 return NT_STATUS_INTERNAL_DB_CORRUPTION;
436 status = notify_del_entry(notify_rec, &pid, private_data);
437 DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
438 TALLOC_FREE(notify_rec);
439 return status;
442 static NTSTATUS notify_del_entry(struct db_record *rec,
443 const struct server_id *pid,
444 void *private_data)
446 TDB_DATA value = dbwrap_record_get_value(rec);
447 struct server_id_buf tmp;
448 struct notify_db_entry *entries;
449 size_t i, num_entries;
450 time_t now;
452 DEBUG(10, ("del_entry called for %s %p\n",
453 server_id_str_buf(*pid, &tmp), private_data));
455 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
456 DEBUG(1, ("Invalid value.dsize = %u\n",
457 (unsigned)value.dsize));
458 return NT_STATUS_INTERNAL_DB_CORRUPTION;
460 num_entries = value.dsize / sizeof(struct notify_db_entry);
461 entries = (struct notify_db_entry *)value.dptr;
463 for (i=0; i<num_entries; i++) {
464 struct notify_db_entry *e = &entries[i];
466 if (DEBUGLEVEL >= 10) {
467 NDR_PRINT_DEBUG(notify_db_entry, e);
470 if (e->private_data != private_data) {
471 continue;
473 if (serverid_equal(&e->server, pid)) {
474 break;
477 if (i == num_entries) {
478 return NT_STATUS_NOT_FOUND;
480 entries[i] = entries[num_entries-1];
481 value.dsize -= sizeof(struct notify_db_entry);
483 if (value.dsize == 0) {
484 now = time(NULL);
485 value.dptr = (uint8_t *)&now;
486 value.dsize = sizeof(now);
488 return dbwrap_record_store(rec, value, 0);
491 struct notify_trigger_index_state {
492 TALLOC_CTX *mem_ctx;
493 uint32_t *vnns;
494 uint32_t my_vnn;
495 bool found_my_vnn;
498 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
499 void *private_data)
501 struct notify_trigger_index_state *state =
502 (struct notify_trigger_index_state *)private_data;
503 uint32_t *new_vnns;
504 size_t i, num_vnns, num_new_vnns, num_remote_vnns;
506 if ((data.dsize % sizeof(uint32_t)) != 0) {
507 DEBUG(1, ("Invalid record size in notify index db: %u\n",
508 (unsigned)data.dsize));
509 return;
511 new_vnns = (uint32_t *)data.dptr;
512 num_new_vnns = data.dsize / sizeof(uint32_t);
513 num_remote_vnns = num_new_vnns;
515 for (i=0; i<num_new_vnns; i++) {
516 if (new_vnns[i] == state->my_vnn) {
517 state->found_my_vnn = true;
518 num_remote_vnns -= 1;
521 if (num_remote_vnns == 0) {
522 return;
525 num_vnns = talloc_array_length(state->vnns);
526 state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
527 num_vnns + num_remote_vnns);
528 if (state->vnns == NULL) {
529 DEBUG(1, ("talloc_realloc failed\n"));
530 return;
533 for (i=0; i<num_new_vnns; i++) {
534 if (new_vnns[i] != state->my_vnn) {
535 state->vnns[num_vnns] = new_vnns[i];
536 num_vnns += 1;
/*
 * qsort() comparator for arrays of uint32_t VNNs: ascending order,
 * returning -1, 0 or 1.
 */
static int vnn_cmp(const void *p1, const void *p2)
{
	uint32_t a = *(const uint32_t *)p1;
	uint32_t b = *(const uint32_t *)p2;

	/* Each comparison yields 0 or 1, so no unsigned wrap-around. */
	return (a > b) - (a < b);
}
555 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
556 uint32_t filter, const char *path,
557 uint8_t **pblob, size_t *pblob_len)
559 struct notify_remote_event ev;
560 DATA_BLOB data;
561 enum ndr_err_code ndr_err;
563 ev.action = action;
564 ev.filter = filter;
565 ev.path = path;
567 if (DEBUGLEVEL >= 10) {
568 NDR_PRINT_DEBUG(notify_remote_event, &ev);
571 ndr_err = ndr_push_struct_blob(
572 &data, mem_ctx, &ev,
573 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
574 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
575 return false;
577 *pblob = data.data;
578 *pblob_len = data.length;
579 return true;
582 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
583 const uint8_t *blob, size_t blob_len,
584 uint32_t *paction, uint32_t *pfilter,
585 char **path)
587 struct notify_remote_event *ev;
588 enum ndr_err_code ndr_err;
589 DATA_BLOB data;
590 char *p;
592 data.data = discard_const_p(uint8_t, blob);
593 data.length = blob_len;
595 ev = talloc(mem_ctx, struct notify_remote_event);
596 if (ev == NULL) {
597 return false;
600 ndr_err = ndr_pull_struct_blob(
601 &data, ev, ev,
602 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
603 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
604 TALLOC_FREE(ev);
605 return false;
607 if (DEBUGLEVEL >= 10) {
608 NDR_PRINT_DEBUG(notify_remote_event, ev);
610 *paction = ev->action;
611 *pfilter = ev->filter;
612 p = discard_const_p(char, ev->path);
613 *path = talloc_move(mem_ctx, &p);
615 TALLOC_FREE(ev);
616 return true;
619 void notify_trigger(struct notify_context *notify,
620 uint32_t action, uint32_t filter,
621 const char *dir, const char *name)
623 struct ctdbd_connection *ctdbd_conn;
624 struct notify_trigger_index_state idx_state;
625 const char *p, *next_p;
626 size_t i, num_vnns;
627 uint32_t last_vnn;
628 uint8_t *remote_blob = NULL;
629 size_t remote_blob_len = 0;
630 struct iovec iov;
631 char *path, *to_free;
632 char tmpbuf[PATH_MAX];
633 ssize_t len;
635 DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
636 "dir=%s, name=%s\n", (unsigned)action, (unsigned)filter,
637 dir, name));
639 /* see if change notify is enabled at all */
640 if (notify == NULL) {
641 return;
644 if (dir[0] != '/') {
646 * The rest of this routine assumes an absolute path.
648 return;
651 len = full_path_tos(dir, name, tmpbuf, sizeof(tmpbuf),
652 &path, &to_free);
653 if (len == -1) {
654 DEBUG(1, ("full_path_tos failed\n"));
655 return;
658 idx_state.mem_ctx = talloc_tos();
659 idx_state.vnns = NULL;
660 idx_state.found_my_vnn = false;
661 idx_state.my_vnn = get_my_vnn();
663 for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
664 ptrdiff_t path_len = p - path;
665 bool recursive;
667 next_p = strchr(p+1, '/');
668 recursive = (next_p != NULL);
670 dbwrap_parse_record(
671 notify->db_index,
672 make_tdb_data(discard_const_p(uint8_t, path), path_len),
673 notify_trigger_index_parser, &idx_state);
675 if (idx_state.found_my_vnn) {
676 notify_trigger_local(notify, action, filter,
677 path, path_len, recursive);
678 idx_state.found_my_vnn = false;
682 if (idx_state.vnns == NULL) {
683 goto done;
686 ctdbd_conn = messaging_ctdbd_connection();
687 if (ctdbd_conn == NULL) {
688 goto done;
691 num_vnns = talloc_array_length(idx_state.vnns);
692 qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
694 last_vnn = 0xffffffff;
696 if (!notify_push_remote_blob(talloc_tos(), action, filter, path,
697 &remote_blob, &remote_blob_len)) {
698 DEBUG(1, ("notify_push_remote_blob failed\n"));
699 goto done;
702 iov = (struct iovec) { .iov_base = remote_blob,
703 .iov_len = remote_blob_len };
705 for (i=0; i<num_vnns; i++) {
706 uint32_t vnn = idx_state.vnns[i];
707 NTSTATUS status;
709 if (vnn == last_vnn) {
710 continue;
713 status = ctdbd_messaging_send_iov(
714 ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
715 &iov, 1);
716 if (!NT_STATUS_IS_OK(status)) {
717 DEBUG(10, ("ctdbd_messaging_send_iov to vnn %d "
718 "returned %s, ignoring\n", (int)vnn,
719 nt_errstr(status)));
722 last_vnn = vnn;
725 done:
726 TALLOC_FREE(remote_blob);
727 TALLOC_FREE(idx_state.vnns);
728 TALLOC_FREE(to_free);
731 static void notify_trigger_local(struct notify_context *notify,
732 uint32_t action, uint32_t filter,
733 const char *path, size_t path_len,
734 bool recursive)
736 TDB_DATA data;
737 struct notify_db_entry *entries;
738 size_t i, num_entries;
739 NTSTATUS status;
741 DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
742 "filter=%d\n", (int)path_len, path, (int)path_len,
743 (int)filter));
745 status = dbwrap_fetch(
746 notify->db_notify, talloc_tos(),
747 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
748 if (!NT_STATUS_IS_OK(status)) {
749 DEBUG(10, ("dbwrap_fetch returned %s\n",
750 nt_errstr(status)));
751 return;
753 if (data.dsize == sizeof(time_t)) {
754 DEBUG(10, ("Got deleted record\n"));
755 goto done;
757 if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
758 DEBUG(1, ("Invalid data.dsize = %u\n",
759 (unsigned)data.dsize));
760 goto done;
763 entries = (struct notify_db_entry *)data.dptr;
764 num_entries = data.dsize / sizeof(struct notify_db_entry);
766 DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
767 recursive ? "true" : "false", (int)path_len,
768 path[path_len]));
770 for (i=0; i<num_entries; i++) {
771 struct notify_db_entry *e = &entries[i];
772 uint32_t e_filter;
774 if (DEBUGLEVEL >= 10) {
775 NDR_PRINT_DEBUG(notify_db_entry, e);
778 e_filter = recursive ? e->subdir_filter : e->filter;
780 if ((filter & e_filter) == 0) {
781 continue;
784 if (!procid_is_local(&e->server)) {
785 struct server_id_buf tmp;
786 DEBUG(1, ("internal error: Non-local pid %s in "
787 "notify.tdb\n",
788 server_id_str_buf(e->server, &tmp)));
789 continue;
792 status = notify_send(notify, &e->server, path + path_len + 1,
793 action, e->private_data);
794 if (!NT_STATUS_IS_OK(status)) {
795 DEBUG(10, ("notify_send returned %s\n",
796 nt_errstr(status)));
800 done:
801 TALLOC_FREE(data.dptr);
/*
 * Layout of a MSG_PVFS_NOTIFY message. notify_send() transmits the
 * fields up to offsetof(struct notify_msg, path), immediately followed
 * by the NUL-terminated path. Do not reorder the fields: sender and
 * receiver share this layout.
 */
struct notify_msg {
	struct timespec when;	/* time the event was sent */
	void *private_data;	/* cookie of the target notify request */
	uint32_t action;	/* NOTIFY_ACTION_* value, as passed in */
	char path[1];		/* start of the NUL-terminated path */
};
811 static NTSTATUS notify_send(struct notify_context *notify,
812 struct server_id *pid,
813 const char *path, uint32_t action,
814 void *private_data)
816 struct notify_msg m = {};
817 struct iovec iov[2];
819 m.when = timespec_current();
820 m.private_data = private_data;
821 m.action = action;
823 iov[0].iov_base = &m;
824 iov[0].iov_len = offsetof(struct notify_msg, path);
825 iov[1].iov_base = discard_const_p(char, path);
826 iov[1].iov_len = strlen(path)+1;
828 return messaging_send_iov(notify->msg, *pid, MSG_PVFS_NOTIFY,
829 iov, ARRAY_SIZE(iov), NULL, 0);
832 static void notify_handler(struct messaging_context *msg_ctx,
833 void *private_data, uint32_t msg_type,
834 struct server_id server_id, DATA_BLOB *data)
836 struct notify_context *notify = talloc_get_type_abort(
837 private_data, struct notify_context);
838 struct notify_msg *m;
839 struct notify_event e;
840 struct notify_list *listel;
842 if (data->length == 0) {
843 DEBUG(1, ("%s: Got 0-sized MSG_PVFS_NOTIFY msg\n", __func__));
844 return;
846 if (data->data[data->length-1] != 0) {
847 DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
848 __func__));
849 return;
852 m = (struct notify_msg *)data->data;
854 e = (struct notify_event) {
855 .action = m->action,
856 .path = m->path,
857 .private_data = m->private_data,
858 .dir = discard_const_p(char, "")
861 for (listel=notify->list;listel;listel=listel->next) {
862 if (listel->private_data == m->private_data) {
863 listel->callback(listel->private_data, m->when, &e);
864 break;
/* Traversal state passed through dbwrap by notify_walk_idx(). */
struct notify_walk_idx_state {
	/* User callback, invoked once per well-formed index record */
	void (*fn)(const char *path,
		   uint32_t *vnns, size_t num_vnns,
		   void *private_data);
	/* Handed through to fn unchanged */
	void *private_data;
};
876 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
878 struct notify_walk_idx_state *state =
879 (struct notify_walk_idx_state *)private_data;
880 TDB_DATA key, value;
881 char *path;
883 key = dbwrap_record_get_key(rec);
884 value = dbwrap_record_get_value(rec);
886 if ((value.dsize % sizeof(uint32_t)) != 0) {
887 DEBUG(1, ("invalid value size in notify index db: %u\n",
888 (unsigned)(value.dsize)));
889 return 0;
892 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
893 if (path == NULL) {
894 DEBUG(1, ("talloc_strndup failed\n"));
895 return 0;
897 state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
898 state->private_data);
899 TALLOC_FREE(path);
900 return 0;
903 void notify_walk_idx(struct notify_context *notify,
904 void (*fn)(const char *path,
905 uint32_t *vnns, size_t num_vnns,
906 void *private_data),
907 void *private_data)
909 struct notify_walk_idx_state state;
910 state.fn = fn;
911 state.private_data = private_data;
912 dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
913 NULL);
/* Traversal state passed through dbwrap by notify_walk(). */
struct notify_walk_state {
	/* User callback, invoked once per well-formed db_notify record */
	void (*fn)(const char *path,
		   struct notify_db_entry *entries, size_t num_entries,
		   time_t deleted_time, void *private_data);
	/* Handed through to fn unchanged */
	void *private_data;
};
923 static int notify_walk_fn(struct db_record *rec, void *private_data)
925 struct notify_walk_state *state =
926 (struct notify_walk_state *)private_data;
927 TDB_DATA key, value;
928 struct notify_db_entry *entries;
929 size_t num_entries;
930 time_t deleted_time;
931 char *path;
933 key = dbwrap_record_get_key(rec);
934 value = dbwrap_record_get_value(rec);
936 if (value.dsize == sizeof(deleted_time)) {
937 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
938 entries = NULL;
939 num_entries = 0;
940 } else {
941 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
942 DEBUG(1, ("invalid value size in notify db: %u\n",
943 (unsigned)(value.dsize)));
944 return 0;
946 entries = (struct notify_db_entry *)value.dptr;
947 num_entries = value.dsize / sizeof(struct notify_db_entry);
948 deleted_time = 0;
951 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
952 if (path == NULL) {
953 DEBUG(1, ("talloc_strndup failed\n"));
954 return 0;
956 state->fn(path, entries, num_entries, deleted_time,
957 state->private_data);
958 TALLOC_FREE(path);
959 return 0;
962 void notify_walk(struct notify_context *notify,
963 void (*fn)(const char *path,
964 struct notify_db_entry *entries,
965 size_t num_entries,
966 time_t deleted_time, void *private_data),
967 void *private_data)
969 struct notify_walk_state state;
970 state.fn = fn;
971 state.private_data = private_data;
972 dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
973 NULL);
976 struct notify_cleanup_state {
977 TALLOC_CTX *mem_ctx;
978 time_t delete_before;
979 ssize_t array_size;
980 uint32_t num_paths;
981 char **paths;
984 static void notify_cleanup_collect(
985 const char *path, struct notify_db_entry *entries, size_t num_entries,
986 time_t deleted_time, void *private_data)
988 struct notify_cleanup_state *state =
989 (struct notify_cleanup_state *)private_data;
990 char *p;
992 if (num_entries != 0) {
993 return;
995 if (deleted_time >= state->delete_before) {
996 return;
999 p = talloc_strdup(state->mem_ctx, path);
1000 if (p == NULL) {
1001 DEBUG(1, ("talloc_strdup failed\n"));
1002 return;
1004 add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
1005 &state->paths, &state->num_paths,
1006 &state->array_size);
1007 if (state->array_size == -1) {
1008 TALLOC_FREE(p);
/* Defined after notify_cleanup(), which calls it. */
static bool notify_cleanup_path(struct notify_context *notify,
				const char *path, time_t delete_before);
1015 void notify_cleanup(struct notify_context *notify)
1017 struct notify_cleanup_state state;
1018 uint32_t failure_pool;
1020 ZERO_STRUCT(state);
1021 state.mem_ctx = talloc_stackframe();
1023 state.delete_before = time(NULL)
1024 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
1026 notify_walk(notify, notify_cleanup_collect, &state);
1028 failure_pool = state.num_paths;
1030 while (state.num_paths != 0) {
1031 size_t idx;
1034 * This loop is designed to be as kind as possible to
1035 * ctdb. ctdb does not like it if many smbds hammer on a
1036 * single record. If on many nodes the cleanup process starts
1037 * running, it can happen that all of them need to clean up
1038 * records in the same order. This would generate a ctdb
1039 * migrate storm on these records. Randomizing the load across
1040 * multiple records reduces the load on the individual record.
1043 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
1044 idx = idx % state.num_paths;
1046 if (!notify_cleanup_path(notify, state.paths[idx],
1047 state.delete_before)) {
1049 * notify_cleanup_path failed, the most likely reason
1050 * is that dbwrap_try_fetch_locked failed due to
1051 * contention. We allow one failed attempt per deleted
1052 * path on average before we give up.
1054 failure_pool -= 1;
1055 if (failure_pool == 0) {
1057 * Too many failures. We will come back here,
1058 * maybe next time there is less contention.
1060 break;
1064 TALLOC_FREE(state.paths[idx]);
1065 state.paths[idx] = state.paths[state.num_paths-1];
1066 state.num_paths -= 1;
1068 TALLOC_FREE(state.mem_ctx);
1071 static bool notify_cleanup_path(struct notify_context *notify,
1072 const char *path, time_t delete_before)
1074 struct db_record *notify_rec = NULL;
1075 struct db_record *idx_rec = NULL;
1076 TDB_DATA key = string_tdb_data(path);
1077 TDB_DATA value;
1078 time_t deleted;
1079 NTSTATUS status;
1081 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1082 if (notify_rec == NULL) {
1083 DEBUG(10, ("Could not fetch notify_rec\n"));
1084 return false;
1086 value = dbwrap_record_get_value(notify_rec);
1088 if (value.dsize != sizeof(deleted)) {
1089 DEBUG(10, ("record %s has been re-used\n", path));
1090 goto done;
1092 memcpy(&deleted, value.dptr, sizeof(deleted));
1094 if (deleted >= delete_before) {
1095 DEBUG(10, ("record %s too young\n", path));
1096 goto done;
1100 * Be kind to ctdb and only try one dmaster migration at most.
1102 idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1103 if (idx_rec == NULL) {
1104 DEBUG(10, ("Could not fetch idx_rec\n"));
1105 goto done;
1108 status = dbwrap_record_delete(notify_rec);
1109 if (!NT_STATUS_IS_OK(status)) {
1110 DEBUG(10, ("Could not delete notify_rec: %s\n",
1111 nt_errstr(status)));
1114 status = notify_del_idx(idx_rec, get_my_vnn());
1115 if (!NT_STATUS_IS_OK(status)) {
1116 DEBUG(10, ("Could not delete idx_rec: %s\n",
1117 nt_errstr(status)));
1120 done:
1121 TALLOC_FREE(idx_rec);
1122 TALLOC_FREE(notify_rec);
1123 return true;
1126 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1128 TDB_DATA value = dbwrap_record_get_value(rec);
1129 uint32_t *vnns;
1130 size_t i, num_vnns;
1132 if ((value.dsize % sizeof(uint32_t)) != 0) {
1133 DEBUG(1, ("Invalid value.dsize = %u\n",
1134 (unsigned)value.dsize));
1135 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1137 num_vnns = value.dsize / sizeof(uint32_t);
1138 vnns = (uint32_t *)value.dptr;
1140 for (i=0; i<num_vnns; i++) {
1141 if (vnns[i] == vnn) {
1142 break;
1146 if (i == num_vnns) {
1148 * Not found. Should not happen, but okay...
1150 return NT_STATUS_OK;
1153 memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1154 value.dsize -= sizeof(uint32_t);
1156 if (value.dsize == 0) {
1157 return dbwrap_record_delete(rec);
1159 return dbwrap_record_store(rec, value, 0);
/* State of a notify_cluster_proxy_send() request. */
struct notify_cluster_proxy_state {
	/* Event context the request runs on */
	struct tevent_context *ev;
	/* Context that receives the forwarded events */
	struct notify_context *notify;
	/* ctdb message channel, set once it is established */
	struct ctdb_msg_channel *chan;
};
/* Steps of the notify_cluster_proxy_send() tevent request. */
static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
static void notify_cluster_proxy_trigger(struct notify_context *notify,
					 uint32_t action, uint32_t filter,
					 char *path);
1174 struct tevent_req *notify_cluster_proxy_send(
1175 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1176 struct notify_context *notify)
1178 struct tevent_req *req, *subreq;
1179 struct notify_cluster_proxy_state *state;
1181 req = tevent_req_create(mem_ctx, &state,
1182 struct notify_cluster_proxy_state);
1183 if (req == NULL) {
1184 return NULL;
1186 state->ev = ev;
1187 state->notify = notify;
1189 subreq = ctdb_msg_channel_init_send(
1190 state, state->ev, lp_ctdbd_socket(),
1191 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1192 if (tevent_req_nomem(subreq, req)) {
1193 return tevent_req_post(req, ev);
1195 tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1196 return req;
1199 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1201 struct tevent_req *req = tevent_req_callback_data(
1202 subreq, struct tevent_req);
1203 struct notify_cluster_proxy_state *state = tevent_req_data(
1204 req, struct notify_cluster_proxy_state);
1205 int ret;
1207 ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1208 TALLOC_FREE(subreq);
1209 if (ret != 0) {
1210 tevent_req_error(req, ret);
1211 return;
1213 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1214 if (tevent_req_nomem(subreq, req)) {
1215 return;
1217 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1220 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1222 struct tevent_req *req = tevent_req_callback_data(
1223 subreq, struct tevent_req);
1224 struct notify_cluster_proxy_state *state = tevent_req_data(
1225 req, struct notify_cluster_proxy_state);
1226 uint8_t *msg;
1227 size_t msg_len;
1228 uint32_t action, filter;
1229 char *path;
1230 int ret;
1231 bool res;
1233 ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1234 TALLOC_FREE(subreq);
1235 if (ret != 0) {
1236 tevent_req_error(req, ret);
1237 return;
1240 res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1241 &action, &filter, &path);
1242 TALLOC_FREE(msg);
1243 if (!res) {
1244 tevent_req_error(req, EIO);
1245 return;
1247 notify_cluster_proxy_trigger(state->notify, action, filter, path);
1248 TALLOC_FREE(path);
1250 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1251 if (tevent_req_nomem(subreq, req)) {
1252 return;
1254 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
/*
 * Deliver an event forwarded by another node to this node's local
 * watchers.
 *
 * Walks the ancestor directories of "path" below "/" the same way
 * notify_trigger() does. The immediate parent is matched against each
 * entry's filter; higher ancestors are matched against its
 * subdir_filter.
 */
static void notify_cluster_proxy_trigger(struct notify_context *notify,
					 uint32_t action, uint32_t filter,
					 char *path)
{
	const char *p, *next_p;

	if ((path == NULL) || (path[0] != '/')) {
		/*
		 * The path comes from another node. The walk needs an
		 * absolute path; in particular, for "" the
		 * strchr(path+1, ...) below would read past the
		 * terminating NUL.
		 */
		DEBUG(1, ("%s: ignoring non-absolute path\n", __func__));
		return;
	}

	for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
		ptrdiff_t path_len = p - path;
		bool recursive;

		next_p = strchr(p+1, '/');
		recursive = (next_p != NULL);

		notify_trigger_local(notify, action, filter,
				     path, path_len, recursive);
	}
}
/*
 * Collect the result of notify_cluster_proxy_send(). The failures
 * raised by this request are errno-style values (e.g. EIO), presumably
 * returned as-is by tevent_req_simple_recv_unix() — confirm against
 * its documentation.
 */
int notify_cluster_proxy_recv(struct tevent_req *req)
{
	int result = tevent_req_simple_recv_unix(req);

	return result;
}