2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
/*
 * Accessor for the database that stores watcher registrations.
 *
 * The handle lives in a function-local static pointer; db_open() on
 * lock_path("dbwrap_watchers.tdb") is only attempted while that pointer
 * is still NULL.
 *
 * NOTE(review): this listing omits some original lines (braces and the
 * return statement). Presumably the cached pointer is returned, since
 * callers in this file compare the result against NULL -- confirm.
 */
29 static struct db_context
*dbwrap_record_watchers_db(void)
31 static struct db_context
*watchers_db
;
33 if (watchers_db
== NULL
) {
/*
 * Nothing cached yet: open the tdb read/write, creating it if needed
 * (O_RDWR|O_CREAT, mode 0600) with the TDB_CLEAR_IF_FIRST flag and
 * lock order DBWRAP_LOCK_ORDER_3.
 */
34 watchers_db
= db_open(NULL
, lock_path("dbwrap_watchers.tdb"),
35 0, TDB_CLEAR_IF_FIRST
, O_RDWR
|O_CREAT
,
36 0600, DBWRAP_LOCK_ORDER_3
);
/*
 * Build the key under which the watchers of record `rec` (belonging to
 * database `db`) are filed in the watchers database.
 *
 * Layout of the returned buffer, as assembled below:
 *
 *   [uint32_t db_id_len][db_id bytes][record key bytes]
 *
 * The buffer is allocated with talloc_array() on mem_ctx. If that
 * allocation fails, make_tdb_data(NULL, 0) is returned.
 *
 * When rec_key is not NULL it is set to reference the record-key
 * portion inside the returned buffer (no separate copy is made).
 *
 * NOTE(review): the local declarations, the rec_key parameter line and
 * the final return are not present in this listing.
 */
41 static TDB_DATA
dbwrap_record_watchers_key(TALLOC_CTX
*mem_ctx
,
42 struct db_context
*db
,
43 struct db_record
*rec
,
/* Collect the two variable-length inputs: the db id and the record key. */
50 dbwrap_db_id(db
, &db_id
, &db_id_len
);
51 key
= dbwrap_record_get_key(rec
);
/* Total size: 32-bit length prefix + db id + record key. */
53 wkey
.dsize
= sizeof(uint32_t) + db_id_len
+ key
.dsize
;
54 wkey
.dptr
= talloc_array(mem_ctx
, uint8_t, wkey
.dsize
);
55 if (wkey
.dptr
== NULL
) {
/* Allocation failed: hand back an empty TDB_DATA (dptr == NULL). */
56 return make_tdb_data(NULL
, 0);
/* Store the db id length at offset 0, then append db id and record key. */
58 SIVAL(wkey
.dptr
, 0, db_id_len
);
59 memcpy(wkey
.dptr
+ sizeof(uint32_t), db_id
, db_id_len
);
60 memcpy(wkey
.dptr
+ sizeof(uint32_t) + db_id_len
, key
.dptr
, key
.dsize
);
/*
 * Optional out parameter: expose the record-key part of wkey. The
 * pointer aliases wkey's buffer and shares its lifetime.
 */
62 if (rec_key
!= NULL
) {
63 rec_key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
64 rec_key
->dsize
= key
.dsize
;
/*
 * Split a watchers-database key into its components.
 *
 * The key is expected to start with a 32-bit db id length (read with
 * IVAL at offset 0), followed by that many db id bytes; whatever
 * remains is the original record key.
 *
 * On success the out parameters are set as follows:
 *   *p_db_id     -> first byte after the length prefix
 *   *p_db_id_len -> decoded db id length
 *   *key         -> the remaining bytes of wkey
 * All three reference wkey's memory; nothing is copied.
 *
 * NOTE(review): the return statements are not present in this listing.
 * Presumably false is returned after each DEBUG message and true at the
 * end -- confirm against the full source.
 */
70 static bool dbwrap_record_watchers_key_parse(
71 TDB_DATA wkey
, uint8_t **p_db_id
, size_t *p_db_id_len
, TDB_DATA
*key
)
/* The key must at least hold the 32-bit length prefix. */
75 if (wkey
.dsize
< sizeof(uint32_t)) {
76 DEBUG(1, ("Invalid watchers key\n"));
79 db_id_len
= IVAL(wkey
.dptr
, 0);
/* The advertised db id length must fit into what follows the prefix. */
80 if (db_id_len
> (wkey
.dsize
- sizeof(uint32_t))) {
81 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
82 "db_id_len=%d\n", (int)wkey
.dsize
, (int)db_id_len
));
85 *p_db_id
= wkey
.dptr
+ sizeof(uint32_t);
86 *p_db_id_len
= db_id_len
;
/* Everything after prefix + db id is the record key. */
87 key
->dptr
= wkey
.dptr
+ sizeof(uint32_t) + db_id_len
;
88 key
->dsize
= wkey
.dsize
- sizeof(uint32_t) - db_id_len
;
/*
 * Register server id `id` as a watcher under key w_key in the watchers
 * database.
 *
 * Visible steps: obtain the watchers db, fetch the record for w_key
 * locked (allocated on talloc_tos()), interpret its value as a packed
 * array of struct server_id, resize that array with talloc_realloc()
 * and write it back with dbwrap_record_store().
 *
 * Status values assigned in the visible code:
 *   map_nt_error_from_unix(errno)     after the db lookup and after the
 *                                     locked fetch
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *   NT_STATUS_NO_MEMORY               after talloc_realloc()
 *
 * NOTE(review): the conditions guarding these assignments, the line
 * that stores `id` into the array, and the cleanup of `frame` are not
 * present in this listing.
 */
92 static NTSTATUS
dbwrap_record_add_watcher(TDB_DATA w_key
, struct server_id id
)
94 struct TALLOC_CTX
*frame
= talloc_stackframe();
95 struct db_context
*db
;
96 struct db_record
*rec
;
98 struct server_id
*ids
;
102 db
= dbwrap_record_watchers_db();
104 status
= map_nt_error_from_unix(errno
);
/* Take the record locked for the read-modify-write below. */
107 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
109 status
= map_nt_error_from_unix(errno
);
112 value
= dbwrap_record_get_value(rec
);
/* The value must be a whole number of struct server_id entries. */
114 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
115 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
119 ids
= (struct server_id
*)value
.dptr
;
120 num_ids
= value
.dsize
/ sizeof(struct server_id
);
/*
 * NOTE(review): ids aliases value.dptr as returned by
 * dbwrap_record_get_value(). talloc_realloc() needs a pointer that was
 * allocated by talloc (or NULL) -- confirm that the record value
 * satisfies this for every dbwrap backend. The new element count is on
 * a line missing from this listing; presumably num_ids + 1.
 */
122 ids
= talloc_realloc(talloc_tos(), ids
, struct server_id
,
125 status
= NT_STATUS_NO_MEMORY
;
/* Write the array back; its byte length is taken from the talloc chunk. */
131 status
= dbwrap_record_store(
132 rec
, make_tdb_data((uint8_t *)ids
, talloc_get_size(ids
)), 0);
/*
 * Remove server id `id` from the watcher list stored under w_key in the
 * watchers database.
 *
 * Visible steps: obtain the watchers db, fetch the record for w_key
 * locked, interpret its value as a packed array of struct server_id,
 * search the array for `id`, and then either delete the record (when
 * no bytes remain) or store the shortened value.
 *
 * Status values assigned in the visible code:
 *   map_nt_error_from_unix(errno)     after the db lookup and after the
 *                                     locked fetch
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *   otherwise the result of dbwrap_record_delete()/dbwrap_record_store()
 *
 * NOTE(review): the conditions guarding the error assignments, any
 * `break` inside the loop, the else branch and the cleanup of `frame`
 * are not present in this listing.
 */
138 static NTSTATUS
dbwrap_record_del_watcher(TDB_DATA w_key
, struct server_id id
)
140 struct TALLOC_CTX
*frame
= talloc_stackframe();
141 struct db_context
*db
;
142 struct db_record
*rec
;
143 struct server_id
*ids
;
148 db
= dbwrap_record_watchers_db();
150 status
= map_nt_error_from_unix(errno
);
/* Take the record locked for the read-modify-write below. */
153 rec
= dbwrap_fetch_locked(db
, talloc_tos(), w_key
);
155 status
= map_nt_error_from_unix(errno
);
158 value
= dbwrap_record_get_value(rec
);
/* The value must be a whole number of struct server_id entries. */
160 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
161 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
165 ids
= (struct server_id
*)value
.dptr
;
166 num_ids
= value
.dsize
/ sizeof(struct server_id
);
/*
 * Unordered removal: a matching slot is overwritten with the last
 * entry and the value is shortened by one entry. The array is modified
 * in place, i.e. directly in the buffer behind value.dptr.
 */
168 for (i
=0; i
<num_ids
; i
++) {
169 if (serverid_equal(&id
, &ids
[i
])) {
170 ids
[i
] = ids
[num_ids
-1];
171 value
.dsize
-= sizeof(struct server_id
);
/* No watchers left: drop the record instead of storing an empty value. */
175 if (value
.dsize
== 0) {
176 status
= dbwrap_record_delete(rec
);
179 status
= dbwrap_record_store(rec
, value
, 0);
/*
 * Look up the watchers registered for record `rec` of database `db`.
 *
 * The watchers key is built on talloc_tos(), the matching record is
 * read from the watchers db with dbwrap_fetch() onto mem_ctx, and its
 * value is handed to the caller as an array of struct server_id:
 *   *p_ids     <- the array, moved to mem_ctx with talloc_move()
 *   *p_num_ids <- value.dsize / sizeof(struct server_id)
 *
 * Status values assigned in the visible code:
 *   NT_STATUS_NO_MEMORY               key construction failed
 *   NT_STATUS_INTERNAL_ERROR          after the watchers db lookup
 *   result of dbwrap_fetch()          when it is not OK
 *   NT_STATUS_INTERNAL_DB_CORRUPTION  value size is not a multiple of
 *                                     sizeof(struct server_id)
 *
 * NOTE(review): the mem_ctx and p_num_ids parameter lines, the jumps to
 * the cleanup code and the return statements are not present in this
 * listing.
 */
186 static NTSTATUS
dbwrap_record_get_watchers(struct db_context
*db
,
187 struct db_record
*rec
,
189 struct server_id
**p_ids
,
192 struct db_context
*w_db
;
193 TDB_DATA key
= { 0, };
194 TDB_DATA value
= { 0, };
195 struct server_id
*ids
;
/* rec_key out parameter is not needed here, hence NULL. */
198 key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
199 if (key
.dptr
== NULL
) {
200 status
= NT_STATUS_NO_MEMORY
;
203 w_db
= dbwrap_record_watchers_db();
205 status
= NT_STATUS_INTERNAL_ERROR
;
208 status
= dbwrap_fetch(w_db
, mem_ctx
, key
, &value
);
/* The key is only needed for the fetch; release it right away. */
209 TALLOC_FREE(key
.dptr
);
210 if (!NT_STATUS_IS_OK(status
)) {
/* The value must be a whole number of struct server_id entries. */
213 if ((value
.dsize
% sizeof(struct server_id
)) != 0) {
214 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
/* Hand the fetched buffer over to the caller as a server_id array. */
217 ids
= (struct server_id
*)value
.dptr
;
218 *p_ids
= talloc_move(mem_ctx
, &ids
);
219 *p_num_ids
= value
.dsize
/ sizeof(struct server_id
);
/*
 * Release key and value buffers. Presumably this is the shared error
 * exit (its label is not visible in this listing) -- confirm.
 */
222 TALLOC_FREE(key
.dptr
);
223 TALLOC_FREE(value
.dptr
);
/*
 * Per-request state of dbwrap_record_watch_send()/_recv().
 *
 * NOTE(review): further members are used elsewhere in this file
 * (state->w_key, state->key) but their declarations are not present in
 * this listing.
 */
227 struct dbwrap_record_watch_state
{
/* Event context; used when (re)issuing msg_read_send(). */
228 struct tevent_context
*ev
;
/* Database of the watched record, taken from dbwrap_record_get_db(). */
229 struct db_context
*db
;
230 struct tevent_req
*req
;
/* Messaging context; supplies the server id registered as watcher. */
231 struct messaging_context
*msg
;
/* Channel on which MSG_DBWRAP_MODIFIED messages are read. */
232 struct msg_channel
*channel
;
/* Forward declarations: completion callback and state destructor. */
237 static void dbwrap_record_watch_done(struct tevent_req
*subreq
);
238 static int dbwrap_record_watch_state_destructor(
239 struct dbwrap_record_watch_state
*state
);
/*
 * Start an asynchronous wait for a change notification on record `rec`.
 *
 * Visible sequence:
 *   1. create the tevent request and its dbwrap_record_watch_state
 *   2. remember the record's database (dbwrap_record_get_db)
 *   3. obtain the watchers db; on NULL fail with the mapped errno
 *   4. build the watchers key (state->w_key), allocated on state
 *   5. open a message channel for MSG_DBWRAP_MODIFIED
 *   6. register this process (messaging_server_id(state->msg)) as a
 *      watcher of w_key
 *   7. install the state destructor
 *   8. start reading from the channel; dbwrap_record_watch_done() is
 *      the completion callback
 *
 * Every visible failure path finishes the request and returns it via
 * tevent_req_post(req, ev).
 *
 * NOTE(review): the initialisation of state->ev/state->msg, several
 * argument lines and the final return are not present in this listing.
 */
241 struct tevent_req
*dbwrap_record_watch_send(TALLOC_CTX
*mem_ctx
,
242 struct tevent_context
*ev
,
243 struct db_record
*rec
,
244 struct messaging_context
*msg
)
246 struct tevent_req
*req
, *subreq
;
247 struct dbwrap_record_watch_state
*state
;
248 struct db_context
*watchers_db
;
252 req
= tevent_req_create(mem_ctx
, &state
,
253 struct dbwrap_record_watch_state
);
257 state
->db
= dbwrap_record_get_db(rec
);
262 watchers_db
= dbwrap_record_watchers_db();
263 if (watchers_db
== NULL
) {
264 tevent_req_nterror(req
, map_nt_error_from_unix(errno
));
265 return tevent_req_post(req
, ev
);
/* The key is allocated on state, so it is released together with it. */
268 state
->w_key
= dbwrap_record_watchers_key(state
, state
->db
, rec
,
270 if (tevent_req_nomem(state
->w_key
.dptr
, req
)) {
271 return tevent_req_post(req
, ev
);
274 ret
= msg_channel_init(state
, state
->msg
, MSG_DBWRAP_MODIFIED
,
/* ret is passed to map_nt_error_from_unix(), i.e. treated as an errno value. */
277 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
278 return tevent_req_post(req
, ev
);
281 status
= dbwrap_record_add_watcher(
282 state
->w_key
, messaging_server_id(state
->msg
));
283 if (tevent_req_nterror(req
, status
)) {
284 return tevent_req_post(req
, ev
);
/*
 * Installed only after the watcher was added successfully; the
 * destructor calls dbwrap_record_del_watcher() for the same key and
 * server id.
 */
286 talloc_set_destructor(state
, dbwrap_record_watch_state_destructor
);
288 subreq
= msg_read_send(state
, state
->ev
, state
->channel
);
289 if (tevent_req_nomem(subreq
, req
)) {
290 return tevent_req_post(req
, ev
);
292 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
/*
 * talloc destructor for dbwrap_record_watch_state (installed by
 * dbwrap_record_watch_send()).
 *
 * If a messaging context is set, this process is removed again from the
 * watcher list for s->w_key. The NTSTATUS returned by
 * dbwrap_record_del_watcher() is not examined in the visible code.
 *
 * NOTE(review): the return statement is not present in this listing.
 */
296 static int dbwrap_record_watch_state_destructor(
297 struct dbwrap_record_watch_state
*s
)
299 if (s
->msg
!= NULL
) {
300 dbwrap_record_del_watcher(
301 s
->w_key
, messaging_server_id(s
->msg
));
/*
 * "Record stored" callback, registered through dbwrap_watch_db().
 *
 * private_data must be the struct messaging_context that was passed at
 * registration (enforced by talloc_get_type_abort()).
 *
 * Visible sequence: look up the watchers of `rec`, build the watchers
 * key for it, and send every watcher a MSG_DBWRAP_MODIFIED message
 * whose payload is that key. Send failures are logged at debug level 1
 * and the loop continues with the next watcher.
 *
 * NOTE(review): remaining parameters, local declarations, the early
 * exits and the payload argument of messaging_send() are not present in
 * this listing.
 */
306 static void dbwrap_watch_record_stored(struct db_context
*db
,
307 struct db_record
*rec
,
310 struct messaging_context
*msg
= talloc_get_type_abort(
311 private_data
, struct messaging_context
);
312 struct server_id
*ids
= NULL
;
314 TDB_DATA w_key
= { 0, };
319 status
= dbwrap_record_get_watchers(db
, rec
, talloc_tos(),
/*
 * NT_STATUS_NOT_FOUND is checked separately from other errors;
 * presumably it means "nobody is watching" and is not logged -- the
 * branch body is not visible here.
 */
321 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
324 if (!NT_STATUS_IS_OK(status
)) {
325 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
/* The watchers key doubles as the message payload. */
329 w_key
= dbwrap_record_watchers_key(talloc_tos(), db
, rec
, NULL
);
330 if (w_key
.dptr
== NULL
) {
331 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
334 w_blob
.data
= w_key
.dptr
;
335 w_blob
.length
= w_key
.dsize
;
/* Notify each registered watcher. */
337 for (i
=0; i
<num_ids
; i
++) {
338 status
= messaging_send(msg
, ids
[i
], MSG_DBWRAP_MODIFIED
,
340 if (!NT_STATUS_IS_OK(status
)) {
341 char *str
= procid_str_static(&ids
[i
]);
342 DEBUG(1, ("messaging_send to %s failed: %s\n",
343 str
, nt_errstr(status
)));
348 TALLOC_FREE(w_key
.dptr
);
/*
 * Enable change notifications for `db`: installs
 * dbwrap_watch_record_stored() as the database's stored-callback, with
 * `msg` as its private data.
 */
353 void dbwrap_watch_db(struct db_context
*db
, struct messaging_context
*msg
)
355 dbwrap_set_stored_callback(db
, dbwrap_watch_record_stored
, msg
);
/*
 * Completion callback for the msg_read_send() sub-request issued by
 * dbwrap_record_watch_send() (and re-issued below).
 *
 * A received message completes the watch request only if its payload is
 * byte-identical to state->w_key. Any other message is ignored and a
 * new read is started on the same channel.
 *
 * NOTE(review): the condition around the msg_read_recv() error path and
 * the return statements are not present in this listing.
 */
358 static void dbwrap_record_watch_done(struct tevent_req
*subreq
)
360 struct tevent_req
*req
= tevent_req_callback_data(
361 subreq
, struct tevent_req
);
362 struct dbwrap_record_watch_state
*state
= tevent_req_data(
363 req
, struct dbwrap_record_watch_state
);
364 struct messaging_rec
*rec
;
367 ret
= msg_read_recv(subreq
, talloc_tos(), &rec
);
/* ret is passed to map_nt_error_from_unix(), i.e. treated as an errno value. */
370 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
/* Same length and same bytes as our watchers key: this one is for us. */
374 if ((rec
->buf
.length
== state
->w_key
.dsize
) &&
375 (memcmp(rec
->buf
.data
, state
->w_key
.dptr
, rec
->buf
.length
) == 0)) {
376 tevent_req_done(req
);
381 * Not our record, wait for the next one
383 subreq
= msg_read_send(state
, state
->ev
, state
->channel
);
384 if (tevent_req_nomem(subreq
, req
)) {
387 tevent_req_set_callback(subreq
, dbwrap_record_watch_done
, req
);
/*
 * Collect the result of a request started with
 * dbwrap_record_watch_send().
 *
 * If the request finished with an error, that status is picked up via
 * tevent_req_is_nterror(). Otherwise the watched record is fetched
 * again, locked, from state->db using state->key and allocated on
 * mem_ctx.
 *
 * NT_STATUS_INTERNAL_DB_ERROR is returned on one visible path,
 * presumably when dbwrap_fetch_locked() yields NULL -- the guarding
 * condition is not visible here.
 *
 * NOTE(review): the mem_ctx parameter line, the assignment to *prec and
 * the success return are not present in this listing.
 */
390 NTSTATUS
dbwrap_record_watch_recv(struct tevent_req
*req
,
392 struct db_record
**prec
)
394 struct dbwrap_record_watch_state
*state
= tevent_req_data(
395 req
, struct dbwrap_record_watch_state
);
397 struct db_record
*rec
;
399 if (tevent_req_is_nterror(req
, &status
)) {
/* Re-acquire the record under lock now that a change was signalled. */
402 rec
= dbwrap_fetch_locked(state
->db
, mem_ctx
, state
->key
);
404 return NT_STATUS_INTERNAL_DB_ERROR
;
/*
 * Context handed through dbwrap_traverse_read() to
 * dbwrap_watchers_traverse_read_callback().
 *
 * NOTE(review): the tail of the fn signature and the private_data
 * member (used as state->private_data elsewhere in this file) are not
 * present in this listing.
 */
410 struct dbwrap_watchers_traverse_read_state
{
/*
 * User callback, invoked per watchers record with the parsed db id, the
 * original record key and the array of watching server ids.
 */
411 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
412 const struct server_id
*watchers
, size_t num_watchers
,
/*
 * Per-record adapter used by dbwrap_watchers_traverse_read().
 *
 * Takes one record of the watchers database, splits its key with
 * dbwrap_record_watchers_key_parse(), checks that the value is a whole
 * number of struct server_id entries, and forwards the decoded pieces
 * to the user callback state->fn together with state->private_data.
 *
 * NOTE(review): what is returned when parsing fails or the value size
 * is invalid, and how `res` is used, is not visible in this listing.
 */
417 static int dbwrap_watchers_traverse_read_callback(
418 struct db_record
*rec
, void *private_data
)
420 struct dbwrap_watchers_traverse_read_state
*state
=
421 (struct dbwrap_watchers_traverse_read_state
*)private_data
;
424 TDB_DATA w_key
, key
, w_data
;
427 w_key
= dbwrap_record_get_key(rec
);
428 w_data
= dbwrap_record_get_value(rec
);
/* Decode [len][db_id][record key] from the watchers key. */
430 if (!dbwrap_record_watchers_key_parse(w_key
, &db_id
, &db_id_len
,
/* The value must be a whole number of struct server_id entries. */
434 if ((w_data
.dsize
% sizeof(struct server_id
)) != 0) {
437 res
= state
->fn(db_id
, db_id_len
, key
,
438 (struct server_id
*)w_data
.dptr
,
439 w_data
.dsize
/ sizeof(struct server_id
),
440 state
->private_data
);
/*
 * Walk all entries of the watchers database read-only and call `fn`
 * for each of them (through dbwrap_watchers_traverse_read_callback()).
 *
 * fn receives the db id, the original record key, the list of watching
 * server ids and the caller's private_data.
 *
 * NOTE(review): the private_data parameter line, the handling of a NULL
 * watchers db and the assignment of state.fn are not present in this
 * listing.
 */
444 void dbwrap_watchers_traverse_read(
445 int (*fn
)(const uint8_t *db_id
, size_t db_id_len
, const TDB_DATA key
,
446 const struct server_id
*watchers
, size_t num_watchers
,
450 struct dbwrap_watchers_traverse_read_state state
;
451 struct db_context
*db
;
453 db
= dbwrap_record_watchers_db();
458 state
.private_data
= private_data
;
459 dbwrap_traverse_read(db
, dbwrap_watchers_traverse_read_callback
,
/*
 * Traverse callback used by dbwrap_watchers_wakeall(): sends a
 * MSG_DBWRAP_MODIFIED message to every watcher of one watchers record.
 *
 * private_data must be a struct messaging_context (enforced by
 * talloc_get_type_abort()). The result of messaging_send() is not
 * examined.
 *
 * NOTE(review): the payload is built from `key`, whereas
 * dbwrap_record_watch_done() compares incoming payloads against
 * state->w_key (the key produced by dbwrap_record_watchers_key()).
 * Confirm which key form receivers are meant to see here.
 *
 * NOTE(review): several parameter lines, local declarations and the
 * return statement are not present in this listing.
 */
463 static int dbwrap_wakeall_cb(const uint8_t *db_id
, size_t db_id_len
,
465 const struct server_id
*watchers
,
469 struct messaging_context
*msg
= talloc_get_type_abort(
470 private_data
, struct messaging_context
);
/* Message payload: the key bytes, referenced rather than copied. */
474 blob
.data
= key
.dptr
;
475 blob
.length
= key
.dsize
;
477 for (i
=0; i
<num_watchers
; i
++) {
478 messaging_send(msg
, watchers
[i
], MSG_DBWRAP_MODIFIED
, &blob
);
/*
 * Notify every watcher recorded in the watchers database: traverses the
 * database with dbwrap_wakeall_cb(), which sends MSG_DBWRAP_MODIFIED to
 * each registered server id using messaging context `msg`.
 */
483 void dbwrap_watchers_wakeall(struct messaging_context
*msg
)
485 dbwrap_watchers_traverse_read(dbwrap_wakeall_cb
, msg
);