s4: torture: Add an async SMB2_OP_FLUSH + SMB2_OP_CLOSE test to smb2.compound_async.
[Samba.git] / source4 / dsdb / repl / drepl_notify.c
blob20be3b53dd0e40be4a5662bee0804966b49075fe
1 /*
2 Unix SMB/CIFS Implementation.
4 DSDB replication service periodic notification handling
6 Copyright (C) Andrew Tridgell 2009
7 based on drepl_periodic
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "dsdb/samdb/samdb.h"
27 #include "auth/auth.h"
28 #include "samba/service.h"
29 #include "dsdb/repl/drepl_service.h"
30 #include <ldb_errors.h>
31 #include "../lib/util/dlinklist.h"
32 #include "librpc/gen_ndr/ndr_misc.h"
33 #include "librpc/gen_ndr/ndr_drsuapi.h"
34 #include "librpc/gen_ndr/ndr_drsblobs.h"
35 #include "libcli/composite/composite.h"
36 #include "../lib/util/tevent_ntstatus.h"
38 #undef DBGC_CLASS
39 #define DBGC_CLASS DBGC_DRS_REPL
/*
  per-request state of one outgoing DsReplicaSync notification
 */
struct dreplsrv_op_notify_state {
	/* event context the request and its sub-requests run on */
	struct tevent_context *ev;

	/* the notify operation this request carries out */
	struct dreplsrv_notify_operation *op;

