2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
29 static struct db_context
*dbwrap_record_watchers_db(void)
31 static struct db_context
*watchers_db
;
33 if (watchers_db
== NULL
) {
34 watchers_db
= db_open(
35 NULL
, lock_path("dbwrap_watchers.tdb"), 0,
36 TDB_CLEAR_IF_FIRST
| TDB_INCOMPATIBLE_HASH
,
37 O_RDWR
|O_CREAT
, 0600, DBWRAP_LOCK_ORDER_3
);
42 static TDB_DATA
dbwrap_record_watchers_key(TALLOC_CTX
*mem_ctx
,
43 struct db_context
*db
,
44 struct db_record
*rec
,
51 dbwrap_db_id(db
, &db_id
, &db_id_len
);
52 key
= dbwrap_record_get_key(rec
);
54 wkey
.dsize
= sizeof(uint32_t) + db_id_len
+ key
.dsize
;
55 wkey
.dptr
= talloc_array(mem_ctx
, uint8_t, wkey
.dsize
);
56 if (wkey
.dptr
== NULL
) {
57 return make_tdb_data(NULL
, 0);
59 SIVAL(wkey
.dptr
, 0, db_id_len
);
60 memcpy(wkey
.dptr
+ sizeof(uint32_t), db_id
, db_id_len
);
61 memcpy(wkey
.dptr
+ sizeof(uint32_t) + db_id_len
, key
.dptr
, key
.dsize
);
63 if (rec_key
!= NULL
) {
64 rec_key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
65 rec_key
->dsize
= key
.dsize
;
71 static bool dbwrap_record_watchers_key_parse(
72 TDB_DATA wkey
, uint8_t **p_db_id
, size_t *p_db_id_len
, TDB_DATA
*key
)
76 if (wkey
.dsize
< sizeof(uint32_t)) {
77 DEBUG(1, ("Invalid watchers key\n"));
80 db_id_len
= IVAL(wkey
.dptr
, 0);
81 if (db_id_len
> (wkey
.dsize
- sizeof(uint32_t))) {
82 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
83 "db_id_len=%d\n", (int)wkey
.dsize
, (int)db_id_len
));
86 *p_db_id
= wkey
.dptr
+ sizeof(uint32_t);
87 *p_db_id_len
= db_id_len
;
88 key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
89 key
->dsize
= wkey
.dsize
- sizeof(uint32_t) - db_id_len
;
93 static NTSTATUS
dbwrap_record_add_watcher(TDB_DATA w_key
, struct server_id id
)
95 struct TALLOC_CTX
*frame
= talloc_stackframe();
96 struct db_context
*db
;
97 struct db_record
*rec
;
99 struct server_id
*ids
;
103 db
= dbwrap_record_watchers_db();
105 status
= map_nt_error_from_unix(errno
);
108 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
110 status
= map_nt_error_from_unix(errno
);
113 value
= dbwrap_record_get_value(rec
);
115 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
116 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
120 ids
= (struct server_id
*)value
.dptr
;
121 num_ids
= value
.dsize
/ sizeof(struct server_id
);
123 ids
= talloc_array(talloc_tos(), struct server_id
,
126 status
= NT_STATUS_NO_MEMORY
;
129 memcpy(ids
, value
.dptr
, value
.dsize
);
133 status
= dbwrap_record_store(
134 rec
, make_tdb_data((uint8_t *)ids
, talloc_get_size(ids
)), 0);
140 static NTSTATUS
dbwrap_record_del_watcher(TDB_DATA w_key
, struct server_id id
)
142 struct TALLOC_CTX
*frame
= talloc_stackframe();
143 struct db_context
*db
;
144 struct db_record
*rec
;
145 struct server_id
*ids
;
150 db
= dbwrap_record_watchers_db();
152 status
= map_nt_error_from_unix(errno
);
155 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
157 status
= map_nt_error_from_unix(errno
);
160 value
= dbwrap_record_get_value(rec
);
162 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
163 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
167 ids
= (struct server_id
*)value
.dptr
;
168 num_ids
= value
.dsize
/ sizeof(struct server_id
);
170 for (i
=0; i
<num_ids
; i
++) {
171 if (serverid_equal(&id
, &ids
[i
])) {
172 ids
[i
] = ids
[num_ids
-1];
173 value
.dsize
-= sizeof(struct server_id
);
177 if (value
.dsize
== 0) {
178 status
= dbwrap_record_delete(rec
);
181 status
= dbwrap_record_store(rec
, value
, 0);
188 static NTSTATUS
dbwrap_record_get_watchers(struct db_context
*db
,
189 struct db_record
*rec
,
191 struct server_id
**p_ids
,
194 struct db_context
*w_db
;
195 TDB_DATA key
= { 0, };
196 TDB_DATA value
= { 0, };
197 struct server_id
*ids
;
200 key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
201 if (key
.dptr
== NULL
) {
202 status
= NT_STATUS_NO_MEMORY
;
205 w_db
= dbwrap_record_watchers_db();
207 status
= NT_STATUS_INTERNAL_ERROR
;
210 status
= dbwrap_fetch(w_db
, mem_ctx
, key
, &value
);
211 TALLOC_FREE(key
.dptr
);
212 if (!NT_STATUS_IS_OK(status
)) {
215 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
216 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
219 ids
= (struct server_id
*)value
.dptr
;
220 *p_ids
= talloc_move(mem_ctx
, &ids
);
221 *p_num_ids
= value
.dsize
/ sizeof(struct server_id
);
224 TALLOC_FREE(key
.dptr
);
225 TALLOC_FREE(value
.dptr
);
229 struct dbwrap_record_watch_state
{
230 struct tevent_context
*ev
;
231 struct db_context
*db
;
232 struct tevent_req
*req
;
233 struct messaging_context
*msg
;
234 struct msg_channel
*channel
;
239 static void dbwrap_record_watch_done(struct tevent_req
*subreq
);
240 static int dbwrap_record_watch_state_destructor(
241 struct dbwrap_record_watch_state
*state
);
243 struct tevent_req
*dbwrap_record_watch_send(TALLOC_CTX
*mem_ctx
,
244 struct tevent_context
*ev
,
245 struct db_record
*rec
,
246 struct messaging_context
*msg
)
248 struct tevent_req
*req
, *subreq
;
249 struct dbwrap_record_watch_state
*state
;
250 struct db_context
*watchers_db
;
254 req
= tevent_req_create(mem_ctx
, &state
,
255 struct dbwrap_record_watch_state
);
259 state
->db
= dbwrap_record_get_db(rec
);
264 watchers_db
= dbwrap_record_watchers_db();
265 if (watchers_db
== NULL
) {
266 tevent_req_nterror(req
, map_nt_error_from_unix(errno
));
267 return tevent_req_post(req
, ev
);
270 state
->w_key
= dbwrap_record_watchers_key(state
, state
->db
, rec
,
272 if (tevent_req_nomem(state
->w_key
.dptr
, req
)) {
273 return tevent_req_post(req
, ev
);
276 ret
= msg_channel_init(state
, state
->msg
, MSG_DBWRAP_MODIFIED
,
279 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
280 return tevent_req_post(req
, ev
);
283 status
= dbwrap_record_add_watcher(
284 state
->w_key
, messaging_server_id(state
->msg
));
285 if (tevent_req_nterror(req
, status
)) {
286 return tevent_req_post(req
, ev
);
288 talloc_set_destructor(state
, dbwrap_record_watch_state_destructor
);
290 subreq
= msg_read_send(state
, state
->ev
, state
->channel
);
291 if (tevent_req_nomem(subreq
, req
)) {
292 return tevent_req_post(req
, ev
);
294 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
298 static int dbwrap_record_watch_state_destructor(
299 struct dbwrap_record_watch_state
*s
)
301 if (s
->msg
!= NULL
) {
302 dbwrap_record_del_watcher(
303 s
->w_key
, messaging_server_id(s
->msg
));
308 static void dbwrap_watch_record_stored(struct db_context
*db
,
309 struct db_record
*rec
,
312 struct messaging_context
*msg
= talloc_get_type_abort(
313 private_data
, struct messaging_context
);
314 struct server_id
*ids
= NULL
;
316 TDB_DATA w_key
= { 0, };
321 status
= dbwrap_record_get_watchers(db
, rec
, talloc_tos(),
323 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
326 if (!NT_STATUS_IS_OK(status
)) {
327 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
331 w_key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
332 if (w_key
.dptr
== NULL
) {
333 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
336 w_blob
.data
= w_key
.dptr
;
337 w_blob
.length
= w_key
.dsize
;
339 for (i
=0; i
<num_ids
; i
++) {
340 status
= messaging_send(msg
, ids
[i
], MSG_DBWRAP_MODIFIED
,
342 if (!NT_STATUS_IS_OK(status
)) {
343 char *str
= procid_str_static(&ids
[i
]);
344 DEBUG(1, ("messaging_send to %s failed: %s\n",
345 str
, nt_errstr(status
)));
350 TALLOC_FREE(w_key
.dptr
);
355 void dbwrap_watch_db(struct db_context
*db
, struct messaging_context
*msg
)
357 dbwrap_set_stored_callback(db
, dbwrap_watch_record_stored
, msg
);
360 static void dbwrap_record_watch_done(struct tevent_req
*subreq
)
362 struct tevent_req
*req
= tevent_req_callback_data(
363 subreq
, struct tevent_req
);
364 struct dbwrap_record_watch_state
*state
= tevent_req_data(
365 req
, struct dbwrap_record_watch_state
);
366 struct messaging_rec
*rec
;
369 ret
= msg_read_recv(subreq
, talloc_tos(), &rec
);
372 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
376 if ((rec
->buf
.length
== state
->w_key
.dsize
) &&
377 (memcmp(rec
->buf
.data
, state
->w_key
.dptr
, rec
->buf
.length
) == 0)) {
378 tevent_req_done(req
);
383 * Not our record, wait for the next one
385 subreq
= msg_read_send(state
, state
->ev
, state
->channel
);
386 if (tevent_req_nomem(subreq
, req
)) {
389 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
392 NTSTATUS
dbwrap_record_watch_recv(struct tevent_req
*req
,
394 struct db_record
**prec
)
396 struct dbwrap_record_watch_state
*state
= tevent_req_data(
397 req
, struct dbwrap_record_watch_state
);
399 struct db_record
*rec
;
401 if (tevent_req_is_nterror(req
, &status
)) {
407 rec
= dbwrap_fetch_locked(state
->db
, mem_ctx
, state
->key
);
409 return NT_STATUS_INTERNAL_DB_ERROR
;
415 struct dbwrap_watchers_traverse_read_state
{
416 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
417 const struct server_id
*watchers
, size_t num_watchers
,
422 static int dbwrap_watchers_traverse_read_callback(
423 struct db_record
*rec
, void *private_data
)
425 struct dbwrap_watchers_traverse_read_state
*state
=
426 (struct dbwrap_watchers_traverse_read_state
*)private_data
;
429 TDB_DATA w_key
, key
, w_data
;
432 w_key
= dbwrap_record_get_key(rec
);
433 w_data
= dbwrap_record_get_value(rec
);
435 if (!dbwrap_record_watchers_key_parse(w_key
, &db_id
, &db_id_len
,
439 if ((w_data
.dsize
% sizeof(struct server_id
)) != 0) {
442 res
= state
->fn(db_id
, db_id_len
, key
,
443 (struct server_id
*)w_data
.dptr
,
444 w_data
.dsize
/ sizeof(struct server_id
),
445 state
->private_data
);
449 void dbwrap_watchers_traverse_read(
450 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
451 const struct server_id
*watchers
, size_t num_watchers
,
455 struct dbwrap_watchers_traverse_read_state state
;
456 struct db_context
*db
;
458 db
= dbwrap_record_watchers_db();
463 state
.private_data
= private_data
;
464 dbwrap_traverse_read(db
, dbwrap_watchers_traverse_read_callback
,
468 static int dbwrap_wakeall_cb(const uint8_t *db_id
, size_t db_id_len
,
470 const struct server_id
*watchers
,
474 struct messaging_context
*msg
= talloc_get_type_abort(
475 private_data
, struct messaging_context
);
479 blob
.data
= key
.dptr
;
480 blob
.length
= key
.dsize
;
482 for (i
=0; i
<num_watchers
; i
++) {
483 messaging_send(msg
, watchers
[i
], MSG_DBWRAP_MODIFIED
, &blob
);
488 void dbwrap_watchers_wakeall(struct messaging_context
*msg
)
490 dbwrap_watchers_traverse_read(dbwrap_wakeall_cb
, msg
);