From 802b282a8f24610dd4ba0b7d3032d400fa5b70ec Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Fri, 21 Nov 2014 16:52:47 +0100 Subject: [PATCH] smbd: Add the notify daemon This adds the notify daemon listening on MSG_SMB_NOTIFY_REC_CHANGE and MSG_SMB_NOTIFY_TRIGGER messages. It relies on ctdbd to distribute the notify database and events in a cluster. Signed-off-by: Volker Lendecke Reviewed-by: Jeremy Allison --- librpc/idl/messaging.idl | 13 + source3/smbd/notifyd/notifyd.c | 1419 ++++++++++++++++++++++++++++++++++++ source3/smbd/notifyd/notifyd.h | 143 ++++ source3/smbd/notifyd/tests.c | 118 +++ source3/smbd/notifyd/wscript_build | 12 + source3/wscript_build | 1 + 6 files changed, 1706 insertions(+) create mode 100644 source3/smbd/notifyd/notifyd.c create mode 100644 source3/smbd/notifyd/notifyd.h create mode 100644 source3/smbd/notifyd/tests.c create mode 100644 source3/smbd/notifyd/wscript_build diff --git a/librpc/idl/messaging.idl b/librpc/idl/messaging.idl index 2b902ec0a1d..ca99f414a1e 100644 --- a/librpc/idl/messaging.idl +++ b/librpc/idl/messaging.idl @@ -99,6 +99,13 @@ interface messaging /* Cancel a notify, directory got deleted */ MSG_SMB_NOTIFY_CANCEL_DELETED = 0x0319, + /* notifyd messages */ + MSG_SMB_NOTIFY_REC_CHANGE = 0x031A, + MSG_SMB_NOTIFY_TRIGGER = 0x031B, + MSG_SMB_NOTIFY_GET_DB = 0x031C, + MSG_SMB_NOTIFY_DB = 0x031D, + MSG_SMB_NOTIFY_REC_CHANGES = 0x031E, + /* winbind messages */ MSG_WINBIND_FINISHED = 0x0401, MSG_WINBIND_FORGET_STATE = 0x0402, @@ -152,4 +159,10 @@ interface messaging uint8 num_fds; dlong fds[num_fds]; } messaging_rec; + + typedef [public] struct { + hyper rec_index; + uint32 num_recs; + messaging_rec *recs[num_recs]; + } messaging_reclog; } diff --git a/source3/smbd/notifyd/notifyd.c b/source3/smbd/notifyd/notifyd.c new file mode 100644 index 00000000000..0f31a61fd1f --- /dev/null +++ b/source3/smbd/notifyd/notifyd.c @@ -0,0 +1,1419 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "includes.h" +#include "librpc/gen_ndr/notify.h" +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/server_id.h" +#include "lib/dbwrap/dbwrap.h" +#include "lib/dbwrap/dbwrap_rbt.h" +#include "messages.h" +#include "proto.h" +#include "tdb.h" +#include "util_tdb.h" +#include "notifyd.h" +#include "lib/util/server_id_db.h" +#include "lib/util/tevent_unix.h" +#include "ctdbd_conn.h" +#include "ctdb_srvids.h" +#include "source3/smbd/proto.h" +#include "ctdb/include/ctdb_protocol.h" +#include "server_id_db_util.h" +#include "lib/util/iov_buf.h" +#include "messages_util.h" + +struct notifyd_peer; + +/* + * All of notifyd's state + */ + +struct notifyd_state { + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct ctdbd_connection *ctdbd_conn; + + /* + * Database of everything clients show interest in. Indexed by + * absolute path. The database keys are not 0-terminated + * because the criticial operation, notifyd_trigger, can walk + * the structure from the top without adding intermediate 0s. + * The database records contain an array of + * + * struct notifyd_instance + * + * to be maintained by parsed by notifyd_entry_parse() + */ + struct db_context *entries; + + /* + * In the cluster case, this is the place where we store a log + * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1 + * forward them to our peer notifyd's in the cluster once a + * second or when the log grows too large. + */ + + struct messaging_reclog *log; + + /* + * Array of companion notifyd's in a cluster. Every notifyd + * broadcasts its messaging_reclog to every other notifyd in + * the cluster. This is done by making ctdb send a message to + * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node + * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who + * had called register_with_ctdbd this srvid will receive the + * broadcasts. + * + * Database replication happens via these broadcasts. Also, + * they serve as liveness indication. If a notifyd receives a + * broadcast from an unknown peer, it will create one for this + * srvid. Also when we don't hear anything from a peer for a + * while, we will discard it. + */ + + struct notifyd_peer **peers; + size_t num_peers; + + sys_notify_watch_fn sys_notify_watch; + struct sys_notify_context *sys_notify_ctx; +}; + +/* + * notifyd's representation of a notify instance + */ +struct notifyd_instance { + struct server_id client; + struct notify_instance instance; + + void *sys_watch; /* inotify/fam/etc handle */ + + /* + * Filters after sys_watch took responsibility of some bits + */ + uint32_t internal_filter; + uint32_t internal_subdir_filter; +}; + +struct notifyd_peer { + struct notifyd_state *state; + struct server_id pid; + uint64_t rec_index; + struct db_context *db; + time_t last_broadcast; +}; + +static bool notifyd_rec_change(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data); +static bool notifyd_trigger(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data); +static bool notifyd_get_db(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data); +static bool notifyd_got_db(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data); +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log); +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev); + +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log); +static int notifyd_broadcast_reclog_recv(struct tevent_req *req); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd); +static int notifyd_clean_peers_recv(struct tevent_req *req); + +static int sys_notify_watch_dummy( + TALLOC_CTX *mem_ctx, + struct sys_notify_context *ctx, + const char *path, + uint32_t *filter, + uint32_t *subdir_filter, + void (*callback)(struct sys_notify_context *ctx, + void *private_data, + struct notify_event *ev), + void *private_data, + void *handle_p) +{ + void **handle = handle_p; + *handle = NULL; + return 0; +} + +static void notifyd_handler_done(struct tevent_req *subreq); +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq); +static void notifyd_clean_peers_finished(struct tevent_req *subreq); +static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data); + +struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct ctdbd_connection *ctdbd_conn, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx) +{ + struct tevent_req *req, *subreq; + struct notifyd_state *state; + struct server_id_db *names_db; + NTSTATUS status; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct notifyd_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->msg_ctx = msg_ctx; + state->ctdbd_conn = ctdbd_conn; + + if (sys_notify_watch == NULL) { + sys_notify_watch = sys_notify_watch_dummy; + } + + state->sys_notify_watch = sys_notify_watch; + state->sys_notify_ctx = sys_notify_ctx; + + state->entries = db_open_rbt(state); + if (tevent_req_nomem(state->entries, req)) { + return tevent_req_post(req, ev); + } + + subreq = messaging_handler_send(state, ev, msg_ctx, + MSG_SMB_NOTIFY_REC_CHANGE, + notifyd_rec_change, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_handler_done, req); + + subreq = messaging_handler_send(state, ev, msg_ctx, + MSG_SMB_NOTIFY_TRIGGER, + notifyd_trigger, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_handler_done, req); + + subreq = messaging_handler_send(state, ev, msg_ctx, + MSG_SMB_NOTIFY_GET_DB, + notifyd_get_db, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_handler_done, req); + + subreq = messaging_handler_send(state, ev, msg_ctx, + MSG_SMB_NOTIFY_DB, + notifyd_got_db, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_handler_done, req); + + names_db = messaging_names_db(msg_ctx); + + ret = server_id_db_set_exclusive(names_db, "notify-daemon"); + if (ret != 0) { + DEBUG(10, ("%s: server_id_db_add failed: %s\n", + __func__, strerror(ret))); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (ctdbd_conn == NULL) { + /* + * No cluster around, skip the database replication + * engine + */ + return req; + } + + state->log = talloc_zero(state, struct messaging_reclog); + if (tevent_req_nomem(state->log, req)) { + return tevent_req_post(req, ev); + } + + subreq = notifyd_broadcast_reclog_send( + state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx), + state->log); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished, + req); + + subreq = notifyd_clean_peers_send(state, ev, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_clean_peers_finished, + req); + + status = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY, + notifyd_snoop_broadcast, state); + if (!NT_STATUS_IS_OK(status)) { + tevent_req_error(req, map_errno_from_nt_status(status)); + return tevent_req_post(req, ev); + } + + return req; +} + +static void notifyd_handler_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = messaging_handler_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_broadcast_reclog_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +static void notifyd_clean_peers_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_clean_peers_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +int notifyd_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +/* + * Parse an entry in the notifyd_context->entries database + */ + +static bool notifyd_parse_entry(uint8_t *buf, size_t buflen, + struct notifyd_instance **instances, + size_t *num_instances) +{ + if ((buflen % sizeof(struct notifyd_instance)) != 0) { + DEBUG(1, ("%s: invalid buffer size: %u\n", + __func__, (unsigned)buflen)); + return false; + } + + if (instances != NULL) { + *instances = (struct notifyd_instance *)buf; + } + if (num_instances != NULL) { + *num_instances = buflen / sizeof(struct notifyd_instance); + } + return true; +} + +static bool notifyd_apply_rec_change( + const struct server_id *client, + const char *path, size_t pathlen, + const struct notify_instance *chg, + struct db_context *entries, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx, + struct messaging_context *msg_ctx) +{ + struct db_record *rec; + struct notifyd_instance *instances; + size_t num_instances; + size_t i; + struct notifyd_instance *instance; + TDB_DATA value; + NTSTATUS status; + bool ok = false; + + if (pathlen == 0) { + DEBUG(1, ("%s: pathlen==0\n", __func__)); + return false; + } + if (path[pathlen-1] != '\0') { + DEBUG(1, ("%s: path not 0-terminated\n", __func__)); + return false; + } + + DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, " + "private_data=%p\n", __func__, path, + (unsigned)chg->filter, (unsigned)chg->subdir_filter, + chg->private_data)); + + rec = dbwrap_fetch_locked( + entries, entries, + make_tdb_data((const uint8_t *)path, pathlen-1)); + + if (rec == NULL) { + DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__)); + goto fail; + } + + num_instances = 0; + value = dbwrap_record_get_value(rec); + + if (value.dsize != 0) { + if (!notifyd_parse_entry(value.dptr, value.dsize, NULL, + &num_instances)) { + goto fail; + } + } + + /* + * Overallocate by one instance to avoid a realloc when adding + */ + instances = talloc_array(rec, struct notifyd_instance, + num_instances + 1); + if (instances == NULL) { + DEBUG(1, ("%s: talloc failed\n", __func__)); + goto fail; + } + + if (value.dsize != 0) { + memcpy(instances, value.dptr, value.dsize); + } + + for (i=0; iclient, client) && + (instance->instance.private_data == chg->private_data)) { + break; + } + } + + if (i < num_instances) { + instance->instance = *chg; + } else { + /* + * We've overallocated for one instance + */ + instance = &instances[num_instances]; + + *instance = (struct notifyd_instance) { + .client = *client, + .instance = *chg, + .internal_filter = chg->filter, + .internal_subdir_filter = chg->subdir_filter + }; + + num_instances += 1; + } + + if ((instance->instance.filter != 0) || + (instance->instance.subdir_filter != 0)) { + int ret; + + TALLOC_FREE(instance->sys_watch); + + ret = sys_notify_watch(entries, sys_notify_ctx, path, + &instance->internal_filter, + &instance->internal_subdir_filter, + notifyd_sys_callback, msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DEBUG(1, ("%s: inotify_watch returned %s\n", + __func__, strerror(errno))); + } + } + + if ((instance->instance.filter == 0) && + (instance->instance.subdir_filter == 0)) { + /* This is a delete request */ + TALLOC_FREE(instance->sys_watch); + *instance = instances[num_instances-1]; + num_instances -= 1; + } + + DEBUG(10, ("%s: %s has %u instances\n", __func__, + path, (unsigned)num_instances)); + + if (num_instances == 0) { + status = dbwrap_record_delete(rec); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("%s: dbwrap_record_delete returned %s\n", + __func__, nt_errstr(status))); + goto fail; + } + } else { + value = make_tdb_data( + (uint8_t *)instances, + sizeof(struct notifyd_instance) * num_instances); + + status = dbwrap_record_store(rec, value, 0); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("%s: dbwrap_record_store returned %s\n", + __func__, nt_errstr(status))); + goto fail; + } + } + + ok = true; +fail: + TALLOC_FREE(rec); + return ok; +} + +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev) +{ + struct messaging_context *msg_ctx = talloc_get_type_abort( + private_data, struct messaging_context); + struct notify_trigger_msg msg; + struct iovec iov[4]; + char slash = '/'; + + msg = (struct notify_trigger_msg) { + .when = timespec_current(), + .action = ev->action, + .filter = UINT32_MAX + }; + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_trigger_msg, path); + iov[1].iov_base = discard_const_p(char, ev->dir); + iov[1].iov_len = strlen(ev->dir); + iov[2].iov_base = &slash; + iov[2].iov_len = 1; + iov[3].iov_base = discard_const_p(char, ev->path); + iov[3].iov_len = strlen(ev->path)+1; + + messaging_send_iov( + msg_ctx, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0); +} + +static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize, + struct notify_rec_change_msg **pmsg, + size_t *pathlen) +{ + struct notify_rec_change_msg *msg; + + if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) { + DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__, + (unsigned)bufsize)); + return false; + } + + *pmsg = msg = (struct notify_rec_change_msg *)buf; + *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path); + + DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, " + "private_data=%p, path=%.*s\n", + __func__, (unsigned)msg->instance.filter, + (unsigned)msg->instance.subdir_filter, + msg->instance.private_data, (int)(*pathlen), msg->path)); + + return true; +} + +static bool notifyd_rec_change(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id_buf idbuf; + struct messaging_rec *rec = *prec; + struct messaging_rec **tmp; + struct messaging_reclog *log; + struct notify_rec_change_msg *msg; + size_t pathlen; + bool ok; + + DEBUG(10, ("%s: Got %d bytes from %s\n", __func__, + (unsigned)rec->buf.length, + server_id_str_buf(rec->src, &idbuf))); + + ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length, + &msg, &pathlen); + if (!ok) { + return true; + } + + ok = notifyd_apply_rec_change( + &rec->src, msg->path, pathlen, &msg->instance, + state->entries, state->sys_notify_watch, state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n", + __func__)); + return true; + } + + if ((state->log == NULL) || (state->ctdbd_conn == NULL)) { + return true; + } + log = state->log; + + tmp = talloc_realloc(log, log->recs, struct messaging_rec *, + log->num_recs+1); + if (tmp == NULL) { + DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__)); + return true; + } + log->recs = tmp; + + log->recs[log->num_recs] = talloc_move(log->recs, prec); + log->num_recs += 1; + + if (log->num_recs >= 100) { + /* + * Don't let the log grow too large + */ + notifyd_broadcast_reclog(state->ctdbd_conn, + messaging_server_id(msg_ctx), log); + } + + return true; +} + +struct notifyd_trigger_state { + struct messaging_context *msg_ctx; + struct notify_trigger_msg *msg; + bool recursive; + bool covered_by_sys_notify; +}; + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data); + +static bool notifyd_trigger(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(msg_ctx); + struct messaging_rec *rec = *prec; + struct notifyd_trigger_state tstate; + const char *path; + const char *p, *next_p; + + if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) { + DEBUG(1, ("message too short, ignoring: %u\n", + (unsigned)rec->buf.length)); + return true; + } + if (rec->buf.data[rec->buf.length-1] != 0) { + DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__)); + return true; + } + + tstate.msg_ctx = msg_ctx; + + tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn); + tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id); + + tstate.msg = (struct notify_trigger_msg *)rec->buf.data; + path = tstate.msg->path; + + DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n", + __func__, (unsigned)tstate.msg->action, + (unsigned)tstate.msg->filter, path)); + + if (path[0] != '/') { + DEBUG(1, ("%s: path %s does not start with /, ignoring\n", + __func__, path)); + return true; + } + + for (p = strchr(path+1, '/'); p != NULL; p = next_p) { + ptrdiff_t path_len = p - path; + TDB_DATA key; + uint32_t i; + + next_p = strchr(p+1, '/'); + tstate.recursive = (next_p != NULL); + + DEBUG(10, ("%s: Trying path %.*s\n", __func__, + (int)path_len, path)); + + key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path), + .dsize = path_len }; + + dbwrap_parse_record(state->entries, key, + notifyd_trigger_parser, &tstate); + + if (state->peers == NULL) { + continue; + } + + if (rec->src.vnn != my_id.vnn) { + continue; + } + + for (i=0; inum_peers; i++) { + if (state->peers[i]->db == NULL) { + /* + * Inactive peer, did not get a db yet + */ + continue; + } + dbwrap_parse_record(state->peers[i]->db, key, + notifyd_trigger_parser, &tstate); + } + } + + return true; +} + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance); + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data) + +{ + struct notifyd_trigger_state *tstate = private_data; + struct notify_event_msg msg = { .action = tstate->msg->action }; + struct iovec iov[2]; + size_t path_len = key.dsize; + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + + if (!notifyd_parse_entry(data.dptr, data.dsize, &instances, + &num_instances)) { + DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__)); + return; + } + + DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__, + (unsigned)num_instances, (int)key.dsize, + (char *)key.dptr)); + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_event_msg, path); + iov[1].iov_base = tstate->msg->path + path_len + 1; + iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1; + + for (i=0; icovered_by_sys_notify) { + if (tstate->recursive) { + i_filter = instance->internal_subdir_filter; + } else { + i_filter = instance->internal_filter; + } + } else { + if (tstate->recursive) { + i_filter = instance->instance.subdir_filter; + } else { + i_filter = instance->instance.filter; + } + } + + if ((i_filter & tstate->msg->filter) == 0) { + continue; + } + + msg.private_data = instance->instance.private_data; + + status = messaging_send_iov( + tstate->msg_ctx, instance->client, + MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0); + + DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n", + __func__, + server_id_str_buf(instance->client, &idbuf), + nt_errstr(status))); + + if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) && + procid_is_local(&instance->client)) { + /* + * That process has died + */ + notifyd_send_delete(tstate->msg_ctx, key, instance); + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("%s: messaging_send_iov returned %s\n", + __func__, nt_errstr(status))); + } + } +} + +/* + * Send a delete request to ourselves to properly discard a notify + * record for an smbd that has died. + */ + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance) +{ + struct notify_rec_change_msg msg = { + .instance.private_data = instance->instance.private_data + }; + uint8_t nul = 0; + struct iovec iov[3]; + NTSTATUS status; + + /* + * Send a rec_change to ourselves to delete a dead entry + */ + + iov[0] = (struct iovec) { + .iov_base = &msg, + .iov_len = offsetof(struct notify_rec_change_msg, path) }; + iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize }; + iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) }; + + status = messaging_send_iov_from( + msg_ctx, instance->client, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("%s: messaging_send_iov_from returned %s\n", + __func__, nt_errstr(status))); + } +} + +static bool notifyd_get_db(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct messaging_rec *rec = *prec; + struct server_id_buf id1, id2; + NTSTATUS status; + uint64_t rec_index = UINT64_MAX; + uint8_t index_buf[sizeof(uint64_t)]; + size_t dbsize; + uint8_t *buf; + struct iovec iov[2]; + + dbsize = dbwrap_marshall(state->entries, NULL, 0); + + buf = talloc_array(rec, uint8_t, dbsize); + if (buf == NULL) { + DEBUG(1, ("%s: talloc_array(%ju) failed\n", + __func__, (uintmax_t)dbsize)); + return true; + } + + dbsize = dbwrap_marshall(state->entries, buf, dbsize); + + if (dbsize != talloc_get_size(buf)) { + DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__, + (uintmax_t)talloc_get_size(buf), + (uintmax_t)dbsize)); + TALLOC_FREE(buf); + return true; + } + + if (state->log != NULL) { + rec_index = state->log->rec_index; + } + SBVAL(index_buf, 0, rec_index); + + iov[0] = (struct iovec) { .iov_base = index_buf, + .iov_len = sizeof(index_buf) }; + iov[1] = (struct iovec) { .iov_base = buf, + .iov_len = dbsize }; + + DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__, + (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)), + server_id_str_buf(messaging_server_id(msg_ctx), &id1), + server_id_str_buf(rec->src, &id2))); + + status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB, + iov, ARRAY_SIZE(iov), NULL, 0); + TALLOC_FREE(buf); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("%s: messaging_send_iov failed: %s\n", + __func__, nt_errstr(status))); + } + + return true; +} + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data); + +static bool notifyd_got_db(struct messaging_context *msg_ctx, + struct messaging_rec **prec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct messaging_rec *rec = *prec; + struct notifyd_peer *p = NULL; + struct server_id_buf idbuf; + NTSTATUS status; + int count; + size_t i; + + for (i=0; inum_peers; i++) { + if (server_id_equal(&rec->src, &state->peers[i]->pid)) { + p = state->peers[i]; + break; + } + } + + if (p == NULL) { + DEBUG(10, ("%s: Did not find peer for db from %s\n", + __func__, server_id_str_buf(rec->src, &idbuf))); + return true; + } + + if (rec->buf.length < 8) { + DEBUG(10, ("%s: Got short db length %u from %s\n", __func__, + (unsigned)rec->buf.length, + server_id_str_buf(rec->src, &idbuf))); + TALLOC_FREE(p); + return true; + } + + p->rec_index = BVAL(rec->buf.data, 0); + + p->db = db_open_rbt(p); + if (p->db == NULL) { + DEBUG(10, ("%s: db_open_rbt failed\n", __func__)); + TALLOC_FREE(p); + return true; + } + + status = dbwrap_unmarshall(p->db, rec->buf.data + 8, + rec->buf.length - 8); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n", + __func__, nt_errstr(status), + server_id_str_buf(rec->src, &idbuf))); + TALLOC_FREE(p); + return true; + } + + dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state, + &count); + + DEBUG(10, ("%s: Database from %s contained %d records\n", __func__, + server_id_str_buf(rec->src, &idbuf), count)); + + return true; +} + +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log) +{ + NTSTATUS status; + enum ndr_err_code ndr_err; + uint8_t msghdr[MESSAGE_HDR_LENGTH]; + DATA_BLOB blob; + struct iovec iov[2]; + + if (log == NULL) { + return; + } + + DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__, + (uintmax_t)log->rec_index, (unsigned)log->num_recs)); + + message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src, + (struct server_id) {0 }); + iov[0] = (struct iovec) { .iov_base = msghdr, + .iov_len = sizeof(msghdr) }; + + ndr_err = ndr_push_struct_blob( + &blob, log, log, + (ndr_push_flags_fn_t)ndr_push_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n", + __func__, ndr_errstr(ndr_err))); + goto done; + } + iov[1] = (struct iovec) { .iov_base = blob.data, + .iov_len = blob.length }; + + status = ctdbd_messaging_send_iov( + ctdbd_conn, CTDB_BROADCAST_VNNMAP, + CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov)); + TALLOC_FREE(blob.data); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n", + __func__, nt_errstr(status))); + goto done; + } + + log->rec_index += 1; + +done: + log->num_recs = 0; + TALLOC_FREE(log->recs); +} + +struct notifyd_broadcast_reclog_state { + struct tevent_context *ev; + struct ctdbd_connection *ctdbd_conn; + struct server_id src; + struct messaging_reclog *log; +}; + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log) +{ + struct tevent_req *req, *subreq; + struct notifyd_broadcast_reclog_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_broadcast_reclog_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->ctdbd_conn = ctdbd_conn; + state->src = src; + state->log = log; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); + return req; +} + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_broadcast_reclog_state *state = tevent_req_data( + req, struct notifyd_broadcast_reclog_state); + bool ok; + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log); + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); +} + +static int notifyd_broadcast_reclog_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +struct notifyd_clean_peers_state { + struct tevent_context *ev; + struct notifyd_state *notifyd; +}; + +static void notifyd_clean_peers_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd) +{ + struct tevent_req *req, *subreq; + struct notifyd_clean_peers_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_clean_peers_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->notifyd = notifyd; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); + return req; +} + +static void notifyd_clean_peers_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_clean_peers_state *state = tevent_req_data( + req, struct notifyd_clean_peers_state); + struct notifyd_state *notifyd = state->notifyd; + size_t i; + bool ok; + time_t now = time(NULL); + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + i = 0; + while (i < notifyd->num_peers) { + struct notifyd_peer *p = notifyd->peers[i]; + + if ((now - p->last_broadcast) > 60) { + struct server_id_buf idbuf; + + /* + * Haven't heard for more than 60 seconds. Call this + * peer dead + */ + + DEBUG(10, ("%s: peer %s died\n", __func__, + server_id_str_buf(p->pid, &idbuf))); + /* + * This implicitly decrements notifyd->num_peers + */ + TALLOC_FREE(p); + } else { + i += 1; + } + } + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); +} + +static int notifyd_clean_peers_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct db_context *db = dbwrap_record_get_db(rec); + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + char path[key.dsize+1]; + bool ok; + + memcpy(path, key.dptr, key.dsize); + path[key.dsize] = '\0'; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DEBUG(1, ("%s: Could not parse notifyd entry for %s\n", + __func__, path)); + return 0; + } + + for (i=0; iinstance.filter; + uint32_t subdir_filter = instance->instance.subdir_filter; + int ret; + + ret = state->sys_notify_watch( + db, state->sys_notify_ctx, path, + &filter, &subdir_filter, + notifyd_sys_callback, state->msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DEBUG(1, ("%s: inotify_watch returned %s\n", + __func__, strerror(errno))); + } + } + + return 0; +} + +static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data) +{ + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + bool ok; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n", + __func__, (int)key.dsize, (char *)key.dptr)); + return 0; + } + for (i=0; istate; + size_t i; + + dbwrap_traverse_read(p->db, notifyd_db_del_syswatches, NULL, NULL); + + for (i = 0; inum_peers; i++) { + if (p == state->peers[i]) { + state->peers[i] = state->peers[state->num_peers-1]; + state->num_peers -= 1; + break; + } + } + return 0; +} + +static struct notifyd_peer *notifyd_peer_new( + struct notifyd_state *state, struct server_id pid) +{ + struct notifyd_peer *p, **tmp; + + tmp = talloc_realloc(state, state->peers, struct notifyd_peer *, + state->num_peers+1); + if (tmp == NULL) { + return NULL; + } + state->peers = tmp; + + p = talloc_zero(state->peers, struct notifyd_peer); + if (p == NULL) { + return NULL; + } + p->state = state; + p->pid = pid; + + state->peers[state->num_peers] = p; + state->num_peers += 1; + + talloc_set_destructor(p, notifyd_peer_destructor); + + return p; +} + +static void notifyd_apply_reclog(struct notifyd_peer *peer, + const uint8_t *msg, size_t msglen) +{ + struct notifyd_state *state = peer->state; + DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg), + .length = msglen }; + struct server_id_buf idbuf; + struct messaging_reclog *log; + enum ndr_err_code ndr_err; + uint32_t i; + + if (peer->db == NULL) { + /* + * No db yet + */ + return; + } + + log = talloc(peer, struct messaging_reclog); + if (log == NULL) { + DEBUG(10, ("%s: talloc failed\n", __func__)); + return; + } + + ndr_err = ndr_pull_struct_blob_all( + &blob, log, log, + (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n", + __func__, ndr_errstr(ndr_err))); + goto fail; + } + + DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__, + (unsigned)log->num_recs, (uintmax_t)log->rec_index, + server_id_str_buf(peer->pid, &idbuf))); + + if (log->rec_index != peer->rec_index) { + DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n", + __func__, (uintmax_t)log->rec_index, + server_id_str_buf(peer->pid, &idbuf), + (uintmax_t)peer->rec_index)); + goto fail; + } + + for (i=0; inum_recs; i++) { + struct messaging_rec *r = log->recs[i]; + struct notify_rec_change_msg *chg; + size_t pathlen; + bool ok; + + ok = notifyd_parse_rec_change(r->buf.data, r->buf.length, + &chg, &pathlen); + if (!ok) { + DEBUG(3, ("%s: notifyd_parse_rec_change failed\n", + __func__)); + goto fail; + } + + ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen, + &chg->instance, peer->db, + state->sys_notify_watch, + state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DEBUG(3, ("%s: notifyd_apply_rec_change failed\n", + __func__)); + goto fail; + } + } + + peer->rec_index += 1; + peer->last_broadcast = time(NULL); + + TALLOC_FREE(log); + return; + +fail: + DEBUG(10, ("%s: Dropping peer %s\n", __func__, + server_id_str_buf(peer->pid, &idbuf))); + TALLOC_FREE(peer); +} + +/* + * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE + * messages) broadcasts by other notifyds. Several cases: + * + * We don't know the source. This creates a new peer. Creating a peer + * involves asking the peer for its full database. We assume ordered + * messages, so the new database will arrive before the next broadcast + * will. + * + * We know the source and the log index matches. We will apply the log + * locally to our peer's db as if we had received it from a local + * client. + * + * We know the source but the log index does not match. This means we + * lost a message. We just drop the whole peer and wait for the next + * broadcast, which will then trigger a fresh database pull. + */ + +static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(state->msg_ctx); + struct notifyd_peer *p; + uint32_t i; + uint32_t msg_type; + struct server_id src, dst; + struct server_id_buf idbuf; + NTSTATUS status; + + if (msglen < MESSAGE_HDR_LENGTH) { + DEBUG(10, ("%s: Got short broadcast\n", __func__)); + return; + } + message_hdr_get(&msg_type, &src, &dst, msg); + + if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) { + DEBUG(10, ("%s Got message %u, ignoring\n", __func__, + (unsigned)msg_type)); + return; + } + if (server_id_equal(&src, &my_id)) { + DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__)); + return; + } + + DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n", + __func__, server_id_str_buf(src, &idbuf))); + + for (i=0; inum_peers; i++) { + if (server_id_equal(&state->peers[i]->pid, &src)) { + + DEBUG(10, ("%s: Applying changes to peer %u\n", + __func__, (unsigned)i)); + + notifyd_apply_reclog(state->peers[i], + msg + MESSAGE_HDR_LENGTH, + msglen - MESSAGE_HDR_LENGTH); + return; + } + } + + DEBUG(10, ("%s: Creating new peer for %s\n", __func__, + server_id_str_buf(src, &idbuf))); + + p = notifyd_peer_new(state, src); + if (p == NULL) { + DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__)); + return; + } + + status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB, + NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("%s: messaging_send_buf failed: %s\n", + __func__, nt_errstr(status))); + TALLOC_FREE(p); + return; + } +} diff --git a/source3/smbd/notifyd/notifyd.h b/source3/smbd/notifyd/notifyd.h new file mode 100644 index 00000000000..dc0a4e8392d --- /dev/null +++ b/source3/smbd/notifyd/notifyd.h @@ -0,0 +1,143 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __NOTIFYD_NOTIFYD_H__ +#define __NOTIFYD_NOTIFYD_H__ + +#include "includes.h" +#include "librpc/gen_ndr/notify.h" +#include "librpc/gen_ndr/messaging.h" +#include "lib/dbwrap/dbwrap.h" +#include "lib/dbwrap/dbwrap_rbt.h" +#include "messages.h" +#include "tdb.h" +#include "util_tdb.h" + +/* + * Filechangenotify based on asynchronous messages + * + * smbds talk to local notify daemons to inform them about paths they are + * interested in. They also tell local notify daemons about changes they have + * done to the file system. There's two message types from smbd to + * notifyd. The first is used to inform notifyd about changes in notify + * interest. These are only sent from smbd to notifyd if the SMB client issues + * FileChangeNotify requests. + */ + +/* + * The notifyd implementation is designed to cope with multiple daemons taking + * care of just a subset of smbds. The goal is to minimize the traffic between + * the notify daemons. The idea behind this is a samba/ctdb cluster, but it + * could also be used to spread the load of notifyd instances on a single + * node, should this become a bottleneck. The following diagram illustrates + * the setup. The numbers in the boxes are node:process ids. + * + * +-----------+ +-----------+ + * |notifyd 0:5|------------------|notifyd 1:6| + * +-----------+ +-----------+ + * / | \ / \ + * / | \ / \ + * +--------+ | +--------+ +--------+ +--------+ + * |smbd 0:1| | |smbd 0:4| |smbd 1:7| |smbd 1:2| + * +--------+ | +--------+ +--------+ +--------+ + * | + * +---------+ + * |smbd 0:20| + * +---------+ + * + * Suppose 0:1 and 0:4 are interested in changes for /foo and 0:20 creates the + * file /foo/bar, if everything fully connected, 0:20 would have to send two + * local messages, one to 0:1 and one to 0:4. With the notifyd design, 0:20 + * only has to send one message, it lets notifyd 0:5 do the hard work to + * multicast the change to 0:1 and 0:4. + * + * Now lets assume 1:7 on the other node creates /foo/baz. It tells its + * notifyd 1:6 about this change. All 1:6 will know about is that its peer + * notifyd 0:5 is interested in the change. Thus it forwards the event to 0:5, + * which sees it as if it came from just another local event creator. 0:5 will + * multicast the change to 0:1 and 0:4. To prevent notify loops, the message + * from 1:6 to 0:5 will carry a "proxied" flag, so that 0:5 will only forward + * the event to local clients. + */ + +/* + * Data that notifyd maintains per smbd notify instance + */ +struct notify_instance { + struct timespec creation_time; + uint32_t filter; + uint32_t subdir_filter; + void *private_data; +}; + +/* MSG_SMB_NOTIFY_REC_CHANGE payload */ +struct notify_rec_change_msg { + struct notify_instance instance; + char path[]; +}; + +/* + * The second message from smbd to notifyd is sent whenever an smbd makes a + * file system change. It tells notifyd to inform all interested parties about + * that change. This is the message that needs to be really fast in smbd + * because it is called a lot. + */ + +/* MSG_SMB_NOTIFY_TRIGGER payload */ +struct notify_trigger_msg { + struct timespec when; + uint32_t action; + uint32_t filter; + char path[]; +}; + +/* + * In response to a MSG_SMB_NOTIFY_TRIGGER message notifyd walks its database + * and sends out the following message to all interested clients + */ + +/* MSG_PVFS_NOTIFY payload */ +struct notify_event_msg { + struct timespec when; + void *private_data; + uint32_t action; + char path[]; +}; + +struct sys_notify_context; + +typedef int (*sys_notify_watch_fn)(TALLOC_CTX *mem_ctx, + struct sys_notify_context *ctx, + const char *path, + uint32_t *filter, + uint32_t *subdir_filter, + void (*callback)(struct sys_notify_context *ctx, + void *private_data, + struct notify_event *ev), + void *private_data, + void *handle_p); + +struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct ctdbd_connection *ctdbd_conn, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx); +int notifyd_recv(struct tevent_req *req); + +#endif diff --git a/source3/smbd/notifyd/tests.c b/source3/smbd/notifyd/tests.c new file mode 100644 index 00000000000..6bcce6aa9fb --- /dev/null +++ b/source3/smbd/notifyd/tests.c @@ -0,0 +1,118 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "replace.h" +#include "notifyd.h" +#include "messages.h" +#include "lib/util/server_id_db.h" + +int main(int argc, const char *argv[]) +{ + TALLOC_CTX *frame = talloc_stackframe(); + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct server_id_db *names; + struct server_id notifyd; + struct tevent_req *req; + unsigned i; + bool ok; + + if (argc != 2) { + fprintf(stderr, "Usage: %s \n", argv[0]); + exit(1); + } + + setup_logging(argv[0], DEBUG_STDOUT); + lp_load_global(argv[1]); + + ev = tevent_context_init(NULL); + if (ev == NULL) { + fprintf(stderr, "tevent_context_init failed\n"); + exit(1); + } + + msg_ctx = messaging_init(ev, ev); + if (msg_ctx == NULL) { + fprintf(stderr, "messaging_init failed\n"); + exit(1); + } + + names = messaging_names_db(msg_ctx); + + ok = server_id_db_lookup_one(names, "notify-daemon", ¬ifyd); + if (!ok) { + fprintf(stderr, "no notifyd\n"); + exit(1); + } + + for (i=0; i<50000; i++) { + struct notify_rec_change_msg msg = { + .instance.filter = UINT32_MAX, + .instance.subdir_filter = UINT32_MAX + }; + char path[64]; + size_t len; + struct iovec iov[2]; + NTSTATUS status; + + len = snprintf(path, sizeof(path), "/tmp%u", i); + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_rec_change_msg, path); + iov[1].iov_base = path; + iov[1].iov_len = len+1; + + status = messaging_send_iov( + msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE, + iov, ARRAY_SIZE(iov), NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + fprintf(stderr, "messaging_send_iov returned %s\n", + nt_errstr(status)); + exit(1); + } + + msg.instance.filter = 0; + msg.instance.subdir_filter = 0; + + status = messaging_send_iov( + msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE, + iov, ARRAY_SIZE(iov), NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + fprintf(stderr, "messaging_send_iov returned %s\n", + nt_errstr(status)); + exit(1); + } + } + + req = messaging_read_send(ev, ev, msg_ctx, MSG_PONG); + if (req == NULL) { + fprintf(stderr, "messaging_read_send failed\n"); + exit(1); + } + messaging_send_buf(msg_ctx, notifyd, MSG_PING, NULL, 0); + + ok = tevent_req_poll(req, ev); + if (!ok) { + fprintf(stderr, "tevent_req_poll failed\n"); + exit(1); + } + + TALLOC_FREE(frame); + return 0; +} diff --git a/source3/smbd/notifyd/wscript_build b/source3/smbd/notifyd/wscript_build new file mode 100644 index 00000000000..90a9505e4ac --- /dev/null +++ b/source3/smbd/notifyd/wscript_build @@ -0,0 +1,12 @@ +#!/usr/bin/env python + +bld.SAMBA3_SUBSYSTEM('notifyd', + source='notifyd.c', + deps='util_tdb TDB_LIB messages_util') + +bld.SAMBA3_BINARY('notifyd-tests', + source='tests.c', + install=False, + deps=''' + param + ''') diff --git a/source3/wscript_build b/source3/wscript_build index 9863b485457..e9f276cc1fe 100755 --- a/source3/wscript_build +++ b/source3/wscript_build @@ -1536,6 +1536,7 @@ bld.RECURSE('../examples/pdb') bld.RECURSE('../examples/VFS') bld.RECURSE('lib/netapi/tests') bld.RECURSE('lib/netapi/examples') +bld.RECURSE('smbd/notifyd') bld.ENFORCE_GROUP_ORDERING() bld.CHECK_PROJECT_RULES() -- 2.11.4.GIT