smbd: Rename lck2->rw_probe in brl_conflict_other
[Samba.git] / source3 / smbd / notify_internal.c
blobe902bf4f3e85663ce11fcdaa1fa8492ee28fb261
1 /*
2 Unix SMB/CIFS implementation.
4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Volker Lendecke 2012
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 this is the change notify database. It implements mechanisms for
23 storing current change notify waiters in a tdb, and checking if a
24 given event matches any of the stored notify waiters.
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
/*
 * One change-notify watch registered by this process. The elements
 * form a doubly linked list anchored at notify_context->list.
 */
struct notify_list {
	/* list linkage, maintained via DLIST_ADD/DLIST_REMOVE */
	struct notify_list *next, *prev;
	/* copy of the watched path, talloc child of this element */
	const char *path;
	/* called by notify_handler() when an event for this watch arrives */
	void (*callback)(void *, struct timespec, const struct notify_event *);
	/* caller-supplied cookie; also the key used to find the watch */
	void *private_data;
};
/*
 * Per-process state of the change notify subsystem.
 */
struct notify_context {
	/* messaging context used to send/receive MSG_PVFS_NOTIFY */
	struct messaging_context *msg;
	/* watches registered by this process */
	struct notify_list *list;

	/*
	 * The notify database is split up into two databases: One
	 * relatively static index db and the real notify db with the
	 * volatile entries.
	 */

	/*
	 * "db_notify" is indexed by pathname. Per record it stores an
	 * array of notify_db_entry structs. These represent the
	 * notify records as requested by the smb client. This
	 * database is always held locally, it is never clustered.
	 */
	struct db_context *db_notify;

