s4-dsdb: always cancel transactions on all partitions
[Samba/cd1.git] / source4 / dsdb / samdb / ldb_modules / partition.c
blob01ae0a10a70914ebb10cae82113b78596d2e0ed4
1 /*
2 Partitions ldb module
4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * Name: ldb
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
/* One prepared sub-request: the request itself and the module it is handed to. */
34 struct part_request {
/* module passed to partition_request() when this entry is fired */
35 struct ldb_module *module;
/* request built by partition_prep_request() for this entry */
36 struct ldb_request *req;
/* State shared by all sub-requests that are derived from one original request. */
39 struct partition_context {
/* the partition module instance handling the original request */
40 struct ldb_module *module;
/* the original request whose callback receives entries and the final 'done' */
41 struct ldb_request *req;
/* set to true once a sub-request delivers a done reply with LDB_SUCCESS */
42 bool got_success;
/* array of prepared sub-requests, grown by one slot per partition_prep_request() call */
44 struct part_request *part_req;
/* number of valid entries in part_req */
45 int num_requests;
/* number of sub-requests that have delivered their done reply; also the index of the next one to fire */
46 int finished_requests;
49 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 struct partition_context *ac;
53 ac = talloc_zero(req, struct partition_context);
54 if (ac == NULL) {
55 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
56 return NULL;
59 ac->module = module;
60 ac->req = req;
62 return ac;
66 * helper functions to call the next module in chain
67 * */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 if (partition_ctrl) {
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
82 } else {
83 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 struct ldb_dn *dn,
92 struct ldb_request *req)
94 int i;
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
107 dn = partition->dn;
111 if (dn == NULL) {
112 return NULL;
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
124 return NULL;
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
136 int ret;
137 struct partition_private_data *data;
138 struct ldb_control *partition_ctrl;
140 ac = talloc_get_type(req->context, struct partition_context);
141 data = talloc_get_type(ac->module->private_data, struct partition_private_data);
143 if (!ares) {
144 return ldb_module_done(ac->req, NULL, NULL,
145 LDB_ERR_OPERATIONS_ERROR);
148 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150 /* If we didn't fan this request out to mulitple partitions,
151 * or this is an individual search result, we can
152 * deterministily tell the caller what partition this was
153 * written to (repl_meta_data likes to know) */
154 ret = ldb_reply_add_control(ares,
155 DSDB_CONTROL_CURRENT_PARTITION_OID,
156 false, partition_ctrl->data);
157 if (ret != LDB_SUCCESS) {
158 return ldb_module_done(ac->req, NULL, NULL,
159 ret);
163 if (ares->error != LDB_SUCCESS && !ac->got_success) {
164 return ldb_module_done(ac->req, ares->controls,
165 ares->response, ares->error);
168 switch (ares->type) {
169 case LDB_REPLY_REFERRAL:
170 /* ignore referrals for now */
171 break;
173 case LDB_REPLY_ENTRY:
174 if (ac->req->operation != LDB_SEARCH) {
175 ldb_set_errstring(ldb_module_get_ctx(ac->module),
176 "partition_req_callback:"
177 " Unsupported reply type for this request");
178 return ldb_module_done(ac->req, NULL, NULL,
179 LDB_ERR_OPERATIONS_ERROR);
182 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
184 case LDB_REPLY_DONE:
185 if (ares->error == LDB_SUCCESS) {
186 ac->got_success = true;
188 if (ac->req->operation == LDB_EXTENDED) {
189 /* FIXME: check for ares->response, replmd does not fill it ! */
190 if (ares->response) {
191 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
192 ldb_set_errstring(ldb_module_get_ctx(ac->module),
193 "partition_req_callback:"
194 " Unknown extended reply, "
195 "only supports START_TLS");
196 talloc_free(ares);
197 return ldb_module_done(ac->req, NULL, NULL,
198 LDB_ERR_OPERATIONS_ERROR);
203 ac->finished_requests++;
204 if (ac->finished_requests == ac->num_requests) {
205 /* this was the last one, call callback */
206 return ldb_module_done(ac->req, ares->controls,
207 ares->response,
208 ac->got_success?LDB_SUCCESS:ares->error);
211 /* not the last, now call the next one */
212 module = ac->part_req[ac->finished_requests].module;
213 nreq = ac->part_req[ac->finished_requests].req;
215 ret = partition_request(module, nreq);
216 if (ret != LDB_SUCCESS) {
217 talloc_free(ares);
218 return ldb_module_done(ac->req, NULL, NULL, ret);
221 break;
224 talloc_free(ares);
225 return LDB_SUCCESS;
228 static int partition_prep_request(struct partition_context *ac,
229 struct dsdb_partition *partition)
231 int ret;
232 struct ldb_request *req;
234 ac->part_req = talloc_realloc(ac, ac->part_req,
235 struct part_request,
236 ac->num_requests + 1);
237 if (ac->part_req == NULL) {
238 ldb_oom(ldb_module_get_ctx(ac->module));
239 return LDB_ERR_OPERATIONS_ERROR;
242 switch (ac->req->operation) {
243 case LDB_SEARCH:
244 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
245 ac->part_req,
246 ac->req->op.search.base,
247 ac->req->op.search.scope,
248 ac->req->op.search.tree,
249 ac->req->op.search.attrs,
250 ac->req->controls,
251 ac, partition_req_callback,
252 ac->req);
253 break;
254 case LDB_ADD:
255 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
256 ac->req->op.add.message,
257 ac->req->controls,
258 ac, partition_req_callback,
259 ac->req);
260 break;
261 case LDB_MODIFY:
262 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
263 ac->req->op.mod.message,
264 ac->req->controls,
265 ac, partition_req_callback,
266 ac->req);
267 break;
268 case LDB_DELETE:
269 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
270 ac->req->op.del.dn,
271 ac->req->controls,
272 ac, partition_req_callback,
273 ac->req);
274 break;
275 case LDB_RENAME:
276 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
277 ac->req->op.rename.olddn,
278 ac->req->op.rename.newdn,
279 ac->req->controls,
280 ac, partition_req_callback,
281 ac->req);
282 break;
283 case LDB_EXTENDED:
284 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
285 ac->part_req,
286 ac->req->op.extended.oid,
287 ac->req->op.extended.data,
288 ac->req->controls,
289 ac, partition_req_callback,
290 ac->req);
291 break;
292 default:
293 ldb_set_errstring(ldb_module_get_ctx(ac->module),
294 "Unsupported request type!");
295 ret = LDB_ERR_UNWILLING_TO_PERFORM;
298 if (ret != LDB_SUCCESS) {
299 return ret;
302 ac->part_req[ac->num_requests].req = req;
304 if (ac->req->controls) {
305 req->controls = talloc_memdup(req, ac->req->controls,
306 talloc_get_size(ac->req->controls));
307 if (req->controls == NULL) {
308 ldb_oom(ldb_module_get_ctx(ac->module));
309 return LDB_ERR_OPERATIONS_ERROR;
313 if (partition) {
314 ac->part_req[ac->num_requests].module = partition->module;
316 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
317 ret = ldb_request_add_control(req,
318 DSDB_CONTROL_CURRENT_PARTITION_OID,
319 false, partition->ctrl);
320 if (ret != LDB_SUCCESS) {
321 return ret;
325 if (req->operation == LDB_SEARCH) {
326 /* If the search is for 'more' than this partition,
327 * then change the basedn, so a remote LDAP server
328 * doesn't object */
329 if (ldb_dn_compare_base(partition->ctrl->dn,
330 req->op.search.base) != 0) {
331 req->op.search.base = partition->ctrl->dn;
335 } else {
336 /* make sure you put the module here, or
337 * or ldb_next_request() will skip a module */
338 ac->part_req[ac->num_requests].module = ac->module;
341 ac->num_requests++;
343 return LDB_SUCCESS;
346 static int partition_call_first(struct partition_context *ac)
348 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
352 * Send a request down to all the partitions
354 static int partition_send_all(struct ldb_module *module,
355 struct partition_context *ac,
356 struct ldb_request *req)
358 int i;
359 struct partition_private_data *data = talloc_get_type(module->private_data,
360 struct partition_private_data);
361 int ret = partition_prep_request(ac, NULL);
362 if (ret != LDB_SUCCESS) {
363 return ret;
365 for (i=0; data && data->partitions && data->partitions[i]; i++) {
366 ret = partition_prep_request(ac, data->partitions[i]);
367 if (ret != LDB_SUCCESS) {
368 return ret;
372 /* fire the first one */
373 return partition_call_first(ac);
377 * Figure out which backend a request needs to be aimed at. Some
378 * requests must be replicated to all backends
380 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
382 struct partition_context *ac;
383 unsigned i;
384 int ret;
385 struct dsdb_partition *partition;
386 struct partition_private_data *data = talloc_get_type(module->private_data,
387 struct partition_private_data);
388 if (!data || !data->partitions) {
389 return ldb_next_request(module, req);
392 if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
393 /* Is this a special DN, we need to replicate to every backend? */
394 for (i=0; data->replicate && data->replicate[i]; i++) {
395 if (ldb_dn_compare(data->replicate[i],
396 dn) == 0) {
398 ac = partition_init_ctx(module, req);
399 if (!ac) {
400 return LDB_ERR_OPERATIONS_ERROR;
403 return partition_send_all(module, ac, req);
408 /* Otherwise, we need to find the partition to fire it to */
410 /* Find partition */
411 partition = find_partition(data, dn, req);
412 if (!partition) {
414 * if we haven't found a matching partition
415 * pass the request to the main ldb
417 * TODO: we should maybe return an error here
418 * if it's not a special dn
421 return ldb_next_request(module, req);
424 ac = partition_init_ctx(module, req);
425 if (!ac) {
426 return LDB_ERR_OPERATIONS_ERROR;
429 /* we need to add a control but we never touch the original request */
430 ret = partition_prep_request(ac, partition);
431 if (ret != LDB_SUCCESS) {
432 return ret;
435 /* fire the first one */
436 return partition_call_first(ac);
439 /* search */
440 static int partition_search(struct ldb_module *module, struct ldb_request *req)
442 int ret;
443 struct ldb_control **saved_controls;
444 /* Find backend */
445 struct partition_private_data *data = talloc_get_type(module->private_data,
446 struct partition_private_data);
448 /* issue request */
450 /* (later) consider if we should be searching multiple
451 * partitions (for 'invisible' partition behaviour */
453 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
454 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
456 struct ldb_search_options_control *search_options = NULL;
457 struct dsdb_partition *p;
459 ret = partition_reload_if_required(module, data);
460 if (ret != LDB_SUCCESS) {
461 return ret;
464 p = find_partition(data, NULL, req);
465 if (p != NULL) {
466 /* the caller specified what partition they want the
467 * search - just pass it on
469 return ldb_next_request(p->module, req);
473 if (search_control) {
474 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
477 /* Remove the domain_scope control, so we don't confuse a backend server */
478 if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
479 ldb_oom(ldb_module_get_ctx(module));
480 return LDB_ERR_OPERATIONS_ERROR;
484 * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
485 * down as uncritical to make windows 2008 dcpromo happy.
487 if (search_control) {
488 search_control->critical = 0;
491 /* TODO:
492 Generate referrals (look for a partition under this DN) if we don't have the above control specified
495 if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
496 int i;
497 struct partition_context *ac;
498 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
499 /* We have processed this flag, so we are done with this control now */
501 /* Remove search control, so we don't confuse a backend server */
502 if (search_control && !save_controls(search_control, req, &saved_controls)) {
503 ldb_oom(ldb_module_get_ctx(module));
504 return LDB_ERR_OPERATIONS_ERROR;
507 ac = partition_init_ctx(module, req);
508 if (!ac) {
509 return LDB_ERR_OPERATIONS_ERROR;
512 /* Search from the base DN */
513 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
514 return partition_send_all(module, ac, req);
516 for (i=0; data && data->partitions && data->partitions[i]; i++) {
517 bool match = false, stop = false;
518 /* Find all partitions under the search base
520 we match if:
522 1) the DN we are looking for exactly matches the partition
524 2) the DN we are looking for is a parent of the partition and it isn't
525 a scope base search
527 3) the DN we are looking for is a child of the partition
529 if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
530 match = true;
531 if (req->op.search.scope == LDB_SCOPE_BASE) {
532 stop = true;
535 if (!match &&
536 (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
537 req->op.search.scope != LDB_SCOPE_BASE)) {
538 match = true;
540 if (!match &&
541 ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
542 match = true;
543 stop = true; /* note that this relies on partition ordering */
545 if (match) {
546 ret = partition_prep_request(ac, data->partitions[i]);
547 if (ret != LDB_SUCCESS) {
548 return ret;
551 if (stop) break;
554 /* Perhaps we didn't match any partitions. Try the main partition, only */
555 if (ac->num_requests == 0) {
556 talloc_free(ac);
557 return ldb_next_request(module, req);
560 /* fire the first one */
561 return partition_call_first(ac);
563 } else {
564 /* Handle this like all other requests */
565 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
566 /* We have processed this flag, so we are done with this control now */
568 /* Remove search control, so we don't confuse a backend server */
569 if (search_control && !save_controls(search_control, req, &saved_controls)) {
570 ldb_oom(ldb_module_get_ctx(module));
571 return LDB_ERR_OPERATIONS_ERROR;
575 return partition_replicate(module, req, req->op.search.base);
579 /* add */
580 static int partition_add(struct ldb_module *module, struct ldb_request *req)
582 return partition_replicate(module, req, req->op.add.message->dn);
585 /* modify */
586 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
588 return partition_replicate(module, req, req->op.mod.message->dn);
591 /* delete */
592 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
594 return partition_replicate(module, req, req->op.del.dn);
597 /* rename */
598 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
600 /* Find backend */
601 struct dsdb_partition *backend, *backend2;
603 struct partition_private_data *data = talloc_get_type(module->private_data,
604 struct partition_private_data);
606 /* Skip the lot if 'data' isn't here yet (initialisation) */
607 if (!data) {
608 return LDB_ERR_OPERATIONS_ERROR;
611 backend = find_partition(data, req->op.rename.olddn, req);
612 backend2 = find_partition(data, req->op.rename.newdn, req);
614 if ((backend && !backend2) || (!backend && backend2)) {
615 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
618 if (backend != backend2) {
619 ldb_asprintf_errstring(ldb_module_get_ctx(module),
620 "Cannot rename from %s in %s to %s in %s: %s",
621 ldb_dn_get_linearized(req->op.rename.olddn),
622 ldb_dn_get_linearized(backend->ctrl->dn),
623 ldb_dn_get_linearized(req->op.rename.newdn),
624 ldb_dn_get_linearized(backend2->ctrl->dn),
625 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
626 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
629 return partition_replicate(module, req, req->op.rename.olddn);
632 /* start a transaction */
633 static int partition_start_trans(struct ldb_module *module)
635 int i, ret;
636 struct partition_private_data *data = talloc_get_type(module->private_data,
637 struct partition_private_data);
638 /* Look at base DN */
639 /* Figure out which partition it is under */
640 /* Skip the lot if 'data' isn't here yet (initialization) */
641 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
642 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
644 ret = ldb_next_start_trans(module);
645 if (ret != LDB_SUCCESS) {
646 return ret;
649 ret = partition_reload_if_required(module, data);
650 if (ret != LDB_SUCCESS) {
651 return ret;
654 for (i=0; data && data->partitions && data->partitions[i]; i++) {
655 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
656 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
657 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
659 ret = ldb_next_start_trans(data->partitions[i]->module);
660 if (ret != LDB_SUCCESS) {
661 /* Back it out, if it fails on one */
662 for (i--; i >= 0; i--) {
663 ldb_next_del_trans(data->partitions[i]->module);
665 ldb_next_del_trans(module);
666 return ret;
670 data->in_transaction++;
672 return LDB_SUCCESS;
675 /* prepare for a commit */
676 static int partition_prepare_commit(struct ldb_module *module)
678 int i;
679 struct partition_private_data *data = talloc_get_type(module->private_data,
680 struct partition_private_data);
682 for (i=0; data && data->partitions && data->partitions[i]; i++) {
683 int ret;
685 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
686 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
687 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
689 ret = ldb_next_prepare_commit(data->partitions[i]->module);
690 if (ret != LDB_SUCCESS) {
691 ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
692 return ret;
696 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
697 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
699 return ldb_next_prepare_commit(module);
703 /* end a transaction */
704 static int partition_end_trans(struct ldb_module *module)
706 int i, ret, ret2;
707 struct partition_private_data *data = talloc_get_type(module->private_data,
708 struct partition_private_data);
710 ret = LDB_SUCCESS;
712 if (data->in_transaction == 0) {
713 DEBUG(0,("partition end transaction mismatch\n"));
714 ret = LDB_ERR_OPERATIONS_ERROR;
715 } else {
716 data->in_transaction--;
719 for (i=0; data && data->partitions && data->partitions[i]; i++) {
720 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
721 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
722 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
724 ret2 = ldb_next_end_trans(data->partitions[i]->module);
725 if (ret2 != LDB_SUCCESS) {
726 ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
727 ret = ret2;
731 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
732 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
734 ret2 = ldb_next_end_trans(module);
735 if (ret2 != LDB_SUCCESS) {
736 ret = ret2;
738 return ret;
741 /* delete a transaction */
742 static int partition_del_trans(struct ldb_module *module)
744 int i, ret, final_ret = LDB_SUCCESS;
745 struct partition_private_data *data = talloc_get_type(module->private_data,
746 struct partition_private_data);
747 for (i=0; data && data->partitions && data->partitions[i]; i++) {
748 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
749 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
750 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
752 ret = ldb_next_del_trans(data->partitions[i]->module);
753 if (ret != LDB_SUCCESS) {
754 ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
755 final_ret = ret;
759 if (data->in_transaction == 0) {
760 DEBUG(0,("partition del transaction mismatch\n"));
761 return LDB_ERR_OPERATIONS_ERROR;
763 data->in_transaction--;
765 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
766 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
768 ret = ldb_next_del_trans(module);
769 if (ret != LDB_SUCCESS) {
770 final_ret = ret;
772 return final_ret;
775 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
776 enum ldb_sequence_type type, uint64_t *seq_number)
778 int ret;
779 struct ldb_result *res;
780 struct ldb_seqnum_request *tseq;
781 struct ldb_request *treq;
782 struct ldb_seqnum_result *seqr;
783 res = talloc_zero(mem_ctx, struct ldb_result);
784 if (res == NULL) {
785 return LDB_ERR_OPERATIONS_ERROR;
787 tseq = talloc_zero(res, struct ldb_seqnum_request);
788 if (tseq == NULL) {
789 talloc_free(res);
790 return LDB_ERR_OPERATIONS_ERROR;
792 tseq->type = type;
794 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
795 LDB_EXTENDED_SEQUENCE_NUMBER,
796 tseq,
797 NULL,
798 res,
799 ldb_extended_default_callback,
800 NULL);
801 if (ret != LDB_SUCCESS) {
802 talloc_free(res);
803 return ret;
806 ret = ldb_next_request(module, treq);
807 if (ret != LDB_SUCCESS) {
808 talloc_free(res);
809 return ret;
811 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
812 if (ret != LDB_SUCCESS) {
813 talloc_free(res);
814 return ret;
817 seqr = talloc_get_type(res->extended->data,
818 struct ldb_seqnum_result);
819 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
820 ret = LDB_ERR_OPERATIONS_ERROR;
821 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
822 talloc_free(res);
823 return ret;
824 } else {
825 *seq_number = seqr->seq_num;
827 talloc_free(res);
828 return LDB_SUCCESS;
831 /* FIXME: This function is still semi-async */
832 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
834 int i, ret;
835 uint64_t seq_number = 0;
836 uint64_t timestamp_sequence = 0;
837 uint64_t timestamp = 0;
838 struct partition_private_data *data = talloc_get_type(module->private_data,
839 struct partition_private_data);
840 struct ldb_seqnum_request *seq;
841 struct ldb_seqnum_result *seqr;
842 struct ldb_request *treq;
843 struct ldb_seqnum_request *tseq;
844 struct ldb_seqnum_result *tseqr;
845 struct ldb_extended *ext;
846 struct ldb_result *res;
847 struct dsdb_partition *p;
849 p = find_partition(data, NULL, req);
850 if (p != NULL) {
851 /* the caller specified what partition they want the
852 * sequence number operation on - just pass it on
854 return ldb_next_request(p->module, req);
857 seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
859 switch (seq->type) {
860 case LDB_SEQ_NEXT:
861 case LDB_SEQ_HIGHEST_SEQ:
863 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
864 if (ret != LDB_SUCCESS) {
865 return ret;
868 /* Skip the lot if 'data' isn't here yet (initialisation) */
869 for (i=0; data && data->partitions && data->partitions[i]; i++) {
871 res = talloc_zero(req, struct ldb_result);
872 if (res == NULL) {
873 return LDB_ERR_OPERATIONS_ERROR;
875 tseq = talloc_zero(res, struct ldb_seqnum_request);
876 if (tseq == NULL) {
877 talloc_free(res);
878 return LDB_ERR_OPERATIONS_ERROR;
880 tseq->type = seq->type;
882 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
883 LDB_EXTENDED_SEQUENCE_NUMBER,
884 tseq,
885 NULL,
886 res,
887 ldb_extended_default_callback,
888 NULL);
889 if (ret != LDB_SUCCESS) {
890 talloc_free(res);
891 return ret;
894 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
895 ret = ldb_request_add_control(treq,
896 DSDB_CONTROL_CURRENT_PARTITION_OID,
897 false, data->partitions[i]->ctrl);
898 if (ret != LDB_SUCCESS) {
899 talloc_free(res);
900 return ret;
904 ret = partition_request(data->partitions[i]->module, treq);
905 if (ret != LDB_SUCCESS) {
906 talloc_free(res);
907 return ret;
909 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
910 if (ret != LDB_SUCCESS) {
911 talloc_free(res);
912 return ret;
914 tseqr = talloc_get_type(res->extended->data,
915 struct ldb_seqnum_result);
916 if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
917 timestamp_sequence = MAX(timestamp_sequence,
918 tseqr->seq_num);
919 } else {
920 seq_number += tseqr->seq_num;
922 talloc_free(res);
924 /* fall through */
925 case LDB_SEQ_HIGHEST_TIMESTAMP:
927 res = talloc_zero(req, struct ldb_result);
928 if (res == NULL) {
929 return LDB_ERR_OPERATIONS_ERROR;
932 tseq = talloc_zero(res, struct ldb_seqnum_request);
933 if (tseq == NULL) {
934 talloc_free(res);
935 return LDB_ERR_OPERATIONS_ERROR;
937 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
939 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
940 LDB_EXTENDED_SEQUENCE_NUMBER,
941 tseq,
942 NULL,
943 res,
944 ldb_extended_default_callback,
945 NULL);
946 if (ret != LDB_SUCCESS) {
947 talloc_free(res);
948 return ret;
951 ret = ldb_next_request(module, treq);
952 if (ret != LDB_SUCCESS) {
953 talloc_free(res);
954 return ret;
956 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
957 if (ret != LDB_SUCCESS) {
958 talloc_free(res);
959 return ret;
962 tseqr = talloc_get_type(res->extended->data,
963 struct ldb_seqnum_result);
964 timestamp = tseqr->seq_num;
966 talloc_free(res);
968 /* Skip the lot if 'data' isn't here yet (initialisation) */
969 for (i=0; data && data->partitions && data->partitions[i]; i++) {
971 res = talloc_zero(req, struct ldb_result);
972 if (res == NULL) {
973 return LDB_ERR_OPERATIONS_ERROR;
976 tseq = talloc_zero(res, struct ldb_seqnum_request);
977 if (tseq == NULL) {
978 talloc_free(res);
979 return LDB_ERR_OPERATIONS_ERROR;
981 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
983 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
984 LDB_EXTENDED_SEQUENCE_NUMBER,
985 tseq,
986 NULL,
987 res,
988 ldb_extended_default_callback,
989 NULL);
990 if (ret != LDB_SUCCESS) {
991 talloc_free(res);
992 return ret;
995 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
996 ret = ldb_request_add_control(treq,
997 DSDB_CONTROL_CURRENT_PARTITION_OID,
998 false, data->partitions[i]->ctrl);
999 if (ret != LDB_SUCCESS) {
1000 talloc_free(res);
1001 return ret;
1005 ret = partition_request(data->partitions[i]->module, treq);
1006 if (ret != LDB_SUCCESS) {
1007 talloc_free(res);
1008 return ret;
1010 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1011 if (ret != LDB_SUCCESS) {
1012 talloc_free(res);
1013 return ret;
1016 tseqr = talloc_get_type(res->extended->data,
1017 struct ldb_seqnum_result);
1018 timestamp = MAX(timestamp, tseqr->seq_num);
1020 talloc_free(res);
1023 break;
1026 ext = talloc_zero(req, struct ldb_extended);
1027 if (!ext) {
1028 return LDB_ERR_OPERATIONS_ERROR;
1030 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1031 if (seqr == NULL) {
1032 talloc_free(ext);
1033 return LDB_ERR_OPERATIONS_ERROR;
1035 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1036 ext->data = seqr;
1038 switch (seq->type) {
1039 case LDB_SEQ_NEXT:
1040 case LDB_SEQ_HIGHEST_SEQ:
1042 /* Has someone above set a timebase sequence? */
1043 if (timestamp_sequence) {
1044 seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1045 } else {
1046 seqr->seq_num = seq_number;
1049 if (timestamp_sequence > seqr->seq_num) {
1050 seqr->seq_num = timestamp_sequence;
1051 seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1054 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1055 break;
1056 case LDB_SEQ_HIGHEST_TIMESTAMP:
1057 seqr->seq_num = timestamp;
1058 break;
1061 if (seq->type == LDB_SEQ_NEXT) {
1062 seqr->seq_num++;
1065 /* send request done */
1066 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1069 /* extended */
1070 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1072 struct partition_private_data *data;
1073 struct partition_context *ac;
1074 int ret;
1076 data = talloc_get_type(module->private_data, struct partition_private_data);
1077 if (!data) {
1078 return ldb_next_request(module, req);
1081 ret = partition_reload_if_required(module, data);
1082 if (ret != LDB_SUCCESS) {
1083 return ret;
1086 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1087 return partition_sequence_number(module, req);
1090 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1091 return partition_create(module, req);
1095 * as the extended operation has no dn
1096 * we need to send it to all partitions
1099 ac = partition_init_ctx(module, req);
1100 if (!ac) {
1101 return LDB_ERR_OPERATIONS_ERROR;
1104 return partition_send_all(module, ac, req);
1107 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1108 .name = "partition",
1109 .init_context = partition_init,
1110 .search = partition_search,
1111 .add = partition_add,
1112 .modify = partition_modify,
1113 .del = partition_delete,
1114 .rename = partition_rename,
1115 .extended = partition_extended,
1116 .start_transaction = partition_start_trans,
1117 .prepare_commit = partition_prepare_commit,
1118 .end_transaction = partition_end_trans,
1119 .del_transaction = partition_del_trans,