VERSION: Release Samba 4.0.0rc1
[Samba/gebeck_regimport.git] / source3 / lib / dbwrap / dbwrap_watch.c
blob701ac9d02f2c6ddae2750fde7686d6df4ae8a393
1 /*
2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
29 static struct db_context *dbwrap_record_watchers_db(void)
31 static struct db_context *watchers_db;
33 if (watchers_db == NULL) {
34 watchers_db = db_open(NULL, lock_path("dbwrap_watchers.tdb"),
35 0, TDB_CLEAR_IF_FIRST, O_RDWR|O_CREAT,
36 0600, DBWRAP_LOCK_ORDER_3);
38 return watchers_db;
41 static TDB_DATA dbwrap_record_watchers_key(TALLOC_CTX *mem_ctx,
42 struct db_context *db,
43 struct db_record *rec,
44 TDB_DATA *rec_key)
46 const uint8_t *db_id;
47 size_t db_id_len;
48 TDB_DATA key, wkey;
50 dbwrap_db_id(db, &db_id, &db_id_len);
51 key = dbwrap_record_get_key(rec);
53 wkey.dsize = sizeof(uint32_t) + db_id_len + key.dsize;
54 wkey.dptr = talloc_array(mem_ctx, uint8_t, wkey.dsize);
55 if (wkey.dptr == NULL) {
56 return make_tdb_data(NULL, 0);
58 SIVAL(wkey.dptr, 0, db_id_len);
59 memcpy(wkey.dptr + sizeof(uint32_t), db_id, db_id_len);
60 memcpy(wkey.dptr + sizeof(uint32_t) + db_id_len, key.dptr, key.dsize);
62 if (rec_key != NULL) {
63 rec_key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
64 rec_key->dsize = key.dsize;
67 return wkey;
70 static bool dbwrap_record_watchers_key_parse(
71 TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
73 size_t db_id_len;
75 if (wkey.dsize < sizeof(uint32_t)) {
76 DEBUG(1, ("Invalid watchers key\n"));
77 return false;
79 db_id_len = IVAL(wkey.dptr, 0);
80 if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
81 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
82 "db_id_len=%d\n", (int)wkey.dsize, (int)db_id_len));
83 return false;
85 *p_db_id = wkey.dptr + sizeof(uint32_t);
86 *p_db_id_len = db_id_len;
87 key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
88 key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
89 return true;
92 static NTSTATUS dbwrap_record_add_watcher(TDB_DATA w_key, struct server_id id)
94 struct TALLOC_CTX *frame = talloc_stackframe();
95 struct db_context *db;
96 struct db_record *rec;
97 TDB_DATA value;
98 struct server_id *ids;
99 size_t num_ids;
100 NTSTATUS status;
102 db = dbwrap_record_watchers_db();
103 if (db == NULL) {
104 status = map_nt_error_from_unix(errno);
105 goto fail;
107 rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
108 if (rec == NULL) {
109 status = map_nt_error_from_unix(errno);
110 goto fail;
112 value = dbwrap_record_get_value(rec);
114 if ((value.dsize % sizeof(struct server_id)) != 0) {
115 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
116 goto fail;
119 ids = (struct server_id *)value.dptr;
120 num_ids = value.dsize / sizeof(struct server_id);
122 ids = talloc_realloc(talloc_tos(), ids, struct server_id,
123 num_ids + 1);
124 if (ids == NULL) {
125 status = NT_STATUS_NO_MEMORY;
126 goto fail;
128 ids[num_ids] = id;
129 num_ids += 1;
131 status = dbwrap_record_store(
132 rec, make_tdb_data((uint8_t *)ids, talloc_get_size(ids)), 0);
133 fail:
134 TALLOC_FREE(frame);
135 return status;
138 static NTSTATUS dbwrap_record_del_watcher(TDB_DATA w_key, struct server_id id)
140 struct TALLOC_CTX *frame = talloc_stackframe();
141 struct db_context *db;
142 struct db_record *rec;
143 struct server_id *ids;
144 size_t i, num_ids;
145 TDB_DATA value;
146 NTSTATUS status;
148 db = dbwrap_record_watchers_db();
149 if (db == NULL) {
150 status = map_nt_error_from_unix(errno);
151 goto fail;
153 rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
154 if (rec == NULL) {
155 status = map_nt_error_from_unix(errno);
156 goto fail;
158 value = dbwrap_record_get_value(rec);
160 if ((value.dsize % sizeof(struct server_id)) != 0) {
161 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
162 goto fail;
165 ids = (struct server_id *)value.dptr;
166 num_ids = value.dsize / sizeof(struct server_id);
168 for (i=0; i<num_ids; i++) {
169 if (serverid_equal(&id, &ids[i])) {
170 ids[i] = ids[num_ids-1];
171 value.dsize -= sizeof(struct server_id);
172 break;
175 if (value.dsize == 0) {
176 status = dbwrap_record_delete(rec);
177 goto done;
179 status = dbwrap_record_store(rec, value, 0);
180 fail:
181 done:
182 TALLOC_FREE(frame);
183 return status;
186 static NTSTATUS dbwrap_record_get_watchers(struct db_context *db,
187 struct db_record *rec,
188 TALLOC_CTX *mem_ctx,
189 struct server_id **p_ids,
190 size_t *p_num_ids)
192 struct db_context *w_db;
193 TDB_DATA key = { 0, };
194 TDB_DATA value = { 0, };
195 struct server_id *ids;
196 NTSTATUS status;
198 key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
199 if (key.dptr == NULL) {
200 status = NT_STATUS_NO_MEMORY;
201 goto fail;
203 w_db = dbwrap_record_watchers_db();
204 if (w_db == NULL) {
205 status = NT_STATUS_INTERNAL_ERROR;
206 goto fail;
208 status = dbwrap_fetch(w_db, mem_ctx, key, &value);
209 TALLOC_FREE(key.dptr);
210 if (!NT_STATUS_IS_OK(status)) {
211 goto fail;
213 if ((value.dsize % sizeof(struct server_id)) != 0) {
214 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
215 goto fail;
217 ids = (struct server_id *)value.dptr;
218 *p_ids = talloc_move(mem_ctx, &ids);
219 *p_num_ids = value.dsize / sizeof(struct server_id);
220 return NT_STATUS_OK;
221 fail:
222 TALLOC_FREE(key.dptr);
223 TALLOC_FREE(value.dptr);
224 return status;
227 struct dbwrap_record_watch_state {
228 struct tevent_context *ev;
229 struct db_context *db;
230 struct tevent_req *req;
231 struct messaging_context *msg;
232 struct msg_channel *channel;
233 TDB_DATA key;
234 TDB_DATA w_key;
237 static void dbwrap_record_watch_done(struct tevent_req *subreq);
238 static int dbwrap_record_watch_state_destructor(
239 struct dbwrap_record_watch_state *state);
241 struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
242 struct tevent_context *ev,
243 struct db_record *rec,
244 struct messaging_context *msg)
246 struct tevent_req *req, *subreq;
247 struct dbwrap_record_watch_state *state;
248 struct db_context *watchers_db;
249 NTSTATUS status;
250 int ret;
252 req = tevent_req_create(mem_ctx, &state,
253 struct dbwrap_record_watch_state);
254 if (req == NULL) {
255 return NULL;
257 state->db = dbwrap_record_get_db(rec);
258 state->ev = ev;
259 state->req = req;
260 state->msg = msg;
262 watchers_db = dbwrap_record_watchers_db();
263 if (watchers_db == NULL) {
264 tevent_req_nterror(req, map_nt_error_from_unix(errno));
265 return tevent_req_post(req, ev);
268 state->w_key = dbwrap_record_watchers_key(state, state->db, rec,
269 &state->key);
270 if (tevent_req_nomem(state->w_key.dptr, req)) {
271 return tevent_req_post(req, ev);
274 ret = msg_channel_init(state, state->msg, MSG_DBWRAP_MODIFIED,
275 &state->channel);
276 if (ret != 0) {
277 tevent_req_nterror(req, map_nt_error_from_unix(ret));
278 return tevent_req_post(req, ev);
281 status = dbwrap_record_add_watcher(
282 state->w_key, messaging_server_id(state->msg));
283 if (tevent_req_nterror(req, status)) {
284 return tevent_req_post(req, ev);
286 talloc_set_destructor(state, dbwrap_record_watch_state_destructor);
288 subreq = msg_read_send(state, state->ev, state->channel);
289 if (tevent_req_nomem(subreq, req)) {
290 return tevent_req_post(req, ev);
292 tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
293 return req;
296 static int dbwrap_record_watch_state_destructor(
297 struct dbwrap_record_watch_state *s)
299 if (s->msg != NULL) {
300 dbwrap_record_del_watcher(
301 s->w_key, messaging_server_id(s->msg));
303 return 0;
306 static void dbwrap_watch_record_stored(struct db_context *db,
307 struct db_record *rec,
308 void *private_data)
310 struct messaging_context *msg = talloc_get_type_abort(
311 private_data, struct messaging_context);
312 struct server_id *ids = NULL;
313 size_t num_ids = 0;
314 TDB_DATA w_key = { 0, };
315 DATA_BLOB w_blob;
316 NTSTATUS status;
317 uint32_t i;
319 status = dbwrap_record_get_watchers(db, rec, talloc_tos(),
320 &ids, &num_ids);
321 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
322 goto done;
324 if (!NT_STATUS_IS_OK(status)) {
325 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
326 nt_errstr(status)));
327 goto done;
329 w_key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
330 if (w_key.dptr == NULL) {
331 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
332 goto done;
334 w_blob.data = w_key.dptr;
335 w_blob.length = w_key.dsize;
337 for (i=0; i<num_ids; i++) {
338 status = messaging_send(msg, ids[i], MSG_DBWRAP_MODIFIED,
339 &w_blob);
340 if (!NT_STATUS_IS_OK(status)) {
341 char *str = procid_str_static(&ids[i]);
342 DEBUG(1, ("messaging_send to %s failed: %s\n",
343 str, nt_errstr(status)));
344 TALLOC_FREE(str);
347 done:
348 TALLOC_FREE(w_key.dptr);
349 TALLOC_FREE(ids);
350 return;
353 void dbwrap_watch_db(struct db_context *db, struct messaging_context *msg)
355 dbwrap_set_stored_callback(db, dbwrap_watch_record_stored, msg);
358 static void dbwrap_record_watch_done(struct tevent_req *subreq)
360 struct tevent_req *req = tevent_req_callback_data(
361 subreq, struct tevent_req);
362 struct dbwrap_record_watch_state *state = tevent_req_data(
363 req, struct dbwrap_record_watch_state);
364 struct messaging_rec *rec;
365 int ret;
367 ret = msg_read_recv(subreq, talloc_tos(), &rec);
368 TALLOC_FREE(subreq);
369 if (ret != 0) {
370 tevent_req_nterror(req, map_nt_error_from_unix(ret));
371 return;
374 if ((rec->buf.length == state->w_key.dsize) &&
375 (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
376 tevent_req_done(req);
377 return;
381 * Not our record, wait for the next one
383 subreq = msg_read_send(state, state->ev, state->channel);
384 if (tevent_req_nomem(subreq, req)) {
385 return;
387 tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
390 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
391 TALLOC_CTX *mem_ctx,
392 struct db_record **prec)
394 struct dbwrap_record_watch_state *state = tevent_req_data(
395 req, struct dbwrap_record_watch_state);
396 NTSTATUS status;
397 struct db_record *rec;
399 if (tevent_req_is_nterror(req, &status)) {
400 return status;
402 rec = dbwrap_fetch_locked(state->db, mem_ctx, state->key);
403 if (rec == NULL) {
404 return NT_STATUS_INTERNAL_DB_ERROR;
406 *prec = rec;
407 return NT_STATUS_OK;
410 struct dbwrap_watchers_traverse_read_state {
411 int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
412 const struct server_id *watchers, size_t num_watchers,
413 void *private_data);
414 void *private_data;
417 static int dbwrap_watchers_traverse_read_callback(
418 struct db_record *rec, void *private_data)
420 struct dbwrap_watchers_traverse_read_state *state =
421 (struct dbwrap_watchers_traverse_read_state *)private_data;
422 uint8_t *db_id;
423 size_t db_id_len;
424 TDB_DATA w_key, key, w_data;
425 int res;
427 w_key = dbwrap_record_get_key(rec);
428 w_data = dbwrap_record_get_value(rec);
430 if (!dbwrap_record_watchers_key_parse(w_key, &db_id, &db_id_len,
431 &key)) {
432 return 0;
434 if ((w_data.dsize % sizeof(struct server_id)) != 0) {
435 return 0;
437 res = state->fn(db_id, db_id_len, key,
438 (struct server_id *)w_data.dptr,
439 w_data.dsize / sizeof(struct server_id),
440 state->private_data);
441 return res;
444 void dbwrap_watchers_traverse_read(
445 int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
446 const struct server_id *watchers, size_t num_watchers,
447 void *private_data),
448 void *private_data)
450 struct dbwrap_watchers_traverse_read_state state;
451 struct db_context *db;
453 db = dbwrap_record_watchers_db();
454 if (db == NULL) {
455 return;
457 state.fn = fn;
458 state.private_data = private_data;
459 dbwrap_traverse_read(db, dbwrap_watchers_traverse_read_callback,
460 &state, NULL);
463 static int dbwrap_wakeall_cb(const uint8_t *db_id, size_t db_id_len,
464 const TDB_DATA key,
465 const struct server_id *watchers,
466 size_t num_watchers,
467 void *private_data)
469 struct messaging_context *msg = talloc_get_type_abort(
470 private_data, struct messaging_context);
471 uint32_t i;
472 DATA_BLOB blob;
474 blob.data = key.dptr;
475 blob.length = key.dsize;
477 for (i=0; i<num_watchers; i++) {
478 messaging_send(msg, watchers[i], MSG_DBWRAP_MODIFIED, &blob);
480 return 0;
483 void dbwrap_watchers_wakeall(struct messaging_context *msg)
485 dbwrap_watchers_traverse_read(dbwrap_wakeall_cb, msg);