	/*
	 * "db_index" is indexed by pathname. The records are an array
	 * of VNNs which have any interest in notifies for this path
	 * name.
	 *
	 * In the non-clustered case this database is cached in RAM by
	 * means of db_cache_open, which maintains a cache per
	 * process. Cache consistency is maintained by the tdb
	 * sequence number.
	 *
	 * In the clustered case right now we can not use the tdb
	 * sequence number, but by means of read only records we
	 * should be able to avoid a lot of full migrations.
	 *
	 * In both cases, it is important to keep the update
	 * operations to db_index to a minimum. This is achieved by
	 * delayed deletion. When a db_notify is initially created,
	 * the db_index record is also created. When more notifies are
	 * added for a path, then only the db_notify record needs to be
	 * modified, the db_index record is not touched. When the last
	 * entry from the db_notify record is deleted, the db_index
	 * record is not immediately deleted. Instead, the db_notify
	 * record is replaced with a current timestamp. A regular
	 * cleanup process will delete all db_index records that are
	 * older than a minute.
	 */
	struct db_context *db_index;
};
98 static void notify_trigger_local(struct notify_context *notify,
99 uint32_t action, uint32_t filter,
100 const char *path, size_t path_len,
101 bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103 struct server_id *pid,
104 const char *path, uint32_t action,
105 void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107 const struct notify_db_entry *e,
108 bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112 const struct server_id *pid,
113 void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
116 static int notify_context_destructor(struct notify_context *notify);
118 static void notify_handler(struct messaging_context *msg_ctx,
119 void *private_data, uint32_t msg_type,
120 struct server_id server_id, DATA_BLOB *data);
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123 struct messaging_context *msg,
124 struct tevent_context *ev)
126 struct loadparm_context *lp_ctx;
127 struct notify_context *notify;
129 notify = talloc(mem_ctx, struct notify_context);
130 if (notify == NULL) {
131 goto fail;
133 notify->msg = msg;
134 notify->list = NULL;
136 lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
137 notify->db_notify = db_open_tdb(
138 notify, lp_ctx, lock_path("notify.tdb"),
139 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
140 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2, DBWRAP_FLAG_NONE);
141 talloc_unlink(notify, lp_ctx);
142 if (notify->db_notify == NULL) {
143 goto fail;
145 notify->db_index = db_open(
146 notify, lock_path("notify_index.tdb"),
147 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
148 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3, DBWRAP_FLAG_NONE);
149 if (notify->db_index == NULL) {
150 goto fail;
152 if (!lp_clustering()) {
153 notify->db_index = db_open_cache(notify, notify->db_index);
154 if (notify->db_index == NULL) {
155 goto fail;
159 if (notify->msg != NULL) {
160 NTSTATUS status;
162 status = messaging_register(notify->msg, notify,
163 MSG_PVFS_NOTIFY, notify_handler);
164 if (!NT_STATUS_IS_OK(status)) {
165 DEBUG(1, ("messaging_register returned %s\n",
166 nt_errstr(status)));
167 goto fail;
171 talloc_set_destructor(notify, notify_context_destructor);
173 return notify;
174 fail:
175 TALLOC_FREE(notify);
176 return NULL;
179 static int notify_context_destructor(struct notify_context *notify)
181 DEBUG(10, ("notify_context_destructor called\n"));
183 if (notify->msg != NULL) {
184 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
187 while (notify->list != NULL) {
188 DEBUG(10, ("Removing private_data=%p\n",
189 notify->list->private_data));
190 notify_remove(notify, notify->list->private_data);
192 return 0;
195 NTSTATUS notify_add(struct notify_context *notify,
196 const char *path, uint32_t filter, uint32_t subdir_filter,
197 void (*callback)(void *, struct timespec,
198 const struct notify_event *),
199 void *private_data)
201 struct notify_db_entry e;
202 struct notify_list *listel;
203 struct db_record *notify_rec, *idx_rec;
204 bool add_idx;
205 NTSTATUS status;
206 TDB_DATA key, notify_copy;
208 if (notify == NULL) {
209 return NT_STATUS_NOT_IMPLEMENTED;
212 DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
213 private_data));
215 listel = talloc(notify, struct notify_list);
216 if (listel == NULL) {
217 return NT_STATUS_NO_MEMORY;
219 listel->callback = callback;
220 listel->private_data = private_data;
221 listel->path = talloc_strdup(listel, path);
222 if (listel->path == NULL) {
223 TALLOC_FREE(listel);
224 return NT_STATUS_NO_MEMORY;
226 DLIST_ADD(notify->list, listel);
228 ZERO_STRUCT(e);
229 e.filter = filter;
230 e.subdir_filter = subdir_filter;
231 e.server = messaging_server_id(notify->msg);
232 e.private_data = private_data;
234 key = string_tdb_data(path);
236 notify_rec = dbwrap_fetch_locked(notify->db_notify,
237 talloc_tos(), key);
238 if (notify_rec == NULL) {
239 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
240 goto fail;
244 * Make a copy of the notify_rec for easy restore in case
245 * updating the index_rec fails;
247 notify_copy = dbwrap_record_get_value(notify_rec);
248 if (notify_copy.dsize != 0) {
249 notify_copy.dptr = (uint8_t *)talloc_memdup(
250 notify_rec, notify_copy.dptr,
251 notify_copy.dsize);
252 if (notify_copy.dptr == NULL) {
253 TALLOC_FREE(notify_rec);
254 status = NT_STATUS_NO_MEMORY;
255 goto fail;
259 if (DEBUGLEVEL >= 10) {
260 NDR_PRINT_DEBUG(notify_db_entry, &e);
263 status = notify_add_entry(notify_rec, &e, &add_idx);
264 if (!NT_STATUS_IS_OK(status)) {
265 goto fail;
267 if (!add_idx) {
269 * Someone else has added the idx entry already
271 TALLOC_FREE(notify_rec);
272 return NT_STATUS_OK;
275 idx_rec = dbwrap_fetch_locked(notify->db_index,
276 talloc_tos(), key);
277 if (idx_rec == NULL) {
278 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
279 goto restore_notify;
281 status = notify_add_idx(idx_rec, get_my_vnn());
282 if (!NT_STATUS_IS_OK(status)) {
283 goto restore_notify;
286 TALLOC_FREE(idx_rec);
287 TALLOC_FREE(notify_rec);
288 return NT_STATUS_OK;
290 restore_notify:
291 if (notify_copy.dsize != 0) {
292 dbwrap_record_store(notify_rec, notify_copy, 0);
293 } else {
294 dbwrap_record_delete(notify_rec);
296 TALLOC_FREE(notify_rec);
297 fail:
298 DLIST_REMOVE(notify->list, listel);
299 TALLOC_FREE(listel);
300 return status;
303 static NTSTATUS notify_add_entry(struct db_record *rec,
304 const struct notify_db_entry *e,
305 bool *p_add_idx)
307 TDB_DATA value = dbwrap_record_get_value(rec);
308 struct notify_db_entry *entries;
309 size_t num_entries;
310 bool add_idx = true;
311 NTSTATUS status;
313 if (value.dsize == sizeof(time_t)) {
314 DEBUG(10, ("Re-using deleted entry\n"));
315 value.dsize = 0;
316 add_idx = false;
319 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
320 DEBUG(1, ("Invalid value.dsize = %u\n",
321 (unsigned)value.dsize));
322 return NT_STATUS_INTERNAL_DB_CORRUPTION;
324 num_entries = value.dsize / sizeof(struct notify_db_entry);
326 if (num_entries != 0) {
327 add_idx = false;
330 entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
331 if (entries == NULL) {
332 return NT_STATUS_NO_MEMORY;
334 memcpy(entries, value.dptr, value.dsize);
336 entries[num_entries] = *e;
337 value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
338 status = dbwrap_record_store(rec, value, 0);
339 TALLOC_FREE(entries);
340 if (!NT_STATUS_IS_OK(status)) {
341 return status;
343 *p_add_idx = add_idx;
344 return NT_STATUS_OK;
347 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
349 TDB_DATA value = dbwrap_record_get_value(rec);
350 uint32_t *vnns;
351 size_t i, num_vnns;
352 NTSTATUS status;
354 if ((value.dsize % sizeof(uint32_t)) != 0) {
355 DEBUG(1, ("Invalid value.dsize = %u\n",
356 (unsigned)value.dsize));
357 return NT_STATUS_INTERNAL_DB_CORRUPTION;
359 num_vnns = value.dsize / sizeof(uint32_t);
360 vnns = (uint32_t *)value.dptr;
362 for (i=0; i<num_vnns; i++) {
363 if (vnns[i] == vnn) {
364 return NT_STATUS_OK;
366 if (vnns[i] > vnn) {
367 break;
371 value.dptr = (uint8_t *)talloc_realloc(
372 rec, value.dptr, uint32_t, num_vnns + 1);
373 if (value.dptr == NULL) {
374 return NT_STATUS_NO_MEMORY;
376 value.dsize = talloc_get_size(value.dptr);
378 vnns = (uint32_t *)value.dptr;
380 memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
381 vnns[i] = vnn;
383 status = dbwrap_record_store(rec, value, 0);
384 if (!NT_STATUS_IS_OK(status)) {
385 return status;
387 return NT_STATUS_OK;
390 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
392 struct server_id pid;
393 struct notify_list *listel;
394 struct db_record *notify_rec;
395 NTSTATUS status;
397 if ((notify == NULL) || (notify->msg == NULL)) {
398 return NT_STATUS_NOT_IMPLEMENTED;
401 DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
403 pid = messaging_server_id(notify->msg);
405 for (listel=notify->list;listel;listel=listel->next) {
406 if (listel->private_data == private_data) {
407 DLIST_REMOVE(notify->list, listel);
408 break;
411 if (listel == NULL) {
412 DEBUG(10, ("%p not found\n", private_data));
413 return NT_STATUS_NOT_FOUND;
415 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
416 string_tdb_data(listel->path));
417 TALLOC_FREE(listel);
418 if (notify_rec == NULL) {
419 return NT_STATUS_INTERNAL_DB_CORRUPTION;
421 status = notify_del_entry(notify_rec, &pid, private_data);
422 DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
423 TALLOC_FREE(notify_rec);
424 return status;
427 static NTSTATUS notify_del_entry(struct db_record *rec,
428 const struct server_id *pid,
429 void *private_data)
431 TDB_DATA value = dbwrap_record_get_value(rec);
432 struct notify_db_entry *entries;
433 size_t i, num_entries;
434 time_t now;
436 DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
437 private_data));
439 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
440 DEBUG(1, ("Invalid value.dsize = %u\n",
441 (unsigned)value.dsize));
442 return NT_STATUS_INTERNAL_DB_CORRUPTION;
444 num_entries = value.dsize / sizeof(struct notify_db_entry);
445 entries = (struct notify_db_entry *)value.dptr;
447 for (i=0; i<num_entries; i++) {
448 struct notify_db_entry *e = &entries[i];
450 if (DEBUGLEVEL >= 10) {
451 NDR_PRINT_DEBUG(notify_db_entry, e);
454 if (e->private_data != private_data) {
455 continue;
457 if (serverid_equal(&e->server, pid)) {
458 break;
461 if (i == num_entries) {
462 return NT_STATUS_NOT_FOUND;
464 entries[i] = entries[num_entries-1];
465 value.dsize -= sizeof(struct notify_db_entry);
467 if (value.dsize == 0) {
468 now = time(NULL);
469 value.dptr = (uint8_t *)&now;
470 value.dsize = sizeof(now);
472 return dbwrap_record_store(rec, value, 0);
475 struct notify_trigger_index_state {
476 TALLOC_CTX *mem_ctx;
477 uint32_t *vnns;
478 uint32_t my_vnn;
479 bool found_my_vnn;
482 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
483 void *private_data)
485 struct notify_trigger_index_state *state =
486 (struct notify_trigger_index_state *)private_data;
487 uint32_t *new_vnns;
488 size_t i, num_vnns, num_new_vnns, num_remote_vnns;
490 if ((data.dsize % sizeof(uint32_t)) != 0) {
491 DEBUG(1, ("Invalid record size in notify index db: %u\n",
492 (unsigned)data.dsize));
493 return;
495 new_vnns = (uint32_t *)data.dptr;
496 num_new_vnns = data.dsize / sizeof(uint32_t);
497 num_remote_vnns = num_new_vnns;
499 for (i=0; i<num_new_vnns; i++) {
500 if (new_vnns[i] == state->my_vnn) {
501 state->found_my_vnn = true;
502 num_remote_vnns -= 1;
505 if (num_remote_vnns == 0) {
506 return;
509 num_vnns = talloc_array_length(state->vnns);
510 state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
511 num_vnns + num_remote_vnns);
512 if (state->vnns == NULL) {
513 DEBUG(1, ("talloc_realloc failed\n"));
514 return;
517 for (i=0; i<num_new_vnns; i++) {
518 if (new_vnns[i] != state->my_vnn) {
519 state->vnns[num_vnns] = new_vnns[i];
520 num_vnns += 1;
/*
 * qsort() comparison function for arrays of uint32_t VNNs.
 * Returns -1, 0 or 1 for ascending numeric order.
 */
static int vnn_cmp(const void *p1, const void *p2)
{
	const uint32_t lhs = *(const uint32_t *)p1;
	const uint32_t rhs = *(const uint32_t *)p2;

	return (lhs > rhs) - (lhs < rhs);
}
539 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
540 uint32_t filter, const char *path,
541 uint8_t **pblob, size_t *pblob_len)
543 struct notify_remote_event ev;
544 DATA_BLOB data;
545 enum ndr_err_code ndr_err;
547 ev.action = action;
548 ev.filter = filter;
549 ev.path = path;
551 if (DEBUGLEVEL >= 10) {
552 NDR_PRINT_DEBUG(notify_remote_event, &ev);
555 ndr_err = ndr_push_struct_blob(
556 &data, mem_ctx, &ev,
557 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
558 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
559 return false;
561 *pblob = data.data;
562 *pblob_len = data.length;
563 return true;
566 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
567 const uint8_t *blob, size_t blob_len,
568 uint32_t *paction, uint32_t *pfilter,
569 char **path)
571 struct notify_remote_event *ev;
572 enum ndr_err_code ndr_err;
573 DATA_BLOB data;
574 char *p;
576 data.data = discard_const_p(uint8_t, blob);
577 data.length = blob_len;
579 ev = talloc(mem_ctx, struct notify_remote_event);
580 if (ev == NULL) {
581 return false;
584 ndr_err = ndr_pull_struct_blob(
585 &data, ev, ev,
586 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
587 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
588 TALLOC_FREE(ev);
589 return false;
591 if (DEBUGLEVEL >= 10) {
592 NDR_PRINT_DEBUG(notify_remote_event, ev);
594 *paction = ev->action;
595 *pfilter = ev->filter;
596 p = discard_const_p(char, ev->path);
597 *path = talloc_move(mem_ctx, &p);
599 TALLOC_FREE(ev);
600 return true;
603 void notify_trigger(struct notify_context *notify,
604 uint32_t action, uint32_t filter, const char *path)
606 struct ctdbd_connection *ctdbd_conn;
607 struct notify_trigger_index_state idx_state;
608 const char *p, *next_p;
609 size_t i, num_vnns;
610 uint32_t last_vnn;
611 uint8_t *remote_blob = NULL;
612 size_t remote_blob_len = 0;
614 DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
615 "path=%s\n", (unsigned)action, (unsigned)filter, path));
617 /* see if change notify is enabled at all */
618 if (notify == NULL) {
619 return;
622 if (path[0] != '/') {
624 * The rest of this routine assumes an absolute path.
626 return;
629 idx_state.mem_ctx = talloc_tos();
630 idx_state.vnns = NULL;
631 idx_state.found_my_vnn = false;
632 idx_state.my_vnn = get_my_vnn();
634 for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
635 ptrdiff_t path_len = p - path;
636 bool recursive;
638 next_p = strchr(p+1, '/');
639 recursive = (next_p != NULL);
641 dbwrap_parse_record(
642 notify->db_index,
643 make_tdb_data(discard_const_p(uint8_t, path), path_len),
644 notify_trigger_index_parser, &idx_state);
646 if (idx_state.found_my_vnn) {
647 notify_trigger_local(notify, action, filter,
648 path, path_len, recursive);
649 idx_state.found_my_vnn = false;
653 if (idx_state.vnns == NULL) {
654 goto done;
657 ctdbd_conn = messaging_ctdbd_connection();
658 if (ctdbd_conn == NULL) {
659 goto done;
662 num_vnns = talloc_array_length(idx_state.vnns);
663 qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
665 last_vnn = 0xffffffff;
667 if (!notify_push_remote_blob(talloc_tos(), action, filter, path,
668 &remote_blob, &remote_blob_len)) {
669 DEBUG(1, ("notify_push_remote_blob failed\n"));
670 goto done;
673 for (i=0; i<num_vnns; i++) {
674 uint32_t vnn = idx_state.vnns[i];
675 NTSTATUS status;
677 if (vnn == last_vnn) {
678 continue;
681 status = ctdbd_messaging_send_blob(
682 ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
683 remote_blob, remote_blob_len);
684 if (!NT_STATUS_IS_OK(status)) {
685 DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
686 "returned %s, ignoring\n", (int)vnn,
687 nt_errstr(status)));
690 last_vnn = vnn;
693 done:
694 TALLOC_FREE(remote_blob);
695 TALLOC_FREE(idx_state.vnns);
698 static void notify_trigger_local(struct notify_context *notify,
699 uint32_t action, uint32_t filter,
700 const char *path, size_t path_len,
701 bool recursive)
703 TDB_DATA data;
704 struct notify_db_entry *entries;
705 size_t i, num_entries;
706 NTSTATUS status;
708 DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
709 "filter=%d\n", (int)path_len, path, (int)path_len,
710 (int)filter));
712 status = dbwrap_fetch(
713 notify->db_notify, talloc_tos(),
714 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
715 if (!NT_STATUS_IS_OK(status)) {
716 DEBUG(10, ("dbwrap_fetch returned %s\n",
717 nt_errstr(status)));
718 return;
720 if (data.dsize == sizeof(time_t)) {
721 DEBUG(10, ("Got deleted record\n"));
722 goto done;
724 if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
725 DEBUG(1, ("Invalid data.dsize = %u\n",
726 (unsigned)data.dsize));
727 goto done;
730 entries = (struct notify_db_entry *)data.dptr;
731 num_entries = data.dsize / sizeof(struct notify_db_entry);
733 DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
734 recursive ? "true" : "false", (int)path_len,
735 path[path_len]));
737 for (i=0; i<num_entries; i++) {
738 struct notify_db_entry *e = &entries[i];
739 uint32_t e_filter;
741 if (DEBUGLEVEL >= 10) {
742 NDR_PRINT_DEBUG(notify_db_entry, e);
745 e_filter = recursive ? e->subdir_filter : e->filter;
747 if ((filter & e_filter) == 0) {
748 continue;
751 if (!procid_is_local(&e->server)) {
752 DEBUG(1, ("internal error: Non-local pid %s in "
753 "notify.tdb\n",
754 procid_str_static(&e->server)));
755 continue;
758 status = notify_send(notify, &e->server, path + path_len + 1,
759 action, e->private_data);
760 if (!NT_STATUS_IS_OK(status)) {
761 DEBUG(10, ("notify_send returned %s\n",
762 nt_errstr(status)));
766 done:
767 TALLOC_FREE(data.dptr);
/*
 * Layout of a MSG_PVFS_NOTIFY message as built by notify_send() and
 * read by notify_handler(): a fixed header up to "path", followed by
 * the 0-terminated path.
 */
struct notify_msg {
	/* time the message was created */
	struct timespec when;
	/* cookie of the watch this message is meant for */
	void *private_data;
	/* action code of the event */
	uint32_t action;
	/* first byte of the path, the rest follows in the message */
	char path[1];
};
777 static NTSTATUS notify_send(struct notify_context *notify,
778 struct server_id *pid,
779 const char *path, uint32_t action,
780 void *private_data)
782 struct notify_msg m = {};
783 struct iovec iov[2];
785 m.when = timespec_current();
786 m.private_data = private_data;
787 m.action = action;
789 iov[0].iov_base = &m;
790 iov[0].iov_len = offsetof(struct notify_msg, path);
791 iov[1].iov_base = discard_const_p(char, path);
792 iov[1].iov_len = strlen(path)+1;
794 return messaging_send_iov(notify->msg, *pid, MSG_PVFS_NOTIFY,
795 iov, ARRAY_SIZE(iov));
798 static void notify_handler(struct messaging_context *msg_ctx,
799 void *private_data, uint32_t msg_type,
800 struct server_id server_id, DATA_BLOB *data)
802 struct notify_context *notify = talloc_get_type_abort(
803 private_data, struct notify_context);
804 struct notify_msg *m;
805 struct notify_event e;
806 struct notify_list *listel;
808 if (data->length == 0) {
809 DEBUG(1, ("%s: Got 0-sized MSG_PVFS_NOTIFY msg\n", __func__));
810 return;
812 if (data->data[data->length-1] != 0) {
813 DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
814 __func__));
815 return;
818 m = (struct notify_msg *)data->data;
819 e.action = m->action;
820 e.path = m->path;
821 e.private_data = m->private_data;
823 for (listel=notify->list;listel;listel=listel->next) {
824 if (listel->private_data == m->private_data) {
825 listel->callback(listel->private_data, m->when, &e);
826 break;
/*
 * Callback and cookie handed from notify_walk_idx() to
 * notify_walk_idx_fn().
 */
struct notify_walk_idx_state {
	/* called once per db_index record */
	void (*fn)(const char *path,
		   uint32_t *vnns, size_t num_vnns,
		   void *private_data);
	/* passed through to fn unchanged */
	void *private_data;
};
838 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
840 struct notify_walk_idx_state *state =
841 (struct notify_walk_idx_state *)private_data;
842 TDB_DATA key, value;
843 char *path;
845 key = dbwrap_record_get_key(rec);
846 value = dbwrap_record_get_value(rec);
848 if ((value.dsize % sizeof(uint32_t)) != 0) {
849 DEBUG(1, ("invalid value size in notify index db: %u\n",
850 (unsigned)(value.dsize)));
851 return 0;
854 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
855 if (path == NULL) {
856 DEBUG(1, ("talloc_strndup failed\n"));
857 return 0;
859 state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
860 state->private_data);
861 TALLOC_FREE(path);
862 return 0;
865 void notify_walk_idx(struct notify_context *notify,
866 void (*fn)(const char *path,
867 uint32_t *vnns, size_t num_vnns,
868 void *private_data),
869 void *private_data)
871 struct notify_walk_idx_state state;
872 state.fn = fn;
873 state.private_data = private_data;
874 dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
875 NULL);
/*
 * Callback and cookie handed from notify_walk() to notify_walk_fn().
 */
struct notify_walk_state {
	/* called once per db_notify record */
	void (*fn)(const char *path,
		   struct notify_db_entry *entries, size_t num_entries,
		   time_t deleted_time, void *private_data);
	/* passed through to fn unchanged */
	void *private_data;
};
885 static int notify_walk_fn(struct db_record *rec, void *private_data)
887 struct notify_walk_state *state =
888 (struct notify_walk_state *)private_data;
889 TDB_DATA key, value;
890 struct notify_db_entry *entries;
891 size_t num_entries;
892 time_t deleted_time;
893 char *path;
895 key = dbwrap_record_get_key(rec);
896 value = dbwrap_record_get_value(rec);
898 if (value.dsize == sizeof(deleted_time)) {
899 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
900 entries = NULL;
901 num_entries = 0;
902 } else {
903 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
904 DEBUG(1, ("invalid value size in notify db: %u\n",
905 (unsigned)(value.dsize)));
906 return 0;
908 entries = (struct notify_db_entry *)value.dptr;
909 num_entries = value.dsize / sizeof(struct notify_db_entry);
910 deleted_time = 0;
913 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
914 if (path == NULL) {
915 DEBUG(1, ("talloc_strndup failed\n"));
916 return 0;
918 state->fn(path, entries, num_entries, deleted_time,
919 state->private_data);
920 TALLOC_FREE(path);
921 return 0;
924 void notify_walk(struct notify_context *notify,
925 void (*fn)(const char *path,
926 struct notify_db_entry *entries,
927 size_t num_entries,
928 time_t deleted_time, void *private_data),
929 void *private_data)
931 struct notify_walk_state state;
932 state.fn = fn;
933 state.private_data = private_data;
934 dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
935 NULL);
938 struct notify_cleanup_state {
939 TALLOC_CTX *mem_ctx;
940 time_t delete_before;
941 ssize_t array_size;
942 uint32_t num_paths;
943 char **paths;
946 static void notify_cleanup_collect(
947 const char *path, struct notify_db_entry *entries, size_t num_entries,
948 time_t deleted_time, void *private_data)
950 struct notify_cleanup_state *state =
951 (struct notify_cleanup_state *)private_data;
952 char *p;
954 if (num_entries != 0) {
955 return;
957 if (deleted_time >= state->delete_before) {
958 return;
961 p = talloc_strdup(state->mem_ctx, path);
962 if (p == NULL) {
963 DEBUG(1, ("talloc_strdup failed\n"));
964 return;
966 add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
967 &state->paths, &state->num_paths,
968 &state->array_size);
969 if (state->array_size == -1) {
970 TALLOC_FREE(p);
974 static bool notify_cleanup_path(struct notify_context *notify,
975 const char *path, time_t delete_before);
977 void notify_cleanup(struct notify_context *notify)
979 struct notify_cleanup_state state;
980 uint32_t failure_pool;
982 ZERO_STRUCT(state);
983 state.mem_ctx = talloc_stackframe();
985 state.delete_before = time(NULL)
986 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
988 notify_walk(notify, notify_cleanup_collect, &state);
990 failure_pool = state.num_paths;
992 while (state.num_paths != 0) {
993 size_t idx;
996 * This loop is designed to be as kind as possible to
997 * ctdb. ctdb does not like it if many smbds hammer on a
998 * single record. If on many nodes the cleanup process starts
999 * running, it can happen that all of them need to clean up
1000 * records in the same order. This would generate a ctdb
1001 * migrate storm on these records. Randomizing the load across
1002 * multiple records reduces the load on the individual record.
1005 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
1006 idx = idx % state.num_paths;
1008 if (!notify_cleanup_path(notify, state.paths[idx],
1009 state.delete_before)) {
1011 * notify_cleanup_path failed, the most likely reason
1012 * is that dbwrap_try_fetch_locked failed due to
1013 * contention. We allow one failed attempt per deleted
1014 * path on average before we give up.
1016 failure_pool -= 1;
1017 if (failure_pool == 0) {
1019 * Too many failures. We will come back here,
1020 * maybe next time there is less contention.
1022 break;
1026 TALLOC_FREE(state.paths[idx]);
1027 state.paths[idx] = state.paths[state.num_paths-1];
1028 state.num_paths -= 1;
1030 TALLOC_FREE(state.mem_ctx);
1033 static bool notify_cleanup_path(struct notify_context *notify,
1034 const char *path, time_t delete_before)
1036 struct db_record *notify_rec = NULL;
1037 struct db_record *idx_rec = NULL;
1038 TDB_DATA key = string_tdb_data(path);
1039 TDB_DATA value;
1040 time_t deleted;
1041 NTSTATUS status;
1043 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1044 if (notify_rec == NULL) {
1045 DEBUG(10, ("Could not fetch notify_rec\n"));
1046 return false;
1048 value = dbwrap_record_get_value(notify_rec);
1050 if (value.dsize != sizeof(deleted)) {
1051 DEBUG(10, ("record %s has been re-used\n", path));
1052 goto done;
1054 memcpy(&deleted, value.dptr, sizeof(deleted));
1056 if (deleted >= delete_before) {
1057 DEBUG(10, ("record %s too young\n", path));
1058 goto done;
1062 * Be kind to ctdb and only try one dmaster migration at most.
1064 idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1065 if (idx_rec == NULL) {
1066 DEBUG(10, ("Could not fetch idx_rec\n"));
1067 goto done;
1070 status = dbwrap_record_delete(notify_rec);
1071 if (!NT_STATUS_IS_OK(status)) {
1072 DEBUG(10, ("Could not delete notify_rec: %s\n",
1073 nt_errstr(status)));
1076 status = notify_del_idx(idx_rec, get_my_vnn());
1077 if (!NT_STATUS_IS_OK(status)) {
1078 DEBUG(10, ("Could not delete idx_rec: %s\n",
1079 nt_errstr(status)));
1082 done:
1083 TALLOC_FREE(idx_rec);
1084 TALLOC_FREE(notify_rec);
1085 return true;
1088 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1090 TDB_DATA value = dbwrap_record_get_value(rec);
1091 uint32_t *vnns;
1092 size_t i, num_vnns;
1094 if ((value.dsize % sizeof(uint32_t)) != 0) {
1095 DEBUG(1, ("Invalid value.dsize = %u\n",
1096 (unsigned)value.dsize));
1097 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1099 num_vnns = value.dsize / sizeof(uint32_t);
1100 vnns = (uint32_t *)value.dptr;
1102 for (i=0; i<num_vnns; i++) {
1103 if (vnns[i] == vnn) {
1104 break;
1108 if (i == num_vnns) {
1110 * Not found. Should not happen, but okay...
1112 return NT_STATUS_OK;
1115 memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1116 value.dsize -= sizeof(uint32_t);
1118 if (value.dsize == 0) {
1119 return dbwrap_record_delete(rec);
1121 return dbwrap_record_store(rec, value, 0);
/*
 * State of a notify_cluster_proxy_send() request.
 */
struct notify_cluster_proxy_state {
	/* event context the sub-requests run on */
	struct tevent_context *ev;
	/* context the received events are triggered on */
	struct notify_context *notify;
	/* ctdb message channel, set in notify_cluster_proxy_got_chan() */
	struct ctdb_msg_channel *chan;
};
1130 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
1131 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
1132 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1133 uint32_t action, uint32_t filter,
1134 char *path);
1136 struct tevent_req *notify_cluster_proxy_send(
1137 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1138 struct notify_context *notify)
1140 struct tevent_req *req, *subreq;
1141 struct notify_cluster_proxy_state *state;
1143 req = tevent_req_create(mem_ctx, &state,
1144 struct notify_cluster_proxy_state);
1145 if (req == NULL) {
1146 return NULL;
1148 state->ev = ev;
1149 state->notify = notify;
1151 subreq = ctdb_msg_channel_init_send(
1152 state, state->ev, lp_ctdbd_socket(),
1153 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1154 if (tevent_req_nomem(subreq, req)) {
1155 return tevent_req_post(req, ev);
1157 tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1158 return req;
1161 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1163 struct tevent_req *req = tevent_req_callback_data(
1164 subreq, struct tevent_req);
1165 struct notify_cluster_proxy_state *state = tevent_req_data(
1166 req, struct notify_cluster_proxy_state);
1167 int ret;
1169 ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1170 TALLOC_FREE(subreq);
1171 if (ret != 0) {
1172 tevent_req_error(req, ret);
1173 return;
1175 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1176 if (tevent_req_nomem(subreq, req)) {
1177 return;
1179 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1182 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1184 struct tevent_req *req = tevent_req_callback_data(
1185 subreq, struct tevent_req);
1186 struct notify_cluster_proxy_state *state = tevent_req_data(
1187 req, struct notify_cluster_proxy_state);
1188 uint8_t *msg;
1189 size_t msg_len;
1190 uint32_t action, filter;
1191 char *path;
1192 int ret;
1193 bool res;
1195 ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1196 TALLOC_FREE(subreq);
1197 if (ret != 0) {
1198 tevent_req_error(req, ret);
1199 return;
1202 res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1203 &action, &filter, &path);
1204 TALLOC_FREE(msg);
1205 if (!res) {
1206 tevent_req_error(req, EIO);
1207 return;
1209 notify_cluster_proxy_trigger(state->notify, action, filter, path);
1210 TALLOC_FREE(path);
1212 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1213 if (tevent_req_nomem(subreq, req)) {
1214 return;
1216 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
/*
 * Trigger an event received from another cluster node for the local
 * watchers.
 *
 * notify_trigger_local() is called for the start of the path and
 * then for every prefix of it that ends in front of a '/'. Paths
 * that are NULL or not absolute are ignored, like notify_trigger()
 * does.
 */
static void notify_cluster_proxy_trigger(struct notify_context *notify,
					 uint32_t action, uint32_t filter,
					 char *path)
{
	const char *p, *next_p;

	if ((path == NULL) || (path[0] != '/')) {
		/*
		 * The loop below looks at p+1 and notify_trigger_local()
		 * skips the byte at path_len. Both require "p" to point
		 * at a '/', which an empty or relative path from the
		 * remote side would not guarantee.
		 */
		DEBUG(10, ("Ignoring non-absolute remote notify path\n"));
		return;
	}

	for (p = path; p != NULL; p = next_p) {
		ptrdiff_t path_len = p - path;
		bool recursive;

		next_p = strchr(p+1, '/');
		recursive = (next_p != NULL);

		notify_trigger_local(notify, action, filter,
				     path, path_len, recursive);
	}
}
/*
 * Fetch the result of a notify_cluster_proxy_send() request.
 * Returns the unix error the request finished with, 0 if there is
 * none.
 */
int notify_cluster_proxy_recv(struct tevent_req *req)
{
	int err = 0;

	if (!tevent_req_is_unix_error(req, &err)) {
		return 0;
	}
	return err;
}