libcli/cldap: make use of samba_tevent_context_init()
[Samba/gebeck_regimport.git] / source3 / lib / dbwrap / dbwrap_watch.c
blobd8f1b74a42c7765537a9653c0e5be2fe88972938
/*
   Unix SMB/CIFS implementation.
   Watch dbwrap record changes
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
29 static struct db_context *dbwrap_record_watchers_db(void)
31 static struct db_context *watchers_db;
33 if (watchers_db == NULL) {
34 watchers_db = db_open(
35 NULL, lock_path("dbwrap_watchers.tdb"), 0,
36 TDB_CLEAR_IF_FIRST | TDB_INCOMPATIBLE_HASH,
37 O_RDWR|O_CREAT, 0600, DBWRAP_LOCK_ORDER_3);
39 return watchers_db;
42 static TDB_DATA dbwrap_record_watchers_key(TALLOC_CTX *mem_ctx,
43 struct db_context *db,
44 struct db_record *rec,
45 TDB_DATA *rec_key)
47 const uint8_t *db_id;
48 size_t db_id_len;
49 TDB_DATA key, wkey;
51 dbwrap_db_id(db, &db_id, &db_id_len);
52 key = dbwrap_record_get_key(rec);
54 wkey.dsize = sizeof(uint32_t) + db_id_len + key.dsize;
55 wkey.dptr = talloc_array(mem_ctx, uint8_t, wkey.dsize);
56 if (wkey.dptr == NULL) {
57 return make_tdb_data(NULL, 0);
59 SIVAL(wkey.dptr, 0, db_id_len);
60 memcpy(wkey.dptr + sizeof(uint32_t), db_id, db_id_len);
61 memcpy(wkey.dptr + sizeof(uint32_t) + db_id_len, key.dptr, key.dsize);
63 if (rec_key != NULL) {
64 rec_key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
65 rec_key->dsize = key.dsize;
68 return wkey;
71 static bool dbwrap_record_watchers_key_parse(
72 TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
74 size_t db_id_len;
76 if (wkey.dsize < sizeof(uint32_t)) {
77 DEBUG(1, ("Invalid watchers key\n"));
78 return false;
80 db_id_len = IVAL(wkey.dptr, 0);
81 if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
82 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
83 "db_id_len=%d\n", (int)wkey.dsize, (int)db_id_len));
84 return false;
86 *p_db_id = wkey.dptr + sizeof(uint32_t);
87 *p_db_id_len = db_id_len;
88 key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
89 key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
90 return true;
93 static NTSTATUS dbwrap_record_add_watcher(TDB_DATA w_key, struct server_id id)
95 struct TALLOC_CTX *frame = talloc_stackframe();
96 struct db_context *db;
97 struct db_record *rec;
98 TDB_DATA value;
99 struct server_id *ids;
100 size_t num_ids;
101 NTSTATUS status;
103 db = dbwrap_record_watchers_db();
104 if (db == NULL) {
105 status = map_nt_error_from_unix(errno);
106 goto fail;
108 rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
109 if (rec == NULL) {
110 status = map_nt_error_from_unix(errno);
111 goto fail;
113 value = dbwrap_record_get_value(rec);
115 if ((value.dsize % sizeof(struct server_id)) != 0) {
116 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
117 goto fail;
120 ids = (struct server_id *)value.dptr;
121 num_ids = value.dsize / sizeof(struct server_id);
123 ids = talloc_array(talloc_tos(), struct server_id,
124 num_ids + 1);
125 if (ids == NULL) {
126 status = NT_STATUS_NO_MEMORY;
127 goto fail;
129 memcpy(ids, value.dptr, value.dsize);
130 ids[num_ids] = id;
131 num_ids += 1;
133 status = dbwrap_record_store(
134 rec, make_tdb_data((uint8_t *)ids, talloc_get_size(ids)), 0);
135 fail:
136 TALLOC_FREE(frame);
137 return status;
140 static NTSTATUS dbwrap_record_del_watcher(TDB_DATA w_key, struct server_id id)
142 struct TALLOC_CTX *frame = talloc_stackframe();
143 struct db_context *db;
144 struct db_record *rec;
145 struct server_id *ids;
146 size_t i, num_ids;
147 TDB_DATA value;
148 NTSTATUS status;
150 db = dbwrap_record_watchers_db();
151 if (db == NULL) {
152 status = map_nt_error_from_unix(errno);
153 goto fail;
155 rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
156 if (rec == NULL) {
157 status = map_nt_error_from_unix(errno);
158 goto fail;
160 value = dbwrap_record_get_value(rec);
162 if ((value.dsize % sizeof(struct server_id)) != 0) {
163 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
164 goto fail;
167 ids = (struct server_id *)value.dptr;
168 num_ids = value.dsize / sizeof(struct server_id);
170 for (i=0; i<num_ids; i++) {
171 if (serverid_equal(&id, &ids[i])) {
172 ids[i] = ids[num_ids-1];
173 value.dsize -= sizeof(struct server_id);
174 break;
177 if (value.dsize == 0) {
178 status = dbwrap_record_delete(rec);
179 goto done;
181 status = dbwrap_record_store(rec, value, 0);
182 fail:
183 done:
184 TALLOC_FREE(frame);
185 return status;
188 static NTSTATUS dbwrap_record_get_watchers(struct db_context *db,
189 struct db_record *rec,
190 TALLOC_CTX *mem_ctx,
191 struct server_id **p_ids,
192 size_t *p_num_ids)
194 struct db_context *w_db;
195 TDB_DATA key = { 0, };
196 TDB_DATA value = { 0, };
197 struct server_id *ids;
198 NTSTATUS status;
200 key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
201 if (key.dptr == NULL) {
202 status = NT_STATUS_NO_MEMORY;
203 goto fail;
205 w_db = dbwrap_record_watchers_db();
206 if (w_db == NULL) {
207 status = NT_STATUS_INTERNAL_ERROR;
208 goto fail;
210 status = dbwrap_fetch(w_db, mem_ctx, key, &value);
211 TALLOC_FREE(key.dptr);
212 if (!NT_STATUS_IS_OK(status)) {
213 goto fail;
215 if ((value.dsize % sizeof(struct server_id)) != 0) {
216 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
217 goto fail;
219 ids = (struct server_id *)value.dptr;
220 *p_ids = talloc_move(mem_ctx, &ids);
221 *p_num_ids = value.dsize / sizeof(struct server_id);
222 return NT_STATUS_OK;
223 fail:
224 TALLOC_FREE(key.dptr);
225 TALLOC_FREE(value.dptr);
226 return status;
229 struct dbwrap_record_watch_state {
230 struct tevent_context *ev;
231 struct db_context *db;
232 struct tevent_req *req;
233 struct messaging_context *msg;
234 struct msg_channel *channel;
235 TDB_DATA key;
236 TDB_DATA w_key;
239 static void dbwrap_record_watch_done(struct tevent_req *subreq);
240 static int dbwrap_record_watch_state_destructor(
241 struct dbwrap_record_watch_state *state);
243 struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
244 struct tevent_context *ev,
245 struct db_record *rec,
246 struct messaging_context *msg)
248 struct tevent_req *req, *subreq;
249 struct dbwrap_record_watch_state *state;
250 struct db_context *watchers_db;
251 NTSTATUS status;
252 int ret;
254 req = tevent_req_create(mem_ctx, &state,
255 struct dbwrap_record_watch_state);
256 if (req == NULL) {
257 return NULL;
259 state->db = dbwrap_record_get_db(rec);
260 state->ev = ev;
261 state->req = req;
262 state->msg = msg;
264 watchers_db = dbwrap_record_watchers_db();
265 if (watchers_db == NULL) {
266 tevent_req_nterror(req, map_nt_error_from_unix(errno));
267 return tevent_req_post(req, ev);
270 state->w_key = dbwrap_record_watchers_key(state, state->db, rec,
271 &state->key);
272 if (tevent_req_nomem(state->w_key.dptr, req)) {
273 return tevent_req_post(req, ev);
276 ret = msg_channel_init(state, state->msg, MSG_DBWRAP_MODIFIED,
277 &state->channel);
278 if (ret != 0) {
279 tevent_req_nterror(req, map_nt_error_from_unix(ret));
280 return tevent_req_post(req, ev);
283 status = dbwrap_record_add_watcher(
284 state->w_key, messaging_server_id(state->msg));
285 if (tevent_req_nterror(req, status)) {
286 return tevent_req_post(req, ev);
288 talloc_set_destructor(state, dbwrap_record_watch_state_destructor);
290 subreq = msg_read_send(state, state->ev, state->channel);
291 if (tevent_req_nomem(subreq, req)) {
292 return tevent_req_post(req, ev);
294 tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
295 return req;
298 static int dbwrap_record_watch_state_destructor(
299 struct dbwrap_record_watch_state *s)
301 if (s->msg != NULL) {
302 dbwrap_record_del_watcher(
303 s->w_key, messaging_server_id(s->msg));
305 return 0;
308 static void dbwrap_watch_record_stored(struct db_context *db,
309 struct db_record *rec,
310 void *private_data)
312 struct messaging_context *msg = talloc_get_type_abort(
313 private_data, struct messaging_context);
314 struct server_id *ids = NULL;
315 size_t num_ids = 0;
316 TDB_DATA w_key = { 0, };
317 DATA_BLOB w_blob;
318 NTSTATUS status;
319 uint32_t i;
321 status = dbwrap_record_get_watchers(db, rec, talloc_tos(),
322 &ids, &num_ids);
323 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
324 goto done;
326 if (!NT_STATUS_IS_OK(status)) {
327 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
328 nt_errstr(status)));
329 goto done;
331 w_key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
332 if (w_key.dptr == NULL) {
333 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
334 goto done;
336 w_blob.data = w_key.dptr;
337 w_blob.length = w_key.dsize;
339 for (i=0; i<num_ids; i++) {
340 status = messaging_send(msg, ids[i], MSG_DBWRAP_MODIFIED,
341 &w_blob);
342 if (!NT_STATUS_IS_OK(status)) {
343 char *str = procid_str_static(&ids[i]);
344 DEBUG(1, ("messaging_send to %s failed: %s\n",
345 str, nt_errstr(status)));
346 TALLOC_FREE(str);
349 done:
350 TALLOC_FREE(w_key.dptr);
351 TALLOC_FREE(ids);
352 return;
355 void dbwrap_watch_db(struct db_context *db, struct messaging_context *msg)
357 dbwrap_set_stored_callback(db, dbwrap_watch_record_stored, msg);
360 static void dbwrap_record_watch_done(struct tevent_req *subreq)
362 struct tevent_req *req = tevent_req_callback_data(
363 subreq, struct tevent_req);
364 struct dbwrap_record_watch_state *state = tevent_req_data(
365 req, struct dbwrap_record_watch_state);
366 struct messaging_rec *rec;
367 int ret;
369 ret = msg_read_recv(subreq, talloc_tos(), &rec);
370 TALLOC_FREE(subreq);
371 if (ret != 0) {
372 tevent_req_nterror(req, map_nt_error_from_unix(ret));
373 return;
376 if ((rec->buf.length == state->w_key.dsize) &&
377 (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
378 tevent_req_done(req);
379 return;
383 * Not our record, wait for the next one
385 subreq = msg_read_send(state, state->ev, state->channel);
386 if (tevent_req_nomem(subreq, req)) {
387 return;
389 tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
392 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
393 TALLOC_CTX *mem_ctx,
394 struct db_record **prec)
396 struct dbwrap_record_watch_state *state = tevent_req_data(
397 req, struct dbwrap_record_watch_state);
398 NTSTATUS status;
399 struct db_record *rec;
401 if (tevent_req_is_nterror(req, &status)) {
402 return status;
404 rec = dbwrap_fetch_locked(state->db, mem_ctx, state->key);
405 if (rec == NULL) {
406 return NT_STATUS_INTERNAL_DB_ERROR;
408 *prec = rec;
409 return NT_STATUS_OK;
412 struct dbwrap_watchers_traverse_read_state {
413 int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
414 const struct server_id *watchers, size_t num_watchers,
415 void *private_data);
416 void *private_data;
419 static int dbwrap_watchers_traverse_read_callback(
420 struct db_record *rec, void *private_data)
422 struct dbwrap_watchers_traverse_read_state *state =
423 (struct dbwrap_watchers_traverse_read_state *)private_data;
424 uint8_t *db_id;
425 size_t db_id_len;
426 TDB_DATA w_key, key, w_data;
427 int res;
429 w_key = dbwrap_record_get_key(rec);
430 w_data = dbwrap_record_get_value(rec);
432 if (!dbwrap_record_watchers_key_parse(w_key, &db_id, &db_id_len,
433 &key)) {
434 return 0;
436 if ((w_data.dsize % sizeof(struct server_id)) != 0) {
437 return 0;
439 res = state->fn(db_id, db_id_len, key,
440 (struct server_id *)w_data.dptr,
441 w_data.dsize / sizeof(struct server_id),
442 state->private_data);
443 return res;
446 void dbwrap_watchers_traverse_read(
447 int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
448 const struct server_id *watchers, size_t num_watchers,
449 void *private_data),
450 void *private_data)
452 struct dbwrap_watchers_traverse_read_state state;
453 struct db_context *db;
455 db = dbwrap_record_watchers_db();
456 if (db == NULL) {
457 return;
459 state.fn = fn;
460 state.private_data = private_data;
461 dbwrap_traverse_read(db, dbwrap_watchers_traverse_read_callback,
462 &state, NULL);
465 static int dbwrap_wakeall_cb(const uint8_t *db_id, size_t db_id_len,
466 const TDB_DATA key,
467 const struct server_id *watchers,
468 size_t num_watchers,
469 void *private_data)
471 struct messaging_context *msg = talloc_get_type_abort(
472 private_data, struct messaging_context);
473 uint32_t i;
474 DATA_BLOB blob;
476 blob.data = key.dptr;
477 blob.length = key.dsize;
479 for (i=0; i<num_watchers; i++) {
480 messaging_send(msg, watchers[i], MSG_DBWRAP_MODIFIED, &blob);
482 return 0;
485 void dbwrap_watchers_wakeall(struct messaging_context *msg)
487 dbwrap_watchers_traverse_read(dbwrap_wakeall_cb, msg);