2 Unix SMB/CIFS implementation.
4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Volker Lendecke 2012
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 this is the change notify database. It implements mechanisms for
23 storing current change notify waiters in a tdb, and checking if a
24 given event matches any of the stored notify waiters.
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
45 struct notify_list
*next
, *prev
;
47 void (*callback
)(void *, struct timespec
, const struct notify_event
*);
51 struct notify_context
{
52 struct messaging_context
*msg
;
53 struct notify_list
*list
;
56 * The notify database is split up into two databases: One
57 * relatively static index db and the real notify db with the
62 * "db_notify" is indexed by pathname. Per record it stores an
63 * array of notify_db_entry structs. These represent the
64 * notify records as requested by the smb client. This
65 * database is always held locally, it is never clustered.
67 struct db_context
*db_notify
;
70 * "db_index" is indexed by pathname. The records are an array
71 * of VNNs which have any interest in notifies for this path
74 * In the non-clustered case this database is cached in RAM by
75 * means of db_cache_open, which maintains a cache per
76 * process. Cache consistency is maintained by the tdb
79 * In the clustered case right now we can not use the tdb
80 * sequence number, but by means of read only records we
81 * should be able to avoid a lot of full migrations.
83 * In both cases, it is important to keep the update
84 * operations to db_index to a minimum. This is achieved by
85 * delayed deletion. When a db_notify is initially created,
86 * the db_index record is also created. When more notifies are
87 * added for a path, then only the db_notify record needs to be
88 * modified, the db_index record is not touched. When the last
89 * entry from the db_notify record is deleted, the db_index
90 * record is not immediately deleted. Instead, the db_notify
91 * record is replaced with a current timestamp. A regular
92 * cleanup process will delete all db_index records that are
93 * older than a minute.
95 struct db_context
*db_index
;
98 static void notify_trigger_local(struct notify_context
*notify
,
99 uint32_t action
, uint32_t filter
,
100 const char *path
, size_t path_len
,
102 static NTSTATUS
notify_send(struct notify_context
*notify
,
103 struct server_id
*pid
,
104 const char *path
, uint32_t action
,
106 static NTSTATUS
notify_add_entry(struct db_record
*rec
,
107 const struct notify_db_entry
*e
,
109 static NTSTATUS
notify_add_idx(struct db_record
*rec
, uint32_t vnn
);
111 static NTSTATUS
notify_del_entry(struct db_record
*rec
,
112 const struct server_id
*pid
,
114 static NTSTATUS
notify_del_idx(struct db_record
*rec
, uint32_t vnn
);
116 static int notify_context_destructor(struct notify_context
*notify
);
118 static void notify_handler(struct messaging_context
*msg_ctx
,
119 void *private_data
, uint32_t msg_type
,
120 struct server_id server_id
, DATA_BLOB
*data
);
122 struct notify_context
*notify_init(TALLOC_CTX
*mem_ctx
,
123 struct messaging_context
*msg
,
124 struct tevent_context
*ev
)
126 struct loadparm_context
*lp_ctx
;
127 struct notify_context
*notify
;
130 notify
= talloc(mem_ctx
, struct notify_context
);
131 if (notify
== NULL
) {
137 lp_ctx
= loadparm_init_s3(notify
, loadparm_s3_helpers());
139 db_path
= lock_path("notify.tdb");
140 if (db_path
== NULL
) {
144 notify
->db_notify
= db_open_tdb(
145 notify
, lp_ctx
, db_path
,
146 0, TDB_CLEAR_IF_FIRST
|TDB_INCOMPATIBLE_HASH
,
147 O_RDWR
|O_CREAT
, 0644, DBWRAP_LOCK_ORDER_2
, DBWRAP_FLAG_NONE
);
148 talloc_unlink(notify
, lp_ctx
);
149 TALLOC_FREE(db_path
);
150 if (notify
->db_notify
== NULL
) {
154 db_path
= lock_path("notify_index.tdb");
155 if (db_path
== NULL
) {
159 notify
->db_index
= db_open(
161 0, TDB_SEQNUM
|TDB_CLEAR_IF_FIRST
|TDB_INCOMPATIBLE_HASH
,
162 O_RDWR
|O_CREAT
, 0644, DBWRAP_LOCK_ORDER_3
, DBWRAP_FLAG_NONE
);
163 TALLOC_FREE(db_path
);
164 if (notify
->db_index
== NULL
) {
167 if (!lp_clustering()) {
168 notify
->db_index
= db_open_cache(notify
, notify
->db_index
);
169 if (notify
->db_index
== NULL
) {
174 if (notify
->msg
!= NULL
) {
177 status
= messaging_register(notify
->msg
, notify
,
178 MSG_PVFS_NOTIFY
, notify_handler
);
179 if (!NT_STATUS_IS_OK(status
)) {
180 DEBUG(1, ("messaging_register returned %s\n",
186 talloc_set_destructor(notify
, notify_context_destructor
);
194 static int notify_context_destructor(struct notify_context
*notify
)
196 DEBUG(10, ("notify_context_destructor called\n"));
198 if (notify
->msg
!= NULL
) {
199 messaging_deregister(notify
->msg
, MSG_PVFS_NOTIFY
, notify
);
202 while (notify
->list
!= NULL
) {
203 DEBUG(10, ("Removing private_data=%p\n",
204 notify
->list
->private_data
));
205 notify_remove(notify
, notify
->list
->private_data
);
210 NTSTATUS
notify_add(struct notify_context
*notify
,
211 const char *path
, uint32_t filter
, uint32_t subdir_filter
,
212 void (*callback
)(void *, struct timespec
,
213 const struct notify_event
*),
216 struct notify_db_entry e
;
217 struct notify_list
*listel
;
218 struct db_record
*notify_rec
, *idx_rec
;
221 TDB_DATA key
, notify_copy
;
223 if (notify
== NULL
) {
224 return NT_STATUS_NOT_IMPLEMENTED
;
227 DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path
,
230 listel
= talloc(notify
, struct notify_list
);
231 if (listel
== NULL
) {
232 return NT_STATUS_NO_MEMORY
;
234 listel
->callback
= callback
;
235 listel
->private_data
= private_data
;
236 listel
->path
= talloc_strdup(listel
, path
);
237 if (listel
->path
== NULL
) {
239 return NT_STATUS_NO_MEMORY
;
241 DLIST_ADD(notify
->list
, listel
);
245 e
.subdir_filter
= subdir_filter
;
246 e
.server
= messaging_server_id(notify
->msg
);
247 e
.private_data
= private_data
;
249 key
= string_tdb_data(path
);
251 notify_rec
= dbwrap_fetch_locked(notify
->db_notify
,
253 if (notify_rec
== NULL
) {
254 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
259 * Make a copy of the notify_rec for easy restore in case
260 * updating the index_rec fails;
262 notify_copy
= dbwrap_record_get_value(notify_rec
);
263 if (notify_copy
.dsize
!= 0) {
264 notify_copy
.dptr
= (uint8_t *)talloc_memdup(
265 notify_rec
, notify_copy
.dptr
,
267 if (notify_copy
.dptr
== NULL
) {
268 TALLOC_FREE(notify_rec
);
269 status
= NT_STATUS_NO_MEMORY
;
274 if (DEBUGLEVEL
>= 10) {
275 NDR_PRINT_DEBUG(notify_db_entry
, &e
);
278 status
= notify_add_entry(notify_rec
, &e
, &add_idx
);
279 if (!NT_STATUS_IS_OK(status
)) {
284 * Someone else has added the idx entry already
286 TALLOC_FREE(notify_rec
);
290 idx_rec
= dbwrap_fetch_locked(notify
->db_index
,
292 if (idx_rec
== NULL
) {
293 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
296 status
= notify_add_idx(idx_rec
, get_my_vnn());
297 if (!NT_STATUS_IS_OK(status
)) {
301 TALLOC_FREE(idx_rec
);
302 TALLOC_FREE(notify_rec
);
306 if (notify_copy
.dsize
!= 0) {
307 dbwrap_record_store(notify_rec
, notify_copy
, 0);
309 dbwrap_record_delete(notify_rec
);
311 TALLOC_FREE(notify_rec
);
313 DLIST_REMOVE(notify
->list
, listel
);
318 static NTSTATUS
notify_add_entry(struct db_record
*rec
,
319 const struct notify_db_entry
*e
,
322 TDB_DATA value
= dbwrap_record_get_value(rec
);
323 struct notify_db_entry
*entries
;
328 if (value
.dsize
== sizeof(time_t)) {
329 DEBUG(10, ("Re-using deleted entry\n"));
334 if ((value
.dsize
% sizeof(struct notify_db_entry
)) != 0) {
335 DEBUG(1, ("Invalid value.dsize = %u\n",
336 (unsigned)value
.dsize
));
337 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
339 num_entries
= value
.dsize
/ sizeof(struct notify_db_entry
);
341 if (num_entries
!= 0) {
345 entries
= talloc_array(rec
, struct notify_db_entry
, num_entries
+ 1);
346 if (entries
== NULL
) {
347 return NT_STATUS_NO_MEMORY
;
349 memcpy(entries
, value
.dptr
, value
.dsize
);
351 entries
[num_entries
] = *e
;
352 value
= make_tdb_data((uint8_t *)entries
, talloc_get_size(entries
));
353 status
= dbwrap_record_store(rec
, value
, 0);
354 TALLOC_FREE(entries
);
355 if (!NT_STATUS_IS_OK(status
)) {
358 *p_add_idx
= add_idx
;
362 static NTSTATUS
notify_add_idx(struct db_record
*rec
, uint32_t vnn
)
364 TDB_DATA value
= dbwrap_record_get_value(rec
);
369 if ((value
.dsize
% sizeof(uint32_t)) != 0) {
370 DEBUG(1, ("Invalid value.dsize = %u\n",
371 (unsigned)value
.dsize
));
372 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
374 num_vnns
= value
.dsize
/ sizeof(uint32_t);
375 vnns
= (uint32_t *)value
.dptr
;
377 for (i
=0; i
<num_vnns
; i
++) {
378 if (vnns
[i
] == vnn
) {
386 value
.dptr
= (uint8_t *)talloc_realloc(
387 rec
, value
.dptr
, uint32_t, num_vnns
+ 1);
388 if (value
.dptr
== NULL
) {
389 return NT_STATUS_NO_MEMORY
;
391 value
.dsize
= talloc_get_size(value
.dptr
);
393 vnns
= (uint32_t *)value
.dptr
;
395 memmove(&vnns
[i
+1], &vnns
[i
], sizeof(uint32_t) * (num_vnns
- i
));
398 status
= dbwrap_record_store(rec
, value
, 0);
399 if (!NT_STATUS_IS_OK(status
)) {
405 NTSTATUS
notify_remove(struct notify_context
*notify
, void *private_data
)
407 struct server_id pid
;
408 struct notify_list
*listel
;
409 struct db_record
*notify_rec
;
412 if ((notify
== NULL
) || (notify
->msg
== NULL
)) {
413 return NT_STATUS_NOT_IMPLEMENTED
;
416 DEBUG(10, ("notify_remove: private_data=%p\n", private_data
));
418 pid
= messaging_server_id(notify
->msg
);
420 for (listel
=notify
->list
;listel
;listel
=listel
->next
) {
421 if (listel
->private_data
== private_data
) {
422 DLIST_REMOVE(notify
->list
, listel
);
426 if (listel
== NULL
) {
427 DEBUG(10, ("%p not found\n", private_data
));
428 return NT_STATUS_NOT_FOUND
;
430 notify_rec
= dbwrap_fetch_locked(notify
->db_notify
, talloc_tos(),
431 string_tdb_data(listel
->path
));
433 if (notify_rec
== NULL
) {
434 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
436 status
= notify_del_entry(notify_rec
, &pid
, private_data
);
437 DEBUG(10, ("del_entry returned %s\n", nt_errstr(status
)));
438 TALLOC_FREE(notify_rec
);
442 static NTSTATUS
notify_del_entry(struct db_record
*rec
,
443 const struct server_id
*pid
,
446 TDB_DATA value
= dbwrap_record_get_value(rec
);
447 struct server_id_buf tmp
;
448 struct notify_db_entry
*entries
;
449 size_t i
, num_entries
;
452 DEBUG(10, ("del_entry called for %s %p\n",
453 server_id_str_buf(*pid
, &tmp
), private_data
));
455 if ((value
.dsize
% sizeof(struct notify_db_entry
)) != 0) {
456 DEBUG(1, ("Invalid value.dsize = %u\n",
457 (unsigned)value
.dsize
));
458 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
460 num_entries
= value
.dsize
/ sizeof(struct notify_db_entry
);
461 entries
= (struct notify_db_entry
*)value
.dptr
;
463 for (i
=0; i
<num_entries
; i
++) {
464 struct notify_db_entry
*e
= &entries
[i
];
466 if (DEBUGLEVEL
>= 10) {
467 NDR_PRINT_DEBUG(notify_db_entry
, e
);
470 if (e
->private_data
!= private_data
) {
473 if (serverid_equal(&e
->server
, pid
)) {
477 if (i
== num_entries
) {
478 return NT_STATUS_NOT_FOUND
;
480 entries
[i
] = entries
[num_entries
-1];
481 value
.dsize
-= sizeof(struct notify_db_entry
);
483 if (value
.dsize
== 0) {
485 value
.dptr
= (uint8_t *)&now
;
486 value
.dsize
= sizeof(now
);
488 return dbwrap_record_store(rec
, value
, 0);
491 struct notify_trigger_index_state
{
498 static void notify_trigger_index_parser(TDB_DATA key
, TDB_DATA data
,
501 struct notify_trigger_index_state
*state
=
502 (struct notify_trigger_index_state
*)private_data
;
504 size_t i
, num_vnns
, num_new_vnns
, num_remote_vnns
;
506 if ((data
.dsize
% sizeof(uint32_t)) != 0) {
507 DEBUG(1, ("Invalid record size in notify index db: %u\n",
508 (unsigned)data
.dsize
));
511 new_vnns
= (uint32_t *)data
.dptr
;
512 num_new_vnns
= data
.dsize
/ sizeof(uint32_t);
513 num_remote_vnns
= num_new_vnns
;
515 for (i
=0; i
<num_new_vnns
; i
++) {
516 if (new_vnns
[i
] == state
->my_vnn
) {
517 state
->found_my_vnn
= true;
518 num_remote_vnns
-= 1;
521 if (num_remote_vnns
== 0) {
525 num_vnns
= talloc_array_length(state
->vnns
);
526 state
->vnns
= talloc_realloc(state
->mem_ctx
, state
->vnns
, uint32_t,
527 num_vnns
+ num_remote_vnns
);
528 if (state
->vnns
== NULL
) {
529 DEBUG(1, ("talloc_realloc failed\n"));
533 for (i
=0; i
<num_new_vnns
; i
++) {
534 if (new_vnns
[i
] != state
->my_vnn
) {
535 state
->vnns
[num_vnns
] = new_vnns
[i
];
541 static int vnn_cmp(const void *p1
, const void *p2
)
543 const uint32_t *vnn1
= (const uint32_t *)p1
;
544 const uint32_t *vnn2
= (const uint32_t *)p2
;
549 if (*vnn1
== *vnn2
) {
555 static bool notify_push_remote_blob(TALLOC_CTX
*mem_ctx
, uint32_t action
,
556 uint32_t filter
, const char *path
,
557 uint8_t **pblob
, size_t *pblob_len
)
559 struct notify_remote_event ev
;
561 enum ndr_err_code ndr_err
;
567 if (DEBUGLEVEL
>= 10) {
568 NDR_PRINT_DEBUG(notify_remote_event
, &ev
);
571 ndr_err
= ndr_push_struct_blob(
573 (ndr_push_flags_fn_t
)ndr_push_notify_remote_event
);
574 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
578 *pblob_len
= data
.length
;
582 static bool notify_pull_remote_blob(TALLOC_CTX
*mem_ctx
,
583 const uint8_t *blob
, size_t blob_len
,
584 uint32_t *paction
, uint32_t *pfilter
,
587 struct notify_remote_event
*ev
;
588 enum ndr_err_code ndr_err
;
592 data
.data
= discard_const_p(uint8_t, blob
);
593 data
.length
= blob_len
;
595 ev
= talloc(mem_ctx
, struct notify_remote_event
);
600 ndr_err
= ndr_pull_struct_blob(
602 (ndr_pull_flags_fn_t
)ndr_pull_notify_remote_event
);
603 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
607 if (DEBUGLEVEL
>= 10) {
608 NDR_PRINT_DEBUG(notify_remote_event
, ev
);
610 *paction
= ev
->action
;
611 *pfilter
= ev
->filter
;
612 p
= discard_const_p(char, ev
->path
);
613 *path
= talloc_move(mem_ctx
, &p
);
619 void notify_trigger(struct notify_context
*notify
,
620 uint32_t action
, uint32_t filter
,
621 const char *dir
, const char *name
)
623 struct ctdbd_connection
*ctdbd_conn
;
624 struct notify_trigger_index_state idx_state
;
625 const char *p
, *next_p
;
628 uint8_t *remote_blob
= NULL
;
629 size_t remote_blob_len
= 0;
631 char *path
, *to_free
;
632 char tmpbuf
[PATH_MAX
];
635 DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
636 "dir=%s, name=%s\n", (unsigned)action
, (unsigned)filter
,
639 /* see if change notify is enabled at all */
640 if (notify
== NULL
) {
646 * The rest of this routine assumes an absolute path.
651 len
= full_path_tos(dir
, name
, tmpbuf
, sizeof(tmpbuf
),
654 DEBUG(1, ("full_path_tos failed\n"));
658 idx_state
.mem_ctx
= talloc_tos();
659 idx_state
.vnns
= NULL
;
660 idx_state
.found_my_vnn
= false;
661 idx_state
.my_vnn
= get_my_vnn();
663 for (p
= strchr(path
+1, '/'); p
!= NULL
; p
= next_p
) {
664 ptrdiff_t path_len
= p
- path
;
667 next_p
= strchr(p
+1, '/');
668 recursive
= (next_p
!= NULL
);
672 make_tdb_data(discard_const_p(uint8_t, path
), path_len
),
673 notify_trigger_index_parser
, &idx_state
);
675 if (idx_state
.found_my_vnn
) {
676 notify_trigger_local(notify
, action
, filter
,
677 path
, path_len
, recursive
);
678 idx_state
.found_my_vnn
= false;
682 if (idx_state
.vnns
== NULL
) {
686 ctdbd_conn
= messaging_ctdbd_connection();
687 if (ctdbd_conn
== NULL
) {
691 num_vnns
= talloc_array_length(idx_state
.vnns
);
692 qsort(idx_state
.vnns
, num_vnns
, sizeof(uint32_t), vnn_cmp
);
694 last_vnn
= 0xffffffff;
696 if (!notify_push_remote_blob(talloc_tos(), action
, filter
, path
,
697 &remote_blob
, &remote_blob_len
)) {
698 DEBUG(1, ("notify_push_remote_blob failed\n"));
702 iov
= (struct iovec
) { .iov_base
= remote_blob
,
703 .iov_len
= remote_blob_len
};
705 for (i
=0; i
<num_vnns
; i
++) {
706 uint32_t vnn
= idx_state
.vnns
[i
];
709 if (vnn
== last_vnn
) {
713 status
= ctdbd_messaging_send_iov(
714 ctdbd_conn
, vnn
, CTDB_SRVID_SAMBA_NOTIFY_PROXY
,
716 if (!NT_STATUS_IS_OK(status
)) {
717 DEBUG(10, ("ctdbd_messaging_send_iov to vnn %d "
718 "returned %s, ignoring\n", (int)vnn
,
726 TALLOC_FREE(remote_blob
);
727 TALLOC_FREE(idx_state
.vnns
);
728 TALLOC_FREE(to_free
);
731 static void notify_trigger_local(struct notify_context
*notify
,
732 uint32_t action
, uint32_t filter
,
733 const char *path
, size_t path_len
,
737 struct notify_db_entry
*entries
;
738 size_t i
, num_entries
;
741 DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
742 "filter=%d\n", (int)path_len
, path
, (int)path_len
,
745 status
= dbwrap_fetch(
746 notify
->db_notify
, talloc_tos(),
747 make_tdb_data(discard_const_p(uint8_t, path
), path_len
), &data
);
748 if (!NT_STATUS_IS_OK(status
)) {
749 DEBUG(10, ("dbwrap_fetch returned %s\n",
753 if (data
.dsize
== sizeof(time_t)) {
754 DEBUG(10, ("Got deleted record\n"));
757 if ((data
.dsize
% sizeof(struct notify_db_entry
)) != 0) {
758 DEBUG(1, ("Invalid data.dsize = %u\n",
759 (unsigned)data
.dsize
));
763 entries
= (struct notify_db_entry
*)data
.dptr
;
764 num_entries
= data
.dsize
/ sizeof(struct notify_db_entry
);
766 DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
767 recursive
? "true" : "false", (int)path_len
,
770 for (i
=0; i
<num_entries
; i
++) {
771 struct notify_db_entry
*e
= &entries
[i
];
774 if (DEBUGLEVEL
>= 10) {
775 NDR_PRINT_DEBUG(notify_db_entry
, e
);
778 e_filter
= recursive
? e
->subdir_filter
: e
->filter
;
780 if ((filter
& e_filter
) == 0) {
784 if (!procid_is_local(&e
->server
)) {
785 struct server_id_buf tmp
;
786 DEBUG(1, ("internal error: Non-local pid %s in "
788 server_id_str_buf(e
->server
, &tmp
)));
792 status
= notify_send(notify
, &e
->server
, path
+ path_len
+ 1,
793 action
, e
->private_data
);
794 if (!NT_STATUS_IS_OK(status
)) {
795 DEBUG(10, ("notify_send returned %s\n",
801 TALLOC_FREE(data
.dptr
);
805 struct timespec when
;
811 static NTSTATUS
notify_send(struct notify_context
*notify
,
812 struct server_id
*pid
,
813 const char *path
, uint32_t action
,
816 struct notify_msg m
= {};
819 m
.when
= timespec_current();
820 m
.private_data
= private_data
;
823 iov
[0].iov_base
= &m
;
824 iov
[0].iov_len
= offsetof(struct notify_msg
, path
);
825 iov
[1].iov_base
= discard_const_p(char, path
);
826 iov
[1].iov_len
= strlen(path
)+1;
828 return messaging_send_iov(notify
->msg
, *pid
, MSG_PVFS_NOTIFY
,
829 iov
, ARRAY_SIZE(iov
), NULL
, 0);
832 static void notify_handler(struct messaging_context
*msg_ctx
,
833 void *private_data
, uint32_t msg_type
,
834 struct server_id server_id
, DATA_BLOB
*data
)
836 struct notify_context
*notify
= talloc_get_type_abort(
837 private_data
, struct notify_context
);
838 struct notify_msg
*m
;
839 struct notify_event e
;
840 struct notify_list
*listel
;
842 if (data
->length
== 0) {
843 DEBUG(1, ("%s: Got 0-sized MSG_PVFS_NOTIFY msg\n", __func__
));
846 if (data
->data
[data
->length
-1] != 0) {
847 DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
852 m
= (struct notify_msg
*)data
->data
;
854 e
= (struct notify_event
) {
857 .private_data
= m
->private_data
,
858 .dir
= discard_const_p(char, "")
861 for (listel
=notify
->list
;listel
;listel
=listel
->next
) {
862 if (listel
->private_data
== m
->private_data
) {
863 listel
->callback(listel
->private_data
, m
->when
, &e
);
869 struct notify_walk_idx_state
{
870 void (*fn
)(const char *path
,
871 uint32_t *vnns
, size_t num_vnns
,
876 static int notify_walk_idx_fn(struct db_record
*rec
, void *private_data
)
878 struct notify_walk_idx_state
*state
=
879 (struct notify_walk_idx_state
*)private_data
;
883 key
= dbwrap_record_get_key(rec
);
884 value
= dbwrap_record_get_value(rec
);
886 if ((value
.dsize
% sizeof(uint32_t)) != 0) {
887 DEBUG(1, ("invalid value size in notify index db: %u\n",
888 (unsigned)(value
.dsize
)));
892 path
= talloc_strndup(talloc_tos(), (char *)key
.dptr
, key
.dsize
);
894 DEBUG(1, ("talloc_strndup failed\n"));
897 state
->fn(path
, (uint32_t *)value
.dptr
, value
.dsize
/sizeof(uint32_t),
898 state
->private_data
);
903 void notify_walk_idx(struct notify_context
*notify
,
904 void (*fn
)(const char *path
,
905 uint32_t *vnns
, size_t num_vnns
,
909 struct notify_walk_idx_state state
;
911 state
.private_data
= private_data
;
912 dbwrap_traverse_read(notify
->db_index
, notify_walk_idx_fn
, &state
,
916 struct notify_walk_state
{
917 void (*fn
)(const char *path
,
918 struct notify_db_entry
*entries
, size_t num_entries
,
919 time_t deleted_time
, void *private_data
);
923 static int notify_walk_fn(struct db_record
*rec
, void *private_data
)
925 struct notify_walk_state
*state
=
926 (struct notify_walk_state
*)private_data
;
928 struct notify_db_entry
*entries
;
933 key
= dbwrap_record_get_key(rec
);
934 value
= dbwrap_record_get_value(rec
);
936 if (value
.dsize
== sizeof(deleted_time
)) {
937 memcpy(&deleted_time
, value
.dptr
, sizeof(deleted_time
));
941 if ((value
.dsize
% sizeof(struct notify_db_entry
)) != 0) {
942 DEBUG(1, ("invalid value size in notify db: %u\n",
943 (unsigned)(value
.dsize
)));
946 entries
= (struct notify_db_entry
*)value
.dptr
;
947 num_entries
= value
.dsize
/ sizeof(struct notify_db_entry
);
951 path
= talloc_strndup(talloc_tos(), (char *)key
.dptr
, key
.dsize
);
953 DEBUG(1, ("talloc_strndup failed\n"));
956 state
->fn(path
, entries
, num_entries
, deleted_time
,
957 state
->private_data
);
962 void notify_walk(struct notify_context
*notify
,
963 void (*fn
)(const char *path
,
964 struct notify_db_entry
*entries
,
966 time_t deleted_time
, void *private_data
),
969 struct notify_walk_state state
;
971 state
.private_data
= private_data
;
972 dbwrap_traverse_read(notify
->db_notify
, notify_walk_fn
, &state
,
976 struct notify_cleanup_state
{
978 time_t delete_before
;
984 static void notify_cleanup_collect(
985 const char *path
, struct notify_db_entry
*entries
, size_t num_entries
,
986 time_t deleted_time
, void *private_data
)
988 struct notify_cleanup_state
*state
=
989 (struct notify_cleanup_state
*)private_data
;
992 if (num_entries
!= 0) {
995 if (deleted_time
>= state
->delete_before
) {
999 p
= talloc_strdup(state
->mem_ctx
, path
);
1001 DEBUG(1, ("talloc_strdup failed\n"));
1004 add_to_large_array(state
->mem_ctx
, sizeof(p
), (void *)&p
,
1005 &state
->paths
, &state
->num_paths
,
1006 &state
->array_size
);
1007 if (state
->array_size
== -1) {
1012 static bool notify_cleanup_path(struct notify_context
*notify
,
1013 const char *path
, time_t delete_before
);
1015 void notify_cleanup(struct notify_context
*notify
)
1017 struct notify_cleanup_state state
;
1018 uint32_t failure_pool
;
1021 state
.mem_ctx
= talloc_stackframe();
1023 state
.delete_before
= time(NULL
)
1024 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
1026 notify_walk(notify
, notify_cleanup_collect
, &state
);
1028 failure_pool
= state
.num_paths
;
1030 while (state
.num_paths
!= 0) {
1034 * This loop is designed to be as kind as possible to
1035 * ctdb. ctdb does not like it if many smbds hammer on a
1036 * single record. If on many nodes the cleanup process starts
1037 * running, it can happen that all of them need to clean up
1038 * records in the same order. This would generate a ctdb
1039 * migrate storm on these records. Randomizing the load across
1040 * multiple records reduces the load on the individual record.
1043 generate_random_buffer((uint8_t *)&idx
, sizeof(idx
));
1044 idx
= idx
% state
.num_paths
;
1046 if (!notify_cleanup_path(notify
, state
.paths
[idx
],
1047 state
.delete_before
)) {
1049 * notify_cleanup_path failed, the most likely reason
1050 * is that dbwrap_try_fetch_locked failed due to
1051 * contention. We allow one failed attempt per deleted
1052 * path on average before we give up.
1055 if (failure_pool
== 0) {
1057 * Too many failures. We will come back here,
1058 * maybe next time there is less contention.
1064 TALLOC_FREE(state
.paths
[idx
]);
1065 state
.paths
[idx
] = state
.paths
[state
.num_paths
-1];
1066 state
.num_paths
-= 1;
1068 TALLOC_FREE(state
.mem_ctx
);
1071 static bool notify_cleanup_path(struct notify_context
*notify
,
1072 const char *path
, time_t delete_before
)
1074 struct db_record
*notify_rec
= NULL
;
1075 struct db_record
*idx_rec
= NULL
;
1076 TDB_DATA key
= string_tdb_data(path
);
1081 notify_rec
= dbwrap_fetch_locked(notify
->db_notify
, talloc_tos(), key
);
1082 if (notify_rec
== NULL
) {
1083 DEBUG(10, ("Could not fetch notify_rec\n"));
1086 value
= dbwrap_record_get_value(notify_rec
);
1088 if (value
.dsize
!= sizeof(deleted
)) {
1089 DEBUG(10, ("record %s has been re-used\n", path
));
1092 memcpy(&deleted
, value
.dptr
, sizeof(deleted
));
1094 if (deleted
>= delete_before
) {
1095 DEBUG(10, ("record %s too young\n", path
));
1100 * Be kind to ctdb and only try one dmaster migration at most.
1102 idx_rec
= dbwrap_try_fetch_locked(notify
->db_index
, talloc_tos(), key
);
1103 if (idx_rec
== NULL
) {
1104 DEBUG(10, ("Could not fetch idx_rec\n"));
1108 status
= dbwrap_record_delete(notify_rec
);
1109 if (!NT_STATUS_IS_OK(status
)) {
1110 DEBUG(10, ("Could not delete notify_rec: %s\n",
1111 nt_errstr(status
)));
1114 status
= notify_del_idx(idx_rec
, get_my_vnn());
1115 if (!NT_STATUS_IS_OK(status
)) {
1116 DEBUG(10, ("Could not delete idx_rec: %s\n",
1117 nt_errstr(status
)));
1121 TALLOC_FREE(idx_rec
);
1122 TALLOC_FREE(notify_rec
);
1126 static NTSTATUS
notify_del_idx(struct db_record
*rec
, uint32_t vnn
)
1128 TDB_DATA value
= dbwrap_record_get_value(rec
);
1132 if ((value
.dsize
% sizeof(uint32_t)) != 0) {
1133 DEBUG(1, ("Invalid value.dsize = %u\n",
1134 (unsigned)value
.dsize
));
1135 return NT_STATUS_INTERNAL_DB_CORRUPTION
;
1137 num_vnns
= value
.dsize
/ sizeof(uint32_t);
1138 vnns
= (uint32_t *)value
.dptr
;
1140 for (i
=0; i
<num_vnns
; i
++) {
1141 if (vnns
[i
] == vnn
) {
1146 if (i
== num_vnns
) {
1148 * Not found. Should not happen, but okay...
1150 return NT_STATUS_OK
;
1153 memmove(&vnns
[i
], &vnns
[i
+1], sizeof(uint32_t) * (num_vnns
- i
- 1));
1154 value
.dsize
-= sizeof(uint32_t);
1156 if (value
.dsize
== 0) {
1157 return dbwrap_record_delete(rec
);
1159 return dbwrap_record_store(rec
, value
, 0);
1162 struct notify_cluster_proxy_state
{
1163 struct tevent_context
*ev
;
1164 struct notify_context
*notify
;
1165 struct ctdb_msg_channel
*chan
;
1168 static void notify_cluster_proxy_got_chan(struct tevent_req
*subreq
);
1169 static void notify_cluster_proxy_got_msg(struct tevent_req
*subreq
);
1170 static void notify_cluster_proxy_trigger(struct notify_context
*notify
,
1171 uint32_t action
, uint32_t filter
,
1174 struct tevent_req
*notify_cluster_proxy_send(
1175 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1176 struct notify_context
*notify
)
1178 struct tevent_req
*req
, *subreq
;
1179 struct notify_cluster_proxy_state
*state
;
1181 req
= tevent_req_create(mem_ctx
, &state
,
1182 struct notify_cluster_proxy_state
);
1187 state
->notify
= notify
;
1189 subreq
= ctdb_msg_channel_init_send(
1190 state
, state
->ev
, lp_ctdbd_socket(),
1191 CTDB_SRVID_SAMBA_NOTIFY_PROXY
);
1192 if (tevent_req_nomem(subreq
, req
)) {
1193 return tevent_req_post(req
, ev
);
1195 tevent_req_set_callback(subreq
, notify_cluster_proxy_got_chan
, req
);
1199 static void notify_cluster_proxy_got_chan(struct tevent_req
*subreq
)
1201 struct tevent_req
*req
= tevent_req_callback_data(
1202 subreq
, struct tevent_req
);
1203 struct notify_cluster_proxy_state
*state
= tevent_req_data(
1204 req
, struct notify_cluster_proxy_state
);
1207 ret
= ctdb_msg_channel_init_recv(subreq
, state
, &state
->chan
);
1208 TALLOC_FREE(subreq
);
1210 tevent_req_error(req
, ret
);
1213 subreq
= ctdb_msg_read_send(state
, state
->ev
, state
->chan
);
1214 if (tevent_req_nomem(subreq
, req
)) {
1217 tevent_req_set_callback(subreq
, notify_cluster_proxy_got_msg
, req
);
1220 static void notify_cluster_proxy_got_msg(struct tevent_req
*subreq
)
1222 struct tevent_req
*req
= tevent_req_callback_data(
1223 subreq
, struct tevent_req
);
1224 struct notify_cluster_proxy_state
*state
= tevent_req_data(
1225 req
, struct notify_cluster_proxy_state
);
1228 uint32_t action
, filter
;
1233 ret
= ctdb_msg_read_recv(subreq
, talloc_tos(), &msg
, &msg_len
);
1234 TALLOC_FREE(subreq
);
1236 tevent_req_error(req
, ret
);
1240 res
= notify_pull_remote_blob(talloc_tos(), msg
, msg_len
,
1241 &action
, &filter
, &path
);
1244 tevent_req_error(req
, EIO
);
1247 notify_cluster_proxy_trigger(state
->notify
, action
, filter
, path
);
1250 subreq
= ctdb_msg_read_send(state
, state
->ev
, state
->chan
);
1251 if (tevent_req_nomem(subreq
, req
)) {
1254 tevent_req_set_callback(subreq
, notify_cluster_proxy_got_msg
, req
);
1257 static void notify_cluster_proxy_trigger(struct notify_context
*notify
,
1258 uint32_t action
, uint32_t filter
,
1261 const char *p
, *next_p
;
1263 for (p
= path
; p
!= NULL
; p
= next_p
) {
1264 ptrdiff_t path_len
= p
- path
;
1267 next_p
= strchr(p
+1, '/');
1268 recursive
= (next_p
!= NULL
);
1270 notify_trigger_local(notify
, action
, filter
,
1271 path
, path_len
, recursive
);
1275 int notify_cluster_proxy_recv(struct tevent_req
*req
)
1277 return tevent_req_simple_recv_unix(req
);