s3:smb2_create: don't take 'state->te' as indication for "was_deferred" (bug #9196)
[Samba.git] / source3 / smbd / notify_internal.c
blob938c57b574a28fe24903da2b06b19b3cf5cba8c6
1 /*
2 Unix SMB/CIFS implementation.
4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Volker Lendecke 2012
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 this is the change notify database. It implements mechanisms for
23 storing current change notify waiters in a tdb, and checking if a
24 given event matches any of the stored notify waiiters.
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
44 struct notify_list {
45 struct notify_list *next, *prev;
46 const char *path;
47 void (*callback)(void *, const struct notify_event *);
48 void *private_data;
51 struct notify_context {
52 struct messaging_context *msg;
53 struct notify_list *list;
56 * The notify database is split up into two databases: One
57 * relatively static index db and the real notify db with the
58 * volatile entries.
62 * "db_notify" is indexed by pathname. Per record it stores an
63 * array of notify_db_entry structs. These represent the
64 * notify records as requested by the smb client. This
65 * database is always held locally, it is never clustered.
67 struct db_context *db_notify;
70 * "db_index" is indexed by pathname. The records are an array
71 * of VNNs which have any interest in notifies for this path
72 * name.
74 * In the non-clustered case this database is cached in RAM by
75 * means of db_cache_open, which maintains a cache per
76 * process. Cache consistency is maintained by the tdb
77 * sequence number.
79 * In the clustered case right now we can not use the tdb
80 * sequence number, but by means of read only records we
81 * should be able to avoid a lot of full migrations.
83 * In both cases, it is important to keep the update
84 * operations to db_index to a minimum. This is achieved by
85 * delayed deletion. When a db_notify is initially created,
86 * the db_index record is also created. When more notifies are
87 * add for a path, then only the db_notify record needs to be
88 * modified, the db_index record is not touched. When the last
89 * entry from the db_notify record is deleted, the db_index
90 * record is not immediately deleted. Instead, the db_notify
91 * record is replaced with a current timestamp. A regular
92 * cleanup process will delete all db_index records that are
93 * older than a minute.
95 struct db_context *db_index;
98 static void notify_trigger_local(struct notify_context *notify,
99 uint32_t action, uint32_t filter,
100 const char *path, size_t path_len,
101 bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103 struct server_id *pid,
104 const char *path, uint32_t action,
105 void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107 const struct notify_db_entry *e,
108 bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112 const struct server_id *pid,
113 void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
116 static int notify_context_destructor(struct notify_context *notify);
118 static void notify_handler(struct messaging_context *msg_ctx,
119 void *private_data, uint32_t msg_type,
120 struct server_id server_id, DATA_BLOB *data);
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123 struct messaging_context *msg,
124 struct event_context *ev)
126 struct loadparm_context *lp_ctx;
127 struct notify_context *notify;
129 notify = talloc(mem_ctx, struct notify_context);
130 if (notify == NULL) {
131 goto fail;
133 notify->msg = msg;
134 notify->list = NULL;
136 lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
137 notify->db_notify = db_open_tdb(
138 notify, lp_ctx, lock_path("notify.tdb"),
139 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
140 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2);
141 talloc_unlink(notify, lp_ctx);
142 if (notify->db_notify == NULL) {
143 goto fail;
145 notify->db_index = db_open(
146 notify, lock_path("notify_index.tdb"),
147 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
148 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3);
149 if (notify->db_index == NULL) {
150 goto fail;
152 if (!lp_clustering()) {
153 notify->db_index = db_open_cache(notify, notify->db_index);
154 if (notify->db_index == NULL) {
155 goto fail;
159 if (notify->msg != NULL) {
160 NTSTATUS status;
162 status = messaging_register(notify->msg, notify,
163 MSG_PVFS_NOTIFY, notify_handler);
164 if (!NT_STATUS_IS_OK(status)) {
165 DEBUG(1, ("messaging_register returned %s\n",
166 nt_errstr(status)));
167 goto fail;
171 talloc_set_destructor(notify, notify_context_destructor);
173 return notify;
174 fail:
175 TALLOC_FREE(notify);
176 return NULL;
179 static int notify_context_destructor(struct notify_context *notify)
181 DEBUG(10, ("notify_context_destructor called\n"));
183 if (notify->msg != NULL) {
184 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
187 while (notify->list != NULL) {
188 DEBUG(10, ("Removing private_data=%p\n",
189 notify->list->private_data));
190 notify_remove(notify, notify->list->private_data);
192 return 0;
195 NTSTATUS notify_add(struct notify_context *notify,
196 const char *path, uint32_t filter, uint32_t subdir_filter,
197 void (*callback)(void *, const struct notify_event *),
198 void *private_data)
200 struct notify_db_entry e;
201 struct notify_list *listel;
202 struct db_record *notify_rec, *idx_rec;
203 bool add_idx;
204 NTSTATUS status;
205 TDB_DATA key, notify_copy;
207 if (notify == NULL) {
208 return NT_STATUS_NOT_IMPLEMENTED;
211 DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
212 private_data));
214 listel = talloc(notify, struct notify_list);
215 if (listel == NULL) {
216 return NT_STATUS_NO_MEMORY;
218 listel->callback = callback;
219 listel->private_data = private_data;
220 listel->path = talloc_strdup(listel, path);
221 if (listel->path == NULL) {
222 TALLOC_FREE(listel);
223 return NT_STATUS_NO_MEMORY;
225 DLIST_ADD(notify->list, listel);
227 ZERO_STRUCT(e);
228 e.filter = filter;
229 e.subdir_filter = subdir_filter;
230 e.server = messaging_server_id(notify->msg);
231 e.private_data = private_data;
233 key = string_tdb_data(path);
235 notify_rec = dbwrap_fetch_locked(notify->db_notify,
236 talloc_tos(), key);
237 if (notify_rec == NULL) {
238 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
239 goto fail;
243 * Make a copy of the notify_rec for easy restore in case
244 * updating the index_rec fails;
246 notify_copy = dbwrap_record_get_value(notify_rec);
247 if (notify_copy.dsize != 0) {
248 notify_copy.dptr = (uint8_t *)talloc_memdup(
249 notify_rec, notify_copy.dptr,
250 notify_copy.dsize);
251 if (notify_copy.dptr == NULL) {
252 TALLOC_FREE(notify_rec);
253 status = NT_STATUS_NO_MEMORY;
254 goto fail;
258 if (DEBUGLEVEL >= 10) {
259 NDR_PRINT_DEBUG(notify_db_entry, &e);
262 status = notify_add_entry(notify_rec, &e, &add_idx);
263 if (!NT_STATUS_IS_OK(status)) {
264 goto fail;
266 if (!add_idx) {
268 * Someone else has added the idx entry already
270 TALLOC_FREE(notify_rec);
271 return NT_STATUS_OK;
274 idx_rec = dbwrap_fetch_locked(notify->db_index,
275 talloc_tos(), key);
276 if (idx_rec == NULL) {
277 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
278 goto restore_notify;
280 status = notify_add_idx(idx_rec, get_my_vnn());
281 if (!NT_STATUS_IS_OK(status)) {
282 goto restore_notify;
285 TALLOC_FREE(idx_rec);
286 TALLOC_FREE(notify_rec);
287 return NT_STATUS_OK;
289 restore_notify:
290 if (notify_copy.dsize != 0) {
291 dbwrap_record_store(notify_rec, notify_copy, 0);
292 } else {
293 dbwrap_record_delete(notify_rec);
295 TALLOC_FREE(notify_rec);
296 fail:
297 DLIST_REMOVE(notify->list, listel);
298 TALLOC_FREE(listel);
299 return status;
302 static NTSTATUS notify_add_entry(struct db_record *rec,
303 const struct notify_db_entry *e,
304 bool *p_add_idx)
306 TDB_DATA value = dbwrap_record_get_value(rec);
307 struct notify_db_entry *entries;
308 size_t num_entries;
309 bool add_idx = true;
310 NTSTATUS status;
312 if (value.dsize == sizeof(time_t)) {
313 DEBUG(10, ("Re-using deleted entry\n"));
314 value.dsize = 0;
315 add_idx = false;
318 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
319 DEBUG(1, ("Invalid value.dsize = %u\n",
320 (unsigned)value.dsize));
321 return NT_STATUS_INTERNAL_DB_CORRUPTION;
323 num_entries = value.dsize / sizeof(struct notify_db_entry);
325 if (num_entries != 0) {
326 add_idx = false;
329 entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
330 if (entries == NULL) {
331 return NT_STATUS_NO_MEMORY;
333 memcpy(entries, value.dptr, value.dsize);
335 entries[num_entries] = *e;
336 value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
337 status = dbwrap_record_store(rec, value, 0);
338 TALLOC_FREE(entries);
339 if (!NT_STATUS_IS_OK(status)) {
340 return status;
342 *p_add_idx = add_idx;
343 return NT_STATUS_OK;
346 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
348 TDB_DATA value = dbwrap_record_get_value(rec);
349 uint32_t *vnns;
350 size_t i, num_vnns;
351 NTSTATUS status;
353 if ((value.dsize % sizeof(uint32_t)) != 0) {
354 DEBUG(1, ("Invalid value.dsize = %u\n",
355 (unsigned)value.dsize));
356 return NT_STATUS_INTERNAL_DB_CORRUPTION;
358 num_vnns = value.dsize / sizeof(uint32_t);
359 vnns = (uint32_t *)value.dptr;
361 for (i=0; i<num_vnns; i++) {
362 if (vnns[i] == vnn) {
363 return NT_STATUS_OK;
365 if (vnns[i] > vnn) {
366 break;
370 value.dptr = (uint8_t *)talloc_realloc(
371 rec, value.dptr, uint32_t, num_vnns + 1);
372 if (value.dptr == NULL) {
373 return NT_STATUS_NO_MEMORY;
375 value.dsize = talloc_get_size(value.dptr);
377 vnns = (uint32_t *)value.dptr;
379 memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
380 vnns[i] = vnn;
382 status = dbwrap_record_store(rec, value, 0);
383 if (!NT_STATUS_IS_OK(status)) {
384 return status;
386 return NT_STATUS_OK;
389 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
391 struct server_id pid;
392 struct notify_list *listel;
393 struct db_record *notify_rec;
394 NTSTATUS status;
396 if ((notify == NULL) || (notify->msg == NULL)) {
397 return NT_STATUS_NOT_IMPLEMENTED;
400 DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
402 pid = messaging_server_id(notify->msg);
404 for (listel=notify->list;listel;listel=listel->next) {
405 if (listel->private_data == private_data) {
406 DLIST_REMOVE(notify->list, listel);
407 break;
410 if (listel == NULL) {
411 DEBUG(10, ("%p not found\n", private_data));
412 return NT_STATUS_NOT_FOUND;
414 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
415 string_tdb_data(listel->path));
416 TALLOC_FREE(listel);
417 if (notify_rec == NULL) {
418 return NT_STATUS_INTERNAL_DB_CORRUPTION;
420 status = notify_del_entry(notify_rec, &pid, private_data);
421 DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
422 TALLOC_FREE(notify_rec);
423 return status;
426 static NTSTATUS notify_del_entry(struct db_record *rec,
427 const struct server_id *pid,
428 void *private_data)
430 TDB_DATA value = dbwrap_record_get_value(rec);
431 struct notify_db_entry *entries;
432 size_t i, num_entries;
433 time_t now;
435 DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
436 private_data));
438 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
439 DEBUG(1, ("Invalid value.dsize = %u\n",
440 (unsigned)value.dsize));
441 return NT_STATUS_INTERNAL_DB_CORRUPTION;
443 num_entries = value.dsize / sizeof(struct notify_db_entry);
444 entries = (struct notify_db_entry *)value.dptr;
446 for (i=0; i<num_entries; i++) {
447 struct notify_db_entry *e = &entries[i];
449 if (DEBUGLEVEL >= 10) {
450 NDR_PRINT_DEBUG(notify_db_entry, e);
453 if (e->private_data != private_data) {
454 continue;
456 if (serverid_equal(&e->server, pid)) {
457 break;
460 if (i == num_entries) {
461 return NT_STATUS_NOT_FOUND;
463 entries[i] = entries[num_entries-1];
464 value.dsize -= sizeof(struct notify_db_entry);
466 if (value.dsize == 0) {
467 now = time(NULL);
468 value.dptr = (uint8_t *)&now;
469 value.dsize = sizeof(now);
471 return dbwrap_record_store(rec, value, 0);
474 struct notify_trigger_index_state {
475 TALLOC_CTX *mem_ctx;
476 uint32_t *vnns;
477 uint32_t my_vnn;
478 bool found_my_vnn;
481 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
482 void *private_data)
484 struct notify_trigger_index_state *state =
485 (struct notify_trigger_index_state *)private_data;
486 uint32_t *new_vnns;
487 size_t i, num_vnns, num_new_vnns;
489 if ((data.dsize % sizeof(uint32_t)) != 0) {
490 DEBUG(1, ("Invalid record size in notify index db: %u\n",
491 (unsigned)data.dsize));
492 return;
494 new_vnns = (uint32_t *)data.dptr;
495 num_new_vnns = data.dsize / sizeof(uint32_t);
497 num_vnns = talloc_array_length(state->vnns);
499 for (i=0; i<num_new_vnns; i++) {
500 if (new_vnns[i] == state->my_vnn) {
501 state->found_my_vnn = true;
505 state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
506 num_vnns + num_new_vnns);
507 if ((num_vnns + num_new_vnns != 0) && (state->vnns == NULL)) {
508 DEBUG(1, ("talloc_realloc failed\n"));
509 return;
511 memcpy(&state->vnns[num_vnns], data.dptr, data.dsize);
514 static int vnn_cmp(const void *p1, const void *p2)
516 const uint32_t *vnn1 = (const uint32_t *)p1;
517 const uint32_t *vnn2 = (const uint32_t *)p2;
519 if (*vnn1 < *vnn2) {
520 return -1;
522 if (*vnn1 == *vnn2) {
523 return 0;
525 return 1;
528 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
529 uint32_t filter, const char *path,
530 uint8_t **pblob, size_t *pblob_len)
532 struct notify_remote_event ev;
533 DATA_BLOB data;
534 enum ndr_err_code ndr_err;
536 ev.action = action;
537 ev.filter = filter;
538 ev.path = path;
540 if (DEBUGLEVEL >= 10) {
541 NDR_PRINT_DEBUG(notify_remote_event, &ev);
544 ndr_err = ndr_push_struct_blob(
545 &data, mem_ctx, &ev,
546 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
547 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
548 return false;
550 *pblob = data.data;
551 *pblob_len = data.length;
552 return true;
555 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
556 const uint8_t *blob, size_t blob_len,
557 uint32_t *paction, uint32_t *pfilter,
558 char **path)
560 struct notify_remote_event *ev;
561 enum ndr_err_code ndr_err;
562 DATA_BLOB data;
563 char *p;
565 data.data = discard_const_p(uint8_t, blob);
566 data.length = blob_len;
568 ev = talloc(mem_ctx, struct notify_remote_event);
569 if (ev == NULL) {
570 return false;
573 ndr_err = ndr_pull_struct_blob(
574 &data, ev, ev,
575 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
576 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
577 TALLOC_FREE(ev);
578 return false;
580 if (DEBUGLEVEL >= 10) {
581 NDR_PRINT_DEBUG(notify_remote_event, ev);
583 *paction = ev->action;
584 *pfilter = ev->filter;
585 p = discard_const_p(char, ev->path);
586 *path = talloc_move(mem_ctx, &p);
588 TALLOC_FREE(ev);
589 return true;
592 void notify_trigger(struct notify_context *notify,
593 uint32_t action, uint32_t filter, const char *path)
595 struct ctdbd_connection *ctdbd_conn;
596 struct notify_trigger_index_state idx_state;
597 const char *p, *next_p;
598 size_t i, num_vnns;
599 uint32_t last_vnn;
600 uint8_t *remote_blob = NULL;
601 size_t remote_blob_len = 0;
603 DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
604 "path=%s\n", (unsigned)action, (unsigned)filter, path));
606 /* see if change notify is enabled at all */
607 if (notify == NULL) {
608 return;
611 idx_state.mem_ctx = talloc_tos();
612 idx_state.vnns = NULL;
613 idx_state.my_vnn = get_my_vnn();
615 for (p = path; p != NULL; p = next_p) {
616 ptrdiff_t path_len = p - path;
617 bool recursive;
619 next_p = strchr(p+1, '/');
620 recursive = (next_p != NULL);
622 idx_state.found_my_vnn = false;
624 dbwrap_parse_record(
625 notify->db_index,
626 make_tdb_data(discard_const_p(uint8_t, path), path_len),
627 notify_trigger_index_parser, &idx_state);
629 if (!idx_state.found_my_vnn) {
630 continue;
632 notify_trigger_local(notify, action, filter,
633 path, path_len, recursive);
636 ctdbd_conn = messaging_ctdbd_connection();
637 if (ctdbd_conn == NULL) {
638 goto done;
641 num_vnns = talloc_array_length(idx_state.vnns);
642 qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
644 last_vnn = 0xffffffff;
645 remote_blob = NULL;
647 for (i=0; i<num_vnns; i++) {
648 uint32_t vnn = idx_state.vnns[i];
649 NTSTATUS status;
651 if (vnn == last_vnn) {
652 continue;
654 if (vnn == idx_state.my_vnn) {
655 continue;
657 if ((remote_blob == NULL) &&
658 !notify_push_remote_blob(
659 talloc_tos(), action, filter,
660 path, &remote_blob, &remote_blob_len)) {
661 break;
664 status = ctdbd_messaging_send_blob(
665 ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
666 remote_blob, remote_blob_len);
667 if (!NT_STATUS_IS_OK(status)) {
668 DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
669 "returned %s, ignoring\n", (int)vnn,
670 nt_errstr(status)));
673 last_vnn = vnn;
676 done:
677 TALLOC_FREE(remote_blob);
678 TALLOC_FREE(idx_state.vnns);
681 static void notify_trigger_local(struct notify_context *notify,
682 uint32_t action, uint32_t filter,
683 const char *path, size_t path_len,
684 bool recursive)
686 TDB_DATA data;
687 struct notify_db_entry *entries;
688 size_t i, num_entries;
689 NTSTATUS status;
691 DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
692 "filter=%d\n", (int)path_len, path, (int)path_len,
693 (int)filter));
695 status = dbwrap_fetch(
696 notify->db_notify, talloc_tos(),
697 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
698 if (!NT_STATUS_IS_OK(status)) {
699 DEBUG(10, ("dbwrap_fetch returned %s\n",
700 nt_errstr(status)));
701 return;
703 if (data.dsize == sizeof(time_t)) {
704 DEBUG(10, ("Got deleted record\n"));
705 goto done;
707 if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
708 DEBUG(1, ("Invalid data.dsize = %u\n",
709 (unsigned)data.dsize));
710 goto done;
713 entries = (struct notify_db_entry *)data.dptr;
714 num_entries = data.dsize / sizeof(struct notify_db_entry);
716 DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
717 recursive ? "true" : "false", (int)path_len,
718 path[path_len]));
720 for (i=0; i<num_entries; i++) {
721 struct notify_db_entry *e = &entries[i];
722 uint32_t e_filter;
724 if (DEBUGLEVEL >= 10) {
725 NDR_PRINT_DEBUG(notify_db_entry, e);
728 e_filter = recursive ? e->subdir_filter : e->filter;
730 if ((filter & e_filter) == 0) {
731 continue;
734 if (!procid_is_local(&e->server)) {
735 DEBUG(1, ("internal error: Non-local pid %s in "
736 "notify.tdb\n",
737 procid_str_static(&e->server)));
738 continue;
741 status = notify_send(notify, &e->server, path + path_len + 1,
742 action, e->private_data);
743 if (!NT_STATUS_IS_OK(status)) {
744 DEBUG(10, ("notify_send returned %s\n",
745 nt_errstr(status)));
749 done:
750 TALLOC_FREE(data.dptr);
753 static NTSTATUS notify_send(struct notify_context *notify,
754 struct server_id *pid,
755 const char *path, uint32_t action,
756 void *private_data)
758 struct notify_event ev;
759 DATA_BLOB data;
760 NTSTATUS status;
761 enum ndr_err_code ndr_err;
763 ev.action = action;
764 ev.path = path;
765 ev.private_data = private_data;
767 ndr_err = ndr_push_struct_blob(
768 &data, talloc_tos(), &ev,
769 (ndr_push_flags_fn_t)ndr_push_notify_event);
770 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
771 return ndr_map_error2ntstatus(ndr_err);
773 status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
774 &data);
775 TALLOC_FREE(data.data);
776 return status;
779 static void notify_handler(struct messaging_context *msg_ctx,
780 void *private_data, uint32_t msg_type,
781 struct server_id server_id, DATA_BLOB *data)
783 struct notify_context *notify = talloc_get_type_abort(
784 private_data, struct notify_context);
785 enum ndr_err_code ndr_err;
786 struct notify_event *n;
787 struct notify_list *listel;
789 n = talloc(talloc_tos(), struct notify_event);
790 if (n == NULL) {
791 DEBUG(1, ("talloc failed\n"));
792 return;
795 ndr_err = ndr_pull_struct_blob(
796 data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
797 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
798 TALLOC_FREE(n);
799 return;
801 if (DEBUGLEVEL >= 10) {
802 NDR_PRINT_DEBUG(notify_event, n);
805 for (listel=notify->list;listel;listel=listel->next) {
806 if (listel->private_data == n->private_data) {
807 listel->callback(listel->private_data, n);
808 break;
811 TALLOC_FREE(n);
814 struct notify_walk_idx_state {
815 void (*fn)(const char *path,
816 uint32_t *vnns, size_t num_vnns,
817 void *private_data);
818 void *private_data;
821 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
823 struct notify_walk_idx_state *state =
824 (struct notify_walk_idx_state *)private_data;
825 TDB_DATA key, value;
826 char *path;
828 key = dbwrap_record_get_key(rec);
829 value = dbwrap_record_get_value(rec);
831 if ((value.dsize % sizeof(uint32_t)) != 0) {
832 DEBUG(1, ("invalid value size in notify index db: %u\n",
833 (unsigned)(value.dsize)));
834 return 0;
837 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
838 if (path == NULL) {
839 DEBUG(1, ("talloc_strndup failed\n"));
840 return 0;
842 state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
843 state->private_data);
844 TALLOC_FREE(path);
845 return 0;
848 void notify_walk_idx(struct notify_context *notify,
849 void (*fn)(const char *path,
850 uint32_t *vnns, size_t num_vnns,
851 void *private_data),
852 void *private_data)
854 struct notify_walk_idx_state state;
855 state.fn = fn;
856 state.private_data = private_data;
857 dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
858 NULL);
861 struct notify_walk_state {
862 void (*fn)(const char *path,
863 struct notify_db_entry *entries, size_t num_entries,
864 time_t deleted_time, void *private_data);
865 void *private_data;
868 static int notify_walk_fn(struct db_record *rec, void *private_data)
870 struct notify_walk_state *state =
871 (struct notify_walk_state *)private_data;
872 TDB_DATA key, value;
873 struct notify_db_entry *entries;
874 size_t num_entries;
875 time_t deleted_time;
876 char *path;
878 key = dbwrap_record_get_key(rec);
879 value = dbwrap_record_get_value(rec);
881 if (value.dsize == sizeof(deleted_time)) {
882 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
883 entries = NULL;
884 num_entries = 0;
885 } else {
886 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
887 DEBUG(1, ("invalid value size in notify db: %u\n",
888 (unsigned)(value.dsize)));
889 return 0;
891 entries = (struct notify_db_entry *)value.dptr;
892 num_entries = value.dsize / sizeof(struct notify_db_entry);
893 deleted_time = 0;
896 path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
897 if (path == NULL) {
898 DEBUG(1, ("talloc_strndup failed\n"));
899 return 0;
901 state->fn(path, entries, num_entries, deleted_time,
902 state->private_data);
903 TALLOC_FREE(path);
904 return 0;
907 void notify_walk(struct notify_context *notify,
908 void (*fn)(const char *path,
909 struct notify_db_entry *entries,
910 size_t num_entries,
911 time_t deleted_time, void *private_data),
912 void *private_data)
914 struct notify_walk_state state;
915 state.fn = fn;
916 state.private_data = private_data;
917 dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
918 NULL);
921 struct notify_cleanup_state {
922 TALLOC_CTX *mem_ctx;
923 time_t delete_before;
924 ssize_t array_size;
925 uint32_t num_paths;
926 char **paths;
929 static void notify_cleanup_collect(
930 const char *path, struct notify_db_entry *entries, size_t num_entries,
931 time_t deleted_time, void *private_data)
933 struct notify_cleanup_state *state =
934 (struct notify_cleanup_state *)private_data;
935 char *p;
937 if (num_entries != 0) {
938 return;
940 if (deleted_time >= state->delete_before) {
941 return;
944 p = talloc_strdup(state->mem_ctx, path);
945 if (p == NULL) {
946 DEBUG(1, ("talloc_strdup failed\n"));
947 return;
949 add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
950 &state->paths, &state->num_paths,
951 &state->array_size);
952 if (state->array_size == -1) {
953 TALLOC_FREE(p);
957 static bool notify_cleanup_path(struct notify_context *notify,
958 const char *path, time_t delete_before);
960 void notify_cleanup(struct notify_context *notify)
962 struct notify_cleanup_state state;
963 uint32_t failure_pool;
965 ZERO_STRUCT(state);
966 state.mem_ctx = talloc_stackframe();
968 state.delete_before = time(NULL)
969 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
971 notify_walk(notify, notify_cleanup_collect, &state);
973 failure_pool = state.num_paths;
975 while (state.num_paths != 0) {
976 size_t idx;
979 * This loop is designed to be as kind as possible to
980 * ctdb. ctdb does not like it if many smbds hammer on a
981 * single record. If on many nodes the cleanup process starts
982 * running, it can happen that all of them need to clean up
983 * records in the same order. This would generate a ctdb
984 * migrate storm on these records. Randomizing the load across
985 * multiple records reduces the load on the individual record.
988 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
989 idx = idx % state.num_paths;
991 if (!notify_cleanup_path(notify, state.paths[idx],
992 state.delete_before)) {
994 * notify_cleanup_path failed, the most likely reason
995 * is that dbwrap_try_fetch_locked failed due to
996 * contention. We allow one failed attempt per deleted
997 * path on average before we give up.
999 failure_pool -= 1;
1000 if (failure_pool == 0) {
1002 * Too many failures. We will come back here,
1003 * maybe next time there is less contention.
1005 break;
1009 TALLOC_FREE(state.paths[idx]);
1010 state.paths[idx] = state.paths[state.num_paths-1];
1011 state.num_paths -= 1;
1013 TALLOC_FREE(state.mem_ctx);
1016 static bool notify_cleanup_path(struct notify_context *notify,
1017 const char *path, time_t delete_before)
1019 struct db_record *notify_rec = NULL;
1020 struct db_record *idx_rec = NULL;
1021 TDB_DATA key = string_tdb_data(path);
1022 TDB_DATA value;
1023 time_t deleted;
1024 NTSTATUS status;
1026 notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1027 if (notify_rec == NULL) {
1028 DEBUG(10, ("Could not fetch notify_rec\n"));
1029 return false;
1031 value = dbwrap_record_get_value(notify_rec);
1033 if (value.dsize != sizeof(deleted)) {
1034 DEBUG(10, ("record %s has been re-used\n", path));
1035 goto done;
1037 memcpy(&deleted, value.dptr, sizeof(deleted));
1039 if (deleted >= delete_before) {
1040 DEBUG(10, ("record %s too young\n", path));
1041 goto done;
1045 * Be kind to ctdb and only try one dmaster migration at most.
1047 idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1048 if (idx_rec == NULL) {
1049 DEBUG(10, ("Could not fetch idx_rec\n"));
1050 goto done;
1053 status = dbwrap_record_delete(notify_rec);
1054 if (!NT_STATUS_IS_OK(status)) {
1055 DEBUG(10, ("Could not delete notify_rec: %s\n",
1056 nt_errstr(status)));
1059 status = notify_del_idx(idx_rec, get_my_vnn());
1060 if (!NT_STATUS_IS_OK(status)) {
1061 DEBUG(10, ("Could not delete idx_rec: %s\n",
1062 nt_errstr(status)));
1065 done:
1066 TALLOC_FREE(idx_rec);
1067 TALLOC_FREE(notify_rec);
1068 return true;
1071 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1073 TDB_DATA value = dbwrap_record_get_value(rec);
1074 uint32_t *vnns;
1075 size_t i, num_vnns;
1077 if ((value.dsize % sizeof(uint32_t)) != 0) {
1078 DEBUG(1, ("Invalid value.dsize = %u\n",
1079 (unsigned)value.dsize));
1080 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1082 num_vnns = value.dsize / sizeof(uint32_t);
1083 vnns = (uint32_t *)value.dptr;
1085 for (i=0; i<num_vnns; i++) {
1086 if (vnns[i] == vnn) {
1087 break;
1091 if (i == num_vnns) {
1093 * Not found. Should not happen, but okay...
1095 return NT_STATUS_OK;
1098 memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1099 value.dsize -= sizeof(uint32_t);
1101 if (value.dsize == 0) {
1102 return dbwrap_record_delete(rec);
1104 return dbwrap_record_store(rec, value, 0);
1107 struct notify_cluster_proxy_state {
1108 struct tevent_context *ev;
1109 struct notify_context *notify;
1110 struct ctdb_msg_channel *chan;
1113 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
1114 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
1115 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1116 uint32_t action, uint32_t filter,
1117 char *path);
1119 struct tevent_req *notify_cluster_proxy_send(
1120 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1121 struct notify_context *notify)
1123 struct tevent_req *req, *subreq;
1124 struct notify_cluster_proxy_state *state;
1126 req = tevent_req_create(mem_ctx, &state,
1127 struct notify_cluster_proxy_state);
1128 if (req == NULL) {
1129 return NULL;
1131 state->ev = ev;
1132 state->notify = notify;
1134 subreq = ctdb_msg_channel_init_send(
1135 state, state->ev, lp_ctdbd_socket(),
1136 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1137 if (tevent_req_nomem(subreq, req)) {
1138 return tevent_req_post(req, ev);
1140 tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1141 return req;
1144 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1146 struct tevent_req *req = tevent_req_callback_data(
1147 subreq, struct tevent_req);
1148 struct notify_cluster_proxy_state *state = tevent_req_data(
1149 req, struct notify_cluster_proxy_state);
1150 int ret;
1152 ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1153 TALLOC_FREE(subreq);
1154 if (ret != 0) {
1155 tevent_req_error(req, ret);
1156 return;
1158 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1159 if (tevent_req_nomem(subreq, req)) {
1160 return;
1162 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1165 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1167 struct tevent_req *req = tevent_req_callback_data(
1168 subreq, struct tevent_req);
1169 struct notify_cluster_proxy_state *state = tevent_req_data(
1170 req, struct notify_cluster_proxy_state);
1171 uint8_t *msg;
1172 size_t msg_len;
1173 uint32_t action, filter;
1174 char *path;
1175 int ret;
1176 bool res;
1178 ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1179 TALLOC_FREE(subreq);
1180 if (ret != 0) {
1181 tevent_req_error(req, ret);
1182 return;
1185 res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1186 &action, &filter, &path);
1187 TALLOC_FREE(msg);
1188 if (!res) {
1189 tevent_req_error(req, EIO);
1190 return;
1192 notify_cluster_proxy_trigger(state->notify, action, filter, path);
1193 TALLOC_FREE(path);
1195 subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1196 if (tevent_req_nomem(subreq, req)) {
1197 return;
1199 tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1202 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1203 uint32_t action, uint32_t filter,
1204 char *path)
1206 const char *p, *next_p;
1208 for (p = path; p != NULL; p = next_p) {
1209 ptrdiff_t path_len = p - path;
1210 bool recursive;
1212 next_p = strchr(p+1, '/');
1213 recursive = (next_p != NULL);
1215 notify_trigger_local(notify, action, filter,
1216 path, path_len, recursive);
1220 int notify_cluster_proxy_recv(struct tevent_req *req)
1222 int err;
1224 if (tevent_req_is_unix_error(req, &err)) {
1225 return err;
1227 return 0;