	/*
	 * the struct drsuapi_DsReplicaSync of the call in flight;
	 * set when the call is sent, reset to NULL when it completes
	 */
	void *ndr_struct_ptr;
};
48 static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq);
51 start the ReplicaSync async call
53 static struct tevent_req *dreplsrv_op_notify_send(TALLOC_CTX *mem_ctx,
54 struct tevent_context *ev,
55 struct dreplsrv_notify_operation *op)
57 struct tevent_req *req;
58 struct dreplsrv_op_notify_state *state;
59 struct tevent_req *subreq;
61 req = tevent_req_create(mem_ctx, &state,
62 struct dreplsrv_op_notify_state);
63 if (req == NULL) {
64 return NULL;
66 state->ev = ev;
67 state->op = op;
69 subreq = dreplsrv_out_drsuapi_send(state,
70 ev,
71 op->source_dsa->conn);
72 if (tevent_req_nomem(subreq, req)) {
73 return tevent_req_post(req, ev);
75 tevent_req_set_callback(subreq, dreplsrv_op_notify_connect_done, req);
77 return req;
80 static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req);
82 static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq)
84 struct tevent_req *req = tevent_req_callback_data(subreq,
85 struct tevent_req);
86 NTSTATUS status;
88 status = dreplsrv_out_drsuapi_recv(subreq);
89 TALLOC_FREE(subreq);
90 if (tevent_req_nterror(req, status)) {
91 return;
94 dreplsrv_op_notify_replica_sync_trigger(req);
97 static void dreplsrv_op_notify_replica_sync_done(struct tevent_req *subreq);
99 static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req)
101 struct dreplsrv_op_notify_state *state =
102 tevent_req_data(req,
103 struct dreplsrv_op_notify_state);
104 struct dreplsrv_partition *partition = state->op->source_dsa->partition;
105 struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
106 struct drsuapi_DsReplicaSync *r;
107 struct tevent_req *subreq;
109 r = talloc_zero(state, struct drsuapi_DsReplicaSync);
110 if (tevent_req_nomem(r, req)) {
111 return;
113 r->in.req = talloc_zero(r, union drsuapi_DsReplicaSyncRequest);
114 if (tevent_req_nomem(r, req)) {
115 return;
117 r->in.bind_handle = &drsuapi->bind_handle;
118 r->in.level = 1;
119 r->in.req->req1.naming_context = &partition->nc;
120 r->in.req->req1.source_dsa_guid = state->op->service->ntds_guid;
121 r->in.req->req1.options =
122 DRSUAPI_DRS_ASYNC_OP |
123 DRSUAPI_DRS_UPDATE_NOTIFICATION |
124 DRSUAPI_DRS_WRIT_REP;
126 if (state->op->is_urgent) {
127 r->in.req->req1.options |= DRSUAPI_DRS_SYNC_URGENT;
130 state->ndr_struct_ptr = r;
132 if (DEBUGLVL(10)) {
133 NDR_PRINT_IN_DEBUG(drsuapi_DsReplicaSync, r);
136 subreq = dcerpc_drsuapi_DsReplicaSync_r_send(state,
137 state->ev,
138 drsuapi->drsuapi_handle,
140 if (tevent_req_nomem(subreq, req)) {
141 return;
143 tevent_req_set_callback(subreq, dreplsrv_op_notify_replica_sync_done, req);
146 static void dreplsrv_op_notify_replica_sync_done(struct tevent_req *subreq)
148 struct tevent_req *req =
149 tevent_req_callback_data(subreq,
150 struct tevent_req);
151 struct dreplsrv_op_notify_state *state =
152 tevent_req_data(req,
153 struct dreplsrv_op_notify_state);
154 struct drsuapi_DsReplicaSync *r = talloc_get_type(state->ndr_struct_ptr,
155 struct drsuapi_DsReplicaSync);
156 NTSTATUS status;
158 state->ndr_struct_ptr = NULL;
160 status = dcerpc_drsuapi_DsReplicaSync_r_recv(subreq, r);
161 TALLOC_FREE(subreq);
162 if (tevent_req_nterror(req, status)) {
163 return;
166 if (!W_ERROR_IS_OK(r->out.result)) {
167 status = werror_to_ntstatus(r->out.result);
168 tevent_req_nterror(req, status);
169 return;
172 tevent_req_done(req);
175 static NTSTATUS dreplsrv_op_notify_recv(struct tevent_req *req)
177 return tevent_req_simple_recv_ntstatus(req);
181 called when a notify operation has completed
183 static void dreplsrv_notify_op_callback(struct tevent_req *subreq)
185 struct dreplsrv_notify_operation *op =
186 tevent_req_callback_data(subreq,
187 struct dreplsrv_notify_operation);
188 NTSTATUS status;
189 struct dreplsrv_service *s = op->service;
190 WERROR werr;
192 status = dreplsrv_op_notify_recv(subreq);
193 werr = ntstatus_to_werror(status);
194 TALLOC_FREE(subreq);
195 if (!NT_STATUS_IS_OK(status)) {
196 DBG_INFO("dreplsrv_notify: Failed to send DsReplicaSync to %s for %s - %s : %s\n",
197 op->source_dsa->repsFrom1->other_info->dns_name,
198 ldb_dn_get_linearized(op->source_dsa->partition->dn),
199 nt_errstr(status), win_errstr(werr));
200 } else {
201 DBG_INFO("dreplsrv_notify: DsReplicaSync successfully sent to %s\n",
202 op->source_dsa->repsFrom1->other_info->dns_name);
203 op->source_dsa->notify_uSN = op->uSN;
206 drepl_reps_update(s, "repsTo", op->source_dsa->partition->dn,
207 &op->source_dsa->repsFrom1->source_dsa_obj_guid,
208 werr);
210 talloc_free(op);
211 s->ops.n_current = NULL;
212 dreplsrv_run_pending_ops(s);
216 run any pending replica sync calls
218 void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
220 struct dreplsrv_notify_operation *op;
221 struct tevent_req *subreq;
223 if (s->ops.n_current || s->ops.current) {
224 /* if there's still one running, we're done */
225 return;
228 if (!s->ops.notifies) {
229 /* if there're no pending operations, we're done */
230 return;
233 op = s->ops.notifies;
234 s->ops.n_current = op;
235 DLIST_REMOVE(s->ops.notifies, op);
237 subreq = dreplsrv_op_notify_send(op, s->task->event_ctx, op);
238 if (!subreq) {
239 DBG_ERR("dreplsrv_notify_run_ops: dreplsrv_op_notify_send[%s][%s] - no memory\n",
240 op->source_dsa->repsFrom1->other_info->dns_name,
241 ldb_dn_get_linearized(op->source_dsa->partition->dn));
242 return;
244 tevent_req_set_callback(subreq, dreplsrv_notify_op_callback, op);
245 DBG_INFO("started DsReplicaSync for %s to %s\n",
246 ldb_dn_get_linearized(op->source_dsa->partition->dn),
247 op->source_dsa->repsFrom1->other_info->dns_name);
252 find a source_dsa for a given guid
254 static struct dreplsrv_partition_source_dsa *dreplsrv_find_notify_dsa(struct dreplsrv_partition *p,
255 struct GUID *guid)
257 struct dreplsrv_partition_source_dsa *s;
259 /* first check the sources list */
260 for (s=p->sources; s; s=s->next) {
261 if (GUID_equal(&s->repsFrom1->source_dsa_obj_guid, guid)) {
262 return s;
266 /* then the notifies list */
267 for (s=p->notifies; s; s=s->next) {
268 if (GUID_equal(&s->repsFrom1->source_dsa_obj_guid, guid)) {
269 return s;
272 return NULL;
277 schedule a replicaSync message
279 static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
280 struct dreplsrv_partition *p,
281 struct repsFromToBlob *reps,
282 TALLOC_CTX *mem_ctx,
283 uint64_t uSN,
284 bool is_urgent,
285 uint32_t replica_flags)
287 struct dreplsrv_notify_operation *op;
288 struct dreplsrv_partition_source_dsa *s;
290 s = dreplsrv_find_notify_dsa(p, &reps->ctr.ctr1.source_dsa_obj_guid);
291 if (s == NULL) {
292 DBG_ERR("Unable to find source_dsa for %s\n",
293 GUID_string(mem_ctx, &reps->ctr.ctr1.source_dsa_obj_guid));
294 return WERR_DS_UNAVAILABLE;
297 /* first try to find an existing notify operation */
298 for (op = service->ops.notifies; op; op = op->next) {
299 if (op->source_dsa != s) {
300 continue;
303 if (op->is_urgent != is_urgent) {
304 continue;
307 if (op->replica_flags != replica_flags) {
308 continue;
311 if (op->uSN < uSN) {
312 op->uSN = uSN;
315 /* reuse the notify operation, as it's not yet started */
316 return WERR_OK;
319 op = talloc_zero(mem_ctx, struct dreplsrv_notify_operation);
320 W_ERROR_HAVE_NO_MEMORY(op);
322 op->service = service;
323 op->source_dsa = s;
324 op->uSN = uSN;
325 op->is_urgent = is_urgent;
326 op->replica_flags = replica_flags;
327 op->schedule_time = time(NULL);
329 DLIST_ADD_END(service->ops.notifies, op);
330 talloc_steal(service, op);
331 return WERR_OK;
335 see if a partition has a hugher uSN than what is in the repsTo and
336 if so then send a DsReplicaSync
338 static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
339 struct dreplsrv_partition *p,
340 TALLOC_CTX *mem_ctx)
342 uint32_t count=0;
343 struct repsFromToBlob *reps;
344 WERROR werr;
345 uint64_t uSNHighest;
346 uint64_t uSNUrgent;
347 uint32_t i;
348 int ret;
350 werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count);
351 if (!W_ERROR_IS_OK(werr)) {
352 DBG_ERR("Failed to load repsTo for %s\n",
353 ldb_dn_get_linearized(p->dn));
354 return werr;
357 /* loads the partition uSNHighest and uSNUrgent */
358 ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSNHighest, &uSNUrgent);
359 if (ret != LDB_SUCCESS || uSNHighest == 0) {
360 /* nothing to do */
361 return WERR_OK;
364 /* see if any of our partners need some of our objects */
365 for (i=0; i<count; i++) {
366 struct dreplsrv_partition_source_dsa *sdsa;
367 uint32_t replica_flags;
368 sdsa = dreplsrv_find_notify_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid);
369 replica_flags = reps[i].ctr.ctr1.replica_flags;
370 if (sdsa == NULL) continue;
371 if (sdsa->notify_uSN < uSNHighest) {
372 /* we need to tell this partner to replicate
373 with us */
374 bool is_urgent = sdsa->notify_uSN < uSNUrgent;
376 /* check if urgent replication is needed */
377 werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx,
378 uSNHighest, is_urgent, replica_flags);
379 if (!W_ERROR_IS_OK(werr)) {
380 DBG_ERR("Failed to setup notify to %s for %s\n",
381 reps[i].ctr.ctr1.other_info->dns_name,
382 ldb_dn_get_linearized(p->dn));
383 return werr;
385 DBG_DEBUG("queued DsReplicaSync for %s to %s "
386 "(urgent=%s) uSN=%llu:%llu\n",
387 ldb_dn_get_linearized(p->dn),
388 reps[i].ctr.ctr1.other_info->dns_name,
389 is_urgent?"true":"false",
390 (unsigned long long)sdsa->notify_uSN,
391 (unsigned long long)uSNHighest);
395 return WERR_OK;
399 see if any of the partitions have changed, and if so then send a
400 DsReplicaSync to all the replica partners in the repsTo object
402 static WERROR dreplsrv_notify_check_all(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
404 WERROR status;
405 struct dreplsrv_partition *p;
407 for (p = s->partitions; p; p = p->next) {
408 status = dreplsrv_notify_check(s, p, mem_ctx);
409 W_ERROR_NOT_OK_RETURN(status);
412 return WERR_OK;
415 static void dreplsrv_notify_run(struct dreplsrv_service *service);
417 static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_timer *te,
418 struct timeval t, void *ptr)
420 struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
421 WERROR status;
423 service->notify.te = NULL;
425 dreplsrv_notify_run(service);
427 status = dreplsrv_notify_schedule(service, service->notify.interval);
428 if (!W_ERROR_IS_OK(status)) {
429 task_server_terminate(service->task, win_errstr(status), false);
430 return;
434 WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_interval)
436 TALLOC_CTX *tmp_mem;
437 struct tevent_timer *new_te;
438 struct timeval next_time;
440 /* prevent looping */
441 if (next_interval == 0) next_interval = 1;
443 next_time = timeval_current_ofs(next_interval, 50);
445 if (service->notify.te) {
447 * if the timestamp of the new event is higher,
448 * as current next we don't need to reschedule
450 if (timeval_compare(&next_time, &service->notify.next_event) > 0) {
451 return WERR_OK;
455 /* reset the next scheduled timestamp */
456 service->notify.next_event = next_time;
458 new_te = tevent_add_timer(service->task->event_ctx, service,
459 service->notify.next_event,
460 dreplsrv_notify_handler_te, service);
461 W_ERROR_HAVE_NO_MEMORY(new_te);
463 tmp_mem = talloc_new(service);
464 DBG_DEBUG("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
465 next_interval,
466 (service->notify.te?"re":""),
467 nt_time_string(tmp_mem, timeval_to_nttime(&next_time)));
468 talloc_free(tmp_mem);
470 talloc_free(service->notify.te);
471 service->notify.te = new_te;
473 return WERR_OK;
476 static void dreplsrv_notify_run(struct dreplsrv_service *service)
478 TALLOC_CTX *mem_ctx;
480 mem_ctx = talloc_new(service);
481 dreplsrv_notify_check_all(service, mem_ctx);
482 talloc_free(mem_ctx);
484 dreplsrv_run_pending_ops(service);