2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2014
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "librpc/gen_ndr/notify.h"
22 #include "librpc/gen_ndr/messaging.h"
23 #include "librpc/gen_ndr/server_id.h"
24 #include "lib/dbwrap/dbwrap.h"
25 #include "lib/dbwrap/dbwrap_rbt.h"
31 #include "lib/util/server_id_db.h"
32 #include "lib/util/tevent_unix.h"
33 #include "ctdbd_conn.h"
34 #include "ctdb_srvids.h"
35 #include "source3/smbd/proto.h"
36 #include "ctdb/include/ctdb_protocol.h"
37 #include "server_id_db_util.h"
38 #include "lib/util/iov_buf.h"
39 #include "messages_util.h"
44 * All of notifyd's state
47 struct notifyd_state
{
48 struct tevent_context
*ev
;
49 struct messaging_context
*msg_ctx
;
50 struct ctdbd_connection
*ctdbd_conn
;
53 * Database of everything clients show interest in. Indexed by
54 * absolute path. The database keys are not 0-terminated
55 * because the criticial operation, notifyd_trigger, can walk
56 * the structure from the top without adding intermediate 0s.
57 * The database records contain an array of
59 * struct notifyd_instance
61 * to be maintained by parsed by notifyd_entry_parse()
63 struct db_context
*entries
;
66 * In the cluster case, this is the place where we store a log
67 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
68 * forward them to our peer notifyd's in the cluster once a
69 * second or when the log grows too large.
72 struct messaging_reclog
*log
;
75 * Array of companion notifyd's in a cluster. Every notifyd
76 * broadcasts its messaging_reclog to every other notifyd in
77 * the cluster. This is done by making ctdb send a message to
78 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
79 * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
80 * had called register_with_ctdbd this srvid will receive the
83 * Database replication happens via these broadcasts. Also,
84 * they serve as liveness indication. If a notifyd receives a
85 * broadcast from an unknown peer, it will create one for this
86 * srvid. Also when we don't hear anything from a peer for a
87 * while, we will discard it.
90 struct notifyd_peer
**peers
;
93 sys_notify_watch_fn sys_notify_watch
;
94 struct sys_notify_context
*sys_notify_ctx
;
98 * notifyd's representation of a notify instance
100 struct notifyd_instance
{
101 struct server_id client
;
102 struct notify_instance instance
;
104 void *sys_watch
; /* inotify/fam/etc handle */
107 * Filters after sys_watch took responsibility of some bits
109 uint32_t internal_filter
;
110 uint32_t internal_subdir_filter
;
113 struct notifyd_peer
{
114 struct notifyd_state
*state
;
115 struct server_id pid
;
117 struct db_context
*db
;
118 time_t last_broadcast
;
121 static bool notifyd_rec_change(struct messaging_context
*msg_ctx
,
122 struct messaging_rec
**prec
,
124 static bool notifyd_trigger(struct messaging_context
*msg_ctx
,
125 struct messaging_rec
**prec
,
127 static bool notifyd_get_db(struct messaging_context
*msg_ctx
,
128 struct messaging_rec
**prec
,
130 static bool notifyd_got_db(struct messaging_context
*msg_ctx
,
131 struct messaging_rec
**prec
,
133 static void notifyd_broadcast_reclog(struct ctdbd_connection
*ctdbd_conn
,
134 struct server_id src
,
135 struct messaging_reclog
*log
);
136 static void notifyd_sys_callback(struct sys_notify_context
*ctx
,
137 void *private_data
, struct notify_event
*ev
);
139 static struct tevent_req
*notifyd_broadcast_reclog_send(
140 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
141 struct ctdbd_connection
*ctdbd_conn
, struct server_id src
,
142 struct messaging_reclog
*log
);
143 static int notifyd_broadcast_reclog_recv(struct tevent_req
*req
);
145 static struct tevent_req
*notifyd_clean_peers_send(
146 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
147 struct notifyd_state
*notifyd
);
148 static int notifyd_clean_peers_recv(struct tevent_req
*req
);
150 static int sys_notify_watch_dummy(
152 struct sys_notify_context
*ctx
,
155 uint32_t *subdir_filter
,
156 void (*callback
)(struct sys_notify_context
*ctx
,
158 struct notify_event
*ev
),
162 void **handle
= handle_p
;
/* Completion callbacks for the long-running subrequests */

static void notifyd_handler_done(struct tevent_req *subreq);
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
				    uint64_t dst_srvid,
				    const uint8_t *msg, size_t msglen,
				    void *private_data);
175 struct tevent_req
*notifyd_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
176 struct messaging_context
*msg_ctx
,
177 struct ctdbd_connection
*ctdbd_conn
,
178 sys_notify_watch_fn sys_notify_watch
,
179 struct sys_notify_context
*sys_notify_ctx
)
181 struct tevent_req
*req
, *subreq
;
182 struct notifyd_state
*state
;
183 struct server_id_db
*names_db
;
187 req
= tevent_req_create(mem_ctx
, &state
, struct notifyd_state
);
192 state
->msg_ctx
= msg_ctx
;
193 state
->ctdbd_conn
= ctdbd_conn
;
195 if (sys_notify_watch
== NULL
) {
196 sys_notify_watch
= sys_notify_watch_dummy
;
199 state
->sys_notify_watch
= sys_notify_watch
;
200 state
->sys_notify_ctx
= sys_notify_ctx
;
202 state
->entries
= db_open_rbt(state
);
203 if (tevent_req_nomem(state
->entries
, req
)) {
204 return tevent_req_post(req
, ev
);
207 subreq
= messaging_handler_send(state
, ev
, msg_ctx
,
208 MSG_SMB_NOTIFY_REC_CHANGE
,
209 notifyd_rec_change
, state
);
210 if (tevent_req_nomem(subreq
, req
)) {
211 return tevent_req_post(req
, ev
);
213 tevent_req_set_callback(subreq
, notifyd_handler_done
, req
);
215 subreq
= messaging_handler_send(state
, ev
, msg_ctx
,
216 MSG_SMB_NOTIFY_TRIGGER
,
217 notifyd_trigger
, state
);
218 if (tevent_req_nomem(subreq
, req
)) {
219 return tevent_req_post(req
, ev
);
221 tevent_req_set_callback(subreq
, notifyd_handler_done
, req
);
223 subreq
= messaging_handler_send(state
, ev
, msg_ctx
,
224 MSG_SMB_NOTIFY_GET_DB
,
225 notifyd_get_db
, state
);
226 if (tevent_req_nomem(subreq
, req
)) {
227 return tevent_req_post(req
, ev
);
229 tevent_req_set_callback(subreq
, notifyd_handler_done
, req
);
231 subreq
= messaging_handler_send(state
, ev
, msg_ctx
,
233 notifyd_got_db
, state
);
234 if (tevent_req_nomem(subreq
, req
)) {
235 return tevent_req_post(req
, ev
);
237 tevent_req_set_callback(subreq
, notifyd_handler_done
, req
);
239 names_db
= messaging_names_db(msg_ctx
);
241 ret
= server_id_db_set_exclusive(names_db
, "notify-daemon");
243 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
244 __func__
, strerror(ret
)));
245 tevent_req_error(req
, ret
);
246 return tevent_req_post(req
, ev
);
249 if (ctdbd_conn
== NULL
) {
251 * No cluster around, skip the database replication
257 state
->log
= talloc_zero(state
, struct messaging_reclog
);
258 if (tevent_req_nomem(state
->log
, req
)) {
259 return tevent_req_post(req
, ev
);
262 subreq
= notifyd_broadcast_reclog_send(
263 state
->log
, ev
, ctdbd_conn
, messaging_server_id(msg_ctx
),
265 if (tevent_req_nomem(subreq
, req
)) {
266 return tevent_req_post(req
, ev
);
268 tevent_req_set_callback(subreq
, notifyd_broadcast_reclog_finished
,
271 subreq
= notifyd_clean_peers_send(state
, ev
, state
);
272 if (tevent_req_nomem(subreq
, req
)) {
273 return tevent_req_post(req
, ev
);
275 tevent_req_set_callback(subreq
, notifyd_clean_peers_finished
,
278 status
= register_with_ctdbd(ctdbd_conn
, CTDB_SRVID_SAMBA_NOTIFY_PROXY
,
279 notifyd_snoop_broadcast
, state
);
280 if (!NT_STATUS_IS_OK(status
)) {
281 tevent_req_error(req
, map_errno_from_nt_status(status
));
282 return tevent_req_post(req
, ev
);
288 static void notifyd_handler_done(struct tevent_req
*subreq
)
290 struct tevent_req
*req
= tevent_req_callback_data(
291 subreq
, struct tevent_req
);
294 ret
= messaging_handler_recv(subreq
);
296 tevent_req_error(req
, ret
);
299 static void notifyd_broadcast_reclog_finished(struct tevent_req
*subreq
)
301 struct tevent_req
*req
= tevent_req_callback_data(
302 subreq
, struct tevent_req
);
305 ret
= notifyd_broadcast_reclog_recv(subreq
);
307 tevent_req_error(req
, ret
);
310 static void notifyd_clean_peers_finished(struct tevent_req
*subreq
)
312 struct tevent_req
*req
= tevent_req_callback_data(
313 subreq
, struct tevent_req
);
316 ret
= notifyd_clean_peers_recv(subreq
);
318 tevent_req_error(req
, ret
);
/* Receive function for notifyd_send: 0 on success, errno otherwise */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
327 * Parse an entry in the notifyd_context->entries database
330 static bool notifyd_parse_entry(uint8_t *buf
, size_t buflen
,
331 struct notifyd_instance
**instances
,
332 size_t *num_instances
)
334 if ((buflen
% sizeof(struct notifyd_instance
)) != 0) {
335 DEBUG(1, ("%s: invalid buffer size: %u\n",
336 __func__
, (unsigned)buflen
));
340 if (instances
!= NULL
) {
341 *instances
= (struct notifyd_instance
*)buf
;
343 if (num_instances
!= NULL
) {
344 *num_instances
= buflen
/ sizeof(struct notifyd_instance
);
349 static bool notifyd_apply_rec_change(
350 const struct server_id
*client
,
351 const char *path
, size_t pathlen
,
352 const struct notify_instance
*chg
,
353 struct db_context
*entries
,
354 sys_notify_watch_fn sys_notify_watch
,
355 struct sys_notify_context
*sys_notify_ctx
,
356 struct messaging_context
*msg_ctx
)
358 struct db_record
*rec
;
359 struct notifyd_instance
*instances
;
360 size_t num_instances
;
362 struct notifyd_instance
*instance
;
368 DEBUG(1, ("%s: pathlen==0\n", __func__
));
371 if (path
[pathlen
-1] != '\0') {
372 DEBUG(1, ("%s: path not 0-terminated\n", __func__
));
376 DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
377 "private_data=%p\n", __func__
, path
,
378 (unsigned)chg
->filter
, (unsigned)chg
->subdir_filter
,
381 rec
= dbwrap_fetch_locked(
383 make_tdb_data((const uint8_t *)path
, pathlen
-1));
386 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__
));
391 value
= dbwrap_record_get_value(rec
);
393 if (value
.dsize
!= 0) {
394 if (!notifyd_parse_entry(value
.dptr
, value
.dsize
, NULL
,
401 * Overallocate by one instance to avoid a realloc when adding
403 instances
= talloc_array(rec
, struct notifyd_instance
,
405 if (instances
== NULL
) {
406 DEBUG(1, ("%s: talloc failed\n", __func__
));
410 if (value
.dsize
!= 0) {
411 memcpy(instances
, value
.dptr
, value
.dsize
);
414 for (i
=0; i
<num_instances
; i
++) {
415 instance
= &instances
[i
];
417 if (server_id_equal(&instance
->client
, client
) &&
418 (instance
->instance
.private_data
== chg
->private_data
)) {
423 if (i
< num_instances
) {
424 instance
->instance
= *chg
;
427 * We've overallocated for one instance
429 instance
= &instances
[num_instances
];
431 *instance
= (struct notifyd_instance
) {
434 .internal_filter
= chg
->filter
,
435 .internal_subdir_filter
= chg
->subdir_filter
441 if ((instance
->instance
.filter
!= 0) ||
442 (instance
->instance
.subdir_filter
!= 0)) {
445 TALLOC_FREE(instance
->sys_watch
);
447 ret
= sys_notify_watch(entries
, sys_notify_ctx
, path
,
448 &instance
->internal_filter
,
449 &instance
->internal_subdir_filter
,
450 notifyd_sys_callback
, msg_ctx
,
451 &instance
->sys_watch
);
453 DEBUG(1, ("%s: inotify_watch returned %s\n",
454 __func__
, strerror(errno
)));
458 if ((instance
->instance
.filter
== 0) &&
459 (instance
->instance
.subdir_filter
== 0)) {
460 /* This is a delete request */
461 TALLOC_FREE(instance
->sys_watch
);
462 *instance
= instances
[num_instances
-1];
466 DEBUG(10, ("%s: %s has %u instances\n", __func__
,
467 path
, (unsigned)num_instances
));
469 if (num_instances
== 0) {
470 status
= dbwrap_record_delete(rec
);
471 if (!NT_STATUS_IS_OK(status
)) {
472 DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
473 __func__
, nt_errstr(status
)));
477 value
= make_tdb_data(
478 (uint8_t *)instances
,
479 sizeof(struct notifyd_instance
) * num_instances
);
481 status
= dbwrap_record_store(rec
, value
, 0);
482 if (!NT_STATUS_IS_OK(status
)) {
483 DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
484 __func__
, nt_errstr(status
)));
495 static void notifyd_sys_callback(struct sys_notify_context
*ctx
,
496 void *private_data
, struct notify_event
*ev
)
498 struct messaging_context
*msg_ctx
= talloc_get_type_abort(
499 private_data
, struct messaging_context
);
500 struct notify_trigger_msg msg
;
504 msg
= (struct notify_trigger_msg
) {
505 .when
= timespec_current(),
506 .action
= ev
->action
,
510 iov
[0].iov_base
= &msg
;
511 iov
[0].iov_len
= offsetof(struct notify_trigger_msg
, path
);
512 iov
[1].iov_base
= discard_const_p(char, ev
->dir
);
513 iov
[1].iov_len
= strlen(ev
->dir
);
514 iov
[2].iov_base
= &slash
;
516 iov
[3].iov_base
= discard_const_p(char, ev
->path
);
517 iov
[3].iov_len
= strlen(ev
->path
)+1;
520 msg_ctx
, messaging_server_id(msg_ctx
),
521 MSG_SMB_NOTIFY_TRIGGER
, iov
, ARRAY_SIZE(iov
), NULL
, 0);
524 static bool notifyd_parse_rec_change(uint8_t *buf
, size_t bufsize
,
525 struct notify_rec_change_msg
**pmsg
,
528 struct notify_rec_change_msg
*msg
;
530 if (bufsize
< offsetof(struct notify_rec_change_msg
, path
) + 1) {
531 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__
,
536 *pmsg
= msg
= (struct notify_rec_change_msg
*)buf
;
537 *pathlen
= bufsize
- offsetof(struct notify_rec_change_msg
, path
);
539 DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
540 "private_data=%p, path=%.*s\n",
541 __func__
, (unsigned)msg
->instance
.filter
,
542 (unsigned)msg
->instance
.subdir_filter
,
543 msg
->instance
.private_data
, (int)(*pathlen
), msg
->path
));
548 static bool notifyd_rec_change(struct messaging_context
*msg_ctx
,
549 struct messaging_rec
**prec
,
552 struct notifyd_state
*state
= talloc_get_type_abort(
553 private_data
, struct notifyd_state
);
554 struct server_id_buf idbuf
;
555 struct messaging_rec
*rec
= *prec
;
556 struct messaging_rec
**tmp
;
557 struct messaging_reclog
*log
;
558 struct notify_rec_change_msg
*msg
;
562 DEBUG(10, ("%s: Got %d bytes from %s\n", __func__
,
563 (unsigned)rec
->buf
.length
,
564 server_id_str_buf(rec
->src
, &idbuf
)));
566 ok
= notifyd_parse_rec_change(rec
->buf
.data
, rec
->buf
.length
,
572 ok
= notifyd_apply_rec_change(
573 &rec
->src
, msg
->path
, pathlen
, &msg
->instance
,
574 state
->entries
, state
->sys_notify_watch
, state
->sys_notify_ctx
,
577 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
582 if ((state
->log
== NULL
) || (state
->ctdbd_conn
== NULL
)) {
587 tmp
= talloc_realloc(log
, log
->recs
, struct messaging_rec
*,
590 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__
));
595 log
->recs
[log
->num_recs
] = talloc_move(log
->recs
, prec
);
598 if (log
->num_recs
>= 100) {
600 * Don't let the log grow too large
602 notifyd_broadcast_reclog(state
->ctdbd_conn
,
603 messaging_server_id(msg_ctx
), log
);
609 struct notifyd_trigger_state
{
610 struct messaging_context
*msg_ctx
;
611 struct notify_trigger_msg
*msg
;
613 bool covered_by_sys_notify
;
616 static void notifyd_trigger_parser(TDB_DATA key
, TDB_DATA data
,
619 static bool notifyd_trigger(struct messaging_context
*msg_ctx
,
620 struct messaging_rec
**prec
,
623 struct notifyd_state
*state
= talloc_get_type_abort(
624 private_data
, struct notifyd_state
);
625 struct server_id my_id
= messaging_server_id(msg_ctx
);
626 struct messaging_rec
*rec
= *prec
;
627 struct notifyd_trigger_state tstate
;
629 const char *p
, *next_p
;
631 if (rec
->buf
.length
< offsetof(struct notify_trigger_msg
, path
) + 1) {
632 DEBUG(1, ("message too short, ignoring: %u\n",
633 (unsigned)rec
->buf
.length
));
636 if (rec
->buf
.data
[rec
->buf
.length
-1] != 0) {
637 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__
));
641 tstate
.msg_ctx
= msg_ctx
;
643 tstate
.covered_by_sys_notify
= (rec
->src
.vnn
== my_id
.vnn
);
644 tstate
.covered_by_sys_notify
&= !server_id_equal(&rec
->src
, &my_id
);
646 tstate
.msg
= (struct notify_trigger_msg
*)rec
->buf
.data
;
647 path
= tstate
.msg
->path
;
649 DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
650 __func__
, (unsigned)tstate
.msg
->action
,
651 (unsigned)tstate
.msg
->filter
, path
));
653 if (path
[0] != '/') {
654 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
659 for (p
= strchr(path
+1, '/'); p
!= NULL
; p
= next_p
) {
660 ptrdiff_t path_len
= p
- path
;
664 next_p
= strchr(p
+1, '/');
665 tstate
.recursive
= (next_p
!= NULL
);
667 DEBUG(10, ("%s: Trying path %.*s\n", __func__
,
668 (int)path_len
, path
));
670 key
= (TDB_DATA
) { .dptr
= discard_const_p(uint8_t, path
),
673 dbwrap_parse_record(state
->entries
, key
,
674 notifyd_trigger_parser
, &tstate
);
676 if (state
->peers
== NULL
) {
680 if (rec
->src
.vnn
!= my_id
.vnn
) {
684 for (i
=0; i
<state
->num_peers
; i
++) {
685 if (state
->peers
[i
]->db
== NULL
) {
687 * Inactive peer, did not get a db yet
691 dbwrap_parse_record(state
->peers
[i
]->db
, key
,
692 notifyd_trigger_parser
, &tstate
);
699 static void notifyd_send_delete(struct messaging_context
*msg_ctx
,
701 struct notifyd_instance
*instance
);
703 static void notifyd_trigger_parser(TDB_DATA key
, TDB_DATA data
,
707 struct notifyd_trigger_state
*tstate
= private_data
;
708 struct notify_event_msg msg
= { .action
= tstate
->msg
->action
};
710 size_t path_len
= key
.dsize
;
711 struct notifyd_instance
*instances
= NULL
;
712 size_t num_instances
= 0;
715 if (!notifyd_parse_entry(data
.dptr
, data
.dsize
, &instances
,
717 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__
));
721 DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__
,
722 (unsigned)num_instances
, (int)key
.dsize
,
725 iov
[0].iov_base
= &msg
;
726 iov
[0].iov_len
= offsetof(struct notify_event_msg
, path
);
727 iov
[1].iov_base
= tstate
->msg
->path
+ path_len
+ 1;
728 iov
[1].iov_len
= strlen((char *)(iov
[1].iov_base
)) + 1;
730 for (i
=0; i
<num_instances
; i
++) {
731 struct notifyd_instance
*instance
= &instances
[i
];
732 struct server_id_buf idbuf
;
736 if (tstate
->covered_by_sys_notify
) {
737 if (tstate
->recursive
) {
738 i_filter
= instance
->internal_subdir_filter
;
740 i_filter
= instance
->internal_filter
;
743 if (tstate
->recursive
) {
744 i_filter
= instance
->instance
.subdir_filter
;
746 i_filter
= instance
->instance
.filter
;
750 if ((i_filter
& tstate
->msg
->filter
) == 0) {
754 msg
.private_data
= instance
->instance
.private_data
;
756 status
= messaging_send_iov(
757 tstate
->msg_ctx
, instance
->client
,
758 MSG_PVFS_NOTIFY
, iov
, ARRAY_SIZE(iov
), NULL
, 0);
760 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
762 server_id_str_buf(instance
->client
, &idbuf
),
765 if (NT_STATUS_EQUAL(status
, NT_STATUS_OBJECT_NAME_NOT_FOUND
) &&
766 procid_is_local(&instance
->client
)) {
768 * That process has died
770 notifyd_send_delete(tstate
->msg_ctx
, key
, instance
);
774 if (!NT_STATUS_IS_OK(status
)) {
775 DEBUG(1, ("%s: messaging_send_iov returned %s\n",
776 __func__
, nt_errstr(status
)));
782 * Send a delete request to ourselves to properly discard a notify
783 * record for an smbd that has died.
786 static void notifyd_send_delete(struct messaging_context
*msg_ctx
,
788 struct notifyd_instance
*instance
)
790 struct notify_rec_change_msg msg
= {
791 .instance
.private_data
= instance
->instance
.private_data
798 * Send a rec_change to ourselves to delete a dead entry
801 iov
[0] = (struct iovec
) {
803 .iov_len
= offsetof(struct notify_rec_change_msg
, path
) };
804 iov
[1] = (struct iovec
) { .iov_base
= key
.dptr
, .iov_len
= key
.dsize
};
805 iov
[2] = (struct iovec
) { .iov_base
= &nul
, .iov_len
= sizeof(nul
) };
807 status
= messaging_send_iov_from(
808 msg_ctx
, instance
->client
, messaging_server_id(msg_ctx
),
809 MSG_SMB_NOTIFY_REC_CHANGE
, iov
, ARRAY_SIZE(iov
), NULL
, 0);
811 if (!NT_STATUS_IS_OK(status
)) {
812 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
813 __func__
, nt_errstr(status
)));
817 static bool notifyd_get_db(struct messaging_context
*msg_ctx
,
818 struct messaging_rec
**prec
,
821 struct notifyd_state
*state
= talloc_get_type_abort(
822 private_data
, struct notifyd_state
);
823 struct messaging_rec
*rec
= *prec
;
824 struct server_id_buf id1
, id2
;
826 uint64_t rec_index
= UINT64_MAX
;
827 uint8_t index_buf
[sizeof(uint64_t)];
832 dbsize
= dbwrap_marshall(state
->entries
, NULL
, 0);
834 buf
= talloc_array(rec
, uint8_t, dbsize
);
836 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
837 __func__
, (uintmax_t)dbsize
));
841 dbsize
= dbwrap_marshall(state
->entries
, buf
, dbsize
);
843 if (dbsize
!= talloc_get_size(buf
)) {
844 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__
,
845 (uintmax_t)talloc_get_size(buf
),
851 if (state
->log
!= NULL
) {
852 rec_index
= state
->log
->rec_index
;
854 SBVAL(index_buf
, 0, rec_index
);
856 iov
[0] = (struct iovec
) { .iov_base
= index_buf
,
857 .iov_len
= sizeof(index_buf
) };
858 iov
[1] = (struct iovec
) { .iov_base
= buf
,
861 DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__
,
862 (uintmax_t)iov_buflen(iov
, ARRAY_SIZE(iov
)),
863 server_id_str_buf(messaging_server_id(msg_ctx
), &id1
),
864 server_id_str_buf(rec
->src
, &id2
)));
866 status
= messaging_send_iov(msg_ctx
, rec
->src
, MSG_SMB_NOTIFY_DB
,
867 iov
, ARRAY_SIZE(iov
), NULL
, 0);
869 if (!NT_STATUS_IS_OK(status
)) {
870 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
871 __func__
, nt_errstr(status
)));
/* Forward declaration, defined below */
static int notifyd_add_proxy_syswatches(struct db_record *rec,
					void *private_data);
880 static bool notifyd_got_db(struct messaging_context
*msg_ctx
,
881 struct messaging_rec
**prec
,
884 struct notifyd_state
*state
= talloc_get_type_abort(
885 private_data
, struct notifyd_state
);
886 struct messaging_rec
*rec
= *prec
;
887 struct notifyd_peer
*p
= NULL
;
888 struct server_id_buf idbuf
;
893 for (i
=0; i
<state
->num_peers
; i
++) {
894 if (server_id_equal(&rec
->src
, &state
->peers
[i
]->pid
)) {
901 DEBUG(10, ("%s: Did not find peer for db from %s\n",
902 __func__
, server_id_str_buf(rec
->src
, &idbuf
)));
906 if (rec
->buf
.length
< 8) {
907 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__
,
908 (unsigned)rec
->buf
.length
,
909 server_id_str_buf(rec
->src
, &idbuf
)));
914 p
->rec_index
= BVAL(rec
->buf
.data
, 0);
916 p
->db
= db_open_rbt(p
);
918 DEBUG(10, ("%s: db_open_rbt failed\n", __func__
));
923 status
= dbwrap_unmarshall(p
->db
, rec
->buf
.data
+ 8,
924 rec
->buf
.length
- 8);
925 if (!NT_STATUS_IS_OK(status
)) {
926 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
927 __func__
, nt_errstr(status
),
928 server_id_str_buf(rec
->src
, &idbuf
)));
933 dbwrap_traverse_read(p
->db
, notifyd_add_proxy_syswatches
, state
,
936 DEBUG(10, ("%s: Database from %s contained %d records\n", __func__
,
937 server_id_str_buf(rec
->src
, &idbuf
), count
));
942 static void notifyd_broadcast_reclog(struct ctdbd_connection
*ctdbd_conn
,
943 struct server_id src
,
944 struct messaging_reclog
*log
)
947 enum ndr_err_code ndr_err
;
948 uint8_t msghdr
[MESSAGE_HDR_LENGTH
];
956 DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__
,
957 (uintmax_t)log
->rec_index
, (unsigned)log
->num_recs
));
959 message_hdr_put(msghdr
, MSG_SMB_NOTIFY_REC_CHANGES
, src
,
960 (struct server_id
) {0 });
961 iov
[0] = (struct iovec
) { .iov_base
= msghdr
,
962 .iov_len
= sizeof(msghdr
) };
964 ndr_err
= ndr_push_struct_blob(
966 (ndr_push_flags_fn_t
)ndr_push_messaging_reclog
);
967 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
968 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
969 __func__
, ndr_errstr(ndr_err
)));
972 iov
[1] = (struct iovec
) { .iov_base
= blob
.data
,
973 .iov_len
= blob
.length
};
975 status
= ctdbd_messaging_send_iov(
976 ctdbd_conn
, CTDB_BROADCAST_VNNMAP
,
977 CTDB_SRVID_SAMBA_NOTIFY_PROXY
, iov
, ARRAY_SIZE(iov
));
978 TALLOC_FREE(blob
.data
);
979 if (!NT_STATUS_IS_OK(status
)) {
980 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
981 __func__
, nt_errstr(status
)));
989 TALLOC_FREE(log
->recs
);
992 struct notifyd_broadcast_reclog_state
{
993 struct tevent_context
*ev
;
994 struct ctdbd_connection
*ctdbd_conn
;
995 struct server_id src
;
996 struct messaging_reclog
*log
;
999 static void notifyd_broadcast_reclog_next(struct tevent_req
*subreq
);
1001 static struct tevent_req
*notifyd_broadcast_reclog_send(
1002 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1003 struct ctdbd_connection
*ctdbd_conn
, struct server_id src
,
1004 struct messaging_reclog
*log
)
1006 struct tevent_req
*req
, *subreq
;
1007 struct notifyd_broadcast_reclog_state
*state
;
1009 req
= tevent_req_create(mem_ctx
, &state
,
1010 struct notifyd_broadcast_reclog_state
);
1015 state
->ctdbd_conn
= ctdbd_conn
;
1019 subreq
= tevent_wakeup_send(state
, state
->ev
,
1020 timeval_current_ofs_msec(1000));
1021 if (tevent_req_nomem(subreq
, req
)) {
1022 return tevent_req_post(req
, ev
);
1024 tevent_req_set_callback(subreq
, notifyd_broadcast_reclog_next
, req
);
1028 static void notifyd_broadcast_reclog_next(struct tevent_req
*subreq
)
1030 struct tevent_req
*req
= tevent_req_callback_data(
1031 subreq
, struct tevent_req
);
1032 struct notifyd_broadcast_reclog_state
*state
= tevent_req_data(
1033 req
, struct notifyd_broadcast_reclog_state
);
1036 ok
= tevent_wakeup_recv(subreq
);
1037 TALLOC_FREE(subreq
);
1039 tevent_req_oom(req
);
1043 notifyd_broadcast_reclog(state
->ctdbd_conn
, state
->src
, state
->log
);
1045 subreq
= tevent_wakeup_send(state
, state
->ev
,
1046 timeval_current_ofs_msec(1000));
1047 if (tevent_req_nomem(subreq
, req
)) {
1050 tevent_req_set_callback(subreq
, notifyd_broadcast_reclog_next
, req
);
/* Receive function for the broadcast loop: 0 or errno */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/* State of the periodic dead-peer reaper */
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);
1065 static struct tevent_req
*notifyd_clean_peers_send(
1066 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1067 struct notifyd_state
*notifyd
)
1069 struct tevent_req
*req
, *subreq
;
1070 struct notifyd_clean_peers_state
*state
;
1072 req
= tevent_req_create(mem_ctx
, &state
,
1073 struct notifyd_clean_peers_state
);
1078 state
->notifyd
= notifyd
;
1080 subreq
= tevent_wakeup_send(state
, state
->ev
,
1081 timeval_current_ofs_msec(30000));
1082 if (tevent_req_nomem(subreq
, req
)) {
1083 return tevent_req_post(req
, ev
);
1085 tevent_req_set_callback(subreq
, notifyd_clean_peers_next
, req
);
1089 static void notifyd_clean_peers_next(struct tevent_req
*subreq
)
1091 struct tevent_req
*req
= tevent_req_callback_data(
1092 subreq
, struct tevent_req
);
1093 struct notifyd_clean_peers_state
*state
= tevent_req_data(
1094 req
, struct notifyd_clean_peers_state
);
1095 struct notifyd_state
*notifyd
= state
->notifyd
;
1098 time_t now
= time(NULL
);
1100 ok
= tevent_wakeup_recv(subreq
);
1101 TALLOC_FREE(subreq
);
1103 tevent_req_oom(req
);
1108 while (i
< notifyd
->num_peers
) {
1109 struct notifyd_peer
*p
= notifyd
->peers
[i
];
1111 if ((now
- p
->last_broadcast
) > 60) {
1112 struct server_id_buf idbuf
;
1115 * Haven't heard for more than 60 seconds. Call this
1119 DEBUG(10, ("%s: peer %s died\n", __func__
,
1120 server_id_str_buf(p
->pid
, &idbuf
)));
1122 * This implicitly decrements notifyd->num_peers
1130 subreq
= tevent_wakeup_send(state
, state
->ev
,
1131 timeval_current_ofs_msec(30000));
1132 if (tevent_req_nomem(subreq
, req
)) {
1135 tevent_req_set_callback(subreq
, notifyd_clean_peers_next
, req
);
/* Receive function for the peer reaper loop: 0 or errno */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
1143 static int notifyd_add_proxy_syswatches(struct db_record
*rec
,
1146 struct notifyd_state
*state
= talloc_get_type_abort(
1147 private_data
, struct notifyd_state
);
1148 struct db_context
*db
= dbwrap_record_get_db(rec
);
1149 TDB_DATA key
= dbwrap_record_get_key(rec
);
1150 TDB_DATA value
= dbwrap_record_get_value(rec
);
1151 struct notifyd_instance
*instances
= NULL
;
1152 size_t num_instances
= 0;
1154 char path
[key
.dsize
+1];
1157 memcpy(path
, key
.dptr
, key
.dsize
);
1158 path
[key
.dsize
] = '\0';
1160 ok
= notifyd_parse_entry(value
.dptr
, value
.dsize
, &instances
,
1163 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1168 for (i
=0; i
<num_instances
; i
++) {
1169 struct notifyd_instance
*instance
= &instances
[i
];
1170 uint32_t filter
= instance
->instance
.filter
;
1171 uint32_t subdir_filter
= instance
->instance
.subdir_filter
;
1174 ret
= state
->sys_notify_watch(
1175 db
, state
->sys_notify_ctx
, path
,
1176 &filter
, &subdir_filter
,
1177 notifyd_sys_callback
, state
->msg_ctx
,
1178 &instance
->sys_watch
);
1180 DEBUG(1, ("%s: inotify_watch returned %s\n",
1181 __func__
, strerror(errno
)));
1188 static int notifyd_db_del_syswatches(struct db_record
*rec
, void *private_data
)
1190 TDB_DATA key
= dbwrap_record_get_key(rec
);
1191 TDB_DATA value
= dbwrap_record_get_value(rec
);
1192 struct notifyd_instance
*instances
= NULL
;
1193 size_t num_instances
= 0;
1197 ok
= notifyd_parse_entry(value
.dptr
, value
.dsize
, &instances
,
1200 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1201 __func__
, (int)key
.dsize
, (char *)key
.dptr
));
1204 for (i
=0; i
<num_instances
; i
++) {
1205 TALLOC_FREE(instances
[i
].sys_watch
);
1210 static int notifyd_peer_destructor(struct notifyd_peer
*p
)
1212 struct notifyd_state
*state
= p
->state
;
1215 dbwrap_traverse_read(p
->db
, notifyd_db_del_syswatches
, NULL
, NULL
);
1217 for (i
= 0; i
<state
->num_peers
; i
++) {
1218 if (p
== state
->peers
[i
]) {
1219 state
->peers
[i
] = state
->peers
[state
->num_peers
-1];
1220 state
->num_peers
-= 1;
1227 static struct notifyd_peer
*notifyd_peer_new(
1228 struct notifyd_state
*state
, struct server_id pid
)
1230 struct notifyd_peer
*p
, **tmp
;
1232 tmp
= talloc_realloc(state
, state
->peers
, struct notifyd_peer
*,
1233 state
->num_peers
+1);
1239 p
= talloc_zero(state
->peers
, struct notifyd_peer
);
1246 state
->peers
[state
->num_peers
] = p
;
1247 state
->num_peers
+= 1;
1249 talloc_set_destructor(p
, notifyd_peer_destructor
);
/*
 * Apply a serialized messaging_reclog (a batch of
 * MSG_SMB_NOTIFY_REC_CHANGE records broadcast by a remote notifyd)
 * to our local copy of that peer's database.
 *
 * On any error — allocation failure, NDR parse failure, a log index
 * mismatch (meaning we lost a broadcast), or a record that fails to
 * parse/apply — the whole peer is dropped via TALLOC_FREE(peer); the
 * next broadcast from that node will trigger a fresh database pull.
 */
static void notifyd_apply_reclog(struct notifyd_peer *peer,
				 const uint8_t *msg, size_t msglen)
{
	struct notifyd_state *state = peer->state;
	/* Wrap the incoming buffer for NDR; the pull does not modify it */
	DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
			   .length = msglen };
	struct server_id_buf idbuf;
	struct messaging_reclog *log;
	enum ndr_err_code ndr_err;
	uint32_t i;

	if (peer->db == NULL) {
		/*
		 * No database yet for this peer: we are still waiting
		 * for the full db pull, so there is nothing to apply
		 * the log against.
		 */
		return;
	}

	log = talloc(peer, struct messaging_reclog);
	if (log == NULL) {
		DEBUG(10, ("%s: talloc failed\n", __func__));
		goto fail;
	}

	ndr_err = ndr_pull_struct_blob_all(
		&blob, log, log,
		(ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
			   __func__, ndr_errstr(ndr_err)));
		goto fail;
	}

	DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
		   (unsigned)log->num_recs, (uintmax_t)log->rec_index,
		   server_id_str_buf(peer->pid, &idbuf)));

	if (log->rec_index != peer->rec_index) {
		/*
		 * Gap in the sequence: we missed at least one
		 * broadcast, our copy of the peer's db is stale.
		 */
		DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
			  __func__, (uintmax_t)log->rec_index,
			  server_id_str_buf(peer->pid, &idbuf),
			  (uintmax_t)peer->rec_index));
		goto fail;
	}

	/* Replay each record as if it had arrived from a local client */
	for (i=0; i<log->num_recs; i++) {
		struct messaging_rec *r = log->recs[i];
		struct notify_rec_change_msg *chg;
		size_t pathlen;
		bool ok;

		ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
					      &chg, &pathlen);
		if (!ok) {
			DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
				  __func__));
			goto fail;
		}

		ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
					      &chg->instance, peer->db,
					      state->sys_notify_watch,
					      state->sys_notify_ctx,
					      state->msg_ctx);
		if (!ok) {
			DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
				  __func__));
			goto fail;
		}
	}

	peer->rec_index += 1;
	peer->last_broadcast = time(NULL);

	TALLOC_FREE(log);
	return;

fail:
	DEBUG(10, ("%s: Dropping peer %s\n", __func__,
		   server_id_str_buf(peer->pid, &idbuf)));
	/* Freeing the peer also unlinks it from state->peers
	 * via its talloc destructor */
	TALLOC_FREE(peer);
}
/*
 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
 * messages) broadcasts by other notifyds. Several cases:
 *
 * We don't know the source. This creates a new peer. Creating a peer
 * involves asking the peer for its full database. We assume ordered
 * messages, so the new database will arrive before the next broadcast
 * of the source.
 *
 * We know the source and the log index matches. We will apply the log
 * locally to our peer's db as if we had received it from a local
 * client.
 *
 * We know the source but the log index does not match. This means we
 * lost a message. We just drop the whole peer and wait for the next
 * broadcast, which will then trigger a fresh database pull.
 */
static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
				    uint64_t dst_srvid,
				    const uint8_t *msg, size_t msglen,
				    void *private_data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id my_id = messaging_server_id(state->msg_ctx);
	struct notifyd_peer *p;
	uint32_t i;
	uint32_t msg_type;
	struct server_id src, dst;
	struct server_id_buf idbuf;
	NTSTATUS status;

	/* Message must at least carry the messaging header */
	if (msglen < MESSAGE_HDR_LENGTH) {
		DEBUG(10, ("%s: Got short broadcast\n", __func__));
		return;
	}
	message_hdr_get(&msg_type, &src, &dst, msg);

	if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
		DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
			   (unsigned)msg_type));
		return;
	}
	/* The broadcast goes to the whole cluster, including us */
	if (server_id_equal(&src, &my_id)) {
		DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
		return;
	}

	DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
		   __func__, server_id_str_buf(src, &idbuf)));

	/* Known peer: apply the log payload (header stripped) */
	for (i=0; i<state->num_peers; i++) {
		if (server_id_equal(&state->peers[i]->pid, &src)) {

			DEBUG(10, ("%s: Applying changes to peer %u\n",
				   __func__, (unsigned)i));

			notifyd_apply_reclog(state->peers[i],
					     msg + MESSAGE_HDR_LENGTH,
					     msglen - MESSAGE_HDR_LENGTH);
			return;
		}
	}

	/* Unknown source: create a peer and request its full database */
	DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
		   server_id_str_buf(src, &idbuf)));

	p = notifyd_peer_new(state, src);
	if (p == NULL) {
		DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
		return;
	}

	/*
	 * NOTE(review): the payload arguments of this send were lost in
	 * this copy; upstream sends an empty buffer — confirm.
	 */
	status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
				    NULL, 0);
	if (!NT_STATUS_IS_OK(status)) {
		DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
			   __func__, nt_errstr(status)));
		/* Without the db pull the peer is useless; drop it */
		TALLOC_FREE(p);
		return;
	}
}
/*
 * Closure passed through dbwrap_parse_marshall_buf() into
 * notifyd_parse_db_parser(): a per-instance callback plus the
 * caller's opaque context.
 */
struct notifyd_parse_db_state {
	/*
	 * Invoked once per notify instance found under a path.
	 * Returning false aborts the walk.
	 */
	bool (*fn)(const char *path,
		   struct server_id server,
		   const struct notify_instance *instance,
		   void *private_data);
	void *private_data;	/* handed through to fn unchanged */
};
/*
 * Per-record callback for walking a marshalled notifyd database.
 * "key" is the watched path (not 0-terminated in the db, see the
 * comment on notifyd_state.entries); "value" is the array of
 * notifyd_instance records for that path.
 *
 * Returns false to stop the walk (only when the user callback asks
 * for it); unparseable entries are logged and skipped, not fatal.
 */
static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
				    void *private_data)
{
	struct notifyd_parse_db_state *state = private_data;
	/* VLA: db keys are paths without the trailing 0, add it here */
	char path[key.dsize+1];
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t i;
	bool ok;

	memcpy(path, key.dptr, key.dsize);
	path[key.dsize] = 0;

	ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
				 &num_instances);
	if (!ok) {
		DEBUG(10, ("%s: Could not parse entry for path %s\n",
			   __func__, path));
		/* Skip the broken record but keep walking */
		return true;
	}

	for (i=0; i<num_instances; i++) {
		ok = state->fn(path, instances[i].client,
			       &instances[i].instance,
			       state->private_data);
		if (!ok) {
			/* User callback requested early termination */
			return false;
		}
	}

	return true;
}
1462 int notifyd_parse_db(const uint8_t *buf
, size_t buflen
,
1463 uint64_t *log_index
,
1464 bool (*fn
)(const char *path
,
1465 struct server_id server
,
1466 const struct notify_instance
*instance
,
1467 void *private_data
),
1470 struct notifyd_parse_db_state state
= {
1471 .fn
= fn
, .private_data
= private_data
1478 *log_index
= BVAL(buf
, 0);
1483 status
= dbwrap_parse_marshall_buf(
1484 buf
, buflen
, notifyd_parse_db_parser
, &state
);
1485 if (!NT_STATUS_IS_OK(status
)) {
1486 return map_errno_from_nt_status(status
);