2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "lib/util/util_tdb.h"
26 #include "lib/util/tevent_ntstatus.h"
/*
 * Return the database that stores the watcher registrations.
 *
 * The handle lives in a function-local static. Whenever that static is
 * still NULL, db_open() is called on lock_path("dbwrap_watchers.tdb")
 * with TDB_CLEAR_IF_FIRST | TDB_INCOMPATIBLE_HASH, O_RDWR|O_CREAT,
 * mode 0600 and DBWRAP_LOCK_ORDER_3.
 *
 * NOTE(review): the end of the db_open() argument list and the return
 * statement are not visible in this excerpt. A caller in this file
 * compares the result against NULL, so presumably a failed open is
 * reported as NULL -- confirm.
 */
28 static struct db_context
*dbwrap_record_watchers_db(void)
/* Cached handle; being static it keeps its value between calls. */
30 static struct db_context
*watchers_db
;
/* Open the database whenever no handle is cached yet. */
32 if (watchers_db
== NULL
) {
33 watchers_db
= db_open(
34 NULL
, lock_path("dbwrap_watchers.tdb"), 0,
35 TDB_CLEAR_IF_FIRST
| TDB_INCOMPATIBLE_HASH
,
36 O_RDWR
|O_CREAT
, 0600, DBWRAP_LOCK_ORDER_3
,
/*
 * Build the key under which the watchers of record "rec" in database
 * "db" are looked up in the watchers database.
 *
 * Layout of the buffer, as written below:
 *   sizeof(uint32_t) bytes   db_id_len, stored with SIVAL()
 *   db_id_len bytes          the id obtained from dbwrap_db_id()
 *   key.dsize bytes          the record key from dbwrap_record_get_key()
 *
 * The buffer is allocated with talloc_array() on mem_ctx. When that
 * allocation fails, make_tdb_data(NULL, 0) is returned.
 *
 * If rec_key is not NULL it is pointed at the record-key portion inside
 * the newly built buffer; no separate copy is made.
 *
 * NOTE(review): the rec_key parameter declaration, the local variable
 * declarations and the final return are not visible in this excerpt.
 */
42 static TDB_DATA
dbwrap_record_watchers_key(TALLOC_CTX
*mem_ctx
,
43 struct db_context
*db
,
44 struct db_record
*rec
,
/* Collect the two variable-length inputs: database id and record key. */
51 dbwrap_db_id(db
, &db_id
, &db_id_len
);
52 key
= dbwrap_record_get_key(rec
);
/* Total size: length prefix + database id + record key. */
54 wkey
.dsize
= sizeof(uint32_t) + db_id_len
+ key
.dsize
;
55 wkey
.dptr
= talloc_array(mem_ctx
, uint8_t, wkey
.dsize
);
56 if (wkey
.dptr
== NULL
) {
57 return make_tdb_data(NULL
, 0);
/* Length prefix first, then the id, then the record key. */
59 SIVAL(wkey
.dptr
, 0, db_id_len
);
60 memcpy(wkey
.dptr
+ sizeof(uint32_t), db_id
, db_id_len
);
61 memcpy(wkey
.dptr
+ sizeof(uint32_t) + db_id_len
, key
.dptr
, key
.dsize
);
/* Optionally hand back a view of the record key inside wkey. */
63 if (rec_key
!= NULL
) {
64 rec_key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
65 rec_key
->dsize
= key
.dsize
;
/*
 * Split a watchers-database key into its components.
 *
 * wkey is expected to start with a 32-bit length field (read with
 * IVAL()), followed by that many bytes of database id, followed by the
 * record key. The outputs *p_db_id, *p_db_id_len and *key all refer to
 * memory inside wkey.dptr; nothing is copied.
 *
 * A key shorter than the length field, or one whose length field is
 * larger than the bytes that follow it, is reported with a level 1
 * DEBUG message.
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 * The caller in this file tests the result with '!', so presumably
 * false means "invalid key" -- confirm.
 */
71 static bool dbwrap_record_watchers_key_parse(
72 TDB_DATA wkey
, uint8_t **p_db_id
, size_t *p_db_id_len
, TDB_DATA
*key
)
/* The key must at least hold the length prefix. */
76 if (wkey
.dsize
< sizeof(uint32_t)) {
77 DEBUG(1, ("Invalid watchers key\n"));
80 db_id_len
= IVAL(wkey
.dptr
, 0);
/* The announced id length must fit into what is left of the key. */
81 if (db_id_len
> (wkey
.dsize
- sizeof(uint32_t))) {
82 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
83 "db_id_len=%d\n", (int)wkey
.dsize
, (int)db_id_len
));
/* Hand out views into wkey: first the id, then the record key. */
86 *p_db_id
= wkey
.dptr
+ sizeof(uint32_t);
87 *p_db_id_len
= db_id_len
;
88 key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
89 key
->dsize
= wkey
.dsize
- sizeof(uint32_t) - db_id_len
;
/*
 * Register server "id" as a watcher under key w_key in the watchers
 * database.
 *
 * The record is obtained with dbwrap_fetch_locked(). Its current value
 * is treated as a packed array of struct server_id, copied into a newly
 * allocated array, and that array is stored back using its full talloc
 * size.
 *
 * Status values assigned in the visible lines:
 *   map_nt_error_from_unix(errno)     after fetching the database handle
 *                                     and after dbwrap_fetch_locked()
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *   NT_STATUS_NO_MEMORY               after talloc_array()
 *   result of dbwrap_record_store()
 *
 * NOTE(review): the conditions guarding the error assignments, the
 * element count passed to talloc_array(), the statement that places
 * "id" into the new array, the cleanup of "frame" and the return are
 * not visible in this excerpt.
 */
93 static NTSTATUS
dbwrap_record_add_watcher(TDB_DATA w_key
, struct server_id id
)
/*
 * NOTE(review): TALLOC_CTX is used without the struct keyword for the
 * mem_ctx parameters elsewhere in this file; "struct TALLOC_CTX" names
 * a separate tag. Confirm that this spelling is intended.
 */
95 struct TALLOC_CTX
*frame
= talloc_stackframe();
96 struct db_context
*db
;
97 struct db_record
*rec
;
99 struct server_id
*ids
;
103 db
= dbwrap_record_watchers_db();
105 status
= map_nt_error_from_unix(errno
);
/* The locked record is allocated on the current talloc stackframe. */
108 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
110 status
= map_nt_error_from_unix(errno
);
113 value
= dbwrap_record_get_value(rec
);
/* The value has to be a whole number of struct server_id entries. */
115 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
116 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
120 ids
= (struct server_id
*)value
.dptr
;
121 num_ids
= value
.dsize
/ sizeof(struct server_id
);
/* From here on "ids" refers to a private copy, not the record buffer. */
123 ids
= talloc_array(talloc_tos(), struct server_id
,
126 status
= NT_STATUS_NO_MEMORY
;
129 memcpy(ids
, value
.dptr
, value
.dsize
);
/* Store the whole new array; its length is taken from talloc. */
133 status
= dbwrap_record_store(
134 rec
, make_tdb_data((uint8_t *)ids
, talloc_get_size(ids
)), 0);
/*
 * Remove server "id" from the watcher list stored under key w_key in
 * the watchers database.
 *
 * The record is obtained with dbwrap_fetch_locked() and its value is
 * treated as a packed array of struct server_id. A matching entry is
 * overwritten with the last entry and value.dsize is reduced by one
 * element, so the order of the remaining entries is not preserved.
 * When value.dsize is 0 the record is deleted with
 * dbwrap_record_delete(); dbwrap_record_store() is used to write the
 * shortened value.
 *
 * Status values assigned in the visible lines:
 *   map_nt_error_from_unix(errno)     after fetching the database handle
 *                                     and after dbwrap_fetch_locked()
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *   result of dbwrap_record_delete() or dbwrap_record_store()
 *
 * NOTE(review): the conditions guarding the error assignments, whether
 * the loop stops after the first match, the else branch structure, the
 * cleanup of "frame" and the return are not visible in this excerpt.
 */
140 static NTSTATUS
dbwrap_record_del_watcher(TDB_DATA w_key
, struct server_id id
)
/*
 * NOTE(review): TALLOC_CTX is used without the struct keyword for the
 * mem_ctx parameters elsewhere in this file; "struct TALLOC_CTX" names
 * a separate tag. Confirm that this spelling is intended.
 */
142 struct TALLOC_CTX
*frame
= talloc_stackframe();
143 struct db_context
*db
;
144 struct db_record
*rec
;
145 struct server_id
*ids
;
150 db
= dbwrap_record_watchers_db();
152 status
= map_nt_error_from_unix(errno
);
155 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
157 status
= map_nt_error_from_unix(errno
);
160 value
= dbwrap_record_get_value(rec
);
/* The value has to be a whole number of struct server_id entries. */
162 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
163 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
/*
 * "ids" aliases value.dptr, so the removal below edits the buffer
 * returned by dbwrap_record_get_value() in place.
 */
167 ids
= (struct server_id
*)value
.dptr
;
168 num_ids
= value
.dsize
/ sizeof(struct server_id
);
170 for (i
=0; i
<num_ids
; i
++) {
171 if (serverid_equal(&id
, &ids
[i
])) {
/* Fill the hole with the last entry and shrink by one element. */
172 ids
[i
] = ids
[num_ids
-1];
173 value
.dsize
-= sizeof(struct server_id
);
/* No watcher left: drop the record instead of storing an empty one. */
177 if (value
.dsize
== 0) {
178 status
= dbwrap_record_delete(rec
);
181 status
= dbwrap_record_store(rec
, value
, 0);
/*
 * Look up the watchers registered for record "rec" of database "db".
 *
 * The watchers key is built on talloc_tos() and used for a
 * dbwrap_fetch() on the watchers database, with mem_ctx as the memory
 * context for the fetched value. The value is interpreted as a packed
 * array of struct server_id: *p_ids receives the array (moved to
 * mem_ctx with talloc_move()) and *p_num_ids the number of entries.
 *
 * Status values assigned in the visible lines:
 *   NT_STATUS_NO_MEMORY               building the watchers key failed
 *   NT_STATUS_INTERNAL_ERROR          after fetching the database handle
 *   result of dbwrap_fetch()
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *
 * NOTE(review): the mem_ctx and p_num_ids parameter declarations, the
 * condition guarding NT_STATUS_INTERNAL_ERROR, any labels and the
 * return statements are not visible in this excerpt.
 */
188 static NTSTATUS
dbwrap_record_get_watchers(struct db_context
*db
,
189 struct db_record
*rec
,
191 struct server_id
**p_ids
,
194 struct db_context
*w_db
;
/* Zero-initialised so that the TALLOC_FREE() calls at the end are
 * handed either NULL or a valid pointer. */
195 TDB_DATA key
= { 0, };
196 TDB_DATA value
= { 0, };
197 struct server_id
*ids
;
/* The record-key view (last argument) is not needed here. */
200 key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
201 if (key
.dptr
== NULL
) {
202 status
= NT_STATUS_NO_MEMORY
;
205 w_db
= dbwrap_record_watchers_db();
207 status
= NT_STATUS_INTERNAL_ERROR
;
/* The key is only needed for the fetch and is released right after. */
210 status
= dbwrap_fetch(w_db
, mem_ctx
, key
, &value
);
211 TALLOC_FREE(key
.dptr
);
212 if (!NT_STATUS_IS_OK(status
)) {
/* The value has to be a whole number of struct server_id entries. */
215 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
216 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
/* Hand the fetched buffer to the caller as a server_id array. */
219 ids
= (struct server_id
*)value
.dptr
;
220 *p_ids
= talloc_move(mem_ctx
, &ids
);
221 *p_num_ids
= value
.dsize
/ sizeof(struct server_id
);
/*
 * NOTE(review): presumably the shared cleanup for the error paths;
 * the label and the return that belong to it are not visible.
 */
224 TALLOC_FREE(key
.dptr
);
225 TALLOC_FREE(value
.dptr
);
/*
 * Per-request state of dbwrap_record_watch_send()/_recv().
 *
 * NOTE(review): other code in this file also accesses state->w_key and
 * state->key; their declarations are not visible in this excerpt.
 */
229 struct dbwrap_record_watch_state
{
/* Event context, used when re-arming the message read. */
230 struct tevent_context
*ev
;
/* Database of the watched record (from dbwrap_record_get_db()). */
231 struct db_context
*db
;
232 struct tevent_req
*req
;
/* Messaging context used to receive MSG_DBWRAP_MODIFIED messages and
 * to obtain this process's server id. */
233 struct messaging_context
*msg
;
/*
 * Forward declarations: both functions are referenced from
 * dbwrap_record_watch_send() before they are defined.
 */
238 static void dbwrap_record_watch_done(struct tevent_req
*subreq
);
239 static int dbwrap_record_watch_state_destructor(
240 struct dbwrap_record_watch_state
*state
);
/*
 * Start an asynchronous watch on record "rec".
 *
 * Steps visible below, in this order:
 *   1. create the tevent request and its state,
 *   2. remember the record's database in state->db,
 *   3. make sure the watchers database is available,
 *   4. build the watchers key into state->w_key (allocated on state),
 *   5. start reading MSG_DBWRAP_MODIFIED messages with
 *      dbwrap_record_watch_done() as the completion callback,
 *   6. register this process (messaging_server_id()) as a watcher,
 *   7. install the state destructor.
 *
 * Each failure after step 1 marks the request as failed and returns it
 * through tevent_req_post().
 *
 * NOTE(review): the check of the tevent_req_create() result, the
 * initialisation of the remaining state members, the last argument of
 * dbwrap_record_watchers_key() and the final return are not visible in
 * this excerpt.
 */
242 struct tevent_req
*dbwrap_record_watch_send(TALLOC_CTX
*mem_ctx
,
243 struct tevent_context
*ev
,
244 struct db_record
*rec
,
245 struct messaging_context
*msg
)
247 struct tevent_req
*req
, *subreq
;
248 struct dbwrap_record_watch_state
*state
;
249 struct db_context
*watchers_db
;
252 req
= tevent_req_create(mem_ctx
, &state
,
253 struct dbwrap_record_watch_state
);
257 state
->db
= dbwrap_record_get_db(rec
);
262 watchers_db
= dbwrap_record_watchers_db();
263 if (watchers_db
== NULL
) {
264 tevent_req_nterror(req
, map_nt_error_from_unix(errno
));
265 return tevent_req_post(req
, ev
);
/* The key is a talloc child of state. */
268 state
->w_key
= dbwrap_record_watchers_key(state
, state
->db
, rec
,
270 if (tevent_req_nomem(state
->w_key
.dptr
, req
)) {
271 return tevent_req_post(req
, ev
);
/*
 * The message read is started before the watcher is registered.
 * NOTE(review): presumably so that no notification sent right after
 * registration can be missed -- confirm.
 */
274 subreq
= messaging_read_send(state
, ev
, state
->msg
,
275 MSG_DBWRAP_MODIFIED
);
276 if (tevent_req_nomem(subreq
, req
)) {
277 return tevent_req_post(req
, ev
);
279 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
281 status
= dbwrap_record_add_watcher(
282 state
->w_key
, messaging_server_id(state
->msg
));
283 if (tevent_req_nterror(req
, status
)) {
284 return tevent_req_post(req
, ev
);
/* Installed only once the watcher has been registered successfully. */
286 talloc_set_destructor(state
, dbwrap_record_watch_state_destructor
);
/*
 * talloc destructor for the watch state, installed by
 * dbwrap_record_watch_send(): removes this process from the watcher
 * list stored under s->w_key.
 *
 * Nothing is removed when s->msg is NULL. The result of
 * dbwrap_record_del_watcher() is not used in the visible lines.
 *
 * NOTE(review): the return value is not visible in this excerpt.
 */
291 static int dbwrap_record_watch_state_destructor(
292 struct dbwrap_record_watch_state
*s
)
294 if (s
->msg
!= NULL
) {
295 dbwrap_record_del_watcher(
296 s
->w_key
, messaging_server_id(s
->msg
));
/*
 * Callback registered by dbwrap_watch_db() through
 * dbwrap_set_stored_callback(): notify every watcher of "rec".
 *
 * private_data must be a struct messaging_context
 * (talloc_get_type_abort() is used for the cast). The watchers are
 * looked up with dbwrap_record_get_watchers(); each of them is sent a
 * MSG_DBWRAP_MODIFIED message whose payload is the full watchers key of
 * the record. A failed send is logged at level 1 and does not stop the
 * loop in the visible lines.
 *
 * NOTE(review): the remaining parameters, the declarations of
 * "status", "i" and "num_ids", the handling of the
 * NT_STATUS_NOT_FOUND and error branches and the release of "ids" are
 * not visible in this excerpt.
 */
301 static void dbwrap_watch_record_stored(struct db_context
*db
,
302 struct db_record
*rec
,
305 struct messaging_context
*msg
= talloc_get_type_abort(
306 private_data
, struct messaging_context
);
307 struct server_id
*ids
= NULL
;
309 TDB_DATA w_key
= { 0, };
313 status
= dbwrap_record_get_watchers(db
, rec
, talloc_tos(),
/* NT_STATUS_NOT_FOUND is tested separately from other failures,
 * which are logged below. */
315 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
318 if (!NT_STATUS_IS_OK(status
)) {
319 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
/* The message payload is the watchers key, not the bare record key. */
323 w_key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
324 if (w_key
.dptr
== NULL
) {
325 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
329 for (i
=0; i
<num_ids
; i
++) {
330 status
= messaging_send_buf(msg
, ids
[i
], MSG_DBWRAP_MODIFIED
,
331 w_key
.dptr
, w_key
.dsize
);
332 if (!NT_STATUS_IS_OK(status
)) {
333 char *str
= procid_str_static(&ids
[i
]);
334 DEBUG(1, ("messaging_send to %s failed: %s\n",
335 str
, nt_errstr(status
)));
340 TALLOC_FREE(w_key
.dptr
);
/*
 * Enable change notifications for "db": installs
 * dbwrap_watch_record_stored() as the stored-callback of the database,
 * with "msg" as its private data.
 */
345 void dbwrap_watch_db(struct db_context
*db
, struct messaging_context
*msg
)
347 dbwrap_set_stored_callback(db
, dbwrap_watch_record_stored
, msg
);
/*
 * Completion callback of the messaging_read_send() sub-request started
 * by dbwrap_record_watch_send() (and re-armed below).
 *
 * The received message is compared byte for byte with state->w_key.
 * On a match the watch request is completed with tevent_req_done().
 * Otherwise a new messaging_read_send() for MSG_DBWRAP_MODIFIED is
 * started with this function as its callback again.
 *
 * A failure reported by messaging_read_recv() is mapped with
 * map_nt_error_from_unix() and set on the request.
 *
 * NOTE(review): the declaration of "ret", the release of "subreq", the
 * condition guarding the error path and the returns are not visible in
 * this excerpt.
 */
350 static void dbwrap_record_watch_done(struct tevent_req
*subreq
)
352 struct tevent_req
*req
= tevent_req_callback_data(
353 subreq
, struct tevent_req
);
354 struct dbwrap_record_watch_state
*state
= tevent_req_data(
355 req
, struct dbwrap_record_watch_state
);
356 struct messaging_rec
*rec
;
/* The received message is allocated on the current talloc stackframe. */
359 ret
= messaging_read_recv(subreq
, talloc_tos(), &rec
);
362 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
/* Same length and same bytes as our watchers key: it is our record. */
366 if ((rec
->buf
.length
== state
->w_key
.dsize
) &&
367 (memcmp(rec
->buf
.data
, state
->w_key
.dptr
, rec
->buf
.length
) == 0)) {
368 tevent_req_done(req
);
373 * Not our record, wait for the next one
375 subreq
= messaging_read_send(state
, state
->ev
, state
->msg
,
376 MSG_DBWRAP_MODIFIED
);
377 if (tevent_req_nomem(subreq
, req
)) {
380 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
/*
 * Collect the result of a watch request.
 *
 * If the request ended with an error, tevent_req_is_nterror() places
 * it in "status". Otherwise the watched record is fetched again with
 * dbwrap_fetch_locked() on state->db using state->key, with mem_ctx as
 * its memory context. NT_STATUS_INTERNAL_DB_ERROR is the status used
 * on the path following that fetch.
 *
 * NOTE(review): the mem_ctx parameter declaration, the declaration of
 * "status", the condition guarding NT_STATUS_INTERNAL_DB_ERROR, the
 * assignment to *prec and the success return are not visible in this
 * excerpt.
 */
383 NTSTATUS
dbwrap_record_watch_recv(struct tevent_req
*req
,
385 struct db_record
**prec
)
387 struct dbwrap_record_watch_state
*state
= tevent_req_data(
388 req
, struct dbwrap_record_watch_state
);
390 struct db_record
*rec
;
392 if (tevent_req_is_nterror(req
, &status
)) {
/* Re-fetch the watched record, locked, using the key kept in state. */
398 rec
= dbwrap_fetch_locked(state
->db
, mem_ctx
, state
->key
);
400 return NT_STATUS_INTERNAL_DB_ERROR
;
/*
 * Context passed through dbwrap_traverse_read() to
 * dbwrap_watchers_traverse_read_callback().
 *
 * NOTE(review): the end of the fn parameter list and the private_data
 * member (accessed as state->private_data elsewhere in this file) are
 * not visible in this excerpt.
 */
406 struct dbwrap_watchers_traverse_read_state
{
/* User callback, invoked with the parsed key parts and the watchers. */
407 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
408 const struct server_id
*watchers
, size_t num_watchers
,
/*
 * Per-record callback used by dbwrap_watchers_traverse_read().
 *
 * private_data is the struct dbwrap_watchers_traverse_read_state set
 * up by the caller. The record key is split with
 * dbwrap_record_watchers_key_parse(), the record value is checked to
 * be a whole number of struct server_id entries, and state->fn is
 * called with the database id, the key, the watcher array, its length
 * and state->private_data.
 *
 * NOTE(review): the declarations of "db_id", "db_id_len" and "res",
 * the last argument of the parse call, the bodies of the two failure
 * branches and the return are not visible in this excerpt.
 */
413 static int dbwrap_watchers_traverse_read_callback(
414 struct db_record
*rec
, void *private_data
)
416 struct dbwrap_watchers_traverse_read_state
*state
=
417 (struct dbwrap_watchers_traverse_read_state
*)private_data
;
420 TDB_DATA w_key
, key
, w_data
;
423 w_key
= dbwrap_record_get_key(rec
);
424 w_data
= dbwrap_record_get_value(rec
);
426 if (!dbwrap_record_watchers_key_parse(w_key
, &db_id
, &db_id_len
,
/* The value has to be a whole number of struct server_id entries. */
430 if ((w_data
.dsize
% sizeof(struct server_id
)) != 0) {
/* The watcher array is passed as a view into the record value. */
433 res
= state
->fn(db_id
, db_id_len
, key
,
434 (struct server_id
*)w_data
.dptr
,
435 w_data
.dsize
/ sizeof(struct server_id
),
436 state
->private_data
);
/*
 * Walk the watchers database read-only with dbwrap_traverse_read(),
 * using dbwrap_watchers_traverse_read_callback() as the per-record
 * callback. private_data is stored in the traverse state, from where
 * the per-record callback passes it on to fn.
 *
 * NOTE(review): the end of the parameter list, the handling of a NULL
 * database handle, the assignment of state.fn and the remaining
 * arguments of dbwrap_traverse_read() are not visible in this excerpt.
 */
440 void dbwrap_watchers_traverse_read(
441 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
442 const struct server_id
*watchers
, size_t num_watchers
,
446 struct dbwrap_watchers_traverse_read_state state
;
447 struct db_context
*db
;
449 db
= dbwrap_record_watchers_db();
454 state
.private_data
= private_data
;
455 dbwrap_traverse_read(db
, dbwrap_watchers_traverse_read_callback
,
/*
 * Callback passed to dbwrap_watchers_traverse_read() by
 * dbwrap_watchers_wakeall(): sends MSG_DBWRAP_MODIFIED to every
 * watcher of one watchers-database record.
 *
 * private_data must be a struct messaging_context
 * (talloc_get_type_abort() is used for the cast). The result of
 * messaging_send() is not used in the visible lines.
 *
 * NOTE(review): the payload sent here is the "key" argument, which the
 * traverse callback passes on separately from the database id, whereas
 * dbwrap_record_watch_done() compares incoming payloads against
 * state->w_key. Verify that both have the same form, otherwise the
 * receivers will not recognise these messages.
 *
 * NOTE(review): the "key", "num_watchers" and "private_data" parameter
 * declarations, the declarations of "blob" and "i" and the return are
 * not visible in this excerpt.
 */
459 static int dbwrap_wakeall_cb(const uint8_t *db_id
, size_t db_id_len
,
461 const struct server_id
*watchers
,
465 struct messaging_context
*msg
= talloc_get_type_abort(
466 private_data
, struct messaging_context
);
/* The blob refers to the key bytes; nothing is copied. */
470 blob
.data
= key
.dptr
;
471 blob
.length
= key
.dsize
;
473 for (i
=0; i
<num_watchers
; i
++) {
474 messaging_send(msg
, watchers
[i
], MSG_DBWRAP_MODIFIED
, &blob
);
/*
 * Send MSG_DBWRAP_MODIFIED to all registered watchers by traversing
 * the watchers database with dbwrap_wakeall_cb(); "msg" is handed to
 * that callback as its private data.
 */
479 void dbwrap_watchers_wakeall(struct messaging_context
*msg
)
481 dbwrap_watchers_traverse_read(dbwrap_wakeall_cb
, msg
);