s4:dsdb/repl: convert dreplsrv_out_drsuapi_send/recv to tevent_req
[Samba/fernandojvsilva.git] / source4 / dsdb / repl / drepl_notify.c
blob2f0fa4892b175d7c0aaedb47f884635c0ed2571d
1 /*
2 Unix SMB/CIFS mplementation.
4 DSDB replication service periodic notification handling
6 Copyright (C) Andrew Tridgell 2009
7 based on drepl_periodic
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "dsdb/samdb/samdb.h"
27 #include "auth/auth.h"
28 #include "smbd/service.h"
29 #include "lib/messaging/irpc.h"
30 #include "dsdb/repl/drepl_service.h"
31 #include "lib/ldb/include/ldb_errors.h"
32 #include "../lib/util/dlinklist.h"
33 #include "librpc/gen_ndr/ndr_misc.h"
34 #include "librpc/gen_ndr/ndr_drsuapi.h"
35 #include "librpc/gen_ndr/ndr_drsblobs.h"
36 #include "libcli/composite/composite.h"
39 struct dreplsrv_op_notify_state {
40 struct composite_context *creq;
42 struct dreplsrv_out_connection *conn;
44 struct dreplsrv_drsuapi_connection *drsuapi;
46 struct drsuapi_DsBindInfoCtr bind_info_ctr;
47 struct drsuapi_DsBind bind_r;
48 struct dreplsrv_notify_operation *op;
52 receive a DsReplicaSync reply
54 static void dreplsrv_op_notify_replica_sync_recv(struct rpc_request *req)
56 struct dreplsrv_op_notify_state *st = talloc_get_type(req->async.private_data,
57 struct dreplsrv_op_notify_state);
58 struct composite_context *c = st->creq;
59 struct drsuapi_DsReplicaSync *r = talloc_get_type(req->ndr.struct_ptr,
60 struct drsuapi_DsReplicaSync);
62 c->status = dcerpc_ndr_request_recv(req);
63 if (!composite_is_ok(c)) return;
65 if (!W_ERROR_IS_OK(r->out.result)) {
66 composite_error(c, werror_to_ntstatus(r->out.result));
67 return;
70 composite_done(c);
74 send a DsReplicaSync
76 static void dreplsrv_op_notify_replica_sync_send(struct dreplsrv_op_notify_state *st)
78 struct composite_context *c = st->creq;
79 struct dreplsrv_partition *partition = st->op->source_dsa->partition;
80 struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
81 struct rpc_request *req;
82 struct drsuapi_DsReplicaSync *r;
84 r = talloc_zero(st, struct drsuapi_DsReplicaSync);
85 if (composite_nomem(r, c)) return;
87 r->in.bind_handle = &drsuapi->bind_handle;
88 r->in.level = 1;
89 r->in.req.req1.naming_context = &partition->nc;
90 r->in.req.req1.source_dsa_guid = st->op->service->ntds_guid;
91 r->in.req.req1.options =
92 DRSUAPI_DS_REPLICA_SYNC_ASYNCHRONOUS_OPERATION |
93 DRSUAPI_DS_REPLICA_SYNC_WRITEABLE |
94 DRSUAPI_DS_REPLICA_SYNC_ALL_SOURCES;
97 req = dcerpc_drsuapi_DsReplicaSync_send(drsuapi->pipe, r, r);
98 composite_continue_rpc(c, req, dreplsrv_op_notify_replica_sync_recv, st);
102 called when we have an established connection
104 static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq)
106 struct dreplsrv_op_notify_state *st = tevent_req_callback_data(subreq,
107 struct dreplsrv_op_notify_state);
108 struct composite_context *c = st->creq;
110 c->status = dreplsrv_out_drsuapi_recv(subreq);
111 TALLOC_FREE(subreq);
112 if (!composite_is_ok(c)) return;
114 dreplsrv_op_notify_replica_sync_send(st);
118 start the ReplicaSync async call
120 static struct composite_context *dreplsrv_op_notify_send(struct dreplsrv_notify_operation *op)
122 struct composite_context *c;
123 struct dreplsrv_op_notify_state *st;
124 struct tevent_req *subreq;
126 c = composite_create(op, op->service->task->event_ctx);
127 if (c == NULL) return NULL;
129 st = talloc_zero(c, struct dreplsrv_op_notify_state);
130 if (composite_nomem(st, c)) return c;
132 st->creq = c;
133 st->op = op;
135 subreq = dreplsrv_out_drsuapi_send(st,
136 op->service->task->event_ctx,
137 op->source_dsa->conn);
138 if (composite_nomem(subreq, c)) return c;
139 tevent_req_set_callback(subreq, dreplsrv_op_notify_connect_done, st);
141 return c;
144 static void dreplsrv_notify_del_repsTo(struct dreplsrv_notify_operation *op)
146 uint32_t count;
147 struct repsFromToBlob *reps;
148 WERROR werr;
149 struct dreplsrv_service *s = op->service;
150 int i;
152 werr = dsdb_loadreps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", &reps, &count);
153 if (!W_ERROR_IS_OK(werr)) {
154 DEBUG(0,(__location__ ": Failed to load repsTo for %s\n",
155 ldb_dn_get_linearized(op->source_dsa->partition->dn)));
156 return;
159 for (i=0; i<count; i++) {
160 if (GUID_compare(&reps[i].ctr.ctr1.source_dsa_obj_guid,
161 &op->source_dsa->repsFrom1->source_dsa_obj_guid) == 0) {
162 memmove(&reps[i], &reps[i+1],
163 sizeof(reps[i])*(count-(i+1)));
164 count--;
168 werr = dsdb_savereps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", reps, count);
169 if (!W_ERROR_IS_OK(werr)) {
170 DEBUG(0,(__location__ ": Failed to save repsTo for %s\n",
171 ldb_dn_get_linearized(op->source_dsa->partition->dn)));
172 return;
177 called when a notify operation has completed
179 static void dreplsrv_notify_op_callback(struct dreplsrv_notify_operation *op)
181 NTSTATUS status;
182 struct dreplsrv_service *s = op->service;
184 status = composite_wait(op->creq);
185 if (!NT_STATUS_IS_OK(status)) {
186 DEBUG(0,("dreplsrv_notify: Failed to send DsReplicaSync to %s for %s - %s\n",
187 op->source_dsa->repsFrom1->other_info->dns_name,
188 ldb_dn_get_linearized(op->source_dsa->partition->dn),
189 nt_errstr(status)));
190 } else {
191 DEBUG(2,("dreplsrv_notify: DsReplicaSync OK for %s\n",
192 op->source_dsa->repsFrom1->other_info->dns_name));
193 op->source_dsa->notify_uSN = op->uSN;
194 /* delete the repsTo for this replication partner in the
195 partition, as we have successfully told him to sync */
196 dreplsrv_notify_del_repsTo(op);
198 talloc_free(op->creq);
200 talloc_free(op);
201 s->ops.n_current = NULL;
202 dreplsrv_notify_run_ops(s);
206 static void dreplsrv_notify_op_callback_creq(struct composite_context *creq)
208 struct dreplsrv_notify_operation *op = talloc_get_type(creq->async.private_data,
209 struct dreplsrv_notify_operation);
210 dreplsrv_notify_op_callback(op);
214 run any pending replica sync calls
216 void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
218 struct dreplsrv_notify_operation *op;
220 if (s->ops.n_current || s->ops.current) {
221 /* if there's still one running, we're done */
222 return;
225 if (!s->ops.notifies) {
226 /* if there're no pending operations, we're done */
227 return;
230 op = s->ops.notifies;
231 s->ops.n_current = op;
232 DLIST_REMOVE(s->ops.notifies, op);
234 op->creq = dreplsrv_op_notify_send(op);
235 if (!op->creq) {
236 dreplsrv_notify_op_callback(op);
237 return;
240 op->creq->async.fn = dreplsrv_notify_op_callback_creq;
241 op->creq->async.private_data = op;
246 find a source_dsa for a given guid
248 static struct dreplsrv_partition_source_dsa *dreplsrv_find_source_dsa(struct dreplsrv_partition *p,
249 struct GUID *guid)
251 struct dreplsrv_partition_source_dsa *s;
253 for (s=p->sources; s; s=s->next) {
254 if (GUID_compare(&s->repsFrom1->source_dsa_obj_guid, guid) == 0) {
255 return s;
258 return NULL;
263 schedule a replicaSync message
265 static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
266 struct dreplsrv_partition *p,
267 struct repsFromToBlob *reps,
268 TALLOC_CTX *mem_ctx,
269 uint64_t uSN)
271 struct dreplsrv_notify_operation *op;
272 struct dreplsrv_partition_source_dsa *s;
274 s = dreplsrv_find_source_dsa(p, &reps->ctr.ctr1.source_dsa_obj_guid);
275 if (s == NULL) {
276 DEBUG(0,(__location__ ": Unable to find source_dsa for %s\n",
277 GUID_string(mem_ctx, &reps->ctr.ctr1.source_dsa_obj_guid)));
278 return WERR_DS_UNAVAILABLE;
281 op = talloc_zero(mem_ctx, struct dreplsrv_notify_operation);
282 W_ERROR_HAVE_NO_MEMORY(op);
284 op->service = service;
285 op->source_dsa = s;
286 op->uSN = uSN;
288 DLIST_ADD_END(service->ops.notifies, op, struct dreplsrv_notify_operation *);
289 talloc_steal(service, op);
290 return WERR_OK;
294 see if a partition has a hugher uSN than what is in the repsTo and
295 if so then send a DsReplicaSync
297 static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
298 struct dreplsrv_partition *p,
299 TALLOC_CTX *mem_ctx)
301 uint32_t count=0;
302 struct repsFromToBlob *reps;
303 WERROR werr;
304 uint64_t uSN;
305 int ret, i;
307 werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count);
308 if (count == 0) {
309 werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsFrom", &reps, &count);
311 if (!W_ERROR_IS_OK(werr)) {
312 DEBUG(0,(__location__ ": Failed to load repsTo for %s\n",
313 ldb_dn_get_linearized(p->dn)));
314 return werr;
317 /* loads the partition uSNHighest */
318 ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSN);
319 if (ret != LDB_SUCCESS || uSN == 0) {
320 /* nothing to do */
321 return WERR_OK;
324 /* see if any of our partners need some of our objects */
325 for (i=0; i<count; i++) {
326 struct dreplsrv_partition_source_dsa *sdsa;
327 sdsa = dreplsrv_find_source_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid);
328 if (sdsa == NULL) continue;
329 if (sdsa->notify_uSN < uSN) {
330 /* we need to tell this partner to replicate
331 with us */
332 werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx, uSN);
333 if (!W_ERROR_IS_OK(werr)) {
334 DEBUG(0,(__location__ ": Failed to setup notify to %s for %s\n",
335 reps[i].ctr.ctr1.other_info->dns_name,
336 ldb_dn_get_linearized(p->dn)));
337 return werr;
342 return WERR_OK;
346 see if any of the partitions have changed, and if so then send a
347 DsReplicaSync to all the replica partners in the repsTo object
349 static WERROR dreplsrv_notify_check_all(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
351 WERROR status;
352 struct dreplsrv_partition *p;
354 for (p = s->partitions; p; p = p->next) {
355 status = dreplsrv_notify_check(s, p, mem_ctx);
356 W_ERROR_NOT_OK_RETURN(status);
359 return WERR_OK;
362 static void dreplsrv_notify_run(struct dreplsrv_service *service);
364 static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_timer *te,
365 struct timeval t, void *ptr)
367 struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
368 WERROR status;
370 service->notify.te = NULL;
372 dreplsrv_notify_run(service);
374 status = dreplsrv_notify_schedule(service, service->notify.interval);
375 if (!W_ERROR_IS_OK(status)) {
376 task_server_terminate(service->task, win_errstr(status), false);
377 return;
381 WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_interval)
383 TALLOC_CTX *tmp_mem;
384 struct tevent_timer *new_te;
385 struct timeval next_time;
387 /* prevent looping */
388 if (next_interval == 0) next_interval = 1;
390 next_time = timeval_current_ofs(next_interval, 50);
392 if (service->notify.te) {
394 * if the timestamp of the new event is higher,
395 * as current next we don't need to reschedule
397 if (timeval_compare(&next_time, &service->notify.next_event) > 0) {
398 return WERR_OK;
402 /* reset the next scheduled timestamp */
403 service->notify.next_event = next_time;
405 new_te = event_add_timed(service->task->event_ctx, service,
406 service->notify.next_event,
407 dreplsrv_notify_handler_te, service);
408 W_ERROR_HAVE_NO_MEMORY(new_te);
410 tmp_mem = talloc_new(service);
411 DEBUG(4,("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
412 next_interval,
413 (service->notify.te?"re":""),
414 nt_time_string(tmp_mem, timeval_to_nttime(&next_time))));
415 talloc_free(tmp_mem);
417 talloc_free(service->notify.te);
418 service->notify.te = new_te;
420 return WERR_OK;
423 static void dreplsrv_notify_run(struct dreplsrv_service *service)
425 TALLOC_CTX *mem_ctx;
427 mem_ctx = talloc_new(service);
428 dreplsrv_notify_check_all(service, mem_ctx);
429 talloc_free(mem_ctx);
431 dreplsrv_run_pending_ops(service);
432 dreplsrv_notify_run_ops(service);